一个服务的性能瓶颈最初显现时,往往是在数据库层面。我们的一个核心元数据服务,其 Couchbase 集群的主节点(Write Master)CPU 负载开始频繁告警。分析流量后发现,超过85%的操作是读请求,但它们全部被路由到了负责写入的主节点。这不仅浪费了集群中其他只读副本(Read Replicas)的计算资源,也让写入操作的延迟受到了严重影响。技术债已经累积,必须偿还。
初步的构想是实现读写分离。这在理论上很简单:写操作发往主节点,读操作分发到多个只读副本。但在实践中,这立刻引出了一个棘手的问题:如何让业务代码无感?如果在每个服务函数里都手动判断该用主节点连接还是副本连接,代码会变得极其丑陋且难以维护。一个常见的错误是让业务逻辑去关心基础设施的细节,这违反了关注点分离原则。
我们的技术栈是基于 tRPC 和 TypeScript 的。tRPC 的中间件(middleware)机制似乎是解决这个问题的完美切入点。我们可以在中间件中拦截所有请求,根据请求类型(query 或 mutation)来决定使用哪个数据库连接,从而对上层业务代码完全透明。
技术选型决策如下:
- 数据库: 继续使用 Couchbase。它的 N1QL 查询和内置的跨数据中心复制(XDCR)能力为读写分离提供了基础。
- RPC 框架: tRPC。它的类型安全和中间件架构是实现透明化的关键。
- 核心策略: 利用 tRPC 的 procedure 类型(
query
vsmutation
)作为天然的读写标识。所有query
默认路由到只读副本,所有mutation
路由到主节点。
然而,一个更深层次的挑战随之而来:数据同步延迟。Couchbase 的副本同步是异步的。如果一个用户更新了个人资料(写操作),然后立即刷新页面请求数据(读操作),读请求可能会被路由到一个尚未同步最新数据的副本,导致用户看到的是旧数据。这就是典型的“读己之写”(Read-Your-Writes)一致性问题。任何一个生产级的读写分离方案,都必须解决这个问题。
第一步:构建一个健壮的数据库连接管理器
在深入中间件之前,我们需要一个能够同时管理主节点连接和多个只读副本连接的模块。这个管理器需要负责初始化连接、提供获取连接的方法,以及在应用关闭时优雅地断开所有连接。
在真实项目中,配置不能硬编码。我们会从环境变量加载,并提供默认值。
// src/db/couchbaseManager.ts
import { Cluster, connect, IBucket, ICluster } from 'couchbase';
import { env } from '../env'; // 使用 Zod 或类似工具管理环境变量
interface CouchbaseConnections {
writeCluster: ICluster;
writeBucket: IBucket;
readClusters: ICluster[];
readBuckets: IBucket[];
}
class CouchbaseConnectionManager {
private static instance: CouchbaseConnectionManager;
private connections: CouchbaseConnections | null = null;
private connectionPromise: Promise<void> | null = null;
private roundRobinIndex = 0;
private constructor() {}
public static getInstance(): CouchbaseConnectionManager {
if (!CouchbaseConnectionManager.instance) {
CouchbaseConnectionManager.instance = new CouchbaseConnectionManager();
}
return CouchbaseConnectionManager.instance;
}
/**
* 初始化所有数据库连接。
* 这里的关键是确保该方法是幂等的,并且能处理并发调用。
*/
public async initialize(): Promise<void> {
if (this.connections) {
console.log('Couchbase connections already initialized.');
return;
}
if (this.connectionPromise) {
return this.connectionPromise;
}
this.connectionPromise = (async () => {
try {
console.log('Initializing Couchbase connections...');
const writeCluster = await connect(env.COUCHBASE_WRITE_HOST, {
username: env.COUCHBASE_USERNAME,
password: env.COUCHBASE_PASSWORD,
});
const writeBucket = writeCluster.bucket(env.COUCHBASE_BUCKET_NAME);
await writeBucket.waitUntilReady(5000); // 生产环境中必须设置超时
const readClusters: ICluster[] = [];
const readBuckets: IBucket[] = [];
// 支持配置多个只读副本地址
for (const host of env.COUCHBASE_READ_HOSTS) {
const readCluster = await connect(host, {
username: env.COUCHBASE_USERNAME,
password: env.COUCHBASE_PASSWORD,
});
const readBucket = readCluster.bucket(env.COUCHBASE_BUCKET_NAME);
await readBucket.waitUntilReady(5000);
readClusters.push(readCluster);
readBuckets.push(readBucket);
}
if (readBuckets.length === 0) {
console.warn('No read replicas configured. All traffic will go to the write master.');
}
this.connections = {
writeCluster,
writeBucket,
readClusters,
readBuckets,
};
console.log('Couchbase connections established successfully.');
} catch (error) {
console.error('Failed to initialize Couchbase connections:', error);
// 在启动阶段连接失败是致命的,应该直接退出进程
process.exit(1);
} finally {
this.connectionPromise = null;
}
})();
return this.connectionPromise;
}
public getWriteBucket(): IBucket {
if (!this.connections) {
throw new Error('Couchbase Connection Manager not initialized. Call initialize() first.');
}
return this.connections.writeBucket;
}
/**
* 获取一个只读 Bucket。如果没有配置只读副本,则降级到主节点。
* 这里使用简单的轮询(Round Robin)策略来分发读请求。
*/
public getReadBucket(): IBucket {
if (!this.connections) {
throw new Error('Couchbase Connection Manager not initialized.');
}
if (this.connections.readBuckets.length === 0) {
// 降级策略:如果没有可用的只读副本,读请求也走主节点
// 这是保证服务可用的重要保障
return this.connections.writeBucket;
}
const bucket = this.connections.readBuckets[this.roundRobinIndex];
this.roundRobinIndex = (this.roundRobinIndex + 1) % this.connections.readBuckets.length;
return bucket;
}
public async close(): Promise<void> {
if (!this.connections) {
return;
}
console.log('Closing Couchbase connections...');
await this.connections.writeCluster.close();
for (const cluster of this.connections.readClusters) {
await cluster.close();
}
this.connections = null;
console.log('Couchbase connections closed.');
}
}
export const couchbaseManager = CouchbaseConnectionManager.getInstance();
这个管理器提供了单例模式,确保整个应用共享同一组连接。它支持配置多个只读副本,并实现了简单的轮询负载均衡。最重要的是,它包含了降级策略:如果没有只读副本可用,所有读请求将安全地回退到主节点,保证了系统的韧性。
第二步:实现 tRPC 中间件与会话一致性
现在是核心部分。我们将创建一个 tRPC 中间件,它会为每个请求的上下文(ctx
)注入一个数据库句柄 db
。这个 db
句柄会根据我们定义的规则指向主节点或只读副本的 bucket。
为了解决“读己之写”问题,我们引入一种“会话粘滞”(Session Stickiness)机制。当一个用户执行了写操作(mutation
),我们会在此次会话的上下文中设置一个标记,并附带一个短暂的 TTL(例如,5秒)。在接下来的5秒内,该用户的所有读操作(query
)都将被强制路由到主节点,确保能读到最新的数据。过了这个窗口期,读请求将再次被路由到只读副本。
AsyncLocalStorage
是在 Node.js 中实现这种请求级别上下文传递的理想工具。
// src/server/context.ts
import { IBucket } from 'couchbase';
import { AsyncLocalStorage } from 'async_hooks';
// 这个存储将贯穿整个请求生命周期,用于传递会话状态
export const sessionStateStorage = new AsyncLocalStorage<{ forceMaster: boolean; expiresAt: number }>();
export interface AppContext {
db: IBucket;
// 可以添加其他上下文信息,如用户信息
// user?: { id: string };
}
export const createContext = (): AppContext => {
// 在创建上下文时,我们还不知道应该用哪个连接
// 所以暂时提供一个会抛出错误的占位符
return {
db: new Proxy({}, {
get() {
throw new Error('DB handle not available yet. It should be injected by middleware.');
}
}) as IBucket,
};
};
接下来是中间件的实现。
// src/server/trpc.ts
import { initTRPC } from '@trpc/server';
import { AppContext, sessionStateStorage } from './context';
import { couchbaseManager } from '../db/couchbaseManager';
import { TRPCError } from '@trpc/server';
const t = initTRPC.context<AppContext>().create();
export const router = t.router;
export const publicProcedure = t.procedure;
const dbMiddleware = t.middleware(async ({ ctx, type, next }) => {
const sessionState = sessionStateStorage.getStore();
let useMaster = false;
// 规则 1: 如果是 mutation,必须使用主节点
if (type === 'mutation') {
useMaster = true;
}
// 规则 2: 如果会话状态标记为 "forceMaster" 且未过期,则使用主节点
if (sessionState && sessionState.forceMaster && sessionState.expiresAt > Date.now()) {
console.log(`Session stickiness activated. Forcing master read.`);
useMaster = true;
}
const bucket = useMaster
? couchbaseManager.getWriteBucket()
: couchbaseManager.getReadBucket();
// 为当前请求注入正确的数据库句柄
const newCtx = { ...ctx, db: bucket };
const result = await next({ ctx: newCtx });
// 规则 3: 如果是成功的 mutation,则为当前会话设置 "forceMaster" 标记
// 这里的坑在于:必须在 next() 调用之后设置,确保操作成功
if (type === 'mutation' && result.ok) {
const currentState = sessionStateStorage.getStore();
const expiresAt = Date.now() + 5000; // 5秒的粘滞窗口
// 如果已存在状态,则更新;否则创建一个新的
if (currentState) {
currentState.forceMaster = true;
currentState.expiresAt = expiresAt;
} else {
// 这种情况理论上不应该发生,因为我们会在服务器入口处设置
// 但作为防御性编程,我们还是处理一下
sessionStateStorage.enterWith({ forceMaster: true, expiresAt });
}
console.log(`Mutation successful. Set session stickiness until ${new Date(expiresAt).toISOString()}`);
}
return result;
});
// 应用了中间件的 procedure
export const protectedProcedure = publicProcedure.use(dbMiddleware);
这个中间件的逻辑非常清晰:
- 检查请求类型,
mutation
强制走主节点。 - 检查
AsyncLocalStorage
中是否存在有效的粘滞会话标记,如果存在,query
也强制走主节点。 - 根据判断结果,从
couchbaseManager
获取对应的 bucket 实例,并注入到ctx.db
中。 - 执行后续的 procedure (
next()
)。 - 如果执行的是一个成功的
mutation
,则在AsyncLocalStorage
中设置或更新粘滞标记。
为了让 AsyncLocalStorage
生效,我们需要在服务器的入口处对每个请求进行包裹。
// src/server/index.ts
// ... (express, tRPC server setup) ...
// 在 Express 或其他 HTTP 框架的中间件中包裹请求
app.use((req, res, next) => {
sessionStateStorage.run({ forceMaster: false, expiresAt: 0 }, next);
});
app.use(
'/trpc',
trpcExpress.createExpressMiddleware({
router: appRouter,
createContext,
}),
);
// ...
架构与流程的可视化
使用 Mermaid 可以清晰地展示请求的流转路径。
sequenceDiagram participant Client participant tRPC_Server participant SessionStorage participant Middleware participant CouchbaseManager participant CouchbaseMaster participant CouchbaseReplica Client->>tRPC_Server: 发起 user.update (mutation) tRPC_Server->>SessionStorage: 初始化请求上下文 (forceMaster=false) tRPC_Server->>Middleware: 调用中间件,type='mutation' Middleware->>CouchbaseManager: getWriteBucket() CouchbaseManager-->>Middleware: 返回 Master Bucket 实例 Middleware->>tRPC_Server: next() with master bucket in ctx tRPC_Server->>CouchbaseMaster: 执行写入操作 CouchbaseMaster-->>tRPC_Server: 写入成功 tRPC_Server->>Middleware: 操作成功,返回结果 Middleware->>SessionStorage: 设置 { forceMaster: true, expiresAt: T+5s } Middleware-->>Client: 返回成功响应 %% 5秒内 Client->>tRPC_Server: 发起 user.get (query) tRPC_Server->>SessionStorage: 初始化请求上下文 (forceMaster=false) tRPC_Server->>Middleware: 调用中间件,type='query' Middleware->>SessionStorage: 读取会话状态 { forceMaster: true, ... } Middleware-->>Middleware: 判断 forceMaster 为 true 且未过期 Middleware->>CouchbaseManager: getWriteBucket() CouchbaseManager-->>Middleware: 返回 Master Bucket 实例 Middleware->>tRPC_Server: next() with master bucket in ctx tRPC_Server->>CouchbaseMaster: 执行读取操作 (保证读到最新数据) CouchbaseMaster-->>tRPC_Server: 返回最新数据 tRPC_Server-->>Client: 返回成功响应
整合到 tRPC Router
现在,我们只需要在定义我们的 tRPC router 时使用 protectedProcedure
即可。业务代码的开发者完全不需要关心底层是哪个数据库连接。
// src/routers/user.router.ts
import { router, protectedProcedure } from '../server/trpc';
import { z } from 'zod';
export const userRouter = router({
get: protectedProcedure
.input(z.object({ id: z.string() }))
.query(async ({ ctx, input }) => {
// ctx.db 在这里可能是主节点也可能是只读副本
// 但业务代码无需关心
const { db } = ctx;
try {
const result = await db.collection('users').get(input.id);
return result.content;
} catch (error) {
// 生产级的错误处理
console.error(`Failed to get user ${input.id}`, error);
throw new TRPCError({ code: 'NOT_FOUND', message: 'User not found.' });
}
}),
update: protectedProcedure
.input(z.object({ id: z.string(), name: z.string() }))
.mutation(async ({ ctx, input }) => {
// ctx.db 在这里一定是主节点
const { db } = ctx;
try {
await db.collection('users').upsert(input.id, { name: input.name });
return { success: true };
} catch (error) {
console.error(`Failed to update user ${input.id}`, error);
throw new TRPCError({ code: 'INTERNAL_SERVER_ERROR', message: 'Failed to update user.' });
}
}),
});
// src/routers/_app.ts
import { router } from '../server/trpc';
import { userRouter } from './user.router';
export const appRouter = router({
user: userRouter,
});
export type AppRouter = typeof appRouter;
如上所示,user.get
和 user.update
的实现非常纯粹,只关注业务逻辑。ctx.db
是由中间件动态注入的,业务代码层面实现了完全的透明化。
方案的局限性与未来展望
这套方案并非银弹。它的一个核心假设是基于 tRPC 的 query
/mutation
命名约定来区分读写。如果某个 query
内部实际上有写入数据库的副作用(这本身是反模式的),该方案会错误地将其路由到只读副本,导致失败。这要求团队有良好的编码规范。
“会话粘滞”的5秒窗口期是一个经验值,需要根据实际的数据库主从同步延迟进行调整。如果延迟很高,这个窗口就需要加长,但也会导致更多的读请求落到主节点上,削弱了读写分离的效果。这是一个需要权衡的参数。
此外,本方案没有处理复杂的跨多个操作的事务场景。如果一个业务流程需要先写,然后进行多次读取,并根据读取结果再写,那么这个5秒的窗口可能不足以覆盖整个流程。更复杂的场景可能需要显式地在业务逻辑中开启一个“主节点会话”,但这又会破坏透明性。
未来的一个优化方向是,可以探索一种更智能的路由策略。例如,不只是简单地设置一个时间窗口,而是可以根据具体写入的文档 key 来进行粘滞。当后续有针对同一个 key 的读请求时,才强制路由到主节点,这样可以更精确地控制路由,进一步提升只读副本的利用率。