Lesson 18-NestJS Task Scheduling

Task scheduling in NestJS is a common requirement, especially in background services where tasks like clearing caches, sending reminder emails, or updating statistics need to be executed periodically. NestJS does not provide built-in task scheduling functionality, but you can leverage third-party libraries to achieve this.

Popular third-party task scheduling libraries include node-cron and bull. node-cron is a cron expression-based task scheduler, while bull is a Redis-based queue system that supports delayed and periodic tasks.

Using node-cron

Installing node-cron
First, install node-cron:

npm install --save node-cron

Creating a Task Scheduler
Create a service in your NestJS project to manage task scheduling:

// tasks/cron.service.ts
import { Injectable } from '@nestjs/common';
import cron from 'node-cron';

@Injectable()
export class CronService {
  scheduleTask() {
    cron.schedule('*/5 * * * *', () => {
      console.log('Running a job at', new Date().toISOString());
    });
  }
}

In this example, the cron expression */5 * * * * schedules a task to run every 5 minutes.

Registering the Task Scheduler

Register and start the task scheduler in your module:

// tasks/tasks.module.ts
import { Module } from '@nestjs/common';
import { CronService } from './cron.service';

@Module({
  providers: [CronService],
})
export class TasksModule {}

// app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { TasksModule } from './tasks/tasks.module';

@Module({
  imports: [TasksModule],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {
  constructor(private readonly cronService: CronService) {
    this.cronService.scheduleTask(); // Start the task in the constructor
  }
}

Using bull

Installing bull
Install bull and its dependencies:

npm install --save bull

Creating a Queue
Create a queue to manage tasks:

// tasks/queue.service.ts
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';

@Injectable()
export class QueueService {
  private queue: Queue;

  constructor() {
    this.queue = new Queue('exampleQueue', { redis: { host: 'localhost' } });
  }

  async addJob(data: any) {
    await this.queue.add(data);
  }

  startProcessing() {
    this.queue.process(async (job, done) => {
      console.log('Processing job', job.data);
      done();
    });
  }
}

Registering the Queue Service

Register the queue service in the module:

// tasks/tasks.module.ts
import { Module } from '@nestjs/common';
import { QueueService } from './queue.service';

@Module({
  providers: [QueueService],
})
export class TasksModule {}

// app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { TasksModule } from './tasks/tasks.module';

@Module({
  imports: [TasksModule],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {
  constructor(private readonly queueService: QueueService) {
    this.queueService.startProcessing(); // Start task processing in the constructor
  }
}

Periodic Tasks

To execute a task periodically, combine node-cron and bull:

// tasks/cron.service.ts
import { Injectable } from '@nestjs/common';
import cron from 'node-cron';
import { QueueService } from './queue.service';

@Injectable()
export class CronService {
  constructor(private readonly queueService: QueueService) {}

  scheduleTask() {
    cron.schedule('*/5 * * * *', () => {
      this.queueService.addJob({ message: 'Hello from cron!' });
    });
  }
}

Handling Task Failures

Tasks may fail for various reasons. With bull, you can easily handle these failures:

// tasks/queue.service.ts
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';

@Injectable()
export class QueueService {
  private queue: Queue;

  constructor() {
    this.queue = new Queue('exampleQueue', { redis: { host: 'localhost' } });
  }

  async addJob(data: any) {
    await this.queue.add(data);
  }

  startProcessing() {
    this.queue.process(async (job, done) => {
      try {
        console.log('Processing job', job.data);
        // Execute task logic
        done();
      } catch (error) {
        console.error('Error processing job:', error);
        done(error); // Mark task as failed
      }
    });

    // Listen for failed events
    this.queue.on('failed', (job, err) => {
      console.error(`Job ${job.id} failed: ${err.message}`);
    });
  }
}

Task Retries

bull supports task retries. You can configure a retry strategy to automatically retry failed tasks:

// tasks/queue.service.ts
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';

@Injectable()
export class QueueService {
  private queue: Queue;

  constructor() {
    this.queue = new Queue('exampleQueue', { redis: { host: 'localhost' } });
  }

  async addJob(data: any) {
    await this.queue.add(data, {
      attempts: 3, // Retry 3 times
      backoff: {
        type: 'exponential', // Exponential backoff
        delay: 1000, // Initial delay
      },
    });
  }

  startProcessing() {
    this.queue.process(async (job, done) => {
      try {
        console.log('Processing job', job.data);
        // Execute task logic
        done();
      } catch (error) {
        console.error('Error processing job:', error);
        done(error); // Mark task as failed
      }
    });

    // Listen for failed events
    this.queue.on('failed', (job, err) => {
      console.error(`Job ${job.id} failed: ${err.message}`);
    });
  }
}

Best Practices

Configuration Files

Store sensitive information like Redis connection strings in configuration files to avoid hardcoding:

// config/redis.config.ts
import { ConfigModule, ConfigService } from '@nestjs/config';

export class RedisConfig {
  static register(configService: ConfigService) {
    return {
      host: configService.get<string>('REDIS_HOST'),
      port: configService.get<number>('REDIS_PORT'),
      password: configService.get<string>('REDIS_PASSWORD'),
    };
  }
}

Then use it in queue.service.ts:

// tasks/queue.service.ts
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { RedisConfig } from '../config/redis.config';

@Injectable()
export class QueueService {
  private queue: Queue;

  constructor(private readonly configService: ConfigService) {
    const config = RedisConfig.register(configService);
    this.queue = new Queue('exampleQueue', { redis: config });
  }

  // ...
}

Logging

Use logging libraries (e.g., Winston or Pino) to record task states and errors, which is invaluable for debugging and monitoring.

Task Isolation

For different task types, use separate queues to avoid interference between tasks.

Task Priority

bull supports task priority, allowing you to adjust execution order based on task importance.

Monitoring and Visualization

Use tools like bull-board or bullmq to monitor queue states and task progress, aiding in debugging and optimization.

Example

Below is a complete example demonstrating how to use node-cron and bull for scheduled and periodic tasks:

// tasks/cron.service.ts
import { Injectable } from '@nestjs/common';
import cron from 'node-cron';
import { QueueService } from './queue.service';

@Injectable()
export class CronService {
  constructor(private readonly queueService: QueueService) {}

  scheduleTask() {
    cron.schedule('*/5 * * * *', () => {
      this.queueService.addJob({ message: 'Hello from cron!' });
    });
  }
}
// tasks/queue.service.ts
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { RedisConfig } from '../config/redis.config';

@Injectable()
export class QueueService {
  private queue: Queue;

  constructor(private readonly configService: ConfigService) {
    const config = RedisConfig.register(configService);
    this.queue = new Queue('exampleQueue', { redis: config });
  }

  async addJob(data: any) {
    await this.queue.add(data, {
      attempts: 3,
      backoff: {
        type: 'exponential',
        delay: 1000,
      },
    });
  }

  startProcessing() {
    this.queue.process(async (job, done) => {
      try {
        console.log('Processing job', job.data);
        // Execute task logic
        done();
      } catch (error) {
        console.error('Error processing job:', error);
        done(error); // Mark task as failed
      }
    });

    // Listen for failed events
    this.queue.on('failed', (job, err) => {
      console.error(`Job ${job.id} failed: ${err.message}`);
    });
  }
}
// app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { TasksModule } from './tasks/tasks.module';
import { ConfigModule } from '@nestjs/config';
import { RedisConfig } from './config/redis.config';

@Module({
  imports: [
    TasksModule,
    ConfigModule.forRoot({
      isGlobal: true,
      load: [RedisConfig],
    }),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {
  constructor(private readonly cronService: CronService, private readonly queueService: QueueService) {
    this.cronService.scheduleTask(); // Start scheduled tasks in the constructor
    this.queueService.startProcessing(); // Start task processing in the constructor
  }
}
Share your love