Rust Observability
This guide shows how to implement the four observability pillars (logging, metrics, distributed tracing, and health checks) in Rust applications, with examples for Axum (HTTP) and Tonic (gRPC).
Logging
Use the tracing crate ecosystem for structured logging.
Dependencies
# Cargo.toml
[dependencies]
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
Configuration
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
/// Installs a global `tracing` subscriber that writes JSON-formatted log lines.
///
/// Verbosity is taken from the `RUST_LOG` environment variable; when that variable
/// is unset or cannot be parsed, events at `info` and above are emitted.
fn init_logging() {
    let json_output = tracing_subscriber::fmt::layer().json();
    let level_filter = match EnvFilter::try_from_default_env() {
        Ok(from_env) => from_env,
        Err(_) => EnvFilter::new("info"),
    };

    tracing_subscriber::registry()
        .with(level_filter)
        .with(json_output)
        .init();
}

fn main() {
    // Configure logging before anything else so all later output is captured.
    init_logging();
    // Application code
}
Usage
use tracing::{info, error, warn, debug, instrument, Span};
/// Processes a single order and logs the outcome of the payment step.
///
/// `#[instrument]` opens a `process_order` span carrying `order_id`. Only `db` is
/// skipped, so `order` is also recorded through its `Debug` impl.
///
/// `item_count` is declared empty up front so that code running inside this span
/// can fill it in later with `Span::current().record("item_count", ..)`. `tracing`
/// ignores `record` calls for fields that were not declared when the span was created.
///
/// # Errors
/// Returns the error from `process_payment` unchanged, after logging it.
#[instrument(skip(db), fields(order_id = %order.id, item_count = tracing::field::Empty))]
async fn process_order(order: Order, db: &Database) -> Result<(), Error> {
    info!(customer_id = %order.customer_id, "Processing order");
    match process_payment(&order).await {
        Ok(payment) => {
            info!(payment_id = %payment.id, "Payment successful");
            Ok(())
        }
        Err(e) => {
            // Log here so the failure is attached to this order's span, then propagate.
            error!(error = %e, "Payment failed");
            Err(e)
        }
    }
}
/// Validates `order`, attaching its item count to whichever span is current.
///
/// Note: `Span::record` only takes effect if the current span declared an
/// `item_count` field when it was created (for example
/// `item_count = tracing::field::Empty`). Otherwise the value is silently dropped.
fn validate_order(order: &Order) {
    let item_count = order.items.len();
    Span::current().record("item_count", item_count);
    debug!("Validating order");
}
Axum Integration (requires the `tower-http` crate with its `trace` feature)
use axum::{Router, middleware};
use tower_http::trace::TraceLayer;
use tracing::Level;
// `post`, `Request` and `Body` are not imported by the `use` lines of this snippet,
// so they are referenced by their full paths.
let app = Router::new()
    .route("/api/orders", axum::routing::post(create_order))
    .layer(
        TraceLayer::new_for_http().make_span_with(
            |request: &axum::http::Request<axum::body::Body>| {
                // Carry the caller's request id on the span so logs can be correlated.
                // Fall back to "unknown" when the header is missing or not valid UTF-8.
                let request_id = request
                    .headers()
                    .get("x-request-id")
                    .and_then(|v| v.to_str().ok())
                    .unwrap_or("unknown");
                tracing::span!(
                    Level::INFO,
                    "http_request",
                    method = %request.method(),
                    uri = %request.uri(),
                    request_id = %request_id,
                )
            },
        ),
    );
Metrics
Use the prometheus crate for Prometheus metrics.
Dependencies
# Cargo.toml
[dependencies]
prometheus = "0.13"
lazy_static = "1.4"
Define Metrics
use prometheus::{
CounterVec, HistogramVec, IntGauge, opts,
register_counter_vec, register_histogram_vec, register_int_gauge,
};
use lazy_static::lazy_static;
lazy_static! {
    /// Total HTTP requests, labelled by method, route path and response status.
    pub static ref HTTP_REQUESTS_TOTAL: CounterVec = register_counter_vec!(
        opts!("http_requests_total", "Total HTTP requests"),
        &["method", "path", "status"]
    )
    .expect("failed to register http_requests_total (duplicate name or invalid descriptor)");

    /// HTTP request latency in seconds, labelled by method and route path.
    pub static ref HTTP_REQUEST_DURATION: HistogramVec = register_histogram_vec!(
        "http_request_duration_seconds",
        "HTTP request latency",
        &["method", "path"],
        // Bucket upper bounds from 5 ms to 5 s.
        vec![0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
    )
    .expect("failed to register http_request_duration_seconds (duplicate name or invalid descriptor)");

    /// Gauge of requests currently in progress, raised and lowered around each
    /// request by the request middleware.
    pub static ref ACTIVE_CONNECTIONS: IntGauge = register_int_gauge!(
        "active_connections",
        "Number of active connections"
    )
    .expect("failed to register active_connections (duplicate name or invalid descriptor)");
}
Metrics Endpoint
use axum::{Router, routing::get, response::IntoResponse};
use prometheus::{Encoder, TextEncoder};
/// Renders every metric in the default Prometheus registry in the text exposition format.
///
/// If encoding fails, responds with `500 Internal Server Error` and the encoder's
/// message instead of panicking inside the request handler.
async fn metrics_handler() -> impl IntoResponse {
    let encoder = TextEncoder::new();
    let metric_families = prometheus::gather();
    let mut buffer = Vec::new();
    match encoder.encode(&metric_families, &mut buffer) {
        // Take the Content-Type from the encoder so it always matches the body format.
        Ok(()) => ([("content-type", encoder.format_type())], buffer).into_response(),
        Err(err) => (
            axum::http::StatusCode::INTERNAL_SERVER_ERROR,
            format!("failed to encode metrics: {err}"),
        )
            .into_response(),
    }
}

let app = Router::new()
    .route("/metrics", get(metrics_handler));
Recording Metrics
use std::time::Instant;
use axum::extract::MatchedPath;
/// Lowers [`ACTIVE_CONNECTIONS`] when dropped, so the gauge stays balanced even if the
/// request future is cancelled (for example, on client disconnect) or the handler panics.
struct InFlightGuard;

impl InFlightGuard {
    /// Raises the in-flight gauge and returns a guard that lowers it again on drop.
    fn start() -> Self {
        ACTIVE_CONNECTIONS.inc();
        InFlightGuard
    }
}

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        ACTIVE_CONNECTIONS.dec();
    }
}

/// `path` label value for requests that matched no route (for example, 404s). Using one
/// shared value stops arbitrary client-supplied paths from creating unbounded label sets.
const UNMATCHED_PATH_LABEL: &str = "unmatched";

/// Axum middleware that records request count, latency and in-flight requests.
///
/// Metrics are labelled with the HTTP method, the matched route pattern and (for the
/// counter) the response status code.
async fn request_middleware<B>(
    matched_path: Option<MatchedPath>,
    request: Request<B>,
    next: Next<B>,
) -> Response {
    let start = Instant::now();
    let method = request.method().to_string();
    // Use the matched route pattern rather than the raw URI to keep label cardinality
    // bounded; requests that matched no route all share a single label value.
    let path = matched_path
        .as_ref()
        .map_or(UNMATCHED_PATH_LABEL, |p| p.as_str())
        .to_owned();

    let response = {
        // Dropped at the end of this scope, or during unwinding/cancellation.
        let _in_flight = InFlightGuard::start();
        next.run(request).await
    };

    let duration = start.elapsed().as_secs_f64();
    let status = response.status().as_u16().to_string();
    HTTP_REQUESTS_TOTAL
        .with_label_values(&[&method, &path, &status])
        .inc();
    HTTP_REQUEST_DURATION
        .with_label_values(&[&method, &path])
        .observe(duration);
    response
}
Tracing
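Use the OpenTelemetry Rust crates (`opentelemetry`, `opentelemetry_sdk`, `opentelemetry-otlp`) together with `tracing-opentelemetry` to export `tracing` spans as distributed traces.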
Dependencies
# Cargo.toml
[dependencies]
opentelemetry = "0.21"
opentelemetry-otlp = { version = "0.14", features = ["tonic"] }
opentelemetry_sdk = { version = "0.21", features = ["rt-tokio"] }
tracing-opentelemetry = "0.22"
Configuration
use opentelemetry::global;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{runtime, trace as sdktrace, Resource};
use opentelemetry::KeyValue;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
fn init_tracing(service_name: &str, otlp_endpoint: &str) -> Result<(), Box<dyn std::error::Error>> {
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(otlp_endpoint)
)
.with_trace_config(
sdktrace::config().with_resource(Resource::new(vec![
KeyValue::new("service.name", service_name.to_string()),
]))
)
.install_batch(runtime::Tokio)?;
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_default_env())
.with(tracing_subscriber::fmt::layer().json())
.with(telemetry)
.init();
Ok(())
}
// Shutdown on exit
fn shutdown_tracing() {
global::shutdown_tracer_provider();
}
Usage with tracing
use tracing::{instrument, info_span, Instrument};
/// Loads an order, then validates it and takes payment, each step in its own child span.
///
/// `#[instrument(skip(db))]` records `order_id` on the enclosing `process_order` span
/// and leaves out the database handle. The first failing step's error is returned via `?`.
#[instrument(skip(db))]
async fn process_order(order_id: String, db: &Database) -> Result<Order, Error> {
    let order = db.get_order(&order_id).await?;

    let validation_span = info_span!("validate_order");
    validate_order(&order).instrument(validation_span).await?;

    let payment_span = info_span!("process_payment", amount = %order.amount);
    process_payment(&order).instrument(payment_span).await?;

    Ok(order)
}
Health Checks
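Implement health check endpoints for Kubernetes probes: a liveness endpoint that reports whether the process is running, and a readiness endpoint that reports whether its dependencies (such as the database) are available.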
Axum Health Checks
use axum::{Router, routing::get, Json, response::IntoResponse, http::StatusCode};
use serde::Serialize;
use std::sync::OnceLock;
use std::time::Instant;
/// Process start time, read by the `health` handler to report uptime.
static START_TIME: OnceLock<Instant> = OnceLock::new();

/// Records the current instant as the process start time.
///
/// Call this at application startup (e.g., in `main` before starting the server).
/// Only the first call has an effect; later calls leave the stored instant unchanged.
fn init_start_time() {
    // `set` fails once a value is stored; ignoring that keeps the first timestamp.
    let _ = START_TIME.set(Instant::now());
}
/// Outcome of probing one dependency.
///
/// `latency_ms` and `error` are left out of the JSON when they are `None`.
#[derive(Serialize)]
struct CheckResult {
    status: &'static str,
    #[serde(skip_serializing_if = "Option::is_none")]
    latency_ms: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
}

/// Results of each dependency check; each field name becomes a JSON key.
#[derive(Serialize)]
struct HealthChecks {
    database: CheckResult,
}

/// JSON body of the readiness endpoint.
#[derive(Serialize)]
struct ReadinessResponse {
    status: &'static str,
    checks: HealthChecks,
}

/// JSON body of the liveness endpoint.
#[derive(Serialize)]
struct LivenessResponse {
    status: &'static str,
}
/// Liveness probe handler: always answers `{"status":"ok"}`.
async fn liveness() -> Json<LivenessResponse> {
    let body = LivenessResponse { status: "ok" };
    Json(body)
}
async fn readiness(State(db): State<Database>) -> impl IntoResponse {
let db_check = check_database(&db).await;
let all_healthy = db_check.status == "healthy";
let response = ReadinessResponse {
status: if all_healthy { "healthy" } else { "unhealthy" },
checks: HealthChecks {
database: db_check,
},
};
if all_healthy {
(StatusCode::OK, Json(response))
} else {
(StatusCode::SERVICE_UNAVAILABLE, Json(response))
}
}
/// Probes the database with `SELECT 1`.
///
/// On success, returns "healthy" with the round-trip time in milliseconds.
/// On failure, returns "unhealthy" with the error text and no latency.
async fn check_database(db: &Database) -> CheckResult {
    let started = Instant::now();
    let outcome = db.execute("SELECT 1").await;
    if let Err(err) = outcome {
        return CheckResult {
            status: "unhealthy",
            latency_ms: None,
            error: Some(err.to_string()),
        };
    }
    CheckResult {
        status: "healthy",
        latency_ms: Some(started.elapsed().as_millis() as u64),
        error: None,
    }
}
/// Detailed health report: overall status, crate version, uptime and per-dependency checks.
///
/// Responds `200 OK` when every check is healthy and `503 Service Unavailable` otherwise,
/// the same convention as `readiness`, so monitors that only inspect the status code also
/// see failures. `uptime_seconds` is `0` if `init_start_time` was never called.
/// `State` is referenced by its full path because this snippet does not import it.
async fn health(
    axum::extract::State(db): axum::extract::State<Database>,
) -> (StatusCode, Json<serde_json::Value>) {
    let uptime = START_TIME.get()
        .map(|s| s.elapsed().as_secs())
        .unwrap_or(0);
    let db_check = check_database(&db).await;
    let (code, status) = if db_check.status == "healthy" {
        (StatusCode::OK, "healthy")
    } else {
        (StatusCode::SERVICE_UNAVAILABLE, "unhealthy")
    };
    let body = serde_json::json!({
        "status": status,
        "version": env!("CARGO_PKG_VERSION"),
        "uptime_seconds": uptime,
        "checks": {
            "database": db_check
        }
    });
    (code, Json(body))
}

let app = Router::new()
    .route("/health", get(health))
    .route("/health/live", get(liveness))
    .route("/health/ready", get(readiness));
gRPC Health Checks (Tonic, using the `tonic-health` crate)
use tonic_health::server::health_reporter;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (mut health_reporter, health_service) = health_reporter();
// Set service as serving
health_reporter
.set_serving::<MyServiceServer<MyService>>()
.await;
Server::builder()
.add_service(health_service)
.add_service(MyServiceServer::new(my_service))
.serve(addr)
.await?;
Ok(())
}
Complete Example
use axum::{Router, routing::get, middleware};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use std::net::SocketAddr;
mod metrics;
mod health;
mod tracing_setup;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize observability first so everything below is logged and traced.
    tracing_setup::init("my-service", "http://otel-collector:4317")?;
    // Record the process start so /health reports real uptime instead of 0.
    health::init_start_time();

    // NOTE(review): `health::health` and `health::readiness` extract `State<Database>`,
    // so the router needs `.with_state(db)` with your database handle before serving.
    let app = Router::new()
        // Health endpoints
        .route("/health", get(health::health))
        .route("/health/live", get(health::liveness))
        .route("/health/ready", get(health::readiness))
        // Metrics
        .route("/metrics", get(metrics::handler))
        // Application routes
        .route("/api/orders", get(list_orders).post(create_order))
        // Middleware: the last `.layer` is outermost, so TraceLayer wraps the metrics middleware.
        .layer(middleware::from_fn(metrics::request_middleware))
        .layer(tower_http::trace::TraceLayer::new_for_http());

    let addr = SocketAddr::from(([0, 0, 0, 0], 8080));
    tracing::info!("Starting server on {}", addr);

    // Serve until Ctrl+C (requires tokio's `signal` feature). Without a shutdown signal,
    // `serve` only returns on error, and an early `?` would skip the span flush below.
    let served = axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .with_graceful_shutdown(async {
            if tokio::signal::ctrl_c().await.is_err() {
                // The signal handler could not be installed: keep serving instead of exiting.
                std::future::pending::<()>().await;
            }
        })
        .await;

    // Flush buffered spans whether the server stopped cleanly or with an error.
    tracing_setup::shutdown();
    served?;
    Ok(())
}