Tasks
Tasks are atomic units of work that represent discrete operations within a distributed system. They serve as the fundamental building blocks for workflows, batch processing, and background job execution. Tasks encapsulate business logic, handle execution context, and provide mechanisms for retry, monitoring, and failure recovery.
Core Responsibilities
Execution Management
- Atomic Operations: Execute discrete, well-defined units of work
- Context Handling: Manage execution context and input/output data
- Resource Management: Efficiently utilize system resources during execution
- Lifecycle Management: Handle task creation, execution, completion, and cleanup
Reliability and Resilience
- Error Handling: Gracefully handle and report execution failures
- Retry Logic: Implement configurable retry strategies for transient failures
- Timeout Management: Enforce execution time limits and handle timeouts
- State Persistence: Maintain task state for recovery and monitoring
Task Types
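Synchronous Tasks
A synchronous task is awaited in place: `execute` returns a promise that resolves directly to the task's final output.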
/**
 * Contract for a task whose `execute` call resolves directly to its output.
 *
 * @typeParam TInput - Type of the value handed to `execute`.
 * @typeParam TOutput - Type of the value the returned promise resolves to.
 */
interface SyncTask<TInput, TOutput> {
  /** Identifier of the task. */
  id: string;

  /** Name of the task. */
  name: string;

  /**
   * Runs the task.
   *
   * @param input - Value the task operates on.
   * @returns A promise that resolves to the task's output.
   */
  execute(input: TInput): Promise<TOutput>;

  /** Optional execution time limit (unit is not stated here; presumably milliseconds, confirm). */
  timeout?: number;

  /** Optional retry configuration for the task. */
  retryPolicy?: RetryPolicy;
}
/**
 * Validates the contact fields (`email`, `phone`) of a user record.
 *
 * Validation never throws for bad data: every problem is reported as a
 * message in the returned `errors` list.
 */
class DataValidationTask implements SyncTask<UserData, ValidationResult> {
  /** Identifier required by the `SyncTask` contract. */
  id = 'data-validation';

  /** Name required by the `SyncTask` contract. */
  name = 'DataValidationTask';

  /**
   * Checks the email address and phone number of `userData`.
   *
   * @param userData - Record whose `email` and `phone` fields are checked.
   * @returns `isValid` is `true` only when `errors` is empty; `errors` holds
   *          one message per failed field, email first.
   */
  async execute(userData: UserData): Promise<ValidationResult> {
    const errors: string[] = [];

    if (!userData.email || !this.isValidEmail(userData.email)) {
      errors.push('Invalid email address');
    }
    if (!userData.phone || !this.isValidPhone(userData.phone)) {
      errors.push('Invalid phone number');
    }

    return {
      isValid: errors.length === 0,
      errors,
    };
  }

  /** Accepts `local@domain.tld` shapes: exactly one `@`, no whitespace, a dot in the domain. */
  private isValidEmail(email: string): boolean {
    return /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email);
  }

  /**
   * Accepts an optional leading `+` followed by 7 to 15 digits, ignoring the
   * common separators (space, dot, hyphen, parentheses).
   */
  private isValidPhone(phone: string): boolean {
    const digitsOnly = phone.replace(/[\s().-]/g, '');
    return /^\+?\d{7,15}$/.test(digitsOnly);
  }
}
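Asynchronous Tasks
An asynchronous task does not return its output from `execute`; it returns a handle, and callers look up progress and the final output later through `status` and `result`.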
/**
 * Contract for a task that hands back a handle instead of its final output.
 *
 * @typeParam TInput - Type of the value handed to `execute`.
 * @typeParam TOutput - Type of the output eventually available via `result`.
 */
interface AsyncTask<TInput, TOutput> {
  /** Identifier of the task. */
  id: string;

  /** Name of the task. */
  name: string;

  /**
   * Starts the task.
   *
   * @param input - Value the task operates on.
   * @returns A promise that resolves to a handle for the started work.
   */
  execute(input: TInput): Promise<TaskHandle<TOutput>>;

  /**
   * Looks up the status for a task id.
   *
   * @param taskId - Id of the work to inspect.
   */
  status(taskId: string): Promise<TaskStatus>;

  /**
   * Looks up the output for a task id.
   *
   * @param taskId - Id of the work whose output is requested.
   */
  result(taskId: string): Promise<TOutput>;
}
/**
 * Asynchronous task that queues an image resize job.
 *
 * NOTE(review): `jobQueue`, `generateId`, and the remaining `AsyncTask`
 * members (`id`, `name`, `status`, `result`) are not declared in this
 * excerpt; they are assumed to be provided elsewhere.
 */
class ImageProcessingTask implements AsyncTask<ImageRequest, ProcessedImage> {
  /**
   * Enqueues a `'resize'` job for the requested image.
   *
   * @param request - Supplies `imageUrl`, `targetWidth` and `targetHeight`.
   * @returns A handle carrying the generated task id and the status `'queued'`.
   */
  async execute(request: ImageRequest): Promise<TaskHandle<ProcessedImage>> {
    const taskId = generateId();

    // Describe the job first, then hand it to the queue and wait for the enqueue to finish.
    const resizeJob = {
      taskId,
      operation: 'resize' as const,
      inputUrl: request.imageUrl,
      width: request.targetWidth,
      height: request.targetHeight,
    };
    await this.jobQueue.enqueue(resizeJob);

    return { taskId, status: 'queued' };
  }
}
Batch Tasks
/**
 * Contract for a task that processes several inputs per call.
 *
 * @typeParam TInput - Type of each item in a batch.
 * @typeParam TOutput - Type of each produced result.
 */
interface BatchTask<TInput, TOutput> {
  /** Identifier of the task. */
  id: string;

  /** Name of the task. */
  name: string;

  /** Batch size associated with the task. */
  batchSize: number;

  /**
   * Processes one batch.
   *
   * @param batch - Items to process together.
   * @returns A promise that resolves to the results for the batch.
   */
  execute(batch: TInput[]): Promise<TOutput[]>;

  /**
   * Splits a list of items into batches.
   *
   * @param items - Items to split.
   * @returns The items grouped into batches.
   */
  partition(items: TInput[]): TInput[][];
}
/**
 * Batch task that sends email requests one after another and reports a
 * per-request outcome instead of failing the whole batch.
 *
 * NOTE(review): `emailService` and the remaining `BatchTask` members
 * (`id`, `name`, `partition`) are not declared in this excerpt; they are
 * assumed to be provided elsewhere.
 */
class EmailNotificationBatchTask implements BatchTask<EmailRequest, EmailResult> {
  /** Batch size advertised by this task. */
  batchSize = 100;

  /**
   * Sends every request in `batch`, in order, awaiting each send before
   * starting the next.
   *
   * @param batch - Email requests to send.
   * @returns One result per request, in input order: `{ success: true, messageId }`
   *          when the send resolved, `{ success: false, error }` with a
   *          textual description when it threw.
   */
  async execute(batch: EmailRequest[]): Promise<EmailResult[]> {
    const results: EmailResult[] = [];

    for (const emailRequest of batch) {
      try {
        const messageId = await this.emailService.send(emailRequest);
        results.push({ success: true, messageId });
      } catch (error) {
        // A thrown value is not guaranteed to be an Error, so derive the text defensively.
        results.push({ success: false, error: this.describeError(error) });
      }
    }

    return results;
  }

  /** Turns any thrown value into a message string without assuming it is an `Error`. */
  private describeError(error: unknown): string {
    if (error instanceof Error) {
      return error.message;
    }
    if (typeof error === 'object' && error !== null && 'message' in error) {
      const { message } = error as { message: unknown };
      if (typeof message === 'string') {
        return message;
      }
    }
    return String(error);
  }
}
Task Execution Patterns
Queue-Based Execution
/**
 * Queue-backed task runner: tasks are pushed with bookkeeping metadata and
 * consumed by a polling loop.
 *
 * NOTE(review): `generateId`, `sleep` and `executeTask` are not defined in
 * this excerpt; they are assumed to be provided elsewhere.
 */
class TaskQueue {
  private queue: Queue<Task>;
  private workers: Worker[];

  /**
   * Wraps `task` in a queue entry and pushes it onto the queue.
   *
   * @param task - Task to schedule.
   * @returns The id generated for the queued entry.
   */
  async enqueue<T>(task: Task<T>): Promise<string> {
    const taskId = generateId();

    // The entry records when it was queued and starts with zero attempts.
    const entry = {
      id: taskId,
      task,
      enqueuedAt: new Date(),
      attempts: 0,
    };
    await this.queue.push(entry);

    return taskId;
  }

  /**
   * Consumes the queue forever: executes each popped entry, and pauses via
   * `sleep(1000)` whenever the queue yields nothing. This method never returns.
   */
  async process(): Promise<void> {
    for (;;) {
      const nextEntry = await this.queue.pop();
      if (nextEntry) {
        await this.executeTask(nextEntry);
      } else {
        await this.sleep(1000);
      }
    }
  }
}
Event-Driven Execution
/**
 * Runs tasks in reaction to events published on an event bus.
 *
 * NOTE(review): `handleOrderPlaced` and `executeTask` are referenced but not
 * defined in this excerpt; they are assumed to be provided elsewhere.
 */
class EventDrivenTaskExecutor {
  /**
   * Subscribes the handlers to `'user.created'` and `'order.placed'`.
   *
   * @param eventBus - Bus on which the handlers are registered.
   */
  constructor(private eventBus: EventBus) {
    // Handlers are bound so that `this` still refers to the executor when the bus invokes them.
    const onUserCreated = this.handleUserCreated.bind(this);
    this.eventBus.on('user.created', onUserCreated);

    const onOrderPlaced = this.handleOrderPlaced.bind(this);
    this.eventBus.on('order.placed', onOrderPlaced);
  }

  /**
   * Runs the three user-onboarding tasks one at a time, each with the event's
   * `userData`; a task starts only after the previous one has finished.
   *
   * @param event - The user-created event.
   */
  async handleUserCreated(event: UserCreatedEvent): Promise<void> {
    const onboardingTasks = [
      new SendWelcomeEmailTask(),
      new CreateUserProfileTask(),
      new SetupDefaultPreferencesTask(),
    ];

    for (let index = 0; index < onboardingTasks.length; index++) {
      await this.executeTask(onboardingTasks[index], event.userData);
    }
  }
}
Scheduled Execution
/**
 * Registers the recurring maintenance tasks with a cron scheduler.
 */
class ScheduledTaskRunner {
  private scheduler: CronScheduler;

  /** Creates the scheduler and immediately registers all recurring tasks. */
  constructor() {
    this.scheduler = new CronScheduler();
    this.setupScheduledTasks();
  }

  /** Registers each recurring task under its cron expression. */
  private setupScheduledTasks(): void {
    const cron = this.scheduler;

    // Every day at 02:00: database cleanup.
    cron.schedule('0 2 * * *', new DatabaseCleanupTask());

    // At minute 0 of every hour: system health check.
    cron.schedule('0 * * * *', new SystemHealthCheckTask());

    // Sundays at 09:00: weekly report.
    cron.schedule('0 9 * * 0', new WeeklyReportTask());
  }
}
Error Handling and Retry Strategies
Retry Policies
/**
 * Configuration that controls how failed task executions are retried.
 */
interface RetryPolicy {
  /** Upper bound on the number of execution attempts. */
  maxAttempts: number;

  /** Name of the strategy used to space out attempts. */
  backoffStrategy: 'fixed' | 'exponential' | 'linear';

  /** Starting delay between attempts (unit is not stated here; presumably milliseconds, confirm). */
  initialDelay: number;

  /** Largest delay between attempts (same unit as `initialDelay`). */
  maxDelay: number;

  /** Identifiers of the errors that may be retried (matching rule is defined by the executor). */
  retryableErrors: string[];
}
/**
 * Executes tasks under a retry policy.
 *
 * NOTE(review): `isRetryableError`, `calculateDelay` and `sleep` are not
 * defined in this excerpt; they are assumed to be provided elsewhere.
 */
class TaskExecutor {
  /**
   * Runs `task` until it succeeds, a non-retryable error occurs, or the
   * attempt budget is used up.
   *
   * @param task - Task to execute.
   * @param input - Value passed to every attempt.
   * @param retryPolicy - Attempt limit and retry rules.
   * @returns The value produced by the first successful attempt.
   * @throws RangeError when `retryPolicy.maxAttempts` is not greater than 0,
   *         because no attempt could be made.
   * @throws The task's own error, either immediately when it is not
   *         retryable or after the final attempt has failed.
   */
  async executeWithRetry<T>(
    task: Task<T>,
    input: any,
    retryPolicy: RetryPolicy
  ): Promise<T> {
    const { maxAttempts } = retryPolicy;

    // Without this guard the loop below would be skipped and `undefined` would be thrown.
    if (!(maxAttempts > 0)) {
      throw new RangeError(
        `retryPolicy.maxAttempts must be greater than 0 (received ${maxAttempts})`
      );
    }

    let attempts = 0;
    let lastError: unknown;

    while (attempts < maxAttempts) {
      try {
        return await task.execute(input);
      } catch (error) {
        lastError = error;
        attempts++;

        if (!this.isRetryableError(error, retryPolicy)) {
          throw error;
        }

        // Only wait when another attempt will follow.
        if (attempts < maxAttempts) {
          const delay = this.calculateDelay(attempts, retryPolicy);
          await this.sleep(delay);
        }
      }
    }

    throw lastError;
  }
}
Circuit Breaker Pattern
/**
 * Wraps a task so that every execution goes through a circuit breaker.
 *
 * @typeParam TInput - Input type of the wrapped task.
 * @typeParam TOutput - Output type of the wrapped task.
 */
class CircuitBreakerTask<TInput, TOutput> {
  private circuitBreaker: CircuitBreaker;

  /**
   * @param wrappedTask - Task whose executions are guarded.
   * @param options - Settings used to construct the circuit breaker.
   */
  constructor(
    private wrappedTask: Task<TInput, TOutput>,
    private options: CircuitBreakerOptions
  ) {
    this.circuitBreaker = new CircuitBreaker(options);
  }

  /**
   * Executes the wrapped task through the circuit breaker.
   *
   * @param input - Value forwarded to the wrapped task.
   * @returns Whatever the circuit breaker's `execute` resolves to.
   */
  async execute(input: TInput): Promise<TOutput> {
    // The breaker receives a thunk so that it decides whether the task runs at all.
    const guardedCall = () => this.wrappedTask.execute(input);
    const output = await this.circuitBreaker.execute(guardedCall);
    return output;
  }
}
Task Monitoring and Observability
Metrics Collection
/**
 * Decorator that records success, failure and duration metrics around a
 * wrapped task's execution.
 *
 * @typeParam TInput - Input type of the wrapped task.
 * @typeParam TOutput - Output type of the wrapped task.
 */
class InstrumentedTask<TInput, TOutput> implements Task<TInput, TOutput> {
  /**
   * @param wrappedTask - Task being measured; its `name` tags every metric.
   * @param metrics - Collector that receives the measurements.
   */
  constructor(
    private wrappedTask: Task<TInput, TOutput>,
    private metrics: MetricsCollector
  ) {}

  /**
   * Executes the wrapped task and reports metrics.
   *
   * Emits `task.execution.success` or `task.execution.failure` (the latter
   * tagged with `errorType`), and always reports `task.execution.duration`.
   *
   * @param input - Value forwarded to the wrapped task.
   * @returns The wrapped task's result.
   * @throws Whatever the wrapped task throws, unchanged.
   */
  async execute(input: TInput): Promise<TOutput> {
    const startTime = Date.now();
    const timer = this.metrics.startTimer('task.execution.duration', {
      taskName: this.wrappedTask.name
    });

    try {
      const result = await this.wrappedTask.execute(input);
      this.metrics.increment('task.execution.success', {
        taskName: this.wrappedTask.name
      });
      return result;
    } catch (error) {
      this.metrics.increment('task.execution.failure', {
        taskName: this.wrappedTask.name,
        errorType: this.errorTypeOf(error)
      });
      throw error;
    } finally {
      // NOTE(review): the duration is reported twice under the same metric name
      // (timer and histogram); confirm that the collector expects both.
      timer.end();
      this.metrics.histogram('task.execution.duration', Date.now() - startTime, {
        taskName: this.wrappedTask.name
      });
    }
  }

  /**
   * Derives a label for a thrown value without assuming it is an object:
   * the constructor name when there is one, otherwise `'null'`,
   * `'undefined'` or the `typeof` result. Never throws, so the original
   * failure is always the one that propagates.
   */
  private errorTypeOf(error: unknown): string {
    if (error === null || error === undefined) {
      return String(error);
    }
    const constructorName = (error as { constructor?: { name?: unknown } }).constructor?.name;
    return typeof constructorName === 'string' ? constructorName : typeof error;
  }
}
Distributed Tracing
import { SpanStatusCode, trace } from '@opentelemetry/api';
/**
 * Decorator that runs a wrapped task inside an OpenTelemetry span.
 *
 * NOTE(review): `name`, `id` and `wrappedTask` are read from `this` but are
 * not declared in this excerpt; they are assumed to be provided elsewhere.
 *
 * @typeParam TInput - Input type of the wrapped task.
 * @typeParam TOutput - Output type of the wrapped task.
 */
class TracedTask<TInput, TOutput> implements Task<TInput, TOutput> {
  /**
   * Executes the wrapped task within an active span named `task.<name>`.
   *
   * The span records the task name, id and serialized input size, is marked
   * OK or ERROR according to the outcome, and is always ended.
   *
   * @param input - Value forwarded to the wrapped task.
   * @returns The wrapped task's result.
   * @throws Whatever the wrapped task throws, unchanged.
   */
  async execute(input: TInput): Promise<TOutput> {
    const tracer = trace.getTracer('task-executor');

    return await tracer.startActiveSpan(`task.${this.name}`, async (span) => {
      span.setAttributes({
        'task.name': this.name,
        'task.id': this.id,
        'task.input.size': this.serializedSize(input)
      });

      try {
        const result = await this.wrappedTask.execute(input);
        span.setStatus({ code: SpanStatusCode.OK });
        return result;
      } catch (error) {
        // A thrown value is not guaranteed to be an Error; narrow before reading `.message`.
        const isError = error instanceof Error;
        span.recordException(isError ? error : String(error));
        span.setStatus({
          code: SpanStatusCode.ERROR,
          message: isError ? error.message : String(error)
        });
        throw error;
      } finally {
        // startActiveSpan does not end the span itself; the callback has to.
        span.end();
      }
    });
  }

  /**
   * Length of the JSON form of `input`, or 0 when the input has no JSON form
   * (`undefined`, functions) or cannot be serialized (e.g. circular data).
   * Tracing metadata must never prevent the task from running.
   */
  private serializedSize(input: unknown): number {
    try {
      return JSON.stringify(input)?.length ?? 0;
    } catch {
      return 0;
    }
  }
}
Task Composition and Chaining
Sequential Composition
/**
 * Immutable, sequential pipeline of tasks: each task receives the previous
 * task's output, and the last output is the chain's result.
 *
 * @typeParam TInput - Type accepted by the first task.
 * @typeParam TOutput - Type produced by the last task.
 */
class TaskChain<TInput, TOutput> {
  // Element types differ from stage to stage, so the list itself cannot be typed more precisely.
  private readonly tasks: ReadonlyArray<Task<any, any>>;

  /**
   * @param tasks - Stages of the chain, in execution order. Defaults to an
   *                empty chain; the list is copied so that later changes to
   *                the caller's array do not affect the chain.
   */
  constructor(tasks: ReadonlyArray<Task<any, any>> = []) {
    this.tasks = [...tasks];
  }

  /**
   * Returns a new chain with `task` appended; this chain is left unchanged.
   *
   * @param task - Stage that consumes the current output type.
   * @returns A chain whose output type is the new stage's output type.
   */
  addTask<TNext>(task: Task<TOutput, TNext>): TaskChain<TInput, TNext> {
    return new TaskChain<TInput, TNext>([...this.tasks, task]);
  }

  /**
   * Runs every stage in order, feeding each result into the next stage.
   *
   * @param input - Value passed to the first stage.
   * @returns The last stage's result, or `input` itself for an empty chain.
   * @throws Whatever a stage throws; later stages are then not run.
   */
  async execute(input: TInput): Promise<TOutput> {
    let current: unknown = input;
    for (const task of this.tasks) {
      current = await task.execute(current);
    }
    return current as TOutput;
  }
}
// Usage: build a pipeline by chaining addTask calls, starting from an empty
// chain. NOTE(review): the chain is declared as <RawData, ProcessedResult>
// before any stage is added; confirm that these type arguments line up with
// the input/output types of the stages below.
const processingChain = new TaskChain<RawData, ProcessedResult>()
.addTask(new DataValidationTask())
.addTask(new DataTransformationTask())
.addTask(new DataEnrichmentTask())
.addTask(new DataPersistenceTask());
Parallel Composition
/**
 * Runs a fixed set of tasks concurrently against the same input.
 *
 * @typeParam TInput - Input type shared by all tasks.
 * @typeParam TOutput - Element type of the collected results.
 */
class ParallelTaskGroup<TInput, TOutput> {
  /** @param tasks - Tasks that are started together on every execution. */
  constructor(private tasks: Task<TInput, any>[]) {}

  /**
   * Starts every task with `input` and waits for all of them.
   *
   * @param input - Value passed to each task.
   * @returns The results in task order.
   * @throws The first rejection among the tasks (fail-fast, via `Promise.all`).
   */
  async execute(input: TInput): Promise<TOutput[]> {
    const started = this.tasks.map((task) => task.execute(input));
    const results = await Promise.all(started);
    return results;
  }

  /**
   * Starts every task with `input` and reports each outcome individually, so
   * one failing task does not reject the whole group.
   *
   * @param input - Value passed to each task.
   * @returns One entry per task, in task order: `{ success: true, result, taskId }`
   *          or `{ success: false, error, taskId }`.
   */
  async executeWithFailureHandling(input: TInput): Promise<TaskResult<TOutput>[]> {
    const outcomes = await Promise.all(
      this.tasks.map((task) => this.captureOutcome(task, input))
    );
    return outcomes;
  }

  /** Runs one task and converts a thrown error into a failure record. */
  private async captureOutcome(task: Task<TInput, any>, input: TInput) {
    try {
      const result = await task.execute(input);
      return { success: true, result, taskId: task.id };
    } catch (error) {
      return { success: false, error, taskId: task.id };
    }
  }
}
Performance Optimization
Task Pooling
/**
 * Fixed-size pool of reusable task instances. At most `poolSize` executions
 * run at once; further callers wait until an instance is released.
 *
 * @typeParam TInput - Input type of the pooled tasks.
 * @typeParam TOutput - Output type of the pooled tasks.
 */
class TaskPool<TInput, TOutput> {
  /** Idle instances ready to be handed out. */
  private pool: Task<TInput, TOutput>[] = [];

  /** Instances currently executing (or reserved for a waiting caller). */
  private busy: Set<Task<TInput, TOutput>> = new Set();

  /** Callers waiting for an instance, served in arrival order. */
  private waiters: Array<(task: Task<TInput, TOutput>) => void> = [];

  /**
   * @param taskFactory - Creates one task instance; called `poolSize` times up front.
   * @param poolSize - Number of instances in the pool; must be a positive integer.
   * @throws RangeError when `poolSize` is not a positive integer, since no
   *         execution could ever acquire an instance.
   */
  constructor(
    private taskFactory: () => Task<TInput, TOutput>,
    private poolSize: number
  ) {
    if (!Number.isInteger(poolSize) || poolSize < 1) {
      throw new RangeError(`poolSize must be a positive integer (received ${poolSize})`);
    }
    this.initializePool();
  }

  /**
   * Runs `input` on a pooled instance, waiting for one if all are busy.
   *
   * @param input - Value passed to the task.
   * @returns The task's result.
   * @throws Whatever the task throws; the instance is returned to the pool either way.
   */
  async execute(input: TInput): Promise<TOutput> {
    const task = await this.acquireTask();
    try {
      return await task.execute(input);
    } finally {
      this.releaseTask(task);
    }
  }

  /** Fills the pool with `poolSize` freshly created instances. */
  private initializePool(): void {
    for (let created = 0; created < this.poolSize; created++) {
      this.pool.push(this.taskFactory());
    }
  }

  /** Takes an idle instance, or queues the caller until one is released. */
  private acquireTask(): Promise<Task<TInput, TOutput>> {
    const task = this.pool.pop();
    if (task !== undefined) {
      this.busy.add(task);
      return Promise.resolve(task);
    }
    return new Promise((resolve) => {
      this.waiters.push(resolve);
    });
  }

  /** Hands the instance to the longest-waiting caller, or marks it idle again. */
  private releaseTask(task: Task<TInput, TOutput>): void {
    const nextWaiter = this.waiters.shift();
    if (nextWaiter !== undefined) {
      // The instance goes straight to the next caller, so it stays in `busy`.
      nextWaiter(task);
      return;
    }
    this.busy.delete(task);
    this.pool.push(task);
  }
}
Batch Processing Optimization
/**
 * Collects individual inputs into batches and runs them through a batch
 * task. A batch is flushed as soon as it reaches `batchSize`, or once
 * `batchTimeout` has elapsed since the first input of the pending batch was queued.
 *
 * @typeParam TInput - Type of each submitted input.
 * @typeParam TOutput - Type of each per-input result.
 */
class BatchProcessor<TInput, TOutput> {
  /** Inputs waiting for the next flush, together with their promise callbacks. */
  private batch: Array<{
    input: TInput;
    resolve: (value: TOutput) => void;
    reject: (reason?: unknown) => void;
  }> = [];

  /** Pending flush timer, or `null` when no flush is scheduled. */
  private batchTimer: ReturnType<typeof setTimeout> | null = null;

  /**
   * @param batchTask - Task that processes a whole batch at once.
   * @param batchSize - Number of inputs that triggers an immediate flush.
   * @param batchTimeout - Delay in milliseconds (passed to `setTimeout`) after
   *                       which a partial batch is flushed.
   */
  constructor(
    private batchTask: BatchTask<TInput, TOutput>,
    private batchSize: number = 100,
    private batchTimeout: number = 5000
  ) {}

  /**
   * Queues `input` for the next batch.
   *
   * @param input - Value to process.
   * @returns A promise that resolves to this input's own result.
   * @throws (rejection) The batch task's error, or an `Error` when the batch
   *         task returns a different number of results than inputs.
   */
  async process(input: TInput): Promise<TOutput> {
    return new Promise<TOutput>((resolve, reject) => {
      this.batch.push({ input, resolve, reject });

      if (this.batch.length >= this.batchSize) {
        // processBatch settles every queued promise itself and never rejects.
        void this.processBatch();
      } else if (!this.batchTimer) {
        this.batchTimer = setTimeout(() => void this.processBatch(), this.batchTimeout);
      }
    });
  }

  /** Flushes up to `batchSize` queued inputs and settles their promises. */
  private async processBatch(): Promise<void> {
    if (this.batchTimer) {
      clearTimeout(this.batchTimer);
      this.batchTimer = null;
    }

    const currentBatch = this.batch.splice(0, this.batchSize);
    const inputs = currentBatch.map((item) => item.input);

    try {
      const results = await this.batchTask.execute(inputs);

      // Results are matched to inputs by position, so a count mismatch would
      // silently hand `undefined` (or the wrong value) to some callers.
      if (results.length !== currentBatch.length) {
        throw new Error(
          `Batch task returned ${results.length} results for ${currentBatch.length} inputs`
        );
      }

      currentBatch.forEach((item, index) => {
        // The length check above guarantees that this index exists.
        item.resolve(results[index] as TOutput);
      });
    } catch (error) {
      currentBatch.forEach((item) => {
        item.reject(error);
      });
    }
  }
}
Testing Strategies
Unit Testing
// Unit tests for DataValidationTask: a well-formed record passes, and a
// malformed email address is reported in the errors list.
describe('DataValidationTask', () => {
  const wellFormedPhone = '+1-555-0123';
  let task: DataValidationTask;

  // Each test gets a fresh task instance.
  beforeEach(() => {
    task = new DataValidationTask();
  });

  it('should validate correct user data', async () => {
    const { isValid, errors } = await task.execute({
      email: 'user@example.com',
      phone: wellFormedPhone,
    });

    expect(isValid).toBe(true);
    expect(errors).toHaveLength(0);
  });

  it('should reject invalid email', async () => {
    const { isValid, errors } = await task.execute({
      email: 'invalid-email',
      phone: wellFormedPhone,
    });

    expect(isValid).toBe(false);
    expect(errors).toContain('Invalid email address');
  });
});
Integration Testing
// Integration test: tasks pushed onto a database-backed queue are processed
// in the order in which they were enqueued.
// NOTE(review): this test relies on a TaskQueue constructor that accepts a
// database and on a `processAll` method; confirm that the TaskQueue under
// test provides both.
describe('Task Integration', () => {
  let taskQueue: TaskQueue;
  let database: TestDatabase;

  // Every test starts from a fresh database and a queue bound to it.
  beforeEach(async () => {
    database = await TestDatabase.create();
    taskQueue = new TaskQueue(database);
  });

  it('should process tasks in order', async () => {
    const [first, second] = ['task1', 'task2'].map((label) => new TestTask(label));

    await taskQueue.enqueue(first);
    await taskQueue.enqueue(second);
    await taskQueue.processAll();

    const recorded = await database.getTaskResults();
    expect(recorded).toEqual(['task1', 'task2']);
  });
});
Best Practices
Design Principles
- Single Responsibility: Each task should have one clear purpose
- Idempotency: Running a task more than once with the same input should be safe, so that retries do not duplicate side effects
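- Statelessness: Avoid keeping in-memory state in a task instance between executions; state needed for recovery or monitoring should be persisted outside the task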
- Input Validation: Always validate task inputs
Performance Guidelines
- Resource Management: Properly clean up resources after execution
- Timeout Handling: Set appropriate timeouts for all operations
- Memory Usage: Be mindful of memory consumption for large datasets
- Batch Processing: Use batching for high-volume operations
Security Considerations
- Input Sanitization: Sanitize all external inputs
- Access Control: Implement proper authorization checks
- Audit Logging: Log task executions for compliance
- Secret Management: Securely handle sensitive data
Related Components
- Workflows: Higher-level orchestration of multiple tasks
- Services: Business services that execute tasks
- Routers: Message routing for task communication
- Orchestrators: Coordination of task execution