How to Design a Job Queue System

Bright SEO Tools in saas | Published: Apr 04, 2026 | Updated: Apr 04, 2026

When your application needs to run background tasks such as sending emails, generating reports, or processing videos, executing them synchronously blocks user requests and creates a poor user experience. A well-designed job queue system decouples task submission from execution, enabling asynchronous processing, retry logic, priority handling, and horizontal scaling of workers.

This guide explains how to architect job queue systems that process millions of tasks reliably, without losing jobs and without duplicate deliveries causing duplicate side effects. You'll learn queue selection criteria, worker pool management, failure handling strategies, and monitoring approaches that make debugging failed jobs tractable at scale.

We cover the critical design decisions: choosing between in-memory and persistent queues, deciding between at-least-once and exactly-once processing guarantees, designing idempotent job handlers, and handling poison messages (jobs that fail on every attempt) without blocking queue progress.

Core Queue Architecture

Job queue systems consist of four main components: producers that enqueue jobs, a queue that stores jobs, workers that process jobs, and a result store that tracks job status and results.

Basic Queue Structure

// Job definition
interface Job {
    id: string;
    type: string;
    data: any;
    priority: number;
    maxRetries: number;
    attempts: number;
    createdAt: Date;
    scheduledFor?: Date;
}

// Job queue interface
interface JobQueue {
    enqueue(job: Job): Promise<string>;
    dequeue(): Promise<Job | null>;
    ack(jobId: string): Promise<void>;
    nack(jobId: string, requeue: boolean): Promise<void>;
    delete(jobId: string): Promise<void>;
}

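// Simple in-memory queue implementation (jobs are lost if the process exits)
class MemoryJobQueue implements JobQueue {
    private queue: Job[] = [];
    private processing: Map<string, Job> = new Map();

    async enqueue(job: Job): Promise<string> {
        // Initialize only new jobs. Re-queued retries keep their ID and attempt
        // count; resetting attempts here would let a failing job retry forever.
        if (!job.id) {
            job.id = this.generateId();
            job.createdAt = new Date();
            job.attempts = 0;
        }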

        // Insert based on priority
        const insertIndex = this.queue.findIndex(j => j.priority < job.priority);

        if (insertIndex === -1) {
            this.queue.push(job);
        } else {
            this.queue.splice(insertIndex, 0, job);
        }

        return job.id;
    }

    async dequeue(): Promise<Job | null> {
        // Check for scheduled jobs ready to run
        const now = new Date();
        const readyIndex = this.queue.findIndex(
            j => !j.scheduledFor || j.scheduledFor <= now
        );

        if (readyIndex === -1) return null;

        const job = this.queue.splice(readyIndex, 1)[0];
        job.attempts++;

        // Mark as processing
        this.processing.set(job.id, job);

        return job;
    }

    async ack(jobId: string): Promise<void> {
        // Remove from processing (job completed successfully)
        this.processing.delete(jobId);
    }

    async nack(jobId: string, requeue: boolean): Promise<void> {
        const job = this.processing.get(jobId);
        if (!job) return;

        this.processing.delete(jobId);

        if (requeue && job.attempts < job.maxRetries) {
            // Re-queue with exponential backoff
            const delay = Math.pow(2, job.attempts) * 1000;
            job.scheduledFor = new Date(Date.now() + delay);
            await this.enqueue(job);
        }
    }

    async delete(jobId: string): Promise<void> {
        this.processing.delete(jobId);
        this.queue = this.queue.filter(j => j.id !== jobId);
    }

    private generateId(): string {
        // slice() replaces the deprecated substr(); same 9 random characters
        return `job_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
    }
}

Producer: Job Submission

class JobProducer {
    constructor(private queue: JobQueue) {}

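    async submitJob(
        type: string,
        data: any,
        options: { priority?: number; maxRetries?: number; scheduledFor?: Date } = {}
    ): Promise<string> {
        const job: Job = {
            id: '',
            type: type,
            data: data,
            priority: options.priority ?? 5, // ?? keeps an explicit 0 instead of replacing it
            maxRetries: options.maxRetries ?? 3,
            attempts: 0,
            createdAt: new Date(),
            scheduledFor: options.scheduledFor
        };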

        const jobId = await this.queue.enqueue(job);

        // Store job metadata for tracking
        await this.storeJobMetadata(jobId, job);

        return jobId;
    }

    async scheduleJob(type: string, data: any, runAt: Date) {
        return this.submitJob(type, data, {
            scheduledFor: runAt
        });
    }

    async submitBulk(jobs: Array<{type: string, data: any}>) {
        const jobIds = [];

        for (const job of jobs) {
            const jobId = await this.submitJob(job.type, job.data);
            jobIds.push(jobId);
        }

        return jobIds;
    }

    private async storeJobMetadata(jobId: string, job: Job) {
        await db.jobs.create({
            id: jobId,
            type: job.type,
            status: 'QUEUED',
            priority: job.priority,
            createdAt: job.createdAt,
            scheduledFor: job.scheduledFor
        });
    }
}

Consumer: Worker Pool

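class JobWorker {
    // protected (not private) so subclasses below can reuse the queue and handlers
    constructor(
        protected queue: JobQueue,
        protected handlers: Map<string, JobHandler>,
        protected concurrency: number = 10
    ) {}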

    async start() {
        console.log(`Starting worker with concurrency ${this.concurrency}`);

        // Start worker pool
        const workers = [];
        for (let i = 0; i < this.concurrency; i++) {
            workers.push(this.processJobs());
        }

        await Promise.all(workers);
    }

    protected async processJobs() {
        while (true) {
            try {
                const job = await this.queue.dequeue();

                if (!job) {
                    // No jobs available, wait before polling again
                    await this.sleep(1000);
                    continue;
                }

                await this.executeJob(job);
            } catch (error) {
                console.error('Worker error:', error);
                await this.sleep(5000);
            }
        }
    }

    protected async executeJob(job: Job) {
        console.log(`Processing job ${job.id}, attempt ${job.attempts}`);

        await this.updateJobStatus(job.id, 'PROCESSING');

        try {
            const handler = this.handlers.get(job.type);

            if (!handler) {
                throw new Error(`No handler registered for job type: ${job.type}`);
            }

            // Execute job with timeout
            const result = await this.executeWithTimeout(
                () => handler.process(job.data),
                handler.timeout || 300000 // 5 minute default timeout
            );

            // Job succeeded
            await this.queue.ack(job.id);
            await this.updateJobStatus(job.id, 'COMPLETED', result);

            console.log(`Job ${job.id} completed successfully`);

        } catch (err) {
            // Caught values are `unknown` in TypeScript; normalize to an Error
            const error = err instanceof Error ? err : new Error(String(err));
            console.error(`Job ${job.id} failed:`, error);

            if (job.attempts >= job.maxRetries) {
                // Max retries exceeded, move to dead letter queue
                await this.queue.delete(job.id);
                await this.updateJobStatus(job.id, 'FAILED', null, error.message);
                await this.moveToDeadLetterQueue(job, error);
            } else {
                // Retry job (the queue's nack applies the backoff delay)
                await this.queue.nack(job.id, true);
                await this.updateJobStatus(job.id, 'RETRYING');
            }
        }
    }

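    private async executeWithTimeout<T>(fn: () => Promise<T>, timeout: number): Promise<T> {
        let timer: ReturnType<typeof setTimeout> | undefined;
        const timeoutPromise = new Promise<never>((_, reject) => {
            timer = setTimeout(() => reject(new Error('Job timeout')), timeout);
        });

        try {
            // The race stops waiting on timeout but does not cancel fn();
            // the handler keeps running in the background
            return await Promise.race([fn(), timeoutPromise]);
        } finally {
            clearTimeout(timer); // don't leave a pending timer after the job settles
        }
    }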

    protected async updateJobStatus(jobId: string, status: string, result: unknown = null, error: string | null = null) {
        await db.jobs.update(jobId, {
            status: status,
            result: result,
            error: error,
            updatedAt: new Date()
        });
    }

    protected async moveToDeadLetterQueue(job: Job, error: Error) {
        await db.deadLetterQueue.create({
            jobId: job.id,
            jobType: job.type,
            jobData: job.data,
            attempts: job.attempts,
            error: error.message,
            stack: error.stack,
            failedAt: new Date()
        });
    }

    protected sleep(ms: number) {
        return new Promise(resolve => setTimeout(resolve, ms));
    }

    registerHandler(jobType: string, handler: JobHandler) {
        this.handlers.set(jobType, handler);
    }
}

// Job handler interface
interface JobHandler {
    process(data: any): Promise<unknown>;
    timeout?: number;
}

// Example handlers
class EmailJobHandler implements JobHandler {
    timeout = 30000; // 30 seconds

    async process(data: any) {
        const { to, subject, body } = data;
        await emailService.send({ to, subject, body });
        return { sent: true, timestamp: new Date() };
    }
}

class VideoProcessingHandler implements JobHandler {
    timeout = 600000; // 10 minutes

    async process(data: any) {
        const { videoId, operations } = data;
        const result = await videoProcessor.process(videoId, operations);
        return result;
    }
}
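Pro Tip: Set worker concurrency based on job characteristics, not arbitrary numbers. I/O-bound jobs (API calls, database queries) benefit from high concurrency because each job spends most of its time waiting. CPU-bound jobs (image processing, data transformations) gain nothing from extra concurrency inside one Node.js process, since JavaScript executes on a single thread; run roughly one worker process or worker thread per CPU core instead.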
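A minimal sketch of what the concurrency setting actually bounds: `runWithLimit` (a hypothetical helper, not a library API) keeps at most `limit` async tasks in flight, the same role the `concurrency` parameter plays in `JobWorker`. The fake jobs only wait on timers, standing in for I/O-bound work, and a tracker object records the peak number running at once.

```typescript
interface Tracker {
  inFlight: number;
  peak: number;
}

// Run async tasks with at most `limit` of them in flight at any moment.
// Each "lane" repeatedly claims the next unstarted task until none remain.
async function runWithLimit<T>(tasks: Array<() => Promise<T>>, limit: number): Promise<T[]> {
  const results: T[] = new Array(tasks.length);
  let next = 0;

  async function lane(): Promise<void> {
    while (next < tasks.length) {
      const index = next++; // claiming is synchronous, so no two lanes get the same index
      results[index] = await tasks[index]();
    }
  }

  const laneCount = Math.max(1, Math.min(limit, tasks.length));
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
  return results;
}

// Simulated I/O-bound job that records how many jobs run at the same time
function fakeIoJob(id: number, ms: number, tracker: Tracker): () => Promise<number> {
  return async () => {
    tracker.inFlight++;
    tracker.peak = Math.max(tracker.peak, tracker.inFlight);
    await new Promise((resolve) => setTimeout(resolve, ms));
    tracker.inFlight--;
    return id;
  };
}

const tracker: Tracker = { inFlight: 0, peak: 0 };
const jobs = Array.from({ length: 20 }, (_, i) => fakeIoJob(i, 10, tracker));
runWithLimit(jobs, 5).then((ids) => console.log(ids.length, tracker.peak)); // 20 5
```

With I/O-bound jobs, raising `limit` raises throughput until the downstream service becomes the bottleneck; with CPU-bound jobs in a single process, the lanes would simply take turns on one thread.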

Persistent Queue with Redis

In-memory queues lose jobs when the process crashes. Production systems need persistent queues that survive restarts and provide delivery guarantees.

Redis-Based Queue Implementation

class RedisJobQueue implements JobQueue {
    // protected so subclasses such as DeduplicatingQueue can use the client
    constructor(protected redis: RedisClient) {}

    async enqueue(job: Job): Promise<string> {
        job.id = this.generateId();

        // Store job data
        await this.redis.hmset(`job:${job.id}`, {
            type: job.type,
            data: JSON.stringify(job.data),
            priority: job.priority,
            maxRetries: job.maxRetries,
            attempts: 0,
            createdAt: job.createdAt.toISOString(),
            scheduledFor: job.scheduledFor?.toISOString() || ''
        });

        // Add to priority queue
        const score = job.scheduledFor
            ? job.scheduledFor.getTime()
            : Date.now() - (job.priority * 1000000); // Higher priority = lower score

        await this.redis.zadd('jobs:pending', score, job.id);

        return job.id;
    }

    async dequeue(): Promise<Job | null> {
        const now = Date.now();

        // Find the highest-priority ready job and move it to the processing set in
        // one atomic script, so two workers can never claim the same job.
        // Claiming also increments `attempts`, matching MemoryJobQueue, so the
        // worker's `attempts >= maxRetries` check means the same thing for both.
        const script = `
            local ids = redis.call('ZRANGEBYSCORE', 'jobs:pending', '-inf', ARGV[1], 'LIMIT', 0, 1)
            if #ids == 0 then return false end
            local jobId = ids[1]
            redis.call('ZREM', 'jobs:pending', jobId)
            redis.call('SADD', 'jobs:processing', jobId)
            redis.call('HSET', 'job:' .. jobId, 'processingStarted', ARGV[1])
            redis.call('HINCRBY', 'job:' .. jobId, 'attempts', 1)
            return jobId
        `;

        const jobId = (await this.redis.eval(script, 0, now)) as string | null;

        if (!jobId) return null; // nothing is ready to run

        // Load job data
        const jobData = await this.redis.hgetall(`job:${jobId}`);

        return {
            id: jobId,
            type: jobData.type,
            data: JSON.parse(jobData.data),
            priority: parseInt(jobData.priority, 10),
            maxRetries: parseInt(jobData.maxRetries, 10),
            attempts: parseInt(jobData.attempts, 10),
            createdAt: new Date(jobData.createdAt),
            scheduledFor: jobData.scheduledFor ? new Date(jobData.scheduledFor) : undefined
        };
    }

    async ack(jobId: string): Promise<void> {
        // Remove from processing, job completed
        await this.redis.srem('jobs:processing', jobId);
        await this.redis.del(`job:${jobId}`);
    }

    async nack(jobId: string, requeue: boolean): Promise<void> {
        const jobData = await this.redis.hgetall(`job:${jobId}`);
        // attempts was already incremented when dequeue() claimed the job
        const attempts = parseInt(jobData.attempts, 10);

        await this.redis.srem('jobs:processing', jobId);

        if (requeue && attempts < parseInt(jobData.maxRetries, 10)) {
            // Re-queue with exponential backoff
            const delay = Math.pow(2, attempts) * 1000;
            const scheduledFor = Date.now() + delay;

            await this.redis.zadd('jobs:pending', scheduledFor, jobId);
        } else {
            // Max retries reached, move to dead letter queue
            await this.redis.sadd('jobs:failed', jobId);
        }
    }

    async delete(jobId: string): Promise<void> {
        await this.redis.srem('jobs:processing', jobId);
        await this.redis.zrem('jobs:pending', jobId);
        await this.redis.del(`job:${jobId}`);
    }

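    // Recover jobs whose worker crashed mid-processing. The timeout must exceed the
    // longest handler timeout (10 minutes for VideoProcessingHandler), or jobs that
    // are still running get re-queued and processed twice.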
    async recoverStuckJobs(timeout: number = 300000) {
        const processingJobs = await this.redis.smembers('jobs:processing');

        for (const jobId of processingJobs) {
            const jobData = await this.redis.hgetall(`job:${jobId}`);

            if (!jobData.processingStarted) continue;

            const processingTime = Date.now() - parseInt(jobData.processingStarted);

            if (processingTime > timeout) {
                console.log(`Recovering stuck job ${jobId}`);
                await this.nack(jobId, true);
            }
        }
    }

    private generateId(): string {
        // slice() replaces the deprecated substr(); same 9 random characters
        return `job_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
    }
}

// Run recovery periodically (`queue` is the RedisJobQueue instance)
setInterval(() => {
    queue.recoverStuckJobs().catch(err => console.error('Stuck-job recovery failed:', err));
}, 60000); // Check every minute

Visibility Timeout Pattern

Some queue systems, such as AWS SQS, use a visibility timeout instead of explicit ack/nack. When a worker receives a job, the job becomes invisible to other workers for the timeout period. If the worker doesn't delete the job before the timeout expires, the job becomes visible again and another worker can pick it up.

class VisibilityTimeoutQueue {
    constructor(private redis: RedisClient, private visibilityTimeout: number = 300000) {}

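    async dequeue(): Promise<Job | null> {
        const now = Date.now();

        // Atomically pick the first visible job and push its visibility into the
        // future. Doing this as two separate commands would let two workers
        // receive the same job.
        const script = `
            local ids = redis.call('ZRANGEBYSCORE', 'jobs:queue', '-inf', ARGV[1], 'LIMIT', 0, 1)
            if #ids == 0 then return false end
            redis.call('ZADD', 'jobs:queue', ARGV[2], ids[1])
            return ids[1]
        `;
        const jobId = (await this.redis.eval(script, 0, now, now + this.visibilityTimeout)) as string | null;

        if (!jobId) return null;

        // Load and return job (deserializeJob parses the hash fields the same way
        // RedisJobQueue.dequeue does; it is not shown here)
        const jobData = await this.redis.hgetall(`job:${jobId}`);

        return this.deserializeJob(jobId, jobData);
    }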

    async deleteJob(jobId: string): Promise<void> {
        // Job completed successfully
        await this.redis.zrem('jobs:queue', jobId);
        await this.redis.del(`job:${jobId}`);
    }

    async changeVisibility(jobId: string, newTimeout: number): Promise<void> {
        // Extend visibility timeout for long-running jobs
        const visibleAt = Date.now() + newTimeout;
        await this.redis.zadd('jobs:queue', visibleAt, jobId);
    }
}
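The visibility-timeout behavior is easiest to see without Redis. Below is a minimal in-memory sketch with an injectable clock, so the timeout can be simulated without waiting; `InMemoryVisibilityQueue` and its methods are illustrative names, not an SQS or Redis API.

```typescript
interface QueuedMessage {
  id: string;
  body: string;
  visibleAt: number; // epoch ms after which the message may be received again
}

// In-memory model of the visibility-timeout pattern: receiving a message hides it
// for `visibilityMs`; unless it is deleted in time, it becomes receivable again.
class InMemoryVisibilityQueue {
  private messages = new Map<string, QueuedMessage>();

  constructor(private visibilityMs: number, private now: () => number = Date.now) {}

  send(id: string, body: string): void {
    this.messages.set(id, { id, body, visibleAt: this.now() });
  }

  receive(): QueuedMessage | null {
    const t = this.now();
    for (const msg of Array.from(this.messages.values())) {
      if (msg.visibleAt <= t) {
        msg.visibleAt = t + this.visibilityMs; // hide it from other consumers
        return { ...msg };
      }
    }
    return null;
  }

  // Acknowledge: the job finished, so remove it for good
  delete(id: string): void {
    this.messages.delete(id);
  }

  // Extend the timeout for a job that is still running
  changeVisibility(id: string, timeoutMs: number): void {
    const msg = this.messages.get(id);
    if (msg) msg.visibleAt = this.now() + timeoutMs;
  }
}

// Usage with a controllable clock
let clock = 0;
const q = new InMemoryVisibilityQueue(30_000, () => clock);
q.send('job-1', 'resize image');

const first = q.receive();       // worker A receives job-1
const second = q.receive();      // null: job-1 is hidden for 30 s
clock += 31_000;                 // worker A crashed without deleting it
const redelivered = q.receive(); // job-1 is visible again for another worker
console.log(first?.id, second, redelivered?.id); // job-1 null job-1
```

The redelivery is what makes this pattern at-least-once: a worker that is merely slow, not crashed, produces the same duplicate, which is why long jobs call `changeVisibility` before the timeout runs out.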

Priority Queues and Job Scheduling

Not all jobs are equally urgent: email confirmations should be processed before daily digest emails, and scheduled jobs should wait until their scheduled time.

Multi-Priority Queue Implementation

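class PriorityQueueManager {
    private queues: Map<number, JobQueue>;

    constructor() {
        // Each level needs its own Redis keys. RedisJobQueue as written hard-codes
        // 'jobs:pending', so it would need a per-level key prefix here; otherwise
        // all four instances read and write the same sorted set.
        this.queues = new Map<number, JobQueue>([
            [10, new RedisJobQueue(redis)], // Critical
            [5, new RedisJobQueue(redis)],  // High
            [3, new RedisJobQueue(redis)],  // Normal
            [1, new RedisJobQueue(redis)]   // Low
        ]);
    }

    async enqueue(job: Job): Promise<string> {
        // Unknown priority levels fall back to the normal queue
        const queue = this.queues.get(job.priority) ?? this.queues.get(3)!;
        return await queue.enqueue(job);
    }

    async dequeue(): Promise<Job | null> {
        // Try queues in strict priority order. Under sustained high-priority
        // load this can starve the lower levels.
        const priorities = Array.from(this.queues.keys()).sort((a, b) => b - a);

        for (const priority of priorities) {
            const queue = this.queues.get(priority)!;
            const job = await queue.dequeue();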

            if (job) return job;
        }

        return null;
    }
}

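// Worker that respects priorities: the ordering comes from the queue it polls
// (such as a PriorityQueueManager), which checks the highest-priority level first
class PriorityAwareWorker extends JobWorker {
    protected async processJobs() {
        while (true) {
            try {
                const job = await this.queue.dequeue();

                if (!job) {
                    await this.sleep(1000);
                    continue;
                }

                await this.executeJob(job);
            } catch (error) {
                // Keep the loop alive on queue or handler errors, as the base class does
                console.error('Worker error:', error);
                await this.sleep(5000);
            }
        }
    }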
}
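The in-memory queue earlier keeps jobs ordered with `findIndex` plus `splice`, which costs O(n) per enqueue. A binary heap brings enqueue and dequeue down to O(log n). Below is a minimal sketch that orders by priority (higher first) and breaks ties by insertion sequence so equal-priority jobs stay FIFO. `PriorityHeap` and `HeapJob` are illustrative names; scheduled jobs would need their own ordering (for example a second heap keyed by run time), which is left out here.

```typescript
interface HeapJob {
  id: string;
  priority: number; // higher runs first
}

interface HeapEntry {
  job: HeapJob;
  seq: number; // insertion order, used to keep equal priorities FIFO
}

class PriorityHeap {
  private heap: HeapEntry[] = [];
  private seq = 0;

  // true if entry a should be dequeued before entry b
  private before(a: HeapEntry, b: HeapEntry): boolean {
    if (a.job.priority !== b.job.priority) return a.job.priority > b.job.priority;
    return a.seq < b.seq;
  }

  push(job: HeapJob): void {
    this.heap.push({ job, seq: this.seq++ });
    let i = this.heap.length - 1;
    while (i > 0) {
      const parent = (i - 1) >> 1;
      if (!this.before(this.heap[i], this.heap[parent])) break;
      [this.heap[i], this.heap[parent]] = [this.heap[parent], this.heap[i]]; // sift up
      i = parent;
    }
  }

  pop(): HeapJob | null {
    if (this.heap.length === 0) return null;
    const top = this.heap[0];
    const last = this.heap.pop()!;
    if (this.heap.length > 0) {
      this.heap[0] = last;
      let i = 0;
      while (true) {
        const left = 2 * i + 1;
        const right = left + 1;
        let best = i;
        if (left < this.heap.length && this.before(this.heap[left], this.heap[best])) best = left;
        if (right < this.heap.length && this.before(this.heap[right], this.heap[best])) best = right;
        if (best === i) break;
        [this.heap[i], this.heap[best]] = [this.heap[best], this.heap[i]]; // sift down
        i = best;
      }
    }
    return top.job;
  }

  get size(): number {
    return this.heap.length;
  }
}

const heap = new PriorityHeap();
heap.push({ id: 'digest', priority: 1 });
heap.push({ id: 'confirm-1', priority: 10 });
heap.push({ id: 'report', priority: 3 });
heap.push({ id: 'confirm-2', priority: 10 });

const order: string[] = [];
for (let job = heap.pop(); job !== null; job = heap.pop()) order.push(job.id);
console.log(order.join(', ')); // confirm-1, confirm-2, report, digest
```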

Scheduled Job Processing

class ScheduledJobProcessor {
    constructor(private queue: JobQueue) {}

    // Schedule a job to run at specific time
    async scheduleJob(job: Job, runAt: Date) {
        job.scheduledFor = runAt;
        return await this.queue.enqueue(job);
    }

    // Schedule recurring job
    async scheduleRecurring(
        jobType: string,
        data: any,
        cronExpression: string
    ) {
        const jobId = `recurring_${jobType}_${Date.now()}`;

        await db.recurringJobs.create({
            id: jobId,
            type: jobType,
            data: data,
            cronExpression: cronExpression,
            nextRun: this.calculateNextRun(cronExpression),
            enabled: true
        });

        return jobId;
    }

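    // Process recurring jobs. If several scheduler instances run this loop, guard
    // each run (for example with a conditional update on nextRun), or the same
    // run is enqueued once per instance.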
    async processRecurringJobs() {
        const now = new Date();

        const dueJobs = await db.recurringJobs.find({
            enabled: true,
            nextRun: { $lte: now }
        });

        for (const recurringJob of dueJobs) {
            // Create job instance
            await this.queue.enqueue({
                id: '',
                type: recurringJob.type,
                data: recurringJob.data,
                priority: 5,
                maxRetries: 3,
                attempts: 0,
                createdAt: new Date()
            });

            // Update next run time
            await db.recurringJobs.update(recurringJob.id, {
                nextRun: this.calculateNextRun(recurringJob.cronExpression),
                lastRun: now
            });
        }
    }

    private calculateNextRun(cronExpression: string): Date {
        // Use cron parser library
        const parser = require('cron-parser');
        const interval = parser.parseExpression(cronExpression);
        return interval.next().toDate();
    }
}

// Run recurring job processor every minute
setInterval(() => {
    scheduledProcessor.processRecurringJobs().catch(err => console.error('Recurring job scan failed:', err));
}, 60000);
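If more than one process runs `processRecurringJobs`, two instances can read the same due row and both enqueue it. One common guard is a conditional update: advance `nextRun` only if it still holds the value you read, and enqueue only when that update succeeds (in SQL, an `UPDATE ... WHERE id = ? AND next_run = ?` followed by a check of the affected-row count). The sketch below models the table in memory and uses a fixed interval in place of cron parsing; every name in it is hypothetical.

```typescript
interface RecurringRow {
  id: string;
  nextRun: number;    // epoch ms of the next due run
  intervalMs: number; // fixed interval stands in for a cron expression
}

// In-memory stand-in for a table that supports a conditional (compare-and-set) update
class RecurringStore {
  private rows = new Map<string, RecurringRow>();

  insert(row: RecurringRow): void {
    this.rows.set(row.id, { ...row });
  }

  findDue(now: number): RecurringRow[] {
    return Array.from(this.rows.values())
      .filter((row) => row.nextRun <= now)
      .map((row) => ({ ...row })); // callers get snapshots, like rows read from a DB
  }

  // Succeeds only if nextRun still equals what the caller read
  advanceIfUnchanged(id: string, expectedNextRun: number, newNextRun: number): boolean {
    const row = this.rows.get(id);
    if (!row || row.nextRun !== expectedNextRun) return false;
    row.nextRun = newNextRun;
    return true;
  }
}

// First scheduled time strictly after `now`, skipping runs missed while the scheduler was down
function nextRunAfter(row: RecurringRow, now: number): number {
  const steps = Math.floor((now - row.nextRun) / row.intervalMs) + 1;
  return row.nextRun + steps * row.intervalMs;
}

// Returns true if this scheduler instance won the right to enqueue the run
function claimRun(store: RecurringStore, row: RecurringRow, now: number): boolean {
  return store.advanceIfUnchanged(row.id, row.nextRun, nextRunAfter(row, now));
}

const store = new RecurringStore();
store.insert({ id: 'daily-digest', nextRun: 1_000, intervalMs: 60_000 });

// Two scheduler instances read the same due row before either one updates it
const seenByA = store.findDue(1_500)[0];
const seenByB = store.findDue(1_500)[0];

const aWon = claimRun(store, seenByA, 1_500); // A advances nextRun to 61_000 and enqueues
const bWon = claimRun(store, seenByB, 1_500); // B's expected value is stale, so it skips
console.log(aWon, bWon); // true false
```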

Handling Job Failures

Jobs fail for many reasons: network issues, external service outages, bugs in job handlers, or invalid job data. Proper failure handling prevents lost work and enables debugging.

Retry Strategies

class RetryStrategy {
    static exponentialBackoff(attempt: number, baseDelay = 1000, maxDelay = 300000) {
        const delay = Math.min(baseDelay * Math.pow(2, attempt), maxDelay);
        // Add jitter to prevent thundering herd
        const jitter = Math.random() * 0.3 * delay;
        return delay + jitter;
    }

    static linearBackoff(attempt: number, delay = 5000) {
        return delay * attempt;
    }

    static fixedDelay(delay = 5000) {
        return delay;
    }
}

class RetryableJobWorker extends JobWorker {
    protected async executeJob(job: Job) {
        try {
            const handler = this.handlers.get(job.type);
            if (!handler) {
                throw new Error(`No handler registered for job type: ${job.type}`);
            }
            const result = await handler.process(job.data);

            await this.queue.ack(job.id);
            await this.updateJobStatus(job.id, 'COMPLETED', result);

        } catch (err) {
            const error = err instanceof Error ? err : new Error(String(err));
            const shouldRetry = this.isRetryableError(error);

            if (shouldRetry && job.attempts < job.maxRetries) {
                const delay = RetryStrategy.exponentialBackoff(job.attempts);
                console.log(`Retrying job ${job.id} after ~${Math.round(delay)}ms`);

                // The nack() implementations shown earlier compute their own backoff
                // and ignore job.scheduledFor; to apply this jittered delay, nack()
                // would need to accept the delay (or scheduledFor) as a parameter.
                job.scheduledFor = new Date(Date.now() + delay);
                await this.queue.nack(job.id, true);

            } else {
                console.error(`Job ${job.id} failed permanently:`, error);
                await this.handlePermanentFailure(job, error);
            }
        }
    }

    private isRetryableError(error: Error): boolean {
        const statusCode = (error as Error & { statusCode?: number }).statusCode;

        // Network errors and timeouts are retryable
        if (error.message.includes('ECONNREFUSED')) return true;
        if (error.message.includes('ETIMEDOUT')) return true;
        if (error.message.includes('timeout')) return true;

        // 5xx server errors are retryable
        if (statusCode !== undefined && statusCode >= 500) return true;

        // 4xx client errors are not retryable
        if (statusCode !== undefined && statusCode >= 400 && statusCode < 500) return false;

        // Default to not retrying unknown errors
        return false;
    }

    private async handlePermanentFailure(job: Job, error: Error) {
        await this.queue.delete(job.id);
        await this.moveToDeadLetterQueue(job, error);

        // Notify monitoring system (alertOnJobFailure is an assumed hook, not shown)
        await this.alertOnJobFailure(job, error);
    }
}
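Matching on `error.message` substrings is brittle, because message text varies across libraries and versions. An alternative sketch: handlers throw explicit error types, and classification checks those first, then falls back to the `code` property Node.js sets on system errors and a numeric `statusCode`. `RetryableJobError`, `PermanentJobError`, and `classify` are hypothetical names; the listed codes are standard Node.js system error codes.

```typescript
// Explicit error types let handlers state whether a failure is worth retrying
class RetryableJobError extends Error {
  constructor(message: string, public readonly retryAfterMs?: number) {
    super(message);
    this.name = 'RetryableJobError';
    Object.setPrototypeOf(this, new.target.prototype); // keeps instanceof working on ES5 targets
  }
}

class PermanentJobError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'PermanentJobError';
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

// Node.js system error codes that usually indicate a transient network problem
const TRANSIENT_CODES = new Set(['ECONNREFUSED', 'ECONNRESET', 'ETIMEDOUT', 'EAI_AGAIN']);

type RetryDecision = 'retry' | 'fail';

function classify(err: unknown): RetryDecision {
  if (err instanceof PermanentJobError) return 'fail';
  if (err instanceof RetryableJobError) return 'retry';

  // Node sets `code` on system errors; HTTP client errors often carry a status code
  const code = (err as { code?: unknown } | null | undefined)?.code;
  if (typeof code === 'string' && TRANSIENT_CODES.has(code)) return 'retry';

  const status = (err as { statusCode?: unknown } | null | undefined)?.statusCode;
  if (typeof status === 'number') {
    if (status >= 500) return 'retry';
    if (status >= 400) return 'fail';
  }

  return 'fail'; // unknown errors fail fast, matching isRetryableError above
}

const refused = Object.assign(new Error('connect ECONNREFUSED 127.0.0.1:6379'), { code: 'ECONNREFUSED' });
console.log(classify(refused));                                                      // retry
console.log(classify(new PermanentJobError('invalid email address')));               // fail
console.log(classify(Object.assign(new Error('Bad Gateway'), { statusCode: 502 }))); // retry
```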

Dead Letter Queue

Jobs that fail after all retries move to a dead letter queue for manual investigation. This prevents poison messages from blocking queue progress.

class DeadLetterQueueManager {
    // The queue is needed by retryJob() to re-submit jobs
    constructor(private queue: JobQueue) {}

    async moveToDeadLetter(job: Job, error: Error) {
        await db.deadLetterQueue.create({
            jobId: job.id,
            jobType: job.type,
            jobData: job.data,
            originalPriority: job.priority,
            attempts: job.attempts,
            error: error.message,
            stack: error.stack,
            createdAt: job.createdAt,
            failedAt: new Date()
        });

        // Keep original job data for investigation
        await db.jobs.update(job.id, {
            status: 'DEAD_LETTER',
            error: error.message
        });
    }

    async getFailedJobs(filters = {}) {
        return await db.deadLetterQueue.find(filters);
    }

    async retryJob(deadLetterJobId: string) {
        const dlJob = await db.deadLetterQueue.findOne({ id: deadLetterJobId });

        if (!dlJob) {
            throw new Error('Dead letter job not found');
        }

        // Re-submit job with fresh attempts count
        const job: Job = {
            id: '',
            type: dlJob.jobType,
            data: dlJob.jobData,
            priority: dlJob.originalPriority,
            maxRetries: 3,
            attempts: 0,
            createdAt: new Date()
        };

        const jobId = await this.queue.enqueue(job);

        // Mark dead letter job as retried
        await db.deadLetterQueue.update(deadLetterJobId, {
            retried: true,
            retriedAt: new Date(),
            retriedJobId: jobId
        });

        return jobId;
    }

    async deleteJob(deadLetterJobId: string) {
        // Permanently delete failed job after investigation
        await db.deadLetterQueue.delete({ id: deadLetterJobId });
    }
}
Warning: Dead letter queues grow without bound if not managed. Alert when DLQ size exceeds a threshold, and periodically clean up old entries once they have been investigated.
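A minimal sketch of both measures, assuming each DLQ entry records `failedAt` and a `retried` flag like the one `DeadLetterQueueManager.retryJob` sets: entries are pruned only once they have been handled and are past a retention window, and an alert fires when the remaining count exceeds a threshold. `sweepDeadLetters` is a hypothetical helper, and the retention and threshold values are placeholders, not recommendations.

```typescript
interface DeadLetterEntry {
  jobId: string;
  failedAt: number; // epoch ms
  retried: boolean; // set once someone has replayed or handled the entry
}

interface DlqPolicy {
  retentionMs: number;    // how long handled entries are kept for reference
  alertThreshold: number; // alert when more entries than this remain
}

interface SweepResult {
  kept: DeadLetterEntry[];
  pruned: number;
  alert: boolean;
}

function sweepDeadLetters(entries: DeadLetterEntry[], nowMs: number, policy: DlqPolicy): SweepResult {
  // Prune only entries that were handled AND are past retention; unreviewed failures stay
  const kept = entries.filter(
    (entry) => !(entry.retried && nowMs - entry.failedAt > policy.retentionMs)
  );
  return {
    kept,
    pruned: entries.length - kept.length,
    alert: kept.length > policy.alertThreshold,
  };
}

const DAY_MS = 24 * 60 * 60 * 1000;
const result = sweepDeadLetters(
  [
    { jobId: 'a', failedAt: 1 * DAY_MS, retried: true },  // old and handled: pruned
    { jobId: 'b', failedAt: 1 * DAY_MS, retried: false }, // old but never reviewed: kept
    { jobId: 'c', failedAt: 9 * DAY_MS, retried: true },  // handled but recent: kept
  ],
  10 * DAY_MS,
  { retentionMs: 7 * DAY_MS, alertThreshold: 1 }
);
console.log(result.pruned, result.kept.map((entry) => entry.jobId).join(','), result.alert); // 1 b,c true
```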

Idempotency and Deduplication

Queue systems often provide at-least-once delivery, meaning jobs might process multiple times. Job handlers must be idempotent to handle duplicate processing safely.

Idempotent Job Handlers

class IdempotentJobHandler implements JobHandler {
    async process(data: any) {
        const { userId, orderId, amount } = data;

        // Use idempotency key to prevent duplicate processing
        const idempotencyKey = `charge:${orderId}`;

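        // Check if already processed. This check-then-act is not atomic: two workers
        // can both pass the check at once, and a crash between charge() and create()
        // leads to a second charge on retry. Claiming the key before doing the work
        // (see DatabaseIdempotencyHandler) or passing orderId to a payment provider
        // that deduplicates on it closes those gaps.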
        const existing = await db.processedJobs.findOne({
            idempotencyKey: idempotencyKey
        });

        if (existing) {
            console.log(`Job already processed: ${idempotencyKey}`);
            return existing.result;
        }

        // Process job
        const result = await paymentService.charge(userId, amount, orderId);

        // Store result with idempotency key
        await db.processedJobs.create({
            idempotencyKey: idempotencyKey,
            result: result,
            processedAt: new Date()
        });

        return result;
    }
}

// Database-backed idempotency
class DatabaseIdempotencyHandler {
    async executeIdempotent<T>(idempotencyKey: string, fn: () => Promise<T>): Promise<T> {
        // Try to create idempotency record
        try {
            await db.idempotency.create({
                key: idempotencyKey,
                status: 'PROCESSING',
                createdAt: new Date()
            });
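        } catch (error: any) {
            // The duplicate-key signal is driver-specific (e.g. SQLSTATE 23505 in
            // PostgreSQL, error code 11000 in MongoDB); 'DUPLICATE_KEY' is a placeholder
            if (error.code === 'DUPLICATE_KEY') {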
                // Already processing or processed
                const record = await db.idempotency.findOne({ key: idempotencyKey });

                if (record.status === 'COMPLETED') {
                    return record.result;
                }

                // Wait for in-progress operation
                return await this.waitForCompletion(idempotencyKey);
            }
            throw error;
        }

        try {
            // Execute operation
            const result = await fn();

            // Mark as completed
            await db.idempotency.update(
                { key: idempotencyKey },
                {
                    status: 'COMPLETED',
                    result: result,
                    completedAt: new Date()
                }
            );

            return result;
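        } catch (error) {
            // Release the key so a later retry can run the operation again. Leaving a
            // 'FAILED' record would send every retry down the duplicate-key path above,
            // where it would fail without ever re-executing.
            await db.idempotency.delete({ key: idempotencyKey });
            throw error;
        }
    }

    private async waitForCompletion(idempotencyKey: string, maxWait = 30000) {
        const startTime = Date.now();

        while (Date.now() - startTime < maxWait) {
            const record = await db.idempotency.findOne({ key: idempotencyKey });

            if (!record) {
                // The other attempt failed and released the key; the caller can retry
                throw new Error('Idempotent operation failed; retry to re-execute');
            }

            if (record.status === 'COMPLETED') {
                return record.result;
            }

            await this.sleep(1000);
        }

        // A record left in PROCESSING by a crashed worker never completes; give
        // records an expiry (e.g. compare createdAt with a lease) to reclaim them
        throw new Error('Timeout waiting for idempotent operation');
    }

    private sleep(ms: number) {
        return new Promise(resolve => setTimeout(resolve, ms));
    }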
}
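Recording idempotency keys is one approach; where possible, the side effect itself can be made idempotent. Instead of incrementing a balance, store each credit under a stable identifier (an upsert keyed by `orderId`), so replaying the same job converges on the same state. An in-memory sketch with hypothetical names, contrasting the two:

```typescript
interface CreditJob {
  accountId: string;
  orderId: string;
  amount: number;
}

// Not idempotent: every delivery adds the amount again
function applyIncrement(balances: Map<string, number>, job: CreditJob): void {
  balances.set(job.accountId, (balances.get(job.accountId) ?? 0) + job.amount);
}

// Idempotent: credits are stored per orderId, so a replay overwrites the same entry
function applyKeyed(ledger: Map<string, Map<string, number>>, job: CreditJob): void {
  const credits = ledger.get(job.accountId) ?? new Map<string, number>();
  credits.set(job.orderId, job.amount);
  ledger.set(job.accountId, credits);
}

// The balance is derived from the keyed credits rather than stored as a running total
function balanceOf(ledger: Map<string, Map<string, number>>, accountId: string): number {
  let total = 0;
  ledger.get(accountId)?.forEach((amount) => (total += amount));
  return total;
}

const job: CreditJob = { accountId: 'acct-1', orderId: 'order-42', amount: 25 };

const balances = new Map<string, number>();
applyIncrement(balances, job);
applyIncrement(balances, job); // duplicate delivery

const ledger = new Map<string, Map<string, number>>();
applyKeyed(ledger, job);
applyKeyed(ledger, job); // duplicate delivery

console.log(balances.get('acct-1'), balanceOf(ledger, 'acct-1')); // 50 25
```

In a database the keyed version corresponds to an insert with a unique constraint on `orderId` (or an upsert), which gives the same replay safety without a separate idempotency table.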

Job Deduplication

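import { createHash } from 'node:crypto';

class DeduplicatingQueue extends RedisJobQueue {
    async enqueue(job: Job): Promise<string> {
        // Generate deduplication key from job type and data
        const dedupKey = `dedup:${this.generateDedupKey(job)}`;

        // Claim the key atomically (SET ... NX) so two producers racing on the same
        // payload cannot both enqueue it; a separate GET-then-SET would allow that
        const claimed = await this.redis.set(dedupKey, 'pending', 'EX', 3600, 'NX');

        if (claimed === null) {
            console.log(`Duplicate job detected: ${dedupKey}`);
            // Existing job ID ('pending' if the first enqueue is still in flight)
            return (await this.redis.get(dedupKey)) ?? 'pending';
        }

        let jobId: string;
        try {
            jobId = await super.enqueue(job);
        } catch (error) {
            await this.redis.del(dedupKey); // release the claim so the job can be resubmitted
            throw error;
        }

        // Record the job ID for the rest of the hour. Identical jobs submitted in
        // that window are dropped even if the first one has already finished.
        await this.redis.set(dedupKey, jobId, 'EX', 3600);

        return jobId;
    }

    private generateDedupKey(job: Job): string {
        // Create hash of job type and data (JSON.stringify output depends on key order)
        const content = JSON.stringify({
            type: job.type,
            data: job.data
        });

        return createHash('sha256').update(content).digest('hex');
    }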
}
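One caveat with hashing `JSON.stringify(job.data)`: key order is part of the output, so `{ userId: 1, report: 'weekly' }` and `{ report: 'weekly', userId: 1 }` hash differently even though they describe the same job. Below is a sketch of a canonical serializer that sorts object keys recursively before hashing with Node's built-in `createHash`; `canonicalJson` and `dedupKey` are illustrative names, and only plain JSON-like data (plus `Date`) is handled.

```typescript
import { createHash } from 'node:crypto';

// Serialize with object keys sorted at every level, so equal data gives an equal string
function canonicalJson(value: unknown): string {
  if (value instanceof Date) {
    return JSON.stringify(value); // ISO string, same as JSON.stringify
  }
  if (Array.isArray(value)) {
    // JSON.stringify turns undefined array items into null; do the same
    return `[${value.map((item) => (item === undefined ? 'null' : canonicalJson(item))).join(',')}]`;
  }
  if (value !== null && typeof value === 'object') {
    const obj = value as Record<string, unknown>;
    const parts = Object.keys(obj)
      .filter((key) => obj[key] !== undefined) // JSON.stringify omits undefined properties
      .sort()
      .map((key) => `${JSON.stringify(key)}:${canonicalJson(obj[key])}`);
    return `{${parts.join(',')}}`;
  }
  return JSON.stringify(value); // strings, numbers, booleans, null
}

function dedupKey(type: string, data: unknown): string {
  return createHash('sha256').update(canonicalJson({ type, data })).digest('hex');
}

const a = dedupKey('report', { userId: 1, report: 'weekly' });
const b = dedupKey('report', { report: 'weekly', userId: 1 });
console.log(a === b); // true
console.log(JSON.stringify({ userId: 1, report: 'weekly' }) === JSON.stringify({ report: 'weekly', userId: 1 })); // false
```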

Monitoring and Observability

Queue systems need comprehensive monitoring to detect problems before they impact users.

Key Metrics

class JobQueueMetrics {
    constructor(private metrics: MetricsClient, private redis: RedisClient) {}

    async recordJobEnqueued(jobType: string, priority: number) {
        await this.metrics.increment('jobs.enqueued', {
            type: jobType,
            priority: priority
        });

        await this.metrics.gauge('jobs.queue.depth',
            await this.getQueueDepth()
        );
    }

    async recordJobCompleted(jobType: string, duration: number) {
        await this.metrics.increment('jobs.completed', {
            type: jobType
        });

        await this.metrics.histogram('jobs.duration', duration, {
            type: jobType
        });
    }

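    async recordJobFailed(jobType: string, errorType: string) {
        // Tag with a bounded error category (e.g. error class or code), not the raw
        // message; free-form messages create unbounded metric cardinality
        await this.metrics.increment('jobs.failed', {
            type: jobType,
            error: errorType
        });
    }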

    async recordJobRetried(jobType: string, attempt: number) {
        await this.metrics.increment('jobs.retried', {
            type: jobType,
            attempt: attempt
        });
    }

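    async getQueueDepth(): Promise<number> {
        // Count only jobs that are due now; future-scheduled jobs and retries
        // waiting out their backoff would otherwise inflate the backlog
        return await this.redis.zcount('jobs:pending', '-inf', Date.now());
    }

    async getProcessingCount(): Promise<number> {
        return await this.redis.scard('jobs:processing');
    }

    async getAgeOfOldestJob(): Promise<number> {
        // In RedisJobQueue the sorted-set score is not a creation time (priority is
        // subtracted from it), so read createdAt from the job at the head of the
        // ready jobs. This measures how long the next job to run has been waiting.
        const head = await this.redis.zrangebyscore('jobs:pending', '-inf', Date.now(), 'LIMIT', 0, 1);
        if (head.length === 0) return 0;

        const createdAt = await this.redis.hget(`job:${head[0]}`, 'createdAt');
        return createdAt ? Date.now() - new Date(createdAt).getTime() : 0;
    }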
}

// Alerting thresholds
class JobQueueAlerting {
    async checkHealth() {
        const queueDepth = await metrics.getQueueDepth();
        const oldestJobAge = await metrics.getAgeOfOldestJob();
        const processingCount = await metrics.getProcessingCount();

        // Alert if queue depth exceeds threshold
        if (queueDepth > 10000) {
            await this.alert('Queue depth exceeds 10,000', {
                depth: queueDepth,
                severity: 'warning'
            });
        }

        // Alert if jobs are stuck
        if (oldestJobAge > 3600000) { // 1 hour
            await this.alert('Jobs stuck in queue for over 1 hour', {
                age: oldestJobAge,
                severity: 'critical'
            });
        }

        // Alert if no workers processing
        if (queueDepth > 0 && processingCount === 0) {
            await this.alert('No workers processing jobs', {
                severity: 'critical'
            });
        }
    }
}
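Queue depth on its own does not say whether a backlog is shrinking. A hedged sketch of one derived signal: compare the completion rate with the arrival rate over the same window and estimate how long the current backlog takes to drain. The inputs are assumed to come from the `jobs.enqueued` and `jobs.completed` counters above; `estimateDrainTime` is a hypothetical helper.

```typescript
interface RateWindow {
  depth: number;         // jobs waiting right now
  enqueued: number;      // jobs added during the window
  completed: number;     // jobs finished during the window
  windowSeconds: number; // length of the measurement window
}

// Seconds until the backlog empties at current rates; Infinity if it is not shrinking
function estimateDrainTime(w: RateWindow): number {
  if (w.depth === 0) return 0;
  const arrivalRate = w.enqueued / w.windowSeconds;
  const completionRate = w.completed / w.windowSeconds;
  const netRate = completionRate - arrivalRate; // jobs per second removed from the backlog
  return netRate > 0 ? w.depth / netRate : Infinity;
}

// 1,200 in and 1,800 out over 60 s: net 10 jobs/s, so 6,000 waiting drain in 600 s
console.log(estimateDrainTime({ depth: 6000, enqueued: 1200, completed: 1800, windowSeconds: 60 })); // 600
// Same backlog, but arrivals outpace completions: the queue is growing
console.log(estimateDrainTime({ depth: 6000, enqueued: 2400, completed: 1800, windowSeconds: 60 })); // Infinity
```

Alerting on a drain time above some budget catches a slowly growing backlog earlier than a fixed depth threshold like 10,000, which a high-throughput queue may cross routinely and a low-throughput one may never reach.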

Frequently Asked Questions

Should you use Redis or a dedicated message queue like RabbitMQ?

Redis works well for simple job queues with moderate throughput and persistence requirements. Use a dedicated message broker such as RabbitMQ, or a managed queue such as AWS SQS, when you need complex routing, stronger delivery guarantees, or a backlog larger than Redis memory can hold. Redis is simpler to operate and sufficient for many applications; dedicated brokers offer stronger durability guarantees at the cost of more operational complexity.

How do you prevent workers from processing the same job twice?

Use atomic operations to transition jobs from pending to processing state. Redis Lua scripts ensure only one worker can claim a job. Implement idempotent job handlers that check for duplicate processing before executing operations. Store job results with idempotency keys so repeated execution returns cached results instead of re-executing.

What happens to in-flight jobs when workers crash?

Jobs that were in progress when a worker crashed stay stuck in the processing state. Use a visibility timeout or heartbeat mechanism so that stuck jobs return to the queue automatically once the timeout passes, or run a periodic recovery process that finds jobs stuck in processing and re-queues them. Set timeouts based on expected job duration, longer than the slowest legitimate run.
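A heartbeat variant of that recovery, sketched in memory: a worker holds a lease on each job and renews it periodically, and a reaper returns only jobs whose lease lapsed without a heartbeat. A slow but healthy job is therefore left alone, while a crashed worker's job is recovered. `LeaseTable` and its methods are hypothetical; in Redis the expiry would typically live in a sorted-set score or a key TTL.

```typescript
interface Lease {
  workerId: string;
  expiresAt: number; // epoch ms
}

// In-memory lease table: a job stays claimed only while its worker keeps heartbeating
class LeaseTable {
  private leases = new Map<string, Lease>();

  constructor(private leaseMs: number, private now: () => number = Date.now) {}

  claim(jobId: string, workerId: string): void {
    this.leases.set(jobId, { workerId, expiresAt: this.now() + this.leaseMs });
  }

  // Called periodically while the job runs; only the current owner may renew.
  // A false result means the lease was lost and the worker should stop.
  heartbeat(jobId: string, workerId: string): boolean {
    const lease = this.leases.get(jobId);
    if (!lease || lease.workerId !== workerId) return false;
    lease.expiresAt = this.now() + this.leaseMs;
    return true;
  }

  complete(jobId: string): void {
    this.leases.delete(jobId);
  }

  // Remove and return jobs whose lease lapsed; the caller re-queues them
  reapExpired(): string[] {
    const t = this.now();
    const expired: string[] = [];
    this.leases.forEach((lease, jobId) => {
      if (lease.expiresAt <= t) expired.push(jobId);
    });
    expired.forEach((jobId) => this.leases.delete(jobId));
    return expired;
  }
}

let nowMs = 0;
const leases = new LeaseTable(30_000, () => nowMs);
leases.claim('video-1', 'worker-a'); // long job on a healthy worker
leases.claim('email-7', 'worker-b'); // worker-b is about to crash

nowMs = 20_000;
leases.heartbeat('video-1', 'worker-a'); // renewed until 50_000
nowMs = 40_000;
console.log(leases.reapExpired().join(',')); // email-7
```

With heartbeats, the lease only needs to cover the gap between renewals rather than the longest possible job, so crashed workers are detected much sooner than with one fixed processing timeout.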

How do you handle jobs that consistently fail?

Move consistently failing jobs to a dead letter queue after max retries. Alert on DLQ growth to investigate issues. Implement job-specific error handling that distinguishes between retryable errors and permanent failures. For permanent failures, fail fast without retries. Provide tooling to replay DLQ jobs after fixing underlying issues.

Should workers poll for jobs or use push notifications?

Polling is simpler and works with any queue implementation. Workers check for jobs periodically, which adds up to one poll interval of latency before a new job is picked up. Push-based delivery reduces latency but requires support from the queue and long-lived connection management. Use polling for batch processing where latency doesn't matter, and push for real-time processing where sub-second latency matters. Polling intervals of 1-5 seconds are a common compromise.
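A common middle ground is adaptive polling: poll again immediately while jobs keep arriving, and back off toward a maximum interval while the queue stays empty, which keeps idle load low without adding latency under traffic. A minimal sketch; `nextPollDelay` is a hypothetical helper, and the 100 ms floor and 5 s ceiling are placeholder values.

```typescript
interface PollConfig {
  minMs: number; // first delay after the queue runs dry
  maxMs: number; // ceiling while the queue stays empty
}

// Delay before the next dequeue attempt, given whether the last attempt found a job
function nextPollDelay(previousMs: number, gotJob: boolean, cfg: PollConfig): number {
  if (gotJob) return 0;                   // more work is likely waiting: poll again now
  if (previousMs === 0) return cfg.minMs; // queue just went empty: start backing off
  return Math.min(previousMs * 2, cfg.maxMs);
}

const cfg: PollConfig = { minMs: 100, maxMs: 5_000 };
const outcomes = [true, true, false, false, false, false, false, false, false, true, false];
const delays: number[] = [];
let delay = 0;
for (const gotJob of outcomes) {
  delay = nextPollDelay(delay, gotJob, cfg);
  delays.push(delay);
}
console.log(delays.join(', ')); // 0, 0, 100, 200, 400, 800, 1600, 3200, 5000, 0, 100
```

In a worker loop like `processJobs`, the fixed `sleep(1000)` would become a sleep of `nextPollDelay(...)` after each dequeue attempt.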

How do you scale job processing horizontally?

Add more worker instances to increase throughput. Workers compete for jobs from a shared queue, which distributes work automatically. If ordering matters, partition jobs by key so that jobs with the same key are processed in order by one consumer while different partitions proceed in parallel. Monitor queue depth and processing rate to decide when to scale, and auto-scale workers on queue-depth metrics to absorb traffic bursts.
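For a rough worker count, Little's law gives the average number of jobs in progress as arrival rate times average processing time. Dividing by per-worker concurrency gives the number of worker instances needed to keep up at steady state; headroom covers bursts and retries. `workersNeeded` and the numbers below are illustrative.

```typescript
// Little's law: average jobs in progress L = λ (arrivals per second) × W (seconds per job)
function workersNeeded(
  arrivalsPerSecond: number,
  avgJobSeconds: number,
  concurrencyPerWorker: number,
  headroom = 1 // e.g. 1.5 keeps 50% spare capacity for bursts and retries
): number {
  const jobsInProgress = arrivalsPerSecond * avgJobSeconds;
  return Math.ceil((jobsInProgress * headroom) / concurrencyPerWorker);
}

// 50 jobs/s at 2 s each means ~100 jobs in progress; at 10 per worker that is 10 workers
console.log(workersNeeded(50, 2, 10));      // 10
console.log(workersNeeded(50, 2, 10, 1.5)); // 15
```

This is a steady-state average; if the arrival rate exceeds what the workers can complete, the queue grows regardless of how the count was rounded, which is why scaling on queue depth is still needed.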

How do you handle job dependencies where Job B depends on Job A?

Store dependency information with jobs and only make dependent jobs visible after dependencies complete. Use workflow engines like Temporal or Step Functions for complex dependency graphs. For simple cases, have Job A enqueue Job B after completing successfully. Avoid complex dependencies in queue systems since they're designed for independent job processing.
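One way to make dependent jobs visible only after their dependencies complete is to keep a count of unfinished dependencies per job and release a job when its count reaches zero. A minimal in-memory sketch; `DependencyTracker` is hypothetical, it assumes every job is registered before any of its dependencies completes, and a production version would persist the counts and decrement them atomically.

```typescript
class DependencyTracker {
  private remaining = new Map<string, number>();    // jobId -> unfinished dependencies
  private dependents = new Map<string, string[]>(); // jobId -> jobs waiting on it

  // Register a job; returns true if it has no dependencies and can be enqueued now
  add(jobId: string, dependsOn: string[]): boolean {
    this.remaining.set(jobId, dependsOn.length);
    for (const dep of dependsOn) {
      const list = this.dependents.get(dep) ?? [];
      list.push(jobId);
      this.dependents.set(dep, list);
    }
    return dependsOn.length === 0;
  }

  // Mark a job finished; returns dependents that just became runnable
  complete(jobId: string): string[] {
    const ready: string[] = [];
    for (const child of this.dependents.get(jobId) ?? []) {
      const left = (this.remaining.get(child) ?? 0) - 1;
      this.remaining.set(child, left);
      if (left === 0) ready.push(child);
    }
    this.dependents.delete(jobId);
    return ready;
  }
}

const deps = new DependencyTracker();
deps.add('extract', []);
deps.add('transform', ['extract']);
deps.add('thumbnail', ['extract']);
deps.add('publish', ['transform', 'thumbnail']);

console.log(deps.complete('extract').join(','));   // transform,thumbnail
console.log(deps.complete('transform').length);    // 0 (publish still waits on thumbnail)
console.log(deps.complete('thumbnail').join(','));  // publish
```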

What's the best way to test job handlers?

Test job handlers as pure functions independent of queue infrastructure. Mock external dependencies like databases and APIs. Test idempotency by calling handlers multiple times with same input. Test error handling by injecting failures. For integration tests, use in-memory queues or test queues that don't affect production. Verify retry logic with controlled failures.
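A small sketch of the idempotency and failure tests described above: the handler receives its dependencies as parameters, the tests pass in fakes, and Node's built-in `node:assert` module checks the results (no particular test framework is assumed). `makeWelcomeEmailHandler`, `FakeMailer`, and the in-memory `sentKeys` set are hypothetical stand-ins for a real handler, mail client, and idempotency store.

```typescript
import * as assert from 'node:assert';

interface Mailer {
  send(to: string, subject: string): Promise<void>;
}

// Handler under test: dependencies are injected so tests can swap in fakes.
// `sentKeys` stands in for a persistent idempotency store.
function makeWelcomeEmailHandler(mailer: Mailer, sentKeys: Set<string>) {
  return async (data: { userId: string; email: string }): Promise<void> => {
    const key = `welcome:${data.userId}`;
    if (sentKeys.has(key)) return; // already done: a duplicate delivery is a no-op
    await mailer.send(data.email, 'Welcome!');
    sentKeys.add(key); // recorded only after success, so failures stay retryable
  };
}

class FakeMailer implements Mailer {
  sent: string[] = [];
  async send(to: string): Promise<void> {
    this.sent.push(to);
  }
}

async function testHandlerIsIdempotent(): Promise<void> {
  const mailer = new FakeMailer();
  const handler = makeWelcomeEmailHandler(mailer, new Set());
  const input = { userId: 'u1', email: 'u1@example.com' };

  await handler(input);
  await handler(input); // simulate at-least-once redelivery

  assert.strictEqual(mailer.sent.length, 1);
}

async function testFailureIsRetryable(): Promise<void> {
  let calls = 0;
  const flaky: Mailer = {
    async send() {
      calls++;
      if (calls === 1) throw new Error('ETIMEDOUT'); // injected transient failure
    },
  };
  const sentKeys = new Set<string>();
  const handler = makeWelcomeEmailHandler(flaky, sentKeys);
  const input = { userId: 'u2', email: 'u2@example.com' };

  await assert.rejects(handler(input)); // the first attempt fails...
  await handler(input);                 // ...and the retry still sends
  assert.strictEqual(calls, 2);
  assert.ok(sentKeys.has('welcome:u2'));
}

testHandlerIsIdempotent()
  .then(testFailureIsRetryable)
  .then(() => console.log('handler tests passed'));
```

Recording the key only after the send succeeds keeps failed attempts retryable; the trade-off is that a crash between `send` and `add` can still produce a second email.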

Conclusion

Job queue systems decouple task submission from execution, enabling asynchronous processing, automatic retries, and horizontal scaling. Effective implementations require persistent queues for durability, priority handling for urgent jobs, comprehensive retry strategies for transient failures, and dead letter queues for poison messages.

The critical design decisions are choosing appropriate queue backends based on durability and throughput requirements, implementing idempotent job handlers to handle at-least-once delivery, designing retry strategies that distinguish transient from permanent failures, and providing monitoring that makes debugging failed jobs tractable at scale.

Production job queues balance reliability with complexity. Start simple with Redis-based queues and add sophistication as requirements demand. Monitor queue depth, processing rates, and failure rates continuously. Build tooling for investigating dead letter queues and replaying failed jobs after fixes deploy.

