返回顶部
e

event-store

Design and implement event stores for event-sourced systems. Use when building event sourcing infrastructure, implementing event persistence, projections, snapshotting, or CQRS patterns.

作者: admin | 来源: ClawHub
源自
ClawHub
版本
V 1.0.0
安全检测
已通过
749
下载量
0
收藏
概述
安装方式
版本历史

event-store

# Event Store Guide to designing event stores for event-sourced applications — covering event schemas, projections, snapshotting, and CQRS integration. ## When to Use This Skill - Designing event sourcing infrastructure - Choosing between event store technologies - Implementing custom event stores - Building projections from event streams - Adding snapshotting for aggregate performance - Integrating CQRS with event sourcing ## Core Concepts ### Event Store Architecture ``` ┌─────────────────────────────────────────────────────┐ │ Event Store │ ├─────────────────────────────────────────────────────┤ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ Stream 1 │ │ Stream 2 │ │ Stream 3 │ │ │ │ (Aggregate) │ │ (Aggregate) │ │ (Aggregate) │ │ │ ├─────────────┤ ├─────────────┤ ├─────────────┤ │ │ │ Event 1 │ │ Event 1 │ │ Event 1 │ │ │ │ Event 2 │ │ Event 2 │ │ Event 2 │ │ │ │ Event 3 │ │ ... │ │ Event 3 │ │ │ │ ... │ │ │ │ Event 4 │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ ├─────────────────────────────────────────────────────┤ │ Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ... │ └─────────────────────────────────────────────────────┘ ``` ### Event Store Requirements | Requirement | Description | | ----------------- | ---------------------------------- | | **Append-only** | Events are immutable, only appends | | **Ordered** | Per-stream and global ordering | | **Versioned** | Optimistic concurrency control | | **Subscriptions** | Real-time event notifications | | **Idempotent** | Handle duplicate writes safely | ### Technology Comparison | Technology | Best For | Limitations | | ---------------- | ----------------------- | -------------------------------- | | **EventStoreDB** | Pure event sourcing | Single-purpose | | **PostgreSQL** | Existing Postgres stack | Manual implementation | | **Kafka** | High-throughput streams | Not ideal for per-stream queries | | **DynamoDB** | Serverless, AWS-native | Query limitations | ## Event Schema Design Events are the source of truth. Well-designed schemas ensure long-term evolvability. ### Event Envelope Structure ```json { "event_id": "uuid", "stream_id": "Order-abc123", "event_type": "OrderPlaced", "version": 1, "schema_version": 1, "data": { "customer_id": "cust-1", "total_cents": 5000 }, "metadata": { "correlation_id": "req-xyz", "causation_id": "evt-prev", "user_id": "user-1", "timestamp": "2025-01-15T10:30:00Z" }, "global_position": 42 } ``` ### Schema Evolution Rules 1. **Add fields freely** — new optional fields are always safe 2. **Never remove or rename fields** — introduce a new event type instead 3. **Version event types** — `OrderPlacedV2` when the schema changes materially 4. **Upcast on read** — transform old versions to the current shape in the deserializer ## PostgreSQL Event Store Schema ```sql CREATE TABLE events ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), stream_id VARCHAR(255) NOT NULL, stream_type VARCHAR(255) NOT NULL, event_type VARCHAR(255) NOT NULL, event_data JSONB NOT NULL, metadata JSONB DEFAULT '{}', version BIGINT NOT NULL, global_position BIGSERIAL, created_at TIMESTAMPTZ DEFAULT NOW(), CONSTRAINT unique_stream_version UNIQUE (stream_id, version) ); CREATE INDEX idx_events_stream ON events(stream_id, version); CREATE INDEX idx_events_global ON events(global_position); CREATE INDEX idx_events_type ON events(event_type); CREATE TABLE snapshots ( stream_id VARCHAR(255) PRIMARY KEY, stream_type VARCHAR(255) NOT NULL, snapshot_data JSONB NOT NULL, version BIGINT NOT NULL, created_at TIMESTAMPTZ DEFAULT NOW() ); CREATE TABLE subscription_checkpoints ( subscription_id VARCHAR(255) PRIMARY KEY, last_position BIGINT NOT NULL DEFAULT 0, updated_at TIMESTAMPTZ DEFAULT NOW() ); ``` ## Event Store Implementation ```python @dataclass class Event: stream_id: str event_type: str data: dict metadata: dict = field(default_factory=dict) event_id: UUID = field(default_factory=uuid4) version: int | None = None global_position: int | None = None class EventStore: # backed by PostgreSQL schema above def __init__(self, pool: asyncpg.Pool): self.pool = pool async def append(self, stream_id: str, stream_type: str, events: list[Event], expected_version: int | None = None) -> list[Event]: """Append events with optimistic concurrency control.""" async with self.pool.acquire() as conn: async with conn.transaction(): if expected_version is not None: current = await conn.fetchval( "SELECT MAX(version) FROM events " "WHERE stream_id = $1", stream_id ) or 0 if current != expected_version: raise ConcurrencyError( f"Expected {expected_version}, got {current}" ) start = await conn.fetchval( "SELECT COALESCE(MAX(version), 0) + 1 " "FROM events WHERE stream_id = $1", stream_id ) for i, evt in enumerate(events): evt.version = start + i row = await conn.fetchrow( "INSERT INTO events (id, stream_id, stream_type, " "event_type, event_data, metadata, version) " "VALUES ($1,$2,$3,$4,$5,$6,$7) " "RETURNING global_position", evt.event_id, stream_id, stream_type, evt.event_type, json.dumps(evt.data), json.dumps(evt.metadata), evt.version, ) evt.global_position = row["global_position"] return events async def read_stream(self, stream_id: str, from_version: int = 0) -> list[Event]: """Read events for a single stream.""" async with self.pool.acquire() as conn: rows = await conn.fetch( "SELECT * FROM events WHERE stream_id = $1 " "AND version >= $2 ORDER BY version", stream_id, from_version, ) return [self._to_event(r) for r in rows] async def read_all(self, from_position: int = 0, limit: int = 1000) -> list[Event]: """Read global event stream for projections / subscriptions.""" async with self.pool.acquire() as conn: rows = await conn.fetch( "SELECT * FROM events WHERE global_position > $1 " "ORDER BY global_position LIMIT $2", from_position, limit, ) return [self._to_event(r) for r in rows] ``` ## Projections Projections build read-optimised views by replaying events. They are the "Q" side of CQRS. ### Projection Lifecycle 1. **Start from checkpoint** — resume from last processed global position 2. **Apply events** — update the read model for each relevant event type 3. **Save checkpoint** — persist the new position atomically with the read model ### Projection Example ```python class OrderSummaryProjection: def __init__(self, db, event_store: EventStore): self.db = db self.store = event_store async def run(self, batch_size: int = 100): position = await self._load_checkpoint() while True: events = await self.store.read_all(position, batch_size) if not events: await asyncio.sleep(1) continue for evt in events: await self._apply(evt) position = evt.global_position await self._save_checkpoint(position) async def _apply(self, event: Event): match event.event_type: case "OrderPlaced": await self.db.execute( "INSERT INTO order_summaries (id, customer, total, status) " "VALUES ($1,$2,$3,'placed')", event.data["order_id"], event.data["customer_id"], event.data["total_cents"], ) case "OrderShipped": await self.db.execute( "UPDATE order_summaries SET status='shipped' " "WHERE id=$1", event.data["order_id"], ) ``` ### Projection Design Rules - **Idempotent handlers** — replaying the same event twice must not corrupt state - **One projection per read model** — keep projections focused - **Rebuild from scratch** — projections should be deletable and fully replayable - **Separate storage** — projections can live in different databases (Postgres, Elasticsearch, Redis) ## Snapshotting Snapshots accelerate aggregate rehydration by caching state at a known version. Use when streams exceed ~100 events, aggregates have expensive rehydration, or on a cadence (e.g., every 50 events). ### Snapshot Flow ```python class SnapshottedRepository: def __init__(self, event_store: EventStore, pool): self.store = event_store self.pool = pool async def load(self, stream_id: str) -> Aggregate: # 1. Try loading snapshot snap = await self._load_snapshot(stream_id) from_version = 0 aggregate = Aggregate(stream_id) if snap: aggregate.restore(snap["data"]) from_version = snap["version"] + 1 # 2. Replay events after snapshot events = await self.store.read_stream(stream_id, from_version) for evt in events: aggregate.apply(evt) # 3. Snapshot if too many events replayed if len(events) > 50: await self._save_snapshot( stream_id, aggregate.snapshot(), aggregate.version ) return aggregate ``` ## CQRS Integration CQRS separates the write model (commands → events) from the read model (projections). ``` Commands ──► Aggregate ──► Event Store ──► Projections ──► Query API (write) (domain) (append) (build) (read) ``` ### Key Principles 1. **Write side** validates commands, emits events, enforces invariants 2. **Read side** subscribes to events, builds optimised query models 3. **Eventual consistency** — reads may lag behind writes by milliseconds to seconds 4. **Independent scaling** — scale reads and writes separately ### Command Handler Pattern ```python class PlaceOrderHandler: def __init__(self, event_store: EventStore): self.store = event_store async def handle(self, cmd: PlaceOrderCommand): # Load aggregate from events events = await self.store.read_stream(f"Order-{cmd.order_id}") order = Order.reconstitute(events) # Execute command — validates and produces new events new_events = order.place(cmd.customer_id, cmd.items) # Persist with concurrency check await self.store.append( f"Order-{cmd.order_id}", "Order", new_events, expected_version=order.version, ) ``` ## EventStoreDB Integration ```python from esdbclient import EventStoreDBClient, NewEvent, StreamState import json client = EventStoreDBClient(uri="esdb://localhost:2113?tls=false") def append_events(stream_name: str, events: list, expected_revision=None): new_events = [ NewEvent( type=event['type'], data=json.dumps(event['data']).encode(), metadata=json.dumps(event.get('metadata', {})).encode() ) for event in events ] state = (StreamState.ANY if expected_revision is None else StreamState.NO_STREAM if expected_revision == -1 else expected_revision) return client.append_to_stream(stream_name, new_events, current_version=state) def read_stream(stream_name: str, from_revision: int = 0): return [ {'type': e.type, 'data': json.loads(e.data), 'stream_position': e.stream_position} for e in client.get_stream(stream_name, stream_position=from_revision) ] # Category projection: read all events for Order-* streams def read_category(category: str): return read_stream(f"$ce-{category}") ``` ## DynamoDB Event Store ```python import boto3 from boto3.dynamodb.conditions import Key from datetime import datetime import json, uuid class DynamoEventStore: def __init__(self, table_name: str): self.table = boto3.resource('dynamodb').Table(table_name) def append(self, stream_id: str, events: list, expected_version: int = 0): with self.table.batch_writer() as batch: for i, event in enumerate(events): version = expected_version + i + 1 batch.put_item(Item={ 'PK': f"STREAM#{stream_id}", 'SK': f"VERSION#{version:020d}", 'GSI1PK': 'EVENTS', 'GSI1SK': datetime.utcnow().isoformat(), 'event_id': str(uuid.uuid4()), 'event_type': event['type'], 'event_data': json.dumps(event['data']), 'version': version, }) def read_stream(self, stream_id: str, from_version: int = 0): resp = self.table.query( KeyConditionExpression= Key('PK').eq(f"STREAM#{stream_id}") & Key('SK').gte(f"VERSION#{from_version:020d}") ) return [ {'event_type': item['event_type'], 'data': json.loads(item['event_data']), 'version': item['version']} for item in resp['Items'] ] ``` **DynamoDB table design:** PK=`STREAM#{id}`, SK=`VERSION#{version}`, GSI1 for global ordering. ## Best Practices ### Do - **Name streams `{Type}-{id}`** — e.g., `Order-abc123` - **Include correlation / causation IDs** in metadata for tracing - **Version event schemas from day one** — plan for evolution - **Implement idempotent writes** — use event IDs for deduplication - **Index for your query patterns** — stream, global position, event type ### Don't - **Mutate or delete events** — they are immutable facts - **Store large payloads** — keep events small; reference blobs externally - **Skip optimistic concurrency** — prevents data corruption - **Ignore backpressure** — handle slow consumers gracefully - **Couple projections to the write model** — projections should be independently deployable ## NEVER Do - **NEVER update or delete events** — Events are immutable historical facts; create compensating events instead - **NEVER skip version checks on append** — Optimistic concurrency prevents lost updates and corruption - **NEVER embed large blobs in events** — Store blobs externally, reference by ID in the event - **NEVER use random UUIDs for event IDs without idempotency checks** — Retries create duplicates - **NEVER read projections for command validation** — Use the event stream as the source of truth - **NEVER couple projections to the write transaction** — Projections must be rebuildable independently

标签

skill ai

通过对话安装

该技能支持在以下平台通过对话安装:

OpenClaw WorkBuddy QClaw Kimi Claude

方式一:安装 SkillHub 和技能

帮我安装 SkillHub 和 backend-event-stores-1776419953 技能

方式二:设置 SkillHub 为优先技能安装源

设置 SkillHub 为我的优先技能安装源,然后帮我安装 backend-event-stores-1776419953 技能

通过命令行安装

skillhub install backend-event-stores-1776419953

下载 Zip 包

⬇ 下载 event-store v1.0.0

文件大小: 6.68 KB | 发布时间: 2026-4-17 20:09

v1.0.0 最新 2026-4-17 20:09
Initial release with comprehensive guidance on designing and implementing event stores for event-sourced systems.

- Covers core event store concepts, requirements, and architectural patterns
- Provides technology comparisons for EventStoreDB, PostgreSQL, Kafka, and DynamoDB
- Includes recommended event schema/envelope design and schema evolution strategies
- Supplies a sample PostgreSQL schema for events, snapshots, and subscription checkpoints
- Offers code for a basic async Python event store (append, read_stream, read_all)
- Explains projection patterns and their role in building CQRS read models

Archiver·手机版·闲社网·闲社论坛·羊毛社区· 多链控股集团有限公司 · 苏ICP备2025199260号-1

Powered by Discuz! X5.0   © 2024-2025 闲社网·线报更新论坛·羊毛分享社区·http://xianshe.com

p2p_official_large
返回顶部