Testing Patterns¶
This guide covers testing patterns for event-sourced applications using eventsource.
Overview¶
Event sourcing makes testing easier because:
- Events are the source of truth
- State is derived from events
- In-memory implementations enable fast tests
Testing Module (Recommended)¶
The eventsource.testing module provides comprehensive testing utilities that reduce boilerplate and improve test readability.
Installation¶
The testing module is included with eventsource - no extra dependencies needed:
from eventsource.testing import (
EventBuilder,
InMemoryTestHarness,
EventAssertions,
given_events,
when_command,
then_event_published,
)
EventBuilder - Fluent Event Construction¶
Build test events with minimal boilerplate:
from uuid import uuid4
from eventsource.testing import EventBuilder
from your_app.events import OrderCreated, OrderShipped
# Basic usage - auto-generates IDs and version
event = EventBuilder(OrderCreated).build(
    customer_id=uuid4(),
    total=99.99,
)

# Fluent API for configuration
order_id = uuid4()
# Defined here so the snippet runs as written (it was previously referenced
# without a definition). NOTE(review): a UUID tenant ID is assumed - confirm.
tenant_id = uuid4()
event = (
    EventBuilder(OrderCreated)
    .with_aggregate_id(order_id)
    .with_version(1)
    .with_tenant_id(tenant_id)
    .build(customer_id=uuid4(), total=99.99)
)

# Build a sequence of events for the same aggregate
events = (
    EventBuilder(OrderCreated)
    .with_aggregate_id(order_id)
    .build_sequence([
        {"customer_id": uuid4(), "total": 100.0},  # Version 1
    ])
)

# Follow-up event of a different type on the same aggregate (version 2)
shipped_event = (
    EventBuilder(OrderShipped)
    .with_aggregate_id(order_id)
    .with_version(2)
    .build(tracking_number="TRACK-001")
)
InMemoryTestHarness - Pre-configured Infrastructure¶
Get a complete test environment with one line:
import pytest
from eventsource.testing import InMemoryTestHarness
from your_app.aggregates import OrderAggregate
@pytest.fixture
async def harness():
    """Yield a set-up InMemoryTestHarness and tear it down after the test."""
    test_harness = InMemoryTestHarness()
    await test_harness.setup()
    yield test_harness
    await test_harness.teardown()
@pytest.mark.asyncio
async def test_order_creation(harness):
    """Saving a newly created order leaves exactly one event in the store."""
    # Harness provides: event_store, event_bus, checkpoint_repo, dlq_repo
    order_repo = harness.create_repository(OrderAggregate, "Order")
    new_order = order_repo.create_new(uuid4())
    new_order.create(customer_id=uuid4(), total=100.0)
    await order_repo.save(new_order)

    # Access infrastructure for assertions
    stream = await harness.event_store.get_events(new_order.aggregate_id, "Order")
    assert len(stream.events) == 1
@pytest.mark.asyncio
async def test_multiple_scenarios(harness):
    """Show how to reset the harness before starting another scenario."""
    # Clear between scenarios
    await harness.clear()

    # ... run next scenario
BDD-Style Helpers¶
Write expressive, readable tests:
import pytest
from eventsource.testing import (
given_events,
when_command,
then_event_published,
then_no_events_published,
then_event_sequence,
then_event_count,
InMemoryTestHarness,
EventBuilder,
)
@pytest.fixture
async def harness():
    """Provide an InMemoryTestHarness: set up before the test, torn down after."""
    in_memory = InMemoryTestHarness()
    await in_memory.setup()
    yield in_memory
    await in_memory.teardown()
@pytest.mark.asyncio
async def test_ship_order_bdd_style(harness):
    """Given a created and paid order, shipping it publishes OrderShipped."""
    order_id = uuid4()

    # Given: Order exists and is paid
    created = (
        EventBuilder(OrderCreated)
        .with_aggregate_id(order_id)
        .build(customer_id=uuid4(), total=100.0)
    )
    paid = (
        EventBuilder(OrderPaid)
        .with_aggregate_id(order_id)
        .with_version(2)
        .build()
    )
    await given_events(harness, [created, paid])

    # When: Ship order command
    async def ship_command():
        order_repo = harness.create_repository(OrderAggregate, "Order")
        paid_order = await order_repo.load(order_id)
        paid_order.ship(tracking_number="TRACK-123")
        await order_repo.save(paid_order)

    await when_command(harness, ship_command)

    # Then: OrderShipped event was published
    then_event_published(harness, OrderShipped, aggregate_id=order_id)
    then_event_count(harness, 3)  # Created + Paid + Shipped
@pytest.mark.asyncio
async def test_cannot_ship_unpaid_order(harness):
    """Shipping an order that was only created raises and publishes no OrderShipped."""
    order_id = uuid4()

    # Given: the order exists but has not been paid
    created = (
        EventBuilder(OrderCreated)
        .with_aggregate_id(order_id)
        .build(customer_id=uuid4(), total=100.0)
    )
    await given_events(harness, [created])

    async def ship_unpaid():
        order_repo = harness.create_repository(OrderAggregate, "Order")
        unpaid_order = await order_repo.load(order_id)
        unpaid_order.ship(tracking_number="TRACK-123")  # Should raise
        await order_repo.save(unpaid_order)

    with pytest.raises(ValueError, match="must be paid"):
        await when_command(harness, ship_unpaid)

    # No shipping event should have been published
    then_no_events_published(harness, event_type=OrderShipped)
EventAssertions - Detailed Assertions¶
For more control over assertions:
from eventsource.testing import EventAssertions
assertions = EventAssertions(harness.event_store, harness.event_bus)

# Assert specific event was published
assertions.assert_event_published(
    OrderShipped,
    aggregate_id=order_id,
    tracking_number="TRACK-123",
)

# Assert event sequence: each entry pairs an event type with expected fields
expected_sequence = [
    (OrderCreated, {"total": 100.0}),
    (OrderPaid, {}),
    (OrderShipped, {"tracking_number": "TRACK-123"}),
]
assertions.assert_event_sequence(expected_sequence)

# Assert aggregate state
assertions.assert_aggregate_version(order_id, expected_version=3)
Complete Example¶
import pytest
from uuid import uuid4
from eventsource.testing import (
InMemoryTestHarness,
EventBuilder,
given_events,
when_command,
then_event_published,
)
from your_app.events import OrderCreated, OrderPaid, OrderShipped
from your_app.aggregates import OrderAggregate
@pytest.fixture
async def harness():
    """Set up an InMemoryTestHarness, hand it to the test, then tear it down."""
    infrastructure = InMemoryTestHarness()
    await infrastructure.setup()
    yield infrastructure
    await infrastructure.teardown()
class TestOrderShipping:
    """Tests for order shipping functionality."""

    @pytest.mark.asyncio
    async def test_ship_paid_order(self, harness):
        """Paid orders can be shipped."""
        order_id = uuid4()
        customer_id = uuid4()

        # Arrange: history of a created and paid order
        created = (
            EventBuilder(OrderCreated)
            .with_aggregate_id(order_id)
            .build(customer_id=customer_id, total=50.0)
        )
        paid = (
            EventBuilder(OrderPaid)
            .with_aggregate_id(order_id)
            .with_version(2)
            .build()
        )
        await given_events(harness, [created, paid])

        # Act
        async def ship():
            order_repo = harness.create_repository(OrderAggregate, "Order")
            paid_order = await order_repo.load(order_id)
            paid_order.ship("TRACK-001")
            await order_repo.save(paid_order)

        await when_command(harness, ship)

        # Assert
        then_event_published(harness, OrderShipped, tracking_number="TRACK-001")

    @pytest.mark.asyncio
    async def test_ship_order_updates_state(self, harness):
        """Shipping updates order status to shipped."""
        order_id = uuid4()

        created = (
            EventBuilder(OrderCreated)
            .with_aggregate_id(order_id)
            .build(customer_id=uuid4(), total=50.0)
        )
        paid = (
            EventBuilder(OrderPaid)
            .with_aggregate_id(order_id)
            .with_version(2)
            .build()
        )
        await given_events(harness, [created, paid])

        order_repo = harness.create_repository(OrderAggregate, "Order")
        paid_order = await order_repo.load(order_id)
        paid_order.ship("TRACK-002")
        await order_repo.save(paid_order)

        # Reload and verify state
        shipped_order = await order_repo.load(order_id)
        assert shipped_order.state.status == "shipped"
        assert shipped_order.state.tracking_number == "TRACK-002"
Manual Testing Patterns¶
For simpler tests or when you prefer manual control, you can use the patterns below.
Unit Testing Aggregates¶
Testing Commands¶
import pytest
from uuid import uuid4
from eventsource import InMemoryEventStore, AggregateRepository
# Test fixtures
@pytest.fixture
def event_store():
    """Provide a new InMemoryEventStore for each test."""
    store = InMemoryEventStore()
    return store
@pytest.fixture
def order_repo(event_store):
    """Build an "Order" AggregateRepository backed by the event_store fixture."""
    repository = AggregateRepository(
        event_store=event_store,
        aggregate_factory=OrderAggregate,
        aggregate_type="Order",
    )
    return repository
# Test creating an order
@pytest.mark.asyncio
async def test_create_order(order_repo):
    """Creating an order sets version 1, the customer ID and the "created" status."""
    new_order_id, customer_id = uuid4(), uuid4()

    new_order = order_repo.create_new(new_order_id)
    new_order.create(customer_id, "test@example.com")

    assert new_order.version == 1
    assert new_order.state.customer_id == customer_id
    assert new_order.state.status == "created"
# Test business rule validation
@pytest.mark.asyncio
async def test_cannot_add_item_to_submitted_order(order_repo):
    """Adding an item to an order reloaded after submission raises ValueError."""
    order_id = uuid4()

    # Create and submit order
    draft = order_repo.create_new(order_id)
    draft.create(uuid4(), "test@example.com")
    draft.add_item(uuid4(), "Product", 1, 10.00)
    draft.submit()
    await order_repo.save(draft)

    # Try to add item
    submitted = await order_repo.load(order_id)
    with pytest.raises(ValueError, match="Can only add items"):
        submitted.add_item(uuid4(), "Another", 1, 5.00)
# Test idempotency
@pytest.mark.asyncio
async def test_cannot_create_twice(order_repo):
    """A second create() on the same aggregate raises ValueError."""
    existing = order_repo.create_new(uuid4())
    existing.create(uuid4(), "test@example.com")

    with pytest.raises(ValueError, match="already exists"):
        existing.create(uuid4(), "another@example.com")
Testing Event Application¶
def test_order_created_event_application():
    """Applying OrderCreated as a historical event sets customer, status and version."""
    aggregate = OrderAggregate(uuid4())
    customer_id = uuid4()

    created = OrderCreated(
        aggregate_id=aggregate.aggregate_id,
        customer_id=customer_id,
        customer_email="test@example.com",
        aggregate_version=1,
    )
    # is_new=False: the event is applied as already-recorded history
    aggregate.apply_event(created, is_new=False)

    assert aggregate.state.customer_id == customer_id
    assert aggregate.state.status == "created"
    assert aggregate.version == 1
def test_state_reconstruction_from_events():
    """Loading a created/item-added/submitted history rebuilds the expected state."""
    order_id = uuid4()
    customer_id = uuid4()

    created = OrderCreated(
        aggregate_id=order_id,
        customer_id=customer_id,
        customer_email="test@example.com",
        aggregate_version=1,
    )
    item_added = OrderItemAdded(
        aggregate_id=order_id,
        product_id=uuid4(),
        product_name="Widget",
        quantity=2,
        unit_price=25.00,
        aggregate_version=2,
    )
    submitted = OrderSubmitted(
        aggregate_id=order_id,
        total_amount=50.00,
        aggregate_version=3,
    )
    history = [created, item_added, submitted]

    rebuilt = OrderAggregate(order_id)
    rebuilt.load_from_history(history)

    assert rebuilt.version == 3
    assert rebuilt.state.status == "submitted"
    assert len(rebuilt.state.items) == 1
    assert rebuilt.state.total_amount == 50.00
Unit Testing Projections¶
import pytest
from eventsource.projections import DeclarativeProjection, handles
from eventsource.repositories import InMemoryCheckpointRepository, InMemoryDLQRepository
class TestOrderProjection(DeclarativeProjection):
    """In-memory projection that tracks order status, for use in tests.

    Read models live in ``self.orders``, a dict keyed by aggregate ID.
    """

    # The name starts with "Test", so pytest would try to collect this helper
    # as a test class and warn because it defines __init__. Opt out explicitly.
    __test__ = False

    def __init__(self):
        super().__init__(
            checkpoint_repo=InMemoryCheckpointRepository(),
            dlq_repo=InMemoryDLQRepository(),
        )
        self.orders = {}

    @handles(OrderCreated)
    async def _on_created(self, event: OrderCreated) -> None:
        """Insert a read-model entry with status "created" for the new order."""
        self.orders[event.aggregate_id] = {
            "id": event.aggregate_id,
            "customer_email": event.customer_email,
            "status": "created",
        }

    @handles(OrderSubmitted)
    async def _on_submitted(self, event: OrderSubmitted) -> None:
        """Mark a known order as "submitted"; unknown aggregate IDs are ignored."""
        if event.aggregate_id in self.orders:
            self.orders[event.aggregate_id]["status"] = "submitted"

    async def _truncate_read_models(self) -> None:
        """Drop every stored order entry."""
        self.orders.clear()
# Tests
@pytest.fixture
def projection():
    """Provide a new TestOrderProjection for each test."""
    order_projection = TestOrderProjection()
    return order_projection
@pytest.mark.asyncio
async def test_projection_handles_order_created(projection):
    """Handling OrderCreated adds the order to the projection as "created"."""
    created = OrderCreated(
        aggregate_id=uuid4(),
        customer_id=uuid4(),
        customer_email="test@example.com",
        aggregate_version=1,
    )

    await projection.handle(created)

    assert created.aggregate_id in projection.orders
    assert projection.orders[created.aggregate_id]["status"] == "created"
@pytest.mark.asyncio
async def test_projection_handles_multiple_events(projection):
    """OrderCreated followed by OrderSubmitted leaves the order "submitted"."""
    order_id = uuid4()

    created = OrderCreated(
        aggregate_id=order_id,
        customer_id=uuid4(),
        customer_email="test@example.com",
        aggregate_version=1,
    )
    await projection.handle(created)

    submitted = OrderSubmitted(
        aggregate_id=order_id,
        total_amount=100.00,
        aggregate_version=2,
    )
    await projection.handle(submitted)

    assert projection.orders[order_id]["status"] == "submitted"
@pytest.mark.asyncio
async def test_projection_reset(projection):
    """reset() empties the projection's stored orders."""
    created = OrderCreated(
        aggregate_id=uuid4(),
        customer_id=uuid4(),
        customer_email="test@example.com",
        aggregate_version=1,
    )
    await projection.handle(created)
    assert len(projection.orders) == 1

    await projection.reset()
    assert len(projection.orders) == 0
Integration Testing¶
Testing Event Store Operations¶
@pytest.mark.asyncio
async def test_append_and_retrieve_events():
    """Two appended events come back as a stream at version 2."""
    store = InMemoryEventStore()
    aggregate_id = uuid4()

    created = OrderCreated(
        aggregate_id=aggregate_id,
        customer_id=uuid4(),
        customer_email="test@example.com",
        aggregate_version=1,
    )
    item_added = OrderItemAdded(
        aggregate_id=aggregate_id,
        product_id=uuid4(),
        product_name="Widget",
        quantity=1,
        unit_price=10.00,
        aggregate_version=2,
    )

    # Append events
    append_result = await store.append_events(
        aggregate_id=aggregate_id,
        aggregate_type="Order",
        events=[created, item_added],
        expected_version=0,
    )
    assert append_result.success
    assert append_result.new_version == 2

    # Retrieve events
    stored = await store.get_events(aggregate_id, "Order")
    assert len(stored.events) == 2
    assert stored.version == 2
@pytest.mark.asyncio
async def test_optimistic_locking():
    """Appending with a stale expected_version raises OptimisticLockError."""
    store = InMemoryEventStore()
    aggregate_id = uuid4()

    # Initial event
    created = OrderCreated(
        aggregate_id=aggregate_id,
        customer_id=uuid4(),
        customer_email="test@example.com",
        aggregate_version=1,
    )
    await store.append_events(
        aggregate_id=aggregate_id,
        aggregate_type="Order",
        events=[created],
        expected_version=0,
    )

    # Try to append with wrong version
    from eventsource import OptimisticLockError

    with pytest.raises(OptimisticLockError):
        await store.append_events(
            aggregate_id=aggregate_id,
            aggregate_type="Order",
            events=[
                OrderSubmitted(
                    aggregate_id=aggregate_id,
                    total_amount=100.00,
                    aggregate_version=2,
                )
            ],
            expected_version=0,  # Should be 1
        )
Testing Full Flow¶
@pytest.mark.asyncio
async def test_full_order_flow():
    """Create, fill and submit an order, then check aggregate, projection and store."""
    # Setup
    store = InMemoryEventStore()
    bus = InMemoryEventBus()
    projection = TestOrderProjection()
    order_repo = AggregateRepository(
        event_store=store,
        aggregate_factory=OrderAggregate,
        aggregate_type="Order",
        event_publisher=bus,
    )
    bus.subscribe_all(projection)

    # Create order
    order_id = uuid4()
    new_order = order_repo.create_new(order_id)
    new_order.create(uuid4(), "customer@test.com")
    new_order.add_item(uuid4(), "Product", 2, 25.00)
    new_order.submit()
    await order_repo.save(new_order)

    # Verify aggregate
    reloaded = await order_repo.load(order_id)
    assert reloaded.state.status == "submitted"
    assert reloaded.state.total_amount == 50.00

    # Verify projection
    assert order_id in projection.orders
    assert projection.orders[order_id]["status"] == "submitted"

    # Verify event store
    stored = await store.get_events(order_id, "Order")
    assert len(stored.events) == 3  # created, item_added, submitted
Testing with PostgreSQL (Integration)¶
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from eventsource import PostgreSQLEventStore
@pytest.fixture
async def pg_store():
    """Create PostgreSQL event store for integration tests.

    Yields the store; afterwards truncates the ``events`` table and disposes
    the engine so no pooled connections outlive the test.
    """
    # Imported locally: ``text`` is needed by the cleanup below but is not
    # among this snippet's imports.
    from sqlalchemy import text

    engine = create_async_engine(
        "postgresql+asyncpg://test:test@localhost/test_db"
    )
    session_factory = async_sessionmaker(engine, expire_on_commit=False)
    store = PostgreSQLEventStore(session_factory)

    yield store

    # Cleanup
    try:
        async with engine.begin() as conn:
            await conn.execute(text("TRUNCATE TABLE events"))
    finally:
        # Release pooled connections even if the truncate fails.
        await engine.dispose()
@pytest.mark.integration
@pytest.mark.asyncio
async def test_postgresql_event_store(pg_store):
    """An event appended to the PostgreSQL store can be read back."""
    aggregate_id = uuid4()

    created = OrderCreated(
        aggregate_id=aggregate_id,
        customer_id=uuid4(),
        customer_email="test@example.com",
        aggregate_version=1,
    )
    append_result = await pg_store.append_events(
        aggregate_id=aggregate_id,
        aggregate_type="Order",
        events=[created],
        expected_version=0,
    )
    assert append_result.success

    stored = await pg_store.get_events(aggregate_id, "Order")
    assert len(stored.events) == 1
Test Helpers¶
Event Builder¶
class EventBuilder:
    """Helper for building test events.

    Every event produced shares one aggregate ID. Each builder method
    increments ``self.version`` and stamps the new value on the event.
    """

    def __init__(self, aggregate_id: UUID | None = None):
        # Fall back to a random aggregate ID when none is supplied.
        self.aggregate_id = aggregate_id if aggregate_id else uuid4()
        self.version = 0

    def order_created(self, **kwargs) -> OrderCreated:
        """Return an OrderCreated event; ``customer_id`` and ``email`` may be overridden."""
        self.version += 1
        customer_id = kwargs.get("customer_id", uuid4())
        customer_email = kwargs.get("email", "test@example.com")
        return OrderCreated(
            aggregate_id=self.aggregate_id,
            customer_id=customer_id,
            customer_email=customer_email,
            aggregate_version=self.version,
        )

    def item_added(self, **kwargs) -> OrderItemAdded:
        """Return an OrderItemAdded event; ``product_id``, ``name``, ``quantity`` and ``price`` may be overridden."""
        self.version += 1
        product_id = kwargs.get("product_id", uuid4())
        product_name = kwargs.get("name", "Test Product")
        quantity = kwargs.get("quantity", 1)
        unit_price = kwargs.get("price", 10.00)
        return OrderItemAdded(
            aggregate_id=self.aggregate_id,
            product_id=product_id,
            product_name=product_name,
            quantity=quantity,
            unit_price=unit_price,
            aggregate_version=self.version,
        )
# Usage in tests
def test_with_builder():
    """Three builder events replay into version 3 with two order items."""
    builder = EventBuilder()

    created = builder.order_created(email="customer@test.com")
    widget = builder.item_added(name="Widget", quantity=2, price=15.00)
    gadget = builder.item_added(name="Gadget", quantity=1, price=30.00)
    history = [created, widget, gadget]

    rebuilt = OrderAggregate(builder.aggregate_id)
    rebuilt.load_from_history(history)

    assert rebuilt.version == 3
    assert len(rebuilt.state.items) == 2
Aggregate Test Helper¶
class AggregateTestHelper:
    """Helper for testing aggregates in isolation.

    Wraps a private InMemoryEventStore and an AggregateRepository for one
    aggregate class, so tests can create, save and load aggregates directly.
    """

    def __init__(self, aggregate_class, aggregate_type: str):
        self.aggregate_class = aggregate_class
        self.aggregate_type = aggregate_type
        self.store = InMemoryEventStore()
        # Keyword arguments match the other AggregateRepository examples in
        # this guide and do not depend on positional parameter order.
        self.repo = AggregateRepository(
            event_store=self.store,
            aggregate_factory=aggregate_class,
            aggregate_type=aggregate_type,
        )

    def create(self, aggregate_id: UUID | None = None):
        """Return a new aggregate, using a random ID when none is given."""
        return self.repo.create_new(aggregate_id or uuid4())

    async def save(self, aggregate):
        """Save the aggregate through the repository."""
        await self.repo.save(aggregate)

    async def load(self, aggregate_id: UUID):
        """Load the aggregate with the given ID through the repository."""
        return await self.repo.load(aggregate_id)

    def get_events(self, aggregate_id: UUID):
        """Return the raw events stored for ``aggregate_id`` (empty list if none)."""
        # NOTE(review): reads the store's private ``_events`` mapping, presumably
        # keyed by aggregate ID - confirm, and prefer the public
        # ``store.get_events(...)`` API if a sync accessor is not required.
        return self.store._events.get(aggregate_id, [])
# Usage
@pytest.fixture
def order_helper():
    """Provide an AggregateTestHelper configured for OrderAggregate / "Order"."""
    helper = AggregateTestHelper(OrderAggregate, "Order")
    return helper
@pytest.mark.asyncio
async def test_order_flow(order_helper):
    """Saving a created order through the helper stores one event."""
    new_order = order_helper.create()
    new_order.create(uuid4(), "test@example.com")
    await order_helper.save(new_order)

    stored_events = order_helper.get_events(new_order.aggregate_id)
    assert len(stored_events) == 1
Best Practices¶
- Use InMemoryEventStore for unit tests - Fast and deterministic
- Test event application separately - Verify state transitions work correctly
- Test business rules - Ensure commands validate properly
- Test idempotency - Commands should be safe to retry
- Test projections in isolation - Feed events directly
- Use fixtures - Share setup code between tests
- Mark integration tests - Separate slow tests from fast ones
See Also¶
- Testing Strategy Guide - Comprehensive testing documentation with advanced patterns, integration testing setup, and CI/CD examples