Tornado 结合 Milvus 与 SSE 构建实时向量相似性推送管道的实践


项目的需求很明确:当一个新商品上架时,系统需要立即通知那些正在浏览与该商品高度相似的其它商品的用户。常规的轮询方案在实时性和服务器负载上都无法接受,而WebSocket对于这种单向的服务器推送场景来说,显得过于笨重。我们的目标是构建一个轻量、高效、可维护的实时推送管道。

这个场景的核心挑战在于状态管理和事件流的解耦。一个用户的浏览行为创建了一个临时的“兴趣订阅”,而一个后台的商品入库行为则是一个触发事件。我们需要一个机制,能将这两个异步且独立的活动关联起来。

初步构想与技术选型

最初的架构草图如下:

  1. 前端: 用户浏览器通过 Server-Sent Events (SSE) 与后端建立一个长连接,订阅其当前正在浏览的商品ID。
  2. Web服务器: 必须能够高效地处理大量并发长连接。Python生态中,Tornado的异步IO模型和轻量级特性使其成为理想选择。它不像Django或Flask那样需要复杂的ASGI/WSGI服务器配置来处理长连接。
  3. 向量数据库: 商品的“相似性”通过Embedding向量来度量。Milvus作为业界领先的向量数据库,其高性能的ANN(近似最近邻)检索能力是这个方案的核心。
  4. 测试: 整个流程是异步且跨前后端的,单元测试无法覆盖完整的用户体验。端到端(E2E)测试是必须的,Cypress因其对异步操作的优秀处理能力和调试友好性而被选中。

数据流应该是这样的:

  1. 用户访问 product/A 页面,浏览器向Tornado发起一个SSE连接,表示“我对A感兴趣”。
  2. Tornado将这个连接与 product/A 关联并保持。
  3. 一个独立的后台进程处理新商品上架。当商品 B 上架时,它的向量被存入Milvus。
  4. 该进程立刻在Milvus中执行一次反向搜索:“哪些已存在的商品与 B 最相似?”。假设结果是 A
  5. 进程通知Tornado:“与 A 相关的连接,请推送 B 的信息”。
  6. Tornado找到所有订阅了 A 的SSE连接,并将新商品 B 的信息推送过去。
sequenceDiagram
    participant Browser
    participant Tornado
    participant IngestionService
    participant Milvus

    Browser->>+Tornado: GET /events?itemId=A (SSE a long connection)
    Tornado-->>-Browser: Connection established, headers set
    Note right of Tornado: Connection registered for itemId 'A'

    IngestionService->>+Milvus: Insert vector for new item 'B'
    Milvus-->>-IngestionService: Insert success

    IngestionService->>+Milvus: Search similar items to 'B'
    Milvus-->>-IngestionService: Found similar item 'A'

    IngestionService->>+Tornado: POST /internal/notify (itemId=A, newItem=B)
    Tornado->>Tornado: Find SSE connections for 'A'
    Tornado-->>-Browser: Send SSE message: {data: "New similar item 'B' found!"}

这个架构看起来可行,但有一个关键问题:在单体Tornado应用中,IngestionService可以直接访问Tornado内存中的连接管理器。但在真实项目中,Tornado服务为了扩展性通常会部署多个进程,IngestionService也可能是完全独立的服务。直接内存访问是行不通的。我们将在实现中解决这个问题,先从单进程模型开始构建,然后讨论其扩展性。

核心实现:Tornado SSE处理器与连接管理

首先是管理SSE连接的核心组件。我们需要一个地方来存储哪个客户端连接正在监听哪个商品ID。一个简单的字典就足够作为起点。

connection_manager.py:

import logging
from typing import Dict, Set
from tornado.iostream import StreamClosedError
from tornado.web import RequestHandler

# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class ConnectionManager:
    """
    一个简单的单例模式连接管理器,用于在内存中跟踪活动的SSE连接。
    在生产环境中,这应该被一个外部系统替代,比如Redis Pub/Sub。
    """
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(ConnectionManager, cls).__new__(cls)
            # key: itemId (str), value: set of SSEHandler instances
            cls._instance.subscriptions: Dict[str, Set[RequestHandler]] = {}
        return cls._instance

    def subscribe(self, item_id: str, handler: RequestHandler):
        """订阅一个商品ID."""
        if item_id not in self.subscriptions:
            self.subscriptions[item_id] = set()
        self.subscriptions[item_id].add(handler)
        logging.info(f"Handler {id(handler)} subscribed to item_id: {item_id}")

    def unsubscribe(self, item_id: str, handler: RequestHandler):
        """取消订阅."""
        if item_id in self.subscriptions and handler in self.subscriptions[item_id]:
            self.subscriptions[item_id].remove(handler)
            logging.info(f"Handler {id(handler)} unsubscribed from item_id: {item_id}")
            if not self.subscriptions[item_id]:
                del self.subscriptions[item_id]

    async def broadcast(self, item_id: str, message: str):
        """向所有订阅了特定商品ID的连接广播消息."""
        if item_id not in self.subscriptions:
            return

        # 创建一个副本进行迭代,因为在迭代过程中可能会有连接关闭并修改集合
        handlers_to_notify = list(self.subscriptions[item_id])
        logging.info(f"Broadcasting to {len(handlers_to_notify)} handlers for item_id: {item_id}")
        
        for handler in handlers_to_notify:
            try:
                handler.write(f"data: {message}\n\n")
                await handler.flush()
            except StreamClosedError:
                logging.warning(f"Stream closed for handler {id(handler)} during broadcast. Cleaning up.")
                self.unsubscribe(item_id, handler)

# 创建一个全局实例
connection_manager = ConnectionManager()

这个管理器很简单,但在broadcast方法中隐藏了一个关键细节:迭代集合的副本。因为在writeflush时,如果客户端已经断开,会抛出StreamClosedError,我们需要在异常处理中调用unsubscribe来清理,这会修改原始集合。在迭代时修改集合是一个常见的错误,使用副本可以避免。

接下来是Tornado的SSEHandler

handlers.py:

import json
import logging
from tornado.web import RequestHandler
from tornado.iostream import StreamClosedError
from .connection_manager import connection_manager
from .milvus_service import MilvusService  # 稍后实现

class SSEHandler(RequestHandler):
    def initialize(self):
        self.set_header('Content-Type', 'text/event-stream')
        self.set_header('Cache-Control', 'no-cache')
        self.set_header('Access-Control-Allow-Origin', '*') # 仅为开发方便
        self.item_id = None

    async def get(self):
        self.item_id = self.get_argument('itemId', None)
        if not self.item_id:
            self.set_status(400)
            self.finish("Missing itemId parameter")
            return

        connection_manager.subscribe(self.item_id, self)
        
        # 保持连接开放
        try:
            # 等待连接关闭。这是一个技巧,让请求挂起。
            await self.wait_for_disconnect()
        except StreamClosedError:
            logging.info(f"Client disconnected for itemId: {self.item_id}")
        finally:
            # 确保在任何情况下都清理连接
            if self.item_id:
                connection_manager.unsubscribe(self.item_id, self)

    def on_connection_close(self):
        # Tornado会在连接关闭时自动调用此方法
        if self.item_id:
            connection_manager.unsubscribe(self.item_id, self)
        logging.info(f"Connection closed for handler {id(self)}")
    
    def wait_for_disconnect(self):
        # 返回一个永远不会完成的Future,直到连接关闭并引发StreamClosedError
        return self._get_client_future()

    # _get_client_future 是一个非公开的API,但在这里很有用
    # 用来获取一个在连接关闭时会解析的future
    def _get_client_future(self):
        if not hasattr(self, '_disconnect_future'):
            self._disconnect_future = self.request.connection.get_close_future()
        return self._disconnect_future

class NotifyHandler(RequestHandler):
    """一个内部API,用于触发通知"""
    def initialize(self, milvus_service: MilvusService):
        self.milvus_service = milvus_service

    async def post(self):
        try:
            # 在真实项目中,这个接口应该受到严格的访问控制
            body = json.loads(self.request.body)
            item_id = body['itemId']
            vector = body['vector']
            
            # 1. 插入新向量
            self.milvus_service.insert_vector(collection_name='products', entities=[{'id': item_id, 'vector': vector}])
            logging.info(f"Inserted new item {item_id} into Milvus.")

            # 2. 查找相似的已存在商品
            similar_item_ids = self.milvus_service.search_similar_and_get_ids(
                collection_name='products',
                vector=vector,
                limit=5 # 查找最相似的5个
            )
            logging.info(f"Found similar items for {item_id}: {similar_item_ids}")

            # 3. 广播通知
            notification_message = json.dumps({"newItemId": item_id, "message": f"A new item similar to what you're viewing is available!"})
            for similar_id in similar_item_ids:
                await connection_manager.broadcast(str(similar_id), notification_message)

            self.set_status(200)
            self.write({"status": "ok", "notified_for_items": similar_item_ids})
        except Exception as e:
            logging.error(f"Error in NotifyHandler: {e}", exc_info=True)
            self.set_status(500)
            self.write({"status": "error", "message": str(e)})

这里的SSEHandler通过wait_for_disconnect技巧性地挂起了请求,直到连接关闭。on_connection_close是最终的保障,确保任何连接中断都会触发清理逻辑。NotifyHandler则封装了完整的业务流程:接收新商品 -> 存入Milvus -> 反向搜索 -> 触发广播。

Milvus服务层:隔离数据库交互

将Milvus的交互逻辑封装在一个服务类中是良好的实践,便于管理连接和模拟测试。

milvus_service.py:

import logging
import random
from pymilvus import connections, utility, Collection, CollectionSchema, FieldSchema, DataType

class MilvusService:
    def __init__(self, host='localhost', port='19530', alias='default'):
        self.alias = alias
        try:
            connections.connect(alias=self.alias, host=host, port=port)
            logging.info(f"Connected to Milvus at {host}:{port}")
        except Exception as e:
            logging.error(f"Failed to connect to Milvus: {e}")
            raise

    def get_or_create_collection(self, name: str, dim: int) -> Collection:
        """获取或创建集合。在生产环境中,集合管理应由运维脚本完成。"""
        if utility.has_collection(name, using=self.alias):
            logging.info(f"Collection '{name}' already exists.")
            return Collection(name, using=self.alias)

        logging.info(f"Creating collection '{name}'...")
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
            FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=dim)
        ]
        schema = CollectionSchema(fields, "Product similarity collection")
        collection = Collection(name, schema, using=self.alias)

        # 创建索引,这是性能的关键
        index_params = {
            "metric_type": "L2",
            "index_type": "HNSW",
            "params": {"M": 8, "efConstruction": 200}
        }
        collection.create_index(field_name="vector", index_params=index_params)
        logging.info(f"Created HNSW index for collection '{name}'.")
        return collection

    def insert_vector(self, collection_name: str, entities: list):
        """插入向量,并立即flush以保证可搜索性。"""
        collection = Collection(collection_name, using=self.alias)
        mr = collection.insert(entities)
        collection.flush()
        logging.info(f"Inserted {len(mr.primary_keys)} entities. PKs: {mr.primary_keys}")
        return mr

    def search_similar_and_get_ids(self, collection_name: str, vector: list, limit: int) -> list[int]:
        """搜索相似向量并仅返回ID列表。"""
        collection = Collection(collection_name, using=self.alias)
        collection.load() # 确保数据加载到内存中以供搜索
        
        search_params = {"metric_type": "L2", "params": {"ef": 50}}
        
        results = collection.search(
            data=[vector],
            anns_field="vector",
            param=search_params,
            limit=limit,
            expr=None,
            consistency_level="Strong" # 确保我们能搜到刚插入的数据
        )
        
        # results[0] 对应于我们搜索的第一个(也是唯一一个)向量
        hit_ids = [hit.id for hit in results[0]]
        return hit_ids

    def teardown(self):
        utility.drop_collection('products', using=self.alias)
        connections.disconnect(self.alias)
        logging.info("Milvus collection dropped and connection closed.")

这个服务类处理了连接、集合创建、索引配置和核心的增查操作。注意search中的consistency_level="Strong",这对于我们“插入后立即查询”的场景至关重要,尽管它会带来微小的性能开销。

组装应用并准备测试

app.py:

import tornado.ioloop
import tornado.web
import logging
from handlers import SSEHandler, NotifyHandler
from milvus_service import MilvusService

# 配置
DIMENSION = 128 # 向量维度
COLLECTION_NAME = 'products'

def make_app():
    milvus_service = MilvusService()
    # 确保集合存在
    milvus_service.get_or_create_collection(COLLECTION_NAME, DIMENSION)

    # 插入一些基础数据用于测试
    # 在真实项目中,数据是持续流入的
    base_vectors = [
        {'id': 100, 'vector': [random.random() for _ in range(DIMENSION)]},
        {'id': 101, 'vector': [random.random() + 0.1 for _ in range(DIMENSION)]}
    ]
    milvus_service.insert_vector(COLLECTION_NAME, base_vectors)

    return tornado.web.Application([
        (r"/events", SSEHandler),
        # 内部API,不应暴露给公网
        (r"/internal/notify", NotifyHandler, dict(milvus_service=milvus_service)),
    ])

if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    logging.info("Server started on port 8888")
    tornado.ioloop.IOLoop.current().start()

端到端测试:Cypress的用武之地

现在,整个后端管道已经就绪。如何验证它是工作的?手动测试非常繁琐且不可靠。我们需要一个自动化的E2E测试来模拟完整流程。

前端代码 (index.html)
为了让Cypress有页面可以访问,我们需要一个极其简单的HTML页面。

<!DOCTYPE html>
<html>
<head>
    <title>SSE Test</title>
</head>
<body>
    <h1>Listening for similar items to <span id="itemId"></span></h1>
    <div id="notifications"></div>

    <script>
        const urlParams = new URLSearchParams(window.location.search);
        const itemId = urlParams.get('itemId');
        document.getElementById('itemId').textContent = itemId;

        if (itemId) {
            const eventSource = new EventSource(`http://localhost:8888/events?itemId=${itemId}`);

            eventSource.onmessage = function(event) {
                const notificationsDiv = document.getElementById('notifications');
                const data = JSON.parse(event.data);
                
                const notif = document.createElement('div');
                notif.className = 'toast';
                notif.textContent = `${data.message} (New Item: ${data.newItemId})`;
                notificationsDiv.appendChild(notif);
            };

            eventSource.onerror = function(err) {
                console.error("EventSource failed:", err);
                eventSource.close();
            };
        }
    </script>
</body>
</html>

Cypress测试脚本 (cypress/e2e/similarity_alert.cy.js)

这里的挑战在于,测试需要编排两个独立的操作:

  1. 模拟用户访问页面,建立SSE连接。
  2. 模拟后台服务推送一个新商品。

cy.request非常适合用来触发后台API。

describe('Real-time Similarity Alert System', () => {
    
    // 生成一个与现有向量相似的向量
    // 在真实测试中,这应该从一个固定的、已知的数据集里获取
    const createSimilarVector = (baseVector, noise = 0.01) => {
        return baseVector.map(val => val + (Math.random() - 0.5) * noise);
    };

    it('should display a notification when a similar item is added', () => {
        // 目标:我们假设用户正在看 itemId=100 的商品
        const targetItemId = 100;

        // 步骤1: 模拟用户访问商品页面,这将建立一个SSE连接
        cy.visit(`index.html?itemId=${targetItemId}`);
        cy.get('#itemId').should('contain', targetItemId);
        cy.get('#notifications').should('be.empty');

        // 为了让测试稳定,我们稍等一下,确保SSE连接已建立。
        // 在更复杂的应用中,可以监听应用级别的事件。
        cy.wait(500); 

        // 步骤2: 模拟后台系统上架一个与itemId=100相似的新商品 (newItemId=500)
        // 我们需要一个基准向量。在真实测试中,可以通过API获取itemId=100的向量。
        // 这里为了简化,我们硬编码一个。
        const baseVector = Array.from({ length: 128 }, () => Math.random()); 
        const newItemVector = createSimilarVector(baseVector);
        const newItemId = 500;

        // 使用cy.request触发内部通知API
        cy.request({
            method: 'POST',
            url: 'http://localhost:8888/internal/notify',
            body: {
                itemId: newItemId,
                vector: newItemVector,
                // 为了让测试更精确,我们可以假设 Milvus 中已经有 id=100 的商品,
                // 并且它的 vector 就是 baseVector。
                // 我们的测试设置脚本应该保证这个前提。
            },
            headers: {
                'Content-Type': 'application/json'
            }
        }).then((response) => {
            expect(response.status).to.eq(200);
            // 响应体中应包含被通知的商品ID
            expect(response.body.notified_for_items).to.include(targetItemId);
        });

        // 步骤3: 验证UI上是否收到了通知
        // Cypress的should()会自动重试,直到超时。这完美地处理了SSE消息的异步性。
        cy.get('.toast', { timeout: 5000 })
            .should('be.visible')
            .and('contain', 'A new item similar to what you\'re viewing is available!')
            .and('contain', `New Item: ${newItemId}`);
    });
});

这个Cypress测试完美地模拟了整个流程。它首先作为“用户”访问前端页面,然后切换角色变为“后台服务”,通过cy.request调用API。最后,它再切回“用户”角色,断言UI因SSE推送而产生的变化。这种能力是E2E测试框架的核心价值。

遗留问题与未来迭代

当前这个实现在单进程模型下是可行的,但距离生产环境还有距离。

  1. 连接管理器的扩展性: ConnectionManager是单进程内存状态。当Tornado服务水平扩展到多个进程或多台机器时,一个进程无法将通知广播到由另一进程管理的连接。这里的标准解决方案是使用外部的Pub/Sub系统,例如Redis。

    • 改进方案: 当SSEHandler建立连接时,它不再将自己存入本地字典,而是向Redis订阅一个频道,如 subscribe("item_notifications:100")
    • NotifyHandler在需要广播时,不再调用本地方法,而是向Redis频道发布消息:publish("item_notifications:100", message)
    • 每个Tornado进程中的SSEHandler都会收到Redis的消息,并将其写入自己的客户端连接。
  2. 服务解耦: NotifyHandler直接与Milvus交互并负责业务逻辑。在更复杂的系统中,商品入库、向量化、存入Milvus和触发通知应该是几个独立的微服务,通过消息队列(如Kafka或RabbitMQ)进行通信。这可以提高系统的韧性和可扩展性。

  3. 测试数据依赖: Cypress测试目前依赖于后台已经存在的特定数据(如itemId=100)。一个更健壮的测试套件应该包含一个beforeEach钩子,通过API清理并设置测试所需的确切数据状态,确保测试的独立性和可重复性。


  目录