为 Rust Vercel Functions 构建基于 Consul 和 Vercel KV 的热缓存服务发现机制


一个看似简单的需求,将一个 Rust 编写的 Vercel Function 连接到部署在内部环境的某个 gRPC 服务。服务地址由 Consul 统一管理。最初的实现直接而粗暴:函数每次执行时,都通过 Consul 的 HTTP API 查询目标服务的健康实例,然后建立连接。

// api/naive_lookup.rs
// 这是一个错误的、不应在生产中使用的示例

use vercel_runtime::{run, Body, Error, Request, Response, StatusCode};
use std::collections::HashMap;
use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct ConsulService {
    #[serde(rename = "ServiceAddress")]
    address: String,
    #[serde(rename = "ServicePort")]
    port: u16,
}

async fn handler(_req: Request) -> Result<Response<Body>, Error> {
    let consul_addr = std::env::var("CONSUL_HTTP_ADDR")
        .unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
    let service_name = "user-profile-service";

    // 每次调用都查询 Consul
    let client = reqwest::Client::new();
    let request_url = format!(
        "{}/v1/health/service/{}?passing",
        consul_addr, service_name
    );

    let start_time = std::time::Instant::now();
    let resp = client.get(&request_url).send().await?;
    
    if !resp.status().is_success() {
        // ... 错误处理
        return Ok(Response::builder()
            .status(StatusCode::INTERNAL_SERVER_ERROR)
            .body("Failed to query Consul".into())?);
    }

    let services: Vec<ConsulService> = resp.json().await?;
    let elapsed = start_time.elapsed();

    // 简单地选择第一个健康实例
    let service_endpoint = services.first().map(|s| format!("{}:{}", s.address, s.port));
    
    let response_body = format!(
        "Consul lookup took: {:?}. Service endpoint: {:?}",
        elapsed, service_endpoint
    );

    Ok(Response::builder()
        .status(StatusCode::OK)
        .header("Content-Type", "text/plain")
        .body(response_body.into())?)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    run(handler).await
}

在本地测试时,一切看起来都还行。elapsed 的耗时大约在 5ms 到 10ms 之间。但部署到 Vercel 后,灾难降临了。冷启动下的 P99 延迟轻松飙升到 300ms 以上,其中超过 250ms 都消耗在了这次 reqwest 调用上。原因很明显:Vercel Function 的冷启动、与内部 Consul 实例之间的网络延迟、TLS 握手开销,这些因素叠加在一起,使得每次调用的服务发现成本高到无法接受。对于一个期望在 50ms 内响应的 API 来说,这直接判了死刑。

在 Serverless 架构下,任何依赖外部 TCP 长连接或高频同步 I/O 的模式都必须被重新审视。函数的实例生命周期是不可预测的,可能在一次调用后就被冻结或销毁。这意味着任何形式的实例内缓存(in-memory cache)几乎是无效的,因为下一次请求很可能由一个全新的、没有任何缓存的实例来处理。

问题变成了:如何在一个无状态、短暂的计算环境中,实现低延迟的服务发现?

答案必须是在函数实例之外,但在一个离函数执行环境足够近的地方,维护一个服务地址的热缓存。Vercel 生态系统提供了两个候选方案:Vercel KV (基于 Upstash Redis) 和 Vercel Edge Config。考虑到服务发现数据结构相对简单,更新频率也不是极端的高(例如每秒一次),Vercel KV 的毫秒级延迟已经完全足够,且其更灵活的数据结构和更成熟的 Rust 客户端生态使其成为首选。

新的架构思路逐渐清晰:

  1. 解耦数据同步与函数执行:创建一个独立的、可以长期运行的同步器(Synchronizer)进程。它的唯一职责是定期从 Consul 拉取最新的健康服务实例列表,并将其写入 Vercel KV。
  2. 函数直连高速缓存:Vercel Function 不再直接与 Consul 通信,而是直接从 Vercel KV 读取已缓存的服务地址。Vercel KV 与 Vercel Function 部署在同一区域,网络延迟极低。
graph TD
    subgraph "旧架构:高延迟"
        A[Client] --> B{Vercel Function};
        B -- "1. Query Service (每次调用, >250ms)" --> C[Consul];
        C -- "2. Return Address" --> B;
        B -- "3. Connect to Service" --> D[gRPC Service];
    end

    subgraph "新架构:低延迟"
        subgraph "数据同步 (异步、低频)"
            E[Rust Synchronizer] -- "1. Poll Health Checks (e.g., every 30s)" --> F[Consul];
            F -- "2. Healthy Instances" --> E;
            E -- "3. Update Cache" --> G[(Vercel KV)];
        end
        subgraph "API 请求 (同步、高频)"
            H[Client] --> I{Vercel Function};
            I -- "1. Read from Cache (<10ms)" --> G;
            G -- "2. Cached Address" --> I;
            I -- "3. Connect to Service" --> J[gRPC Service];
        end
    end

第一步:实现 Rust 同步器

这个同步器可以是一个部署在任何地方的 long-running 进程,比如一个小型 VPS、一个 Kubernetes pod,甚至可以是另一个利用 Vercel Cron Jobs 定期触发的 Serverless Function。为了演示核心逻辑,我们构建一个独立的 Rust 二进制文件。

它的核心职责是:

  • 连接到 Consul。
  • 连接到 Vercel KV。
  • 在一个循环中,定期拉取数据并更新缓存。
  • 具备健壮的错误处理和重试机制。

Cargo.toml 依赖:

[dependencies]
tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
dotenvy = "0.15"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
redis = { version = "0.23", features = ["tokio-comp"] }
backoff = { version = "0.4", features = ["tokio"] }

synchronizer/src/main.rs 核心代码:

// synchronizer/src/main.rs

use backoff::ExponentialBackoff;
use backoff::future::retry;
use redis::AsyncCommands;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tracing::{error, info, instrument};
use tracing_subscriber::EnvFilter;

// Consul API返回的服务实例结构
#[derive(Deserialize, Debug, Clone)]
struct ConsulServiceInstance {
    #[serde(rename = "Service")]
    service: ConsulService,
}

#[derive(Deserialize, Debug, Clone)]
struct ConsulService {
    #[serde(rename = "ID")]
    id: String,
    #[serde(rename = "Service")]
    name: String,
    #[serde(rename = "Address")]
    address: String,
    #[serde(rename = "Port")]
    port: u16,
}

// 序列化后存储到Redis中的结构
#[derive(Serialize, Deserialize, Debug)]
struct CachedServiceEndpoint {
    address: String,
    port: u16,
    // 可以加入一些元数据,比如同步时间戳
    last_updated_utc: i64,
}

// 我们要监控的服务列表
const WATCHED_SERVICES: &[&str] = &["user-profile-service", "payment-gateway-service"];

#[tokio::main]
async fn main() {
    dotenvy::dotenv().ok();
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .init();

    info!("Starting Consul to Vercel KV synchronizer...");

    let consul_addr = std::env::var("CONSUL_HTTP_ADDR").expect("CONSUL_HTTP_ADDR must be set");
    let vercel_kv_url = std::env::var("KV_URL").expect("KV_URL must be set");

    let redis_client = redis::Client::open(vercel_kv_url).expect("Failed to create Redis client");
    let reqwest_client = reqwest::Client::builder()
        .timeout(Duration::from_secs(5))
        .build()
        .expect("Failed to create reqwest client");

    let mut interval = tokio::time::interval(Duration::from_secs(30));

    loop {
        interval.tick().await;
        info!("Starting sync cycle...");
        if let Err(e) = run_sync_cycle(&consul_addr, &redis_client, &reqwest_client).await {
            error!("Sync cycle failed: {}", e);
        }
    }
}

#[instrument(skip(redis_client, reqwest_client))]
async fn run_sync_cycle(
    consul_addr: &str,
    redis_client: &redis::Client,
    reqwest_client: &reqwest::Client,
) -> Result<(), anyhow::Error> {
    // 获取一个异步连接,注意处理连接失败
    let mut redis_con = redis_client.get_async_connection().await?;

    for &service_name in WATCHED_SERVICES {
        let backoff = ExponentialBackoff::default();

        // 使用指数退避重试策略来查询Consul,增强韧性
        let operation = || async {
            fetch_and_cache_service(consul_addr, service_name, &reqwest_client, &mut redis_con)
                .await
                .map_err(|e| {
                    // 对于可重试的错误,包装成backoff::Error::Transient
                    error!("Attempt to fetch {} failed: {}. Retrying...", service_name, e);
                    backoff::Error::transient(e)
                })
        };

        retry(backoff, operation).await?;
    }
    Ok(())
}

async fn fetch_and_cache_service(
    consul_addr: &str,
    service_name: &str,
    reqwest_client: &reqwest::Client,
    redis_con: &mut redis::aio::Connection,
) -> Result<(), anyhow::Error> {
    info!("Fetching healthy instances for service: {}", service_name);
    let request_url = format!(
        "{}/v1/health/service/{}?passing",
        consul_addr, service_name
    );

    let resp = reqwest_client.get(&request_url).send().await?;
    if !resp.status().is_success() {
        return Err(anyhow::anyhow!(
            "Consul API returned non-success status: {}",
            resp.status()
        ));
    }

    let instances: Vec<ConsulServiceInstance> = resp.json().await?;
    let endpoints: Vec<CachedServiceEndpoint> = instances
        .into_iter()
        .map(|inst| CachedServiceEndpoint {
            address: inst.service.address,
            port: inst.service.port,
            last_updated_utc: chrono::Utc::now().timestamp(),
        })
        .collect();

    if endpoints.is_empty() {
        // 这是一个关键的决策点。如果所有实例都下线了,是应该清空缓存还是保留旧数据?
        // 清空缓存会导致服务彻底不可用(fail-fast)。
        // 保留旧数据可能会导致请求打到已经不存在的实例上(stale cache)。
        // 在真实项目中,通常选择删除key,让客户端快速失败。
        info!("No healthy instances found for {}. Deleting cache key.", service_name);
        let cache_key = format!("service:{}", service_name);
        redis_con.del(&cache_key).await?;
    } else {
        let cache_key = format!("service:{}", service_name);
        let value = serde_json::to_string(&endpoints)?;
        // 设置一个TTL,比如60秒。这是一种保护机制,
        // 即使同步器挂掉,缓存也会在一段时间后自动失效,防止无限期的脏数据。
        redis_con.set_ex(&cache_key, value, 60).await?;
        info!("Successfully cached {} endpoints for service: {}", endpoints.len(), service_name);
    }
    Ok(())
}

这个同步器考虑了几个生产实践中的关键点:

  1. 韧性:使用 backoff crate 实现指数退避重试。如果 Consul 短暂抖动,同步器不会立即失败,而是会尝试几次。
  2. 配置驱动:通过环境变量 (.env 文件) 来配置 Consul 和 Vercel KV 的地址,符合十二要素应用原则。
  3. 明确的缓存策略:当服务没有健康实例时,我们选择删除缓存键,这是一种“快速失败”策略。同时,为缓存键设置了 TTL,作为同步器进程本身发生故障时的最后一道防线。
  4. 结构化日志:使用 tracing 提供了上下文感知的日志,便于排查问题。

第二步:改造 Vercel Function

现在,Vercel Function 的逻辑变得异常简单和高效。它不再关心 Consul 的存在,其世界里只有 Vercel KV。

api/discover.rs 的最终实现:

// api/discover.rs

use serde::Deserialize;
use std::sync::Arc;
use vercel_runtime::{run, Body, Error, Request, Response, StatusCode};
use vercel_kv::{connect, Store};
use rand::seq::SliceRandom;

// 这个结构必须与同步器中写入的结构保持一致
#[derive(Deserialize, Debug, Clone)]
struct CachedServiceEndpoint {
    address: String,
    port: u16,
}

// 在函数外部初始化客户端,以便在热启动的函数实例中复用
// Arc确保了跨async任务的线程安全共享
lazy_static::lazy_static! {
    static ref KV_CLIENT: Arc<Store> = {
        let kv_url = std::env::var("KV_URL").expect("KV_URL env var not set");
        let kv_rest_api_token = std::env::var("KV_REST_API_TOKEN").expect("KV_REST_API_TOKEN env var not set");
        
        // 使用 vercel_kv crate
        let store = connect(&kv_url, &kv_rest_api_token)
            .expect("Failed to connect to Vercel KV");
        Arc::new(store)
    };
}

async fn handler(_req: Request) -> Result<Response<Body>, Error> {
    let service_name = "user-profile-service";
    let cache_key = format!("service:{}", service_name);
    
    let start_time = std::time::Instant::now();

    // 从Vercel KV中读取服务列表
    let result: Option<String> = KV_CLIENT.get(&cache_key).await?;
    
    let endpoints: Vec<CachedServiceEndpoint> = match result {
        Some(json_str) => serde_json::from_str(&json_str).unwrap_or_default(),
        None => {
            // 缓存未命中或服务无健康实例
            // 这里的处理至关重要,返回一个明确的服务不可用错误
            return Ok(Response::builder()
                .status(StatusCode::SERVICE_UNAVAILABLE)
                .body(format!("No healthy endpoints found for service: {}", service_name).into())?);
        }
    };

    // 从多个健康实例中随机选择一个,实现简单的客户端负载均衡
    let selected_endpoint = endpoints.choose(&mut rand::thread_rng());
    let elapsed = start_time.elapsed();

    let response_body = match selected_endpoint {
        Some(ep) => format!(
            "Cache lookup took: {:?}. Selected endpoint: {}:{}",
            elapsed, ep.address, ep.port
        ),
        None => {
            // 理论上如果endpoints不为空,这里不会发生,但作为防御性编程
             return Ok(Response::builder()
                .status(StatusCode::INTERNAL_SERVER_ERROR)
                .body("Endpoint list was empty after deserialization".into())?);
        }
    };
    
    // 接下来就可以使用 selected_endpoint 去连接gRPC服务了...

    Ok(Response::builder()
        .status(StatusCode::OK)
        .header("Content-Type", "text/plain")
        .body(response_body.into())?)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    run(handler).await
}

改造后的函数有几个优点:

  1. 极低延迟KV_CLIENT.get() 的耗时通常在 10ms 以内,相比之前直接查询 Consul 的 250ms+,性能提升是数量级的。
  2. 客户端复用:通过 lazy_static,Vercel KV 的客户端连接在函数热实例中得以复用,避免了每次调用都重新建立连接的开销。
  3. 解耦与健壮性:函数不再感知 Consul 的存在。即使 Consul 发生故障,只要 Vercel KV 中的缓存(在 TTL 内)有效,服务发现依然能够成功。这大大提升了整个系统的容错能力。
  4. 简单的负载均衡:如果一个服务有多个健康实例,通过 SliceRandom::choose 可以轻松实现客户端的随机负载均衡。

局限性与未来迭代方向

这套架构并非银弹,它通过引入缓存解决了延迟问题,但也带来了缓存系统固有的问题——数据一致性。

数据时延: 同步器 30 秒的轮询周期意味着服务实例状态的变更最多有 30 秒的延迟才能反映到 Vercel Function。如果一个实例异常下线,在接下来的最多 30 秒内,函数仍有可能从缓存中读到这个旧地址并发起调用,导致调用失败。这个时延是否可以接受,完全取决于业务场景对实时性的要求。缩短轮询周期可以缓解,但会增加 Consul 和同步器本身的负载。

同步器单点问题: 当前的同步器是一个单点进程。如果它崩溃,缓存会在 TTL(本例中为 60 秒)后失效,导致整个服务发现机制失灵。在生产环境中,需要保证同步器的高可用,例如将其部署在 Kubernetes 中并设置多副本,或者使用 Vercel Cron Jobs 这种本身具备高可用性的调度服务来触发同步逻辑。

更优的缓存选型: 对于读取延迟极其敏感且数据全局一致性要求高的场景,Vercel Edge Config 是一个比 Vercel KV 更激进的选择。它的数据被复制到 Vercel 的全球边缘网络,能提供更低的读取延迟。但它的写入 API 延迟更高,且对数据大小和结构有更多限制,更适合存放不常变化但需要极速读取的配置类数据。对于我们的服务发现场景,如果同步周期可以拉长(例如几分钟一次),Edge Config 将是一个值得探索的优化方向。

这个从 250ms 到 10ms 的优化过程,本质上是用最终一致性换取了极致的性能和可用性,这正是分布式系统设计中无处不在的权衡。在 Serverless 这个计算模型被资源和生命周期严格限制的舞台上,这种权衡显得尤为重要和普遍。


  目录