一个看似简单的用户操作,例如“预订旅行套餐”,在微服务架构下可能触发一系列复杂的后台交互:锁定航班、预订酒店、租赁车辆。这是一个典型的分布式事务场景。任何一步的失败都必须触发已完成步骤的回滚或补偿。在真实项目中,我们不能让用户界面(UI)在整个过程中冻结,也不能让用户面对一个永久旋转的加载图标后被告知“操作失败”。用户需要清晰、实时地了解这个长耗时事务的进展,例如“机票已确认”、“酒店预订中”、“车辆租赁失败,正在回滚机票预订…”。
这个问题的核心在于,如何将后端一个长生命周期、异步、多阶段、状态多变的 Saga 事务,高效且可靠地映射到前端的 UI 状态上。
方案A:轮询与回调的传统困境
最初的构想是基于 HTTP 的短连接模型。前端发起一个启动事务的 API 请求,后端创建一个 Saga 实例,返回一个唯一的事务ID。
// 后端 C# Kit Controller
[Post("/api/trip/book")]
public async Task<IResult> StartBookingSaga([FromBody] BookingRequest request)
{
var transactionId = Guid.NewGuid();
_sagaCoordinator.StartBookingSaga(transactionId, request); // 异步启动,不阻塞
return Results.Accepted(new { TransactionId = transactionId });
}
[Get("/api/trip/status/{transactionId}")]
public async Task<IResult> GetSagaStatus(Guid transactionId)
{
var status = _sagaStateRepository.GetStatus(transactionId);
if (status == null) return Results.NotFound();
return Results.Ok(status);
}
前端拿到 transactionId
后,启动一个定时器,每隔2-3秒轮询 /api/trip/status/{transactionId}
接口。
优势分析:
- 实现简单: 这是最直观的方案,前后端都易于理解和实现。
- 无状态: 后端的状态接口是无状态的,易于水平扩展。
劣势分析:
- 实时性差: UI 更新有明显延迟,取决于轮询间隔。缩短间隔会急剧增加服务端和网络的负载。
- 资源浪费: 大部分轮询请求都是无效的,因为后端状态并未改变。这在移动端会造成不必要的电量消耗。
- 客户端复杂性: 前端需要管理定时器、处理网络错误、以及在页面卸载时清理定时器,这些逻辑很容易出错。
另一种变体是 Webhook,但它主要适用于服务器到服务器的通信,在浏览器客户端场景下,由于 NAT 和防火墙的存在,几乎不可行。
方案B:WebSocket 的双刃剑
为了解决实时性问题,WebSocket 自然而然地进入了视野。通过建立一个持久的双向连接,服务器可以在 Saga 状态变更时主动推送消息给客户端。
架构设想:
- 前端通过 WebSocket 连接到一个专用的网关服务。
- 在发起 Saga 事务后,客户端通过 WebSocket 发送一个“订阅”消息,包含
transactionId
。 - Saga 协调器在每次状态变更后,通过消息队列或内部事件总线,将更新通知到 WebSocket 网关,再由网关推送给对应的客户端。
优势分析:
- 真·实时: 状态更新几乎是瞬时的,用户体验极佳。
- 高效: 没有无效的轮询请求,节省了网络带宽和服务器资源。
劣势分析:
- 基础设施复杂性: 需要引入并维护一套 WebSocket 网关集群。这包括连接管理、心跳维持、水平扩展、状态同步等一系列复杂问题。它成了系统架构中一个新的、有状态的关键组件。
- 连接管理: 浏览器中断网、刷新页面、切换网络环境都会导致连接断开。需要设计一套可靠的重连和状态恢复机制。客户端重连后,如何获取到断连期间错过的所有状态更新?这通常需要在服务端为每个连接维护一个消息缓冲区。
- 安全与鉴权: WebSocket 的鉴权模型与 HTTP 不同,需要额外的 token 传递和验证机制。
在真实项目中,引入一套全新的、有状态的基础设施组件需要慎重评估其运维成本。如果团队已经在使用服务网格(Service Mesh),那么应该优先考虑是否能利用现有设施解决问题。
最终选择:基于 Envoy 与 gRPC 流的解决方案
我们的技术栈中已经包含了用于服务间通信的 Envoy Proxy。Envoy 对 gRPC 提供了强大的原生支持,包括 gRPC-Web 过滤器,它能将浏览器无法直接使用的标准 gRPC 协议转换为 HTTP/1.1 或 HTTP/2 兼容的格式。这为我们提供了一条全新的思路:使用 gRPC 的服务器流(Server Streaming)模式。
架构设计:
- 前端 (React/Recoil): 使用 gRPC-Web 客户端库。
- 通信层 (Envoy Proxy): 部署为边缘代理,接收来自前端的 gRPC-Web 请求,通过
envoy.grpc_web
过滤器将其转换为标准的 gRPC 请求,并路由到后端的 Saga 协调器服务。 - 后端 (C# Kit): 实现一个 gRPC 服务,提供一个服务器流 RPC 方法,用于订阅 Saga 状态。
sequenceDiagram participant User participant ReactApp (Recoil) participant Envoy Proxy participant SagaOrchestrator (C# Kit) participant FlightService participant HotelService User->>ReactApp (Recoil): 点击"预订" ReactApp (Recoil)->>+Envoy Proxy: POST /api.SagaService/StartTripSaga Envoy Proxy->>+SagaOrchestrator (C# Kit): gRPC: StartTripSaga() SagaOrchestrator (C# Kit)-->>-Envoy Proxy: response { transactionId } Envoy Proxy-->>-ReactApp (Recoil): response { transactionId } Note right of ReactApp (Recoil): 获取 transactionId, 准备订阅状态 ReactApp (Recoil)->>+Envoy Proxy: [gRPC-Web Stream] /api.SagaService/SubscribeSagaUpdates Envoy Proxy->>+SagaOrchestrator (C# Kit): [gRPC Stream] SubscribeSagaUpdates() Note right of SagaOrchestrator (C# Kit): 记录订阅者, 开始执行Saga SagaOrchestrator (C# Kit)->>+FlightService: ReserveFlight() FlightService-->>-SagaOrchestrator (C# Kit): Success SagaOrchestrator (C# Kit)-->>Envoy Proxy: Stream Push: { step: "Flight", status: "Completed" } Envoy Proxy-->>ReactApp (Recoil): Stream Data Note right of ReactApp (Recoil): Recoil atom 更新, UI 实时变化 SagaOrchestrator (C# Kit)->>+HotelService: BookHotel() HotelService-->>-SagaOrchestrator (C# Kit): Fail (No rooms) SagaOrchestrator (C# Kit)-->>Envoy Proxy: Stream Push: { step: "Hotel", status: "Failed" } Envoy Proxy-->>ReactApp (Recoil): Stream Data Note right of ReactApp (Recoil): Recoil atom 更新, UI 显示错误 Note right of SagaOrchestrator (C# Kit): 开始执行补偿事务 SagaOrchestrator (C# Kit)->>+FlightService: CancelFlightReservation() FlightService-->>-SagaOrchestrator (C# Kit): Success SagaOrchestrator (C# Kit)-->>Envoy Proxy: Stream Push: { step: "Flight", status: "Compensated" } Envoy Proxy-->>ReactApp (Recoil): Stream Data Note right of ReactApp (Recoil): Recoil atom 更新, UI 显示已回滚 SagaOrchestrator (C# Kit)-->>Envoy Proxy: Stream End Envoy Proxy-->>ReactApp (Recoil): Stream End
这个方案的精妙之处在于,它将协议转换的复杂性完全交给了 Envoy,C# 后端服务只需实现标准的 gRPC 服务即可,无需关心任何 Web 特有的逻辑。前端则获得了强类型的、实时的流式通信能力。
核心实现概览
1. Protobuf 定义
首先,定义 gRPC 服务和消息的契约。这是前后端通信的基石。
saga.proto
:
syntax = "proto3";
package api;
// 开启Saga事务的服务
service SagaService {
// 启动一个新的旅行预订Saga
rpc StartTripSaga(StartTripSagaRequest) returns (StartTripSagaResponse);
// 订阅Saga的状态更新, 这是一个服务器流RPC
rpc SubscribeSagaUpdates(SagaSubscriptionRequest) returns (stream SagaUpdateResponse);
}
message StartTripSagaRequest {
string user_id = 1;
string flight_number = 2;
string hotel_name = 3;
}
message StartTripSagaResponse {
string transaction_id = 1;
}
message SagaSubscriptionRequest {
string transaction_id = 1;
}
enum SagaStepStatus {
UNKNOWN = 0;
PENDING = 1;
COMPLETED = 2;
FAILED = 3;
COMPENSATING = 4;
COMPENSATED = 5;
}
message SagaUpdateResponse {
string transaction_id = 1;
string step_name = 2; // e.g., "FlightBooking", "HotelReservation"
SagaStepStatus status = 3;
string details = 4; // 额外信息,如错误原因
}
2. C# Kit 后端 Saga 协调器与 gRPC 服务
我们使用 Kit 框架来承载 gRPC 服务。这里的 Saga 协调器是简化的,用于演示核心逻辑。在生产环境中,状态和订阅者列表需要持久化到 Redis 或数据库中以支持服务重启和扩展。
SagaOrchestratorService.cs
:
using Grpc.Core;
using System.Collections.Concurrent;
using System.Threading.Channels;
namespace MyProject.Services;
// 一个简化的、内存中的Saga状态机和订阅管理器
// 生产环境需要替换为持久化存储,例如Redis/PostgreSQL
public class InMemorySagaManager
{
// transactionId -> Channel<SagaUpdateResponse>
private readonly ConcurrentDictionary<string, Channel<SagaUpdateResponse>> _subscribers = new();
// transactionId -> ConcurrentDictionary<stepName, SagaUpdateResponse>
private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, SagaUpdateResponse>> _sagaStates = new();
public ChannelReader<SagaUpdateResponse> Subscribe(string transactionId)
{
var channel = Channel.CreateUnbounded<SagaUpdateResponse>();
_subscribers.AddOrUpdate(transactionId, channel, (_, existing) => channel);
// 如果已经有历史状态,立即推送
if (_sagaStates.TryGetValue(transactionId, out var existingStates))
{
foreach (var state in existingStates.Values)
{
channel.Writer.TryWrite(state);
}
}
return channel.Reader;
}
public void Unsubscribe(string transactionId)
{
if (_subscribers.TryRemove(transactionId, out var channel))
{
channel.Writer.TryComplete();
}
}
public async Task PublishUpdate(SagaUpdateResponse update)
{
// 1. 持久化当前状态
var states = _sagaStates.GetOrAdd(update.TransactionId, new ConcurrentDictionary<string, SagaUpdateResponse>());
states[update.StepName] = update;
// 2. 推送给订阅者
if (_subscribers.TryGetValue(update.TransactionId, out var channel))
{
await channel.Writer.WriteAsync(update);
}
}
// ... 其他启动、执行Saga的逻辑
}
public class SagaGrpcService : Api.SagaService.SagaServiceBase
{
private readonly ILogger<SagaGrpcService> _logger;
private readonly InMemorySagaManager _sagaManager;
// 假设这是执行Saga逻辑的服务
private readonly ITripBookingSaga _tripBookingSaga;
public SagaGrpcService(ILogger<SagaGrpcService> logger, InMemorySagaManager sagaManager, ITripBookingSaga tripBookingSaga)
{
_logger = logger;
_sagaManager = sagaManager;
_tripBookingSaga = tripBookingSaga;
}
public override Task<StartTripSagaResponse> StartTripSaga(StartTripSagaRequest request, ServerCallContext context)
{
var transactionId = Guid.NewGuid().ToString();
_logger.LogInformation("Starting new Saga with TransactionId: {TransactionId}", transactionId);
// 异步执行,不阻塞gRPC调用
_ = _tripBookingSaga.ExecuteAsync(transactionId, request);
return Task.FromResult(new StartTripSagaResponse { TransactionId = transactionId });
}
public override async Task SubscribeSagaUpdates(
SagaSubscriptionRequest request,
IServerStreamWriter<SagaUpdateResponse> responseStream,
ServerCallContext context)
{
var transactionId = request.TransactionId;
_logger.LogInformation("Client subscribed for Saga updates on TransactionId: {TransactionId}", transactionId);
var reader = _sagaManager.Subscribe(transactionId);
// 监听取消事件,以便及时清理资源
context.CancellationToken.Register(() =>
{
_logger.LogWarning("Client disconnected for TransactionId: {TransactionId}", transactionId);
_sagaManager.Unsubscribe(transactionId);
});
try
{
// 从Channel中读取更新并写入响应流
await foreach (var update in reader.ReadAllAsync(context.CancellationToken))
{
await responseStream.WriteAsync(update);
}
}
catch (OperationCanceledException)
{
_logger.LogWarning("Subscription cancelled for TransactionId: {TransactionId}", transactionId);
}
finally
{
_sagaManager.Unsubscribe(transactionId);
}
}
}
单元测试思路: 可以模拟 ITripBookingSaga
的行为,验证 SagaGrpcService
是否能正确调用 _sagaManager
的 Subscribe
和 PublishUpdate
方法,并检查 responseStream
是否被写入了预期的数据。对于 InMemorySagaManager
,需要测试其线程安全性和订阅/发布逻辑的正确性。
3. Envoy Proxy 配置
这是连接前后端的桥梁。关键配置在于 HttpConnectionManager
中的 envoy.filters.http.grpc_web
和 CORS 策略。
envoy.yaml
:
static_resources:
listeners:
- name: listener_0
address:
socket_address: { address: 0.0.0.0, port_value: 8080 }
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: ingress_http
route_config:
name: local_route
virtual_hosts:
- name: local_service
domains: ["*"]
routes:
- match: { prefix: "/" }
route:
cluster: saga_service
# 允许gRPC流超时时间更长
timeout: 0s
cors:
allow_origin_string_match:
- prefix: "*"
allow_methods: GET, PUT, DELETE, POST, OPTIONS
allow_headers: keep-alive,user-agent,cache-control,content-type,content-transfer-encoding,custom-header-1,x-accept-content-transfer-encoding,x-accept-response-streaming,x-user-agent,x-grpc-web,grpc-timeout
expose_headers: custom-header-1,grpc-status,grpc-message
http_filters:
- name: envoy.filters.http.grpc_web
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.grpc_web.v3.GrpcWeb
- name: envoy.filters.http.cors
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.cors.v3.Cors
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
clusters:
- name: saga_service
connect_timeout: 0.25s
type: LOGICAL_DNS
# 使用HTTP/2与后端gRPC服务通信
typed_extension_protocol_options:
envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
"@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
explicit_http_config:
http2_protocol_options: {}
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: saga_service
endpoints:
- lb_endpoints:
- endpoint:
address:
# 使用host.docker.internal从Docker容器访问宿主机服务
socket_address: { address: host.docker.internal, port_value: 5001 }
一个常见的坑在于CORS配置。gRPC-Web 会发送一些特定的 x-
和 grpc-
前缀的头部,这些必须在 allow_headers
中明确列出,否则浏览器的 preflight OPTIONS
请求就会失败。
4. React 前端与 Recoil 状态管理
Recoil 的原子化状态管理模型非常适合这个场景。我们可以为每个 Saga 步骤创建一个状态原子,然后用一个组件来管理 gRPC 流的生命周期,并将收到的更新分发到对应的原子中。
首先生成客户端代码:protoc --js_out=import_style=commonjs,binary:. --grpc-web_out=import_style=typescript,mode=grpcwebtext:. saga.proto
state.js
(Recoil Atoms):
import { atomFamily, selectorFamily } from 'recoil';
// 定义每个Saga步骤的状态结构
export const sagaStepStateFamily = atomFamily({
key: 'SagaStepState',
default: (params) => ({ // params will be { transactionId, stepName }
status: 'PENDING',
details: 'Waiting to start...',
}),
});
// 一个selector用于获取整个Saga的所有步骤状态
export const sagaOverallStateSelector = selectorFamily({
key: 'SagaOverallState',
get: (transactionId) => ({ get }) => {
// 假设我们知道一个Saga有哪些步骤
const steps = ['FlightBooking', 'HotelReservation', 'CarRental'];
const stepStates = steps.map(stepName =>
get(sagaStepStateFamily({ transactionId, stepName }))
);
return stepStates;
},
});
SagaSubscriptionManager.jsx
(管理gRPC流的组件):
import { useEffect } from 'react';
import { useSetRecoilState } from 'recoil';
import { SagaServiceClient } from './saga_grpc_web_pb';
import { SagaSubscriptionRequest } from './saga_pb';
import { sagaStepStateFamily } from './state';
const client = new SagaServiceClient('http://localhost:8080'); // Envoy的地址
// 这是一个无UI的逻辑组件,负责后台订阅和更新Recoil状态
export const SagaSubscriptionManager = ({ transactionId }) => {
const setStepState = useSetRecoilState(sagaStepStateFamily);
useEffect(() => {
if (!transactionId) return;
console.log(`Subscribing to updates for transaction: ${transactionId}`);
const request = new SagaSubscriptionRequest();
request.setTransactionId(transactionId);
const stream = client.subscribeSagaUpdates(request, {});
stream.on('data', (response) => {
const update = response.toObject();
console.log('Received saga update:', update);
// 更新对应的Recoil atom
setStepState({ transactionId, stepName: update.stepName }, (prevState) => ({
...prevState,
status: update.status, // Note: status enum需要从数字映射到字符串
details: update.details,
}));
});
stream.on('status', (status) => {
if (status.code !== 0) {
console.error(`Stream error: ${status.details}`);
// 可以在这里设置一个全局的错误状态atom
}
});
stream.on('end', () => {
console.log(`Stream ended for transaction: ${transactionId}`);
});
// 组件卸载时,取消gRPC流
return () => {
console.log('Cleaning up stream.');
stream.cancel();
};
}, [transactionId, setStepState]);
return null; // No UI rendered by this component
};
BookingStatus.jsx
(显示状态的UI组件):
import { useRecoilValue } from 'recoil';
import { sagaStepStateFamily } from './state';
const StepDisplay = ({ transactionId, stepName }) => {
const { status, details } = useRecoilValue(sagaStepStateFamily({ transactionId, stepName }));
return (
<div>
<h3>{stepName}</h3>
<p>Status: <strong>{status}</strong></p>
<p>Details: <em>{details}</em></p>
</div>
);
};
export const BookingStatus = ({ transactionId }) => {
if (!transactionId) return <p>Start a booking to see its status.</p>;
return (
<div>
<h2>Booking Status (Transaction ID: {transactionId})</h2>
<StepDisplay transactionId={transactionId} stepName="FlightBooking" />
<StepDisplay transactionId={transactionId} stepName="HotelReservation" />
{/* ... other steps */}
</div>
);
};
Recoil 的优势在这里体现得淋漓尽致:SagaSubscriptionManager
负责与外部世界(gRPC流)交互,它唯一的职责就是将数据推入 Recoil 的状态池。StepDisplay
组件则完全与数据源解耦,它只关心 sagaStepStateFamily
这个状态原子。当状态更新时,只有订阅了该特定原子的组件会重新渲染,实现了高效、精确的UI更新。
架构的扩展性与局限性
该架构在扩展性上表现良好。增加一个新的 Saga 步骤,只需要在后端协调器和 Protobuf 中定义,前端增加一个对应的 StepDisplay
组件即可,核心通信逻辑无需改动。Envoy 的能力远不止于此,我们可以轻松地在其上叠加认证(JWT)、速率限制、精细化路由等策略。
然而,当前方案也存在一些局限性:
Saga 协调器的单点问题与状态持久化: 示例中的
InMemorySagaManager
在生产环境中是不可接受的。协调器的状态必须被持久化,例如写入 Redis 或数据库。同时,为了高可用,协调器服务本身需要部署多个实例。这引入了新的问题:当一个 gRPC 流连接到实例A,而 Saga 的状态更新事件由实例B处理时,如何将更新推送给实例A上的流?这通常需要一个 Pub/Sub 系统(如 Redis Pub/Sub, Kafka)来在实例间广播状态更新。客户端重连与状态恢复: 当前实现没有处理网络断开重连的场景。一个健壮的客户端应该在重连后,能够向服务端请求自上次断开以来的所有状态变更,或者直接请求所有步骤的当前全量状态,然后再继续监听流。这需要在 gRPC 服务中增加一个新的 RPC 方法,如
GetSagaCurrentState(transactionId)
。gRPC-Web 协议限制: gRPC-Web 目前不支持客户端流和双向流,仅支持一元调用和服务器流。对于当前“服务器推送状态”的场景来说这已足够,但如果未来有需要客户端持续向服务器推流的场景,则需要评估其他技术或等待 gRPC-Web 的发展。