在真实的项目中,我们经常面临一个挑战:后端是成熟的事件驱动架构(EDA),通常由 Kafka 或 RabbitMQ 这类消息队列驱动,而前端需要实时、低延迟地响应这些事件。常见的做法是前端通过轮询去拉取状态,但这带来了延迟和大量的无效请求,尤其是在事件稀疏的场景下。问题的核心在于如何为单向的、服务端的事件流,在 Web 前端建立一个高效、可靠的订阅通道。
架构决策需要在多种技术方案中权衡。短轮询(Short-Polling)因其高延迟和服务器压力首先被排除。长轮询(Long-Polling)虽有所改善,但依然存在连接管理的复杂性和请求开销。WebSocket 提供了功能强大的双向通信,但对于纯粹的服务端推送场景,它的协议复杂性、额外的握手机制以及在某些网络环境下的代理穿透问题,显得有些过度设计。
我们最终选择的方案是 Server-Sent Events (SSE)。SSE 基于标准的 HTTP/1.1 (或 HTTP/2),本质上是一个永不关闭的 GET 请求。它为服务端到客户端的单向通信提供了完美的模型,协议简单,浏览器原生支持 EventSource
API,并内置了自动重连机制。这恰好满足了我们将后端 EDA 事件流“转播”到前端的需求,而无需引入 WebSocket 的复杂性。
本文将记录构建一个生产级的 SSE 推送网关(SSE Push Gateway)的完整过程。该网关作为中间层,消费来自 Kafka 的业务事件,并通过 SSE 将其推送给订阅的 Lit 前端组件。我们将重点关注以下几个方面:
- 服务端: 使用 Java 和 Spring WebFlux 构建一个能处理大量并发 SSE 连接的非阻塞网关。
- 前端: 使用 Lit 构建一个弹性的 Web Component,能够优雅地处理连接生命周期和数据渲染。
- 容器化: 使用 Jib 实现无 Dockerfile 的、高效可重复的 Java 应用容器化,简化 CI/CD 流程。
架构概览
整体架构非常直接。一个独立的 Java 服务作为 SSE 网关,它同时是 Kafka 消费者和 SSE 事件的生产者。
graph TD subgraph "后端事件源 (EDA)" A[业务服务] -- 产生业务事件 --> B(Kafka Topic); end subgraph "SSE 推送网关 (Java / Spring WebFlux)" C[Kafka Consumer] -- 消费 --> D{事件处理器}; D -- 分发 --> E[SSE Connection Manager]; E -- Server-Sent Events --> G; end subgraph "客户端" G[浏览器] -- HTTP GET --> E; H(Lit Component) -- 订阅 --> G[EventSource API]; end style C fill:#f9f,stroke:#333,stroke-width:2px style E fill:#ccf,stroke:#333,stroke-width:2px style H fill:#9f9,stroke:#333,stroke-width:2px
这里的关键组件是 SSE Connection Manager
。它的职责是维护所有活跃的前端连接,并将从 Kafka 消费到的事件,根据订阅逻辑(例如,按用户ID、会话ID等)路由到正确的连接上。
服务端实现:高可用的 SSE 网关
我们使用 Spring WebFlux 来构建网关,因为它的响应式编程模型天然适合处理 I/O 密集型和长连接场景,能够用较少的线程资源支撑大量的并发连接。
1. 核心 Controller
这是 SSE 连接的入口点。我们通过返回一个 Flux<ServerSentEvent<String>>
来建立一个响应式的 SSE 流。
src/main/java/com/example/sse/gateway/SseController.java
package com.example.sse.gateway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
@RestController
public class SseController {
private static final Logger logger = LoggerFactory.getLogger(SseController.class);
private final SseEventService eventService;
public SseController(SseEventService eventService) {
this.eventService = eventService;
}
@GetMapping(path = "/stream/events/{clientId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamEvents(
@PathVariable String clientId,
@RequestHeader(name = "Last-Event-ID", required = false) String lastEventId) {
logger.info("Client '{}' connecting... Last-Event-ID: {}", clientId, lastEventId);
// 1. 创建一个心跳流,防止代理或防火墙因超时而切断连接
// 在生产环境中,这个间隔应该更长,比如30秒
Flux<ServerSentEvent<String>> heartbeatStream = Flux.interval(Duration.ofSeconds(15))
.map(i -> ServerSentEvent.<String>builder()
.comment("heartbeat")
.build())
.doOnNext(hb -> logger.trace("Sending heartbeat to client '{}'", clientId));
// 2. 从我们的事件服务中获取为该客户端准备的真实事件流
Flux<ServerSentEvent<String>> eventStream = eventService.subscribe(clientId, Optional.ofNullable(lastEventId));
// 3. 合并事件流和心跳流
// takeUntilOther 会在主事件流结束时也结束心跳流
return Flux.merge(eventStream, heartbeatStream.takeUntilOther(eventStream))
.doOnCancel(() -> {
// 当客户端断开连接时(例如关闭浏览器标签),这个回调会被触发
logger.info("Client '{}' disconnected, unsubscribing.", clientId);
eventService.unsubscribe(clientId);
})
.doOnError(e -> {
// 发生错误时记录日志
logger.error("Error on SSE stream for client '{}'", clientId, e);
eventService.unsubscribe(clientId);
});
}
}
这里的代码包含了几个生产级的考量:
- 心跳机制: 很多反向代理(如 Nginx)有默认的连接超时时间。通过定期发送注释行(
:heartbeat
),可以保持连接活跃。 - Last-Event-ID: SSE 规范支持断线重连。浏览器在重连时会自动带上
Last-Event-ID
头,服务端可以利用这个 ID 来补发客户端断线期间错过的消息。我们的SseEventService
需要实现这个逻辑。 - 生命周期管理:
doOnCancel
和doOnError
确保了当连接异常或正常关闭时,我们能清理服务端的资源(比如从Connection Manager
中移除该客户端)。
2. 事件服务与连接管理
SseEventService
是核心的有状态组件。它需要线程安全地管理所有客户端连接,并将来自 Kafka 的消息广播给它们。
src/main/java/com/example/sse/gateway/SseEventService.java
package com.example.sse.gateway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class SseEventService {
private static final Logger logger = LoggerFactory.getLogger(SseEventService.class);
// 使用 ConcurrentHashMap 来线程安全地管理所有客户端的 Sink
// Sink 是一个响应式编程中的概念,可以把它看作一个可编程的事件发射器
private final Map<String, Sinks.Many<ServerSentEvent<String>>> clientSinks = new ConcurrentHashMap<>();
/**
* 为指定的客户端ID创建一个订阅流。
* @param clientId 客户端唯一标识
* @param lastEventId 可选的,客户端上一次收到的事件ID
* @return 一个 ServerSentEvent 的 Flux 流
*/
public Flux<ServerSentEvent<String>> subscribe(String clientId, Optional<String> lastEventId) {
// Sinks.many().multicast().onBackpressureBuffer() 创建一个支持多个订阅者的 Sink
// 当下游消费者处理不过来时,事件会被缓存起来。
Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().multicast().onBackpressureBuffer();
// 只有当客户端第一次连接或者该客户端的 Sink 不存在时才创建和存储
clientSinks.put(clientId, sink);
logger.info("Client '{}' subscribed. Total clients: {}", clientId, clientSinks.size());
// 如果客户端提供了 lastEventId,这里是实现事件回溯逻辑的地方。
// 在真实项目中,你可能需要从持久化存储(如数据库或事件日志)中查询自 lastEventId 以来的事件。
// 为简化示例,我们这里只打印日志。
lastEventId.ifPresent(id -> logger.warn(
"Client '{}' requested replay from event ID '{}'. Replay logic not implemented in this example.",
clientId, id
));
return sink.asFlux();
}
/**
* 客户端断开连接时,移除其 Sink。
* @param clientId 客户端唯一标识
*/
public void unsubscribe(String clientId) {
Sinks.Many<ServerSentEvent<String>> sink = clientSinks.remove(clientId);
if (sink != null) {
// 发出一个完成信号,通知所有订阅者流已结束
sink.tryEmitComplete();
logger.info("Client '{}' unsubscribed. Total clients: {}", clientId, clientSinks.size());
}
}
/**
* 从外部(如 Kafka 消费者)接收事件,并分发给所有订阅的客户端。
* @param eventData 事件内容
*/
public void dispatchEvent(String eventData) {
// 为了避免在 Kafka 消费线程中执行耗时操作,我们将分发逻辑切换到另一个线程池
// publishOn 保证了下游操作在指定的 Scheduler 上执行
Flux.fromIterable(clientSinks.entrySet())
.parallel()
.runOn(Schedulers.parallel())
.doOnNext(entry -> {
String clientId = entry.getKey();
Sinks.Many<ServerSentEvent<String>> sink = entry.getValue();
// 构建 SSE 事件,包含 id, event, data 字段
String eventId = UUID.randomUUID().toString();
ServerSentEvent<String> sseEvent = ServerSentEvent.<String>builder()
.id(eventId) // id 用于断线重连
.event("message") // event 类型,前端可以监听特定类型
.data(eventData)
.build();
// 尝试发射事件。如果 Sink 因为下游慢而满了,或者已经关闭,会返回失败状态。
Sinks.EmitResult result = sink.tryEmitNext(sseEvent);
if (result.isFailure()) {
logger.warn("Failed to send event to client '{}'. Result: {}", clientId, result);
// 在生产代码中,可以根据失败类型决定是否移除该客户端
if (result == Sinks.EmitResult.FAIL_TERMINATED || result == Sinks.EmitResult.FAIL_CANCELLED) {
unsubscribe(clientId);
}
}
})
.sequential()
.subscribe(); // 触发执行
}
}
这里的 Sinks.Many
是 Project Reactor 的一个关键组件,它充当了命令式编程(Kafka 消费者)和响应式世界(Flux 流)之间的桥梁。
3. Kafka 消费者集成
最后,我们创建一个 Kafka 消费者来驱动事件的分发。
src/main/java/com/example/sse/gateway/KafkaEventConsumer.java
package com.example.sse.gateway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaEventConsumer {
private static final Logger logger = LoggerFactory.getLogger(KafkaEventConsumer.class);
private final SseEventService eventService;
public KafkaEventConsumer(SseEventService eventService) {
this.eventService = eventService;
}
// 监听名为 'business-events' 的 Kafka topic
@KafkaListener(topics = "business-events", groupId = "sse-gateway-group")
public void consume(String message) {
logger.info("Received event from Kafka: {}", message);
// 将消息分发给所有连接的 SSE 客户端
eventService.dispatchEvent(message);
}
}
至此,一个功能完备的服务端网关就绪了。
容器化:使用 Jib 实现无缝部署
对于 Java 应用,传统的 Dockerfile 构建流程通常比较繁琐且效率不高。Jib 是一个由 Google 开发的 Maven/Gradle 插件,它可以直接将 Java 应用构建成优化的 Docker 镜像,无需 Docker daemon,也无需编写 Dockerfile。
它的核心优势在于:
- 分层构建: Jib 会智能地将应用拆分成多个层:依赖项、资源、类文件。在代码变更时,只有最上层的类文件层需要重新构建,极大地加快了 CI/CD 流程。
- 可复现: 相同的输入总是产生完全相同的镜像,消除了环境差异导致的问题。
- 配置简单: 只需在
pom.xml
中添加几行配置。
pom.xml
<build>
<plugins>
<!-- ... other plugins -->
<plugin>
<groupId>com.google.cloud.tools</groupId>
<artifactId>jib-maven-plugin</artifactId>
<version>3.4.0</version>
<configuration>
<from>
<!-- 使用一个无根用户的、精简的Java基础镜像 -->
<image>gcr.io/distroless/java17-debian11</image>
</from>
<to>
<!-- 配置你的镜像仓库地址和镜像名 -->
<image>your-docker-registry/sse-push-gateway</image>
<tags>
<tag>${project.version}</tag>
<tag>latest</tag>
</tags>
</to>
<container>
<ports>
<port>8080</port>
</ports>
<creationTime>USE_CURRENT_TIMESTAMP</creationTime>
<jvmFlags>
<!-- 生产环境的 JVM 参数,例如内存设置 -->
<jvmFlag>-Xms512m</jvmFlag>
<jvmFlag>-Xmx512m</jvmFlag>
<!-- 针对容器环境的优化 -->
<jvmFlag>-XX:+UseContainerSupport</jvmFlag>
</jvmFlags>
</container>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
现在,只需运行 mvn package
,Jib 就会自动构建镜像并(如果配置了认证信息)推送到你的镜像仓库。部署过程被极大地简化了。
前端实现:弹性的 Lit 组件
前端使用 Lit 来创建一个自包含的 Web Component,负责与 SSE 网关通信并展示数据。
src/event-stream-display.ts
import { LitElement, html, css } from 'lit';
import { customElement, property, state } from 'lit/decorators.js';
enum ConnectionStatus {
CONNECTING = 'CONNECTING',
OPEN = 'OPEN',
CLOSED = 'CLOSED',
}
@customElement('event-stream-display')
export class EventStreamDisplay extends LitElement {
static styles = css`
:host {
display: block;
padding: 16px;
border: 1px solid #ccc;
font-family: sans-serif;
}
.status {
font-weight: bold;
padding-bottom: 8px;
}
.status.connecting { color: orange; }
.status.open { color: green; }
.status.closed { color: red; }
ul {
list-style-type: none;
padding: 0;
max-height: 300px;
overflow-y: auto;
}
li {
background-color: #f0f0f0;
padding: 8px;
margin-bottom: 4px;
border-radius: 4px;
}
`;
// 从外部传入,用于构建 SSE 连接 URL
@property({ type: String })
clientId = 'default-client';
// 组件内部状态
@state()
private status: ConnectionStatus = ConnectionStatus.CONNECTING;
@state()
private messages: string[] = [];
private eventSource: EventSource | null = null;
// Lit 的生命周期回调,当组件连接到 DOM 时调用
connectedCallback() {
super.connectedCallback();
this.connect();
}
// Lit 的生命周期回调,当组件从 DOM 中移除时调用
disconnectedCallback() {
super.disconnectedCallback();
this.disconnect();
}
private connect() {
// 确保在创建新连接前关闭旧连接
if (this.eventSource) {
this.eventSource.close();
}
const url = `/stream/events/${this.clientId}`;
this.eventSource = new EventSource(url);
this.status = ConnectionStatus.CONNECTING;
// 连接成功建立时的回调
this.eventSource.onopen = () => {
console.log('SSE connection opened.');
this.status = ConnectionStatus.OPEN;
};
// 发生错误时的回调
// EventSource 会在出错后自动尝试重连,所以我们只需更新状态
this.eventSource.onerror = (error) => {
console.error('EventSource failed:', error);
this.status = ConnectionStatus.CLOSED;
// 注意:不需要手动关闭,浏览器会自动处理重连逻辑
};
// 监听名为 'message' 的事件(这是默认事件类型)
this.eventSource.addEventListener('message', (event: MessageEvent) => {
console.log('Received SSE message:', event.data, 'with ID:', event.lastEventId);
// 将新消息添加到列表顶部,并保持列表长度不超过100
this.messages = [event.data, ...this.messages].slice(0, 100);
});
}
private disconnect() {
if (this.eventSource) {
// 主动关闭连接,这将停止浏览器的自动重连
this.eventSource.close();
this.eventSource = null;
this.status = ConnectionStatus.CLOSED;
console.log('SSE connection closed by component.');
}
}
render() {
return html`
<div class="status ${this.status.toLowerCase()}">
Status: ${this.status}
</div>
<ul>
${this.messages.map(msg => html`<li>${msg}</li>`)}
</ul>
`;
}
}
这个 Lit 组件非常健壮:
- 生命周期管理: 它利用
connectedCallback
和disconnectedCallback
来自动管理EventSource
的生命周期,防止内存泄漏。 - 状态反馈: UI 清晰地展示了连接的当前状态(
CONNECTING
,OPEN
,CLOSED
)。 - 利用原生能力: 它完全依赖浏览器
EventSource
的原生重连机制,无需编写复杂的重连和退避算法。当网络中断时,onerror
会被触发,UI 变为CLOSED
状态,但浏览器会在后台持续尝试重连。一旦成功,onopen
会再次触发,UI 恢复OPEN
状态。
方案的局限性与扩展
尽管 SSE 是一个非常适合此场景的方案,但在生产环境中应用时,仍需考虑其边界。
首先,SSE 是单向的。如果前端需要向后端发送消息,就需要配合使用传统的 AJAX/Fetch API,或者直接选择 WebSocket。对于需要双向通信的场景,WebSocket 仍然是更优的选择。
其次,浏览器对单个域名下的并发 HTTP 连接数有限制(HTTP/1.1 通常是 6 个)。如果一个页面打开了大量需要 SSE 连接的组件,可能会耗尽连接池。升级到 HTTP/2 可以很大程度上缓解这个问题,因为它支持在单个 TCP 连接上进行多路复用。
最后,SSE 网关本身是一个有状态的节点。水平扩展这个网关需要一些额外的架构考量。如果客户端可以随机连接到任何一个网关实例,那么一个 Kafka 分区(Partition)的消息可能需要被广播到所有网关实例,这会造成资源浪费。一种优化策略是使用支持粘性会话的负载均衡器,或者引入一个像 Redis Pub/Sub 这样的中间层,让网关实例订阅特定的频道,从而实现更精细的事件路由。但这会增加架构的复杂性,需要在可扩展性和简洁性之间做出权衡。