构建连接事件驱动架构与 Lit 前端的高可用 Server-Sent Events 推送网关


在真实的项目中,我们经常面临一个挑战:后端是成熟的事件驱动架构(EDA),通常由 Kafka 或 RabbitMQ 这类消息队列驱动,而前端需要实时、低延迟地响应这些事件。常见的做法是前端通过轮询去拉取状态,但这带来了延迟和大量的无效请求,尤其是在事件稀疏的场景下。问题的核心在于如何为单向的、服务端的事件流,在 Web 前端建立一个高效、可靠的订阅通道。

架构决策需要在多种技术方案中权衡。短轮询(Short-Polling)因其高延迟和服务器压力首先被排除。长轮询(Long-Polling)虽有所改善,但依然存在连接管理的复杂性和请求开销。WebSocket 提供了功能强大的双向通信,但对于纯粹的服务端推送场景,它的协议复杂性、额外的握手机制以及在某些网络环境下的代理穿透问题,显得有些过度设计。

我们最终选择的方案是 Server-Sent Events (SSE)。SSE 基于标准的 HTTP/1.1 (或 HTTP/2),本质上是一个永不关闭的 GET 请求。它为服务端到客户端的单向通信提供了完美的模型,协议简单,浏览器原生支持 EventSource API,并内置了自动重连机制。这恰好满足了我们将后端 EDA 事件流“转播”到前端的需求,而无需引入 WebSocket 的复杂性。

本文将记录构建一个生产级的 SSE 推送网关(SSE Push Gateway)的完整过程。该网关作为中间层,消费来自 Kafka 的业务事件,并通过 SSE 将其推送给订阅的 Lit 前端组件。我们将重点关注以下几个方面:

  1. 服务端: 使用 Java 和 Spring WebFlux 构建一个能处理大量并发 SSE 连接的非阻塞网关。
  2. 前端: 使用 Lit 构建一个弹性的 Web Component,能够优雅地处理连接生命周期和数据渲染。
  3. 容器化: 使用 Jib 实现无 Dockerfile 的、高效可重复的 Java 应用容器化,简化 CI/CD 流程。

架构概览

整体架构非常直接。一个独立的 Java 服务作为 SSE 网关,它同时是 Kafka 消费者和 SSE 事件的生产者。

graph TD
    subgraph "后端事件源 (EDA)"
        A[业务服务] -- 产生业务事件 --> B(Kafka Topic);
    end

    subgraph "SSE 推送网关 (Java / Spring WebFlux)"
        C[Kafka Consumer] -- 消费 --> D{事件处理器};
        D -- 分发 --> E[SSE Connection Manager];
        E -- Server-Sent Events --> G;
    end

    subgraph "客户端"
        G[浏览器] -- HTTP GET --> E;
        H(Lit Component) -- 订阅 --> G[EventSource API];
    end

    style C fill:#f9f,stroke:#333,stroke-width:2px
    style E fill:#ccf,stroke:#333,stroke-width:2px
    style H fill:#9f9,stroke:#333,stroke-width:2px

这里的关键组件是 SSE Connection Manager。它的职责是维护所有活跃的前端连接,并将从 Kafka 消费到的事件,根据订阅逻辑(例如,按用户ID、会话ID等)路由到正确的连接上。

服务端实现:高可用的 SSE 网关

我们使用 Spring WebFlux 来构建网关,因为它的响应式编程模型天然适合处理 I/O 密集型和长连接场景,能够用较少的线程资源支撑大量的并发连接。

1. 核心 Controller

这是 SSE 连接的入口点。我们通过返回一个 Flux<ServerSentEvent<String>> 来建立一个响应式的 SSE 流。

src/main/java/com/example/sse/gateway/SseController.java

package com.example.sse.gateway;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.Map;
import java.util.Optional;

@RestController
public class SseController {

    private static final Logger logger = LoggerFactory.getLogger(SseController.class);
    private final SseEventService eventService;

    public SseController(SseEventService eventService) {
        this.eventService = eventService;
    }

    @GetMapping(path = "/stream/events/{clientId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> streamEvents(
            @PathVariable String clientId,
            @RequestHeader(name = "Last-Event-ID", required = false) String lastEventId) {

        logger.info("Client '{}' connecting... Last-Event-ID: {}", clientId, lastEventId);

        // 1. 创建一个心跳流,防止代理或防火墙因超时而切断连接
        // 在生产环境中,这个间隔应该更长,比如30秒
        Flux<ServerSentEvent<String>> heartbeatStream = Flux.interval(Duration.ofSeconds(15))
                .map(i -> ServerSentEvent.<String>builder()
                        .comment("heartbeat")
                        .build())
                .doOnNext(hb -> logger.trace("Sending heartbeat to client '{}'", clientId));

        // 2. 从我们的事件服务中获取为该客户端准备的真实事件流
        Flux<ServerSentEvent<String>> eventStream = eventService.subscribe(clientId, Optional.ofNullable(lastEventId));

        // 3. 合并事件流和心跳流
        // takeUntilOther 会在主事件流结束时也结束心跳流
        return Flux.merge(eventStream, heartbeatStream.takeUntilOther(eventStream))
                .doOnCancel(() -> {
                    // 当客户端断开连接时(例如关闭浏览器标签),这个回调会被触发
                    logger.info("Client '{}' disconnected, unsubscribing.", clientId);
                    eventService.unsubscribe(clientId);
                })
                .doOnError(e -> {
                    // 发生错误时记录日志
                    logger.error("Error on SSE stream for client '{}'", clientId, e);
                    eventService.unsubscribe(clientId);
                });
    }
}

这里的代码包含了几个生产级的考量:

  • 心跳机制: 很多反向代理(如 Nginx)有默认的连接超时时间。通过定期发送注释行(:heartbeat),可以保持连接活跃。
  • Last-Event-ID: SSE 规范支持断线重连。浏览器在重连时会自动带上 Last-Event-ID 头,服务端可以利用这个 ID 来补发客户端断线期间错过的消息。我们的 SseEventService 需要实现这个逻辑。
  • 生命周期管理: doOnCanceldoOnError 确保了当连接异常或正常关闭时,我们能清理服务端的资源(比如从 Connection Manager 中移除该客户端)。

2. 事件服务与连接管理

SseEventService 是核心的有状态组件。它需要线程安全地管理所有客户端连接,并将来自 Kafka 的消息广播给它们。

src/main/java/com/example/sse/gateway/SseEventService.java

package com.example.sse.gateway;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

@Service
public class SseEventService {

    private static final Logger logger = LoggerFactory.getLogger(SseEventService.class);

    // 使用 ConcurrentHashMap 来线程安全地管理所有客户端的 Sink
    // Sink 是一个响应式编程中的概念,可以把它看作一个可编程的事件发射器
    private final Map<String, Sinks.Many<ServerSentEvent<String>>> clientSinks = new ConcurrentHashMap<>();

    /**
     * 为指定的客户端ID创建一个订阅流。
     * @param clientId 客户端唯一标识
     * @param lastEventId 可选的,客户端上一次收到的事件ID
     * @return 一个 ServerSentEvent 的 Flux 流
     */
    public Flux<ServerSentEvent<String>> subscribe(String clientId, Optional<String> lastEventId) {
        // Sinks.many().multicast().onBackpressureBuffer() 创建一个支持多个订阅者的 Sink
        // 当下游消费者处理不过来时,事件会被缓存起来。
        Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().multicast().onBackpressureBuffer();
        
        // 只有当客户端第一次连接或者该客户端的 Sink 不存在时才创建和存储
        clientSinks.put(clientId, sink);
        logger.info("Client '{}' subscribed. Total clients: {}", clientId, clientSinks.size());

        // 如果客户端提供了 lastEventId,这里是实现事件回溯逻辑的地方。
        // 在真实项目中,你可能需要从持久化存储(如数据库或事件日志)中查询自 lastEventId 以来的事件。
        // 为简化示例,我们这里只打印日志。
        lastEventId.ifPresent(id -> logger.warn(
            "Client '{}' requested replay from event ID '{}'. Replay logic not implemented in this example.", 
            clientId, id
        ));

        return sink.asFlux();
    }

    /**
     * 客户端断开连接时,移除其 Sink。
     * @param clientId 客户端唯一标识
     */
    public void unsubscribe(String clientId) {
        Sinks.Many<ServerSentEvent<String>> sink = clientSinks.remove(clientId);
        if (sink != null) {
            // 发出一个完成信号,通知所有订阅者流已结束
            sink.tryEmitComplete();
            logger.info("Client '{}' unsubscribed. Total clients: {}", clientId, clientSinks.size());
        }
    }

    /**
     * 从外部(如 Kafka 消费者)接收事件,并分发给所有订阅的客户端。
     * @param eventData 事件内容
     */
    public void dispatchEvent(String eventData) {
        // 为了避免在 Kafka 消费线程中执行耗时操作,我们将分发逻辑切换到另一个线程池
        // publishOn 保证了下游操作在指定的 Scheduler 上执行
        Flux.fromIterable(clientSinks.entrySet())
            .parallel()
            .runOn(Schedulers.parallel())
            .doOnNext(entry -> {
                String clientId = entry.getKey();
                Sinks.Many<ServerSentEvent<String>> sink = entry.getValue();
                
                // 构建 SSE 事件,包含 id, event, data 字段
                String eventId = UUID.randomUUID().toString();
                ServerSentEvent<String> sseEvent = ServerSentEvent.<String>builder()
                        .id(eventId) // id 用于断线重连
                        .event("message") // event 类型,前端可以监听特定类型
                        .data(eventData)
                        .build();

                // 尝试发射事件。如果 Sink 因为下游慢而满了,或者已经关闭,会返回失败状态。
                Sinks.EmitResult result = sink.tryEmitNext(sseEvent);
                if (result.isFailure()) {
                    logger.warn("Failed to send event to client '{}'. Result: {}", clientId, result);
                    // 在生产代码中,可以根据失败类型决定是否移除该客户端
                    if (result == Sinks.EmitResult.FAIL_TERMINATED || result == Sinks.EmitResult.FAIL_CANCELLED) {
                        unsubscribe(clientId);
                    }
                }
            })
            .sequential()
            .subscribe(); // 触发执行
    }
}

这里的 Sinks.Many 是 Project Reactor 的一个关键组件,它充当了命令式编程(Kafka 消费者)和响应式世界(Flux 流)之间的桥梁。

3. Kafka 消费者集成

最后,我们创建一个 Kafka 消费者来驱动事件的分发。

src/main/java/com/example/sse/gateway/KafkaEventConsumer.java

package com.example.sse.gateway;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaEventConsumer {

    private static final Logger logger = LoggerFactory.getLogger(KafkaEventConsumer.class);
    private final SseEventService eventService;

    public KafkaEventConsumer(SseEventService eventService) {
        this.eventService = eventService;
    }

    // 监听名为 'business-events' 的 Kafka topic
    @KafkaListener(topics = "business-events", groupId = "sse-gateway-group")
    public void consume(String message) {
        logger.info("Received event from Kafka: {}", message);
        // 将消息分发给所有连接的 SSE 客户端
        eventService.dispatchEvent(message);
    }
}

至此,一个功能完备的服务端网关就绪了。

容器化:使用 Jib 实现无缝部署

对于 Java 应用,传统的 Dockerfile 构建流程通常比较繁琐且效率不高。Jib 是一个由 Google 开发的 Maven/Gradle 插件,它可以直接将 Java 应用构建成优化的 Docker 镜像,无需 Docker daemon,也无需编写 Dockerfile。

它的核心优势在于:

  1. 分层构建: Jib 会智能地将应用拆分成多个层:依赖项、资源、类文件。在代码变更时,只有最上层的类文件层需要重新构建,极大地加快了 CI/CD 流程。
  2. 可复现: 相同的输入总是产生完全相同的镜像,消除了环境差异导致的问题。
  3. 配置简单: 只需在 pom.xml 中添加几行配置。

pom.xml

<build>
    <plugins>
        <!-- ... other plugins -->
        <plugin>
            <groupId>com.google.cloud.tools</groupId>
            <artifactId>jib-maven-plugin</artifactId>
            <version>3.4.0</version>
            <configuration>
                <from>
                    <!-- 使用一个无根用户的、精简的Java基础镜像 -->
                    <image>gcr.io/distroless/java17-debian11</image>
                </from>
                <to>
                    <!-- 配置你的镜像仓库地址和镜像名 -->
                    <image>your-docker-registry/sse-push-gateway</image>
                    <tags>
                        <tag>${project.version}</tag>
                        <tag>latest</tag>
                    </tags>
                </to>
                <container>
                    <ports>
                        <port>8080</port>
                    </ports>
                    <creationTime>USE_CURRENT_TIMESTAMP</creationTime>
                    <jvmFlags>
                        <!-- 生产环境的 JVM 参数,例如内存设置 -->
                        <jvmFlag>-Xms512m</jvmFlag>
                        <jvmFlag>-Xmx512m</jvmFlag>
                        <!-- 针对容器环境的优化 -->
                        <jvmFlag>-XX:+UseContainerSupport</jvmFlag>
                    </jvmFlags>
                </container>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>build</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

现在,只需运行 mvn package,Jib 就会自动构建镜像并(如果配置了认证信息)推送到你的镜像仓库。部署过程被极大地简化了。

前端实现:弹性的 Lit 组件

前端使用 Lit 来创建一个自包含的 Web Component,负责与 SSE 网关通信并展示数据。

src/event-stream-display.ts

import { LitElement, html, css } from 'lit';
import { customElement, property, state } from 'lit/decorators.js';

enum ConnectionStatus {
  CONNECTING = 'CONNECTING',
  OPEN = 'OPEN',
  CLOSED = 'CLOSED',
}

@customElement('event-stream-display')
export class EventStreamDisplay extends LitElement {
  static styles = css`
    :host {
      display: block;
      padding: 16px;
      border: 1px solid #ccc;
      font-family: sans-serif;
    }
    .status {
      font-weight: bold;
      padding-bottom: 8px;
    }
    .status.connecting { color: orange; }
    .status.open { color: green; }
    .status.closed { color: red; }
    ul {
        list-style-type: none;
        padding: 0;
        max-height: 300px;
        overflow-y: auto;
    }
    li {
        background-color: #f0f0f0;
        padding: 8px;
        margin-bottom: 4px;
        border-radius: 4px;
    }
  `;

  // 从外部传入,用于构建 SSE 连接 URL
  @property({ type: String })
  clientId = 'default-client';

  // 组件内部状态
  @state()
  private status: ConnectionStatus = ConnectionStatus.CONNECTING;

  @state()
  private messages: string[] = [];
  
  private eventSource: EventSource | null = null;

  // Lit 的生命周期回调,当组件连接到 DOM 时调用
  connectedCallback() {
    super.connectedCallback();
    this.connect();
  }

  // Lit 的生命周期回调,当组件从 DOM 中移除时调用
  disconnectedCallback() {
    super.disconnectedCallback();
    this.disconnect();
  }

  private connect() {
    // 确保在创建新连接前关闭旧连接
    if (this.eventSource) {
      this.eventSource.close();
    }

    const url = `/stream/events/${this.clientId}`;
    this.eventSource = new EventSource(url);
    this.status = ConnectionStatus.CONNECTING;

    // 连接成功建立时的回调
    this.eventSource.onopen = () => {
      console.log('SSE connection opened.');
      this.status = ConnectionStatus.OPEN;
    };

    // 发生错误时的回调
    // EventSource 会在出错后自动尝试重连,所以我们只需更新状态
    this.eventSource.onerror = (error) => {
      console.error('EventSource failed:', error);
      this.status = ConnectionStatus.CLOSED;
      // 注意:不需要手动关闭,浏览器会自动处理重连逻辑
    };

    // 监听名为 'message' 的事件(这是默认事件类型)
    this.eventSource.addEventListener('message', (event: MessageEvent) => {
      console.log('Received SSE message:', event.data, 'with ID:', event.lastEventId);
      // 将新消息添加到列表顶部,并保持列表长度不超过100
      this.messages = [event.data, ...this.messages].slice(0, 100);
    });
  }

  private disconnect() {
    if (this.eventSource) {
      // 主动关闭连接,这将停止浏览器的自动重连
      this.eventSource.close();
      this.eventSource = null;
      this.status = ConnectionStatus.CLOSED;
      console.log('SSE connection closed by component.');
    }
  }

  render() {
    return html`
      <div class="status ${this.status.toLowerCase()}">
        Status: ${this.status}
      </div>
      <ul>
        ${this.messages.map(msg => html`<li>${msg}</li>`)}
      </ul>
    `;
  }
}

这个 Lit 组件非常健壮:

  • 生命周期管理: 它利用 connectedCallbackdisconnectedCallback 来自动管理 EventSource 的生命周期,防止内存泄漏。
  • 状态反馈: UI 清晰地展示了连接的当前状态(CONNECTING, OPEN, CLOSED)。
  • 利用原生能力: 它完全依赖浏览器 EventSource 的原生重连机制,无需编写复杂的重连和退避算法。当网络中断时,onerror 会被触发,UI 变为 CLOSED 状态,但浏览器会在后台持续尝试重连。一旦成功,onopen 会再次触发,UI 恢复 OPEN 状态。

方案的局限性与扩展

尽管 SSE 是一个非常适合此场景的方案,但在生产环境中应用时,仍需考虑其边界。

首先,SSE 是单向的。如果前端需要向后端发送消息,就需要配合使用传统的 AJAX/Fetch API,或者直接选择 WebSocket。对于需要双向通信的场景,WebSocket 仍然是更优的选择。

其次,浏览器对单个域名下的并发 HTTP 连接数有限制(HTTP/1.1 通常是 6 个)。如果一个页面打开了大量需要 SSE 连接的组件,可能会耗尽连接池。升级到 HTTP/2 可以很大程度上缓解这个问题,因为它支持在单个 TCP 连接上进行多路复用。

最后,SSE 网关本身是一个有状态的节点。水平扩展这个网关需要一些额外的架构考量。如果客户端可以随机连接到任何一个网关实例,那么一个 Kafka 分区(Partition)的消息可能需要被广播到所有网关实例,这会造成资源浪费。一种优化策略是使用支持粘性会话的负载均衡器,或者引入一个像 Redis Pub/Sub 这样的中间层,让网关实例订阅特定的频道,从而实现更精细的事件路由。但这会增加架构的复杂性,需要在可扩展性和简洁性之间做出权衡。


  目录