Skip to main content

Transactional Outbox

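The transactional outbox pattern keeps a service's database and the messages it publishes consistent with each other. Instead of updating the database and then publishing to a message broker as two separate steps (a "dual write" that can fail halfway), the service writes each outgoing event to an outbox table inside the same database transaction as the business change, so both are committed or rolled back together and the database remains the single source of truth. A separate publisher then reads the outbox table and delivers the events to the broker, retrying until delivery succeeds. This pattern is particularly valuable for event-driven architectures: committed events are not lost during system failures. Delivery is at-least-once, however, so consumers must tolerate (or deduplicate) the occasional duplicate.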

Example: Order Processing with Event Publishing

Consider an e-commerce order service that needs to ensure both order persistence and event publishing happen atomically:

Architecture Overview

Database Schema

-- Main business table: one row per order.
-- NUMERIC and TIMESTAMPTZ are PostgreSQL's equivalent spellings of
-- DECIMAL and TIMESTAMP WITH TIME ZONE.
CREATE TABLE orders (
    id           UUID           PRIMARY KEY DEFAULT gen_random_uuid(),
    customer_id  UUID           NOT NULL,
    total_amount NUMERIC(10, 2) NOT NULL,
    status       VARCHAR(50)    NOT NULL DEFAULT 'pending',  -- new orders start as 'pending'
    created_at   TIMESTAMPTZ    DEFAULT NOW(),
    updated_at   TIMESTAMPTZ    DEFAULT NOW()
);

-- Outbox table for reliable event publishing.
-- Rows are inserted in the same transaction as the business change and are
-- later read by the publisher, which stamps processed_at once an event is sent.
CREATE TABLE outbox_events (
    id             UUID         PRIMARY KEY DEFAULT gen_random_uuid(),

    -- Which entity the event belongs to and what happened to it
    aggregate_id   UUID         NOT NULL,
    aggregate_type VARCHAR(100) NOT NULL,
    event_type     VARCHAR(100) NOT NULL,
    event_data     JSONB        NOT NULL,
    event_version  INTEGER      NOT NULL DEFAULT 1,

    -- Publishing state: processed_at stays NULL until the event is published
    created_at     TIMESTAMPTZ  DEFAULT NOW(),
    processed_at   TIMESTAMPTZ,

    -- Retry bookkeeping maintained by the publisher
    retry_count    INTEGER      DEFAULT 0,
    max_retries    INTEGER      DEFAULT 3,
    next_retry_at  TIMESTAMPTZ
);

-- Index for efficient polling: a partial index that contains only the rows
-- still waiting to be published, ordered by creation time.
CREATE INDEX idx_outbox_unprocessed
    ON outbox_events (created_at)
    WHERE processed_at IS NULL;

-- Index for cleanup operations: a partial index over already-published rows,
-- keyed by the time they were processed.
CREATE INDEX idx_outbox_processed
    ON outbox_events (processed_at)
    WHERE processed_at IS NOT NULL;

Service Implementation

// Event definitions

/** Fields shared by every order event payload. */
interface OrderEvent {
  /** Identifier of the order the event is about. */
  orderId: string;
  /** Identifier of the customer who owns the order. */
  customerId: string;
  /** Order total carried in the event. */
  totalAmount: number;
  /** Line items of the order. */
  items: OrderItem[];
  /** When the event object was created. */
  timestamp: Date;
}

/** Event describing a newly created order; `type` is the discriminant. */
interface OrderCreatedEvent extends OrderEvent {
  type: 'OrderCreated';
}

/** Event describing a status change on an existing order; `type` is the discriminant. */
interface OrderUpdatedEvent extends OrderEvent {
  type: 'OrderUpdated';
  /** Status the order had before the change. */
  previousStatus: string;
  /** Status the order has after the change. */
  newStatus: string;
}

/**
 * Outbox event structure: in-memory (camelCase) view of one `outbox_events` row.
 * Fields marked optional are not required when a new event is handed over for
 * insertion.
 */
interface OutboxEvent {
  // --- identity and payload ---
  id?: string;
  aggregateId: string;
  aggregateType: string;
  eventType: string;
  /** Event payload; stored as JSON. */
  eventData: any;
  eventVersion: number;

  // --- publishing state ---
  createdAt?: Date;
  /** Unset while the event is still waiting to be published. */
  processedAt?: Date;

  // --- retry bookkeeping ---
  retryCount?: number;
  maxRetries?: number;
  nextRetryAt?: Date;
}

/**
 * Order service with transactional outbox.
 *
 * Every state change writes the business row and the matching outbox event in
 * ONE database transaction, so either both are committed or neither is. The
 * event is published to the message bus later by a separate outbox publisher.
 */
class OrderService {
  constructor(
    // `protected` rather than `private`: subclasses in this file open
    // transactions through `this.db`.
    protected readonly db: DatabaseConnection,
    private readonly eventPublisher: EventPublisher
  ) {}

  /**
   * Persists a new order and records an `OrderCreated` outbox event atomically.
   *
   * @param orderData customer, total and items of the order to create
   * @returns the inserted order row
   * @throws Error if the insert yields no row; any database error rolls the
   *         whole transaction back
   */
  async createOrder(orderData: CreateOrderRequest): Promise<Order> {
    return await this.db.transaction(async (trx) => {
      // 1. Insert order data. `.returning('*')` resolves to an ARRAY of rows,
      //    so the single inserted row has to be unpacked.
      const [order] = await trx.table('orders').insert({
        customer_id: orderData.customerId,
        total_amount: orderData.totalAmount,
        status: 'pending'
      }).returning('*');

      if (!order) {
        throw new Error('Order insert returned no row');
      }

      // 2. Create event data
      const event: OrderCreatedEvent = {
        type: 'OrderCreated',
        orderId: order.id,
        customerId: order.customer_id,
        // DECIMAL columns are commonly returned as strings by the driver;
        // the event contract declares a number.
        totalAmount: Number(order.total_amount),
        items: orderData.items,
        timestamp: new Date()
      };

      // 3. Insert event into outbox table (same transaction)
      await this.insertOutboxEvent(trx, {
        aggregateId: order.id,
        aggregateType: 'Order',
        eventType: 'OrderCreated',
        eventData: event,
        eventVersion: 1
      });

      return order;
    });
  }

  /**
   * Changes the status of an order and records an `OrderUpdated` outbox event
   * atomically.
   *
   * @param orderId   id of the order to update
   * @param newStatus status to store
   * @returns the updated order row
   * @throws Error('Order not found') if no order with that id exists
   */
  async updateOrderStatus(orderId: string, newStatus: string): Promise<Order> {
    return await this.db.transaction(async (trx) => {
      // 1. Get current order (needed for the previous status)
      const currentOrder = await trx.table('orders')
        .where('id', orderId)
        .first();

      if (!currentOrder) {
        throw new Error('Order not found');
      }

      // 2. Update order status; unpack the single row from the returned array
      const [updatedOrder] = await trx.table('orders')
        .where('id', orderId)
        .update({
          status: newStatus,
          updated_at: new Date()
        })
        .returning('*');

      if (!updatedOrder) {
        // The row vanished between the read and the update.
        throw new Error('Order not found');
      }

      // 3. Create event for status change
      const event: OrderUpdatedEvent = {
        type: 'OrderUpdated',
        orderId: updatedOrder.id,
        customerId: updatedOrder.customer_id,
        totalAmount: Number(updatedOrder.total_amount),
        items: [], // Would be fetched from order_items table
        previousStatus: currentOrder.status,
        newStatus: newStatus,
        timestamp: new Date()
      };

      // The orders table shown above has no `version` column, so
      // `currentOrder.version + 1` would be NaN and the INTEGER insert would
      // fail. Use the stored version when one exists, otherwise fall back to 1.
      const storedVersion = Number(currentOrder.version);
      const eventVersion = Number.isFinite(storedVersion) ? storedVersion + 1 : 1;

      // 4. Insert event into outbox (same transaction)
      await this.insertOutboxEvent(trx, {
        aggregateId: orderId,
        aggregateType: 'Order',
        eventType: 'OrderUpdated',
        eventData: event,
        eventVersion
      });

      return updatedOrder;
    });
  }

  /**
   * Writes one outbox row using the caller's transaction.
   *
   * @param trx   open transaction shared with the business write
   * @param event event to store; `retryCount` / `maxRetries` default to 0 / 3
   */
  private async insertOutboxEvent(
    trx: Transaction,
    event: Omit<OutboxEvent, 'id' | 'createdAt'>
  ): Promise<void> {
    await trx.table('outbox_events').insert({
      aggregate_id: event.aggregateId,
      aggregate_type: event.aggregateType,
      event_type: event.eventType,
      event_data: JSON.stringify(event.eventData),
      event_version: event.eventVersion,
      // `??` keeps an explicit 0 supplied by the caller
      retry_count: event.retryCount ?? 0,
      max_retries: event.maxRetries ?? 3
    });
  }
}

Event Publisher Implementation

/**
 * Outbox event publisher.
 *
 * Polls `outbox_events` for rows that have not been published yet, sends each
 * one to the message bus and stamps `processed_at` on success. Failed events
 * are retried with exponential backoff until `max_retries` is reached.
 *
 * Delivery is at-least-once: if publishing succeeds but marking the row fails,
 * the event is published again on a later poll.
 */
class OutboxEventPublisher {
  private isRunning = false;
  private pollInterval = 5000; // 5 seconds
  /** Promise of the running poll loop, so stop() can wait for it to finish. */
  private loop: Promise<void> | null = null;
  /** Cuts the current sleep short; set only while the loop is sleeping. */
  private wakeUp: (() => void) | null = null;

  constructor(
    private db: DatabaseConnection,
    private messageBus: MessageBus,
    private logger: Logger
  ) {}

  /** Starts the background poll loop. Calling it while already running is a no-op. */
  async start(): Promise<void> {
    if (this.isRunning) {
      return; // a second loop would publish every event twice
    }

    this.isRunning = true;
    this.logger.info('Starting outbox event publisher');

    // Start polling for unprocessed events (runs in the background)
    this.loop = this.pollOutboxEvents();
  }

  /**
   * Stops polling and resolves once the loop has exited (after the batch that
   * is currently in flight, if any). Await this before calling start() again.
   */
  async stop(): Promise<void> {
    this.isRunning = false;
    this.logger.info('Stopping outbox event publisher');

    this.wakeUp?.(); // do not sit out the rest of the poll interval

    const running = this.loop;
    this.loop = null;
    if (running) {
      await running;
    }
  }

  /** Poll loop: process one batch, wait one interval, repeat until stopped. */
  private async pollOutboxEvents(): Promise<void> {
    while (this.isRunning) {
      try {
        await this.processUnpublishedEvents();
      } catch (error) {
        this.logger.error('Error polling outbox events:', error);
      }

      if (this.isRunning) {
        await this.sleep(this.pollInterval);
      }
    }
  }

  /** Fetches one batch of due events and publishes them sequentially, oldest first. */
  private async processUnpublishedEvents(): Promise<void> {
    const batchSize = 100;
    const events = await this.fetchUnprocessedEvents(batchSize);

    if (events.length === 0) {
      return;
    }

    this.logger.info(`Processing ${events.length} outbox events`);

    for (const event of events) {
      await this.processEvent(event);
    }
  }

  /**
   * Loads up to `limit` events that are unpublished, still have retries left
   * and are not waiting for a backoff to elapse.
   */
  private async fetchUnprocessedEvents(limit: number): Promise<OutboxEvent[]> {
    const rows = await this.db.table('outbox_events')
      .where('processed_at', null)
      // Exhausted events keep processed_at = NULL and next_retry_at = NULL;
      // without this filter they would be fetched and re-published forever.
      .whereRaw('COALESCE(retry_count, 0) < COALESCE(max_retries, 3)')
      .where(function() {
        this.where('next_retry_at', null)
          .orWhere('next_retry_at', '<=', new Date());
      })
      .orderBy('created_at', 'asc')
      .limit(limit);

    return rows.map((row) => OutboxEventPublisher.toOutboxEvent(row));
  }

  /**
   * Converts a database row (snake_case columns) into an OutboxEvent. The
   * camelCase spelling is accepted too, in case the connection already maps
   * column names.
   */
  private static toOutboxEvent(row: Record<string, any>): OutboxEvent {
    const asDate = (value: unknown): Date | undefined => {
      if (value == null) {
        return undefined;
      }
      return value instanceof Date ? value : new Date(value as string | number);
    };

    return {
      id: row.id,
      aggregateId: row.aggregate_id ?? row.aggregateId,
      aggregateType: row.aggregate_type ?? row.aggregateType,
      eventType: row.event_type ?? row.eventType,
      eventData: row.event_data ?? row.eventData,
      eventVersion: Number(row.event_version ?? row.eventVersion ?? 1),
      createdAt: asDate(row.created_at ?? row.createdAt),
      processedAt: asDate(row.processed_at ?? row.processedAt),
      retryCount: Number(row.retry_count ?? row.retryCount ?? 0),
      maxRetries: Number(row.max_retries ?? row.maxRetries ?? 3),
      nextRetryAt: asDate(row.next_retry_at ?? row.nextRetryAt)
    };
  }

  /** Publishes one event and marks it processed; on failure schedules a retry. */
  private async processEvent(event: OutboxEvent): Promise<void> {
    try {
      // Publish event to message bus
      await this.messageBus.publish({
        topic: `order.${event.eventType.toLowerCase()}`,
        key: event.aggregateId,
        value: event.eventData,
        headers: {
          eventType: event.eventType,
          aggregateType: event.aggregateType,
          eventVersion: event.eventVersion.toString(),
          timestamp: event.createdAt?.toISOString()
        }
      });

      // Mark as processed (rows loaded from the table always carry their id)
      await this.markAsProcessed(event.id!);

      this.logger.debug(`Successfully published event ${event.id}`);

    } catch (error) {
      this.logger.error(`Failed to publish event ${event.id}:`, error);
      await this.handlePublishFailure(event);
    }
  }

  /** Stamps `processed_at` so the event is never picked up again. */
  private async markAsProcessed(eventId: string): Promise<void> {
    await this.db.table('outbox_events')
      .where('id', eventId)
      .update({
        processed_at: new Date()
      });
  }

  /**
   * Records a failed attempt. Below the retry limit the next attempt is
   * scheduled 2^n seconds ahead; at the limit the event is parked (it stays
   * unprocessed and is no longer fetched).
   */
  private async handlePublishFailure(event: OutboxEvent): Promise<void> {
    // `??` rather than `||`: a stored 0 is a real value, not "missing"
    const retryCount = (event.retryCount ?? 0) + 1;
    const maxRetries = event.maxRetries ?? 3;

    if (retryCount >= maxRetries) {
      this.logger.error(`Event ${event.id} exceeded max retries, marking as failed`);

      // Move to dead letter or mark as failed
      await this.db.table('outbox_events')
        .where('id', event.id)
        .update({
          retry_count: retryCount,
          next_retry_at: null // Stop retrying
        });

      return;
    }

    // Calculate exponential backoff
    const backoffMs = Math.pow(2, retryCount) * 1000; // 2^n seconds
    const nextRetry = new Date(Date.now() + backoffMs);

    await this.db.table('outbox_events')
      .where('id', event.id)
      .update({
        retry_count: retryCount,
        next_retry_at: nextRetry
      });

    this.logger.warn(`Scheduled retry ${retryCount}/${maxRetries} for event ${event.id} at ${nextRetry}`);
  }

  /** Resolves after `ms` milliseconds, or earlier when stop() wakes the loop. */
  private sleep(ms: number): Promise<void> {
    return new Promise((resolve) => {
      const finish = () => {
        clearTimeout(timer);
        this.wakeUp = null;
        resolve();
      };
      const timer = setTimeout(finish, ms);
      this.wakeUp = finish;
    });
  }
}

Advanced Features

/**
 * Event deduplication using idempotency keys.
 *
 * The idempotency key is stored inside the `OrderCreated` outbox payload. The
 * lookup, the order insert and the outbox insert all run in ONE transaction.
 *
 * NOTE(review): two concurrent requests carrying the same key can still both
 * pass the lookup. A unique index on `(event_data->>'idempotencyKey')` is
 * needed to close that race. The key also only lives as long as its outbox
 * row, so retention of processed events bounds the deduplication window.
 */
class IdempotentOutboxService extends OrderService {
  // Own handle to the connection, so this class does not depend on the
  // visibility of the base class's field.
  private readonly database: DatabaseConnection;

  constructor(db: DatabaseConnection, eventPublisher: EventPublisher) {
    super(db, eventPublisher);
    this.database = db;
  }

  /**
   * Creates an order unless one was already created with the same key.
   *
   * @param orderData      order to create
   * @param idempotencyKey caller-chosen key identifying this logical request
   * @returns the newly created order, or the order created by the earlier
   *          request that used the same key
   */
  async createOrderIdempotent(
    orderData: CreateOrderRequest,
    idempotencyKey: string
  ): Promise<Order> {
    return await this.database.transaction(async (trx) => {
      // Check if this operation was already performed. `->>` extracts the JSON
      // field as text; the key is passed as a bound parameter.
      const existingEvent = await trx.table('outbox_events')
        .where('aggregate_type', 'Order')
        .where('event_type', 'OrderCreated')
        .whereRaw("event_data->>'idempotencyKey' = ?", [idempotencyKey])
        .first();

      if (existingEvent) {
        // Return existing order
        return await trx.table('orders')
          .where('id', existingEvent.aggregate_id)
          .first();
      }

      // Create the order inside THIS transaction. Delegating to createOrder()
      // would open a second, independent transaction and would not persist
      // the idempotency key anywhere.
      const [order] = await trx.table('orders').insert({
        customer_id: orderData.customerId,
        total_amount: orderData.totalAmount,
        status: 'pending'
      }).returning('*');

      if (!order) {
        throw new Error('Order insert returned no row');
      }

      const event: OrderCreatedEvent & { idempotencyKey: string } = {
        type: 'OrderCreated',
        orderId: order.id,
        customerId: order.customer_id,
        totalAmount: Number(order.total_amount),
        items: orderData.items,
        timestamp: new Date(),
        idempotencyKey
      };

      await trx.table('outbox_events').insert({
        aggregate_id: order.id,
        aggregate_type: 'Order',
        event_type: 'OrderCreated',
        event_data: JSON.stringify(event),
        event_version: 1,
        retry_count: 0,
        max_retries: 3
      });

      return order;
    });
  }
}

/**
 * Cleanup service for processed events.
 *
 * Only rows with a `processed_at` earlier than the cutoff are touched. Rows
 * that are still unpublished have `processed_at = NULL` and never match the
 * `<` comparison.
 */
class OutboxCleanupService {
  constructor(private db: DatabaseConnection) {}

  /**
   * Deletes events that were processed more than `olderThanDays` days ago.
   *
   * @returns number of deleted rows, as reported by the database
   * @throws RangeError if `olderThanDays` is negative or not a finite number
   */
  async cleanupProcessedEvents(olderThanDays: number = 7): Promise<number> {
    const cutoffDate = OutboxCleanupService.cutoffFor(olderThanDays);

    const deleted = await this.db.table('outbox_events')
      .where('processed_at', '<', cutoffDate)
      .del();

    return deleted;
  }

  /**
   * Moves events processed more than `olderThanDays` days ago into
   * `outbox_events_archive`.
   *
   * Copy and delete run in one transaction, so a failure cannot leave rows
   * both archived and still present in the live table.
   *
   * @returns number of archived rows
   * @throws RangeError if `olderThanDays` is negative or not a finite number
   */
  async archiveProcessedEvents(olderThanDays: number = 30): Promise<number> {
    const cutoffDate = OutboxCleanupService.cutoffFor(olderThanDays);

    return await this.db.transaction(async (trx) => {
      // Move to archive table
      const events = await trx.table('outbox_events')
        .where('processed_at', '<', cutoffDate);

      if (events.length === 0) {
        return 0;
      }

      await trx.table('outbox_events_archive').insert(events);
      await trx.table('outbox_events')
        .where('processed_at', '<', cutoffDate)
        .del();

      return events.length;
    });
  }

  /** Returns the instant `olderThanDays` calendar days before now. */
  private static cutoffFor(olderThanDays: number): Date {
    // A negative value would put the cutoff in the future and match every
    // processed row; NaN would produce an invalid date.
    if (!Number.isFinite(olderThanDays) || olderThanDays < 0) {
      throw new RangeError(`olderThanDays must be a non-negative number, got ${olderThanDays}`);
    }

    const cutoff = new Date();
    cutoff.setDate(cutoff.getDate() - olderThanDays);
    return cutoff;
  }
}

Monitoring and Observability

/**
 * Metrics for outbox pattern: backlog size, processing duration and publish
 * failures, registered on the supplied metrics registry.
 */
class OutboxMetrics {
  private unprocessedEventsGauge: Gauge;
  private processingTimeHistogram: Histogram;
  private failureCounter: Counter;

  constructor(private metricsRegistry: MetricsRegistry) {
    this.unprocessedEventsGauge = metricsRegistry.gauge(
      'outbox_unprocessed_events_total',
      'Number of unprocessed events in outbox'
    );

    this.processingTimeHistogram = metricsRegistry.histogram(
      'outbox_event_processing_duration_seconds',
      'Time taken to process outbox events'
    );

    this.failureCounter = metricsRegistry.counter(
      'outbox_event_failures_total',
      'Number of failed outbox event publications',
      ['event_type', 'failure_reason']
    );
  }

  /**
   * Refreshes the backlog gauge with the number of rows whose `processed_at`
   * is still NULL.
   *
   * @param db connection used to count the unprocessed events
   */
  async collectMetrics(db: DatabaseConnection): Promise<void> {
    // Count unprocessed events
    const row = await db.table('outbox_events')
      .where('processed_at', null)
      .count('* as count')
      .first();

    // PostgreSQL returns count(*) as a bigint, which the driver hands over as
    // a string, and `.first()` may yield undefined. The gauge needs a number.
    const unprocessed = Number(row?.count ?? 0);
    this.unprocessedEventsGauge.set(Number.isFinite(unprocessed) ? unprocessed : 0);
  }

  /** Records one processing duration; input is milliseconds, the metric is in seconds. */
  recordProcessingTime(durationMs: number): void {
    this.processingTimeHistogram.observe(durationMs / 1000);
  }

  /** Increments the failure counter for the given event type and reason. */
  recordFailure(eventType: string, reason: string): void {
    this.failureCounter.inc({ event_type: eventType, failure_reason: reason });
  }
}

Benefits of Transactional Outbox

  • Atomicity: The business change and the record of its event are committed in a single database transaction
  • Reliability: Events are never lost, even during system failures
  • Consistency: Avoids dual-write problems in distributed systems
  • Eventual Delivery: Events are eventually published with retry logic
  • Ordering: Events can maintain order per aggregate
  • Idempotency: Works together with idempotent operations and consumers, which is required because delivery is at-least-once and duplicates can occur

When to Use Transactional Outbox

Good fit when:

  • You need reliable event publishing in distributed systems
  • Business operations must be atomic with event publishing
  • System availability is more important than immediate consistency
  • You're implementing event sourcing or CQRS patterns
  • Network failures or service outages are common

Avoid when:

  • Simple, single-service applications without external dependencies
  • Immediate consistency is absolutely required
  • Event ordering across different aggregates is critical
  • The overhead of polling and cleanup is too expensive
  • You already have reliable message queues with transactional guarantees

Implementation Guidelines

  1. Design for Idempotency: Ensure event handlers can process duplicate events safely
  2. Monitor Lag: Alert when unprocessed events accumulate beyond thresholds
  3. Handle Poison Messages: Implement dead letter queues for repeatedly failing events
  4. Cleanup Strategy: Regularly archive or delete processed events
  5. Batch Processing: Process events in batches for better performance
  6. Graceful Degradation: Handle publisher failures without affecting business operations