在 tRPC 中间件层实现 Couchbase 的透明读写分离与会话一致性保障


一个服务的性能瓶颈最初显现时,往往是在数据库层面。我们的一个核心元数据服务,其 Couchbase 集群的主节点(Write Master)CPU 负载开始频繁告警。分析流量后发现,超过85%的操作是读请求,但它们全部被路由到了负责写入的主节点。这不仅浪费了集群中其他只读副本(Read Replicas)的计算资源,也让写入操作的延迟受到了严重影响。技术债已经累积,必须偿还。

初步的构想是实现读写分离。这在理论上很简单:写操作发往主节点,读操作分发到多个只读副本。但在实践中,这立刻引出了一个棘手的问题:如何让业务代码无感?如果在每个服务函数里都手动判断该用主节点连接还是副本连接,代码会变得极其丑陋且难以维护。一个常见的错误是让业务逻辑去关心基础设施的细节,这违反了关注点分离原则。

我们的技术栈是基于 tRPC 和 TypeScript 的。tRPC 的中间件(middleware)机制似乎是解决这个问题的完美切入点。我们可以在中间件中拦截所有请求,根据请求类型(query 或 mutation)来决定使用哪个数据库连接,从而对上层业务代码完全透明。

技术选型决策如下:

  1. 数据库: 继续使用 Couchbase。它的 N1QL 查询和内置的跨数据中心复制(XDCR)能力为读写分离提供了基础。
  2. RPC 框架: tRPC。它的类型安全和中间件架构是实现透明化的关键。
  3. 核心策略: 利用 tRPC 的 procedure 类型(query vs mutation)作为天然的读写标识。所有 query 默认路由到只读副本,所有 mutation 路由到主节点。

然而,一个更深层次的挑战随之而来:数据同步延迟。Couchbase 的副本同步是异步的。如果一个用户更新了个人资料(写操作),然后立即刷新页面请求数据(读操作),读请求可能会被路由到一个尚未同步最新数据的副本,导致用户看到的是旧数据。这就是典型的“读己之写”(Read-Your-Writes)一致性问题。任何一个生产级的读写分离方案,都必须解决这个问题。

第一步:构建一个健壮的数据库连接管理器

在深入中间件之前,我们需要一个能够同时管理主节点连接和多个只读副本连接的模块。这个管理器需要负责初始化连接、提供获取连接的方法,以及在应用关闭时优雅地断开所有连接。

在真实项目中,配置不能硬编码。我们会从环境变量加载,并提供默认值。

// src/db/couchbaseManager.ts
import { Cluster, connect, IBucket, ICluster } from 'couchbase';
import { env } from '../env'; // 使用 Zod 或类似工具管理环境变量

interface CouchbaseConnections {
  writeCluster: ICluster;
  writeBucket: IBucket;
  readClusters: ICluster[];
  readBuckets: IBucket[];
}

class CouchbaseConnectionManager {
  private static instance: CouchbaseConnectionManager;
  private connections: CouchbaseConnections | null = null;
  private connectionPromise: Promise<void> | null = null;
  private roundRobinIndex = 0;

  private constructor() {}

  public static getInstance(): CouchbaseConnectionManager {
    if (!CouchbaseConnectionManager.instance) {
      CouchbaseConnectionManager.instance = new CouchbaseConnectionManager();
    }
    return CouchbaseConnectionManager.instance;
  }

  /**
   * 初始化所有数据库连接。
   * 这里的关键是确保该方法是幂等的,并且能处理并发调用。
   */
  public async initialize(): Promise<void> {
    if (this.connections) {
      console.log('Couchbase connections already initialized.');
      return;
    }

    if (this.connectionPromise) {
      return this.connectionPromise;
    }

    this.connectionPromise = (async () => {
      try {
        console.log('Initializing Couchbase connections...');
        const writeCluster = await connect(env.COUCHBASE_WRITE_HOST, {
          username: env.COUCHBASE_USERNAME,
          password: env.COUCHBASE_PASSWORD,
        });
        const writeBucket = writeCluster.bucket(env.COUCHBASE_BUCKET_NAME);
        await writeBucket.waitUntilReady(5000); // 生产环境中必须设置超时

        const readClusters: ICluster[] = [];
        const readBuckets: IBucket[] = [];

        // 支持配置多个只读副本地址
        for (const host of env.COUCHBASE_READ_HOSTS) {
          const readCluster = await connect(host, {
            username: env.COUCHBASE_USERNAME,
            password: env.COUCHBASE_PASSWORD,
          });
          const readBucket = readCluster.bucket(env.COUCHBASE_BUCKET_NAME);
          await readBucket.waitUntilReady(5000);
          readClusters.push(readCluster);
          readBuckets.push(readBucket);
        }

        if (readBuckets.length === 0) {
            console.warn('No read replicas configured. All traffic will go to the write master.');
        }

        this.connections = {
          writeCluster,
          writeBucket,
          readClusters,
          readBuckets,
        };

        console.log('Couchbase connections established successfully.');
      } catch (error) {
        console.error('Failed to initialize Couchbase connections:', error);
        // 在启动阶段连接失败是致命的,应该直接退出进程
        process.exit(1);
      } finally {
        this.connectionPromise = null;
      }
    })();

    return this.connectionPromise;
  }

  public getWriteBucket(): IBucket {
    if (!this.connections) {
      throw new Error('Couchbase Connection Manager not initialized. Call initialize() first.');
    }
    return this.connections.writeBucket;
  }

  /**
   * 获取一个只读 Bucket。如果没有配置只读副本,则降级到主节点。
   * 这里使用简单的轮询(Round Robin)策略来分发读请求。
   */
  public getReadBucket(): IBucket {
    if (!this.connections) {
      throw new Error('Couchbase Connection Manager not initialized.');
    }

    if (this.connections.readBuckets.length === 0) {
      // 降级策略:如果没有可用的只读副本,读请求也走主节点
      // 这是保证服务可用的重要保障
      return this.connections.writeBucket;
    }

    const bucket = this.connections.readBuckets[this.roundRobinIndex];
    this.roundRobinIndex = (this.roundRobinIndex + 1) % this.connections.readBuckets.length;
    return bucket;
  }

  public async close(): Promise<void> {
    if (!this.connections) {
      return;
    }
    console.log('Closing Couchbase connections...');
    await this.connections.writeCluster.close();
    for (const cluster of this.connections.readClusters) {
      await cluster.close();
    }
    this.connections = null;
    console.log('Couchbase connections closed.');
  }
}

export const couchbaseManager = CouchbaseConnectionManager.getInstance();

这个管理器提供了单例模式,确保整个应用共享同一组连接。它支持配置多个只读副本,并实现了简单的轮询负载均衡。最重要的是,它包含了降级策略:如果没有只读副本可用,所有读请求将安全地回退到主节点,保证了系统的韧性。

第二步:实现 tRPC 中间件与会话一致性

现在是核心部分。我们将创建一个 tRPC 中间件,它会为每个请求的上下文(ctx)注入一个数据库句柄 db。这个 db 句柄会根据我们定义的规则指向主节点或只读副本的 bucket。

为了解决“读己之写”问题,我们引入一种“会话粘滞”(Session Stickiness)机制。当一个用户执行了写操作(mutation),我们会在此次会话的上下文中设置一个标记,并附带一个短暂的 TTL(例如,5秒)。在接下来的5秒内,该用户的所有读操作(query)都将被强制路由到主节点,确保能读到最新的数据。过了这个窗口期,读请求将再次被路由到只读副本。

AsyncLocalStorage 是在 Node.js 中实现这种请求级别上下文传递的理想工具。

// src/server/context.ts
import { IBucket } from 'couchbase';
import { AsyncLocalStorage } from 'async_hooks';

// 这个存储将贯穿整个请求生命周期,用于传递会话状态
export const sessionStateStorage = new AsyncLocalStorage<{ forceMaster: boolean; expiresAt: number }>();

export interface AppContext {
  db: IBucket;
  // 可以添加其他上下文信息,如用户信息
  // user?: { id: string };
}

export const createContext = (): AppContext => {
  // 在创建上下文时,我们还不知道应该用哪个连接
  // 所以暂时提供一个会抛出错误的占位符
  return {
    db: new Proxy({}, {
      get() {
        throw new Error('DB handle not available yet. It should be injected by middleware.');
      }
    }) as IBucket,
  };
};

接下来是中间件的实现。

// src/server/trpc.ts
import { initTRPC } from '@trpc/server';
import { AppContext, sessionStateStorage } from './context';
import { couchbaseManager } from '../db/couchbaseManager';
import { TRPCError } from '@trpc/server';

const t = initTRPC.context<AppContext>().create();

export const router = t.router;
export const publicProcedure = t.procedure;

const dbMiddleware = t.middleware(async ({ ctx, type, next }) => {
  const sessionState = sessionStateStorage.getStore();
  let useMaster = false;

  // 规则 1: 如果是 mutation,必须使用主节点
  if (type === 'mutation') {
    useMaster = true;
  }

  // 规则 2: 如果会话状态标记为 "forceMaster" 且未过期,则使用主节点
  if (sessionState && sessionState.forceMaster && sessionState.expiresAt > Date.now()) {
    console.log(`Session stickiness activated. Forcing master read.`);
    useMaster = true;
  }

  const bucket = useMaster
    ? couchbaseManager.getWriteBucket()
    : couchbaseManager.getReadBucket();

  // 为当前请求注入正确的数据库句柄
  const newCtx = { ...ctx, db: bucket };

  const result = await next({ ctx: newCtx });

  // 规则 3: 如果是成功的 mutation,则为当前会话设置 "forceMaster" 标记
  // 这里的坑在于:必须在 next() 调用之后设置,确保操作成功
  if (type === 'mutation' && result.ok) {
    const currentState = sessionStateStorage.getStore();
    const expiresAt = Date.now() + 5000; // 5秒的粘滞窗口

    // 如果已存在状态,则更新;否则创建一个新的
    if (currentState) {
      currentState.forceMaster = true;
      currentState.expiresAt = expiresAt;
    } else {
        // 这种情况理论上不应该发生,因为我们会在服务器入口处设置
        // 但作为防御性编程,我们还是处理一下
        sessionStateStorage.enterWith({ forceMaster: true, expiresAt });
    }
    console.log(`Mutation successful. Set session stickiness until ${new Date(expiresAt).toISOString()}`);
  }

  return result;
});

// 应用了中间件的 procedure
export const protectedProcedure = publicProcedure.use(dbMiddleware);

这个中间件的逻辑非常清晰:

  1. 检查请求类型,mutation 强制走主节点。
  2. 检查 AsyncLocalStorage 中是否存在有效的粘滞会话标记,如果存在,query 也强制走主节点。
  3. 根据判断结果,从 couchbaseManager 获取对应的 bucket 实例,并注入到 ctx.db 中。
  4. 执行后续的 procedure (next())。
  5. 如果执行的是一个成功的 mutation,则在 AsyncLocalStorage 中设置或更新粘滞标记。

为了让 AsyncLocalStorage 生效,我们需要在服务器的入口处对每个请求进行包裹。

// src/server/index.ts
// ... (express, tRPC server setup) ...

// 在 Express 或其他 HTTP 框架的中间件中包裹请求
app.use((req, res, next) => {
  sessionStateStorage.run({ forceMaster: false, expiresAt: 0 }, next);
});

app.use(
  '/trpc',
  trpcExpress.createExpressMiddleware({
    router: appRouter,
    createContext,
  }),
);

// ...

架构与流程的可视化

使用 Mermaid 可以清晰地展示请求的流转路径。

sequenceDiagram
    participant Client
    participant tRPC_Server
    participant SessionStorage
    participant Middleware
    participant CouchbaseManager
    participant CouchbaseMaster
    participant CouchbaseReplica

    Client->>tRPC_Server: 发起 user.update (mutation)
    tRPC_Server->>SessionStorage: 初始化请求上下文 (forceMaster=false)
    tRPC_Server->>Middleware: 调用中间件,type='mutation'
    Middleware->>CouchbaseManager: getWriteBucket()
    CouchbaseManager-->>Middleware: 返回 Master Bucket 实例
    Middleware->>tRPC_Server: next() with master bucket in ctx
    tRPC_Server->>CouchbaseMaster: 执行写入操作
    CouchbaseMaster-->>tRPC_Server: 写入成功
    tRPC_Server->>Middleware: 操作成功,返回结果
    Middleware->>SessionStorage: 设置 { forceMaster: true, expiresAt: T+5s }
    Middleware-->>Client: 返回成功响应

    %% 5秒内
    Client->>tRPC_Server: 发起 user.get (query)
    tRPC_Server->>SessionStorage: 初始化请求上下文 (forceMaster=false)
    tRPC_Server->>Middleware: 调用中间件,type='query'
    Middleware->>SessionStorage: 读取会话状态 { forceMaster: true, ... }
    Middleware-->>Middleware: 判断 forceMaster 为 true 且未过期
    Middleware->>CouchbaseManager: getWriteBucket()
    CouchbaseManager-->>Middleware: 返回 Master Bucket 实例
    Middleware->>tRPC_Server: next() with master bucket in ctx
    tRPC_Server->>CouchbaseMaster: 执行读取操作 (保证读到最新数据)
    CouchbaseMaster-->>tRPC_Server: 返回最新数据
    tRPC_Server-->>Client: 返回成功响应

整合到 tRPC Router

现在,我们只需要在定义我们的 tRPC router 时使用 protectedProcedure 即可。业务代码的开发者完全不需要关心底层是哪个数据库连接。

// src/routers/user.router.ts
import { router, protectedProcedure } from '../server/trpc';
import { z } from 'zod';

export const userRouter = router({
  get: protectedProcedure
    .input(z.object({ id: z.string() }))
    .query(async ({ ctx, input }) => {
      // ctx.db 在这里可能是主节点也可能是只读副本
      // 但业务代码无需关心
      const { db } = ctx;
      try {
        const result = await db.collection('users').get(input.id);
        return result.content;
      } catch (error) {
        // 生产级的错误处理
        console.error(`Failed to get user ${input.id}`, error);
        throw new TRPCError({ code: 'NOT_FOUND', message: 'User not found.' });
      }
    }),

  update: protectedProcedure
    .input(z.object({ id: z.string(), name: z.string() }))
    .mutation(async ({ ctx, input }) => {
      // ctx.db 在这里一定是主节点
      const { db } = ctx;
      try {
        await db.collection('users').upsert(input.id, { name: input.name });
        return { success: true };
      } catch (error) {
        console.error(`Failed to update user ${input.id}`, error);
        throw new TRPCError({ code: 'INTERNAL_SERVER_ERROR', message: 'Failed to update user.' });
      }
    }),
});

// src/routers/_app.ts
import { router } from '../server/trpc';
import { userRouter } from './user.router';

export const appRouter = router({
  user: userRouter,
});

export type AppRouter = typeof appRouter;

如上所示,user.getuser.update 的实现非常纯粹,只关注业务逻辑。ctx.db 是由中间件动态注入的,业务代码层面实现了完全的透明化。

方案的局限性与未来展望

这套方案并非银弹。它的一个核心假设是基于 tRPC 的 query/mutation 命名约定来区分读写。如果某个 query 内部实际上有写入数据库的副作用(这本身是反模式的),该方案会错误地将其路由到只读副本,导致失败。这要求团队有良好的编码规范。

“会话粘滞”的5秒窗口期是一个经验值,需要根据实际的数据库主从同步延迟进行调整。如果延迟很高,这个窗口就需要加长,但也会导致更多的读请求落到主节点上,削弱了读写分离的效果。这是一个需要权衡的参数。

此外,本方案没有处理复杂的跨多个操作的事务场景。如果一个业务流程需要先写,然后进行多次读取,并根据读取结果再写,那么这个5秒的窗口可能不足以覆盖整个流程。更复杂的场景可能需要显式地在业务逻辑中开启一个“主节点会话”,但这又会破坏透明性。

未来的一个优化方向是,可以探索一种更智能的路由策略。例如,不只是简单地设置一个时间窗口,而是可以根据具体写入的文档 key 来进行粘滞。当后续有针对同一个 key 的读请求时,才强制路由到主节点,这样可以更精确地控制路由,进一步提升只读副本的利用率。


  目录