构建基于 Snowflake 与 InfluxDB 的混合 MLOps 特征存储的 etcd 控制面


我们团队的 MLOps 平台遇到了一个典型的瓶颈:训练-服务偏斜(Training-Serving Skew)。模型训练时,特征工程师在 Snowflake 上用 SQL 跑出海量的批式特征;而在线上进行实时推断时,应用服务需要从一个低延迟的数据库中获取最新特征。这两套系统的数据视图不一致,导致模型效果在生产环境中出现无法预期的衰减。问题根源在于缺乏一个统一的、自动化的机制来管理和同步线上与线下的特征数据。

初步构想是建立一个“特征同步服务”,它能感知到特征定义的变化,自动从 Snowflake 中提取数据,并将其灌入一个高性能的线上存储。但这里的魔鬼藏在细节里:如何定义和管理这些特征?如何保证同步任务的可靠性?如何让整个系统声明式地工作,而不是充斥着各种手动触发的脚本?

技术选型决策过程很直接。我们已经重度使用 Snowflake 作为数据仓库,它是我们离线特征的“事实标准”。线上存储,考虑到许多特征是基于用户近期行为的时间序列数据(如“过去5分钟点击次数”),InfluxDB 的高性能读写和时序模型非常契合。真正的关键在于控制面(Control Plane)。我们需要一个高可用的、强一致性的组件来存储特征的元数据(schema、源表、更新频率等),并作为同步任务的协调中心。使用关系型数据库来做这件事显得笨重,且无法提供我们想要的 watch 机制。最终,我们选择了 etcd。它轻量、可靠,其 watch API 完美契合了我们声明式的设计哲学:我们只需在 etcd 中声明“期望状态”(即特征定义),同步服务就会自动工作以达到该状态。

于是,一个以 etcd 为大脑,Snowflake 为数据源,InfluxDB 为在线服务的混合特征存储架构浮出水面。

一、etcd 中的特征元数据定义

一切始于元数据。我们需要在 etcd 中设计一套清晰的 Key-Value 结构来描述一个特征。Key 的设计需要有层次,便于按组或按项目进行查询。我们决定采用 /mlops/features/{feature_set_name}/{feature_name} 的路径格式。

Value 部分则更为关键,它需要包含同步服务所需的所有信息。直接用 JSON 字符串虽然简单,但在多服务、多语言环境下容易出错且效率不高。我们选择使用 Protocol Buffers (Protobuf) 来定义特征元数据的结构,这能提供强类型约束和高效的序列化。

这是我们的 feature.proto 定义:

syntax = "proto3";

package mlops.metastore;

option go_package = "github.com/your-org/feature-sync/gen/go/metastore";

// FeatureSource defines where the feature data comes from.
message FeatureSource {
  // Type of the source, e.g., "snowflake".
  string type = 1;
  // Snowflake-specific source details.
  SnowflakeSource snowflake_source = 2;
  // We can add other sources like BigQuery, etc. in the future.
}

message SnowflakeSource {
  // The database to connect to.
  string database = 1;
  // The schema to use.
  string schema = 2;
  // The warehouse for running the query.
  string warehouse = 3;
  // The SQL query to generate the feature data.
  // It MUST return at least two columns: ENTITY_ID and FEATURE_VALUE,
  // and an optional TIMESTAMP column.
  string query = 4;
}

// Sink defines where the feature data should be written to.
message FeatureSink {
  // Type of the sink, e.g., "influxdb".
  string type = 1;
  // InfluxDB-specific sink details.
  InfluxDBSink influxdb_sink = 2;
}

message InfluxDBSink {
  // InfluxDB bucket name.
  string bucket = 1;
  // InfluxDB measurement name.
  string measurement = 2;
  // The column from the source query that maps to the entity ID (InfluxDB tag).
  string entity_id_column = 3;
  // The column from the source query that maps to the feature value (InfluxDB field).
  string value_column = 4;
  // The column from the source query that maps to the timestamp. If empty, current time is used.
  string timestamp_column = 5;
}

// FeatureDefinition is the core metadata object for a single feature.
message FeatureDefinition {
  // Name of the feature, must be unique within a feature set.
  string name = 1;
  // Description of the feature.
  string description = 2;
  // The source of the feature data.
  FeatureSource source = 3;
  // The destination for the online feature data.
  FeatureSink sink = 4;
  // TTL for the feature in the online store, in seconds. 0 means infinite.
  int64 ttl_seconds = 5;
  // Synchronization schedule in cron format. e.g., "0 * * * *"
  string sync_schedule = 6;
}

这个定义非常清晰:一个 FeatureDefinition 包含 source(数据从哪来)和 sink(数据到哪去),以及调度信息。这种设计有很好的扩展性,未来支持新的数据源或目标存储,只需在 Protobuf 中增加新的 message 类型。

二、核心组件:特征同步守护进程 (Feature Sync Daemon)

这个守护进程是整个系统的心脏,它是一个持续运行的 Go 服务。其核心逻辑是:

  1. 启动时,列出 (List) etcd 中 /mlops/features/ 前缀下的所有特征定义。
  2. 为每个已存在的特征启动一个同步协程。
  3. 同时,持续监听 (Watch) 该前缀下的任何变化(创建、更新、删除)。
  4. 根据变化事件,动态地启动、停止或更新对应的同步协程。

下面是这个守护进程的核心结构和 etcd 交互部分的代码。在真实项目中,配置管理、日志、指标都是必不可少的。

// main.go
package main

import (
	"context"
	"log/slog"
	"os"
	"os/signal"
	"syscall"

	"github.com/your-org/feature-sync/internal/config"
	"github.com/your-org/feature-sync/internal/controller"
	"github.com/your-org/feature-sync/internal/logging"
	"github.com/your-org/feature-sync/internal/metastore"
	"github.com/your-org/feature-sync/internal/sink"
	"github.com/your-org/feature-sync/internal/source"
	
	"go.etcd.io/etcd/client/v3"
)

func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	logging.Init()

	cfg, err := config.Load()
	if err != nil {
		slog.Error("failed to load configuration", "error", err)
		os.Exit(1)
	}

	etcdCli, err := clientv3.New(clientv3.Config{
		Endpoints:   cfg.Etcd.Endpoints,
		DialTimeout: cfg.Etcd.DialTimeout,
	})
	if err != nil {
		slog.Error("failed to connect to etcd", "error", err)
		os.Exit(1)
	}
	defer etcdCli.Close()

	// 工厂模式创建数据源和数据汇的实例
	sourceFactory, err := source.NewFactory(cfg)
	if err != nil {
		slog.Error("failed to create source factory", "error", err)
		os.Exit(1)
	}
	sinkFactory, err := sink.NewFactory(cfg)
	if err != nil {
		slog.Error("failed to create sink factory", "error", err)
		os.Exit(1)
	}

	store := metastore.NewEtcdStore(etcdCli)
	
	// 控制器是核心业务逻辑
	ctrl := controller.NewFeatureController(store, sourceFactory, sinkFactory)

	slog.Info("starting feature sync controller")
	if err := ctrl.Run(ctx); err != nil {
		slog.Error("controller stopped with error", "error", err)
	}

	slog.Info("shutdown complete")
}

控制器 FeatureController 是逻辑的核心。它的 Run 方法会启动一个 watch 循环。

// internal/controller/controller.go
package controller

import (
	"context"
	"log/slog"
	"sync"
	
	metastorepb "github.com/your-org/feature-sync/gen/go/metastore"
	"github.com/your-org/feature-sync/internal/metastore"
	"github.com/your-org/feature-sync/internal/worker"
	
	"go.etcd.io/etcd/client/v3"
)

const featurePrefix = "/mlops/features/"

type FeatureController struct {
	store         metastore.Store
	sourceFactory source.Factory
	sinkFactory   sink.Factory
	workers       map[string]*worker.SyncWorker
	mu            sync.Mutex
}

func NewFeatureController(store metastore.Store, sf source.Factory, skf sink.Factory) *FeatureController {
	return &FeatureController{
		store:         store,
		sourceFactory: sf,
		sinkFactory:   skf,
		workers:       make(map[string]*worker.SyncWorker),
	}
}

func (c *FeatureController) Run(ctx context.Context) error {
	slog.Info("starting initial reconciliation of features")
	
	// 1. 初始全量加载
	initialFeatures, revision, err := c.store.ListFeatures(ctx, featurePrefix)
	if err != nil {
		return err
	}
	for key, featureDef := range initialFeatures {
		c.reconcile(ctx, key, featureDef)
	}
	
	slog.Info("initial reconciliation complete, starting to watch for changes", "revision", revision)

	// 2. 启动 Watch 循环,从加载后的 revision 开始
	watchChan := c.store.WatchFeatures(ctx, featurePrefix, revision+1)

	for {
		select {
		case <-ctx.Done():
			slog.Info("context cancelled, stopping controller")
			c.shutdownAllWorkers()
			return ctx.Err()
		case watchResp, ok := <-watchChan:
			if !ok {
				slog.Warn("etcd watch channel closed, attempting to restart watch")
				// 实际生产中这里需要更复杂的重连和重试逻辑
				watchChan = c.store.WatchFeatures(ctx, featurePrefix, 0) // 从最新版本重新watch
				continue
			}
			for _, event := range watchResp.Events {
				key := string(event.Kv.Key)
				switch event.Type {
				case clientv3.EventTypePut:
					var featureDef metastorepb.FeatureDefinition
					// 省略反序列化和错误处理
					c.reconcile(ctx, key, &featureDef)
				case clientv3.EventTypeDelete:
					c.stopWorker(key)
				}
			}
		}
	}
}

// reconcile 负责处理单个特征的创建或更新
func (c *FeatureController) reconcile(ctx context.Context, key string, def *metastorepb.FeatureDefinition) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// 如果 worker 已存在, 停止旧的
	if existingWorker, ok := c.workers[key]; ok {
		slog.Info("feature definition updated, stopping old worker", "key", key)
		existingWorker.Stop()
	}

	// 创建新的 worker
	slog.Info("creating new worker for feature", "key", key, "schedule", def.SyncSchedule)
	newWorker, err := worker.NewSyncWorker(key, def, c.sourceFactory, c.sinkFactory)
	if err != nil {
		slog.Error("failed to create sync worker", "key", key, "error", err)
		return
	}
	
	c.workers[key] = newWorker
	go newWorker.Start(ctx)
}

// ... stopWorker 和 shutdownAllWorkers 的实现

这个控制器的设计是典型的 Kubernetes Operator 模式:通过 watch 监听“期望状态”,然后通过一系列操作(创建/停止 worker)来调整“当前状态”,使其趋向于“期望状态”。

三、数据同步的实现细节:从 Snowflake 到 InfluxDB

SyncWorker 负责一个具体特征的同步逻辑。它使用 cron 解析器来按计划触发同步任务。

一次同步任务的核心流程如下:

  1. 连接 Snowflake:使用官方 Go driver,并确保连接池的正确配置。
  2. 执行查询:执行在 etcd 中定义的 SQL 查询。这里的坑在于,查询可能返回大量数据,一次性加载到内存中是不可接受的。必须使用数据库 cursor/stream 的方式逐行读取。
  3. 数据转换:将从 Snowflake 读取的每一行数据,转换为 InfluxDB 的 Line Protocol 格式。measurement,tag_key=tag_value field_key=field_value timestamp
  4. 批量写入 InfluxDB:为了性能,不能每读一行就写一次 InfluxDB。必须在客户端进行批处理,比如积攒 5000 条数据或等待 1 秒钟,然后一次性通过 InfluxDB client API 发送。
  5. 错误处理与日志:整个过程必须有详尽的日志。如果查询失败或写入失败,需要有重试机制。一个常见的错误是数据类型不匹配,这需要在日志中明确指出问题行。

下面是一个简化的 SyncWorker 和其 sync 方法的实现:

// internal/worker/sync_worker.go
package worker

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"github.com/robfig/cron/v3"
	
	metastorepb "github.com/your-org/feature-sync/gen/go/metastore"
	"github.com/your-org/feature-sync/internal/sink"
	"github.com/your-org/feature-sync/internal/source"
)

type SyncWorker struct {
	key      string
	def      *metastorepb.FeatureDefinition
	source   source.Source
	sink     sink.Sink
	cron     *cron.Cron
	stopChan chan struct{}
}

func NewSyncWorker(key string, def *metastorepb.FeatureDefinition, sf source.Factory, skf sink.Factory) (*SyncWorker, error) {
	src, err := sf.Get(def.Source)
	if err != nil {
		return nil, fmt.Errorf("failed to get source: %w", err)
	}
	snk, err := skf.Get(def.Sink)
	if err != nil {
		return nil, fmt.Errorf("failed to get sink: %w", err)
	}

	return &SyncWorker{
		key:      key,
		def:      def,
		source:   src,
		sink:     snk,
		stopChan: make(chan struct{}),
	}, nil
}

func (w *SyncWorker) Start(ctx context.Context) {
	logger := slog.With("feature_key", w.key)
	logger.Info("starting worker")

	w.cron = cron.New(cron.WithSeconds()) // 支持秒级精度
	_, err := w.cron.AddFunc(w.def.SyncSchedule, func() {
		// 避免上一次任务没跑完又开始新的
		// 实际项目中会用更复杂的锁机制
		logger.Info("cron triggered, starting sync job")
		jobCtx, cancel := context.WithTimeout(context.Background(), 1*time.Hour) // 设置任务超时
		defer cancel()
		if err := w.sync(jobCtx); err != nil {
			logger.Error("sync job failed", "error", err)
		} else {
			logger.Info("sync job completed successfully")
		}
	})

	if err != nil {
		logger.Error("failed to add cron job", "schedule", w.def.SyncSchedule, "error", err)
		return
	}
	
	w.cron.Start()
	
	<-w.stopChan
	w.cron.Stop()
	logger.Info("worker stopped")
}

func (w *SyncWorker) Stop() {
	close(w.stopChan)
}

func (w *SyncWorker) sync(ctx context.Context) error {
	logger := slog.With("feature_key", w.key)
	
	// 1. 从数据源读取数据流
	reader, err := w.source.Read(ctx, w.def.Source)
	if err != nil {
		return fmt.Errorf("failed to read from source: %w", err)
	}
	defer reader.Close()

	// 2. 获取数据汇的写入器
	writer, err := w.sink.Write(ctx, w.def.Sink)
	if err != nil {
		return fmt.Errorf("failed to get sink writer: %w", err)
	}
	defer writer.Close()

	// 3. 流式处理
	var recordsProcessed int64
	for {
		// reader.Next() 应该内部处理好批次读取
		records, err := reader.NextBatch(ctx, 5000) // 每次处理5000条
		if err != nil {
			// io.EOF 表示读取完毕
			return fmt.Errorf("error reading next batch: %w", err)
		}
		if len(records) == 0 {
			break // 读取完成
		}
		
		if err := writer.WriteBatch(ctx, records); err != nil {
			return fmt.Errorf("error writing batch to sink: %w", err)
		}
		recordsProcessed += int64(len(records))
	}

	logger.Info("sync finished", "records_processed", recordsProcessed)
	return nil
}

这个实现暴露了一个初期的性能问题:当 Snowflake 的查询返回百万甚至千万行数据时,单协程的 sync 过程会非常漫长。读取、转换、写入是串行的。一个直接的优化是引入管道和并行化,使用一个协程池来处理数据转换和写入,而主协程只负责从 Snowflake 读取数据并分发到 channel 中。

四、架构图与工作流

整个系统的工作流程可以用下面的 Mermaid 图来清晰地表示。

graph TD
    subgraph Control Plane
        A[ML Engineer/CLI] -- kubectl apply -f feature.yaml --> B(YAML to Protobuf/etcd);
        B -- client.Put() --> C{etcd};
    end
    
    subgraph Data Plane
        D[Feature Sync Daemon] -- Watch --> C;
        D -- On Event --> E{SyncWorker};
        E -- cron schedule --> F[Read from Snowflake];
        F -- SQL Query --> G[(Snowflake)];
        G -- Batched Rows --> F;
        F -- Transformed Data --> H[Write to InfluxDB];
        H -- Line Protocol Batch --> I[(InfluxDB)];
    end

    subgraph Serving
        J[Inference Service] -- GetFeature(user_id) --> I;
    end

    style A fill:#d4f0f0
    style J fill:#f0d4d4

当一个特征工程师想要上线一个新特征,他只需要编写一个简单的 YAML 文件(内部工具会将其转换为 Protobuf 并写入 etcd),然后提交。系统会自动完成后续所有的数据同步工作。

五、一个实际操作案例

假设我们要创建一个名为 user_last_7d_login_count 的特征。

  1. 准备 Snowflake SQL 查询 (query.sql):

    SELECT
        USER_ID AS ENTITY_ID,
        COUNT(DISTINCT TO_DATE(EVENT_TIMESTAMP)) AS FEATURE_VALUE,
        CURRENT_TIMESTAMP() AS TIMESTAMP
    FROM
        EVENT_LOGS.LOGIN_EVENTS
    WHERE
        EVENT_TIMESTAMP >= DATEADD(day, -7, CURRENT_DATE())
    GROUP BY
        USER_ID;
  2. 定义特征元数据 (feature.yaml):

    name: user_last_7d_login_count
    description: "Count of unique days a user logged in over the last 7 days."
    syncSchedule: "0 2 * * *" # Every day at 2 AM
    ttlSeconds: 86400 # Cache for 1 day
    source:
      type: snowflake
      snowflakeSource:
        database: MY_PROD_DB
        schema: FEATURES
        warehouse: COMPUTE_WH
        query: |
          SELECT
              USER_ID AS ENTITY_ID,
              COUNT(DISTINCT TO_DATE(EVENT_TIMESTAMP)) AS FEATURE_VALUE,
              CURRENT_TIMESTAMP() AS TIMESTAMP
          FROM
              EVENT_LOGS.LOGIN_EVENTS
          WHERE
              EVENT_TIMESTAMP >= DATEADD(day, -7, CURRENT_DATE())
          GROUP BY
              USER_ID;
    sink:
      type: influxdb
      influxdbSink:
        bucket: online_features
        measurement: user_features
        entityIdColumn: ENTITY_ID
        valueColumn: FEATURE_VALUE
        timestampColumn: TIMESTAMP
  3. 应用定义到 etcd:
    一个内部 CLI 工具会将这个 YAML 转换为 Protobuf 序列化后的二进制,然后执行 etcdctl put /mlops/features/user_profile/user_last_7d_login_count <binary_data>

  4. 自动同步:

    • Feature Sync Daemon 的 Watch 循环收到一个 PUT 事件。
    • FeatureController 创建一个新的 SyncWorker 实例。
    • SyncWorker 设置一个每天凌晨2点的 cron 任务。
    • 到了时间点,任务触发,从 Snowflake 拉取数据,并将其写入 InfluxDB 的 online_features bucket 和 user_features measurement 中。
  5. 线上服务查询:
    当线上模型需要这个特征时,它会向 InfluxDB 发起一个简单的 Flux 查询:

    from(bucket: "online_features")
      |> range(start: -1d)
      |> filter(fn: (r) => r._measurement == "user_features" and r.ENTITY_ID == "some_user_id_123" and r._field == "FEATURE_VALUE")
      |> last()

    这个查询的延迟通常在几毫秒内。

这套基于 etcd 控制面的系统,虽然在实现初期需要投入精力构建守护进程和数据读写适配器,但它提供了一个高度自动化、可扩展且可靠的 MLOps 基础设施组件。它将特征的管理从命令式的脚本执行,转变成了声明式的状态管理,极大地降低了特征上线的复杂度和出错率。

当前方案的一个局限性在于同步是基于定时调度的,对于需要秒级甚至毫秒级新鲜度的特征(例如“用户当前会话的点击次数”)支持不佳。未来的一个迭代方向是引入基于 CDC (Change Data Capture) 的数据源,例如监听 Snowflake 的 STREAMS 或直接从上游的 Kafka 事件流中消费数据,以实现真正的流式特征同步。此外,对于 Schema 变更的管理目前还比较简单,未来需要引入更复杂的版本控制和向后兼容性检查机制,而这些新的元数据同样可以存储在 etcd 中,由控制器进行解析和处理。


  目录