SQLite Usage Examples
This page provides practical examples for using eventsource with SQLite backends.
Basic Event Store Usage
Creating and Using the Store
import asyncio
from uuid import uuid4
from eventsource import SQLiteEventStore, DomainEvent, register_event
# Define your events
@register_event
class OrderCreated(DomainEvent):
    # Discriminators: the event's type name and the aggregate type it targets.
    event_type: str = "OrderCreated"
    aggregate_type: str = "Order"

    # Required payload fields (no defaults, so callers must supply them).
    customer_id: str
    total_amount: float
@register_event
class OrderShipped(DomainEvent):
    # Discriminators: the event's type name and the aggregate type it targets.
    event_type: str = "OrderShipped"
    aggregate_type: str = "Order"

    # Required payload field.
    tracking_number: str
async def main():
    """Append a single ``OrderCreated`` event to a new stream and read it back."""
    async with SQLiteEventStore("./orders.db") as store:
        await store.initialize()

        order_id = uuid4()
        created = OrderCreated(
            aggregate_id=order_id,
            customer_id="cust-123",
            total_amount=99.99,
            aggregate_version=1,
        )

        # expected_version=0 declares that this aggregate has no events yet.
        outcome = await store.append_events(
            aggregate_id=order_id,
            aggregate_type="Order",
            events=[created],
            expected_version=0,
        )
        print(f"Success: {outcome.success}")
        print(f"New version: {outcome.new_version}")
        print(f"Global position: {outcome.global_position}")

        # Read the stream we just wrote and list its contents.
        order_stream = await store.get_events(order_id, "Order")
        print(f"Events in stream: {len(order_stream.events)}")
        print(f"Current version: {order_stream.version}")
        for item in order_stream.events:
            print(f" - {item.event_type}: {item.event_id}")


asyncio.run(main())
Reading All Events (for Projections)
async def rebuild_projection():
    """Walk every stored event and print its global position and type."""
    async with SQLiteEventStore("./orders.db") as store:
        await store.initialize()

        # read_all() is called without options here.
        async for record in store.read_all():
            position = record.global_position
            kind = record.event_type
            print(f"Position {position}: {kind}")
            # Hook point for a real projection, e.g.:
            # projection.handle(record.event)


asyncio.run(rebuild_projection())
Streaming from a Specific Position
from eventsource.stores import ReadOptions, ReadDirection
async def catch_up_from_position(last_position: int):
    """Read forward from ``last_position`` and print each event's position.

    Args:
        last_position: Global position passed to ``ReadOptions.from_position``.
    """
    async with SQLiteEventStore("./orders.db") as store:
        await store.initialize()

        # limit=100 caps this read; the original example describes it as
        # processing in batches.
        resume_from = ReadOptions(
            from_position=last_position,
            direction=ReadDirection.FORWARD,
            limit=100,
        )

        async for record in store.read_all(resume_from):
            print(f"Processing event at position {record.global_position}")
            # ... process event


asyncio.run(catch_up_from_position(last_position=1000))
Repository Pattern Usage
Complete Aggregate Example
import asyncio
from uuid import UUID, uuid4
from pydantic import BaseModel
from eventsource import (
DomainEvent,
register_event,
AggregateRoot,
AggregateRepository,
SQLiteEventStore,
)
# Events
@register_event
class AccountOpened(DomainEvent):
    # Discriminators: the event's type name and the aggregate type it targets.
    event_type: str = "AccountOpened"
    aggregate_type: str = "Account"

    # Required payload fields.
    owner_name: str
    initial_balance: float
@register_event
class MoneyDeposited(DomainEvent):
    # Discriminators: the event's type name and the aggregate type it targets.
    event_type: str = "MoneyDeposited"
    aggregate_type: str = "Account"

    # Amount added to the balance when this event is applied.
    amount: float
@register_event
class MoneyWithdrawn(DomainEvent):
    # Discriminators: the event's type name and the aggregate type it targets.
    event_type: str = "MoneyWithdrawn"
    aggregate_type: str = "Account"

    # Amount subtracted from the balance when this event is applied.
    amount: float
# State
class AccountState(BaseModel):
    # Identity is the only required field; everything else starts at a
    # neutral default (empty owner, zero balance, not open).
    account_id: UUID

    owner_name: str = ""
    balance: float = 0.0
    is_open: bool = False
# Aggregate
class AccountAggregate(AggregateRoot[AccountState]):
    """Aggregate for one account.

    The command methods (``open``, ``deposit``, ``withdraw``) validate their
    input and emit an event through ``apply_event``; ``_apply`` folds each
    event into an ``AccountState``.
    """

    aggregate_type = "Account"

    def _get_initial_state(self) -> AccountState:
        """Return a default state bound to this aggregate's id."""
        return AccountState(account_id=self.aggregate_id)

    def _apply(self, event: DomainEvent) -> None:
        """Update ``self._state`` from ``event``; other event types are ignored."""
        if isinstance(event, AccountOpened):
            self._state = AccountState(
                account_id=self.aggregate_id,
                owner_name=event.owner_name,
                balance=event.initial_balance,
                is_open=True,
            )
            return

        if isinstance(event, MoneyDeposited):
            if self._state:
                raised = self._state.balance + event.amount
                self._state = self._state.model_copy(update={"balance": raised})
            return

        if isinstance(event, MoneyWithdrawn):
            if self._state:
                lowered = self._state.balance - event.amount
                self._state = self._state.model_copy(update={"balance": lowered})

    def open(self, owner_name: str, initial_balance: float = 0.0) -> None:
        """Emit ``AccountOpened``.

        Raises:
            ValueError: If the aggregate already has a version above 0, or
                ``initial_balance`` is negative.
        """
        if self.version > 0:
            raise ValueError("Account already exists")
        if initial_balance < 0:
            raise ValueError("Initial balance cannot be negative")

        opened = AccountOpened(
            aggregate_id=self.aggregate_id,
            owner_name=owner_name,
            initial_balance=initial_balance,
            aggregate_version=self.get_next_version(),
        )
        self.apply_event(opened)

    def deposit(self, amount: float) -> None:
        """Emit ``MoneyDeposited``.

        Raises:
            ValueError: If the account is not open or ``amount`` is not positive.
        """
        if not (self.state and self.state.is_open):
            raise ValueError("Account not open")
        if amount <= 0:
            raise ValueError("Deposit amount must be positive")

        deposited = MoneyDeposited(
            aggregate_id=self.aggregate_id,
            amount=amount,
            aggregate_version=self.get_next_version(),
        )
        self.apply_event(deposited)

    def withdraw(self, amount: float) -> None:
        """Emit ``MoneyWithdrawn``.

        Raises:
            ValueError: If the account is not open, ``amount`` is not
                positive, or ``amount`` exceeds the current balance.
        """
        if not (self.state and self.state.is_open):
            raise ValueError("Account not open")
        if amount <= 0:
            raise ValueError("Withdrawal amount must be positive")
        if amount > self.state.balance:
            raise ValueError("Insufficient funds")

        withdrawn = MoneyWithdrawn(
            aggregate_id=self.aggregate_id,
            amount=amount,
            aggregate_version=self.get_next_version(),
        )
        self.apply_event(withdrawn)
# Usage
async def main():
    """Open an account, run two transactions, then print the reloaded state."""
    async with SQLiteEventStore("./bank.db") as store:
        await store.initialize()

        accounts = AggregateRepository(
            event_store=store,
            aggregate_factory=AccountAggregate,
            aggregate_type="Account",
        )

        # Open new account
        account_id = uuid4()
        opened = accounts.create_new(account_id)
        opened.open("Alice Smith", initial_balance=100.00)
        await accounts.save(opened)
        print(f"Opened account {account_id} with balance ${opened.state.balance:.2f}")

        # Make transactions on a freshly loaded instance
        working = await accounts.load(account_id)
        working.deposit(50.00)
        working.withdraw(25.00)
        await accounts.save(working)

        # Check final state by loading once more
        final = await accounts.load(account_id)
        print(f"Final balance: ${final.state.balance:.2f}")
        print(f"Total events: {final.version}")


asyncio.run(main())
Checkpoint Repository Usage
import asyncio
import aiosqlite
from uuid import uuid4
from eventsource import SQLiteEventStore
from eventsource.repositories import SQLiteCheckpointRepository
async def main():
    """Read, advance, and inspect the checkpoint of a single projection."""
    async with SQLiteEventStore("./app.db") as store:
        # NOTE(review): presumably initialize() creates the tables the
        # checkpoint repository relies on - confirm against the backend guide.
        await store.initialize()

        # This opens a second, independent aiosqlite connection to the same
        # database file; the store's own connection is not reused.
        async with aiosqlite.connect("./app.db") as db:
            checkpoint_repo = SQLiteCheckpointRepository(db)

            # Simulate processing events
            projection_name = "OrderSummaryProjection"

            # Get last checkpoint (None if first run)
            last_checkpoint = await checkpoint_repo.get_checkpoint(projection_name)
            print(f"Last checkpoint: {last_checkpoint}")

            # Process some events... (a random id stands in for a real event)
            event_id = uuid4()
            event_type = "OrderCreated"

            # Update checkpoint after processing
            await checkpoint_repo.update_checkpoint(
                projection_name=projection_name,
                event_id=event_id,
                event_type=event_type,
            )
            print(f"Updated checkpoint to: {event_id}")

            # Get lag metrics; the result may be falsy, so guard before use
            metrics = await checkpoint_repo.get_lag_metrics(
                projection_name,
                event_types=["OrderCreated", "OrderShipped"],
            )
            if metrics:
                print(f"Events processed: {metrics.events_processed}")
                print(f"Lag seconds: {metrics.lag_seconds}")


asyncio.run(main())
Outbox Repository Usage
import asyncio
import aiosqlite
from uuid import uuid4
from eventsource import SQLiteEventStore, DomainEvent, register_event
from eventsource.repositories import SQLiteOutboxRepository
@register_event
class OrderPlaced(DomainEvent):
    # Discriminators: the event's type name and the aggregate type it targets.
    event_type: str = "OrderPlaced"
    aggregate_type: str = "Order"

    # Required payload fields.
    customer_email: str
    total: float
async def add_to_outbox():
    """Create one ``OrderPlaced`` event and add it to the outbox.

    This example opens no explicit transaction and does not append the event
    to the event store: the store is only initialized, and the outbox write
    goes through a separate aiosqlite connection to the same file.

    NOTE(review): if the outbox entry must be atomic with the event append,
    both writes need to share one connection/transaction - confirm what
    SQLiteOutboxRepository supports.
    """
    async with SQLiteEventStore("./app.db") as store:
        await store.initialize()

        async with aiosqlite.connect("./app.db") as db:
            outbox = SQLiteOutboxRepository(db)

            # Create event
            event = OrderPlaced(
                aggregate_id=uuid4(),
                customer_email="customer@example.com",
                total=149.99,
                aggregate_version=1,
            )

            # Add to outbox
            outbox_id = await outbox.add_event(event)
            print(f"Added to outbox: {outbox_id}")
async def publish_outbox_events():
    """Poll the outbox until one non-empty batch has been processed.

    While the outbox is empty, the loop sleeps for one second and polls
    again. The first non-empty batch (up to 10 entries) is processed, after
    which the function returns.
    """
    async with aiosqlite.connect("./app.db") as db:
        outbox = SQLiteOutboxRepository(db)

        while True:
            batch = await outbox.get_pending_events(limit=10)

            if batch:
                for item in batch:
                    try:
                        # Simulate publishing to message bus
                        print(f"Publishing event {item.event_type}")
                        # await message_bus.publish(item.event_data)

                        await outbox.mark_published(item.id)
                        print(f"Marked as published: {item.id}")
                    except Exception as exc:
                        # Record the failure so the entry's retry count grows
                        await outbox.increment_retry(item.id, str(exc))
                        print(f"Failed to publish: {exc}")
                break  # Exit loop for example

            print("No pending events")
            await asyncio.sleep(1)
async def main():
    """Run the producer step, then the publisher step, in that order."""
    for step in (add_to_outbox, publish_outbox_events):
        await step()


asyncio.run(main())
Dead Letter Queue Usage
import asyncio
import aiosqlite
from uuid import uuid4
from eventsource import SQLiteEventStore
from eventsource.repositories import SQLiteDLQRepository
async def main():
    """Record a simulated failure in the DLQ, list it, and mark it resolved."""
    async with SQLiteEventStore("./app.db") as store:
        await store.initialize()

        async with aiosqlite.connect("./app.db") as db:
            dlq = SQLiteDLQRepository(db)

            # Simulate a failed event: raise and catch so that a real
            # exception object is handed to the repository.
            event_id = uuid4()
            try:
                raise ValueError("Processing failed: invalid data format")
            except Exception as exc:
                await dlq.add_failed_event(
                    event_id=event_id,
                    projection_name="OrderProjection",
                    event_type="OrderCreated",
                    event_data={"order_id": str(uuid4()), "total": 99.99},
                    error=exc,
                    retry_count=1,
                )
                print(f"Added to DLQ: {event_id}")

            # View failed events
            failures = await dlq.get_failed_events(
                projection_name="OrderProjection",
                status="failed",
                limit=10,
            )
            print(f"\nFailed events: {len(failures)}")
            for row in failures:
                print(f" - Event: {row['event_type']}")
                print(f" Error: {row['error_message']}")
                print(f" Retry count: {row['retry_count']}")

            # Get statistics
            stats = await dlq.get_statistics()
            print(f"\nDLQ Statistics: {stats}")

            # Nothing to resolve when the listing came back empty
            if not failures:
                return

            # Mark the first listed entry as resolved after fixing
            await dlq.mark_resolved(
                dlq_id=failures[0]["id"],
                resolution="Fixed data issue manually",
            )
            print("Marked as resolved")


asyncio.run(main())
Testing Setup with pytest
Basic Test Fixture
import pytest
from uuid import uuid4
from eventsource import SQLiteEventStore, AggregateRepository
# Import your domain classes
# from my_app.aggregates import OrderAggregate
# from my_app.events import OrderCreated
@pytest.fixture
async def sqlite_store():
    """Yield an initialized in-memory ``SQLiteEventStore`` for one test."""
    # NOTE(review): an async fixture declared with plain @pytest.fixture
    # relies on the async test plugin handling it (e.g. pytest-asyncio in
    # auto mode) - confirm the project's asyncio_mode setting.
    memory_store = SQLiteEventStore(":memory:")
    async with memory_store as store:
        await store.initialize()
        yield store
@pytest.fixture
async def order_repo(sqlite_store):
    """Build an ``AggregateRepository`` for orders on top of ``sqlite_store``."""
    repository = AggregateRepository(
        event_store=sqlite_store,
        aggregate_factory=OrderAggregate,
        aggregate_type="Order",
    )
    return repository
@pytest.mark.asyncio
async def test_create_order(order_repo):
    """A newly created order is at version 1 and keeps its customer id."""
    new_order_id, buyer_id = uuid4(), uuid4()

    order = order_repo.create_new(new_order_id)
    order.create(customer_id=buyer_id, email="test@example.com")

    assert order.version == 1
    assert order.state.customer_id == buyer_id
@pytest.mark.asyncio
async def test_save_and_load_order(order_repo):
    """An order saved through the repository can be loaded back unchanged."""
    order_id = uuid4()

    # Create and save
    pending = order_repo.create_new(order_id)
    pending.create(customer_id=uuid4(), email="test@example.com")
    await order_repo.save(pending)

    # Load and verify
    reloaded = await order_repo.load(order_id)
    assert reloaded.version == 1
    assert reloaded.state.status == "created"
Testing Projections
import pytest
import aiosqlite
from eventsource import SQLiteEventStore
from eventsource.repositories import (
SQLiteCheckpointRepository,
SQLiteDLQRepository,
)
from eventsource.projections import DeclarativeProjection, handles
# Mock projection for testing
class TestProjection(DeclarativeProjection):
    """In-memory projection used as a test double.

    Each handled ``OrderCreated`` event is stored in ``self.orders`` under
    its aggregate id.
    """

    # The "Test" prefix matches pytest's default class-collection pattern, and
    # because this helper defines __init__, pytest would emit a
    # PytestCollectionWarning for it. Opt out of collection explicitly.
    __test__ = False

    def __init__(self, checkpoint_repo, dlq_repo):
        """Store the injected repositories and start with no orders."""
        super().__init__()
        self.orders = {}
        self._checkpoint_repo = checkpoint_repo
        self._dlq_repo = dlq_repo

    @handles(OrderCreated)
    async def _on_order_created(self, event):
        """Record the order as ``created`` under its aggregate id."""
        self.orders[event.aggregate_id] = {
            "id": event.aggregate_id,
            "status": "created",
        }

    async def _truncate_read_models(self):
        """Drop every recorded order."""
        self.orders.clear()
@pytest.fixture
async def sqlite_db():
    """Yield an in-memory event store plus an in-memory repository connection.

    The store and ``db`` are NOT the same database: SQLite gives every
    ``:memory:`` connection its own private database, so tables created by
    ``store.initialize()`` are not visible through ``db``. For that reason the
    checkpoint table is created on ``db`` by hand below.

    NOTE(review): only ``projection_checkpoints`` is created here. A DLQ
    repository built on ``db`` will presumably need its own table before it
    is used - confirm the schema and add it if so.

    Yields:
        dict: ``{"store": <SQLiteEventStore>, "db": <aiosqlite connection>}``.
    """
    async with SQLiteEventStore(":memory:") as store:
        await store.initialize()

        async with aiosqlite.connect(":memory:") as db:
            # Create the checkpoint table on the repository connection itself.
            schema = """
                CREATE TABLE IF NOT EXISTS projection_checkpoints (
                    projection_name TEXT PRIMARY KEY,
                    last_event_id TEXT,
                    last_event_type TEXT,
                    last_processed_at TEXT,
                    events_processed INTEGER DEFAULT 0,
                    created_at TEXT,
                    updated_at TEXT
                );
            """
            await db.executescript(schema)

            yield {"store": store, "db": db}
@pytest.fixture
async def projection(sqlite_db):
    """Build a ``TestProjection`` whose repositories use ``sqlite_db["db"]``."""
    connection = sqlite_db["db"]
    checkpoints = SQLiteCheckpointRepository(connection)
    dead_letters = SQLiteDLQRepository(connection)
    return TestProjection(checkpoints, dead_letters)
@pytest.mark.asyncio
async def test_projection_handles_event(projection):
    """Handling ``OrderCreated`` records the order with status ``created``."""
    created = OrderCreated(
        aggregate_id=uuid4(),
        customer_id=uuid4(),
        email="test@example.com",
        aggregate_version=1,
    )

    await projection.handle(created)

    order_key = created.aggregate_id
    assert order_key in projection.orders
    assert projection.orders[order_key]["status"] == "created"
Integration Test with Full Stack
For production-like testing, use SubscriptionManager to manage projections:
import asyncio
import pytest
from uuid import uuid4
from eventsource import (
SQLiteEventStore,
AggregateRepository,
InMemoryEventBus,
InMemoryCheckpointRepository,
)
from eventsource.subscriptions import SubscriptionManager, SubscriptionConfig
@pytest.fixture
async def full_stack():
    """Wire store, bus, projection, subscription manager and repository.

    Yields a dict with the keys ``store``, ``bus``, ``projection``, ``repo``
    and ``manager``. The manager is stopped once the test is done.
    """
    # NOTE(review): an async fixture declared with plain @pytest.fixture
    # relies on the async test plugin handling it (e.g. pytest-asyncio in
    # auto mode) - confirm the project's asyncio_mode setting.
    async with SQLiteEventStore(":memory:") as store:
        await store.initialize()

        bus = InMemoryEventBus()
        checkpoint_repo = InMemoryCheckpointRepository()
        projection = OrderListProjection()

        # Set up SubscriptionManager for event delivery to the projection
        manager = SubscriptionManager(
            event_store=store,
            event_bus=bus,
            checkpoint_repo=checkpoint_repo,
        )
        subscription_config = SubscriptionConfig(start_from="beginning")
        await manager.subscribe(projection, config=subscription_config, name="OrderList")
        await manager.start()

        repo = AggregateRepository(
            event_store=store,
            aggregate_factory=OrderAggregate,
            aggregate_type="Order",
            event_publisher=bus,
        )

        components = {
            "store": store,
            "bus": bus,
            "projection": projection,
            "repo": repo,
            "manager": manager,
        }
        yield components

        # Cleanup
        await manager.stop()
@pytest.mark.asyncio
async def test_full_order_flow(full_stack):
    """A submitted order is visible in both the aggregate and the projection."""
    repo, projection = full_stack["repo"], full_stack["projection"]

    # Create order
    order_id = uuid4()
    draft = repo.create_new(order_id)
    draft.create(customer_id=uuid4(), email="customer@test.com")
    draft.add_item(product_id=uuid4(), name="Widget", quantity=2, price=25.00)
    draft.submit()
    await repo.save(draft)

    # Wait for events to be processed
    await asyncio.sleep(0.1)

    # Verify aggregate
    reloaded = await repo.load(order_id)
    assert reloaded.state.status == "submitted"
    assert reloaded.state.total == 50.00

    # Verify projection was updated
    assert order_id in projection.orders
    assert projection.orders[order_id]["status"] == "submitted"
File-Based vs In-Memory Databases
When to Use In-Memory (:memory:)
# Fast, isolated tests: the database lives only in memory.
store = SQLiteEventStore(":memory:")
# Temporary processing where the data can be discarded afterwards.
store = SQLiteEventStore(":memory:")
# Caveat: each connection to ":memory:" gets its own new, empty database,
# so separate connections do not see each other's data.
When to Use File-Based
# Development with persistent data (path relative to the working directory).
store = SQLiteEventStore("./dev_events.db")
# Local testing with inspection: the file can be opened with other SQLite
# tools afterwards.
store = SQLiteEventStore("/tmp/test_events.db")
# Production single-instance deployment, using an absolute path.
store = SQLiteEventStore("/var/data/myapp/events.db")
Shared In-Memory Database
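For testing scenarios where you need multiple connections to the same in-memory database, a plain `:memory:` path is not enough, because each connection gets its own private database. SQLite provides a shared-cache URI form (for example `file::memory:?cache=shared`) for this purpose; check the SQLite Backend Guide to confirm whether `SQLiteEventStore` accepts URI paths.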
See Also
- SQLite Backend Guide - Configuration and best practices
- Event Stores API Reference - Complete API documentation
- Testing Patterns - General testing strategies
- Basic Order Example - Domain modeling example