利用 Serverless 与 PWA 构建基于 SQLite 的离线优先数据同步层


要构建一个真正意义上的离线优先(Offline-First)应用,最大的挑战并非在客户端缓存静态资源,而在于如何处理离线期间的数据变更,并在网络恢复时与云端进行可靠同步。传统的方案是在客户端和服务端各维护一套复杂的状态机,但这往往导致代码臃肿、难以维护,并且在处理网络不稳、并发编辑等边缘场景时极易出错。

我们的目标是设计一个健壮、可维护的数据同步层,它必须满足以下几个核心要求:

  1. 客户端持久化: 使用关系型数据库的能力,而不是简单的键值存储。
  2. 可靠上传: 保证离线期间的本地修改最终能送达服务器,即使应用被关闭或刷新。
  3. 幂等处理: 服务端必须能处理重复的客户端请求,避免数据不一致。
  4. 冲突消解: 必须有一套明确的机制来处理多个客户端同时修改同一份数据的情况。

为此,我们选择的技术栈是 PWA + WebAssembly SQLite + Serverless。PWA的Service Worker为我们提供了后台同步的能力;WASM SQLite (通过wa-sqlite库) 让我们在浏览器中获得了功能完备的SQL数据库;Serverless架构则为我们提供了处理无状态、事件驱动型同步请求的理想后端。

我们的核心策略是实现一个基于客户端“发件箱模式”(Outbox Pattern)的异步数据复制机制。

第一步:客户端数据库与发件箱的设计

放弃IndexedDB,选择SQLite的首要原因是为了其强大的事务能力和关系查询能力。在复杂的应用中,这能极大简化客户端的业务逻辑。我们使用wa-sqlite库,因为它提供了对Origin Private File System (OPFS)的底层支持,这是实现真正持久化存储的关键。

首先是数据库表结构的设计。除了业务表,我们还需要一个sync_queue表作为我们的发件箱。

-- 业务数据表:以一个简单的文档为例
CREATE TABLE IF NOT EXISTS documents (
    id TEXT PRIMARY KEY,
    content TEXT NOT NULL,
    -- 版本号,用于乐观锁和冲突检测
    version INTEGER NOT NULL DEFAULT 1,
    -- 最后修改时间戳 (UTC milliseconds)
    last_modified_at INTEGER NOT NULL,
    -- 脏标记,表示本地有未同步的修改
    is_dirty INTEGER NOT NULL DEFAULT 0
);

-- 发件箱表:记录所有待同步的数据变更
CREATE TABLE IF NOT EXISTS sync_queue (
    -- 本地自增ID
    op_id INTEGER PRIMARY KEY AUTOINCREMENT,
    -- 幂等性保障的关键,由客户端生成
    request_id TEXT NOT NULL UNIQUE,
    -- 操作类型: 'INSERT', 'UPDATE', 'DELETE'
    op_type TEXT NOT NULL,
    -- 关联的表名
    table_name TEXT NOT NULL,
    -- 变更的数据行ID
    row_id TEXT NOT NULL,
    -- 变更的数据载荷 (JSON格式)
    -- 对于UPDATE,这里是变更的字段
    -- 对于INSERT,这里是完整的新行数据
    -- 对于DELETE,这里可以为空
    payload TEXT,
    -- 操作发生的时间戳
    created_at INTEGER NOT NULL
);

这里的关键设计在于sync_queue表。它不是简单记录最终状态,而是记录了每一次的“变更意图”。request_id是一个客户端生成的UUID,它将伴随整个同步流程,是服务端实现幂等性的基石。

接下来是数据操作层的实现。在真实项目中,所有的数据库写入操作都必须封装在一个事务里,同时更新业务表和sync_queue表。这是一个常见的错误,即只更新业务表,然后异步地去写队列,这在两者之间存在崩溃的风险,导致数据不一致。

// src/services/database.ts

import { SQLiteCloud, VFS } from 'wa-sqlite';
// 具体的wa-sqlite初始化代码省略,假设我们已经有一个db实例

interface Document {
  id: string;
  content: string;
  version: number;
  last_modified_at: number;
  is_dirty: number;
}

// 这是一个Repository层的核心方法,演示了如何原子性地写入业务表和发件箱
export async function updateDocument(db: SQLiteCloud, docId: string, newContent: string): Promise<void> {
  const now = Date.now();
  const requestId = crypto.randomUUID();

  // 获取当前的文档版本
  const [currentDoc] = await db.exec<Document>({
    sql: 'SELECT version FROM documents WHERE id = ?',
    bind: [docId],
    rowMode: 'object',
  });

  if (!currentDoc) {
    throw new Error(`Document with id ${docId} not found.`);
  }
  const nextVersion = currentDoc.version + 1;

  const transaction = [
    {
      sql: `UPDATE documents SET content = ?, version = ?, last_modified_at = ?, is_dirty = 1 WHERE id = ?`,
      bind: [newContent, nextVersion, now, docId],
    },
    {
      sql: `INSERT INTO sync_queue (request_id, op_type, table_name, row_id, payload, created_at) VALUES (?, ?, ?, ?, ?, ?)`,
      bind: [
        requestId,
        'UPDATE',
        'documents',
        docId,
        JSON.stringify({ content: newContent }), // payload只包含变更部分
        now,
      ],
    },
  ];

  try {
    // wa-sqlite的事务执行方式
    await db.exec('BEGIN IMMEDIATE');
    for (const stmt of transaction) {
      await db.exec(stmt);
    }
    await db.exec('COMMIT');
    
    // 事务成功后,尝试触发一次立即同步
    // 在真实应用中,这里会通过 postMessage 通知 Service Worker
    console.log('Transaction successful. Triggering background sync...');
    navigator.serviceWorker.ready.then(registration => {
      if (registration.sync) {
        registration.sync.register('data-sync');
      }
    });

  } catch (err) {
    console.error('Transaction failed. Rolling back.', err);
    await db.exec('ROLLBACK');
    throw err; // 向上层抛出异常
  }
}

这个updateDocument函数是整个客户端逻辑的核心。它保证了业务数据的修改和同步任务的创建是原子性的。要么都成功,要么都失败。

第二步:Service Worker的后台同步逻辑

PWA的魅力在于Service Worker (SW)。我们使用Periodic Background Sync APIBackground Sync API来实现数据同步。前者用于定期轮询,后者用于在网络恢复时立即触发。

// public/sw.js

// 监听sync事件
self.addEventListener('sync', (event) => {
  if (event.tag === 'data-sync') {
    event.waitUntil(handleSync());
  }
});

// 也可以用 periodic sync
self.addEventListener('periodicsync', (event) => {
  if (event.tag === 'data-sync-periodic') {
    event.waitUntil(handleSync());
  }
});

async function handleSync() {
  // 这里的 openDatabase() 是一个辅助函数,用于在SW中打开与主线程相同的wa-sqlite数据库实例
  // 这部分实现较为复杂,涉及到与主线程的通信和锁机制,此处为简化示意
  const db = await openDatabase(); 
  
  // 批量从发件箱读取任务
  const batchSize = 20;
  const operations = await db.exec({
    sql: `SELECT * FROM sync_queue ORDER BY op_id ASC LIMIT ?`,
    bind: [batchSize],
    rowMode: 'object',
  });

  if (operations.length === 0) {
    console.log('Sync handler: No pending operations.');
    return;
  }

  console.log(`Sync handler: Found ${operations.length} operations to sync.`);

  try {
    const response = await fetch('/api/sync', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({ operations }),
    });

    if (!response.ok) {
      // 服务端返回非2xx状态码,意味着可能是业务逻辑错误或永久性错误
      // 这里的错误处理策略至关重要
      const errorData = await response.json();
      console.error('Server returned an error:', response.status, errorData);
      
      if (response.status === 409) { // 409 Conflict
        // 服务端检测到版本冲突,需要处理
        await handleConflict(db, errorData.conflicts);
      } else if (response.status >= 400 && response.status < 500) {
        // 客户端错误,可能数据格式有问题,直接丢弃或标记为死信
        await markAsFailed(db, operations.map(op => op.op_id));
      }
      
      // 如果是5xx错误,fetch会抛出异常进入catch块,由重试机制处理
      throw new Error(`Server error: ${response.status}`);
    }

    const result = await response.json();

    // 服务端成功处理,从本地发件箱中删除这些操作
    // 同样,这必须在一个事务中完成
    const processedOpIds = operations.map(op => op.op_id);
    await db.exec('BEGIN');
    await db.exec({
      sql: `DELETE FROM sync_queue WHERE op_id IN (${processedOpIds.map(() => '?').join(',')})`,
      bind: processedOpIds,
    });
    
    // 如果服务端返回了需要客户端更新的数据(来自其他客户端的变更)
    if (result.downstreamChanges && result.downstreamChanges.length > 0) {
        await applyDownstreamChanges(db, result.downstreamChanges);
    }
    await db.exec('COMMIT');

    console.log(`Sync handler: Successfully processed ${operations.length} operations.`);

  } catch (error) {
    console.error('Sync handler: Failed to sync with server. Will retry later.', error);
    // 网络错误或5xx错误,不删除队列中的任务,Background Sync会自动重试
    throw error; // 必须向上抛出,告知浏览器sync失败
  }
}

// 冲突和下行同步的处理函数 (示意)
async function handleConflict(db, conflicts) { /* ... */ }
async function applyDownstreamChanges(db, changes) { /* ... */ }
async function markAsFailed(db, opIds) { /* ... */ }

SW中的handleSync是整个系统的引擎。它的健壮性直接决定了系统的可靠性。注意,只有在确认服务端成功处理后,才能删除本地sync_queue中的任务。任何网络失败或服务端5xx错误都会导致SW自动进行指数退避重试。

第三步:Serverless后端的幂等性与业务处理

我们使用AWS Lambda和API Gateway来构建/api/sync端点。后端存储可以使用DynamoDB或Aurora Serverless。这里以DynamoDB为例。

后端的首要职责是实现幂等性。我们通过检查客户端传来的request_id来做到这一点。

# serverless.yml or SAM template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: PWA Sync Backend

Resources:
  SyncFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/
      Handler: app.handler
      Runtime: nodejs18.x
      Architectures: [arm64]
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref DocumentsTable
        - DynamoDBCrudPolicy:
            TableName: !Ref ProcessedRequestsTable
      Events:
        SyncApi:
          Type: Api
          Properties:
            Path: /sync
            Method: post

  DocumentsTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: "id"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "id"
          KeyType: "HASH"
      BillingMode: PAY_PER_REQUEST

  ProcessedRequestsTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: "requestId"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "requestId"
          KeyType: "HASH"
      BillingMode: PAY_PER_REQUEST
      TimeToLiveSpecification:
        AttributeName: "ttl"
        Enabled: true

我们创建了一个ProcessedRequestsTable,专门用来存储已经处理过的request_id,并设置了TTL,让记录在一段时间后自动过期,以控制成本。

Lambda函数的处理逻辑如下:

// src/app.js
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, GetCommand, PutCommand, UpdateCommand, TransactWriteCommand } from "@aws-sdk/lib-dynamodb";

const client = new DynamoDBClient({});
const docClient = DynamoDBDocumentClient.from(client);

const DOCUMENTS_TABLE = process.env.DOCUMENTS_TABLE;
const PROCESSED_REQUESTS_TABLE = process.env.PROCESSED_REQUESTS_TABLE;

export const handler = async (event) => {
  const { operations } = JSON.parse(event.body);

  if (!operations || !Array.isArray(operations) || operations.length === 0) {
    return { statusCode: 400, body: JSON.stringify({ message: "Invalid operations payload" }) };
  }

  // DynamoDB TransactWriteItems有最多100个操作的限制
  // 生产环境需要做批处理
  
  const transactionItems = [];
  const conflicts = [];
  
  for (const op of operations) {
    // 1. 幂等性检查
    const checkIdempotency = new GetCommand({
      TableName: PROCESSED_REQUESTS_TABLE,
      Key: { requestId: op.request_id },
    });
    
    const existingRequest = await docClient.send(checkIdempotency);
    if (existingRequest.Item) {
      console.log(`Request ${op.request_id} already processed. Skipping.`);
      continue; // 如果已经处理过,直接跳过
    }

    // 2. 标记请求ID为已处理 (加入事务)
    const ttl = Math.floor(Date.now() / 1000) + 86400; // 24小时后过期
    transactionItems.push({
        Put: {
            TableName: PROCESSED_REQUESTS_TABLE,
            Item: { requestId: op.request_id, ttl },
        }
    });

    // 3. 核心业务逻辑与冲突检测
    if (op.op_type === 'UPDATE') {
        const { content } = JSON.parse(op.payload);
        
        // 获取当前数据库中的版本
        const getCmd = new GetCommand({ TableName: DOCUMENTS_TABLE, Key: { id: op.row_id } });
        const { Item: currentDoc } = await docClient.send(getCmd);
        const serverVersion = currentDoc ? currentDoc.version : 0;

        // 客户端的版本号计算是 currentVersion + 1
        // 所以客户端认为的“更新前”版本是 op.payload.version - 1 (如果客户端传了的话)
        // 一个更简单的策略是服务端全权负责版本,客户端只提供变更
        // 这里我们假设客户端的version是基于它本地的state,而我们需要和服务器的state对比
        // 伪代码: client_base_version = get_version_from_op(op)
        // if (client_base_version !== serverVersion) {
        //    conflicts.push({ row_id: op.row_id, server_version: serverVersion });
        //    continue; // 发生冲突,跳过此操作
        // }
        
        const newVersion = serverVersion + 1;
        
        transactionItems.push({
            Update: {
                TableName: DOCUMENTS_TABLE,
                Key: { id: op.row_id },
                UpdateExpression: "SET content = :c, version = :v, last_modified_at = :t",
                // ConditionExpression 确保我们只在版本匹配时才更新,这是乐观锁的核心
                ConditionExpression: "attribute_not_exists(id) OR version = :current_v",
                ExpressionAttributeValues: {
                    ":c": content,
                    ":v": newVersion,
                    ":t": Date.now(),
                    ":current_v": serverVersion
                },
            }
        });
    }
    // ... 此处省略 INSERT 和 DELETE 的逻辑
  }
  
  if (conflicts.length > 0) {
      return { statusCode: 409, body: JSON.stringify({ message: "Conflict detected", conflicts }) };
  }

  if (transactionItems.length === 0) {
      return { statusCode: 200, body: JSON.stringify({ message: "No new operations to process." }) };
  }

  try {
    const transactWrite = new TransactWriteCommand({ TransactItems: transactionItems });
    await docClient.send(transactWrite);
    
    // 成功后,可以查询下行数据并返回给客户端
    // const downstreamChanges = await getDownstreamChanges(...);

    return {
        statusCode: 200,
        body: JSON.stringify({ 
            success: true,
            // downstreamChanges: downstreamChanges 
        }),
    };
  } catch (err) {
    // 特别处理条件检查失败的异常(冲突)
    if (err.name === 'TransactionCanceledException' && err.CancellationReasons.some(r => r.Code === 'ConditionalCheckFailed')) {
         return { statusCode: 409, body: JSON.stringify({ message: "Conflict detected during transaction" }) };
    }
    console.error("Transaction failed", err);
    return { statusCode: 500, body: JSON.stringify({ message: "Internal server error" }) };
  }
};

此Lambda函数展示了处理同步请求的核心步骤:

  1. 对每个操作进行幂等性检查。
  2. 使用DynamoDB的ConditionExpression实现乐观锁,这是冲突检测的关键。
  3. 将所有数据库操作(包括幂等性标记的写入)打包到一个DynamoDB事务中,保证原子性。

数据流与架构图

整个数据同步的生命周期可以用下图来描述:

sequenceDiagram
    participant User
    participant PWA_MainThread as PWA Main Thread
    participant PWA_ServiceWorker as PWA Service Worker
    participant API_Gateway as Serverless API
    participant Lambda
    participant Cloud_DB as Cloud Database
    participant Idempotency_Store as Idempotency Store

    User->>PWA_MainThread: 修改数据
    activate PWA_MainThread
    PWA_MainThread->>PWA_MainThread: 开始事务
    PWA_MainThread->>PWA_MainThread: 1. 更新业务表 (documents)
    PWA_MainThread->>PWA_MainThread: 2. 插入同步任务 (sync_queue)
    PWA_MainThread->>PWA_MainThread: 提交事务
    PWA_MainThread-->>User: UI更新
    PWA_MainThread->>PWA_ServiceWorker: 触发 'data-sync' 事件
    deactivate PWA_MainThread

    activate PWA_ServiceWorker
    PWA_ServiceWorker->>PWA_ServiceWorker: 监听到 sync 事件
    PWA_ServiceWorker->>PWA_ServiceWorker: 从 sync_queue 读取一批任务
    PWA_ServiceWorker->>API_Gateway: POST /api/sync (带operations)
    
    activate API_Gateway
    API_Gateway->>Lambda: 触发执行
    activate Lambda
    Lambda->>Idempotency_Store: 检查 request_id 是否存在
    alt request_id 已存在
        Idempotency_Store-->>Lambda: 已存在
        Lambda-->>API_Gateway: 返回成功 (跳过处理)
    else request_id 不存在
        Idempotency_Store-->>Lambda: 不存在
        Lambda->>Cloud_DB: 开始数据库事务
        Lambda->>Cloud_DB: 检查版本号 (乐观锁)
        alt 版本冲突
            Cloud_DB-->>Lambda: 版本不匹配
            Lambda->>Lambda: 回滚事务
            Lambda-->>API_Gateway: 返回 409 Conflict
        else 版本匹配
            Cloud_DB-->>Lambda: 版本匹配
            Lambda->>Cloud_DB: 更新业务数据
            Lambda->>Idempotency_Store: 写入 request_id
            Lambda->>Cloud_DB: 提交数据库事务
            Lambda-->>API_Gateway: 返回 200 OK
        end
    end
    deactivate Lambda
    
    API_Gateway-->>PWA_ServiceWorker: 返回处理结果
    deactivate API_Gateway
    
    alt 响应成功 (200 OK)
        PWA_ServiceWorker->>PWA_ServiceWorker: 从 sync_queue 删除已处理任务
    else 响应冲突 (409 Conflict)
        PWA_ServiceWorker->>PWA_ServiceWorker: 执行冲突解决逻辑
    else 网络/服务器错误
        PWA_ServiceWorker->>PWA_ServiceWorker: 保留任务,等待下次重试
    end
    deactivate PWA_ServiceWorker

局限性与未来迭代方向

我们构建的这个同步层虽然健壮,但并非万能。它也存在一些局限和可以优化的方向:

  1. 下行同步机制: 当前模型依赖客户端在每次上行同步成功后,顺便拉取下行数据。这是一种轮询机制,存在延迟。对于需要实时协作的应用,应引入WebSocket或Server-Sent Events,但这会给纯Serverless架构带来状态管理的挑战(例如,需要维护连接ID与用户身份的映射)。
  2. 冲突解决策略: 我们采用了基于版本号的“先到先得”冲突检测,并将解决冲突的责任推给了客户端。对于文档编辑等复杂场景,这远远不够。更高级的方案是引入CRDTs(无冲突复制数据类型)或 operational transformation (OT),但这会极大地增加客户端和服务器的逻辑复杂度。
  3. Schema迁移: 当数据库表结构需要变更时,如何协调和迁移分布在成千上万个客户端上的本地数据库是一个巨大的挑战。需要设计一套版本化的迁移脚本和相应的客户端逻辑来平滑处理升级。
  4. 性能与成本: 大量小规模的写操作可能会增加Serverless的调用成本。客户端的批量提交和后端的批量处理是降低成本的关键。此外,sync_queue表可能会持续增长,需要定期清理已完成但因某些原因未被删除的旧任务。

尽管存在这些挑战,这个基于PWA、WASM SQLite和Serverless的架构为构建高性能、高可用的离线优先应用提供了一个坚实且可扩展的工程起点。它将复杂的状态管理和重试逻辑从应用层剥离,形成了一个独立的、可复用的同步层。


  目录