使用 etcd 作为状态后端实现 Tekton 与 OpenFaaS 间的弹性异步工作流


一个标准的 Tekton PipelineRun 对于典型的 CI/CD 场景——构建、测试、部署——是完全足够的。它的工作空间(Workspaces)和任务结果(Results)机制可以在线性的、短时的工作流中传递状态。但当需求从 CI 转向业务流程编排(Business Process Orchestration)时,这套机制就显得捉襟见肘。我们要处理的不再是几分钟内完成的构建任务,而是可能持续数小时甚至数天的、包含复杂分支和重试逻辑的业务工作流,例如视频处理、基因测序或金融清算。

在这样的场景下,Tekton Pod 的短暂性成了一个巨大的风险。任何节点故障或 Pod 的驱逐都可能导致工作流状态丢失。依赖 Tekton 内置的机制来维护一个长期、关键的业务状态,无异于将大厦建在沙滩上。我们需要一个独立于 Tekton 执行生命周期的、高可用的持久化状态存储。

初步构想是将工作流的核心状态外部化。Tekton 依然负责定义工作流的“骨架”——任务的依赖关系和执行顺序。OpenFaaS 则作为无服务器函数执行器,负责运行具体的、无状态的业务逻辑单元。而连接这两者的“脊髓”,则是 etcd。我们选择 etcd 而非传统数据库,看中的是它的两个核心能力:基于 CAS (Compare-And-Swap) 的原子事务操作,以及强大的 Watch 机制。这使得 etcd 不仅仅是一个数据存储,更是一个分布式的协调和信令中枢。

整个架构的目标是:Tekton TaskRun 变成一个纯粹的调度器,它从 etcd 读取当前状态,调用相应的 OpenFaaS 函数,然后将函数的执行结果原子性地更新回 etcd。工作流的每一步都成为一个幂等的、可重试的原子操作。即使 Tekton TaskRun 失败并重试,它也能从 etcd 中获取到最新的、一致的状态,从而决定是跳过已完成的步骤,还是重新执行失败的逻辑。

定义工作流状态模型

第一步是设计存储在 etcd 中的数据结构。这个结构必须能完整地描述一个工作流实例的生命周期,并为后续的弹性设计(如重试、熔断)预留字段。

我们将工作流的状态定义在一个 Go struct 中,并使用 JSON 进行序列化存储。key 的格式为 /workflows/async/{workflowID}

// pkg/state/state.go

package state

import (
	"encoding/json"
	"time"
)

// CircuitState 定义了熔断器的状态
type CircuitState string

const (
	CircuitClosed   CircuitState = "CLOSED"
	CircuitOpen     CircuitState = "OPEN"
	CircuitHalfOpen CircuitState = "HALF_OPEN"
)

// WorkflowStatus 定义了工作流的整体状态
type WorkflowStatus string

const (
	StatusPending      WorkflowStatus = "PENDING"
	StatusProcessing   WorkflowStatus = "PROCESSING"
	StatusStage1Done   WorkflowStatus = "STAGE_1_DONE"
	StatusStage2Done   WorkflowStatus = "STAGE_2_DONE"
	StatusCompleted    WorkflowStatus = "COMPLETED"
	StatusFailed       WorkflowStatus = "FAILED"
	StatusTerminated   WorkflowStatus = "TERMINATED"
)

// WorkflowState 是存储在 etcd 中的核心数据结构
type WorkflowState struct {
	ID          string         `json:"id"`
	Status      WorkflowStatus `json:"status"`
	Payload     string         `json:"payload"` // 简单的用 string 存储输入数据
	Result      string         `json:"result"`
	Retries     int            `json:"retries"`
	CreatedAt   time.Time      `json:"created_at"`
	UpdatedAt   time.Time      `json:"updated_at"`

	// 用于实现熔断机制的字段
	CircuitBreaker CircuitBreakerState `json:"circuit_breaker"`
}

// CircuitBreakerState 存储了特定阶段熔断器的状态
type CircuitBreakerState struct {
	State               CircuitState `json:"state"`
	ConsecutiveFailures int          `json:"consecutive_failures"`
	LastFailureTime     *time.Time   `json:"last_failure_time,omitempty"`
	OpenUntil           *time.Time   `json:"open_until,omitempty"`
}

// NewWorkflowState 创建一个新的工作流状态实例
func NewWorkflowState(id, payload string) *WorkflowState {
	return &WorkflowState{
		ID:        id,
		Status:    StatusPending,
		Payload:   payload,
		CreatedAt: time.Now().UTC(),
		UpdatedAt: time.Now().UTC(),
		CircuitBreaker: CircuitBreakerState{
			State: CircuitClosed,
		},
	}
}

// ToJSON 将状态序列化为 JSON
func (ws *WorkflowState) ToJSON() ([]byte, error) {
	return json.Marshal(ws)
}

// FromJSON 从 JSON 反序列化状态
func FromJSON(data []byte) (*WorkflowState, error) {
	var ws WorkflowState
	err := json.Unmarshal(data, &ws)
	if err != nil {
		return nil, err
	}
	return &ws, nil
}

这个结构不仅仅包含了工作流的ID、状态和数据,还特意加入了 Retries 和一个完整的 CircuitBreakerState 结构。在真实项目中,这些弹性控制相关的状态与业务状态放在一起管理,是保证一致性的关键。

构建与 etcd 交互的原子性客户端

接下来是整个方案的核心:一个与 etcd 交互的客户端。这个客户端必须保证所有状态更新都是原子性的,以避免在并发执行或任务重试时出现数据不一致。etcdTxn (事务) API 是实现这一点的关键。我们利用 ModRevision 来实现乐观锁,确保我们只在数据未被其他进程修改过的情况下才进行更新。

// pkg/store/etcd_client.go

package store

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/your-repo/workflow-engine/pkg/state"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

const (
	workflowPrefix = "/workflows/async/"
	requestTimeout = 5 * time.Second
)

// StateStore 定义了与状态存储交互的接口
type StateStore interface {
	CreateWorkflow(ctx context.Context, s *state.WorkflowState) error
	GetWorkflow(ctx context.Context, workflowID string) (*state.WorkflowState, int64, error)
	UpdateWorkflow(ctx context.Context, modRevision int64, s *state.WorkflowState) error
	// ... 其他可能的方法
}

// EtcdStore 是 StateStore 的 etcd 实现
type EtcdStore struct {
	client *clientv3.Client
}

// NewEtcdStore 创建一个新的 EtcdStore 实例
func NewEtcdStore(endpoints []string) (*EtcdStore, error) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to connect to etcd: %w", err)
	}
	return &EtcdStore{client: cli}, nil
}

// keyForWorkflow 生成工作流的 etcd key
func keyForWorkflow(workflowID string) string {
	return fmt.Sprintf("%s%s", workflowPrefix, workflowID)
}

// CreateWorkflow 创建一个新的工作流状态
// 使用事务确保 key 不存在时才创建
func (s *EtcdStore) CreateWorkflow(ctx context.Context, st *state.WorkflowState) error {
	key := keyForWorkflow(st.ID)
	value, err := st.ToJSON()
	if err != nil {
		return fmt.Errorf("failed to serialize state: %w", err)
	}

	// 使用事务确保原子性创建
	// If(key的ModRevision为0) Then(Put key)
	resp, err := s.client.Txn(ctx).
		If(clientv3.Compare(clientv3.ModRevision(key), "=", 0)).
		Then(clientv3.OpPut(key, string(value))).
		Commit()

	if err != nil {
		return fmt.Errorf("etcd transaction failed: %w", err)
	}
	if !resp.Succeeded {
		return fmt.Errorf("workflow with ID '%s' already exists", st.ID)
	}

	log.Printf("Successfully created workflow '%s'", st.ID)
	return nil
}

// GetWorkflow 获取工作流状态及其 ModRevision
func (s *EtcdStore) GetWorkflow(ctx context.Context, workflowID string) (*state.WorkflowState, int64, error) {
	key := keyForWorkflow(workflowID)
	
	resp, err := s.client.Get(ctx, key)
	if err != nil {
		return nil, 0, fmt.Errorf("failed to get key from etcd: %w", err)
	}
	if len(resp.Kvs) == 0 {
		return nil, 0, fmt.Errorf("workflow '%s' not found", workflowID)
	}

	kv := resp.Kvs[0]
	st, err := state.FromJSON(kv.Value)
	if err != nil {
		return nil, 0, fmt.Errorf("failed to deserialize state for workflow '%s': %w", workflowID, err)
	}

	// 返回状态对象和 ModRevision
	return st, kv.ModRevision, nil
}

// UpdateWorkflow 原子性地更新工作流状态
// 必须传入 GetWorkflow 时获取的 modRevision
func (s *EtcdStore) UpdateWorkflow(ctx context.Context, modRevision int64, st *state.WorkflowState) error {
	key := keyForWorkflow(st.ID)
	st.UpdatedAt = time.Now().UTC()
	value, err := st.ToJSON()
	if err != nil {
		return fmt.Errorf("failed to serialize state for update: %w", err)
	}
	
	// 使用事务和 ModRevision 实现 Compare-And-Swap
	// If(key的ModRevision等于传入的modRevision) Then(Put 新的 value)
	resp, err := s.client.Txn(ctx).
		If(clientv3.Compare(clientv3.ModRevision(key), "=", modRevision)).
		Then(clientv3.OpPut(key, string(value))).
		Commit()

	if err != nil {
		return fmt.Errorf("etcd transaction failed for update: %w", err)
	}
	if !resp.Succeeded {
		// 这里的错误是生产环境中最需要关注的:状态竞争
		return fmt.Errorf("state conflict: workflow '%s' was modified by another process", st.ID)
	}
	
	log.Printf("Successfully updated workflow '%s' to status '%s'", st.ID, st.Status)
	return nil
}

func (s *EtcdStore) Close() {
	s.client.Close()
}

UpdateWorkflow 函数是这个客户端的灵魂。它强制调用者传入 modRevision,并在 TxnIf 条件中进行比较。如果在此期间有其他进程修改了 etcd 中的数据,modRevision 将会改变,导致事务失败。这迫使调用者必须重新获取最新状态,再进行修改,从而从根本上杜绝了状态覆盖的风险。

用 Tekton 和 OpenFaaS 串联流程

现在我们有了状态模型和原子性客户端,可以开始定义工作流的实际步骤了。

1. 初始化工作流的 Tekton Task

工作流的起点是一个 Tekton Task,它负责生成一个唯一的 workflowID,并调用我们的 Go 程序将初始状态写入 etcd

# tekton/tasks/initialize-workflow.yaml
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: initialize-workflow
spec:
  params:
    - name: payload
      description: The initial data for the workflow
      type: string
  results:
    - name: workflow-id
      description: The unique ID generated for this workflow run
  steps:
    - name: create-state
      image: your-registry/workflow-tool:latest # 包含我们Go客户端的镜像
      command: ["/app/workflow-cli"]
      args:
        - "create"
        - "--payload=$(params.payload)"
        - "--output-path=$(results.workflow-id.path)"
      env:
        - name: ETCD_ENDPOINTS
          value: "etcd-client.etcd.svc.cluster.local:2379"

这个 Task 中的 workflow-cli 是一个简单的命令行工具,它封装了前面编写的 EtcdStorecreate 命令会生成 UUID,调用 CreateWorkflow,并将 ID 写入 Tekton 的 result 路径,以便后续 Task 使用。

2. 执行业务逻辑的 OpenFaaS 函数

业务逻辑本身被封装在一个 OpenFaaS 函数中。这个函数是无状态的,它接收 workflowID,从 etcd 加载状态,执行业务,然后将更新后的状态写回。

// functions/stage1-processor/handler.go
package function

import (
	"context"
	"fmt"
	"log"
	"os"
	"strings"
	"time"

	"github.com/your-repo/workflow-engine/pkg/state"
	"github.com/your-repo/workflow-engine/pkg/store"
)

// Handle a serverless request
func Handle(req []byte) string {
	workflowID := string(req)
	if workflowID == "" {
		return "error: workflowID is required"
	}
	log.Printf("Processing stage 1 for workflow: %s", workflowID)

	etcdEndpoints := strings.Split(os.Getenv("ETCD_ENDPOINTS"), ",")
	stateStore, err := store.NewEtcdStore(etcdEndpoints)
	if err != nil {
		log.Printf("Error connecting to etcd: %v", err)
		return fmt.Sprintf("error: could not connect to state store: %v", err)
	}
	defer stateStore.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// 1. 获取当前状态和 revision
	currentState, modRevision, err := stateStore.GetWorkflow(ctx, workflowID)
	if err != nil {
		log.Printf("Error getting workflow state: %v", err)
		return fmt.Sprintf("error: could not get workflow state: %v", err)
	}

	// 幂等性检查:如果状态已经不是我们期望的 PENDING,则直接返回成功
	if currentState.Status != state.StatusPending {
		log.Printf("Workflow %s already processed for this stage (current status: %s). Skipping.", workflowID, currentState.Status)
		return "success: already processed"
	}

	// 2. 模拟业务逻辑
	log.Printf("Performing business logic for payload: %s", currentState.Payload)
	time.Sleep(2 * time.Second) // 模拟耗时操作
	// 假设这里有可能会失败
	// if someCondition {
	//    // ... 处理失败 ...
	// }
	
	// 3. 更新状态
	currentState.Status = state.StatusStage1Done
	currentState.Result = "Stage 1 completed successfully."
	err = stateStore.UpdateWorkflow(ctx, modRevision, currentState)
	if err != nil {
		// 这里的错误通常是状态竞争,需要上层(Tekton)进行重试
		log.Printf("Error updating workflow state (conflict likely): %v", err)
		return fmt.Sprintf("error: could not update workflow state: %v", err)
	}

	log.Printf("Successfully completed stage 1 for workflow: %s", workflowID)
	return "success"
}

这个函数的关键在于幂等性检查。在分布式系统中,任务可能会被重复执行。通过检查 etcd 中的状态,我们可以安全地跳过已经完成的步骤。

3. 实现熔断器看门人(Gatekeeper) Tekton Task

这是弹性设计的核心体现。在调用真正消耗资源的业务函数(如视频转码)之前,我们先用一个轻量的 Task 检查 etcd 中的熔断器状态。

# tekton/tasks/gatekeeper.yaml
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: gatekeeper
spec:
  params:
    - name: workflow-id
      type: string
    - name: stage-name # e.g., "stage1", "stage2"
      type: string
  steps:
    - name: check-circuit-breaker
      image: your-registry/workflow-tool:latest
      command: ["/app/workflow-cli"]
      args:
        - "check-breaker"
        - "--workflow-id=$(params.workflow-id)"
        - "--stage=$(params.stage-name)" # CLI内部会根据stage处理对应的熔断状态
      env:
        - name: ETCD_ENDPOINTS
          value: "etcd-client.etcd.svc.cluster.local:2379"

workflow-cli check-breaker 命令的逻辑如下:

  1. etcd 获取工作流状态。
  2. 检查 CircuitBreaker 字段。
  3. 如果状态是 CLOSEDHALF_OPEN,程序正常退出(exit code 0)。
  4. 如果状态是 OPEN,检查 OpenUntil 时间。
    • 如果当前时间还没到 OpenUntil,则打印错误并以非零状态码退出(exit code 1),这将导致 Tekton TaskRun 失败,工作流在此中断。
    • 如果已经过了 OpenUntil 时间,则将状态原子性地更新为 HALF_OPEN,然后正常退出,允许下一次尝试。
  5. 这个 CLI 还会处理失败后的状态更新逻辑(例如,另一个 report-failure 命令),当业务函数失败时,它会增加 ConsecutiveFailures 计数,并在达到阈值时将状态置为 OPEN

4. 完整的 Tekton Pipeline

最后,我们将所有部分用 Pipeline 串联起来。

# tekton/pipeline/async-workflow-pipeline.yaml
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
  name: async-workflow-pipeline
spec:
  params:
    - name: payload
      type: string
  tasks:
    - name: init
      taskRef:
        name: initialize-workflow
      params:
        - name: payload
          value: $(params.payload)

    - name: stage-1-gatekeeper
      taskRef:
        name: gatekeeper
      runAfter: [init]
      params:
        - name: workflow-id
          value: $(tasks.init.results.workflow-id)
        - name: stage-name
          value: "stage1"

    - name: run-stage-1
      taskRef:
        name: call-openfaas-function # 一个通用的调用OpenFaaS的Task
      runAfter: [stage-1-gatekeeper]
      params:
        - name: function-name
          value: "stage1-processor"
        - name: function-payload
          value: $(tasks.init.results.workflow-id)
      # 在真实项目中,这里会有 on-error 步骤来报告失败

工作流的视觉化表示如下:

graph TD
    A[Start PipelineRun] --> B(init: initialize-workflow);
    B -- workflow-id --> C(stage-1-gatekeeper: check etcd);
    subgraph "State in etcd"
        direction LR
        S1(Status: PENDING) -- read --> C;
    end
    C -- Circuit CLOSED --> D(run-stage-1: call OpenFaaS);
    C -- Circuit OPEN --> E[Fail TaskRun];
    D -- workflow-id --> F[OpenFaaS: stage1-processor];
    F -- reads --> S2(Status: PENDING, Rev: X);
    F -- business logic --> G{Update State};
    G -- atomic update (CAS on Rev: X) --> S3(Status: STAGE_1_DONE, Rev: X+1);

局限性与未来迭代方向

这套架构解决了 Tekton 在长时工作流场景下状态管理的根本问题,并引入了基于 etcd 的弹性控制。但它并非银弹。

首先,etcd 成为了整个系统的中心瓶颈和单点故障源(尽管 etcd 集群本身是高可用的)。对同一个工作流状态的频繁更新可能会导致热点问题和事务冲突,尤其是在高并发场景下。设计 etcd 的 key 空间和减小状态对象的大小是需要注意的优化点。

其次,当前的设计仍然是“编排”模式(Orchestration),即由 Tekton Pipeline 显式地驱动流程。gatekeeper 任务本质上是一种轮询检查。一个更优雅的演进方向是走向“协同”模式(Choreography)。我们可以部署一个常驻的控制器(Controller/Operator),它使用 etcdWatch API 监听 /workflows/async/ 前缀下的状态变更。当一个工作流状态从 STAGE_1_DONE 变为 STAGE_2_PENDING 时,这个控制器可以直接创建下一个阶段的 TaskRun 或调用相应的 OpenFaaS 函数,从而构建一个完全事件驱动的、反应式的系统。这种方式响应更及时,也减少了 Tekton Task 之间不必要的轮询等待。

最后,此方案中的 workflow-cli 工具和 OpenFaaS 函数中的状态管理逻辑存在重复。在生产环境中,应将其封装成一个统一的、经过充分测试的 Go SDK,供所有业务组件使用,以确保状态操作的一致性和正确性。


  目录