一个标准的 Tekton PipelineRun 对于典型的 CI/CD 场景——构建、测试、部署——是完全足够的。它的工作空间(Workspaces)和任务结果(Results)机制可以在线性的、短时的工作流中传递状态。但当需求从 CI 转向业务流程编排(Business Process Orchestration)时,这套机制就显得捉襟见肘。我们要处理的不再是几分钟内完成的构建任务,而是可能持续数小时甚至数天的、包含复杂分支和重试逻辑的业务工作流,例如视频处理、基因测序或金融清算。
在这样的场景下,Tekton Pod 的短暂性成了一个巨大的风险。任何节点故障或 Pod 的驱逐都可能导致工作流状态丢失。依赖 Tekton 内置的机制来维护一个长期、关键的业务状态,无异于将大厦建在沙滩上。我们需要一个独立于 Tekton 执行生命周期的、高可用的持久化状态存储。
初步构想是将工作流的核心状态外部化。Tekton 依然负责定义工作流的“骨架”——任务的依赖关系和执行顺序。OpenFaaS 则作为无服务器函数执行器,负责运行具体的、无状态的业务逻辑单元。而连接这两者的“脊髓”,则是 etcd
。我们选择 etcd
而非传统数据库,看中的是它的两个核心能力:基于 CAS
(Compare-And-Swap) 的原子事务操作,以及强大的 Watch
机制。这使得 etcd
不仅仅是一个数据存储,更是一个分布式的协调和信令中枢。
整个架构的目标是:Tekton TaskRun
变成一个纯粹的调度器,它从 etcd
读取当前状态,调用相应的 OpenFaaS 函数,然后将函数的执行结果原子性地更新回 etcd
。工作流的每一步都成为一个幂等的、可重试的原子操作。即使 Tekton TaskRun
失败并重试,它也能从 etcd
中获取到最新的、一致的状态,从而决定是跳过已完成的步骤,还是重新执行失败的逻辑。
定义工作流状态模型
第一步是设计存储在 etcd
中的数据结构。这个结构必须能完整地描述一个工作流实例的生命周期,并为后续的弹性设计(如重试、熔断)预留字段。
我们将工作流的状态定义在一个 Go struct 中,并使用 JSON 进行序列化存储。key 的格式为 /workflows/async/{workflowID}
。
// pkg/state/state.go
package state
import (
"encoding/json"
"time"
)
// CircuitState 定义了熔断器的状态
type CircuitState string
const (
CircuitClosed CircuitState = "CLOSED"
CircuitOpen CircuitState = "OPEN"
CircuitHalfOpen CircuitState = "HALF_OPEN"
)
// WorkflowStatus 定义了工作流的整体状态
type WorkflowStatus string
const (
StatusPending WorkflowStatus = "PENDING"
StatusProcessing WorkflowStatus = "PROCESSING"
StatusStage1Done WorkflowStatus = "STAGE_1_DONE"
StatusStage2Done WorkflowStatus = "STAGE_2_DONE"
StatusCompleted WorkflowStatus = "COMPLETED"
StatusFailed WorkflowStatus = "FAILED"
StatusTerminated WorkflowStatus = "TERMINATED"
)
// WorkflowState 是存储在 etcd 中的核心数据结构
type WorkflowState struct {
ID string `json:"id"`
Status WorkflowStatus `json:"status"`
Payload string `json:"payload"` // 简单的用 string 存储输入数据
Result string `json:"result"`
Retries int `json:"retries"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
// 用于实现熔断机制的字段
CircuitBreaker CircuitBreakerState `json:"circuit_breaker"`
}
// CircuitBreakerState 存储了特定阶段熔断器的状态
type CircuitBreakerState struct {
State CircuitState `json:"state"`
ConsecutiveFailures int `json:"consecutive_failures"`
LastFailureTime *time.Time `json:"last_failure_time,omitempty"`
OpenUntil *time.Time `json:"open_until,omitempty"`
}
// NewWorkflowState 创建一个新的工作流状态实例
func NewWorkflowState(id, payload string) *WorkflowState {
return &WorkflowState{
ID: id,
Status: StatusPending,
Payload: payload,
CreatedAt: time.Now().UTC(),
UpdatedAt: time.Now().UTC(),
CircuitBreaker: CircuitBreakerState{
State: CircuitClosed,
},
}
}
// ToJSON 将状态序列化为 JSON
func (ws *WorkflowState) ToJSON() ([]byte, error) {
return json.Marshal(ws)
}
// FromJSON 从 JSON 反序列化状态
func FromJSON(data []byte) (*WorkflowState, error) {
var ws WorkflowState
err := json.Unmarshal(data, &ws)
if err != nil {
return nil, err
}
return &ws, nil
}
这个结构不仅仅包含了工作流的ID、状态和数据,还特意加入了 Retries
和一个完整的 CircuitBreakerState
结构。在真实项目中,这些弹性控制相关的状态与业务状态放在一起管理,是保证一致性的关键。
构建与 etcd 交互的原子性客户端
接下来是整个方案的核心:一个与 etcd
交互的客户端。这个客户端必须保证所有状态更新都是原子性的,以避免在并发执行或任务重试时出现数据不一致。etcd
的 Txn
(事务) API 是实现这一点的关键。我们利用 ModRevision
来实现乐观锁,确保我们只在数据未被其他进程修改过的情况下才进行更新。
// pkg/store/etcd_client.go
package store
import (
"context"
"fmt"
"log"
"time"
"github.com/your-repo/workflow-engine/pkg/state"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
const (
workflowPrefix = "/workflows/async/"
requestTimeout = 5 * time.Second
)
// StateStore 定义了与状态存储交互的接口
type StateStore interface {
CreateWorkflow(ctx context.Context, s *state.WorkflowState) error
GetWorkflow(ctx context.Context, workflowID string) (*state.WorkflowState, int64, error)
UpdateWorkflow(ctx context.Context, modRevision int64, s *state.WorkflowState) error
// ... 其他可能的方法
}
// EtcdStore 是 StateStore 的 etcd 实现
type EtcdStore struct {
client *clientv3.Client
}
// NewEtcdStore 创建一个新的 EtcdStore 实例
func NewEtcdStore(endpoints []string) (*EtcdStore, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, fmt.Errorf("failed to connect to etcd: %w", err)
}
return &EtcdStore{client: cli}, nil
}
// keyForWorkflow 生成工作流的 etcd key
func keyForWorkflow(workflowID string) string {
return fmt.Sprintf("%s%s", workflowPrefix, workflowID)
}
// CreateWorkflow 创建一个新的工作流状态
// 使用事务确保 key 不存在时才创建
func (s *EtcdStore) CreateWorkflow(ctx context.Context, st *state.WorkflowState) error {
key := keyForWorkflow(st.ID)
value, err := st.ToJSON()
if err != nil {
return fmt.Errorf("failed to serialize state: %w", err)
}
// 使用事务确保原子性创建
// If(key的ModRevision为0) Then(Put key)
resp, err := s.client.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", 0)).
Then(clientv3.OpPut(key, string(value))).
Commit()
if err != nil {
return fmt.Errorf("etcd transaction failed: %w", err)
}
if !resp.Succeeded {
return fmt.Errorf("workflow with ID '%s' already exists", st.ID)
}
log.Printf("Successfully created workflow '%s'", st.ID)
return nil
}
// GetWorkflow 获取工作流状态及其 ModRevision
func (s *EtcdStore) GetWorkflow(ctx context.Context, workflowID string) (*state.WorkflowState, int64, error) {
key := keyForWorkflow(workflowID)
resp, err := s.client.Get(ctx, key)
if err != nil {
return nil, 0, fmt.Errorf("failed to get key from etcd: %w", err)
}
if len(resp.Kvs) == 0 {
return nil, 0, fmt.Errorf("workflow '%s' not found", workflowID)
}
kv := resp.Kvs[0]
st, err := state.FromJSON(kv.Value)
if err != nil {
return nil, 0, fmt.Errorf("failed to deserialize state for workflow '%s': %w", workflowID, err)
}
// 返回状态对象和 ModRevision
return st, kv.ModRevision, nil
}
// UpdateWorkflow 原子性地更新工作流状态
// 必须传入 GetWorkflow 时获取的 modRevision
func (s *EtcdStore) UpdateWorkflow(ctx context.Context, modRevision int64, st *state.WorkflowState) error {
key := keyForWorkflow(st.ID)
st.UpdatedAt = time.Now().UTC()
value, err := st.ToJSON()
if err != nil {
return fmt.Errorf("failed to serialize state for update: %w", err)
}
// 使用事务和 ModRevision 实现 Compare-And-Swap
// If(key的ModRevision等于传入的modRevision) Then(Put 新的 value)
resp, err := s.client.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", modRevision)).
Then(clientv3.OpPut(key, string(value))).
Commit()
if err != nil {
return fmt.Errorf("etcd transaction failed for update: %w", err)
}
if !resp.Succeeded {
// 这里的错误是生产环境中最需要关注的:状态竞争
return fmt.Errorf("state conflict: workflow '%s' was modified by another process", st.ID)
}
log.Printf("Successfully updated workflow '%s' to status '%s'", st.ID, st.Status)
return nil
}
func (s *EtcdStore) Close() {
s.client.Close()
}
UpdateWorkflow
函数是这个客户端的灵魂。它强制调用者传入 modRevision
,并在 Txn
的 If
条件中进行比较。如果在此期间有其他进程修改了 etcd
中的数据,modRevision
将会改变,导致事务失败。这迫使调用者必须重新获取最新状态,再进行修改,从而从根本上杜绝了状态覆盖的风险。
用 Tekton 和 OpenFaaS 串联流程
现在我们有了状态模型和原子性客户端,可以开始定义工作流的实际步骤了。
1. 初始化工作流的 Tekton Task
工作流的起点是一个 Tekton Task
,它负责生成一个唯一的 workflowID
,并调用我们的 Go 程序将初始状态写入 etcd
。
# tekton/tasks/initialize-workflow.yaml
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: initialize-workflow
spec:
params:
- name: payload
description: The initial data for the workflow
type: string
results:
- name: workflow-id
description: The unique ID generated for this workflow run
steps:
- name: create-state
image: your-registry/workflow-tool:latest # 包含我们Go客户端的镜像
command: ["/app/workflow-cli"]
args:
- "create"
- "--payload=$(params.payload)"
- "--output-path=$(results.workflow-id.path)"
env:
- name: ETCD_ENDPOINTS
value: "etcd-client.etcd.svc.cluster.local:2379"
这个 Task
中的 workflow-cli
是一个简单的命令行工具,它封装了前面编写的 EtcdStore
。create
命令会生成 UUID,调用 CreateWorkflow
,并将 ID 写入 Tekton 的 result
路径,以便后续 Task
使用。
2. 执行业务逻辑的 OpenFaaS 函数
业务逻辑本身被封装在一个 OpenFaaS 函数中。这个函数是无状态的,它接收 workflowID
,从 etcd
加载状态,执行业务,然后将更新后的状态写回。
// functions/stage1-processor/handler.go
package function
import (
"context"
"fmt"
"log"
"os"
"strings"
"time"
"github.com/your-repo/workflow-engine/pkg/state"
"github.com/your-repo/workflow-engine/pkg/store"
)
// Handle a serverless request
func Handle(req []byte) string {
workflowID := string(req)
if workflowID == "" {
return "error: workflowID is required"
}
log.Printf("Processing stage 1 for workflow: %s", workflowID)
etcdEndpoints := strings.Split(os.Getenv("ETCD_ENDPOINTS"), ",")
stateStore, err := store.NewEtcdStore(etcdEndpoints)
if err != nil {
log.Printf("Error connecting to etcd: %v", err)
return fmt.Sprintf("error: could not connect to state store: %v", err)
}
defer stateStore.Close()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// 1. 获取当前状态和 revision
currentState, modRevision, err := stateStore.GetWorkflow(ctx, workflowID)
if err != nil {
log.Printf("Error getting workflow state: %v", err)
return fmt.Sprintf("error: could not get workflow state: %v", err)
}
// 幂等性检查:如果状态已经不是我们期望的 PENDING,则直接返回成功
if currentState.Status != state.StatusPending {
log.Printf("Workflow %s already processed for this stage (current status: %s). Skipping.", workflowID, currentState.Status)
return "success: already processed"
}
// 2. 模拟业务逻辑
log.Printf("Performing business logic for payload: %s", currentState.Payload)
time.Sleep(2 * time.Second) // 模拟耗时操作
// 假设这里有可能会失败
// if someCondition {
// // ... 处理失败 ...
// }
// 3. 更新状态
currentState.Status = state.StatusStage1Done
currentState.Result = "Stage 1 completed successfully."
err = stateStore.UpdateWorkflow(ctx, modRevision, currentState)
if err != nil {
// 这里的错误通常是状态竞争,需要上层(Tekton)进行重试
log.Printf("Error updating workflow state (conflict likely): %v", err)
return fmt.Sprintf("error: could not update workflow state: %v", err)
}
log.Printf("Successfully completed stage 1 for workflow: %s", workflowID)
return "success"
}
这个函数的关键在于幂等性检查。在分布式系统中,任务可能会被重复执行。通过检查 etcd
中的状态,我们可以安全地跳过已经完成的步骤。
3. 实现熔断器看门人(Gatekeeper) Tekton Task
这是弹性设计的核心体现。在调用真正消耗资源的业务函数(如视频转码)之前,我们先用一个轻量的 Task
检查 etcd
中的熔断器状态。
# tekton/tasks/gatekeeper.yaml
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: gatekeeper
spec:
params:
- name: workflow-id
type: string
- name: stage-name # e.g., "stage1", "stage2"
type: string
steps:
- name: check-circuit-breaker
image: your-registry/workflow-tool:latest
command: ["/app/workflow-cli"]
args:
- "check-breaker"
- "--workflow-id=$(params.workflow-id)"
- "--stage=$(params.stage-name)" # CLI内部会根据stage处理对应的熔断状态
env:
- name: ETCD_ENDPOINTS
value: "etcd-client.etcd.svc.cluster.local:2379"
workflow-cli check-breaker
命令的逻辑如下:
- 从
etcd
获取工作流状态。 - 检查
CircuitBreaker
字段。 - 如果状态是
CLOSED
或HALF_OPEN
,程序正常退出(exit code 0)。 - 如果状态是
OPEN
,检查OpenUntil
时间。- 如果当前时间还没到
OpenUntil
,则打印错误并以非零状态码退出(exit code 1),这将导致 TektonTaskRun
失败,工作流在此中断。 - 如果已经过了
OpenUntil
时间,则将状态原子性地更新为HALF_OPEN
,然后正常退出,允许下一次尝试。
- 如果当前时间还没到
- 这个 CLI 还会处理失败后的状态更新逻辑(例如,另一个
report-failure
命令),当业务函数失败时,它会增加ConsecutiveFailures
计数,并在达到阈值时将状态置为OPEN
。
4. 完整的 Tekton Pipeline
最后,我们将所有部分用 Pipeline
串联起来。
# tekton/pipeline/async-workflow-pipeline.yaml
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
name: async-workflow-pipeline
spec:
params:
- name: payload
type: string
tasks:
- name: init
taskRef:
name: initialize-workflow
params:
- name: payload
value: $(params.payload)
- name: stage-1-gatekeeper
taskRef:
name: gatekeeper
runAfter: [init]
params:
- name: workflow-id
value: $(tasks.init.results.workflow-id)
- name: stage-name
value: "stage1"
- name: run-stage-1
taskRef:
name: call-openfaas-function # 一个通用的调用OpenFaaS的Task
runAfter: [stage-1-gatekeeper]
params:
- name: function-name
value: "stage1-processor"
- name: function-payload
value: $(tasks.init.results.workflow-id)
# 在真实项目中,这里会有 on-error 步骤来报告失败
工作流的视觉化表示如下:
graph TD A[Start PipelineRun] --> B(init: initialize-workflow); B -- workflow-id --> C(stage-1-gatekeeper: check etcd); subgraph "State in etcd" direction LR S1(Status: PENDING) -- read --> C; end C -- Circuit CLOSED --> D(run-stage-1: call OpenFaaS); C -- Circuit OPEN --> E[Fail TaskRun]; D -- workflow-id --> F[OpenFaaS: stage1-processor]; F -- reads --> S2(Status: PENDING, Rev: X); F -- business logic --> G{Update State}; G -- atomic update (CAS on Rev: X) --> S3(Status: STAGE_1_DONE, Rev: X+1);
局限性与未来迭代方向
这套架构解决了 Tekton 在长时工作流场景下状态管理的根本问题,并引入了基于 etcd
的弹性控制。但它并非银弹。
首先,etcd
成为了整个系统的中心瓶颈和单点故障源(尽管 etcd
集群本身是高可用的)。对同一个工作流状态的频繁更新可能会导致热点问题和事务冲突,尤其是在高并发场景下。设计 etcd
的 key 空间和减小状态对象的大小是需要注意的优化点。
其次,当前的设计仍然是“编排”模式(Orchestration),即由 Tekton Pipeline
显式地驱动流程。gatekeeper
任务本质上是一种轮询检查。一个更优雅的演进方向是走向“协同”模式(Choreography)。我们可以部署一个常驻的控制器(Controller/Operator),它使用 etcd
的 Watch
API 监听 /workflows/async/
前缀下的状态变更。当一个工作流状态从 STAGE_1_DONE
变为 STAGE_2_PENDING
时,这个控制器可以直接创建下一个阶段的 TaskRun
或调用相应的 OpenFaaS 函数,从而构建一个完全事件驱动的、反应式的系统。这种方式响应更及时,也减少了 Tekton Task
之间不必要的轮询等待。
最后,此方案中的 workflow-cli
工具和 OpenFaaS 函数中的状态管理逻辑存在重复。在生产环境中,应将其封装成一个统一的、经过充分测试的 Go SDK,供所有业务组件使用,以确保状态操作的一致性和正确性。