Building an Event-Driven, Automated RAG Knowledge Base Update Pipeline with Knative and Weaviate


We face a common technical pain point: once the knowledge base of a retrieval-augmented generation (RAG) system is built, it quickly goes stale. Manually updating documents, regenerating embeddings, and importing them into the vector database is not only inefficient but also highly error-prone. In real projects this lag means the model may answer from outdated information, which directly undermines business value. What we need is an automated, event-driven, resource-efficient update mechanism: a technical flow that seamlessly turns the business event "a document changed" into "the knowledge base was updated".

The initial idea is to build an MLOps pipeline with Git as the single source of truth. Whenever a knowledge base maintainer pushes new Markdown documents to a designated repository directory (for example, docs/) or modifies existing ones, the entire update flow should be triggered automatically. That flow has to cover document parsing, vectorization, data storage, and service updates, with no manual intervention at any point.

The technology choices revolve around a few core principles: high-concurrency processing, elastic resource usage, fast iteration, and ecosystem integration.

  1. **Application backend and vectorization: Phoenix (Elixir)**. Choosing Elixir and the Phoenix framework is no accident. Document processing and calls to external APIs (such as embedding models) are classic IO-bound workloads. The BEAM VM's unmatched concurrency and lightweight process model let us vectorize thousands of document chunks in parallel at very low resource cost. We use the Bumblebee library to run the embedding model directly inside Elixir, avoiding a dependency on an external Python service.
  2. **Vector store: Weaviate**. Weaviate is an open-source vector database with a modular design that supports multiple embedding models. Its strong filtering and hybrid search capabilities are must-haves for production RAG. Just as importantly, it exposes a stable API that is easy to integrate with.
  3. **Service runtime: Knative**. Traffic to our RAG query API is typically bursty. Knative Serving scales automatically with request load and can even scale down to zero, which means the service consumes no compute at all when nobody is querying it, drastically cutting idle cost. That is especially valuable for a non-critical yet important workload like an internal knowledge base.
  4. **CI/CD and automation core: GitLab CI/CD**. GitLab CI/CD is tightly integrated with the Git repository, and its powerful rules and workflow keywords give precise control over when and how a pipeline runs. We can easily configure a pipeline that only triggers when the docs/ directory changes.

The overall flow of the system looks like this:

graph TD
    A[Developer/Maintainer] -- git push --> B(GitLab Repository)
    B -- Webhook Trigger (on docs/ path change) --> C{GitLab CI/CD Pipeline}
    C --> D[1. Build Stage: Build Phoenix App Docker Image]
    D --> E[2. Ingest Stage: Run ingestion task]
    E --> F[Parse & Chunk Docs]
    F --> G[Generate Embeddings via Phoenix App]
    G --> H[Batch Insert into Weaviate]
    C --> I[3. Deploy Stage: Deploy new Phoenix App revision]
    I --> J{Knative Service}
    J --> K[Scale-to-Zero when idle]
    subgraph Knative Cluster
        J
        W[(Weaviate Instance)]
    end
    H --> W
    L[End User] -- RAG Query --> J

Step 1: Building the Core Phoenix Application

Our Phoenix application has two main responsibilities: exposing an API endpoint for RAG queries, and providing an internal task that vectorizes documents and loads them into the database.
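
Before diving into the modules, it is worth listing the libraries they lean on. The sketch below shows a plausible deps/0 for mix.exs; the version requirements are assumptions rather than something taken from a real project.

mix.exs (deps sketch, assumed)

defp deps do
  [
    {:phoenix, "~> 1.7"},
    # Run the embedding model inside the BEAM.
    {:bumblebee, "~> 0.4"},
    {:nx, "~> 0.6"},
    {:exla, "~> 0.6"},
    # HTTP client and JSON encoding for the Weaviate integration.
    {:finch, "~> 0.16"},
    {:jason, "~> 1.4"}
  ]
end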

First, we need a module that encapsulates the interaction with Weaviate. In a real project, hand-rolling JSON against a raw HTTP client quickly becomes unmaintainable, so we create a small client module that implements only what we need: batch object creation.

lib/knowledge_base/weaviate_client.ex

defmodule KnowledgeBase.WeaviateClient do
  @moduledoc """
  A simple, pragmatic client for Weaviate batch operations.
  In a real-world scenario, you might use a more complete library or expand this.
  """

  require Logger

  # The Finch pool is expected to be started under this module's name in the
  # application's supervision tree (see the snippet after this module).
  @http_client Finch

  def batch_create_objects(class_name, objects) do
    # Weaviate's batch endpoint expects a specific JSON structure.
    # The `objects` list should contain maps representing the data points.
    payload = %{
      "objects" =>
        Enum.map(objects, fn object ->
          %{
            "class" => class_name,
            "properties" => object.properties,
            "vector" => object.vector
          }
        end)
    }

    headers = [
      {"Content-Type", "application/json"}
    ]

    # The request is built using Finch for robustness.
    # Weaviate's batch API is asynchronous on its side, but our HTTP call is synchronous.
    request =
      Finch.build(:post, "#{weaviate_url()}/v1/batch/objects", headers, Jason.encode!(payload))

    case @http_client.request(request, __MODULE__) do
      {:ok, %{status: 200, body: body}} ->
        # Even with a 200 OK, individual objects in the batch might have failed.
        # A production system must parse the response body to check for errors.
        parsed_body = Jason.decode!(body)
        handle_batch_response(parsed_body)
        :ok

      {:ok, %{status: status, body: body}} ->
        Logger.error(
          "Weaviate batch import failed with status #{status}. Body: #{inspect(body)}"
        )
        {:error, :weaviate_request_failed}

      {:error, reason} ->
        Logger.error("HTTP client request to Weaviate failed: #{inspect(reason)}")
        {:error, reason}
    end
  end

  # Resolved at runtime so the WEAVIATE_URL value injected by Knative (via
  # config/runtime.exs) is honored, instead of being baked in at compile time.
  defp weaviate_url do
    Application.fetch_env!(:knowledge_base, :weaviate_url)
  end

  # A helper to log errors from the batch response. A real system might
  # implement retry logic for failed items.
  defp handle_batch_response(results) do
    Enum.each(results, fn result ->
      case result["result"]["status"] do
        "SUCCESS" ->
          :ok

        "FAILED" ->
          error_messages =
            result["result"]["errors"]["error"]
            |> Enum.map(& &1["message"])
            |> Enum.join(", ")

          Logger.warning(
            "Failed to import object #{result["id"]}: #{error_messages}"
          )

        # Be defensive: don't crash the whole batch report on an unexpected shape.
        _other ->
          :ok
      end
    end)
  end
end
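
The client calls Finch with __MODULE__ as the pool name, so a Finch instance must be started under exactly that name. A minimal sketch of the corresponding child spec in the application supervisor follows; the file layout and the other children are assumptions.

lib/knowledge_base/application.ex (excerpt, assumed)

def start(_type, _args) do
  children = [
    # HTTP connection pool used by KnowledgeBase.WeaviateClient; the name must
    # match the name passed to Finch.request/2 in that module.
    {Finch, name: KnowledgeBase.WeaviateClient},
    # ... the endpoint and any other children of the application go here.
    KnowledgeBaseWeb.Endpoint
  ]

  Supervisor.start_link(children, strategy: :one_for_one, name: KnowledgeBase.Supervisor)
end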

Next comes the core logic for document processing and vectorization. We create an Ingestion module with functions that read files, chunk them, call the embedding model, and load the results into Weaviate via the client above. Here we use Bumblebee.Text.text_embedding/3 to wrap a pre-trained embedding model in an Nx.Serving.

lib/knowledge_base/ingestion.ex

defmodule KnowledgeBase.Ingestion do
  require Logger

  alias KnowledgeBase.WeaviateClient

  # The name of the collection in Weaviate. This should be configured.
  @class_name "DocumentChunk"

  # We batch inserts to Weaviate for efficiency.
  @batch_size 50

  def run(docs_path) do
    Logger.info("Starting ingestion process for path: #{docs_path}")

    # Load the embedding model and tokenizer, then wrap them in an Nx.Serving.
    # Nx.Serving makes managing the model lifecycle straightforward.
    {:ok, model_info} =
      Bumblebee.load_model({:hf, "sentence-transformers/all-MiniLM-L6-v2"})

    {:ok, tokenizer} =
      Bumblebee.load_tokenizer({:hf, "sentence-transformers/all-MiniLM-L6-v2"})

    serving =
      Bumblebee.Text.text_embedding(model_info, tokenizer,
        # Important: compiling for a fixed batch size dramatically speeds up
        # embedding generation when many chunks are processed at once.
        compile: [batch_size: @batch_size, sequence_length: 512],
        defn_options: [compiler: EXLA]
      )

    docs_path
    |> Path.join("**/*.md")
    |> Path.wildcard()
    |> Task.async_stream(&process_file(&1, serving),
      max_concurrency: System.schedulers_online() * 2,
      # Embedding a large file can easily exceed the default 5-second task timeout.
      timeout: :infinity
    )
    |> Stream.run()

    Logger.info("Ingestion process completed.")
  end

  defp process_file(file_path, serving) do
    Logger.debug("Processing file: #{file_path}")

    file_path
    |> File.read!()
    |> chunk_text() # Simple chunking logic, could be more sophisticated.
    |> Enum.chunk_every(@batch_size)
    |> Enum.each(fn chunk_batch ->
      # Batch embedding generation is a massive performance win.
      # Instead of one-by-one, we hand the whole list of texts to the serving.
      results = Nx.Serving.run(serving, chunk_batch)

      # Each result is a map with an :embedding tensor; Weaviate expects a plain
      # list of floats as the vector.
      vectors = Enum.map(results, fn %{embedding: embedding} -> Nx.to_flat_list(embedding) end)

      objects_to_create =
        Enum.zip(chunk_batch, vectors)
        |> Enum.map(fn {text, vector} ->
          %{
            properties: %{
              content: text,
              source: Path.basename(file_path)
            },
            vector: vector
          }
        end)

      # Send the prepared batch to Weaviate.
      case WeaviateClient.batch_create_objects(@class_name, objects_to_create) do
        :ok ->
          Logger.info("Successfully imported batch of #{Enum.count(objects_to_create)} chunks.")
        {:error, reason} ->
          # In a production system, this failure needs to be handled,
          # possibly with a dead-letter queue or more robust retry mechanism.
          Logger.error("Failed to import batch: #{inspect(reason)}")
      end
    end)
  end

  # A very basic text chunker. Production systems need more advanced strategies
  # like recursive character splitting or semantic chunking.
  defp chunk_text(text) do
    text
    |> String.split(~r/\n\s*\n/, trim: true) # Split by double newlines
    |> Enum.filter(&(&1 != ""))
  end
end
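
The ingestion code assumes the DocumentChunk class already exists in Weaviate and, because we supply our own vectors, that it is configured with the vectorizer set to none. Below is a hedged sketch of a one-off schema bootstrap that reuses the Finch pool from the client; the module and function names are ours, not part of the pipeline above.

defmodule KnowledgeBase.WeaviateSchema do
  @moduledoc """
  One-off helper (not part of the ingestion flow above) that creates the
  DocumentChunk class if it does not exist yet.
  """

  def ensure_class(weaviate_url) do
    schema = %{
      "class" => "DocumentChunk",
      # We push pre-computed vectors from Bumblebee, so Weaviate must not try
      # to vectorize the objects itself.
      "vectorizer" => "none",
      "properties" => [
        %{"name" => "content", "dataType" => ["text"]},
        %{"name" => "source", "dataType" => ["text"]}
      ]
    }

    request =
      Finch.build(
        :post,
        "#{weaviate_url}/v1/schema",
        [{"Content-Type", "application/json"}],
        Jason.encode!(schema)
      )

    # Weaviate answers 422 when the class already exists, which is fine for our purposes.
    case Finch.request(request, KnowledgeBase.WeaviateClient) do
      {:ok, %{status: status}} when status in [200, 422] -> :ok
      other -> {:error, other}
    end
  end
end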

To be able to run this task from the command line in the CI/CD environment, we define a custom Mix alias in mix.exs.

mix.exs (snippet)

defp aliases do
  [
    # ... other aliases
    run_ingestion: ["run priv/repo/ingestion_runner.exs"]
  ]
end

priv/repo/ingestion_runner.exs

# This script is executed by `mix run_ingestion`.
# It's designed to be run from the command line in our CI job.

# Start the application to get configuration and dependencies.
Mix.Task.run("app.start")

# The path to the documents is passed as a command-line argument.
docs_path =
  case System.argv() do
    [path] ->
      path
    _ ->
      raise "Usage: mix run_ingestion <path_to_docs_folder>"
  end

if File.dir?(docs_path) do
  KnowledgeBase.Ingestion.run(docs_path)
else
  raise "Provided path is not a directory: #{docs_path}"
end
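
The other responsibility mentioned at the beginning of Step 1, the RAG query API, is not shown here. For completeness, this is a hedged sketch of the retrieval half: embed the user question with the same serving and run a nearVector GraphQL query against Weaviate. The module name, function name, and the limit of 5 are assumptions.

defmodule KnowledgeBase.Retrieval do
  @moduledoc "Sketch of the retrieval half of the RAG query path."

  # `serving` is the same Nx.Serving used for ingestion (or a named serving
  # started in the supervision tree). Returns the most similar chunks.
  def similar_chunks(serving, question, weaviate_url, limit \\ 5) do
    %{embedding: embedding} = Nx.Serving.run(serving, question)
    vector = Nx.to_flat_list(embedding)

    graphql = """
    {
      Get {
        DocumentChunk(nearVector: {vector: #{Jason.encode!(vector)}}, limit: #{limit}) {
          content
          source
        }
      }
    }
    """

    request =
      Finch.build(
        :post,
        "#{weaviate_url}/v1/graphql",
        [{"Content-Type", "application/json"}],
        Jason.encode!(%{query: graphql})
      )

    with {:ok, %{status: 200, body: body}} <-
           Finch.request(request, KnowledgeBase.WeaviateClient),
         {:ok, decoded} <- Jason.decode(body) do
      {:ok, get_in(decoded, ["data", "Get", "DocumentChunk"])}
    end
  end
end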

Step 2: Containerizing the Application

A robust multi-stage Dockerfile is essential. It keeps the build environment isolated from the runtime environment and produces a minimal, more secure final image.

Dockerfile

# Stage 1: Builder
# Use an official Elixir image that includes hex and rebar.
FROM hexpm/elixir:1.15.7-erlang-26.1.2-debian-bullseye-20230904-slim AS builder

# Set build-time arguments for environment-specific compilation.
ARG MIX_ENV=prod

ENV MIX_ENV=${MIX_ENV}

WORKDIR /app

# Install build dependencies.
# We need gcc for native code compilation (e.g., NIFs) and git for deps.
RUN apt-get update -y && apt-get install -y build-essential git \
    && apt-get clean && rm -f /var/lib/apt/lists/*_*

# Install Hex and Rebar3
RUN mix local.hex --force && \
    mix local.rebar --force

# Copy over the dependency definitions.
COPY mix.exs mix.lock ./
COPY config config

# Fetch and compile dependencies. This layer is cached as long as mix.lock doesn't change.
RUN mix deps.get --only $MIX_ENV
RUN mix deps.compile

# Copy the rest of the application source code.
COPY priv priv
COPY lib lib

# Compile the application. This includes compiling protocols.
# The `MIX_ENV` ensures we use production configuration.
RUN mix compile

# Build the release using `mix release`. This creates a self-contained,
# runnable artifact that doesn't need Elixir/Mix installed in the final image.
RUN mix release

# Stage 2: Runner
# Use a minimal base image for the final production container.
FROM debian:bullseye-slim

# Set environment variables. Runtime configuration (WEAVIATE_URL, SECRET_KEY_BASE, ...)
# is read by config/runtime.exs when the release boots, so nothing is baked in here.
ENV LANG=C.UTF-8 \
    SHELL=/bin/bash

WORKDIR /app

# Install runtime dependencies. For Elixir releases, we only need a few libs.
# libssl for crypto, and potentially others depending on NIFs.
RUN apt-get update -y && apt-get install -y libssl-dev openssl \
    && apt-get clean && rm -f /var/lib/apt/lists/*_*

# Copy the compiled release from the builder stage.
COPY --from=builder /app/_build/prod/rel/knowledge_base .

# The entrypoint is the script generated by `mix release`.
# `start` runs the release in the foreground, which keeps the container running.
ENTRYPOINT ["/app/bin/knowledge_base", "start"]
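
Runtime configuration for the release lives in config/runtime.exs, which is evaluated when the container boots. Below is a minimal sketch matching the environment variables we inject via Knative in the next step; the endpoint module name and the default Weaviate service URL are assumptions.

config/runtime.exs (sketch, assumed)

import Config

if config_env() == :prod do
  # Where the Weaviate instance lives; the default is an assumed in-cluster service name.
  config :knowledge_base, :weaviate_url,
    System.get_env("WEAVIATE_URL", "http://weaviate.default.svc.cluster.local:8080")

  # Assumed endpoint module name; Phoenix reads PORT and SECRET_KEY_BASE here.
  config :knowledge_base, KnowledgeBaseWeb.Endpoint,
    http: [port: String.to_integer(System.get_env("PORT", "4000"))],
    secret_key_base: System.fetch_env!("SECRET_KEY_BASE")
end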

Step 3: The Knative Service Definition

Knative Serving describes a service with a single YAML file. We define a Service, which automatically manages the underlying Configuration, Revision, and Route objects.

config/knative/service.yaml

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: rag-phoenix-service
spec:
  template:
    metadata:
      annotations:
        # This is a key Knative feature. It defines the number of concurrent
        # requests a single pod can handle before scaling up.
        # For I/O bound Phoenix apps, this can be set relatively high (e.g., 200).
        autoscaling.knative.dev/target: "200"

        # Defines the minimum and maximum number of pods.
        # Setting minScale to 0 enables scale-to-zero.
        autoscaling.knative.dev/minScale: "0"
        autoscaling.knative.dev/maxScale: "10"
    spec:
      containers:
        - image: your-registry/rag-phoenix-service:latest # This will be replaced by GitLab CI
          ports:
            - containerPort: 4000 # The port Phoenix is listening on
          env:
            - name: PORT
              value: "4000"
            - name: SECRET_KEY_BASE
              valueFrom:
                secretKeyRef:
                  name: phoenix-secrets
                  key: secret_key_base
            - name: WEAVIATE_URL
              valueFrom:
                configMapKeyRef:
                  name: app-config
                  key: weaviate.url
          # A common mistake is not setting readiness and liveness probes.
          # Knative uses these to determine if a new revision is healthy before shifting traffic.
          readinessProbe:
            httpGet:
              path: /api/health
            initialDelaySeconds: 5
            periodSeconds: 10
          livenessProbe:
            httpGet:
              path: /api/health
            initialDelaySeconds: 15
            periodSeconds: 30
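
The probes assume the Phoenix app exposes /api/health, which the application code above does not show. A minimal sketch of such an endpoint follows; the route placement and controller name are assumptions.

lib/knowledge_base_web/controllers/health_controller.ex (sketch, assumed)

# In the router's :api scope (assumed): get "/health", HealthController, :index

defmodule KnowledgeBaseWeb.HealthController do
  use KnowledgeBaseWeb, :controller

  # Keep this cheap: Knative calls it frequently to decide whether a revision
  # is ready to receive traffic.
  def index(conn, _params) do
    json(conn, %{status: "ok"})
  end
end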

Step 4: The GitLab CI/CD Automation Pipeline

This is the glue that holds everything together. The .gitlab-ci.yml file defines the complete flow from code commit to service update. We use rules to make sure the pipeline only runs when the docs/ directory or the application source code changes.

.gitlab-ci.yml

variables:
  # Using GitLab's built-in variables for Docker image naming.
  IMAGE_TAG: $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG
  KUBE_CONTEXT: "your-agent/your-cluster:agent-name" # Assumes GitLab Agent for Kubernetes is configured

stages:
  - build
  - ingest
  - deploy

build_image:
  stage: build
  image: docker:24.0.5
  services:
    - docker:24.0.5-dind
  script:
    - docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY
    # Push the builder stage as well: it still contains Elixir/Mix, which the
    # ingest job needs. The slim runtime image only ships the compiled release.
    - docker build --target builder -t $IMAGE_TAG-builder .
    - docker push $IMAGE_TAG-builder
    - docker build -t $IMAGE_TAG .
    - docker push $IMAGE_TAG
  rules:
    # Run this job if source code changes OR if docs change (because ingest job needs it).
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
      changes:
        - lib/**/*
        - config/**/*
        - mix.exs
        - mix.lock
        - Dockerfile
    - if: '$CI_COMMIT_BRANCH == "main"'
      changes:
        - lib/**/*
        - config/**/*
        - mix.exs
        - mix.lock
        - Dockerfile
        - docs/**/* # Trigger build also when docs change

run_data_ingestion:
  stage: ingest
  # Use the builder-stage image: it has Mix and the compiled dependencies.
  # (The slim runtime image only contains the release, so Mix is not available there.)
  image: $IMAGE_TAG-builder
  script:
    # The script runs our custom Mix alias against the docs/ directory GitLab checked out.
    # WEAVIATE_URL and any other runtime config the app needs must be provided as CI/CD variables.
    - echo "Starting data ingestion from docs/ folder..."
    - cd /app && MIX_ENV=prod mix run_ingestion $CI_PROJECT_DIR/docs
    # The real pitfall here is error handling: if a command fails, the job (and thus the
    # pipeline) fails, since GitLab runs each script line with error checking.
  needs:
    - build_image
  rules:
    # CRITICAL: This job ONLY runs on the main branch when the docs/ directory has changes.
    # This prevents re-ingesting data on every code change.
    - if: '$CI_COMMIT_BRANCH == "main"'
      changes:
        - docs/**/*

deploy_to_knative:
  stage: deploy
  image:
    name: bitnami/kubectl:latest
    entrypoint: [""]
  script:
    - echo "Deploying to Knative..."
    - kubectl config use-context $KUBE_CONTEXT
    # A common mistake is to hardcode the image tag in the YAML.
    # We use `sed` to dynamically substitute the image tag built in the 'build' stage.
    - sed -i "s|image: your-registry/rag-phoenix-service:latest|image: $IMAGE_TAG|g" config/knative/service.yaml
    - kubectl apply -f config/knative/service.yaml
    # Optional but recommended: Wait for the new revision to be ready.
    - kubectl wait --for=condition=Ready revision -l serving.knative.dev/service=rag-phoenix-service --timeout=5m
  needs:
    - build_image
  rules:
    # Deploy when any application code changes on the main branch.
    - if: '$CI_COMMIT_BRANCH == "main"'
      changes:
        - lib/**/*
        - config/**/*
        - mix.exs
        - mix.lock
        - Dockerfile
        - config/knative/service.yaml
    # Also deploy after a successful data ingestion to ensure the service
    # is running the latest code that might be compatible with the new data structure.
    - if: '$CI_COMMIT_BRANCH == "main"'
      changes:
        - docs/**/*

This pipeline design reflects several pragmatic considerations:

  1. Separation of concerns: build, ingest, and deploy are independent stages.
  2. Conditional execution: the run_data_ingestion job runs only on the main branch and only when the docs/ directory changes, avoiding unnecessary, expensive data processing.
  3. Dynamic configuration: using sed to update the Knative YAML on the fly guarantees that the image we just built is the one that gets deployed, a key practice in a GitOps-style flow.
  4. Dependencies: both deploy and ingest depend on a successful build stage.

With this in place, the loop is closed. When a knowledge base maintainer finishes writing a document locally, runs git add docs/new-feature.md and git commit -m "Add docs for new feature", and then pushes with git push origin main, GitLab CI/CD takes over immediately. The pipeline builds a new image, runs the ingestion task to vectorize the new document and store it in Weaviate, and finally deploys a new revision of the Knative service. Within minutes, end users can query information about the new feature through the RAG API.

The main limitation of this design is the ingestion step itself. It currently runs inside a synchronous CI/CD job; if a huge number of documents is committed at once, say tens of thousands of files, the job could run for a very long time or even hit its timeout. A more robust evolution would adopt Knative Eventing: GitLab sends a webhook event to a Knative Eventing Broker, and a dedicated, independently scalable Knative service consumes those events and ingests the documents asynchronously. That would make the system far more resilient and observable under large batch updates. Deletion and update strategies for vectors also deserve a more careful design: for example, when a document is removed from the Git repository, there needs to be a corresponding process that cleans up the related vectors in Weaviate.
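
As a starting point for that cleanup path, Weaviate's batch delete endpoint can remove every chunk that originated from a given file. Here is a hedged sketch of a helper that could be added to the WeaviateClient module above; the function is ours, not part of the pipeline described here.

# Could be added to KnowledgeBase.WeaviateClient. Deletes every chunk whose
# `source` property matches the given file name, e.g. after the file was
# removed from the Git repository.
def delete_by_source(class_name, source_file) do
  payload = %{
    "match" => %{
      "class" => class_name,
      "where" => %{
        "path" => ["source"],
        "operator" => "Equal",
        "valueText" => source_file
      }
    },
    "output" => "minimal"
  }

  request =
    Finch.build(
      :delete,
      "#{weaviate_url()}/v1/batch/objects",
      [{"Content-Type", "application/json"}],
      Jason.encode!(payload)
    )

  case Finch.request(request, __MODULE__) do
    {:ok, %{status: 200}} -> :ok
    other -> {:error, other}
  end
end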

