通过 Pulumi 与 Actix-web 实现 Kubernetes 上的 Hadoop YARN 按需集群声明式管理


在真实的生产环境中,技术栈的演进并非总是一蹴而就的线性过程。我们团队就面临这样一个典型的场景:一方面,我们全面拥抱云原生,所有新服务都构建在 Kubernetes 之上,享受其声明式 API 和强大的弹性伸缩能力;另一方面,核心数据处理业务依然深度依赖一个庞大的、运行多年的 Hadoop YARN 集群。这个集群承载着关键的 ETL 和批处理任务,迁移成本和风险极高。

问题在于,这个静态的 Hadoop 集群造成了巨大的资源浪费。它必须按照业务峰值进行容量规划,但在大部分时间里,利用率不足30%。同时,新业务的开发人员希望能够像使用 Kubernetes 一样,通过 API 动态、隔离地申请和释放计算资源来运行他们的数据处理任务,而不是排队等待共享集群的调度。

我们的目标是:构建一个系统,允许用户通过一个统一的、云原生友好的 API,按需创建生命周期与任务绑定的、资源隔离的临时 Hadoop YARN 集群。任务结束,集群自动销毁。这不仅能极大提高资源利用率,还能为不同团队提供隔离的、可定制的运行环境。

方案 A: Kubernetes 原生调度器与 Spark Operator

一个直接的思路是完全抛弃 YARN,将数据处理任务直接迁移到 Kubernetes 上。社区为此提供了成熟的解决方案,例如 Spark Operator。

  • 优势:

    • 完全K8s原生: 使用 CRD (Custom Resource Definition) 定义 Spark 应用,可以通过 kubectl 或 GitOps 工具(如 ArgoCD)进行管理,与现有的云原生工作流无缝集成。
    • 社区成熟: Spark Operator 经过了广泛的生产验证,功能完善,能够处理依赖管理、驱动程序和执行程序的生命周期。
  • 劣势:

    • 绑定特定计算引擎: Spark Operator 专为 Spark 设计。我们的历史遗留任务中包含大量纯 MapReduce 作业,甚至一些依赖特定 YARN 特性的应用。迁移这些作业到 Spark on K8s 需要重写,这在短期内是不现实的。
    • 调度能力差异: YARN 的调度器(如 Capacity Scheduler 和 Fair Scheduler)在处理大规模、混合类型的批处理作业时,提供了非常精细的队列管理、抢占和资源分配策略。Kubernetes 的默认调度器虽然通用,但在大数据作业的吞吐量和调度效率方面,针对性不如 YARN。我们需要的是 YARN 的调度能力,结合 K8s 的资源供给能力。

这个方案对于新项目或许是最佳选择,但对于我们这种需要兼容并包的场景,它的破坏性太强,无法平滑过渡。

方案 B: 混合架构:以 Actix-web 为核心的自定义控制平面

既然无法完全抛弃 YARN,我们可以换个思路:将 YARN 本身容器化,并将其作为一种“应用负载”部署在 Kubernetes 上。我们可以构建一个控制平面,负责接收用户的请求,然后在 Kubernetes 中动态地拉起一套完整的 YARN 集群(一个 ResourceManager,若干个 NodeManager),待用户作业完成后再将其彻底清理。

这个控制平面是整个架构的核心,我们对其有几个关键要求:

  1. 高性能与低资源占用: 作为平台的核心组件,它自身不能成为瓶颈或资源消耗大户。
  2. 强类型与高可靠性: 与底层基础设施(Kubernetes API)和分布式系统(YARN API)交互,必须保证类型安全和健壮的错误处理。
  3. 声明式基础设施管理: 整个控制平面及其管理的 YARN 集群,都应该通过代码来定义和管理,即基础设施即代码 (IaC)。

基于以上考量,我们确定了技术选型:

  • 控制平面API服务: Actix-web (Rust)。Rust 的性能、内存安全和出色的并发模型使其成为构建高可靠系统服务的理想选择。Actix-web 框架轻量且速度极快。
  • 基础设施即代码 (IaC): Pulumi。与 Terraform 不同,Pulumi 允许我们使用通用编程语言(如 TypeScript, Python, Go)来定义基础设施。这使得我们可以在基础设施代码中实现更复杂的逻辑,例如根据不同请求参数动态生成 Kubernetes 配置,这对于我们按需创建集群的场景至关重要。

最终架构如下:

graph TD
    subgraph User Space
        A[用户/CI/CD] -- "POST /api/v1/clusters\n{name, workers, memory...}" --> B(Actix-web Controller)
    end

    subgraph Kubernetes Cluster
        B -- "1. 使用 kube-rs 库" --> C(与 K8s API Server 交互)
        C -- "2. 创建 Deployment (YARN RM)" --> D[Pod: YARN ResourceManager]
        C -- "3. 创建 StatefulSet (YARN NM)" --> E[Pods: YARN NodeManagers]
        C -- "4. 创建 Service" --> F(Service for RM UI & API)
    end

    B -- "5. 轮询 Service 获取 RM API 地址" --> F
    A -- "6. 返回 YARN RM 地址与集群ID" --> G{Client}
    G -- "7. 通过 YARN REST API/CLI 提交作业" --> F

    style B fill:#f9f,stroke:#333,stroke-width:2px
    style Pulumi fill:#bbf,stroke:#333,stroke-width:2px

用户向 Actix-web 控制器发起请求,控制器与 Kubernetes API Server 通信,创建运行 YARN 组件的 Pods。一旦 YARN 集群就绪,控制器就将 ResourceManager 的访问地址返回给用户,用户便可以像使用传统 YARN 集群一样提交作业。

核心实现概览

我们将整个系统分为两大部分:使用 Pulumi 定义基础平台设施,以及使用 Rust (Actix-web) 实现核心的控制逻辑。

1. 使用 Pulumi 定义基础设施 (TypeScript)

首先,我们需要为我们的 Actix-web 控制器本身创建 Kubernetes 部署。这部分工作由 Pulumi 完成,确保了我们平台自身的部署也是声明式和可重复的。

infra/index.ts:

import * as k8s from "@pulumi/kubernetes";
import * as pulumi from "@pulumi/pulumi";

// 从 Pulumi 配置中读取镜像版本等参数
const config = new pulumi.Config();
const appName = "yarn-on-k8s-controller";
const appImage = config.require("appImage");
const appNamespace = new k8s.core.v1.Namespace(appName, {
    metadata: { name: appName },
});

// 为控制器创建 ServiceAccount 和 ClusterRoleBinding,以便它能管理其他命名空间中的资源
const serviceAccount = new k8s.core.v1.ServiceAccount(appName, {
    metadata: {
        name: appName,
        namespace: appNamespace.metadata.name,
    },
});

const clusterRole = new k8s.rbac.v1.ClusterRole(appName, {
    metadata: { name: appName },
    rules: [
        {
            apiGroups: ["", "apps"],
            resources: ["namespaces", "services", "deployments", "statefulsets", "pods", "configmaps"],
            verbs: ["create", "get", "list", "watch", "update", "patch", "delete"],
        },
    ],
});

new k8s.rbac.v1.ClusterRoleBinding(appName, {
    metadata: { name: appName },
    subjects: [{
        kind: "ServiceAccount",
        name: serviceAccount.metadata.name,
        namespace: appNamespace.metadata.name,
    }],
    roleRef: {
        kind: "ClusterRole",
        name: clusterRole.metadata.name,
        apiGroup: "rbac.authorization.k8s.io",
    },
});


const deployment = new k8s.apps.v1.Deployment(appName, {
    metadata: {
        name: appName,
        namespace: appNamespace.metadata.name,
        labels: { app: appName },
    },
    spec: {
        replicas: 1,
        selector: { matchLabels: { app: appName } },
        template: {
            metadata: { labels: { app: appName } },
            spec: {
                serviceAccountName: serviceAccount.metadata.name,
                containers: [{
                    name: appName,
                    image: appImage,
                    ports: [{ containerPort: 8080 }],
                    env: [
                        // 配置日志级别等运行时参数
                        { name: "RUST_LOG", value: "info,yarn_on_k8s_controller=debug" }
                    ],
                    resources: {
                        requests: { cpu: "200m", memory: "256Mi" },
                        limits: { cpu: "500m", memory: "512Mi" },
                    },
                }],
            },
        },
    },
});

const service = new k8s.core.v1.Service(appName, {
    metadata: {
        name: appName,
        namespace: appNamespace.metadata.name,
    },
    spec: {
        type: "ClusterIP",
        selector: { app: appName },
        ports: [{ port: 80, targetPort: 8080 }],
    },
});

export const controllerUrl = pulumi.interpolate`http://${service.metadata.name}.${appNamespace.metadata.name}.svc.cluster.local`;

这段 Pulumi 代码定义了:

  • 一个独立的 Namespace 用于部署控制器。
  • 一个 ServiceAccount,并绑定了足以创建和管理其他 Namespace 中资源的 ClusterRole。这是控制器能够工作的权限基础。在真实项目中,权限应被限制在特定、由控制器动态创建的命名空间内,遵循最小权限原则。
  • 一个 Deployment,用于运行我们的 Actix-web 应用。
  • 一个 ClusterIP Service,用于在集群内部暴露控制器的 API。

通过运行 pulumi up,我们就能一键将这个控制平面部署到任何 Kubernetes 集群中。

2. Actix-web 控制器核心逻辑 (Rust)

这是整个系统的“大脑”。我们将使用 kube-rs 库与 Kubernetes API 交互,serde 进行序列化/反序列化,tracing 进行结构化日志记录。

src/main.rs:

use actix_web::{web, App, HttpServer, Responder, HttpResponse, post};
use serde::{Deserialize, Serialize};
use kube::{Client, Api, api::{PostParams, Patch, PatchParams}};
use kube::api::ListParams;
use k8s_openapi::api::core::v1::{Namespace, Service, Pod};
use k8s_openapi::api::apps::v1::{Deployment, StatefulSet};
use std::collections::BTreeMap;
use tracing::{info, warn, error, instrument};
use tracing_subscriber::EnvFilter;

// API请求体定义
#[derive(Deserialize, Debug)]
struct CreateClusterRequest {
    name: String,
    node_managers: i32,
    nm_cpu_request: String,
    nm_memory_request: String,
}

// API响应体定义
#[derive(Serialize, Debug)]
struct ClusterResponse {
    cluster_id: String,
    namespace: String,
    resource_manager_url: Option<String>,
    status: String,
}

#[instrument(skip(client))]
async fn create_yarn_namespace(client: Client, name: &str) -> Result<(), kube::Error> {
    let namespaces: Api<Namespace> = Api::all(client);
    let ns = Namespace {
        metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
            name: Some(name.to_string()),
            ..Default::default()
        },
        ..Default::default()
    };
    namespaces.create(&PostParams::default(), &ns).await?;
    info!(namespace = name, "Successfully created namespace");
    Ok(())
}

// 核心处理函数
#[instrument(skip(req, app_state))]
#[post("/clusters")]
async fn create_cluster(req: web::Json<CreateClusterRequest>, app_state: web::Data<AppState>) -> impl Responder {
    let client = app_state.client.clone();
    let cluster_name = &req.name;
    let namespace = format!("yarn-cluster-{}", cluster_name);

    info!(cluster_name = cluster_name, "Received cluster creation request");

    // 步骤1: 创建独立的 Namespace
    if let Err(e) = create_yarn_namespace(client.clone(), &namespace).await {
        // 这里的错误处理需要更细致,例如判断 Namespace 是否已存在
        error!(error = %e, "Failed to create namespace");
        return HttpResponse::InternalServerError().json(ClusterResponse {
            cluster_id: cluster_name.to_string(),
            namespace: namespace.clone(),
            resource_manager_url: None,
            status: format!("Failed to create namespace: {}", e),
        });
    }

    // 步骤2: 创建 YARN ResourceManager Deployment
    // 在真实项目中,这些 YAML 定义应该从模板文件加载和渲染,而不是硬编码
    let rm_deployment_def: Deployment = serde_json::from_value(serde_json::json!({
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": { "name": format!("{}-resourcemanager", cluster_name) },
        "spec": {
            "replicas": 1,
            "selector": { "matchLabels": { "app": "yarn-resourcemanager", "cluster": cluster_name } },
            "template": {
                "metadata": { "labels": { "app": "yarn-resourcemanager", "cluster": cluster_name } },
                "spec": {
                    "containers": [{
                        "name": "resourcemanager",
                        "image": "apache/hadoop:3.3.4", // 使用官方或自建的Hadoop镜像
                        "command": ["hadoop-yarn-resourcemanager"],
                        "ports": [
                            { "containerPort": 8088 }, // RM REST API
                            { "containerPort": 8032 }, // RM Scheduler
                        ],
                        "resources": {
                           "requests": { "cpu": "1", "memory": "2Gi" },
                           "limits": { "cpu": "2", "memory": "4Gi" }
                        }
                    }]
                }
            }
        }
    })).unwrap();
    
    let deployments: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
    if let Err(e) = deployments.create(&PostParams::default(), &rm_deployment_def).await {
        error!(error = %e, "Failed to create ResourceManager deployment");
        // 应该在这里添加清理逻辑,删除已创建的 namespace
        return HttpResponse::InternalServerError().body("Failed to create ResourceManager");
    }

    // 步骤3: 创建 ResourceManager Service
    let rm_service_def: Service = serde_json::from_value(serde_json::json!({
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": { "name": format!("{}-resourcemanager-svc", cluster_name) },
        "spec": {
            "selector": { "app": "yarn-resourcemanager", "cluster": cluster_name },
            "ports": [{ "name": "http", "port": 8088, "targetPort": 8088 }]
        }
    })).unwrap();
    
    let services: Api<Service> = Api::namespaced(client.clone(), &namespace);
    let rm_service = match services.create(&PostParams::default(), &rm_service_def).await {
        Ok(s) => s,
        Err(e) => {
            error!(error = %e, "Failed to create ResourceManager service");
            return HttpResponse::InternalServerError().body("Failed to create ResourceManager Service");
        }
    };

    // 步骤4: 创建 YARN NodeManager StatefulSet
    let nm_statefulset_def: StatefulSet = serde_json::from_value(serde_json::json!({
        "apiVersion": "apps/v1",
        "kind": "StatefulSet",
        "metadata": { "name": format!("{}-nodemanager", cluster_name) },
        "spec": {
            "replicas": req.node_managers,
            "selector": { "matchLabels": { "app": "yarn-nodemanager", "cluster": cluster_name } },
            "serviceName": format!("{}-nodemanager", cluster_name), // Headless service for stable network IDs
            "template": {
                "metadata": { "labels": { "app": "yarn-nodemanager", "cluster": cluster_name } },
                "spec": {
                    "containers": [{
                        "name": "nodemanager",
                        "image": "apache/hadoop:3.3.4",
                        "command": ["hadoop-yarn-nodemanager"],
                        // 关键:通过环境变量将 RM 的地址注入到 NM 中
                        "env": [{
                            "name": "YARN_CONF_yarn_resourcemanager_hostname",
                            "value": rm_service.metadata.name.as_ref().unwrap()
                        }],
                        "resources": {
                           "requests": { "cpu": &req.nm_cpu_request, "memory": &req.nm_memory_request },
                           // Limits should also be configurable
                        }
                    }]
                }
            }
        }
    })).unwrap();
    
    let statefulsets: Api<StatefulSet> = Api::namespaced(client.clone(), &namespace);
    if let Err(e) = statefulsets.create(&PostParams::default(), &nm_statefulset_def).await {
        error!(error = %e, "Failed to create NodeManager statefulset");
        return HttpResponse::InternalServerError().body("Failed to create NodeManager StatefulSet");
    }

    info!(cluster_name = cluster_name, "YARN cluster components created successfully.");

    let rm_url = format!(
        "http://{}.{}.svc.cluster.local:8088",
        rm_service.metadata.name.unwrap(),
        namespace
    );

    HttpResponse::Accepted().json(ClusterResponse {
        cluster_id: cluster_name.to_string(),
        namespace,
        resource_manager_url: Some(rm_url),
        status: "Creating".to_string(),
    })
}

struct AppState {
    client: Client,
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    // 初始化日志
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .init();

    let client = Client::try_default().await.expect("Failed to create kube client");

    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(AppState { client: client.clone() }))
            .service(create_cluster)
    })
    .bind(("0.0.0.0", 8080))?
    .run()
    .await
}

这段 Rust 代码展示了控制器最核心的创建逻辑:

  1. API 定义: 使用 serde 定义了清晰的请求和响应结构体。
  2. Kubernetes 客户端: kube-rs 提供了强类型的、异步的 Kubernetes API 客户端。
  3. 资源创建流程: 严格按照 Namespace -> ResourceManager Deployment -> Service -> NodeManager StatefulSet 的顺序创建资源,确保了依赖关系的正确性。
  4. 配置注入: 一个关键细节是,NodeManager 需要知道 ResourceManager 的地址。我们通过创建 Service 为 RM 提供一个稳定的 DNS 名称,然后将这个名称作为环境变量注入到 NodeManager 的 Pod 中。这是 Kubernetes 服务发现的典型实践。
  5. 日志记录: 使用 tracing 库,为每个请求的关键步骤添加了结构化日志,并利用 instrument宏自动添加上下文信息(如函数名和参数),这对于调试和监控至关重要。

在真实的生产级实现中,还需要考虑更多:

  • 状态管理与轮询: 当前实现是“即发即忘”的。控制器应该持续监控所创建资源的状态,例如等待 Pods 进入 Running 状态,然后通过 YARN 的 REST API 检查集群是否健康,最后才更新集群状态。
  • 错误处理与回滚: 如果在创建过程中任何一步失败,控制器应该能够执行回滚操作,删除所有已创建的资源,避免留下孤儿资源。
  • 删除接口: 需要实现 DELETE /clusters/{id} 接口,用于清理整个 Namespace 和其中的所有资源。
  • Hadoop 配置: Hadoop 集群通常需要复杂的 XML 配置文件。一个更健壮的方案是将这些配置存储在 Kubernetes 的 ConfigMap 中,并将其挂载到 Pod 中,而不是完全依赖环境变量。

架构的扩展性与局限性

我们选择的这种混合架构,本质上是在 Kubernetes 之上构建了一个特定领域的 PaaS (平台即服务) 层,专门用于管理 YARN 集群。

扩展性:

  • 支持多版本/多引擎: 可以轻松扩展 API,允许用户指定不同的 Hadoop 镜像版本,甚至可以支持其他大数据框架(如 Flink),只需在控制器中加入创建相应 Flink JobManager 和 TaskManager 的逻辑即可。
  • 与 Pulumi Automation API 集成: 对于更复杂的、需要动态创建云资源(如 S3 存储桶、数据库)的场景,Actix-web 控制器可以直接调用 Pulumi Automation API,从而将应用层面的编排和基础设施层面的编排统一起来。

局限性:

  • 自定义组件的维护成本: 引入 Actix-web 控制器意味着我们需要负责这个组件的开发、测试、部署和维护。这比直接使用社区的 Operator 增加了团队的认知负担和维护成本。
  • 网络复杂性: Pod 间的网络通信依赖 Kubernetes CNI。虽然对于大多数场景已经足够,但在需要极致网络性能的 shuffle 密集型作业中,可能需要对 CNI 插件进行额外的性能调优,这比在物理机上部署 Hadoop 网络配置更复杂。
  • 存储问题: 当前实现没有处理持久化存储 (HDFS)。虽然可以将 HDFS 也容器化部署在 Kubernetes 上(例如使用 Rook+Ceph 或专用的 HDFS Operator),但这会进一步增加系统的复杂性。更常见的做法是,让这些临时的 YARN 集群直接读写外部的对象存储(如 S3、GCS),实现计算与存储的分离。

这个方案并非银弹,它是在特定约束(保留 YARN 生态、拥抱云原生弹性)下的权衡结果。它通过引入一个轻量、高效的自定义控制平面,成功地在 Kubernetes 和 Hadoop 这两个看似不同的世界之间架起了一座桥梁,解决了我们面临的实际工程问题。


  目录