Skip to main content

Pipelines and Handlers

Pipelines and handlers streamline processing by organizing tasks into sequential steps. Each step, or handler, processes data and passes it along the pipeline. This modular approach enhances scalability, testability, and flexibility, making it an ideal choice for complex workflows like data processing, request validation, or transformation layers.

Example: API Request Processing Pipeline

Consider an API gateway that processes incoming requests through a series of handlers before reaching the business logic:

Pipeline Architecture

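(Diagram: an incoming request passes through the Authentication, Rate Limiting, Validation, Transformation, Data Enrichment, and Routing handlers, in that order, before it reaches the business logic.)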

Handler Implementation

// Base Handler Interface
/**
 * Contract for one step of a processing pipeline.
 *
 * @typeParam T - Type of the payload carried in the pipeline context.
 */
interface Handler<T> {
/**
 * Handles the given context and resolves with the resulting context.
 *
 * @param context - Pipeline context to work on.
 * @returns A promise for the context after this handler has run.
 */
handle(context: PipelineContext<T>): Promise<PipelineContext<T>>;
/**
 * Registers the handler that should follow this one.
 *
 * @param handler - The handler to run next.
 * @returns A handler reference; BaseHandler returns the handler it was given.
 */
setNext(handler: Handler<T>): Handler<T>;
}

// Abstract Handler Base Class
/**
 * Shared chaining behavior for pipeline handlers.
 *
 * Subclasses implement {@link BaseHandler.process}; this class takes care of
 * passing the processed context on to the next handler in the chain.
 *
 * @typeParam T - Type of the payload carried in the pipeline context.
 */
abstract class BaseHandler<T> implements Handler<T> {
  private nextHandler: Handler<T> | null = null;

  /**
   * Stores the successor of this handler.
   *
   * @param handler - Handler to invoke after this one.
   * @returns The same `handler`, so calls can be chained.
   */
  setNext(handler: Handler<T>): Handler<T> {
    return (this.nextHandler = handler);
  }

  /**
   * Runs this handler's own step and then, if allowed, the rest of the chain.
   *
   * @param context - Context to process.
   * @returns The context produced by the last handler that ran.
   */
  async handle(context: PipelineContext<T>): Promise<PipelineContext<T>> {
    const processed = await this.process(context);
    const successor = this.nextHandler;

    // End of the chain, or this step asked the pipeline to stop.
    if (!successor || processed.shouldStop) {
      return processed;
    }

    return successor.handle(processed);
  }

  /**
   * The handler-specific step, supplied by each concrete subclass.
   *
   * @param context - Context to process.
   * @returns The context to hand to the next handler.
   */
  protected abstract process(context: PipelineContext<T>): Promise<PipelineContext<T>>;
}

// Pipeline Context
/**
 * State object passed from handler to handler during one pipeline run.
 *
 * @typeParam T - Type of the payload being processed.
 */
interface PipelineContext<T> {
// The payload being processed.
data: T;
// Free-form values written by handlers; handlers in this file set keys such as
// `user`, `authenticated`, `rateLimited` and `validated`.
metadata: Record<string, any>;
// Human-readable error messages appended by handlers.
errors: string[];
// When true, BaseHandler.handle does not invoke the next handler.
shouldStop: boolean;
// Pipeline.execute sets this to `new Date()` when it creates the context.
timestamp: Date;
}

Concrete Handler Implementations

// Authentication Handler
/**
 * Authenticates a request from its `authorization` header.
 *
 * On success the decoded user is stored in `context.metadata.user` and
 * `context.metadata.authenticated` is set to `true`. On failure an error is
 * recorded and the pipeline is asked to stop.
 */
class AuthenticationHandler extends BaseHandler<APIRequest> {
  /**
   * @param context - Context whose `data.headers.authorization` is inspected.
   * @returns The same context, updated with the authentication outcome.
   */
  protected async process(context: PipelineContext<APIRequest>): Promise<PipelineContext<APIRequest>> {
    const { authorization } = context.data.headers;

    if (!authorization) {
      context.errors.push('Missing authorization header');
      context.shouldStop = true;
      return context;
    }

    try {
      const user = await this.validateToken(this.extractToken(authorization));
      context.metadata.user = user;
      context.metadata.authenticated = true;
    } catch {
      // The underlying reason is not copied into the client-facing error list.
      context.errors.push('Invalid authentication token');
      context.shouldStop = true;
    }

    return context;
  }

  /**
   * Returns the bare token from an Authorization header value.
   *
   * Accepts both `Bearer <token>` (the usual HTTP form) and a raw token, so
   * callers that already send the token on its own keep working.
   */
  private extractToken(headerValue: string): string {
    return headerValue.trim().replace(/^Bearer\s+/i, '');
  }

  /**
   * Verifies the token against `process.env.JWT_SECRET`.
   *
   * @throws Error when the secret is not configured, or whatever `jwt.verify`
   *   throws for a bad token (`jwt` is imported outside this excerpt,
   *   presumably the `jsonwebtoken` package; confirm).
   */
  private async validateToken(token: string): Promise<User> {
    const secret = process.env.JWT_SECRET;
    if (!secret) {
      // Fail explicitly instead of handing `undefined` to the JWT library.
      throw new Error('JWT_SECRET is not configured');
    }

    return jwt.verify(token, secret) as User;
  }
}

// Rate Limiting Handler
/**
 * Fixed-window, in-memory rate limiter keyed by user id (or client IP when no
 * user is present in the context metadata).
 *
 * Expired windows are swept out periodically so the map does not grow without
 * bound as new clients appear.
 */
class RateLimitHandler extends BaseHandler<APIRequest> {
  private rateLimiter = new Map<string, { count: number; resetTime: number }>();
  // Earliest time (ms since epoch) at which the next sweep of expired windows runs.
  private nextSweepTime = 0;

  /**
   * @param windowMs - Length of one rate-limit window in milliseconds.
   *   Defaults to 60000, the value that was previously hard-coded.
   */
  constructor(private readonly windowMs: number = 60000) {
    super();
  }

  /**
   * Records an error, sets `metadata.rateLimited` and stops the pipeline when
   * the client has used up its limit for the current window.
   *
   * @param context - Context identifying the client.
   * @returns The same context.
   */
  protected async process(context: PipelineContext<APIRequest>): Promise<PipelineContext<APIRequest>> {
    const clientId = context.metadata.user?.id || context.data.ip;
    // NOTE(review): getRateLimit is not shown in this excerpt; confirm where it is defined.
    const limit = this.getRateLimit(clientId);

    if (this.isRateLimited(clientId, limit)) {
      context.errors.push('Rate limit exceeded');
      context.shouldStop = true;
      context.metadata.rateLimited = true;
    }

    return context;
  }

  /**
   * Counts one request for `clientId` and reports whether it is over `limit`.
   *
   * @returns `true` when the request must be rejected.
   */
  private isRateLimited(clientId: string, limit: number): boolean {
    const now = Date.now();
    this.evictExpired(now);

    const client = this.rateLimiter.get(clientId);

    // First request in a fresh window always counts as request number one.
    if (!client || now > client.resetTime) {
      this.rateLimiter.set(clientId, { count: 1, resetTime: now + this.windowMs });
      return false;
    }

    if (client.count >= limit) {
      return true;
    }

    client.count++;
    return false;
  }

  /** Drops windows that have already expired; runs at most once per window length. */
  private evictExpired(now: number): void {
    if (now < this.nextSweepTime) {
      return;
    }

    for (const [id, entry] of this.rateLimiter) {
      if (now > entry.resetTime) {
        this.rateLimiter.delete(id);
      }
    }

    this.nextSweepTime = now + this.windowMs;
  }
}

// Validation Handler
/**
 * Validates the request body against the schema registered for its endpoint.
 *
 * On success the body is replaced by the validated value and
 * `metadata.validated` is set; on failure an error is recorded and the
 * pipeline is asked to stop.
 */
class ValidationHandler extends BaseHandler<APIRequest> {
  /**
   * @param context - Context whose `data.body` is validated.
   * @returns The same context, updated with the validation outcome.
   */
  protected async process(context: PipelineContext<APIRequest>): Promise<PipelineContext<APIRequest>> {
    // NOTE(review): getSchemaForEndpoint is not shown in this excerpt; a throw
    // from it is not caught here and propagates to the caller.
    const schema = this.getSchemaForEndpoint(context.data.endpoint);

    try {
      const validatedData = await schema.validate(context.data.body);
      context.data.body = validatedData;
      context.metadata.validated = true;
    } catch (error: unknown) {
      // Thrown values are not guaranteed to be Error instances.
      const reason = error instanceof Error ? error.message : String(error);
      context.errors.push(`Validation failed: ${reason}`);
      context.shouldStop = true;
    }

    return context;
  }
}

Pipeline Orchestration

// Pipeline Builder
/**
 * Fluent builder that collects handlers and links them into a pipeline.
 *
 * @typeParam T - Type of the payload the pipeline processes.
 */
class PipelineBuilder<T> {
  private handlers: Handler<T>[] = [];

  /**
   * Appends a handler; handlers run in the order they were added.
   *
   * @param handler - Handler to append.
   * @returns This builder, for chaining.
   */
  addHandler(handler: Handler<T>): PipelineBuilder<T> {
    this.handlers.push(handler);
    return this;
  }

  /**
   * Links every handler to its successor and wraps the first one in a Pipeline.
   *
   * @returns A pipeline that starts at the first handler added.
   * @throws Error if no handler was added.
   */
  build(): Pipeline<T> {
    if (this.handlers.length === 0) {
      throw new Error('Pipeline must have at least one handler');
    }

    const [head, ...rest] = this.handlers;

    // Walk the list pairwise: each handler gets the following one as its successor.
    rest.reduce((previous, current) => {
      previous.setNext(current);
      return current;
    }, head);

    return new Pipeline(head);
  }
}

// Pipeline Executor
/**
 * Runs a chain of handlers against a payload.
 *
 * @typeParam T - Type of the payload the pipeline processes.
 */
class Pipeline<T> {
  /**
   * @param firstHandler - Entry point of the handler chain.
   */
  constructor(private readonly firstHandler: Handler<T>) {}

  /**
   * Creates a fresh context for `data` and passes it through the chain.
   *
   * Never rejects because of a handler: anything thrown by the chain is
   * recorded in `errors` and `shouldStop` is set on the returned context.
   *
   * @param data - Payload to process.
   * @returns The context produced by the chain, or the initial context with
   *   the failure recorded if the chain threw.
   */
  async execute(data: T): Promise<PipelineContext<T>> {
    const context: PipelineContext<T> = {
      data,
      metadata: {},
      errors: [],
      shouldStop: false,
      timestamp: new Date()
    };

    try {
      return await this.firstHandler.handle(context);
    } catch (error: unknown) {
      // Thrown values are not guaranteed to be Error instances.
      const reason = error instanceof Error ? error.message : String(error);
      context.errors.push(`Pipeline execution failed: ${reason}`);
      context.shouldStop = true;
      return context;
    }
  }
}

// Usage Example
// Handlers run in the order they are added, so authentication happens before
// rate limiting, validation, transformation, enrichment and routing.
const apiPipeline = new PipelineBuilder<APIRequest>()
  .addHandler(new AuthenticationHandler())
  .addHandler(new RateLimitHandler())
  .addHandler(new ValidationHandler())
  .addHandler(new TransformationHandler())
  .addHandler(new DataEnrichmentHandler())
  .addHandler(new RoutingHandler())
  .build();

// Process incoming request
app.post('/api/*', async (req, res, next) => {
  try {
    const apiRequest: APIRequest = {
      endpoint: req.path,
      method: req.method,
      headers: req.headers,
      body: req.body,
      ip: req.ip
    };

    const result = await apiPipeline.execute(apiRequest);

    if (result.errors.length > 0) {
      res.status(400).json({ errors: result.errors });
      return;
    }

    // Continue to business logic
    const response = await processBusinessLogic(result.data);
    res.json(response);
  } catch (error) {
    // A rejection inside an async route handler is not picked up automatically
    // (assuming `app` is an Express 4 application; confirm), so forward it to
    // the error-handling middleware instead of leaving the request hanging.
    next(error);
  }
});

Advanced Pipeline Features

// Parallel Processing Handler
/**
 * Runs several independent tasks concurrently against the same context.
 *
 * The handler waits for every task to settle before returning, so no task is
 * still working on the context once the next handler starts, and every
 * failure is recorded. A failure does not set `shouldStop`, so the pipeline
 * continues with the errors attached.
 */
class ParallelProcessingHandler extends BaseHandler<APIRequest> {
  /**
   * @param context - Context shared by all tasks.
   * @returns The same context, with one error entry per failed task.
   */
  protected async process(context: PipelineContext<APIRequest>): Promise<PipelineContext<APIRequest>> {
    // NOTE(review): the three task methods are not shown in this excerpt.
    const tasks = [
      this.enrichUserData(context),
      this.fetchPermissions(context),
      this.logRequest(context)
    ];

    const outcomes = await Promise.allSettled(tasks);

    for (const outcome of outcomes) {
      if (outcome.status === 'rejected') {
        // Rejection reasons are not guaranteed to be Error instances.
        const reason = outcome.reason instanceof Error ? outcome.reason.message : String(outcome.reason);
        context.errors.push(`Parallel processing failed: ${reason}`);
      }
    }

    return context;
  }
}

// Conditional Handler
/**
 * Runs its extra logic only for requests made by an admin user; every other
 * request passes through untouched.
 */
class ConditionalHandler extends BaseHandler<APIRequest> {
  /**
   * @param context - Context to inspect and, when eligible, process.
   * @returns The same context.
   */
  protected async process(context: PipelineContext<APIRequest>): Promise<PipelineContext<APIRequest>> {
    // Condition not met: hand the context on unchanged.
    if (!this.shouldProcess(context)) {
      return context;
    }

    // NOTE(review): performConditionalLogic is not shown in this excerpt.
    await this.performConditionalLogic(context);
    return context;
  }

  /** Tells whether the user stored in the context metadata has the `admin` role. */
  private shouldProcess(context: PipelineContext<APIRequest>): boolean {
    const role = context.metadata.user?.role;
    return role === 'admin';
  }
}

// Retry Handler
/**
 * Retries an operation up to `maxRetries` times, with exponential backoff
 * between attempts.
 *
 * When every attempt fails, an error is recorded on the context; `shouldStop`
 * is left unchanged, so the pipeline continues.
 */
class RetryHandler extends BaseHandler<APIRequest> {
  private maxRetries = 3;
  // Delay before the second attempt; it doubles for each further attempt.
  private baseDelayMs = 1000;

  /**
   * @param context - Context passed to the operation on every attempt.
   * @returns The same context.
   */
  protected async process(context: PipelineContext<APIRequest>): Promise<PipelineContext<APIRequest>> {
    for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
      try {
        // NOTE(review): performOperation and delay are not shown in this excerpt.
        await this.performOperation(context);
        return context;
      } catch {
        if (attempt === this.maxRetries) {
          context.errors.push(`Operation failed after ${this.maxRetries} attempts`);
        } else {
          // Exponential backoff: 1000 ms, 2000 ms, 4000 ms, ...
          await this.delay(this.baseDelayMs * 2 ** (attempt - 1));
        }
      }
    }

    return context;
  }
}

Benefits of Pipeline Pattern

  • Modularity: Each handler has a single responsibility
  • Testability: Handlers can be tested in isolation
  • Flexibility: Easy to add, remove, or reorder handlers
  • Reusability: Handlers can be shared across different pipelines
  • Maintainability: Changes to one step don't affect others
  • Monitoring: Each step can be logged and monitored independently

When to Use Pipelines

Good fit when:

  • Complex processing requires multiple sequential steps
  • Different steps have different responsibilities
  • You need to add/remove processing steps dynamically
  • Error handling needs to be granular
  • Processing steps might be reused in different contexts

Avoid when:

  • Simple, single-step processing is sufficient
  • Steps are tightly coupled and can't be separated
  • Performance is critical and pipeline overhead is significant
  • Processing is highly parallel rather than sequential

Implementation Guidelines

  1. Keep Handlers Small: Each handler should have a single, focused responsibility
  2. Design for Immutability: Avoid modifying shared state between handlers
  3. Handle Errors Gracefully: Decide whether to stop or continue on errors
  4. Add Observability: Log entry/exit and performance metrics for each handler
  5. Consider Async Operations: Design handlers to work well with async processing
  6. Plan for Scalability: Consider how handlers will perform under load