实现Saga分布式事务的UI状态同步 C# Kit Recoil与Envoy gRPC流的架构权衡


一个看似简单的用户操作,例如“预订旅行套餐”,在微服务架构下可能触发一系列复杂的后台交互:锁定航班、预订酒店、租赁车辆。这是一个典型的分布式事务场景。任何一步的失败都必须触发已完成步骤的回滚或补偿。在真实项目中,我们不能让用户界面(UI)在整个过程中冻结,也不能让用户面对一个永久旋转的加载图标后被告知“操作失败”。用户需要清晰、实时地了解这个长耗时事务的进展,例如“机票已确认”、“酒店预订中”、“车辆租赁失败,正在回滚机票预订…”。

这个问题的核心在于,如何将后端一个长生命周期、异步、多阶段、状态多变的 Saga 事务,高效且可靠地映射到前端的 UI 状态上。

方案A:轮询与回调的传统困境

最初的构想是基于 HTTP 的短连接模型。前端发起一个启动事务的 API 请求,后端创建一个 Saga 实例,返回一个唯一的事务ID。

// 后端 C# Kit Controller
[Post("/api/trip/book")]
public async Task<IResult> StartBookingSaga([FromBody] BookingRequest request)
{
    var transactionId = Guid.NewGuid();
    _sagaCoordinator.StartBookingSaga(transactionId, request); // 异步启动,不阻塞
    return Results.Accepted(new { TransactionId = transactionId });
}

[Get("/api/trip/status/{transactionId}")]
public async Task<IResult> GetSagaStatus(Guid transactionId)
{
    var status = _sagaStateRepository.GetStatus(transactionId);
    if (status == null) return Results.NotFound();
    return Results.Ok(status);
}

前端拿到 transactionId 后,启动一个定时器,每隔2-3秒轮询 /api/trip/status/{transactionId} 接口。

优势分析:

  1. 实现简单: 这是最直观的方案,前后端都易于理解和实现。
  2. 无状态: 后端的状态接口是无状态的,易于水平扩展。

劣势分析:

  1. 实时性差: UI 更新有明显延迟,取决于轮询间隔。缩短间隔会急剧增加服务端和网络的负载。
  2. 资源浪费: 大部分轮询请求都是无效的,因为后端状态并未改变。这在移动端会造成不必要的电量消耗。
  3. 客户端复杂性: 前端需要管理定时器、处理网络错误、以及在页面卸载时清理定时器,这些逻辑很容易出错。

另一种变体是 Webhook,但它主要适用于服务器到服务器的通信,在浏览器客户端场景下,由于 NAT 和防火墙的存在,几乎不可行。

方案B:WebSocket 的双刃剑

为了解决实时性问题,WebSocket 自然而然地进入了视野。通过建立一个持久的双向连接,服务器可以在 Saga 状态变更时主动推送消息给客户端。

架构设想:

  1. 前端通过 WebSocket 连接到一个专用的网关服务。
  2. 在发起 Saga 事务后,客户端通过 WebSocket 发送一个“订阅”消息,包含 transactionId
  3. Saga 协调器在每次状态变更后,通过消息队列或内部事件总线,将更新通知到 WebSocket 网关,再由网关推送给对应的客户端。

优势分析:

  1. 真·实时: 状态更新几乎是瞬时的,用户体验极佳。
  2. 高效: 没有无效的轮询请求,节省了网络带宽和服务器资源。

劣势分析:

  1. 基础设施复杂性: 需要引入并维护一套 WebSocket 网关集群。这包括连接管理、心跳维持、水平扩展、状态同步等一系列复杂问题。它成了系统架构中一个新的、有状态的关键组件。
  2. 连接管理: 浏览器中断网、刷新页面、切换网络环境都会导致连接断开。需要设计一套可靠的重连和状态恢复机制。客户端重连后,如何获取到断连期间错过的所有状态更新?这通常需要在服务端为每个连接维护一个消息缓冲区。
  3. 安全与鉴权: WebSocket 的鉴权模型与 HTTP 不同,需要额外的 token 传递和验证机制。

在真实项目中,引入一套全新的、有状态的基础设施组件需要慎重评估其运维成本。如果团队已经在使用服务网格(Service Mesh),那么应该优先考虑是否能利用现有设施解决问题。

最终选择:基于 Envoy 与 gRPC 流的解决方案

我们的技术栈中已经包含了用于服务间通信的 Envoy Proxy。Envoy 对 gRPC 提供了强大的原生支持,包括 gRPC-Web 过滤器,它能将浏览器无法直接使用的标准 gRPC 协议转换为 HTTP/1.1 或 HTTP/2 兼容的格式。这为我们提供了一条全新的思路:使用 gRPC 的服务器流(Server Streaming)模式。

架构设计:

  1. 前端 (React/Recoil): 使用 gRPC-Web 客户端库。
  2. 通信层 (Envoy Proxy): 部署为边缘代理,接收来自前端的 gRPC-Web 请求,通过 envoy.grpc_web 过滤器将其转换为标准的 gRPC 请求,并路由到后端的 Saga 协调器服务。
  3. 后端 (C# Kit): 实现一个 gRPC 服务,提供一个服务器流 RPC 方法,用于订阅 Saga 状态。
sequenceDiagram
    participant User
    participant ReactApp (Recoil)
    participant Envoy Proxy
    participant SagaOrchestrator (C# Kit)
    participant FlightService
    participant HotelService

    User->>ReactApp (Recoil): 点击"预订"
    ReactApp (Recoil)->>+Envoy Proxy: POST /api.SagaService/StartTripSaga
    Envoy Proxy->>+SagaOrchestrator (C# Kit): gRPC: StartTripSaga()
    SagaOrchestrator (C# Kit)-->>-Envoy Proxy: response { transactionId }
    Envoy Proxy-->>-ReactApp (Recoil): response { transactionId }

    Note right of ReactApp (Recoil): 获取 transactionId, 准备订阅状态
    ReactApp (Recoil)->>+Envoy Proxy: [gRPC-Web Stream] /api.SagaService/SubscribeSagaUpdates
    Envoy Proxy->>+SagaOrchestrator (C# Kit): [gRPC Stream] SubscribeSagaUpdates()
    Note right of SagaOrchestrator (C# Kit): 记录订阅者, 开始执行Saga

    SagaOrchestrator (C# Kit)->>+FlightService: ReserveFlight()
    FlightService-->>-SagaOrchestrator (C# Kit): Success
    SagaOrchestrator (C# Kit)-->>Envoy Proxy: Stream Push: { step: "Flight", status: "Completed" }
    Envoy Proxy-->>ReactApp (Recoil): Stream Data
    Note right of ReactApp (Recoil): Recoil atom 更新, UI 实时变化

    SagaOrchestrator (C# Kit)->>+HotelService: BookHotel()
    HotelService-->>-SagaOrchestrator (C# Kit): Fail (No rooms)
    SagaOrchestrator (C# Kit)-->>Envoy Proxy: Stream Push: { step: "Hotel", status: "Failed" }
    Envoy Proxy-->>ReactApp (Recoil): Stream Data
    Note right of ReactApp (Recoil): Recoil atom 更新, UI 显示错误

    Note right of SagaOrchestrator (C# Kit): 开始执行补偿事务
    SagaOrchestrator (C# Kit)->>+FlightService: CancelFlightReservation()
    FlightService-->>-SagaOrchestrator (C# Kit): Success
    SagaOrchestrator (C# Kit)-->>Envoy Proxy: Stream Push: { step: "Flight", status: "Compensated" }
    Envoy Proxy-->>ReactApp (Recoil): Stream Data
    Note right of ReactApp (Recoil): Recoil atom 更新, UI 显示已回滚

    SagaOrchestrator (C# Kit)-->>Envoy Proxy: Stream End
    Envoy Proxy-->>ReactApp (Recoil): Stream End

这个方案的精妙之处在于,它将协议转换的复杂性完全交给了 Envoy,C# 后端服务只需实现标准的 gRPC 服务即可,无需关心任何 Web 特有的逻辑。前端则获得了强类型的、实时的流式通信能力。


核心实现概览

1. Protobuf 定义

首先,定义 gRPC 服务和消息的契约。这是前后端通信的基石。

saga.proto:

syntax = "proto3";

package api;

// 开启Saga事务的服务
service SagaService {
  // 启动一个新的旅行预订Saga
  rpc StartTripSaga(StartTripSagaRequest) returns (StartTripSagaResponse);
  
  // 订阅Saga的状态更新, 这是一个服务器流RPC
  rpc SubscribeSagaUpdates(SagaSubscriptionRequest) returns (stream SagaUpdateResponse);
}

message StartTripSagaRequest {
  string user_id = 1;
  string flight_number = 2;
  string hotel_name = 3;
}

message StartTripSagaResponse {
  string transaction_id = 1;
}

message SagaSubscriptionRequest {
  string transaction_id = 1;
}

enum SagaStepStatus {
  UNKNOWN = 0;
  PENDING = 1;
  COMPLETED = 2;
  FAILED = 3;
  COMPENSATING = 4;
  COMPENSATED = 5;
}

message SagaUpdateResponse {
  string transaction_id = 1;
  string step_name = 2; // e.g., "FlightBooking", "HotelReservation"
  SagaStepStatus status = 3;
  string details = 4; // 额外信息,如错误原因
}

2. C# Kit 后端 Saga 协调器与 gRPC 服务

我们使用 Kit 框架来承载 gRPC 服务。这里的 Saga 协调器是简化的,用于演示核心逻辑。在生产环境中,状态和订阅者列表需要持久化到 Redis 或数据库中以支持服务重启和扩展。

SagaOrchestratorService.cs:

using Grpc.Core;
using System.Collections.Concurrent;
using System.Threading.Channels;

namespace MyProject.Services;

// 一个简化的、内存中的Saga状态机和订阅管理器
// 生产环境需要替换为持久化存储,例如Redis/PostgreSQL
public class InMemorySagaManager
{
    // transactionId -> Channel<SagaUpdateResponse>
    private readonly ConcurrentDictionary<string, Channel<SagaUpdateResponse>> _subscribers = new();
    // transactionId -> ConcurrentDictionary<stepName, SagaUpdateResponse>
    private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, SagaUpdateResponse>> _sagaStates = new();

    public ChannelReader<SagaUpdateResponse> Subscribe(string transactionId)
    {
        var channel = Channel.CreateUnbounded<SagaUpdateResponse>();
        _subscribers.AddOrUpdate(transactionId, channel, (_, existing) => channel);
        
        // 如果已经有历史状态,立即推送
        if (_sagaStates.TryGetValue(transactionId, out var existingStates))
        {
            foreach (var state in existingStates.Values)
            {
                channel.Writer.TryWrite(state);
            }
        }

        return channel.Reader;
    }

    public void Unsubscribe(string transactionId)
    {
        if (_subscribers.TryRemove(transactionId, out var channel))
        {
            channel.Writer.TryComplete();
        }
    }

    public async Task PublishUpdate(SagaUpdateResponse update)
    {
        // 1. 持久化当前状态
        var states = _sagaStates.GetOrAdd(update.TransactionId, new ConcurrentDictionary<string, SagaUpdateResponse>());
        states[update.StepName] = update;

        // 2. 推送给订阅者
        if (_subscribers.TryGetValue(update.TransactionId, out var channel))
        {
            await channel.Writer.WriteAsync(update);
        }
    }

    // ... 其他启动、执行Saga的逻辑
}


public class SagaGrpcService : Api.SagaService.SagaServiceBase
{
    private readonly ILogger<SagaGrpcService> _logger;
    private readonly InMemorySagaManager _sagaManager;
    // 假设这是执行Saga逻辑的服务
    private readonly ITripBookingSaga _tripBookingSaga;

    public SagaGrpcService(ILogger<SagaGrpcService> logger, InMemorySagaManager sagaManager, ITripBookingSaga tripBookingSaga)
    {
        _logger = logger;
        _sagaManager = sagaManager;
        _tripBookingSaga = tripBookingSaga;
    }

    public override Task<StartTripSagaResponse> StartTripSaga(StartTripSagaRequest request, ServerCallContext context)
    {
        var transactionId = Guid.NewGuid().ToString();
        _logger.LogInformation("Starting new Saga with TransactionId: {TransactionId}", transactionId);

        // 异步执行,不阻塞gRPC调用
        _ = _tripBookingSaga.ExecuteAsync(transactionId, request);

        return Task.FromResult(new StartTripSagaResponse { TransactionId = transactionId });
    }

    public override async Task SubscribeSagaUpdates(
        SagaSubscriptionRequest request, 
        IServerStreamWriter<SagaUpdateResponse> responseStream, 
        ServerCallContext context)
    {
        var transactionId = request.TransactionId;
        _logger.LogInformation("Client subscribed for Saga updates on TransactionId: {TransactionId}", transactionId);
        
        var reader = _sagaManager.Subscribe(transactionId);
        
        // 监听取消事件,以便及时清理资源
        context.CancellationToken.Register(() =>
        {
            _logger.LogWarning("Client disconnected for TransactionId: {TransactionId}", transactionId);
            _sagaManager.Unsubscribe(transactionId);
        });

        try
        {
            // 从Channel中读取更新并写入响应流
            await foreach (var update in reader.ReadAllAsync(context.CancellationToken))
            {
                await responseStream.WriteAsync(update);
            }
        }
        catch (OperationCanceledException)
        {
             _logger.LogWarning("Subscription cancelled for TransactionId: {TransactionId}", transactionId);
        }
        finally
        {
            _sagaManager.Unsubscribe(transactionId);
        }
    }
}

单元测试思路: 可以模拟 ITripBookingSaga 的行为,验证 SagaGrpcService 是否能正确调用 _sagaManagerSubscribePublishUpdate 方法,并检查 responseStream 是否被写入了预期的数据。对于 InMemorySagaManager,需要测试其线程安全性和订阅/发布逻辑的正确性。

3. Envoy Proxy 配置

这是连接前后端的桥梁。关键配置在于 HttpConnectionManager 中的 envoy.filters.http.grpc_web 和 CORS 策略。

envoy.yaml:

static_resources:
  listeners:
  - name: listener_0
    address:
      socket_address: { address: 0.0.0.0, port_value: 8080 }
    filter_chains:
    - filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          stat_prefix: ingress_http
          route_config:
            name: local_route
            virtual_hosts:
            - name: local_service
              domains: ["*"]
              routes:
              - match: { prefix: "/" }
                route:
                  cluster: saga_service
                  # 允许gRPC流超时时间更长
                  timeout: 0s
              cors:
                allow_origin_string_match:
                  - prefix: "*"
                allow_methods: GET, PUT, DELETE, POST, OPTIONS
                allow_headers: keep-alive,user-agent,cache-control,content-type,content-transfer-encoding,custom-header-1,x-accept-content-transfer-encoding,x-accept-response-streaming,x-user-agent,x-grpc-web,grpc-timeout
                expose_headers: custom-header-1,grpc-status,grpc-message
          http_filters:
          - name: envoy.filters.http.grpc_web
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.grpc_web.v3.GrpcWeb
          - name: envoy.filters.http.cors
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.cors.v3.Cors
          - name: envoy.filters.http.router
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
  clusters:
  - name: saga_service
    connect_timeout: 0.25s
    type: LOGICAL_DNS
    # 使用HTTP/2与后端gRPC服务通信
    typed_extension_protocol_options:
      envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
        "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
        explicit_http_config:
          http2_protocol_options: {}
    lb_policy: ROUND_ROBIN
    load_assignment:
      cluster_name: saga_service
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              # 使用host.docker.internal从Docker容器访问宿主机服务
              socket_address: { address: host.docker.internal, port_value: 5001 }

一个常见的坑在于CORS配置。gRPC-Web 会发送一些特定的 x-grpc- 前缀的头部,这些必须在 allow_headers 中明确列出,否则浏览器的 preflight OPTIONS 请求就会失败。

4. React 前端与 Recoil 状态管理

Recoil 的原子化状态管理模型非常适合这个场景。我们可以为每个 Saga 步骤创建一个状态原子,然后用一个组件来管理 gRPC 流的生命周期,并将收到的更新分发到对应的原子中。

首先生成客户端代码:
protoc --js_out=import_style=commonjs,binary:. --grpc-web_out=import_style=typescript,mode=grpcwebtext:. saga.proto

state.js (Recoil Atoms):

import { atomFamily, selectorFamily } from 'recoil';

// 定义每个Saga步骤的状态结构
export const sagaStepStateFamily = atomFamily({
  key: 'SagaStepState',
  default: (params) => ({ // params will be { transactionId, stepName }
    status: 'PENDING',
    details: 'Waiting to start...',
  }),
});

// 一个selector用于获取整个Saga的所有步骤状态
export const sagaOverallStateSelector = selectorFamily({
  key: 'SagaOverallState',
  get: (transactionId) => ({ get }) => {
    // 假设我们知道一个Saga有哪些步骤
    const steps = ['FlightBooking', 'HotelReservation', 'CarRental']; 
    const stepStates = steps.map(stepName => 
      get(sagaStepStateFamily({ transactionId, stepName }))
    );
    return stepStates;
  },
});

SagaSubscriptionManager.jsx (管理gRPC流的组件):

import { useEffect } from 'react';
import { useSetRecoilState } from 'recoil';
import { SagaServiceClient } from './saga_grpc_web_pb';
import { SagaSubscriptionRequest } from './saga_pb';
import { sagaStepStateFamily } from './state';

const client = new SagaServiceClient('http://localhost:8080'); // Envoy的地址

// 这是一个无UI的逻辑组件,负责后台订阅和更新Recoil状态
export const SagaSubscriptionManager = ({ transactionId }) => {
  const setStepState = useSetRecoilState(sagaStepStateFamily);

  useEffect(() => {
    if (!transactionId) return;

    console.log(`Subscribing to updates for transaction: ${transactionId}`);
    const request = new SagaSubscriptionRequest();
    request.setTransactionId(transactionId);

    const stream = client.subscribeSagaUpdates(request, {});

    stream.on('data', (response) => {
      const update = response.toObject();
      console.log('Received saga update:', update);
      // 更新对应的Recoil atom
      setStepState({ transactionId, stepName: update.stepName }, (prevState) => ({
        ...prevState,
        status: update.status, // Note: status enum需要从数字映射到字符串
        details: update.details,
      }));
    });

    stream.on('status', (status) => {
      if (status.code !== 0) {
        console.error(`Stream error: ${status.details}`);
        // 可以在这里设置一个全局的错误状态atom
      }
    });

    stream.on('end', () => {
      console.log(`Stream ended for transaction: ${transactionId}`);
    });

    // 组件卸载时,取消gRPC流
    return () => {
      console.log('Cleaning up stream.');
      stream.cancel();
    };
  }, [transactionId, setStepState]);

  return null; // No UI rendered by this component
};

BookingStatus.jsx (显示状态的UI组件):

import { useRecoilValue } from 'recoil';
import { sagaStepStateFamily } from './state';

const StepDisplay = ({ transactionId, stepName }) => {
  const { status, details } = useRecoilValue(sagaStepStateFamily({ transactionId, stepName }));
  return (
    <div>
      <h3>{stepName}</h3>
      <p>Status: <strong>{status}</strong></p>
      <p>Details: <em>{details}</em></p>
    </div>
  );
};

export const BookingStatus = ({ transactionId }) => {
  if (!transactionId) return <p>Start a booking to see its status.</p>;

  return (
    <div>
      <h2>Booking Status (Transaction ID: {transactionId})</h2>
      <StepDisplay transactionId={transactionId} stepName="FlightBooking" />
      <StepDisplay transactionId={transactionId} stepName="HotelReservation" />
      {/* ... other steps */}
    </div>
  );
};

Recoil 的优势在这里体现得淋漓尽致:SagaSubscriptionManager 负责与外部世界(gRPC流)交互,它唯一的职责就是将数据推入 Recoil 的状态池。StepDisplay 组件则完全与数据源解耦,它只关心 sagaStepStateFamily 这个状态原子。当状态更新时,只有订阅了该特定原子的组件会重新渲染,实现了高效、精确的UI更新。

架构的扩展性与局限性

该架构在扩展性上表现良好。增加一个新的 Saga 步骤,只需要在后端协调器和 Protobuf 中定义,前端增加一个对应的 StepDisplay 组件即可,核心通信逻辑无需改动。Envoy 的能力远不止于此,我们可以轻松地在其上叠加认证(JWT)、速率限制、精细化路由等策略。

然而,当前方案也存在一些局限性:

  1. Saga 协调器的单点问题与状态持久化: 示例中的 InMemorySagaManager 在生产环境中是不可接受的。协调器的状态必须被持久化,例如写入 Redis 或数据库。同时,为了高可用,协调器服务本身需要部署多个实例。这引入了新的问题:当一个 gRPC 流连接到实例A,而 Saga 的状态更新事件由实例B处理时,如何将更新推送给实例A上的流?这通常需要一个 Pub/Sub 系统(如 Redis Pub/Sub, Kafka)来在实例间广播状态更新。

  2. 客户端重连与状态恢复: 当前实现没有处理网络断开重连的场景。一个健壮的客户端应该在重连后,能够向服务端请求自上次断开以来的所有状态变更,或者直接请求所有步骤的当前全量状态,然后再继续监听流。这需要在 gRPC 服务中增加一个新的 RPC 方法,如 GetSagaCurrentState(transactionId)

  3. gRPC-Web 协议限制: gRPC-Web 目前不支持客户端流和双向流,仅支持一元调用和服务器流。对于当前“服务器推送状态”的场景来说这已足够,但如果未来有需要客户端持续向服务器推流的场景,则需要评估其他技术或等待 gRPC-Web 的发展。


  目录