Snapshotting Example¶
This example demonstrates aggregate snapshotting for performance optimization in an event-sourced application.
Overview¶
We'll extend the basic order system to use snapshotting, showing:
- Configuring snapshot stores
- Automatic snapshot creation at thresholds
- Manual snapshot creation
- Schema versioning for state evolution
- Background snapshot mode
Complete Code¶
"""
Complete snapshotting example with an Order aggregate.
This example demonstrates:
1. Configuring snapshotting on a repository
2. Automatic snapshot creation at event thresholds
3. Manual snapshot creation
4. Loading aggregates from snapshots
5. Schema versioning for state evolution
"""
import asyncio
from datetime import datetime, UTC
from uuid import UUID, uuid4
from pydantic import BaseModel
from eventsource import (
AggregateRepository,
AggregateRoot,
DomainEvent,
InMemoryEventStore,
register_event,
)
from eventsource.snapshots import InMemorySnapshotStore, Snapshot
# =============================================================================
# Events
# =============================================================================
@register_event
class OrderCreated(DomainEvent):
    """
    Event: Order was created.

    Records which customer (id and email address) the new order belongs to.
    """

    # Discriminators shared by every Order event in this example.
    event_type: str = "OrderCreated"
    aggregate_type: str = "Order"

    # Event payload.
    customer_id: UUID
    customer_email: str
@register_event
class OrderItemAdded(DomainEvent):
    """
    Event: Item was added to order.

    Carries everything needed to build the corresponding line item:
    product id and name, quantity, and unit price.
    """

    # Discriminators shared by every Order event in this example.
    event_type: str = "OrderItemAdded"
    aggregate_type: str = "Order"

    # Event payload.
    product_id: UUID
    product_name: str
    quantity: int
    unit_price: float
@register_event
class OrderSubmitted(DomainEvent):
    """
    Event: Order was submitted for processing.

    Carries the order total that was recorded at submission time.
    """

    # Discriminators shared by every Order event in this example.
    event_type: str = "OrderSubmitted"
    aggregate_type: str = "Order"

    # Event payload.
    total_amount: float
@register_event
class OrderItemRemoved(DomainEvent):
    """
    Event: Item was removed from order.

    Identifies the removed item by its product id only.
    """

    # Discriminators shared by every Order event in this example.
    event_type: str = "OrderItemRemoved"
    aggregate_type: str = "Order"

    # Event payload.
    product_id: UUID
# =============================================================================
# State
# =============================================================================
class OrderItem(BaseModel):
    """Order line item: one product with its quantity and unit price."""

    product_id: UUID
    product_name: str
    quantity: int
    unit_price: float

    @property
    def total(self) -> float:
        """Price of this line: quantity multiplied by unit price."""
        line_total = self.quantity * self.unit_price
        return line_total
class OrderState(BaseModel):
    """
    State of an Order aggregate.

    Only ``order_id`` is required; every other field has a default, so a
    state built from the id alone is an empty order in ``"draft"`` status.
    """

    order_id: UUID
    customer_id: UUID | None = None
    customer_email: str = ""
    # NOTE(review): relies on pydantic giving each instance its own copy of
    # this mutable default — confirm for the pydantic version in use.
    items: list[OrderItem] = []
    status: str = "draft"
    total_amount: float = 0.0

    @property
    def calculated_total(self) -> float:
        """Sum of the ``total`` of every line item currently on the order."""
        return sum(line.total for line in self.items)

    @property
    def item_count(self) -> int:
        """Number of line items on the order."""
        line_items = self.items
        return len(line_items)
# =============================================================================
# Aggregate
# =============================================================================
class OrderAggregate(AggregateRoot[OrderState]):
"""
Event-sourced Order aggregate with snapshotting support.
The schema_version attribute tracks compatibility between
the state model and stored snapshots.
"""
aggregate_type = "Order"
schema_version = 1 # Increment when OrderState structure changes
def _get_initial_state(self) -> OrderState:
return OrderState(order_id=self.aggregate_id)
def _apply(self, event: DomainEvent) -> None:
if isinstance(event, OrderCreated):
self._state = OrderState(
order_id=self.aggregate_id,
customer_id=event.customer_id,
customer_email=event.customer_email,
status="created",
)
elif isinstance(event, OrderItemAdded):
if self._state:
new_item = OrderItem(
product_id=event.product_id,
product_name=event.product_name,
quantity=event.quantity,
unit_price=event.unit_price,
)
self._state = self._state.model_copy(
update={"items": [*self._state.items, new_item]}
)
elif isinstance(event, OrderItemRemoved):
if self._state:
updated_items = [
item
for item in self._state.items
if item.product_id != event.product_id
]
self._state = self._state.model_copy(update={"items": updated_items})
elif isinstance(event, OrderSubmitted):
if self._state:
self._state = self._state.model_copy(
update={
"status": "submitted",
"total_amount": event.total_amount,
}
)
# Command methods
def create(self, customer_id: UUID, customer_email: str) -> None:
"""Create the order."""
if self.version > 0:
raise ValueError("Order already exists")
self.apply_event(
OrderCreated(
aggregate_id=self.aggregate_id,
customer_id=customer_id,
customer_email=customer_email,
aggregate_version=self.get_next_version(),
)
)
def add_item(
self,
product_id: UUID,
product_name: str,
quantity: int,
unit_price: float,
) -> None:
"""Add an item to the order."""
if not self.state or self.state.status != "created":
raise ValueError("Can only add items to created orders")
if quantity <= 0:
raise ValueError("Quantity must be positive")
self.apply_event(
OrderItemAdded(
aggregate_id=self.aggregate_id,
product_id=product_id,
product_name=product_name,
quantity=quantity,
unit_price=unit_price,
aggregate_version=self.get_next_version(),
)
)
def remove_item(self, product_id: UUID) -> None:
"""Remove an item from the order."""
if not self.state or self.state.status != "created":
raise ValueError("Can only remove items from created orders")
if not any(item.product_id == product_id for item in self.state.items):
raise ValueError("Item not found in order")
self.apply_event(
OrderItemRemoved(
aggregate_id=self.aggregate_id,
product_id=product_id,
aggregate_version=self.get_next_version(),
)
)
def submit(self) -> None:
"""Submit the order for processing."""
if not self.state or self.state.status != "created":
raise ValueError("Order must be in created status")
if not self.state.items:
raise ValueError("Cannot submit empty order")
self.apply_event(
OrderSubmitted(
aggregate_id=self.aggregate_id,
total_amount=self.state.calculated_total,
aggregate_version=self.get_next_version(),
)
)
# =============================================================================
# Examples
# =============================================================================
async def basic_snapshotting_example():
    """
    Basic example: Enable snapshotting with automatic threshold.

    Configures a repository with a snapshot store and
    ``snapshot_threshold=5``, saves an order carrying five events, prints
    what the snapshot store holds for it, then reloads the order through
    the repository.
    """
    divider = "=" * 60
    print("\n" + divider)
    print("BASIC SNAPSHOTTING EXAMPLE")
    print(divider)

    # The snapshot store stays in a local so it can be queried directly below.
    snapshot_store = InMemorySnapshotStore()
    repo = AggregateRepository(
        event_store=InMemoryEventStore(),
        aggregate_factory=OrderAggregate,
        aggregate_type="Order",
        snapshot_store=snapshot_store,
        snapshot_threshold=5,  # Snapshot every 5 events (low for demo)
        snapshot_mode="sync",
    )

    # One creation event...
    order_id, customer_id = uuid4(), uuid4()
    order = repo.create_new(order_id)
    order.create(customer_id, "customer@example.com")

    # ...plus four item events reaches the threshold at event 5.
    catalog = (
        ("Widget A", 2, 19.99),
        ("Widget B", 1, 49.99),
        ("Widget C", 3, 9.99),
        ("Widget D", 1, 29.99),
    )
    for product_name, quantity, unit_price in catalog:
        order.add_item(uuid4(), product_name, quantity, unit_price)
    await repo.save(order)

    # Report whether the save produced a snapshot.
    snapshot = await snapshot_store.get_snapshot(order_id, "Order")
    print(f"Order version after save: {order.version}")
    print(f"Snapshot exists: {snapshot is not None}")
    if snapshot:
        print(f"Snapshot version: {snapshot.version}")
        print(f"Snapshot state keys: {list(snapshot.state.keys())}")

    # Reload the order through the snapshot-enabled repository.
    print("\nLoading order from snapshot...")
    loaded = await repo.load(order_id)
    print(f"Loaded order version: {loaded.version}")
    loaded_state = loaded.state
    print(f"Loaded order items: {loaded_state.item_count}")
    print(f"Loaded order status: {loaded_state.status}")
async def manual_snapshot_example():
    """
    Manual snapshot example: Create snapshots at specific points.

    Configures the repository with ``snapshot_mode="manual"`` and calls
    ``create_snapshot`` explicitly: once after the items are saved and once
    more after the order has been submitted.
    """
    divider = "=" * 60
    print("\n" + divider)
    print("MANUAL SNAPSHOT EXAMPLE")
    print(divider)

    snapshot_store = InMemorySnapshotStore()
    repo = AggregateRepository(
        event_store=InMemoryEventStore(),
        aggregate_factory=OrderAggregate,
        aggregate_type="Order",
        snapshot_store=snapshot_store,
        snapshot_mode="manual",  # No automatic snapshots
    )

    # Build an order with one creation event and ten item events.
    order_id = uuid4()
    order = repo.create_new(order_id)
    order.create(uuid4(), "customer@example.com")
    for index in range(10):
        order.add_item(uuid4(), f"Product {index}", 1, 10.0 + index)
    await repo.save(order)

    # Show what the store holds after a plain save in manual mode.
    stored = await snapshot_store.get_snapshot(order_id, "Order")
    print(f"Snapshot after save: {stored is not None}")

    # First business milestone: the order is fully populated.
    print("\nCreating manual snapshot...")
    populated_snapshot = await repo.create_snapshot(order)
    print(f"Manual snapshot created at version: {populated_snapshot.version}")

    # Second business milestone: the order has been submitted.
    order.submit()
    await repo.save(order)
    submitted_snapshot = await repo.create_snapshot(order)
    print(f"Post-submission snapshot at version: {submitted_snapshot.version}")
async def background_snapshot_example():
    """
    Background snapshot example: Non-blocking snapshot creation.

    Configures the repository with ``snapshot_mode="background"``, saves an
    order that reaches the threshold of 5 events, reports the repository's
    pending snapshot count, awaits the pending snapshots, and finally checks
    the snapshot store.
    """
    divider = "=" * 60
    print("\n" + divider)
    print("BACKGROUND SNAPSHOT EXAMPLE")
    print(divider)

    snapshot_store = InMemorySnapshotStore()
    repo = AggregateRepository(
        event_store=InMemoryEventStore(),
        aggregate_factory=OrderAggregate,
        aggregate_type="Order",
        snapshot_store=snapshot_store,
        snapshot_threshold=5,
        snapshot_mode="background",  # Non-blocking snapshots
    )

    # One creation event plus four item events: five events in total.
    order_id = uuid4()
    order = repo.create_new(order_id)
    order.create(uuid4(), "customer@example.com")
    for index in range(4):
        order.add_item(uuid4(), f"Product {index}", 1, 10.0)

    # Save returns immediately, snapshot created in background
    print("Saving order (snapshot will be created in background)...")
    await repo.save(order)
    print(f"Save completed. Pending snapshots: {repo.pending_snapshot_count}")

    # Block until the background snapshot work has finished.
    finished = await repo.await_pending_snapshots()
    print(f"Waited for {finished} background snapshot(s)")

    # Confirm the snapshot is now in the store.
    stored = await snapshot_store.get_snapshot(order_id, "Order")
    print(f"Snapshot exists: {stored is not None}")
async def schema_versioning_example():
    """
    Schema versioning example: Handling state model evolution.

    Shows how schema versioning invalidates incompatible snapshots
    and triggers full event replay: an order is saved through a repository
    whose aggregate has ``schema_version = 1`` and then loaded through a
    repository whose aggregate subclass declares ``schema_version = 2``.
    """
    print("\n" + "=" * 60)
    print("SCHEMA VERSIONING EXAMPLE")
    print("=" * 60)

    # Both repositories below share these stores, so the second one sees the
    # events and the snapshot written by the first.
    event_store = InMemoryEventStore()
    snapshot_store = InMemorySnapshotStore()

    # Repository with schema_version=1
    repo_v1 = AggregateRepository(
        event_store=event_store,
        aggregate_factory=OrderAggregate,  # schema_version=1
        aggregate_type="Order",
        snapshot_store=snapshot_store,
        snapshot_threshold=3,
    )

    # Create order and trigger snapshot (three events meet the threshold)
    order_id = uuid4()
    order = repo_v1.create_new(order_id)
    order.create(uuid4(), "customer@example.com")
    order.add_item(uuid4(), "Product", 1, 10.0)
    order.add_item(uuid4(), "Product 2", 2, 20.0)
    await repo_v1.save(order)

    snapshot = await snapshot_store.get_snapshot(order_id, "Order")
    # Guard against a missing snapshot the same way the other examples do,
    # instead of failing with AttributeError on None.
    stored_schema_version = snapshot.schema_version if snapshot else "None"
    print(f"Snapshot created with schema_version: {stored_schema_version}")

    # Simulate schema version change by creating new aggregate class
    class OrderAggregateV2(OrderAggregate):
        schema_version = 2  # Incremented

    # Repository with schema_version=2
    repo_v2 = AggregateRepository(
        event_store=event_store,
        aggregate_factory=OrderAggregateV2,
        aggregate_type="Order",
        snapshot_store=snapshot_store,
        snapshot_threshold=3,
    )

    # Load with v2 - snapshot will be invalidated
    print("\nLoading with schema_version=2...")
    loaded = await repo_v2.load(order_id)
    # Plain string: the message has no placeholders, so no f-prefix is needed.
    print("Loaded successfully via full event replay")
    print(f"Loaded version: {loaded.version}")
    print(f"Loaded items: {loaded.state.item_count}")
async def performance_comparison_example():
    """
    Performance comparison: Snapshot vs full replay.

    Saves one order with 201 events, then times ten loads through a
    repository without a snapshot store and ten loads through a repository
    with one, printing the average load time of each and the relative
    difference.
    """
    import time

    divider = "=" * 60
    print("\n" + divider)
    print("PERFORMANCE COMPARISON EXAMPLE")
    print(divider)

    # Both repositories read the same event store; only one has snapshots.
    event_store = InMemoryEventStore()
    snapshot_store = InMemorySnapshotStore()
    repo_with_snapshot = AggregateRepository(
        event_store=event_store,
        aggregate_factory=OrderAggregate,
        aggregate_type="Order",
        snapshot_store=snapshot_store,
        snapshot_threshold=50,
    )
    repo_without_snapshot = AggregateRepository(
        event_store=event_store,
        aggregate_factory=OrderAggregate,
        aggregate_type="Order",
        # No snapshot_store
    )

    # Create order with many events
    order_id = uuid4()
    order = repo_with_snapshot.create_new(order_id)
    order.create(uuid4(), "customer@example.com")
    num_items = 200
    print(f"Creating order with {num_items} item events...")
    for index in range(num_items):
        order.add_item(uuid4(), f"Product {index}", 1, 10.0 + index)
    await repo_with_snapshot.save(order)
    print(f"Order saved. Version: {order.version}")

    # Check snapshot
    snapshot = await snapshot_store.get_snapshot(order_id, "Order")
    print(f"Snapshot at version: {snapshot.version if snapshot else 'None'}")

    async def average_load_seconds(repository):
        """Load the order ten times and return the mean seconds per load."""
        started = time.perf_counter()
        for _ in range(10):
            await repository.load(order_id)
        return (time.perf_counter() - started) / 10

    print("\nLoading WITHOUT snapshot...")
    seconds_without = await average_load_seconds(repo_without_snapshot)
    print(f"Average load time: {seconds_without * 1000:.2f}ms")

    print("\nLoading WITH snapshot...")
    seconds_with = await average_load_seconds(repo_with_snapshot)
    print(f"Average load time: {seconds_with * 1000:.2f}ms")

    # Relative improvement; skipped when the baseline measured as zero.
    if seconds_without > 0:
        saved_fraction = (seconds_without - seconds_with) / seconds_without
        improvement = saved_fraction * 100
        print(f"\nPerformance improvement: {improvement:.1f}%")
async def snapshot_inspection_example():
    """
    Snapshot inspection: Examining snapshot contents.

    Saves a three-event order with ``snapshot_threshold=3``, fetches the
    snapshot from the store, and prints its metadata followed by each entry
    of its state. List values are summarised: their length and at most the
    first two elements are shown. If the store returns no snapshot, a short
    message is printed and nothing else is inspected.
    """
    print("\n" + "=" * 60)
    print("SNAPSHOT INSPECTION EXAMPLE")
    print("=" * 60)

    # Create stores
    event_store = InMemoryEventStore()
    snapshot_store = InMemorySnapshotStore()

    # Configure repository
    repo = AggregateRepository(
        event_store=event_store,
        aggregate_factory=OrderAggregate,
        aggregate_type="Order",
        snapshot_store=snapshot_store,
        snapshot_threshold=3,
    )

    # Create order (three events, matching the threshold)
    order_id = uuid4()
    customer_id = uuid4()
    order = repo.create_new(order_id)
    order.create(customer_id, "customer@example.com")
    order.add_item(uuid4(), "Premium Widget", 2, 99.99)
    order.add_item(uuid4(), "Basic Widget", 5, 19.99)
    await repo.save(order)

    # Get and inspect snapshot
    snapshot = await snapshot_store.get_snapshot(order_id, "Order")
    if snapshot is None:
        # Avoid an AttributeError on None below; there is nothing to show.
        print("No snapshot found; nothing to inspect")
        return

    print("Snapshot Details:")
    print(f" Aggregate ID: {snapshot.aggregate_id}")
    print(f" Aggregate Type: {snapshot.aggregate_type}")
    print(f" Version: {snapshot.version}")
    print(f" Schema Version: {snapshot.schema_version}")
    print(f" Created At: {snapshot.created_at}")

    # Plain string: the heading has no placeholders, so no f-prefix is needed.
    print("\nSnapshot State:")
    preview_limit = 2  # How many elements of a list value to print
    for key, value in snapshot.state.items():
        if isinstance(value, list):
            print(f" {key}: [{len(value)} items]")
            for i, item in enumerate(value[:preview_limit]):
                print(f" [{i}]: {item}")
            if len(value) > preview_limit:
                print(f" ... and {len(value) - preview_limit} more")
        else:
            print(f" {key}: {value}")
async def main():
    """Run all examples in order, then print a completion banner."""
    examples = (
        basic_snapshotting_example,
        manual_snapshot_example,
        background_snapshot_example,
        schema_versioning_example,
        performance_comparison_example,
        snapshot_inspection_example,
    )
    # Examples are awaited one at a time so their output never interleaves.
    for run_example in examples:
        await run_example()

    divider = "=" * 60
    print("\n" + divider)
    print("ALL EXAMPLES COMPLETED")
    print(divider)
# Script entry point: drive the async main() only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())
Running the Example¶
Save the code as snapshotting_example.py and run it with: python snapshotting_example.py
Expected output:
============================================================
BASIC SNAPSHOTTING EXAMPLE
============================================================
Order version after save: 5
Snapshot exists: True
Snapshot version: 5
Snapshot state keys: ['order_id', 'customer_id', 'customer_email', 'items', 'status', 'total_amount']
Loading order from snapshot...
Loaded order version: 5
Loaded order items: 4
Loaded order status: created
============================================================
MANUAL SNAPSHOT EXAMPLE
============================================================
Snapshot after save: False
Creating manual snapshot...
Manual snapshot created at version: 11
Post-submission snapshot at version: 12
============================================================
BACKGROUND SNAPSHOT EXAMPLE
============================================================
Saving order (snapshot will be created in background)...
Save completed. Pending snapshots: 1
Waited for 1 background snapshot(s)
Snapshot exists: True
============================================================
SCHEMA VERSIONING EXAMPLE
============================================================
Snapshot created with schema_version: 1
Loading with schema_version=2...
Loaded successfully via full event replay
Loaded version: 3
Loaded items: 2
============================================================
PERFORMANCE COMPARISON EXAMPLE
============================================================
Creating order with 200 item events...
Order saved. Version: 201
Snapshot at version: 200
Loading WITHOUT snapshot...
Average load time: 15.23ms
Loading WITH snapshot...
Average load time: 1.45ms
Performance improvement: 90.5%
============================================================
SNAPSHOT INSPECTION EXAMPLE
============================================================
Snapshot Details:
Aggregate ID: 550e8400-e29b-41d4-a716-446655440000
Aggregate Type: Order
Version: 3
Schema Version: 1
Created At: 2024-01-15 10:30:45.123456+00:00
Snapshot State:
order_id: 550e8400-e29b-41d4-a716-446655440000
customer_id: 661f9511-f30c-52e5-b827-557766551111
customer_email: customer@example.com
items: [2 items]
[0]: {'product_id': '...', 'product_name': 'Premium Widget', ...}
[1]: {'product_id': '...', 'product_name': 'Basic Widget', ...}
status: created
total_amount: 0.0
============================================================
ALL EXAMPLES COMPLETED
============================================================
Key Concepts Demonstrated¶
1. Automatic Snapshotting¶
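Pass snapshot_threshold to AggregateRepository to create snapshots automatically; for example, snapshot_threshold=5 (as in basic_snapshotting_example above) takes a snapshot every 5 events.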
2. Manual Snapshotting¶
Use snapshot_mode="manual" for explicit control:
# `...` stands in for the remaining repository arguments.
repo = AggregateRepository(
    ...,
    snapshot_mode="manual",
)
# Create snapshot at business milestones
await repo.create_snapshot(order)
3. Background Snapshotting¶
Use snapshot_mode="background" for non-blocking saves:
# `...` stands in for the remaining repository arguments.
repo = AggregateRepository(
    ...,
    snapshot_mode="background",
)
# Wait for background tasks in tests
await repo.await_pending_snapshots()
4. Schema Versioning¶
Increment schema_version when state structure changes:
class OrderAggregate(AggregateRoot[OrderState]):
    schema_version = 2  # Invalidates snapshots with schema_version=1
Production Example¶
For production with PostgreSQL:
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from eventsource import AggregateRepository, PostgreSQLEventStore
from eventsource.snapshots import PostgreSQLSnapshotStore
# Database setup
# NOTE(review): the connection URL embeds placeholder credentials; presumably
# a real deployment reads them from configuration — confirm.
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/mydb",
    pool_size=20,
    max_overflow=10,
)
# A single session factory is shared by both stores below.
session_factory = async_sessionmaker(engine, expire_on_commit=False)
# Create stores
event_store = PostgreSQLEventStore(session_factory)
snapshot_store = PostgreSQLSnapshotStore(session_factory)
# Production repository configuration
repo = AggregateRepository(
    event_store=event_store,
    aggregate_factory=OrderAggregate,
    aggregate_type="Order",
    snapshot_store=snapshot_store,
    snapshot_threshold=100,
    snapshot_mode="background",  # Non-blocking for production
)
See Also¶
- Snapshotting Guide - Complete usage guide
- Migration Guide - Adding to existing projects
- API Reference - Detailed API documentation
- Basic Order Example - Event sourcing fundamentals