Tutorial 16: Multi-Tenancy Patterns
Difficulty: Advanced
Prerequisites
- Tutorial 1: Introduction to Event Sourcing
- Tutorial 4: Event Stores
- Tutorial 5: Repositories - Managing Aggregate Lifecycle
- Tutorial 6: Projections - Building Read Models
- Tutorial 15: Production Deployment
- Python 3.10 or higher
- Understanding of async/await
- Understanding of multi-tenant application architecture
Learning Objectives
By the end of this tutorial, you will be able to:
- Explain multi-tenancy patterns in event sourcing systems
- Use the built-in tenant_id field on domain events
- Implement tenant-aware aggregates and repositories
- Filter events by tenant using event store queries
- Build tenant-scoped and cross-tenant projections
- Propagate tenant context through your application
- Apply security best practices for tenant isolation
- Test multi-tenant applications effectively
What is Multi-Tenancy?
Multi-tenancy allows multiple customers (tenants) to share the same infrastructure while maintaining strict data isolation. Each tenant's data is logically separated, ensuring privacy and security.
Multi-Tenancy Patterns
| Pattern | Description | Trade-offs |
|---|---|---|
| Separate Databases | Each tenant has its own database | Strongest isolation, highest cost |
| Separate Schemas | Shared database, one schema per tenant | Good isolation, moderate cost |
| Shared Schema | Shared tables, application-level filtering | Best scalability, lowest cost |
eventsource-py uses the shared-schema pattern, with application-level isolation via the built-in tenant_id field. This scales well, but isolation holds only as long as every read and write path filters on tenant_id.
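As a rough illustration of what "application-level isolation" means in the shared-schema pattern, the sketch below keeps all tenants' rows in one list and filters on every read. `StoredRow` and `rows_for_tenant` are hypothetical names used only for this example; they are not part of eventsource-py.

```python
from __future__ import annotations

from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass(frozen=True)
class StoredRow:
    """Hypothetical stand-in for one row of a shared events table."""

    tenant_id: UUID | None
    event_type: str


def rows_for_tenant(rows: list[StoredRow], tenant_id: UUID) -> list[StoredRow]:
    """Application-level isolation: every read filters on tenant_id."""
    return [row for row in rows if row.tenant_id == tenant_id]


acme, globex = uuid4(), uuid4()

# One shared "table" holding rows for both tenants
shared_table = [
    StoredRow(acme, "OrderPlaced"),
    StoredRow(globex, "OrderPlaced"),
    StoredRow(acme, "OrderShipped"),
]

acme_rows = rows_for_tenant(shared_table, acme)
```

A read path that forgets the filter sees every tenant's rows, which is why the repository and projection patterns later in this tutorial centralize the check.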
Benefits
- Cost efficiency: Share infrastructure across tenants
- Operational simplicity: Single deployment, unified monitoring
- Scalability: Add tenants without infrastructure changes
- Data isolation: Built-in tenant filtering at the event store level
Trade-offs
- Security responsibility: Application must enforce tenant boundaries
- Cross-tenant queries: More complex than single-tenant systems
- Testing complexity: Must verify isolation between tenants
- Migration coordination: Schema changes affect all tenants
The tenant_id Field
Every DomainEvent includes an optional tenant_id field that's automatically indexed and persisted by all event store implementations.
Field Definition
class DomainEvent(BaseModel):
    """Base class for all domain events."""

    tenant_id: UUID | None = Field(
        default=None,
        description="Tenant this event belongs to (optional)",
    )
    # ... other fields
Key characteristics:
- Optional: Events without tenant_id are not tenant-scoped
- Immutable: Once set, cannot be changed (events are frozen)
- Indexed: All event stores index tenant_id for fast filtering
- Propagated: Automatically preserved through event chains
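The "immutable" characteristic can be illustrated without the library. The sketch below uses a frozen standard-library dataclass as an analogy for a frozen event; `SketchEvent` and `try_retarget` are hypothetical names, and the real DomainEvent is a Pydantic model whose exact error type may differ.

```python
from __future__ import annotations

from dataclasses import FrozenInstanceError, dataclass, replace
from uuid import UUID, uuid4


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical frozen event, standing in for a DomainEvent."""

    aggregate_id: UUID
    tenant_id: UUID | None = None


def try_retarget(event: SketchEvent, new_tenant: UUID) -> bool:
    """Attempt to move an event to another tenant; return True if it worked."""
    try:
        event.tenant_id = new_tenant  # rejected: the instance is frozen
    except FrozenInstanceError:
        return False
    return True


original_tenant = uuid4()
event = SketchEvent(aggregate_id=uuid4(), tenant_id=original_tenant)

# In-place reassignment fails, so the stored tenant cannot drift
retargeted = try_retarget(event, uuid4())

# A different tenant requires a brand-new event object
copy_for_other_tenant = replace(event, tenant_id=uuid4())
```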
Creating Tenant-Scoped Events
Events inherit tenant_id from DomainEvent. You just need to provide it when creating events.
Basic Event Definition
from uuid import UUID, uuid4

from eventsource import DomainEvent, register_event

@register_event
class OrderPlaced(DomainEvent):
    event_type: str = "OrderPlaced"
    aggregate_type: str = "Order"
    customer_id: UUID
    customer_name: str
    total_amount: float
    # tenant_id inherited from DomainEvent

@register_event
class OrderShipped(DomainEvent):
    event_type: str = "OrderShipped"
    aggregate_type: str = "Order"
    tracking_number: str
Creating Events with Tenant Context
# Tenant context (typically from authentication)
tenant_id = uuid4()  # ACME Corp
order_id = uuid4()

# Create event with tenant
event = OrderPlaced(
    aggregate_id=order_id,
    tenant_id=tenant_id,  # Associate with tenant
    customer_id=uuid4(),
    customer_name="Alice Johnson",
    total_amount=299.99,
    aggregate_version=1,
)
print(f"Event belongs to tenant: {event.tenant_id}")
Important: The tenant_id should come from a trusted source (authentication context), never from user input.
Tenant-Aware Aggregates
Aggregates need to track and enforce tenant context throughout their lifecycle.
Aggregate with Tenant Support
from uuid import UUID

from pydantic import BaseModel

from eventsource import AggregateRoot, DomainEvent

class OrderState(BaseModel):
    order_id: UUID
    tenant_id: UUID | None = None
    customer_id: UUID | None = None
    customer_name: str = ""
    total_amount: float = 0.0
    status: str = "draft"
    tracking_number: str | None = None

class OrderAggregate(AggregateRoot[OrderState]):
    aggregate_type = "Order"

    def __init__(self, aggregate_id: UUID, tenant_id: UUID | None = None):
        """
        Initialize aggregate with tenant context.

        Args:
            aggregate_id: Unique order identifier
            tenant_id: Tenant this order belongs to
        """
        super().__init__(aggregate_id)
        self._tenant_id = tenant_id

    @property
    def tenant_id(self) -> UUID | None:
        """Get the tenant ID for this aggregate."""
        return self._tenant_id

    def _get_initial_state(self) -> OrderState:
        """Initialize state with tenant context."""
        return OrderState(
            order_id=self.aggregate_id,
            tenant_id=self._tenant_id,
        )

    def _apply(self, event: DomainEvent) -> None:
        """Apply events and update state."""
        if isinstance(event, OrderPlaced):
            self._state = OrderState(
                order_id=self.aggregate_id,
                tenant_id=event.tenant_id,
                customer_id=event.customer_id,
                customer_name=event.customer_name,
                total_amount=event.total_amount,
                status="placed",
            )
            # Set tenant from first event
            if event.tenant_id:
                self._tenant_id = event.tenant_id
        elif isinstance(event, OrderShipped):
            if self._state:
                self._state = self._state.model_copy(
                    update={
                        "status": "shipped",
                        "tracking_number": event.tracking_number,
                    }
                )

    def place_order(self, customer_id: UUID, customer_name: str, total_amount: float) -> None:
        """Place a new order."""
        if self.version > 0:
            raise ValueError("Order already placed")
        if not self._tenant_id:
            raise ValueError("Tenant context required to place order")
        self.apply_event(
            OrderPlaced(
                aggregate_id=self.aggregate_id,
                tenant_id=self._tenant_id,  # Always include tenant
                customer_id=customer_id,
                customer_name=customer_name,
                total_amount=total_amount,
                aggregate_version=self.get_next_version(),
            )
        )

    def ship(self, tracking_number: str) -> None:
        """Ship the order."""
        if not self._state or self._state.status != "placed":
            raise ValueError("Order must be placed before shipping")
        self.apply_event(
            OrderShipped(
                aggregate_id=self.aggregate_id,
                tenant_id=self._tenant_id,  # Always include tenant
                tracking_number=tracking_number,
                aggregate_version=self.get_next_version(),
            )
        )
Key patterns:
- Constructor takes tenant_id: Aggregate knows its tenant from creation
- Property exposes tenant: Easy to check which tenant owns the aggregate
- All events include tenant: Ensures complete tenant tracking
- Tenant validation: Prevents operations without tenant context
Tenant-Aware Repository
Repositories should enforce tenant boundaries, preventing cross-tenant access.
Basic Tenant-Scoped Repository
from uuid import UUID

from eventsource import AggregateRepository, EventStore

class TenantOrderRepository:
    """Repository that enforces tenant boundaries."""

    def __init__(self, event_store: EventStore, tenant_id: UUID):
        """
        Create tenant-scoped repository.

        Args:
            event_store: Underlying event store
            tenant_id: Tenant this repository is scoped to
        """
        self._tenant_id = tenant_id
        self._inner_repo = AggregateRepository(
            event_store=event_store,
            aggregate_factory=lambda aid: OrderAggregate(aid, tenant_id),
            aggregate_type="Order",
        )

    @property
    def tenant_id(self) -> UUID:
        """Get the tenant ID this repository is scoped to."""
        return self._tenant_id

    def create_new(self, order_id: UUID) -> OrderAggregate:
        """Create new order aggregate for this tenant."""
        return OrderAggregate(order_id, self._tenant_id)

    async def save(self, order: OrderAggregate) -> None:
        """
        Save order aggregate with tenant validation.

        Raises:
            PermissionError: If order belongs to different tenant
        """
        if order.tenant_id != self._tenant_id:
            raise PermissionError(
                f"Cannot save order from tenant {order.tenant_id} "
                f"using repository scoped to {self._tenant_id}"
            )
        await self._inner_repo.save(order)

    async def load(self, order_id: UUID) -> OrderAggregate:
        """
        Load order aggregate with tenant validation.

        Raises:
            PermissionError: If order belongs to different tenant
        """
        order = await self._inner_repo.load(order_id)
        if order.tenant_id != self._tenant_id:
            raise PermissionError(
                f"Order {order_id} belongs to different tenant"
            )
        return order

    async def exists(self, order_id: UUID) -> bool:
        """Check if order exists and belongs to this tenant."""
        try:
            await self.load(order_id)
            return True
        except Exception:
            # Deliberately broad: covers PermissionError and failed loads.
            # It also hides store outages, so narrow it in production code.
            return False
Security enforcement:
- Constructor scoping: Repository is bound to specific tenant
- Validation on save: Prevents saving other tenant's aggregates
- Validation on load: Prevents accessing other tenant's data
- Factory injection: All created aggregates have correct tenant
Querying Events by Tenant
Event stores provide built-in tenant filtering for efficient queries.
Filter by Tenant and Type
from datetime import datetime, timedelta, timezone
from uuid import UUID

from eventsource import DomainEvent, InMemoryEventStore

async def get_tenant_orders(
    event_store: InMemoryEventStore,
    tenant_id: UUID,
) -> list[DomainEvent]:
    """Get all order events for a specific tenant."""
    return await event_store.get_events_by_type(
        aggregate_type="Order",
        tenant_id=tenant_id,  # Filter by tenant
    )

async def get_recent_tenant_orders(
    event_store: InMemoryEventStore,
    tenant_id: UUID,
    hours: int = 24,
) -> list[DomainEvent]:
    """Get recent order events for a specific tenant."""
    # timezone.utc works on Python 3.10; datetime.UTC needs 3.11+
    since = datetime.now(timezone.utc) - timedelta(hours=hours)
    return await event_store.get_events_by_type(
        aggregate_type="Order",
        tenant_id=tenant_id,
        from_timestamp=since,  # Time filter
    )
Stream All Events for Tenant
from eventsource.stores import ReadOptions

async def stream_tenant_events(
    event_store: InMemoryEventStore,
    tenant_id: UUID,
):
    """Stream all events for a specific tenant."""
    options = ReadOptions(
        tenant_id=tenant_id,  # Filter by tenant
        from_position=0,  # Start from beginning
    )
    async for stored_event in event_store.read_all(options):
        print(f"Event: {stored_event.event_type}")
        print(f"Tenant: {stored_event.event.tenant_id}")
        print(f"Position: {stored_event.global_position}")
Performance note: All event stores (InMemory, PostgreSQL, SQLite) index tenant_id, making these queries efficient even with millions of events.
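To make the indexing point concrete, here is a minimal sketch using the standard-library sqlite3 module. The table layout and the index name `idx_events_tenant` are assumptions made for this illustration; they are not the schema eventsource-py actually creates.

```python
import sqlite3
from uuid import uuid4


def build_store() -> sqlite3.Connection:
    """Create an in-memory events table with an index on tenant_id."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE events (
            global_position INTEGER PRIMARY KEY AUTOINCREMENT,
            tenant_id TEXT,
            event_type TEXT NOT NULL
        )
        """
    )
    # Hypothetical index name; it lets tenant lookups avoid a full table scan
    conn.execute("CREATE INDEX idx_events_tenant ON events (tenant_id)")
    return conn


def append(conn: sqlite3.Connection, tenant_id: str, event_type: str) -> None:
    """Append one event row for the given tenant."""
    conn.execute(
        "INSERT INTO events (tenant_id, event_type) VALUES (?, ?)",
        (tenant_id, event_type),
    )


def events_for_tenant(conn: sqlite3.Connection, tenant_id: str) -> list[str]:
    """Return one tenant's event types in append order."""
    rows = conn.execute(
        "SELECT event_type FROM events WHERE tenant_id = ? ORDER BY global_position",
        (tenant_id,),
    )
    return [row[0] for row in rows]


def query_plan(conn: sqlite3.Connection, tenant_id: str) -> str:
    """Return SQLite's plan description for the tenant lookup."""
    rows = conn.execute(
        "EXPLAIN QUERY PLAN SELECT event_type FROM events WHERE tenant_id = ?",
        (tenant_id,),
    ).fetchall()
    return " ".join(str(row[-1]) for row in rows)


conn = build_store()
acme, globex = str(uuid4()), str(uuid4())
append(conn, acme, "OrderPlaced")
append(conn, globex, "OrderPlaced")
append(conn, acme, "OrderShipped")
```

Calling `query_plan(conn, acme)` shows whether the lookup is served by the index instead of scanning every tenant's rows.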
Tenant-Scoped Projections
Projections can be scoped to a single tenant or process events for all tenants.
Single-Tenant Projection
from eventsource.projections import DeclarativeProjection, handles

class TenantOrderSummaryProjection(DeclarativeProjection):
    """Projection scoped to a specific tenant."""

    def __init__(self, tenant_id: UUID):
        """
        Create tenant-scoped projection.

        Args:
            tenant_id: Tenant to build summary for
        """
        super().__init__()
        self._target_tenant = tenant_id
        self._orders: dict[UUID, dict] = {}

    @handles(OrderPlaced)
    async def _on_order_placed(self, event: OrderPlaced) -> None:
        """Handle OrderPlaced events for this tenant only."""
        # Skip events from other tenants
        if event.tenant_id != self._target_tenant:
            return
        self._orders[event.aggregate_id] = {
            "order_id": event.aggregate_id,
            "customer_id": event.customer_id,
            "customer_name": event.customer_name,
            "total_amount": event.total_amount,
            "status": "placed",
        }

    @handles(OrderShipped)
    async def _on_order_shipped(self, event: OrderShipped) -> None:
        """Handle OrderShipped events for this tenant only."""
        if event.tenant_id != self._target_tenant:
            return
        if event.aggregate_id in self._orders:
            self._orders[event.aggregate_id]["status"] = "shipped"

    async def _truncate_read_models(self) -> None:
        """Clear projection data."""
        self._orders.clear()

    def get_orders(self) -> list[dict]:
        """Get all orders for this tenant."""
        return list(self._orders.values())

    def get_order_count(self) -> int:
        """Get total order count for this tenant."""
        return len(self._orders)
Cross-Tenant Projection
from uuid import UUID

# Fixed bucket for events without a tenant. A fresh uuid4() per event would
# create a new, unreachable stats entry for every unscoped event.
UNSCOPED_TENANT = UUID(int=0)

class GlobalOrderStatsProjection(DeclarativeProjection):
    """Projection that aggregates stats across all tenants."""

    def __init__(self):
        super().__init__()
        self._stats_by_tenant: dict[UUID, dict] = {}

    @handles(OrderPlaced)
    async def _on_order_placed(self, event: OrderPlaced) -> None:
        """Track order statistics per tenant."""
        tenant_id = event.tenant_id or UNSCOPED_TENANT  # Handle null tenant
        if tenant_id not in self._stats_by_tenant:
            self._stats_by_tenant[tenant_id] = {
                "tenant_id": tenant_id,
                "order_count": 0,
                "total_revenue": 0.0,
            }
        stats = self._stats_by_tenant[tenant_id]
        stats["order_count"] += 1
        stats["total_revenue"] += event.total_amount

    async def _truncate_read_models(self) -> None:
        """Clear all stats."""
        self._stats_by_tenant.clear()

    def get_tenant_stats(self, tenant_id: UUID) -> dict | None:
        """Get statistics for a specific tenant."""
        return self._stats_by_tenant.get(tenant_id)

    def get_all_stats(self) -> list[dict]:
        """Get statistics for all tenants."""
        return list(self._stats_by_tenant.values())

    def get_top_tenants(self, limit: int = 10) -> list[dict]:
        """Get top tenants by revenue."""
        stats = list(self._stats_by_tenant.values())
        stats.sort(key=lambda x: x["total_revenue"], reverse=True)
        return stats[:limit]
Pattern selection:
- Single-tenant: Use when building tenant-specific dashboards
- Cross-tenant: Use for admin dashboards, analytics, billing
- Hybrid: Process all events but store per-tenant read models
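The hybrid option has no example above, so here is a library-free sketch of the idea: one projection consumes every event but keeps a separate read model per tenant. `OrderPlacedSketch` and `HybridOrderProjection` are hypothetical names; a real version would presumably subclass DeclarativeProjection and use @handles as in the earlier examples.

```python
from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass(frozen=True)
class OrderPlacedSketch:
    """Hypothetical stand-in for the OrderPlaced event."""

    tenant_id: UUID
    aggregate_id: UUID
    total_amount: float


class HybridOrderProjection:
    """Processes all tenants' events, storing one read model per tenant."""

    def __init__(self) -> None:
        # tenant_id -> {order_id -> total_amount}
        self._orders_by_tenant: dict[UUID, dict[UUID, float]] = defaultdict(dict)

    def handle(self, event: OrderPlacedSketch) -> None:
        """Route each event into its own tenant's read model."""
        self._orders_by_tenant[event.tenant_id][event.aggregate_id] = event.total_amount

    def orders_for(self, tenant_id: UUID) -> dict[UUID, float]:
        """Return a copy of one tenant's read model (empty if unknown)."""
        return dict(self._orders_by_tenant.get(tenant_id, {}))


acme, globex = uuid4(), uuid4()
projection = HybridOrderProjection()
for tenant, amount in [(acme, 100.0), (acme, 200.0), (globex, 500.0)]:
    projection.handle(OrderPlacedSketch(tenant, uuid4(), amount))
```

Queries still take a tenant_id, so the read side stays tenant-scoped even though a single projection instance does the work.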
Tenant Context Propagation
In production applications, tenant context should flow through the request lifecycle automatically.
Context Variables
from contextvars import ContextVar
from uuid import UUID

# Thread-safe tenant context storage
_current_tenant: ContextVar[UUID | None] = ContextVar(
    "current_tenant",
    default=None,
)

def get_current_tenant() -> UUID | None:
    """Get the current tenant ID from context."""
    return _current_tenant.get()

def set_current_tenant(tenant_id: UUID | None) -> None:
    """Set the current tenant ID in context."""
    _current_tenant.set(tenant_id)

def require_tenant() -> UUID:
    """
    Get the current tenant ID, raising if not set.

    Raises:
        ValueError: If no tenant context is set
    """
    tenant_id = get_current_tenant()
    if tenant_id is None:
        raise ValueError("Tenant context is required but not set")
    return tenant_id
Why ContextVar?
- Thread-safe: Each async task has isolated context
- Propagates: Automatically flows through await calls
- Clean: No need to pass tenant_id through every function
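The isolation and propagation properties can be checked with the standard library alone. In this sketch, two concurrent "requests" each set their own tenant; the names `_tenant`, `handle_request`, and `read_tenant_later` are hypothetical and exist only for the demonstration.

```python
from __future__ import annotations

import asyncio
from contextvars import ContextVar
from uuid import UUID, uuid4

_tenant: ContextVar[UUID | None] = ContextVar("tenant", default=None)


async def read_tenant_later() -> UUID | None:
    """Read the tenant after yielding to the event loop."""
    await asyncio.sleep(0)  # let the other task run in between
    return _tenant.get()


async def handle_request(tenant_id: UUID) -> UUID | None:
    """Simulate a request: set the tenant, then read it in a nested call."""
    _tenant.set(tenant_id)
    return await read_tenant_later()  # value flows through the await


async def demo() -> tuple[tuple[UUID, UUID], tuple, UUID | None]:
    acme, globex = uuid4(), uuid4()
    # gather() wraps each coroutine in its own task, each with a context copy
    seen = await asyncio.gather(handle_request(acme), handle_request(globex))
    # The parent context never saw either set() call
    return (acme, globex), tuple(seen), _tenant.get()


expected, seen, outer = asyncio.run(demo())
```

Each task reads back its own tenant even though both ran interleaved, and the value set inside a task does not leak into the caller's context.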
FastAPI Integration
from uuid import UUID, uuid4

from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

class TenantMiddleware(BaseHTTPMiddleware):
    """Extract tenant from request and set in context."""

    async def dispatch(self, request: Request, call_next):
        """Process request with tenant context."""
        # Extract tenant from header
        tenant_header = request.headers.get("X-Tenant-ID")
        if tenant_header:
            try:
                set_current_tenant(UUID(tenant_header))
            except ValueError:
                # Return the response directly: an HTTPException raised inside
                # BaseHTTPMiddleware bypasses FastAPI's exception handlers
                # and surfaces as a 500.
                return JSONResponse(
                    status_code=400,
                    content={"detail": "Invalid X-Tenant-ID header format"},
                )
        try:
            return await call_next(request)
        finally:
            # Clear context after request
            set_current_tenant(None)

# Application setup
app = FastAPI()
app.add_middleware(TenantMiddleware)

# Dependency for requiring tenant
async def get_tenant_id() -> UUID:
    """Get tenant ID from context or raise 401."""
    tenant_id = get_current_tenant()
    if not tenant_id:
        raise HTTPException(
            status_code=401,
            detail="Tenant context required (missing X-Tenant-ID header)",
        )
    return tenant_id

# Dependency for tenant-scoped repository
async def get_order_repository(
    tenant_id: UUID = Depends(get_tenant_id),
) -> TenantOrderRepository:
    """Get tenant-scoped order repository."""
    event_store = app.state.event_store  # Assume stored in app state
    return TenantOrderRepository(event_store, tenant_id)

# Routes automatically get tenant context
@app.post("/orders")
async def create_order(
    customer_id: UUID,
    customer_name: str,
    total_amount: float,
    repo: TenantOrderRepository = Depends(get_order_repository),
):
    """Create order for authenticated tenant."""
    order_id = uuid4()
    order = repo.create_new(order_id)
    order.place_order(customer_id, customer_name, total_amount)
    await repo.save(order)
    return {
        "order_id": str(order_id),
        "tenant_id": str(repo.tenant_id),
    }

@app.get("/orders/{order_id}")
async def get_order(
    order_id: UUID,
    repo: TenantOrderRepository = Depends(get_order_repository),
):
    """Get order for authenticated tenant."""
    order = await repo.load(order_id)
    return {
        "order_id": str(order.aggregate_id),
        "customer_name": order.state.customer_name,
        "total_amount": order.state.total_amount,
        "status": order.state.status,
    }
Security flow:
- Client sends X-Tenant-ID header (from auth token)
- Middleware validates and sets in context
- Dependencies retrieve tenant from context
- Repository enforces tenant boundaries
- Context cleared after request completes
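A bare header is only trustworthy if something upstream (a gateway or the auth layer) sets it. One way to make the tenant claim verifiable is to sign it. The sketch below uses a standard-library HMAC over the tenant ID; the token format, `SECRET`, `issue_token`, and `tenant_from_token` are all assumptions made for illustration, and a real deployment would more likely validate a JWT or session from its identity provider.

```python
import hashlib
import hmac
from uuid import UUID, uuid4

# Hypothetical shared secret; load from secure configuration in practice
SECRET = b"demo-secret-change-me"


def issue_token(tenant_id: UUID, secret: bytes = SECRET) -> str:
    """Return '<tenant hex>.<signature>' for an authenticated tenant."""
    payload = tenant_id.hex
    signature = hmac.new(secret, payload.encode(), hashlib.sha256).hexdigest()
    return f"{payload}.{signature}"


def tenant_from_token(token: str, secret: bytes = SECRET) -> UUID:
    """Verify the signature, then return the tenant ID it covers."""
    tenant_hex, _, signature = token.partition(".")
    expected = hmac.new(secret, tenant_hex.encode(), hashlib.sha256).hexdigest()
    # Constant-time comparison avoids leaking how much of the signature matched
    if not hmac.compare_digest(signature.encode(), expected.encode()):
        raise PermissionError("Invalid tenant token signature")
    return UUID(hex=tenant_hex)


acme, globex = uuid4(), uuid4()
acme_token = issue_token(acme)

# A client that swaps in another tenant's ID cannot produce a valid signature
forged_token = f"{globex.hex}.{acme_token.partition('.')[2]}"
```

With this shape, the middleware would call `tenant_from_token` instead of parsing the header value directly, so a client cannot switch tenants by editing a header.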
Security Best Practices
Multi-tenancy requires careful security implementation to prevent data leaks.
Never Trust Client Input
# WRONG - Client controls tenant_id
@app.post("/orders/unsafe")
async def create_order_unsafe(request: dict):
    # Client can specify any tenant!
    tenant_id = UUID(request.get("tenant_id"))  # DANGEROUS!
    repo = TenantOrderRepository(event_store, tenant_id)
    # Client can now access any tenant's data

# CORRECT - Tenant from authentication
@app.post("/orders/safe")
async def create_order_safe(
    customer_name: str,
    total_amount: float,
    tenant_id: UUID = Depends(get_tenant_id),  # From auth token
):
    # Tenant verified by authentication middleware
    repo = TenantOrderRepository(event_store, tenant_id)
    # Can only access authenticated tenant's data
Always Validate After Loading
import logging

logger = logging.getLogger(__name__)

async def get_order_safe(
    order_id: UUID,
    tenant_id: UUID,
) -> OrderAggregate:
    """Load order with tenant validation."""
    # Load from event store
    order = await base_repo.load(order_id)
    # Verify tenant ownership
    if order.tenant_id != tenant_id:
        # Standard-library loggers reject arbitrary keyword arguments,
        # so structured fields are passed via `extra`.
        logger.warning(
            "Tenant mismatch",
            extra={
                "order_id": str(order_id),
                "expected_tenant": str(tenant_id),
                "actual_tenant": str(order.tenant_id),
            },
        )
        raise PermissionError("Access denied")
    return order
Log Security Events
import logging

logger = logging.getLogger(__name__)

async def load_with_audit(
    order_id: UUID,
    tenant_id: UUID,
) -> OrderAggregate:
    """Load order with security audit logging."""
    try:
        order = await repo.load(order_id)
        # Log successful access (structured fields go in `extra`)
        logger.info(
            "Order loaded",
            extra={
                "order_id": str(order_id),
                "tenant_id": str(tenant_id),
            },
        )
        return order
    except PermissionError:
        # Log security violation
        logger.warning(
            "Cross-tenant access attempt blocked",
            extra={
                "order_id": str(order_id),
                "requested_by_tenant": str(tenant_id),
            },
        )
        raise
Defense in Depth
from datetime import datetime, timezone

class SecureTenantRepository:
    """Repository with multiple security layers."""

    def __init__(self, event_store: EventStore, tenant_id: UUID):
        self._tenant_id = tenant_id
        self._access_log: list[dict] = []
        self._inner_repo = AggregateRepository(
            event_store=event_store,
            aggregate_factory=lambda aid: OrderAggregate(aid, tenant_id),
            aggregate_type="Order",
        )

    async def load(self, order_id: UUID) -> OrderAggregate:
        """Load with multi-layer security."""
        # Layer 1: Check access log for suspicious patterns
        if self._is_suspicious_access(order_id):
            logger.warning("Suspicious access pattern detected")
            raise PermissionError("Access denied")
        # Layer 2: Load aggregate
        order = await self._load_from_store(order_id)
        # Layer 3: Verify tenant ownership
        if order.tenant_id != self._tenant_id:
            logger.error("Tenant mismatch after load")
            # Record the failure so Layer 1 can see repeated attempts
            self._record_access(order_id, success=False)
            raise PermissionError("Access denied")
        # Layer 4: Log successful access
        self._record_access(order_id)
        return order

    def _is_suspicious_access(self, order_id: UUID) -> bool:
        """Detect suspicious access patterns."""
        # Example: Too many failed attempts
        recent_failures = [
            access for access in self._access_log[-10:]
            if access["order_id"] == order_id and not access["success"]
        ]
        return len(recent_failures) > 3

    async def _load_from_store(self, order_id: UUID) -> OrderAggregate:
        """Load aggregate from event store."""
        return await self._inner_repo.load(order_id)

    def _record_access(self, order_id: UUID, success: bool = True) -> None:
        """Record access attempt for audit."""
        self._access_log.append({
            "order_id": order_id,
            "tenant_id": self._tenant_id,
            "timestamp": datetime.now(timezone.utc),
            "success": success,
        })
Testing Multi-Tenant Applications
Comprehensive testing is essential to verify tenant isolation.
Basic Isolation Test
import pytest
from uuid import uuid4

@pytest.mark.asyncio
async def test_tenant_isolation():
    """Verify tenants cannot access each other's data."""
    event_store = InMemoryEventStore()
    # Create two tenants
    tenant_acme = uuid4()
    tenant_globex = uuid4()
    repo_acme = TenantOrderRepository(event_store, tenant_acme)
    repo_globex = TenantOrderRepository(event_store, tenant_globex)
    # Tenant A creates order
    order_id = uuid4()
    order = repo_acme.create_new(order_id)
    order.place_order(uuid4(), "Alice", 100.0)
    await repo_acme.save(order)
    # Tenant B cannot access Tenant A's order
    with pytest.raises(PermissionError, match="different tenant"):
        await repo_globex.load(order_id)

@pytest.mark.asyncio
async def test_cross_tenant_save_blocked():
    """Verify cannot save aggregate from different tenant."""
    event_store = InMemoryEventStore()
    tenant_acme = uuid4()
    tenant_globex = uuid4()
    repo_acme = TenantOrderRepository(event_store, tenant_acme)
    repo_globex = TenantOrderRepository(event_store, tenant_globex)
    # Create order with Tenant A
    order = repo_acme.create_new(uuid4())
    order.place_order(uuid4(), "Alice", 100.0)
    await repo_acme.save(order)
    # Load with Tenant A
    loaded = await repo_acme.load(order.aggregate_id)
    # Cannot save via Tenant B's repository
    with pytest.raises(PermissionError, match="Cannot save order"):
        await repo_globex.save(loaded)
Projection Isolation Test
@pytest.mark.asyncio
async def test_projection_tenant_filtering():
    """Verify projections correctly filter by tenant."""
    event_store = InMemoryEventStore()
    tenant_acme = uuid4()
    tenant_globex = uuid4()
    # Create orders for both tenants
    repo_acme = TenantOrderRepository(event_store, tenant_acme)
    repo_globex = TenantOrderRepository(event_store, tenant_globex)
    for i in range(3):
        order = repo_acme.create_new(uuid4())
        order.place_order(uuid4(), f"Customer {i}", 100.0)
        await repo_acme.save(order)
    for i in range(2):
        order = repo_globex.create_new(uuid4())
        order.place_order(uuid4(), f"Customer {i}", 200.0)
        await repo_globex.save(order)
    # ACME projection should only see 3 orders
    projection_acme = TenantOrderSummaryProjection(tenant_acme)
    async for stored in event_store.read_all():
        await projection_acme.handle(stored.event)
    assert projection_acme.get_order_count() == 3
    # Globex projection should only see 2 orders
    projection_globex = TenantOrderSummaryProjection(tenant_globex)
    async for stored in event_store.read_all():
        await projection_globex.handle(stored.event)
    assert projection_globex.get_order_count() == 2
Query Filtering Test
@pytest.mark.asyncio
async def test_event_store_tenant_filtering():
    """Verify event store filters events by tenant."""
    event_store = InMemoryEventStore()
    tenant_acme = uuid4()
    tenant_globex = uuid4()
    # Create events for both tenants
    repo_acme = TenantOrderRepository(event_store, tenant_acme)
    repo_globex = TenantOrderRepository(event_store, tenant_globex)
    # 5 ACME orders
    for _ in range(5):
        order = repo_acme.create_new(uuid4())
        order.place_order(uuid4(), "Customer", 100.0)
        await repo_acme.save(order)
    # 3 Globex orders
    for _ in range(3):
        order = repo_globex.create_new(uuid4())
        order.place_order(uuid4(), "Customer", 100.0)
        await repo_globex.save(order)
    # Query by tenant
    acme_events = await event_store.get_events_by_type(
        "Order",
        tenant_id=tenant_acme,
    )
    globex_events = await event_store.get_events_by_type(
        "Order",
        tenant_id=tenant_globex,
    )
    assert len(acme_events) == 5
    assert len(globex_events) == 3
    # All events have correct tenant
    assert all(e.tenant_id == tenant_acme for e in acme_events)
    assert all(e.tenant_id == tenant_globex for e in globex_events)
Complete Working Example¶
Here's a runnable example demonstrating all multi-tenancy concepts:
"""
Tutorial 16: Multi-Tenancy
Run with: python tutorial_16_multi_tenancy.py
"""
import asyncio
from uuid import UUID, uuid4
from pydantic import BaseModel
from eventsource import (
AggregateRepository,
AggregateRoot,
DomainEvent,
InMemoryEventStore,
register_event,
)
from eventsource.projections import DeclarativeProjection, handles
# =============================================================================
# Events
# =============================================================================
@register_event
class OrderPlaced(DomainEvent):
event_type: str = "OrderPlaced"
aggregate_type: str = "Order"
customer_id: UUID
customer_name: str
total_amount: float
@register_event
class OrderShipped(DomainEvent):
event_type: str = "OrderShipped"
aggregate_type: str = "Order"
tracking_number: str
# =============================================================================
# State
# =============================================================================
class OrderState(BaseModel):
order_id: UUID
tenant_id: UUID | None = None
customer_id: UUID | None = None
customer_name: str = ""
total_amount: float = 0.0
status: str = "draft"
tracking_number: str | None = None
# =============================================================================
# Aggregate
# =============================================================================
class OrderAggregate(AggregateRoot[OrderState]):
aggregate_type = "Order"
def __init__(self, aggregate_id: UUID, tenant_id: UUID | None = None):
super().__init__(aggregate_id)
self._tenant_id = tenant_id
@property
def tenant_id(self) -> UUID | None:
return self._tenant_id
def _get_initial_state(self) -> OrderState:
return OrderState(order_id=self.aggregate_id, tenant_id=self._tenant_id)
def _apply(self, event: DomainEvent) -> None:
if isinstance(event, OrderPlaced):
self._state = OrderState(
order_id=self.aggregate_id,
tenant_id=event.tenant_id,
customer_id=event.customer_id,
customer_name=event.customer_name,
total_amount=event.total_amount,
status="placed",
)
if event.tenant_id:
self._tenant_id = event.tenant_id
elif isinstance(event, OrderShipped):
if self._state:
self._state = self._state.model_copy(
update={
"status": "shipped",
"tracking_number": event.tracking_number,
}
)
def place_order(
self, customer_id: UUID, customer_name: str, total_amount: float
) -> None:
if self.version > 0:
raise ValueError("Order already placed")
if not self._tenant_id:
raise ValueError("Tenant context required")
self.apply_event(
OrderPlaced(
aggregate_id=self.aggregate_id,
tenant_id=self._tenant_id,
customer_id=customer_id,
customer_name=customer_name,
total_amount=total_amount,
aggregate_version=self.get_next_version(),
)
)
def ship(self, tracking_number: str) -> None:
if not self._state or self._state.status != "placed":
raise ValueError("Order must be placed before shipping")
self.apply_event(
OrderShipped(
aggregate_id=self.aggregate_id,
tenant_id=self._tenant_id,
tracking_number=tracking_number,
aggregate_version=self.get_next_version(),
)
)
# =============================================================================
# Repository
# =============================================================================
class TenantOrderRepository:
"""Repository with tenant boundary enforcement."""
def __init__(self, event_store: InMemoryEventStore, tenant_id: UUID):
self._tenant_id = tenant_id
self._inner_repo = AggregateRepository(
event_store=event_store,
aggregate_factory=lambda aid: OrderAggregate(aid, tenant_id),
aggregate_type="Order",
)
@property
def tenant_id(self) -> UUID:
return self._tenant_id
def create_new(self, order_id: UUID) -> OrderAggregate:
return OrderAggregate(order_id, self._tenant_id)
async def save(self, order: OrderAggregate) -> None:
if order.tenant_id != self._tenant_id:
raise PermissionError("Cannot save order from different tenant")
await self._inner_repo.save(order)
async def load(self, order_id: UUID) -> OrderAggregate:
order = await self._inner_repo.load(order_id)
if order.tenant_id != self._tenant_id:
raise PermissionError("Order belongs to different tenant")
return order
# =============================================================================
# Projections
# =============================================================================
class TenantOrderProjection(DeclarativeProjection):
"""Single-tenant projection."""
def __init__(self, tenant_id: UUID):
super().__init__()
self._target_tenant = tenant_id
self._orders: dict[UUID, dict] = {}
@handles(OrderPlaced)
async def _on_order_placed(self, event: OrderPlaced) -> None:
if event.tenant_id != self._target_tenant:
return
self._orders[event.aggregate_id] = {
"order_id": event.aggregate_id,
"customer_name": event.customer_name,
"total_amount": event.total_amount,
"status": "placed",
}
@handles(OrderShipped)
async def _on_order_shipped(self, event: OrderShipped) -> None:
if event.tenant_id != self._target_tenant:
return
if event.aggregate_id in self._orders:
self._orders[event.aggregate_id]["status"] = "shipped"
async def _truncate_read_models(self) -> None:
self._orders.clear()
def get_orders(self) -> list[dict]:
return list(self._orders.values())
class GlobalStatsProjection(DeclarativeProjection):
"""Cross-tenant projection for analytics."""
def __init__(self):
super().__init__()
self._stats: dict[UUID, dict] = {}
@handles(OrderPlaced)
async def _on_order_placed(self, event: OrderPlaced) -> None:
tenant_id = event.tenant_id or uuid4()
if tenant_id not in self._stats:
self._stats[tenant_id] = {
"tenant_id": tenant_id,
"order_count": 0,
"total_revenue": 0.0,
}
self._stats[tenant_id]["order_count"] += 1
self._stats[tenant_id]["total_revenue"] += event.total_amount
async def _truncate_read_models(self) -> None:
self._stats.clear()
def get_stats(self, tenant_id: UUID) -> dict | None:
return self._stats.get(tenant_id)
def get_all_stats(self) -> list[dict]:
return list(self._stats.values())
# =============================================================================
# Demo
# =============================================================================
async def main():
print("=" * 70)
print("Tutorial 16: Multi-Tenancy")
print("=" * 70)
# Setup
event_store = InMemoryEventStore()
# Create tenants
print("\n1. Creating tenant contexts")
print("-" * 70)
    tenant_acme = uuid4()
    tenant_globex = uuid4()
    print(f" ACME Corp: {tenant_acme}")
    print(f" Globex Inc: {tenant_globex}")

    # Create tenant-scoped repositories
    print("\n2. Creating tenant-scoped repositories")
    print("-" * 70)
    repo_acme = TenantOrderRepository(event_store, tenant_acme)
    repo_globex = TenantOrderRepository(event_store, tenant_globex)
    print(" - ACME repository created")
    print(" - Globex repository created")

    # Create orders for ACME
    print("\n3. Creating orders for ACME Corp")
    print("-" * 70)
    acme_orders = []
    for i in range(3):
        order_id = uuid4()
        order = repo_acme.create_new(order_id)
        order.place_order(
            customer_id=uuid4(),
            customer_name=f"ACME Customer {i+1}",
            total_amount=100.0 * (i + 1),
        )
        await repo_acme.save(order)
        acme_orders.append(order_id)
        print(f" Order {i+1}: ${order.state.total_amount:.2f}")

    # Ship one ACME order
    order = await repo_acme.load(acme_orders[0])
    order.ship("ACME-TRACK-001")
    await repo_acme.save(order)
    print(" Shipped order 1")

    # Create orders for Globex
    print("\n4. Creating orders for Globex Inc")
    print("-" * 70)
    globex_orders = []
    for i in range(2):
        order_id = uuid4()
        order = repo_globex.create_new(order_id)
        order.place_order(
            customer_id=uuid4(),
            customer_name=f"Globex Customer {i+1}",
            total_amount=500.0 * (i + 1),
        )
        await repo_globex.save(order)
        globex_orders.append(order_id)
        print(f" Order {i+1}: ${order.state.total_amount:.2f}")

    # Test tenant isolation
    print("\n5. Testing tenant isolation")
    print("-" * 70)
    try:
        # Try to load ACME order via Globex repository
        await repo_globex.load(acme_orders[0])
        print(" ERROR: Cross-tenant access allowed!")
    except PermissionError as e:
        print(" SUCCESS: Cross-tenant access blocked")
        print(f" Message: {e}")

    # Query events by tenant
    print("\n6. Querying events by tenant")
    print("-" * 70)
    acme_events = await event_store.get_events_by_type(
        "Order", tenant_id=tenant_acme
    )
    globex_events = await event_store.get_events_by_type(
        "Order", tenant_id=tenant_globex
    )
    print(f" ACME events: {len(acme_events)}")
    print(f" Globex events: {len(globex_events)}")

    # Build tenant-scoped projections
    print("\n7. Building tenant-scoped projections")
    print("-" * 70)
    projection_acme = TenantOrderProjection(tenant_acme)
    projection_globex = TenantOrderProjection(tenant_globex)
    async for stored in event_store.read_all():
        # Each projection receives every event and keeps only its own tenant's
        await projection_acme.handle(stored.event)
        await projection_globex.handle(stored.event)
    acme_orders_list = projection_acme.get_orders()
    globex_orders_list = projection_globex.get_orders()
    print(f" ACME projection: {len(acme_orders_list)} orders")
    print(f" Globex projection: {len(globex_orders_list)} orders")

    # Build cross-tenant projection
    print("\n8. Building cross-tenant analytics")
    print("-" * 70)
    global_stats = GlobalStatsProjection()
    async for stored in event_store.read_all():
        await global_stats.handle(stored.event)
    print("\n Tenant Statistics:")
    for stats in global_stats.get_all_stats():
        tenant_name = "ACME Corp" if stats["tenant_id"] == tenant_acme else "Globex Inc"
        print(f" {tenant_name}:")
        print(f" Orders: {stats['order_count']}")
        print(f" Revenue: ${stats['total_revenue']:.2f}")

    # Summary
    print("\n9. Summary")
    print("-" * 70)
    print(" Total tenants: 2")
    # This sum counts events (4 for ACME, 2 for Globex), not orders
    print(f" Total events: {len(acme_events) + len(globex_events)}")
    print(" Isolation verified: Yes")
    print(" Cross-tenant queries: Working")
    print("\n" + "=" * 70)
    print("Tutorial complete!")
    print("=" * 70)


if __name__ == "__main__":
    asyncio.run(main())
Expected output:
======================================================================
Tutorial 16: Multi-Tenancy
======================================================================
1. Creating tenant contexts
----------------------------------------------------------------------
ACME Corp: [UUID]
Globex Inc: [UUID]
2. Creating tenant-scoped repositories
----------------------------------------------------------------------
- ACME repository created
- Globex repository created
3. Creating orders for ACME Corp
----------------------------------------------------------------------
Order 1: $100.00
Order 2: $200.00
Order 3: $300.00
Shipped order 1
4. Creating orders for Globex Inc
----------------------------------------------------------------------
Order 1: $500.00
Order 2: $1000.00
5. Testing tenant isolation
----------------------------------------------------------------------
SUCCESS: Cross-tenant access blocked
Message: Order belongs to different tenant
6. Querying events by tenant
----------------------------------------------------------------------
ACME events: 4
Globex events: 2
7. Building tenant-scoped projections
----------------------------------------------------------------------
ACME projection: 3 orders
Globex projection: 2 orders
8. Building cross-tenant analytics
----------------------------------------------------------------------
Tenant Statistics:
ACME Corp:
Orders: 3
Revenue: $600.00
Globex Inc:
Orders: 2
Revenue: $1500.00
9. Summary
----------------------------------------------------------------------
Total tenants: 2
Total events: 6
Isolation verified: Yes
Cross-tenant queries: Working
======================================================================
Tutorial complete!
======================================================================
Key Takeaways¶
- tenant_id is built-in: All events have an optional tenant_id field, automatically indexed
- Application-level isolation: Shared database with filtering, not separate databases
- Repository enforcement: Tenant-scoped repositories prevent cross-tenant access
- Event store filtering: All stores support efficient tenant filtering via tenant_id
- Projection flexibility: Build single-tenant or cross-tenant projections as needed
- Context propagation: Use ContextVar for clean tenant context flow
- Security first: Never trust client input; always validate tenant ownership after loading
- Test thoroughly: Verify tenant isolation with comprehensive tests
- Defense in depth: Multiple validation layers prevent data leaks
- Production ready: Built-in support in all event stores and projections
Common Patterns¶
Admin Operations Across Tenants¶
async def admin_get_all_tenants(event_store: EventStore) -> set[UUID]:
    """Get list of all tenant IDs (admin only)."""
    tenants: set[UUID] = set()
    async for stored in event_store.read_all():
        # Events without a tenant_id are skipped
        if stored.event.tenant_id:
            tenants.add(stored.event.tenant_id)
    return tenants
Tenant Migration¶
async def migrate_tenant(
    old_tenant_id: UUID,
    new_tenant_id: UUID,
    event_store: EventStore,
) -> None:
    """Migrate events from one tenant to another (admin only)."""
    # Note: This is conceptual - actual implementation would need
    # special migration tools since events are immutable
    options = ReadOptions(tenant_id=old_tenant_id)
    async for stored in event_store.read_all(options):
        # Create new event with different tenant_id
        # This requires careful handling and is usually done
        # via data migration scripts, not runtime code
        pass
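The "create new event with different tenant_id" step can be sketched independently of any event store. The example below uses a stand-in frozen dataclass, `SketchEvent`, and a hypothetical helper, `copy_events_to_tenant`; both are assumptions for illustration and not eventsource-py types. It shows only the copy step: immutable originals are left untouched and each copy gets a fresh event ID.

```python
from dataclasses import dataclass, replace
from uuid import UUID, uuid4


@dataclass(frozen=True)
class SketchEvent:
    """Stand-in for an immutable domain event (not the eventsource-py class)."""

    event_id: UUID
    aggregate_id: UUID
    tenant_id: UUID
    event_type: str


def copy_events_to_tenant(
    events: list[SketchEvent],
    old_tenant_id: UUID,
    new_tenant_id: UUID,
) -> list[SketchEvent]:
    """Return re-tenanted copies of the old tenant's events, in original order."""
    return [
        # replace() builds a new frozen instance; the source event is unchanged.
        replace(event, event_id=uuid4(), tenant_id=new_tenant_id)
        for event in events
        if event.tenant_id == old_tenant_id
    ]
```

How the copies are then appended, and whether aggregate IDs should also change, depends on the store and the migration tooling, so those steps are left out of this sketch.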
Tenant-Specific Configuration¶
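from uuid import UUID

from pydantic import BaseModel


class TenantConfig(BaseModel):
    tenant_id: UUID
    max_orders_per_day: int
    features_enabled: list[str]


class TenantOrderRepository:
    def __init__(
        self,
        event_store: EventStore,
        tenant_id: UUID,
        config: TenantConfig,
    ):
        self._tenant_id = tenant_id
        self._config = config
        # ... repository setup

    async def save(self, order: OrderAggregate) -> None:
        # Check tenant-specific limits before persisting.
        # _exceeds_daily_limit() is not shown here; it would compare today's
        # order count against self._config.max_orders_per_day.
        if await self._exceeds_daily_limit():
            raise ValueError("Daily order limit exceeded")
        # Assumption: the repository setup above stores the wrapped repository
        # as self._repo. This class has no base class, so super().save()
        # would raise AttributeError.
        await self._repo.save(order)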
Next Steps¶
Now that you understand multi-tenancy patterns, you're ready to learn about distributed event propagation.
Continue to Tutorial 17: Redis Event Bus to learn about:
- Distributing events across services with Redis Streams
- Implementing producer-consumer patterns with tenant context
- Scaling projections horizontally while maintaining tenant isolation
- Building real-time event-driven applications with multi-tenancy
For more examples, see:
- examples/multi_tenant/ - Complete multi-tenant application
- tests/integration/test_multi_tenancy.py - Comprehensive test suite
- docs/guides/multi-tenancy.md - Advanced multi-tenancy patterns