Lesson 19-NestJS Queues

Queues

Installing bull

First, install bull and its related dependencies:

npm install --save bull

Creating a Queue Service

Next, create a queue service to manage adding and processing tasks.

Creating the Queue Service

// tasks/queue.service.ts
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';

@Injectable()
export class QueueService {
  private queue: Queue;

  constructor() {
    this.queue = new Queue('exampleQueue', { redis: { host: 'localhost' } });
  }

  async addJob(data: any) {
    await this.queue.add(data);
  }

  startProcessing() {
    this.queue.process(async (job, done) => {
      try {
        console.log('Processing job', job.data);
        // Execute task logic
        done();
      } catch (error) {
        console.error('Error processing job:', error);
        done(error); // Mark task as failed
      }
    });

    // Listen for failed events
    this.queue.on('failed', (job, err) => {
      console.error(`Job ${job.id} failed: ${err.message}`);
    });
  }
}

Creating the Queue Module

Now, create a module to include the queue service:

// tasks/tasks.module.ts
import { Module } from '@nestjs/common';
import { QueueService } from './queue.service';

@Module({
  providers: [QueueService],
  exports: [QueueService],
})
export class TasksModule {}

Registering the Queue Service in the Main Module

Register the queue service in the main module (typically AppModule) and start queue processing.

Registering the Queue Service

// app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { TasksModule } from './tasks/tasks.module';

@Module({
  imports: [TasksModule],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {
  constructor(private readonly queueService: QueueService) {
    this.queueService.startProcessing(); // Start task processing in the constructor
  }
}

Adding Tasks

You can now add tasks to the queue from a controller or other services:

// app.controller.ts
import { Controller, Get } from '@nestjs/common';
import { QueueService } from './tasks/queue.service';

@Controller()
export class AppController {
  constructor(private readonly queueService: QueueService) {}

  @Get('/add-job')
  addJobToQueue() {
    this.queueService.addJob({ message: 'Hello from the queue!' });
    return 'Job added to the queue!';
  }
}

Running the NestJS Application

Ensure your Redis service is running, then start the NestJS application:

npm run start:dev

Testing

You can add tasks to the queue by accessing the /add-job endpoint. For example, use Postman or a browser to visit http://localhost:3000/add-job.

Handling Task Failures

In the queue.service.ts file, you can add handling for task failures:

this.queue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed: ${err.message}`);
});

Task Retries

You can configure a retry strategy for tasks:

await this.queue.add(data, {
  attempts: 3, // Retry 3 times
  backoff: {
    type: 'exponential', // Exponential backoff
    delay: 1000, // Initial delay
  },
});

Monitoring and Visualization

To better monitor the queue’s state, you can use bull-board or bullmq visualization tools. Here, we use bull-board:

npm install --save bull-board

Configuring bull-board

Add the following code in main.ts or bootstrap.ts to configure bull-board:

import { NestFactory } from '@nestjs/core';
import { BullAdapter } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { Queue } from 'bull';
import { QueueService } from './tasks/queue.service';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);

  const queueService = app.get(QueueService);
  const queue = queueService.queue;
  const serverAdapter = new BullAdapter(new BullMQAdapter(queue));

  app.use('/admin/queues', serverAdapter.getRouter());

  await app.listen(3000);
}

bootstrap();

You can now visit http://localhost:3000/admin/queues to view the queue’s state.

Task Priority

When handling different types of tasks, some may be more important than others. You can set task priorities with bull:

// tasks/queue.service.ts
await this.queue.add(data, {
  priority: 10, // Higher number indicates higher priority, default is 0
});

Delayed Tasks

To execute a task at a future time, you can set a delay:

await this.queue.add(data, {
  delay: 60000, // Delay execution by 60 seconds
});

Message Acknowledgment

Ensuring tasks are processed correctly is crucial. bull provides a message acknowledgment mechanism. You can call job.finished() or done() in the processing function to manually confirm task completion, or it will be confirmed automatically upon successful processing.

Cluster Mode

In high-load scenarios, a single process may not suffice for handling a large number of tasks. bull supports cluster mode, allowing multiple processes to handle tasks concurrently, improving throughput and availability.

const numWorkers = require('os').cpus().length;

for (let i = 0; i < numWorkers; i++) {
  const worker = new Worker(__filename);
}

In each worker process, initialize the queue and start processing tasks.

Error Handling

Beyond basic error catching, you can use bull events for more granular error handling and monitoring, such as listening for stalled events for tasks that take too long or completed events to track successful task execution.

this.queue.on('stalled', (job) => {
  console.log(`Job ${job.id} is stalled.`);
});

this.queue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed with result ${result}.`);
});

Using Middleware

bull supports middleware to perform actions before and after task processing, such as logging, performance monitoring, or permission checks.

this.queue.on('beforeProcess', (job, jobPromise) => {
  console.log(`Before processing job ${job.id}`);
  return jobPromise.then((result) => {
    console.log(`After processing job ${job.id}`);
    return result;
  });
});

Integrating External Services

In real-world applications, queues may need to interact with external services like databases or message queues. Ensure data consistency and reliability using transactions or idempotent designs.

Monitoring with bull-board

As mentioned earlier, bull-board is a powerful visualization tool that allows you to monitor queue states, including task counts, processing speed, and failed tasks. Enable monitoring in production to detect and resolve issues promptly.

Performance Optimization

  • Batch Processing: For certain tasks, use bulkWrite or processJobs to process tasks in batches, reducing database operations.
  • Concurrency Limits: Set appropriate concurrency limits based on server resources to avoid performance degradation due to resource overuse.
  • Message Compression: If task data is large, consider compressing messages to reduce network overhead.

Using BullMQ for Queues

Installing Dependencies

First, install the @nestjs/bullmq and bullmq packages:

npm install @nestjs/bullmq bullmq

Configuring the Queue

Create a new module to configure the queue.

Creating the Queue Module:

nest generate module queue

Define the queue in queue.module.ts:

import { Module } from '@nestjs/common';
import { BullMQModule } from '@nestjs/bullmq';

@Module({
  imports: [
    BullMQModule.registerQueue({
      name: 'example-queue',
      // Configure Redis connection details here
      redis: {
        host: 'localhost',
        port: 6379,
      },
    }),
  ],
})
export class QueueModule {}

Producer

The producer is responsible for adding tasks to the queue.

Creating the Service:

nest generate service example-producer

Add tasks in example-producer.service.ts:

import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';

@Injectable()
export class ExampleProducerService {
  constructor(@InjectQueue('example-queue') private queue: Queue) {}

  async addJob(data: any, jobId?: string) {
    const options = jobId ? { jobId } : {};
    await this.queue.add('job-name', data, options);
  }
}

Job Options

You can specify job options when adding tasks, such as delayed execution:

async addJob(data: any, delay?: number, jobId?: string) {
  const options = {
    delay,
    jobId: jobId || undefined,
  };
  await this.queue.add('job-name', data, options);
}

Consumer

The consumer processes tasks in the queue.

Creating the Consumer Controller:

nest generate controller example-consumer

Handle tasks in example-consumer.controller.ts:

import { Controller, Get } from '@nestjs/common';
import { Process, Processor } from '@nestjs/bullmq';
import { Job } from 'bullmq';

@Processor('example-queue')
@Controller()
export class ExampleConsumerController {
  @Process('job-name')
  async handleJob(job: Job<any>) {
    console.log('Handling job:', job.data);
  }
}

Request-Scoped Consumers

For consumers that handle different tasks per request, use request-scoped processing:

import { OnModuleInit } from '@nestjs/common';
import { Process, Processor } from '@nestjs/bullmq';

@Processor('example-queue')
export class ExampleConsumerController implements OnModuleInit {
  onModuleInit() {
    this.queue.process('job-name', async (job) => {
      // Process task
    });
  }

  constructor(private readonly queue: Queue) {}
}

Event Listeners

Listen to queue events, such as task completion or failure:

constructor(private readonly queue: Queue) {
  this.queue.on('completed', (job) => {
    console.log(`Job ${job.id} completed`);
  });
}

Queue Management

Manage the queue through the queue instance, such as retrieving queue status:

async getQueueStatus(): Promise<string> {
  return await this.queue.getQueueState();
}

Separate Processes

To improve performance and reliability, deploy consumers as separate processes.

Using PM2 to Manage Processes:

pm2 start dist/example-consumer.js --name "example-consumer"

Asynchronous Configuration

For dynamic queue parameter configuration, use asynchronous configuration in queue.module.ts:

BullMQModule.registerQueueAsync({
  useFactory: () => ({
    name: process.env.QUEUE_NAME,
    redis: {
      host: process.env.REDIS_HOST,
      port: parseInt(process.env.REDIS_PORT),
    },
  }),
})
Share your love