Tutorial 8: Testing Event-Sourced Systems¶
Difficulty: Intermediate
Prerequisites¶
- Tutorial 1: Introduction to Event Sourcing
- Tutorial 2: Your First Domain Event
- Tutorial 3: Building Your First Aggregate
- Tutorial 4: Event Stores
- Tutorial 5: Repositories and Aggregate Lifecycle
- Tutorial 6: Projections
- Python 3.10 or higher
- pytest and pytest-asyncio installed
Learning Objectives¶
By the end of this tutorial, you will be able to:
- Understand why event sourcing simplifies testing
- Write unit tests for aggregates using direct event application
- Test event replay and state reconstruction
- Use InMemoryEventStore for fast, isolated integration tests
- Create pytest fixtures for reusable test infrastructure
- Test repositories with optimistic concurrency control
- Test projections with event handlers
- Apply given-when-then testing patterns for aggregates
- Test business rule validation in command methods
Why Event Sourcing Simplifies Testing¶
Event-sourced systems are inherently testable because:
- Events are explicit: Inputs and outputs are clearly defined domain events
- State is deterministic: Same events always produce the same state
- No database required: InMemoryEventStore provides fast, isolated tests
- Time travel: Test historical scenarios by replaying events
- Business rules visible: Validation logic is separate from state updates
Traditional vs Event-Sourced Testing¶
Traditional CRUD testing:
# Setup database
db = TestDatabase()
await db.migrate()
# Create test data
task = Task(title="Test")
await db.save(task)
# Test
task = await db.get(task.id)
task.complete()
await db.save(task)
# Assert
task = await db.get(task.id)
assert task.status == "completed"
# Teardown
await db.drop_all()
Event-sourced testing:
# No database setup needed
task = TaskAggregate(uuid4())
# Test
task.create("Test", "Description")
task.complete(user_id)
# Assert
assert task.state.status == "completed"
assert len(task.uncommitted_events) == 2
Event sourcing eliminates database setup, teardown, and slow I/O operations.
Testing Aggregates: Unit Tests¶
Aggregate tests verify business rules and state transitions without touching the event store.
Basic Aggregate Test¶
from uuid import uuid4
import pytest
from my_domain import TaskAggregate
def test_create_task():
"""Test creating a new task."""
# Arrange
task_id = uuid4()
task = TaskAggregate(task_id)
# Act
task.create("Learn testing", "Master event sourcing tests")
# Assert: State updated correctly
assert task.state.title == "Learn testing"
assert task.state.description == "Master event sourcing tests"
assert task.state.status == "pending"
# Assert: Version incremented
assert task.version == 1
# Assert: Event emitted
assert len(task.uncommitted_events) == 1
event = task.uncommitted_events[0]
assert event.event_type == "TaskCreated"
assert event.aggregate_version == 1
assert event.title == "Learn testing"
Key pattern: 1. Arrange: Create aggregate instance 2. Act: Execute command 3. Assert: Check state, version, and events
Testing Business Rules¶
Commands should validate business rules before emitting events:
def test_cannot_complete_task_twice():
"""Test business rule: completed tasks cannot be completed again."""
# Arrange
task = TaskAggregate(uuid4())
task.create("Test task", "Description")
user_id = uuid4()
task.complete(user_id)
# Act & Assert
with pytest.raises(ValueError, match="already completed"):
task.complete(user_id)
def test_cannot_assign_completed_task():
"""Test business rule: completed tasks cannot be reassigned."""
task = TaskAggregate(uuid4())
task.create("Test task", "Description")
user_id = uuid4()
task.complete(user_id)
with pytest.raises(ValueError, match="Cannot assign completed task"):
task.assign(uuid4())
def test_create_validates_duplicate():
"""Test business rule: cannot create task twice."""
task = TaskAggregate(uuid4())
task.create("First", "Description")
with pytest.raises(ValueError, match="already exists"):
task.create("Second", "Description")
Testing philosophy: Business rules should fail before events are emitted. Events represent facts that always succeed.
Given-When-Then Pattern¶
The given-when-then pattern makes tests read like specifications:
def test_complete_assigned_task():
"""
Given a task that has been created and assigned
When the assigned user completes the task
Then the task status is completed
"""
# Given: task exists and is assigned
task = TaskAggregate(uuid4())
user_id = uuid4()
task.create("Test task", "Description")
task.assign(user_id)
task.mark_events_as_committed() # Simulate persistence
# When: user completes the task
task.complete(user_id)
# Then: status is completed
assert task.state.status == "completed"
assert task.state.completed_by == user_id
assert task.version == 3
# Then: only the completion event is uncommitted
assert len(task.uncommitted_events) == 1
assert task.uncommitted_events[0].event_type == "TaskCompleted"
Note: Use mark_events_as_committed() to simulate event persistence between commands. This keeps uncommitted_events clean for the "when" phase.
Testing Event Replay¶
Aggregates must rebuild state correctly by replaying historical events:
def test_rebuild_task_from_events():
"""Test state reconstruction from event history."""
# Arrange: Create event sequence
task_id = uuid4()
user_id = uuid4()
events = [
TaskCreated(
aggregate_id=task_id,
title="Historical task",
description="From event store",
aggregate_version=1,
),
TaskAssigned(
aggregate_id=task_id,
assigned_to=user_id,
aggregate_version=2,
),
TaskCompleted(
aggregate_id=task_id,
completed_by=user_id,
aggregate_version=3,
),
]
# Act: Replay events
task = TaskAggregate(task_id)
task.load_from_history(events)
# Assert: State reconstructed correctly
assert task.version == 3
assert task.state.title == "Historical task"
assert task.state.status == "completed"
assert task.state.assigned_to == user_id
assert task.state.completed_by == user_id
# Assert: No uncommitted events (historical replay)
assert len(task.uncommitted_events) == 0
Critical test: Every aggregate must correctly replay its events. This test ensures your event handlers work correctly.
Testing with InMemoryEventStore¶
Use InMemoryEventStore for integration tests without database setup:
import pytest
from eventsource import InMemoryEventStore, AggregateRepository
@pytest.mark.asyncio
async def test_save_and_load_task():
"""Test complete save and load workflow."""
# Arrange
store = InMemoryEventStore()
repo = AggregateRepository(
event_store=store,
aggregate_factory=TaskAggregate,
aggregate_type="Task",
)
# Act: Create and save
task_id = uuid4()
task = repo.create_new(task_id)
task.create("Integration test", "Testing with event store")
await repo.save(task)
# Assert: Load and verify
loaded_task = await repo.load(task_id)
assert loaded_task.state.title == "Integration test"
assert loaded_task.version == 1
assert loaded_task.aggregate_id == task_id
@pytest.mark.asyncio
async def test_repository_optimistic_locking():
"""Test optimistic concurrency control."""
from eventsource import OptimisticLockError
# Arrange
store = InMemoryEventStore()
repo = AggregateRepository(
event_store=store,
aggregate_factory=TaskAggregate,
aggregate_type="Task",
)
task_id = uuid4()
task = repo.create_new(task_id)
task.create("Test", "Description")
await repo.save(task)
# Act: Load twice (simulate concurrent access)
task1 = await repo.load(task_id)
task2 = await repo.load(task_id)
# Task 1 saves successfully
task1.assign(uuid4())
await repo.save(task1)
# Task 2 save fails with conflict
task2.assign(uuid4())
with pytest.raises(OptimisticLockError) as exc_info:
await repo.save(task2)
# Assert: Error details
assert exc_info.value.expected_version == 1
assert exc_info.value.actual_version == 2
assert exc_info.value.aggregate_id == task_id
Why InMemoryEventStore? - No database setup or teardown - Tests run in milliseconds, not seconds - Perfect isolation between tests - Same interface as PostgreSQL/SQLite stores
Pytest Fixtures for Test Infrastructure¶
Create reusable fixtures to eliminate boilerplate:
import pytest
from uuid import uuid4
from eventsource import InMemoryEventStore, AggregateRepository
# conftest.py or test file
@pytest.fixture
def event_store():
"""Provide a fresh event store for each test."""
return InMemoryEventStore()
@pytest.fixture
def task_repo(event_store):
"""Provide a configured task repository."""
return AggregateRepository(
event_store=event_store,
aggregate_factory=TaskAggregate,
aggregate_type="Task",
)
@pytest.fixture
def task_id():
"""Provide a random task ID."""
return uuid4()
@pytest.fixture
def user_id():
"""Provide a random user ID."""
return uuid4()
# Now tests are concise
@pytest.mark.asyncio
async def test_create_task_with_fixtures(task_repo, task_id):
"""Test using fixtures."""
task = task_repo.create_new(task_id)
task.create("Fixture test", "Using pytest fixtures")
await task_repo.save(task)
loaded = await task_repo.load(task_id)
assert loaded.state.title == "Fixture test"
@pytest.mark.asyncio
async def test_assign_task_with_fixtures(task_repo, task_id, user_id):
"""Test using multiple fixtures."""
task = task_repo.create_new(task_id)
task.create("Test", "Description")
task.assign(user_id)
await task_repo.save(task)
loaded = await task_repo.load(task_id)
assert loaded.state.assigned_to == user_id
Fixture benefits: - Eliminate repetitive setup code - Make tests more readable - Easy to update all tests by changing fixture - pytest automatically provides isolation
Testing Projections¶
Projections are tested by feeding events and verifying read model updates:
import pytest
from collections import defaultdict
from eventsource import DomainEvent
class TaskListProjection:
"""Example projection for testing."""
def __init__(self):
self.tasks: dict[UUID, dict] = {}
def subscribed_to(self) -> list[type[DomainEvent]]:
return [TaskCreated, TaskCompleted]
async def handle(self, event: DomainEvent) -> None:
if isinstance(event, TaskCreated):
self.tasks[event.aggregate_id] = {
"title": event.title,
"status": "pending",
}
elif isinstance(event, TaskCompleted):
if event.aggregate_id in self.tasks:
self.tasks[event.aggregate_id]["status"] = "completed"
@pytest.mark.asyncio
async def test_projection_handles_task_created():
"""Test projection processes TaskCreated event."""
# Arrange
projection = TaskListProjection()
task_id = uuid4()
event = TaskCreated(
aggregate_id=task_id,
title="Test task",
description="Description",
aggregate_version=1,
)
# Act
await projection.handle(event)
# Assert
assert task_id in projection.tasks
assert projection.tasks[task_id]["title"] == "Test task"
assert projection.tasks[task_id]["status"] == "pending"
@pytest.mark.asyncio
async def test_projection_handles_task_completed():
"""Test projection processes TaskCompleted event."""
# Arrange
projection = TaskListProjection()
task_id = uuid4()
# Create task first
await projection.handle(TaskCreated(
aggregate_id=task_id,
title="Test",
description="Desc",
aggregate_version=1,
))
# Act: Complete task
await projection.handle(TaskCompleted(
aggregate_id=task_id,
completed_by=uuid4(),
aggregate_version=2,
))
# Assert
assert projection.tasks[task_id]["status"] == "completed"
@pytest.mark.asyncio
async def test_projection_processes_event_sequence():
"""Test projection handles multiple events correctly."""
# Arrange
projection = TaskListProjection()
# Act: Process multiple task lifecycles
task1_id = uuid4()
task2_id = uuid4()
await projection.handle(TaskCreated(
aggregate_id=task1_id,
title="First task",
description="Desc",
aggregate_version=1,
))
await projection.handle(TaskCreated(
aggregate_id=task2_id,
title="Second task",
description="Desc",
aggregate_version=1,
))
await projection.handle(TaskCompleted(
aggregate_id=task1_id,
completed_by=uuid4(),
aggregate_version=2,
))
# Assert
assert len(projection.tasks) == 2
assert projection.tasks[task1_id]["status"] == "completed"
assert projection.tasks[task2_id]["status"] == "pending"
Projection testing pattern:
1. Create projection instance
2. Send events via handle()
3. Verify read model state
4. No event store needed
Testing Event Publishing¶
Test that repositories publish events after successful saves:
@pytest.mark.asyncio
async def test_repository_publishes_events():
"""Test repository publishes events to event bus."""
from eventsource import InMemoryEventBus
# Arrange
store = InMemoryEventStore()
bus = InMemoryEventBus()
repo = AggregateRepository(
event_store=store,
aggregate_factory=TaskAggregate,
aggregate_type="Task",
event_publisher=bus, # Configure event publishing
)
# Track published events
published_events = []
async def capture_event(event):
published_events.append(event)
await bus.subscribe(TaskCreated, capture_event)
# Act
task = repo.create_new(uuid4())
task.create("Published task", "Will trigger event")
await repo.save(task)
# Assert
assert len(published_events) == 1
assert published_events[0].event_type == "TaskCreated"
assert published_events[0].title == "Published task"
Advanced Testing Patterns¶
Testing Aggregate Lifecycle¶
@pytest.mark.asyncio
async def test_complete_task_lifecycle(task_repo):
"""Test full task lifecycle: create -> assign -> complete."""
# Arrange
task_id = uuid4()
user_id = uuid4()
# Act: Create
task = task_repo.create_new(task_id)
task.create("Lifecycle test", "Full workflow")
await task_repo.save(task)
# Act: Assign
task = await task_repo.load(task_id)
task.assign(user_id)
await task_repo.save(task)
# Act: Complete
task = await task_repo.load(task_id)
task.complete(user_id)
await task_repo.save(task)
# Assert: Final state
final_task = await task_repo.load(task_id)
assert final_task.version == 3
assert final_task.state.status == "completed"
assert final_task.state.assigned_to == user_id
assert final_task.state.completed_by == user_id
# Assert: Event history
from eventsource import ExpectedVersion
stream = await task_repo.event_store.get_events(task_id, "Task")
assert len(stream.events) == 3
assert stream.events[0].event_type == "TaskCreated"
assert stream.events[1].event_type == "TaskAssigned"
assert stream.events[2].event_type == "TaskCompleted"
Testing Event Metadata¶
def test_event_metadata_populated():
"""Test that events have proper metadata."""
task = TaskAggregate(uuid4())
task.create("Test", "Description")
event = task.uncommitted_events[0]
# Standard metadata
assert event.event_id is not None
assert event.event_type == "TaskCreated"
assert event.aggregate_type == "Task"
assert event.aggregate_id == task.aggregate_id
assert event.aggregate_version == 1
assert event.occurred_at is not None
assert event.correlation_id is not None
Testing Correlation and Causation¶
def test_event_causation_tracking():
"""Test events can track causation chains."""
# Create initial event
task = TaskAggregate(uuid4())
task.create("Parent task", "Main task")
parent_event = task.uncommitted_events[0]
# Create caused event
subtask = TaskAggregate(uuid4())
subtask.create("Subtask", "Child task")
child_event = subtask.uncommitted_events[0].with_causation(parent_event)
# Assert causation
assert child_event.causation_id == parent_event.event_id
assert child_event.is_caused_by(parent_event)
assert child_event.correlation_id == parent_event.correlation_id
Complete Working Example¶
Here's a complete test file demonstrating all concepts:
"""
Tutorial 8: Testing Event-Sourced Systems
Run with: pytest tutorial_08_testing.py -v
"""
import pytest
from uuid import UUID, uuid4
from pydantic import BaseModel
from eventsource import (
AggregateRepository,
AggregateRoot,
DomainEvent,
InMemoryEventStore,
InMemoryEventBus,
OptimisticLockError,
register_event,
)
# =============================================================================
# Domain Model
# =============================================================================
@register_event
class TaskCreated(DomainEvent):
event_type: str = "TaskCreated"
aggregate_type: str = "Task"
title: str
description: str
@register_event
class TaskAssigned(DomainEvent):
event_type: str = "TaskAssigned"
aggregate_type: str = "Task"
assigned_to: UUID
@register_event
class TaskCompleted(DomainEvent):
event_type: str = "TaskCompleted"
aggregate_type: str = "Task"
completed_by: UUID
class TaskState(BaseModel):
task_id: UUID
title: str = ""
description: str = ""
assigned_to: UUID | None = None
status: str = "pending"
completed_by: UUID | None = None
class TaskAggregate(AggregateRoot[TaskState]):
aggregate_type = "Task"
def _get_initial_state(self) -> TaskState:
return TaskState(task_id=self.aggregate_id)
def _apply(self, event: DomainEvent) -> None:
if isinstance(event, TaskCreated):
self._state = TaskState(
task_id=self.aggregate_id,
title=event.title,
description=event.description,
status="pending",
)
elif isinstance(event, TaskAssigned):
if self._state:
self._state = self._state.model_copy(
update={"assigned_to": event.assigned_to}
)
elif isinstance(event, TaskCompleted):
if self._state:
self._state = self._state.model_copy(
update={
"status": "completed",
"completed_by": event.completed_by,
}
)
def create(self, title: str, description: str) -> None:
if self.version > 0:
raise ValueError("Task already exists")
self.apply_event(TaskCreated(
aggregate_id=self.aggregate_id,
title=title,
description=description,
aggregate_version=self.get_next_version(),
))
def assign(self, user_id: UUID) -> None:
if not self.state:
raise ValueError("Task does not exist")
if self.state.status == "completed":
raise ValueError("Cannot assign completed task")
self.apply_event(TaskAssigned(
aggregate_id=self.aggregate_id,
assigned_to=user_id,
aggregate_version=self.get_next_version(),
))
def complete(self, completed_by: UUID) -> None:
if not self.state:
raise ValueError("Task does not exist")
if self.state.status == "completed":
raise ValueError("Task already completed")
self.apply_event(TaskCompleted(
aggregate_id=self.aggregate_id,
completed_by=completed_by,
aggregate_version=self.get_next_version(),
))
# =============================================================================
# Test Fixtures
# =============================================================================
@pytest.fixture
def event_store():
return InMemoryEventStore()
@pytest.fixture
def task_repo(event_store):
return AggregateRepository(
event_store=event_store,
aggregate_factory=TaskAggregate,
aggregate_type="Task",
)
# =============================================================================
# Unit Tests: Aggregates
# =============================================================================
class TestTaskAggregate:
"""Unit tests for TaskAggregate business logic."""
def test_create_task(self):
"""Test creating a new task."""
task = TaskAggregate(uuid4())
task.create("Test task", "Description")
assert task.state.title == "Test task"
assert task.state.status == "pending"
assert task.version == 1
assert len(task.uncommitted_events) == 1
def test_assign_task(self):
"""Test assigning a task to a user."""
task = TaskAggregate(uuid4())
task.create("Test", "Desc")
user_id = uuid4()
task.assign(user_id)
assert task.state.assigned_to == user_id
assert task.version == 2
def test_complete_task(self):
"""Test completing a task."""
task = TaskAggregate(uuid4())
task.create("Test", "Desc")
user_id = uuid4()
task.complete(user_id)
assert task.state.status == "completed"
assert task.state.completed_by == user_id
def test_cannot_complete_twice(self):
"""Test business rule: cannot complete completed task."""
task = TaskAggregate(uuid4())
task.create("Test", "Desc")
user_id = uuid4()
task.complete(user_id)
with pytest.raises(ValueError, match="already completed"):
task.complete(user_id)
def test_cannot_assign_completed_task(self):
"""Test business rule: cannot assign completed task."""
task = TaskAggregate(uuid4())
task.create("Test", "Desc")
user_id = uuid4()
task.complete(user_id)
with pytest.raises(ValueError, match="Cannot assign completed"):
task.assign(uuid4())
def test_rebuild_from_events(self):
"""Test state reconstruction from events."""
task_id = uuid4()
user_id = uuid4()
events = [
TaskCreated(
aggregate_id=task_id,
title="Historical task",
description="From events",
aggregate_version=1,
),
TaskAssigned(
aggregate_id=task_id,
assigned_to=user_id,
aggregate_version=2,
),
TaskCompleted(
aggregate_id=task_id,
completed_by=user_id,
aggregate_version=3,
),
]
task = TaskAggregate(task_id)
task.load_from_history(events)
assert task.version == 3
assert task.state.status == "completed"
assert task.state.assigned_to == user_id
assert len(task.uncommitted_events) == 0
# =============================================================================
# Integration Tests: Repository
# =============================================================================
class TestTaskRepository:
"""Integration tests for TaskAggregate with repository."""
@pytest.mark.asyncio
async def test_save_and_load(self, task_repo):
"""Test saving and loading a task."""
task_id = uuid4()
task = task_repo.create_new(task_id)
task.create("Integration test", "Repository test")
await task_repo.save(task)
loaded = await task_repo.load(task_id)
assert loaded.state.title == "Integration test"
assert loaded.version == 1
@pytest.mark.asyncio
async def test_load_modify_save(self, task_repo):
"""Test load-modify-save pattern."""
task_id = uuid4()
# Create
task = task_repo.create_new(task_id)
task.create("Test", "Description")
await task_repo.save(task)
# Load and modify
task = await task_repo.load(task_id)
user_id = uuid4()
task.assign(user_id)
await task_repo.save(task)
# Verify
task = await task_repo.load(task_id)
assert task.state.assigned_to == user_id
assert task.version == 2
@pytest.mark.asyncio
async def test_optimistic_locking(self, task_repo):
"""Test optimistic concurrency control."""
task_id = uuid4()
task = task_repo.create_new(task_id)
task.create("Test", "Description")
await task_repo.save(task)
# Load twice
task1 = await task_repo.load(task_id)
task2 = await task_repo.load(task_id)
# First save succeeds
task1.assign(uuid4())
await task_repo.save(task1)
# Second save fails
task2.assign(uuid4())
with pytest.raises(OptimisticLockError):
await task_repo.save(task2)
@pytest.mark.asyncio
async def test_event_history(self, task_repo):
"""Test event history is preserved."""
task_id = uuid4()
user_id = uuid4()
task = task_repo.create_new(task_id)
task.create("Test", "Description")
task.assign(user_id)
task.complete(user_id)
await task_repo.save(task)
stream = await task_repo.event_store.get_events(task_id, "Task")
assert len(stream.events) == 3
assert stream.events[0].event_type == "TaskCreated"
assert stream.events[1].event_type == "TaskAssigned"
assert stream.events[2].event_type == "TaskCompleted"
# =============================================================================
# Integration Tests: Event Publishing
# =============================================================================
class TestEventPublishing:
"""Tests for event publishing via repository."""
@pytest.mark.asyncio
async def test_repository_publishes_events(self, event_store):
"""Test repository publishes events after save."""
bus = InMemoryEventBus()
repo = AggregateRepository(
event_store=event_store,
aggregate_factory=TaskAggregate,
aggregate_type="Task",
event_publisher=bus,
)
published = []
async def capture(event):
published.append(event)
await bus.subscribe(TaskCreated, capture)
task = repo.create_new(uuid4())
task.create("Published task", "Will trigger handler")
await repo.save(task)
assert len(published) == 1
assert published[0].event_type == "TaskCreated"
Run the tests:
Expected output:
test_create_task PASSED
test_assign_task PASSED
test_complete_task PASSED
test_cannot_complete_twice PASSED
test_cannot_assign_completed_task PASSED
test_rebuild_from_events PASSED
test_save_and_load PASSED
test_load_modify_save PASSED
test_optimistic_locking PASSED
test_event_history PASSED
test_repository_publishes_events PASSED
Testing Best Practices¶
1. Use InMemoryEventStore for Speed¶
# Fast: No database I/O
@pytest.fixture
def event_store():
return InMemoryEventStore()
# Slow: Database setup and teardown
@pytest.fixture
async def event_store():
db = await create_test_database()
await db.migrate()
yield db
await db.drop_all()
InMemoryEventStore runs tests 100x faster than database-backed stores.
2. Test One Thing at a Time¶
# Good: Tests one business rule
def test_cannot_complete_twice():
task = TaskAggregate(uuid4())
task.create("Test", "Desc")
task.complete(uuid4())
with pytest.raises(ValueError, match="already completed"):
task.complete(uuid4())
# Bad: Tests multiple things
def test_task_lifecycle():
task = TaskAggregate(uuid4())
task.create("Test", "Desc")
task.assign(uuid4())
task.complete(uuid4())
with pytest.raises(ValueError):
task.complete(uuid4())
# Too much in one test
3. Use Descriptive Test Names¶
# Good
def test_cannot_assign_completed_task()
def test_rebuild_task_from_historical_events()
def test_repository_detects_concurrent_modification()
# Bad
def test_task_1()
def test_error_case()
def test_repo()
4. Assert on Multiple Levels¶
def test_complete_task():
task = TaskAggregate(uuid4())
task.create("Test", "Desc")
user_id = uuid4()
task.complete(user_id)
# Assert state
assert task.state.status == "completed"
# Assert version
assert task.version == 2
# Assert events
assert len(task.uncommitted_events) == 2
assert task.uncommitted_events[1].event_type == "TaskCompleted"
5. Use mark_events_as_committed() for Multi-Step Tests¶
def test_multi_step_workflow():
task = TaskAggregate(uuid4())
# Step 1
task.create("Test", "Desc")
task.mark_events_as_committed() # Simulate save
# Step 2
task.assign(uuid4())
task.mark_events_as_committed() # Simulate save
# Step 3
task.complete(uuid4())
# Only last event uncommitted
assert len(task.uncommitted_events) == 1
Common Testing Patterns¶
Testing Exception Messages¶
def test_error_message_clarity():
"""Test error messages are helpful."""
task = TaskAggregate(uuid4())
task.create("Test", "Desc")
task.complete(uuid4())
with pytest.raises(ValueError) as exc_info:
task.complete(uuid4())
assert "already completed" in str(exc_info.value)
Testing Event Metadata¶
def test_events_have_correlation_id():
"""Test events track correlation."""
task = TaskAggregate(uuid4())
task.create("Test", "Desc")
event1 = task.uncommitted_events[0]
task.assign(uuid4())
event2 = task.uncommitted_events[1]
# Same correlation chain
assert event1.correlation_id == event2.correlation_id
Testing Aggregate Exists¶
@pytest.mark.asyncio
async def test_check_aggregate_exists(task_repo):
"""Test checking if aggregate exists."""
task_id = uuid4()
assert not await task_repo.exists(task_id)
task = task_repo.create_new(task_id)
task.create("Test", "Desc")
await task_repo.save(task)
assert await task_repo.exists(task_id)
Key Takeaways¶
- InMemoryEventStore eliminates database overhead: Tests run in milliseconds
- Test aggregates in isolation: No event store needed for unit tests
- Use given-when-then pattern: Makes tests read like specifications
- Test business rules explicitly: Verify commands fail with invalid state
- Test event replay: Ensure aggregates rebuild correctly from history
- Use pytest fixtures: Eliminate boilerplate and improve readability
- Assert on state, version, and events: Comprehensive verification
- Test optimistic locking: Verify concurrent modification detection
- mark_events_as_committed() for multi-step tests: Clean uncommitted events between steps
Next Steps¶
You've completed the testing tutorial! You now know how to: - Write fast, isolated tests with InMemoryEventStore - Test aggregate business rules and state transitions - Test event replay and state reconstruction - Use pytest fixtures for clean test code - Test repositories with optimistic concurrency control
Continue to Tutorial 9: Error Handling to learn about: - Handling business rule violations - Dealing with OptimisticLockError - Dead letter queues for projection failures - Retry strategies - Error recovery patterns
For more examples, see:
- tests/unit/test_aggregate_root.py - Comprehensive aggregate tests
- tests/unit/test_aggregate_repository.py - Repository test patterns
- tests/unit/test_projection_base.py - Projection testing examples
- tests/conftest.py - Shared fixtures and testing utilities