Tutorial 11: PostgreSQL - Production Event Store¶
Difficulty: Intermediate
Prerequisites¶
- Tutorial 1: Introduction to Event Sourcing
- Tutorial 2: Your First Domain Event
- Tutorial 3: Building Your First Aggregate
- Tutorial 4: Event Stores
- Tutorial 5: Repositories and Aggregate Lifecycle
- Python 3.10 or higher
- PostgreSQL 12 or higher
- Understanding of async/await and SQLAlchemy
```bash
pip install eventsource-py[postgresql]
```
Learning Objectives¶
By the end of this tutorial, you will be able to:
- Set up PostgreSQL as your production event store backend
- Create the events table schema with proper indexes
- Configure PostgreSQLEventStore with connection pooling
- Use the outbox pattern for reliable event publishing
- Set up PostgreSQLCheckpointRepository for projections
- Understand PostgreSQL-specific features like JSONB and partitioning
- Tune connection pool settings for different workloads
- Monitor and maintain your PostgreSQL event store
Why PostgreSQL for Production?¶
PostgreSQL is the recommended event store for production deployments. Here's why:
Advantages:

- ACID guarantees: Full transactional consistency across events
- JSONB storage: Native JSON querying for event payloads
- Concurrency control: Optimistic concurrency enforced by a unique (aggregate_id, version) constraint
- Replication: Built-in streaming replication for high availability
- Partitioning: Scale to billions of events with table partitioning
- Multi-process: Safe concurrent access from multiple workers
- Observability: Rich monitoring via pg_stat tables
When to use:

- Production applications requiring durability
- Multi-process or distributed deployments
- Applications needing complex event queries
- Systems requiring audit trails and compliance
- High-throughput event sourcing workloads
PostgreSQL Setup¶
Option 1: Docker (Recommended for Development)¶
```bash
docker run -d --name eventsource-postgres \
  -e POSTGRES_USER=postgres \
  -e POSTGRES_PASSWORD=postgres \
  -e POSTGRES_DB=eventsource_tutorial \
  -p 5432:5432 \
  postgres:15

# Verify connection
docker exec -it eventsource-postgres psql -U postgres -d eventsource_tutorial
```
Option 2: Local PostgreSQL¶
```bash
# Connect to PostgreSQL
psql -U postgres
```

```sql
-- Create database
CREATE DATABASE eventsource_tutorial;

-- Verify
\c eventsource_tutorial
```
Option 3: Cloud PostgreSQL¶
For production, use managed PostgreSQL services:
- AWS RDS for PostgreSQL: Managed with automated backups
- Google Cloud SQL: Fully managed PostgreSQL
- Azure Database for PostgreSQL: Enterprise-grade managed service
- Supabase: PostgreSQL with built-in auth and APIs
Connection string format:
```python
# Local development
DATABASE_URL = "postgresql+asyncpg://postgres:postgres@localhost:5432/eventsource_tutorial"

# Production with SSL
DATABASE_URL = "postgresql+asyncpg://user:pass@prod-host:5432/mydb?ssl=require"

# Cloud providers
AWS_URL = "postgresql+asyncpg://user:pass@mydb.123.us-east-1.rds.amazonaws.com:5432/mydb?ssl=require"
```
Database Schema¶
Core Events Table¶
The events table is the heart of your event store:
```sql
CREATE TABLE IF NOT EXISTS events (
    -- Global position for ordered replay across all streams
    global_position BIGSERIAL PRIMARY KEY,

    -- Unique event identifier (UUID v4)
    event_id UUID NOT NULL UNIQUE,

    -- Aggregate identification
    aggregate_id UUID NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL,

    -- Event metadata
    event_type VARCHAR(255) NOT NULL,

    -- Multi-tenancy support (optional)
    tenant_id UUID,

    -- Actor/user who triggered the event
    actor_id VARCHAR(255),

    -- Optimistic concurrency control:
    -- version starts at 1 and increments for each event
    version INTEGER NOT NULL,

    -- Event timestamp (when it occurred in the domain)
    timestamp TIMESTAMP WITH TIME ZONE NOT NULL,

    -- Event payload stored as JSONB for flexible querying
    payload JSONB NOT NULL,

    -- Technical metadata
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),

    -- Ensure no two events have the same version for an aggregate;
    -- this prevents concurrent modifications
    CONSTRAINT uq_events_aggregate_version
        UNIQUE (aggregate_id, aggregate_type, version)
);
```
Essential Indexes¶
These indexes optimize common event sourcing query patterns:
```sql
-- Load aggregate event streams
CREATE INDEX IF NOT EXISTS idx_events_aggregate_id
    ON events (aggregate_id);

-- Query events by aggregate type (for projections)
CREATE INDEX IF NOT EXISTS idx_events_aggregate_type
    ON events (aggregate_type);

-- Query events by event type
CREATE INDEX IF NOT EXISTS idx_events_event_type
    ON events (event_type);

-- Time-based queries for replay and auditing
CREATE INDEX IF NOT EXISTS idx_events_timestamp
    ON events (timestamp);

-- Multi-tenant queries (partial index saves space)
CREATE INDEX IF NOT EXISTS idx_events_tenant_id
    ON events (tenant_id)
    WHERE tenant_id IS NOT NULL;

-- Composite index for projection queries
CREATE INDEX IF NOT EXISTS idx_events_type_tenant_timestamp
    ON events (aggregate_type, tenant_id, timestamp);

-- Aggregate stream loading with version ordering
CREATE INDEX IF NOT EXISTS idx_events_aggregate_version
    ON events (aggregate_id, aggregate_type, version);
```
Running Migrations¶
Option 1: Use the complete schema file
```bash
# Apply the complete schema from a local checkout
psql -U postgres -d eventsource_tutorial -f src/eventsource/migrations/schemas/all.sql

# Or download it from the repository first
wget https://raw.githubusercontent.com/tyevans/eventsource-py/main/src/eventsource/migrations/schemas/all.sql
psql -U postgres -d eventsource_tutorial -f all.sql
```
The complete schema includes:

- Events table with indexes
- Event outbox table (for reliable publishing)
- Projection checkpoints table
- Dead letter queue table
- Snapshots table
Option 2: Create tables programmatically
```python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine


async def setup_database(database_url: str):
    """Create tables programmatically."""
    engine = create_async_engine(database_url)
    async with engine.begin() as conn:
        # Create events table
        await conn.execute(text("""
            CREATE TABLE IF NOT EXISTS events (
                global_position BIGSERIAL PRIMARY KEY,
                event_id UUID NOT NULL UNIQUE,
                aggregate_id UUID NOT NULL,
                aggregate_type VARCHAR(255) NOT NULL,
                event_type VARCHAR(255) NOT NULL,
                tenant_id UUID,
                actor_id VARCHAR(255),
                version INTEGER NOT NULL,
                timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
                payload JSONB NOT NULL,
                created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
                CONSTRAINT uq_events_aggregate_version
                    UNIQUE (aggregate_id, aggregate_type, version)
            )
        """))
        # Create essential indexes
        await conn.execute(text("""
            CREATE INDEX IF NOT EXISTS idx_events_aggregate_id
                ON events(aggregate_id)
        """))
        await conn.execute(text("""
            CREATE INDEX IF NOT EXISTS idx_events_aggregate_type
                ON events(aggregate_type)
        """))
        await conn.execute(text("""
            CREATE INDEX IF NOT EXISTS idx_events_timestamp
                ON events(timestamp)
        """))
    await engine.dispose()


# Usage (inside an async context)
await setup_database(DATABASE_URL)
```
Option 3: Use Alembic for version control
For production, use Alembic to manage schema changes:
```bash
# Initialize Alembic
alembic init migrations

# Edit alembic.ini with your database URL
# Edit migrations/env.py to configure async SQLAlchemy

# Create initial migration
alembic revision -m "Initial event store schema"

# Apply migrations
alembic upgrade head
```
See src/eventsource/migrations/templates/alembic/ for Alembic templates.
Configuring PostgreSQLEventStore¶
Basic Configuration¶
```python
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

from eventsource import PostgreSQLEventStore

# Create async engine
engine = create_async_engine(
    "postgresql+asyncpg://postgres:postgres@localhost:5432/eventsource_tutorial",
    echo=False,  # Set True to see SQL queries
)

# Create session factory
# IMPORTANT: expire_on_commit=False is REQUIRED for async
session_factory = async_sessionmaker(
    engine,
    expire_on_commit=False,
    class_=AsyncSession,
)

# Create event store
event_store = PostgreSQLEventStore(session_factory)

# Remember to dispose engine on shutdown (inside an async context)
await engine.dispose()
```
Connection Pooling¶
Configure connection pool based on your workload:
```python
from sqlalchemy.ext.asyncio import create_async_engine

# Light workload (2-5 connections)
# Good for: development, single-worker apps, low traffic
engine = create_async_engine(
    database_url,
    pool_size=2,         # Baseline connections
    max_overflow=3,      # Additional connections under load
    pool_timeout=30,     # Wait up to 30s for a connection
    pool_recycle=1800,   # Recycle connections after 30 minutes
    pool_pre_ping=True,  # Test connections before use
)

# Medium workload (5-10 connections)
# Good for: typical web apps, moderate traffic
engine = create_async_engine(
    database_url,
    pool_size=5,
    max_overflow=10,
    pool_timeout=30,
    pool_recycle=1800,
    pool_pre_ping=True,
)

# Heavy workload (10-20 connections)
# Good for: high-traffic apps, background workers, multi-tenant
engine = create_async_engine(
    database_url,
    pool_size=10,
    max_overflow=20,
    pool_timeout=30,
    pool_recycle=1800,
    pool_pre_ping=True,
)
```
Connection pool parameters:
| Parameter | Description | Default | Recommendation |
|---|---|---|---|
| `pool_size` | Baseline connections kept open | 5 | Start low, monitor, increase if exhausted |
| `max_overflow` | Additional connections under load | 10 | 2x pool_size |
| `pool_timeout` | Seconds to wait for a connection | 30 | 30s for web apps, 60s for batch jobs |
| `pool_recycle` | Recycle connections after N seconds | -1 (never) | 1800 (30 min) to handle network issues |
| `pool_pre_ping` | Test connections before use | False | True for production (handles stale connections) |
| `echo` | Log all SQL statements | False | True for debugging only |
Monitoring connection pool:
```python
# Check pool status
print(f"Pool size: {engine.pool.size()}")
print(f"Checked out: {engine.pool.checkedout()}")
print(f"Overflow: {engine.pool.overflow()}")
print(f"Checked in: {engine.pool.checkedin()}")

# If checkedout == pool_size + max_overflow, increase pool_size
```
Constructor Parameters¶
```python
event_store = PostgreSQLEventStore(
    session_factory,                  # Required: async session factory
    event_registry=custom_registry,   # Optional: custom event registry
    outbox_enabled=True,              # Optional: enable outbox pattern
    enable_tracing=True,              # Optional: OpenTelemetry tracing
    uuid_fields={"parent_id"},        # Optional: additional UUID fields
    string_id_fields={"stripe_id"},   # Optional: exclude from UUID detection
    auto_detect_uuid=True,            # Optional: auto-detect UUID fields
)
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `session_factory` | `async_sessionmaker` | Required | SQLAlchemy session factory |
| `event_registry` | `EventRegistry` | `default_registry` | Event type lookup |
| `outbox_enabled` | `bool` | `False` | Enable outbox pattern |
| `enable_tracing` | `bool` | `True` | OpenTelemetry tracing |
| `uuid_fields` | `set[str]` | `None` | Additional UUID field names |
| `string_id_fields` | `set[str]` | `None` | Fields to exclude from UUID detection |
| `auto_detect_uuid` | `bool` | `True` | Auto-detect fields ending in `_id` as UUIDs |
UUID Field Detection¶
PostgreSQL stores UUIDs efficiently as 128-bit values. The event store automatically detects UUID fields:
```python
# Default behavior: auto-detect fields ending in '_id'
event_store = PostgreSQLEventStore(session_factory)
# Treats as UUIDs: event_id, aggregate_id, tenant_id, user_id, etc.

# Add custom UUID fields
event_store = PostgreSQLEventStore(
    session_factory,
    uuid_fields={"parent_id", "reference_id"},
)

# Exclude string IDs from auto-detection
event_store = PostgreSQLEventStore(
    session_factory,
    string_id_fields={"stripe_customer_id", "external_api_id"},
)

# Strict mode: explicit UUID fields only (no auto-detection)
event_store = PostgreSQLEventStore.with_strict_uuid_detection(
    session_factory,
    uuid_fields={"event_id", "aggregate_id", "tenant_id"},
)
```
Using with Repositories¶
Repository usage is identical across all event store backends:
```python
from uuid import uuid4

from eventsource import AggregateRepository


async def main():
    # Create event store
    event_store = PostgreSQLEventStore(session_factory)

    # Create repository
    repo = AggregateRepository(
        event_store=event_store,
        aggregate_factory=OrderAggregate,
        aggregate_type="Order",
    )

    # Create aggregate
    order_id = uuid4()
    order = repo.create_new(order_id)
    order.create(customer_id="cust-123", total=299.99)
    await repo.save(order)

    # Load aggregate
    loaded = await repo.load(order_id)
    loaded.ship(tracking_number="TRACK-456")
    await repo.save(loaded)

    # Verify
    final = await repo.load(order_id)
    print(f"Order status: {final.state.status}")
```
The beauty of the event store abstraction: change backends without changing your application code!
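One way to make that abstraction concrete is a small factory keyed on the connection URL scheme. This is an illustrative sketch only: the `backend_for_url` helper is not part of the library, and `SQLiteEventStore` is a placeholder name for the backend covered in Tutorial 12.

```python
def backend_for_url(database_url: str) -> str:
    """Map a connection URL scheme to an event store backend name.

    Hypothetical helper for illustration; a real version would return a
    configured store instance rather than a name.
    """
    scheme = database_url.split("://", 1)[0]
    if scheme.startswith("postgresql"):
        return "PostgreSQLEventStore"
    if scheme.startswith("sqlite"):
        return "SQLiteEventStore"  # placeholder name, see Tutorial 12
    raise ValueError(f"No known event store backend for scheme {scheme!r}")
```

With a helper like this, switching a deployment from SQLite in development to PostgreSQL in production is a configuration change, not a code change.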
The Outbox Pattern¶
The outbox pattern guarantees event publishing by writing events to both the events table and an outbox table in the same transaction.
Why Use the Outbox Pattern?¶
Problem: Event publishing can fail if the message broker is unavailable:
```python
# Without outbox: events saved but publishing fails
await event_store.append_events(...)  # Saved to database
await event_bus.publish(...)          # Fails! Broker down!
# Downstream consumers never see these events!
```
Solution: The outbox pattern ensures at-least-once delivery:
```python
# With outbox: events and outbox entries saved together
await event_store.append_events(...)  # Saves events + outbox entries

# A background worker publishes from the outbox;
# if the broker is down, events remain in the outbox for retry
```
Enabling the Outbox Pattern¶
```python
# Create event store with outbox enabled
event_store = PostgreSQLEventStore(
    session_factory,
    outbox_enabled=True,
)
```
Outbox Table Schema¶
The outbox table tracks events pending publication:
```sql
CREATE TABLE IF NOT EXISTS event_outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    event_id UUID NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    aggregate_id UUID NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL,
    tenant_id UUID,
    event_data JSONB NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    published_at TIMESTAMP WITH TIME ZONE,
    retry_count INTEGER NOT NULL DEFAULT 0,
    last_error TEXT,
    status VARCHAR(20) NOT NULL DEFAULT 'pending',
    CONSTRAINT chk_outbox_status CHECK (status IN ('pending', 'published', 'failed'))
);

-- Index for the outbox worker's pending scan
CREATE INDEX IF NOT EXISTS idx_outbox_pending
    ON event_outbox (created_at)
    WHERE status = 'pending';
```
Outbox Worker¶
A background worker polls the outbox and publishes events:
```python
import asyncio

from sqlalchemy import text


async def outbox_worker(session_factory, event_bus, event_registry, interval_seconds: int = 5):
    """
    Background worker that publishes events from the outbox.

    Args:
        session_factory: SQLAlchemy session factory
        event_bus: Event bus for publishing
        event_registry: Registry used to look up event classes by type
        interval_seconds: Polling interval
    """
    while True:
        try:
            async with session_factory() as session:
                # Get pending events (with row locking)
                result = await session.execute(
                    text("""
                        SELECT id, event_id, event_type, event_data
                        FROM event_outbox
                        WHERE status = 'pending'
                        ORDER BY created_at ASC
                        LIMIT 100
                        FOR UPDATE SKIP LOCKED
                    """)
                )
                pending = result.fetchall()

                for row in pending:
                    outbox_id, event_id, event_type, event_data = row
                    try:
                        # Deserialize event
                        event_class = event_registry.get(event_type)
                        event = event_class.model_validate(event_data)

                        # Publish to event bus
                        await event_bus.publish(event)

                        # Mark as published
                        await session.execute(
                            text("""
                                UPDATE event_outbox
                                SET status = 'published',
                                    published_at = NOW()
                                WHERE id = :id
                            """),
                            {"id": outbox_id},
                        )
                    except Exception as e:
                        # Increment retry count and log error
                        await session.execute(
                            text("""
                                UPDATE event_outbox
                                SET retry_count = retry_count + 1,
                                    last_error = :error,
                                    status = CASE
                                        WHEN retry_count >= 5 THEN 'failed'
                                        ELSE 'pending'
                                    END
                                WHERE id = :id
                            """),
                            {"id": outbox_id, "error": str(e)},
                        )

                await session.commit()
        except Exception as e:
            print(f"Outbox worker error: {e}")

        # Wait before next poll
        await asyncio.sleep(interval_seconds)


# Run worker in background
asyncio.create_task(outbox_worker(session_factory, event_bus, event_registry))
```
When to Use the Outbox Pattern¶
Use when:

- Event publishing must be guaranteed (financial transactions, orders)
- The message broker may be temporarily unavailable
- You need at-least-once delivery semantics (exactly-once additionally requires idempotent consumers)
- Events trigger critical downstream processes

Skip when:

- Best-effort publishing is acceptable
- Development/testing environments
- The message broker is highly available with redundancy
- Events are for non-critical notifications only
PostgreSQL Checkpoint Repository¶
Use PostgreSQLCheckpointRepository to track projection positions:
```python
from eventsource.repositories.checkpoint import PostgreSQLCheckpointRepository

# Create checkpoint repository
checkpoint_repo = PostgreSQLCheckpointRepository(engine)

# Save checkpoint
await checkpoint_repo.save_position(
    subscription_id="OrderProjection",
    position=1000,
    event_id=last_event.event_id,
    event_type=last_event.event_type,
)

# Resume from checkpoint
last_position = await checkpoint_repo.get_position("OrderProjection")
# Start from position 1001
```
Features:

- Automatic upsert (idempotent)
- Tracks global position and event ID
- Monitors projection lag
- Supports projection resets
See Tutorial 10: Checkpoints for detailed usage.
Multi-Tenancy Support¶
PostgreSQL event store has built-in multi-tenancy via the tenant_id column:
```python
from uuid import UUID

# Events with tenant_id
@register_event
class OrderPlaced(DomainEvent):
    event_type: str = "OrderPlaced"
    aggregate_type: str = "Order"
    customer_id: str
    total: float
    tenant_id: UUID | None = None  # Multi-tenancy support


# Query events by tenant
from eventsource import ReadOptions

tenant_id = UUID("...")
options = ReadOptions(tenant_id=tenant_id)

async for stored_event in event_store.read_all(options):
    # Only events for this tenant
    print(stored_event.event_type)
```
Multi-tenancy patterns:
| Pattern | Description | Use Case |
|---|---|---|
| Shared schema | All tenants share events table, filtered by tenant_id | Cost-effective, easy to manage |
| Separate schemas | Each tenant has own schema in same database | Better isolation, moderate cost |
| Separate databases | Each tenant has own database | Maximum isolation, highest cost |
For shared schema, the partial index on tenant_id keeps queries fast.
Performance Tuning¶
Connection Pool Tuning¶
Monitor connection pool usage:
```python
import asyncio


async def monitor_pool(engine):
    """Monitor connection pool every 30 seconds."""
    while True:
        pool = engine.pool
        print("Pool stats:")
        print(f"  Size: {pool.size()}")
        print(f"  Checked out: {pool.checkedout()}")
        print(f"  Overflow: {pool.overflow()}")
        print(f"  Checked in: {pool.checkedin()}")

        # Alert if pool is exhausted
        # (_max_overflow is a private SQLAlchemy attribute; verify on your version)
        if pool.checkedout() >= pool.size() + pool._max_overflow:
            print("WARNING: Connection pool exhausted!")

        await asyncio.sleep(30)


# Run in background
asyncio.create_task(monitor_pool(engine))
```
Tuning guidelines:
- Start with small pool (2-5 connections)
- Monitor checkedout vs pool_size + max_overflow
- If frequently exhausted, increase pool_size
- Set pool_recycle=1800 to handle stale connections
- Always use pool_pre_ping=True in production
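The three workload tiers shown earlier can be captured in one helper so every service builds its engine with consistent settings. A minimal sketch: the `pool_kwargs` function is illustrative, not a library API, and the numbers simply mirror the light/medium/heavy tiers above.

```python
def pool_kwargs(workload: str) -> dict:
    """Return create_async_engine keyword arguments for a workload tier.

    Illustrative helper; tiers match the light/medium/heavy examples above.
    """
    tiers = {
        "light":  {"pool_size": 2,  "max_overflow": 3},
        "medium": {"pool_size": 5,  "max_overflow": 10},
        "heavy":  {"pool_size": 10, "max_overflow": 20},
    }
    # Settings shared by all tiers
    base = {"pool_timeout": 30, "pool_recycle": 1800, "pool_pre_ping": True}
    return {**tiers[workload], **base}
```

Usage would then be `engine = create_async_engine(database_url, **pool_kwargs("medium"))`, keeping the tuning knobs in one place.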
Table Partitioning¶
For high-volume deployments (millions+ events), use table partitioning:
```sql
-- See src/eventsource/migrations/templates/events_partitioned.sql
-- Partitioned by timestamp (monthly partitions)
CREATE TABLE events (
    -- Same columns as before
) PARTITION BY RANGE (timestamp);

-- Create partitions
CREATE TABLE events_2024_01 PARTITION OF events
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

CREATE TABLE events_2024_02 PARTITION OF events
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');

-- Benefits:
-- - Faster queries (partition pruning)
-- - Easier archival (drop old partitions)
-- - Better vacuum performance
```
Partitioning strategies:
| Strategy | Partition By | Use Case |
|---|---|---|
| Time-based | timestamp (monthly/daily) | Most common, enables archival |
| Tenant-based | tenant_id | Multi-tenant isolation |
| Hybrid | tenant_id + timestamp | Large multi-tenant systems |
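Monthly partitions are easy to automate rather than hand-writing each `CREATE TABLE`. A hedged sketch for the time-based scheme above: the `monthly_partition_ddl` helper is illustrative, not part of the library, and assumes the `events_YYYY_MM` naming shown earlier.

```python
from datetime import date


def monthly_partition_ddl(year: int, month: int) -> str:
    """Generate DDL for one monthly partition of the events table.

    Illustrative helper assuming PARTITION BY RANGE (timestamp).
    """
    start = date(year, month, 1)
    # The upper bound is exclusive: the first day of the next month
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    name = f"events_{start:%Y_%m}"
    return (
        f"CREATE TABLE IF NOT EXISTS {name} PARTITION OF events\n"
        f"    FOR VALUES FROM ('{start}') TO ('{end}');"
    )
```

A scheduled job (cron, Celery beat, etc.) can run this a month ahead so the next partition always exists before events arrive.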
Index Optimization¶
Monitor index usage:
```sql
-- Check index usage
SELECT
    schemaname,
    relname AS tablename,
    indexrelname AS indexname,
    idx_scan AS scans,
    idx_tup_read AS tuples_read,
    idx_tup_fetch AS tuples_fetched
FROM pg_stat_user_indexes
WHERE relname = 'events'
ORDER BY idx_scan DESC;

-- Unused indexes (scans = 0) may be candidates for removal
```
JSONB Query Optimization¶
Create indexes on JSONB fields for fast queries:
```sql
-- Index on specific JSONB field
CREATE INDEX idx_events_payload_customer_id
    ON events ((payload->>'customer_id'));

-- GIN index for JSONB containment queries
CREATE INDEX idx_events_payload_gin
    ON events USING GIN (payload);

-- Query with JSONB index
SELECT * FROM events
WHERE payload->>'customer_id' = 'cust-123';
```
Complete Working Example¶
"""
Tutorial 11: PostgreSQL Event Store
Complete example demonstrating PostgreSQL as production event store.
Prerequisites:
- PostgreSQL running on localhost:5432
- Database 'eventsource_tutorial' created
- pip install eventsource-py[postgresql]
Run with: python tutorial_11_postgresql.py
"""
import asyncio
from uuid import uuid4
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from eventsource import (
DomainEvent,
register_event,
AggregateRoot,
PostgreSQLEventStore,
AggregateRepository,
)
# =============================================================================
# Events
# =============================================================================
@register_event
class OrderCreated(DomainEvent):
event_type: str = "OrderCreated"
aggregate_type: str = "Order"
customer_id: str
total: float
@register_event
class OrderShipped(DomainEvent):
event_type: str = "OrderShipped"
aggregate_type: str = "Order"
tracking_number: str
# =============================================================================
# Aggregate
# =============================================================================
class OrderState(BaseModel):
order_id: str
customer_id: str | None = None
total: float = 0.0
status: str = "draft"
tracking_number: str | None = None
class OrderAggregate(AggregateRoot[OrderState]):
aggregate_type = "Order"
def _get_initial_state(self) -> OrderState:
return OrderState(order_id=str(self.aggregate_id))
def _apply(self, event: DomainEvent) -> None:
if isinstance(event, OrderCreated):
self._state = OrderState(
order_id=str(self.aggregate_id),
customer_id=event.customer_id,
total=event.total,
status="created",
)
elif isinstance(event, OrderShipped):
if self._state:
self._state = self._state.model_copy(update={
"status": "shipped",
"tracking_number": event.tracking_number,
})
def create(self, customer_id: str, total: float) -> None:
if self.version > 0:
raise ValueError("Order already exists")
self.apply_event(OrderCreated(
aggregate_id=self.aggregate_id,
customer_id=customer_id,
total=total,
aggregate_version=self.get_next_version(),
))
def ship(self, tracking_number: str) -> None:
if not self.state or self.state.status != "created":
raise ValueError("Cannot ship order in current state")
self.apply_event(OrderShipped(
aggregate_id=self.aggregate_id,
tracking_number=tracking_number,
aggregate_version=self.get_next_version(),
))
# =============================================================================
# PostgreSQL Configuration
# =============================================================================
DATABASE_URL = "postgresql+asyncpg://postgres:postgres@localhost:5432/eventsource_tutorial"
async def setup_database(engine):
"""Create the events table and indexes."""
async with engine.begin() as conn:
# Create events table
await conn.execute(text("""
CREATE TABLE IF NOT EXISTS events (
global_position BIGSERIAL PRIMARY KEY,
event_id UUID NOT NULL UNIQUE,
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
tenant_id UUID,
actor_id VARCHAR(255),
version INTEGER NOT NULL,
timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
CONSTRAINT uq_events_aggregate_version
UNIQUE (aggregate_id, aggregate_type, version)
)
"""))
# Create indexes
await conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_events_aggregate_id
ON events(aggregate_id)
"""))
await conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_events_aggregate_type
ON events(aggregate_type)
"""))
await conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_events_timestamp
ON events(timestamp)
"""))
print("Database schema created successfully!")
def create_event_store(database_url: str):
"""Create PostgreSQL event store with connection pooling."""
# Create async engine with connection pool
engine = create_async_engine(
database_url,
pool_size=5, # Baseline connections
max_overflow=10, # Additional connections under load
pool_timeout=30, # Wait 30s for connection
pool_recycle=1800, # Recycle connections after 30 min
pool_pre_ping=True, # Test connections before use
echo=False, # Set True to see SQL queries
)
# Create session factory
# IMPORTANT: expire_on_commit=False is REQUIRED for async
session_factory = async_sessionmaker(
engine,
expire_on_commit=False,
class_=AsyncSession,
)
# Create event store
event_store = PostgreSQLEventStore(
session_factory,
outbox_enabled=False, # Set True to enable outbox pattern
enable_tracing=True, # OpenTelemetry tracing
)
return event_store, engine
# =============================================================================
# Main Demo
# =============================================================================
async def main():
print("=" * 70)
print("Tutorial 11: PostgreSQL Event Store")
print("=" * 70)
# Create event store and engine
print("\n1. Creating PostgreSQL event store...")
event_store, engine = create_event_store(DATABASE_URL)
try:
# Setup database schema
print("2. Setting up database schema...")
await setup_database(engine)
# Create repository
print("3. Creating aggregate repository...")
repo = AggregateRepository(
event_store=event_store,
aggregate_factory=OrderAggregate,
aggregate_type="Order",
)
# Create an order
print("\n4. Creating an order...")
order_id = uuid4()
order = repo.create_new(order_id)
order.create(customer_id="customer-456", total=299.99)
await repo.save(order)
print(f" Order created: {order_id}")
print(f" Version: {order.version}")
print(f" Status: {order.state.status}")
print(f" Customer: {order.state.customer_id}")
print(f" Total: ${order.state.total}")
# Load and modify
print("\n5. Loading and shipping order...")
loaded = await repo.load(order_id)
loaded.ship(tracking_number="TRACK-123456")
await repo.save(loaded)
print(f" Order shipped!")
print(f" Version: {loaded.version}")
print(f" Status: {loaded.state.status}")
print(f" Tracking: {loaded.state.tracking_number}")
# Verify persistence
print("\n6. Verifying persistence...")
verified = await repo.load(order_id)
print(f" Loaded order version: {verified.version}")
print(f" Loaded status: {verified.state.status}")
# Query events from store
print("\n7. Querying events from PostgreSQL...")
stream = await event_store.get_events(order_id, "Order")
print(f" Found {len(stream.events)} events:")
for event in stream.events:
print(f" - v{event.aggregate_version}: {event.event_type}")
# Check connection pool stats
print("\n8. Connection pool statistics...")
pool = engine.pool
print(f" Pool size: {pool.size()}")
print(f" Checked out: {pool.checkedout()}")
print(f" Overflow: {pool.overflow()}")
print("\n" + "=" * 70)
print("SUCCESS: PostgreSQL event store is working!")
print("=" * 70)
finally:
# Always dispose engine to close connections
await engine.dispose()
print("\nConnection pool disposed.")
if __name__ == "__main__":
asyncio.run(main())
Expected output:
```text
======================================================================
Tutorial 11: PostgreSQL Event Store
======================================================================

1. Creating PostgreSQL event store...
2. Setting up database schema...
Database schema created successfully!
3. Creating aggregate repository...

4. Creating an order...
   Order created: [UUID]
   Version: 1
   Status: created
   Customer: customer-456
   Total: $299.99

5. Loading and shipping order...
   Order shipped!
   Version: 2
   Status: shipped
   Tracking: TRACK-123456

6. Verifying persistence...
   Loaded order version: 2
   Loaded status: shipped

7. Querying events from PostgreSQL...
   Found 2 events:
   - v1: OrderCreated
   - v2: OrderShipped

8. Connection pool statistics...
   Pool size: 5
   Checked out: 0
   Overflow: 0

======================================================================
SUCCESS: PostgreSQL event store is working!
======================================================================

Connection pool disposed.
```
Monitoring and Maintenance¶
Monitoring Event Store Health¶
```sql
-- Total events in store
SELECT COUNT(*) AS total_events FROM events;

-- Events by aggregate type
SELECT aggregate_type, COUNT(*) AS count
FROM events
GROUP BY aggregate_type
ORDER BY count DESC;

-- Events per day (last 7 days)
SELECT DATE(timestamp) AS date, COUNT(*) AS events
FROM events
WHERE timestamp >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY DATE(timestamp)
ORDER BY date DESC;

-- Largest aggregates (most events)
SELECT aggregate_id, aggregate_type, COUNT(*) AS event_count
FROM events
GROUP BY aggregate_id, aggregate_type
ORDER BY event_count DESC
LIMIT 10;

-- Table size
SELECT
    pg_size_pretty(pg_total_relation_size('events')) AS total_size,
    pg_size_pretty(pg_relation_size('events')) AS table_size,
    pg_size_pretty(pg_total_relation_size('events') - pg_relation_size('events')) AS index_size;
```
Maintenance Tasks¶
```sql
-- Vacuum events table (reclaim space)
VACUUM ANALYZE events;

-- Reindex if queries are slow
REINDEX TABLE events;

-- Update table statistics
ANALYZE events;
```
Archiving Old Events¶
```sql
-- Archive events older than 1 year to an archive table
CREATE TABLE events_archive (LIKE events INCLUDING ALL);

INSERT INTO events_archive
SELECT * FROM events
WHERE timestamp < NOW() - INTERVAL '1 year';

DELETE FROM events
WHERE timestamp < NOW() - INTERVAL '1 year';

VACUUM ANALYZE events;
```
Common Patterns¶
Transaction Management¶
```python
# Manual transaction control
async with session_factory() as session:
    async with session.begin():
        # Append events
        result = await event_store.append_events(...)
        # Do other database work in same transaction
        await session.execute(...)
    # Commit happens automatically on context exit
```
Querying JSONB Payloads¶
```python
from sqlalchemy import text

# Query events by JSONB field
async with session_factory() as session:
    result = await session.execute(
        text("""
            SELECT event_id, event_type, payload
            FROM events
            WHERE payload->>'customer_id' = :customer_id
        """),
        {"customer_id": "cust-123"},
    )
    rows = result.fetchall()
```
Handling Connection Errors¶
```python
import asyncio

from sqlalchemy.exc import DBAPIError


async def save_with_retry(repo, aggregate, max_retries: int = 3):
    """Save with automatic retry on connection errors."""
    for attempt in range(max_retries):
        try:
            await repo.save(aggregate)
            return
        except DBAPIError:
            if attempt == max_retries - 1:
                raise
            print(f"Connection error, retrying... ({attempt + 1}/{max_retries})")
            await asyncio.sleep(1)
```
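A fixed one-second sleep works, but transient failures often clear faster under exponential backoff. One optional refinement, sketched here as an illustrative helper (`backoff_delays` is not part of the library): precompute the delay schedule, then sleep `delays[attempt]` instead of a constant.

```python
def backoff_delays(max_retries: int, base: float = 0.5, cap: float = 10.0) -> list[float]:
    """Exponential backoff schedule: base, 2*base, 4*base, ... capped at `cap`.

    Illustrative helper to pair with the retry loop above.
    """
    return [min(base * (2 ** attempt), cap) for attempt in range(max_retries)]
```

For example, `backoff_delays(4)` yields `[0.5, 1.0, 2.0, 4.0]`; adding a small random jitter on top is a common further refinement to avoid retry storms.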
Key Takeaways¶
- PostgreSQL is production-ready: Full ACID guarantees, multi-process support, high performance
- Connection pooling is essential: Configure pool_size and max_overflow based on workload
- JSONB enables rich queries: Store structured event data, query with JSON operators
- Indexes optimize performance: All essential indexes included in schema
- Outbox pattern guarantees publishing: At-least-once delivery even if broker is down
- Partitioning scales to billions: Use table partitioning for high-volume deployments
- expire_on_commit=False is required: Critical for async SQLAlchemy sessions
- Monitor connection pool: Watch for exhaustion and increase pool_size if needed
- Use checkpoints for projections: PostgreSQLCheckpointRepository tracks positions
- Dispose engine on shutdown: Always call `await engine.dispose()` to close connections
Next Steps¶
Now that you understand PostgreSQL for production, explore alternative backends:
Continue to Tutorial 12: SQLite Event Store to learn about:

- Setting up SQLite for development and embedded applications
- WAL mode for better concurrency
- When to use SQLite vs PostgreSQL
- Migration strategies between backends

For production deployments, see:

- Tutorial 13: Event Bus Integration (Redis, RabbitMQ, Kafka)
- Tutorial 14: Snapshots for Performance Optimization
- Tutorial 15: Multi-Tenancy Patterns