使用 eBPF、C# 与 Apache Hudi 构建 SQL Server 的无侵入实时数据捕获管道


我们面临一个棘手的工程问题:需要将一个核心生产环境的 SQL Server 数据库的实时变更数据同步到一个数据湖中,但DBA团队基于稳定性和性能的考虑,明确拒绝了所有常规的CDC(Change Data Capture)方案。这意味着我们不能在数据库上启用原生的CDC功能,不能安装任何代理(Agent),更不能使用触发器或频繁轮询。数据库服务器本身是一块不容触碰的“圣地”。

唯一的交互界面是网络。所有客户端应用都通过网络与SQL Server通信,使用Tabular Data Stream (TDS)协议。这个限制反而给了我们一个清晰的方向:如果能在网络层无侵入地捕获、解析并重构出DML操作,就能绕开对数据库的所有直接干预。这套方案的技术栈最终确定为:eBPF用于高效的内核态网络包捕获,C#构建用户态的协议解析与数据分发服务,Apache Hudi作为数据湖的存储格式以支持UPSERT操作,以及InfluxDB用于监控整个管道的健康状况。

第一步:内核层的无感捕获 - eBPF

传统的网络抓包方案如 libpcap 会将大量数据包复制到用户空间,在流量大的情况下性能开销显著。eBPF (extended Berkeley Packet Filter) 则允许我们在内核中运行一个沙箱化的程序,只将我们需要的数据发送到用户空间,效率极高。

我们的目标是捕获所有目标端口为1433(SQL Server默认端口)的TCP包的载荷。这里使用一个简单的 eBPF C 程序,通过 libbpf 框架进行加载和交互。

// bpf_program.c
#include <vmlinux.h>
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_tracing.h>
#include <bpf/bpf_core_read.h>

#define SQL_SERVER_PORT 1433

// 定义一个perf buffer,用于将数据发送到用户空间
struct {
    __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
    __uint(key_size, sizeof(u32));
    __uint(value_size, sizeof(u32));
} tcp_payloads BPF_MAP_SEC(".maps");

// 定义一个结构体来承载我们要发送的数据
struct payload_data {
    u32 pid;
    u32 sport;
    u32 dport;
    u32 seq;
    u32 ack;
    u32 data_len;
    // 动态数据长度,这里只是标记
    char data[4096]; 
};

// 使用kprobe挂载到tcp_sendmsg内核函数
SEC("kprobe/tcp_sendmsg")
int BPF_KPROBE(trace_tcp_sendmsg, struct sock *sk, struct msghdr *msg, size_t size)
{
    if (sk == NULL) {
        return 0;
    }

    u16 dport = BPF_CORE_READ(sk, __sk_common.skc_dport);
    dport = bpf_ntohs(dport);
    
    // 只关心发往SQL Server端口的数据包
    if (dport != SQL_SERVER_PORT) {
        return 0;
    }

    struct tcp_sock *ts = (struct tcp_sock *)sk;
    u32 snd_nxt = BPF_CORE_READ(ts, snd_nxt);

    // 填充数据结构
    struct payload_data data = {};
    data.pid = bpf_get_current_pid_tgid() >> 32;
    data.sport = bpf_ntohs(BPF_CORE_READ(sk, __sk_common.skc_num));
    data.dport = dport;
    data.seq = snd_nxt - size; // TCP sequence number for this segment
    data.ack = BPF_CORE_READ(ts, rcv_nxt);
    
    if (size == 0 || size > sizeof(data.data)) {
        return 0;
    }

    // 从msg中读取数据。这是一个复杂的步骤,因为数据可能在多个iovec中。
    // 在真实项目中,需要遍历msg->msg_iter。这里为了简化,我们只处理简单的场景。
    struct iov_iter *iter = &msg->msg_iter;
    if (iter->type != ITER_IOVEC) {
        return 0;
    }

    int iov_count = BPF_CORE_READ(iter, nr_segs);
    if (iov_count > 0) {
        const struct iovec *iov = BPF_CORE_READ(iter, iov);
        void *buf_ptr = BPF_CORE_READ(iov, iov_base);
        size_t buf_len = BPF_CORE_READ(iov, iov_len);
        
        size_t len_to_read = size < sizeof(data.data) ? size : sizeof(data.data);
        len_to_read = len_to_read < buf_len ? len_to_read : buf_len;

        if(len_to_read > 0) {
            bpf_probe_read_user(&data.data, len_to_read, buf_ptr);
            data.data_len = len_to_read;
            
            // 将数据发送到perf buffer
            bpf_perf_event_output(ctx, &tcp_payloads, BPF_F_CURRENT_CPU, &data, sizeof(data));
        }
    }

    return 0;
}

char LICENSE[] SEC("license") = "GPL";

这个eBPF程序的核心是 trace_tcp_sendmsg 函数,它挂载在 tcp_sendmsg 这个内核函数上。每当有TCP包发送时,它会检查目标端口是否是1433。如果是,它就读取TCP包的载荷(payload)和一些元数据(PID、端口、TCP序列号),然后通过 bpf_perf_event_output 将这些信息推送到一个名为 tcp_payloads 的perf buffer中,等待用户态程序来消费。一个现实的考量是,这个实现只处理了客户端发往服务器的数据,一个完整的方案还需要捕获服务器的响应。

第二步:用户态的协议解析 - C# 服务

捕获原始的TCP载荷只是第一步,真正的挑战在于解析TDS协议。TDS是一个复杂的二进制协议。我们在用户空间构建一个 .NET Core 的后台服务(BackgroundService)来完成这项工作。

2.1 C# 与 eBPF Perf Buffer 交互

我们使用一个.NET的libbpf封装库(例如 libbpf-tools 的.NET绑定)来读取perf buffer。服务启动时,它会加载编译好的eBPF程序(bpf_program.o)并监听 tcp_payloads 这个perf buffer。

// TdsCaptureService.cs
public class TdsCaptureService : BackgroundService
{
    private readonly ILogger<TdsCaptureService> _logger;
    private readonly TdsPacketProcessor _processor;
    private readonly CancellationTokenSource _cts = new();

    public TdsCaptureService(ILogger<TdsCaptureService> logger, TdsPacketProcessor processor)
    {
        _logger = logger;
        _processor = processor;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            // 伪代码: 使用一个库来加载eBPF程序并监听Perf Buffer
            // var bpfLoader = new BpfLoader();
            // var perfBuffer = bpfLoader.LoadAndAttach("./bpf_program.o")
            //                             .GetPerfBuffer("tcp_payloads");
            
            _logger.LogInformation("eBPF program loaded. Listening for TDS packets...");

            // 伪代码: 轮询Perf Buffer
            // await foreach (var rawEvent in perfBuffer.PollAsync(stoppingToken))
            // {
            //     // rawEvent.Data 是一个 ReadOnlyMemory<byte>,包含了eBPF程序发送的 payload_data 结构体
            //     var payload = ParsePayloadData(rawEvent.Data);
            //     await _processor.ProcessPacketAsync(payload);
            // }

            // 模拟数据流用于演示
            while (!stoppingToken.IsCancellationRequested)
            {
                var simulatedPayload = GenerateSimulatedTdsPacket();
                await _processor.ProcessPacketAsync(simulatedPayload);
                await Task.Delay(1000, stoppingToken);
            }
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("TDS capture service is stopping.");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "An unhandled exception occurred in TdsCaptureService.");
        }
    }
    
    // 模拟的Payload,实际应从eBPF Perf Buffer中解析
    private PacketPayload GenerateSimulatedTdsPacket()
    {
        // 这是一个伪造的TDS包,包含一个简单的INSERT语句
        // TDS Header (8 bytes): Type=1(SQL Batch), Status=0, Length=86, SPID=0, PacketID=0, Window=0
        // Payload: "INSERT INTO [dbo].[Products] ([Name], [Price]) VALUES (N'Laptop', 1200.50)"
        byte[] tdsData = new byte[] {
            0x01, 0x00, 0x00, 0x56, 0x00, 0x00, 0x00, 0x00, 0x49, 0x00, 0x4E, 0x00, 0x53, 0x00, 0x45, 0x00,
            0x52, 0x00, 0x54, 0x00, 0x20, 0x00, 0x49, 0x00, 0x4E, 0x00, 0x54, 0x00, 0x4F, 0x00, 0x20, 0x00,
            0x5B, 0x00, 0x64, 0x00, 0x62, 0x00, 0x6F, 0x00, 0x5D, 0x00, 0x2E, 0x00, 0x5B, 0x00, 0x50, 0x00,
            0x72, 0x00, 0x6F, 0x00, 0x64, 0x00, 0x75, 0x00, 0x63, 0x00, 0x74, 0x00, 0x73, 0x00, 0x5D, 0x00,
            0x20, 0x00, 0x28, 0x00, 0x5B, 0x00, 0x4E, 0x00, 0x61, 0x00, 0x6D, 0x00, 0x65, 0x00, 0x5D, 0x00,
            0x2C, 0x00, 0x20, 0x00, 0x5B, 0x00, 0x50, 0x00, 0x72, 0x00, 0x69, 0x00, 0x63, 0x00, 0x65, 0x00,
            0x5D, 0x00, 0x29, 0x00, 0x20, 0x00, 0x56, 0x00, 0x41, 0x00, 0x4C, 0x00, 0x55, 0x00, 0x45, 0x00,
            0x53, 0x00, 0x20, 0x00, 0x28, 0x00, 0x4E, 0x00, 0x27, 0x00, 0x4C, 0x00, 0x61, 0x00, 0x70, 0x00,
            0x74, 0x00, 0x6F, 0x00, 0x70, 0x00, 0x27, 0x00, 0x2C, 0x00, 0x20, 0x00, 0x31, 0x00, 0x32, 0x00,
            0x30, 0x00, 0x30, 0x00, 0x2E, 0x00, 0x35, 0x00, 0x30, 0x00, 0x29, 0x00
        };

        return new PacketPayload(1234, 54321, 1433, 1000, 2000, new ReadOnlyMemory<byte>(tdsData));
    }
}

public record PacketPayload(uint Pid, uint Sport, uint Dport, uint Seq, uint Ack, ReadOnlyMemory<byte> Data);

2.2 TDS 协议解析器

这里的坑在于,TCP是流式协议,一个TDS消息可能被分割在多个TCP包中,或者多个TDS消息可能合并在一个TCP包里。我们需要管理每个TCP会话(由源IP、源端口、目标IP、目标端口唯一标识)的状态,并根据TDS包头中的长度字段来重组完整的TDS消息。

下面是一个简化的解析器,它只处理单个包内包含一个完整的SQL Batch消息的理想情况。

// TdsPacketProcessor.cs
using System.Buffers.Binary;
using System.Text;

public class TdsPacketProcessor
{
    private readonly ILogger<TdsPacketProcessor> _logger;
    private readonly HudiProducer _hudiProducer;
    private readonly IMetrics _metrics;

    public TdsPacketProcessor(ILogger<TdsPacketProcessor> logger, HudiProducer hudiProducer, IMetrics metrics)
    {
        _logger = logger;
        _hudiProducer = hudiProducer;
        _metrics = metrics;
    }

    public async Task ProcessPacketAsync(PacketPayload payload)
    {
        using var _ = _metrics.Measure.Timer.Time(MetricsRegistry.PacketProcessingTimer);
        _metrics.Measure.Counter.Increment(MetricsRegistry.PacketsReceived);
        
        var span = payload.Data.Span;

        // TDS Header is 8 bytes
        if (span.Length < 8) return;

        // Packet Type: 1 byte
        byte packetType = span[0];
        if (packetType != 1) // 1 = SQL Batch
        {
            _metrics.Measure.Counter.Increment(MetricsRegistry.UnsupportedPacketType);
            return;
        }

        // Status: 1 byte (0 for End of Message)
        byte status = span[1];
        bool isEom = (status & 0x01) == 0x01;

        // Length: 2 bytes, big-endian
        ushort length = BinaryPrimitives.ReadUInt16BigEndian(span.Slice(2, 2));
        
        // 实际场景中,这里的length是整个TDS包(包括头)的长度,
        // 需要根据它和isEom标志来重组跨TCP包的TDS消息。
        if (span.Length < length)
        {
            _logger.LogWarning("Incomplete TDS packet received. Reassembly required. Length in header: {length}, Actual span length: {spanLength}", length, span.Length);
            // 这里应该把不完整的包缓存起来,等待后续的包
            return;
        }
        
        // 提取SQL文本,它是UCS-2 (UTF-16LE) 编码
        var sqlTextBytes = span.Slice(8, length - 8);
        string sqlText = Encoding.Unicode.GetString(sqlTextBytes);

        _logger.LogDebug("Decoded SQL: {sql}", sqlText);
        
        // 在这里,我们需要一个SQL解析器来解析sqlText,
        // 而不是简单的字符串匹配。但为了演示,我们使用正则。
        // 一个常见的错误是依赖脆弱的字符串操作,这在生产中是不可靠的。
        var parsedStatement = ParseSimpleInsert(sqlText);

        if (parsedStatement != null)
        {
            _metrics.Measure.Counter.Increment(MetricsRegistry.StatementsParsed);
            await _hudiProducer.SendAsync(parsedStatement);
        }
        else
        {
            _metrics.Measure.Counter.Increment(MetricsRegistry.StatementsUnparsed);
        }
    }

    // 这是一个非常脆弱的解析器,仅用于演示。
    // 生产环境必须使用真正的SQL语法分析库,如 ANTLR。
    private ParsedStatement? ParseSimpleInsert(string sql)
    {
        var match = System.Text.RegularExpressions.Regex.Match(sql,
            @"INSERT INTO \[dbo\]\.\[(\w+)\] \(\[Name\], \[Price\]\) VALUES \(N'([^']*)', ([0-9\.]+)\)",
            System.Text.RegularExpressions.RegexOptions.IgnoreCase);

        if (match.Success)
        {
            var record = new Dictionary<string, object>
            {
                { "id", Guid.NewGuid().ToString() }, // Hudi需要一个主键
                { "name", match.Groups[2].Value },
                { "price", decimal.Parse(match.Groups[3].Value) },
                { "ts", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() } // Hudi需要一个precombine字段
            };

            return new ParsedStatement(
                OperationType.Insert,
                match.Groups[1].Value,
                record
            );
        }
        return null;
    }
}

public enum OperationType { Insert, Update, Delete }
public record ParsedStatement(OperationType Op, string TableName, Dictionary<string, object> Record);

这个处理器的核心逻辑是:接收eBPF传来的PacketPayload,解析TDS头,提取SQL文本,然后(用一个简化的方式)解析出表名和数据。解析出的结构化数据ParsedStatement会交给下一环节。

第三步:写入数据湖 - Apache Hudi

Apache Hudi 是一个数据湖存储框架,它为 Parquet/ORC 等文件格式带来了数据库的能力,如原子性的UPSERT操作和增量查询。这正是我们需要的,因为源端数据库会有大量的UPDATE和DELETE操作。

我们的C#服务不会直接写Hudi文件,这太复杂了。一个务实的架构是,C#服务将解析出的ParsedStatement序列化成JSON或Avro,推送到一个消息队列(如Kafka)中。然后一个独立的Spark Streaming或Flink作业消费这些消息,并使用Hudi的DataSource API写入数据湖。

// HudiSparkWriter.scala (Spark Streaming Job)
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig

object HudiStreamWriter {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TDS CDC to Hudi")
      // ... Spark配置 ...
      .getOrCreate()

    import spark.implicits._

    // 从Kafka读取C#服务产生的JSON数据
    val kafkaStreamDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("subscribe", "sql-server-cdc-topic")
      .load()

    // 解析JSON数据
    val schema = ... // 定义匹配ParsedStatement的Schema
    val dataDF = kafkaStreamDF
      .selectExpr("CAST(value AS STRING) as json")
      .select(from_json($"json", schema).as("data"))
      .select("data.*")

    // Hudi配置
    val hudiOptions = Map[String, String](
      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id", // 主键
      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts", // 用于去重的precombine key
      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "", // 不分区
      DataSourceWriteOptions.TABLE_NAME.key -> "sql_server_products",
      DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
      HoodieWriteConfig.TBL_NAME.key -> "sql_server_products"
    )

    // 写入Hudi表
    val query = dataDF.writeStream
      .format("hudi")
      .options(hudiOptions)
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .option("checkpointLocation", "/path/to/checkpoint")
      .start("/path/to/hudi/table/sql_server_products")

    query.awaitTermination()
  }
}

这个Spark作业是整个管道的终点。它将来自Kafka的变更流应用到Hudi表中,实现了数据的准实时同步。使用Hudi的UPSERT操作,可以确保数据湖中的数据与源数据库保持一致,即使面对乱序或重复的消息。

graph TD
    A[SQL Server] -- TDS Protocol (Port 1433) --> B{Linux Kernel};
    subgraph Kernel Space
        B -- kprobe(tcp_sendmsg) --> C[eBPF Program];
    end
    C -- Perf Buffer --> D[C# TdsCaptureService];
    subgraph User Space
        D -- Parse & Reconstruct --> E{ParsedStatement};
        E -- JSON over Kafka --> F[Kafka Topic: cdc-events];
    end
    subgraph Data Platform
        F --> G[Spark Streaming Job];
        G -- Hudi DataSource API (UPSERT) --> H[(Hudi Table on S3/HDFS)];
    end
    
    subgraph Observability
        D -- Metrics (Line Protocol) --> I[InfluxDB];
    end

第四步:管道的可观测性 - InfluxDB

这样一个自定义的、多阶段的管道,如果没有良好的监控,就是个黑盒。任何一个环节出问题都可能导致数据丢失或延迟。因此,我们在C#服务中深度集成了度量(Metrics)收集。

我们使用 App.Metrics 库在代码的关键路径上埋点,并配置一个Reporter将数据定期推送到InfluxDB。

// MetricsRegistry.cs
using App.Metrics;
using App.Metrics.Counter;
using App.Metrics.Timer;

public static class MetricsRegistry
{
    public static readonly CounterOptions PacketsReceived = new CounterOptions
    {
        Name = "Packets Received",
        MeasurementUnit = Unit.Events,
        Context = "TdsCapture"
    };

    public static readonly CounterOptions StatementsParsed = new CounterOptions
    {
        Name = "Statements Parsed",
        MeasurementUnit = Unit.Events,
        Context = "TdsCapture"
    };
    
    // ... 其他Counter
    
    public static readonly TimerOptions PacketProcessingTimer = new TimerOptions
    {
        Name = "Packet Processing Time",
        MeasurementUnit = Unit.Milliseconds,
        DurationUnit = TimeUnit.Milliseconds,
        RateUnit = TimeUnit.Seconds,
        Context = "TdsCapture"
    };
}

// 在 Startup.cs 中配置
public void ConfigureServices(IServiceCollection services)
{
    // ...
    var metrics = AppMetrics.CreateDefaultBuilder()
        .Report.ToInfluxDb(options =>
        {
            options.InfluxDb.BaseUri = new Uri("http://influxdb:8086");
            options.InfluxDb.Database = "tds_capture_metrics";
            options.HttpPolicy.Timeout = TimeSpan.FromSeconds(10);
            options.FlushInterval = TimeSpan.FromSeconds(5);
        })
        .Build();
        
    services.AddMetrics(metrics);
    services.AddMetricsTrackingMiddleware();
    services.AddMetricsEndpoints();
    
    services.AddHostedService<TdsCaptureService>();
    // ...
}

通过这些度量,我们可以构建一个Grafana仪表盘,实时监控以下关键指标:

  • eBPF捕获的数据包速率: 确认底层捕获是否正常。
  • 已解析/未解析的SQL语句数: 评估TDS解析器的覆盖率和准确性。
  • 包处理延迟: 识别性能瓶颈。
  • Kafka生产者吞吐量/错误率: 监控与数据平台的连接健康状况。

这套监控体系是保障这个方案在生产环境中稳定运行的生命线。

方案的局限性与未来展望

尽管这套方案实现了对SQL Server的无侵入式CDC,但它并非银弹。首先,它无法处理加密的TDS流量。如果数据库强制要求所有连接都加密,这个基于网络嗅探的方案将完全失效。这是它最大的适用性边界。

其次,TDS协议非常复杂,一个生产级的解析器需要处理RPC调用、事务标记(Begin Tran, Commit, Rollback)、多种数据类型编码等,工作量巨大。我们实现的只是一个针对特定DML模式的简化版。一个更健壮的方案可能需要投入大量精力去完善解析器,或者寻找开源的TDS解析库。

最后,对Schema变更的处理是滞后的。当ALTER TABLE发生时,这个管道无法自动感知。需要建立额外的带外(out-of-band)机制来捕获DDL变更,并通知下游的Hudi作业调整Schema,这增加了运维的复杂性。

未来的一个优化方向可能是探索在SQL Server进程上使用用户态探针(uprobes),在数据进入网络加密层之前进行捕获。这种方式虽然侵入性比网络嗅探稍高(需要在主机上运行eBPF程序),但可以绕过加密问题,并可能获取到更结构化的执行计划信息,从而降低解析的复杂度。


  目录