Lesson 30-NestJS Microservices

Overview

Introduction

NestJS provides a robust set of tools for building microservice architecture applications. It supports multiple communication patterns for microservices, including request-response, asynchronous response, and event-based communication.

Installation

Ensure that Node.js and npm are installed.

npm install -g @nestjs/cli

Getting Started

Create a microservices project:

nest new microservices-app
cd microservices-app

Install microservices dependencies:

npm install @nestjs/microservices

Patterns

Request-Response Pattern:
Suitable when the caller needs a reply. The client sends a message and waits for the value the handler returns.

Asynchronous Response Pattern:
Suitable for scenarios where a response is not immediately required.

Event-Based Pattern:
Suitable for publishing and subscribing to events.
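
The difference between replying and broadcasting can be sketched without any broker. The InMemoryBroker class below is a hypothetical, dependency-free stand-in (it is not part of NestJS): send() routes to exactly one handler and returns its reply, while emit() notifies every subscriber and ignores what they return.

```typescript
// Hypothetical in-memory broker; it illustrates the two delivery styles only.
type Handler = (data: string) => string | void;

class InMemoryBroker {
  private readonly messageHandlers = new Map<string, Handler>();
  private readonly eventHandlers = new Map<string, Handler[]>();

  // One handler per message pattern: its return value is the reply.
  onMessage(pattern: string, handler: Handler): void {
    this.messageHandlers.set(pattern, handler);
  }

  // Any number of subscribers per event; their return values are ignored.
  onEvent(pattern: string, handler: Handler): void {
    const existing = this.eventHandlers.get(pattern) ?? [];
    this.eventHandlers.set(pattern, existing.concat(handler));
  }

  // Request-response: the caller gets the handler's reply back.
  send(pattern: string, data: string): string {
    const handler = this.messageHandlers.get(pattern);
    if (!handler) throw new Error(`No handler for pattern "${pattern}"`);
    return String(handler(data));
  }

  // Event-based: fire and forget; returns how many subscribers were notified.
  emit(pattern: string, data: string): number {
    const handlers = this.eventHandlers.get(pattern) ?? [];
    handlers.forEach((handler) => handler(data));
    return handlers.length;
  }
}

const broker = new InMemoryBroker();
const received: string[] = [];
broker.onMessage('hello', (who) => `Hello, ${who}!`);
broker.onEvent('event-topic', (data) => { received.push(`audit:${data}`); });
broker.onEvent('event-topic', (data) => { received.push(`mail:${data}`); });

const reply = broker.send('hello', 'Nest');
const notified = broker.emit('event-topic', 'created');
```

In this sketch the caller of send() holds a reply, whereas the caller of emit() only learns that the event was handed off.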

Request-Response

Defining a Microservice Client:

import { ClientKafka, ClientProxyFactory, Transport } from '@nestjs/microservices';

const kafkaClient = ClientProxyFactory.create({
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'group-id',
    },
  },
});

Defining a Microservice Server:

import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';

@Controller()
export class AppController {
  // The return value is sent back to the caller as the reply.
  @MessagePattern('hello')
  hello(@Payload() data: string): string {
    return `Hello, ${data}!`;
  }
}

Asynchronous Response

Defining an Asynchronous Pattern Server:

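import { Controller, Inject } from '@nestjs/common';
import { ClientKafka, MessagePattern, Payload } from '@nestjs/microservices';

@Controller()
export class AppController {
  // 'KAFKA_CLIENT' must match the token the Kafka client is registered under.
  constructor(@Inject('KAFKA_CLIENT') private readonly clientKafka: ClientKafka) {}

  @MessagePattern('async-hello')
  asyncHello(@Payload() data: string): void {
    // emit() publishes without waiting for a reply. send() would return a
    // cold Observable that does nothing until it is subscribed to.
    this.clientKafka.emit('response-topic', `Async Hello, ${data}!`);
  }
}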

Event-Based

Defining an Event Publisher:

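import { Inject, Injectable } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';

@Injectable()
export class EventPublisher {
  // 'KAFKA_CLIENT' is the injection token the Kafka client is registered under.
  constructor(@Inject('KAFKA_CLIENT') private readonly clientKafka: ClientKafka) {}

  publishEvent(data: string): void {
    // emit() publishes the event without waiting for a reply.
    this.clientKafka.emit('event-topic', data);
  }
}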

Defining an Event Subscriber:

import { Controller } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';

@Controller()
export class AppController {
  // Event handlers use @EventPattern and do not send a reply.
  @EventPattern('event-topic')
  onEvent(@Payload() data: string): void {
    console.log(`Received event: ${data}`);
  }
}

Decorators

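@MessagePattern:
Marks a handler for request-response messages. The handler's return value is sent back as the reply.

@EventPattern:
Marks a handler for events. No reply is sent.

@Client:
A property decorator that injects a client instance built from the given transport configuration.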

Client

Creating a Client Instance:

import { ClientKafka, ClientProxyFactory, Transport } from '@nestjs/microservices';

const kafkaClient = ClientProxyFactory.create({
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'group-id',
    },
  },
});

Sending Messages:

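// send() returns a cold Observable, so nothing is sent until you subscribe.
kafkaClient.send('request-topic', 'Hello, Kafka!').subscribe((reply) => console.log(reply));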

Publishing Events:

const eventPublisher = new EventPublisher(kafkaClient);
eventPublisher.publishEvent('New event');

Scopes

Singleton Scope:
The default scope, where each service has a single instance.

Request Scope:
A new instance is created for each request.

Transient Scope:
NestJS has no session scope. Its third scope is transient: each consumer that injects the provider receives its own dedicated instance.
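
The practical difference between a shared instance and a per-request instance shows up as soon as a provider holds state. The MiniContainer below is a hypothetical, dependency-free sketch (NestJS's real injector is far more involved); Counter stands in for any stateful provider.

```typescript
// Hypothetical stateful provider.
class Counter {
  value = 0;
  increment(): number {
    this.value += 1;
    return this.value;
  }
}

// Hypothetical mini-container that hands out Counter instances.
class MiniContainer {
  private singleton?: Counter;

  // Singleton scope: create once, then hand out the same instance.
  getSingleton(): Counter {
    if (!this.singleton) this.singleton = new Counter();
    return this.singleton;
  }

  // Request scope: a fresh instance for every incoming request.
  getForRequest(): Counter {
    return new Counter();
  }
}

const container = new MiniContainer();

// Three simulated requests against each scope.
const sharedTotals = [1, 2, 3].map(() => container.getSingleton().increment());
const perRequestTotals = [1, 2, 3].map(() => container.getForRequest().increment());

console.log(sharedTotals.join(','));     // state accumulates across requests
console.log(perRequestTotals.join(',')); // state starts over on every request
```

State kept on a singleton leaks across requests, which is usually what you want for caches and connection pools and what you want to avoid for per-caller data.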

Handling Timeouts

Setting a Timeout:

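import { timeout } from 'rxjs/operators';

// send() accepts only a pattern and a payload, so apply the RxJS timeout
// operator to the reply stream instead of passing an options object.
kafkaClient
  .send('request-topic', 'Hello, Kafka!')
  .pipe(timeout(5000)) // raise an error if no reply arrives within 5 seconds
  .subscribe({ error: (err) => console.error(err) });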
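
Conceptually, a request timeout over a broker means remembering each in-flight request together with a deadline, and discarding replies that arrive too late. The PendingRequests class below is a hypothetical sketch of that bookkeeping, not how NestJS or any transport implements it; time is passed in explicitly so the behaviour is deterministic.

```typescript
// Outcome recorded for each request once it is settled.
type Outcome = { id: string; status: 'replied' | 'timed-out'; reply?: string };

// Hypothetical tracker for in-flight requests, keyed by a correlation id.
class PendingRequests {
  private readonly deadlines = new Map<string, number>();
  readonly outcomes: Outcome[] = [];

  // Remember the request and the moment after which it counts as timed out.
  track(id: string, now: number, timeoutMs: number): void {
    this.deadlines.set(id, now + timeoutMs);
  }

  // A reply only counts while the request is still pending.
  onReply(id: string, reply: string): void {
    if (!this.deadlines.delete(id)) return; // late or unknown reply: drop it
    this.outcomes.push({ id, status: 'replied', reply });
  }

  // Called periodically: fail every request whose deadline has passed.
  sweep(now: number): void {
    this.deadlines.forEach((deadline, id) => {
      if (now >= deadline) {
        this.deadlines.delete(id);
        this.outcomes.push({ id, status: 'timed-out' });
      }
    });
  }
}

const requests = new PendingRequests();
requests.track('a', 0, 5000);
requests.track('b', 0, 5000);
requests.onReply('a', 'Hello');    // arrives in time
requests.sweep(6000);              // 'b' has exceeded its 5-second deadline
requests.onReply('b', 'too late'); // ignored, 'b' is already settled

const settled = requests.outcomes.map((o) => `${o.id}:${o.status}`);
```

Each request ends in exactly one outcome, so a late reply can never overwrite a timeout the caller has already seen.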

Example Code

Defining a Microservice Client

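// src/client/client.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    // Registers a Kafka client that can be injected with @Inject('KAFKA_CLIENT').
    ClientsModule.register([
      {
        name: 'KAFKA_CLIENT',
        transport: Transport.KAFKA,
        options: {
          client: { brokers: ['localhost:9092'] },
          consumer: { groupId: 'group-id' },
        },
      },
    ]),
  ],
  exports: [ClientsModule],
})
export class ClientModule {}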

Defining a Microservice Server

// src/server/app.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';

@Controller()
export class AppController {
  // The return value is sent back to the caller as the reply.
  @MessagePattern('hello')
  hello(@Payload() data: string): string {
    return `Hello, ${data}!`;
  }
}

Defining an Event Publisher

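// src/event-publisher/event-publisher.service.ts
import { Inject, Injectable } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';

@Injectable()
export class EventPublisher {
  // 'KAFKA_CLIENT' is the token the client is registered under in ClientModule.
  constructor(@Inject('KAFKA_CLIENT') private readonly clientKafka: ClientKafka) {}

  publishEvent(data: string): void {
    // emit() publishes the event without waiting for a reply.
    this.clientKafka.emit('event-topic', data);
  }
}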

Defining an Event Subscriber

// src/server/app.controller.ts
import { Controller } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';

@Controller()
export class AppController {
  // Event handlers use @EventPattern and do not send a reply.
  @EventPattern('event-topic')
  onEvent(@Payload() data: string): void {
    console.log(`Received event: ${data}`);
  }
}

Client Instance

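// src/client/client.service.ts
import { Inject, Injectable, OnModuleInit } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { EventPublisher } from '../event-publisher/event-publisher.service';

@Injectable()
export class ClientService implements OnModuleInit {
  // Inject the client registered as 'KAFKA_CLIENT' instead of building one by hand.
  constructor(@Inject('KAFKA_CLIENT') private readonly kafkaClient: ClientKafka) {}

  async onModuleInit(): Promise<void> {
    // Kafka request-response needs the reply topic subscribed before connecting.
    this.kafkaClient.subscribeToResponseOf('request-topic');
    await this.kafkaClient.connect();
  }

  sendHello(): void {
    // send() is lazy: subscribing is what triggers the request.
    this.kafkaClient
      .send('request-topic', 'Hello, Kafka!')
      .subscribe((reply) => console.log(reply));
  }

  publishEvent(): void {
    const eventPublisher = new EventPublisher(this.kafkaClient);
    eventPublisher.publishEvent('New event');
  }
}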

Redis

Installing Required Dependencies

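Ensure you have installed @nestjs/microservices and a Redis driver. The redis package and the url option shown here apply to NestJS 8 and earlier; NestJS 9 and later use ioredis, with host and port options in place of url.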

npm install --save @nestjs/microservices redis

Creating a Microservice Client

In your NestJS application, create a microservice client to connect to a Redis server. This is typically done in the main module (e.g., app.module.ts).

Example Code:

import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'REDIS_CLIENT',
        transport: Transport.REDIS,
        options: {
          url: 'redis://localhost:6379',
        },
      },
    ]),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

Using the Microservice Client

Once the client is registered, you can use it in a service to send and receive messages.

Example Code:

import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { firstValueFrom } from 'rxjs';

@Injectable()
export class AppService {
  constructor(@Inject('REDIS_CLIENT') private readonly client: ClientProxy) {}

  async sendMessage(message: string): Promise<string> {
    const pattern = { cmd: 'message' };
    // firstValueFrom replaces the deprecated toPromise() and resolves with the first reply.
    return firstValueFrom(this.client.send<string>(pattern, message));
  }
}

Creating a Microservice Provider

To allow other microservices to call your service, create a provider to listen for messages.

Example Code:

import { Controller } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  // Must match the pattern the client sends: { cmd: 'message' }.
  @MessagePattern({ cmd: 'message' })
  handleMessage(message: string): string {
    console.log(`Received message: ${message}`);
    return `Message received: ${message}`;
  }
}
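
An object pattern such as { cmd: 'message' } only works if the sender and the handler resolve it to the same route. One way to picture this is to normalise each pattern into a canonical string with sorted keys. The toRouteKey function below is a simplified, hypothetical sketch that assumes flat patterns with string or number values; it is not NestJS's own routine.

```typescript
type Pattern = string | Record<string, string | number>;

// Turn a pattern into a canonical routing key. Keys are sorted so that
// property order does not affect matching.
function toRouteKey(pattern: Pattern): string {
  if (typeof pattern === 'string') return pattern;
  const parts = Object.keys(pattern)
    .sort()
    .map((key) => `"${key}":${JSON.stringify(pattern[key])}`);
  return `{${parts.join(',')}}`;
}

// Handler registry keyed by the canonical form of the pattern.
const routes = new Map<string, (data: string) => string>();
routes.set(
  toRouteKey({ cmd: 'message', version: 1 }),
  (data) => `Message received: ${data}`,
);

// The same properties in a different order resolve to the same handler.
const matched = routes.get(toRouteKey({ version: 1, cmd: 'message' }));
const routedReply = matched ? matched('ping') : 'no handler';

// A different pattern has no route.
const hasOther = routes.has(toRouteKey({ cmd: 'other' }));
```

Whatever the exact normalisation, the takeaway is that the pattern is part of the contract between services, so it is worth keeping in a shared constant.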

Starting the Application

Finally, start the NestJS application to begin listening for Redis messages.

Example Code:

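import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  // Attach a Redis listener so the @MessagePattern handlers receive messages.
  app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.REDIS,
    options: { url: 'redis://localhost:6379' },
  });
  await app.startAllMicroservices();
  await app.listen(3000);
}
bootstrap();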

MQTT

Installing Required Dependencies

Install @nestjs/microservices and the mqtt package, the MQTT client library that the NestJS MQTT transport uses.

npm install --save @nestjs/microservices mqtt

Configuring the MQTT Client

In your NestJS application, create an MQTT client to connect to an MQTT server. This is typically done in the main module.

Example Code:

import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'MQTT_CLIENT',
        transport: Transport.MQTT,
        options: {
          host: 'localhost',
          port: 1883,
          username: 'your_username',
          password: 'your_password',
          clientId: Math.random().toString(16).slice(2),
        },
      },
    ]),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

Using the MQTT Client

Once the client is registered, you can use it in a service to send and receive messages.

Example Code:

import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { firstValueFrom } from 'rxjs';

@Injectable()
export class AppService {
  constructor(@Inject('MQTT_CLIENT') private readonly client: ClientProxy) {}

  async sendMessage(topic: string, message: string): Promise<any> {
    const pattern = { topic };
    // firstValueFrom replaces the deprecated toPromise() and resolves with the first reply.
    return firstValueFrom(this.client.send(pattern, message));
  }
}

Creating an MQTT Service Provider

To allow other microservices to call your service, create a provider to listen to specific topics.

Example Code:

import { Controller } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  // Must match the pattern the client sends: { topic: 'greeting' }.
  @MessagePattern({ topic: 'greeting' })
  handleGreeting(message: string): string {
    console.log(`Received greeting: ${message}`);
    return `Hello, ${message}!`;
  }
}

Starting the Application

Finally, start the NestJS application to begin listening for MQTT messages.

Example Code:

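import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  // Attach an MQTT listener so the @MessagePattern handlers receive messages.
  app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.MQTT,
    options: { url: 'mqtt://localhost:1883' },
  });
  await app.startAllMicroservices();
  await app.listen(3000);
}
bootstrap();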

NATS

Installing Required Dependencies

Install @nestjs/microservices and NATS-related libraries.

npm install --save @nestjs/microservices nats

Configuring the NATS Client

In your NestJS application, create a NATS client to connect to a NATS server. This is typically done in the main module.

Example Code:

import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'NATS_CLIENT',
        transport: Transport.NATS,
        options: {
          servers: ['nats://localhost:4222'],
        },
      },
    ]),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

Using the NATS Client

Once the client is registered, you can use it in a service to send and receive messages.

Example Code:

import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { firstValueFrom } from 'rxjs';

@Injectable()
export class AppService {
  constructor(@Inject('NATS_CLIENT') private readonly client: ClientProxy) {}

  async sendMessage(subject: string, message: any): Promise<any> {
    const pattern = { subject };
    // firstValueFrom replaces the deprecated toPromise() and resolves with the first reply.
    return firstValueFrom(this.client.send(pattern, message));
  }
}

Creating a NATS Service Provider

To allow other microservices to call your service, create a provider to listen to specific subjects.

Example Code:

import { Controller } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  // Must match the pattern the client sends: { subject: 'greeting' }.
  @MessagePattern({ subject: 'greeting' })
  handleGreeting(message: string): string {
    console.log(`Received greeting: ${message}`);
    return `Hello, ${message}!`;
  }
}

Starting the Application

Finally, start the NestJS application to begin listening for NATS messages.

Example Code:

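import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  // Attach a NATS listener so the @MessagePattern handlers receive messages.
  app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.NATS,
    options: { servers: ['nats://localhost:4222'] },
  });
  await app.startAllMicroservices();
  await app.listen(3000);
}
bootstrap();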

RabbitMQ

Installing Dependencies

npm install --save @nestjs/microservices amqplib amqp-connection-manager

Example Code

// app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'RABBITMQ_CLIENT',
        transport: Transport.RMQ,
        options: {
          urls: ['amqp://localhost:5672'],
          queue: 'nest_queue',
          queueOptions: {
            durable: false,
          },
        },
      },
    ]),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

// app.service.ts
import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { firstValueFrom } from 'rxjs';

@Injectable()
export class AppService {
  constructor(@Inject('RABBITMQ_CLIENT') private readonly client: ClientProxy) {}

  async sendMessage(pattern: object, data: any): Promise<any> {
    // firstValueFrom replaces the deprecated toPromise() and resolves with the first reply.
    return firstValueFrom(this.client.send(pattern, data));
  }
}

// app.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @MessagePattern({ cmd: 'greeting' })
  handleGreeting(message: string): string {
    console.log(`Received greeting: ${message}`);
    return `Hello, ${message}!`;
  }
}

Kafka

Installing Dependencies

npm install --save @nestjs/microservices kafkajs

Example Code

// app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'KAFKA_CLIENT',
        transport: Transport.KAFKA,
        options: {
          client: {
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'nest-consumer-group',
          },
        },
      },
    ]),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

// app.service.ts
import { Inject, Injectable } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { Observable } from 'rxjs';

@Injectable()
export class AppService {
  constructor(@Inject('KAFKA_CLIENT') private readonly client: ClientKafka) {}

  // emit() publishes to the topic and returns an Observable, not a Promise.
  sendMessage(topic: string, data: any): Observable<any> {
    return this.client.emit(topic, data);
  }
}

// app.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  // With Kafka the pattern is the topic name, so use a plain string.
  @MessagePattern('greeting')
  handleGreeting(@Payload() message: string): string {
    console.log(`Received greeting: ${message}`);
    return `Hello, ${message}!`;
  }
}

gRPC

Installing Dependencies

npm install --save @nestjs/microservices @grpc/grpc-js @grpc/proto-loader

Example Code

// greeting.proto
syntax = "proto3";

package greeting;

service Greeting {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

// app.module.ts
import { join } from 'path';
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'GRPC_CLIENT',
        transport: Transport.GRPC,
        options: {
          package: 'greeting',
          // Resolve the proto file relative to the compiled module.
          protoPath: join(__dirname, 'greeting.proto'),
          // The server address is a top-level option.
          url: 'localhost:50051',
        },
      },
    ]),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
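
// app.service.ts
import { Inject, Injectable, OnModuleInit } from '@nestjs/common';
import { ClientGrpc } from '@nestjs/microservices';
import { Observable, firstValueFrom } from 'rxjs';

// Client-side view of the Greeting service. Methods are exposed in
// lowerCamelCase and return Observables.
interface GreetingService {
  sayHello(request: { name: string }): Observable<{ message: string }>;
}

@Injectable()
export class AppService implements OnModuleInit {
  private client: GreetingService;

  constructor(@Inject('GRPC_CLIENT') private readonly clientGrpc: ClientGrpc) {}

  onModuleInit(): void {
    // Resolve the service proxy once the module has been initialised.
    this.client = this.clientGrpc.getService<GreetingService>('Greeting');
  }

  async sayHello(name: string): Promise<string> {
    const response = await firstValueFrom(this.client.sayHello({ name }));
    return response.message;
  }
}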

// app.controller.ts
import { Controller, Get, Query } from '@nestjs/common';
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  // HTTP entry point that forwards the name to the gRPC Greeting service.
  @Get('hello')
  async sayHello(@Query('name') name: string): Promise<string> {
    return this.appService.sayHello(name);
  }
}

Starting the Application

// main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  await app.listen(3000);
}
bootstrap();

Custom Transports

NestJS allows you to create custom transport strategies to support additional communication protocols or customize existing ones. This is achieved by implementing the CustomTransportStrategy interface.

Example Code:

import { CustomTransportStrategy, Server } from '@nestjs/microservices';
import { Injectable } from '@nestjs/common';

@Injectable()
export class CustomTransport extends Server implements CustomTransportStrategy {
  listen(callback: () => void) {
    // Implement custom listening logic
    console.log('Custom transport listening...');
    callback();
  }

  close() {
    // Implement cleanup logic
    console.log('Custom transport closed');
  }
}
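
The listen/close lifecycle can be explored without NestJS at all. The sketch below is a hypothetical, dependency-free model: TransportStrategy, InMemoryTransport, addHandler, and deliver are illustrative names, with deliver() standing in for a message arriving from whatever protocol a real strategy would speak.

```typescript
// Hypothetical strategy shape, reduced to the two lifecycle methods.
interface TransportStrategy {
  listen(callback: () => void): void;
  close(): void;
}

type MessageHandler = (data: string) => string;

class InMemoryTransport implements TransportStrategy {
  private listening = false;
  private readonly handlers = new Map<string, MessageHandler>();

  addHandler(pattern: string, handler: MessageHandler): void {
    this.handlers.set(pattern, handler);
  }

  // Start accepting messages, then signal readiness to the caller.
  listen(callback: () => void): void {
    this.listening = true;
    callback();
  }

  // Stop accepting messages.
  close(): void {
    this.listening = false;
  }

  // Stand-in for a message arriving from the outside world.
  deliver(pattern: string, data: string): string {
    if (!this.listening) return 'transport closed';
    const handler = this.handlers.get(pattern);
    return handler ? handler(data) : `no handler for ${pattern}`;
  }
}

const transport = new InMemoryTransport();
transport.addHandler('hello', (data) => `Hello, ${data}!`);

const beforeListen = transport.deliver('hello', 'Nest');
const readySignals: string[] = [];
transport.listen(() => { readySignals.push('ready'); });
const whileListening = transport.deliver('hello', 'Nest');
transport.close();
const afterClose = transport.deliver('hello', 'Nest');
```

Messages are only dispatched between listen() and close(), which is the contract a real strategy has to honour as well.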

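Pass an instance of the custom transport as the strategy when creating the microservice. Registering it as a module provider does not start it:

import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions } from '@nestjs/microservices';
import { AppModule } from './app.module';
import { CustomTransport } from './custom.transport';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
    strategy: new CustomTransport(),
  });
  await app.listen();
}
bootstrap();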

Exception Filters

Exception filters handle errors thrown during microservice message processing.

Example Code:

import { Catch, ArgumentsHost, ExceptionFilter } from '@nestjs/common';
import { RpcException } from '@nestjs/microservices';
import { Observable, throwError } from 'rxjs';

@Catch(RpcException)
export class RpcExceptionFilter implements ExceptionFilter {
  catch(exception: RpcException, host: ArgumentsHost): Observable<never> {
    // RPC handlers have no HTTP response object. Return an error stream
    // and the transport delivers it to the caller.
    return throwError(() => ({
      message: exception.message,
      status: 'error',
    }));
  }
}
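
The idea behind a filter, turning a thrown error into a structured error reply, can be sketched in plain TypeScript. withErrorFilter and RpcResult below are hypothetical names for illustration and do not mirror NestJS internals.

```typescript
// Either a successful reply or a structured error payload.
type RpcResult =
  | { ok: true; data: string }
  | { ok: false; error: { status: 'error'; message: string } };

// Wrap a handler so that anything it throws becomes an error payload
// instead of propagating to the caller as an exception.
function withErrorFilter(
  handler: (data: string) => string,
): (data: string) => RpcResult {
  return (data) => {
    try {
      return { ok: true, data: handler(data) };
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      return { ok: false, error: { status: 'error', message } };
    }
  };
}

const safeHandler = withErrorFilter((data) => {
  if (data.length === 0) throw new Error('Empty payload');
  return `Hello, ${data}!`;
});

const success = safeHandler('Nest');
const failure = safeHandler('');
```

The caller always receives a value it can inspect, which is the same guarantee a filter gives the remote side of a microservice call.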

Apply the filter globally or to a specific controller:

import { Module } from '@nestjs/common';
import { APP_FILTER } from '@nestjs/core';
import { RpcExceptionFilter } from './rpc-exception.filter';

@Module({
  providers: [
    {
      provide: APP_FILTER,
      useClass: RpcExceptionFilter,
    },
  ],
})
export class AppModule {}

Pipes

Pipes validate or transform incoming messages in microservices.

Example Code:

import { PipeTransform, Injectable } from '@nestjs/common';
import { RpcException } from '@nestjs/microservices';

@Injectable()
export class ValidationPipe implements PipeTransform {
  transform(value: any) {
    if (!value || typeof value !== 'string') {
      // Microservice handlers should throw RpcException rather than an HTTP exception.
      throw new RpcException('Invalid message format');
    }
    return value;
  }
}

Apply the pipe to a message handler:

import { Controller, UsePipes } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
import { ValidationPipe } from './validation.pipe';

@Controller()
export class AppController {
  @MessagePattern('validate')
  @UsePipes(new ValidationPipe())
  handleMessage(data: string): string {
    return `Validated: ${data}`;
  }
}
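
Stripped of the framework, a pipe is a function that either returns a (possibly transformed) value or throws before the handler runs. The following dependency-free sketch uses hypothetical names (Pipe, requireString, trimmed, tryHandle) to show that ordering.

```typescript
// Hypothetical stand-in for a pipe: return the value or throw.
type Pipe<In, Out> = (value: In) => Out;

// Validation step: reject anything that is not a non-empty string.
const requireString: Pipe<unknown, string> = (value) => {
  if (typeof value !== 'string' || value.length === 0) {
    throw new Error('Invalid message format');
  }
  return value;
};

// Transformation step: normalise surrounding whitespace.
const trimmed: Pipe<string, string> = (value) => value.trim();

// The pipes run first, so the handler body only ever sees validated input.
function handleValidated(raw: unknown): string {
  const data = trimmed(requireString(raw));
  return `Validated: ${data}`;
}

// Convert a rejection into a printable result for the demo.
function tryHandle(raw: unknown): string {
  try {
    return handleValidated(raw);
  } catch (err) {
    return `Rejected: ${(err as Error).message}`;
  }
}

const accepted = tryHandle('  hello ');
const rejected = tryHandle(42);
```

Keeping validation in a pipe means the handler itself stays free of defensive checks.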

Guards

Guards protect microservice message handlers from unauthorized access.

Example Code:

import { CanActivate, ExecutionContext, Injectable } from '@nestjs/common';

@Injectable()
export class AuthGuard implements CanActivate {
  canActivate(context: ExecutionContext): boolean {
    const data = context.switchToRpc().getData();
    // Implement authorization logic
    return data.token === 'valid-token';
  }
}

Apply the guard to a message handler:

import { Controller, UseGuards } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
import { AuthGuard } from './auth.guard';

@Controller()
export class AppController {
  @MessagePattern('protected')
  @UseGuards(AuthGuard)
  handleProtected(data: any): string {
    return 'Protected resource accessed';
  }
}
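
A guard boils down to a yes/no decision taken before the handler runs. The sketch below models that in plain TypeScript; ProtectedMessage, tokenGuard, and withGuards are hypothetical names, and the hard-coded token mirrors the placeholder check in the guard above rather than a real authorization scheme.

```typescript
interface ProtectedMessage {
  token?: string;
  body: string;
}

type Guard = (message: ProtectedMessage) => boolean;

// Placeholder check, mirroring the 'valid-token' comparison in the example.
const tokenGuard: Guard = (message) => message.token === 'valid-token';

// Wrap a handler so it only runs when every guard allows the message.
function withGuards(
  guards: Guard[],
  handler: (message: ProtectedMessage) => string,
): (message: ProtectedMessage) => string {
  return (message) => {
    const allowed = guards.every((guard) => guard(message));
    return allowed ? handler(message) : 'Forbidden resource';
  };
}

const guardedHandler = withGuards([tokenGuard], () => 'Protected resource accessed');

const granted = guardedHandler({ token: 'valid-token', body: 'hi' });
const denied = guardedHandler({ body: 'hi' });
```

Because the guard runs before the handler, a rejected message never reaches business logic.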

Interceptors

Interceptors modify incoming or outgoing messages in microservices.

Example Code:

import { Injectable, NestInterceptor, ExecutionContext, CallHandler } from '@nestjs/common';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';

@Injectable()
export class LoggingInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
    console.log('Before message processing...');
    const now = Date.now();

    return next.handle().pipe(
      map((data) => {
        console.log(`After message processing... ${Date.now() - now}ms`);
        return data;
      }),
    );
  }
}

Apply the interceptor to a message handler:

import { Controller, UseInterceptors } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
import { LoggingInterceptor } from './logging.interceptor';

@Controller()
export class AppController {
  @MessagePattern('log')
  @UseInterceptors(LoggingInterceptor)
  handleMessage(data: string): string {
    return `Message: ${data}`;
  }
}
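
The before/after wrapping that an interceptor performs can be sketched as a higher-order function. In this dependency-free version, withTiming is a hypothetical helper, and the clock and logger are passed in as parameters so the elapsed time is deterministic rather than read from the system clock.

```typescript
type TimedHandler = (data: string) => string;

// Wrap a handler with logic that runs before and after it.
function withTiming(
  handler: TimedHandler,
  clock: () => number,
  log: (line: string) => void,
): TimedHandler {
  return (data) => {
    log('Before message processing...');
    const start = clock();
    const output = handler(data);
    log(`After message processing... ${clock() - start}ms`);
    return output;
  };
}

// Fake timestamps: the first call to the clock returns 100, the second 130.
const ticks = [100, 130];
const logLines: string[] = [];

const timedHandler = withTiming(
  (data) => `Message: ${data}`,
  () => ticks.shift() ?? 0,
  (line) => { logLines.push(line); },
);

const timedOutput = timedHandler('ping');
```

The handler's return value passes through unchanged; only the surrounding behaviour is added, which is what makes this kind of wrapper easy to stack.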