Event Stores API Reference¶
The event store is the heart of event sourcing. It is an append-only database that stores all domain events as the single source of truth.
What is an Event Store?¶
In traditional systems, you store the current state and lose the history. Event sourcing flips this: you store the sequence of events that led to the current state, then derive the state by replaying those events.
Traditional Database:          Event Store:
+------------------+           +----------------------------------+
| orders           |           | events                           |
|------------------|           |----------------------------------|
| id: 123          |           | OrderCreated { id: 123 }         |
| status: shipped  |           | ItemAdded { sku: "ABC" }         |
| items: [...]     |           | ItemAdded { sku: "XYZ" }         |
+------------------+           | OrderShipped { tracking: "..." } |
                               +----------------------------------+
(State only)                   (Complete history)
The event store provides:
- Durability: Events are persisted and never lost
- Ordering: Events are stored in strict order (both per-stream and global)
- Optimistic concurrency: Prevents concurrent modifications to the same aggregate
- Replay capability: Rebuild any aggregate or projection from events
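To make "derive the state by replaying events" concrete, here is a minimal, library-independent sketch; the event dicts and the apply logic are purely illustrative, not part of the package:

# Deriving current state by folding over the stored events, in order
events = [
    {"type": "OrderCreated", "id": 123},
    {"type": "ItemAdded", "sku": "ABC"},
    {"type": "ItemAdded", "sku": "XYZ"},
    {"type": "OrderShipped", "tracking": "1Z999"},
]

state = {"items": [], "status": "new"}
for event in events:  # replay in stored order
    if event["type"] == "OrderCreated":
        state["id"] = event["id"]
    elif event["type"] == "ItemAdded":
        state["items"].append(event["sku"])
    elif event["type"] == "OrderShipped":
        state["status"] = "shipped"

print(state)  # {'items': ['ABC', 'XYZ'], 'status': 'shipped', 'id': 123}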
Quick Start¶
from eventsource import (
    # Interface
    EventStore,
    # Implementations
    InMemoryEventStore,    # Testing and development
    PostgreSQLEventStore,  # Production
    SQLiteEventStore,      # Lightweight/embedded
    # Data structures
    EventStream,
    AppendResult,
    StoredEvent,
    ReadOptions,
    ReadDirection,
    ExpectedVersion,
)
Choose your store:
# Development/Testing: In-memory (no persistence)
store = InMemoryEventStore()

# Development/Testing: SQLite (file or memory)
async with SQLiteEventStore("./events.db") as store:
    await store.initialize()

# Production: PostgreSQL
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

engine = create_async_engine("postgresql+asyncpg://localhost/mydb")
session_factory = async_sessionmaker(engine, expire_on_commit=False)
store = PostgreSQLEventStore(session_factory)
Key Concepts¶
Stream Position vs Global Position¶
Every event has two positions:
Stream "Order-123": Global Event Log:
+-------------------+ +--------------------------------+
| Position 1: Created | | Position 1: User-456 Created |
| Position 2: ItemAdd | --> | Position 2: Order-123 Created | <-- stream_position=1
| Position 3: Shipped | | Position 3: Order-123 ItemAdd | <-- stream_position=2
+-------------------+ | Position 4: User-789 Updated |
| Position 5: Order-123 Shipped | <-- stream_position=3
+--------------------------------+
- Stream Position: Order within a single aggregate's events (used for aggregate reconstruction)
- Global Position: Order across ALL events in the store (used for projections and subscriptions)
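Both positions travel with every stored event. For example, scanning the global log with read_all() (documented below) exposes the pair side by side:

# Each StoredEvent carries both coordinates
async for stored in store.read_all():
    print(f"stream_position={stored.stream_position}  global_position={stored.global_position}")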
Why Integer Positions Instead of UUIDs?¶
Events have both an event_id (UUID) and a global_position (integer) because they serve different purposes:
| Field | Type | Purpose |
|---|---|---|
| event_id | UUID | Identity: a unique identifier used for deduplication and idempotency checks |
| global_position | Integer | Ordering: strict sequential ordering for subscriptions and catch-up |
Why not use UUIDs for ordering? Standard UUIDs (v1-v4) have no natural sort order. Even UUID v7 (timestamp-based) can have ordering issues with clock skew across distributed writers. Database-assigned auto-increment integers provide:
- Guaranteed sequential ordering — No gaps, no out-of-order events
- Atomic assignment — The database ensures each position is unique
- Clock-independent — Works correctly with multiple writers across machines
- Watermark compatibility — Subscriptions can reliably track "catch up to position X"
This is the standard pattern used by EventStoreDB, Marten, Axon, and other production event stores.
See the FAQ: Why integers instead of UUIDs? for more details.
Optimistic Concurrency Control¶
When saving events, you specify the expected version. If another process saved events first, you get a conflict:
# Load aggregate at version 3
order = await repo.load(order_id) # version = 3
# Meanwhile, another process saves to version 4...
# Your save fails with OptimisticLockError
await repo.save(order) # expected_version=3, but current is 4!
This prevents lost updates in concurrent systems. See ADR-0003: Optimistic Locking for details.
EventStore Interface¶
The abstract EventStore class defines the contract for all implementations.
append_events()¶
Append events to an aggregate's stream with optimistic locking.
async def append_events(
    self,
    aggregate_id: UUID,
    aggregate_type: str,
    events: list[DomainEvent],
    expected_version: int,
) -> AppendResult
Parameters:
| Parameter | Description |
|---|---|
| aggregate_id | UUID of the aggregate |
| aggregate_type | Type name (e.g., "Order", "User") |
| events | List of events to append (in order) |
| expected_version | Current version for optimistic locking (0 for new aggregates) |
Example:
from uuid import uuid4
from eventsource import ExpectedVersion, OptimisticLockError

# Create a new aggregate (version 0 -> 1)
result = await store.append_events(
    aggregate_id=uuid4(),
    aggregate_type="Order",
    events=[OrderCreated(...)],
    expected_version=0,
)
assert result.success
assert result.new_version == 1

# Update existing aggregate (version 1 -> 2)
result = await store.append_events(
    aggregate_id=order_id,
    aggregate_type="Order",
    events=[OrderShipped(...)],
    expected_version=1,
)

# Handle concurrent modification
try:
    await store.append_events(...)
except OptimisticLockError as e:
    print(f"Expected version {e.expected_version}, but was {e.actual_version}")
    # Reload and retry
get_events()¶
Retrieve events for an aggregate to reconstruct its state.
async def get_events(
    self,
    aggregate_id: UUID,
    aggregate_type: str | None = None,
    from_version: int = 0,
    from_timestamp: datetime | None = None,
    to_timestamp: datetime | None = None,
) -> EventStream
Example:
# Get all events for an aggregate
stream = await store.get_events(order_id, "Order")
print(f"Order has {stream.version} events")
for event in stream.events:
    aggregate._apply(event)

# Get events after version 5 (for partial replay)
stream = await store.get_events(order_id, "Order", from_version=5)

# Get events within a time range (useful for partitioned tables)
from datetime import datetime, timedelta, UTC

one_week_ago = datetime.now(UTC) - timedelta(days=7)
stream = await store.get_events(
    order_id, "Order",
    from_timestamp=one_week_ago,
)
get_events_by_type()¶
Get all events for a specific aggregate type (useful for building projections).
async def get_events_by_type(
    self,
    aggregate_type: str,
    tenant_id: UUID | None = None,
    from_timestamp: datetime | None = None,
) -> list[DomainEvent]
Example:
from datetime import datetime, UTC, timedelta

# Get all Order events
order_events = await store.get_events_by_type("Order")

# Get Order events for a specific tenant
order_events = await store.get_events_by_type(
    "Order",
    tenant_id=tenant_uuid,
)

# Get recent events only
one_hour_ago = datetime.now(UTC) - timedelta(hours=1)
recent_events = await store.get_events_by_type(
    "Order",
    from_timestamp=one_hour_ago,
)
event_exists()¶
Check if an event exists (for idempotency checks).
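The signature is not reproduced here; judging from the usage below, a plausible shape (an assumption, not the verified declaration) is:

async def event_exists(self, event_id: UUID) -> bool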
Example:
# Prevent duplicate processing
if await store.event_exists(event.event_id):
    print("Already processed, skipping")
    return
get_stream_version()¶
Get the current version of an aggregate without fetching all events.
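A usage sketch, assuming it mirrors get_events() in taking the aggregate ID and type, and returns 0 when no events have been stored yet:

# Cheap version check without loading event payloads
version = await store.get_stream_version(order_id, "Order")
if version == 0:
    print("No events stored for this aggregate yet")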
get_global_position()¶
Get the maximum global position in the event store. Used by subscriptions to determine catch-up completion.
Example:
# Check how far behind a projection is
current_position = await store.get_global_position()
projection_position = await checkpoint_repo.get_position("MyProjection")
lag = current_position - projection_position
print(f"Projection is {lag} events behind")
read_stream() / read_all()¶
Streaming methods for efficient event processing (memory-efficient for large datasets):
# Read a specific aggregate's stream
async for stored_event in store.read_stream("order-123:Order"):
    print(f"Event at position {stored_event.stream_position}")

# Read ALL events (for projections)
async for stored_event in store.read_all():
    await projection.handle(stored_event.event)

# Read with options
from eventsource.stores import ReadOptions, ReadDirection

options = ReadOptions(
    from_position=1000,  # Start after position 1000
    limit=100,           # Process in batches
    direction=ReadDirection.FORWARD,
)
async for stored_event in store.read_all(options):
    print(f"Global position: {stored_event.global_position}")
InMemoryEventStore¶
Fast in-memory implementation for testing and development, safe for concurrent use from multiple coroutines on a single event loop.
from eventsource import InMemoryEventStore

store = InMemoryEventStore()

# All EventStore operations work
result = await store.append_events(
    aggregate_id=order_id,
    aggregate_type="Order",
    events=[order_created],
    expected_version=0,
)
stream = await store.get_events(order_id, "Order")
Constructor¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| enable_tracing | bool | True | Enable OpenTelemetry tracing |
Testing Utilities¶
The in-memory store provides extra methods useful for testing:
# Reset state between tests
await store.clear()
# Get all events (useful for assertions)
all_events = await store.get_all_events()
# Count total events
count = await store.get_event_count()
# List all aggregate IDs
aggregate_ids = await store.get_aggregate_ids()
# Get current global position
position = await store.get_global_position()
Characteristics¶
| Feature | Value |
|---|---|
| Persistence | None (data lost on process exit) |
| Concurrency | Coroutine-safe via asyncio.Lock (single event loop) |
| Performance | Very fast (all in-memory) |
| Global ordering | Fully supported |
| Tracing | OpenTelemetry (optional) |
When to Use¶
- Unit tests (fast, isolated, deterministic)
- Integration tests (full event store API)
- Development/prototyping (quick iteration)
- Single-process apps with ephemeral state
When NOT to Use¶
- Production (no persistence)
- Distributed systems (no shared state)
- Large event volumes (memory-bound)
PostgreSQLEventStore¶
Production-ready PostgreSQL implementation with ACID guarantees, optimistic locking, and optional transactional outbox.
Setup¶
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from eventsource import PostgreSQLEventStore

# Create SQLAlchemy async engine
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/mydb",
    pool_size=10,
    max_overflow=20,
)
session_factory = async_sessionmaker(engine, expire_on_commit=False)

# Basic setup
store = PostgreSQLEventStore(session_factory)

# Production setup with all features
store = PostgreSQLEventStore(
    session_factory,
    outbox_enabled=True,   # Transactional outbox pattern
    enable_tracing=True,   # OpenTelemetry tracing
)
Constructor Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| session_factory | async_sessionmaker | Required | SQLAlchemy async session factory |
| event_registry | EventRegistry | Default | Registry for event deserialization |
| outbox_enabled | bool | False | Write to outbox table atomically |
| enable_tracing | bool | True | OpenTelemetry tracing |
| type_converter | TypeConverter | None | Custom type converter |
| uuid_fields | set[str] | None | Additional UUID field names |
| string_id_fields | set[str] | None | Fields to exclude from UUID detection |
| auto_detect_uuid | bool | True | Auto-detect UUIDs by _id suffix |
Factory Methods¶
# Strict UUID detection (no auto-detection, explicit fields only)
store = PostgreSQLEventStore.with_strict_uuid_detection(
    session_factory,
    uuid_fields={"event_id", "aggregate_id", "tenant_id"},
)
Features¶
| Feature | Description |
|---|---|
| Optimistic locking | Version-based conflict detection with unique constraint backup |
| Idempotent writes | Duplicate events (same event_id) are silently skipped |
| Transactional outbox | Events + outbox written atomically (reliable publishing) |
| OpenTelemetry tracing | Spans for all operations with semantic attributes |
| Partition-aware | Timestamp filters enable partition pruning |
| Multi-tenancy | tenant_id field for data isolation |
| Global ordering | Auto-increment id provides total ordering |
Properties¶
store.session_factory # Get the SQLAlchemy session factory
store.event_registry # Get the event registry
store.outbox_enabled # Check if outbox is enabled
Database Schema¶
The store requires an events table. Create it with this schema:
-- Main events table
CREATE TABLE events (
    id             BIGSERIAL PRIMARY KEY,      -- Global position (auto-increment)
    event_id       UUID NOT NULL UNIQUE,       -- Unique event identifier
    event_type     VARCHAR(255) NOT NULL,      -- e.g., "OrderCreated"
    aggregate_type VARCHAR(255) NOT NULL,      -- e.g., "Order"
    aggregate_id   UUID NOT NULL,              -- Aggregate identifier
    tenant_id      VARCHAR(36),                -- Multi-tenancy support
    actor_id       VARCHAR(255),               -- Who triggered the event
    version        INTEGER NOT NULL,           -- Stream position (1-based)
    timestamp      TIMESTAMPTZ NOT NULL,       -- When the event occurred
    payload        JSONB NOT NULL,             -- Event data
    created_at     TIMESTAMPTZ DEFAULT NOW(),  -- When stored

    -- Optimistic locking constraint
    CONSTRAINT uq_events_aggregate_version
        UNIQUE (aggregate_id, aggregate_type, version)
);

-- Performance indexes
CREATE INDEX idx_events_aggregate ON events(aggregate_id, aggregate_type);
CREATE INDEX idx_events_type ON events(aggregate_type);
CREATE INDEX idx_events_tenant ON events(tenant_id);
CREATE INDEX idx_events_timestamp ON events(timestamp);
For the transactional outbox pattern (when outbox_enabled=True):
CREATE TABLE event_outbox (
    id             UUID PRIMARY KEY,
    event_id       UUID NOT NULL,
    event_type     VARCHAR(255) NOT NULL,
    aggregate_id   UUID NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL,
    tenant_id      VARCHAR(36),
    event_data     JSONB NOT NULL,
    created_at     TIMESTAMPTZ NOT NULL,
    status         VARCHAR(20) DEFAULT 'pending',
    processed_at   TIMESTAMPTZ
);

CREATE INDEX idx_outbox_status ON event_outbox(status, created_at);
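The library does not prescribe how the outbox is drained. A minimal relay loop over this table might look like the following sketch; the relay_outbox_once function, its SQL, and the publish callback are illustrative, not part of the package:

from sqlalchemy import text

async def relay_outbox_once(session_factory, publish, batch_size: int = 100) -> None:
    """Publish pending outbox rows, then mark them processed."""
    async with session_factory() as session:
        rows = (await session.execute(
            text(
                "SELECT id, event_type, event_data FROM event_outbox "
                "WHERE status = 'pending' ORDER BY created_at LIMIT :n"
            ),
            {"n": batch_size},
        )).mappings().all()
        for row in rows:
            await publish(row["event_type"], row["event_data"])  # e.g. push to a broker
            await session.execute(
                text(
                    "UPDATE event_outbox SET status = 'processed', processed_at = NOW() "
                    "WHERE id = :id"
                ),
                {"id": row["id"]},
            )
        await session.commit()

Run something like this on a timer or after each commit; the status/created_at index above keeps the pending scan cheap.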
Performance Considerations¶
Connection Pooling:
engine = create_async_engine(
    "postgresql+asyncpg://...",
    pool_size=10,        # Base pool size
    max_overflow=20,     # Additional connections under load
    pool_recycle=3600,   # Recycle connections hourly
)
Partition Pruning:
# For time-partitioned tables, always include timestamp filters
from datetime import datetime, timedelta, UTC

# This enables PostgreSQL to skip irrelevant partitions
stream = await store.get_events(
    aggregate_id=order_id,
    aggregate_type="Order",
    from_timestamp=datetime.now(UTC) - timedelta(days=30),
)
Batch Processing:
# Use read_all with limits for large datasets
options = ReadOptions(from_position=last_position, limit=1000)
async for event in store.read_all(options):
    await process(event)
    last_position = event.global_position
SQLiteEventStore¶
Lightweight SQLite implementation for development, testing, and embedded applications. Zero configuration required.
Installation¶
Setup¶
from eventsource import SQLiteEventStore

# File-based database (persistent)
async with SQLiteEventStore("./events.db") as store:
    await store.initialize()
    # ... use store

# In-memory database (perfect for tests)
async with SQLiteEventStore(":memory:") as store:
    await store.initialize()
    # ... use store
Constructor Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| database | str | Required | Path to SQLite file or :memory: |
| event_registry | EventRegistry | Default | Registry for event deserialization |
| wal_mode | bool | True | WAL mode for concurrent read performance |
| busy_timeout | int | 5000 | Milliseconds to wait when the database is locked |
| enable_tracing | bool | True | OpenTelemetry tracing |
| type_converter | TypeConverter | None | Custom type converter |
| uuid_fields | set[str] | None | Additional UUID field names |
| string_id_fields | set[str] | None | Fields to exclude from UUID detection |
| auto_detect_uuid | bool | True | Auto-detect UUIDs by _id suffix |
Factory Methods¶
# Strict UUID detection (no auto-detection)
store = SQLiteEventStore.with_strict_uuid_detection(
    database="./events.db",
    uuid_fields={"event_id", "aggregate_id", "tenant_id"},
)
Context Manager Usage¶
Prefer the async context manager:
# Recommended: automatic cleanup
async with SQLiteEventStore("./events.db") as store:
    await store.initialize()
    # ... use store
# Connection automatically closed

# Manual (if needed)
store = SQLiteEventStore("./events.db")
await store._connect()
await store.initialize()
try:
    ...  # ... use store
finally:
    await store.close()
Properties¶
store.database # "./events.db"
store.event_registry # EventRegistry instance
store.is_connected # True if connection is open
store.wal_mode # True/False
store.busy_timeout # Timeout in ms
Features¶
| Feature | Description |
|---|---|
| Optimistic locking | Version-based conflict detection |
| Idempotent writes | Duplicate events silently skipped |
| WAL mode | Better concurrent read performance |
| Multi-tenancy | tenant_id field for isolation |
| OpenTelemetry tracing | Optional performance monitoring |
| Auto schema creation | initialize() creates tables |
SQLite Type Adaptations¶
SQLite has no native UUID, datetime, or JSON column types, so the store serializes these values to TEXT and converts them back transparently:
| Python Type | SQLite Storage | Notes |
|---|---|---|
| UUID | TEXT | 36-char hyphenated format |
| datetime | TEXT | ISO 8601 format |
| dict (payload) | TEXT | JSON serialized |
| int (id) | INTEGER | Auto-increment |
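In practice this means values round-trip: what SQLite holds as TEXT comes back as the original Python type when events are read. A small sketch, where occurred_at stands in for whatever timestamp field your events carry:

from datetime import datetime
from uuid import UUID

stream = await store.get_events(order_id, "Order")
event = stream.events[0]
assert isinstance(event.aggregate_id, UUID)     # stored as TEXT, read back as UUID
assert isinstance(event.occurred_at, datetime)  # ISO 8601 TEXT, read back as datetime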
Database Schema¶
Created automatically by initialize():
CREATE TABLE events (
    id             INTEGER PRIMARY KEY AUTOINCREMENT,  -- Global position
    event_id       TEXT NOT NULL UNIQUE,
    event_type     TEXT NOT NULL,
    aggregate_type TEXT NOT NULL,
    aggregate_id   TEXT NOT NULL,
    tenant_id      TEXT,
    actor_id       TEXT,
    version        INTEGER NOT NULL,                   -- Stream position
    timestamp      TEXT NOT NULL,
    payload        TEXT NOT NULL,
    created_at     TEXT NOT NULL,
    UNIQUE(aggregate_id, aggregate_type, version)
);

CREATE INDEX idx_events_aggregate ON events(aggregate_id, aggregate_type);
CREATE INDEX idx_events_type ON events(aggregate_type);
CREATE INDEX idx_events_tenant ON events(tenant_id);
CREATE INDEX idx_events_timestamp ON events(timestamp);
Limitations¶
| Limitation | Impact | Mitigation |
|---|---|---|
| Single writer | Only one concurrent write | Use PostgreSQL for high write loads |
| No network sharing | Cannot access from multiple machines | Use PostgreSQL for distributed systems |
| Busy timeout | Writes may fail under contention | Increase busy_timeout or retry |
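If writes occasionally time out under contention, a small retry wrapper can help. This is only a sketch: the exact exception surfaced on a lock timeout depends on your driver, so the string check and backoff values here are illustrative:

import asyncio

async def append_with_retry(store, retries: int = 3, **kwargs):
    """Retry appends that fail because the SQLite database is locked."""
    for attempt in range(retries):
        try:
            return await store.append_events(**kwargs)
        except Exception as exc:
            if "locked" not in str(exc).lower() or attempt == retries - 1:
                raise
            await asyncio.sleep(0.1 * (attempt + 1))  # brief backoff before retrying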
When to Use¶
- Local development (no database setup)
- Unit and integration tests (fast, isolated)
- CI/CD pipelines
- Single-instance deployments
- Embedded/edge applications
- Desktop applications
When NOT to Use¶
- High-throughput production
- Multi-instance deployments
- Heavy concurrent writes
- Distributed systems
Example¶
import asyncio
from uuid import uuid4
from eventsource import SQLiteEventStore

async def main():
    async with SQLiteEventStore("./app.db") as store:
        await store.initialize()
        order_id = uuid4()

        # Append events
        result = await store.append_events(
            aggregate_id=order_id,
            aggregate_type="Order",
            events=[OrderCreated(aggregate_id=order_id, ...)],
            expected_version=0,
        )
        print(f"Version: {result.new_version}, Position: {result.global_position}")

        # Read events
        stream = await store.get_events(order_id, "Order")
        print(f"Order has {stream.version} events")

        # Stream all events
        async for stored in store.read_all():
            print(f"[{stored.global_position}] {stored.event_type}")

asyncio.run(main())
See SQLite Backend Guide for comprehensive usage patterns.
Choosing an Event Store¶
Use this comparison to select the right store for your use case:
| Feature | InMemoryEventStore | SQLiteEventStore | PostgreSQLEventStore |
|---|---|---|---|
| Persistence | None | File or memory | Full ACID |
| Setup complexity | None | None | Database required |
| Concurrency | Single process | Single writer | Full multi-writer |
| Performance | Fastest | Fast | Production-grade |
| Scalability | None | Limited | Horizontal |
| Distributed | No | No | Yes |
| Multi-tenancy | Yes | Yes | Yes |
| Tracing | Optional | Optional | Optional |
| Best for | Tests | Dev/Embedded | Production |
Decision Guide¶
Need persistence?
├── No → InMemoryEventStore
└── Yes → Need concurrent writes from multiple instances?
          ├── No → SQLiteEventStore
          └── Yes → PostgreSQLEventStore
Migration Path¶
All stores implement the same EventStore interface, making migration easy:
# Development
store = InMemoryEventStore()
# Testing with persistence
store = SQLiteEventStore(":memory:")
# Production
store = PostgreSQLEventStore(session_factory)
# Your code doesn't change!
repo = AggregateRepository(event_store=store, ...)
TypeConverter¶
When events are stored, UUIDs and datetimes become JSON strings. The TypeConverter handles converting them back to proper Python types during deserialization.
from eventsource.stores import (
    TypeConverter,             # Protocol
    DefaultTypeConverter,      # Default implementation
    DEFAULT_UUID_FIELDS,       # Default UUID field names
    DEFAULT_STRING_ID_FIELDS,  # Default exclusions
)
TypeConverter Protocol¶
from typing import Any, Protocol, runtime_checkable

@runtime_checkable
class TypeConverter(Protocol):
    """Protocol for type conversion during event deserialization."""

    def convert_types(self, data: Any) -> Any:
        """Recursively convert string types to proper Python types."""
        ...

    def is_uuid_field(self, key: str) -> bool:
        """Determine if a field should be treated as UUID."""
        ...

    def is_datetime_field(self, key: str) -> bool:
        """Determine if a field should be treated as datetime."""
        ...
DefaultTypeConverter¶
The default implementation with configurable UUID detection.
Constructor Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| uuid_fields | set[str] | None | Additional field names to treat as UUIDs |
| string_id_fields | set[str] | None | Field names that should NOT be treated as UUIDs |
| auto_detect_uuid | bool | True | Whether the _id suffix triggers UUID detection |
| use_defaults | bool | True | Whether to merge with the default field sets |
Default Field Sets¶
DEFAULT_UUID_FIELDS - Fields always converted to UUID:
{"event_id", "aggregate_id", "tenant_id", "correlation_id",
"causation_id", "template_id", "issuance_id", "user_id"}
DEFAULT_STRING_ID_FIELDS - Fields never converted to UUID (exclusions):
{"actor_id", "issuer_id", "recipient_id", "invited_by",
"assigned_by", "revoked_by", "deactivated_by", "reactivated_by", "removed_by"}
UUID Detection Logic¶
- If the field name is in uuid_fields, return True
- If the field name is in string_id_fields, return False
- If auto_detect_uuid is True and the field name ends with _id, return True
- Otherwise, return False
Datetime Detection Logic¶
- If the field name is "occurred_at", return True
- If the field name ends with "_at", return True
- Otherwise, return False
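Both rule sets are easy to check directly on a converter instance; the field names passed in below are arbitrary examples:

from eventsource.stores import DefaultTypeConverter

converter = DefaultTypeConverter()
converter.is_uuid_field("tenant_id")        # True  (in the default UUID set)
converter.is_uuid_field("actor_id")         # False (in the default exclusions)
converter.is_uuid_field("shipment_id")      # True  (auto-detected via the _id suffix)
converter.is_datetime_field("occurred_at")  # True
converter.is_datetime_field("shipped_at")   # True  (ends with _at)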
Usage Examples¶
Default Configuration¶
from eventsource.stores import DefaultTypeConverter
converter = DefaultTypeConverter()
data = {"event_id": "550e8400-e29b-41d4-a716-446655440000"}
result = converter.convert_types(data)
# result["event_id"] is now a UUID object
Custom UUID Fields¶
converter = DefaultTypeConverter(
    uuid_fields={"reference_id", "source_id"},
)
# reference_id and source_id will now be converted to UUID
Excluding String ID Fields¶
converter = DefaultTypeConverter(
    string_id_fields={"stripe_customer_id", "external_user_id"},
)
# These fields will remain as strings even though they end in _id
Strict Mode (No Auto-Detection)¶
converter = DefaultTypeConverter.strict(
    uuid_fields={"event_id", "aggregate_id", "tenant_id"},
)
# ONLY these three fields will be converted to UUID
# No auto-detection based on _id suffix
Using with Event Stores¶
Both PostgreSQLEventStore and SQLiteEventStore support type converter configuration:
# Direct converter injection
converter = DefaultTypeConverter(uuid_fields={"custom_reference_id"})
store = PostgreSQLEventStore(session_factory, type_converter=converter)

# Or via constructor parameters (creates converter internally)
store = PostgreSQLEventStore(
    session_factory,
    uuid_fields={"custom_reference_id"},
    string_id_fields={"external_id"},
    auto_detect_uuid=True,
)

# Strict mode via factory method
store = SQLiteEventStore.with_strict_uuid_detection(
    database="./events.db",
    uuid_fields={"event_id", "aggregate_id", "tenant_id"},
)
Custom TypeConverter¶
Implement the protocol for custom type conversion:
from typing import Any
from uuid import UUID

class MyCustomConverter:
    """Custom converter for company-specific field naming."""

    def convert_types(self, data: Any) -> Any:
        if isinstance(data, dict):
            result = {}
            for key, value in data.items():
                if isinstance(value, str) and self.is_uuid_field(key):
                    try:
                        result[key] = UUID(value)
                    except ValueError:
                        result[key] = value
                elif isinstance(value, dict):
                    result[key] = self.convert_types(value)
                else:
                    result[key] = value
            return result
        return data

    def is_uuid_field(self, key: str) -> bool:
        # Company convention: UUIDs end with _uuid
        return key.endswith("_uuid")

    def is_datetime_field(self, key: str) -> bool:
        # Company convention: timestamps end with _timestamp
        return key.endswith("_timestamp")

# Use with any event store
store = PostgreSQLEventStore(session_factory, type_converter=MyCustomConverter())
Data Structures¶
EventStream¶
Container for events belonging to an aggregate.
@dataclass(frozen=True)
class EventStream:
    aggregate_id: UUID
    aggregate_type: str
    events: list[DomainEvent]
    version: int

    @property
    def is_empty(self) -> bool: ...

    @property
    def latest_event(self) -> DomainEvent | None: ...

    @classmethod
    def empty(cls, aggregate_id: UUID, aggregate_type: str) -> EventStream: ...
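The convenience properties in use (a short sketch continuing the get_events() examples above):

stream = await store.get_events(order_id, "Order")
if stream.is_empty:
    print("No events yet")
else:
    print(f"{stream.version} events; latest: {stream.latest_event}")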
AppendResult¶
Result of appending events.
@dataclass(frozen=True)
class AppendResult:
    success: bool
    new_version: int
    global_position: int = 0
    conflict: bool = False

    @classmethod
    def successful(cls, new_version: int, global_position: int = 0) -> AppendResult: ...

    @classmethod
    def conflicted(cls, current_version: int) -> AppendResult: ...
StoredEvent¶
Wrapper for persisted events with position metadata.
@dataclass(frozen=True)
class StoredEvent:
    event: DomainEvent
    stream_id: str
    stream_position: int
    global_position: int
    stored_at: datetime

    @property
    def event_id(self) -> UUID: ...

    @property
    def event_type(self) -> str: ...

    @property
    def aggregate_id(self) -> UUID: ...
ReadOptions¶
Configuration for reading events.
@dataclass(frozen=True)
class ReadOptions:
    direction: ReadDirection = ReadDirection.FORWARD
    from_position: int = 0
    limit: int | None = None
    from_timestamp: datetime | None = None
    to_timestamp: datetime | None = None
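For example, a bounded slice of the global log can combine position and timestamp filters; the window values below are arbitrary:

from datetime import datetime, timedelta, UTC

window_start = datetime.now(UTC) - timedelta(days=1)
options = ReadOptions(
    from_position=5_000,          # skip everything already processed
    limit=500,                    # cap the batch size
    from_timestamp=window_start,  # and restrict to the last day
)
async for stored in store.read_all(options):
    print(stored.global_position)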
ExpectedVersion¶
Constants for version expectations.
class ExpectedVersion:
    ANY: int = -1            # Skip version check
    NO_STREAM: int = 0       # Stream must not exist
    STREAM_EXISTS: int = -2  # Stream must exist
Usage:
# New aggregate (must not exist)
await store.append_events(
    aggregate_id=uuid4(),
    aggregate_type="Order",
    events=[order_created],
    expected_version=ExpectedVersion.NO_STREAM,
)

# Update existing (must exist)
await store.append_events(
    aggregate_id=existing_id,
    aggregate_type="Order",
    events=[order_shipped],
    expected_version=ExpectedVersion.STREAM_EXISTS,
)

# Don't check the version
await store.append_events(
    aggregate_id=any_id,
    aggregate_type="Order",
    events=[event],
    expected_version=ExpectedVersion.ANY,
)
Error Handling¶
OptimisticLockError¶
Raised when version check fails.
from eventsource import OptimisticLockError

try:
    await store.append_events(
        aggregate_id=order_id,
        aggregate_type="Order",
        events=[event],
        expected_version=5,
    )
except OptimisticLockError as e:
    print(f"Aggregate: {e.aggregate_id}")
    print(f"Expected version: {e.expected_version}")
    print(f"Actual version: {e.actual_version}")
    # Retry with fresh state
Best Practices¶
Use Repository Pattern¶
Prefer the repository over direct store access. The repository handles optimistic locking automatically:
# Recommended: Repository pattern
repo = AggregateRepository(event_store=store, ...)
order = await repo.load(order_id)
order.ship(tracking_number)
await repo.save(order) # Handles versioning internally
# Direct store access (lower-level, manual versioning)
stream = await store.get_events(order_id, "Order")
# ... create events manually ...
await store.append_events(..., expected_version=stream.version)
Handle Concurrency Conflicts¶
Always handle OptimisticLockError with retry logic:
from eventsource import OptimisticLockError

MAX_RETRIES = 3

for attempt in range(MAX_RETRIES):
    try:
        order = await repo.load(order_id)
        order.ship(tracking_number)
        await repo.save(order)
        break
    except OptimisticLockError:
        if attempt == MAX_RETRIES - 1:
            raise  # Give up after max retries
        # Reload and retry
        continue
Idempotency¶
Use unique event IDs to prevent duplicate processing:
# Each event has a unique event_id
if await store.event_exists(event.event_id):
    return  # Already processed, skip

# Or use deterministic event IDs for idempotent commands
from uuid import uuid5

event_id = uuid5(NAMESPACE, f"{command_id}:{command_type}")  # NAMESPACE: an application-defined uuid.UUID namespace
Batch Processing for Projections¶
Use streaming methods for memory efficiency:
# Process in batches
last_position = await checkpoint_repo.get_position("MyProjection")
options = ReadOptions(from_position=last_position, limit=1000)

async for stored_event in store.read_all(options):
    await projection.handle(stored_event.event)
    last_position = stored_event.global_position

await checkpoint_repo.save_position("MyProjection", last_position)
Timestamp Filtering for Partitioned Tables¶
Include timestamp bounds to enable partition pruning in PostgreSQL:
from datetime import datetime, timedelta, UTC

# Instead of scanning all partitions
stream = await store.get_events(order_id, "Order")

# Scan only relevant partitions
recent = datetime.now(UTC) - timedelta(days=30)
stream = await store.get_events(
    order_id, "Order",
    from_timestamp=recent,
)
Testing Best Practices¶
import pytest
from eventsource import InMemoryEventStore, SQLiteEventStore

# Fast unit tests: InMemoryEventStore
@pytest.fixture
def store():
    return InMemoryEventStore()

# Integration tests with persistence: SQLiteEventStore in-memory
@pytest.fixture
async def store():
    async with SQLiteEventStore(":memory:") as store:
        await store.initialize()
        yield store

# Reset between tests
@pytest.fixture(autouse=True)
async def reset_store(store):
    yield
    if hasattr(store, 'clear'):
        await store.clear()
See Also¶
- Architecture Overview - How event stores fit in the system
- ADR-0003: Optimistic Locking - Design rationale
- ADR-0007: Remove Sync Event Store - Async-only design
- SQLite Backend Guide - SQLite patterns and configuration
- Repository Pattern Guide - Higher-level aggregate access
- Subscriptions Guide - Processing events from the store
- SQLite Usage Examples - Practical code examples