[Integration Pattern Name]
Type: [Synchronous | Asynchronous | Hybrid]
Protocol: [REST | GraphQL | gRPC | Message Queue | Event Stream]
Complexity: [Simple | Moderate | Complex]
Reliability: [High | Medium | Low]
Last Updated: [YYYY-MM-DD]
Maintainer: [Team/Individual]
Overview
Pattern Purpose
[Brief description of what this integration pattern solves and when to use it]
Problem Statement
[Describe the integration challenge this pattern addresses]
Key Benefits
- Benefit 1: Description
- Benefit 2: Description
- Benefit 3: Description
Trade-offs
- Trade-off 1: Description and mitigation
- Trade-off 2: Description and mitigation
When to Use This Pattern
Ideal Scenarios
[List specific scenarios where this pattern is most appropriate:]
- Scenario 1: Description
- Scenario 2: Description
- Scenario 3: Description
Avoid When
[Important: describe scenarios where this pattern is not appropriate:]
- Anti-pattern 1: Description
- Anti-pattern 2: Description
Decision Criteria
[Factors to consider when choosing this pattern:]
- Data Volume: [Low | Medium | High] - Impact on pattern choice
- Latency Requirements: [Real-time | Near real-time | Batch] - How it affects implementation
- Consistency Requirements: [Strong | Eventual | Weak] - Consistency implications
- Error Handling: [Critical | Standard | Best-effort] - Error handling complexity
Integration Architecture
High-Level Architecture
[Include a comprehensive integration diagram using Mermaid]
Component Responsibilities
[Producer Component]
Purpose: [What this component does in the integration] Responsibilities:
- Responsibility 1: Description
- Responsibility 2: Description
- Responsibility 3: Description
Key Technologies: [List main technologies used] Scaling Considerations: [How this component scales]
[Integration Layer]
Purpose: [What this component does in the integration] Responsibilities:
- Responsibility 1: Description
- Responsibility 2: Description
- Responsibility 3: Description
Key Technologies: [List main technologies used] Scaling Considerations: [How this component scales]
[Consumer Component]
Purpose: [What this component does in the integration] Responsibilities:
- Responsibility 1: Description
- Responsibility 2: Description
- Responsibility 3: Description
Key Technologies: [List main technologies used] Scaling Considerations: [How this component scales]
Data Flow & Communication
Message Flow
[Describe how data flows through the integration]
Data Formats
Request Format
{
"messageId": "uuid",
"timestamp": "2024-01-15T10:30:00Z",
"source": "producer-service",
"eventType": "user.created",
"version": "1.0",
"data": {
"userId": "12345",
"email": "user@example.com",
"metadata": {
"source": "web-app",
"userAgent": "Mozilla/5.0..."
}
}
}
Response Format
{
"messageId": "uuid",
"status": "success",
"timestamp": "2024-01-15T10:30:01Z",
"processingTime": 150,
"acknowledgment": {
"received": true,
"processed": true,
"stored": true
}
}
Error Format
{
"messageId": "uuid",
"status": "error",
"timestamp": "2024-01-15T10:30:01Z",
"error": {
"code": "VALIDATION_ERROR",
"message": "Invalid email format",
"details": {
"field": "email",
"value": "invalid-email",
"expected": "valid email address"
}
},
"retryable": true,
"retryAfter": 30
}
Protocol Specifications
[Protocol Name] Configuration
# Example protocol configuration
protocol:
type: [REST | GraphQL | gRPC | AMQP | etc.]
version: "1.0"
endpoint: "https://api.example.com/v1/integration"
authentication:
type: "Bearer"
tokenEndpoint: "https://auth.example.com/token"
timeout: 30000 # 30 seconds
retries: 3
backoff: exponential
Implementation Guide
Prerequisites
[List what needs to be in place before implementation]
- Infrastructure requirement 1
- Security requirement 2
- Dependency requirement 3
Step-by-Step Implementation
Phase 1: Basic Integration Setup (Week 1)
[Detailed steps for setting up basic integration]
-
Infrastructure Setup
# Example infrastructure setup commands
kubectl create namespace integration
kubectl apply -f integration-infrastructure.yaml -
Message Queue Setup
# Example message queue configuration
apiVersion: v1
kind: ConfigMap
metadata:
name: queue-config
data:
broker.conf: |
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
auto.create.topics.enable=true
default.replication.factor=3 -
Basic Producer Implementation
// Example producer implementation
class MessageProducer {
constructor(config) {
this.config = config;
this.client = new IntegrationClient(config);
}
async sendMessage(topic, message) {
try {
const result = await this.client.send({
topic,
message: JSON.stringify(message),
timestamp: Date.now()
});
return result;
} catch (error) {
console.error('Failed to send message:', error);
throw error;
}
}
}
Phase 2: Consumer Implementation (Week 2)
[Steps for implementing the consumer side]
-
Consumer Setup
// Example consumer implementation
class MessageConsumer {
constructor(config) {
this.config = config;
this.client = new IntegrationClient(config);
}
async startConsuming(topic, handler) {
const consumer = this.client.consumer({
groupId: 'integration-consumer'
});
await consumer.subscribe({ topic });
await consumer.run({
eachMessage: async ({ message }) => {
try {
const data = JSON.parse(message.value.toString());
await handler(data);
} catch (error) {
console.error('Failed to process message:', error);
}
}
});
}
} -
Data Transformation
// Example data transformation
class DataTransformer {
transform(inputData) {
return {
id: inputData.userId,
email: inputData.email,
createdAt: new Date(inputData.timestamp),
source: inputData.metadata?.source || 'unknown',
// Additional transformation logic
};
}
validate(data) {
const errors = [];
if (!data.id) errors.push('Missing required field: id');
if (!data.email) errors.push('Missing required field: email');
if (errors.length > 0) {
throw new ValidationError(errors);
}
return true;
}
}
Phase 3: Advanced Features (Week 3)
[Steps for implementing advanced integration features]
- Error Handling & Retry Logic
- Monitoring & Alerting
- Security Implementation
Configuration Examples
Environment Configuration
# config/integration.yml
integration:
producer:
brokers: ["kafka-1:9092", "kafka-2:9092", "kafka-3:9092"]
topic: "user.events"
acks: "all"
retries: 3
consumer:
brokers: ["kafka-1:9092", "kafka-2:9092", "kafka-3:9092"]
groupId: "user-processor"
autoOffsetReset: "earliest"
security:
ssl:
enabled: true
keystore: "/path/to/keystore"
truststore: "/path/to/truststore"
sasl:
mechanism: "PLAIN"
username: "${KAFKA_USERNAME}"
password: "${KAFKA_PASSWORD}"
Application Configuration
// Example application configuration
const integrationConfig = {
producer: {
clientId: 'user-service-producer',
brokers: process.env.KAFKA_BROKERS.split(','),
retry: {
initialRetryTime: 100,
retries: 8
}
},
consumer: {
clientId: 'user-service-consumer',
brokers: process.env.KAFKA_BROKERS.split(','),
groupId: process.env.CONSUMER_GROUP_ID
},
topics: {
userEvents: 'user.events',
userNotifications: 'user.notifications'
}
};
Error Handling & Resilience
Error Handling Strategy
[Describe comprehensive error handling approach]
Error Categories
- Transient Errors: Network timeouts, temporary service unavailability
- Permanent Errors: Invalid data format, authentication failures
- Business Logic Errors: Data validation failures, business rule violations
Retry Mechanisms
// Example retry implementation
class RetryHandler {
constructor(maxRetries = 3, baseDelay = 1000) {
this.maxRetries = maxRetries;
this.baseDelay = baseDelay;
}
async executeWithRetry(operation, context) {
let lastError;
for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
try {
return await operation(context);
} catch (error) {
lastError = error;
if (!this.isRetryable(error) || attempt === this.maxRetries) {
throw error;
}
const delay = this.calculateDelay(attempt);
console.log(`Attempt ${attempt} failed, retrying in ${delay}ms`);
await this.sleep(delay);
}
}
throw lastError;
}
isRetryable(error) {
return error.code === 'NETWORK_ERROR' ||
error.code === 'TIMEOUT' ||
error.status >= 500;
}
calculateDelay(attempt) {
return this.baseDelay * Math.pow(2, attempt - 1); // Exponential backoff
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
Dead Letter Queue
// Example dead letter queue implementation
class DeadLetterHandler {
constructor(dlqTopic) {
this.dlqTopic = dlqTopic;
}
async sendToDeadLetter(originalMessage, error) {
const dlqMessage = {
originalMessage,
error: {
message: error.message,
stack: error.stack,
timestamp: new Date().toISOString()
},
metadata: {
attempts: originalMessage.attempts || 1,
firstFailure: originalMessage.firstFailure || new Date().toISOString(),
lastFailure: new Date().toISOString()
}
};
await this.producer.send({
topic: this.dlqTopic,
messages: [{ value: JSON.stringify(dlqMessage) }]
});
}
}
Circuit Breaker Pattern
// Example circuit breaker implementation
class CircuitBreaker {
constructor(threshold = 5, timeout = 60000) {
this.threshold = threshold;
this.timeout = timeout;
this.failureCount = 0;
this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
this.nextAttempt = Date.now();
}
async execute(operation) {
if (this.state === 'OPEN') {
if (Date.now() < this.nextAttempt) {
throw new Error('Circuit breaker is OPEN');
}
this.state = 'HALF_OPEN';
}
try {
const result = await operation();
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
}
}
onSuccess() {
this.failureCount = 0;
this.state = 'CLOSED';
}
onFailure() {
this.failureCount++;
if (this.failureCount >= this.threshold) {
this.state = 'OPEN';
this.nextAttempt = Date.now() + this.timeout;
}
}
}
Security Considerations
Authentication & Authorization
[Describe security implementation for the integration]
API Key Authentication
// Example API key authentication
class ApiKeyAuth {
constructor(apiKey) {
this.apiKey = apiKey;
}
addAuthHeaders(headers = {}) {
return {
...headers,
'Authorization': `Bearer ${this.apiKey}`,
'X-API-Key': this.apiKey
};
}
}
OAuth 2.0 Implementation
// Example OAuth 2.0 implementation
class OAuth2Handler {
constructor(clientId, clientSecret, tokenEndpoint) {
this.clientId = clientId;
this.clientSecret = clientSecret;
this.tokenEndpoint = tokenEndpoint;
this.accessToken = null;
this.tokenExpiry = null;
}
async getAccessToken() {
if (this.accessToken && Date.now() < this.tokenExpiry) {
return this.accessToken;
}
const response = await fetch(this.tokenEndpoint, {
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded'
},
body: new URLSearchParams({
grant_type: 'client_credentials',
client_id: this.clientId,
client_secret: this.clientSecret
})
});
const data = await response.json();
this.accessToken = data.access_token;
this.tokenExpiry = Date.now() + (data.expires_in * 1000);
return this.accessToken;
}
}
Data Encryption
[Describe data protection measures]
Message Encryption
// Example message encryption
const crypto = require('crypto');
class MessageEncryption {
constructor(encryptionKey) {
this.algorithm = 'aes-256-gcm';
this.key = Buffer.from(encryptionKey, 'hex');
}
encrypt(data) {
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipher(this.algorithm, this.key);
cipher.setAAD(Buffer.from('integration-data'));
let encrypted = cipher.update(JSON.stringify(data), 'utf8', 'hex');
encrypted += cipher.final('hex');
const authTag = cipher.getAuthTag();
return {
encrypted,
iv: iv.toString('hex'),
authTag: authTag.toString('hex')
};
}
decrypt(encryptedData) {
const decipher = crypto.createDecipher(this.algorithm, this.key);
decipher.setAAD(Buffer.from('integration-data'));
decipher.setAuthTag(Buffer.from(encryptedData.authTag, 'hex'));
let decrypted = decipher.update(encryptedData.encrypted, 'hex', 'utf8');
decrypted += decipher.final('utf8');
return JSON.parse(decrypted);
}
}
Testing Strategy
Testing Pyramid
[Describe comprehensive testing approach]
Unit Tests
// Example unit tests for integration components
describe('MessageProducer', () => {
let producer;
beforeEach(() => {
producer = new MessageProducer(mockConfig);
});
it('should send message successfully', async () => {
const message = {
id: 'test-123',
type: 'user.created',
data: { userId: '456' }
};
const result = await producer.sendMessage('test-topic', message);
expect(result.success).toBe(true);
});
});
Integration Tests
// Example integration tests
describe('End-to-End Integration', () => {
let producer, consumer;
beforeAll(async () => {
producer = new MessageProducer(testConfig);
consumer = new MessageConsumer(testConfig);
await setupTestEnvironment();
});
it('should process message end-to-end', async () => {
const testMessage = {
id: 'integration-test-1',
type: 'test.event',
data: { value: 'test-data' }
};
// Send message
await producer.sendMessage('test-topic', testMessage);
// Verify message was processed
const result = await waitForProcessing('integration-test-1');
expect(result.processed).toBe(true);
});
});
Contract Tests
// Example contract tests using Pact
const { Pact } = require('@pact-foundation/pact');
describe('Integration Contract Tests', () => {
const provider = new Pact({
consumer: 'user-service',
provider: 'notification-service',
port: 1234
});
beforeAll(() => provider.setup());
afterAll(() => provider.finalize());
it('should handle user created event', async () => {
await provider
.given('user service sends user created event')
.uponReceiving('a user created event')
.withRequest({
method: 'POST',
path: '/events/user-created',
headers: { 'Content-Type': 'application/json' },
body: {
userId: '123',
email: 'test@example.com'
}
})
.willRespondWith({
status: 200,
body: { processed: true }
});
// Test the integration
const result = await sendUserCreatedEvent({
userId: '123',
email: 'test@example.com'
});
expect(result.processed).toBe(true);
});
});
Monitoring & Observability
Key Metrics
[Define what to monitor for this integration pattern]
Business Metrics
- Message Throughput: Messages processed per second
- Processing Latency: End-to-end processing time
- Success Rate: Percentage of successfully processed messages
- Error Rate: Percentage of failed messages
Technical Metrics
- Queue Depth: Number of pending messages
- Consumer Lag: Delay between message production and consumption
- Connection Health: Status of integration connections
- Resource Utilization: CPU, memory, network usage
Monitoring Setup
# Example monitoring configuration
monitoring:
prometheus:
scrape_configs:
- job_name: 'integration-metrics'
static_configs:
- targets: ['integration-service:8080']
metrics_path: /metrics
scrape_interval: 15s
grafana:
dashboards:
- name: Integration Overview
panels:
- title: Message Throughput
type: graph
targets:
- expr: rate(messages_processed_total[5m])
- title: Processing Latency
type: graph
targets:
- expr: histogram_quantile(0.95, message_processing_duration_seconds_bucket)
Alerting Rules
# Example alerting rules
groups:
- name: integration.rules
rules:
- alert: HighMessageLatency
expr: histogram_quantile(0.95, message_processing_duration_seconds_bucket) > 5
for: 5m
labels:
severity: warning
annotations:
summary: "High message processing latency"
description: "95th percentile latency is {{ $value }}s"
- alert: MessageProcessingFailures
expr: rate(message_processing_failures_total[5m]) > 0.1
for: 2m
labels:
severity: critical
annotations:
summary: "High message processing failure rate"
description: "Failure rate is {{ $value }} per second"
Distributed Tracing
// Example distributed tracing implementation
const opentelemetry = require('@opentelemetry/api');
class TracedMessageHandler {
constructor(tracer) {
this.tracer = tracer;
}
async handleMessage(message) {
const span = this.tracer.startSpan('message.process', {
attributes: {
'message.id': message.id,
'message.type': message.type,
'integration.pattern': 'async-messaging'
}
});
try {
// Add message processing logic
const result = await this.processMessage(message);
span.setAttributes({
'message.processed': true,
'processing.duration': result.duration
});
return result;
} catch (error) {
span.recordException(error);
span.setStatus({ code: opentelemetry.SpanStatusCode.ERROR });
throw error;
} finally {
span.end();
}
}
}
Performance Considerations
Throughput Optimization
[Describe how to optimize for high throughput]
Batch Processing
// Example batch processing implementation
class BatchProcessor {
constructor(batchSize = 100, flushInterval = 5000) {
this.batchSize = batchSize;
this.flushInterval = flushInterval;
this.batch = [];
this.timer = null;
}
addMessage(message) {
this.batch.push(message);
if (this.batch.length >= this.batchSize) {
this.flush();
} else if (!this.timer) {
this.timer = setTimeout(() => this.flush(), this.flushInterval);
}
}
async flush() {
if (this.batch.length === 0) return;
const currentBatch = [...this.batch];
this.batch = [];
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
try {
await this.processBatch(currentBatch);
} catch (error) {
console.error('Batch processing failed:', error);
// Implement error handling strategy
}
}
async processBatch(messages) {
// Process messages in batch
console.log(`Processing batch of ${messages.length} messages`);
}
}
Connection Pooling
// Example connection pooling
class ConnectionPool {
constructor(config) {
this.config = config;
this.connections = [];
this.inUse = new Set();
this.available = [];
}
async getConnection() {
if (this.available.length > 0) {
const connection = this.available.pop();
this.inUse.add(connection);
return connection;
}
if (this.connections.length < this.config.maxConnections) {
const connection = await this.createConnection();
this.connections.push(connection);
this.inUse.add(connection);
return connection;
}
// Wait for available connection
return this.waitForConnection();
}
releaseConnection(connection) {
this.inUse.delete(connection);
this.available.push(connection);
}
}
Latency Optimization
[Describe how to minimize latency]
Async Processing
// Example async processing optimization
class AsyncProcessor {
constructor() {
this.queue = [];
this.processing = false;
}
async processAsync(message) {
return new Promise((resolve, reject) => {
this.queue.push({ message, resolve, reject });
this.startProcessing();
});
}
async startProcessing() {
if (this.processing) return;
this.processing = true;
while (this.queue.length > 0) {
const { message, resolve, reject } = this.queue.shift();
try {
const result = await this.processMessage(message);
resolve(result);
} catch (error) {
reject(error);
}
}
this.processing = false;
}
}
Troubleshooting
Common Issues
Issue: Message Loss
Symptoms:
- Messages sent but not received
- Missing data in target system
- Inconsistent data between systems
Root Causes:
- Network failures during transmission
- Consumer service downtime
- Message queue overflow
Diagnostic Steps:
- Check message queue metrics
- Verify consumer service health
- Review network connectivity
- Check for message expiration
Solutions:
- Implement message persistence
- Add dead letter queues
- Increase consumer capacity
- Implement message acknowledgments
Issue: Duplicate Messages
Symptoms:
- Same message processed multiple times
- Duplicate data in target system
- Inconsistent system state
Root Causes:
- Network retries
- Consumer restart during processing
- Lack of idempotency
Diagnostic Steps:
- Check message IDs for duplicates
- Review retry configurations
- Analyze consumer logs
- Verify idempotency implementation
Solutions:
- Implement idempotent processing
- Use unique message IDs
- Add deduplication logic
- Implement exactly-once semantics
Diagnostic Tools
[List tools for troubleshooting integration issues]
Message Tracing
// Example message tracing
class MessageTracer {
constructor() {
this.traces = new Map();
}
startTrace(messageId) {
this.traces.set(messageId, {
id: messageId,
startTime: Date.now(),
events: []
});
}
addEvent(messageId, event, data = {}) {
const trace = this.traces.get(messageId);
if (trace) {
trace.events.push({
event,
timestamp: Date.now(),
duration: Date.now() - trace.startTime,
data
});
}
}
getTrace(messageId) {
return this.traces.get(messageId);
}
}
Related Patterns
Complementary Patterns
- Event Sourcing (Coming Soon)
- CQRS (Coming Soon)
- Saga Pattern (Coming Soon)
Alternative Patterns
- Request-Response Pattern (Coming Soon)
- Publish-Subscribe Pattern (Coming Soon)
- Point-to-Point Pattern (Coming Soon)
References
Documentation
External Resources
Standards
Document Metadata:
- Created: [YYYY-MM-DD]
- Last Updated: [YYYY-MM-DD]
- Next Review: [YYYY-MM-DD]
- Version: 1.0
- Integration Endpoints: [List of endpoints/queues]
Template Usage Notes
How to Use This Template
- Copy this template for new integration patterns
- Replace all bracketed placeholders with specific content
- Include actual protocol specifications and examples
- Test all code examples for accuracy
- Document actual endpoints and configurations
Required Sections
- Overview, Integration Architecture, Data Flow, Implementation Guide, Testing Strategy
Optional Sections
- Remove if not applicable: Performance Considerations, Troubleshooting
Integration Documentation Best Practices
- Include working code examples with real protocols
- Document actual message formats and schemas
- Provide complete configuration examples
- Include error scenarios and handling
- Test all integration examples in realistic environments