基于 containerd 快照实现 spaCy 模型的秒级热加载与执行


团队的数据科学部门最近遇到了一个难以忍受的瓶颈:NLP 模型的迭代调试周期。一个典型的流程是,研究员修改几行 spaCy 的处理逻辑,然后等待 CI/CD 流水线执行 docker build,将新的 Python 脚本和依赖打包,推送镜像,最后在开发环境中更新 Pod。这个过程,即使有缓存,也常常需要 2 到 5 分钟。当一天需要进行几十次微调时,这种等待时间累积起来是惊人的生产力损耗。

问题的核心在于,99% 的时间里,改变的只是几 KB 的 Python 脚本,而整个 GB 级别的基础环境(Python 解释器、spaCy 库、预训练模型)是完全不变的。docker build 却需要为这点微小的变化重新处理整个上下文,即便能利用层缓存,其固有的开销依然巨大。我们需要一种能够绕过 docker build 的机制,实现代码的秒级部署与执行。

初步构想是直接操作容器文件系统,将新的脚本替换旧的。但这引出了权限、状态管理和一致性的问题。更进一步的思路是利用容器底层的文件系统快照能力。Docker 的上层 API 对此作了抽象,但其核心运行时 containerd 提供了更底层的接口,允许我们直接操作镜像和容器的快照。

这便是我们的技术路径:构建一个轻量级的任务执行服务,它直接与 containerd API 交互。当一个新任务提交时,服务不会构建新镜像,而是:

  1. 以一个预先准备好的、包含完整环境的 “基础镜像” 为模板。
  2. 为本次任务创建一个该镜像文件系统的可写快照 (snapshot)。
  3. 将用户提交的 Python 脚本动态注入到这个新快照中。
  4. 基于这个被修改过的快照,立即启动一个容器任务。

整个过程预计在毫秒到秒级完成。状态管理、任务队列和结果存储则交由成熟的 MySQL 数据库处理。

整体架构设计

我们将用 Go 语言编写这个核心的编排服务,因为它有优秀的并发性能和官方的 containerd 客户端库。MySQL 数据库负责持久化任务信息。

graph TD
    subgraph Go Orchestrator Service
        A[API Endpoint] --> B{Task Scheduler};
        B -- Polls for new tasks --> C[MySQL Database];
        B -- Dispatches task --> D[Containerd Task Runner];
        D -- Creates snapshot --> E[containerd Daemon];
        E -- Uses base image --> F[Base Python/spaCy Image];
        D -- Injects user script --> E;
        D -- Starts container --> E;
        E -- Reports status --> D;
        D -- Writes results --> C;
    end

    subgraph Data Persistence
        C
    end

    subgraph Container Runtime
        E
    end

    U[Data Scientist] -- Submits script & data --> A;
    U -- Queries results --> C;

    style C fill:#f9f,stroke:#333,stroke-width:2px
    style E fill:#ccf,stroke:#333,stroke-width:2px

数据库模型

我们需要几张表来支撑这个系统:base_images 用于管理基础环境,user_scripts 存储用户提交的待执行代码,execution_tasks 作为任务队列和结果记录。

-- `base_images`: 存储预先拉取并准备好的基础镜像信息
-- 这些镜像是我们创建快照的模板
CREATE TABLE `base_images` (
  `id` INT UNSIGNED NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(255) NOT NULL COMMENT 'e.g., spacy-base:3.4.1',
  `image_ref` VARCHAR(512) NOT NULL COMMENT 'Full image reference, e.g., docker.io/library/python:3.9-slim',
  `containerd_snapshot_id` VARCHAR(255) NOT NULL COMMENT 'The ID of the stable snapshot in containerd',
  `status` ENUM('PULLED', 'READY', 'DEPRECATED') NOT NULL DEFAULT 'PULLED',
  `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uniq_name` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Base environment images';

-- `user_scripts`: 存储需要执行的 Python 脚本
CREATE TABLE `user_scripts` (
  `id` INT UNSIGNED NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(255) NOT NULL,
  `content` MEDIUMTEXT NOT NULL COMMENT 'The actual Python code',
  `hash` VARCHAR(64) NOT NULL COMMENT 'SHA256 hash of the content for deduplication',
  `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uniq_hash` (`hash`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='User-submitted Python scripts';

-- `execution_tasks`: 核心任务表,记录任务状态和结果
CREATE TABLE `execution_tasks` (
  `id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  `base_image_id` INT UNSIGNED NOT NULL,
  `user_script_id` INT UNSIGNED NOT NULL,
  `input_data` JSON DEFAULT NULL COMMENT 'Input data for the script, passed via stdin',
  `status` ENUM('PENDING', 'RUNNING', 'COMPLETED', 'FAILED') NOT NULL DEFAULT 'PENDING',
  `container_id` VARCHAR(255) DEFAULT NULL COMMENT 'ID of the containerd container',
  `exit_code` INT DEFAULT NULL,
  `stdout` MEDIUMTEXT DEFAULT NULL,
  `stderr` MEDIUMTEXT DEFAULT NULL,
  `submitted_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `started_at` TIMESTAMP NULL DEFAULT NULL,
  `finished_at` TIMESTAMP NULL DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `idx_status` (`status`),
  CONSTRAINT `fk_base_image` FOREIGN KEY (`base_image_id`) REFERENCES `base_images` (`id`),
  CONSTRAINT `fk_user_script` FOREIGN KEY (`user_script_id`) REFERENCES `user_scripts` (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='NLP execution tasks';

这个 schema 设计考虑了脚本的去重(通过 hash)和任务生命周期的完整追踪。

Go 编排器核心实现

这是整个系统的核心。我们将使用 containerd.io/containerd 官方客户端库。

1. 初始化与连接

首先,我们需要一个能够连接到 containerd守护进程的客户端。

package main

import (
	"context"
	"log"
	"time"

	"github.comcom/containerd/containerd"
	"github.comcom/containerd/namespaces"
)

const (
	containerdSocket = "/run/containerd/containerd.sock"
	// 使用命名空间来隔离我们的应用和其他使用 containerd 的应用(如 Docker 或 Kubernetes)
	namespace = "spacy-executor"
)

// ExecutorService 封装了与 containerd 的交互逻辑
type ExecutorService struct {
	client *containerd.Client
	ctx    context.Context
}

// NewExecutorService 创建一个新的 ExecutorService 实例
func NewExecutorService() (*ExecutorService, error) {
	// 创建一个 containerd 客户端
	client, err := containerd.New(containerdSocket)
	if err != nil {
		log.Printf("Error creating containerd client: %v", err)
		return nil, err
	}

	// 使用一个带命名空间的上下文,确保所有操作都在我们的隔离环境中
	ctx := namespaces.WithNamespace(context.Background(), namespace)

	return &ExecutorService{
		client: client,
		ctx:    ctx,
	}, nil
}

// Close 关闭与 containerd 的连接
func (s *ExecutorService) Close() {
	if s.client != nil {
		s.client.Close()
	}
}

func main() {
	executor, err := NewExecutorService()
	if err != nil {
		log.Fatalf("Failed to initialize executor service: %v", err)
	}
	defer executor.Close()
	log.Println("Successfully connected to containerd daemon.")
	// ... 后续任务调度逻辑 ...
}

2. 准备基础镜像

在系统启动时,或通过一个管理接口,我们需要确保基础镜像已经被拉取到 containerd 中。

import (
    "github.com/containerd/containerd/images"
	"github.com/containerd/containerd/platforms"
)

// PrepareBaseImage 拉取或确认基础镜像存在
func (s *ExecutorService) PrepareBaseImage(imageRef string) (containerd.Image, error) {
	log.Printf("Preparing base image: %s", imageRef)

	// 尝试获取本地镜像
	img, err := s.client.GetImage(s.ctx, imageRef)
	if err == nil {
		log.Printf("Image %s already exists locally.", imageRef)
		return img, nil
	}

	// 如果本地不存在,则从远程仓库拉取
	// WithPlatform 确保我们拉取的是 amd64 架构的镜像
	// WithPullUnpack 表示拉取后自动解压文件系统内容,为快照做准备
	pulledImg, err := s.client.Pull(s.ctx, imageRef,
		containerd.WithPlatform(platforms.DefaultString()),
		containerd.WithPullUnpack,
	)
	if err != nil {
		log.Printf("Failed to pull image %s: %v", imageRef, err)
		return nil, err
	}

	log.Printf("Successfully pulled and unpacked image: %s", pulledImg.Name())
	return pulledImg, nil
}

3. 核心逻辑:创建快照并注入脚本

这是整个方案中最关键的部分。containerdsnapshotter 服务允许我们对文件系统进行操作。

import (
	"fmt"
	"io/ioutil"
	"os"

	"github.com/containerd/containerd/mount"
	"github.com/containerd/containerd/snapshots"
)

// CreateTaskSnapshotAndInjectScript 为特定任务创建快照并注入脚本
// baseImage: 预先准备好的 containerd 镜像对象
// taskID: 任务的唯一标识,用于命名快照
// scriptContent: 要注入的 Python 脚本内容
func (s *ExecutorService) CreateTaskSnapshotAndInjectScript(
	baseImage containd.Image,
	taskID string,
	scriptContent []byte,
) (string, error) {
	// 获取镜像的根文件系统快照ID(只读层)
	diffIDs, err := baseImage.RootFS(s.ctx)
	if err != nil {
		return "", fmt.Errorf("failed to get image rootfs: %w", err)
	}
	parentSnapshotID := identity.ChainID(diffIDs).String()

	snapshotter := s.client.SnapshotService()
	// 快照ID必须唯一,我们用任务ID来保证
	snapshotID := fmt.Sprintf("task-%s-snapshot", taskID)

	// 从基础镜像的只读层创建一个新的可写快照 (KindActive)
	// 这类似于 'git branch',速度非常快,因为它只创建了元数据
	mounts, err := snapshotter.Prepare(s.ctx, snapshotID, parentSnapshotID)
	if err != nil {
		// 如果快照已存在,先清理,以保证幂等性
		if errdefs.IsAlreadyExists(err) {
			if err := snapshotter.Remove(s.ctx, snapshotID); err != nil {
				return "", fmt.Errorf("failed to remove existing snapshot %s: %w", snapshotID, err)
			}
			mounts, err = snapshotter.Prepare(s.ctx, snapshotID, parentSnapshotID)
			if err != nil {
				return "", fmt.Errorf("failed to prepare snapshot after removal: %w", err)
			}
		} else {
			return "", fmt.Errorf("failed to prepare snapshot %s: %w", snapshotID, err)
		}
	}
	
	log.Printf("Created new snapshot '%s' from parent '%s'", snapshotID, parentSnapshotID)

	// 将这个可写快照挂载到一个临时目录
	tempMountDir, err := ioutil.TempDir("", "containerd-mount-")
	if err != nil {
		// 确保在出错时清理快照
		_ = snapshotter.Remove(s.ctx, snapshotID)
		return "", fmt.Errorf("failed to create temp mount dir: %w", err)
	}
	defer os.RemoveAll(tempMountDir)

	if err := mount.All(mounts, tempMountDir); err != nil {
		_ = snapshotter.Remove(s.ctx, snapshotID)
		return "", fmt.Errorf("failed to mount snapshot: %w", err)
	}
	defer func() {
		if err := mount.UnmountAll(tempMountDir, 0); err != nil {
			log.Printf("Warning: failed to unmount %s: %v", tempMountDir, err)
		}
	}()

	// **核心注入步骤**
	// 将脚本写入挂载点的特定路径
	scriptPath := filepath.Join(tempMountDir, "app", "task.py")
	if err := os.MkdirAll(filepath.Dir(scriptPath), 0755); err != nil {
		return "", fmt.Errorf("failed to create script directory in mount: %w", err)
	}
	if err := ioutil.WriteFile(scriptPath, scriptContent, 0644); err != nil {
		return "", fmt.Errorf("failed to write script to mount: %w", err)
	}
	log.Printf("Injected script into %s", scriptPath)

	// 卸载后,快照的变更被持久化。此时快照已经包含了我们的脚本。
	// 注意:Commit 这一步是必须的,它将 Prepare 的临时快照变为一个正式的、可用的快照。
	if err := snapshotter.Commit(s.ctx, snapshotID, parentSnapshotID); err != nil {
		return "", fmt.Errorf("failed to commit snapshot %s: %w", snapshotID, err)
	}

	return snapshotID, nil
}

这段代码是整个系统的魔法所在。它利用了写时复制(Copy-on-Write)文件系统的特性,Prepare 操作几乎是瞬时的。真正的 IO 只发生在写入新脚本时,由于脚本文件很小,这个过程也非常快。

4. 执行任务并获取结果

有了包含自定义脚本的快照,我们就可以用它作为根文件系统来创建和运行一个容器。

import (
    "bytes"
    "syscall"

    "github.com/containerd/containerd/cio"
    "github.com/containerd/containerd/oci"
    "github.com/opencontainers/runtime-spec/specs-go"
)

// RunTaskInContainer 使用指定的快照运行容器任务
func (s *ExecutorService) RunTaskInContainer(taskID, snapshotID string, inputData []byte) (string, int, error) {
	containerID := fmt.Sprintf("task-%s-container", taskID)

	// 清理可能存在的旧容器
	existingContainer, err := s.client.LoadContainer(s.ctx, containerID)
	if err == nil {
		// 如果容器存在,先删除
		log.Printf("Cleaning up existing container %s", containerID)
		task, err := existingContainer.Task(s.ctx, nil)
		if err == nil {
			// 如果有正在运行的任务,先杀死
			_ = task.Kill(s.ctx, syscall.SIGKILL)
			<-task.Wait(s.ctx) // 等待任务进程退出
		}
		if err := existingContainer.Delete(s.ctx, containerd.WithSnapshotCleanup); err != nil {
			log.Printf("Warning: failed to delete existing container: %v", err)
		}
	}

	var stdout, stderr bytes.Buffer

	// 创建一个新容器
	// 关键在于 `WithNewSnapshot`,它告诉 containerd 使用我们刚刚创建并注入了脚本的快照作为容器的根
	// `WithNewSpec` 定义了容器的运行时配置 (OCI Spec)
	container, err := s.client.NewContainer(
		s.ctx,
		containerID,
		containerd.WithNewSnapshot(snapshotID, baseImage), // 使用我们的任务快照
		containerd.WithNewSpec(oci.WithImageConfig(baseImage), // 从基础镜像继承配置
			oci.WithProcessArgs("python", "/app/task.py"), // 指定执行命令
			oci.WithHostNamespace(specs.NetworkNamespace), // 为了简单,共享主机网络
		),
	)
	if err != nil {
		return "", -1, fmt.Errorf("failed to create container: %w", err)
	}
	// defer container.Delete(s.ctx, containerd.WithSnapshotCleanup) // 任务完成后删除容器和快照

	// 创建容器内的任务(即进程)
	// cio.NewCreator 返回一个 IO 创建函数,cio.WithStreams 将容器的 stdin/out/err 连接到我们提供的 buffer
	task, err := container.NewTask(s.ctx, cio.NewCreator(cio.WithStreams(bytes.NewReader(inputData), &stdout, &stderr)))
	if err != nil {
		return "", -1, fmt.Errorf("failed to create task: %w", err)
	}
	defer task.Delete(s.ctx)

	// 等待任务完成的 channel
	exitStatusC, err := task.Wait(s.ctx)
	if err != nil {
		return "", -1, fmt.Errorf("failed to wait for task: %w", err)
	}

	// 启动任务
	if err := task.Start(s.ctx); err != nil {
		return "", -1, fmt.Errorf("failed to start task: %w", err)
	}
	log.Printf("Task %s started in container %s", task.ID(), container.ID())

	// 阻塞等待任务退出
	status := <-exitStatusC
	code, _, err := status.Result()
	if err != nil {
		return "", -1, fmt.Errorf("failed to get task result: %w", err)
	}
	log.Printf("Task finished with exit code: %d", code)

	// 返回标准输出、退出码
	return stdout.String(), int(code), nil
}

示例 spaCy Python 脚本

这个脚本 task.py 会被注入到容器的 /app/ 目录下。它需要设计成从标准输入读取数据,并将结果打印到标准输出。

# /app/task.py
import sys
import json
import spacy

# 在真实项目中,模型加载应该被缓存或作为服务的一部分预加载
# 这里为了演示,每次执行都加载
print("Loading spaCy model en_core_web_sm...", file=sys.stderr)
try:
    nlp = spacy.load("en_core_web_sm")
    print("Model loaded.", file=sys.stderr)
except OSError:
    # 第一次运行时,spaCy 可能需要下载模型。基础镜像应该预先下载好。
    print("Downloading model...", file=sys.stderr)
    from spacy.cli import download
    download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")

def process_text(text):
    """
    一个简单的 NLP 处理函数,提取命名实体。
    """
    doc = nlp(text)
    entities = []
    for ent in doc.ents:
        entities.append({
            "text": ent.text,
            "start_char": ent.start_char,
            "end_char": ent.end_char,
            "label": ent.label_
        })
    return {"entities": entities}

if __name__ == "__main__":
    # 从标准输入读取 JSON 数据
    try:
        input_json = json.load(sys.stdin)
        text_to_process = input_json.get("text")
        
        if not text_to_process:
            raise ValueError("Input JSON must contain a 'text' field.")

        print(f"Processing text: '{text_to_process[:50]}...'", file=sys.stderr)
        
        # 执行处理并输出结果
        result = process_text(text_to_process)
        
        # 将结果以 JSON 格式打印到标准输出
        json.dump(result, sys.stdout)

    except json.JSONDecodeError:
        error_msg = {"error": "Invalid JSON input."}
        json.dump(error_msg, sys.stdout)
        sys.exit(1)
    except Exception as e:
        error_msg = {"error": str(e)}
        json.dump(error_msg, sys.stdout)
        sys.exit(1)

这个脚本是可独立运行的,符合我们的设计。Go 编排器会将 JSON 输入喂给它的 stdin,并从 stdout 读取 JSON 输出。

局限性与未来展望

这个方案极大地提升了纯代码变更场景下的迭代速度,从分钟级降低到了秒级。然而,它并非万能。

首先,这个模型对依赖变更不友好。如果 Python 脚本引入了一个基础镜像中没有的新库,整个流程就会失败。这种情况下,必须构建一个新的基础镜像,但这属于低频操作,可以接受。一个改进方向是,在任务提交时分析 requirements.txt,动态选择或构建所需的基础镜像。

其次,直接操作宿主机的 containerd.sock 并挂载文件系统,带来了安全风险。在生产环境中,该服务需要被严格隔离,并有权限控制。它更适合作为内部开发平台(IDP)的一个组件,而非直接暴露给最终用户。

最后,当前的实现是单体的。为了支持高并发,可以将任务调度器和执行器分离,使用像 RabbitMQ 这样的消息队列来解耦,并部署多个无状态的执行器节点。每个节点运行我们的 Go 程序,并连接到本地的 containerd 守护进程,形成一个分布式的执行池。


  目录