Code Structure Guide¶
This guide explains the eventsource package organization, module responsibilities, and how the components interact.
Overview¶
The eventsource library follows a modular architecture designed around the core concepts of event sourcing:
- Events are the source of truth
- Aggregates maintain consistency boundaries
- Stores persist events durably
- Projections build read models from events
- Buses distribute events to interested consumers
The package uses the src layout pattern with all source code under src/eventsource/.
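The first idea in the list above (events as the source of truth, with state derived by replay) can be illustrated without the library at all. The following is a minimal sketch using only the standard library; `Deposited`, `Withdrawn`, and `replay` are hypothetical names chosen for illustration and are not part of eventsource.

```python
from dataclasses import dataclass
from decimal import Decimal


# Hypothetical stand-ins for illustration; these are not eventsource classes.
@dataclass(frozen=True)
class Deposited:
    amount: Decimal


@dataclass(frozen=True)
class Withdrawn:
    amount: Decimal


def replay(events) -> Decimal:
    """Derive the current balance purely from the event history."""
    balance = Decimal("0")
    for event in events:
        if isinstance(event, Deposited):
            balance += event.amount
        elif isinstance(event, Withdrawn):
            balance -= event.amount
    return balance


history = [Deposited(Decimal("100")), Withdrawn(Decimal("30"))]
print(replay(history))  # 70
```

No balance is stored anywhere; it is recomputed from the history each time, which is the property the rest of the package builds on.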
Package Structure¶
src/eventsource/
├── __init__.py          # Public API exports (version 0.1.0)
├── types.py             # Shared type definitions
├── exceptions.py        # Exception hierarchy
├── config.py            # Configuration utilities
├── py.typed             # PEP 561 marker for type hints
│
├── events/              # Event foundation layer
│   ├── __init__.py
│   ├── base.py          # DomainEvent base class
│   └── registry.py      # Event type registration
│
├── stores/              # Event persistence layer
│   ├── __init__.py
│   ├── interface.py     # EventStore ABC and data classes
│   ├── in_memory.py     # InMemoryEventStore
│   └── postgresql.py    # PostgreSQLEventStore
│
├── aggregates/          # Aggregate pattern implementation
│   ├── __init__.py
│   ├── base.py          # AggregateRoot, DeclarativeAggregate
│   └── repository.py    # AggregateRepository
│
├── projections/         # Read model building
│   ├── __init__.py
│   ├── base.py          # Projection base classes
│   ├── decorators.py    # @handles decorator
│   ├── protocols.py     # EventSubscriber protocol
│   └── coordinator.py   # ProjectionCoordinator
│
├── repositories/        # Infrastructure repositories
│   ├── __init__.py
│   ├── _json.py         # JSON utilities (internal)
│   ├── checkpoint.py    # CheckpointRepository
│   ├── dlq.py           # DLQRepository
│   └── outbox.py        # OutboxRepository
│
├── bus/                 # Event distribution
│   ├── __init__.py
│   ├── interface.py     # EventBus ABC and protocols
│   ├── memory.py        # InMemoryEventBus
│   └── redis.py         # RedisEventBus
│
└── migrations/          # Database schema
    ├── __init__.py
    ├── schemas/         # SQL schema files
    └── templates/       # Alembic templates
Module Dependency Graph¶
The package follows a layered architecture where higher layers depend on lower layers:
                    ┌─────────────┐
                    │  types.py   │
                    │ exceptions  │
                    └──────┬──────┘
                           │
                    ┌──────▼──────┐
                    │   events/   │
                    │ DomainEvent │
                    │  Registry   │
                    └──────┬──────┘
                           │
       ┌───────────────────┼───────────────────┐
       │                   │                   │
┌──────▼──────┐     ┌──────▼──────┐     ┌──────▼──────┐
│   stores/   │     │    bus/     │     │projections/ │
│ EventStore  │     │  EventBus   │     │ Projection  │
└──────┬──────┘     └──────┬──────┘     └──────┬──────┘
       │                   │                   │
       │                   │           ┌───────▼───────┐
       │                   │           │ repositories/ │
       │                   │           │  Checkpoint   │
       │                   │           │  DLQ, Outbox  │
       │                   │           └───────────────┘
┌──────▼──────┐
│ aggregates/ │
│AggregateRoot│
│ Repository  │
└─────────────┘
       │
       ▼
(Application Code)
Core Module Details¶
types.py¶
Shared type definitions used throughout the library.
| Type | Description |
|---|---|
| TState | TypeVar for aggregate state (bound to BaseModel) |
| AggregateId | UUID alias for aggregate identifiers |
| EventId | UUID alias for event identifiers |
| TenantId | UUID or None for multi-tenancy |
| CorrelationId | UUID for linking related events |
| CausationId | UUID or None for event causation tracking |
| Version | int alias for optimistic locking |
| StreamPosition | int for position within an aggregate stream |
| GlobalPosition | int for position across all events |
Dependencies: pydantic (for BaseModel bound)
exceptions.py¶
Exception hierarchy for the library:
| Exception | Description |
|---|---|
| EventSourceError | Base exception for all library errors |
| OptimisticLockError | Version conflict during event append |
| EventNotFoundError | Event lookup failed |
| AggregateNotFoundError | Aggregate has no events |
| ProjectionError | Projection failed to process event |
| EventStoreError | Event store operation failed |
| EventBusError | Event bus operation failed |
| CheckpointError | Checkpoint operation failed |
| SerializationError | Event serialization/deserialization failed |
Dependencies: None (stdlib only)
events/¶
The foundation of event sourcing - all other modules depend on this.
base.py - DomainEvent¶
The immutable base class for all domain events, built on Pydantic:
from decimal import Decimal
from uuid import UUID

from eventsource import DomainEvent

class OrderCreated(DomainEvent):
    event_type: str = "OrderCreated"
    aggregate_type: str = "Order"
    customer_id: UUID
    order_total: Decimal
Key Features:
- Immutable (Pydantic frozen=True)
- Auto-generated event_id, occurred_at, correlation_id
- Causation tracking via with_causation()
- Metadata enrichment via with_metadata()
- JSON serialization via to_dict() / from_dict()
Dependencies: pydantic, datetime, uuid
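To make the immutability and causation-tracking features concrete, here is a minimal sketch that uses a frozen dataclass as a stand-in for the Pydantic model. `SketchEvent` is a hypothetical class, and the choice to copy `correlation_id` from the causing event is an assumption of this sketch rather than confirmed library behavior.

```python
from dataclasses import dataclass, field, replace
from datetime import datetime, timezone
from typing import Optional
from uuid import UUID, uuid4


# Hypothetical stand-in; the real DomainEvent is a Pydantic model.
@dataclass(frozen=True)
class SketchEvent:
    aggregate_id: UUID
    event_id: UUID = field(default_factory=uuid4)
    occurred_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    correlation_id: UUID = field(default_factory=uuid4)
    causation_id: Optional[UUID] = None

    def with_causation(self, cause: "SketchEvent") -> "SketchEvent":
        # Frozen instances cannot be mutated, so return a modified copy.
        # Assumption: the caused event joins the cause's correlation chain.
        return replace(
            self,
            causation_id=cause.event_id,
            correlation_id=cause.correlation_id,
        )


first = SketchEvent(aggregate_id=uuid4())
second = SketchEvent(aggregate_id=first.aggregate_id).with_causation(first)
```

Because the instance is frozen, "enriching" an event always produces a new object, so an event that has already been stored or published can never change underneath a consumer.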
registry.py - EventRegistry¶
Maps event type names to classes for deserialization:
from eventsource import DomainEvent, get_event_class, register_event

@register_event
class OrderCreated(DomainEvent):
    event_type: str = "OrderCreated"
    ...

# Later, deserialize from storage
event_class = get_event_class("OrderCreated")
event = event_class.from_dict(stored_data)
Key Features:
- Thread-safe registration
- Decorator-based or explicit registration
- Multiple registry support for testing isolation
- Helpful error messages listing available types
Dependencies: events/base.py, threading
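The registry features listed above (thread safety, decorator registration, error messages that list known types) might be sketched roughly as follows. `SketchRegistry` is hypothetical: it keys on the class name and raises built-in exceptions, whereas the library keys on the event type name and exports its own `EventTypeNotFoundError` and `DuplicateEventTypeError`.

```python
import threading


# Hypothetical sketch; not the eventsource EventRegistry implementation.
class SketchRegistry:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._classes = {}

    def register(self, cls: type) -> type:
        """Usable as a class decorator; keys on the class name."""
        with self._lock:
            if cls.__name__ in self._classes:
                raise ValueError(f"duplicate event type: {cls.__name__}")
            self._classes[cls.__name__] = cls
        return cls

    def get(self, name: str) -> type:
        with self._lock:
            if name not in self._classes:
                known = ", ".join(sorted(self._classes)) or "(none)"
                raise KeyError(
                    f"unknown event type {name!r}; registered: {known}"
                )
            return self._classes[name]


registry = SketchRegistry()


@registry.register
class OrderCreated:
    pass
```

Creating a separate `SketchRegistry()` per test gives the isolation that the "multiple registry support" feature refers to.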
stores/¶
The persistence layer for events.
interface.py - EventStore ABC¶
Defines the contract for event persistence:
| Class/Type | Purpose |
|---|---|
| EventStore | Abstract base for async event stores |
| EventStream | Container for an aggregate's events |
| StoredEvent | Event with position metadata |
| AppendResult | Result of append operation |
| ReadOptions | Configuration for reading events |
| ReadDirection | Forward or backward reading |
| ExpectedVersion | Constants for optimistic locking |
| EventPublisher | Protocol for publishing events |
Key Operations:
- append_events() - Atomic append with optimistic locking
- get_events() - Retrieve events for an aggregate
- get_events_by_type() - Query events by aggregate type
- event_exists() - Idempotency check
- read_stream() / read_all() - Streaming reads
Dependencies: events/base.py, abc
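The "atomic append with optimistic locking" contract can be sketched as a version check before the write. This is a synchronous, in-memory illustration with hypothetical names (`SketchStore`, `VersionConflict`); the library's stores are async and report conflicts with `OptimisticLockError`.

```python
from collections import defaultdict


# Hypothetical sketch of version-checked appends; not the library's store.
class VersionConflict(Exception):
    pass


class SketchStore:
    def __init__(self) -> None:
        self._streams = defaultdict(list)  # aggregate_id -> ordered events

    def append_events(self, aggregate_id, events, expected_version: int) -> int:
        stream = self._streams[aggregate_id]
        # The current version is the number of events already in the stream.
        if len(stream) != expected_version:
            raise VersionConflict(
                f"expected version {expected_version}, found {len(stream)}"
            )
        stream.extend(events)
        return len(stream)  # the new version


store = SketchStore()
v1 = store.append_events("order-1", ["OrderCreated"], expected_version=0)
```

A writer that loaded the aggregate at version 0 and tries to append after someone else has already written will fail the check and must reload before retrying.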
in_memory.py - InMemoryEventStore¶
Testing and development implementation:
from eventsource import InMemoryEventStore
store = InMemoryEventStore()
result = await store.append_events(
    aggregate_id=order_id,
    aggregate_type="Order",
    events=[order_created],
    expected_version=0,
)
Suitable for: Unit tests, prototyping, single-process apps
Not suitable for: Production, distributed systems, data persistence
Dependencies: stores/interface.py, collections, threading
postgresql.py - PostgreSQLEventStore¶
Production implementation using SQLAlchemy async:
from eventsource import PostgreSQLEventStore
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
engine = create_async_engine("postgresql+asyncpg://...")
session_factory = async_sessionmaker(engine, expire_on_commit=False)
store = PostgreSQLEventStore(
    session_factory,
    outbox_enabled=True,
    enable_tracing=True,
)
Features:
- Optimistic locking via database constraints
- Optional outbox pattern integration
- Optional OpenTelemetry tracing
- Partition-aware timestamp filtering
Dependencies: stores/interface.py, events/registry.py, sqlalchemy, json
aggregates/¶
The aggregate pattern for consistency boundaries.
base.py - AggregateRoot¶
Base class for event-sourced aggregates:
from eventsource import AggregateRoot, DomainEvent, handles

class OrderAggregate(AggregateRoot[OrderState]):
    aggregate_type = "Order"

    def _get_initial_state(self) -> OrderState:
        return OrderState(order_id=self.aggregate_id)

    def _apply(self, event: DomainEvent) -> None:
        if isinstance(event, OrderCreated):
            self._state = OrderState(...)
Alternative: DeclarativeAggregate with @handles decorator:
class OrderAggregate(DeclarativeAggregate[OrderState]):
    aggregate_type = "Order"

    @handles(OrderCreated)
    def _on_order_created(self, event: OrderCreated) -> None:
        self._state = OrderState(...)
Key Features:
- Generic state type parameter
- Uncommitted event tracking
- Event replay for reconstitution
- Version tracking for optimistic locking
Dependencies: events/base.py, types.py, abc
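For readers curious how a `@handles`-style decorator can replace the `isinstance` chain in `_apply`, the sketch below shows one common approach: tag each method with its event type and build a dispatch table at construction time. Everything here (`handles`, `SketchAggregate`, the `_handles` attribute) is a hypothetical reimplementation for illustration and may differ from how the library does it.

```python
# Hypothetical sketch of decorator-based dispatch; not the library's code.
def handles(event_type):
    def decorator(func):
        func._handles = event_type  # tag the method with its event type
        return func
    return decorator


class SketchAggregate:
    def __init__(self) -> None:
        self.version = 0
        self.state = {}
        # Collect tagged methods once, keyed by event type.
        self._handlers = {}
        for name in dir(self):
            method = getattr(self, name)
            event_type = getattr(method, "_handles", None)
            if event_type is not None:
                self._handlers[event_type] = method

    def apply(self, event) -> None:
        handler = self._handlers.get(type(event))
        if handler is not None:
            handler(event)
        self.version += 1  # every applied event advances the version


class OrderCreated:
    def __init__(self, total: int) -> None:
        self.total = total


class Order(SketchAggregate):
    @handles(OrderCreated)
    def _on_created(self, event: OrderCreated) -> None:
        self.state = {"total": event.total, "status": "created"}


order = Order()
order.apply(OrderCreated(total=42))
```

Reconstituting an aggregate is then just calling `apply` for each stored event in order, which also leaves `version` equal to the stream length.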
repository.py - AggregateRepository¶
Repository pattern for aggregate persistence:
from eventsource import AggregateRepository
repo = AggregateRepository(
    event_store=store,
    aggregate_factory=OrderAggregate,
    aggregate_type="Order",
    event_publisher=event_bus,  # Optional
)
# Load existing
order = await repo.load(order_id)
# Save changes
order.ship(tracking_number="TRACK123")
await repo.save(order)
Key Features:
- Load aggregates from event history
- Save uncommitted events atomically
- Optional event publishing after save
- load_or_create() for new aggregates
Dependencies: aggregates/base.py, stores/interface.py, exceptions.py
projections/¶
Read model building from events.
base.py - Projection Base Classes¶
| Class | Purpose |
|---|---|
| Projection | Abstract base for async projections |
| EventHandlerBase | Base for event handlers |
| CheckpointTrackingProjection | Adds checkpoint, retry, DLQ |
| DeclarativeProjection | Uses @handles decorator |
Dependencies: events/base.py, abc
decorators.py - @handles¶
Decorator for declarative event handling:
from eventsource.projections import DeclarativeProjection, handles
class OrderProjection(DeclarativeProjection):
    @handles(OrderCreated)
    async def _handle_order_created(self, conn, event: OrderCreated):
        await conn.execute(...)

    @handles(OrderShipped)
    async def _handle_order_shipped(self, conn, event: OrderShipped):
        await conn.execute(...)
Dependencies: events/base.py
protocols.py - EventSubscriber¶
Protocol for event subscription:
class OrderProjection:
    def subscribed_to(self) -> list[type[DomainEvent]]:
        return [OrderCreated, OrderShipped]

    async def handle(self, event: DomainEvent) -> None:
        ...
Dependencies: events/base.py, typing
coordinator.py - ProjectionCoordinator¶
Manages multiple projections:
from eventsource.projections import ProjectionRegistry
registry = ProjectionRegistry()
registry.register_projection(OrderProjection())
registry.register_projection(InventoryProjection())
# Dispatch event to all interested projections
await registry.dispatch(event)
Dependencies: projections/base.py, projections/protocols.py
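Dispatching "to all interested projections" amounts to filtering on what each projection says it subscribes to. The following is a minimal sketch of that idea using the `subscribed_to()` / `handle()` shape from the protocol above; `SketchDispatcher` and `OrderCountProjection` are hypothetical and the event classes are empty placeholders.

```python
import asyncio


# Hypothetical placeholders; real events would derive from DomainEvent.
class OrderCreated:
    pass


class OrderShipped:
    pass


class OrderCountProjection:
    def __init__(self) -> None:
        self.created = 0

    def subscribed_to(self):
        return [OrderCreated]

    async def handle(self, event) -> None:
        self.created += 1


class SketchDispatcher:
    def __init__(self) -> None:
        self._projections = []

    def register_projection(self, projection) -> None:
        self._projections.append(projection)

    async def dispatch(self, event) -> None:
        # Only projections that declared interest in this type are called.
        for projection in self._projections:
            if type(event) in projection.subscribed_to():
                await projection.handle(event)


async def demo() -> int:
    dispatcher = SketchDispatcher()
    projection = OrderCountProjection()
    dispatcher.register_projection(projection)
    await dispatcher.dispatch(OrderCreated())
    await dispatcher.dispatch(OrderShipped())  # ignored: not subscribed
    return projection.created
```

Running `asyncio.run(demo())` exercises both the matching and the non-matching path.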
repositories/¶
Infrastructure support repositories.
checkpoint.py - CheckpointRepository¶
Tracks projection processing position:
from eventsource import CheckpointRepository, PostgreSQLCheckpointRepository
repo = PostgreSQLCheckpointRepository(conn)
# Get last processed event
last_event_id = await repo.get_checkpoint("MyProjection")
# Update after processing
await repo.update_checkpoint("MyProjection", event.event_id, event.event_type)
# Get lag metrics
metrics = await repo.get_lag_metrics("MyProjection", ["OrderCreated", "OrderShipped"])
Implementations:
- PostgreSQLCheckpointRepository - Production
- InMemoryCheckpointRepository - Testing
Dependencies: sqlalchemy, datetime, uuid
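The purpose of a checkpoint is to let a projection resume where it stopped instead of reprocessing the full history. The sketch below shows that loop with an in-memory stand-in. Note the simplifying assumption: it tracks an integer position in a list, whereas the repository above records the last processed event ID. `SketchCheckpoints` and `catch_up` are hypothetical names.

```python
# Hypothetical sketch: resume a projection from its last checkpoint.
class SketchCheckpoints:
    def __init__(self) -> None:
        self._positions = {}

    def get_checkpoint(self, name):
        return self._positions.get(name)  # None when nothing was processed

    def update_checkpoint(self, name, position) -> None:
        self._positions[name] = position


def catch_up(name, events, checkpoints, handle) -> int:
    """Process only events after the stored position; return the count."""
    last = checkpoints.get_checkpoint(name)
    start = 0 if last is None else last + 1
    for position in range(start, len(events)):
        handle(events[position])
        # Record progress after each event so a crash loses at most one.
        checkpoints.update_checkpoint(name, position)
    return max(0, len(events) - start)


checkpoints = SketchCheckpoints()
seen = []
first_run = catch_up("MyProjection", ["a", "b"], checkpoints, seen.append)
second_run = catch_up("MyProjection", ["a", "b", "c"], checkpoints, seen.append)
```

Updating the checkpoint after handling (not before) gives at-least-once processing, so handlers should tolerate seeing the same event twice.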
dlq.py - DLQRepository¶
Dead letter queue for failed events:
from eventsource import DLQRepository, PostgreSQLDLQRepository
repo = PostgreSQLDLQRepository(conn)
# Add failed event
await repo.add_failed_event(
    event_id=event.event_id,
    projection_name="MyProjection",
    event_type=event.event_type,
    event_data=event.to_dict(),
    error=exception,
)
# Get failures for investigation
failed = await repo.get_failed_events(projection_name="MyProjection")
# Mark resolved after fix
await repo.mark_resolved(dlq_id, resolved_by="admin")
Implementations:
- PostgreSQLDLQRepository - Production
- InMemoryDLQRepository - Testing
Dependencies: sqlalchemy, datetime, uuid, traceback
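A dead letter queue is usually the last step of a retry policy: try a handler a few times, and park the event only when the attempts are exhausted. The sketch below illustrates that flow with a plain list as the queue. The function name, the `max_attempts` default, and the entry fields are assumptions for illustration, not the library's retry policy or `DLQEntry` schema.

```python
import traceback


# Hypothetical sketch of retry-then-park; not the library's retry policy.
def process_with_dlq(event, handler, dlq: list, max_attempts: int = 3) -> bool:
    for attempt in range(1, max_attempts + 1):
        try:
            handler(event)
            return True
        except Exception as exc:
            if attempt == max_attempts:
                # Out of attempts: keep enough context to investigate later.
                dlq.append({
                    "event": event,
                    "error": repr(exc),
                    "traceback": traceback.format_exc(),
                    "attempts": attempt,
                })
    return False


calls = {"n": 0}


def flaky(event) -> None:
    # Fails on the first two calls, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")


def always_fails(event) -> None:
    raise ValueError("bad payload")


dead_letters = []
recovered = process_with_dlq("evt-1", flaky, dead_letters)
parked = process_with_dlq("evt-2", always_fails, dead_letters)
```

A production policy would normally add a delay between attempts; it is omitted here to keep the sketch short.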
outbox.py - OutboxRepository¶
Transactional outbox pattern:
from eventsource import OutboxRepository, PostgreSQLOutboxRepository
repo = PostgreSQLOutboxRepository(conn)
# Add event to outbox (in same transaction as event store)
outbox_id = await repo.add_event(event)
# Background worker publishes and marks complete
pending = await repo.get_pending_events(limit=100)
for entry in pending:
    await event_bus.publish(...)
    await repo.mark_published(entry.id)
Implementations:
- PostgreSQLOutboxRepository - Production
- InMemoryOutboxRepository - Testing
Dependencies: events/base.py, sqlalchemy, datetime, uuid
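The ordering in the worker loop matters: an entry is marked published only after the publish call returns, so a crash in between leads to a re-publish rather than a lost event. Here is a minimal in-memory sketch of that flow. `SketchOutbox` and `drain` are hypothetical; the method names mirror the ones used above, but the entry layout is an assumption.

```python
# Hypothetical sketch of the outbox flow with in-memory stand-ins.
class SketchOutbox:
    def __init__(self) -> None:
        self._entries = []  # each entry: {"id", "event", "published"}

    def add_event(self, event) -> int:
        entry_id = len(self._entries) + 1
        self._entries.append(
            {"id": entry_id, "event": event, "published": False}
        )
        return entry_id

    def get_pending_events(self, limit: int = 100):
        pending = [e for e in self._entries if not e["published"]]
        return pending[:limit]

    def mark_published(self, entry_id: int) -> None:
        for entry in self._entries:
            if entry["id"] == entry_id:
                entry["published"] = True


def drain(outbox, publish) -> int:
    """Publish pending entries; mark each one only after publish succeeds."""
    count = 0
    for entry in outbox.get_pending_events():
        publish(entry["event"])  # if this raises, the entry stays pending
        outbox.mark_published(entry["id"])
        count += 1
    return count


outbox = SketchOutbox()
outbox.add_event("OrderCreated")
outbox.add_event("OrderShipped")
delivered = []
first_pass = drain(outbox, delivered.append)
second_pass = drain(outbox, delivered.append)
```

The result is at-least-once delivery, which is why consumers downstream of an outbox are typically written to be idempotent.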
bus/¶
Event distribution to consumers.
interface.py - EventBus ABC¶
Defines the contract for event distribution:
| Type | Purpose |
|---|---|
EventBus |
Abstract base for event buses |
EventHandler |
Protocol for handlers with handle() method |
EventSubscriber |
Protocol declaring subscribed event types |
EventHandlerFunc |
Type alias for function handlers |
AsyncEventHandler |
Base class for async handlers |
Key Operations:
- publish() - Send events to subscribers
- subscribe() - Register handler for event type
- subscribe_all() - Register EventSubscriber for all its types
- subscribe_to_all_events() - Wildcard subscription
Dependencies: events/base.py, abc
memory.py - InMemoryEventBus¶
In-process event distribution:
from eventsource import InMemoryEventBus
bus = InMemoryEventBus()
bus.subscribe(OrderCreated, order_handler)
bus.subscribe_to_all_events(audit_logger)
# Synchronous (wait for handlers)
await bus.publish([event])
# Fire and forget
await bus.publish([event], background=True)
# Graceful shutdown
await bus.shutdown(timeout=30.0)
Features:
- Thread-safe subscription
- Sync and async handler support
- Error isolation between handlers
- Optional OpenTelemetry tracing
- Background task management
Suitable for: Single-instance apps, testing
Not suitable for: Distributed deployments
Dependencies: bus/interface.py, asyncio, threading
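Two of the features above, mixed sync/async handlers and error isolation, can be sketched in a few lines. `SketchBus` is a hypothetical, single-threaded illustration (no locking, tracing, or background tasks) and collecting exceptions in an `errors` list is an assumption of this sketch; the library may log or report failures differently.

```python
import asyncio
import inspect
from collections import defaultdict


# Hypothetical sketch; not the InMemoryEventBus implementation.
class SketchBus:
    def __init__(self) -> None:
        self._handlers = defaultdict(list)
        self.errors = []

    def subscribe(self, event_type, handler) -> None:
        self._handlers[event_type].append(handler)

    async def publish(self, events) -> None:
        for event in events:
            for handler in list(self._handlers.get(type(event), [])):
                try:
                    result = handler(event)
                    if inspect.isawaitable(result):
                        await result  # async handlers return an awaitable
                except Exception as exc:
                    # One failing handler must not stop the others.
                    self.errors.append(exc)


class OrderCreated:
    pass


received = []


def broken(event) -> None:
    raise RuntimeError("boom")


async def recorder(event) -> None:
    received.append(event)


async def demo() -> SketchBus:
    bus = SketchBus()
    bus.subscribe(OrderCreated, broken)    # sync handler that fails
    bus.subscribe(OrderCreated, recorder)  # async handler that still runs
    await bus.publish([OrderCreated()])
    return bus
```

Even though `broken` is subscribed first and raises, `recorder` still receives the event.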
redis.py - RedisEventBus¶
Distributed event streaming via Redis Streams:
from eventsource.bus import RedisEventBus, RedisEventBusConfig
config = RedisEventBusConfig(
    redis_url="redis://localhost:6379",
    stream_prefix="myapp",
    consumer_group="projections",
)
bus = RedisEventBus(config=config, event_registry=registry)
await bus.connect()
bus.subscribe(OrderCreated, order_handler)
await bus.publish([event])
await bus.start_consuming() # Blocks, processing events
Features:
- At-least-once delivery via consumer groups
- Horizontal scaling with multiple consumers
- Automatic pending message recovery
- Dead letter queue for unrecoverable failures
- Pipeline optimization for batch publishing
- Optional OpenTelemetry tracing
Dependencies: bus/interface.py, events/registry.py, redis.asyncio
Public API Surface¶
The main __init__.py exports the public API. Everything listed in __all__ is considered stable:
Core Types¶
from eventsource import (
    # Type aliases
    TState, AggregateId, EventId, TenantId, CorrelationId, CausationId,
)
Events¶
from eventsource import (
    DomainEvent,
    EventRegistry, default_registry,
    register_event, get_event_class, get_event_class_or_none,
    is_event_registered, list_registered_events,
    EventTypeNotFoundError, DuplicateEventTypeError,
)
Event Stores¶
from eventsource import (
    EventStore, EventPublisher,
    EventStream, AppendResult, StoredEvent,
    ReadOptions, ReadDirection, ExpectedVersion,
    InMemoryEventStore, PostgreSQLEventStore,
)
Aggregates¶
Event Bus¶
from eventsource import (
    EventBus, EventHandler, EventSubscriber,
    EventHandlerFunc, AsyncEventHandler,
    InMemoryEventBus,
    RedisEventBus, RedisEventBusConfig, RedisEventBusStats,
    RedisNotAvailableError, REDIS_AVAILABLE,
)
Repositories¶
from eventsource import (
    CheckpointRepository, PostgreSQLCheckpointRepository,
    InMemoryCheckpointRepository, CheckpointData, LagMetrics,
    DLQRepository, PostgreSQLDLQRepository, InMemoryDLQRepository,
    DLQEntry, DLQStats, ProjectionFailureCount,
    OutboxRepository, PostgreSQLOutboxRepository, InMemoryOutboxRepository,
    OutboxEntry, OutboxStats,
    EventSourceJSONEncoder,
)
Exceptions¶
from eventsource import (
    EventSourceError,
    OptimisticLockError,
    AggregateNotFoundError,
    EventNotFoundError,
    ProjectionError,
)
Internal vs Public¶
Public (stable API): Everything exported from the top-level __init__.py
Internal (may change):
- Modules starting with _ (e.g., repositories/_json.py)
- Classes/functions not in __all__
- Helper utilities within modules
- Anything accessed by direct submodule import that's not re-exported
Example of internal usage (avoid in application code):
# Internal - may change without notice
from eventsource.repositories._json import json_dumps
# Public - stable API
from eventsource import EventSourceJSONEncoder
Extension Points¶
Custom Event Store¶
Implement EventStore ABC for new storage backends:
from datetime import datetime
from uuid import UUID

from eventsource import AppendResult, DomainEvent, EventStore, EventStream

class MongoDBEventStore(EventStore):
    async def append_events(
        self,
        aggregate_id: UUID,
        aggregate_type: str,
        events: list[DomainEvent],
        expected_version: int,
    ) -> AppendResult:
        # Implement MongoDB-specific logic
        ...

    async def get_events(
        self,
        aggregate_id: UUID,
        aggregate_type: str | None = None,
        from_version: int = 0,
        from_timestamp: datetime | None = None,
        to_timestamp: datetime | None = None,
    ) -> EventStream:
        ...

    # ... other abstract methods
Custom Projection¶
Extend DeclarativeProjection for read models:
from eventsource.projections import DeclarativeProjection, handles
class OrderDashboardProjection(DeclarativeProjection):
    @handles(OrderCreated)
    async def _on_order_created(self, conn, event: OrderCreated) -> None:
        await conn.execute(
            "INSERT INTO order_dashboard (order_id, status) VALUES ($1, $2)",
            event.aggregate_id, "created"
        )

    @handles(OrderShipped)
    async def _on_order_shipped(self, conn, event: OrderShipped) -> None:
        await conn.execute(
            "UPDATE order_dashboard SET status = $1 WHERE order_id = $2",
            "shipped", event.aggregate_id
        )
Custom Event Bus¶
Implement EventBus ABC for new distribution mechanisms:
from eventsource import DomainEvent, EventBus, EventHandler, EventHandlerFunc

class KafkaEventBus(EventBus):
    async def publish(
        self,
        events: list[DomainEvent],
        background: bool = False,
    ) -> None:
        # Implement Kafka publishing
        ...

    def subscribe(
        self,
        event_type: type[DomainEvent],
        handler: EventHandler | EventHandlerFunc,
    ) -> None:
        ...

    # ... other abstract methods
Type Safety¶
The package is fully typed with a py.typed marker (PEP 561):
- All public APIs have type hints
- Compatible with mypy strict mode
- IDE autocomplete supported
- Generic types for aggregates (AggregateRoot[TState])
Run type checking:
Related Documentation¶
Architecture Decision Records¶
| ADR | Relevance |
|---|---|
| ADR-0001: Async-First Design | Why all stores/buses are async |
| ADR-0002: Pydantic Event Models | DomainEvent design choices |
| ADR-0003: Optimistic Locking | Version-based concurrency |
| ADR-0004: Projection Error Handling | DLQ and retry strategy |
| ADR-0005: API Design Patterns | Public API conventions |
| ADR-0006: Event Registry | Event type registration |
Other Documentation¶
- Architecture Overview - High-level system design
- Getting Started Guide - Quick start tutorial
- API Reference - Detailed API documentation
- Testing Guide - Testing strategies and patterns
- Production Guide - Deployment recommendations