Transactional Outbox
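The transactional outbox pattern keeps a service's database state and the messages it publishes consistent with each other. Instead of writing to the database and to the message broker separately (a "dual write"), the service stores each outgoing event in an outbox table within the same database transaction as the business change, so either both are committed or neither is. A separate publisher then reads the outbox and delivers the events to the broker, retrying on failure. This makes the pattern particularly valuable for event-driven architectures: events are not lost when a component fails. Delivery is at-least-once, however, so consumers must be able to handle duplicate events.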
Example: Order Processing with Event Publishing
Consider an e-commerce order service that needs to ensure both order persistence and event publishing happen atomically:
Architecture Overview
Database Schema
-- Main business table
CREATE TABLE orders (
    id           UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    customer_id  UUID NOT NULL,
    total_amount DECIMAL(10,2) NOT NULL,
    status       VARCHAR(50) NOT NULL DEFAULT 'pending',
    -- Revision counter of the order. OrderService.updateOrderStatus reads
    -- it to derive the event_version of the OrderUpdated event; without
    -- this column that computation yields NaN and the outbox insert fails.
    version      INTEGER NOT NULL DEFAULT 1,
    created_at   TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at   TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Outbox table for reliable event publishing.
-- A row is inserted in the same transaction as the business change it
-- describes and is picked up later by the polling publisher.
CREATE TABLE outbox_events (
    id             UUID         PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_id   UUID         NOT NULL,            -- id of the entity the event belongs to
    aggregate_type VARCHAR(100) NOT NULL,            -- e.g. 'Order'
    event_type     VARCHAR(100) NOT NULL,            -- e.g. 'OrderCreated'
    event_data     JSONB        NOT NULL,            -- serialized event payload
    event_version  INTEGER      NOT NULL DEFAULT 1,
    created_at     TIMESTAMPTZ  DEFAULT NOW(),
    processed_at   TIMESTAMPTZ,                      -- NULL until the event has been published
    retry_count    INTEGER      DEFAULT 0,           -- failed publish attempts so far
    max_retries    INTEGER      DEFAULT 3,
    next_retry_at  TIMESTAMPTZ                       -- earliest time of the next attempt, if scheduled
);
-- Index for efficient polling: a partial index that only contains events
-- not yet published, keyed by creation time.
CREATE INDEX idx_outbox_unprocessed ON outbox_events (created_at) WHERE processed_at IS NULL;
-- Index for cleanup operations: a partial index over already published
-- events, keyed by the time they were marked as processed.
CREATE INDEX idx_outbox_processed ON outbox_events (processed_at) WHERE processed_at IS NOT NULL;
Service Implementation
// Event definitions

/** Fields common to every order event. */
interface OrderEvent {
  /** Identifier of the order the event is about. */
  orderId: string;
  /** Identifier of the customer who owns the order. */
  customerId: string;
  /** Line items of the order. */
  items: OrderItem[];
  /** Total amount of the order. */
  totalAmount: number;
  /** Moment the event object was created. */
  timestamp: Date;
}
/** Order event tagged with the literal type `'OrderCreated'`. */
interface OrderCreatedEvent extends OrderEvent {
  type: 'OrderCreated';
}
/**
 * Order event tagged with the literal type `'OrderUpdated'`; additionally
 * carries the status before and after the change.
 */
interface OrderUpdatedEvent extends OrderEvent {
  type: 'OrderUpdated';
  /** Status the order had before the update. */
  previousStatus: string;
  /** Status the order has after the update. */
  newStatus: string;
}
// Outbox event structure

/**
 * In-memory (camelCase) representation of an outbox event. The optional
 * members are the ones the application does not have to supply when it
 * creates an event.
 */
interface OutboxEvent {
  /** Identifier of the outbox entry. */
  id?: string;

  /** Identifier of the entity the event belongs to. */
  aggregateId: string;
  /** Kind of entity, e.g. `'Order'`. */
  aggregateType: string;
  /** Name of the event, e.g. `'OrderCreated'`. */
  eventType: string;
  /** Event payload. */
  eventData: any;
  /** Version number stored with the event. */
  eventVersion: number;

  /** When the outbox entry was created. */
  createdAt?: Date;
  /** When the event was published; unset while it is still pending. */
  processedAt?: Date;

  /** Number of failed publish attempts so far. */
  retryCount?: number;
  /** Retry limit for the event. */
  maxRetries?: number;
  /** Earliest time of the next publish attempt, if one is scheduled. */
  nextRetryAt?: Date;
}
// Order service with transactional outbox

/**
 * Persists orders and records the matching domain event in `outbox_events`
 * inside the same database transaction, so the business change and the
 * event are committed (or rolled back) together.
 *
 * The query calls follow the knex query-builder API, where
 * `.returning('*')` resolves to an ARRAY of rows.
 */
class OrderService {
  constructor(
    private db: DatabaseConnection,
    private eventPublisher: EventPublisher
  ) {}

  /**
   * Inserts a new order with status `'pending'` and an `OrderCreated`
   * outbox event in one transaction.
   *
   * @param orderData Customer id, total amount and items of the new order.
   * @returns The inserted order row.
   * @throws Error if the insert does not return the created row.
   */
  async createOrder(orderData: CreateOrderRequest): Promise<Order> {
    return await this.db.transaction(async (trx) => {
      // 1. Insert order data. `returning('*')` yields an array of rows, so
      //    take its first element instead of treating the result as a row.
      const [order] = await trx.table('orders').insert({
        customer_id: orderData.customerId,
        total_amount: orderData.totalAmount,
        status: 'pending'
      }).returning('*');

      if (!order) {
        throw new Error('Order insert returned no row');
      }

      // 2. Create event data
      const event: OrderCreatedEvent = {
        type: 'OrderCreated',
        orderId: order.id,
        customerId: order.customer_id,
        // NUMERIC/DECIMAL columns are commonly returned as strings by
        // PostgreSQL drivers; the event contract declares a number.
        totalAmount: Number(order.total_amount),
        items: orderData.items,
        timestamp: new Date()
      };

      // 3. Insert event into outbox table (same transaction)
      await this.insertOutboxEvent(trx, {
        aggregateId: order.id,
        aggregateType: 'Order',
        eventType: 'OrderCreated',
        eventData: event,
        eventVersion: 1
      });

      return order;
    });
  }

  /**
   * Changes the status of an order and records an `OrderUpdated` outbox
   * event in the same transaction.
   *
   * @param orderId   Identifier of the order to update.
   * @param newStatus Status to store.
   * @returns The updated order row.
   * @throws Error('Order not found') if no order has the given id.
   */
  async updateOrderStatus(orderId: string, newStatus: string): Promise<Order> {
    return await this.db.transaction(async (trx) => {
      // 1. Get current order. The row is locked so that a concurrent update
      //    cannot change the status/version between this read and the
      //    write below.
      const currentOrder = await trx.table('orders')
        .where('id', orderId)
        .forUpdate()
        .first();

      if (!currentOrder) {
        throw new Error('Order not found');
      }

      // The shown `orders` schema has no `version` column. Only rely on (and
      // persist) the counter when the row really carries one; otherwise
      // fall back to 1 so the event version is never NaN.
      const hasVersion = typeof currentOrder.version === 'number';
      const nextVersion = (hasVersion ? currentOrder.version : 1) + 1;

      // 2. Update order status (`returning('*')` yields an array of rows)
      const [updatedOrder] = await trx.table('orders')
        .where('id', orderId)
        .update({
          status: newStatus,
          updated_at: new Date(),
          ...(hasVersion ? { version: nextVersion } : {})
        })
        .returning('*');

      if (!updatedOrder) {
        throw new Error('Order not found');
      }

      // 3. Create event for status change
      const event: OrderUpdatedEvent = {
        type: 'OrderUpdated',
        orderId: updatedOrder.id,
        customerId: updatedOrder.customer_id,
        totalAmount: Number(updatedOrder.total_amount),
        items: [], // Would be fetched from order_items table
        previousStatus: currentOrder.status,
        newStatus: newStatus,
        timestamp: new Date()
      };

      // 4. Insert event into outbox (same transaction)
      await this.insertOutboxEvent(trx, {
        aggregateId: orderId,
        aggregateType: 'Order',
        eventType: 'OrderUpdated',
        eventData: event,
        eventVersion: nextVersion
      });

      return updatedOrder;
    });
  }

  /**
   * Writes one row to `outbox_events` using the caller's transaction.
   *
   * @param trx   Transaction the business change is running in.
   * @param event Event to store; `retryCount` defaults to 0 and
   *              `maxRetries` to 3 when not given.
   */
  private async insertOutboxEvent(
    trx: Transaction,
    event: Omit<OutboxEvent, 'id' | 'createdAt'>
  ): Promise<void> {
    await trx.table('outbox_events').insert({
      aggregate_id: event.aggregateId,
      aggregate_type: event.aggregateType,
      event_type: event.eventType,
      event_data: JSON.stringify(event.eventData),
      event_version: event.eventVersion,
      retry_count: event.retryCount ?? 0,
      max_retries: event.maxRetries ?? 3
    });
  }
}
Event Publisher Implementation
// Outbox event publisher

/** Raw `outbox_events` row as selected from the database (snake_case columns). */
interface OutboxEventRow {
  id: string;
  aggregate_id: string;
  aggregate_type: string;
  event_type: string;
  event_data: unknown;
  event_version: number;
  created_at: Date | string | null;
  processed_at: Date | string | null;
  retry_count: number | null;
  max_retries: number | null;
  next_retry_at: Date | string | null;
}

/**
 * Polls `outbox_events` for unpublished events, publishes them to the
 * message bus and marks them as processed. Failed publications are retried
 * with exponential backoff until the event's retry budget is used up.
 *
 * Delivery is at-least-once: an event that was published but could not be
 * marked as processed is published again on a later poll.
 *
 * NOTE(review): rows are not locked while being processed, so running more
 * than one publisher instance against the same table would publish events
 * more than once — confirm the deployment runs a single poller.
 */
class OutboxEventPublisher {
  /** Retry budget used when a row has no `max_retries` value. */
  private static readonly DEFAULT_MAX_RETRIES = 3;

  private isRunning = false;
  private pollInterval = 5000; // 5 seconds
  private batchSize = 100;
  /** Promise of the currently running polling loop, if any. */
  private loop: Promise<void> | null = null;
  /** Ends the current inter-poll sleep early; set only while sleeping. */
  private wakeSleeper: (() => void) | null = null;

  /**
   * @param db         Connection used to read and update `outbox_events`.
   * @param messageBus Bus the events are published to.
   * @param logger     Logger for progress and failures.
   * @param options    Optional tuning: `pollIntervalMs` (default 5000) and
   *                   `batchSize` (default 100).
   */
  constructor(
    private db: DatabaseConnection,
    private messageBus: MessageBus,
    private logger: Logger,
    options: { pollIntervalMs?: number; batchSize?: number } = {}
  ) {
    this.pollInterval = options.pollIntervalMs ?? this.pollInterval;
    this.batchSize = options.batchSize ?? this.batchSize;
  }

  /**
   * Starts the background polling loop. Calling it while the publisher is
   * already running is a no-op, so a second loop is never spawned.
   */
  async start(): Promise<void> {
    if (this.isRunning) {
      this.logger.warn('Outbox event publisher is already running');
      return;
    }
    this.isRunning = true;
    this.logger.info('Starting outbox event publisher');
    // Start polling for unprocessed events. The loop runs in the background;
    // keep its promise so stop() can wait for it and so that an unexpected
    // rejection is logged instead of becoming an unhandled rejection.
    this.loop = this.pollOutboxEvents().catch((error) => {
      this.isRunning = false;
      this.logger.error('Outbox polling loop terminated unexpectedly:', error);
    });
  }

  /**
   * Stops the polling loop and resolves once the loop has finished its
   * current iteration. Await this before calling start() again.
   */
  async stop(): Promise<void> {
    this.isRunning = false;
    this.logger.info('Stopping outbox event publisher');
    // Cut a pending sleep short so shutdown does not wait a full interval.
    this.wakeSleeper?.();
    await this.loop;
    this.loop = null;
  }

  /** Processes one batch per iteration, sleeping `pollInterval` ms between iterations. */
  private async pollOutboxEvents(): Promise<void> {
    while (this.isRunning) {
      try {
        await this.processUnpublishedEvents();
      } catch (error) {
        this.logger.error('Error polling outbox events:', error);
      }
      if (this.isRunning) {
        await this.sleep(this.pollInterval);
      }
    }
  }

  /** Fetches one batch of due events and processes them one after another, oldest first. */
  private async processUnpublishedEvents(): Promise<void> {
    const events = await this.fetchUnprocessedEvents(this.batchSize);
    if (events.length === 0) {
      return;
    }
    this.logger.info(`Processing ${events.length} outbox events`);
    for (const event of events) {
      await this.processEvent(event);
    }
  }

  /**
   * Loads up to `limit` events that are unpublished, due for an attempt and
   * still within their retry budget, oldest first.
   */
  private async fetchUnprocessedEvents(limit: number): Promise<OutboxEvent[]> {
    const rows = await this.db.table('outbox_events')
      .where('processed_at', null)
      .where(function() {
        this.where('next_retry_at', null)
          .orWhere('next_retry_at', '<=', new Date());
      })
      // Exhausted events keep processed_at = NULL and next_retry_at = NULL,
      // so without this filter they would be picked up again on every poll.
      // GREATEST(..., 1) guarantees every event gets at least one attempt.
      .whereRaw(
        'COALESCE(retry_count, 0) < GREATEST(COALESCE(max_retries, ?), 1)',
        [OutboxEventPublisher.DEFAULT_MAX_RETRIES]
      )
      .orderBy('created_at', 'asc')
      .limit(limit);
    // Rows carry snake_case column names; convert them to the camelCase
    // OutboxEvent shape the rest of this class works with.
    return rows.map((row: OutboxEventRow) => OutboxEventPublisher.toOutboxEvent(row));
  }

  /** Converts a database row into an OutboxEvent. */
  private static toOutboxEvent(row: OutboxEventRow): OutboxEvent {
    return {
      id: row.id,
      aggregateId: row.aggregate_id,
      aggregateType: row.aggregate_type,
      eventType: row.event_type,
      eventData: row.event_data,
      eventVersion: row.event_version,
      createdAt: OutboxEventPublisher.toDate(row.created_at),
      processedAt: OutboxEventPublisher.toDate(row.processed_at),
      retryCount: row.retry_count ?? 0,
      maxRetries: row.max_retries ?? OutboxEventPublisher.DEFAULT_MAX_RETRIES,
      nextRetryAt: OutboxEventPublisher.toDate(row.next_retry_at)
    };
  }

  /** Maps a nullable timestamp column value to `Date | undefined`. */
  private static toDate(value: Date | string | null | undefined): Date | undefined {
    return value == null ? undefined : new Date(value);
  }

  /**
   * Publishes one event and marks it as processed. A publish failure
   * schedules a retry; a failure while marking is only logged, because the
   * event stays unprocessed and is therefore published again later.
   */
  private async processEvent(event: OutboxEvent): Promise<void> {
    const eventId = event.id;
    if (eventId === undefined) {
      // Without an id the row could never be marked as processed.
      this.logger.error('Skipping outbox event without id');
      return;
    }

    try {
      // Publish event to message bus
      await this.messageBus.publish({
        // For aggregateType 'Order' this is the same `order.<event>` topic as before.
        topic: `${event.aggregateType.toLowerCase()}.${event.eventType.toLowerCase()}`,
        key: event.aggregateId,
        value: event.eventData,
        headers: {
          eventType: event.eventType,
          aggregateType: event.aggregateType,
          eventVersion: event.eventVersion.toString(),
          timestamp: event.createdAt?.toISOString()
        }
      });
    } catch (error) {
      this.logger.error(`Failed to publish event ${eventId}:`, error);
      await this.handlePublishFailure(event);
      return;
    }

    try {
      // Mark as processed
      await this.markAsProcessed(eventId);
      this.logger.debug(`Successfully published event ${eventId}`);
    } catch (error) {
      // Do not count this against the retry budget: the publish succeeded.
      this.logger.error(
        `Event ${eventId} was published but could not be marked as processed; it will be published again:`,
        error
      );
    }
  }

  /** Sets `processed_at` of the given event to the current time. */
  private async markAsProcessed(eventId: string): Promise<void> {
    await this.db.table('outbox_events')
      .where('id', eventId)
      .update({
        processed_at: new Date()
      });
  }

  /**
   * Records a failed publish attempt. While the retry budget lasts, the
   * next attempt is scheduled 2^n seconds ahead (n = number of failures so
   * far); once the budget is used up the event is left unprocessed with no
   * retry scheduled.
   */
  private async handlePublishFailure(event: OutboxEvent): Promise<void> {
    const retryCount = (event.retryCount ?? 0) + 1;
    const maxRetries = Math.max(event.maxRetries ?? OutboxEventPublisher.DEFAULT_MAX_RETRIES, 1);

    if (retryCount >= maxRetries) {
      this.logger.error(`Event ${event.id} exceeded max retries, marking as failed`);
      // Move to dead letter or mark as failed
      await this.db.table('outbox_events')
        .where('id', event.id)
        .update({
          retry_count: retryCount,
          next_retry_at: null // Stop retrying
        });
      return;
    }

    // Calculate exponential backoff
    const backoffMs = 2 ** retryCount * 1000; // 2^n seconds
    const nextRetry = new Date(Date.now() + backoffMs);
    await this.db.table('outbox_events')
      .where('id', event.id)
      .update({
        retry_count: retryCount,
        next_retry_at: nextRetry
      });
    this.logger.warn(`Scheduled retry ${retryCount}/${maxRetries} for event ${event.id} at ${nextRetry}`);
  }

  /** Resolves after `ms` milliseconds, or earlier when stop() wakes the sleeper. */
  private sleep(ms: number): Promise<void> {
    return new Promise<void>((resolve) => {
      const finish = (): void => {
        clearTimeout(timer);
        this.wakeSleeper = null;
        resolve();
      };
      const timer = setTimeout(finish, ms);
      this.wakeSleeper = finish;
    });
  }
}
Advanced Features
// Event deduplication using idempotency keys

/** Constructor arguments of OrderService: `[db, eventPublisher]`. */
type OrderServiceDeps = ConstructorParameters<typeof OrderService>;

/**
 * OrderService variant whose order creation can be retried safely: the
 * idempotency key is stored in the `OrderCreated` outbox event, and a
 * repeated call with the same key returns the order created the first time.
 *
 * Limitations:
 * - The key is only remembered as long as the outbox event exists; once the
 *   event has been cleaned up or archived, the same key creates a new order.
 * - NOTE(review): two concurrent first-time calls with the same key can
 *   both pass the lookup. A unique index on the key expression would close
 *   that gap — confirm whether it is needed.
 */
class IdempotentOutboxService extends OrderService {
  /** Own handle on the connection; the base class keeps its `db` private. */
  private readonly connection: OrderServiceDeps[0];

  constructor(...deps: OrderServiceDeps) {
    super(...deps);
    this.connection = deps[0];
  }

  /**
   * Creates an order unless one was already created with `idempotencyKey`.
   * Lookup, order insert and outbox insert share one transaction.
   *
   * @param orderData      Customer id, total amount and items of the order.
   * @param idempotencyKey Caller-chosen key identifying this operation.
   * @returns The newly created order, or the order created earlier with
   *          the same key.
   */
  async createOrderIdempotent(
    orderData: CreateOrderRequest,
    idempotencyKey: string
  ): Promise<Order> {
    return await this.connection.transaction(async (trx) => {
      // Check if this operation was already performed. The JSON field must
      // be addressed with a raw expression; passing 'event_data->key' as a
      // column name would be quoted as a single identifier.
      const existingEvent = await trx.table('outbox_events')
        .where('event_type', 'OrderCreated')
        .whereRaw("event_data->>'idempotencyKey' = ?", [idempotencyKey])
        .first();

      if (existingEvent) {
        // Return existing order
        return await trx.table('orders')
          .where('id', existingEvent.aggregate_id)
          .first();
      }

      // Proceed with normal creation inside THIS transaction. The steps
      // mirror OrderService.createOrder; they are repeated here because
      // createOrder opens its own transaction and does not store the key.
      const [order] = await trx.table('orders').insert({
        customer_id: orderData.customerId,
        total_amount: orderData.totalAmount,
        status: 'pending'
      }).returning('*'); // resolves to an array of rows

      if (!order) {
        throw new Error('Order insert returned no row');
      }

      await trx.table('outbox_events').insert({
        aggregate_id: order.id,
        aggregate_type: 'Order',
        event_type: 'OrderCreated',
        event_data: JSON.stringify({
          type: 'OrderCreated',
          orderId: order.id,
          customerId: order.customer_id,
          totalAmount: order.total_amount,
          items: orderData.items,
          timestamp: new Date(),
          // Stored so the lookup above can find this operation again
          idempotencyKey
        }),
        event_version: 1,
        retry_count: 0,
        max_retries: 3
      });

      return order;
    });
  }
}
// Cleanup service for processed events

/**
 * Removes already published events from `outbox_events`, either by deleting
 * them or by moving them to `outbox_events_archive`. Unpublished events
 * (`processed_at` is NULL) never match the age filter and are left alone.
 */
class OutboxCleanupService {
  constructor(private db: DatabaseConnection) {}

  /**
   * Deletes events that were processed more than `olderThanDays` days ago.
   *
   * @param olderThanDays Minimum age in days (default 7).
   * @returns Number of deleted rows.
   * @throws RangeError if `olderThanDays` is negative or not a finite number.
   */
  async cleanupProcessedEvents(olderThanDays: number = 7): Promise<number> {
    const cutoffDate = this.cutoffFor(olderThanDays);
    const deleted = await this.db.table('outbox_events')
      .where('processed_at', '<', cutoffDate)
      .del();
    return deleted;
  }

  /**
   * Moves events that were processed more than `olderThanDays` days ago to
   * `outbox_events_archive`.
   *
   * The work is done in batches. Each batch is copied and deleted inside
   * one transaction, and exactly the copied rows are deleted (by id), so a
   * failure cannot leave a row both archived and still in the outbox.
   *
   * @param olderThanDays Minimum age in days (default 30).
   * @param batchSize     Maximum number of rows moved per transaction
   *                      (default 1000).
   * @returns Total number of archived rows.
   * @throws RangeError if `olderThanDays` is negative or not finite, or if
   *         `batchSize` is not a positive integer.
   */
  async archiveProcessedEvents(
    olderThanDays: number = 30,
    batchSize: number = 1000
  ): Promise<number> {
    const cutoffDate = this.cutoffFor(olderThanDays);
    if (!Number.isInteger(batchSize) || batchSize < 1) {
      throw new RangeError(`batchSize must be a positive integer, got ${batchSize}`);
    }

    let archived = 0;
    for (;;) {
      const moved: number = await this.db.transaction(async (trx) => {
        const events = await trx.table('outbox_events')
          .where('processed_at', '<', cutoffDate)
          .orderBy('processed_at', 'asc')
          .limit(batchSize);

        if (events.length === 0) {
          return 0;
        }

        // Move to archive table
        await trx.table('outbox_events_archive').insert(events);
        await trx.table('outbox_events')
          .whereIn('id', events.map((row: { id: string }) => row.id))
          .del();
        return events.length;
      });

      archived += moved;
      if (moved < batchSize) {
        // A short (or empty) batch means nothing older than the cutoff is left.
        return archived;
      }
    }
  }

  /** Returns the date `olderThanDays` calendar days before now. */
  private cutoffFor(olderThanDays: number): Date {
    if (!Number.isFinite(olderThanDays) || olderThanDays < 0) {
      throw new RangeError(`olderThanDays must be a non-negative number, got ${olderThanDays}`);
    }
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - olderThanDays);
    return cutoffDate;
  }
}
Monitoring and Observability
// Metrics for outbox pattern

/**
 * Registers and updates the outbox metrics: a gauge for the backlog of
 * unprocessed events, a histogram for processing time and a counter for
 * failed publications.
 */
class OutboxMetrics {
  private unprocessedEventsGauge: Gauge;
  private processingTimeHistogram: Histogram;
  private failureCounter: Counter;

  /** @param metricsRegistry Registry the three metrics are created in. */
  constructor(private metricsRegistry: MetricsRegistry) {
    this.unprocessedEventsGauge = metricsRegistry.gauge(
      'outbox_unprocessed_events_total',
      'Number of unprocessed events in outbox'
    );
    this.processingTimeHistogram = metricsRegistry.histogram(
      'outbox_event_processing_duration_seconds',
      'Time taken to process outbox events'
    );
    this.failureCounter = metricsRegistry.counter(
      'outbox_event_failures_total',
      'Number of failed outbox event publications',
      ['event_type', 'failure_reason']
    );
  }

  /**
   * Counts the events whose `processed_at` is NULL and stores the result
   * in the backlog gauge.
   *
   * @param db Connection used to query `outbox_events`.
   */
  async collectMetrics(db: DatabaseConnection): Promise<void> {
    // Count unprocessed events
    const unprocessedCount = await db.table('outbox_events')
      .where('processed_at', null)
      .count('* as count')
      .first();
    // PostgreSQL COUNT(*) is a bigint, which drivers commonly return as a
    // string; convert it so the gauge receives a number. A missing row is
    // reported as 0.
    this.unprocessedEventsGauge.set(Number(unprocessedCount?.count ?? 0));
  }

  /**
   * Records how long processing took.
   *
   * @param durationMs Duration in milliseconds; observed in seconds.
   */
  recordProcessingTime(durationMs: number): void {
    this.processingTimeHistogram.observe(durationMs / 1000);
  }

  /**
   * Counts one failed publication.
   *
   * @param eventType Value of the `event_type` label.
   * @param reason    Value of the `failure_reason` label.
   */
  recordFailure(eventType: string, reason: string): void {
    this.failureCounter.inc({ event_type: eventType, failure_reason: reason });
  }
}
Benefits of Transactional Outbox
- Atomicity: Business operations and event publishing are atomic
- Reliability: Events are never lost, even during system failures
- Consistency: Avoids dual-write problems in distributed systems
- Eventual Delivery: Events are eventually published with retry logic
- Ordering: Events can maintain order per aggregate
- Idempotency: Supports idempotent operations to prevent duplicates
When to Use Transactional Outbox
Good fit when:
- You need reliable event publishing in distributed systems
- Business operations must be atomic with event publishing
- System availability is more important than immediate consistency
- You're implementing event sourcing or CQRS patterns
- Network failures or service outages are common
Avoid when:
- Simple, single-service applications without external dependencies
- Immediate consistency is absolutely required
- Event ordering across different aggregates is critical
- The overhead of polling and cleanup is too expensive
- You already have reliable message queues with transactional guarantees
Implementation Guidelines
- Design for Idempotency: Ensure event handlers can process duplicate events safely
- Monitor Lag: Alert when unprocessed events accumulate beyond thresholds
- Handle Poison Messages: Implement dead letter queues for repeatedly failing events
- Cleanup Strategy: Regularly archive or delete processed events
- Batch Processing: Process events in batches for better performance
- Graceful Degradation: Handle publisher failures without affecting business operations