Skip to main content

Tasks

Tasks are atomic units of work that represent discrete operations within a distributed system. They serve as the fundamental building blocks for workflows, batch processing, and background job execution. Tasks encapsulate business logic, handle execution context, and provide mechanisms for retry, monitoring, and failure recovery.

Core Responsibilities

Execution Management

  • Atomic Operations: Execute discrete, well-defined units of work
  • Context Handling: Manage execution context and input/output data
  • Resource Management: Efficiently utilize system resources during execution
  • Lifecycle Management: Handle task creation, execution, completion, and cleanup

Reliability and Resilience

  • Error Handling: Gracefully handle and report execution failures
  • Retry Logic: Implement configurable retry strategies for transient failures
  • Timeout Management: Enforce execution time limits and handle timeouts
  • State Persistence: Maintain task state for recovery and monitoring

Task Types

Synchronous Tasks

/**
 * Contract for a task whose `execute` call resolves directly to the task's output.
 *
 * @typeParam TInput - Type of the value handed to `execute`.
 * @typeParam TOutput - Type of the value the promise returned by `execute` resolves to.
 */
interface SyncTask<TInput, TOutput> {
// Identifier of the task.
id: string;
// Name of the task.
name: string;
// Runs the task on `input`; the returned promise carries the output itself.
execute(input: TInput): Promise<TOutput>;
// Optional time limit. NOTE(review): the unit is not stated here (presumably milliseconds) - confirm.
timeout?: number;
// Optional retry configuration. NOTE(review): who applies it is not visible in this interface - confirm.
retryPolicy?: RetryPolicy;
}

/**
 * Validates the contact fields (`email`, `phone`) of a user record.
 *
 * The checks are purely structural: they confirm that the values look like an
 * e-mail address / phone number, not that the address or number actually exists.
 */
class DataValidationTask implements SyncTask<UserData, ValidationResult> {
  /** Identifier required by `SyncTask`. */
  readonly id: string = 'data-validation';

  /** Name required by `SyncTask`. */
  readonly name: string = 'DataValidationTask';

  /**
   * Checks `userData.email` and `userData.phone`.
   *
   * @param userData - Record to validate; missing or malformed fields are reported, not thrown.
   * @returns `isValid` is `true` only when `errors` is empty; `errors` lists one
   *          message per failing field, e-mail first.
   */
  async execute(userData: UserData): Promise<ValidationResult> {
    const errors: string[] = [];

    if (!this.isValidEmail(userData.email)) {
      errors.push('Invalid email address');
    }

    if (!this.isValidPhone(userData.phone)) {
      errors.push('Invalid phone number');
    }

    return {
      isValid: errors.length === 0,
      errors
    };
  }

  /** True for a non-empty `local@domain.tld` shaped string without whitespace. */
  private isValidEmail(email: unknown): boolean {
    // 254 is the practical upper bound for an address; the cap also bounds regex work.
    if (typeof email !== 'string' || email.length === 0 || email.length > 254) {
      return false;
    }
    return /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email);
  }

  /** True for a string of digits with optional `+` prefix and common separators, 7-15 digits long. */
  private isValidPhone(phone: unknown): boolean {
    if (typeof phone !== 'string' || !/^\+?[\d\s().-]+$/.test(phone)) {
      return false;
    }
    const digitCount = phone.replace(/\D/g, '').length;
    return digitCount >= 7 && digitCount <= 15;
  }
}

Asynchronous Tasks

/**
 * Contract for a task whose `execute` call resolves to a `TaskHandle` instead of
 * the output itself; status and output are then requested separately by task id.
 *
 * @typeParam TInput - Type of the value handed to `execute`.
 * @typeParam TOutput - Type of the value `result` resolves to.
 */
interface AsyncTask<TInput, TOutput> {
// Identifier of the task.
id: string;
// Name of the task.
name: string;
// Accepts `input` and resolves to a handle typed by the eventual output.
execute(input: TInput): Promise<TaskHandle<TOutput>>;
// Resolves to the TaskStatus recorded for `taskId`.
status(taskId: string): Promise<TaskStatus>;
// Resolves to the output for `taskId`.
// NOTE(review): behaviour while the task is still running is not specified here - confirm.
result(taskId: string): Promise<TOutput>;
}

/**
 * Asynchronous image task: `execute` does not process the image itself, it puts
 * a resize job on `this.jobQueue` and hands back a handle for it.
 *
 * NOTE(review): `jobQueue`, `generateId` and the remaining `AsyncTask` members
 * (`id`, `name`, `status`, `result`) are not defined in this excerpt - confirm
 * they exist in the full class.
 */
class ImageProcessingTask implements AsyncTask<ImageRequest, ProcessedImage> {
  /**
   * Enqueues a `'resize'` job for the requested image.
   *
   * @param request - Supplies `imageUrl`, `targetWidth` and `targetHeight`.
   * @returns A handle carrying the freshly generated task id and the status `'queued'`.
   */
  async execute(request: ImageRequest): Promise<TaskHandle<ProcessedImage>> {
    const handleId = generateId();

    // Describe the work first, then hand it to the queue in one call.
    const resizeJob = {
      taskId: handleId,
      operation: 'resize',
      inputUrl: request.imageUrl,
      width: request.targetWidth,
      height: request.targetHeight
    };
    await this.jobQueue.enqueue(resizeJob);

    return { taskId: handleId, status: 'queued' };
  }
}

Batch Tasks

/**
 * Contract for a task that processes its inputs in groups.
 *
 * @typeParam TInput - Type of a single input item.
 * @typeParam TOutput - Type of a single output item.
 */
interface BatchTask<TInput, TOutput> {
// Identifier of the task.
id: string;
// Name of the task.
name: string;
// Number of items per batch.
// NOTE(review): presumably the chunk size `partition` is expected to produce - confirm.
batchSize: number;
// Processes one batch and resolves to an array of outputs.
// NOTE(review): presumably one output per input, in input order (BatchProcessor in this document matches results to callers by index) - confirm.
execute(batch: TInput[]): Promise<TOutput[]>;
// Splits a list of inputs into a list of batches.
partition(items: TInput[]): TInput[][];
}

/**
 * Sends a batch of e-mails one after another and reports a per-message outcome.
 *
 * A failed send never aborts the batch: the failure is captured in that
 * message's result and the loop continues with the next request.
 *
 * NOTE(review): `emailService` is read from `this` but is not declared in this
 * excerpt - confirm how it is injected.
 */
class EmailNotificationBatchTask implements BatchTask<EmailRequest, EmailResult> {
  /** Identifier required by `BatchTask`. */
  readonly id: string = 'email-notification-batch';

  /** Name required by `BatchTask`. */
  readonly name: string = 'EmailNotificationBatchTask';

  /** Maximum number of requests per batch produced by `partition`. */
  batchSize = 100;

  /**
   * Sends every request in `batch` sequentially.
   *
   * @param batch - Requests to send; an empty array yields an empty result.
   * @returns One result per request, in request order. Failures carry the
   *          error's message (or the stringified value when a non-Error was thrown).
   */
  async execute(batch: EmailRequest[]): Promise<EmailResult[]> {
    const results: EmailResult[] = [];

    for (const emailRequest of batch) {
      try {
        const messageId = await this.emailService.send(emailRequest);
        results.push({ success: true, messageId });
      } catch (error: unknown) {
        // A rejection is not guaranteed to be an Error; never read `.message` blindly.
        const reason = error instanceof Error ? error.message : String(error);
        results.push({ success: false, error: reason });
      }
    }

    return results;
  }

  /**
   * Splits `items` into consecutive chunks of at most `batchSize` elements.
   *
   * @param items - Requests to split; not modified.
   * @returns The chunks in original order; empty when `items` is empty.
   * @throws RangeError when `batchSize` is not a positive integer (the loop could not advance).
   */
  partition(items: EmailRequest[]): EmailRequest[][] {
    const size = this.batchSize;
    if (!Number.isInteger(size) || size < 1) {
      throw new RangeError(`batchSize must be a positive integer, received ${size}`);
    }

    const batches: EmailRequest[][] = [];
    for (let start = 0; start < items.length; start += size) {
      batches.push(items.slice(start, start + size));
    }
    return batches;
  }
}

Task Execution Patterns

Queue-Based Execution

/**
 * Queue front-end: `enqueue` wraps a task in a bookkeeping entry and pushes it,
 * `process` pops entries forever and executes them one at a time.
 *
 * NOTE(review): `generateId`, `executeTask` and `sleep` are used but not defined
 * in this excerpt, and `queue`/`workers` are never initialised here - confirm
 * against the full class.
 */
class TaskQueue {
  private queue: Queue<Task>;
  private workers: Worker[];

  /**
   * Adds `task` to the queue.
   *
   * @param task - The task to run later.
   * @returns The generated id stored on the queue entry.
   */
  async enqueue<T>(task: Task<T>): Promise<string> {
    const entryId = generateId();
    // Entry = the task plus bookkeeping: enqueue time and an attempt counter starting at zero.
    const entry = {
      id: entryId,
      task,
      enqueuedAt: new Date(),
      attempts: 0
    };

    await this.queue.push(entry);
    return entryId;
  }

  /**
   * Worker loop. Never returns on its own: it ends only if `pop`, `sleep` or
   * `executeTask` rejects, in which case that rejection propagates to the caller.
   */
  async process(): Promise<void> {
    for (;;) {
      const nextEntry = await this.queue.pop();

      if (nextEntry) {
        await this.executeTask(nextEntry);
      } else {
        // Nothing to do: wait before polling again (argument passed to `sleep` is 1000).
        await this.sleep(1000);
      }
    }
  }
}

Event-Driven Execution

/**
 * Subscribes to domain events on construction and runs a fixed list of tasks
 * when a user is created.
 *
 * NOTE(review): `handleOrderPlaced` and `executeTask` are referenced but not
 * defined in this excerpt; without `handleOrderPlaced` the constructor throws
 * when it tries to bind it - confirm they exist in the full class.
 */
class EventDrivenTaskExecutor {
  /**
   * @param eventBus - Bus on which the `'user.created'` and `'order.placed'`
   *                   handlers are registered, in that order.
   */
  constructor(private eventBus: EventBus) {
    // Handlers are bound so `this` still refers to the executor when the bus invokes them.
    const onUserCreated = this.handleUserCreated.bind(this);
    this.eventBus.on('user.created', onUserCreated);

    const onOrderPlaced = this.handleOrderPlaced.bind(this);
    this.eventBus.on('order.placed', onOrderPlaced);
  }

  /**
   * Runs the welcome-email, profile-creation and default-preferences tasks for
   * the new user, strictly one after another; a rejection stops the sequence.
   *
   * @param event - Event whose `userData` is passed to every task.
   */
  async handleUserCreated(event: UserCreatedEvent): Promise<void> {
    const onboardingSteps = [
      new SendWelcomeEmailTask(),
      new CreateUserProfileTask(),
      new SetupDefaultPreferencesTask()
    ];

    for (let step = 0; step < onboardingSteps.length; step++) {
      await this.executeTask(onboardingSteps[step], event.userData);
    }
  }
}

Scheduled Execution

/**
 * Creates a `CronScheduler` and registers the recurring maintenance tasks on it
 * as soon as the runner is constructed.
 */
class ScheduledTaskRunner {
  private scheduler: CronScheduler;

  constructor() {
    this.scheduler = new CronScheduler();
    this.setupScheduledTasks();
  }

  /**
   * Registers every recurring task with its cron expression, in the order listed.
   * Each task is created immediately before it is handed to the scheduler.
   */
  private setupScheduledTasks(): void {
    const recurringJobs = [
      // Daily cleanup at 2 AM
      { cron: '0 2 * * *', create: () => new DatabaseCleanupTask() },
      // Hourly health checks
      { cron: '0 * * * *', create: () => new SystemHealthCheckTask() },
      // Weekly reports on Sundays
      { cron: '0 9 * * 0', create: () => new WeeklyReportTask() }
    ];

    for (const job of recurringJobs) {
      this.scheduler.schedule(job.cron, job.create());
    }
  }
}

Error Handling and Retry Strategies

Retry Policies

/**
 * Configuration consumed by `TaskExecutor.executeWithRetry`.
 */
interface RetryPolicy {
// Upper bound on the number of attempts; executeWithRetry gives up once this many have failed.
maxAttempts: number;
// Which of the three named delay progressions is used between attempts.
backoffStrategy: 'fixed' | 'exponential' | 'linear';
// NOTE(review): presumably the delay before the first retry; unit not stated (milliseconds?) - confirm.
initialDelay: number;
// NOTE(review): presumably an upper cap on the computed delay, same unit as initialDelay - confirm.
maxDelay: number;
// Identifiers of errors that may be retried.
// NOTE(review): what they are matched against (name, code, message) is decided by `isRetryableError`, which is not shown - confirm.
retryableErrors: string[];
}

/**
 * Runs tasks under a `RetryPolicy`.
 *
 * NOTE(review): `isRetryableError`, `calculateDelay` and `sleep` are called on
 * `this` but are not defined in this excerpt - confirm they exist in the full class.
 */
class TaskExecutor {
  /**
   * Executes `task`, retrying failed attempts as allowed by `retryPolicy`.
   *
   * After each failure the error is first classified with `isRetryableError`;
   * a non-retryable error is rethrown immediately. Otherwise, if attempts
   * remain, the executor waits for `calculateDelay(attempts, retryPolicy)` and
   * tries again.
   *
   * @param task - Task to execute.
   * @param input - Value forwarded unchanged to `task.execute`.
   * @param retryPolicy - Attempt limit and retry classification settings.
   * @returns The value of the first successful attempt.
   * @throws RangeError when `retryPolicy.maxAttempts` is not greater than zero
   *         (no attempt could run, so there would be no real error to report).
   * @throws The error of the last attempt, or of the first non-retryable failure.
   */
  async executeWithRetry<T>(
    task: Task<T>,
    input: any,
    retryPolicy: RetryPolicy
  ): Promise<T> {
    const { maxAttempts } = retryPolicy;
    // Written as a negated comparison so NaN is rejected as well.
    if (!(maxAttempts > 0)) {
      throw new RangeError(
        `retryPolicy.maxAttempts must be greater than 0, received ${maxAttempts}`
      );
    }

    let attempts = 0;

    for (;;) {
      try {
        return await task.execute(input);
      } catch (error: unknown) {
        attempts++;

        if (!this.isRetryableError(error, retryPolicy)) {
          throw error;
        }

        // Out of attempts: surface the most recent failure without sleeping first.
        if (attempts >= maxAttempts) {
          throw error;
        }

        const delay = this.calculateDelay(attempts, retryPolicy);
        await this.sleep(delay);
      }
    }
  }
}

Circuit Breaker Pattern

/**
 * Decorator that routes every execution of a task through a `CircuitBreaker`.
 *
 * @typeParam TInput - Input type of the wrapped task.
 * @typeParam TOutput - Output type of the wrapped task.
 */
class CircuitBreakerTask<TInput, TOutput> {
  private circuitBreaker: CircuitBreaker;

  /**
   * @param wrappedTask - Task whose `execute` is guarded.
   * @param options - Passed as-is to the `CircuitBreaker` constructor.
   */
  constructor(
    private wrappedTask: Task<TInput, TOutput>,
    private options: CircuitBreakerOptions
  ) {
    this.circuitBreaker = new CircuitBreaker(options);
  }

  /**
   * Asks the breaker to run the wrapped task with `input`.
   *
   * @returns Whatever the breaker's `execute` resolves to; its rejections propagate unchanged.
   */
  async execute(input: TInput): Promise<TOutput> {
    // The breaker decides whether and when this thunk is invoked.
    const guardedCall = () => this.wrappedTask.execute(input);
    const outcome = await this.circuitBreaker.execute(guardedCall);
    return outcome;
  }
}

Task Monitoring and Observability

Metrics Collection

/**
 * Decorator that records success/failure counters and execution duration for
 * a wrapped task, without changing what the task returns or throws.
 *
 * @typeParam TInput - Input type of the wrapped task.
 * @typeParam TOutput - Output type of the wrapped task.
 */
class InstrumentedTask<TInput, TOutput> implements Task<TInput, TOutput> {
  /**
   * @param wrappedTask - Task being measured; its `name` tags every metric.
   * @param metrics - Sink receiving the timer, counters and histogram.
   */
  constructor(
    private wrappedTask: Task<TInput, TOutput>,
    private metrics: MetricsCollector
  ) {}

  /**
   * Executes the wrapped task and reports metrics around it.
   *
   * @param input - Forwarded unchanged to the wrapped task.
   * @returns The wrapped task's result.
   * @throws Whatever the wrapped task throws, after the failure counter was incremented.
   */
  async execute(input: TInput): Promise<TOutput> {
    const startTime = Date.now();
    const timer = this.metrics.startTimer('task.execution.duration', {
      taskName: this.wrappedTask.name
    });

    try {
      const result = await this.wrappedTask.execute(input);

      this.metrics.increment('task.execution.success', {
        taskName: this.wrappedTask.name
      });

      return result;
    } catch (error: unknown) {
      this.metrics.increment('task.execution.failure', {
        taskName: this.wrappedTask.name,
        errorType: this.describeErrorType(error)
      });

      throw error;
    } finally {
      timer.end();

      // NOTE(review): the timer above and this histogram use the same metric name;
      // depending on the collector this may record the duration twice - confirm.
      this.metrics.histogram('task.execution.duration', Date.now() - startTime, {
        taskName: this.wrappedTask.name
      });
    }
  }

  /**
   * Derives the `errorType` tag for a thrown value.
   *
   * Anything may be thrown in JavaScript, so `null`/`undefined` and objects
   * without a constructor are handled explicitly instead of dereferencing
   * `.constructor.name` blindly (which would replace the task's own failure
   * with a TypeError raised by the instrumentation).
   */
  private describeErrorType(error: unknown): string {
    if (error === null || error === undefined) {
      return String(error);
    }

    const ctor: unknown = (error as { constructor?: unknown }).constructor;
    if (typeof ctor === 'function' && ctor.name !== '') {
      return ctor.name;
    }
    return typeof error;
  }
}

Distributed Tracing

import { SpanStatusCode, trace } from '@opentelemetry/api';

/**
 * Decorator that runs a wrapped task inside an OpenTelemetry span named
 * `task.<name>` and records the outcome on that span.
 *
 * NOTE(review): `name`, `id` and `wrappedTask` are read from `this` but are not
 * declared in this excerpt - confirm how the full class provides them.
 *
 * @typeParam TInput - Input type of the wrapped task.
 * @typeParam TOutput - Output type of the wrapped task.
 */
class TracedTask<TInput, TOutput> implements Task<TInput, TOutput> {
  /**
   * Executes the wrapped task within an active span.
   *
   * The span is always ended, on success and on failure; a span that is
   * started with `startActiveSpan` is not ended automatically.
   *
   * @param input - Forwarded unchanged to the wrapped task.
   * @returns The wrapped task's result.
   * @throws Whatever the wrapped task throws, after it was recorded on the span.
   */
  async execute(input: TInput): Promise<TOutput> {
    const tracer = trace.getTracer('task-executor');

    return await tracer.startActiveSpan(`task.${this.name}`, async (span) => {
      try {
        const attributes: Record<string, string | number> = {
          'task.name': this.name,
          'task.id': this.id
        };
        const inputSize = this.measureInputSize(input);
        if (inputSize !== undefined) {
          attributes['task.input.size'] = inputSize;
        }
        span.setAttributes(attributes);

        const result = await this.wrappedTask.execute(input);
        span.setStatus({ code: SpanStatusCode.OK });
        return result;
      } catch (error: unknown) {
        // Thrown values are not guaranteed to be Error instances.
        const isError = error instanceof Error;
        span.recordException(isError ? error : String(error));
        span.setStatus({
          code: SpanStatusCode.ERROR,
          message: isError ? error.message : String(error)
        });
        throw error;
      } finally {
        span.end();
      }
    });
  }

  /**
   * Length of the JSON serialisation of `input`, or `undefined` when the input
   * has no JSON form (`undefined`, functions) or cannot be serialised
   * (circular references, BigInt). Tracing must never make the task itself fail.
   */
  private measureInputSize(input: unknown): number | undefined {
    try {
      const serialized: unknown = JSON.stringify(input);
      return typeof serialized === 'string' ? serialized.length : undefined;
    } catch {
      return undefined;
    }
  }
}

Task Composition and Chaining

Sequential Composition

/**
 * Immutable sequence of tasks in which each task receives the previous task's output.
 *
 * @typeParam TInput - Input type of the first task (and of `execute`).
 * @typeParam TOutput - Output type of the last task added so far.
 */
class TaskChain<TInput, TOutput> {
  // Element types differ from link to link, so the list itself cannot be typed
  // more precisely; `addTask` enforces the link types at the call site.
  private readonly tasks: ReadonlyArray<Task<any, any>>;

  /**
   * @param tasks - Tasks already in the chain, in execution order. Defaults to
   *                an empty chain; the array is copied, so later changes to the
   *                caller's array do not affect the chain.
   */
  constructor(tasks: ReadonlyArray<Task<any, any>> = []) {
    this.tasks = [...tasks];
  }

  /**
   * Returns a new chain with `task` appended; the receiver is left unchanged.
   *
   * @param task - Task consuming the current chain output.
   * @returns A chain whose output type is the output type of `task`.
   */
  addTask<TNext>(task: Task<TOutput, TNext>): TaskChain<TInput, TNext> {
    return new TaskChain<TInput, TNext>([...this.tasks, task]);
  }

  /**
   * Runs the tasks in insertion order, feeding each result into the next task.
   *
   * @param input - Input for the first task.
   * @returns The last task's result; for an empty chain, `input` itself.
   * @throws The first rejection raised by any task; later tasks are not run.
   */
  async execute(input: TInput): Promise<TOutput> {
    let result: unknown = input;

    for (const task of this.tasks) {
      result = await task.execute(result);
    }

    return result as TOutput;
  }
}

// Usage: a chain is assembled fluently. `addTask` returns a new TaskChain rather than
// modifying the one it is called on, so the result of the last call is what gets
// bound to `processingChain`.
// NOTE(review): DataValidationTask is declared in this document as
// SyncTask<UserData, ValidationResult>, so its input/output types do not obviously line
// up with RawData or with the next task's input - confirm the intended task signatures.
const processingChain = new TaskChain<RawData, ProcessedResult>()
.addTask(new DataValidationTask())
.addTask(new DataTransformationTask())
.addTask(new DataEnrichmentTask())
.addTask(new DataPersistenceTask());

Parallel Composition

/**
 * Runs a fixed set of tasks concurrently against the same input.
 *
 * @typeParam TInput - Input shared by every task.
 * @typeParam TOutput - Element type of the collected results.
 */
class ParallelTaskGroup<TInput, TOutput> {
  /** @param tasks - Tasks to start together; results keep this order. */
  constructor(private tasks: Task<TInput, any>[]) {}

  /**
   * Starts every task with `input` and waits for all of them.
   *
   * @returns The results in task order.
   * @throws The first rejection among the tasks (fail-fast, via `Promise.all`).
   */
  async execute(input: TInput): Promise<TOutput[]> {
    return await Promise.all(this.tasks.map((task) => task.execute(input)));
  }

  /**
   * Starts every task with `input` and reports each outcome individually.
   *
   * @returns One entry per task, in task order: `{ success: true, result, taskId }`
   *          or `{ success: false, error, taskId }`. Task failures never reject
   *          the returned promise.
   */
  async executeWithFailureHandling(input: TInput): Promise<TaskResult<TOutput>[]> {
    // Converts one task's outcome, thrown or returned, into a plain record.
    const runSafely = async (task: Task<TInput, any>) => {
      try {
        const result = await task.execute(input);
        return { success: true, result, taskId: task.id };
      } catch (error) {
        return { success: false, error, taskId: task.id };
      }
    };

    const outcomes = this.tasks.map(runSafely);
    return await Promise.all(outcomes);
  }
}

Performance Optimization

Task Pooling

/**
 * Fixed-size pool of reusable task instances.
 *
 * At most `poolSize` executions run at the same time. Further callers wait in
 * FIFO order and are handed a task as soon as one is released.
 *
 * @typeParam TInput - Input type of the pooled tasks.
 * @typeParam TOutput - Output type of the pooled tasks.
 */
class TaskPool<TInput, TOutput> {
  // Idle tasks, ready to be handed out.
  private pool: Task<TInput, TOutput>[] = [];
  // Tasks currently executing on behalf of a caller.
  private busy: Set<Task<TInput, TOutput>> = new Set();
  // Callers waiting for a task, oldest first.
  private waiters: Array<(task: Task<TInput, TOutput>) => void> = [];

  /**
   * @param taskFactory - Creates one task instance; called `poolSize` times up front.
   * @param poolSize - Number of task instances, i.e. the concurrency limit.
   * @throws RangeError when `poolSize` is not a positive integer (an empty pool
   *         would make every `execute` call wait forever).
   */
  constructor(
    private taskFactory: () => Task<TInput, TOutput>,
    private poolSize: number
  ) {
    if (!Number.isInteger(poolSize) || poolSize < 1) {
      throw new RangeError(`poolSize must be a positive integer, received ${poolSize}`);
    }
    this.initializePool();
  }

  /**
   * Runs `input` on the next available task.
   *
   * @returns The task's result.
   * @throws Whatever the task throws; the task is returned to the pool either way.
   */
  async execute(input: TInput): Promise<TOutput> {
    const task = await this.acquireTask();

    try {
      return await task.execute(input);
    } finally {
      this.releaseTask(task);
    }
  }

  /** Fills the pool with `poolSize` freshly created tasks. */
  private initializePool(): void {
    for (let created = 0; created < this.poolSize; created++) {
      this.pool.push(this.taskFactory());
    }
  }

  /** Resolves with an idle task immediately, or once another caller releases one. */
  private acquireTask(): Promise<Task<TInput, TOutput>> {
    const idle = this.pool.pop();
    if (idle !== undefined) {
      this.busy.add(idle);
      return Promise.resolve(idle);
    }

    // No idle task: queue up instead of polling on a timer.
    return new Promise((resolve) => {
      this.waiters.push(resolve);
    });
  }

  /** Hands `task` to the longest-waiting caller, or marks it idle when nobody waits. */
  private releaseTask(task: Task<TInput, TOutput>): void {
    const nextWaiter = this.waiters.shift();
    if (nextWaiter !== undefined) {
      // The task goes straight to the next caller, so it stays in `busy`.
      nextWaiter(task);
      return;
    }

    this.busy.delete(task);
    this.pool.push(task);
  }
}

Batch Processing Optimization

/**
 * Collects individual `process` calls into batches and runs them through a
 * `BatchTask`, resolving every caller with the result at its position.
 *
 * A batch is flushed as soon as `batchSize` items are waiting, or
 * `batchTimeout` milliseconds after the first item of a partial batch arrived.
 *
 * @typeParam TInput - Type of a single input.
 * @typeParam TOutput - Type of a single result.
 */
class BatchProcessor<TInput, TOutput> {
  // Waiting callers: the input plus the settle functions of the promise handed to the caller.
  private batch: Array<{
    input: TInput;
    resolve: (value: TOutput) => void;
    reject: (reason: unknown) => void;
  }> = [];
  private batchTimer: ReturnType<typeof setTimeout> | null = null;

  /**
   * @param batchTask - Task that processes one batch of inputs.
   * @param batchSize - Number of waiting items that triggers an immediate flush.
   * @param batchTimeout - Delay passed to `setTimeout` before a partial batch is flushed.
   * @throws RangeError when `batchSize` is not a positive integer (items could
   *         otherwise be left waiting forever).
   */
  constructor(
    private batchTask: BatchTask<TInput, TOutput>,
    private batchSize: number = 100,
    private batchTimeout: number = 5000
  ) {
    if (!Number.isInteger(batchSize) || batchSize < 1) {
      throw new RangeError(`batchSize must be a positive integer, received ${batchSize}`);
    }
  }

  /**
   * Queues `input` for the next batch.
   *
   * @returns A promise for the result at this input's position in the batch.
   *          It rejects when the batch task fails or returns no result for
   *          this position.
   */
  process(input: TInput): Promise<TOutput> {
    return new Promise<TOutput>((resolve, reject) => {
      this.batch.push({ input, resolve, reject });

      if (this.batch.length >= this.batchSize) {
        void this.processBatch();
      } else if (this.batchTimer === null) {
        this.batchTimer = setTimeout(() => {
          void this.processBatch();
        }, this.batchTimeout);
      }
    });
  }

  /** Flushes up to `batchSize` waiting items and settles their promises. Never rejects. */
  private async processBatch(): Promise<void> {
    if (this.batchTimer !== null) {
      clearTimeout(this.batchTimer);
      this.batchTimer = null;
    }

    const currentBatch = this.batch.splice(0, this.batchSize);
    if (currentBatch.length === 0) {
      return;
    }
    const inputs = currentBatch.map((item) => item.input);

    try {
      const results: unknown = await this.batchTask.execute(inputs);
      if (!Array.isArray(results)) {
        throw new TypeError('Batch task must resolve to an array of results');
      }

      currentBatch.forEach((item, index) => {
        if (index < results.length) {
          item.resolve(results[index] as TOutput);
        } else {
          // Too few results: fail the affected caller instead of resolving it with undefined.
          item.reject(
            new Error(
              `Batch task returned ${results.length} results for ${currentBatch.length} inputs`
            )
          );
        }
      });
    } catch (error: unknown) {
      // Settling an already settled promise is a no-op, so rejecting every item here is safe.
      currentBatch.forEach((item) => {
        item.reject(error);
      });
    }
  }
}

Testing Strategies

Unit Testing

// Unit tests for DataValidationTask: a fresh task is created for every test and
// its `execute` result is checked for a fully valid record and for a bad e-mail.
describe('DataValidationTask', () => {
  // Phone number shared by both cases; only the e-mail differs between them.
  const knownGoodPhone = '+1-555-0123';
  let task: DataValidationTask;

  beforeEach(() => {
    task = new DataValidationTask();
  });

  it('should validate correct user data', async () => {
    const outcome = await task.execute({
      email: 'user@example.com',
      phone: knownGoodPhone
    });

    expect(outcome.isValid).toBe(true);
    expect(outcome.errors).toHaveLength(0);
  });

  it('should reject invalid email', async () => {
    const outcome = await task.execute({
      email: 'invalid-email',
      phone: knownGoodPhone
    });

    expect(outcome.isValid).toBe(false);
    expect(outcome.errors).toContain('Invalid email address');
  });
});

Integration Testing

// Integration test: tasks enqueued on a database-backed TaskQueue are expected
// to be processed in the order they were enqueued.
// NOTE(review): the TaskQueue class shown in this document declares neither a
// constructor taking a database nor a `processAll` method - confirm which
// TaskQueue implementation this test targets.
describe('Task Integration', () => {
  let database: TestDatabase;
  let taskQueue: TaskQueue;

  beforeEach(async () => {
    // A new database and queue per test keeps results from leaking between tests.
    database = await TestDatabase.create();
    taskQueue = new TaskQueue(database);
  });

  it('should process tasks in order', async () => {
    const first = new TestTask('task1');
    const second = new TestTask('task2');

    await taskQueue.enqueue(first);
    await taskQueue.enqueue(second);
    await taskQueue.processAll();

    const recorded = await database.getTaskResults();
    expect(recorded).toEqual(['task1', 'task2']);
  });
});

Best Practices

Design Principles

  • Single Responsibility: Each task should have one clear purpose
  • Idempotency: Tasks should be safely repeatable
  • Statelessness: Avoid maintaining state between executions
  • Input Validation: Always validate task inputs

Performance Guidelines

  • Resource Management: Properly clean up resources after execution
  • Timeout Handling: Set appropriate timeouts for all operations
  • Memory Usage: Be mindful of memory consumption for large datasets
  • Batch Processing: Use batching for high-volume operations

Security Considerations

  • Input Sanitization: Sanitize all external inputs
  • Access Control: Implement proper authorization checks
  • Audit Logging: Log task executions for compliance
  • Secret Management: Securely handle sensitive data

Related Concepts

  • Workflows: Higher-level orchestration of multiple tasks
  • Services: Business services that execute tasks
  • Routers: Message routing for task communication
  • Orchestrators: Coordination of task execution