基于CDC、Hudi与Consul Connect的全链路实时数据架构实现


我们面临一个棘手的需求:构建一个运营仪表盘,它必须同时展示来源于生产SQL数据库的最新交易记录(例如,最近5分钟内的订单状态变更),以及沉淀在数据湖中的用户历史聚合指标(例如,用户生命周期总价值LTV)。初期的方案是在前端React应用中发起两个独立的API请求,一个请求主业务后端的RESTful接口,另一个请求数据分析平台的API。这个方案在原型阶段勉强可行,但进入预生产环境后,问题立刻暴露:数据不一致、状态竞争、以及前后端数据模型的“粘合代码”急剧膨胀。更严重的是,两个独立的API端点带来了双倍的安全暴露面和复杂的认证授权管理。

整个架构的核心痛点在于,如何将一个高度事务性的操作型数据存储(OLTP)和一个分析型数据湖(OLAP)的数据,在用户无感的情况下,实时、安全、且一致地呈现在单一视图中。轮询方案因其延迟和资源浪费被首先否决。我们需要的是一个从数据源头到用户界面的、由事件驱动的全链路推送架构。

初步构想与技术选型决策

我们的目标是建立一条从MySQL Binlog出发,经由数据湖,最终抵达前端状态管理的实时数据管道。这条管道的每一环都必须是高效且安全的。

  1. 数据源头与变更捕获: 生产环境的核心是MySQL,这是一个典型的关系型数据库。为了不侵入现有业务逻辑,我们选择使用Debezium进行变更数据捕获(CDC)。它通过读取MySQL的Binlog,能够以事件流的形式,近乎实时地捕获所有INSERTUPDATEDELETE操作,并将这些变更以JSON格式推送到Kafka中。

  2. 数据湖技术: 数据湖需要承载准实时的分析查询,并能够处理来自上游的行级别变更。Apache Hudi是理想选择。其Merge-on-Read (MOR)表类型特别适合这种场景,它能快速摄入增量数据,并将合并操作推迟到查询时,实现了写入延迟和查询性能的平衡。更重要的是,Hudi为数据湖带来了ACID事务和增量处理能力。

  3. 流处理引擎: 我们需要一个组件来消费Kafka中的Debezium事件,并将其写入Hudi表。Apache Spark Structured Streaming因其强大的生态和对Hudi的良好支持而入选。它能够处理复杂的ETL逻辑,并保证端到端的Exactly-Once语义。

  4. 服务间安全通信: 整条数据管道涉及多个分布式组件:Debezium Connect、Kafka、Spark集群、数据服务API、业务BFF(Backend for Frontend)。在生产环境中,确保这些服务间的通信安全至关重要。手动配置TLS证书是一场噩梦。Consul Connect通过其服务网格能力,以Sidecar代理(Envoy)的方式为所有服务提供了自动化的mTLS加密和基于服务身份的授权,极大地简化了安全管理。

  5. 前端状态管理: 前端需要一个轻量级、高效的状态管理器来处理初始数据加载和后续源源不断的实时更新。Zustand因其极简的API和对React Hooks的天然亲和力而胜出。它没有Redux那样的模板代码负担,非常适合管理这种从WebSocket推送而来的、离散的数据流。

最终的架构图如下:

graph TD
    subgraph "生产环境 (OLTP)"
        A[MySQL] -- Binlog --> B[Debezium Connector];
    end

    subgraph "数据管道 (Kafka & Consul Mesh)"
        B -- Kafka Topic: mysql.orders --> C[Spark Streaming Job];
        C -- Hudi Upsert --> D[Hudi Table on S3/HDFS];
        B -- Consul Agent --> E((Consul Server));
        C -- Consul Agent --> E;
        F[BFF Service] -- Consul Agent --> E;
        G[Data API] -- Consul Agent --> E;
    end

    subgraph "数据服务 (OLAP & Real-time)"
        D -- Query --> G[Data API - Trino/Presto];
        A -- Real-time Query --> F[BFF Service];
        C -- Kafka Topic: filtered.updates --> F;
    end

    subgraph "前端应用"
        H[React Client] -- HTTP Initial Load --> F;
        H -- WebSocket Updates --> F;
        I[Zustand Store] --> H;
    end

    style B fill:#f9f,stroke:#333,stroke-width:2px
    style C fill:#f9f,stroke:#333,stroke-width:2px
    style F fill:#f9f,stroke:#333,stroke-width:2px
    style G fill:#f9f,stroke:#333,stroke-width:2px
    style E fill:#bbf,stroke:#333,stroke-width:2px

步骤化实现:从数据库到UI

1. 配置Debezium捕获MySQL变更

首先,确保MySQL已开启GTID和Binlog。

-- my.cnf or mysql command
SET GLOBAL gtid_mode = ON;
SET GLOBAL enforce_gtid_consistency = ON;
SET GLOBAL binlog_format = ROW;

然后,我们向Kafka Connect集群部署Debezium的MySQL Connector。这里的配置是关键,它定义了捕获的数据库、表,以及输出到Kafka的格式。

// debezium-mysql-connector.json
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql.service.consul", // 使用Consul DNS
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "mysql-server-1",
    "database.include.list": "inventory",
    "table.include.list": "inventory.orders",
    "database.history.kafka.bootstrap.servers": "kafka.service.consul:9092",
    "database.history.kafka.topic": "schema-changes.inventory",

    // JSON格式化,并包含schema信息
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true",

    // 时区问题是常见陷阱
    "database.connectionTimeZone": "UTC"
  }
}

注意database.hostname配置为mysql.service.consul。这是因为我们将通过Consul Connect来代理流量,而不是直接连接MySQL。为此,我们需要为Kafka Connect服务定义一个Consul服务。

// connect-service.hcl
service {
  name = "kafka-connect"
  port = 8083

  connect {
    sidecar_service {
      proxy {
        upstreams {
          destination_name = "mysql"
          local_bind_port = 3306 // Debezium配置中的端口现在指向本地Sidecar
        }
        upstreams {
          destination_name = "kafka"
          local_bind_port = 9092
        }
      }
    }
  }
}

通过这个配置,Debezium Connector对MySQL和Kafka的访问请求会被本地的Envoy Sidecar拦截,然后通过mTLS加密通道转发到真正的MySQL和Kafka服务。

2. Spark Streaming作业:消费Kafka并写入Hudi

这个Spark作业是数据管道的核心。它消费Debezium格式的JSON数据,执行必要的转换,然后以upsert模式写入Hudi表。

// SparkHudiCDCProcessor.scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig

object SparkHudiCDCProcessor {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("MySQL CDC to Hudi")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.hive.convertMetastoreParquet", "false")
      .getOrCreate()

    import spark.implicits._

    val hudiBasePath = "s3a://my-datalake/hudi/orders"

    val kafkaStreamDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka.service.consul:9092") // 通过Consul DNS
      .option("subscribe", "mysql-server-1.inventory.orders")
      .option("startingOffsets", "latest")
      .load()

    val debeziumJsonDF = kafkaStreamDF
      .selectExpr("CAST(value AS STRING) as json")
      .filter($"json".isNotNull)

    // 解析Debezium的复杂JSON结构
    val payloadDF = debeziumJsonDF
      .select(from_json($"json", Schemas.debeziumSchema).as("data"))
      .select("data.payload.*")

    // 处理DELETE操作。Debezium的DELETE事件中'after'字段为null
    // Hudi通过特定的列来识别删除,我们添加一个`_hoodie_is_deleted`列
    val transformedDF = payloadDF
      .select(
        col("after.order_id").alias("order_id"),
        col("after.customer_id").alias("customer_id"),
        col("after.order_date").alias("order_ts"),
        col("after.total_amount").alias("amount"),
        col("after.status").alias("status"),
        // op='d'表示删除
        when(col("op") === "d", true).otherwise(false).alias("_hoodie_is_deleted"),
        // 从事件元数据中获取时间戳作为提交时间
        from_unixtime(col("ts_ms") / 1000).cast("timestamp").alias("commit_ts")
      )
      // 过滤掉只有`before`没有`after`的删除事件产生的空行 (除了_hoodie_is_deleted)
      .filter($"order_id".isNotNull)


    val hudiOptions = Map[String, String](
      DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "MERGE_ON_READ",
      DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "order_id",
      DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "commit_ts", // 使用事件时间戳处理乱序
      DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "status",
      HoodieWriteConfig.TBL_NAME.key -> "orders_mor",
      DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true",
      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME_OPT_KEY -> "org.apache.hudi.keygen.ComplexKeyGenerator",
      DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL
    )

    val query = transformedDF.writeStream
      .format("hudi")
      .options(hudiOptions)
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("30 seconds")) // 每30秒触发一次微批处理
      .option("checkpointLocation", "/tmp/spark-checkpoint/hudi_orders_cdc")
      .start(hudiBasePath)

    query.awaitTermination()
  }
}

// Schemas.debeziumSchema 定义了Debezium JSON的结构,这里省略以保持简洁

这段代码中的几个关键点:

  • PRECOMBINE_FIELD_OPT_KEY: 这是Hudi处理乱序事件和保证数据正确性的核心。我们使用Debezium事件中的时间戳ts_ms作为precombine key,确保即使事件到达顺序混乱,最终写入的也是最新的状态。
  • _hoodie_is_deleted: 对于删除操作,Debezium的after字段为null。我们通过判断op字段是否为’d’来生成这个Hudi能够识别的软删除标记。
  • Checkpointing: Spark Structured Streaming的checkpoint是保证端到端Exactly-Once语义的关键。

3. BFF服务:融合实时与历史数据

BFF是这个架构的粘合剂。它负责提供两种数据:

  1. 通过HTTP GET请求提供用户的全量画像数据,这部分数据需要关联查询MySQL和Hudi。
  2. 通过WebSocket连接,实时推送特定用户的订单状态变更。
// bff-service/main.go (simplified Go example)
package main

import (
	"database/sql"
	"fmt"
	"log"
	"net/http"
	"github.com/gorilla/websocket"
	_ "github.com/go-sql-driver/mysql"
	// ... imports for Kafka consumer, Trino client
)

var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool { return true },
}

// 这是一个简化的示例,生产环境需要连接池和更复杂的错误处理
var mysqlDB *sql.DB
var trinoClient *TrinoClient // 假设有一个Trino客户端库

func main() {
	// 连接通过Consul Connect Sidecar暴露的本地端口
	mysqlDSN := "user:pass@tcp(127.0.0.1:33061)/inventory" // 假设Consul将MySQL上游映射到33061
	trinoAddr := "http://127.0.0.1:80801" // 假设Trino上游映射到80801
	
	var err error
	mysqlDB, err = sql.Open("mysql", mysqlDSN)
	if err != nil {
		log.Fatal("Failed to connect to MySQL via sidecar:", err)
	}
	trinoClient = NewTrinoClient(trinoAddr)

	// 启动一个goroutine来消费Kafka变更,并分发到WebSocket连接
	go consumeAndBroadcastUpdates()

	http.HandleFunc("/api/customer/profile", handleGetProfile)
	http.HandleFunc("/ws/updates", handleWebSocket)

	// 服务监听在自己的端口上,例如8080
	log.Println("BFF service starting on :8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}

func handleGetProfile(w http.ResponseWriter, r *http.Request) {
	// 1. 从MySQL获取近期的、事务性强的订单
	rows, err := mysqlDB.Query("SELECT order_id, status FROM orders WHERE customer_id = ? ORDER BY order_date DESC LIMIT 5", r.URL.Query().Get("customerId"))
	// ... error handling and data serialization
	
	// 2. 从Hudi (via Trino) 查询历史聚合数据
	ltv, err := trinoClient.Query("SELECT sum(amount) FROM hive.default.orders_mor_ro WHERE customer_id = ?", r.URL.Query().Get("customerId"))
	// ... error handling
	
	// 3. 合并数据并返回JSON
	// ...
}

func handleWebSocket(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println("Upgrade error:", err)
		return
	}
	defer conn.Close()

	// 注册此连接,以便Kafka消费者可以向其推送消息
	// 生产级代码需要一个带锁的并发安全map来管理连接
	registerConnection(conn)
	
	// 保持连接开启
	for {
		_, _, err := conn.ReadMessage()
		if err != nil {
			unregisterConnection(conn)
			break
		}
	}
}

// consumeAndBroadcastUpdates() 函数将作为Kafka消费者
// 消费一个由Spark作业额外产出的、经过过滤的、轻量级的更新topic
// 当收到消息后,它会查找对应的WebSocket连接并推送消息

同样,BFF服务也需要注册到Consul,并声明其上游依赖:

// bff-service.hcl
service {
  name = "bff-service"
  port = 8080

  connect {
    sidecar_service {
      proxy {
        upstreams {
          destination_name = "mysql"
          local_bind_port = 33061
        }
        upstreams {
          destination_name = "trino"
          local_bind_port = 80801
        }
        upstreams {
            destination_name = "kafka"
            local_bind_port = 9092
        }
      }
    }
  }
}

4. Zustand:前端的优雅状态聚合

最后是前端部分。Zustand store的设计需要能够原子性地处理初始加载和后续的增量更新。

// store/profileStore.js
import { create } from 'zustand';
import { immer } from 'zustand/middleware/immer';

const useProfileStore = create(immer((set, get) => ({
  profile: null,
  loading: true,
  error: null,
  ws: null,

  // Action: 初始数据加载
  fetchProfile: async (customerId) => {
    set({ loading: true, error: null });
    try {
      const response = await fetch(`/api/customer/profile?customerId=${customerId}`);
      if (!response.ok) {
        throw new Error('Failed to fetch profile data.');
      }
      const data = await response.json();
      set((state) => {
        state.profile = data;
        state.loading = false;
      });
      // 数据加载成功后,初始化WebSocket连接
      get().connectWebSocket(customerId);
    } catch (e) {
      set({ error: e.message, loading: false });
    }
  },
  
  // Action: 连接WebSocket
  connectWebSocket: (customerId) => {
    // 防止重复连接
    if (get().ws) {
      get().ws.close();
    }
    
    // 这里的URL应该指向BFF的WebSocket端点
    const socket = new WebSocket(`ws://localhost:8080/ws/updates?customerId=${customerId}`);

    socket.onopen = () => {
      console.log('WebSocket connection established.');
      set({ ws: socket });
    };

    socket.onmessage = (event) => {
      const update = JSON.parse(event.data);
      // 调用状态更新逻辑
      get().applyRealtimeUpdate(update);
    };

    socket.onclose = () => {
      console.log('WebSocket connection closed.');
      set({ ws: null });
      // 可以在这里实现重连逻辑
    };

    socket.onerror = (error) => {
      console.error('WebSocket error:', error);
      set({ ws: null, error: 'WebSocket connection failed.' });
    };
  },

  // Action: 应用实时更新。这是核心逻辑所在。
  applyRealtimeUpdate: (update) => {
    set((state) => {
      if (!state.profile || !state.profile.recentOrders) return;

      const { operation, payload } = update;
      const orderIndex = state.profile.recentOrders.findIndex(o => o.order_id === payload.order_id);

      switch (operation) {
        case 'CREATE':
          // 新订单,插入到数组开头
          state.profile.recentOrders.unshift(payload);
          // 保持列表长度,例如最多显示5条
          if (state.profile.recentOrders.length > 5) {
            state.profile.recentOrders.pop();
          }
          break;
        case 'UPDATE':
          // 更新现有订单
          if (orderIndex > -1) {
            Object.assign(state.profile.recentOrders[orderIndex], payload);
          }
          break;
        case 'DELETE':
          // 删除订单
          if (orderIndex > -1) {
            state.profile.recentOrders.splice(orderIndex, 1);
          }
          break;
        default:
          // 未知操作
          break;
      }
    });
  },

  // Action: 断开连接
  disconnect: () => {
    if (get().ws) {
      get().ws.close();
      set({ ws: null });
    }
  },
})));

export default useProfileStore;

// 在React组件中使用
// function ProfilePage({ customerId }) {
//   const { profile, loading, fetchProfile, disconnect } = useProfileStore();
//
//   useEffect(() => {
//     fetchProfile(customerId);
//     return () => disconnect(); // 组件卸载时断开连接
//   }, [customerId, fetchProfile, disconnect]);
//
//   if (loading) return <div>Loading...</div>;
//   // ... 渲染profile数据
// }

immer中间件让我们可以用一种看似“可变”的直观方式来更新状态,而底层仍然是不可变操作。applyRealtimeUpdate函数清晰地展示了如何根据BFF推送来的事件类型(CREATE, UPDATE, DELETE)来精确地修改前端状态数组,避免了整个组件的重新渲染和数据全量刷新。

遗留问题与未来迭代路径

这套架构解决了核心问题,但其复杂性也引入了新的挑战。当前的实现并非终点。

首先,BFF中的WebSocket分发逻辑是单点的,生产环境需要一个更健壮的、可水平扩展的实时消息推送层,它可能需要借助Redis Pub/Sub或其他消息队列来实现广播,而不是在BFF内存中管理连接。

其次,Hudi的Merge-on-Read表虽然写入快,但查询时需要合并base文件和log文件,可能会有性能开销。需要定期运行compaction作业来优化表结构。监控Hudi表的小文件问题并调整Spark作业的批处理间隔和并行度,是一项持续的调优工作。

再者,错误处理和容错机制需要进一步强化。例如,如果Spark作业失败,如何保证数据不丢失?Kafka的offset和Spark的checkpoint提供了基础,但需要建立完整的监控告警体系,对数据管道的延迟、吞吐量和错误率进行实时监控。

最后,前端的状态同步逻辑可以更加完善。目前的实现依赖于BFF推送的指令。一种更高级的模式是实现客户端的CRDT(无冲突复制数据类型)或类似的数据结构,使前端状态能够更从容地处理乱序或重复的消息,进一步增强数据一致性的鲁棒性。


  目录