Python Observability
This guide covers four observability pillars in Python applications, with FastAPI examples: logging, metrics, tracing, and health checks.
Logging
Use structlog for structured JSON logging with async support.
Installation
pip install structlog
Configuration
import structlog
import logging
import sys
def configure_logging():
"""Configure structlog for JSON output to stdout."""
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
)
# Configure standard library logging
logging.basicConfig(
format="%(message)s",
stream=sys.stdout,
level=logging.INFO,
)
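For local development you may prefer human-readable output instead of JSON; a minimal sketch, assuming an environment flag of your own choosing (LOG_FORMAT is not part of the configuration above):
import os
import structlog
# Hypothetical switch: pretty console output locally, JSON everywhere else.
# Use `renderer` in place of the JSONRenderer() entry in the processors list above.
renderer = (
    structlog.dev.ConsoleRenderer()
    if os.getenv("LOG_FORMAT") == "console"
    else structlog.processors.JSONRenderer()
)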
Usage
import structlog
logger = structlog.get_logger()
# Basic logging
logger.info("user_login", user_id="usr_123", email="user@example.com")
# Inside an exception handler; e and sql come from the surrounding scope
logger.error("database_error", error=str(e), query=sql)
# Bind context for request lifecycle
request_logger = logger.bind(request_id="req_abc123")
request_logger.info("processing_request", path="/api/users")
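With the JSONRenderer configured above, each call emits one JSON object per line, roughly like this (timestamp and logger name are illustrative):
{"event": "user_login", "user_id": "usr_123", "email": "user@example.com", "level": "info", "logger": "app", "timestamp": "2024-01-01T12:00:00.000000Z"}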
FastAPI Integration
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
import structlog
import uuid
app = FastAPI()
logger = structlog.get_logger()
class LoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
request_id = request.headers.get("x-request-id", str(uuid.uuid4()))
request_logger = logger.bind(
request_id=request_id,
method=request.method,
path=request.url.path
)
request.state.logger = request_logger
request_logger.info("request_started")
response = await call_next(request)
request_logger.info("request_completed", status_code=response.status_code)
return response
app.add_middleware(LoggingMiddleware)
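Continuing the example above, route handlers can reuse the request-scoped logger stored on request.state so every log line carries the request_id (the /api/users route is illustrative):
@app.get("/api/users")
async def list_users(request: Request):
    # The logger bound by LoggingMiddleware already carries request_id, method, and path
    request.state.logger.info("listing_users", limit=50)
    return {"users": []}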
Metrics
Use prometheus-client for Prometheus metrics.
Installation
pip install prometheus-client
Define Metrics
from prometheus_client import Counter, Histogram, Gauge, Info
# Counters
http_requests_total = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'path', 'status']
)
# Histograms
http_request_duration_seconds = Histogram(
'http_request_duration_seconds',
'HTTP request latency',
['method', 'path'],
buckets=[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
)
# Gauges
active_connections = Gauge(
'active_connections',
'Number of active connections'
)
# Info
app_info = Info('app', 'Application information')
app_info.info({'version': '1.0.0', 'python_version': '3.11'})
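Counters and histograms are usually updated in request middleware (shown next), while gauges are raised and lowered around the resource they measure; a minimal sketch for active_connections, with a hypothetical serve() handler:
async def handle_connection(conn):
    # Gauge goes up when a connection starts and back down when it finishes
    active_connections.inc()
    try:
        await serve(conn)  # serve() is a placeholder, not defined in this guide
    finally:
        active_connections.dec()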
FastAPI Integration
from fastapi import FastAPI, Request
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
from starlette.responses import Response
import time
app = FastAPI()
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
# Use route path template to avoid high-cardinality metrics
route = request.scope.get("route")
path_template = getattr(route, "path", request.url.path) if route else request.url.path
duration = time.time() - start_time
http_request_duration_seconds.labels(
method=request.method,
path=path_template
).observe(duration)
http_requests_total.labels(
method=request.method,
path=path_template,
status=response.status_code
).inc()
return response
@app.get("/metrics")
async def metrics():
return Response(
content=generate_latest(),
media_type=CONTENT_TYPE_LATEST
)
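The same Histogram type can time operations outside the HTTP layer via its time() context manager; a sketch with a hypothetical query label and an assumed async database client:
from prometheus_client import Histogram

db_query_duration_seconds = Histogram(
    'db_query_duration_seconds',
    'Database query latency',
    ['query_name']
)

async def load_user(user_id: str):
    # time() records the elapsed seconds into the labeled histogram when the block exits;
    # "database" is an assumed async DB client, not defined in this guide
    with db_query_duration_seconds.labels(query_name="load_user").time():
        return await database.fetch_one("SELECT * FROM users WHERE id = :id", {"id": user_id})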
Tracing
Use OpenTelemetry for distributed tracing.
Installation
pip install opentelemetry-api opentelemetry-sdk \
opentelemetry-instrumentation-fastapi \
opentelemetry-instrumentation-httpx \
opentelemetry-instrumentation-sqlalchemy \
opentelemetry-exporter-otlp
Configuration
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
def configure_tracing(service_name: str, otlp_endpoint: str, app=None):
"""Configure OpenTelemetry tracing."""
resource = Resource.create({SERVICE_NAME: service_name})
provider = TracerProvider(resource=resource)
trace.set_tracer_provider(provider)
exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
provider.add_span_processor(BatchSpanProcessor(exporter))
# Auto-instrument libraries
if app:
FastAPIInstrumentor.instrument_app(app)
HTTPXClientInstrumentor().instrument()
return trace.get_tracer(service_name)
# Usage:
# app = FastAPI()
# tracer = configure_tracing("my-service", "http://otel-collector:4317", app)
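The opentelemetry-instrumentation-sqlalchemy package installed above is wired up separately from the FastAPI and HTTPX instrumentors; a sketch assuming a SQLAlchemy engine object already exists:
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

# engine is assumed to be an existing synchronous SQLAlchemy engine;
# for an async engine, pass engine=async_engine.sync_engine instead
SQLAlchemyInstrumentor().instrument(engine=engine)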
Manual Instrumentation
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
tracer = trace.get_tracer(__name__)
async def process_order(order_id: str):
with tracer.start_as_current_span("process_order") as span:
span.set_attribute("order.id", order_id)
try:
with tracer.start_as_current_span("validate_order"):
await validate_order(order_id)
with tracer.start_as_current_span("process_payment"):
await process_payment(order_id)
span.set_status(Status(StatusCode.OK))
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
raise
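Besides attributes and status, spans can carry timestamped events; a small sketch (record_payment_authorized is a hypothetical helper you might call from inside process_payment):
from opentelemetry import trace

def record_payment_authorized(order_id: str):
    # add_event attaches a timestamped annotation to the currently active span
    span = trace.get_current_span()
    span.add_event("payment_authorized", {"order.id": order_id})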
Connecting Logs and Traces
from opentelemetry import trace
def get_trace_context() -> dict:
"""Get current trace context for logging."""
span = trace.get_current_span()
ctx = span.get_span_context()
if ctx.is_valid:
return {
"trace_id": format(ctx.trace_id, '032x'),
"span_id": format(ctx.span_id, '016x')
}
return {}
# Use with structlog
logger = structlog.get_logger()
logger.info("order_created", order_id="123", **get_trace_context())
Health Checks
Implement Kubernetes health check endpoints with FastAPI.
Basic Implementation
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from datetime import datetime
app = FastAPI()
start_time = datetime.now()
@app.get("/health/live")
async def liveness():
"""Liveness probe - is the process running?"""
return {"status": "ok"}
@app.get("/health/ready")
async def readiness():
"""Readiness probe - can we handle traffic?"""
checks = {}
# Check database (the database and redis objects are assumed application clients)
try:
await database.execute("SELECT 1")
checks["database"] = {"status": "healthy"}
except Exception as e:
checks["database"] = {"status": "unhealthy", "error": str(e)}
return JSONResponse(
status_code=503,
content={"status": "unhealthy", "checks": checks}
)
# Check Redis (if required)
try:
await redis.ping()
checks["redis"] = {"status": "healthy"}
except Exception as e:
checks["redis"] = {"status": "unhealthy", "error": str(e)}
return JSONResponse(
status_code=503,
content={"status": "unhealthy", "checks": checks}
)
return {"status": "healthy", "checks": checks}
@app.get("/health")
async def health():
"""Comprehensive health check."""
uptime = datetime.now() - start_time
return {
"status": "healthy",
"version": "1.0.0",
"uptime_seconds": int(uptime.total_seconds()),
"checks": {
"database": await check_database(),
"redis": await check_redis(),
}
}
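check_database() and check_redis() above are assumed helpers; one possible shape for the first, reusing the database client from the readiness probe:
async def check_database() -> dict:
    # Return a small dict so the /health response can embed the result directly
    try:
        await database.execute("SELECT 1")
        return {"status": "healthy"}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}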
Health Check with Caching
For expensive health checks, cache results briefly:
from datetime import datetime, timedelta
class HealthCache:
def __init__(self, ttl_seconds: int = 5):
self.ttl = timedelta(seconds=ttl_seconds)
self.cache = {}
async def check(self, name: str, check_func):
now = datetime.now()
if name in self.cache:
result, expires = self.cache[name]
if now < expires:
return result
result = await check_func()
self.cache[name] = (result, now + self.ttl)
return result
health_cache = HealthCache(ttl_seconds=5)
@app.get("/health/ready")
async def readiness():
db_check = await health_cache.check("database", check_database)
if db_check["status"] != "healthy":
return JSONResponse(status_code=503, content=db_check)
return {"status": "healthy"}
Complete Example
from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
import structlog
import time
# Initialize (configure_logging, configure_tracing, and get_trace_context are the helpers defined earlier in this guide)
app = FastAPI()
configure_logging()
configure_tracing("my-service", "http://otel-collector:4317", app)
logger = structlog.get_logger()
tracer = trace.get_tracer(__name__)
# Metrics
http_requests = Counter('http_requests_total', 'Total requests', ['method', 'path', 'status'])
http_duration = Histogram('http_request_duration_seconds', 'Request latency', ['method', 'path'])
# Middleware
class ObservabilityMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
start = time.time()
response = await call_next(request)
# Use route path template to avoid high-cardinality metrics
route = request.scope.get("route")
path_template = getattr(route, "path", request.url.path) if route else request.url.path
duration = time.time() - start
http_requests.labels(request.method, path_template, response.status_code).inc()
http_duration.labels(request.method, path_template).observe(duration)
logger.info(
"request_completed",
method=request.method,
path=request.url.path,
status=response.status_code,
duration_ms=round(duration * 1000, 2),
**get_trace_context()
)
return response
app.add_middleware(ObservabilityMiddleware)
# Health endpoints
@app.get("/health/live")
async def liveness():
return {"status": "ok"}
@app.get("/health/ready")
async def readiness():
return {"status": "healthy"}
@app.get("/metrics")
async def metrics():
return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)