We face a common technical pain point: once the knowledge base behind a Retrieval-Augmented Generation (RAG) system is built, it goes stale quickly. Manually updating documents, regenerating embeddings, and re-importing them into the vector database is not only inefficient but also highly error-prone. In real projects this lag means the model may answer from outdated information, which directly erodes business value. What we need is an automated, event-driven, resource-efficient update mechanism: a technical workflow that seamlessly turns the business event "a document changed" into "the knowledge base was updated".
The initial idea is an MLOps pipeline with Git as the single source of truth. Whenever a knowledge-base maintainer pushes a new Markdown document, or modifies an existing one, under a specific directory of the repository (for example `docs/`), the entire update flow should be triggered automatically. The flow has to cover document parsing, vectorization, storage, and service updates, with no human intervention at any point.
The technology choices revolve around a few core principles: high concurrency, elastic resource usage, fast iteration, and ecosystem fit.
- **Application backend and vectorization: Phoenix (Elixir)**. Choosing Elixir and the Phoenix framework is no accident. Document processing and calls to external APIs (such as an embedding model) are typical IO-bound workloads. The BEAM's exceptional concurrency and lightweight process model let us vectorize thousands of document chunks in parallel with very little resource overhead. We use the `Bumblebee` library to run the embedding model directly inside Elixir, avoiding a dependency on an external Python service (the Hex dependencies this implies are sketched right after this list).
- **Vector store: Weaviate**. An open-source vector database with a modular design that supports multiple embedding models. Its strong filtering and hybrid-search capabilities are a hard requirement for production RAG, and, just as importantly, it exposes a stable API that is easy to integrate with.
- **Service runtime: Knative**. Traffic to our RAG query API is typically bursty. Knative Serving scales automatically with request load and can even scale to zero, meaning the service consumes no compute at all when nobody is querying, which drastically cuts idle cost. That is especially valuable for a non-core but important workload like an internal knowledge base.
- **CI/CD and automation core: GitLab CI/CD**. GitLab CI/CD is tightly integrated with the Git repository, and its `rules` and `workflow` keywords give precise control over when and how a pipeline runs. Configuring a pipeline that only triggers when the `docs/` directory changes is straightforward.
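For reference, a minimal sketch of the Hex dependencies this stack implies; the version requirements are illustrative and not taken from the project:

```elixir
# mix.exs (excerpt, illustrative): Bumblebee/Nx/EXLA for in-process embeddings,
# Finch for HTTP calls to Weaviate, Jason for JSON encoding.
defp deps do
  [
    {:phoenix, "~> 1.7"},
    {:bumblebee, "~> 0.4"},
    {:nx, "~> 0.6"},
    {:exla, "~> 0.6"},
    {:finch, "~> 0.16"},
    {:jason, "~> 1.4"}
  ]
end
```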
The overall flow of the system looks like this:
graph TD
    A[Developer/Maintainer] -- git push --> B(GitLab Repository)
    B -- "Webhook Trigger (on docs/ path change)" --> C{GitLab CI/CD Pipeline}
    C --> D[1. Build Stage: Build Phoenix App Docker Image]
    D --> E[2. Ingest Stage: Run ingestion task]
    E --> F[Parse & Chunk Docs]
    F --> G[Generate Embeddings via Phoenix App]
    G --> H[Batch Insert into Weaviate]
    C --> I[3. Deploy Stage: Deploy new Phoenix App revision]
    I --> J{Knative Service}
    J --> K[Scale-to-Zero when idle]
    subgraph Knative Cluster
        J
        W[(Weaviate Instance)]
    end
    L[End User] -- RAG Query --> J
Step 1: Building the Core Phoenix Application
Our Phoenix application has two main responsibilities: an API endpoint that serves RAG queries, and an internal task that vectorizes documents and loads them into the store.

First we need a module that encapsulates the interaction with Weaviate. In a real project, hand-rolling HTTP calls and JSON all over the codebase is unmaintainable, so we create a small client module that implements only what we need: batch object creation.
lib/knowledge_base/weaviate_client.ex
defmodule KnowledgeBase.WeaviateClient do
@moduledoc """
A simple, pragmatic client for Weaviate batch operations.
In a real-world scenario, you might use a more complete library or expand this.
"""
require Logger
  # Finch must be started under this name in the application's supervision tree
  # (see the snippet after this module). The Weaviate URL is read at runtime
  # inside batch_create_objects/2 rather than fixed at compile time, so it can
  # be injected through the WEAVIATE_URL environment variable.
  @http_client Finch
def batch_create_objects(class_name, objects) do
# Weaviate's batch endpoint expects a specific JSON structure.
# The `objects` list should contain maps representing the data points.
payload = %{
"objects" =>
Enum.map(objects, fn object ->
%{
"class" => class_name,
"properties" => object.properties,
"vector" => object.vector
}
end)
}
headers = [
{"Content-Type", "application/json"}
]
# The request is built using Finch for robustness.
# Weaviate's batch API is asynchronous on its side, but our HTTP call is synchronous.
    # Resolve the Weaviate URL at runtime (config/runtime.exs reads it from the
    # WEAVIATE_URL environment variable) instead of baking it in at compile time.
    weaviate_url = Application.fetch_env!(:knowledge_base, :weaviate_url)

    request =
      Finch.build(:post, "#{weaviate_url}/v1/batch/objects", headers, Jason.encode!(payload))
case @http_client.request(request, __MODULE__) do
{:ok, %{status: 200, body: body}} ->
# Even with a 200 OK, individual objects in the batch might have failed.
# A production system must parse the response body to check for errors.
parsed_body = Jason.decode!(body)
handle_batch_response(parsed_body)
:ok
{:ok, %{status: status, body: body}} ->
Logger.error(
"Weaviate batch import failed with status #{status}. Body: #{inspect(body)}"
)
{:error, :weaviate_request_failed}
{:error, reason} ->
Logger.error("HTTP client request to Weaviate failed: #{inspect(reason)}")
{:error, reason}
end
end
# A helper to log errors from the batch response. A real system might
# implement retry logic for failed items.
defp handle_batch_response(results) do
Enum.each(results, fn result ->
case result["result"]["status"] do
"SUCCESS" ->
:ok
"FAILED" ->
error_messages =
result["result"]["errors"]["error"]
|> Enum.map(& &1["message"])
|> Enum.join(", ")
Logger.warning(
"Failed to import object #{result["id"]}: #{error_messages}"
)
end
end)
end
end
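The client calls `Finch.request/2` with `__MODULE__` as the pool name, so a Finch instance must be started under that name. A minimal sketch of the application supervisor, assuming the standard generated `KnowledgeBase.Application` and the conventional `KnowledgeBaseWeb.Endpoint` module name:

```elixir
# lib/knowledge_base/application.ex (excerpt, illustrative)
def start(_type, _args) do
  children = [
    # HTTP connection pool used by KnowledgeBase.WeaviateClient.
    {Finch, name: KnowledgeBase.WeaviateClient},
    # ... the rest of the standard children ...
    KnowledgeBaseWeb.Endpoint
  ]

  Supervisor.start_link(children, strategy: :one_for_one, name: KnowledgeBase.Supervisor)
end
```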
Next comes the core document-processing and vectorization logic. We create an `Ingestion` module containing the functions that read files, chunk them, call the embedding model, and load the results into Weaviate through the client above. We use `Bumblebee.Text.text_embedding/3` to build a serving around a pre-trained embedding model.
lib/knowledge_base/ingestion.ex
defmodule KnowledgeBase.Ingestion do
require Logger
alias KnowledgeBase.WeaviateClient
# The name of the collection in Weaviate. This should be configured.
@class_name "DocumentChunk"
# We batch inserts to Weaviate for efficiency.
@batch_size 50
def run(docs_path) do
Logger.info("Starting ingestion process for path: #{docs_path}")
    # Load the model and tokenizer, then build an Nx.Serving for embeddings.
    # Nx.Serving makes managing the model lifecycle straightforward.
    {:ok, model_info} =
      Bumblebee.load_model({:hf, "sentence-transformers/all-MiniLM-L6-v2"})

    {:ok, tokenizer} =
      Bumblebee.load_tokenizer({:hf, "sentence-transformers/all-MiniLM-L6-v2"})

    serving =
      Bumblebee.Text.text_embedding(model_info, tokenizer,
        # Important: compiling for a fixed batch size dramatically speeds up
        # embedding generation when many chunks are processed at once.
        compile: [batch_size: @batch_size, sequence_length: 512],
        defn_options: [compiler: EXLA]
      )
docs_path
|> Path.join("**/*.md")
|> Path.wildcard()
    |> Task.async_stream(&process_file(&1, serving),
      max_concurrency: System.schedulers_online() * 2,
      # Embedding a large file can easily exceed the 5s default task timeout.
      timeout: :infinity
    )
|> Stream.run()
Logger.info("Ingestion process completed.")
end
defp process_file(file_path, serving) do
Logger.debug("Processing file: #{file_path}")
file_path
|> File.read!()
|> chunk_text() # Simple chunking logic, could be more sophisticated.
|> Enum.chunk_every(@batch_size)
|> Enum.each(fn chunk_batch ->
      # Batch embedding generation is a massive performance win: instead of
      # one text at a time, we run the whole batch through the serving at once.
      results = Nx.Serving.run(serving, chunk_batch)

      # Each result is a map with an :embedding tensor; flatten each tensor to a
      # plain list so the vectors can be JSON-encoded for Weaviate.
      vectors = Enum.map(results, &Nx.to_flat_list(&1.embedding))
objects_to_create =
Enum.zip(chunk_batch, vectors)
|> Enum.map(fn {text, vector} ->
%{
properties: %{
content: text,
source: Path.basename(file_path)
},
vector: vector
}
end)
# Send the prepared batch to Weaviate.
case WeaviateClient.batch_create_objects(@class_name, objects_to_create) do
:ok ->
Logger.info("Successfully imported batch of #{Enum.count(objects_to_create)} chunks.")
{:error, reason} ->
# In a production system, this failure needs to be handled,
# possibly with a dead-letter queue or more robust retry mechanism.
Logger.error("Failed to import batch: #{inspect(reason)}")
end
end)
end
# A very basic text chunker. Production systems need more advanced strategies
# like recursive character splitting or semantic chunking.
defp chunk_text(text) do
text
|> String.split(~r/\n\s*\n/, trim: true) # Split by double newlines
|> Enum.filter(&(&1 != ""))
end
end
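One prerequisite the module above assumes: the `DocumentChunk` class must already exist in Weaviate, with `vectorizer` set to `"none"` because we supply our own vectors. A hedged sketch of a one-off setup step against Weaviate's schema endpoint (`POST /v1/schema`), reusing the same Finch pool; the property layout mirrors what the ingestion code writes:

```elixir
# Illustrative one-off setup, e.g. run from IEx or a release `eval` call.
defmodule KnowledgeBase.WeaviateSchema do
  @moduledoc "Creates the DocumentChunk class if it does not exist yet."

  def ensure_class(weaviate_url) do
    payload = %{
      "class" => "DocumentChunk",
      # We push our own embeddings, so Weaviate must not vectorize on its side.
      "vectorizer" => "none",
      "properties" => [
        %{"name" => "content", "dataType" => ["text"]},
        %{"name" => "source", "dataType" => ["text"]}
      ]
    }

    request =
      Finch.build(
        :post,
        "#{weaviate_url}/v1/schema",
        [{"Content-Type", "application/json"}],
        Jason.encode!(payload)
      )

    # Weaviate typically answers 200 on creation and 422 when the class exists.
    case Finch.request(request, KnowledgeBase.WeaviateClient) do
      {:ok, %{status: status}} when status in [200, 422] -> :ok
      other -> {:error, other}
    end
  end
end
```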
To run this task from the CI/CD environment, we expose it as a custom Mix alias in `mix.exs`.

`mix.exs` (snippet)
defp aliases do
[
# ... other aliases
"run_ingestion": ["run priv/repo/ingestion_runner.exs"]
]
end
priv/repo/ingestion_runner.exs
# This script is executed by `mix run_ingestion`.
# It's designed to be run from the command line in our CI job.
# Start the application to get configuration and dependencies.
Mix.Task.run("app.start")
# The path to the documents is passed as a command-line argument.
docs_path =
case System.argv() do
[path] ->
path
_ ->
raise "Usage: mix run_ingestion <path_to_docs_folder>"
end
if File.dir?(docs_path) do
KnowledgeBase.Ingestion.run(docs_path)
else
raise "Provided path is not a directory: #{docs_path}"
end
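So far only the ingestion side has been shown. The query API, the application's other responsibility, needs the same embedding model plus a `nearVector` search against Weaviate's GraphQL endpoint. A rough sketch of that retrieval step; it assumes the serving is also registered as a named `Nx.Serving` process, and the name `KnowledgeBase.EmbeddingServing` is an assumption:

```elixir
# Illustrative retrieval helper used by the query API; names are assumptions.
defmodule KnowledgeBase.Retriever do
  def retrieve(question, limit \\ 5) do
    # Embed the question with the shared, named serving.
    %{embedding: embedding} = Nx.Serving.batched_run(KnowledgeBase.EmbeddingServing, question)
    vector = Nx.to_flat_list(embedding)

    graphql = """
    {
      Get {
        DocumentChunk(nearVector: {vector: #{Jason.encode!(vector)}}, limit: #{limit}) {
          content
          source
        }
      }
    }
    """

    weaviate_url = Application.fetch_env!(:knowledge_base, :weaviate_url)

    request =
      Finch.build(
        :post,
        "#{weaviate_url}/v1/graphql",
        [{"Content-Type", "application/json"}],
        Jason.encode!(%{"query" => graphql})
      )

    with {:ok, %{status: 200, body: body}} <- Finch.request(request, KnowledgeBase.WeaviateClient),
         {:ok, decoded} <- Jason.decode(body) do
      {:ok, get_in(decoded, ["data", "Get", "DocumentChunk"])}
    end
  end
end
```

The retrieved chunks are then assembled into the prompt for the generation step, which is outside the scope of this walkthrough.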
Step 2: Containerizing the Application
A robust multi-stage `Dockerfile` is essential. It isolates the build environment from the runtime environment and produces a minimal, more secure final image.
Dockerfile
# Stage 1: Builder
# Use an official Elixir image that includes hex and rebar.
FROM hexpm/elixir:1.15.7-erlang-26.1.2-debian-bullseye-20230904-slim AS builder
# Set build-time arguments for environment-specific compilation.
ARG MIX_ENV=prod
ENV MIX_ENV=${MIX_ENV}
WORKDIR /app
# Install build dependencies.
# We need gcc for native code compilation (e.g., NIFs) and git for deps.
RUN apt-get update -y && apt-get install -y build-essential git \
&& apt-get clean && rm -f /var/lib/apt/lists/*_*
# Install Hex and Rebar3
RUN mix local.hex --force && \
mix local.rebar --force
# Copy over the dependency definitions.
COPY mix.exs mix.lock ./
COPY config config
# Fetch and compile dependencies. This layer is cached as long as mix.lock doesn't change.
RUN mix deps.get --only $MIX_ENV
RUN mix deps.compile
# Copy the rest of the application source code.
COPY priv priv
COPY lib lib
# Compile the application. This includes compiling protocols.
# The `MIX_ENV` ensures we use production configuration.
RUN mix compile
# Build the release using `mix release`. This creates a self-contained,
# runnable artifact that doesn't need Elixir/Mix installed in the final image.
RUN mix release
# Stage 2: Runner
# Use a minimal base image for the final production container.
FROM debian:bullseye-slim
# Set basic environment variables. Runtime configuration (WEAVIATE_URL,
# SECRET_KEY_BASE, PORT) is read by config/runtime.exs when the release boots,
# so no Distillery-style REPLACE_OS_VARS mechanism is needed with `mix release`.
ENV LANG=C.UTF-8 \
    SHELL=/bin/bash
WORKDIR /app
# Install runtime dependencies. For Elixir releases, we only need a few libs.
# libssl for crypto, and potentially others depending on NIFs.
RUN apt-get update -y && apt-get install -y libssl-dev openssl \
&& apt-get clean && rm -f /var/lib/apt/lists/*_*
# Copy the compiled release from the builder stage.
COPY --from=builder /app/_build/prod/rel/knowledge_base ./

# The entrypoint is the script generated by `mix release`.
# `start` runs the release in the foreground, which keeps the container running.
ENTRYPOINT ["/app/bin/knowledge_base", "start"]
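The runtime configuration mentioned above lives in `config/runtime.exs`, which `mix release` evaluates every time the release boots. A minimal sketch, assuming the conventional `KnowledgeBaseWeb.Endpoint` module name, that wires up the environment variables the Knative manifest in the next step injects:

```elixir
# config/runtime.exs (excerpt, illustrative)
import Config

if config_env() == :prod do
  # Used by KnowledgeBase.WeaviateClient at request time.
  config :knowledge_base, :weaviate_url, System.fetch_env!("WEAVIATE_URL")

  config :knowledge_base, KnowledgeBaseWeb.Endpoint,
    http: [port: String.to_integer(System.get_env("PORT") || "4000")],
    secret_key_base: System.fetch_env!("SECRET_KEY_BASE"),
    server: true
end
```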
Step 3: The Knative Service Definition
Knative Serving describes a service with a single YAML file. We define a `Service`, which automatically manages the underlying `Configuration`, `Revision`, and `Route` objects.
config/knative/service.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: rag-phoenix-service
spec:
template:
metadata:
annotations:
# This is a key Knative feature. It defines the number of concurrent
# requests a single pod can handle before scaling up.
# For I/O bound Phoenix apps, this can be set relatively high (e.g., 200).
autoscaling.knative.dev/target: "200"
# Defines the minimum and maximum number of pods.
# Setting minScale to 0 enables scale-to-zero.
autoscaling.knative.dev/minScale: "0"
autoscaling.knative.dev/maxScale: "10"
spec:
containers:
- image: your-registry/rag-phoenix-service:latest # This will be replaced by GitLab CI
ports:
- containerPort: 4000 # The port Phoenix is listening on
env:
- name: PORT
value: "4000"
- name: SECRET_KEY_BASE
valueFrom:
secretKeyRef:
name: phoenix-secrets
key: secret_key_base
- name: WEAVIATE_URL
valueFrom:
configMapKeyRef:
name: app-config
key: weaviate.url
# A common mistake is not setting readiness and liveness probes.
# Knative uses these to determine if a new revision is healthy before shifting traffic.
readinessProbe:
httpGet:
path: /api/health
initialDelaySeconds: 5
periodSeconds: 10
livenessProbe:
httpGet:
path: /api/health
initialDelaySeconds: 15
periodSeconds: 30
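These probes assume the application actually serves `/api/health`. A minimal sketch of such an endpoint, with route and controller names that follow Phoenix conventions but are assumptions here:

```elixir
# lib/knowledge_base_web/router.ex (excerpt)
scope "/api", KnowledgeBaseWeb do
  pipe_through :api

  get "/health", HealthController, :index
end

# lib/knowledge_base_web/controllers/health_controller.ex
defmodule KnowledgeBaseWeb.HealthController do
  use KnowledgeBaseWeb, :controller

  # Deliberately trivial: it only confirms the BEAM and the endpoint are up.
  # A deeper check (e.g. pinging Weaviate) would slow down cold starts.
  def index(conn, _params) do
    json(conn, %{status: "ok"})
  end
end
```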
Step 4: The GitLab CI/CD Automation Pipeline
This is the glue that holds everything together. The `.gitlab-ci.yml` file defines the complete flow from commit to service update. We use `rules` to ensure the pipeline only runs when the `docs/` directory or the application source code changes.
.gitlab-ci.yml
variables:
# Using GitLab's built-in variables for Docker image naming.
IMAGE_TAG: $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG
KUBE_CONTEXT: "your-agent/your-cluster:agent-name" # Assumes GitLab Agent for Kubernetes is configured
stages:
- build
- ingest
- deploy
build_image:
stage: build
image: docker:24.0.5
services:
- docker:24.0.5-dind
  script:
    - docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY
    - docker build -t $IMAGE_TAG .
    - docker push $IMAGE_TAG
    # Also publish the builder stage: it still contains Mix, the compiled deps
    # and the application source, which the ingest job needs to run the Mix alias.
    - docker build --target builder -t $IMAGE_TAG-builder .
    - docker push $IMAGE_TAG-builder
rules:
# Run this job if source code changes OR if docs change (because ingest job needs it).
- if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
changes:
- lib/**/*
- config/**/*
- mix.exs
- mix.lock
- Dockerfile
- if: '$CI_COMMIT_BRANCH == "main"'
changes:
- lib/**/*
- config/**/*
- mix.exs
- mix.lock
- Dockerfile
- docs/**/* # Trigger build also when docs change
run_data_ingestion:
stage: ingest
  # Use the builder-stage image: the final release image does not ship Mix or
  # the project source, so the custom Mix alias cannot run there.
  image: $IMAGE_TAG-builder
  script:
    # The compiled deps live in /app inside the builder image, while GitLab checks
    # the repository out into the job's working directory. Copy the docs into /app
    # and run the Mix alias from there. WEAVIATE_URL (and any other variables read
    # in config/runtime.exs) must be provided as CI/CD variables.
    - echo "Starting data ingestion from docs/ folder..."
    - cp -r docs /app/docs
    - cd /app && mix run_ingestion docs/
    # The real pitfall here is error handling. If any command fails, the job (and
    # hence the pipeline) fails, because GitLab aborts the script on a non-zero exit.
needs:
- build_image
rules:
# CRITICAL: This job ONLY runs on the main branch when the docs/ directory has changes.
# This prevents re-ingesting data on every code change.
- if: '$CI_COMMIT_BRANCH == "main"'
changes:
- docs/**/*
deploy_to_knative:
stage: deploy
image:
name: bitnami/kubectl:latest
entrypoint: [""]
script:
- echo "Deploying to Knative..."
- kubectl config use-context $KUBE_CONTEXT
# A common mistake is to hardcode the image tag in the YAML.
# We use `sed` to dynamically substitute the image tag built in the 'build' stage.
- sed -i "s|image: your-registry/rag-phoenix-service:latest|image: $IMAGE_TAG|g" config/knative/service.yaml
- kubectl apply -f config/knative/service.yaml
# Optional but recommended: Wait for the new revision to be ready.
- kubectl wait --for=condition=Ready revision -l serving.knative.dev/service=rag-phoenix-service --timeout=5m
  needs:
    - build_image
    # When ingestion runs in the same pipeline, wait for it so the new revision
    # only receives traffic after the data is in Weaviate.
    - job: run_data_ingestion
      optional: true
rules:
# Deploy when any application code changes on the main branch.
- if: '$CI_COMMIT_BRANCH == "main"'
changes:
- lib/**/*
- config/**/*
- mix.exs
- mix.lock
- Dockerfile
- config/knative/service.yaml
# Also deploy after a successful data ingestion to ensure the service
# is running the latest code that might be compatible with the new data structure.
- if: '$CI_COMMIT_BRANCH == "main"'
changes:
- docs/**/*
This pipeline design reflects several pragmatic considerations:
- **Separation of concerns**: `build`, `ingest`, and `deploy` are independent stages.
- **Conditional execution**: the `run_data_ingestion` job only runs on the `main` branch when the `docs/` directory changes, avoiding unnecessary and expensive data processing.
- **Dynamic configuration**: using `sed` to patch the Knative YAML ensures the revision that gets deployed runs the image that was just built, a key practice in a GitOps-style flow.
- **Dependencies**: both `deploy` and `ingest` depend on a successful `build` stage.
At this point the loop is closed. When a maintainer finishes writing a document locally and runs `git add docs/new-feature.md`, `git commit -m "Add docs for new feature"`, and then `git push origin main`, GitLab CI/CD takes over immediately. The pipeline builds a new image, runs the ingestion task to vectorize the new document and store it in Weaviate, and finally deploys a new revision of the Knative service. Within a few minutes, end users can query information about the new feature through the RAG API.
The main limitation of this design is the ingestion step itself. It currently runs inside a synchronous CI/CD job; with a very large batch of documents, say tens of thousands of files committed at once, the job could run for a long time or even time out. A more robust evolution is Knative Eventing: GitLab sends a webhook event to a Knative Eventing broker, and a dedicated, independently scalable Knative service ingests the documents asynchronously. That would make the system more elastic and observable under large bulk updates. Deletion and update strategies for vectors also deserve a more careful design: when a document is removed from the Git repository, there must be a corresponding process that cleans up the related vectors in Weaviate.
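For that cleanup, Weaviate's batch-delete endpoint (`DELETE /v1/batch/objects` with a `match` filter) can remove every chunk originating from a deleted file. A hedged sketch of a function that could be added to `KnowledgeBase.WeaviateClient`, keyed on the `source` property written during ingestion:

```elixir
# Illustrative extension of KnowledgeBase.WeaviateClient.
def delete_by_source(class_name, source_filename) do
  payload = %{
    "match" => %{
      "class" => class_name,
      "where" => %{
        "path" => ["source"],
        "operator" => "Equal",
        "valueText" => source_filename
      }
    }
  }

  weaviate_url = Application.fetch_env!(:knowledge_base, :weaviate_url)

  request =
    Finch.build(
      :delete,
      "#{weaviate_url}/v1/batch/objects",
      [{"Content-Type", "application/json"}],
      Jason.encode!(payload)
    )

  case Finch.request(request, KnowledgeBase.WeaviateClient) do
    {:ok, %{status: 200}} -> :ok
    other -> {:error, other}
  end
end
```

A pipeline job could diff the commit for files deleted under `docs/` and call this once per file before re-ingesting the rest.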