Skip to content

如何设计与实现高性能的事件存储(Event Store)

本技能旨在引导开发者构建事件溯源系统(Event-Sourced Systems)的核心基础设施,解决如何可靠地持久化、读取和订阅不可变事件流的问题。

为什么需要这个技能

在传统的 CRUD 架构中,数据库只保存实体的当前状态,历史变更会被覆盖。但在金融交易、审计追踪或复杂状态机等场景下,我们需要记录每一次状态变更(事件)。

事件存储(Event Store)不同于普通数据库,它要求严格的追加写(Append-only)强顺序性不可变性。设计一个合格的 Event Store 需要处理乐观并发控制(OCC)、全局位置索引以及高效的快照机制,否则在规模扩大后会出现严重的数据不一致或性能瓶颈。

适用场景

  • 构建事件溯源基础设施:从零搭建支持状态回溯的系统。
  • 技术选型对比:在 EventStoreDB、PostgreSQL、Kafka 或 DynamoDB 之间做出选择。
  • 实现自定义持久化模式:编写支持流(Stream)概念的存储层代码。
  • 优化存储与检索:通过索引和快照(Snapshot)提升读取聚合根的速度。
  • 规划扩展方案:设计支持高吞吐量的事件存储架构。

核心工作流

1. 确定架构要求

一个标准的 Event Store 必须满足以下核心特性:

  • Append-only:事件一旦写入不可修改。
  • Ordered:确保单个流内及全局的顺序性。
  • Versioned:通过版本号实现乐观并发控制,防止覆盖写入。
  • Subscriptions:支持实时事件通知。

2. 选择存储技术

根据业务需求选择底层实现:

  • EventStoreDB:原生支持事件溯源,功能最全。
  • PostgreSQL:适合已有关系型数据库栈,通过 JSONB 实现灵活存储。
  • Kafka:适合极高吞吐量的流处理,但单流点查询较弱。
  • DynamoDB:适合 AWS Serverless 架构,扩展性强。

3. 实现模式

  • Schema 设计:定义包含 stream_idevent_typeversionglobal_position 的表结构。
  • 写入流程:验证 expected_version 写入事件 更新全局位置。
  • 读取流程:根据 stream_id 顺序加载事件 在内存中重建状态(Rehydration)。

代码实现示例

PostgreSQL 事件存储 Schema

sql
-- Events table
CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    stream_id VARCHAR(255) NOT NULL,
    stream_type VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    metadata JSONB DEFAULT '{}',
    version BIGINT NOT NULL,
    global_position BIGSERIAL,
    created_at TIMESTAMPTZ DEFAULT NOW(),

    CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);

-- Index for stream queries
CREATE INDEX idx_events_stream_id ON events(stream_id, version);

Python 异步写入实现(乐观并发控制)

python
async def append_events(
    self,
    stream_id: str,
    stream_type: str,
    events: List[Event],
    expected_version: Optional[int] = None
) -> List[Event]:
    async with self.pool.acquire() as conn:
        async with conn.transaction():
            if expected_version is not None:
                current = await conn.fetchval(
                    "SELECT MAX(version) FROM events WHERE stream_id = $1",
                    stream_id
                )
                current = current or 0
                if current != expected_version:
                    raise ConcurrencyError(f"Expected version {expected_version}, got {current}")
            # ... 插入事件逻辑

下载和安装

下载 event-store-design 中文版 Skill ZIP

解压后将目录放入你的 AI 工具 skills 文件夹,重启工具后即可使用。具体路径参考内附的 USAGE.zh.md

你可能还需要

暂无推荐