Error Handling Guide
This guide covers all exception types in eventsource and provides patterns for handling and recovering from errors in your event-sourced applications.
Overview
Eventsource provides a structured exception hierarchy that helps you handle different types of failures appropriately. The library distinguishes between:
- Recoverable errors: Concurrency conflicts that can be resolved with retry logic
- Not-found errors: Missing resources that may require user notification or fallback logic
- Permanent failures: Bugs or data corruption that require investigation
- Transient failures: Temporary issues that resolve with retries
Error Handling Philosophy
- Fail fast with clear information: Exceptions include context (IDs, versions) for debugging
- Enable recovery: Errors like OptimisticLockError provide enough information to retry intelligently
- Preserve data: The dead letter queue (DLQ) records failed events so none are silently lost
- Isolation: Failures in one component (e.g., a projection) don't crash the entire system
Exception Hierarchy
All eventsource exceptions inherit from EventSourceError, enabling both specific and broad exception handling:
```text
EventSourceError (base)
|
+-- OptimisticLockError      # Version conflicts during concurrent writes
+-- AggregateNotFoundError   # Aggregate does not exist
+-- EventNotFoundError       # Specific event not found
+-- ProjectionError          # Projection failed to process event
+-- EventStoreError          # Event store operation failed
+-- EventBusError            # Event bus operation failed
+-- CheckpointError          # Checkpoint operation failed
+-- SerializationError       # Event serialization/deserialization failed
```
Importing Exceptions
```python
from eventsource.exceptions import (
    EventSourceError,
    OptimisticLockError,
    AggregateNotFoundError,
    EventNotFoundError,
    ProjectionError,
    EventStoreError,
    EventBusError,
    CheckpointError,
    SerializationError,
)
```
OptimisticLockError
Raised when there is a version conflict during event append. This is the most common exception you will handle in production.
When It Happens
- Two processes load the same aggregate simultaneously
- Both attempt to append events at the same expected version
- The second append fails because the version has changed
```text
Process A: Load Order (v5) --> Process --> Save (expect v5) --> Success (now v6)
Process B: Load Order (v5) --> Process --> Save (expect v5) --> CONFLICT!
```
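The conflict above comes from an expected-version check at append time. The toy store below is a minimal sketch of that check. It assumes the version equals the number of events in the stream, and its class and method names are hypothetical rather than eventsource's API. A real store must make the check and the write atomic, for example with a unique constraint on aggregate ID and version. This single-threaded toy does not need to.

```python
# Toy in-memory event store illustrating the expected-version check behind
# OptimisticLockError. Names are hypothetical, not eventsource's API.
from dataclasses import dataclass, field
from uuid import UUID, uuid4


class OptimisticLockError(Exception):
    def __init__(self, aggregate_id: UUID, expected_version: int, actual_version: int) -> None:
        super().__init__(f"expected v{expected_version}, actual v{actual_version}")
        self.aggregate_id = aggregate_id
        self.expected_version = expected_version
        self.actual_version = actual_version


@dataclass
class InMemoryEventStore:
    streams: dict[UUID, list[str]] = field(default_factory=dict)

    def version(self, aggregate_id: UUID) -> int:
        return len(self.streams.get(aggregate_id, []))

    def append(self, aggregate_id: UUID, events: list[str], expected_version: int) -> int:
        stream = self.streams.setdefault(aggregate_id, [])
        actual = len(stream)
        if actual != expected_version:
            # Someone else appended after this writer loaded the aggregate.
            raise OptimisticLockError(aggregate_id, expected_version, actual)
        stream.extend(events)
        return len(stream)


store = InMemoryEventStore()
order_id = uuid4()
store.append(order_id, [f"e{i}" for i in range(5)], expected_version=0)  # order is at v5

# Processes A and B both load the order at v5.
loaded_by_a = loaded_by_b = store.version(order_id)

print(store.append(order_id, ["OrderShipped"], expected_version=loaded_by_a))  # 6
try:
    store.append(order_id, ["OrderCancelled"], expected_version=loaded_by_b)
except OptimisticLockError as e:
    print(e.expected_version, e.actual_version)  # 5 6
```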
Exception Details
```python
class OptimisticLockError(EventSourceError):
    aggregate_id: UUID       # Which aggregate had the conflict
    expected_version: int    # Version the code expected
    actual_version: int      # Actual version in the store
```
Handling Pattern: Retry with Reload
The standard pattern is to catch the error, reload the aggregate with fresh state, and retry:
```python
import logging
from uuid import UUID

from eventsource.aggregates import AggregateRepository
from eventsource.exceptions import OptimisticLockError

logger = logging.getLogger(__name__)


async def ship_order_with_retry(
    repo: AggregateRepository,
    order_id: UUID,
    tracking_number: str,
    max_retries: int = 3,
) -> None:
    """Ship an order with automatic retry on conflict."""
    for attempt in range(max_retries):
        try:
            # Load the current state
            order = await repo.load(order_id)
            # Execute business logic
            order.ship(tracking_number)
            # Persist changes
            await repo.save(order)
            return  # Success
        except OptimisticLockError as e:
            if attempt == max_retries - 1:
                # Exhausted retries - log and re-raise
                logger.error(
                    "Failed to ship order after %d attempts",
                    max_retries,
                    extra={
                        "order_id": str(e.aggregate_id),
                        "expected_version": e.expected_version,
                        "actual_version": e.actual_version,
                    },
                )
                raise
            # Log the conflict and retry
            logger.warning(
                "Version conflict on order %s (expected v%d, actual v%d), retrying...",
                e.aggregate_id,
                e.expected_version,
                e.actual_version,
            )
            # Loop continues with a fresh load
```
Generic Retry Helper
For reusable retry logic across commands:
```python
from typing import Callable, TypeVar
from uuid import UUID

from eventsource.aggregates import Aggregate, AggregateRepository
from eventsource.exceptions import OptimisticLockError

TAggregate = TypeVar("TAggregate", bound=Aggregate)


async def execute_with_retry(
    repo: AggregateRepository[TAggregate],
    aggregate_id: UUID,
    command: Callable[[TAggregate], None],
    max_retries: int = 3,
) -> TAggregate:
    """
    Execute a command with automatic retry on version conflict.

    Args:
        repo: The aggregate repository
        aggregate_id: ID of the aggregate to operate on
        command: A callable that modifies the aggregate
        max_retries: Maximum retry attempts

    Returns:
        The aggregate after successful save

    Raises:
        OptimisticLockError: If max retries exceeded
    """
    for attempt in range(max_retries):
        try:
            aggregate = await repo.load(aggregate_id)
            command(aggregate)  # Execute the business logic
            await repo.save(aggregate)
            return aggregate
        except OptimisticLockError:
            if attempt == max_retries - 1:
                raise
    raise RuntimeError("Unreachable")  # For the type checker (only hit if max_retries < 1)


# Usage (inside an async function)
await execute_with_retry(
    repo=order_repo,
    aggregate_id=order_id,
    command=lambda order: order.ship("TRACK-123"),
    max_retries=3,
)
```
Best Practices for Concurrency
- Always implement retry logic for aggregate save operations
- Keep aggregates small to reduce conflict probability
- Log conflicts for monitoring contention patterns
- Consider command deduplication if retries might cause duplicate side effects
- Use exponential backoff for high-contention scenarios
See ADR-0003: Optimistic Locking for the design rationale.
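To follow the exponential backoff recommendation, one option is to wrap the whole load-modify-save command and add random jitter so competing writers do not retry in lockstep. This is a sketch with a local OptimisticLockError stand-in. The retry_on_conflict name and its defaults are illustrative assumptions. "Full jitter", a random delay between zero and the capped exponential delay, is one common strategy among several.

```python
# Sketch: retry a whole load-modify-save command on version conflicts,
# with capped exponential backoff and "full jitter".
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class OptimisticLockError(Exception):
    """Local stand-in for eventsource.exceptions.OptimisticLockError."""


async def retry_on_conflict(
    attempt_command: Callable[[], Awaitable[T]],
    max_attempts: int = 5,
    base_delay: float = 0.05,
    max_delay: float = 1.0,
) -> T:
    """Run attempt_command (which should reload, mutate, and save the aggregate)
    until it succeeds or max_attempts conflicts have occurred."""
    for attempt in range(max_attempts):
        try:
            return await attempt_command()
        except OptimisticLockError:
            if attempt == max_attempts - 1:
                raise
            # Full jitter: wait a random time between 0 and the capped exponential delay.
            cap = min(max_delay, base_delay * 2**attempt)
            await asyncio.sleep(random.uniform(0, cap))
    raise RuntimeError("max_attempts must be at least 1")


# Usage: a fake command that hits two conflicts before succeeding.
calls = 0


async def ship_order() -> str:
    global calls
    calls += 1
    if calls < 3:
        raise OptimisticLockError("version conflict")
    return "shipped"


result = asyncio.run(retry_on_conflict(ship_order, base_delay=0.01))
print(result, calls)  # shipped 3
```

The command passed in must perform its own reload. Retrying only the save would resend the same stale expected version and conflict again.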
AggregateNotFoundError
Raised when attempting to load an aggregate that does not exist.
When It Happens
- Calling repo.load(aggregate_id) with a non-existent ID
- User provides an invalid or stale reference
Exception Details
```python
class AggregateNotFoundError(EventSourceError):
    aggregate_id: UUID           # The requested ID
    aggregate_type: str | None   # Type of aggregate (if known)
```
Handling Pattern: User-Friendly Response
```python
from uuid import UUID

from fastapi import HTTPException  # assumes a FastAPI app; adapt to your framework

from eventsource.exceptions import AggregateNotFoundError


async def get_order(order_id: UUID) -> Order:
    """Get an order, raising HTTP 404 if not found."""
    try:
        return await order_repo.load(order_id)
    except AggregateNotFoundError:
        raise HTTPException(
            status_code=404,
            detail=f"Order {order_id} not found",
        )
```
Alternative: Load or Create
For upsert patterns where you want to create the aggregate if it does not exist:
```python
from uuid import UUID


async def ensure_cart_exists(cart_id: UUID, customer_id: UUID) -> ShoppingCart:
    """Get existing cart or create a new one."""
    # load_or_create returns a fresh aggregate (version 0) instead of raising
    # AggregateNotFoundError, so no try/except is needed here.
    cart = await cart_repo.load_or_create(cart_id)
    if cart.version == 0:
        # New cart - initialize it
        cart.create(customer_id=customer_id)
        await cart_repo.save(cart)
    return cart
```
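The version == 0 check relies on load_or_create handing back an unsaved aggregate when no events exist, whereas load raises AggregateNotFoundError. The toy repository below is a hypothetical, synchronous stand-in that models only that contract. It stores aggregate objects rather than events, which a real event-sourced repository would not do.

```python
# Hypothetical in-memory repository contrasting load() with load_or_create().
from uuid import UUID, uuid4


class AggregateNotFoundError(Exception):
    def __init__(self, aggregate_id: UUID) -> None:
        super().__init__(f"Aggregate {aggregate_id} not found")
        self.aggregate_id = aggregate_id


class ShoppingCart:
    def __init__(self, cart_id: UUID) -> None:
        self.id = cart_id
        self.version = 0  # number of events applied; 0 means "never created"
        self.customer_id: UUID | None = None

    def create(self, customer_id: UUID) -> None:
        self.customer_id = customer_id
        self.version += 1  # stands in for applying a CartCreated event


class InMemoryCartRepository:
    def __init__(self) -> None:
        self._carts: dict[UUID, ShoppingCart] = {}

    def load(self, cart_id: UUID) -> ShoppingCart:
        cart = self._carts.get(cart_id)
        if cart is None:
            raise AggregateNotFoundError(cart_id)
        return cart

    def load_or_create(self, cart_id: UUID) -> ShoppingCart:
        cart = self._carts.get(cart_id)
        # No stored state: return a fresh, unsaved aggregate at version 0.
        return cart if cart is not None else ShoppingCart(cart_id)

    def save(self, cart: ShoppingCart) -> None:
        self._carts[cart.id] = cart


repo = InMemoryCartRepository()
cart_id, customer_id = uuid4(), uuid4()

for _ in range(2):  # running the upsert twice initializes the cart only once
    cart = repo.load_or_create(cart_id)
    if cart.version == 0:
        cart.create(customer_id=customer_id)
        repo.save(cart)

print(repo.load(cart_id).version)  # 1
```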
Best Practices
- Don't catch broadly: Only catch when you have a specific recovery strategy
- Validate early: Check existence before complex operations if appropriate
- Use load_or_create: For initialization patterns instead of try/except
EventNotFoundError
Raised when a specific event cannot be found by its ID.
When It Happens
- Querying for an event by ID that does not exist
- Referencing an event ID that was never persisted
Exception Details
Handling Pattern
```python
from uuid import UUID

from fastapi import HTTPException  # assumes a FastAPI app; adapt to your framework

from eventsource.exceptions import EventNotFoundError


async def get_event_details(event_id: UUID) -> EventDetails:
    """Get details of a specific event."""
    try:
        event = await event_store.get_event(event_id)
        return EventDetails.from_event(event)
    except EventNotFoundError:
        raise HTTPException(
            status_code=404,
            detail=f"Event {event_id} not found",
        )
```
ProjectionError
Raised when a projection fails to process an event after exhausting retry attempts.
When It Happens
- Bug in projection code (e.g., unhandled event type)
- Database constraint violation in read model
- Missing required external data
- In each case, only once all retry attempts are exhausted
Exception Details
```python
class ProjectionError(EventSourceError):
    projection_name: str   # Name of the failed projection
    event_id: UUID         # ID of the event that caused failure
```
Automatic Handling via DLQ
Projections extending CheckpointTrackingProjection automatically handle errors with retry and DLQ:
```python
from eventsource.projections import DeclarativeProjection, handles


class OrderProjection(DeclarativeProjection):
    """Projection that automatically handles errors."""

    # Configure retry behavior
    MAX_RETRIES = 3  # Retry 3 times before DLQ
    RETRY_BACKOFF_BASE = 2  # Exponential backoff: 1s, 2s, 4s

    @handles(OrderCreated)
    async def _on_order_created(self, event: OrderCreated) -> None:
        # If this fails, automatic retry with backoff
        # After MAX_RETRIES failures, event goes to DLQ
        await self._db.execute(
            "INSERT INTO orders ...",
            ...
        )
```
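Conceptually, settings such as MAX_RETRIES and RETRY_BACKOFF_BASE drive a loop like the one below. This is a simplified sketch, not the CheckpointTrackingProjection implementation. The handle_with_dlq function, the plain list used as a DLQ, and the reading of MAX_RETRIES as retries after the first attempt are all assumptions. The sleep function is injectable so the example runs instantly and the delays can be inspected.

```python
# Simplified retry-then-dead-letter loop for a projection event handler.
import asyncio
from typing import Any, Awaitable, Callable

Event = dict[str, Any]


async def handle_with_dlq(
    handler: Callable[[Event], Awaitable[None]],
    event: Event,
    dlq: list[Event],
    max_retries: int = 3,
    backoff_base: float = 2.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> bool:
    """Return True if the handler succeeded, False if the event was dead-lettered."""
    for attempt in range(max_retries + 1):  # first try plus max_retries retries
        try:
            await handler(event)
            return True
        except Exception as exc:
            if attempt == max_retries:
                # Out of retries: park the event so the projection can move on.
                dlq.append(
                    {"event_id": event["event_id"], "error_message": str(exc), "status": "failed"}
                )
                return False
            await sleep(backoff_base**attempt)  # 1s, 2s, 4s with base 2
    return False  # only reached if max_retries < 0


# Usage: a handler that always fails, with a fake sleep that records delays.
delays: list[float] = []


async def record_sleep(seconds: float) -> None:
    delays.append(seconds)


async def broken_handler(event: Event) -> None:
    raise ValueError("duplicate key in read model")


dlq: list[Event] = []
ok = asyncio.run(handle_with_dlq(broken_handler, {"event_id": "evt-1"}, dlq, sleep=record_sleep))
print(ok, delays, dlq[0]["error_message"])  # False [1.0, 2.0, 4.0] duplicate key in read model
```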
Custom Error Handling
For additional error handling beyond automatic DLQ:
```python
from eventsource.events import DomainEvent
from eventsource.projections import DeclarativeProjection


class RobustProjection(DeclarativeProjection):
    """Projection with custom error handling."""

    async def handle(self, event: DomainEvent) -> None:
        """Override handle to add custom behavior."""
        try:
            await super().handle(event)
        except Exception as e:
            # Custom handling (in addition to DLQ)
            await self._send_alert(
                f"Projection failed on {event.event_id}: {e}"
            )
            # Re-raise to let parent handle DLQ
            raise
```
Monitoring the DLQ
Query the DLQ to monitor and resolve failures:
```python
import logging

from eventsource.repositories import PostgreSQLDLQRepository

logger = logging.getLogger(__name__)


async def check_dlq_health(dlq_repo: PostgreSQLDLQRepository) -> None:
    """Check DLQ for unresolved failures."""
    stats = await dlq_repo.get_failure_stats()
    if stats["failed_count"] > 0:
        logger.warning(
            "DLQ has %d unresolved failures",
            stats["failed_count"],
            extra=stats,
        )

    # Get recent failures for investigation
    failures = await dlq_repo.get_failed_events(
        status="failed",
        limit=10,
    )
    for failure in failures:
        logger.error(
            "Failed event: %s in %s: %s",
            failure["event_id"],
            failure["projection_name"],
            failure["error_message"],
        )
```
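Resolving DLQ Entries
After fixing the underlying issue, mark the entry as resolved:
```python
import logging
from uuid import UUID

from eventsource.repositories import PostgreSQLDLQRepository

logger = logging.getLogger(__name__)


async def resolve_dlq_entry(
    dlq_repo: PostgreSQLDLQRepository,
    dlq_id: int,
    resolved_by: UUID,
) -> None:
    """Mark a DLQ entry as resolved after manual intervention."""
    await dlq_repo.mark_resolved(dlq_id, resolved_by)
    logger.info("Resolved DLQ entry %d by %s", dlq_id, resolved_by)
```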
See ADR-0004: Projection Error Handling for the design rationale.
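The monitoring and resolution calls above imply a simple lifecycle for each DLQ entry. The in-memory class below is a hypothetical stand-in, not PostgreSQLDLQRepository. Its method names echo the examples in this guide, but it is synchronous, and its dictionary keys and status strings (failed, retrying, resolved) are assumptions drawn from those examples. It can serve as a test double while you develop resolution tooling.

```python
# In-memory stand-in for a DLQ repository, tracking the statuses this guide
# uses (failed -> retrying -> resolved). Synchronous for brevity.
from collections import Counter
from datetime import datetime, timezone
from typing import Any
from uuid import UUID, uuid4


class InMemoryDLQ:
    def __init__(self) -> None:
        self.entries: dict[int, dict[str, Any]] = {}
        self._next_id = 1

    def add(self, event_id: UUID, projection_name: str, error_message: str) -> int:
        dlq_id = self._next_id
        self._next_id += 1
        self.entries[dlq_id] = {
            "id": dlq_id,
            "event_id": event_id,
            "projection_name": projection_name,
            "error_message": error_message,
            "status": "failed",
        }
        return dlq_id

    def get_failed_events(self, status: str = "failed", limit: int = 100) -> list[dict[str, Any]]:
        return [e for e in self.entries.values() if e["status"] == status][:limit]

    def get_failure_stats(self) -> dict[str, int]:
        counts = Counter(e["status"] for e in self.entries.values())
        return {f"{s}_count": counts[s] for s in ("failed", "retrying", "resolved")}

    def mark_retrying(self, dlq_id: int) -> None:
        self.entries[dlq_id]["status"] = "retrying"

    def mark_resolved(self, dlq_id: int, resolved_by: UUID) -> None:
        self.entries[dlq_id].update(
            status="resolved",
            resolved_by=resolved_by,
            resolved_at=datetime.now(timezone.utc),
        )


dlq = InMemoryDLQ()
first = dlq.add(uuid4(), "OrderProjection", "missing customer row")
dlq.add(uuid4(), "OrderProjection", "read model timeout")

dlq.mark_retrying(first)  # an operator replays the first entry...
dlq.mark_resolved(first, resolved_by=uuid4())  # ...and it succeeds

print(dlq.get_failure_stats())  # {'failed_count': 1, 'retrying_count': 0, 'resolved_count': 1}
```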
SerializationError
Raised when event serialization or deserialization fails.
When It Happens
- Unknown event type during deserialization
- Event data does not match expected schema
- Missing required fields in event payload
Exception Details
Handling Pattern
```python
import logging
from uuid import UUID

from eventsource.exceptions import SerializationError

logger = logging.getLogger(__name__)


async def replay_aggregate_events(aggregate_id: UUID) -> None:
    """Safely replay events, handling serialization errors."""
    try:
        events = await event_store.get_events(aggregate_id)
        for event in events:
            process_event(event)
    except SerializationError as e:
        logger.error(
            "Failed to deserialize event type %s for aggregate %s",
            e.event_type,
            aggregate_id,
            exc_info=True,
        )
        # Consider: skip, use fallback, or fail entirely
        raise
```
Prevention
- Register all event types before deserialization:
```python
from eventsource import register_event
from eventsource.events import DomainEvent


@register_event
class OrderCreated(DomainEvent):
    event_type: str = "OrderCreated"
    # ...


# Ensure all event modules are imported at startup so their classes get registered
from myapp.events import orders, payments, shipping
```
- Use event versioning for schema evolution:
```python
@register_event
class OrderCreatedV2(DomainEvent):
    event_type: str = "OrderCreated"
    version: int = 2  # Schema version
    # New fields with defaults for backward compatibility
    currency: str = "USD"
```
- Validate events on write to catch issues early:
```python
# Pydantic validation happens automatically
order_created = OrderCreated(
    aggregate_id=order_id,
    customer_id=customer_id,
    # Missing required fields raise ValidationError
)
```
EventStoreError
Raised for general errors in event store operations.
When It Happens
- Database connection failure
- Query execution error
- Transaction failure
- Storage layer issues
Handling Pattern
```python
import logging
from uuid import UUID

from eventsource.events import DomainEvent
from eventsource.exceptions import EventStoreError

logger = logging.getLogger(__name__)


async def robust_append(
    aggregate_id: UUID,
    events: list[DomainEvent],
    current_version: int,
) -> None:
    """Append events with error handling."""
    try:
        await event_store.append_events(
            aggregate_id=aggregate_id,
            aggregate_type="Order",
            events=events,
            expected_version=current_version,
        )
    except EventStoreError as e:
        logger.error("Event store error: %s", e, exc_info=True)
        # May be transient - consider retry with backoff.
        # ServiceUnavailableError is an application-defined exception, not part of eventsource.
        raise ServiceUnavailableError("Unable to save order, please retry") from e
```
Recovery Strategy
For transient store errors, implement exponential backoff:
```python
import asyncio
import logging

from eventsource.events import DomainEvent
from eventsource.exceptions import EventStoreError

logger = logging.getLogger(__name__)


async def append_with_backoff(
    events: list[DomainEvent],
    max_retries: int = 3,
) -> None:
    """Append events with exponential backoff for transient errors."""
    for attempt in range(max_retries):
        try:
            await event_store.append_events(...)
            return
        except EventStoreError as e:
            if attempt == max_retries - 1:
                raise
            backoff = 2 ** attempt  # 1s, 2s, 4s, ... (with max_retries=3: 1s, then 2s)
            logger.warning(
                "Event store error (attempt %d/%d), retrying in %ds: %s",
                attempt + 1, max_retries, backoff, e,
            )
            await asyncio.sleep(backoff)
```
EventBusError
Raised for errors in event bus operations (publishing/subscribing).
When It Happens
- Message queue unavailable
- Subscriber registration failure
- Event delivery failure
Handling Pattern
```python
import logging

from eventsource.events import DomainEvent
from eventsource.exceptions import EventBusError

logger = logging.getLogger(__name__)


async def publish_event_safely(event: DomainEvent) -> None:
    """Publish event with fallback handling."""
    try:
        await event_bus.publish(event)
    except EventBusError as e:
        logger.error("Event bus error: %s", e, exc_info=True)
        # Fallback: queue for later retry
        await failed_event_queue.add(event)
```
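The fallback above leaves failed_event_queue to the application. One minimal shape for it is an in-process queue that is drained by re-publishing. The FailedEventQueue class and its retry_all method are hypothetical. An in-memory queue is lost on restart, so production systems often persist such events instead, for example with a transactional outbox.

```python
# Hypothetical in-memory fallback queue for events whose publish failed.
import asyncio
from collections import deque
from typing import Any, Awaitable, Callable


class EventBusError(Exception):
    """Local stand-in for eventsource.exceptions.EventBusError."""


class FailedEventQueue:
    def __init__(self) -> None:
        self._pending: deque[Any] = deque()

    async def add(self, event: Any) -> None:
        self._pending.append(event)

    def __len__(self) -> int:
        return len(self._pending)

    async def retry_all(self, publish: Callable[[Any], Awaitable[None]]) -> int:
        """Re-publish queued events oldest first, stopping at the first failure
        so ordering is preserved. Returns how many were re-published."""
        published = 0
        while self._pending:
            try:
                await publish(self._pending[0])
            except EventBusError:
                break  # bus still down: keep this event and the rest
            self._pending.popleft()
            published += 1
        return published


# Usage: the bus is down for the first publish attempts, then recovers.
bus_up = False
delivered: list[str] = []


async def publish(event: str) -> None:
    if not bus_up:
        raise EventBusError("broker unavailable")
    delivered.append(event)


async def main() -> tuple[int, int, int]:
    global bus_up
    queue = FailedEventQueue()
    for event in ("OrderCreated", "OrderShipped"):
        try:
            await publish(event)
        except EventBusError:
            await queue.add(event)
    first_pass = await queue.retry_all(publish)  # still down: nothing sent
    bus_up = True
    second_pass = await queue.retry_all(publish)  # drains the queue in order
    return first_pass, second_pass, len(queue)


passes = asyncio.run(main())
print(passes, delivered)  # (0, 2, 0) ['OrderCreated', 'OrderShipped']
```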
CheckpointError
Raised for errors in checkpoint operations.
When It Happens
- Checkpoint storage unavailable
- Checkpoint read/write failure
- Checkpoint data corruption
Handling Pattern
```python
import logging
from uuid import UUID

from eventsource.exceptions import CheckpointError

logger = logging.getLogger(__name__)


async def get_projection_position(projection_name: str) -> UUID | None:
    """Get projection checkpoint with error handling."""
    try:
        return await checkpoint_repo.get_checkpoint(projection_name)
    except CheckpointError as e:
        logger.error(
            "Checkpoint error for %s: %s",
            projection_name, e,
            exc_info=True,
        )
        # Consider: return None to replay from the beginning, or raise
        raise
```
Pydantic Validation Errors
Events use Pydantic for validation. Invalid event data raises pydantic.ValidationError.
When It Happens
- Missing required fields when creating events
- Invalid field types
- Failed field validators
Handling Pattern
```python
import logging
from typing import Any

from fastapi import HTTPException  # assumes a FastAPI app; adapt to your framework
from pydantic import ValidationError

logger = logging.getLogger(__name__)


def create_order_event(data: dict[str, Any]) -> OrderCreated:
    """Create event with validation error handling."""
    try:
        # .get() lets a missing key surface as ValidationError, not a bare KeyError
        return OrderCreated(
            aggregate_id=data.get("order_id"),
            customer_id=data.get("customer_id"),
            total_amount=data.get("total"),
        )
    except ValidationError as e:
        logger.warning("Invalid order data: %s", e.errors())
        raise HTTPException(
            status_code=422,
            detail=e.errors(),
        )
```
Prevention
- Define clear schemas with appropriate types and constraints
- Use Optional fields with defaults for backward compatibility
- Add custom validators for complex business rules:
```python
from pydantic import field_validator

from eventsource import register_event
from eventsource.events import DomainEvent


@register_event
class OrderCreated(DomainEvent):
    event_type: str = "OrderCreated"
    aggregate_type: str = "Order"
    total_amount: float

    @field_validator("total_amount")
    @classmethod
    def amount_must_be_positive(cls, v: float) -> float:
        if v <= 0:
            raise ValueError("Total amount must be positive")
        return v
```
Best Practices
1. Catch Specific Exceptions
Handle specific exceptions for appropriate recovery:
```python
# Good - specific handling
try:
    aggregate = await repo.load(aggregate_id)  # may raise AggregateNotFoundError
    aggregate.ship(tracking_number)
    await repo.save(aggregate)  # may raise OptimisticLockError
except OptimisticLockError:
    # Reload and retry
    pass
except AggregateNotFoundError:
    # Create new aggregate
    pass

# Avoid - too broad
try:
    await repo.save(aggregate)
except EventSourceError:  # Catches everything and hides which failure occurred
    pass
```
2. Log with Context
Include relevant context for debugging:
```python
try:
    await repo.save(order)
except OptimisticLockError as e:
    logger.error(
        "Version conflict saving order",
        extra={
            "aggregate_id": str(e.aggregate_id),
            "expected_version": e.expected_version,
            "actual_version": e.actual_version,
            "user_id": str(current_user.id),
            "command": "ship_order",
        },
    )
```
3. Don't Swallow Errors
Always log or handle errors meaningfully:
```python
# Bad - silently ignores errors
try:
    ...  # code that may raise ProjectionError
except ProjectionError:
    pass

# Good - log and decide on action
try:
    ...  # code that may raise ProjectionError
except ProjectionError as e:
    logger.warning(
        "Projection %s failed on %s: %s",
        e.projection_name,
        e.event_id,
        str(e),
    )
    # Event is in DLQ; continue processing
```
4. Use Error Boundaries
Isolate errors to prevent cascade failures:
```python
async def process_event_batch(events: list[DomainEvent]) -> None:
    """Process events with error isolation."""
    for event in events:
        try:
            await process_event(event)
        except Exception as e:
            logger.error("Error processing %s: %s", event.event_id, e)
            # Continue with next event; don't fail entire batch
            continue
```
5. Implement Circuit Breakers for External Services
For projections that depend on external services:
```python
from datetime import datetime, timedelta


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: int = 60):
        self.failures = 0
        self.threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.last_failure: datetime | None = None
        self.is_open = False

    def record_failure(self) -> None:
        self.failures += 1
        self.last_failure = datetime.now()
        if self.failures >= self.threshold:
            self.is_open = True

    def record_success(self) -> None:
        self.failures = 0
        self.is_open = False

    def should_allow_request(self) -> bool:
        if not self.is_open:
            return True
        # Check if reset timeout has passed
        if self.last_failure and datetime.now() - self.last_failure > timedelta(
            seconds=self.reset_timeout
        ):
            self.is_open = False  # Half-open state
            return True
        return False
```
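To show how a breaker is meant to wrap calls, here is a compact variant of the class above. It injects the clock (time.monotonic by default, which, unlike datetime.now(), cannot jump backwards) so the open, half-open, and closed transitions can be exercised without waiting. The guarded_call helper, CircuitOpenError, and the fake pricing service are hypothetical names.

```python
# Compact circuit breaker with an injectable clock, plus a helper that guards calls.
import asyncio
import time
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class CircuitOpenError(Exception):
    """Raised instead of calling the service while the circuit is open."""


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        reset_timeout: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: float | None = None  # None means closed

    def should_allow_request(self) -> bool:
        if self.opened_at is None:
            return True
        # Half-open: once the timeout has elapsed, let a trial request through.
        return self.clock() - self.opened_at >= self.reset_timeout

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = self.clock()  # open, or re-open after a failed trial

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None


async def guarded_call(breaker: CircuitBreaker, call: Callable[[], Awaitable[T]]) -> T:
    if not breaker.should_allow_request():
        raise CircuitOpenError("external service circuit is open")
    try:
        result = await call()
    except Exception:
        breaker.record_failure()
        raise
    breaker.record_success()
    return result


# Usage with a fake clock and a pricing service that is down, then recovers.
now = 0.0
service_up = False
breaker = CircuitBreaker(failure_threshold=2, reset_timeout=30, clock=lambda: now)


async def fetch_price() -> float:
    if not service_up:
        raise ConnectionError("pricing service down")
    return 9.99


async def attempt() -> str:
    try:
        return str(await guarded_call(breaker, fetch_price))
    except CircuitOpenError:
        return "open"
    except ConnectionError:
        return "failed"


async def scenario() -> list[str]:
    global now, service_up
    results = [await attempt(), await attempt()]  # two failures open the circuit
    results.append(await attempt())  # short-circuited: fetch_price is not called
    now, service_up = 31.0, True  # timeout elapsed and the service recovered
    results.append(await attempt())  # trial request succeeds and closes the circuit
    return results


results = asyncio.run(scenario())
print(results)  # ['failed', 'failed', 'open', '9.99']
```

This variant lets every request through once the timeout elapses. A stricter half-open state would admit a single trial request at a time.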
Error Recovery Strategies
Retry with Exponential Backoff
For transient failures:
```python
import asyncio
import logging
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
logger = logging.getLogger(__name__)


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
) -> T:
    """Retry an operation with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return await operation()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            if jitter:
                delay *= 0.5 + random.random()  # 50-150% of delay
            logger.warning(
                "Attempt %d failed, retrying in %.1fs: %s",
                attempt + 1, delay, e,
            )
            await asyncio.sleep(delay)
    raise RuntimeError("max_retries must be at least 1")
```
Graceful Degradation
When a component fails, provide reduced functionality:
```python
import logging
from uuid import UUID

logger = logging.getLogger(__name__)


async def get_order_with_stats(order_id: UUID) -> OrderResponse:
    """Get order with graceful degradation if stats unavailable."""
    order = await order_repo.load(order_id)
    try:
        stats = await stats_service.get_order_stats(order_id)
    except ServiceUnavailableError:  # your application's own exception type
        # Degrade gracefully - return order without stats
        stats = None
        logger.warning("Stats unavailable for order %s", order_id)
    return OrderResponse(order=order, stats=stats)
```
Dead Letter Queue Processing
For manual intervention on failed events:
```python
import logging
from uuid import UUID

from eventsource.projections import DeclarativeProjection
from eventsource.repositories import PostgreSQLDLQRepository

logger = logging.getLogger(__name__)


async def process_dlq(
    dlq_repo: PostgreSQLDLQRepository,
    projection: DeclarativeProjection,
    operator_id: UUID,
) -> int:
    """Process DLQ entries after fixing underlying issues."""
    resolved_count = 0
    failed_events = await dlq_repo.get_failed_events(status="failed")
    for entry in failed_events:
        try:
            # Mark as retrying
            await dlq_repo.mark_retrying(entry["id"])
            # Reconstruct and retry (reconstruct_event is your own deserialization helper)
            event = reconstruct_event(entry["event_data"])
            await projection.handle(event)
            # Mark as resolved
            await dlq_repo.mark_resolved(entry["id"], operator_id)
            resolved_count += 1
        except Exception as e:
            logger.error(
                "DLQ replay failed for %s: %s",
                entry["event_id"], e,
            )
            # The entry is not marked resolved, so it stays in the DLQ for investigation
    return resolved_count
```
Logging and Observability
Structured Logging
Use structured logging for machine-parseable error information:
```python
import structlog

from eventsource.exceptions import OptimisticLockError

logger = structlog.get_logger()

try:
    await repo.save(order)
except OptimisticLockError as e:
    logger.error(
        "version_conflict",
        aggregate_id=str(e.aggregate_id),
        expected_version=e.expected_version,
        actual_version=e.actual_version,
        error_type="OptimisticLockError",
    )
```
Metrics to Track
Consider tracking these metrics for error observability:
| Metric | Description |
|---|---|
| eventsource_lock_errors_total | Count of OptimisticLockError by aggregate type |
| eventsource_not_found_errors_total | Count of AggregateNotFoundError |
| eventsource_projection_errors_total | Count of projection failures by projection name |
| eventsource_dlq_size | Current size of dead letter queue |
| eventsource_retry_attempts | Histogram of retry attempts before success |
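As a sketch of how the error counters in this table might be fed, the context manager below increments a counter keyed by metric name and aggregate type whenever a known eventsource error passes through, then re-raises it. collections.Counter stands in for a labeled counter from a real metrics client, and the exception classes are local stand-ins. The lookup uses the exact exception type, so subclasses would need their own entries.

```python
# Sketch: count eventsource errors per metric name and aggregate type.
from collections import Counter
from contextlib import contextmanager
from typing import Iterator


class EventSourceError(Exception):
    """Local stand-ins for the eventsource exception classes."""


class OptimisticLockError(EventSourceError):
    pass


class AggregateNotFoundError(EventSourceError):
    pass


# Exact exception class -> metric name from the table above.
METRIC_NAMES: dict[type[Exception], str] = {
    OptimisticLockError: "eventsource_lock_errors_total",
    AggregateNotFoundError: "eventsource_not_found_errors_total",
}

# Stand-in for a labeled counter in a real metrics client.
error_counts: Counter[tuple[str, str]] = Counter()


@contextmanager
def count_errors(aggregate_type: str) -> Iterator[None]:
    """Increment the matching counter for known errors, then re-raise."""
    try:
        yield
    except EventSourceError as exc:
        name = METRIC_NAMES.get(type(exc))
        if name is not None:
            error_counts[(name, aggregate_type)] += 1
        raise


for _ in range(3):
    try:
        with count_errors("Order"):
            raise OptimisticLockError("version conflict")
    except OptimisticLockError:
        pass  # retry logic would take over here

print(error_counts[("eventsource_lock_errors_total", "Order")])  # 3
```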
Alerting Recommendations
| Error Type | Alert Threshold | Urgency |
|---|---|---|
| OptimisticLockError | High rate (>10/min) | Low - investigate contention |
| ProjectionError | Any in DLQ | Medium - investigate within 4h |
| SerializationError | Any occurrence | High - likely deployment issue |
| EventStoreError | Connection failures | High - database health |
See Also
- ADR-0003: Optimistic Locking - Concurrency control design
- ADR-0004: Projection Error Handling - DLQ and retry strategy
- Projections Guide - Building read models with error handling
- Events API - Event handling and exceptions