Tutorial 15: Production Deployment¶
Difficulty: Advanced
Prerequisites¶
- Tutorial 1: Introduction to Event Sourcing
- Tutorial 11: PostgreSQL Event Store
- Tutorial 13: Subscription Management
- Tutorial 14: Optimizing with Aggregate Snapshotting
- Python 3.10 or higher
- Understanding of async/await
- Basic Docker and database administration knowledge
- Understanding of production deployment concepts
Learning Objectives¶
By the end of this tutorial, you will be able to:
- Configure production-ready database connection pools
- Implement comprehensive health checks for Kubernetes deployments
- Set up structured logging for production observability
- Integrate OpenTelemetry tracing for distributed systems
- Configure graceful shutdown with signal handling
- Tune event processing for production workloads
- Implement monitoring and alerting strategies
- Apply security best practices for production deployments
Production Readiness Checklist¶
Before deploying to production, verify these key areas:
Infrastructure¶
- PostgreSQL configured with proper connection limits
- Database indices created for event queries
- Connection pooling configured based on instance count
- Backup and point-in-time recovery (PITR) enabled
- Secrets management for credentials
- Network security and firewall rules
Application¶
- Health check endpoints (liveness, readiness) implemented
- Graceful shutdown with signal handling configured
- Structured logging for aggregation
- Error monitoring and alerting
- Checkpoint strategy selected and configured
- Snapshot thresholds tuned for workload
- OpenTelemetry tracing enabled (optional)
Operations¶
- Deployment runbook documented
- Monitoring dashboards configured
- Alert thresholds defined
- Capacity planning completed
- Disaster recovery plan tested
- On-call rotation established
Database Connection Pool Configuration¶
Connection pooling is critical for production performance. Proper configuration prevents connection exhaustion and optimizes database resource usage.
Basic Connection Pool Setup¶
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
def create_production_engine(
database_url: str,
pool_size: int = 10,
max_overflow: int = 20,
):
"""
Create production-ready database engine.
Args:
database_url: PostgreSQL connection URL
pool_size: Number of persistent connections
max_overflow: Additional connections under load
"""
return create_async_engine(
database_url,
pool_size=pool_size, # Persistent connections
max_overflow=max_overflow, # Temporary overflow connections
pool_timeout=30.0, # Wait time for connection
pool_recycle=1800, # Recycle connections after 30 min
pool_pre_ping=True, # Verify connection before use
echo=False, # Disable SQL logging
connect_args={
"server_settings": {
"statement_timeout": "30000", # 30s query timeout
}
},
)
# Create session factory
engine = create_production_engine(
"postgresql+asyncpg://user:pass@localhost:5432/mydb"
)
session_factory = async_sessionmaker(
engine,
expire_on_commit=False, # Performance optimization
)
Connection Pool Sizing¶
Formula: total_connections = (pool_size + max_overflow) * instance_count
Constraint: Must not exceed PostgreSQL max_connections (default: 100)
| Workload | Instances | pool_size | max_overflow | Total |
|---|---|---|---|---|
| Development | 1 | 5 | 5 | 10 |
| Light | 2 | 5 | 10 | 30 |
| Medium | 4 | 10 | 12 | 88 |
| Heavy | 8 | 5 | 7 | 96 |
Example calculation:
import os
# Environment configuration
instance_count = int(os.getenv("INSTANCE_COUNT", "1"))
max_db_connections = int(os.getenv("MAX_DB_CONNECTIONS", "100"))
# Reserve 10% for admin connections
usable_connections = int(max_db_connections * 0.9)
# Divide across instances
connections_per_instance = usable_connections // instance_count
# Split into pool_size and max_overflow (overflow roughly twice pool_size)
pool_size = max(2, connections_per_instance // 3)
max_overflow = connections_per_instance - pool_size
print(f"Pool config: pool_size={pool_size}, max_overflow={max_overflow}")
Common Connection Pool Issues¶
Problem: Connection pool exhausted
Symptoms: TimeoutError: QueuePool limit exceeded
Solutions:
- Increase pool_size and max_overflow
- Reduce max_in_flight in subscription configs
- Check for connection leaks (unclosed sessions)
Problem: Too many database connections
Symptoms: PostgreSQL FATAL: too many connections
Solutions:
- Reduce pool_size + max_overflow per instance
- Increase PostgreSQL max_connections
- Use connection pooler like PgBouncer
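The sizing rule above can be checked programmatically before deploy. A minimal sketch (pure stdlib; the function name and the `admin_reserve` parameter are illustrative, not library API):

```python
def pool_config_fits(
    pool_size: int,
    max_overflow: int,
    instance_count: int,
    max_connections: int = 100,
    admin_reserve: float = 0.1,  # assumption: keep 10% free for admin sessions
) -> bool:
    """Check total_connections = (pool_size + max_overflow) * instance_count
    against the PostgreSQL max_connections limit, minus an admin reserve."""
    total = (pool_size + max_overflow) * instance_count
    usable = int(max_connections * (1 - admin_reserve))
    return total <= usable
```

For example, 4 instances at `pool_size=10, max_overflow=15` need 100 connections, which fails the check against the default limit of 100 once the admin reserve is subtracted.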
Event Processing Configuration¶
Configure event stores and subscription managers for production workloads.
Production Event Store¶
from eventsource import PostgreSQLEventStore
event_store = PostgreSQLEventStore(
session_factory,
enable_tracing=True, # OpenTelemetry tracing
auto_detect_uuid=True, # Automatically parse UUID strings
)
Production Subscription Manager¶
from eventsource.subscriptions import (
SubscriptionManager,
SubscriptionConfig,
CheckpointStrategy,
HealthCheckConfig,
)
# Health check thresholds
health_config = HealthCheckConfig(
max_error_rate_per_minute=10.0, # Degraded at 10 errors/min
max_errors_warning=50, # Warning threshold
max_errors_critical=200, # Critical threshold
max_lag_events_warning=5000, # Warn if 5k events behind
max_lag_events_critical=50000, # Critical if 50k behind
)
# Create manager
manager = SubscriptionManager(
event_store=event_store,
event_bus=event_bus,
checkpoint_repo=checkpoint_repo,
shutdown_timeout=60.0, # 60s graceful shutdown
drain_timeout=30.0, # 30s to drain in-flight
health_check_config=health_config,
enable_tracing=True, # OpenTelemetry tracing
)
# Production subscription config
config = SubscriptionConfig(
start_from="checkpoint", # Resume from last position
batch_size=100, # Events per batch
max_in_flight=1000, # Concurrent processing limit
checkpoint_strategy=CheckpointStrategy.EVERY_BATCH,
processing_timeout=30.0, # Max time per event
continue_on_error=True, # Continue after DLQ
max_retries=5, # Retry attempts
circuit_breaker_enabled=True, # Enable circuit breaker
)
# Subscribe projections
await manager.subscribe(projection, config=config, name="orders")
Tuning Guidelines¶
Batch Size:
- Small (10-50): Low latency, higher database load
- Medium (50-200): Balanced (recommended)
- Large (200-1000): High throughput, higher latency
Max In-Flight:
- Formula: max_in_flight = batch_size * 10
- Higher values = more concurrency, more memory
- Lower values = less memory, slower catch-up
Checkpoint Strategy:
- EVERY_EVENT: Maximum durability, slowest
- EVERY_BATCH: Recommended for production
- PERIODIC: Fastest, suitable for analytics
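The rules of thumb above can be folded into a small helper when generating configs. This is an illustrative sketch (the function name and clamping bounds are assumptions, not library API):

```python
def suggest_tuning(batch_size: int) -> dict:
    """Derive subscription tuning values from batch_size using the
    rules of thumb above (max_in_flight = batch_size * 10)."""
    if not 10 <= batch_size <= 1000:
        raise ValueError("batch_size should be between 10 and 1000")
    return {
        "batch_size": batch_size,
        "max_in_flight": batch_size * 10,
        # EVERY_BATCH is the recommended production default
        "checkpoint_strategy": "EVERY_BATCH",
    }
```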
Health Check Endpoints¶
Implement Kubernetes-compatible health checks for production deployments.
FastAPI Health Endpoints¶
from fastapi import FastAPI, Response
from contextlib import asynccontextmanager
# Global manager reference
manager: SubscriptionManager | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage application lifecycle."""
global manager
# Startup
event_store = PostgreSQLEventStore(session_factory)
event_bus = InMemoryEventBus()
checkpoint_repo = PostgreSQLCheckpointRepository(session_factory)
manager = SubscriptionManager(
event_store=event_store,
event_bus=event_bus,
checkpoint_repo=checkpoint_repo,
shutdown_timeout=60.0,
drain_timeout=30.0,
)
# Subscribe projections
await manager.subscribe(order_projection, name="orders")
await manager.subscribe(customer_projection, name="customers")
# Start manager
await manager.start()
yield
# Shutdown
if manager:
await manager.stop()
app = FastAPI(lifespan=lifespan)
@app.get("/health")
async def health_check():
"""
Overall health check endpoint.
Returns 200 if healthy/degraded, 503 if unhealthy/critical.
"""
if not manager:
return Response(
content='{"status":"unhealthy","reason":"manager not initialized"}',
status_code=503,
media_type="application/json",
)
health = await manager.health_check()
# Return 503 for unhealthy or critical
status_code = 200 if health.status in ("healthy", "degraded") else 503
return Response(
content=health.model_dump_json(),
status_code=status_code,
media_type="application/json",
)
@app.get("/health/ready")
async def readiness_check():
"""
Readiness probe for Kubernetes.
Returns 200 if ready to receive traffic, 503 otherwise.
"""
if not manager:
return Response(
content='{"ready":false,"reason":"manager not initialized"}',
status_code=503,
media_type="application/json",
)
readiness = await manager.readiness_check()
status_code = 200 if readiness.ready else 503
return Response(
content=readiness.model_dump_json(),
status_code=status_code,
media_type="application/json",
)
@app.get("/health/live")
async def liveness_check():
"""
Liveness probe for Kubernetes.
Returns 200 if alive, 503 if needs restart.
"""
if not manager:
return Response(
content='{"alive":false,"reason":"manager not initialized"}',
status_code=503,
media_type="application/json",
)
liveness = await manager.liveness_check()
status_code = 200 if liveness.alive else 503
return Response(
content=liveness.model_dump_json(),
status_code=status_code,
media_type="application/json",
)
Kubernetes Deployment¶
apiVersion: apps/v1
kind: Deployment
metadata:
name: eventsource-app
spec:
replicas: 3
template:
spec:
containers:
- name: app
image: myapp:latest
ports:
- containerPort: 8000
# Liveness probe - restart if fails
livenessProbe:
httpGet:
path: /health/live
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 3
# Readiness probe - stop sending traffic if fails
readinessProbe:
httpGet:
path: /health/ready
port: 8000
initialDelaySeconds: 10
periodSeconds: 5
timeoutSeconds: 3
failureThreshold: 2
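A useful back-of-the-envelope check when choosing probe settings: the worst-case time before Kubernetes acts on a failing container is roughly `initialDelaySeconds + periodSeconds * failureThreshold`. A throwaway helper (names are illustrative):

```python
def worst_case_probe_seconds(
    initial_delay: int, period: int, failure_threshold: int
) -> int:
    """Approximate worst-case seconds before a probe's failure
    threshold is reached after container start."""
    return initial_delay + period * failure_threshold
```

With the liveness settings above (30s delay, 10s period, threshold 3), a dead container is restarted within about 60 seconds.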
Health Check Response Examples¶
Healthy:
{
"status": "healthy",
"running": true,
"subscription_count": 2,
"healthy_count": 2,
"degraded_count": 0,
"unhealthy_count": 0,
"total_events_processed": 125000,
"total_events_failed": 0,
"total_lag_events": 0,
"uptime_seconds": 3600.5,
"timestamp": "2024-01-15T12:00:00Z",
"subscriptions": [...]
}
Degraded:
{
"status": "degraded",
"running": true,
"subscription_count": 2,
"healthy_count": 1,
"degraded_count": 1,
"unhealthy_count": 0,
"total_events_processed": 125000,
"total_events_failed": 15,
"total_lag_events": 3000,
"uptime_seconds": 3600.5
}
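Dashboards and alerts often derive metrics from these payloads. A small parser sketch, using the field names from the examples above:

```python
import json

def summarize_health(payload: str) -> dict:
    """Extract alert-worthy numbers from a health check response."""
    health = json.loads(payload)
    processed = health["total_events_processed"]
    failed = health["total_events_failed"]
    return {
        "status": health["status"],
        "failure_rate": failed / processed if processed else 0.0,
        "lag_events": health["total_lag_events"],
    }
```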
Structured Logging¶
Production systems require structured logging for aggregation and analysis.
JSON Logging Configuration¶
import logging
import json
import sys
from datetime import datetime, UTC
class JSONFormatter(logging.Formatter):
"""Format log records as JSON for structured logging."""
def format(self, record: logging.LogRecord) -> str:
"""Format log record as JSON."""
log_data = {
"timestamp": datetime.now(UTC).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}
# Add exception info if present
if record.exc_info:
log_data["exception"] = self.formatException(record.exc_info)
# Add custom fields from extra parameter
for key, value in record.__dict__.items():
if key not in {
"name", "msg", "args", "created", "filename", "funcName",
"levelname", "levelno", "lineno", "module", "msecs",
"message", "pathname", "process", "processName",
"relativeCreated", "thread", "threadName", "exc_info",
"exc_text", "stack_info",
}:
log_data[key] = value
return json.dumps(log_data)
def configure_production_logging(level: str = "INFO") -> logging.Logger:
"""
Configure production logging with JSON format.
Args:
level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
Returns:
Configured root logger
"""
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, level.upper()))
# Remove existing handlers
root_logger.handlers.clear()
# Add JSON handler
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JSONFormatter())
root_logger.addHandler(handler)
# Reduce SQLAlchemy noise
logging.getLogger("sqlalchemy.engine").setLevel(logging.WARNING)
return root_logger
# Usage
logger = configure_production_logging("INFO")
# Log with structured data
logger.info(
"Order processed successfully",
extra={
"order_id": "550e8400-e29b-41d4-a716-446655440000",
"customer_id": "123",
"total": 99.99,
"duration_ms": 45.2,
}
)
Output:
{
"timestamp": "2024-01-15T12:00:00.123456Z",
"level": "INFO",
"logger": "__main__",
"message": "Order processed successfully",
"order_id": "550e8400-e29b-41d4-a716-446655440000",
"customer_id": "123",
"total": 99.99,
"duration_ms": 45.2
}
Logging Best Practices¶
DO:
- Use structured logging with JSON format
- Include correlation IDs for request tracing
- Log at appropriate levels (INFO for events, ERROR for failures)
- Include context (aggregate_id, event_type, etc.)
- Use extra={} for structured fields
DON'T:
- Log sensitive data (passwords, credit cards, PII)
- Log at DEBUG level in production
- Use string concatenation for log messages
- Log the same event multiple times
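Correlation IDs can be attached with a `logging.Filter` so every record in a request path carries the same ID. A minimal sketch:

```python
import logging

class CorrelationIdFilter(logging.Filter):
    """Attach a correlation_id attribute to every record that passes
    through a handler, so a JSON formatter emits it automatically."""

    def __init__(self, correlation_id: str):
        super().__init__()
        self.correlation_id = correlation_id

    def filter(self, record: logging.LogRecord) -> bool:
        record.correlation_id = self.correlation_id
        return True  # never drop records, only annotate them
```

Attach it with `handler.addFilter(CorrelationIdFilter(request_id))`; the JSONFormatter above will then include `correlation_id` in every log line, since it copies unknown record attributes into the output.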
OpenTelemetry Tracing¶
Distributed tracing helps debug issues in production systems.
Tracing Configuration¶
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
def configure_tracing(
service_name: str,
service_version: str = "1.0.0",
otlp_endpoint: str = "localhost:4317",
) -> TracerProvider:
"""
Configure OpenTelemetry tracing.
Args:
service_name: Name of the service
service_version: Version of the service
otlp_endpoint: OTLP collector endpoint
Returns:
Configured tracer provider
"""
# Create resource with service metadata
resource = Resource.create({
SERVICE_NAME: service_name,
SERVICE_VERSION: service_version,
})
# Create tracer provider
provider = TracerProvider(resource=resource)
# Configure OTLP exporter
exporter = OTLPSpanExporter(
endpoint=otlp_endpoint,
insecure=True, # Use TLS in production
)
# Add batch span processor
processor = BatchSpanProcessor(exporter)
provider.add_span_processor(processor)
# Set as global provider
trace.set_tracer_provider(provider)
return provider
# Usage
import os
configure_tracing(
service_name=os.getenv("SERVICE_NAME", "eventsource-app"),
service_version=os.getenv("SERVICE_VERSION", "1.0.0"),
otlp_endpoint=os.getenv("OTLP_ENDPOINT", "localhost:4317"),
)
# Enable tracing in components
event_store = PostgreSQLEventStore(
session_factory,
enable_tracing=True, # Traces database operations
)
manager = SubscriptionManager(
event_store=event_store,
event_bus=event_bus,
checkpoint_repo=checkpoint_repo,
enable_tracing=True, # Traces subscription operations
)
repo = AggregateRepository(
event_store=event_store,
aggregate_factory=OrderAggregate,
aggregate_type="Order",
enable_tracing=True, # Traces aggregate operations
)
Trace Attributes¶
eventsource-py automatically adds these attributes to spans:
- event.id: Event UUID
- event.type: Event type name
- aggregate.id: Aggregate UUID
- aggregate.type: Aggregate type name
- subscription.name: Subscription name
- subscription.state: Current state
- db.system: Database type (postgresql)
- db.operation: Database operation (select, insert, etc.)
Graceful Shutdown¶
Production systems must handle shutdown signals gracefully.
Signal Handling with SubscriptionManager¶
import asyncio
import signal
import logging
from typing import NoReturn
logger = logging.getLogger(__name__)
async def run_production_service() -> NoReturn:
"""
Run production service with graceful shutdown.
Handles SIGTERM and SIGINT signals for graceful shutdown.
"""
# Create infrastructure
engine = create_production_engine(
os.getenv("DATABASE_URL", "postgresql+asyncpg://..."),
)
session_factory = async_sessionmaker(engine, expire_on_commit=False)
event_store = PostgreSQLEventStore(session_factory, enable_tracing=True)
event_bus = InMemoryEventBus()
checkpoint_repo = PostgreSQLCheckpointRepository(session_factory)
manager = SubscriptionManager(
event_store=event_store,
event_bus=event_bus,
checkpoint_repo=checkpoint_repo,
shutdown_timeout=60.0, # Max shutdown time
drain_timeout=30.0, # Max drain time
)
# Subscribe projections
await manager.subscribe(order_projection, name="orders")
await manager.subscribe(customer_projection, name="customers")
logger.info("Starting subscription manager")
# Run until signal received
result = await manager.run_until_shutdown()
# Log shutdown details
logger.info(
"Shutdown complete",
extra={
"phase": result.phase.value,
"duration_seconds": result.duration_seconds,
"forced": result.forced,
"events_in_flight": result.events_in_flight,
"checkpoints_saved": result.checkpoints_saved,
}
)
# Cleanup
await engine.dispose()
if __name__ == "__main__":
# Configure logging
configure_production_logging("INFO")
# Run service
asyncio.run(run_production_service())
Shutdown Sequence¶
- Signal received - SIGTERM or SIGINT arrives
- Stop accepting new events - Coordinators stop reading
- Drain in-flight events - Wait up to drain_timeout for processing to complete
- Save checkpoints - Persist final positions
- Cleanup resources - Close database connections
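`run_until_shutdown` handles the signal wiring internally. If you need the same behavior around your own event loop, it looks roughly like this (a sketch, not library code; Unix-only, since it relies on `loop.add_signal_handler`):

```python
import asyncio
import signal

def install_shutdown_event(loop: asyncio.AbstractEventLoop) -> asyncio.Event:
    """Return an Event that is set when SIGTERM or SIGINT arrives."""
    stop = asyncio.Event()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, stop.set)
    return stop

async def serve_until_shutdown() -> None:
    loop = asyncio.get_running_loop()
    stop = install_shutdown_event(loop)
    # ... start the subscription manager here ...
    await stop.wait()  # block until a shutdown signal is received
    # ... drain in-flight events, save checkpoints, dispose engine ...
```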
Manual Shutdown Trigger¶
# Programmatically request shutdown
manager.request_shutdown()
# Check shutdown status
if manager.is_shutting_down:
logger.info(f"Shutdown phase: {manager.shutdown_phase.value}")
Error Monitoring¶
Production systems require comprehensive error monitoring and alerting.
Error Callbacks¶
from eventsource.subscriptions import ErrorInfo, ErrorSeverity
async def log_all_errors(error_info: ErrorInfo) -> None:
"""Log all errors with structured data."""
logger.error(
"Subscription error",
extra={
"subscription": error_info.subscription_name,
"event_id": str(error_info.event_id),
"event_type": error_info.event_type,
"error_message": error_info.error_message,
"severity": error_info.classification.severity.value,
"category": error_info.classification.category.value,
"retry_count": error_info.retry_count,
}
)
async def alert_critical_errors(error_info: ErrorInfo) -> None:
"""Send alerts for critical errors."""
logger.critical(
f"CRITICAL ERROR in {error_info.subscription_name}",
extra={
"event_id": str(error_info.event_id),
"error_message": error_info.error_message,
}
)
# Integration points for alerting
# await send_pagerduty_alert(error_info)
# await send_slack_notification(error_info)
# await send_opsgenie_alert(error_info)
# Register error callbacks
manager.on_error(log_all_errors)
manager.on_error_severity(ErrorSeverity.CRITICAL, alert_critical_errors)
Error Statistics¶
# Get error statistics
stats = manager.get_error_stats()
logger.info(
"Error statistics",
extra={
"total_errors": stats["total_errors"],
"total_dlq_count": stats["total_dlq_count"],
"subscriptions": stats["subscriptions"],
}
)
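When tuning `max_retries`, exponential backoff keeps retries from hammering a failing dependency. A standalone sketch of the delay schedule (the parameter defaults are assumptions for illustration, not library defaults):

```python
def backoff_delay(
    retry_count: int,
    base: float = 0.5,     # assumption: first retry after 0.5s
    factor: float = 2.0,   # double the delay each attempt
    cap: float = 30.0,     # never wait longer than 30s
) -> float:
    """Exponential backoff with a cap: 0.5s, 1s, 2s, 4s, ... up to 30s."""
    return min(cap, base * (factor ** retry_count))
```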
Scaling Considerations¶
Horizontal Scaling¶
Run multiple instances with partitioned event processing:
import hashlib
import os
def get_partition_for_aggregate(
aggregate_id: str,
num_partitions: int,
) -> int:
"""Use consistent hashing to assign aggregates to partitions."""
hash_value = int(hashlib.sha256(aggregate_id.encode()).hexdigest(), 16)
return hash_value % num_partitions
class PartitionedProjection:
"""Projection that processes only its assigned partition."""
def __init__(self, partition_id: int, num_partitions: int):
self.partition_id = partition_id
self.num_partitions = num_partitions
def should_handle(self, aggregate_id: str) -> bool:
"""Check if this instance should handle the aggregate."""
partition = get_partition_for_aggregate(aggregate_id, self.num_partitions)
return partition == self.partition_id
async def handle(self, event: DomainEvent) -> None:
"""Handle event if it belongs to this partition."""
if self.should_handle(str(event.aggregate_id)):
await self._process_event(event)
async def _process_event(self, event: DomainEvent) -> None:
"""Process the event."""
# Implementation here
pass
# Configure from environment
partition_id = int(os.getenv("PARTITION_ID", "0"))
num_partitions = int(os.getenv("NUM_PARTITIONS", "1"))
projection = PartitionedProjection(partition_id, num_partitions)
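A quick sanity check that the consistent hashing above assigns every aggregate to exactly one partition and spreads them across all partitions (the hashing function is repeated so the check is self-contained):

```python
import hashlib

def get_partition_for_aggregate(aggregate_id: str, num_partitions: int) -> int:
    # same hashing scheme as above
    hash_value = int(hashlib.sha256(aggregate_id.encode()).hexdigest(), 16)
    return hash_value % num_partitions

num_partitions = 4
counts = [0] * num_partitions
for i in range(1000):
    counts[get_partition_for_aggregate(f"order-{i}", num_partitions)] += 1

# every aggregate lands in exactly one partition, and none is empty
assert sum(counts) == 1000
assert all(c > 0 for c in counts)
```

Because SHA-256 is deterministic, every instance computes the same partition for a given aggregate, with no coordination required.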
Connection Pool Scaling¶
# Calculate pool size based on instance count
instance_count = int(os.getenv("INSTANCE_COUNT", "1"))
max_db_connections = int(os.getenv("MAX_DB_CONNECTIONS", "100"))
# Reserve connections for admin
usable = int(max_db_connections * 0.9)
per_instance = usable // instance_count
pool_size = max(2, per_instance // 3)
max_overflow = per_instance - pool_size
logger.info(
"Connection pool configuration",
extra={
"instance_count": instance_count,
"pool_size": pool_size,
"max_overflow": max_overflow,
"total_per_instance": pool_size + max_overflow,
}
)
Security Best Practices¶
Database Connection Security¶
import ssl

def create_secure_engine(
    host: str,
    port: int,
    database: str,
    user: str,
    password: str,
    ssl_mode: str = "require",
    ssl_root_cert: str | None = None,
):
    """Create database engine with SSL/TLS."""
    url = f"postgresql+asyncpg://{user}:{password}@{host}:{port}/{database}"
    # asyncpg's "ssl" connect arg accepts an SSLContext or a mode string,
    # not a plain dict
    if ssl_root_cert:
        connect_args = {"ssl": ssl.create_default_context(cafile=ssl_root_cert)}
    else:
        connect_args = {"ssl": ssl_mode}
    return create_async_engine(
        url,
        connect_args=connect_args,
        pool_pre_ping=True,
    )
Secrets Management¶
import os
# DO: Use environment variables
database_url = os.getenv("DATABASE_URL")
# DO: Use secrets management systems
# - Kubernetes Secrets
# - AWS Secrets Manager
# - HashiCorp Vault
# - Azure Key Vault
# DON'T: Hard-code credentials
# database_url = "postgresql://user:password@localhost/db" # WRONG!
Complete Production Example¶
"""
Tutorial 15: Production Deployment
Production-ready event-sourced application with:
- Connection pooling
- Health checks
- Structured logging
- Graceful shutdown
- Error monitoring
Run with: python tutorial_15_production.py
"""
import asyncio
import logging
import os
import json
import sys
from datetime import datetime, UTC
from uuid import UUID, uuid4
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from eventsource import (
AggregateRepository,
AggregateRoot,
DomainEvent,
InMemoryEventBus,
PostgreSQLEventStore,
PostgreSQLCheckpointRepository,
register_event,
)
from eventsource.subscriptions import (
CheckpointStrategy,
ErrorInfo,
ErrorSeverity,
HealthCheckConfig,
SubscriptionConfig,
SubscriptionManager,
)
# =============================================================================
# Logging Configuration
# =============================================================================
class JSONFormatter(logging.Formatter):
"""Format logs as JSON for production."""
def format(self, record: logging.LogRecord) -> str:
log_data = {
"timestamp": datetime.now(UTC).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}
if record.exc_info:
log_data["exception"] = self.formatException(record.exc_info)
# Add extra fields
for key, value in record.__dict__.items():
if key not in {
"name", "msg", "args", "created", "filename", "funcName",
"levelname", "levelno", "lineno", "module", "msecs",
"message", "pathname", "process", "processName",
"relativeCreated", "thread", "threadName", "exc_info",
"exc_text", "stack_info",
}:
log_data[key] = value
return json.dumps(log_data)
def configure_logging(level: str = "INFO") -> logging.Logger:
"""Configure production logging."""
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, level.upper()))
root_logger.handlers.clear()
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JSONFormatter())
root_logger.addHandler(handler)
logging.getLogger("sqlalchemy.engine").setLevel(logging.WARNING)
return root_logger
# =============================================================================
# Application Configuration
# =============================================================================
class AppConfig:
"""Production application configuration."""
def __init__(self):
self.database_url = os.getenv(
"DATABASE_URL",
"postgresql+asyncpg://postgres:postgres@localhost:5432/eventsource_prod"
)
# Connection pool sizing
instance_count = int(os.getenv("INSTANCE_COUNT", "1"))
max_db_connections = int(os.getenv("MAX_DB_CONNECTIONS", "100"))
usable = int(max_db_connections * 0.9)
per_instance = usable // instance_count
self.pool_size = max(2, per_instance // 3)
self.max_overflow = per_instance - self.pool_size
# Timeouts
self.shutdown_timeout = float(os.getenv("SHUTDOWN_TIMEOUT", "60"))
self.drain_timeout = float(os.getenv("DRAIN_TIMEOUT", "30"))
# Logging
self.log_level = os.getenv("LOG_LEVEL", "INFO")
# =============================================================================
# Domain Model
# =============================================================================
@register_event
class OrderCreated(DomainEvent):
event_type: str = "OrderCreated"
aggregate_type: str = "Order"
customer_id: str
total: float
@register_event
class OrderShipped(DomainEvent):
event_type: str = "OrderShipped"
aggregate_type: str = "Order"
tracking_number: str
class OrderState(BaseModel):
order_id: UUID
customer_id: str | None = None
total: float = 0.0
status: str = "draft"
tracking_number: str | None = None
class OrderAggregate(AggregateRoot[OrderState]):
aggregate_type = "Order"
def _get_initial_state(self) -> OrderState:
return OrderState(order_id=self.aggregate_id)
def _apply(self, event: DomainEvent) -> None:
if isinstance(event, OrderCreated):
self._state = OrderState(
order_id=self.aggregate_id,
customer_id=event.customer_id,
total=event.total,
status="created",
)
elif isinstance(event, OrderShipped):
if self._state:
self._state = self._state.model_copy(
update={
"status": "shipped",
"tracking_number": event.tracking_number,
}
)
def create(self, customer_id: str, total: float) -> None:
"""Create a new order."""
if self.version > 0:
raise ValueError("Order already exists")
self.apply_event(
OrderCreated(
aggregate_id=self.aggregate_id,
customer_id=customer_id,
total=total,
aggregate_version=self.get_next_version(),
)
)
def ship(self, tracking_number: str) -> None:
"""Ship the order."""
if not self.state or self.state.status != "created":
raise ValueError("Cannot ship order in current state")
self.apply_event(
OrderShipped(
aggregate_id=self.aggregate_id,
tracking_number=tracking_number,
aggregate_version=self.get_next_version(),
)
)
# =============================================================================
# Error Monitoring
# =============================================================================
async def log_all_errors(error_info: ErrorInfo) -> None:
"""Log all subscription errors."""
logger = logging.getLogger("eventsource.errors")
logger.error(
"Subscription error",
extra={
"subscription": error_info.subscription_name,
"event_id": str(error_info.event_id),
"error_message": error_info.error_message,
"severity": error_info.classification.severity.value,
}
)
async def alert_critical_errors(error_info: ErrorInfo) -> None:
"""Alert on critical errors."""
logger = logging.getLogger("eventsource.alerts")
logger.critical(
f"CRITICAL: {error_info.error_message}",
extra={"subscription": error_info.subscription_name}
)
# =============================================================================
# Production Application
# =============================================================================
class ProductionApp:
"""Production application with full production features."""
def __init__(self, config: AppConfig):
self.config = config
self.logger = configure_logging(config.log_level)
self.engine = None
self.manager = None
self.repo = None
async def initialize(self) -> None:
"""Initialize application infrastructure."""
self.logger.info("Initializing application")
# Create database engine
self.engine = create_async_engine(
self.config.database_url,
pool_size=self.config.pool_size,
max_overflow=self.config.max_overflow,
pool_timeout=30.0,
pool_recycle=1800,
pool_pre_ping=True,
)
self.logger.info(
"Database engine created",
extra={
"pool_size": self.config.pool_size,
"max_overflow": self.config.max_overflow,
}
)
session_factory = async_sessionmaker(
self.engine,
expire_on_commit=False,
)
# Create infrastructure
event_store = PostgreSQLEventStore(session_factory, enable_tracing=False)
event_bus = InMemoryEventBus()
checkpoint_repo = PostgreSQLCheckpointRepository(session_factory)
# Health check config
health_config = HealthCheckConfig(
max_error_rate_per_minute=10.0,
max_errors_warning=50,
max_errors_critical=200,
max_lag_events_warning=5000,
max_lag_events_critical=50000,
)
# Create subscription manager
self.manager = SubscriptionManager(
event_store=event_store,
event_bus=event_bus,
checkpoint_repo=checkpoint_repo,
shutdown_timeout=self.config.shutdown_timeout,
drain_timeout=self.config.drain_timeout,
health_check_config=health_config,
enable_tracing=False,
)
# Register error callbacks
self.manager.on_error(log_all_errors)
self.manager.on_error_severity(ErrorSeverity.CRITICAL, alert_critical_errors)
# Create repository
self.repo = AggregateRepository(
event_store=event_store,
aggregate_factory=OrderAggregate,
aggregate_type="Order",
event_publisher=event_bus,
enable_tracing=False,
)
self.logger.info("Application initialized successfully")
async def start(self) -> None:
"""Start the application."""
self.logger.info("Starting application")
await self.manager.start()
self.logger.info("Application started")
async def stop(self) -> None:
"""Stop the application gracefully."""
self.logger.info("Stopping application")
if self.manager:
result = await self.manager.stop()
self.logger.info(
"Manager stopped",
extra={
"phase": result.phase.value,
"duration_seconds": result.duration_seconds,
"forced": result.forced,
}
)
if self.engine:
await self.engine.dispose()
self.logger.info("Application stopped")
async def health_check(self) -> dict:
"""Get application health status."""
if not self.manager:
return {"status": "unhealthy", "reason": "manager not initialized"}
health = await self.manager.health_check()
return health.to_dict()
# =============================================================================
# Main Entry Point
# =============================================================================
async def main():
"""Run production application."""
config = AppConfig()
app = ProductionApp(config)
try:
# Initialize
await app.initialize()
await app.start()
# Demo: Create an order
order_id = uuid4()
order = app.repo.create_new(order_id)
order.create(customer_id="cust-prod-001", total=499.99)
await app.repo.save(order)
app.logger.info(
"Order created",
extra={"order_id": str(order_id), "total": 499.99}
)
# Check health
health = await app.health_check()
app.logger.info(
"Health check",
extra={"status": health["status"]}
)
# Run for a bit
app.logger.info("Application running. Press Ctrl+C to stop.")
await asyncio.sleep(5)
finally:
await app.stop()
if __name__ == "__main__":
asyncio.run(main())
Key Takeaways¶
- Connection pools must be sized correctly: Calculate based on instance count and database limits
- Health checks are essential: Implement liveness and readiness for Kubernetes
- Structured logging enables observability: Use JSON format for log aggregation
- Graceful shutdown prevents data loss: Handle signals and drain in-flight events
- Error monitoring catches issues early: Register callbacks for critical errors
- OpenTelemetry provides visibility: Enable tracing for distributed debugging
- Security is non-negotiable: Use SSL/TLS, secrets management, and proper authentication
- Tune for your workload: Adjust batch sizes, checkpoint strategies, and pool sizes based on metrics
Next Steps¶
Continue to Tutorial 16: Multi-Tenancy to learn about:
- Tenant isolation patterns
- Tenant-scoped event stores
- Multi-tenant projections
- Tenant migration strategies
For production deployment examples, see:
- Examples: examples/projection_example.py
- Tests: tests/integration/subscriptions/test_resilience.py
- Documentation: Production deployment guides