Building a Real-time Graph Feature Platform with Pulsar and Neo4j to Power Kubeflow Workflows


When dealing with modern fraud detection or recommendation systems, the core challenge is no longer the volume of data, but the depth and real-time nature of its connections. Traditional feature engineering heavily relies on key-value stores like Redis or Cassandra. They excel at handling aggregate features for individual entities, such as “the number of transactions for user A in the last 5 minutes.” However, for scenarios requiring deep relational analysis—like “is user A connected to a known fraud ring member C through two intermediary accounts?”—these key-value stores fall short. Such queries require graph traversal, a task traditionally relegated to offline batch processing (e.g., using Spark GraphX), whose latency is simply unacceptable for online business needs.

Our goal is to build a platform that can ingest events in real-time, dynamically update an entity-relationship graph, and serve low-latency graph features to downstream machine learning models managed by Kubeflow. This isn’t just about assembling a tech stack; it’s about making critical architectural trade-offs.

Defining the Problem: The Real-time Need Beyond Flat Features

A typical real-time feature platform architecture follows this pattern: Event Stream (Kafka/Pulsar) -> Stream Processing (Flink/Spark Streaming) -> KV Store (Redis). This is a mature, widely adopted architecture, effective for calculating features like rolling window aggregations, counts, and sums.

Approach A: Traditional KV-based Architecture

  • Implementation: Use Pulsar as the message bus to consume events like user behaviors and transactions. A Flink job performs real-time aggregations, writing calculated features like user_id_transaction_count_5m to Redis. The model training or online prediction services within Kubeflow then read these flat features directly from Redis.
  • Pros: Mature tech stack with a rich ecosystem. Extremely high performance for single-point reads and writes, with latency controllable within milliseconds. Relatively straightforward to operate.
  • Cons: Its fatal flaw is the inability to express and query relationships. All connections between entities are “flattened” into independent feature values during the stream processing phase. Answering the fraud-link question from earlier is nearly impossible. A common workaround is to pre-compute some path-based features, but this leads to feature explosion and fails to cover unknown query patterns. In real-world projects where fraud patterns are constantly evolving, the pre-computation approach quickly becomes obsolete.
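
For concreteness, here is a minimal sketch of Approach A's write path: a plain Python consumer maintaining a 5-minute rolling transaction count per user in Redis, using a sorted set as a sliding window. The redis-py client, key naming, and window size are illustrative assumptions rather than a reference implementation.

import time
import redis

r = redis.Redis(host="localhost", port=6379)

def record_transaction(user_id: str, tx_id: str, window_seconds: int = 300) -> int:
    """Update and return the user's rolling 5-minute transaction count."""
    key = f"{user_id}:transaction_count_5m"
    now = time.time()
    pipe = r.pipeline()
    pipe.zadd(key, {tx_id: now})                          # score = event time
    pipe.zremrangebyscore(key, 0, now - window_seconds)   # evict expired events
    pipe.zcard(key)                                       # count inside the window
    pipe.expire(key, window_seconds)                      # let idle keys expire
    return pipe.execute()[2]

Note how the relationship context (which device, which counterparty) is discarded at write time; that loss is exactly what motivates Approach B.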

Approach B: Real-time Graph Computing Architecture with a Graph Database

We must acknowledge that the problem lies at the core of the data model. If we need to process relationships, we should use a tool that natively supports a relational model. This brings us to our alternative: integrating the Neo4j graph database into our real-time data stream.

  • Implementation:

    1. Event Ingestion: Continue using Pulsar as the event source. Its multi-tenancy, tiered storage, and unified messaging model (streams and queues) provide immense flexibility.
    2. Graph Construction: Develop one or more dedicated Pulsar consumer services to replace some of the Flink aggregation logic. These services consume raw events and translate them into operations on the Neo4j graph (creating/updating nodes, establishing/deleting relationships).
    3. Feature Service: Deploy a standalone API service that receives requests (e.g., { "user_id": "u123" }) and then executes predefined Cypher queries to dynamically compute graph features in Neo4j, such as community detection (Label Propagation), shortest path, or PageRank (a sketch of the PageRank case follows this list).
    4. Model Integration: Kubeflow pipeline training tasks and online inference services call this feature service API to fetch real-time graph features.
    5. Visualization & Monitoring: A frontend application for visualizing local graph structures, monitoring data ingestion status, and tracking the performance of the feature service.
  • Pros:

    • Model is Data: Data is stored in its most natural form (a graph), and the query language (Cypher) is natively designed for graph traversal, making it extremely expressive.
    • Dynamic Features: Features are calculated on-the-fly at query time, enabling the system to adapt to changing business requirements without pre-computing all possible paths.
    • Deep Insights: Easily enables deep relational features previously only available through batch processing, such as community membership, centrality, and relationship paths.
  • Cons:

    • Architectural Complexity: Introduces Neo4j as a new core component, bringing new challenges related to cluster stability, performance tuning, and operations.
    • Performance Trade-offs: Graph query latency is typically higher than KV lookups, especially for deep or wide traversals. The pitfall here is that poorly designed Cypher queries can lead to a cascading performance failure in the database.
    • Data Consistency: Ensuring eventual consistency and transactional integrity in a high-throughput write system is a challenge that requires careful handling.
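
As referenced in the feature-service step above, here is a hedged sketch of computing one such algorithmic feature (PageRank) from Python, assuming the Neo4j Graph Data Science (GDS) plugin is installed on the server. Procedure names follow GDS 2.x; the projection name and the label/relationship choices are illustrative and mirror the schema defined later in this post.

from neo4j import GraphDatabase

driver = GraphDatabase.driver("neo4j://localhost:7687", auth=("neo4j", "password"))

with driver.session(database="neo4j") as session:
    # Project an in-memory graph over the entity types used in this post.
    session.run(
        "CALL gds.graph.project('txGraph', "
        "['User', 'Device', 'Transaction'], ['INITIATED', 'USED_FOR'])"
    )
    # Stream PageRank scores and keep only User nodes.
    result = session.run(
        "CALL gds.pageRank.stream('txGraph') "
        "YIELD nodeId, score "
        "WITH gds.util.asNode(nodeId) AS n, score "
        "WHERE n:User "
        "RETURN n.userId AS userId, score "
        "ORDER BY score DESC LIMIT 10"
    )
    for record in result:
        print(record["userId"], record["score"])
    # Drop the projection so it does not linger in server memory.
    session.run("CALL gds.graph.drop('txGraph')")
driver.close()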

Final Decision and Rationale

We chose Approach B. Although more complex, it directly addresses the core business pain point. In the domain of fraud detection, the cost of missing a sophisticated fraud ring far outweighs the increased operational overhead of the new architecture. Approach A’s ceiling is too low to support future business evolution. We accept this trade-off and aim to mitigate its risks through meticulous engineering practices.

Core Implementation Overview

Below is the system architecture diagram and key code implementations.

graph TD
    subgraph "Event Sources"
        A[User Behavior] --> P
        B[Transaction Data] --> P
    end

    subgraph "Messaging & Processing Layer"
        P(Pulsar Topic: raw-events) --> C{Pulsar Consumer Service}
    end

    subgraph "Data Storage & Service Layer"
        C -- Cypher MERGE --> N[Neo4j Cluster]
        API[Feature Service API] -- Cypher READ --> N
    end

    subgraph "Machine Learning Platform"
        KF[Kubeflow Pipeline] -- HTTP Request --> API
    end

    subgraph "Monitoring & Management"
        FE[Frontend Dashboard] -- API Calls --> API
        FE -- Websocket/Polling --> P_Admin[Pulsar Admin API]
        FE -- Bolt Protocol --> N
    end

    style P fill:#2496ed,stroke:#333,stroke-width:2px
    style N fill:#008cc1,stroke:#333,stroke-width:2px
    style KF fill:#0769de,stroke:#333,stroke-width:2px

1. Pulsar Consumer: Real-time Graph Construction

This service is the bridge between the event stream and the graph database. We’ll implement a robust, idempotent Python consumer with proper error handling.

graph_builder_service.py

import pulsar
import json
import logging
from neo4j import GraphDatabase, exceptions
import os
import time

# --- Configuration ---
PULSAR_SERVICE_URL = os.environ.get("PULSAR_URL", "pulsar://localhost:6650")
PULSAR_TOPIC = os.environ.get("PULSAR_TOPIC", "persistent://public/default/raw-events")
PULSAR_SUBSCRIPTION = "graph-builder-subscription"
NEO4J_URI = os.environ.get("NEO4J_URI", "neo4j://localhost:7687")
NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD", "password")

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class GraphUpdater:
    def __init__(self, uri, user, password):
        # Pitfall: The driver object is thread-safe and should be reused 
        # as a singleton or global object throughout the application.
        self._driver = GraphDatabase.driver(uri, auth=(user, password))
        self.ensure_constraints()

    def close(self):
        self._driver.close()

    def ensure_constraints(self):
        # In a production environment, indexes and uniqueness constraints are vital
        # for performance and data integrity.
        # Ensuring constraints exist on startup is a form of defensive programming.
        # Every property we MERGE on needs a uniqueness constraint; without one,
        # each MERGE degenerates into a full label scan.
        queries = [
            "CREATE CONSTRAINT IF NOT EXISTS FOR (u:User) REQUIRE u.userId IS UNIQUE",
            "CREATE CONSTRAINT IF NOT EXISTS FOR (d:Device) REQUIRE d.deviceId IS UNIQUE",
            "CREATE CONSTRAINT IF NOT EXISTS FOR (p:Payment) REQUIRE p.paymentId IS UNIQUE",
            "CREATE CONSTRAINT IF NOT EXISTS FOR (t:Transaction) REQUIRE t.transactionId IS UNIQUE",
            "CREATE INDEX IF NOT EXISTS FOR (t:Transaction) ON (t.timestamp)"
        ]
        with self._driver.session(database="neo4j") as session:
            for query in queries:
                try:
                    session.run(query)
                    logging.info(f"Successfully applied constraint/index: {query}")
                except exceptions.ClientError as e:
                    logging.warning(f"Could not apply constraint (might exist): {e}")


    def process_transaction_event(self, event_data):
        # Use a managed transaction to ensure atomicity.
        # execute_write (called write_transaction before driver 5.x) automatically
        # retries on transient failures, which is key for production code.
        with self._driver.session(database="neo4j") as session:
            session.execute_write(self._create_transaction_graph, event_data)

    @staticmethod
    def _create_transaction_graph(tx, data):
        # This is a core Cypher query using MERGE to ensure idempotency.
        # MERGE = MATCH or CREATE: it creates the node/relationship if it doesn't
        # exist, otherwise it matches. ON CREATE SET sets properties only on creation.
        # A common mistake is overusing CREATE, which produces duplicate nodes.
        # Even the Transaction node is MERGEd on its ID here, so a redelivered
        # Pulsar message cannot create a second copy.
        query = (
            "MERGE (u:User {userId: $userId}) "
            "MERGE (d:Device {deviceId: $deviceId}) "
            "MERGE (p:Payment {paymentId: $paymentId}) "
            "MERGE (t:Transaction {transactionId: $transactionId}) "
            "ON CREATE SET t.amount = $amount, t.timestamp = datetime($timestamp) "
            "MERGE (u)-[:INITIATED]->(t) "
            "MERGE (d)-[:USED_FOR]->(t) "
            "MERGE (t)-[:TARGETS]->(p)"
        )
        tx.run(query, **data)
        logging.info(f"Processed transaction {data.get('transactionId')}")


def run_consumer():
    client = None
    consumer = None
    updater = GraphUpdater(NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD)
    
    try:
        client = pulsar.Client(PULSAR_SERVICE_URL)
        consumer = client.subscribe(
            PULSAR_TOPIC,
            PULSAR_SUBSCRIPTION,
            # DLQ policies only take effect on Shared/Key_Shared subscriptions.
            consumer_type=pulsar.ConsumerType.Shared,
            # Key configuration: Dead Letter Queue. Unprocessable messages are sent
            # here instead of blocking the entire consumption flow.
            # (pulsar-client >= 3.0 exposes this as ConsumerDeadLetterPolicy.)
            dead_letter_policy=pulsar.ConsumerDeadLetterPolicy(
                max_redeliver_count=5,
                dead_letter_topic='persistent://public/default/raw-events-dlq'
            )
        )

        logging.info("Pulsar consumer started, waiting for messages...")
        while True:
            msg = consumer.receive()
            try:
                event = json.loads(msg.data().decode('utf-8'))
                event_type = event.get("eventType")
                
                if event_type == "TRANSACTION":
                    updater.process_transaction_event(event["data"])
                # Could be extended to handle other event types
                # elif event_type == "LOGIN":
                #     updater.process_login_event(event["data"])
                
                # Acknowledge only after successful processing
                consumer.acknowledge(msg)
            except json.JSONDecodeError as e:
                logging.error(f"Failed to decode JSON: {e}")
                # Nack triggers redelivery; after max_redeliver_count the malformed
                # message lands in the DLQ instead of looping forever.
                consumer.negative_acknowledge(msg)
            except Exception as e:
                logging.error(f"An unexpected error occurred: {e}", exc_info=True)
                # This will trigger the DLQ policy
                consumer.negative_acknowledge(msg)
    except KeyboardInterrupt:
        logging.info("Consumer shutting down...")
    finally:
        if consumer:
            consumer.close()
        if client:
            client.close()
        updater.close()

if __name__ == "__main__":
    run_consumer()
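
For local testing of this consumer, a minimal producer sketch is shown below. The payload shape mirrors what _create_transaction_graph expects; all field values and the topic address are illustrative.

import json
import pulsar

client = pulsar.Client("pulsar://localhost:6650")
producer = client.create_producer("persistent://public/default/raw-events")

event = {
    "eventType": "TRANSACTION",
    "data": {
        "userId": "u123",
        "deviceId": "d456",
        "paymentId": "p999",
        "transactionId": "t0001",
        "amount": 250.0,
        # ISO-8601 string; parsed by datetime($timestamp) in the Cypher query.
        "timestamp": "2024-01-01T12:00:00Z",
    },
}
producer.send(json.dumps(event).encode("utf-8"))
client.close()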

2. Feature Service API: Dynamic Graph Feature Computation

This service acts as a decoupling layer between our models and the graph database. We use FastAPI for its high performance and ease of use.

feature_api.py

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from neo4j import GraphDatabase, exceptions
import os
import logging

# --- Configuration ---
NEO4J_URI = os.environ.get("NEO4J_URI", "neo4j://localhost:7687")
NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD", "password")

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

app = FastAPI()

# --- Neo4j Driver Singleton ---
# Manage the driver lifecycle with FastAPI's startup and shutdown events.
@app.on_event("startup")
async def startup_event():
    app.state.driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))

@app.on_event("shutdown")
async def shutdown_event():
    if app.state.driver:
        app.state.driver.close()

class FeatureRequest(BaseModel):
    user_id: str

class FeatureResponse(BaseModel):
    user_id: str
    shared_device_count: int
    second_degree_fraud_contacts: int
    transaction_amount_stddev_24h: float

# --- Unit Testing Strategy ---
# 1. Mock the Neo4j driver to test API routing and request/response models.
# 2. Write integration tests for the _get_features_for_user function below,
#    connecting to a test database with pre-populated graph data to validate
#    the Cypher query's results.

def _get_features_for_user(user_id: str):
    # Note: the neo4j driver used here is synchronous, so this function (and the
    # endpoint below) are plain defs. FastAPI runs sync routes in a worker thread
    # pool, which keeps database I/O from blocking the event loop.
    # This is the most critical part: the Cypher query.
    # The design of this query directly determines the API's performance.
    # We combine the calculation of multiple features into a single query to reduce database round-trips.
    query = """
    MATCH (u:User {userId: $userId})
    // Feature 1: Users sharing a device with u (excluding u itself)
    OPTIONAL MATCH (u)-[:INITIATED]->()<-[:USED_FOR]-(d:Device)-[:USED_FOR]->()<-[:INITIATED]-(other:User)
    WHERE other <> u
    WITH u, count(DISTINCT other) AS sharedDeviceUsers
    
    // Feature 2: Number of fraud users within 2 hops
    OPTIONAL MATCH (u)-[*1..2]-(contact:User)-[:HAS_STATUS]->(:Status {type: 'FRAUD'})
    WHERE contact <> u
    WITH u, sharedDeviceUsers, count(DISTINCT contact) AS fraudContactsIn2Hops
    
    // Feature 3: Standard deviation of transaction amounts in the last 24 hours.
    // stDev() is an aggregating function, so it runs over the matched rows;
    // collecting the amounts into a list first would not work.
    OPTIONAL MATCH (u)-[:INITIATED]->(t:Transaction)
    WHERE t.timestamp > datetime() - duration({days: 1})
    
    RETURN
        u.userId AS userId,
        sharedDeviceUsers,
        fraudContactsIn2Hops,
        coalesce(stDev(t.amount), 0.0) AS amountStdDev
    """
    
    try:
        with app.state.driver.session(database="neo4j") as session:
            result = session.run(query, userId=user_id)
            record = result.single()
            if not record:
                raise HTTPException(status_code=404, detail="User not found")
            return {
                "user_id": record["userId"],
                "shared_device_count": record["sharedDeviceUsers"],
                "second_degree_fraud_contacts": record["fraudContactsIn2Hops"],
                "transaction_amount_stddev_24h": record["amountStdDev"]
            }
    except exceptions.ServiceUnavailable as e:
        logging.error(f"Neo4j connection error: {e}")
        raise HTTPException(status_code=503, detail="Feature database is unavailable")
    except HTTPException:
        # Re-raise deliberate HTTP errors (e.g., the 404 above) so the generic
        # handler below doesn't turn them into 500s.
        raise
    except Exception as e:
        logging.error(f"Error fetching features for {user_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error")


@app.post("/features", response_model=FeatureResponse)
def get_features(request: FeatureRequest):
    """
    Computes and returns a graph feature vector for the specified user_id.
    This endpoint is called by online inference services in Kubeflow.
    """
    return _get_features_for_user(request.user_id)
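
A Kubeflow pipeline step (or any other client) consumes the endpoint with a plain HTTP call. A minimal example, assuming the service runs locally on port 8000:

import requests

resp = requests.post(
    "http://localhost:8000/features",
    json={"user_id": "u123"},
    timeout=2,  # graph queries can be slow under load; always bound the wait
)
resp.raise_for_status()
print(resp.json())
# e.g. {"user_id": "u123", "shared_device_count": 3,
#       "second_degree_fraud_contacts": 1, "transaction_amount_stddev_24h": 42.7}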

3. Frontend Visualization: Gaining Insight into Relationships

For ML engineers and data analysts, a frontend interface that can intuitively display local subgraphs is invaluable. It's not just for data exploration but also a powerful tool for debugging feature issues. Here's a simplified React component to illustrate the idea; the subgraph data is mocked in place of a real API call.

GraphExplorer.tsx (React + vis-network)

import React, { useEffect, useCallback, useRef } from 'react';
import { Network } from 'vis-network';
import 'vis-network/styles/vis-network.css';

// This is a simplified frontend component to display a subgraph centered on a specific user.
// In a real project, this data would be fetched from an API; here, it's mocked.

// API client to fetch subgraph data for a given user ID
// async function fetchSubgraph(userId) {
//   const response = await fetch(`/api/subgraph/${userId}`);
//   return await response.json(); // { nodes: [], edges: [] }
// }

const GraphExplorer = ({ centerNodeId }: { centerNodeId: string }) => {
  const containerRef = useRef<HTMLDivElement>(null);
  // Keep the network instance in a ref so it can be destroyed on re-render/unmount.
  const networkRef = useRef<Network | null>(null);

  const drawGraph = useCallback(() => {
    if (!containerRef.current) return;

    // Mock data fetched from an API
    const data = {
      nodes: [
        { id: 'u123', label: 'User: u123', group: 'user', shape: 'dot', color: '#ff6347' },
        { id: 'd456', label: 'Device: d456', group: 'device' },
        { id: 'u789', label: 'User: u789', group: 'user' },
        { id: 'p999', label: 'Payment: p999', group: 'payment' },
        { id: 'fraud_ring', label: 'Fraud Ring', group: 'fraud' }
      ],
      edges: [
        { from: 'u123', to: 'd456' },
        { from: 'u789', to: 'd456' },
        { from: 'u789', to: 'fraud_ring' }
      ]
    };

    const options = {
      nodes: {
        shape: 'box',
        font: { size: 14, color: '#ffffff' },
        borderWidth: 2
      },
      edges: {
        width: 2,
        arrows: 'to'
      },
      physics: {
        enabled: true,
        solver: 'barnesHut',
        barnesHut: {
          gravitationalConstant: -8000,
          springConstant: 0.04,
          springLength: 95
        }
      },
      interaction: {
        hover: true,
        tooltipDelay: 300,
        dragNodes: true,
      },
      layout: {
          hierarchical: false
      }
    };

    // Destroy any previous instance before creating a new one.
    networkRef.current?.destroy();
    networkRef.current = new Network(containerRef.current, data, options);

  }, [centerNodeId]);

  useEffect(() => {
    // In a real application, this would trigger an API call:
    // fetchSubgraph(centerNodeId).then(data => drawGraph(data));
    drawGraph();
    // Tear the network down on unmount to avoid leaking DOM/canvas resources.
    return () => networkRef.current?.destroy();
  }, [drawGraph]);

  return <div ref={containerRef} style={{ height: '600px', border: '1px solid #ccc' }} />;
};

export default GraphExplorer;

Architectural Extensibility and Limitations

The extension points of this architecture are clear. New event types can be handled by adding handlers to the consumer service (or dedicated consumers) that define new node and relationship types, as sketched below. The feature API can easily add new query endpoints to provide different feature dimensions without altering the underlying data structure. Neo4j itself supports a Causal Clustering mode, which allows read loads to be distributed across multiple instances, ensuring high availability.
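
As a sketch of that extension point, here is a handler for a hypothetical LOGIN event that introduces a new relationship type. It would slot into the GraphUpdater class from section 1; the event fields (userId, deviceId, timestamp) are illustrative assumptions.

def create_login_edge(tx, data):
    # MERGE the relationship itself so repeated logins update a single edge
    # instead of creating duplicates; track first/last seen timestamps on it.
    tx.run(
        "MERGE (u:User {userId: $userId}) "
        "MERGE (d:Device {deviceId: $deviceId}) "
        "MERGE (u)-[r:LOGGED_IN_FROM]->(d) "
        "ON CREATE SET r.firstSeen = datetime($timestamp) "
        "SET r.lastSeen = datetime($timestamp)",
        **data,
    )

def process_login_event(driver, event_data):
    with driver.session(database="neo4j") as session:
        session.execute_write(create_login_edge, event_data)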

However, this solution is not without its limitations. The performance bottleneck of this architecture is most likely to be Neo4j. Queries involving supernodes (nodes with millions of connected edges) will suffer from severe performance degradation without special optimizations like relationship sampling or pre-aggregation. The system’s end-to-end latency is also limited by its slowest component: Pulsar consumption latency + Neo4j write latency + API query latency. For use cases requiring sub-millisecond latency, this architecture may not be suitable, and a traditional KV store would still be the preferred choice. Furthermore, maintaining a production-grade Neo4j cluster requires specialized knowledge, placing higher demands on the team’s skillset. Finally, unbounded graph growth will lead to increased storage costs and performance degradation, requiring a well-designed TTL (Time-To-Live) policy or an archiving mechanism to periodically prune old, inactive graph data.
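
A minimal sketch of such a pruning job, assuming we retire Transaction nodes older than a fixed retention window (the retention period and batch size are illustrative; APOC's apoc.periodic.iterate is a common alternative for very large deletes):

from neo4j import GraphDatabase

RETENTION_DAYS = 90
BATCH_SIZE = 10_000

driver = GraphDatabase.driver("neo4j://localhost:7687", auth=("neo4j", "password"))

def prune_batch(tx):
    # Delete one bounded batch per transaction to keep memory usage flat.
    result = tx.run(
        "MATCH (t:Transaction) "
        "WHERE t.timestamp < datetime() - duration({days: $days}) "
        "WITH t LIMIT $batch "
        "DETACH DELETE t "
        "RETURN count(*) AS deleted",
        days=RETENTION_DAYS, batch=BATCH_SIZE,
    )
    return result.single()["deleted"]

while True:
    with driver.session(database="neo4j") as session:
        if session.execute_write(prune_batch) == 0:
            break
driver.close()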

