我们面临一个棘手的工程问题:需要将一个核心生产环境的 SQL Server 数据库的实时变更数据同步到一个数据湖中,但DBA团队基于稳定性和性能的考虑,明确拒绝了所有常规的CDC(Change Data Capture)方案。这意味着我们不能在数据库上启用原生的CDC功能,不能安装任何代理(Agent),更不能使用触发器或频繁轮询。数据库服务器本身是一块不容触碰的“圣地”。
唯一的交互界面是网络。所有客户端应用都通过网络与SQL Server通信,使用Tabular Data Stream (TDS)协议。这个限制反而给了我们一个清晰的方向:如果能在网络层无侵入地捕获、解析并重构出DML操作,就能绕开对数据库的所有直接干预。这套方案的技术栈最终确定为:eBPF用于高效的内核态网络包捕获,C#构建用户态的协议解析与数据分发服务,Apache Hudi作为数据湖的存储格式以支持UPSERT操作,以及InfluxDB用于监控整个管道的健康状况。
第一步:内核层的无感捕获 - eBPF
传统的网络抓包方案如 libpcap
会将大量数据包复制到用户空间,在流量大的情况下性能开销显著。eBPF (extended Berkeley Packet Filter) 则允许我们在内核中运行一个沙箱化的程序,只将我们需要的数据发送到用户空间,效率极高。
我们的目标是捕获所有目标端口为1433(SQL Server默认端口)的TCP包的载荷。这里使用一个简单的 eBPF C 程序,通过 libbpf
框架进行加载和交互。
// bpf_program.c
#include <vmlinux.h>
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_tracing.h>
#include <bpf/bpf_core_read.h>
#define SQL_SERVER_PORT 1433
// 定义一个perf buffer,用于将数据发送到用户空间
struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
__uint(key_size, sizeof(u32));
__uint(value_size, sizeof(u32));
} tcp_payloads BPF_MAP_SEC(".maps");
// 定义一个结构体来承载我们要发送的数据
struct payload_data {
u32 pid;
u32 sport;
u32 dport;
u32 seq;
u32 ack;
u32 data_len;
// 动态数据长度,这里只是标记
char data[4096];
};
// 使用kprobe挂载到tcp_sendmsg内核函数
SEC("kprobe/tcp_sendmsg")
int BPF_KPROBE(trace_tcp_sendmsg, struct sock *sk, struct msghdr *msg, size_t size)
{
if (sk == NULL) {
return 0;
}
u16 dport = BPF_CORE_READ(sk, __sk_common.skc_dport);
dport = bpf_ntohs(dport);
// 只关心发往SQL Server端口的数据包
if (dport != SQL_SERVER_PORT) {
return 0;
}
struct tcp_sock *ts = (struct tcp_sock *)sk;
u32 snd_nxt = BPF_CORE_READ(ts, snd_nxt);
// 填充数据结构
struct payload_data data = {};
data.pid = bpf_get_current_pid_tgid() >> 32;
data.sport = bpf_ntohs(BPF_CORE_READ(sk, __sk_common.skc_num));
data.dport = dport;
data.seq = snd_nxt - size; // TCP sequence number for this segment
data.ack = BPF_CORE_READ(ts, rcv_nxt);
if (size == 0 || size > sizeof(data.data)) {
return 0;
}
// 从msg中读取数据。这是一个复杂的步骤,因为数据可能在多个iovec中。
// 在真实项目中,需要遍历msg->msg_iter。这里为了简化,我们只处理简单的场景。
struct iov_iter *iter = &msg->msg_iter;
if (iter->type != ITER_IOVEC) {
return 0;
}
int iov_count = BPF_CORE_READ(iter, nr_segs);
if (iov_count > 0) {
const struct iovec *iov = BPF_CORE_READ(iter, iov);
void *buf_ptr = BPF_CORE_READ(iov, iov_base);
size_t buf_len = BPF_CORE_READ(iov, iov_len);
size_t len_to_read = size < sizeof(data.data) ? size : sizeof(data.data);
len_to_read = len_to_read < buf_len ? len_to_read : buf_len;
if(len_to_read > 0) {
bpf_probe_read_user(&data.data, len_to_read, buf_ptr);
data.data_len = len_to_read;
// 将数据发送到perf buffer
bpf_perf_event_output(ctx, &tcp_payloads, BPF_F_CURRENT_CPU, &data, sizeof(data));
}
}
return 0;
}
char LICENSE[] SEC("license") = "GPL";
这个eBPF程序的核心是 trace_tcp_sendmsg
函数,它挂载在 tcp_sendmsg
这个内核函数上。每当有TCP包发送时,它会检查目标端口是否是1433。如果是,它就读取TCP包的载荷(payload)和一些元数据(PID、端口、TCP序列号),然后通过 bpf_perf_event_output
将这些信息推送到一个名为 tcp_payloads
的perf buffer中,等待用户态程序来消费。一个现实的考量是,这个实现只处理了客户端发往服务器的数据,一个完整的方案还需要捕获服务器的响应。
第二步:用户态的协议解析 - C# 服务
捕获原始的TCP载荷只是第一步,真正的挑战在于解析TDS协议。TDS是一个复杂的二进制协议。我们在用户空间构建一个 .NET Core 的后台服务(BackgroundService)来完成这项工作。
2.1 C# 与 eBPF Perf Buffer 交互
我们使用一个.NET的libbpf封装库(例如 libbpf-tools
的.NET绑定)来读取perf buffer。服务启动时,它会加载编译好的eBPF程序(bpf_program.o
)并监听 tcp_payloads
这个perf buffer。
// TdsCaptureService.cs
public class TdsCaptureService : BackgroundService
{
private readonly ILogger<TdsCaptureService> _logger;
private readonly TdsPacketProcessor _processor;
private readonly CancellationTokenSource _cts = new();
public TdsCaptureService(ILogger<TdsCaptureService> logger, TdsPacketProcessor processor)
{
_logger = logger;
_processor = processor;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
// 伪代码: 使用一个库来加载eBPF程序并监听Perf Buffer
// var bpfLoader = new BpfLoader();
// var perfBuffer = bpfLoader.LoadAndAttach("./bpf_program.o")
// .GetPerfBuffer("tcp_payloads");
_logger.LogInformation("eBPF program loaded. Listening for TDS packets...");
// 伪代码: 轮询Perf Buffer
// await foreach (var rawEvent in perfBuffer.PollAsync(stoppingToken))
// {
// // rawEvent.Data 是一个 ReadOnlyMemory<byte>,包含了eBPF程序发送的 payload_data 结构体
// var payload = ParsePayloadData(rawEvent.Data);
// await _processor.ProcessPacketAsync(payload);
// }
// 模拟数据流用于演示
while (!stoppingToken.IsCancellationRequested)
{
var simulatedPayload = GenerateSimulatedTdsPacket();
await _processor.ProcessPacketAsync(simulatedPayload);
await Task.Delay(1000, stoppingToken);
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("TDS capture service is stopping.");
}
catch (Exception ex)
{
_logger.LogError(ex, "An unhandled exception occurred in TdsCaptureService.");
}
}
// 模拟的Payload,实际应从eBPF Perf Buffer中解析
private PacketPayload GenerateSimulatedTdsPacket()
{
// 这是一个伪造的TDS包,包含一个简单的INSERT语句
// TDS Header (8 bytes): Type=1(SQL Batch), Status=0, Length=86, SPID=0, PacketID=0, Window=0
// Payload: "INSERT INTO [dbo].[Products] ([Name], [Price]) VALUES (N'Laptop', 1200.50)"
byte[] tdsData = new byte[] {
0x01, 0x00, 0x00, 0x56, 0x00, 0x00, 0x00, 0x00, 0x49, 0x00, 0x4E, 0x00, 0x53, 0x00, 0x45, 0x00,
0x52, 0x00, 0x54, 0x00, 0x20, 0x00, 0x49, 0x00, 0x4E, 0x00, 0x54, 0x00, 0x4F, 0x00, 0x20, 0x00,
0x5B, 0x00, 0x64, 0x00, 0x62, 0x00, 0x6F, 0x00, 0x5D, 0x00, 0x2E, 0x00, 0x5B, 0x00, 0x50, 0x00,
0x72, 0x00, 0x6F, 0x00, 0x64, 0x00, 0x75, 0x00, 0x63, 0x00, 0x74, 0x00, 0x73, 0x00, 0x5D, 0x00,
0x20, 0x00, 0x28, 0x00, 0x5B, 0x00, 0x4E, 0x00, 0x61, 0x00, 0x6D, 0x00, 0x65, 0x00, 0x5D, 0x00,
0x2C, 0x00, 0x20, 0x00, 0x5B, 0x00, 0x50, 0x00, 0x72, 0x00, 0x69, 0x00, 0x63, 0x00, 0x65, 0x00,
0x5D, 0x00, 0x29, 0x00, 0x20, 0x00, 0x56, 0x00, 0x41, 0x00, 0x4C, 0x00, 0x55, 0x00, 0x45, 0x00,
0x53, 0x00, 0x20, 0x00, 0x28, 0x00, 0x4E, 0x00, 0x27, 0x00, 0x4C, 0x00, 0x61, 0x00, 0x70, 0x00,
0x74, 0x00, 0x6F, 0x00, 0x70, 0x00, 0x27, 0x00, 0x2C, 0x00, 0x20, 0x00, 0x31, 0x00, 0x32, 0x00,
0x30, 0x00, 0x30, 0x00, 0x2E, 0x00, 0x35, 0x00, 0x30, 0x00, 0x29, 0x00
};
return new PacketPayload(1234, 54321, 1433, 1000, 2000, new ReadOnlyMemory<byte>(tdsData));
}
}
public record PacketPayload(uint Pid, uint Sport, uint Dport, uint Seq, uint Ack, ReadOnlyMemory<byte> Data);
2.2 TDS 协议解析器
这里的坑在于,TCP是流式协议,一个TDS消息可能被分割在多个TCP包中,或者多个TDS消息可能合并在一个TCP包里。我们需要管理每个TCP会话(由源IP、源端口、目标IP、目标端口唯一标识)的状态,并根据TDS包头中的长度字段来重组完整的TDS消息。
下面是一个简化的解析器,它只处理单个包内包含一个完整的SQL Batch消息的理想情况。
// TdsPacketProcessor.cs
using System.Buffers.Binary;
using System.Text;
public class TdsPacketProcessor
{
private readonly ILogger<TdsPacketProcessor> _logger;
private readonly HudiProducer _hudiProducer;
private readonly IMetrics _metrics;
public TdsPacketProcessor(ILogger<TdsPacketProcessor> logger, HudiProducer hudiProducer, IMetrics metrics)
{
_logger = logger;
_hudiProducer = hudiProducer;
_metrics = metrics;
}
public async Task ProcessPacketAsync(PacketPayload payload)
{
using var _ = _metrics.Measure.Timer.Time(MetricsRegistry.PacketProcessingTimer);
_metrics.Measure.Counter.Increment(MetricsRegistry.PacketsReceived);
var span = payload.Data.Span;
// TDS Header is 8 bytes
if (span.Length < 8) return;
// Packet Type: 1 byte
byte packetType = span[0];
if (packetType != 1) // 1 = SQL Batch
{
_metrics.Measure.Counter.Increment(MetricsRegistry.UnsupportedPacketType);
return;
}
// Status: 1 byte (0 for End of Message)
byte status = span[1];
bool isEom = (status & 0x01) == 0x01;
// Length: 2 bytes, big-endian
ushort length = BinaryPrimitives.ReadUInt16BigEndian(span.Slice(2, 2));
// 实际场景中,这里的length是整个TDS包(包括头)的长度,
// 需要根据它和isEom标志来重组跨TCP包的TDS消息。
if (span.Length < length)
{
_logger.LogWarning("Incomplete TDS packet received. Reassembly required. Length in header: {length}, Actual span length: {spanLength}", length, span.Length);
// 这里应该把不完整的包缓存起来,等待后续的包
return;
}
// 提取SQL文本,它是UCS-2 (UTF-16LE) 编码
var sqlTextBytes = span.Slice(8, length - 8);
string sqlText = Encoding.Unicode.GetString(sqlTextBytes);
_logger.LogDebug("Decoded SQL: {sql}", sqlText);
// 在这里,我们需要一个SQL解析器来解析sqlText,
// 而不是简单的字符串匹配。但为了演示,我们使用正则。
// 一个常见的错误是依赖脆弱的字符串操作,这在生产中是不可靠的。
var parsedStatement = ParseSimpleInsert(sqlText);
if (parsedStatement != null)
{
_metrics.Measure.Counter.Increment(MetricsRegistry.StatementsParsed);
await _hudiProducer.SendAsync(parsedStatement);
}
else
{
_metrics.Measure.Counter.Increment(MetricsRegistry.StatementsUnparsed);
}
}
// 这是一个非常脆弱的解析器,仅用于演示。
// 生产环境必须使用真正的SQL语法分析库,如 ANTLR。
private ParsedStatement? ParseSimpleInsert(string sql)
{
var match = System.Text.RegularExpressions.Regex.Match(sql,
@"INSERT INTO \[dbo\]\.\[(\w+)\] \(\[Name\], \[Price\]\) VALUES \(N'([^']*)', ([0-9\.]+)\)",
System.Text.RegularExpressions.RegexOptions.IgnoreCase);
if (match.Success)
{
var record = new Dictionary<string, object>
{
{ "id", Guid.NewGuid().ToString() }, // Hudi需要一个主键
{ "name", match.Groups[2].Value },
{ "price", decimal.Parse(match.Groups[3].Value) },
{ "ts", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() } // Hudi需要一个precombine字段
};
return new ParsedStatement(
OperationType.Insert,
match.Groups[1].Value,
record
);
}
return null;
}
}
public enum OperationType { Insert, Update, Delete }
public record ParsedStatement(OperationType Op, string TableName, Dictionary<string, object> Record);
这个处理器的核心逻辑是:接收eBPF传来的PacketPayload
,解析TDS头,提取SQL文本,然后(用一个简化的方式)解析出表名和数据。解析出的结构化数据ParsedStatement
会交给下一环节。
第三步:写入数据湖 - Apache Hudi
Apache Hudi 是一个数据湖存储框架,它为 Parquet/ORC 等文件格式带来了数据库的能力,如原子性的UPSERT操作和增量查询。这正是我们需要的,因为源端数据库会有大量的UPDATE和DELETE操作。
我们的C#服务不会直接写Hudi文件,这太复杂了。一个务实的架构是,C#服务将解析出的ParsedStatement
序列化成JSON或Avro,推送到一个消息队列(如Kafka)中。然后一个独立的Spark Streaming或Flink作业消费这些消息,并使用Hudi的DataSource API写入数据湖。
// HudiSparkWriter.scala (Spark Streaming Job)
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
object HudiStreamWriter {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("TDS CDC to Hudi")
// ... Spark配置 ...
.getOrCreate()
import spark.implicits._
// 从Kafka读取C#服务产生的JSON数据
val kafkaStreamDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "sql-server-cdc-topic")
.load()
// 解析JSON数据
val schema = ... // 定义匹配ParsedStatement的Schema
val dataDF = kafkaStreamDF
.selectExpr("CAST(value AS STRING) as json")
.select(from_json($"json", schema).as("data"))
.select("data.*")
// Hudi配置
val hudiOptions = Map[String, String](
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id", // 主键
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts", // 用于去重的precombine key
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "", // 不分区
DataSourceWriteOptions.TABLE_NAME.key -> "sql_server_products",
DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
HoodieWriteConfig.TBL_NAME.key -> "sql_server_products"
)
// 写入Hudi表
val query = dataDF.writeStream
.format("hudi")
.options(hudiOptions)
.outputMode("append")
.trigger(Trigger.ProcessingTime("1 minute"))
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/hudi/table/sql_server_products")
query.awaitTermination()
}
}
这个Spark作业是整个管道的终点。它将来自Kafka的变更流应用到Hudi表中,实现了数据的准实时同步。使用Hudi的UPSERT
操作,可以确保数据湖中的数据与源数据库保持一致,即使面对乱序或重复的消息。
graph TD A[SQL Server] -- TDS Protocol (Port 1433) --> B{Linux Kernel}; subgraph Kernel Space B -- kprobe(tcp_sendmsg) --> C[eBPF Program]; end C -- Perf Buffer --> D[C# TdsCaptureService]; subgraph User Space D -- Parse & Reconstruct --> E{ParsedStatement}; E -- JSON over Kafka --> F[Kafka Topic: cdc-events]; end subgraph Data Platform F --> G[Spark Streaming Job]; G -- Hudi DataSource API (UPSERT) --> H[(Hudi Table on S3/HDFS)]; end subgraph Observability D -- Metrics (Line Protocol) --> I[InfluxDB]; end
第四步:管道的可观测性 - InfluxDB
这样一个自定义的、多阶段的管道,如果没有良好的监控,就是个黑盒。任何一个环节出问题都可能导致数据丢失或延迟。因此,我们在C#服务中深度集成了度量(Metrics)收集。
我们使用 App.Metrics
库在代码的关键路径上埋点,并配置一个Reporter将数据定期推送到InfluxDB。
// MetricsRegistry.cs
using App.Metrics;
using App.Metrics.Counter;
using App.Metrics.Timer;
public static class MetricsRegistry
{
public static readonly CounterOptions PacketsReceived = new CounterOptions
{
Name = "Packets Received",
MeasurementUnit = Unit.Events,
Context = "TdsCapture"
};
public static readonly CounterOptions StatementsParsed = new CounterOptions
{
Name = "Statements Parsed",
MeasurementUnit = Unit.Events,
Context = "TdsCapture"
};
// ... 其他Counter
public static readonly TimerOptions PacketProcessingTimer = new TimerOptions
{
Name = "Packet Processing Time",
MeasurementUnit = Unit.Milliseconds,
DurationUnit = TimeUnit.Milliseconds,
RateUnit = TimeUnit.Seconds,
Context = "TdsCapture"
};
}
// 在 Startup.cs 中配置
public void ConfigureServices(IServiceCollection services)
{
// ...
var metrics = AppMetrics.CreateDefaultBuilder()
.Report.ToInfluxDb(options =>
{
options.InfluxDb.BaseUri = new Uri("http://influxdb:8086");
options.InfluxDb.Database = "tds_capture_metrics";
options.HttpPolicy.Timeout = TimeSpan.FromSeconds(10);
options.FlushInterval = TimeSpan.FromSeconds(5);
})
.Build();
services.AddMetrics(metrics);
services.AddMetricsTrackingMiddleware();
services.AddMetricsEndpoints();
services.AddHostedService<TdsCaptureService>();
// ...
}
通过这些度量,我们可以构建一个Grafana仪表盘,实时监控以下关键指标:
- eBPF捕获的数据包速率: 确认底层捕获是否正常。
- 已解析/未解析的SQL语句数: 评估TDS解析器的覆盖率和准确性。
- 包处理延迟: 识别性能瓶颈。
- Kafka生产者吞吐量/错误率: 监控与数据平台的连接健康状况。
这套监控体系是保障这个方案在生产环境中稳定运行的生命线。
方案的局限性与未来展望
尽管这套方案实现了对SQL Server的无侵入式CDC,但它并非银弹。首先,它无法处理加密的TDS流量。如果数据库强制要求所有连接都加密,这个基于网络嗅探的方案将完全失效。这是它最大的适用性边界。
其次,TDS协议非常复杂,一个生产级的解析器需要处理RPC调用、事务标记(Begin Tran, Commit, Rollback)、多种数据类型编码等,工作量巨大。我们实现的只是一个针对特定DML模式的简化版。一个更健壮的方案可能需要投入大量精力去完善解析器,或者寻找开源的TDS解析库。
最后,对Schema变更的处理是滞后的。当ALTER TABLE
发生时,这个管道无法自动感知。需要建立额外的带外(out-of-band)机制来捕获DDL变更,并通知下游的Hudi作业调整Schema,这增加了运维的复杂性。
未来的一个优化方向可能是探索在SQL Server进程上使用用户态探针(uprobes),在数据进入网络加密层之前进行捕获。这种方式虽然侵入性比网络嗅探稍高(需要在主机上运行eBPF程序),但可以绕过加密问题,并可能获取到更结构化的执行计划信息,从而降低解析的复杂度。