API Reference
This section provides comprehensive API documentation for the eventsource library. The library implements the event sourcing pattern with a focus on type safety, async-first design, and developer ergonomics.
Quick Reference
All Public Classes and Functions
| Module | Class/Function | Description |
|---|---|---|
| Events | DomainEvent | Base class for all domain events |
| | EventRegistry | Maps event type names to classes for deserialization |
| | @register_event | Decorator to register event types |
| | get_event_class | Look up event class by type name |
| | is_event_registered | Check if event type is registered |
| | list_registered_events | List all registered event types |
| Stores | EventStore | Abstract interface for event persistence |
| | InMemoryEventStore | In-memory store for testing |
| | PostgreSQLEventStore | Production PostgreSQL store |
| | SQLiteEventStore | SQLite store for dev/testing/embedded |
| | EventStream | Container for aggregate events |
| | AppendResult | Result of appending events |
| | StoredEvent | Wrapper for persisted events |
| | ExpectedVersion | Constants for version expectations |
| Aggregates | AggregateRoot | Base class for event-sourced aggregates |
| | DeclarativeAggregate | Aggregate with decorator-based handlers |
| | AggregateRepository | Repository pattern for aggregates |
| Projections | Projection | Simple async projection base class |
| | DeclarativeProjection | Projection with @handles decorators |
| | CheckpointTrackingProjection | Projection with checkpoints and DLQ |
| | @handles | Decorator for event handlers |
| | ProjectionCoordinator | Coordinates multiple projections |
| Bus | EventBus | Abstract interface for event distribution |
| | InMemoryEventBus | In-memory bus for single-process apps |
| | RedisEventBus | Distributed bus using Redis Streams |
| | RedisEventBusConfig | Configuration for Redis bus |
| | EventHandler | Protocol for event handlers |
| | EventSubscriber | Protocol for subscribers with declared types |
| Snapshots | Snapshot | Point-in-time aggregate state capture |
| | SnapshotStore | Abstract interface for snapshot storage |
| | InMemorySnapshotStore | In-memory store for testing |
| | PostgreSQLSnapshotStore | Production PostgreSQL store |
| | SQLiteSnapshotStore | SQLite store for dev/embedded |
| Observability | OTEL_AVAILABLE | OpenTelemetry availability constant |
| | get_tracer | Get OpenTelemetry tracer if available |
| | should_trace | Check if tracing should be active |
| | @traced | Decorator for method-level tracing |
| | Tracer | Protocol for composition-based tracing |
| | create_tracer | Factory for creating tracers |
| Subscriptions | SubscriptionManager | Main entry point for subscription management |
| | Subscription | Individual subscription state machine |
| | SubscriptionConfig | Configuration for subscriptions |
| | CatchUpRunner | Historical event processing |
| | LiveRunner | Real-time event processing |
| | TransitionCoordinator | Catch-up to live transition |
| | FlowController | Backpressure management |
| | ShutdownCoordinator | Graceful shutdown coordination |
| | CircuitBreaker | Circuit breaker for resilience |
| | EventFilter | Event filtering support |
Module Overview
Events Module
The foundation of event sourcing - immutable records of state changes.
from eventsource import (
    DomainEvent,
    register_event,
    EventRegistry,
    get_event_class,
    get_event_class_or_none,
    is_event_registered,
    list_registered_events,
)
Key Components:
- DomainEvent: Base class for all domain events. Provides metadata fields (event_id, occurred_at, correlation_id), aggregate tracking, and serialization methods.
- EventRegistry: Maps event type names to event classes, enabling proper deserialization from storage.
- @register_event: Decorator that registers an event class with the default registry.
Example:
from uuid import UUID

@register_event
class OrderCreated(DomainEvent):
    event_type: str = "OrderCreated"
    aggregate_type: str = "Order"
    customer_id: UUID
    total: float
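
To make the role of a registry concrete, here is a minimal standalone sketch of the idea: a mapping from type names to classes that is consulted when a stored record is turned back into an event object. It does not use or reproduce the library's implementation; `SketchRegistry`, `register`, `lookup`, and the record layout are hypothetical names chosen for illustration.

```python
from dataclasses import dataclass


class SketchRegistry:
    """Hypothetical stand-in for an event registry (not eventsource's class)."""

    def __init__(self) -> None:
        self._classes: dict[str, type] = {}

    def register(self, cls: type) -> type:
        # Usable as a class decorator: the class name is the lookup key.
        name = cls.__name__
        if name in self._classes:
            raise ValueError(f"event type already registered: {name}")
        self._classes[name] = cls
        return cls

    def lookup(self, name: str) -> type:
        try:
            return self._classes[name]
        except KeyError:
            raise LookupError(f"event type not registered: {name}") from None


registry = SketchRegistry()


@registry.register
@dataclass(frozen=True)
class OrderCreated:
    customer_id: str
    total: float


# Deserialization: a stored record carries the type name and a payload.
record = {"event_type": "OrderCreated", "payload": {"customer_id": "c-1", "total": 9.5}}
event = registry.lookup(record["event_type"])(**record["payload"])
```

Without the name-to-class mapping, the stored `event_type` string could not be resolved to a concrete class, which is why events are registered before they are read back from storage.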
Read the full Events API documentation
Event Stores Module
Persistent storage for events with optimistic locking and streaming support.
from eventsource import (
    # Interface
    EventStore,
    # Implementations
    InMemoryEventStore,
    PostgreSQLEventStore,
    SQLiteEventStore,
    # Data structures
    EventStream,
    AppendResult,
    StoredEvent,
    ReadOptions,
    ReadDirection,
    ExpectedVersion,
)
Key Components:
- EventStore: Abstract interface defining append_events, get_events, and streaming methods.
- InMemoryEventStore: Thread-safe in-memory implementation for testing and development.
- SQLiteEventStore: Lightweight SQLite implementation for development, testing, and embedded applications.
- PostgreSQLEventStore: Production-ready store with optimistic locking, transactional outbox, and OpenTelemetry tracing.
Example:
# Development (no persistence)
store = InMemoryEventStore()

# Development/Testing (SQLite); run this inside an async function
async with SQLiteEventStore("./events.db") as store:
    await store.initialize()
    # ... use store

# Production (PostgreSQL)
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
session_factory = async_sessionmaker(engine, expire_on_commit=False)
store = PostgreSQLEventStore(session_factory, outbox_enabled=True)
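
Optimistic locking, mentioned above, can be illustrated without any database. The sketch below is an assumption-laden toy, not the library's store: `SketchStore`, `VersionConflict`, and the `append` signature are hypothetical. It shows only the core rule, namely that an append succeeds only when the caller's expected version matches the current stream length.

```python
class VersionConflict(Exception):
    """Hypothetical error standing in for an optimistic-lock failure."""


class SketchStore:
    """Illustrative in-memory store keyed by aggregate id."""

    def __init__(self) -> None:
        self._streams: dict[str, list[dict]] = {}

    def append(self, aggregate_id: str, events: list[dict], expected_version: int) -> int:
        stream = self._streams.setdefault(aggregate_id, [])
        # The stream version is the number of events already stored.
        if len(stream) != expected_version:
            raise VersionConflict(
                f"expected version {expected_version}, found {len(stream)}"
            )
        stream.extend(events)
        return len(stream)


store = SketchStore()
new_version = store.append("order-1", [{"type": "OrderCreated"}], expected_version=0)

# A second writer that still believes the stream is empty is rejected.
try:
    store.append("order-1", [{"type": "OrderCreated"}], expected_version=0)
    conflict = False
except VersionConflict:
    conflict = True
```

The rejected writer is expected to reload the stream and retry rather than overwrite the first writer's events.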
Read the full Event Stores API documentation
Aggregates Module
Consistency boundaries that enforce business rules and emit events.
from eventsource import (
    AggregateRoot,
    DeclarativeAggregate,
    AggregateRepository,
    handles,  # For DeclarativeAggregate
)
Key Components:
- AggregateRoot: Generic base class for aggregates. Manages state, tracks uncommitted events, and handles version management.
- DeclarativeAggregate: Alternative to AggregateRoot using @handles decorators for cleaner event application.
- AggregateRepository: Repository pattern for loading aggregates from event streams and saving new events. Supports event publishing, snapshot integration, and optimistic locking.
See the Repository Pattern Guide for comprehensive usage patterns.
Example:
class OrderAggregate(AggregateRoot[OrderState]):
    aggregate_type = "Order"

    def _get_initial_state(self) -> OrderState:
        return OrderState(order_id=self.aggregate_id)

    def _apply(self, event: DomainEvent) -> None:
        if isinstance(event, OrderCreated):
            self._state = OrderState(
                order_id=self.aggregate_id,
                customer_id=event.customer_id,
                status="created",
            )

    def create(self, customer_id: UUID) -> None:
        if self.version > 0:
            raise ValueError("Order already exists")
        self.apply_event(OrderCreated(
            aggregate_id=self.aggregate_id,
            customer_id=customer_id,
            aggregate_version=self.get_next_version(),
        ))
Read the full Aggregates API documentation
Projections Module
Build read-optimized views from event streams.
from eventsource.projections import (
    # Base classes
    Projection,
    DeclarativeProjection,
    CheckpointTrackingProjection,
    # Decorators
    handles,
    # Coordinator
    ProjectionCoordinator,
)
Key Components:
- Projection: Simple abstract base class for async projections.
- DeclarativeProjection: Uses @handles decorators for clean event routing.
- CheckpointTrackingProjection: Full-featured projection with automatic checkpointing, retry logic, and dead letter queue support.
- ProjectionCoordinator: Coordinates multiple projections for batch event processing.
Example:
class OrderSummaryProjection(DeclarativeProjection):
    @handles(OrderCreated)
    async def _handle_created(self, event: OrderCreated) -> None:
        await self._db.execute(
            "INSERT INTO orders (id, customer_id, status) VALUES ($1, $2, $3)",
            event.aggregate_id, event.customer_id, "created"
        )

    @handles(OrderShipped)
    async def _handle_shipped(self, event: OrderShipped) -> None:
        await self._db.execute(
            "UPDATE orders SET status = 'shipped' WHERE id = $1",
            event.aggregate_id
        )

    async def _truncate_read_models(self) -> None:
        await self._db.execute("TRUNCATE TABLE orders")
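
For readers curious how decorator-based routing of this kind can work, the following standalone sketch shows one plausible mechanism: the decorator tags a method with an event class, and a base class builds a dispatch table from those tags. It is not the library's code. The local `handles`, `SketchProjection`, and the synchronous handlers are simplifying assumptions (the library's handlers are async).

```python
from dataclasses import dataclass


def handles(event_cls: type):
    """Illustrative decorator: tag a method with the event class it handles."""
    def mark(func):
        func._handled_event = event_cls
        return func
    return mark


class SketchProjection:
    """Hypothetical base class that routes events to tagged methods."""

    def __init__(self) -> None:
        self._routes = {}
        for name in dir(self):
            member = getattr(self, name)
            event_cls = getattr(member, "_handled_event", None)
            if event_cls is not None:
                self._routes[event_cls] = member

    def handle(self, event) -> bool:
        handler = self._routes.get(type(event))
        if handler is None:
            return False  # event types without a handler are skipped
        handler(event)
        return True


@dataclass
class OrderCreated:
    order_id: str


@dataclass
class OrderShipped:
    order_id: str


class OrderStatusProjection(SketchProjection):
    def __init__(self) -> None:
        self.status = {}
        super().__init__()

    @handles(OrderCreated)
    def _on_created(self, event: OrderCreated) -> None:
        self.status[event.order_id] = "created"

    @handles(OrderShipped)
    def _on_shipped(self, event: OrderShipped) -> None:
        self.status[event.order_id] = "shipped"


projection = OrderStatusProjection()
projection.handle(OrderCreated("o-1"))
projection.handle(OrderShipped("o-1"))
```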
Read the full Projections API documentation
Event Bus Module
Distribute events to subscribers in real-time.
from eventsource import (
    # Interface
    EventBus,
    EventHandler,
    EventSubscriber,
    EventHandlerFunc,
    AsyncEventHandler,
    # Implementations
    InMemoryEventBus,
    RedisEventBus,
    RedisEventBusConfig,
)
Key Components:
- EventBus: Abstract interface for event publishing and subscription.
- InMemoryEventBus: Single-process event bus with wildcard subscriptions and background task management.
- RedisEventBus: Distributed event bus using Redis Streams with consumer groups and reliable delivery.
Example:
# Setup
bus = InMemoryEventBus()

# Subscribe handlers
bus.subscribe(OrderCreated, notification_handler)
bus.subscribe_all(analytics_projection)  # Subscribes to all declared types
bus.subscribe_to_all_events(audit_logger)  # Wildcard subscription

# Connect to repository for automatic publishing
repo = AggregateRepository(
    event_store=store,
    aggregate_factory=OrderAggregate,
    aggregate_type="Order",
    event_publisher=bus,  # Events published after save
)

💡 Tip: For production projections, use SubscriptionManager instead of direct subscribe_all() to get historical catch-up, checkpoint tracking, and health monitoring.
Read the full Event Bus API documentation
Snapshots Module
Optimize aggregate loading with point-in-time state snapshots.
from eventsource.snapshots import (
    # Core types
    Snapshot,
    SnapshotStore,
    # Implementations
    InMemorySnapshotStore,
    PostgreSQLSnapshotStore,
    SQLiteSnapshotStore,
    # Exceptions
    SnapshotError,
    SnapshotDeserializationError,
    SnapshotSchemaVersionError,
)
Key Components:
- Snapshot: Immutable data structure representing captured aggregate state at a point in time.
- SnapshotStore: Abstract interface for snapshot persistence with upsert semantics.
- InMemorySnapshotStore: Thread-safe in-memory implementation for testing.
- PostgreSQLSnapshotStore: Production-ready store with OpenTelemetry tracing.
- SQLiteSnapshotStore: Lightweight implementation for embedded deployments.
Example:
from eventsource import AggregateRepository
from eventsource.snapshots import PostgreSQLSnapshotStore

snapshot_store = PostgreSQLSnapshotStore(session_factory)
repo = AggregateRepository(
    event_store=event_store,
    aggregate_factory=OrderAggregate,
    aggregate_type="Order",
    snapshot_store=snapshot_store,
    snapshot_threshold=100,  # Snapshot every 100 events
    snapshot_mode="background",  # Non-blocking
)

# Load uses snapshot if available
order = await repo.load(order_id)
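
Why a snapshot speeds up loading can be shown with a small standalone model. The sketch below is illustrative only: `SketchSnapshot`, `load_state`, and `should_snapshot` are hypothetical, and the threshold rule is one assumed reading of "snapshot every N events", not the repository's documented policy.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchSnapshot:
    """Hypothetical snapshot: aggregate state as of `version` events."""
    version: int
    state: dict


def load_state(events: list, snapshot: Optional[SketchSnapshot] = None):
    """Rebuild state, replaying only events recorded after the snapshot."""
    if snapshot is None:
        state, version = {"total": 0}, 0
    else:
        state, version = dict(snapshot.state), snapshot.version
    replayed = 0
    for event in events[version:]:  # event N sits at list index N - 1
        state["total"] += event["amount"]
        version += 1
        replayed += 1
    return state, version, replayed


def should_snapshot(version: int, last_snapshot_version: int, threshold: int) -> bool:
    """Assumed policy: snapshot once `threshold` new events have accumulated."""
    return version - last_snapshot_version >= threshold


events = [{"amount": n} for n in range(1, 251)]  # 250 events
snapshot = SketchSnapshot(version=200, state={"total": sum(range(1, 201))})

full = load_state(events)            # replays all 250 events
fast = load_state(events, snapshot)  # replays only events 201..250
```

Both calls arrive at the same state and version; the snapshot path simply replays 50 events instead of 250.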
Read the full Snapshots API documentation
Observability Module
Reusable OpenTelemetry tracing utilities for consistent observability.
from eventsource.observability import (
    # Constants
    OTEL_AVAILABLE,
    # Helpers
    get_tracer,
    should_trace,
    # Decorator
    traced,
    # Composition-based Tracer API
    Tracer,
    create_tracer,
    NullTracer,
    MockTracer,
)
Key Components:
- OTEL_AVAILABLE: Boolean constant indicating OpenTelemetry availability (single source of truth).
- get_tracer(): Safely obtain an OpenTelemetry tracer, returning None if unavailable.
- @traced: Decorator for adding tracing to methods with minimal boilerplate.
- create_tracer(): Factory function to create appropriate tracer based on configuration.
- Tracer: Protocol for composition-based tracing with span() method.
Example:
from eventsource.observability import traced, create_tracer

class MyStore:
    def __init__(self, enable_tracing: bool = True):
        self._tracer = create_tracer(__name__, enable_tracing)
        self._enable_tracing = self._tracer.enabled

    @traced("my_store.save")
    async def save(self, item_id: str) -> None:
        # Automatically traced
        await self._do_save(item_id)

    async def load(self, item_id: str) -> dict:
        # Dynamic attributes
        with self._tracer.span(
            "my_store.load",
            {"item.id": item_id},
        ):
            return await self._do_load(item_id)
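
The "available or not" behaviour described for OTEL_AVAILABLE and get_tracer() follows a common optional-dependency pattern. The sketch below shows that general pattern in isolation; it defines its own local `OTEL_AVAILABLE`, `get_tracer_or_none`, and `span` helpers, which are assumptions for illustration and not the library's functions. Only `opentelemetry.trace.get_tracer` and `Tracer.start_as_current_span` are real OpenTelemetry API calls.

```python
from contextlib import contextmanager

try:
    from opentelemetry import trace  # optional dependency
    OTEL_AVAILABLE = True
except ImportError:
    trace = None
    OTEL_AVAILABLE = False


def get_tracer_or_none(name: str):
    """Return an OpenTelemetry tracer, or None when the package is missing."""
    if not OTEL_AVAILABLE:
        return None
    return trace.get_tracer(name)


@contextmanager
def span(name: str, enabled: bool = True):
    """Open a span when tracing is possible, otherwise do nothing."""
    tracer = get_tracer_or_none(__name__) if enabled else None
    if tracer is None:
        yield None
        return
    with tracer.start_as_current_span(name) as current:
        yield current


# The calling code is identical whether or not OpenTelemetry is installed.
with span("demo.work") as active_span:
    result = 1 + 1
```

Keeping the availability check in one place means application code never needs its own try/except around tracing imports.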
Read the full Observability API documentation
Subscriptions Module
Unified catch-up and live event subscription management with resilience features.
from eventsource.subscriptions import (
    # Core classes
    SubscriptionManager,
    SubscriptionConfig,
    Subscription,
    SubscriptionState,
    SubscriptionStatus,
    # Runners
    CatchUpRunner,
    LiveRunner,
    TransitionCoordinator,
    # Resilience
    FlowController,
    ShutdownCoordinator,
    CircuitBreaker,
    RetryConfig,
    # Filtering
    EventFilter,
    # Error handling
    SubscriptionErrorHandler,
    ErrorHandlerRegistry,
    # Health
    ManagerHealth,
    ReadinessStatus,
    LivenessStatus,
)
Key Components:
- SubscriptionManager: Main entry point for managing catch-up and live event subscriptions. Handles subscription lifecycle, graceful shutdown, and health monitoring.
- SubscriptionConfig: Configuration for subscriptions including start position, batch size, backpressure settings, checkpoint strategy, and retry/circuit breaker settings.
- CatchUpRunner: Reads historical events from the event store in batches with checkpointing.
- LiveRunner: Receives real-time events from the event bus with duplicate detection.
- TransitionCoordinator: Coordinates the transition from catch-up to live using a watermark approach to ensure no events are lost.
- FlowController: Backpressure management using semaphore-based flow control.
- ShutdownCoordinator: Signal-aware graceful shutdown with phased draining.
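
Semaphore-based flow control, as named in the list above, is a general asyncio technique. The following sketch demonstrates the technique with the standard library only; `process_with_limit` and its parameters are hypothetical and say nothing about how FlowController itself is implemented.

```python
import asyncio


async def process_with_limit(items, max_in_flight: int):
    """Run one task per item, with at most `max_in_flight` inside the handler."""
    semaphore = asyncio.Semaphore(max_in_flight)
    in_flight = 0
    peak = 0

    async def worker(item):
        nonlocal in_flight, peak
        async with semaphore:  # waits here while the limit is reached
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0)  # stand-in for real handler work
            in_flight -= 1
            return item * 2

    results = await asyncio.gather(*(worker(i) for i in items))
    return results, peak


results, peak = asyncio.run(process_with_limit(range(10), max_in_flight=3))
```

All ten items are processed, but `peak` never exceeds the configured limit, which is the backpressure property: producers are slowed to the pace the handlers can sustain.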
Example:
from eventsource.subscriptions import SubscriptionManager, SubscriptionConfig

# Create manager
manager = SubscriptionManager(event_store, event_bus, checkpoint_repo)

# Register subscribers
await manager.subscribe(
    my_projection,
    SubscriptionConfig(start_from="beginning", batch_size=500)
)

# Run until shutdown signal (SIGTERM/SIGINT)
result = await manager.run_until_shutdown()
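
The catch-up to live hand-off is easier to reason about with a toy model. The sketch below shows one plausible reading of a watermark approach: live events are buffered while history is replayed, and anything at or below the watermark is dropped as a duplicate. `drain_after_catch_up` and the `(position, name)` tuples are hypothetical; the library's TransitionCoordinator may work differently.

```python
def drain_after_catch_up(history, buffered_live):
    """Process history, then buffered live events, each event exactly once.

    Both arguments are lists of (position, name) tuples.
    """
    # Watermark: the highest position covered by the catch-up phase.
    watermark = max((position for position, _ in history), default=0)
    processed = [name for _, name in history]

    for position, name in sorted(buffered_live):
        if position <= watermark:
            continue  # already seen during catch-up, or a repeated delivery
        processed.append(name)
        watermark = position
    return processed


history = [(1, "a"), (2, "b"), (3, "c")]
# The live buffer overlaps history (position 3) and repeats position 4.
buffered_live = [(3, "c"), (4, "d"), (4, "d"), (5, "e")]

processed = drain_after_catch_up(history, buffered_live)
```

Starting the live buffer before catch-up finishes creates the overlap on purpose: duplicates are cheap to discard, whereas a gap between the two phases would lose events.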
Read the full Subscriptions API documentation
Common Import Patterns
Minimal Import (Most Common Use Case)
from eventsource import (
    # Events
    DomainEvent,
    register_event,
    # Aggregates
    AggregateRoot,
    AggregateRepository,
    # Stores
    InMemoryEventStore,
)
Production Setup
from eventsource import (
    # Events
    DomainEvent,
    register_event,
    # Aggregates
    AggregateRoot,
    AggregateRepository,
    # Stores
    PostgreSQLEventStore,
    ExpectedVersion,
    OptimisticLockError,
    # Bus
    InMemoryEventBus,  # or RedisEventBus
)
from eventsource.projections import (
    DeclarativeProjection,
    handles,
)
Full-Featured Setup
from eventsource import (
    # Events
    DomainEvent,
    register_event,
    EventRegistry,
    get_event_class,
    EventTypeNotFoundError,
    DuplicateEventTypeError,
    # Aggregates
    AggregateRoot,
    DeclarativeAggregate,
    AggregateRepository,
    AggregateNotFoundError,
    # Stores
    EventStore,
    InMemoryEventStore,
    PostgreSQLEventStore,
    EventStream,
    AppendResult,
    StoredEvent,
    ReadOptions,
    ExpectedVersion,
    OptimisticLockError,
    # Bus
    EventBus,
    InMemoryEventBus,
    RedisEventBus,
    RedisEventBusConfig,
    EventHandler,
    EventSubscriber,
)
from eventsource.projections import (
    Projection,
    SyncProjection,
    DeclarativeProjection,
    CheckpointTrackingProjection,
    handles,
    ProjectionCoordinator,
)
from eventsource.repositories import (
    CheckpointRepository,
    DLQRepository,
    PostgreSQLCheckpointRepository,
    PostgreSQLDLQRepository,
    SQLiteCheckpointRepository,
    SQLiteOutboxRepository,
    SQLiteDLQRepository,
    InMemoryCheckpointRepository,
    InMemoryDLQRepository,
)
from eventsource.subscriptions import (
    SubscriptionManager,
    SubscriptionConfig,
    Subscription,
    SubscriptionState,
    FlowController,
    ShutdownCoordinator,
    CircuitBreaker,
    RetryConfig,
    EventFilter,
)
Quick Start Example
Here is a minimal working example demonstrating the core API:
import asyncio
from uuid import UUID, uuid4

from pydantic import BaseModel

from eventsource import (
    DomainEvent,
    register_event,
    AggregateRoot,
    InMemoryEventStore,
    AggregateRepository,
)

# 1. Define events
@register_event
class OrderCreated(DomainEvent):
    event_type: str = "OrderCreated"
    aggregate_type: str = "Order"
    customer_id: UUID

@register_event
class OrderCompleted(DomainEvent):
    event_type: str = "OrderCompleted"
    aggregate_type: str = "Order"

# 2. Define state
class OrderState(BaseModel):
    order_id: UUID
    customer_id: UUID | None = None
    status: str = "draft"

# 3. Define aggregate
class OrderAggregate(AggregateRoot[OrderState]):
    aggregate_type = "Order"

    def _get_initial_state(self) -> OrderState:
        return OrderState(order_id=self.aggregate_id)

    def _apply(self, event: DomainEvent) -> None:
        if isinstance(event, OrderCreated):
            self._state = OrderState(
                order_id=self.aggregate_id,
                customer_id=event.customer_id,
                status="created",
            )
        elif isinstance(event, OrderCompleted):
            if self._state:
                self._state = self._state.model_copy(update={"status": "completed"})

    def create(self, customer_id: UUID) -> None:
        if self.version > 0:
            raise ValueError("Order already exists")
        self.apply_event(OrderCreated(
            aggregate_id=self.aggregate_id,
            customer_id=customer_id,
            aggregate_version=self.get_next_version(),
        ))

    def complete(self) -> None:
        if not self.state or self.state.status != "created":
            raise ValueError("Cannot complete order")
        self.apply_event(OrderCompleted(
            aggregate_id=self.aggregate_id,
            aggregate_version=self.get_next_version(),
        ))

# 4. Use it
async def main():
    store = InMemoryEventStore()
    repo = AggregateRepository(store, OrderAggregate, "Order")

    # Create and save
    order_id = uuid4()
    order = repo.create_new(order_id)
    order.create(customer_id=uuid4())
    await repo.save(order)

    # Load, modify, save
    order = await repo.load(order_id)
    order.complete()
    await repo.save(order)
    print(f"Order {order_id}: {order.state.status}")

asyncio.run(main())
Error Handling
The library provides specific exception types for common error cases:
| Exception | Module | Description |
|---|---|---|
| EventTypeNotFoundError | Events | Event type not registered |
| DuplicateEventTypeError | Events | Event type already registered |
| OptimisticLockError | Stores | Concurrent modification detected |
| AggregateNotFoundError | Aggregates | No events found for aggregate |
| RedisNotAvailableError | Bus | Redis package not installed |
from eventsource import (
    EventTypeNotFoundError,
    DuplicateEventTypeError,
    OptimisticLockError,
    AggregateNotFoundError,
)

# Handle optimistic lock conflicts
try:
    await repo.save(order)
except OptimisticLockError as e:
    # Reload and retry
    order = await repo.load(e.aggregate_id)
    order.complete()
    await repo.save(order)
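
The reload-and-retry step above handles a single conflict. Under sustained contention a bounded loop is often more practical. The sketch below shows that loop in standalone form: `ConflictError`, `save_with_retry`, and the `load`/`mutate`/`save` callables are hypothetical stand-ins, and the code is synchronous for brevity even though the library's repository methods are async.

```python
class ConflictError(Exception):
    """Hypothetical stand-in for an optimistic-lock error."""


def save_with_retry(load, mutate, save, max_attempts: int = 3) -> int:
    """Reload, reapply the command, and save; retry on version conflicts.

    Returns the number of attempts used; re-raises after the last attempt.
    """
    for attempt in range(1, max_attempts + 1):
        aggregate = load()   # always start from the latest stored state
        mutate(aggregate)    # reapply the command to fresh state
        try:
            save(aggregate)
            return attempt
        except ConflictError:
            if attempt == max_attempts:
                raise
    raise ValueError("max_attempts must be at least 1")


# Toy storage: the first save collides with a simulated concurrent writer.
state = {"version": 0}
save_calls = []


def load():
    return dict(state)


def mutate(aggregate):
    aggregate["status"] = "completed"


def save(aggregate):
    save_calls.append(aggregate["version"])
    if len(save_calls) == 1:
        state["version"] = 1  # another writer got there first
        raise ConflictError("stale version")
    state.update(aggregate)


attempts = save_with_retry(load, mutate, save)
```

Reloading inside the loop matters: retrying the save with the stale aggregate would fail again with the same conflict.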
Type Safety
The library is fully typed and works with mypy in strict mode. Key type patterns:
# Generic aggregate with typed state
class OrderAggregate(AggregateRoot[OrderState]):
    ...

# Typed event handlers
@handles(OrderCreated)
async def _handle_created(self, event: OrderCreated) -> None:
    ...

# Protocol-based handlers
class MyHandler(EventHandler):
    async def handle(self, event: DomainEvent) -> None:
        ...
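
Protocol-based typing means a handler is accepted for its shape rather than its base class. The sketch below illustrates this with `typing.Protocol` from the standard library; `Handler`, `AuditLog`, and `dispatch` are hypothetical names, and `handle` is synchronous here for brevity, unlike the async signature shown above.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class Handler(Protocol):
    """Illustrative protocol: any object with a matching handle() conforms."""

    def handle(self, event: object) -> None: ...


class AuditLog:
    # No inheritance from Handler: conformance is structural.
    def __init__(self) -> None:
        self.seen: list[object] = []

    def handle(self, event: object) -> None:
        self.seen.append(event)


def dispatch(handler: Handler, event: object) -> None:
    handler.handle(event)


log = AuditLog()
dispatch(log, "OrderCreated")
```

A static checker such as mypy accepts `AuditLog` wherever `Handler` is expected, and `runtime_checkable` additionally allows `isinstance` checks on the presence of the method.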
See Also
Guides
- Getting Started Guide - Step-by-step tutorial for new users
- Architecture Overview - System design and data flow
- Repository Pattern Guide - Comprehensive repository usage
- Snapshotting Guide - Optimize aggregate load performance
- Snapshotting Migration Guide - Add snapshotting to existing projects
- Multi-Tenant Guide - Multi-tenancy support
- Error Handling Guide - Error handling patterns
- Production Guide - Production deployment
Examples
- Basic Order Example - Complete order management example
- Snapshotting Example - Snapshot configuration and usage
- Projections Example - Building read models
- Multi-Tenant Example - Tenant isolation
- Testing Example - Testing strategies
Architecture Decision Records
- ADR-0001: Async-First Design - Why all I/O is async
- ADR-0002: Pydantic Event Models - Event serialization approach
- ADR-0003: Optimistic Locking - Concurrency control strategy
- ADR-0004: Projection Error Handling - Retry and DLQ patterns
- ADR-0005: API Design Patterns - Public API conventions
- ADR-0006: Event Registry - Event type registration