我们团队的 MLOps 平台遇到了一个典型的瓶颈:训练-服务偏斜(Training-Serving Skew)。模型训练时,特征工程师在 Snowflake 上用 SQL 跑出海量的批式特征;而在线上进行实时推断时,应用服务需要从一个低延迟的数据库中获取最新特征。这两套系统的数据视图不一致,导致模型效果在生产环境中出现无法预期的衰减。问题根源在于缺乏一个统一的、自动化的机制来管理和同步线上与线下的特征数据。
初步构想是建立一个“特征同步服务”,它能感知到特征定义的变化,自动从 Snowflake 中提取数据,并将其灌入一个高性能的线上存储。但这里的魔鬼藏在细节里:如何定义和管理这些特征?如何保证同步任务的可靠性?如何让整个系统声明式地工作,而不是充斥着各种手动触发的脚本?
技术选型决策过程很直接。我们已经重度使用 Snowflake 作为数据仓库,它是我们离线特征的“事实标准”。线上存储,考虑到许多特征是基于用户近期行为的时间序列数据(如“过去5分钟点击次数”),InfluxDB 的高性能读写和时序模型非常契合。真正的关键在于控制面(Control Plane)。我们需要一个高可用的、强一致性的组件来存储特征的元数据(schema、源表、更新频率等),并作为同步任务的协调中心。使用关系型数据库来做这件事显得笨重,且无法提供我们想要的 watch 机制。最终,我们选择了 etcd。它轻量、可靠,其 watch API 完美契合了我们声明式的设计哲学:我们只需在 etcd 中声明“期望状态”(即特征定义),同步服务就会自动工作以达到该状态。
于是,一个以 etcd 为大脑,Snowflake 为数据源,InfluxDB 为在线服务的混合特征存储架构浮出水面。
一、etcd 中的特征元数据定义
一切始于元数据。我们需要在 etcd 中设计一套清晰的 Key-Value 结构来描述一个特征。Key 的设计需要有层次,便于按组或按项目进行查询。我们决定采用 /mlops/features/{feature_set_name}/{feature_name}
的路径格式。
Value 部分则更为关键,它需要包含同步服务所需的所有信息。直接用 JSON 字符串虽然简单,但在多服务、多语言环境下容易出错且效率不高。我们选择使用 Protocol Buffers (Protobuf) 来定义特征元数据的结构,这能提供强类型约束和高效的序列化。
这是我们的 feature.proto
定义:
syntax = "proto3";
package mlops.metastore;
option go_package = "github.com/your-org/feature-sync/gen/go/metastore";
// FeatureSource defines where the feature data comes from.
message FeatureSource {
// Type of the source, e.g., "snowflake".
string type = 1;
// Snowflake-specific source details.
SnowflakeSource snowflake_source = 2;
// We can add other sources like BigQuery, etc. in the future.
}
message SnowflakeSource {
// The database to connect to.
string database = 1;
// The schema to use.
string schema = 2;
// The warehouse for running the query.
string warehouse = 3;
// The SQL query to generate the feature data.
// It MUST return at least two columns: ENTITY_ID and FEATURE_VALUE,
// and an optional TIMESTAMP column.
string query = 4;
}
// Sink defines where the feature data should be written to.
message FeatureSink {
// Type of the sink, e.g., "influxdb".
string type = 1;
// InfluxDB-specific sink details.
InfluxDBSink influxdb_sink = 2;
}
message InfluxDBSink {
// InfluxDB bucket name.
string bucket = 1;
// InfluxDB measurement name.
string measurement = 2;
// The column from the source query that maps to the entity ID (InfluxDB tag).
string entity_id_column = 3;
// The column from the source query that maps to the feature value (InfluxDB field).
string value_column = 4;
// The column from the source query that maps to the timestamp. If empty, current time is used.
string timestamp_column = 5;
}
// FeatureDefinition is the core metadata object for a single feature.
message FeatureDefinition {
// Name of the feature, must be unique within a feature set.
string name = 1;
// Description of the feature.
string description = 2;
// The source of the feature data.
FeatureSource source = 3;
// The destination for the online feature data.
FeatureSink sink = 4;
// TTL for the feature in the online store, in seconds. 0 means infinite.
int64 ttl_seconds = 5;
// Synchronization schedule in cron format. e.g., "0 * * * *"
string sync_schedule = 6;
}
这个定义非常清晰:一个 FeatureDefinition
包含 source
(数据从哪来)和 sink
(数据到哪去),以及调度信息。这种设计有很好的扩展性,未来支持新的数据源或目标存储,只需在 Protobuf 中增加新的 message 类型。
二、核心组件:特征同步守护进程 (Feature Sync Daemon)
这个守护进程是整个系统的心脏,它是一个持续运行的 Go 服务。其核心逻辑是:
- 启动时,列出 (List) etcd 中
/mlops/features/
前缀下的所有特征定义。 - 为每个已存在的特征启动一个同步协程。
- 同时,持续监听 (Watch) 该前缀下的任何变化(创建、更新、删除)。
- 根据变化事件,动态地启动、停止或更新对应的同步协程。
下面是这个守护进程的核心结构和 etcd 交互部分的代码。在真实项目中,配置管理、日志、指标都是必不可少的。
// main.go
package main
import (
"context"
"log/slog"
"os"
"os/signal"
"syscall"
"github.com/your-org/feature-sync/internal/config"
"github.com/your-org/feature-sync/internal/controller"
"github.com/your-org/feature-sync/internal/logging"
"github.com/your-org/feature-sync/internal/metastore"
"github.com/your-org/feature-sync/internal/sink"
"github.com/your-org/feature-sync/internal/source"
"go.etcd.io/etcd/client/v3"
)
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
logging.Init()
cfg, err := config.Load()
if err != nil {
slog.Error("failed to load configuration", "error", err)
os.Exit(1)
}
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: cfg.Etcd.Endpoints,
DialTimeout: cfg.Etcd.DialTimeout,
})
if err != nil {
slog.Error("failed to connect to etcd", "error", err)
os.Exit(1)
}
defer etcdCli.Close()
// 工厂模式创建数据源和数据汇的实例
sourceFactory, err := source.NewFactory(cfg)
if err != nil {
slog.Error("failed to create source factory", "error", err)
os.Exit(1)
}
sinkFactory, err := sink.NewFactory(cfg)
if err != nil {
slog.Error("failed to create sink factory", "error", err)
os.Exit(1)
}
store := metastore.NewEtcdStore(etcdCli)
// 控制器是核心业务逻辑
ctrl := controller.NewFeatureController(store, sourceFactory, sinkFactory)
slog.Info("starting feature sync controller")
if err := ctrl.Run(ctx); err != nil {
slog.Error("controller stopped with error", "error", err)
}
slog.Info("shutdown complete")
}
控制器 FeatureController
是逻辑的核心。它的 Run
方法会启动一个 watch 循环。
// internal/controller/controller.go
package controller
import (
"context"
"log/slog"
"sync"
metastorepb "github.com/your-org/feature-sync/gen/go/metastore"
"github.com/your-org/feature-sync/internal/metastore"
"github.com/your-org/feature-sync/internal/worker"
"go.etcd.io/etcd/client/v3"
)
const featurePrefix = "/mlops/features/"
type FeatureController struct {
store metastore.Store
sourceFactory source.Factory
sinkFactory sink.Factory
workers map[string]*worker.SyncWorker
mu sync.Mutex
}
func NewFeatureController(store metastore.Store, sf source.Factory, skf sink.Factory) *FeatureController {
return &FeatureController{
store: store,
sourceFactory: sf,
sinkFactory: skf,
workers: make(map[string]*worker.SyncWorker),
}
}
func (c *FeatureController) Run(ctx context.Context) error {
slog.Info("starting initial reconciliation of features")
// 1. 初始全量加载
initialFeatures, revision, err := c.store.ListFeatures(ctx, featurePrefix)
if err != nil {
return err
}
for key, featureDef := range initialFeatures {
c.reconcile(ctx, key, featureDef)
}
slog.Info("initial reconciliation complete, starting to watch for changes", "revision", revision)
// 2. 启动 Watch 循环,从加载后的 revision 开始
watchChan := c.store.WatchFeatures(ctx, featurePrefix, revision+1)
for {
select {
case <-ctx.Done():
slog.Info("context cancelled, stopping controller")
c.shutdownAllWorkers()
return ctx.Err()
case watchResp, ok := <-watchChan:
if !ok {
slog.Warn("etcd watch channel closed, attempting to restart watch")
// 实际生产中这里需要更复杂的重连和重试逻辑
watchChan = c.store.WatchFeatures(ctx, featurePrefix, 0) // 从最新版本重新watch
continue
}
for _, event := range watchResp.Events {
key := string(event.Kv.Key)
switch event.Type {
case clientv3.EventTypePut:
var featureDef metastorepb.FeatureDefinition
// 省略反序列化和错误处理
c.reconcile(ctx, key, &featureDef)
case clientv3.EventTypeDelete:
c.stopWorker(key)
}
}
}
}
}
// reconcile 负责处理单个特征的创建或更新
func (c *FeatureController) reconcile(ctx context.Context, key string, def *metastorepb.FeatureDefinition) {
c.mu.Lock()
defer c.mu.Unlock()
// 如果 worker 已存在, 停止旧的
if existingWorker, ok := c.workers[key]; ok {
slog.Info("feature definition updated, stopping old worker", "key", key)
existingWorker.Stop()
}
// 创建新的 worker
slog.Info("creating new worker for feature", "key", key, "schedule", def.SyncSchedule)
newWorker, err := worker.NewSyncWorker(key, def, c.sourceFactory, c.sinkFactory)
if err != nil {
slog.Error("failed to create sync worker", "key", key, "error", err)
return
}
c.workers[key] = newWorker
go newWorker.Start(ctx)
}
// ... stopWorker 和 shutdownAllWorkers 的实现
这个控制器的设计是典型的 Kubernetes Operator 模式:通过 watch 监听“期望状态”,然后通过一系列操作(创建/停止 worker)来调整“当前状态”,使其趋向于“期望状态”。
三、数据同步的实现细节:从 Snowflake 到 InfluxDB
SyncWorker
负责一个具体特征的同步逻辑。它使用 cron 解析器来按计划触发同步任务。
一次同步任务的核心流程如下:
- 连接 Snowflake:使用官方 Go driver,并确保连接池的正确配置。
- 执行查询:执行在 etcd 中定义的 SQL 查询。这里的坑在于,查询可能返回大量数据,一次性加载到内存中是不可接受的。必须使用数据库 cursor/stream 的方式逐行读取。
- 数据转换:将从 Snowflake 读取的每一行数据,转换为 InfluxDB 的 Line Protocol 格式。
measurement,tag_key=tag_value field_key=field_value timestamp
。 - 批量写入 InfluxDB:为了性能,不能每读一行就写一次 InfluxDB。必须在客户端进行批处理,比如积攒 5000 条数据或等待 1 秒钟,然后一次性通过 InfluxDB client API 发送。
- 错误处理与日志:整个过程必须有详尽的日志。如果查询失败或写入失败,需要有重试机制。一个常见的错误是数据类型不匹配,这需要在日志中明确指出问题行。
下面是一个简化的 SyncWorker
和其 sync
方法的实现:
// internal/worker/sync_worker.go
package worker
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/robfig/cron/v3"
metastorepb "github.com/your-org/feature-sync/gen/go/metastore"
"github.com/your-org/feature-sync/internal/sink"
"github.com/your-org/feature-sync/internal/source"
)
type SyncWorker struct {
key string
def *metastorepb.FeatureDefinition
source source.Source
sink sink.Sink
cron *cron.Cron
stopChan chan struct{}
}
func NewSyncWorker(key string, def *metastorepb.FeatureDefinition, sf source.Factory, skf sink.Factory) (*SyncWorker, error) {
src, err := sf.Get(def.Source)
if err != nil {
return nil, fmt.Errorf("failed to get source: %w", err)
}
snk, err := skf.Get(def.Sink)
if err != nil {
return nil, fmt.Errorf("failed to get sink: %w", err)
}
return &SyncWorker{
key: key,
def: def,
source: src,
sink: snk,
stopChan: make(chan struct{}),
}, nil
}
func (w *SyncWorker) Start(ctx context.Context) {
logger := slog.With("feature_key", w.key)
logger.Info("starting worker")
w.cron = cron.New(cron.WithSeconds()) // 支持秒级精度
_, err := w.cron.AddFunc(w.def.SyncSchedule, func() {
// 避免上一次任务没跑完又开始新的
// 实际项目中会用更复杂的锁机制
logger.Info("cron triggered, starting sync job")
jobCtx, cancel := context.WithTimeout(context.Background(), 1*time.Hour) // 设置任务超时
defer cancel()
if err := w.sync(jobCtx); err != nil {
logger.Error("sync job failed", "error", err)
} else {
logger.Info("sync job completed successfully")
}
})
if err != nil {
logger.Error("failed to add cron job", "schedule", w.def.SyncSchedule, "error", err)
return
}
w.cron.Start()
<-w.stopChan
w.cron.Stop()
logger.Info("worker stopped")
}
func (w *SyncWorker) Stop() {
close(w.stopChan)
}
func (w *SyncWorker) sync(ctx context.Context) error {
logger := slog.With("feature_key", w.key)
// 1. 从数据源读取数据流
reader, err := w.source.Read(ctx, w.def.Source)
if err != nil {
return fmt.Errorf("failed to read from source: %w", err)
}
defer reader.Close()
// 2. 获取数据汇的写入器
writer, err := w.sink.Write(ctx, w.def.Sink)
if err != nil {
return fmt.Errorf("failed to get sink writer: %w", err)
}
defer writer.Close()
// 3. 流式处理
var recordsProcessed int64
for {
// reader.Next() 应该内部处理好批次读取
records, err := reader.NextBatch(ctx, 5000) // 每次处理5000条
if err != nil {
// io.EOF 表示读取完毕
return fmt.Errorf("error reading next batch: %w", err)
}
if len(records) == 0 {
break // 读取完成
}
if err := writer.WriteBatch(ctx, records); err != nil {
return fmt.Errorf("error writing batch to sink: %w", err)
}
recordsProcessed += int64(len(records))
}
logger.Info("sync finished", "records_processed", recordsProcessed)
return nil
}
这个实现暴露了一个初期的性能问题:当 Snowflake 的查询返回百万甚至千万行数据时,单协程的 sync
过程会非常漫长。读取、转换、写入是串行的。一个直接的优化是引入管道和并行化,使用一个协程池来处理数据转换和写入,而主协程只负责从 Snowflake 读取数据并分发到 channel 中。
四、架构图与工作流
整个系统的工作流程可以用下面的 Mermaid 图来清晰地表示。
graph TD subgraph Control Plane A[ML Engineer/CLI] -- kubectl apply -f feature.yaml --> B(YAML to Protobuf/etcd); B -- client.Put() --> C{etcd}; end subgraph Data Plane D[Feature Sync Daemon] -- Watch --> C; D -- On Event --> E{SyncWorker}; E -- cron schedule --> F[Read from Snowflake]; F -- SQL Query --> G[(Snowflake)]; G -- Batched Rows --> F; F -- Transformed Data --> H[Write to InfluxDB]; H -- Line Protocol Batch --> I[(InfluxDB)]; end subgraph Serving J[Inference Service] -- GetFeature(user_id) --> I; end style A fill:#d4f0f0 style J fill:#f0d4d4
当一个特征工程师想要上线一个新特征,他只需要编写一个简单的 YAML 文件(内部工具会将其转换为 Protobuf 并写入 etcd),然后提交。系统会自动完成后续所有的数据同步工作。
五、一个实际操作案例
假设我们要创建一个名为 user_last_7d_login_count
的特征。
准备 Snowflake SQL 查询 (
query.sql
):SELECT USER_ID AS ENTITY_ID, COUNT(DISTINCT TO_DATE(EVENT_TIMESTAMP)) AS FEATURE_VALUE, CURRENT_TIMESTAMP() AS TIMESTAMP FROM EVENT_LOGS.LOGIN_EVENTS WHERE EVENT_TIMESTAMP >= DATEADD(day, -7, CURRENT_DATE()) GROUP BY USER_ID;
定义特征元数据 (
feature.yaml
):name: user_last_7d_login_count description: "Count of unique days a user logged in over the last 7 days." syncSchedule: "0 2 * * *" # Every day at 2 AM ttlSeconds: 86400 # Cache for 1 day source: type: snowflake snowflakeSource: database: MY_PROD_DB schema: FEATURES warehouse: COMPUTE_WH query: | SELECT USER_ID AS ENTITY_ID, COUNT(DISTINCT TO_DATE(EVENT_TIMESTAMP)) AS FEATURE_VALUE, CURRENT_TIMESTAMP() AS TIMESTAMP FROM EVENT_LOGS.LOGIN_EVENTS WHERE EVENT_TIMESTAMP >= DATEADD(day, -7, CURRENT_DATE()) GROUP BY USER_ID; sink: type: influxdb influxdbSink: bucket: online_features measurement: user_features entityIdColumn: ENTITY_ID valueColumn: FEATURE_VALUE timestampColumn: TIMESTAMP
应用定义到 etcd:
一个内部 CLI 工具会将这个 YAML 转换为 Protobuf 序列化后的二进制,然后执行etcdctl put /mlops/features/user_profile/user_last_7d_login_count <binary_data>
。自动同步:
- Feature Sync Daemon 的 Watch 循环收到一个
PUT
事件。 -
FeatureController
创建一个新的SyncWorker
实例。 -
SyncWorker
设置一个每天凌晨2点的 cron 任务。 - 到了时间点,任务触发,从 Snowflake 拉取数据,并将其写入 InfluxDB 的
online_features
bucket 和user_features
measurement 中。
- Feature Sync Daemon 的 Watch 循环收到一个
线上服务查询:
当线上模型需要这个特征时,它会向 InfluxDB 发起一个简单的 Flux 查询:from(bucket: "online_features") |> range(start: -1d) |> filter(fn: (r) => r._measurement == "user_features" and r.ENTITY_ID == "some_user_id_123" and r._field == "FEATURE_VALUE") |> last()
这个查询的延迟通常在几毫秒内。
这套基于 etcd 控制面的系统,虽然在实现初期需要投入精力构建守护进程和数据读写适配器,但它提供了一个高度自动化、可扩展且可靠的 MLOps 基础设施组件。它将特征的管理从命令式的脚本执行,转变成了声明式的状态管理,极大地降低了特征上线的复杂度和出错率。
当前方案的一个局限性在于同步是基于定时调度的,对于需要秒级甚至毫秒级新鲜度的特征(例如“用户当前会话的点击次数”)支持不佳。未来的一个迭代方向是引入基于 CDC (Change Data Capture) 的数据源,例如监听 Snowflake 的 STREAMS
或直接从上游的 Kafka 事件流中消费数据,以实现真正的流式特征同步。此外,对于 Schema 变更的管理目前还比较简单,未来需要引入更复杂的版本控制和向后兼容性检查机制,而这些新的元数据同样可以存储在 etcd 中,由控制器进行解析和处理。