团队的数据科学部门最近遇到了一个难以忍受的瓶颈:NLP 模型的迭代调试周期。一个典型的流程是,研究员修改几行 spaCy
的处理逻辑,然后等待 CI/CD 流水线执行 docker build
,将新的 Python 脚本和依赖打包,推送镜像,最后在开发环境中更新 Pod。这个过程,即使有缓存,也常常需要 2 到 5 分钟。当一天需要进行几十次微调时,这种等待时间累积起来是惊人的生产力损耗。
问题的核心在于,99% 的时间里,改变的只是几 KB 的 Python 脚本,而整个 GB 级别的基础环境(Python 解释器、spaCy
库、预训练模型)是完全不变的。docker build
却需要为这点微小的变化重新处理整个上下文,即便能利用层缓存,其固有的开销依然巨大。我们需要一种能够绕过 docker build
的机制,实现代码的秒级部署与执行。
初步构想是直接操作容器文件系统,将新的脚本替换旧的。但这引出了权限、状态管理和一致性的问题。更进一步的思路是利用容器底层的文件系统快照能力。Docker 的上层 API 对此作了抽象,但其核心运行时 containerd 提供了更底层的接口,允许我们直接操作镜像和容器的快照。
这便是我们的技术路径:构建一个轻量级的任务执行服务,它直接与 containerd
API 交互。当一个新任务提交时,服务不会构建新镜像,而是:
- 以一个预先准备好的、包含完整环境的 “基础镜像” 为模板。
- 为本次任务创建一个该镜像文件系统的可写快照 (snapshot)。
- 将用户提交的 Python 脚本动态注入到这个新快照中。
- 基于这个被修改过的快照,立即启动一个容器任务。
整个过程预计在毫秒到秒级完成。状态管理、任务队列和结果存储则交由成熟的 MySQL
数据库处理。
整体架构设计
我们将用 Go 语言编写这个核心的编排服务,因为它有优秀的并发性能和官方的 containerd 客户端库。MySQL
数据库负责持久化任务信息。
graph TD subgraph Go Orchestrator Service A[API Endpoint] --> B{Task Scheduler}; B -- Polls for new tasks --> C[MySQL Database]; B -- Dispatches task --> D[Containerd Task Runner]; D -- Creates snapshot --> E[containerd Daemon]; E -- Uses base image --> F[Base Python/spaCy Image]; D -- Injects user script --> E; D -- Starts container --> E; E -- Reports status --> D; D -- Writes results --> C; end subgraph Data Persistence C end subgraph Container Runtime E end U[Data Scientist] -- Submits script & data --> A; U -- Queries results --> C; style C fill:#f9f,stroke:#333,stroke-width:2px style E fill:#ccf,stroke:#333,stroke-width:2px
数据库模型
我们需要几张表来支撑这个系统:base_images
用于管理基础环境,user_scripts
存储用户提交的待执行代码,execution_tasks
作为任务队列和结果记录。
-- `base_images`: 存储预先拉取并准备好的基础镜像信息
-- 这些镜像是我们创建快照的模板
CREATE TABLE `base_images` (
`id` INT UNSIGNED NOT NULL AUTO_INCREMENT,
`name` VARCHAR(255) NOT NULL COMMENT 'e.g., spacy-base:3.4.1',
`image_ref` VARCHAR(512) NOT NULL COMMENT 'Full image reference, e.g., docker.io/library/python:3.9-slim',
`containerd_snapshot_id` VARCHAR(255) NOT NULL COMMENT 'The ID of the stable snapshot in containerd',
`status` ENUM('PULLED', 'READY', 'DEPRECATED') NOT NULL DEFAULT 'PULLED',
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_name` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Base environment images';
-- `user_scripts`: 存储需要执行的 Python 脚本
CREATE TABLE `user_scripts` (
`id` INT UNSIGNED NOT NULL AUTO_INCREMENT,
`name` VARCHAR(255) NOT NULL,
`content` MEDIUMTEXT NOT NULL COMMENT 'The actual Python code',
`hash` VARCHAR(64) NOT NULL COMMENT 'SHA256 hash of the content for deduplication',
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_hash` (`hash`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='User-submitted Python scripts';
-- `execution_tasks`: 核心任务表,记录任务状态和结果
CREATE TABLE `execution_tasks` (
`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
`base_image_id` INT UNSIGNED NOT NULL,
`user_script_id` INT UNSIGNED NOT NULL,
`input_data` JSON DEFAULT NULL COMMENT 'Input data for the script, passed via stdin',
`status` ENUM('PENDING', 'RUNNING', 'COMPLETED', 'FAILED') NOT NULL DEFAULT 'PENDING',
`container_id` VARCHAR(255) DEFAULT NULL COMMENT 'ID of the containerd container',
`exit_code` INT DEFAULT NULL,
`stdout` MEDIUMTEXT DEFAULT NULL,
`stderr` MEDIUMTEXT DEFAULT NULL,
`submitted_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
`started_at` TIMESTAMP NULL DEFAULT NULL,
`finished_at` TIMESTAMP NULL DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `idx_status` (`status`),
CONSTRAINT `fk_base_image` FOREIGN KEY (`base_image_id`) REFERENCES `base_images` (`id`),
CONSTRAINT `fk_user_script` FOREIGN KEY (`user_script_id`) REFERENCES `user_scripts` (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='NLP execution tasks';
这个 schema 设计考虑了脚本的去重(通过 hash)和任务生命周期的完整追踪。
Go 编排器核心实现
这是整个系统的核心。我们将使用 containerd.io/containerd
官方客户端库。
1. 初始化与连接
首先,我们需要一个能够连接到 containerd
守护进程的客户端。
package main
import (
"context"
"log"
"time"
"github.comcom/containerd/containerd"
"github.comcom/containerd/namespaces"
)
const (
containerdSocket = "/run/containerd/containerd.sock"
// 使用命名空间来隔离我们的应用和其他使用 containerd 的应用(如 Docker 或 Kubernetes)
namespace = "spacy-executor"
)
// ExecutorService 封装了与 containerd 的交互逻辑
type ExecutorService struct {
client *containerd.Client
ctx context.Context
}
// NewExecutorService 创建一个新的 ExecutorService 实例
func NewExecutorService() (*ExecutorService, error) {
// 创建一个 containerd 客户端
client, err := containerd.New(containerdSocket)
if err != nil {
log.Printf("Error creating containerd client: %v", err)
return nil, err
}
// 使用一个带命名空间的上下文,确保所有操作都在我们的隔离环境中
ctx := namespaces.WithNamespace(context.Background(), namespace)
return &ExecutorService{
client: client,
ctx: ctx,
}, nil
}
// Close 关闭与 containerd 的连接
func (s *ExecutorService) Close() {
if s.client != nil {
s.client.Close()
}
}
func main() {
executor, err := NewExecutorService()
if err != nil {
log.Fatalf("Failed to initialize executor service: %v", err)
}
defer executor.Close()
log.Println("Successfully connected to containerd daemon.")
// ... 后续任务调度逻辑 ...
}
2. 准备基础镜像
在系统启动时,或通过一个管理接口,我们需要确保基础镜像已经被拉取到 containerd
中。
import (
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/platforms"
)
// PrepareBaseImage 拉取或确认基础镜像存在
func (s *ExecutorService) PrepareBaseImage(imageRef string) (containerd.Image, error) {
log.Printf("Preparing base image: %s", imageRef)
// 尝试获取本地镜像
img, err := s.client.GetImage(s.ctx, imageRef)
if err == nil {
log.Printf("Image %s already exists locally.", imageRef)
return img, nil
}
// 如果本地不存在,则从远程仓库拉取
// WithPlatform 确保我们拉取的是 amd64 架构的镜像
// WithPullUnpack 表示拉取后自动解压文件系统内容,为快照做准备
pulledImg, err := s.client.Pull(s.ctx, imageRef,
containerd.WithPlatform(platforms.DefaultString()),
containerd.WithPullUnpack,
)
if err != nil {
log.Printf("Failed to pull image %s: %v", imageRef, err)
return nil, err
}
log.Printf("Successfully pulled and unpacked image: %s", pulledImg.Name())
return pulledImg, nil
}
3. 核心逻辑:创建快照并注入脚本
这是整个方案中最关键的部分。containerd
的 snapshotter
服务允许我们对文件系统进行操作。
import (
"fmt"
"io/ioutil"
"os"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/snapshots"
)
// CreateTaskSnapshotAndInjectScript 为特定任务创建快照并注入脚本
// baseImage: 预先准备好的 containerd 镜像对象
// taskID: 任务的唯一标识,用于命名快照
// scriptContent: 要注入的 Python 脚本内容
func (s *ExecutorService) CreateTaskSnapshotAndInjectScript(
baseImage containd.Image,
taskID string,
scriptContent []byte,
) (string, error) {
// 获取镜像的根文件系统快照ID(只读层)
diffIDs, err := baseImage.RootFS(s.ctx)
if err != nil {
return "", fmt.Errorf("failed to get image rootfs: %w", err)
}
parentSnapshotID := identity.ChainID(diffIDs).String()
snapshotter := s.client.SnapshotService()
// 快照ID必须唯一,我们用任务ID来保证
snapshotID := fmt.Sprintf("task-%s-snapshot", taskID)
// 从基础镜像的只读层创建一个新的可写快照 (KindActive)
// 这类似于 'git branch',速度非常快,因为它只创建了元数据
mounts, err := snapshotter.Prepare(s.ctx, snapshotID, parentSnapshotID)
if err != nil {
// 如果快照已存在,先清理,以保证幂等性
if errdefs.IsAlreadyExists(err) {
if err := snapshotter.Remove(s.ctx, snapshotID); err != nil {
return "", fmt.Errorf("failed to remove existing snapshot %s: %w", snapshotID, err)
}
mounts, err = snapshotter.Prepare(s.ctx, snapshotID, parentSnapshotID)
if err != nil {
return "", fmt.Errorf("failed to prepare snapshot after removal: %w", err)
}
} else {
return "", fmt.Errorf("failed to prepare snapshot %s: %w", snapshotID, err)
}
}
log.Printf("Created new snapshot '%s' from parent '%s'", snapshotID, parentSnapshotID)
// 将这个可写快照挂载到一个临时目录
tempMountDir, err := ioutil.TempDir("", "containerd-mount-")
if err != nil {
// 确保在出错时清理快照
_ = snapshotter.Remove(s.ctx, snapshotID)
return "", fmt.Errorf("failed to create temp mount dir: %w", err)
}
defer os.RemoveAll(tempMountDir)
if err := mount.All(mounts, tempMountDir); err != nil {
_ = snapshotter.Remove(s.ctx, snapshotID)
return "", fmt.Errorf("failed to mount snapshot: %w", err)
}
defer func() {
if err := mount.UnmountAll(tempMountDir, 0); err != nil {
log.Printf("Warning: failed to unmount %s: %v", tempMountDir, err)
}
}()
// **核心注入步骤**
// 将脚本写入挂载点的特定路径
scriptPath := filepath.Join(tempMountDir, "app", "task.py")
if err := os.MkdirAll(filepath.Dir(scriptPath), 0755); err != nil {
return "", fmt.Errorf("failed to create script directory in mount: %w", err)
}
if err := ioutil.WriteFile(scriptPath, scriptContent, 0644); err != nil {
return "", fmt.Errorf("failed to write script to mount: %w", err)
}
log.Printf("Injected script into %s", scriptPath)
// 卸载后,快照的变更被持久化。此时快照已经包含了我们的脚本。
// 注意:Commit 这一步是必须的,它将 Prepare 的临时快照变为一个正式的、可用的快照。
if err := snapshotter.Commit(s.ctx, snapshotID, parentSnapshotID); err != nil {
return "", fmt.Errorf("failed to commit snapshot %s: %w", snapshotID, err)
}
return snapshotID, nil
}
这段代码是整个系统的魔法所在。它利用了写时复制(Copy-on-Write)文件系统的特性,Prepare
操作几乎是瞬时的。真正的 IO 只发生在写入新脚本时,由于脚本文件很小,这个过程也非常快。
4. 执行任务并获取结果
有了包含自定义脚本的快照,我们就可以用它作为根文件系统来创建和运行一个容器。
import (
"bytes"
"syscall"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/oci"
"github.com/opencontainers/runtime-spec/specs-go"
)
// RunTaskInContainer 使用指定的快照运行容器任务
func (s *ExecutorService) RunTaskInContainer(taskID, snapshotID string, inputData []byte) (string, int, error) {
containerID := fmt.Sprintf("task-%s-container", taskID)
// 清理可能存在的旧容器
existingContainer, err := s.client.LoadContainer(s.ctx, containerID)
if err == nil {
// 如果容器存在,先删除
log.Printf("Cleaning up existing container %s", containerID)
task, err := existingContainer.Task(s.ctx, nil)
if err == nil {
// 如果有正在运行的任务,先杀死
_ = task.Kill(s.ctx, syscall.SIGKILL)
<-task.Wait(s.ctx) // 等待任务进程退出
}
if err := existingContainer.Delete(s.ctx, containerd.WithSnapshotCleanup); err != nil {
log.Printf("Warning: failed to delete existing container: %v", err)
}
}
var stdout, stderr bytes.Buffer
// 创建一个新容器
// 关键在于 `WithNewSnapshot`,它告诉 containerd 使用我们刚刚创建并注入了脚本的快照作为容器的根
// `WithNewSpec` 定义了容器的运行时配置 (OCI Spec)
container, err := s.client.NewContainer(
s.ctx,
containerID,
containerd.WithNewSnapshot(snapshotID, baseImage), // 使用我们的任务快照
containerd.WithNewSpec(oci.WithImageConfig(baseImage), // 从基础镜像继承配置
oci.WithProcessArgs("python", "/app/task.py"), // 指定执行命令
oci.WithHostNamespace(specs.NetworkNamespace), // 为了简单,共享主机网络
),
)
if err != nil {
return "", -1, fmt.Errorf("failed to create container: %w", err)
}
// defer container.Delete(s.ctx, containerd.WithSnapshotCleanup) // 任务完成后删除容器和快照
// 创建容器内的任务(即进程)
// cio.NewCreator 返回一个 IO 创建函数,cio.WithStreams 将容器的 stdin/out/err 连接到我们提供的 buffer
task, err := container.NewTask(s.ctx, cio.NewCreator(cio.WithStreams(bytes.NewReader(inputData), &stdout, &stderr)))
if err != nil {
return "", -1, fmt.Errorf("failed to create task: %w", err)
}
defer task.Delete(s.ctx)
// 等待任务完成的 channel
exitStatusC, err := task.Wait(s.ctx)
if err != nil {
return "", -1, fmt.Errorf("failed to wait for task: %w", err)
}
// 启动任务
if err := task.Start(s.ctx); err != nil {
return "", -1, fmt.Errorf("failed to start task: %w", err)
}
log.Printf("Task %s started in container %s", task.ID(), container.ID())
// 阻塞等待任务退出
status := <-exitStatusC
code, _, err := status.Result()
if err != nil {
return "", -1, fmt.Errorf("failed to get task result: %w", err)
}
log.Printf("Task finished with exit code: %d", code)
// 返回标准输出、退出码
return stdout.String(), int(code), nil
}
示例 spaCy Python 脚本
这个脚本 task.py
会被注入到容器的 /app/
目录下。它需要设计成从标准输入读取数据,并将结果打印到标准输出。
# /app/task.py
import sys
import json
import spacy
# 在真实项目中,模型加载应该被缓存或作为服务的一部分预加载
# 这里为了演示,每次执行都加载
print("Loading spaCy model en_core_web_sm...", file=sys.stderr)
try:
nlp = spacy.load("en_core_web_sm")
print("Model loaded.", file=sys.stderr)
except OSError:
# 第一次运行时,spaCy 可能需要下载模型。基础镜像应该预先下载好。
print("Downloading model...", file=sys.stderr)
from spacy.cli import download
download("en_core_web_sm")
nlp = spacy.load("en_core_web_sm")
def process_text(text):
"""
一个简单的 NLP 处理函数,提取命名实体。
"""
doc = nlp(text)
entities = []
for ent in doc.ents:
entities.append({
"text": ent.text,
"start_char": ent.start_char,
"end_char": ent.end_char,
"label": ent.label_
})
return {"entities": entities}
if __name__ == "__main__":
# 从标准输入读取 JSON 数据
try:
input_json = json.load(sys.stdin)
text_to_process = input_json.get("text")
if not text_to_process:
raise ValueError("Input JSON must contain a 'text' field.")
print(f"Processing text: '{text_to_process[:50]}...'", file=sys.stderr)
# 执行处理并输出结果
result = process_text(text_to_process)
# 将结果以 JSON 格式打印到标准输出
json.dump(result, sys.stdout)
except json.JSONDecodeError:
error_msg = {"error": "Invalid JSON input."}
json.dump(error_msg, sys.stdout)
sys.exit(1)
except Exception as e:
error_msg = {"error": str(e)}
json.dump(error_msg, sys.stdout)
sys.exit(1)
这个脚本是可独立运行的,符合我们的设计。Go 编排器会将 JSON 输入喂给它的 stdin
,并从 stdout
读取 JSON 输出。
局限性与未来展望
这个方案极大地提升了纯代码变更场景下的迭代速度,从分钟级降低到了秒级。然而,它并非万能。
首先,这个模型对依赖变更不友好。如果 Python 脚本引入了一个基础镜像中没有的新库,整个流程就会失败。这种情况下,必须构建一个新的基础镜像,但这属于低频操作,可以接受。一个改进方向是,在任务提交时分析 requirements.txt
,动态选择或构建所需的基础镜像。
其次,直接操作宿主机的 containerd.sock
并挂载文件系统,带来了安全风险。在生产环境中,该服务需要被严格隔离,并有权限控制。它更适合作为内部开发平台(IDP)的一个组件,而非直接暴露给最终用户。
最后,当前的实现是单体的。为了支持高并发,可以将任务调度器和执行器分离,使用像 RabbitMQ 这样的消息队列来解耦,并部署多个无状态的执行器节点。每个节点运行我们的 Go 程序,并连接到本地的 containerd
守护进程,形成一个分布式的执行池。