在拆分单体应用时,跨多个微服务的事务一致性是首先要面对的硬骨头。一个典型的场景是创建订单,需要依次调用库存服务扣减库存、账户服务扣减余额、订单服务创建订单。如果其中任何一步失败,整个操作需要回滚。在单体应用中,一个数据库事务就能搞定。但在分布式环境中,依赖两阶段提交(2PC)或XA协议通常不是一个好的选择,因为它对系统产生了强耦合和同步阻塞,极大地损害了系统的可用性和性能。
BASE理论和最终一致性模型是更实用的选择。Saga模式是实现最终一致性的经典模式之一,它将一个长事务拆分为一系列本地事务,每个本地事务都有一个对应的补偿操作。当某个步骤失败时,Saga执行器会依次调用前面已成功执行步骤的补偿操作,从而使系统状态回退到事务开始前的状态。
核心挑战在于Saga执行器本身的可靠性。如果执行器在发出一个指令后、记录结果前崩溃,系统将陷入不一致状态。为了解决这个问题,我们将不引入额外的消息队列中间件,而是利用许多项目已经依赖的核心组件——PostgreSQL,通过实现事务性发件箱(Transactional Outbox)模式来构建一个健壮、可观测的Saga执行器。
技术痛点与架构决策
一个简单的Saga流程可能是这样的:
- Saga执行器启动,记录Saga开始状态。
- 执行器调用服务A的
doAction
接口。 - 服务A成功,回调执行器。
- 执行器记录步骤1成功,调用服务B的
doAction
接口。 - 服务B失败,回调执行器。
- 执行器记录步骤2失败,进入补偿流程,调用服务A的
compensateAction
接口。 - 服务A补偿成功,回调执行器。
- 执行器记录补偿成功,Saga结束,状态为“已回滚”。
这里的脆弱点在于“调用”和“记录”这两个动作之间。例如,第4步,如果执行器在成功调用服务B之后、将“调用服务B”这个事实持久化之前崩溃,那么重启后,执行器会认为自己从未调用过服务B,导致流程卡死或重复执行。
事务性发件箱模式通过将“业务数据变更”和“发送消息”这两个操作放在同一个本地数据库事务中来解决此问题。具体到Saga执行器,就是将“更新Saga状态”和“将要发送给下一个服务的指令存入‘发件箱’表”这两个动作合并为一个原子事务。随后,一个独立的“中继器”(Relay)进程会轮询发件箱表,将消息真正地发送出去。
这个方案的好处是:
- 强一致性保障:Saga状态的推进和触发下一步动作的“意图”是原子性地持久化的。只要数据库事务成功,指令就不会丢失。
- 减少外部依赖:无需为Saga引入Kafka或RabbitMQ,简化了技术栈和运维成本。对于中等规模的系统,这是一个非常务实的选择。
- 天然的重试机制:中继器可以安全地重试发送失败的消息,直到下游服务成功接收。这要求下游服务必须实现幂等性。
数据库模型设计
我们需要三张核心表来支撑Saga执行器的运行。
-
sagas
: 存储每个Saga实例的当前状态。 -
saga_steps
: 定义一个Saga模板包含哪些步骤和补偿逻辑。 -
outbox
: 事务性发件箱,存储待发送的指令。
-- file: schema.sql
-- 禁用默认事务,方便在应用层精细控制
SET session_replication_role = 'replica';
-- Saga实例状态表
-- 记录每一个正在运行或已结束的Saga流程实例
CREATE TABLE IF NOT EXISTS sagas (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
saga_name VARCHAR(255) NOT NULL,
current_step INT NOT NULL DEFAULT 0,
status VARCHAR(50) NOT NULL CHECK (status IN ('running', 'compensating', 'completed', 'failed', 'rolled_back')),
payload JSONB NOT NULL, -- 存储整个Saga流程的上下文数据
trace_context JSONB, -- 存储分布式追踪的上下文,用于跨进程传递
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Saga步骤定义表
-- 作为一个静态模板,定义一个特定类型的Saga由哪些步骤构成
CREATE TABLE IF NOT EXISTS saga_steps (
id SERIAL PRIMARY KEY,
saga_name VARCHAR(255) NOT NULL,
step_index INT NOT NULL,
step_name VARCHAR(255) NOT NULL,
action_endpoint VARCHAR(1024) NOT NULL, -- 正常流程调用的服务接口
compensation_endpoint VARCHAR(1024) NOT NULL, -- 补偿流程调用的服务接口
UNIQUE(saga_name, step_index)
);
-- 事务性发件箱表
-- 核心!所有需要发送到外部服务的指令都先写入此表
CREATE TABLE IF NOT EXISTS outbox (
id BIGSERIAL PRIMARY KEY,
saga_id UUID NOT NULL,
saga_step INT NOT NULL,
message_type VARCHAR(50) NOT NULL CHECK (message_type IN ('action', 'compensation')),
endpoint TEXT NOT NULL,
payload JSONB,
trace_context JSONB, -- 同样携带追踪上下文
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- `processed_at`为NULL表示消息待发送,非NULL表示已发送
-- 中继器(Relay)会轮询`processed_at IS NULL`的记录
processed_at TIMESTAMPTZ
);
CREATE INDEX idx_outbox_unprocessed ON outbox(created_at) WHERE processed_at IS NULL;
CREATE INDEX idx_sagas_running ON sagas(status);
-- 为演示插入一个名为"create_order"的Saga定义
INSERT INTO saga_steps (saga_name, step_index, step_name, action_endpoint, compensation_endpoint) VALUES
('create_order', 1, 'deduct_inventory', 'http://inventory-service/api/deduct', 'http://inventory-service/api/add'),
('create_order', 2, 'charge_payment', 'http://payment-service/api/charge', 'http://payment-service/api/refund')
ON CONFLICT (saga_name, step_index) DO NOTHING;
这里的关键在于outbox
表。任何Saga状态的变更,如果需要触发外部调用,都必须在同一个事务里向outbox
插入一条记录。
Saga执行器核心实现 (Golang)
我们将使用Go来实现执行器。代码将包含:
- 启动Saga的API。
- 处理参与者回调的API。
- 后台运行的发件箱中继器。
- 完整的Jaeger分布式追踪集成。
1. 启动一个新的Saga流程
这是整个流程的入口。它接收业务请求,开启一个数据库事务,在sagas
表创建实例,并向outbox
表插入第一步的执行指令。
// file: orchestrator.go
package main
import (
"context"
"database/sql"
"encoding/json"
"log"
"net/http"
"time"
"github.com/google/uuid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
)
type SagaOrchestrator struct {
db *sql.DB
tracer trace.Tracer
}
// StartSagaRequest 定义了启动Saga的请求体
type StartSagaRequest struct {
SagaName string `json:"saga_name"`
Payload json.RawMessage `json:"payload"`
}
// startSagaHandler 是启动Saga的HTTP处理函数
func (s *SagaOrchestrator) startSagaHandler(w http.ResponseWriter, r *http.Request) {
// 从请求头中提取追踪上下文
ctx := otel.GetTextMapPropagator().Extract(r.Context(), propagation.HeaderCarrier(r.Header))
// 开启一个新的追踪Span
ctx, span := s.tracer.Start(ctx, "StartSaga", trace.WithAttributes(semconv.MessagingSystemKey.String("PostgreSQL/Saga")))
defer span.End()
var req StartSagaRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid request body", http.StatusBadRequest)
return
}
span.SetAttributes(semconv.MessagingDestinationNameKey.String(req.SagaName))
// 核心逻辑:在一个事务中创建Saga实例并发出第一条指令
sagaID, err := s.initiateSaga(ctx, req.SagaName, req.Payload)
if err != nil {
log.Printf("Error initiating saga: %v", err)
span.RecordError(err)
http.Error(w, "Failed to start saga", http.StatusInternalServerError)
return
}
span.SetAttributes(semconv.MessagingConversationIDKey.String(sagaID.String()))
log.Printf("Saga %s initiated with ID: %s", req.SagaName, sagaID)
w.WriteHeader(http.StatusAccepted)
w.Write([]byte(`{"saga_id": "` + sagaID.String() + `"}`))
}
// initiateSaga 在一个事务内完成Saga的初始化和第一步指令的入队
func (s *SagaOrchestrator) initiateSaga(ctx context.Context, sagaName string, payload json.RawMessage) (uuid.UUID, error) {
ctx, span := s.tracer.Start(ctx, "initiateSagaInDB")
defer span.End()
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return uuid.Nil, err
}
// 使用defer确保事务在函数退出时能被处理(回滚或提交)
// 这是一个常见的错误点:忘记处理事务导致连接泄露
defer tx.Rollback()
// 1. 获取第一步的定义
var firstStep struct {
Endpoint string
}
err = tx.QueryRowContext(ctx,
`SELECT action_endpoint FROM saga_steps WHERE saga_name = $1 AND step_index = 1`,
sagaName,
).Scan(&firstStep.Endpoint)
if err != nil {
if err == sql.ErrNoRows {
log.Printf("Saga definition not found for: %s", sagaName)
return uuid.Nil, err
}
return uuid.Nil, err
}
// 2. 创建Saga实例
sagaID := uuid.New()
// 将追踪上下文序列化以便存入数据库
traceContextCarrier := propagation.MapCarrier{}
otel.GetTextMapPropagator().Inject(ctx, traceContextCarrier)
traceContextJSON, _ := json.Marshal(traceContextCarrier)
_, err = tx.ExecContext(ctx,
`INSERT INTO sagas (id, saga_name, status, payload, trace_context) VALUES ($1, $2, 'running', $3, $4)`,
sagaID, sagaName, payload, traceContextJSON,
)
if err != nil {
return uuid.Nil, err
}
// 3. 向发件箱插入第一条指令
_, err = tx.ExecContext(ctx,
`INSERT INTO outbox (saga_id, saga_step, message_type, endpoint, payload, trace_context) VALUES ($1, 1, 'action', $2, $3, $4)`,
sagaID, firstStep.Endpoint, payload, traceContextJSON,
)
if err != nil {
return uuid.Nil, err
}
// 只有当所有数据库操作都成功时,才提交事务
if err = tx.Commit(); err != nil {
return uuid.Nil, err
}
return sagaID, nil
}
注意 initiateSaga
函数,它完美地诠释了事务性发件箱的核心:sagas
表的插入和outbox
表的插入被包裹在同一个tx.Commit()
中,保证了原子性。同时,我们将Jaeger的trace_context
也一并存入数据库,当中继器发送消息时,可以恢复追踪链。
2. 发件箱中继器 (The Relay)
这是一个后台goroutine,它定期扫描outbox
表,并将未处理的消息通过HTTP发送出去。
// file: relay.go
package main
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"log"
"net/http"
"time"
"github.com/google/uuid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
)
type OutboxRelay struct {
db *sql.DB
client *http.Client
tracer trace.Tracer
}
// NewOutboxRelay 创建一个新的中继器实例
func NewOutboxRelay(db *sql.DB, tracer trace.Tracer) *OutboxRelay {
return &OutboxRelay{
db: db,
client: &http.Client{
Timeout: 10 * time.Second,
},
tracer: tracer,
}
}
// Start 启动中继器的轮询循环
func (r *OutboxRelay) Start(ctx context.Context) {
log.Println("Outbox relay started")
ticker := time.NewTicker(2 * time.Second) // 轮询间隔
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Println("Outbox relay stopping")
return
case <-ticker.C:
// 在真实项目中,这里应该有错误处理和重试逻辑
// 为了简化,我们只记录错误
if err := r.processOutbox(ctx); err != nil {
log.Printf("Error processing outbox: %v", err)
}
}
}
}
// processOutbox 查询并处理一批未发送的消息
func (r *OutboxRelay) processOutbox(ctx context.Context) error {
// 注意这里的FOR UPDATE SKIP LOCKED,这是实现多个中继器实例并发处理的关键
// 它会锁定选中的行,其他并发的事务会跳过这些锁定的行,避免了重复处理
rows, err := r.db.QueryContext(ctx, `
SELECT id, saga_id, endpoint, payload, trace_context
FROM outbox
WHERE processed_at IS NULL
ORDER BY created_at ASC
LIMIT 10
FOR UPDATE SKIP LOCKED
`)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var id int64
var sagaID uuid.UUID
var endpoint string
var payload, traceContextJSON []byte
if err := rows.Scan(&id, &sagaID, &endpoint, &payload, &traceContextJSON); err != nil {
log.Printf("Failed to scan outbox row: %v", err)
continue // 处理下一条
}
go r.sendMessage(id, sagaID, endpoint, payload, traceContextJSON)
}
return rows.Err()
}
// sendMessage 实际发送HTTP请求并更新outbox状态
func (r *OutboxRelay) sendMessage(id int64, sagaID uuid.UUID, endpoint string, payload, traceContextJSON []byte) {
// 反序列化追踪上下文,并创建子Span
traceContextCarrier := propagation.MapCarrier{}
_ = json.Unmarshal(traceContextJSON, &traceContextCarrier)
parentCtx := otel.GetTextMapPropagator().Extract(context.Background(), traceContextCarrier)
ctx, span := r.tracer.Start(parentCtx, "RelaySendMessage",
trace.WithSpanKind(trace.SpanKindProducer),
trace.WithAttributes(
semconv.MessagingSystemKey.String("HTTP"),
semconv.MessagingDestinationNameKey.String(endpoint),
semconv.MessagingConversationIDKey.String(sagaID.String()),
),
)
defer span.End()
req, err := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewBuffer(payload))
if err != nil {
log.Printf("Failed to create request for outbox message %d: %v", id, err)
span.RecordError(err)
return
}
req.Header.Set("Content-Type", "application/json")
// 这里是关键:将追踪上下文注入到即将发出的HTTP请求头中
otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header))
// 在生产环境中,这里应该有重试逻辑,比如指数退避
resp, err := r.client.Do(req)
if err != nil {
log.Printf("Failed to send outbox message %d to %s: %v", id, endpoint, err)
span.RecordError(err)
return // 失败则不更新processed_at,下次轮询会重试
}
defer resp.Body.Close()
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
// 发送成功,更新processed_at字段
_, err := r.db.ExecContext(ctx, "UPDATE outbox SET processed_at = NOW() WHERE id = $1", id)
if err != nil {
log.Printf("Failed to mark outbox message %d as processed: %v", id, err)
span.RecordError(err)
} else {
log.Printf("Successfully processed and sent outbox message %d", id)
}
} else {
log.Printf("Received non-success status %d for outbox message %d", resp.StatusCode, id)
// 同样不更新,等待下次重试。
// 一个常见的错误是,对于明确的客户端错误(4xx)也进行无限重试,这会浪费资源
// 生产级代码需要区分可重试和不可重试的错误。
}
}
FOR UPDATE SKIP LOCKED
是这个并发中继器模式的精髓。它允许多个中继器实例同时运行而不会争抢同一条消息,提供了水平扩展的能力。
3. 处理参与者的回调
当参与者服务(如库存服务)完成操作后,它会回调执行器的一个接口,告知结果。执行器根据结果决定是推进到下一步还是开始补偿。
// file: orchestrator.go (continued)
type SagaCallbackRequest struct {
SagaID uuid.UUID `json:"saga_id"`
Step int `json:"step"`
Success bool `json:"success"`
Payload json.RawMessage `json:"payload"`
}
func (s *SagaOrchestrator) sagaCallbackHandler(w http.ResponseWriter, r *http.Request) {
ctx := otel.GetTextMapPropagator().Extract(r.Context(), propagation.HeaderCarrier(r.Header))
var req SagaCallbackRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid request body", http.StatusBadRequest)
return
}
ctx, span := s.tracer.Start(ctx, "HandleSagaCallback",
trace.WithAttributes(
semconv.MessagingConversationIDKey.String(req.SagaID.String()),
),
)
defer span.End()
// 同样,所有逻辑必须在一个事务中
err := s.processNextStep(ctx, req.SagaID, req.Step, req.Success, req.Payload)
if err != nil {
log.Printf("Error processing next step for saga %s: %v", req.SagaID, err)
span.RecordError(err)
http.Error(w, "Failed to process saga callback", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
func (s *SagaOrchestrator) processNextStep(ctx context.Context, sagaID uuid.UUID, completedStep int, success bool, newPayload json.RawMessage) error {
ctx, span := s.tracer.Start(ctx, "processNextStepInDB")
defer span.End()
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// 1. 获取当前Saga状态,并加锁
var current saga
err = tx.QueryRowContext(ctx,
`SELECT saga_name, current_step, status, payload, trace_context FROM sagas WHERE id = $1 FOR UPDATE`,
sagaID,
).Scan(¤t.name, ¤t.step, ¤t.status, ¤t.payload, ¤t.traceContext)
if err != nil {
return err
}
// 幂等性检查:如果收到的回调是针对已经处理过的步骤,则直接忽略
if completedStep != current.step {
log.Printf("Ignoring stale callback for saga %s. Expected step %d, got %d", sagaID, current.step, completedStep)
return tx.Commit() // 提交空事务
}
// 合并payload
// 这是一个简化实现,生产环境需要更健壮的JSON合并策略
current.payload = newPayload
if success {
// 推进到下一步
nextStepIndex := current.step + 1
var nextStep stepDefinition
err = tx.QueryRowContext(ctx,
`SELECT action_endpoint FROM saga_steps WHERE saga_name = $1 AND step_index = $2`,
current.name, nextStepIndex,
).Scan(&nextStep.endpoint)
if err == sql.ErrNoRows { // 没有下一步了,Saga成功完成
_, err = tx.ExecContext(ctx,
`UPDATE sagas SET status = 'completed', payload = $1, updated_at = NOW() WHERE id = $2`,
current.payload, sagaID,
)
} else if err != nil {
return err
} else { // 准备执行下一步
_, err = tx.ExecContext(ctx,
`UPDATE sagas SET current_step = $1, payload = $2, updated_at = NOW() WHERE id = $3`,
nextStepIndex, current.payload, sagaID,
)
if err != nil { return err }
_, err = tx.ExecContext(ctx,
`INSERT INTO outbox (saga_id, saga_step, message_type, endpoint, payload, trace_context) VALUES ($1, $2, 'action', $3, $4, $5)`,
sagaID, nextStepIndex, nextStep.endpoint, current.payload, current.traceContext,
)
}
} else {
// 步骤失败,开始补偿
compensationStep := current.step
var compStep stepDefinition
err = tx.QueryRowContext(ctx,
`SELECT compensation_endpoint FROM saga_steps WHERE saga_name = $1 AND step_index = $2`,
current.name, compensationStep,
).Scan(&compStep.endpoint)
if err != nil { return err }
_, err = tx.ExecContext(ctx,
`UPDATE sagas SET status = 'compensating', current_step = $1, payload = $2, updated_at = NOW() WHERE id = $3`,
compensationStep, current.payload, sagaID, // current_step指向正在补偿的步骤
)
if err != nil { return err }
_, err = tx.ExecContext(ctx,
`INSERT INTO outbox (saga_id, saga_step, message_type, endpoint, payload, trace_context) VALUES ($1, $2, 'compensation', $3, $4, $5)`,
sagaID, compensationStep, compStep.endpoint, current.payload, current.traceContext,
)
}
if err != nil {
return err
}
return tx.Commit()
}
// 辅助结构体
type saga struct {
name string
step int
status string
payload json.RawMessage
traceContext json.RawMessage
}
type stepDefinition struct {
endpoint string
}
processNextStep
的逻辑同样被包裹在数据库事务中。它首先用 FOR UPDATE
锁住对应的 sagas
行,防止并发的回调请求导致状态错乱。然后根据回调结果,决定是生成下一步的action
消息,还是当前步骤的compensation
消息,并写入outbox
。
可观测性闭环
通过在每个关键节点(启动、中继器发送、回调处理)都正确地传递和使用trace_context
,我们在Jaeger中可以看到一幅完整的Saga执行图谱。
sequenceDiagram participant Client participant Orchestrator API participant PostgreSQL DB participant Outbox Relay participant Inventory Service Client->>+Orchestrator API: POST /saga/start (create_order) Orchestrator API->>+PostgreSQL DB: BEGIN TX Note right of Orchestrator API: Span: StartSaga PostgreSQL DB->>Orchestrator API: Orchestrator API->>PostgreSQL DB: INSERT INTO sagas Orchestrator API->>PostgreSQL DB: INSERT INTO outbox (deduct_inventory) PostgreSQL DB->>-Orchestrator API: Orchestrator API->>+PostgreSQL DB: COMMIT PostgreSQL DB->>-Orchestrator API: Orchestrator API-->>-Client: 202 Accepted (saga_id) loop Poll for messages Outbox Relay->>+PostgreSQL DB: SELECT FROM outbox FOR UPDATE SKIP LOCKED Note right of Outbox Relay: Span: RelaySendMessage (child of StartSaga) PostgreSQL DB->>-Outbox Relay: returns (deduct_inventory) message end Outbox Relay->>+Inventory Service: POST /api/deduct Note right of Inventory Service: Trace context propagated in headers Inventory Service-->>-Outbox Relay: 200 OK Outbox Relay->>+PostgreSQL DB: UPDATE outbox SET processed_at=NOW() PostgreSQL DB->>-Outbox Relay: Inventory Service->>+Orchestrator API: POST /saga/callback (success) Orchestrator API->>+PostgreSQL DB: BEGIN TX Note right of Orchestrator API: Span: HandleSagaCallback (child of RelaySendMessage) PostgreSQL DB->>Orchestrator API: Orchestrator API->>PostgreSQL DB: SELECT FROM sagas FOR UPDATE Orchestrator API->>PostgreSQL DB: UPDATE sagas (step=2) Orchestrator API->>PostgreSQL DB: INSERT INTO outbox (charge_payment) PostgreSQL DB->>-Orchestrator API: Orchestrator API->>+PostgreSQL DB: COMMIT PostgreSQL DB->>-Orchestrator API: Orchestrator API-->>-Inventory Service: 200 OK
当一个Saga失败并触发补偿时,Jaeger的火焰图会清晰地展示出流程的逆转,哪个服务失败了,哪个补偿操作被触发,以及每个环节的耗时,这对于调试复杂的分布式系统是无价的。
方案的局限性与未来迭代
尽管这个基于PostgreSQL的方案非常务实,但它并非没有缺点。
轮询延迟:中继器的轮询机制带来了固有的延迟。虽然可以通过缩短轮询间隔来缓解,但这会增加数据库的查询压力。在对延迟极度敏感的场景下,可以探索使用PostgreSQL的
LISTEN/NOTIFY
机制来替代轮询,实现事件驱动的实时消息发送,但这会增加实现的复杂性。数据库成为瓶颈:所有Saga指令都流经
outbox
表,在高吞吐量下,这张表可能成为写入热点。可以通过对outbox
表进行分区,并部署多个中继器实例分别处理不同分区来扩展。补偿逻辑的复杂性:本实现只处理了简单的“下一步或补偿上一步”。现实世界中的Saga可能需要更复杂的补偿逻辑,例如补偿顺序的依赖关系、重试补偿操作等。这些需要一个更完备的状态机模型来支持。
“毒丸”消息:如果某条消息因为逻辑错误而永远无法被下游服务成功处理,它会被中继器无限次重试,形成“毒丸”。需要引入一个最大重试次数和死信队列机制,将失败多次的消息转移到单独的表中,供人工排查。
这个方案提供了一个在不引入过多外部依赖的情况下,构建可靠且可观测的分布式事务处理框架的坚实基础。它体现了架构设计中的权衡艺术:利用现有组件的强大功能,以一种优雅且务实的方式解决复杂问题。