构建从 ZeroMQ 到 Apache Hudi 的近实时数据管道:一个 Node.js 实现的复盘


技术痛点:从分钟级到秒级的延迟鸿沟

我们的业务场景中,大量微服务(清一色的 Node.js 技术栈)持续不断地产生用户行为事件——点击、浏览、加购等。最初的架构简单粗暴:应用将事件日志以 JSON Lines 格式写入本地文件,一个定时任务每 5 分钟将这些日志文件 rsync 到一台聚合服务器,再由一个 Python 脚本解析、转换后批量 COPY 到数据仓库。这个架构在业务初期运行良好,但随着机器学习团队对实时特征需求的提出,5到10分钟的数据延迟成了无法容忍的瓶颈。推荐系统和实时风控模型需要的是“刚刚发生”的数据,而不是“几分钟前”的陈旧快照。

我们的目标很明确:将数据从产生到可供查询的端到端延迟(end-to-end latency)压缩到 1 分钟以内。同时,数据湖必须支持高效的记录级别更新(UPSERT),因为我们需要维护的是用户最新状态的画像,而不是一长串不可变的历史事件流。

初步构想与技术选型的挣扎

业界标准的解决方案立刻浮现在脑海:Kafka -> Spark Streaming -> Apache Hudi。这是一个久经考验的组合,功能强大,生态成熟。但对于我们以 Node.js 为主的团队来说,引入并维护一个庞大的 JVM 技术栈——Kafka、Zookeeper、Spark——意味着巨大的运维成本和学习曲线。我们的运维团队对这套体系并不熟悉,这在真实项目中是一个必须正视的风险。

于是,我们开始探索一个更轻量、更贴合现有技术栈的方案。

  1. 消息传输层:ZeroMQ vs Kafka
    在我们的场景里,事件数据在内网传输,允许极低概率的消息丢失,但对吞吐量和延迟要求极高。Kafka 提供了强大的持久化和至少一次(at-least-once)的保证,但其 Broker 模式引入了额外的网络跳数和运维复杂性。ZeroMQ,作为一个 brokerless 的消息库,提供了惊人的性能和极低的延迟。通过 PUSH/PULLPUB/SUB 模式,它可以直接在服务间建立高效的数据通道。考虑到我们的 Node.js 服务已经遍布整个集群,内嵌一个 ZMQ 的 socket 几乎没有额外成本。我们决定采用 ZMQ 的 PUSH/PULL 模式,由事件源服务作为 PUSH 端,一个中心化的数据摄取服务作为 PULL 端。

  2. 数据湖存储层:为什么是 Apache Hudi
    我们需要 UPSERT。这是构建实时特征仓库的核心需求。无论是用户的最后一次点击类别,还是当前购物车中的商品总数,这些都是需要被更新的状态,而非简单追加。Apache Hudi 提供的 Copy-on-Write (CoW) 和 Merge-on-Read (MoR) 表格式,尤其是 CoW 对读优化查询非常友好,完美契合我们读多写少的特征查询场景。

  3. 摄取与写入层:Node.js 的挑战与破局
    这是最棘手的一环。Apache Hudi 的核心写入客户端是 Java 实现的。社区并没有提供官方的 Node.js Writer。直接从 Node.js 调用 Hudi API 是不可能的。
    摆在我们面前有三条路:

    • 方案A:Node.js -> S3 -> Spark Trigger。 Node.js 服务将微批次数据写入 S3 的一个临时目录,然后通过某种方式(如 S3 事件通知触发 Lambda)启动一个短暂的 Spark 任务(HoodieDeltaStreamer)来完成对 Hudi 表的写入。这个方案解耦了写入逻辑,但 Spark 任务的冷启动延迟可能会成为我们达成“1分钟内”目标的关键障碍。
    • 方案B:在 Node.js 中通过 child_process 调用 Hudi CLI 工具。 这是一个非常 hacky 的方式,配置复杂,错误处理困难,性能堪忧。在生产环境中,这无异于埋下一颗定时炸弹。
    • 方案C:构建一个轻量级的“Hudi 写入服务”。 用 Java 或 Scala (我们团队有少量 Java 储备) 编写一个极简的、常驻的 HTTP/gRPC 服务。这个服务只做一件事:接收来自 Node.js 摄取网关的数据批次,然后调用 Hudi 的 Java 客户端 API 将其写入数据湖。

我们最终选择了方案 C。它虽然增加了一个新的服务组件,但权衡下来优势明显:

  • 关注点分离: Node.js 团队可以专注于数据流的消费、校验和批处理,无需关心 Hudi 的复杂配置。Java 服务则专心负责与 Hudi 的交互。
  • 性能可控: 这是一个长服务,避免了 Spark 任务的冷启动延迟。我们可以通过水平扩展这个服务来应对写入压力。
  • 技术栈隔离: 将 JVM 的复杂性封装在一个小服务内,对整个系统的侵入性最小。

整个数据流架构因此定型。

flowchart TD
    subgraph "事件源 (Node.js Microservices)"
        Producer1[Service A] --> ZMQSocket
        Producer2[Service B] --> ZMQSocket
        Producer3[Service C] --> ZMQSocket
    end

    subgraph "数据摄取层"
        ZMQSocket(ZeroMQ PUSH/PULL Socket) --> IngestionGateway[Node.js Ingestion Gateway]
    end

    subgraph "数据写入层"
        IngestionGateway -- "POST /write-batch (JSON/Arrow)" --> HudiWriterService[Java Hudi Writer Service]
    end



    subgraph "数据湖 (S3)"
        HudiWriterService -- "Hudi Java Client API" --> HudiTable[Apache Hudi Table]
    end

    subgraph "数据消费层"
        Analytics[Presto / Spark SQL] --> HudiTable
        ML[ML Feature Store Client] --> HudiTable
    end

步骤化实现:代码中的魔鬼细节

1. 事件生产者 (Node.js Microservice)

这是一个典型的业务服务,它在完成某个业务逻辑后,需要推送一个事件。我们使用 zeromq npm 包。这里的关键是,连接是 connect 到一个众所周知的摄取网关地址。

producer.js:

// producer.js
const zmq = require('zeromq');
const { v4: uuidv4 } = require('uuid');

class EventProducer {
    constructor(gatewayAddress) {
        if (!gatewayAddress) {
            throw new Error('Gateway address is required.');
        }
        this.socket = new zmq.Push();
        // 设置高水位标记,防止消息在发送端无限积压
        this.socket.setOption(zmq.ZMQ_SNDHWM, 10000);
        this.gatewayAddress = gatewayAddress;
    }

    async connect() {
        try {
            await this.socket.connect(this.gatewayAddress);
            console.log(`Producer connected to gateway at ${this.gatewayAddress}`);
        } catch (err) {
            console.error(`Failed to connect producer: ${err.message}`);
            // 在真实项目中,这里应该有重连逻辑
            process.exit(1);
        }
    }

    async sendEvent(eventType, payload) {
        const event = {
            eventId: uuidv4(),
            eventType: eventType,
            timestamp: new Date().toISOString(),
            source: 'user-service',
            payload: payload
        };

        try {
            // 我们选择发送JSON字符串,因为易于调试。
            // 在对性能有极致要求的场景,可以考虑Protobuf或FlatBuffers。
            await this.socket.send(JSON.stringify(event));
        } catch (err) {
            // send是异步的,如果网络中断可能会失败
            console.error(`Failed to send event: ${err.message}`);
            // 生产环境中,失败的消息需要被记录到备用存储(如本地文件)以便后续重试
        }
    }
}

// 使用示例
async function main() {
    const producer = new EventProducer('tcp://127.0.0.1:3000');
    await producer.connect();

    console.log('Starting to send events...');
    setInterval(() => {
        const userId = Math.floor(Math.random() * 1000);
        const event = {
            userId: `user_${userId}`,
            action: 'view_product',
            productId: `prod_${Math.floor(Math.random() * 100)}`,
            price: parseFloat((Math.random() * 100).toFixed(2))
        };
        producer.sendEvent('user_interaction', event);
    }, 100); // 每秒发送约10个事件
}

main();

生产级考量:

  • 连接管理: connect 失败需要有健壮的重连机制,例如使用指数退避算法。
  • 发送失败处理: send 失败的消息不能直接丢弃,应实现一个降级策略,例如写入本地磁盘的 dead-letter queue,由另一个进程负责重试。

2. 核心:Node.js 摄取网关

这是整个管道的大脑,负责消费 ZMQ 消息、进行微批处理,并调用 Hudi 写入服务。

ingestion-gateway.js:

// ingestion-gateway.js
const zmq = require('zeromq');
const axios = require('axios');
const pino = require('pino');

const logger = pino({ level: 'info' });

class IngestionGateway {
    constructor(config) {
        this.config = Object.assign({
            bindAddress: 'tcp://*:3000',
            hudiWriterUrl: 'http://localhost:8080/hudi/write-batch',
            batchSize: 1000,
            batchTimeoutMs: 5000, // 5 seconds
        }, config);

        this.socket = new zmq.Pull();
        this.messageBuffer = [];
        this.flushTimer = null;
    }

    async start() {
        await this.socket.bind(this.config.bindAddress);
        logger.info(`Ingestion gateway listening on ${this.config.bindAddress}`);
        
        this.resetFlushTimer();
        this.listen();
    }
    
    // 循环监听ZMQ消息
    async listen() {
        for await (const [msg] of this.socket) {
            try {
                // 缓冲区满了,立即刷写
                if (this.messageBuffer.length >= this.config.batchSize) {
                    logger.info(`Buffer full (${this.messageBuffer.length}), flushing...`);
                    // flush必须是异步的,但我们不能在这里await,否则会阻塞新消息的接收
                    // 我们选择“fire-and-forget”,flush内部自己处理重试和错误
                    this.flush(); 
                }
                
                // 注意:msg是Buffer对象,需要转为字符串
                const messageString = msg.toString('utf-8');
                const parsedMessage = JSON.parse(messageString);
                this.messageBuffer.push(parsedMessage);

            } catch (err) {
                logger.error({ err, msg: msg.toString() }, 'Failed to process incoming message');
                // 错误的消息可以推送到一个专门的错误队列
            }
        }
    }

    resetFlushTimer() {
        if (this.flushTimer) {
            clearTimeout(this.flushTimer);
        }
        this.flushTimer = setTimeout(() => {
            if (this.messageBuffer.length > 0) {
                logger.info(`Timeout reached, flushing ${this.messageBuffer.length} messages...`);
                this.flush();
            } else {
                this.resetFlushTimer();
            }
        }, this.config.batchTimeoutMs);
    }
    
    // flush 是整个系统的核心逻辑
    async flush() {
        // 在开始刷写前,清除定时器并交换缓冲区
        // 这是一个关键操作,避免了在异步发送期间,新消息被添加到正在发送的批次中
        if (this.flushTimer) clearTimeout(this.flushTimer);
        
        const batchToSend = this.messageBuffer;
        this.messageBuffer = [];
        this.resetFlushTimer();

        if (batchToSend.length === 0) {
            return;
        }

        try {
            const response = await axios.post(this.config.hudiWriterUrl, {
                records: batchToSend
            }, {
                headers: { 'Content-Type': 'application/json' },
                timeout: 10000 // 设置10秒超时
            });

            logger.info({ 
                count: batchToSend.length, 
                status: response.status 
            }, 'Successfully flushed batch to Hudi Writer Service.');

        } catch (err) {
            logger.error({ 
                err: err.message, 
                count: batchToSend.length 
            }, 'Failed to flush batch. Re-queuing for next attempt.');
            
            // **错误处理关键点**: 发送失败的批次必须被重新加入缓冲区,或者持久化到备用队列
            // 一个简单的策略是将其放回缓冲区的开头,但这可能阻塞新数据
            // 更好的方式是使用一个独立的重试队列
            this.messageBuffer.unshift(...batchToSend);
        }
    }
}

// 启动服务
const gateway = new IngestionGateway({});
gateway.start().catch(err => {
    logger.fatal(err, 'Gateway failed to start');
    process.exit(1);
});

生产级考量:

  • 背压 (Backpressure): 当前实现没有背压机制。如果 Hudi 写入服务变慢,messageBuffer 会在内存中无限增长,导致 OOM。在真实项目中,需要设置缓冲区上限,并在达到上限时暂停从 ZMQ 拉取消息,或者将消息溢出到磁盘。
  • 优雅停机 (Graceful Shutdown): 进程收到 SIGTERM 信号时,应确保将内存中剩余的 messageBuffer 全部刷写成功后再退出。
  • 数据格式: JSON 格式简单,但序列化/反序列化开销大。对于大数据量,Node.js 服务和 Java 服务之间最好使用 Apache Arrow 格式通信。Arrow 提供了零拷贝的跨语言数据读写,性能远超 JSON。

3. Hudi 写入服务 (Java/Spring Boot)

我们不会在这里展示完整的 Spring Boot 应用,但会列出其核心 API 契约和 Hudi 写入逻辑的关键配置。这部分对于理解整个架构至关重要。

REST API 定义:

  • Endpoint: POST /hudi/write-batch
  • Request Body (JSON):
    {
      "records": [
        { "eventId": "...", "eventType": "...", "timestamp": "...", ... },
        { "eventId": "...", "eventType": "...", "timestamp": "...", ... }
      ]
    }
  • Success Response: HTTP 200 OK
  • Failure Response: HTTP 500 Internal Server Error with error details.

Hudi 写入关键配置 (hudi_options.properties):

# Hudi表名
hoodie.table.name=user_events_cow

# 表类型:Copy On Write
hoodie.table.type=COPY_ON_WRITE

# 记录的唯一键
hoodie.datasource.write.recordkey.field=eventId

# 用于确定数据分区的字段,这里按天分区
hoodie.datasource.write.partitionpath.field=payload.date

# 在记录键冲突时,用于确定哪条记录是最新的字段
hoodie.datasource.write.precombine.field=timestamp

# 写入操作类型:upsert
hoodie.datasource.write.operation=upsert

# Hive 同步配置
hoodie.datasource.hive_sync.enable=true
hoodie.datasource.hive_sync.database=user_behavior
hoodie.datasource.hive_sync.table=user_events_cow
hoodie.datasource.hive_sync.partition_fields=date
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://...

核心 Java 伪代码:

// Inside a Spring @RestController
@PostMapping("/hudi/write-batch")
public ResponseEntity<String> writeBatch(@RequestBody BatchRequest request) {
    // 1. 将接收到的JSON List<Map<String, Object>> 转换为 Spark DataFrame
    // Dataset<Row> df = sparkSession.createDataFrame(request.getRecords(), ...);

    // 2. 加载Hudi配置
    Map<String, String> hudiOptions = loadHudiProperties(); 

    // 3. 写入Hudi
    df.write()
      .format("hudi")
      .options(hudiOptions)
      .mode(SaveMode.Append) // 总是使用Append模式,具体操作由`hoodie.datasource.write.operation`决定
      .save("/path/to/s3/user_events_cow");
      
    return ResponseEntity.ok("Batch written successfully.");
}

这部分代码清晰地展示了 Java 服务如何作为 Node.js 和 Hudi 之间的桥梁,它封装了所有与 JVM 相关的复杂性。

最终成果与反思

部署上线后,这套架构成功地将我们的数据延迟从平均 8 分钟降低到了 45 秒左右,完全满足了业务需求。Node.js 摄取网关在单机(4核8G)上稳定处理了每秒 20,000 条事件的峰值流量,CPU 和内存占用都维持在健康水平。整个方案的技术栈也对团队非常友好,除了那个小小的 Java 服务,大部分组件都在我们熟悉的领域内。

局限性与未来迭代路径

这个方案并非银弹,它是在特定约束下的一个务实选择。

  1. 摄取网关的单点问题: 当前的 Node.js 摄取网关是一个单点。虽然可以部署多个实例并使用 ZMQ 的 DEALER/ROUTER 模式来做负载均衡,但这会增加架构的复杂度。
  2. 数据一致性: 当前的实现是“至少一次”的交付语义。如果 Hudi 写入服务在处理完请求后、返回成功响应前崩溃,Node.js 网关会重试,导致数据重复。实现“恰好一次” (exactly-once) 需要在 Hudi 写入服务中引入事务性写入和幂等性检查,或者在 Node.js 端实现更复杂的握手协议,这会显著增加系统复杂性。
  3. 自定义写入服务的维护成本: 虽然这个 Java 服务很小,但它依然是一个需要独立部署、监控和维护的组件。长期来看,如果 Apache Hudi 社区能提供基于 REST 的数据写入网关(如 Hudi Streamer 的一个变种),或者一个轻量级的 C++ 客户端(可通过 Node.js FFI 调用),那将是更理想的解决方案。
  4. 背压机制的缺失: 当前依赖内存的批处理和简单的重试逻辑在面对下游持续拥堵时是脆弱的。未来的一个重要迭代方向是在 ZMQ 和批处理器之间引入一个持久化的缓冲层,例如使用 Redis List 或一个轻量级的磁盘队列,以实现更可靠的背压和流量削峰。

  目录