基于Kotlin与GitLab CI实现数据湖仓Iceberg表格式的自动化模式演进


团队一次代价惨痛的生产事故,源于一次简单的模式变更。在向数据湖核心事实表中添加一个新字段时,一位工程师手动执行 DDL 脚本时引入了一个微小的拼写错误。这个错误直到下游的报表任务在凌晨批量失败时才被发现,导致了数小时的数据延迟和业务决策中断。这次事件暴露了一个核心痛点:在快速迭代的业务需求下,依赖人工操作和文档交接来管理数据湖的表结构,既脆弱又低效。我们需要一个将模式(Schema)视为代码,并由自动化流程强制保障其正确性和安全性的系统。

我们的初步构想是构建一个内部服务,它将作为数据湖仓(我们选用了 Apache Iceberg 表格式)与开发人员之间的唯一网关。所有模式变更请求都必须通过这个服务,而这个服务本身则受控于我们的 GitLab CI/CD 流程。开发者不再直接连接生产环境,而是通过提交代码(一个定义了目标模式的声明式文件)来触发变更。

技术选型与架构决策

整个方案的核心是一个 API 服务。对于它的技术栈,我们做了如下权衡:

  1. 语言与框架:Kotlin + Ktor

    • 为什么是 Kotlin? 团队对 JVM 生态很熟悉,但厌倦了 Java 的冗长。Kotlin 带来的空安全、数据类(Data Classes)以及协程,对于构建一个 I/O 密集型、健壮性要求高的 API 服务来说是完美的。数据类能极其简洁地定义模式契约,而空安全则从编译层面消除了大量的潜在 NullPointerException
    • 为什么是 Ktor 而不是 Spring Boot? 这个服务的功能非常单一,职责明确。Spring Boot 虽然功能强大,但引入了大量的依赖和启动开销。Ktor 是一个轻量级的异步框架,完全基于协程,非常适合构建这种目标明确的高性能微服务。我们需要的是一个“手术刀”,而非“瑞士军军刀”。
  2. 数据湖表格式:Apache Iceberg

    • Iceberg 是这个架构能够成立的基石。它的核心优势在于其元数据管理机制。每一次模式变更或数据更新都会创建一个新的元数据快照,这是一个原子操作。这意味着模式演进是事务性的,要么完全成功,要么保持原样,不会出现中间状态。此外,它的“时间旅行”功能允许我们在出现问题时快速回滚到变更前的表状态,这是传统 Hive 表无法比拟的。
  3. CI/CD:GitLab CI/CD

    • 这是公司的标准工具链。关键在于我们如何利用它来保障数据变更的安全性。我们将设计一个多阶段的流水线,在合并请求(Merge Request)阶段进行严格的“干跑”(Dry Run)验证,只有在验证通过并合入主分支后,才执行真正的生产变更。

流程设计:Schema-as-Code 的闭环

我们将整个流程设计为一个自动化的闭环,通过 Mermaid 图表可以清晰地看到各个组件的交互:

graph TD
    subgraph "GitLab Repository"
        A[Developer: 提交 schema.yml 的 MR] --> B{GitLab CI Pipeline};
    end

    subgraph "CI/CD Pipeline Stages"
        B -- 触发 --> C[Stage: validate];
        C -- 验证通过 --> D[Human: Code Review & Approve MR];
        D -- Merge to main --> E[Stage: deploy];
    end

    subgraph "Kotlin Schema Service"
        F[API Endpoint: /v1/schema/validate];
        G[API Endpoint: /v1/schema/apply];
    end

    subgraph "Data Lakehouse (Iceberg)"
        H[Iceberg Catalog];
        I[Underlying Storage: S3/HDFS];
        H <--> I;
    end

    C --> |调用 Dry Run API| F;
    F -- 检查兼容性 --> H;
    F --> |返回验证结果| C;

    E --> |调用 Apply API| G;
    G -- 执行原子性变更 --> H;
    G --> |返回执行结果| E;

第一步:定义声明式的模式契约

我们决定使用 YAML 来定义表结构,因为它比 JSON 更具可读性。开发者将在一个专门的 Git 仓库中为每一张表维护一个 schema.yml 文件。

一个典型的 user_profiles.yml 文件内容如下:

# table_name: 唯一的表标识符,通常是 database.table
table_name: "ods.user_profiles"

# schema: 定义表的列结构
schema:
  columns:
    - name: "user_id"
      type: "long"
      doc: "唯一用户ID,主键"
      nullable: false
    - name: "username"
      type: "string"
      doc: "用户名"
      nullable: false
    - name: "email"
      type: "string"
      doc: "用户邮箱"
      nullable: true # 允许为空
    - name: "created_at"
      type: "timestamp"
      doc: "账户创建时间戳"
      nullable: false
    - name: "is_active"
      type: "boolean"
      doc: "账户是否激活"
      nullable: false
      default: true # 新增字段,提供默认值以兼容旧数据

# partition_spec: 定义分区策略,对查询性能至关重要
partition_spec:
  - field: "created_at"
    transform: "day" # 按天分区

# table_properties: Iceberg 表的其他配置
table_properties:
  "format-version": "2"
  "write.target-file-size-bytes": "536870912" # 512MB

第二步:构建 Kotlin 模式管理服务

这是系统的核心引擎。我们使用 Ktor 来搭建服务,并引入 Iceberg 的 Core API 来与数据湖交互。

项目结构与依赖

build.gradle.kts 中的关键依赖:

// build.gradle.kts
dependencies {
    // Ktor for web server
    implementation("io.ktor:ktor-server-core:$ktorVersion")
    implementation("io.ktor:ktor-server-netty:$ktorVersion")
    implementation("io.ktor:ktor-server-content-negotiation:$ktorVersion")
    implementation("io.ktor:ktor-serialization-jackson:$ktorVersion")

    // Iceberg
    implementation("org.apache.iceberg:iceberg-core:1.4.2")
    implementation("org.apache.iceberg:iceberg-aws-bundle:1.4.2") // 如果使用 S3
    implementation("org.apache.iceberg:iceberg-jdbc-catalog:1.4.2") // 使用 JDBC Catalog

    // Logging
    implementation("ch.qos.logback:logback-classic:1.4.11")

    // Jackson for YAML parsing
    implementation("com.fasterxml.jackson.dataformat:jackson-data-format-yaml:2.15.2")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.15.2")
}

数据契约的 Kotlin 实现

使用 Kotlin 的数据类来精确映射 YAML 结构,这为后续处理提供了类型安全。

// src/main/kotlin/com/datacore/schemas/models/SchemaContracts.kt
package com.datacore.schemas.models

import com.fasterxml.jackson.annotation.JsonProperty

// 整个 schema 文件的顶层结构
data class TableDefinition(
    @JsonProperty("table_name")
    val tableName: String,
    val schema: SchemaDefinition,
    @JsonProperty("partition_spec")
    val partitionSpec: List<PartitionFieldDefinition>? = null,
    @JsonProperty("table_properties")
    val tableProperties: Map<String, String>? = null
)

// Schema 定义
data class SchemaDefinition(
    val columns: List<ColumnDefinition>
)

// 列定义
data class ColumnDefinition(
    val name: String,
    val type: String, // e.g., "long", "string", "struct<...>", etc.
    val doc: String?,
    val nullable: Boolean,
    val default: Any? = null // 用于模式演进时为新列提供默认值
)

// 分区定义
data class PartitionFieldDefinition(
    val field: String,
    val transform: String // e.g., "day", "month", "bucket[16]"
)

核心服务逻辑

SchemaService 负责与 Iceberg Catalog 交互,执行模式的比较与应用。在真实项目中,配置(如 Catalog URL)应当外部化,这里为简化示例而硬编码。

// src/main/kotlin/com/datacore/schemas/services/SchemaService.kt
package com.datacore.schemas.services

import com.datacore.schemas.models.TableDefinition
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.PartitionSpec
import org.apache.iceberg.Schema
import org.apache.iceberg.Table
import org.apache.iceberg.Transaction
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.jdbc.JdbcCatalog
import org.apache.iceberg.types.Type
import org.apache.iceberg.types.Types
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicInteger

class SchemaService {
    private val logger = LoggerFactory.getLogger(javaClass)

    // 在生产环境中,Catalog 实例应该是单例且被妥善管理的
    private val catalog = JdbcCatalog().apply {
        val hadoopConf = Configuration()
        this.setConf(hadoopConf)
        this.initialize("jdbc-catalog", mapOf(
            "uri" to "jdbc:postgresql://localhost:5432/iceberg_catalog",
            "jdbc.user" to "user",
            "jdbc.password" to "password",
            "warehouse" to "s3a://my-datalake-bucket/warehouse"
        ))
    }

    /**
     * 应用模式变更的核心方法。
     * @param definition 目标表定义
     * @param isDryRun 如果为 true,则只做验证不提交事务
     * @return 返回变更描述
     */
    fun applySchemaChange(definition: TableDefinition, isDryRun: Boolean): String {
        val tableIdentifier = TableIdentifier.parse(definition.tableName)
        
        return if (catalog.tableExists(tableIdentifier)) {
            // 表存在,执行更新逻辑
            val table = catalog.loadTable(tableIdentifier)
            updateTable(table, definition, isDryRun)
        } else {
            // 表不存在,执行创建逻辑
            createTable(tableIdentifier, definition, isDryRun)
        }
    }

    private fun createTable(identifier: TableIdentifier, definition: TableDefinition, isDryRun: Boolean): String {
        val schema = buildIcebergSchema(definition)
        val spec = buildPartitionSpec(schema, definition)
        
        if (isDryRun) {
            return "Dry Run: Table '${identifier}' would be created with ${schema.columns().size} columns."
        }
        
        logger.info("Creating new table: {}", identifier)
        catalog.createTable(identifier, schema, spec, definition.tableProperties)
        return "Successfully created table '${identifier}'."
    }

    private fun updateTable(table: Table, definition: TableDefinition, isDryRun: Boolean): String {
        val transaction = table.newTransaction()
        val currentSchema = table.schema()
        val targetSchema = buildIcebergSchema(definition)
        
        val changes = mutableListOf<String>()

        // 1. 比较列的变化
        val currentColumns = currentSchema.columns().associateBy { it.name() }
        val targetColumns = targetSchema.columns().associateBy { it.name() }

        // 检查需要添加的列
        targetColumns.filterKeys { it !in currentColumns.keys }.forEach { (name, col) ->
            transaction.updateSchema().addColumn(name, col.type(), col.doc()).commit()
            changes.add("ADD COLUMN '$name' ${col.type()}")
        }
        
        // 这里的逻辑可以更复杂,例如处理列类型提升、重命名、可空性变更等
        // 一个常见的错误是忽略了模式演进的兼容性规则。Iceberg 不允许破坏性的变更,
        // 例如删除列或将 optional 列变为 required。我们的服务必须遵守这些规则。

        // 2. 比较表属性的变化 (简化示例)
        val currentProps = table.properties()
        definition.tableProperties?.forEach { (key, value) ->
            if (currentProps[key] != value) {
                transaction.updateProperties().set(key, value).commit()
                changes.add("SET TBLPROPERTIES ('$key' = '$value')")
            }
        }

        if (changes.isEmpty()) {
            return "No schema changes detected for table '${table.name()}'."
        }

        val summary = "Detected changes for '${table.name()}': ${changes.joinToString("; ")}"
        if (isDryRun) {
            return "Dry Run: $summary"
        }

        logger.info("Committing transaction for table {}: {}", table.name(), summary)
        try {
            transaction.commitTransaction()
        } catch (e: Exception) {
            logger.error("Failed to commit schema change transaction for ${table.name()}", e)
            // 异常处理至关重要,必须确保事务失败时不会留下中间状态
            throw IllegalStateException("Transaction failed for ${table.name()}", e)
        }
        
        return "Successfully applied changes to table '${table.name()}'."
    }

    // 辅助函数,将我们的契约模型转换为 Iceberg 的内部模型
    private fun buildIcebergSchema(definition: TableDefinition): Schema {
        val nextId = AtomicInteger(1) // 在真实项目中,需要从 Catalog 获取 next-column-id 来保证 ID 唯一
        val columns = definition.schema.columns.map {
            val type = convertToIcebergType(it.type)
            if (it.nullable) {
                Types.NestedField.optional(nextId.getAndIncrement(), it.name, type, it.doc)
            } else {
                Types.NestedField.required(nextId.getAndIncrement(), it.name, type, it.doc)
            }
        }
        return Schema(columns)
    }

    private fun buildPartitionSpec(schema: Schema, definition: TableDefinition): PartitionSpec {
        if (definition.partitionSpec.isNullOrEmpty()) {
            return PartitionSpec.unpartitioned()
        }
        val builder = PartitionSpec.builderFor(schema)
        definition.partitionSpec.forEach {
            when (it.transform.lowercase()) {
                "day" -> builder.day(it.field)
                "month" -> builder.month(it.field)
                //... 支持更多转换
                else -> if (it.transform.startsWith("bucket")) {
                    val buckets = it.transform.substringAfter("[").substringBefore("]").toInt()
                    builder.bucket(it.field, buckets)
                }
            }
        }
        return builder.build()
    }
    
    // 类型的转换逻辑,这里只做简单示例
    private fun convertToIcebergType(typeStr: String): Type {
        return when(typeStr.lowercase()) {
            "string" -> Types.StringType.get()
            "long" -> Types.LongType.get()
            "integer" -> Types.IntegerType.get()
            "boolean" -> Types.BooleanType.get()
            "timestamp" -> Types.TimestampType.withZone()
            "date" -> Types.DateType.get()
            "double" -> Types.DoubleType.get()
            else -> throw IllegalArgumentException("Unsupported type: $typeStr")
        }
    }
}

API Endpoint

最后,我们用 Ktor 暴露两个端点:

// src/main/kotlin/com/datacore/schemas/Application.kt
package com.datacore.schemas

import com.datacore.schemas.models.TableDefinition
import com.datacore.schemas.services.SchemaService
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.http.*

fun main() {
    embeddedServer(Netty, port = 8080, host = "0.0.0.0") {
        val schemaService = SchemaService()
        val yamlMapper = YAMLMapper().registerKotlinModule()

        routing {
            post("/v1/schema/validate") {
                try {
                    val yamlContent = call.receiveText()
                    val definition = yamlMapper.readValue(yamlContent, TableDefinition::class.java)
                    val result = schemaService.applySchemaChange(definition, isDryRun = true)
                    call.respond(HttpStatusCode.OK, mapOf("status" to "OK", "message" to result))
                } catch (e: Exception) {
                    call.respond(HttpStatusCode.BadRequest, mapOf("status" to "ERROR", "message" to e.message))
                }
            }
            post("/v1/schema/apply") {
                try {
                    val yamlContent = call.receiveText()
                    val definition = yamlMapper.readValue(yamlContent, TableDefinition::class.java)
                    val result = schemaService.applySchemaChange(definition, isDryRun = false)
                    call.respond(HttpStatusCode.OK, mapOf("status" to "OK", "message" to result))
                } catch (e: Exception) {
                    call.respond(HttpStatusCode.InternalServerError, mapOf("status" to "ERROR", "message" to e.message))
                }
            }
        }
    }.start(wait = true)
}

第三步:编排 GitLab CI/CD 流水线

现在,我们将这个服务与 GitLab CI/CD 集成,创建自动化的安全门禁。

.gitlab-ci.yml 文件定义了两个关键阶段:

# .gitlab-ci.yml

stages:
  - validate
  - deploy

variables:
  # 服务地址,通过 GitLab CI/CD 变量进行配置,区分环境
  SCHEMA_SERVICE_URL: "http://schema-service.internal:8080"

# 这个作业在每个 Merge Request 中运行
validate_schema_changes:
  stage: validate
  script:
    # 1. 查找所有发生变化的 .yml 文件
    - |
      FILES=$(git diff --name-only $CI_MERGE_REQUEST_TARGET_BRANCH_SHA...$CI_COMMIT_SHA | grep '.yml$')
      if [ -z "$FILES" ]; then
        echo "No schema files changed. Skipping validation."
        exit 0
      fi
    # 2. 对每个变更的文件调用 validate API
    - |
      for file in $FILES; do
        echo "Validating schema for $file..."
        response=$(curl --silent --show-error --request POST \
          --header "Content-Type: application/x-yaml" \
          --data-binary "@$file" \
          "$SCHEMA_SERVICE_URL/v1/schema/validate")
        
        # 检查 API 响应是否成功
        if echo "$response" | grep -q '"status":"ERROR"'; then
          echo "Validation failed for $file!"
          echo "API Response: $response"
          exit 1
        else
          echo "Validation successful for $file."
          echo "API Response: $response"
        fi
      done
  rules:
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event"'

# 这个作业仅在代码合并到 main 分支后运行
apply_schema_changes:
  stage: deploy
  script:
    - |
      FILES=$(git diff --name-only HEAD~1...HEAD | grep '.yml$')
      if [ -z "$FILES" ]; then
        echo "No schema files in the last commit. Skipping deployment."
        exit 0
      fi
    - |
      for file in $FILES; do
        echo "Applying schema for $file..."
        response=$(curl --silent --show-error --request POST \
          --header "Content-Type: application/x-yaml" \
          --data-binary "@$file" \
          "$SCHEMA_SERVICE_URL/v1/schema/apply")

        if echo "$response" | grep -q '"status":"ERROR"'; then
          echo "Failed to apply schema for $file!"
          echo "API Response: $response"
          # 在真实项目中,这里应该触发告警和回滚流程
          exit 1
        else
          echo "Successfully applied schema for $file."
          echo "API Response: $response"
        fi
      done
  rules:
    - if: '$CI_COMMIT_BRANCH == "main"'

这里的关键在于 rules 的使用,它确保了 validate 作业只在 MR 中运行,而 deploy 作业只在主分支上运行。这种分离强制执行了“先审查,后部署”的原则。

当前方案的局限性与未来展望

这个系统已经极大地提升了我们数据湖模式管理的稳定性和效率,但它并非完美。

首先,目前的模式演进逻辑相对简单,仅支持添加列和修改表属性。对于更复杂的操作,如列重命名、类型提升(例如 intlong)、修改嵌套结构等,SchemaService 需要实现更复杂的比较和转换逻辑,并且要严格遵循 Iceberg 的兼容性规则。一个常见的错误是允许了不兼容的类型转换,这会导致数据读取失败。

其次,对于删除列这种破坏性操作,我们目前选择在 validate 阶段直接拒绝。未来的版本可以考虑引入“软删除”机制,即将列重命名为 _col_deprecated_ 并标记为废弃,在经过几个版本迭代确认无任何组件使用后再由独立的清理任务进行物理删除。

最后,整个系统的容错性依赖于 Kotlin 服务的健壮性和 Iceberg 事务的原子性。但如果 apply_schema_changes 作业在处理多个文件时中途失败,会造成部分成功部分失败的状态。一个更优的方案是让 API 支持批量处理多个模式变更,并将它们包裹在单个 Iceberg 事务中(如果 Catalog 支持),或者在 CI/CD 端引入更复杂的重试和状态管理机制。未来的迭代方向还将包括与数据血缘和数据质量监控系统的集成,实现变更影响的自动分析和数据验证。


  目录