Snapshotting Migration Guide¶
This guide covers how to add snapshotting to an existing event-sourced application with pre-existing aggregates and events.
Overview¶
Adding snapshotting to an existing system requires:
- Database schema updates (if using PostgreSQL/SQLite)
- Adding schema_version to aggregate classes
- Configuring repositories with snapshot stores
- Optional: Pre-populating snapshots for existing aggregates
The migration is non-destructive: your existing events remain untouched, and the system continues working even if snapshots are unavailable.
Step 1: Create the Snapshots Table¶
PostgreSQL¶
Run the migration to create the snapshots table:
from eventsource.migrations import get_schema
# Get the SQL for snapshots table
sql = get_schema("snapshots")
print(sql)
Execute the generated SQL:
CREATE TABLE IF NOT EXISTS snapshots (
    id BIGSERIAL PRIMARY KEY,
    aggregate_id UUID NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL,
    version INTEGER NOT NULL,
    schema_version INTEGER NOT NULL DEFAULT 1,
    state JSONB NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    CONSTRAINT uq_snapshots_aggregate UNIQUE (aggregate_id, aggregate_type)
);

CREATE INDEX IF NOT EXISTS idx_snapshots_aggregate_lookup
    ON snapshots(aggregate_id, aggregate_type);
CREATE INDEX IF NOT EXISTS idx_snapshots_aggregate_type
    ON snapshots(aggregate_type);
CREATE INDEX IF NOT EXISTS idx_snapshots_schema_version
    ON snapshots(aggregate_type, schema_version);
CREATE INDEX IF NOT EXISTS idx_snapshots_created_at
    ON snapshots(created_at);
SQLite¶
CREATE TABLE IF NOT EXISTS snapshots (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    aggregate_id TEXT NOT NULL,
    aggregate_type TEXT NOT NULL,
    version INTEGER NOT NULL,
    schema_version INTEGER NOT NULL DEFAULT 1,
    state TEXT NOT NULL,
    created_at TEXT NOT NULL,
    UNIQUE (aggregate_id, aggregate_type)
);

CREATE INDEX IF NOT EXISTS idx_snapshots_aggregate_lookup
    ON snapshots(aggregate_id, aggregate_type);
CREATE INDEX IF NOT EXISTS idx_snapshots_aggregate_type
    ON snapshots(aggregate_type);
CREATE INDEX IF NOT EXISTS idx_snapshots_schema_version
    ON snapshots(aggregate_type, schema_version);
CREATE INDEX IF NOT EXISTS idx_snapshots_created_at
    ON snapshots(created_at);
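If you are applying the SQLite schema by hand, the standard library's sqlite3 module can run the statements directly. A minimal sketch (abbreviated to the table and one index from the schema above; IF NOT EXISTS makes repeated runs safe):

```python
import sqlite3

# Abbreviated copy of the SQLite schema above (table plus one index)
SNAPSHOT_SCHEMA = """
CREATE TABLE IF NOT EXISTS snapshots (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    aggregate_id TEXT NOT NULL,
    aggregate_type TEXT NOT NULL,
    version INTEGER NOT NULL,
    schema_version INTEGER NOT NULL DEFAULT 1,
    state TEXT NOT NULL,
    created_at TEXT NOT NULL,
    UNIQUE (aggregate_id, aggregate_type)
);
CREATE INDEX IF NOT EXISTS idx_snapshots_aggregate_lookup
    ON snapshots(aggregate_id, aggregate_type);
"""

def apply_snapshot_schema(conn: sqlite3.Connection) -> None:
    # executescript runs all statements in one call
    conn.executescript(SNAPSHOT_SCHEMA)

conn = sqlite3.connect(":memory:")
apply_snapshot_schema(conn)
apply_snapshot_schema(conn)  # idempotent: safe to run twice
names = [r[0] for r in conn.execute(
    "SELECT name FROM sqlite_master "
    "WHERE name = 'snapshots' OR name LIKE 'idx_snapshots%'")]
print(sorted(names))  # ['idx_snapshots_aggregate_lookup', 'snapshots']
```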
Step 2: Add Schema Version to Aggregates¶
Add the schema_version class attribute to each aggregate that will use snapshotting:
Before¶
class OrderAggregate(AggregateRoot[OrderState]):
    aggregate_type = "Order"

    def _get_initial_state(self) -> OrderState:
        return OrderState(order_id=self.aggregate_id)
After¶
class OrderAggregate(AggregateRoot[OrderState]):
    aggregate_type = "Order"
    schema_version = 1  # Add this line

    def _get_initial_state(self) -> OrderState:
        return OrderState(order_id=self.aggregate_id)
Important: Start with schema_version = 1. Increment this value whenever you make breaking changes to your state model (see Schema Evolution below).
Step 3: Configure Repository with Snapshot Store¶
Update your repository configuration to include a snapshot store:
Before¶
repo = AggregateRepository(
    event_store=event_store,
    aggregate_factory=OrderAggregate,
    aggregate_type="Order",
)
After (PostgreSQL)¶
from eventsource.snapshots import PostgreSQLSnapshotStore
snapshot_store = PostgreSQLSnapshotStore(session_factory)
repo = AggregateRepository(
    event_store=event_store,
    aggregate_factory=OrderAggregate,
    aggregate_type="Order",
    snapshot_store=snapshot_store,
    snapshot_threshold=100,  # Snapshot every 100 events
    snapshot_mode="sync",  # Or "background" for async creation
)
After (SQLite)¶
from eventsource.snapshots import SQLiteSnapshotStore
snapshot_store = SQLiteSnapshotStore("snapshots.db")
repo = AggregateRepository(
    event_store=event_store,
    aggregate_factory=OrderAggregate,
    aggregate_type="Order",
    snapshot_store=snapshot_store,
    snapshot_threshold=100,
)
Step 4: Deploy and Verify¶
After deploying:
- Existing aggregates will load normally (full event replay)
- On the next save that crosses a threshold boundary, a snapshot is created
- Subsequent loads will use the snapshot + recent events
No immediate action is required - snapshots will be created organically as aggregates are modified.
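The exact boundary rule is internal to AggregateRepository, but the behaviour described above can be illustrated with a small sketch (crosses_threshold is a hypothetical name used here for illustration, not a library function):

```python
def crosses_threshold(previous_version: int, new_version: int, threshold: int) -> bool:
    """True when a save moves the aggregate across a snapshot boundary.

    Illustrative only: boundaries sit at threshold, 2*threshold, 3*threshold, ...
    """
    return new_version // threshold > previous_version // threshold

# With snapshot_threshold=100:
print(crosses_threshold(98, 99, 100))    # False: still below the first boundary
print(crosses_threshold(99, 101, 100))   # True: crossed the 100-event boundary
print(crosses_threshold(150, 160, 100))  # False: next boundary is at 200
```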
Optional: Pre-Populate Snapshots¶
For large aggregates, you may want to create snapshots proactively rather than waiting for organic updates.
Batch Snapshot Creation¶
async def create_snapshots_for_aggregate_type(
    repo: AggregateRepository,
    aggregate_ids: list[UUID],
) -> int:
    """Create snapshots for existing aggregates."""
    count = 0
    for aggregate_id in aggregate_ids:
        try:
            # Load aggregate (full replay)
            aggregate = await repo.load(aggregate_id)
            # Create snapshot manually
            await repo.create_snapshot(aggregate)
            count += 1
            if count % 100 == 0:
                print(f"Created {count} snapshots...")
        except Exception as e:
            print(f"Failed to snapshot {aggregate_id}: {e}")
    return count
# Usage
aggregate_ids = await get_all_order_ids() # Your query
created = await create_snapshots_for_aggregate_type(repo, aggregate_ids)
print(f"Created {created} snapshots")
Prioritize High-Event Aggregates¶
Focus on aggregates that will benefit most:
# Query aggregates with many events
high_event_aggregates = await connection.execute("""
    SELECT aggregate_id
    FROM events
    WHERE aggregate_type = 'Order'
    GROUP BY aggregate_id
    HAVING COUNT(*) > 500
    ORDER BY COUNT(*) DESC
""")
Schema Evolution¶
When you change your state model in a breaking way, increment schema_version:
Example: Adding a Required Field¶
# Version 1 state
class OrderStateV1(BaseModel):
    order_id: UUID
    status: str
    items: list[dict]

# Version 2 state - added customer_id
class OrderStateV2(BaseModel):
    order_id: UUID
    status: str
    items: list[dict]
    customer_id: UUID  # New required field
Update aggregate:
class OrderAggregate(AggregateRoot[OrderStateV2]):
    aggregate_type = "Order"
    schema_version = 2  # Increment version

    def _get_initial_state(self) -> OrderStateV2:
        return OrderStateV2(
            order_id=self.aggregate_id,
            status="draft",
            items=[],
            customer_id=UUID("00000000-0000-0000-0000-000000000000"),
        )
What Happens to Old Snapshots?¶
When an aggregate is loaded:
- If a snapshot exists with schema_version < current, it is ignored
- Full event replay occurs
- New snapshot is created at current schema version
Old snapshots are automatically invalidated - no manual cleanup required.
Bulk Invalidation (Optional)¶
To proactively clean up old snapshots:
# Delete all Order snapshots with schema_version < 2
deleted = await snapshot_store.delete_snapshots_by_type(
    "Order",
    schema_version_below=2,
)
print(f"Deleted {deleted} outdated snapshots")
Rollback Plan¶
If issues arise, snapshotting can be disabled instantly:
Option 1: Remove Snapshot Store¶
# Simply remove snapshot_store parameter
repo = AggregateRepository(
event_store=event_store,
aggregate_factory=OrderAggregate,
aggregate_type="Order",
# snapshot_store removed - back to full replay
)
Option 2: Clear All Snapshots¶
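A sketch using the snapshot store's delete_snapshots_by_type method (the same call shown under Bulk Invalidation and Troubleshooting), run once per aggregate type:

```python
# Delete every snapshot for a type; the underlying events are untouched
deleted = await snapshot_store.delete_snapshots_by_type("Order")
print(f"Deleted {deleted} snapshots")
```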
Events are never modified, so reverting is always safe.
Monitoring the Migration¶
Check Snapshot Coverage¶
-- PostgreSQL: Count aggregates with/without snapshots
SELECT
    e.aggregate_type,
    COUNT(DISTINCT e.aggregate_id) as total_aggregates,
    COUNT(DISTINCT s.aggregate_id) as with_snapshot,
    COUNT(DISTINCT e.aggregate_id) - COUNT(DISTINCT s.aggregate_id) as without_snapshot
FROM events e
LEFT JOIN snapshots s
    ON e.aggregate_id = s.aggregate_id
    AND e.aggregate_type = s.aggregate_type
GROUP BY e.aggregate_type;
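The coverage query can be smoke-tested against SQLite, where LEFT JOIN and COUNT(DISTINCT ...) behave the same way. A self-contained sketch with toy data (column set trimmed for illustration):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE events (aggregate_id TEXT, aggregate_type TEXT, version INTEGER);
    CREATE TABLE snapshots (aggregate_id TEXT, aggregate_type TEXT, version INTEGER);
""")
# Two Order aggregates; only one has a snapshot
conn.executemany("INSERT INTO events VALUES (?, 'Order', ?)",
                 [("a", 1), ("a", 2), ("b", 1)])
conn.execute("INSERT INTO snapshots VALUES ('a', 'Order', 2)")

row = conn.execute("""
    SELECT
        e.aggregate_type,
        COUNT(DISTINCT e.aggregate_id) as total_aggregates,
        COUNT(DISTINCT s.aggregate_id) as with_snapshot,
        COUNT(DISTINCT e.aggregate_id) - COUNT(DISTINCT s.aggregate_id) as without_snapshot
    FROM events e
    LEFT JOIN snapshots s
        ON e.aggregate_id = s.aggregate_id
        AND e.aggregate_type = s.aggregate_type
    GROUP BY e.aggregate_type
""").fetchone()
print(row)  # ('Order', 2, 1, 1)
```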
Check Snapshot Freshness¶
-- Find stale snapshots (many events since snapshot)
SELECT
    s.aggregate_type,
    s.aggregate_id,
    s.version as snapshot_version,
    MAX(e.version) as current_version,
    MAX(e.version) - s.version as events_since_snapshot
FROM snapshots s
JOIN events e
    ON s.aggregate_id = e.aggregate_id
    AND s.aggregate_type = e.aggregate_type
GROUP BY s.aggregate_type, s.aggregate_id, s.version
HAVING MAX(e.version) - s.version > 100
ORDER BY events_since_snapshot DESC;
Troubleshooting¶
Snapshots Not Being Created¶
- Check threshold: Aggregate must have >= threshold events
- Check schema_version: Ensure the aggregate has a schema_version attribute
- Check logs: Enable DEBUG logging for eventsource.aggregates.repository
Snapshot Deserialization Errors¶
If snapshots fail to deserialize (state model changed):
- The system automatically falls back to full event replay
- A warning is logged
- A new snapshot is created on next save
To force regeneration:
# Delete specific snapshot
await snapshot_store.delete_snapshot(aggregate_id, "Order")
# Or delete all for a type
await snapshot_store.delete_snapshots_by_type("Order")
Performance Not Improving¶
- Check event count: Snapshotting helps most with 100+ events
- Check snapshot age: Very old snapshots may have many events to replay
- Consider lowering threshold: Try snapshot_threshold=50 for faster snapshots
Related Documentation¶
- Snapshotting User Guide - Complete feature documentation
- Snapshots API Reference - Detailed API documentation
- Snapshotting Examples - Code examples