Queues
Installing bull
First, install bull and its related dependencies:
npm install --save bull
Creating a Queue Service
Next, create a queue service to manage adding and processing tasks.
Creating the Queue Service
// tasks/queue.service.ts
import { Injectable } from '@nestjs/common';
// bull exports its constructor with `export =`, so it is loaded with an import-equals declaration
import Bull = require('bull');

@Injectable()
export class QueueService {
  // Public and read-only so other parts of the app (such as a dashboard) can read it
  readonly queue: Bull.Queue;

  constructor() {
    this.queue = new Bull('exampleQueue', { redis: { host: 'localhost' } });
  }

  async addJob(data: any) {
    await this.queue.add(data);
  }

  startProcessing() {
    // With an async processor, resolving completes the job and throwing fails it
    this.queue.process(async (job) => {
      try {
        console.log('Processing job', job.data);
        // Execute task logic
      } catch (error) {
        console.error('Error processing job:', error);
        throw error; // Mark task as failed
      }
    });

    // Listen for failed events
    this.queue.on('failed', (job, err) => {
      console.error(`Job ${job.id} failed: ${err.message}`);
    });
  }
}
Creating the Queue Module
Now, create a module to include the queue service:
// tasks/tasks.module.ts
import { Module } from '@nestjs/common';
import { QueueService } from './queue.service';

@Module({
  providers: [QueueService],
  exports: [QueueService],
})
export class TasksModule {}
Registering the Queue Service in the Main Module
Register the queue service in the main module (typically AppModule) and start queue processing.
Registering the Queue Service
// app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { QueueService } from './tasks/queue.service';
import { TasksModule } from './tasks/tasks.module';

@Module({
  imports: [TasksModule],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {
  constructor(private readonly queueService: QueueService) {
    this.queueService.startProcessing(); // Start task processing in the constructor
  }
}
Adding Tasks
You can now add tasks to the queue from a controller or other services:
// app.controller.ts
import { Controller, Get } from '@nestjs/common';
import { QueueService } from './tasks/queue.service';

@Controller()
export class AppController {
  constructor(private readonly queueService: QueueService) {}

  @Get('/add-job')
  async addJobToQueue() {
    // Wait until the job has been stored in Redis before responding
    await this.queueService.addJob({ message: 'Hello from the queue!' });
    return 'Job added to the queue!';
  }
}
Running the NestJS Application
Ensure your Redis service is running, then start the NestJS application:
npm run start:dev
Testing
You can add tasks to the queue by accessing the /add-job endpoint. For example, use Postman or a browser to visit http://localhost:3000/add-job.
Handling Task Failures
In the queue.service.ts file, you can add handling for task failures:
this.queue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed: ${err.message}`);
});
Task Retries
You can configure a retry strategy for tasks:
await this.queue.add(data, {
  attempts: 3, // Try the job up to 3 times in total
  backoff: {
    type: 'exponential', // Exponential backoff
    delay: 1000, // Initial delay in milliseconds
  },
});
Monitoring and Visualization
To monitor the queue’s state, you can use a dashboard such as bull-board, which works with both bull and bullmq queues. Here, we use bull-board with its Express adapter:
npm install --save @bull-board/api @bull-board/express
Configuring bull-board
Add the following code in main.ts or bootstrap.ts to configure bull-board:
import { NestFactory } from '@nestjs/core';
import { createBullBoard } from '@bull-board/api';
import { BullAdapter } from '@bull-board/api/bullAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { QueueService } from './tasks/queue.service';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  // QueueService.queue must be publicly readable for this to compile
  const queueService = app.get(QueueService);

  // Mount the dashboard on the Express instance that NestJS uses by default
  const serverAdapter = new ExpressAdapter();
  serverAdapter.setBasePath('/admin/queues');
  createBullBoard({
    queues: [new BullAdapter(queueService.queue)],
    serverAdapter,
  });
  app.use('/admin/queues', serverAdapter.getRouter());

  await app.listen(3000);
}
bootstrap();
You can now visit http://localhost:3000/admin/queues to view the queue’s state.
Task Priority
When handling different types of tasks, some may be more important than others. You can set task priorities with bull:
// tasks/queue.service.ts
await this.queue.add(data, {
  priority: 1, // 1 is the highest priority; larger numbers mean lower priority
});
Delayed Tasks
To execute a task at a future time, you can set a delay:
await this.queue.add(data, {
  delay: 60000, // Delay execution by 60 seconds
});
Message Acknowledgment
Ensuring tasks are processed correctly is crucial. bull marks a job as completed when the processing function's promise resolves or when it calls done(), and as failed when the function throws or calls done(error), so no separate acknowledgment call is needed. Code that has to wait for the outcome can await job.finished(), which resolves with the job's result or rejects if the job fails.
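As a rough model of how completion and failure are decided, the sketch below runs a processor in memory, without Redis. It is not bull's implementation: `runJob`, `JobResult`, and the `attempts` parameter are hypothetical names chosen for illustration. A resolved promise counts as the acknowledgment, and a rejection counts as a failed attempt.

```typescript
// In-memory model of job acknowledgment. `runJob`, `JobResult`, and `attempts`
// are illustrative names, not part of bull's API.
type Processor<T, R> = (data: T) => Promise<R>;

interface JobResult<R> {
  status: 'completed' | 'failed';
  attemptsMade: number;
  returnValue?: R;
  failedReason?: string;
}

async function runJob<T, R>(
  data: T,
  processor: Processor<T, R>,
  attempts = 1,
): Promise<JobResult<R>> {
  let failedReason = '';
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      // A resolved promise acts as the acknowledgment.
      const returnValue = await processor(data);
      return { status: 'completed', attemptsMade: attempt, returnValue };
    } catch (error) {
      // A rejection marks this attempt as failed; retry if attempts remain.
      failedReason = error instanceof Error ? error.message : String(error);
    }
  }
  return { status: 'failed', attemptsMade: attempts, failedReason };
}
```

Calling `runJob(data, processor, 3)` with a processor that throws on its first two calls would report a completed job with `attemptsMade` equal to 3, which mirrors how a retried job eventually succeeds.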
Cluster Mode
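In high-load scenarios, a single process may not suffice for handling a large number of tasks. Because bull coordinates jobs through Redis, multiple processes can consume the same queue concurrently, improving throughput and availability. One way to start those processes is Node's cluster module:
const cluster = require('cluster');
const numWorkers = require('os').cpus().length;

if (cluster.isPrimary) { // cluster.isMaster on Node versions before 16
  // Fork one worker process per CPU core; each fork runs this file again
  for (let i = 0; i < numWorkers; i++) {
    cluster.fork();
  }
}
In each worker process, initialize the queue and start processing tasks.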
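The idea of several workers draining one shared queue can be tried out in a single process before involving Redis. The sketch below is only a simulation under that assumption: `drainWithWorkers` is a hypothetical helper, and an in-memory array stands in for the queue that bull keeps in Redis. It shows that each job is taken by exactly one worker.

```typescript
// Single-process simulation of several workers sharing one queue.
// `drainWithWorkers` is a hypothetical helper, not a bull API.
async function drainWithWorkers<T>(
  jobs: readonly T[],
  numWorkers: number,
  handler: (job: T, workerId: number) => Promise<void>,
): Promise<number[]> {
  const pending = [...jobs];
  const processedPerWorker: number[] = new Array(numWorkers).fill(0);

  const worker = async (workerId: number): Promise<void> => {
    while (pending.length > 0) {
      // shift() runs synchronously, so no two workers can take the same job.
      const job = pending.shift() as T;
      await handler(job, workerId);
      processedPerWorker[workerId] += 1;
    }
  };

  // Start all workers and wait until the shared list is empty.
  await Promise.all(
    Array.from({ length: numWorkers }, (_, workerId) => worker(workerId)),
  );
  return processedPerWorker;
}
```

The returned array holds the number of jobs each simulated worker handled; the counts always add up to the number of jobs submitted.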
Error Handling
Beyond basic error catching, you can use bull events for more granular error handling and monitoring. For example, listen for stalled events, which are emitted when a worker stops renewing a job's lock (typically because CPU-heavy work blocks the event loop or the process crashes), or for completed events to track successful task execution.
this.queue.on('stalled', (job) => {
  console.log(`Job ${job.id} is stalled.`);
});
this.queue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed with result ${result}.`);
});
Using Middleware
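bull has no dedicated middleware API, but you can wrap the processing function to perform actions before and after task processing, such as logging, performance monitoring, or permission checks.
// withLogging is a plain higher-order function, not a bull API; Job is the type exported by bull
const withLogging = (handler: (job: Job) => Promise<any>) => async (job: Job) => {
  console.log(`Before processing job ${job.id}`);
  const result = await handler(job);
  console.log(`After processing job ${job.id}`);
  return result;
};

this.queue.process(withLogging(async (job) => {
  // Execute task logic
}));
Integrating External Services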
In real-world applications, queues may need to interact with external services like databases or message queues. Ensure data consistency and reliability using transactions or idempotent designs.
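To make the idempotency point concrete, here is a minimal sketch of a handler that can safely receive the same job twice, as happens after a retry. All names (`PaymentJob`, `IdempotentHandler`) are hypothetical, and the in-memory `Set` stands in for a durable store such as a database table with a unique key.

```typescript
// Idempotent job handling sketch. `processedIds` stands in for a durable store;
// all names here are illustrative.
interface PaymentJob {
  id: string;
  amount: number;
}

class IdempotentHandler {
  private readonly processedIds = new Set<string>();
  private total = 0;

  // Returns true if the job was applied, false if it was a duplicate delivery.
  handle(job: PaymentJob): boolean {
    if (this.processedIds.has(job.id)) {
      return false; // already applied, so a retry has no further effect
    }
    this.total += job.amount;
    this.processedIds.add(job.id);
    return true;
  }

  getTotal(): number {
    return this.total;
  }
}
```

In a real system the duplicate check and the side effect would need to be committed together (for example in one database transaction), otherwise a crash between the two steps could still apply the job twice.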
Monitoring with bull-board
As mentioned earlier, bull-board is a powerful visualization tool that allows you to monitor queue states, including task counts, processing speed, and failed tasks. Enable monitoring in production to detect and resolve issues promptly.
Performance Optimization
- Batch Processing: For certain tasks, use bulkWrite or processJobs to process tasks in batches, reducing database operations.
- Concurrency Limits: Set appropriate concurrency limits based on server resources to avoid performance degradation due to resource overuse.
- Message Compression: If task data is large, consider compressing messages to reduce network overhead.
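As one way to approach the compression point in the list above, the following sketch uses Node's built-in zlib module. The helper names `compressPayload` and `decompressPayload` are hypothetical, and storing the result as a base64 string is an assumption made so the value stays JSON-friendly as job data.

```typescript
import { gzipSync, gunzipSync } from 'node:zlib';

// Compress a JSON-serializable payload into a base64 string
// that can be passed as job data.
function compressPayload(data: unknown): string {
  const json = Buffer.from(JSON.stringify(data), 'utf8');
  return gzipSync(json).toString('base64');
}

// Reverse of compressPayload; intended to be called at the start of the processor.
function decompressPayload<T>(encoded: string): T {
  const json = gunzipSync(Buffer.from(encoded, 'base64')).toString('utf8');
  return JSON.parse(json) as T;
}
```

A producer would call `compressPayload` before adding the job and the processor would call `decompressPayload` on `job.data`. Compression costs CPU time and helps mainly for large or repetitive payloads; for small ones the encoded form can end up longer than the original.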
Using BullMQ for Queues
Installing Dependencies
First, install the @nestjs/bullmq and bullmq packages:
npm install @nestjs/bullmq bullmq
Configuring the Queue
Create a new module to configure the queue.
Creating the Queue Module:
nest generate module queue
Define the queue in queue.module.ts:
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';

@Module({
  imports: [
    // Configure Redis connection details here
    BullModule.forRoot({
      connection: {
        host: 'localhost',
        port: 6379,
      },
    }),
    BullModule.registerQueue({
      name: 'example-queue',
    }),
  ],
})
export class QueueModule {}
Producer
The producer is responsible for adding tasks to the queue.
Creating the Service:
nest generate service example-producer
Add tasks in example-producer.service.ts:
import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';

@Injectable()
export class ExampleProducerService {
  constructor(@InjectQueue('example-queue') private queue: Queue) {}

  async addJob(data: any, jobId?: string) {
    const options = jobId ? { jobId } : {};
    await this.queue.add('job-name', data, options);
  }
}
Job Options
You can specify job options when adding tasks, such as delayed execution:
async addJob(data: any, delay?: number, jobId?: string) {
  const options = {
    delay,
    jobId: jobId || undefined,
  };
  await this.queue.add('job-name', data, options);
}
Consumer
The consumer processes tasks in the queue.
Creating the Consumer:
nest generate service example-consumer
Handle tasks in example-consumer.service.ts. With @nestjs/bullmq, a consumer is a provider (not a controller) that extends WorkerHost and implements process(); it must be listed in the providers of a module:
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';

@Processor('example-queue')
export class ExampleConsumerService extends WorkerHost {
  async process(job: Job<any>) {
    // Every job in the queue arrives here; branch on job.name if needed
    if (job.name === 'job-name') {
      console.log('Handling job:', job.data);
    }
  }
}
Request-Scoped Consumers
When a consumer is marked as request-scoped, a new instance of the class is created for each job and discarded once the job completes. The current job can be injected with the JOB_REF token:
import { Inject, Scope } from '@nestjs/common';
import { JOB_REF, Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';

@Processor({ name: 'example-queue', scope: Scope.REQUEST })
export class ExampleConsumerService extends WorkerHost {
  constructor(@Inject(JOB_REF) private readonly jobRef: Job) {
    super();
  }

  async process(job: Job<any>) {
    // Process task; this instance exists only for this job
  }
}
Event Listeners
Listen to worker events, such as task completion or failure, by adding decorated methods to the consumer class:
// OnWorkerEvent is imported from '@nestjs/bullmq'
@OnWorkerEvent('completed')
onCompleted(job: Job) {
  console.log(`Job ${job.id} completed`);
}
Queue Management
Manage the queue through the queue instance, such as retrieving queue status:
async getQueueStatus(): Promise<Record<string, number>> {
  // Number of jobs in each state (waiting, active, completed, failed, ...)
  return this.queue.getJobCounts();
}
Separate Processes
To improve performance and reliability, deploy consumers as separate processes.
Using PM2 to Manage Processes:
pm2 start dist/example-consumer.js --name "example-consumer"
Asynchronous Configuration
For dynamic queue parameter configuration, use asynchronous configuration in queue.module.ts:
BullModule.registerQueueAsync({
  name: 'example-queue', // Fixed here because it is also the queue's injection token
  useFactory: () => ({
    connection: {
      host: process.env.REDIS_HOST,
      port: parseInt(process.env.REDIS_PORT ?? '6379', 10),
    },
  }),
})
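Because values read from the environment may be missing or malformed, it can help to validate them before they reach the queue configuration. The helper below is a minimal sketch of that idea: `buildRedisConnection`, the `RedisConnection` type, and the fallback values (localhost, 6379) are assumptions made for illustration.

```typescript
// Builds Redis connection settings from environment variables with fallbacks.
// `buildRedisConnection` and the default values are illustrative assumptions.
interface RedisConnection {
  host: string;
  port: number;
}

function buildRedisConnection(
  env: Record<string, string | undefined>,
): RedisConnection {
  const port = Number.parseInt(env.REDIS_PORT ?? '6379', 10);
  // Reject values that are not usable TCP port numbers.
  if (Number.isNaN(port) || port <= 0 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${env.REDIS_PORT}`);
  }
  return { host: env.REDIS_HOST ?? 'localhost', port };
}
```

Inside a factory function, the result of `buildRedisConnection(process.env)` could be returned as the connection settings, so that a bad value fails at startup rather than at the first Redis call.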



