我们面临一个棘手的工程问题:一个处理密集型计算任务的后端系统,其核心依赖于一个不稳定的第三方 API。这些任务通过 RabbitMQ 分发给一组 Node.js worker。当第三方 API 抖动或宕机时,我们的 worker 会持续重试,不仅耗尽自身资源,还可能因大量失败请求冲击本已脆弱的 API,引发雪崩效应。传统的日志监控和事后告警在这种场景下显得过于被动。我们需要一个能实时观察并介入的系统,一个能直观展示每个 worker 健康状况的“作战指挥室”。
最初的想法是暴露一个 RESTful API,前端定期轮询任务状态和 worker 健康度。但这很快被否决。轮询带来了不必要的延迟和服务器负载,我们无法在秒级内感知到故障。我们需要的是一个推送模型,一个从后端到前端的实时、单向数据流。这直接将我们的技术栈选择导向了 WebSocket。
整个系统的设计蓝图逐渐清晰:
- 高韧性 Worker: 每个任务 worker 必须内置熔断器(Circuit Breaker)模式。当对第三方 API 的调用失败率超过阈值时,熔断器打开,在一段时间内直接拒绝新任务,避免无效冲击。
- 事件驱动的 API: 放弃轮询。API 的核心职责是管理 WebSocket 连接,并将后端产生的状态事件实时推送到前端。这并非一个传统的 CRUD API,而是一个事件网关。
- 实时状态反馈循环: Worker 在执行任务的每个关键阶段(接收、开始、成功、失败)以及熔断器状态变化时,都必须向 RabbitMQ 的特定交换机发布一个状态事件。
- 解耦的通知服务: 一个独立的“通知服务”订阅这些状态事件,并负责将它们通过 WebSocket 推送给已连接的前端客户端。这使得 worker 无需关心任何与客户端通信的逻辑。
- 响应式前端: 前端需要一个能高效处理高频数据流的状态管理库。MobX 的响应式模型与这种场景天然契合。结合 Vue.js,我们可以构建一个仅在数据实际变化时才精确更新的 UI。
下面是这个系统的核心架构图。
graph TD subgraph Browser A[Vue.js/MobX UI] end subgraph API Gateway B[WebSocket Server] end subgraph Backend Infrastructure C[RabbitMQ] D[Task Worker 1] E[Task Worker 2] F[Notifier Service] end subgraph External G[Unstable 3rd-Party API] end A -- WebSocket Connection --> B B -- Pushes Status Updates --> A B -- Submits New Task (HTTP/RPC) --> C[tasks_exchange] C -- tasks_queue --> D C -- tasks_queue --> E D -- Calls API --> G E -- Calls API --> G D -- Publishes Event --> C[status_fanout_exchange] E -- Publishes Event --> C[status_fanout_exchange] C -- status_events_queue --> F F -- Consumes Events --> B
这个架构的核心在于通过 RabbitMQ 实现了彻底的解耦。Worker 只关心处理任务和报告状态,Notifier 只关心传递状态,前端只关心渲染状态。
第一步:RabbitMQ 拓扑与配置
一个健壮的 RabbitMQ 拓扑是系统的基石。我们需要两个主要的交换机:一个用于任务分发,一个用于状态广播。
-
tasks_exchange
(direct): 用于精确地将任务路由到tasks_queue
。 -
status_fanout_exchange
(fanout): 用于将状态更新广播给所有关心状态的消费者,这里主要是我们的 Notifier Service。
使用 amqplib
在 Node.js 中建立连接和声明拓扑的代码如下。在真实项目中,这些配置应全部来自环境变量。
// common/rabbit.js
const amqp = require('amqplib');
const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://guest:guest@localhost:5672';
const TASK_EXCHANGE = 'tasks_exchange';
const TASK_QUEUE = 'tasks_queue';
const STATUS_EXCHANGE = 'status_fanout_exchange';
let connection = null;
let channel = null;
async function connectRabbitMQ() {
if (channel) {
return { connection, channel };
}
try {
console.log('Connecting to RabbitMQ...');
connection = await amqp.connect(RABBITMQ_URL);
channel = await connection.createChannel();
// 健壮性保证:如果进程崩溃,连接关闭,但队列和交换机应保留
await channel.assertExchange(TASK_EXCHANGE, 'direct', { durable: true });
await channel.assertQueue(TASK_QUEUE, { durable: true });
await channel.bindQueue(TASK_QUEUE, TASK_EXCHANGE, 'task_routing_key');
await channel.assertExchange(STATUS_EXCHANGE, 'fanout', { durable: false });
console.log('RabbitMQ connected and topology asserted.');
connection.on('error', (err) => {
console.error('[AMQP] connection error', err.message);
// 实现重连逻辑
channel = null;
connection = null;
setTimeout(connectRabbitMQ, 5000);
});
connection.on('close', () => {
console.warn('[AMQP] connection closed. Reconnecting...');
channel = null;
connection = null;
setTimeout(connectRabbitMQ, 5000);
});
return { connection, channel };
} catch (error) {
console.error('Failed to connect to RabbitMQ:', error.message);
// 初始连接失败也需要重试
setTimeout(connectRabbitMQ, 5000);
throw error;
}
}
function getChannel() {
if (!channel) {
throw new Error('RabbitMQ channel is not available.');
}
return channel;
}
// 优雅关闭
process.on('SIGINT', async () => {
if (channel) await channel.close();
if (connection) await connection.close();
process.exit(0);
});
module.exports = { connectRabbitMQ, getChannel, TASK_EXCHANGE, STATUS_EXCHANGE, TASK_QUEUE };
这段代码不仅建立了连接,还包含了基本的错误处理和重连逻辑,这是生产级代码的最低要求。
第二步:实现高韧性 Worker 与熔断器
Worker 的核心是熔断器。一个简单的熔断器实现需要包含三种状态:CLOSED
, OPEN
, HALF_OPEN
。
- CLOSED: 正常状态,允许请求通过。记录失败次数。当失败次数达到阈值,切换到
OPEN
状态。 - OPEN: 熔断状态,直接拒绝所有请求。一段时间后(
resetTimeout
),切换到HALF_OPEN
状态。 - HALF_OPEN: 试探状态,允许单个请求通过。如果成功,切换回
CLOSED
;如果失败,立即切换回OPEN
。
// worker/circuitBreaker.js
const STATE = {
CLOSED: 'CLOSED',
OPEN: 'OPEN',
HALF_OPEN: 'HALF_OPEN'
};
class CircuitBreaker {
constructor(options = {}) {
this.failureThreshold = options.failureThreshold || 3;
this.successThreshold = options.successThreshold || 1; // For HALF_OPEN -> CLOSED
this.resetTimeout = options.resetTimeout || 10000; // 10 seconds
this.state = STATE.CLOSED;
this.failureCount = 0;
this.successCount = 0;
this.lastFailureTime = null;
this.timer = null;
// 回调,用于报告状态变化
this.onStateChange = options.onStateChange || (() => {});
}
_trip() {
if (this.state === STATE.OPEN) return;
this.state = STATE.OPEN;
this.lastFailureTime = Date.now();
this.onStateChange(this.state);
console.log(`[CircuitBreaker] Tripped to OPEN state.`);
this.timer = setTimeout(() => this._attemptReset(), this.resetTimeout);
}
_attemptReset() {
if (this.state !== STATE.OPEN) return;
this.state = STATE.HALF_OPEN;
this.onStateChange(this.state);
console.log(`[CircuitBreaker] Moving to HALF_OPEN state.`);
}
_reset() {
if (this.state === STATE.CLOSED) return;
this.state = STATE.CLOSED;
this.failureCount = 0;
this.successCount = 0;
this.onStateChange(this.state);
console.log(`[CircuitBreaker] Reset to CLOSED state.`);
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
}
async execute(action) {
if (this.state === STATE.OPEN) {
throw new Error('CircuitBreaker is open. Request rejected.');
}
try {
const result = await action();
this._onSuccess();
return result;
} catch (error) {
this._onFailure();
throw error;
}
}
_onSuccess() {
if (this.state === STATE.HALF_OPEN) {
this.successCount++;
if (this.successCount >= this.successThreshold) {
this._reset();
}
} else {
// 在 CLOSED 状态下成功,重置失败计数器
this.failureCount = 0;
}
}
_onFailure() {
this.failureCount++;
if (this.state === STATE.HALF_OPEN || this.failureCount >= this.failureThreshold) {
this._trip();
}
}
getState() {
return this.state;
}
}
module.exports = CircuitBreaker;
现在,我们将这个熔断器集成到 Worker 中。Worker 会消费 tasks_queue
里的消息,并用熔断器包装对第三方 API 的调用。
// worker/index.js
const { connectRabbitMQ, getChannel, TASK_QUEUE, STATUS_EXCHANGE } = require('../common/rabbit');
const CircuitBreaker = require('./circuitBreaker');
const { v4: uuidv4 } = require('uuid');
const workerId = `worker-${uuidv4()}`;
// 模拟不稳定的第三方API
async function unstableApiCall(payload) {
return new Promise((resolve, reject) => {
setTimeout(() => {
if (Math.random() > 0.6) {
resolve({ success: true, data: `Processed: ${payload.data}` });
} else {
reject(new Error('Third-party API failed.'));
}
}, Math.random() * 1000 + 500);
});
}
function publishStatus(channel, eventType, payload) {
const event = {
workerId,
eventType,
timestamp: new Date().toISOString(),
...payload
};
const message = Buffer.from(JSON.stringify(event));
// 发送到 fanout 交换机,不需要 routing key
channel.publish(STATUS_EXCHANGE, '', message);
}
async function main() {
await connectRabbitMQ();
const channel = getChannel();
const circuitBreaker = new CircuitBreaker({
failureThreshold: 5,
resetTimeout: 15000,
onStateChange: (newState) => {
publishStatus(channel, 'circuit_breaker_state_changed', {
state: newState
});
}
});
// 初始化时发布一次状态
publishStatus(channel, 'worker_started', {
state: circuitBreaker.getState()
});
console.log(`[${workerId}] Waiting for tasks in queue: ${TASK_QUEUE}`);
channel.consume(TASK_QUEUE, async (msg) => {
if (msg !== null) {
const task = JSON.parse(msg.content.toString());
console.log(`[${workerId}] Received task ${task.id}`);
publishStatus(channel, 'task_started', { taskId: task.id });
try {
// 使用熔断器执行任务
const result = await circuitBreaker.execute(() => unstableApiCall(task.payload));
console.log(`[${workerId}] Task ${task.id} succeeded.`);
publishStatus(channel, 'task_succeeded', { taskId: task.id, result });
channel.ack(msg);
} catch (error) {
console.error(`[${workerId}] Task ${task.id} failed: ${error.message}`);
publishStatus(channel, 'task_failed', { taskId: task.id, error: error.message, breakerState: circuitBreaker.getState() });
// 即使失败也要 ack,防止任务被无限重试,熔断器已经处理了失败逻辑
// 在真实场景中,可能会将失败的任务路由到死信队列
channel.ack(msg);
}
}
}, { noAck: false }); // 必须使用手动确认
}
main().catch(console.error);
这里的关键点是:publishStatus
函数。无论任务成功、失败,还是熔断器状态改变,Worker 都会将结构化的事件发布到 status_fanout_exchange
。这是前端实时性的数据来源。
第三步:Notifier Service 与 WebSocket API
Notifier Service 是连接后端逻辑和前端 UI 的桥梁。它非常简单,只做两件事:
- 从 RabbitMQ 消费状态事件。
- 将事件广播给所有连接的 WebSocket 客户端。
// notifier/index.js
const { WebSocketServer } = require('ws');
const { connectRabbitMQ, getChannel, STATUS_EXCHANGE } = require('../common/rabbit');
async function main() {
// 1. 设置 WebSocket 服务器
const wss = new WebSocketServer({ port: 8080 });
console.log('WebSocket server started on port 8080.');
wss.on('connection', ws => {
console.log('Client connected.');
ws.on('close', () => {
console.log('Client disconnected.');
});
ws.on('error', console.error);
});
function broadcast(data) {
wss.clients.forEach(client => {
if (client.readyState === client.OPEN) {
client.send(data);
}
});
}
// 2. 连接 RabbitMQ 并消费状态事件
await connectRabbitMQ();
const channel = getChannel();
const q = await channel.assertQueue('', { exclusive: true }); // 创建一个临时的、独占的队列
channel.bindQueue(q.queue, STATUS_EXCHANGE, '');
console.log('Waiting for status events...');
channel.consume(q.queue, (msg) => {
if (msg.content) {
const eventString = msg.content.toString();
console.log(`Received status event: ${eventString}`);
// 直接将原始事件广播给所有客户端
broadcast(eventString);
}
}, { noAck: true });
}
main().catch(console.error);
这种设计非常干净。API 设计的核心不在于 HTTP 端点,而在于这套基于 WebSocket 和 RabbitMQ 的事件流协议。
第四步:Vue + MobX 构建响应式前端
前端的核心是状态管理。我们需要存储和展示两类状态:Worker 的状态(主要是熔断器)和 Task 的状态。
首先,定义 MobX Stores。
// src/stores/WorkerStore.js
import { makeAutoObservable, observable } from 'mobx';
class WorkerStore {
workers = observable.map(); // 使用 map 更高效地按 ID 查找和更新
constructor() {
makeAutoObservable(this);
}
updateWorkerState(workerId, state, timestamp) {
if (!this.workers.has(workerId)) {
this.workers.set(workerId, { id: workerId, state: 'UNKNOWN', lastUpdate: '' });
}
const worker = this.workers.get(workerId);
worker.state = state;
worker.lastUpdate = timestamp;
}
// 将 map 转换为数组,方便在 Vue 中渲染
get workerList() {
return Array.from(this.workers.values());
}
}
export const workerStore = new WorkerStore();
// src/stores/TaskStore.js
import { makeAutoObservable, observable } from 'mobx';
class TaskStore {
tasks = observable.map();
MAX_TASKS = 100; // 防止内存无限增长
constructor() {
makeAutoObservable(this);
}
updateTask(taskData) {
const { taskId, eventType, timestamp } = taskData;
let task = this.tasks.get(taskId);
if (!task) {
task = { id: taskId, status: 'PENDING', history: [] };
this.tasks.set(taskId, task);
}
switch (eventType) {
case 'task_started':
task.status = 'RUNNING';
break;
case 'task_succeeded':
task.status = 'SUCCEEDED';
task.result = taskData.result;
break;
case 'task_failed':
task.status = 'FAILED';
task.error = taskData.error;
break;
}
task.history.push({ event: eventType, time: timestamp });
// 维护任务列表大小
if (this.tasks.size > this.MAX_TASKS) {
const oldestKey = this.tasks.keys().next().value;
this.tasks.delete(oldestKey);
}
}
get taskList() {
// 按时间倒序排列
return Array.from(this.tasks.values()).sort((a, b) =>
new Date(b.history[0].time) - new Date(a.history[0].time)
);
}
}
export const taskStore = new TaskStore();
接着是 WebSocket 服务,它负责接收数据并调用 Store 的 action。
// src/services/WebSocketService.js
import { workerStore } from '../stores/WorkerStore';
import { taskStore } from '../stores/TaskStore';
const WEBSOCKET_URL = 'ws://localhost:8080';
class WebSocketService {
ws = null;
connect() {
this.ws = new WebSocket(WEBSOCKET_URL);
this.ws.onopen = () => {
console.log('WebSocket connected.');
};
this.ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
this.handleEvent(data);
} catch (error) {
console.error('Error parsing WebSocket message:', error);
}
};
this.ws.onclose = () => {
console.log('WebSocket disconnected. Reconnecting in 3 seconds...');
setTimeout(() => this.connect(), 3000);
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
this.ws.close();
};
}
handleEvent(event) {
const { eventType, workerId, timestamp } = event;
switch (eventType) {
case 'worker_started':
case 'circuit_breaker_state_changed':
workerStore.updateWorkerState(workerId, event.state, timestamp);
break;
case 'task_started':
case 'task_succeeded':
case 'task_failed':
taskStore.updateTask(event);
break;
default:
// 未知事件,可以加日志
break;
}
}
}
export const webSocketService = new WebSocketService();
最后,是 Vue 组件。使用 mobx-vue-lite
将组件变为 observer,这样它们就能自动响应 MobX 状态的变化。
<!-- src/components/Dashboard.vue -->
<template>
<div class="dashboard">
<div class="panel">
<h2>Worker Status</h2>
<div v-if="workerStore.workerList.length === 0" class="empty-state">
Waiting for workers...
</div>
<ul v-else>
<li v-for="worker in workerStore.workerList" :key="worker.id" class="worker-item">
<span class="worker-id">{{ worker.id }}</span>
<span :class="['status-badge', getStatusClass(worker.state)]">{{ worker.state }}</span>
</li>
</ul>
</div>
<div class="panel">
<h2>Task Stream</h2>
<div v-if="taskStore.taskList.length === 0" class="empty-state">
Waiting for tasks...
</div>
<ul v-else class="task-list">
<li v-for="task in taskStore.taskList" :key="task.id" :class="['task-item', task.status.toLowerCase()]">
<strong>Task ID:</strong> {{ task.id }} <br/>
<strong>Status:</strong> {{ task.status }}
<div v-if="task.error" class="error-details"><strong>Error:</strong> {{ task.error }}</div>
</li>
</ul>
</div>
</div>
</template>
<script>
import { observer } from 'mobx-vue-lite';
import { workerStore } from '../stores/WorkerStore';
import { taskStore } from '../stores/TaskStore';
export default observer({
name: 'Dashboard',
setup() {
return {
workerStore,
taskStore,
};
},
methods: {
getStatusClass(state) {
const statusMap = {
'CLOSED': 'status-closed',
'OPEN': 'status-open',
'HALF_OPEN': 'status-half-open'
};
return statusMap[state] || '';
}
},
});
</script>
<style scoped>
/* 一些基本的样式 */
.dashboard { display: flex; gap: 20px; font-family: sans-serif; }
.panel { flex: 1; border: 1px solid #ccc; padding: 15px; border-radius: 8px; }
.worker-item, .task-item { list-style: none; padding: 10px; margin-bottom: 5px; border-radius: 4px; }
.status-badge { padding: 3px 8px; border-radius: 12px; color: white; font-weight: bold; }
.status-closed { background-color: #28a745; }
.status-open { background-color: #dc3545; }
.status-half-open { background-color: #ffc107; color: black; }
.task-item.succeeded { background-color: #e9f7ec; }
.task-item.failed { background-color: #fbe9e9; }
.task-item.running { background-color: #e8f4fd; }
.error-details { color: #dc3545; font-size: 0.9em; margin-top: 5px; }
</style>
在应用的入口 (main.js
) 启动 WebSocket 连接:
// src/main.js
import { createApp } from 'vue';
import App from './App.vue';
import { webSocketService } from './services/WebSocketService';
webSocketService.connect();
createApp(App).mount('#app');
至此,整个闭环完成。当启动所有服务(RabbitMQ, Notifier, 多个 Worker 实例)后,前端界面会实时地、无需任何用户操作地展示出新启动的 worker,以及任务流的状态。如果模拟第三方 API 频繁失败,能清晰地看到 worker 的熔断器状态从 CLOSED
变为 OPEN
,任务从 RUNNING
变为 FAILED
,这一切都在毫秒间响应。
方案局限与未来迭代
这个方案虽然解决了实时性和解耦的问题,但在生产环境中仍有几个需要考虑的点:
- 熔断器状态持久化: 当前熔断器的状态是 worker 进程的内存状态。如果 worker 重启,状态会丢失。对于需要跨进程、跨实例共享熔断状态的场景,应将状态(如失败次数、状态位)存储在 Redis 等外部共享存储中。
- WebSocket 网关的扩展性: Notifier Service 是一个单点。在大量客户端连接的场景下,需要一个可水平扩展的 WebSocket 集群。这通常需要借助 Redis Pub/Sub 等机制在多个 WebSocket 服务实例之间广播消息。
- 消息的最终一致性: 从 Worker 发出状态到前端 UI 更新,整个链路是异步的。虽然速度很快,但并非强一致。对于需要事务性保证的场景,此架构不适用。
- 前端状态的精细化: 对于超高频的更新,可以引入
throttle
或debounce
机制来控制 UI 更新频率,或者使用transaction
来批量更新 MobX 状态,以避免不必要的渲染。
未来的一个演进方向是加入更丰富的可观测性。例如,在每个状态事件中都注入一个全局唯一的 traceId
,这样就可以将前端的一个任务条目与后端日志、指标完整地关联起来,实现端到端的链路追踪。