Observability Guide¶
This guide covers observability features in eventsource-py: tracing, metrics, and logging.
Contents:

- Overview
- Quick Start
- Traced Components - All traceable components and span names
- Standard Attributes - Attribute constants for spans
- Custom Component Tracing - Add tracing to custom components
- Distributed Tracing - Cross-service trace propagation
- Metrics - Kafka and subscription metrics
- Logging - Structured logging configuration
- Troubleshooting
Overview¶
eventsource-py provides three observability pillars:
| Pillar | Technology | Purpose |
|---|---|---|
| Tracing | OpenTelemetry | Distributed request tracking, latency analysis |
| Metrics | OpenTelemetry | Throughput, error rates, lag monitoring |
| Logging | Python logging | Structured operational logs |
All observability features are optional and degrade gracefully when dependencies are not installed.
Quick Start¶
Installation¶
# Install with telemetry support
pip install eventsource-py[telemetry]
# Includes: opentelemetry-api, opentelemetry-sdk
# Exporter packages (Jaeger, Zipkin, OTLP, Prometheus) are installed separately
Minimal Setup¶
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

# Configure tracing (do this once at startup)
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(ConsoleSpanExporter())  # Replace with your exporter
)

# All eventsource components have tracing enabled by default
from eventsource.stores import SQLiteEventStore
from eventsource.aggregates import AggregateRepository

store = SQLiteEventStore(":memory:")  # Tracing ON by default
repo = AggregateRepository(
    event_store=store,
    aggregate_factory=OrderAggregate,  # your aggregate class, defined elsewhere
    aggregate_type="Order",
)
Disable Tracing Per-Component¶
# Disable for high-frequency operations
store = SQLiteEventStore(":memory:", enable_tracing=False)
projection = MyProjection(connection, enable_tracing=False)
Traced Components¶
Event Stores¶
All event store implementations support tracing:
| Component | Span Names |
|---|---|
| SQLiteEventStore | sqlite_event_store.append_events, sqlite_event_store.get_events |
| PostgreSQLEventStore | postgresql_event_store.append_events, postgresql_event_store.get_events |
| InMemoryEventStore | in_memory_event_store.append_events, in_memory_event_store.get_events |
Event Buses¶
| Component | Span Names |
|---|---|
| InMemoryEventBus | eventsource.event_bus.dispatch, eventsource.event_bus.handle |
| RedisEventBus | eventsource.event_bus.publish, eventsource.event_bus.process, eventsource.event_bus.dispatch, eventsource.event_bus.handle |
| RabbitMQEventBus | eventsource.event_bus.publish, eventsource.event_bus.consume, eventsource.event_bus.dispatch |
| KafkaEventBus | eventsource.event_bus.publish {event_type}, eventsource.event_bus.consume {event_type}, eventsource.event_bus.dispatch {handler} |
Repositories¶
| Component | Span Names |
|---|---|
| AggregateRepository | eventsource.repository.load, eventsource.repository.save, eventsource.repository.exists, eventsource.repository.create_snapshot |
| CheckpointRepository | eventsource.checkpoint.get_checkpoint, eventsource.checkpoint.update_checkpoint, eventsource.checkpoint.reset_checkpoint, eventsource.checkpoint.get_all_checkpoints, eventsource.checkpoint.get_lag_metrics |
| DLQRepository | eventsource.dlq.add, eventsource.dlq.get, eventsource.dlq.get_by_id, eventsource.dlq.resolve, eventsource.dlq.retry, eventsource.dlq.get_stats, eventsource.dlq.get_projection_counts, eventsource.dlq.delete_resolved |
| OutboxRepository | eventsource.outbox.add, eventsource.outbox.get_pending, eventsource.outbox.mark_published, eventsource.outbox.increment_retry, eventsource.outbox.mark_failed, eventsource.outbox.cleanup, eventsource.outbox.get_stats |
Snapshot Stores¶
| Component | Span Names |
|---|---|
| InMemorySnapshotStore | eventsource.snapshot.save, eventsource.snapshot.get, eventsource.snapshot.delete, eventsource.snapshot.exists, eventsource.snapshot.delete_by_type, eventsource.snapshot.clear |
| PostgreSQLSnapshotStore | eventsource.snapshot.save, eventsource.snapshot.get, eventsource.snapshot.delete, eventsource.snapshot.exists, eventsource.snapshot.delete_by_type |
| SQLiteSnapshotStore | eventsource.snapshot.save, eventsource.snapshot.get, eventsource.snapshot.delete, eventsource.snapshot.exists, eventsource.snapshot.delete_by_type |
Projections¶
| Component | Span Names |
|---|---|
| Projection | eventsource.projection.handle |
| DeclarativeProjection | eventsource.projection.handle, eventsource.projection.handler |
| ProjectionCoordinator | eventsource.projection.coordinate, eventsource.projection.registry.dispatch |
Note: Projection tracing is OFF by default. Enable with enable_tracing=True.
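For example, using the constructor shown in Quick Start (MyProjection stands in for your projection subclass):

# Opt in per projection
projection = MyProjection(connection, enable_tracing=True)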
Subscription Manager¶
| Component | Span Names |
|---|---|
| SubscriptionManager | eventsource.subscription_manager.subscribe, eventsource.subscription_manager.start_subscription, eventsource.subscription_manager.stop |
| TransitionCoordinator | eventsource.transition_coordinator.execute |
| CatchUpRunner | eventsource.catchup_runner.run_until_position, eventsource.catchup_runner.deliver_event |
| LiveRunner | eventsource.live_runner.start, eventsource.live_runner.process_event |
| PauseResumeController | eventsource.pause_resume.pause, eventsource.pause_resume.resume |
| SubscriptionLifecycleManager | eventsource.lifecycle.start, eventsource.lifecycle.stop |
Standard Attributes¶
All spans include standardized attributes from eventsource.observability.attributes. These follow OpenTelemetry semantic conventions where applicable.
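The tables below list the attribute-name string each constant resolves to, so the constants can be used anywhere a span attribute key is expected. For example (value taken from the aggregate table below):

from eventsource.observability.attributes import ATTR_AGGREGATE_TYPE

# Each constant holds the attribute name listed in the tables below
assert ATTR_AGGREGATE_TYPE == "eventsource.aggregate.type"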
Aggregate Attributes¶
| Constant | Attribute Name | Description | Example Value |
|---|---|---|---|
| ATTR_AGGREGATE_ID | eventsource.aggregate.id | Unique identifier for the aggregate instance (UUID string) | "550e8400-e29b-41d4-a716-446655440000" |
| ATTR_AGGREGATE_TYPE | eventsource.aggregate.type | Type name of the aggregate | "Order" |
Event Attributes¶
| Constant | Attribute Name | Description | Example Value |
|---|---|---|---|
| ATTR_EVENT_ID | eventsource.event.id | Unique identifier for the event (UUID string) | "550e8400-e29b-41d4-a716-446655440001" |
| ATTR_EVENT_TYPE | eventsource.event.type | Type name of the event | "OrderCreated" |
| ATTR_EVENT_COUNT | eventsource.event.count | Number of events in an operation | 3 |
Version Attributes¶
| Constant | Attribute Name | Description | Example Value |
|---|---|---|---|
| ATTR_VERSION | eventsource.version | Current version of an aggregate or stream | 5 |
| ATTR_EXPECTED_VERSION | eventsource.expected_version | Expected version for optimistic concurrency | 4 |
| ATTR_FROM_VERSION | eventsource.from_version | Starting version for event retrieval | 0 |
Tenant and Actor Attributes¶
| Constant | Attribute Name | Description | Example Value |
|---|---|---|---|
| ATTR_TENANT_ID | eventsource.tenant.id | Tenant identifier for multi-tenant systems | "tenant-123" |
| ATTR_ACTOR_ID | eventsource.actor.id | Actor/user identifier who initiated the action | "user@example.com" |
Component-Specific Attributes¶
| Constant | Attribute Name | Description | Example Value |
|---|---|---|---|
| ATTR_PROJECTION_NAME | eventsource.projection.name | Name of the projection processing events | "OrderSummaryProjection" |
| ATTR_HANDLER_NAME | eventsource.handler.name | Name of the event handler being invoked | "OrderCreatedHandler" |
| ATTR_HANDLER_COUNT | eventsource.handler.count | Number of handlers registered for an event type | 2 |
| ATTR_HANDLER_SUCCESS | eventsource.handler.success | Whether the handler executed successfully | true |
| ATTR_STREAM_ID | eventsource.stream.id | Identifier for an event stream | "order-123:Order" |
| ATTR_POSITION | eventsource.position | Position in the event stream | 42 |
Database Attributes (OpenTelemetry Semantic Conventions)¶
| Constant | Attribute Name | Description | Example Value |
|---|---|---|---|
| ATTR_DB_SYSTEM | db.system | Database system identifier | "postgresql", "sqlite" |
| ATTR_DB_NAME | db.name | Database name being accessed | "events_db" |
| ATTR_DB_OPERATION | db.operation | Database operation type | "INSERT", "SELECT" |
Messaging Attributes (OpenTelemetry Semantic Conventions)¶
| Constant | Attribute Name | Description | Example Value |
|---|---|---|---|
| ATTR_MESSAGING_SYSTEM | messaging.system | Messaging system identifier | "rabbitmq", "redis", "kafka" |
| ATTR_MESSAGING_DESTINATION | messaging.destination | Destination queue or topic name | "events" |
| ATTR_MESSAGING_OPERATION | messaging.operation | Messaging operation type | "publish", "receive" |
Subscription Attributes¶
| Constant | Attribute Name | Description | Example Value |
|---|---|---|---|
| ATTR_SUBSCRIPTION_NAME | eventsource.subscription.name | Subscription name | "OrderProjection" |
| ATTR_SUBSCRIPTION_STATE | eventsource.subscription.state | Current state | "live", "catching_up" |
| ATTR_SUBSCRIPTION_PHASE | eventsource.subscription.phase | Transition phase | "initial_catchup" |
| ATTR_FROM_POSITION | eventsource.from_position | Starting position | 0 |
| ATTR_TO_POSITION | eventsource.to_position | Target position | 1000 |
| ATTR_BATCH_SIZE | eventsource.batch.size | Events per batch | 100 |
| ATTR_EVENTS_PROCESSED | eventsource.events.processed | Events processed count | 5000 |
Error and Retry Attributes¶
| Constant | Attribute Name | Description | Example Value |
|---|---|---|---|
| ATTR_RETRY_COUNT | eventsource.retry.count | Retry attempts | 3 |
| ATTR_ERROR_TYPE | eventsource.error.type | Exception class name | "OptimisticLockError" |
Custom Component Tracing¶
Add tracing to your own components using the composition-based Tracer API:
from eventsource.observability import create_tracer
from eventsource.observability.attributes import (
    ATTR_AGGREGATE_ID,
    ATTR_EVENT_TYPE,
)

class MyCustomStore:
    def __init__(self, enable_tracing: bool = True):
        # Initialize tracing via composition
        self._tracer = create_tracer(__name__, enable_tracing)
        self._enable_tracing = self._tracer.enabled

    async def save(self, aggregate_id: str, data: dict) -> None:
        # Create span with dynamic attributes
        with self._tracer.span(
            "my_store.save",
            {
                ATTR_AGGREGATE_ID: aggregate_id,
                "data.size": len(data),
            },
        ) as span:
            # Your implementation
            result = await self._do_save(aggregate_id, data)

            # Add additional attributes after operation
            if span:
                span.set_attribute("result.success", True)

            return result
Tracer API¶
| Method/Property | Description |
|---|---|
| create_tracer(name, enable_tracing) | Factory function that creates the appropriate tracer. |
| tracer.span(name, attributes) | Create a span context manager. |
| tracer.enabled (property) | Check whether tracing is currently enabled and available. |
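A minimal standalone use of this API (span name and attribute keys are illustrative):

from eventsource.observability import create_tracer

tracer = create_tracer(__name__, enable_tracing=True)

# tracer.span() yields the active span, or None when tracing is disabled
with tracer.span("demo.operation", {"demo.attribute": "value"}) as span:
    if span:
        span.set_attribute("demo.result", "ok")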
Using the @traced Decorator¶
For methods with only static attributes, use the @traced decorator:
from eventsource.observability import traced, create_tracer

class MyService:
    def __init__(self, enable_tracing: bool = True):
        self._tracer = create_tracer(__name__, enable_tracing)
        self._enable_tracing = self._tracer.enabled

    @traced("my_service.operation", attributes={"db.system": "sqlite"})
    async def operation(self, item_id: str) -> None:
        # Method body - automatically traced
        pass
Distributed Tracing¶
RabbitMQ Context Propagation¶
RabbitMQEventBus supports distributed trace context propagation using W3C Trace Context format:
from eventsource.bus.rabbitmq import RabbitMQEventBus, RabbitMQEventBusConfig

# Publisher service
config = RabbitMQEventBusConfig(
    rabbitmq_url="amqp://localhost:5672/",
    enable_tracing=True,
)
bus = RabbitMQEventBus(config=config)

async with bus:
    # Trace context is automatically injected into message headers
    await bus.publish([order_created])

# Consumer service (different process/server)
config = RabbitMQEventBusConfig(
    rabbitmq_url="amqp://localhost:5672/",
    consumer_group="order-projections",
    enable_tracing=True,
)
bus = RabbitMQEventBus(config=config)

async with bus:
    bus.subscribe(OrderCreated, order_handler)
    await bus.start_consuming()

# Consumer spans are children of publisher spans
# Full distributed trace is maintained
Kafka Context Propagation¶
KafkaEventBus also supports distributed tracing:
from eventsource.bus.kafka import KafkaEventBus, KafkaEventBusConfig

config = KafkaEventBusConfig(
    bootstrap_servers="localhost:9092",
    enable_tracing=True,
)
bus = KafkaEventBus(config=config)
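On the consumer side, trace context carried in Kafka message headers is restored so consume and dispatch spans join the publisher's trace. A minimal sketch, assuming KafkaEventBus exposes the same subscribe/start_consuming surface shown in the RabbitMQ example above:

# Consumer service (assumes the RabbitMQ-style subscribe/start_consuming API)
async with bus:
    bus.subscribe(OrderCreated, order_handler)
    await bus.start_consuming()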
Example Traces¶
Aggregate Load with Snapshot¶
[eventsource.repository.load] 45ms
  eventsource.aggregate.id: "550e8400-..."
  eventsource.aggregate.type: "Order"
  snapshot.used: true
  events.replayed: 15
  eventsource.version: 115

  [eventsource.snapshot.get] 5ms
    db.system: postgresql

  [postgresql_event_store.get_events] 35ms
    db.system: postgresql
    eventsource.from_version: 100
Event Publishing via RabbitMQ¶
[eventsource.repository.save] 80ms
  eventsource.aggregate.id: "550e8400-..."
  eventsource.event.count: 2
  eventsource.version: 117

  [sqlite_event_store.append_events] 40ms
    db.system: sqlite
    eventsource.event.count: 2

  [eventsource.event_bus.publish] 35ms
    messaging.system: rabbitmq
    messaging.destination: events
    eventsource.event.type: OrderCreated
Distributed Trace Across Services¶
Service A (Publisher):

[eventsource.event_bus.publish] 25ms
  messaging.system: rabbitmq
  eventsource.event.type: OrderCreated
  eventsource.aggregate.id: "order-123"

Service B (Consumer):

[eventsource.event_bus.consume] 150ms (child of publish span)
  eventsource.event.type: OrderCreated

  [eventsource.event_bus.dispatch] 145ms
    eventsource.handler.count: 2

    [eventsource.event_bus.handle] 70ms
      eventsource.handler.name: OrderProjection
      eventsource.handler.success: true

    [eventsource.event_bus.handle] 72ms
      eventsource.handler.name: NotificationService
      eventsource.handler.success: true
Example Configurations¶
Jaeger¶
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# Requires: pip install opentelemetry-exporter-jaeger
# (deprecated upstream; Jaeger also accepts OTLP directly - see below)
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

provider = TracerProvider()
jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)
provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
trace.set_tracer_provider(provider)
Zipkin¶
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# Requires: pip install opentelemetry-exporter-zipkin
from opentelemetry.exporter.zipkin.json import ZipkinExporter

provider = TracerProvider()
zipkin_exporter = ZipkinExporter(endpoint="http://localhost:9411/api/v2/spans")
provider.add_span_processor(BatchSpanProcessor(zipkin_exporter))
trace.set_tracer_provider(provider)
OTLP (Grafana Tempo, Honeycomb, etc.)¶
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# Requires: pip install opentelemetry-exporter-otlp
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

provider = TracerProvider()

# For Grafana Tempo
otlp_exporter = OTLPSpanExporter(endpoint="http://tempo:4317", insecure=True)

# For Honeycomb
# otlp_exporter = OTLPSpanExporter(
#     endpoint="https://api.honeycomb.io:443",
#     headers={"x-honeycomb-team": "YOUR_API_KEY"},
# )

provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
trace.set_tracer_provider(provider)
Console Exporter (Development)¶
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)
Best Practices¶
1. Enable Tracing Selectively¶
import os

# Enable full tracing in development
ENABLE_TRACING = os.getenv("ENVIRONMENT") in ("development", "staging")
store = SQLiteEventStore(":memory:", enable_tracing=ENABLE_TRACING)

# Or use sampling in production
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased

provider = TracerProvider(
    sampler=TraceIdRatioBased(0.1)  # Sample 10% of traces
)
2. Use Standard Attributes¶
Import and use attribute constants for consistency:
from eventsource.observability.attributes import (
    ATTR_AGGREGATE_ID,
    ATTR_EVENT_TYPE,
    ATTR_DB_SYSTEM,
)

# Use constants instead of string literals
span.set_attribute(ATTR_AGGREGATE_ID, str(aggregate_id))
span.set_attribute(ATTR_EVENT_TYPE, event.__class__.__name__)
3. Graceful Degradation¶
eventsource gracefully handles missing OpenTelemetry:
# Works even without OpenTelemetry installed
store = SQLiteEventStore(":memory:", enable_tracing=True)
# Tracing becomes a no-op when OTEL is not available
Check availability programmatically:
from eventsource.observability import OTEL_AVAILABLE

if OTEL_AVAILABLE:
    print("OpenTelemetry is available")
else:
    print("OpenTelemetry not installed, tracing disabled")
4. Disable High-Frequency Tracing in Production¶
Projection handlers can be invoked very frequently. Consider disabling tracing for projections in production unless debugging:
import os

# High-frequency projection - disable tracing in production
class OrderSummaryProjection(Projection):
    def __init__(self, connection):
        super().__init__(
            connection=connection,
            enable_tracing=os.getenv("ENVIRONMENT") != "production",
        )
5. Use BatchSpanProcessor¶
Always use BatchSpanProcessor instead of SimpleSpanProcessor in production to avoid blocking:
# Good - batches spans and exports asynchronously
from opentelemetry.sdk.trace.export import BatchSpanProcessor
provider.add_span_processor(BatchSpanProcessor(exporter))
# Bad - blocks on every span (only use for development)
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
provider.add_span_processor(SimpleSpanProcessor(exporter))
Troubleshooting¶
No Spans Appearing¶
- Check that OpenTelemetry is installed (pip install eventsource-py[telemetry]).
- Verify enable_tracing=True is set on components.
- Confirm the exporter is configured correctly.
- Check OTEL_AVAILABLE from the observability module.
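A quick diagnostic covering the first and last checks:

from opentelemetry import trace
from eventsource.observability import OTEL_AVAILABLE

print(f"OpenTelemetry available: {OTEL_AVAILABLE}")
# A default no-op provider usually means no exporter was configured at startup
print(f"Tracer provider: {trace.get_tracer_provider()!r}")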
High Overhead¶
- Disable projection tracing (it produces the highest-frequency spans).
- Enable sampling in your OpenTelemetry configuration.
- Use a batch span processor, not a simple processor (see Best Practices above).
Missing Distributed Context¶
- Ensure both publisher and consumer have tracing enabled.
- Check that propagation packages are installed.
- Verify both services are configured with the same trace provider backend.
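If context still fails to propagate, set the W3C propagator explicitly (it is the OpenTelemetry Python default, so this is normally a no-op):

from opentelemetry.propagate import set_global_textmap
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

# W3C Trace Context - the format the event buses inject into message headers
set_global_textmap(TraceContextTextMapPropagator())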
Metrics¶
eventsource-py emits OpenTelemetry metrics for monitoring throughput, latency, and system health.
Kafka Event Bus Metrics¶
The Kafka event bus provides comprehensive metrics. See Kafka Metrics Guide for details.
from eventsource.bus.kafka import KafkaEventBus, KafkaEventBusConfig

config = KafkaEventBusConfig(
    bootstrap_servers="localhost:9092",
    enable_metrics=True,  # Enabled by default
    enable_tracing=True,  # Enabled by default
)
bus = KafkaEventBus(config=config)
Key Kafka Metrics:
| Metric | Type | Description |
|---|---|---|
| kafka.eventbus.messages.published | Counter | Messages published |
| kafka.eventbus.messages.consumed | Counter | Messages consumed |
| kafka.eventbus.handler.duration | Histogram | Handler execution time (ms) |
| kafka.eventbus.consumer.lag | Gauge | Consumer lag per partition |
Subscription Manager Metrics¶
The subscription manager emits metrics for event processing:
from eventsource.subscriptions import SubscriptionManager, get_metrics

# Metrics are emitted automatically
manager = SubscriptionManager(
    event_store=store,
    event_bus=bus,
    checkpoint_repo=checkpoint_repo,
)

# Access metrics programmatically
metrics = get_metrics("OrderProjection")
snapshot = metrics.get_snapshot()
print(f"Processed: {snapshot.events_processed}")
print(f"Lag: {snapshot.current_lag}")
Subscription Metrics:
| Metric | Type | Description |
|---|---|---|
| subscription.events.processed | Counter | Events processed successfully |
| subscription.events.failed | Counter | Events that failed processing |
| subscription.processing.duration | Histogram | Processing time (ms) |
| subscription.lag | Gauge | Current event lag |
| subscription.state | Gauge | Current state (numeric) |
State Values:
| Value | State | Description |
|---|---|---|
| 1 | starting | Subscription initializing |
| 2 | catching_up | Processing historical events |
| 3 | live | Processing real-time events |
| 4 | paused | Temporarily paused |
| 5 | stopped | Stopped |
| 6 | error | Error state |
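The gauge is numeric, so dashboards and alerts need the mapping above. A small labeling helper (illustrative, not part of the library):

# Map the numeric subscription.state gauge to a readable label
SUBSCRIPTION_STATE_NAMES = {
    1: "starting",
    2: "catching_up",
    3: "live",
    4: "paused",
    5: "stopped",
    6: "error",
}

def state_label(value: int) -> str:
    return SUBSCRIPTION_STATE_NAMES.get(value, "unknown")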
Configure Metrics Export¶
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter

# Option 1: OTLP export to a collector
reader = PeriodicExportingMetricReader(
    OTLPMetricExporter(endpoint="http://otel-collector:4317"),
    export_interval_millis=60000,
)
metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))

# Option 2: Prometheus export (configure one meter provider per process)
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from prometheus_client import start_http_server

start_http_server(port=8000)
reader = PrometheusMetricReader()
metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))
Logging¶
eventsource-py uses Python's standard logging module. All components log to named loggers under the eventsource namespace.
Configure Logging¶
import logging

# Basic configuration
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

# Adjust eventsource log level
logging.getLogger("eventsource").setLevel(logging.DEBUG)

# Fine-tune specific components
logging.getLogger("eventsource.stores.sqlite").setLevel(logging.WARNING)
logging.getLogger("eventsource.bus.kafka").setLevel(logging.DEBUG)
Logger Hierarchy¶
| Logger Name | Component |
|---|---|
| eventsource.stores.sqlite | SQLite event store |
| eventsource.stores.postgresql | PostgreSQL event store |
| eventsource.bus.kafka | Kafka event bus |
| eventsource.bus.rabbitmq | RabbitMQ event bus |
| eventsource.bus.redis | Redis event bus |
| eventsource.aggregates.repository | Aggregate repository |
| eventsource.subscriptions.manager | Subscription manager |
| eventsource.projections | Projection system |
Structured Logging with JSON¶
For production environments, use structured JSON logging:
import logging
import json
from datetime import datetime, UTC

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_obj = {
            "timestamp": datetime.now(UTC).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info:
            log_obj["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_obj)

handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logging.getLogger("eventsource").addHandler(handler)
Correlating Logs with Traces¶
Inject trace context into log records:
import logging

from opentelemetry import trace

class TraceContextFilter(logging.Filter):
    def filter(self, record):
        span = trace.get_current_span()
        ctx = span.get_span_context()
        record.trace_id = format(ctx.trace_id, "032x") if ctx.is_valid else ""
        record.span_id = format(ctx.span_id, "016x") if ctx.is_valid else ""
        return True

handler = logging.StreamHandler()
handler.addFilter(TraceContextFilter())
handler.setFormatter(logging.Formatter(
    "%(asctime)s [%(levelname)s] trace_id=%(trace_id)s span_id=%(span_id)s - %(message)s"
))
logging.getLogger("eventsource").addHandler(handler)
Related Resources¶
eventsource-py guides:

- Kafka Metrics Guide - Detailed Kafka metrics, PromQL queries, alerting
- Subscriptions Guide - Subscription manager with metrics section
- Production Guide - Production deployment recommendations

OpenTelemetry:

- OpenTelemetry Python
- Semantic Conventions

Observability backends:

- Jaeger
- Zipkin
- Grafana Tempo
- Honeycomb