构建基于 RabbitMQ 的高韧性任务系统及其 Vue 与 MobX 实时状态可视化前端


我们面临一个棘手的工程问题:一个处理密集型计算任务的后端系统,其核心依赖于一个不稳定的第三方 API。这些任务通过 RabbitMQ 分发给一组 Node.js worker。当第三方 API 抖动或宕机时,我们的 worker 会持续重试,不仅耗尽自身资源,还可能因大量失败请求冲击本已脆弱的 API,引发雪崩效应。传统的日志监控和事后告警在这种场景下显得过于被动。我们需要一个能实时观察并介入的系统,一个能直观展示每个 worker 健康状况的“作战指挥室”。

最初的想法是暴露一个 RESTful API,前端定期轮询任务状态和 worker 健康度。但这很快被否决。轮询带来了不必要的延迟和服务器负载,我们无法在秒级内感知到故障。我们需要的是一个推送模型,一个从后端到前端的实时、单向数据流。这直接将我们的技术栈选择导向了 WebSocket。

整个系统的设计蓝图逐渐清晰:

  1. 高韧性 Worker: 每个任务 worker 必须内置熔断器(Circuit Breaker)模式。当对第三方 API 的调用失败率超过阈值时,熔断器打开,在一段时间内直接拒绝新任务,避免无效冲击。
  2. 事件驱动的 API: 放弃轮询。API 的核心职责是管理 WebSocket 连接,并将后端产生的状态事件实时推送到前端。这并非一个传统的 CRUD API,而是一个事件网关。
  3. 实时状态反馈循环: Worker 在执行任务的每个关键阶段(接收、开始、成功、失败)以及熔断器状态变化时,都必须向 RabbitMQ 的特定交换机发布一个状态事件。
  4. 解耦的通知服务: 一个独立的“通知服务”订阅这些状态事件,并负责将它们通过 WebSocket 推送给已连接的前端客户端。这使得 worker 无需关心任何与客户端通信的逻辑。
  5. 响应式前端: 前端需要一个能高效处理高频数据流的状态管理库。MobX 的响应式模型与这种场景天然契合。结合 Vue.js,我们可以构建一个仅在数据实际变化时才精确更新的 UI。

下面是这个系统的核心架构图。

graph TD
    subgraph Browser
        A[Vue.js/MobX UI]
    end

    subgraph API Gateway
        B[WebSocket Server]
    end

    subgraph Backend Infrastructure
        C[RabbitMQ]
        D[Task Worker 1]
        E[Task Worker 2]
        F[Notifier Service]
    end
    
    subgraph External
        G[Unstable 3rd-Party API]
    end

    A -- WebSocket Connection --> B
    B -- Pushes Status Updates --> A

    B -- Submits New Task (HTTP/RPC) --> C[tasks_exchange]
    C -- tasks_queue --> D
    C -- tasks_queue --> E

    D -- Calls API --> G
    E -- Calls API --> G

    D -- Publishes Event --> C[status_fanout_exchange]
    E -- Publishes Event --> C[status_fanout_exchange]

    C -- status_events_queue --> F
    F -- Consumes Events --> B

这个架构的核心在于通过 RabbitMQ 实现了彻底的解耦。Worker 只关心处理任务和报告状态,Notifier 只关心传递状态,前端只关心渲染状态。

第一步:RabbitMQ 拓扑与配置

一个健壮的 RabbitMQ 拓扑是系统的基石。我们需要两个主要的交换机:一个用于任务分发,一个用于状态广播。

  • tasks_exchange (direct): 用于精确地将任务路由到 tasks_queue
  • status_fanout_exchange (fanout): 用于将状态更新广播给所有关心状态的消费者,这里主要是我们的 Notifier Service。

使用 amqplib 在 Node.js 中建立连接和声明拓扑的代码如下。在真实项目中,这些配置应全部来自环境变量。

// common/rabbit.js
const amqp = require('amqplib');

const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://guest:guest@localhost:5672';
const TASK_EXCHANGE = 'tasks_exchange';
const TASK_QUEUE = 'tasks_queue';
const STATUS_EXCHANGE = 'status_fanout_exchange';

let connection = null;
let channel = null;

async function connectRabbitMQ() {
    if (channel) {
        return { connection, channel };
    }

    try {
        console.log('Connecting to RabbitMQ...');
        connection = await amqp.connect(RABBITMQ_URL);
        channel = await connection.createChannel();

        // 健壮性保证:如果进程崩溃,连接关闭,但队列和交换机应保留
        await channel.assertExchange(TASK_EXCHANGE, 'direct', { durable: true });
        await channel.assertQueue(TASK_QUEUE, { durable: true });
        await channel.bindQueue(TASK_QUEUE, TASK_EXCHANGE, 'task_routing_key');
        
        await channel.assertExchange(STATUS_EXCHANGE, 'fanout', { durable: false });

        console.log('RabbitMQ connected and topology asserted.');
        
        connection.on('error', (err) => {
            console.error('[AMQP] connection error', err.message);
            // 实现重连逻辑
            channel = null;
            connection = null;
            setTimeout(connectRabbitMQ, 5000);
        });

        connection.on('close', () => {
            console.warn('[AMQP] connection closed. Reconnecting...');
            channel = null;
            connection = null;
            setTimeout(connectRabbitMQ, 5000);
        });

        return { connection, channel };
    } catch (error) {
        console.error('Failed to connect to RabbitMQ:', error.message);
        // 初始连接失败也需要重试
        setTimeout(connectRabbitMQ, 5000);
        throw error;
    }
}

function getChannel() {
    if (!channel) {
        throw new Error('RabbitMQ channel is not available.');
    }
    return channel;
}

// 优雅关闭
process.on('SIGINT', async () => {
    if (channel) await channel.close();
    if (connection) await connection.close();
    process.exit(0);
});

module.exports = { connectRabbitMQ, getChannel, TASK_EXCHANGE, STATUS_EXCHANGE, TASK_QUEUE };

这段代码不仅建立了连接,还包含了基本的错误处理和重连逻辑,这是生产级代码的最低要求。

第二步:实现高韧性 Worker 与熔断器

Worker 的核心是熔断器。一个简单的熔断器实现需要包含三种状态:CLOSED, OPEN, HALF_OPEN

  • CLOSED: 正常状态,允许请求通过。记录失败次数。当失败次数达到阈值,切换到 OPEN 状态。
  • OPEN: 熔断状态,直接拒绝所有请求。一段时间后(resetTimeout),切换到 HALF_OPEN 状态。
  • HALF_OPEN: 试探状态,允许单个请求通过。如果成功,切换回 CLOSED;如果失败,立即切换回 OPEN
// worker/circuitBreaker.js
const STATE = {
    CLOSED: 'CLOSED',
    OPEN: 'OPEN',
    HALF_OPEN: 'HALF_OPEN'
};

class CircuitBreaker {
    constructor(options = {}) {
        this.failureThreshold = options.failureThreshold || 3;
        this.successThreshold = options.successThreshold || 1; // For HALF_OPEN -> CLOSED
        this.resetTimeout = options.resetTimeout || 10000; // 10 seconds

        this.state = STATE.CLOSED;
        this.failureCount = 0;
        this.successCount = 0;
        this.lastFailureTime = null;
        this.timer = null;

        // 回调,用于报告状态变化
        this.onStateChange = options.onStateChange || (() => {});
    }

    _trip() {
        if (this.state === STATE.OPEN) return;
        this.state = STATE.OPEN;
        this.lastFailureTime = Date.now();
        this.onStateChange(this.state);
        console.log(`[CircuitBreaker] Tripped to OPEN state.`);
        this.timer = setTimeout(() => this._attemptReset(), this.resetTimeout);
    }

    _attemptReset() {
        if (this.state !== STATE.OPEN) return;
        this.state = STATE.HALF_OPEN;
        this.onStateChange(this.state);
        console.log(`[CircuitBreaker] Moving to HALF_OPEN state.`);
    }

    _reset() {
        if (this.state === STATE.CLOSED) return;
        this.state = STATE.CLOSED;
        this.failureCount = 0;
        this.successCount = 0;
        this.onStateChange(this.state);
        console.log(`[CircuitBreaker] Reset to CLOSED state.`);
        if (this.timer) {
            clearTimeout(this.timer);
            this.timer = null;
        }
    }

    async execute(action) {
        if (this.state === STATE.OPEN) {
            throw new Error('CircuitBreaker is open. Request rejected.');
        }

        try {
            const result = await action();
            this._onSuccess();
            return result;
        } catch (error) {
            this._onFailure();
            throw error;
        }
    }

    _onSuccess() {
        if (this.state === STATE.HALF_OPEN) {
            this.successCount++;
            if (this.successCount >= this.successThreshold) {
                this._reset();
            }
        } else {
            // 在 CLOSED 状态下成功,重置失败计数器
            this.failureCount = 0;
        }
    }

    _onFailure() {
        this.failureCount++;
        if (this.state === STATE.HALF_OPEN || this.failureCount >= this.failureThreshold) {
            this._trip();
        }
    }

    getState() {
        return this.state;
    }
}

module.exports = CircuitBreaker;

现在,我们将这个熔断器集成到 Worker 中。Worker 会消费 tasks_queue 里的消息,并用熔断器包装对第三方 API 的调用。

// worker/index.js
const { connectRabbitMQ, getChannel, TASK_QUEUE, STATUS_EXCHANGE } = require('../common/rabbit');
const CircuitBreaker = require('./circuitBreaker');
const { v4: uuidv4 } = require('uuid');

const workerId = `worker-${uuidv4()}`;

// 模拟不稳定的第三方API
async function unstableApiCall(payload) {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            if (Math.random() > 0.6) {
                resolve({ success: true, data: `Processed: ${payload.data}` });
            } else {
                reject(new Error('Third-party API failed.'));
            }
        }, Math.random() * 1000 + 500);
    });
}

function publishStatus(channel, eventType, payload) {
    const event = {
        workerId,
        eventType,
        timestamp: new Date().toISOString(),
        ...payload
    };
    const message = Buffer.from(JSON.stringify(event));
    // 发送到 fanout 交换机,不需要 routing key
    channel.publish(STATUS_EXCHANGE, '', message);
}

async function main() {
    await connectRabbitMQ();
    const channel = getChannel();

    const circuitBreaker = new CircuitBreaker({
        failureThreshold: 5,
        resetTimeout: 15000,
        onStateChange: (newState) => {
            publishStatus(channel, 'circuit_breaker_state_changed', {
                state: newState
            });
        }
    });

    // 初始化时发布一次状态
    publishStatus(channel, 'worker_started', {
        state: circuitBreaker.getState()
    });

    console.log(`[${workerId}] Waiting for tasks in queue: ${TASK_QUEUE}`);
    
    channel.consume(TASK_QUEUE, async (msg) => {
        if (msg !== null) {
            const task = JSON.parse(msg.content.toString());
            console.log(`[${workerId}] Received task ${task.id}`);
            
            publishStatus(channel, 'task_started', { taskId: task.id });

            try {
                // 使用熔断器执行任务
                const result = await circuitBreaker.execute(() => unstableApiCall(task.payload));
                console.log(`[${workerId}] Task ${task.id} succeeded.`);
                publishStatus(channel, 'task_succeeded', { taskId: task.id, result });
                channel.ack(msg);
            } catch (error) {
                console.error(`[${workerId}] Task ${task.id} failed: ${error.message}`);
                publishStatus(channel, 'task_failed', { taskId: task.id, error: error.message, breakerState: circuitBreaker.getState() });
                // 即使失败也要 ack,防止任务被无限重试,熔断器已经处理了失败逻辑
                // 在真实场景中,可能会将失败的任务路由到死信队列
                channel.ack(msg);
            }
        }
    }, { noAck: false }); // 必须使用手动确认
}

main().catch(console.error);

这里的关键点是:publishStatus 函数。无论任务成功、失败,还是熔断器状态改变,Worker 都会将结构化的事件发布到 status_fanout_exchange。这是前端实时性的数据来源。

第三步:Notifier Service 与 WebSocket API

Notifier Service 是连接后端逻辑和前端 UI 的桥梁。它非常简单,只做两件事:

  1. 从 RabbitMQ 消费状态事件。
  2. 将事件广播给所有连接的 WebSocket 客户端。
// notifier/index.js
const { WebSocketServer } = require('ws');
const { connectRabbitMQ, getChannel, STATUS_EXCHANGE } = require('../common/rabbit');

async function main() {
    // 1. 设置 WebSocket 服务器
    const wss = new WebSocketServer({ port: 8080 });
    console.log('WebSocket server started on port 8080.');

    wss.on('connection', ws => {
        console.log('Client connected.');
        ws.on('close', () => {
            console.log('Client disconnected.');
        });
        ws.on('error', console.error);
    });

    function broadcast(data) {
        wss.clients.forEach(client => {
            if (client.readyState === client.OPEN) {
                client.send(data);
            }
        });
    }

    // 2. 连接 RabbitMQ 并消费状态事件
    await connectRabbitMQ();
    const channel = getChannel();
    
    const q = await channel.assertQueue('', { exclusive: true }); // 创建一个临时的、独占的队列
    channel.bindQueue(q.queue, STATUS_EXCHANGE, '');

    console.log('Waiting for status events...');
    channel.consume(q.queue, (msg) => {
        if (msg.content) {
            const eventString = msg.content.toString();
            console.log(`Received status event: ${eventString}`);
            // 直接将原始事件广播给所有客户端
            broadcast(eventString);
        }
    }, { noAck: true });
}

main().catch(console.error);

这种设计非常干净。API 设计的核心不在于 HTTP 端点,而在于这套基于 WebSocket 和 RabbitMQ 的事件流协议。

第四步:Vue + MobX 构建响应式前端

前端的核心是状态管理。我们需要存储和展示两类状态:Worker 的状态(主要是熔断器)和 Task 的状态。

首先,定义 MobX Stores。

// src/stores/WorkerStore.js
import { makeAutoObservable, observable } from 'mobx';

class WorkerStore {
    workers = observable.map(); // 使用 map 更高效地按 ID 查找和更新

    constructor() {
        makeAutoObservable(this);
    }

    updateWorkerState(workerId, state, timestamp) {
        if (!this.workers.has(workerId)) {
            this.workers.set(workerId, { id: workerId, state: 'UNKNOWN', lastUpdate: '' });
        }
        const worker = this.workers.get(workerId);
        worker.state = state;
        worker.lastUpdate = timestamp;
    }

    // 将 map 转换为数组,方便在 Vue 中渲染
    get workerList() {
        return Array.from(this.workers.values());
    }
}

export const workerStore = new WorkerStore();
// src/stores/TaskStore.js
import { makeAutoObservable, observable } from 'mobx';

class TaskStore {
    tasks = observable.map();
    MAX_TASKS = 100; // 防止内存无限增长

    constructor() {
        makeAutoObservable(this);
    }

    updateTask(taskData) {
        const { taskId, eventType, timestamp } = taskData;
        let task = this.tasks.get(taskId);

        if (!task) {
            task = { id: taskId, status: 'PENDING', history: [] };
            this.tasks.set(taskId, task);
        }

        switch (eventType) {
            case 'task_started':
                task.status = 'RUNNING';
                break;
            case 'task_succeeded':
                task.status = 'SUCCEEDED';
                task.result = taskData.result;
                break;
            case 'task_failed':
                task.status = 'FAILED';
                task.error = taskData.error;
                break;
        }

        task.history.push({ event: eventType, time: timestamp });

        // 维护任务列表大小
        if (this.tasks.size > this.MAX_TASKS) {
            const oldestKey = this.tasks.keys().next().value;
            this.tasks.delete(oldestKey);
        }
    }

    get taskList() {
        // 按时间倒序排列
        return Array.from(this.tasks.values()).sort((a, b) => 
            new Date(b.history[0].time) - new Date(a.history[0].time)
        );
    }
}

export const taskStore = new TaskStore();

接着是 WebSocket 服务,它负责接收数据并调用 Store 的 action。

// src/services/WebSocketService.js
import { workerStore } from '../stores/WorkerStore';
import { taskStore } from '../stores/TaskStore';

const WEBSOCKET_URL = 'ws://localhost:8080';

class WebSocketService {
    ws = null;

    connect() {
        this.ws = new WebSocket(WEBSOCKET_URL);

        this.ws.onopen = () => {
            console.log('WebSocket connected.');
        };

        this.ws.onmessage = (event) => {
            try {
                const data = JSON.parse(event.data);
                this.handleEvent(data);
            } catch (error) {
                console.error('Error parsing WebSocket message:', error);
            }
        };

        this.ws.onclose = () => {
            console.log('WebSocket disconnected. Reconnecting in 3 seconds...');
            setTimeout(() => this.connect(), 3000);
        };

        this.ws.onerror = (error) => {
            console.error('WebSocket error:', error);
            this.ws.close();
        };
    }

    handleEvent(event) {
        const { eventType, workerId, timestamp } = event;

        switch (eventType) {
            case 'worker_started':
            case 'circuit_breaker_state_changed':
                workerStore.updateWorkerState(workerId, event.state, timestamp);
                break;
            case 'task_started':
            case 'task_succeeded':
            case 'task_failed':
                taskStore.updateTask(event);
                break;
            default:
                // 未知事件,可以加日志
                break;
        }
    }
}

export const webSocketService = new WebSocketService();

最后,是 Vue 组件。使用 mobx-vue-lite 将组件变为 observer,这样它们就能自动响应 MobX 状态的变化。

<!-- src/components/Dashboard.vue -->
<template>
  <div class="dashboard">
    <div class="panel">
      <h2>Worker Status</h2>
      <div v-if="workerStore.workerList.length === 0" class="empty-state">
        Waiting for workers...
      </div>
      <ul v-else>
        <li v-for="worker in workerStore.workerList" :key="worker.id" class="worker-item">
          <span class="worker-id">{{ worker.id }}</span>
          <span :class="['status-badge', getStatusClass(worker.state)]">{{ worker.state }}</span>
        </li>
      </ul>
    </div>
    <div class="panel">
      <h2>Task Stream</h2>
      <div v-if="taskStore.taskList.length === 0" class="empty-state">
        Waiting for tasks...
      </div>
      <ul v-else class="task-list">
        <li v-for="task in taskStore.taskList" :key="task.id" :class="['task-item', task.status.toLowerCase()]">
            <strong>Task ID:</strong> {{ task.id }} <br/>
            <strong>Status:</strong> {{ task.status }}
            <div v-if="task.error" class="error-details"><strong>Error:</strong> {{ task.error }}</div>
        </li>
      </ul>
    </div>
  </div>
</template>

<script>
import { observer } from 'mobx-vue-lite';
import { workerStore } from '../stores/WorkerStore';
import { taskStore } from '../stores/TaskStore';

export default observer({
  name: 'Dashboard',
  setup() {
    return {
      workerStore,
      taskStore,
    };
  },
  methods: {
    getStatusClass(state) {
        const statusMap = {
            'CLOSED': 'status-closed',
            'OPEN': 'status-open',
            'HALF_OPEN': 'status-half-open'
        };
        return statusMap[state] || '';
    }
  },
});
</script>

<style scoped>
/* 一些基本的样式 */
.dashboard { display: flex; gap: 20px; font-family: sans-serif; }
.panel { flex: 1; border: 1px solid #ccc; padding: 15px; border-radius: 8px; }
.worker-item, .task-item { list-style: none; padding: 10px; margin-bottom: 5px; border-radius: 4px; }
.status-badge { padding: 3px 8px; border-radius: 12px; color: white; font-weight: bold; }
.status-closed { background-color: #28a745; }
.status-open { background-color: #dc3545; }
.status-half-open { background-color: #ffc107; color: black; }
.task-item.succeeded { background-color: #e9f7ec; }
.task-item.failed { background-color: #fbe9e9; }
.task-item.running { background-color: #e8f4fd; }
.error-details { color: #dc3545; font-size: 0.9em; margin-top: 5px; }
</style>

在应用的入口 (main.js) 启动 WebSocket 连接:

// src/main.js
import { createApp } from 'vue';
import App from './App.vue';
import { webSocketService } from './services/WebSocketService';

webSocketService.connect();

createApp(App).mount('#app');

至此,整个闭环完成。当启动所有服务(RabbitMQ, Notifier, 多个 Worker 实例)后,前端界面会实时地、无需任何用户操作地展示出新启动的 worker,以及任务流的状态。如果模拟第三方 API 频繁失败,能清晰地看到 worker 的熔断器状态从 CLOSED 变为 OPEN,任务从 RUNNING 变为 FAILED,这一切都在毫秒间响应。

方案局限与未来迭代

这个方案虽然解决了实时性和解耦的问题,但在生产环境中仍有几个需要考虑的点:

  1. 熔断器状态持久化: 当前熔断器的状态是 worker 进程的内存状态。如果 worker 重启,状态会丢失。对于需要跨进程、跨实例共享熔断状态的场景,应将状态(如失败次数、状态位)存储在 Redis 等外部共享存储中。
  2. WebSocket 网关的扩展性: Notifier Service 是一个单点。在大量客户端连接的场景下,需要一个可水平扩展的 WebSocket 集群。这通常需要借助 Redis Pub/Sub 等机制在多个 WebSocket 服务实例之间广播消息。
  3. 消息的最终一致性: 从 Worker 发出状态到前端 UI 更新,整个链路是异步的。虽然速度很快,但并非强一致。对于需要事务性保证的场景,此架构不适用。
  4. 前端状态的精细化: 对于超高频的更新,可以引入 throttledebounce 机制来控制 UI 更新频率,或者使用 transaction 来批量更新 MobX 状态,以避免不必要的渲染。

未来的一个演进方向是加入更丰富的可观测性。例如,在每个状态事件中都注入一个全局唯一的 traceId,这样就可以将前端的一个任务条目与后端日志、指标完整地关联起来,实现端到端的链路追踪。


  目录