Skip to main content

Python Observability

This guide covers implementing the core observability signals—structured logging, metrics, distributed tracing, and health checks—in Python applications, with examples for FastAPI.

Logging

Use structlog for structured JSON logging with async support.

Installation

pip install structlog

Configuration

import structlog
import logging
import sys

def configure_logging():
    """Configure structlog to emit JSON log lines on stdout.

    Installs a structlog processor chain (level filtering, logger name,
    log level, ISO-8601 timestamps, stack/exception rendering, JSON
    output) and points the standard-library root logger at stdout so
    both structlog and plain ``logging`` calls come out as one stream.
    """
    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.stdlib.BoundLogger,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
    )

    # structlog's stdlib integration delegates to the standard logging
    # machinery, so the root logger's handler/level must be configured too.
    logging.basicConfig(
        format="%(message)s",
        stream=sys.stdout,
        level=logging.INFO,
    )

Usage

import structlog

logger = structlog.get_logger()

# Basic logging: event name first, then arbitrary key/value context
# that the JSONRenderer serializes into the log line.
logger.info("user_login", user_id="usr_123", email="user@example.com")
# NOTE(review): `e` and `sql` are placeholders for an exception object and
# a query string from the surrounding handler -- not defined in this snippet.
logger.error("database_error", error=str(e), query=sql)

# Bind context for request lifecycle: bind() returns a new logger that
# attaches request_id to every subsequent event it emits.
request_logger = logger.bind(request_id="req_abc123")
request_logger.info("processing_request", path="/api/users")

FastAPI Integration

from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
import structlog
import uuid

app = FastAPI()
logger = structlog.get_logger()


class LoggingMiddleware(BaseHTTPMiddleware):
    """Bind per-request context (id, method, path) and log the request lifecycle."""

    async def dispatch(self, request: Request, call_next):
        # Honor an upstream-supplied request id if present, otherwise mint one,
        # so logs can be correlated across services.
        request_id = request.headers.get("x-request-id", str(uuid.uuid4()))
        request_logger = logger.bind(
            request_id=request_id,
            method=request.method,
            path=request.url.path,
        )

        # Expose the bound logger to route handlers via request.state.
        request.state.logger = request_logger
        request_logger.info("request_started")

        try:
            response = await call_next(request)
        except Exception:
            # Without this, requests that raise are never logged as finished;
            # log the failure with traceback and re-raise for FastAPI to handle.
            request_logger.exception("request_failed")
            raise

        request_logger.info("request_completed", status_code=response.status_code)
        return response


app.add_middleware(LoggingMiddleware)

Metrics

Use prometheus-client for Prometheus metrics.

Installation

pip install prometheus-client

Define Metrics

from prometheus_client import Counter, Histogram, Gauge, Info

# Counters: monotonically increasing totals. Labels let dashboards slice
# by method/path/status -- keep label values low-cardinality.
http_requests_total = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'path', 'status'],
)

# Histograms: latency distribution with buckets spanning 5 ms to 5 s,
# suitable for typical API SLO queries (p50/p95/p99 via histogram_quantile).
http_request_duration_seconds = Histogram(
    'http_request_duration_seconds',
    'HTTP request latency',
    ['method', 'path'],
    buckets=[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
)

# Gauges: values that can go up and down (current state, not totals).
active_connections = Gauge(
    'active_connections',
    'Number of active connections',
)

# Info: constant build/runtime metadata exposed as labels on an *_info metric.
app_info = Info('app', 'Application information')
app_info.info({'version': '1.0.0', 'python_version': '3.11'})

FastAPI Integration

from fastapi import FastAPI, Request
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
from starlette.responses import Response
import time

app = FastAPI()


@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    """Record request count and latency for every HTTP request."""
    # perf_counter is monotonic, so durations are immune to wall-clock
    # adjustments (NTP steps, DST), unlike time.time().
    start_time = time.perf_counter()

    response = await call_next(request)

    # Use route path template to avoid high-cardinality metrics
    # (e.g. "/users/{user_id}" rather than one series per concrete id).
    route = request.scope.get("route")
    path_template = getattr(route, "path", request.url.path) if route else request.url.path

    duration = time.perf_counter() - start_time
    http_request_duration_seconds.labels(
        method=request.method,
        path=path_template,
    ).observe(duration)

    http_requests_total.labels(
        method=request.method,
        path=path_template,
        status=response.status_code,
    ).inc()

    return response

@app.get("/metrics")
async def metrics():
    """Expose all registered metrics in the Prometheus text exposition format."""
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST,
    )

Tracing

Use OpenTelemetry for distributed tracing.

Installation

pip install opentelemetry-api opentelemetry-sdk \
opentelemetry-instrumentation-fastapi \
opentelemetry-instrumentation-httpx \
opentelemetry-instrumentation-sqlalchemy \
opentelemetry-exporter-otlp

Configuration

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

def configure_tracing(service_name: str, otlp_endpoint: str, app=None):
    """Configure OpenTelemetry tracing with an OTLP/gRPC exporter.

    Args:
        service_name: Logical service name attached to every span's resource.
        otlp_endpoint: OTLP collector endpoint, e.g. "otel-collector:4317".
        app: Optional FastAPI application to auto-instrument.

    Returns:
        A tracer scoped to ``service_name`` for manual span creation.
    """
    resource = Resource.create({SERVICE_NAME: service_name})

    provider = TracerProvider(resource=resource)
    trace.set_tracer_provider(provider)

    # BatchSpanProcessor buffers spans and exports them off the hot path.
    exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
    provider.add_span_processor(BatchSpanProcessor(exporter))

    # Auto-instrument libraries. FastAPI instrumentation needs the app;
    # HTTPX instrumentation is app-independent, so it applies unconditionally
    # (NOTE: the original snippet's indentation was ambiguous here).
    if app:
        FastAPIInstrumentor.instrument_app(app)
    HTTPXClientInstrumentor().instrument()

    return trace.get_tracer(service_name)

# Usage:
# app = FastAPI()
# tracer = configure_tracing("my-service", "otel-collector:4317", app)

Manual Instrumentation

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer(__name__)

async def process_order(order_id: str):
    """Process an order inside a span tree (validate -> payment).

    The root span carries the order id; on failure the span is marked
    ERROR and the exception is recorded, then re-raised so callers still
    observe it -- tracing must never swallow errors.
    """
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)

        try:
            # Child spans give per-stage timing in the trace waterfall.
            with tracer.start_as_current_span("validate_order"):
                await validate_order(order_id)

            with tracer.start_as_current_span("process_payment"):
                await process_payment(order_id)

            span.set_status(Status(StatusCode.OK))
        except Exception as e:
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.record_exception(e)
            raise

Connecting Logs and Traces

from opentelemetry import trace

def get_trace_context() -> dict:
    """Return the active trace/span ids formatted for log correlation.

    Returns an empty dict when there is no valid span in context, so the
    result can always be safely splatted into a structlog call.
    """
    span = trace.get_current_span()
    ctx = span.get_span_context()
    if ctx.is_valid:
        # W3C trace-context style hex: 128-bit trace id, 64-bit span id,
        # zero-padded lowercase.
        return {
            "trace_id": format(ctx.trace_id, '032x'),
            "span_id": format(ctx.span_id, '016x'),
        }
    return {}

# Use with structlog
logger = structlog.get_logger()
logger.info("order_created", order_id="123", **get_trace_context())

Health Checks

Implement Kubernetes health check endpoints with FastAPI.

Basic Implementation

from fastapi import FastAPI
from fastapi.responses import JSONResponse
from datetime import datetime

app = FastAPI()
# Captured at import time; used by /health to report uptime.
start_time = datetime.now()


@app.get("/health/live")
async def liveness():
    """Liveness probe - is the process running?

    Keep this trivial and dependency-free: it should only fail when the
    process is wedged, which tells Kubernetes to restart the container.
    """
    return {"status": "ok"}

@app.get("/health/ready")
async def readiness():
    """Readiness probe - can we handle traffic?

    Checks each hard dependency in turn and returns 503 on the first
    failure so Kubernetes stops routing traffic to this pod.
    """
    # NOTE(review): `database` and `redis` are assumed to be module-level
    # clients configured elsewhere -- they are placeholders in this guide.
    checks = {}

    # Check database
    try:
        await database.execute("SELECT 1")
        checks["database"] = {"status": "healthy"}
    except Exception as e:
        checks["database"] = {"status": "unhealthy", "error": str(e)}
        return JSONResponse(
            status_code=503,
            content={"status": "unhealthy", "checks": checks},
        )

    # Check Redis (if required)
    try:
        await redis.ping()
        checks["redis"] = {"status": "healthy"}
    except Exception as e:
        checks["redis"] = {"status": "unhealthy", "error": str(e)}
        return JSONResponse(
            status_code=503,
            content={"status": "unhealthy", "checks": checks},
        )

    return {"status": "healthy", "checks": checks}

@app.get("/health")
async def health():
    """Comprehensive health check: version, uptime and dependency statuses."""
    # Naive local time on both sides, matching the module-level start_time;
    # only the delta matters here.
    uptime = datetime.now() - start_time
    # NOTE(review): check_database / check_redis are assumed to be async
    # helpers defined elsewhere -- placeholders in this guide.
    return {
        "status": "healthy",
        "version": "1.0.0",
        "uptime_seconds": int(uptime.total_seconds()),
        "checks": {
            "database": await check_database(),
            "redis": await check_redis(),
        },
    }

Health Check with Caching

For expensive health checks, cache results briefly:

from datetime import datetime, timedelta

class HealthCache:
    """Memoize async health-check results for a short TTL.

    Prevents hammering dependencies when probes fire frequently: a fresh
    cached result is returned as-is; a missing or expired entry triggers
    a recomputation via the supplied coroutine function.
    """

    def __init__(self, ttl_seconds: int = 5):
        # How long a cached result stays fresh.
        self.ttl = timedelta(seconds=ttl_seconds)
        # name -> (result, expiry datetime)
        self.cache = {}

    async def check(self, name: str, check_func):
        """Return the cached result for *name* if fresh, else await check_func()."""
        now = datetime.now()
        entry = self.cache.get(name)
        if entry is not None:
            cached_result, valid_until = entry
            if now < valid_until:
                return cached_result

        fresh = await check_func()
        self.cache[name] = (fresh, now + self.ttl)
        return fresh

health_cache = HealthCache(ttl_seconds=5)


@app.get("/health/ready")
async def readiness():
    """Readiness probe backed by the short-lived health cache."""
    db_check = await health_cache.check("database", check_database)
    if db_check["status"] != "healthy":
        return JSONResponse(status_code=503, content=db_check)
    return {"status": "healthy"}

Complete Example

import time

import structlog
from fastapi import FastAPI, Request
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from prometheus_client import Counter, Histogram, generate_latest
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response

# Initialize
# configure_logging() and configure_tracing() are the helpers defined in
# the Logging and Tracing sections above.
app = FastAPI()
configure_logging()
configure_tracing("my-service", "http://otel-collector:4317", app)

logger = structlog.get_logger()
tracer = trace.get_tracer(__name__)

# Metrics
# Labelled by method/path template/status; see the Metrics section for
# why path templates keep cardinality bounded.
http_requests = Counter('http_requests_total', 'Total requests', ['method', 'path', 'status'])
http_duration = Histogram('http_request_duration_seconds', 'Request latency', ['method', 'path'])

# Middleware
class ObservabilityMiddleware(BaseHTTPMiddleware):
    """Emit metrics, a structured log line, and trace ids for every request."""

    async def dispatch(self, request: Request, call_next):
        # perf_counter is monotonic -- wall-clock adjustments cannot
        # produce negative or skewed durations.
        start = time.perf_counter()

        response = await call_next(request)

        # Use route path template to avoid high-cardinality metrics
        route = request.scope.get("route")
        path_template = getattr(route, "path", request.url.path) if route else request.url.path

        duration = time.perf_counter() - start
        http_requests.labels(request.method, path_template, response.status_code).inc()
        http_duration.labels(request.method, path_template).observe(duration)

        # Log the concrete path (not the template) plus trace ids so the
        # log line can be joined with the trace in the backend.
        logger.info(
            "request_completed",
            method=request.method,
            path=request.url.path,
            status=response.status_code,
            duration_ms=round(duration * 1000, 2),
            **get_trace_context()
        )

        return response


app.add_middleware(ObservabilityMiddleware)

# Health endpoints
@app.get("/health/live")
async def liveness():
    """Liveness probe: report only that the process is up."""
    return {"status": "ok"}

@app.get("/health/ready")
async def readiness():
    """Readiness probe; extend with real dependency checks as shown earlier."""
    return {"status": "healthy"}

@app.get("/metrics")
async def metrics():
    """Expose Prometheus metrics in the text exposition format."""
    return Response(content=generate_latest(), media_type="text/plain")