Skip to main content

Rust Observability

This guide shows how to implement four observability pillars in Rust applications: logging, metrics, distributed tracing, and health checks. It includes examples for Axum (HTTP) and Tonic (gRPC).

Logging

Use the tracing crate ecosystem for structured logging.

Dependencies

# Cargo.toml
[dependencies]
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }

Configuration

use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

/// Installs the global `tracing` subscriber, which formats every event as JSON.
///
/// The level filter is read from the environment (`EnvFilter`'s default variable,
/// `RUST_LOG`). If that fails, everything at `info` and above is logged.
fn init_logging() {
    let filter = match EnvFilter::try_from_default_env() {
        Ok(from_env) => from_env,
        Err(_) => EnvFilter::new("info"),
    };
    let json_output = tracing_subscriber::fmt::layer().json();

    tracing_subscriber::registry()
        .with(filter)
        .with(json_output)
        .init();
}

fn main() {
    // Install logging first so that events emitted during startup are captured.
    init_logging();
    // Application code
}

Usage

use tracing::{info, error, warn, debug, instrument, Span};

/// Charges payment for `order` and logs the outcome inside an `order_id` span.
///
/// `skip_all` stops `#[instrument]` from recording every argument with `Debug`.
/// Without it, the whole `order` would be formatted into each span, which
/// duplicates `order_id`, bloats log lines and may expose customer data.
///
/// # Errors
///
/// Returns the payment error unchanged after logging it.
#[instrument(skip_all, fields(order_id = %order.id))]
async fn process_order(order: Order, db: &Database) -> Result<(), Error> {
    info!(customer_id = %order.customer_id, "Processing order");

    match process_payment(&order).await {
        Ok(payment) => {
            info!(payment_id = %payment.id, "Payment successful");
            Ok(())
        }
        Err(e) => {
            error!(error = %e, "Payment failed");
            Err(e)
        }
    }
}

// Add context to current span
fn validate_order(order: &Order) {
Span::current().record("item_count", order.items.len());
debug!("Validating order");
}

Axum Integration

use axum::{Router, middleware};
use tower_http::trace::TraceLayer;
use tracing::Level;

// Creates one `http_request` span per request. The span is tagged with the
// method, the URI and the caller-supplied `x-request-id` header. The request id
// falls back to "unknown" when the header is missing or cannot be read as a string.
let make_request_span = |request: &Request<Body>| {
    let request_id = match request
        .headers()
        .get("x-request-id")
        .and_then(|value| value.to_str().ok())
    {
        Some(id) => id,
        None => "unknown",
    };

    tracing::span!(
        Level::INFO,
        "http_request",
        method = %request.method(),
        uri = %request.uri(),
        request_id = %request_id,
    )
};

let app = Router::new()
    .route("/api/orders", post(create_order))
    .layer(TraceLayer::new_for_http().make_span_with(make_request_span));

Metrics

Use the prometheus crate for Prometheus metrics.

Dependencies

# Cargo.toml
[dependencies]
prometheus = "0.13"
lazy_static = "1.4"

Define Metrics

use prometheus::{
CounterVec, HistogramVec, IntGauge, opts,
register_counter_vec, register_histogram_vec, register_int_gauge,
};
use lazy_static::lazy_static;

// Each static registers its metric with the default Prometheus registry on
// first access. If registration fails (e.g. a duplicate metric name), the
// `expect` message names the metric involved instead of a bare `unwrap` panic.
lazy_static! {
    /// Total HTTP requests, labelled by `method`, `path` and `status`.
    pub static ref HTTP_REQUESTS_TOTAL: CounterVec = register_counter_vec!(
        opts!("http_requests_total", "Total HTTP requests"),
        &["method", "path", "status"]
    )
    .expect("failed to register http_requests_total");

    /// HTTP request latency in seconds, labelled by `method` and `path`.
    /// The buckets range from 5 ms to 5 s.
    pub static ref HTTP_REQUEST_DURATION: HistogramVec = register_histogram_vec!(
        "http_request_duration_seconds",
        "HTTP request latency",
        &["method", "path"],
        vec![0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
    )
    .expect("failed to register http_request_duration_seconds");

    /// Integer gauge exported as `active_connections`.
    ///
    /// NOTE(review): the request middleware raises it once per request. In
    /// practice it tracks in-flight requests rather than TCP connections.
    pub static ref ACTIVE_CONNECTIONS: IntGauge = register_int_gauge!(
        "active_connections",
        "Number of active connections"
    )
    .expect("failed to register active_connections");
}

Metrics Endpoint

use axum::{Router, routing::get, response::IntoResponse};
use prometheus::{Encoder, TextEncoder};

/// Serves every metric in the default Prometheus registry in text exposition format.
///
/// If encoding fails, it responds with `500 Internal Server Error` and the
/// error text instead of panicking inside the request handler.
async fn metrics_handler() -> impl IntoResponse {
    let encoder = TextEncoder::new();
    let metric_families = prometheus::gather();

    let mut buffer = Vec::new();
    if let Err(err) = encoder.encode(&metric_families, &mut buffer) {
        return (
            axum::http::StatusCode::INTERNAL_SERVER_ERROR,
            format!("failed to encode metrics: {}", err),
        )
            .into_response();
    }

    // Take the content type from the encoder so the header always matches the body format.
    ([("content-type", encoder.format_type())], buffer).into_response()
}

let app = Router::new()
    .route("/metrics", get(metrics_handler));

Recording Metrics

use std::time::Instant;
use axum::extract::MatchedPath;

/// Axum middleware that records request metrics around the inner service.
///
/// For every request it:
/// - increments `HTTP_REQUESTS_TOTAL`, labelled by method, route and status;
/// - observes the handler latency in `HTTP_REQUEST_DURATION`;
/// - keeps `ACTIVE_CONNECTIONS` raised while the request is being handled.
async fn request_middleware<B>(
    matched_path: Option<MatchedPath>,
    request: Request<B>,
    next: Next<B>,
) -> Response {
    /// Lowers `ACTIVE_CONNECTIONS` when dropped.
    ///
    /// Holding one across the inner call restores the gauge even if this
    /// future is dropped mid-request (e.g. the connection goes away). In that
    /// case a plain `dec()` after the `.await` would never run and the gauge
    /// would drift upward forever.
    struct InFlightGuard;

    impl Drop for InFlightGuard {
        fn drop(&mut self) {
            ACTIVE_CONNECTIONS.dec();
        }
    }

    let start = Instant::now();
    let method = request.method().to_string();
    // Label by the matched route pattern to keep metric cardinality bounded.
    // Requests that matched no route share a single label. Falling back to the
    // raw URI path would let any client create new time series (e.g. 404 scans).
    let path = matched_path
        .map(|p| p.as_str().to_string())
        .unwrap_or_else(|| "unmatched".to_string());

    ACTIVE_CONNECTIONS.inc();
    let in_flight = InFlightGuard;
    let response = next.run(request).await;
    drop(in_flight);

    let duration = start.elapsed().as_secs_f64();
    let status = response.status().as_u16().to_string();

    HTTP_REQUESTS_TOTAL
        .with_label_values(&[&method, &path, &status])
        .inc();

    HTTP_REQUEST_DURATION
        .with_label_values(&[&method, &path])
        .observe(duration);

    response
}

Tracing

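Use opentelemetry-rust, bridged to the tracing crate by tracing-opentelemetry, for distributed tracing. The init_tracing function below installs the global subscriber itself, including the JSON log layer. Call it instead of init_logging, not in addition to it, because a global subscriber can only be installed once.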

Dependencies

# Cargo.toml
[dependencies]
opentelemetry = "0.21"
opentelemetry-otlp = { version = "0.14", features = ["tonic"] }
opentelemetry_sdk = { version = "0.21", features = ["rt-tokio"] }
tracing-opentelemetry = "0.22"

Configuration

use opentelemetry::global;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{runtime, trace as sdktrace, Resource};
use opentelemetry::KeyValue;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

/// Installs a global subscriber that writes JSON logs and exports spans over OTLP (gRPC/tonic).
///
/// * `service_name`: reported as the `service.name` resource attribute on every span.
/// * `otlp_endpoint`: collector address, e.g. `http://otel-collector:4317`.
///
/// The level filter comes from `RUST_LOG`. If that is unset or invalid, it falls
/// back to `info`, as `init_logging` does. This function also installs the global
/// subscriber, so use it instead of `init_logging`, not alongside it.
///
/// # Errors
///
/// Returns an error in either of these cases:
/// - the OTLP pipeline cannot be installed;
/// - a global subscriber is already set.
fn init_tracing(service_name: &str, otlp_endpoint: &str) -> Result<(), Box<dyn std::error::Error>> {
    let tracer = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(
            opentelemetry_otlp::new_exporter()
                .tonic()
                .with_endpoint(otlp_endpoint),
        )
        .with_trace_config(
            sdktrace::config().with_resource(Resource::new(vec![
                KeyValue::new("service.name", service_name.to_string()),
            ])),
        )
        .install_batch(runtime::Tokio)?;

    let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);

    // `EnvFilter::from_default_env()` falls back to ERROR when RUST_LOG is unset.
    // That would disable the INFO-level spans created by `#[instrument]`, so no
    // traces would ever reach the exporter. Default to `info` instead.
    let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));

    // `try_init` reports an already-installed subscriber through the returned
    // `Result` instead of panicking.
    tracing_subscriber::registry()
        .with(env_filter)
        .with(tracing_subscriber::fmt::layer().json())
        .with(telemetry)
        .try_init()?;

    Ok(())
}

// Shutdown on exit
fn shutdown_tracing() {
global::shutdown_tracer_provider();
}

Usage with tracing

use tracing::{instrument, info_span, Instrument};

/// Loads an order, then validates it and charges payment.
///
/// The validation and payment steps each run in their own child span. The
/// first error from loading, validation or payment is returned immediately.
#[instrument(skip(db))]
async fn process_order(order_id: String, db: &Database) -> Result<Order, Error> {
    let order = db.get_order(&order_id).await?;

    let validation = validate_order(&order);
    validation.instrument(info_span!("validate_order")).await?;

    let charge = process_payment(&order);
    charge
        .instrument(info_span!("process_payment", amount = %order.amount))
        .await?;

    Ok(order)
}

Health Checks

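Implement health check endpoints for Kubernetes probes. The liveness endpoint only confirms that the process is responding. The readiness endpoint also checks dependencies (here, the database) and returns 503 Service Unavailable when one of them is unhealthy.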

Axum Health Checks

use std::sync::OnceLock;
use std::time::Instant;

use axum::{
    extract::State,
    http::StatusCode,
    response::IntoResponse,
    routing::get,
    Json, Router,
};
use serde::Serialize;

/// Instant the application recorded as its start; used to report uptime.
static START_TIME: OnceLock<Instant> = OnceLock::new();

/// Records the start time. Only the first call stores a value; later calls
/// leave it untouched.
///
/// Call this at application startup (e.g., in main before starting the server).
fn init_start_time() {
    if START_TIME.get().is_none() {
        // A concurrent caller may win the race; its value is kept either way.
        let _ = START_TIME.set(Instant::now());
    }
}

/// Outcome of a single dependency check.
#[derive(Serialize)]
struct CheckResult {
    /// "healthy" or "unhealthy".
    status: &'static str,
    /// Round-trip time of the check; omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    latency_ms: Option<u64>,
    /// Failure description; omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
}

/// Per-dependency results included in the readiness response.
#[derive(Serialize)]
struct HealthChecks {
    database: CheckResult,
}

/// JSON body returned by the readiness endpoint.
#[derive(Serialize)]
struct ReadinessResponse {
    status: &'static str,
    checks: HealthChecks,
}

/// JSON body returned by the liveness endpoint.
#[derive(Serialize)]
struct LivenessResponse {
    status: &'static str,
}

/// Liveness probe handler: always answers `{"status": "ok"}`.
///
/// It performs no dependency checks.
async fn liveness() -> Json<LivenessResponse> {
    let body = LivenessResponse { status: "ok" };
    Json(body)
}

/// Readiness probe handler: checks the database and reports the result.
///
/// - If the check passes, it responds `200 OK` with status "healthy".
/// - Otherwise it responds `503 Service Unavailable` with status "unhealthy".
///
/// In both cases the body carries the individual check results.
async fn readiness(State(db): State<Database>) -> impl IntoResponse {
    let database = check_database(&db).await;
    let ready = database.status == "healthy";

    let (code, status) = if ready {
        (StatusCode::OK, "healthy")
    } else {
        (StatusCode::SERVICE_UNAVAILABLE, "unhealthy")
    };

    let body = ReadinessResponse {
        status,
        checks: HealthChecks { database },
    };

    (code, Json(body))
}

async fn check_database(db: &Database) -> CheckResult {
let start = Instant::now();

match db.execute("SELECT 1").await {
Ok(_) => CheckResult {
status: "healthy",
latency_ms: Some(start.elapsed().as_millis() as u64),
error: None,
},
Err(e) => CheckResult {
status: "unhealthy",
latency_ms: None,
error: Some(e.to_string()),
},
}
}

/// Combined status page.
///
/// It reports the overall status, the crate version, the uptime in seconds
/// and the database check. It always responds with 200; the overall health is
/// carried in the `status` field of the JSON body.
async fn health(State(db): State<Database>) -> Json<serde_json::Value> {
    // Seconds since `init_start_time` ran; 0 if it was never called.
    let uptime = match START_TIME.get() {
        Some(started) => started.elapsed().as_secs(),
        None => 0,
    };

    let database = check_database(&db).await;
    let overall = if database.status == "healthy" { "healthy" } else { "unhealthy" };

    Json(serde_json::json!({
        "status": overall,
        "version": env!("CARGO_PKG_VERSION"),
        "uptime_seconds": uptime,
        "checks": { "database": database }
    }))
}

// Separate liveness/readiness probe endpoints plus the combined status page.
let app = Router::new()
    .route("/health/live", get(liveness))
    .route("/health/ready", get(readiness))
    .route("/health", get(health));

gRPC Health Checks (Tonic)

use tonic_health::server::health_reporter;

/// Starts the gRPC server, with `tonic_health`'s health service registered
/// next to `MyService`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (mut reporter, health_service) = health_reporter();

    // Mark MyService as SERVING in the health service before accepting traffic.
    reporter.set_serving::<MyServiceServer<MyService>>().await;

    Server::builder()
        .add_service(health_service)
        .add_service(MyServiceServer::new(my_service))
        .serve(addr)
        .await?;

    Ok(())
}

Complete Example

use axum::{Router, routing::get, middleware};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use std::net::SocketAddr;

mod metrics;
mod health;
mod tracing_setup;

/// Entry point. It installs telemetry, serves the HTTP API on 0.0.0.0:8080,
/// and shuts telemetry down once the server stops.
///
/// The server stops gracefully on Ctrl+C or, on Unix, SIGTERM (the signal a
/// Kubernetes pod receives on termination). That lets `tracing_setup::shutdown()`
/// run and flush buffered spans. Shutdown also runs when the server exits with
/// an error; that error is returned afterwards.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize observability
    tracing_setup::init("my-service", "http://otel-collector:4317")?;

    // NOTE(review): if the health handlers extract `State<Database>` (as in the
    // Health Checks section), attach the database handle with `.with_state(db)`
    // before serving.
    let app = Router::new()
        // Health endpoints
        .route("/health", get(health::health))
        .route("/health/live", get(health::liveness))
        .route("/health/ready", get(health::readiness))
        // Metrics
        .route("/metrics", get(metrics::handler))
        // Application routes
        .route("/api/orders", get(list_orders).post(create_order))
        // Middleware
        .layer(middleware::from_fn(metrics::request_middleware))
        .layer(tower_http::trace::TraceLayer::new_for_http());

    let addr = SocketAddr::from(([0, 0, 0, 0], 8080));
    tracing::info!("Starting server on {}", addr);

    let served = axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .with_graceful_shutdown(shutdown_signal())
        .await;

    // Flush telemetry before propagating any server error so no spans are lost.
    tracing_setup::shutdown();
    served?;
    Ok(())
}

/// Resolves when the process receives Ctrl+C or, on Unix, SIGTERM.
///
/// Requires tokio's `signal` feature.
async fn shutdown_signal() {
    let ctrl_c = async {
        tokio::signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("failed to install SIGTERM handler")
            .recv()
            .await;
    };

    // No SIGTERM equivalent here: wait on Ctrl+C only.
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {},
        _ = terminate => {},
    }

    tracing::info!("Shutdown signal received, draining in-flight requests");
}