Lesson 35-NestJS Advanced Application Development

Microservices Architecture

Microservices Decomposition and Communication

gRPC Communication Implementation:

// grpc-client.service.ts
/**
 * Typed view of the remote `UserService` gRPC service.
 * Each RPC is exposed by the Nest gRPC client as a method returning an Observable.
 * NOTE(review): the method name must match the RPC declared in `user.proto` — confirm.
 */
interface UserServiceClient {
  getUser(request: { id: string }): Observable<User>
}

/**
 * Thin wrapper around the gRPC client registered under the `USER_PACKAGE` token.
 *
 * A gRPC client is not a message-pattern `ClientProxy`: it has no `send()`.
 * Instead, a service stub is obtained once via `getService()` and its RPC
 * methods are invoked directly.
 */
@Injectable()
export class GrpcClientService {
  private readonly userService: UserServiceClient

  constructor(@Inject('USER_PACKAGE') private readonly clientProvider: ClientGrpc) {
    // Resolve the stub for the `UserService` service declared in the proto file.
    this.userService = this.clientProvider.getService<UserServiceClient>('UserService')
  }

  /**
   * Fetches a single user through the `getUser` RPC.
   *
   * @param id - Identifier sent as the `id` field of the request message.
   * @returns The first value emitted by the RPC.
   * @throws Rejects if the call errors or completes without emitting a value.
   */
  async getUser(id: string): Promise<User> {
    // firstValueFrom replaces the deprecated Observable#toPromise().
    return firstValueFrom(this.userService.getUser({ id }))
  }
}

// grpc-server.module.ts
/**
 * Registers the gRPC client for the `user` package under the `USER_PACKAGE` token.
 *
 * `ClientGrpc` is only an interface, so it cannot construct anything; the
 * client instance is built by `ClientProxyFactory` with the gRPC transport.
 * The token is exported so that importing modules can inject the client.
 */
@Module({
  providers: [
    {
      provide: 'USER_PACKAGE',
      useFactory: (config: ConfigService) => {
        return ClientProxyFactory.create({
          transport: Transport.GRPC,
          options: {
            package: 'user',
            // The .proto file is resolved relative to this compiled module.
            protoPath: join(__dirname, 'user.proto'),
            url: config.get('GRPC_SERVER_URL')
          }
        })
      },
      inject: [ConfigService]
    }
  ],
  exports: ['USER_PACKAGE']
})
export class GrpcServerModule {}

MQTT Communication Implementation:

// mqtt-client.service.ts
/**
 * Connects to the MQTT broker named by `MQTT_BROKER_URL`, subscribes to the
 * `user/created` topic and offers a helper to publish to that same topic.
 */
@Injectable()
export class MqttClientService {
  private client: MqttClient

  constructor() {
    this.client = connect(process.env.MQTT_BROKER_URL)

    // An emitter with no 'error' listener throws when an error is emitted;
    // log instead so a broker hiccup does not take the process down.
    this.client.on('error', (err) => {
      console.error('MQTT client error:', err)
    })

    this.client.on('connect', () => {
      this.client.subscribe('user/created')
    })

    this.client.on('message', (topic, message) => {
      if (topic !== 'user/created') {
        return
      }

      let user: unknown
      try {
        user = JSON.parse(message.toString())
      } catch (err) {
        // A malformed payload must not throw out of the listener.
        console.error('Ignoring malformed user/created payload:', err)
        return
      }
      // Handle user creation event
    })
  }

  /**
   * Publishes the given user, serialized as JSON, on the `user/created` topic.
   *
   * @param user - Payload to serialize and publish.
   */
  publishUserCreated(user: User) {
    this.client.publish('user/created', JSON.stringify(user))
  }
}

NestJS Microservices Module

Transport Configuration:

// main.ts
/**
 * Boots the application as a standalone TCP microservice
 * listening on localhost:3001.
 */
async function bootstrap() {
  // Where the TCP transport binds.
  const tcpEndpoint = { host: 'localhost', port: 3001 }

  const microservice = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.TCP,
    options: tcpEndpoint
  })

  await microservice.listen()
}
bootstrap()

// Alternatively, run a hybrid application: an HTTP server with a microservice transport attached
// Create the regular HTTP application first...
const app = await NestFactory.create(AppModule)
// ...then attach a NATS microservice listener to it.
app.connectMicroservice({
  transport: Transport.NATS,
  options: {
    // The NATS transport in Nest 8+ (the versions that provide
    // startAllMicroservices) expects a `servers` list rather than `url`.
    servers: ['nats://localhost:4222']
  }
})
// Start every connected microservice before accepting HTTP traffic.
await app.startAllMicroservices()
await app.listen(3000)

ClientProxy Advanced Configuration:

// user.client.ts
/**
 * Client for the remote service registered under the `USER_SERVICE` token.
 */
@Injectable()
export class UserClient {
  constructor(
    @Inject('USER_SERVICE') private client: ClientProxy
  ) {}

  /**
   * Sends a `createUser` request and resolves with the reply.
   *
   * Each attempt is given 5000 ms; on error the request is
   * re-subscribed up to 3 more times.
   *
   * @param user - Payload forwarded to the `createUser` pattern.
   */
  async createUser(user: UserDto): Promise<User> {
    const reply$ = this.client.send<User>('createUser', user)
    // Order matters: the timeout sits inside the retried section.
    const guarded$ = reply$.pipe(timeout(5000), retry(3))
    return guarded$.toPromise()
  }
}

Service Discovery and Registration

Consul Integration:

// consul.module.ts
// Provider that builds the Consul client from CONSUL_HOST / CONSUL_PORT.
const consulClientProvider = {
  provide: 'CONSUL_CLIENT',
  useFactory: () =>
    new Consul({
      host: process.env.CONSUL_HOST,
      port: process.env.CONSUL_PORT
    })
}

/**
 * Wires up the Consul client and exposes `ConsulService` to importing modules.
 * The raw client token itself is not exported.
 */
@Module({
  providers: [consulClientProvider, ConsulService],
  exports: [ConsulService]
})
export class ConsulModule {}

// consul.service.ts
/**
 * Small facade over the Consul client injected as `CONSUL_CLIENT`.
 */
@Injectable()
export class ConsulService {
  constructor(@Inject('CONSUL_CLIENT') private consul: Consul) {}

  /**
   * Registers a service with the Consul agent.
   *
   * @param service - Registration descriptor passed straight to the agent API.
   */
  async registerService(service: ServiceRegistration) {
    const agentServices = this.consul.agent.service
    await agentServices.register(service)
  }

  /**
   * Looks up the catalog nodes for a service name.
   *
   * @param name - Service name to query.
   * @returns Whatever the catalog's `service.nodes` call yields for that name.
   */
  async discoverService(name: string): Promise<ServiceEntry[]> {
    const catalogServices = this.consul.catalog.service
    return catalogServices.nodes(name)
  }
}

Dynamic Service Discovery:

// dynamic-client-proxy.ts
/**
 * Creates TCP `ClientProxy` instances on demand from Consul discovery results
 * and caches one client per service name.
 */
@Injectable()
export class DynamicClientProxy {
  private clients: Map<string, ClientProxy> = new Map()

  // ConsulService must be injected: getClient() relies on it for discovery.
  constructor(private readonly consulService: ConsulService) {}

  /**
   * Returns the cached client for `serviceName`, creating it on first use.
   *
   * @param serviceName - Name of the service to look up in Consul.
   * @returns A TCP client pointing at the first discovered instance.
   * @throws Error when discovery returns no instances for the service.
   */
  async getClient(serviceName: string): Promise<ClientProxy> {
    // A single lookup gives a properly narrowed value (no has() + get() pair).
    const cached = this.clients.get(serviceName)
    if (cached) {
      return cached
    }

    const instances = await this.consulService.discoverService(serviceName)
    if (instances.length === 0) {
      throw new Error(`Service ${serviceName} not found`)
    }

    // Only the first instance is used; there is no balancing across instances.
    // NOTE(review): `Address` is presumably the node address — confirm whether
    // the service-specific address should be preferred when it is set.
    const [instance] = instances
    const client = ClientProxyFactory.create({
      transport: Transport.TCP,
      options: {
        host: instance.Address,
        port: instance.ServicePort
      }
    })

    // The cache entry is never invalidated once it has been stored.
    this.clients.set(serviceName, client)
    return client
  }
}

Distributed Transactions and Message Queues

RabbitMQ Transactional Messages:

// rabbitmq.service.ts
/**
 * Publishes persistent messages to RabbitMQ and waits until the broker has
 * acknowledged them.
 *
 * NOTE(review): assumes `Connection` is an amqplib connection — confirm. The
 * amqplib channel API does not offer txSelect/txCommit/txRollback, so delivery
 * is guaranteed with publisher confirms on a confirm channel instead.
 */
@Injectable()
export class RabbitMQService {
  // createConfirmChannel() is asynchronous, so the pending promise is cached
  // and awaited on use rather than being stored as if it were a channel.
  private channel?: ReturnType<Connection['createConfirmChannel']>

  constructor(@Inject('RABBITMQ_CONNECTION') private connection: Connection) {}

  /** Lazily opens the confirm channel and reuses it for subsequent calls. */
  private getChannel() {
    if (!this.channel) {
      this.channel = this.connection.createConfirmChannel()
    }
    return this.channel
  }

  /**
   * Publishes `message` as JSON to a durable topic exchange and resolves once
   * the broker has confirmed it.
   *
   * @param exchange - Exchange name; asserted as a durable `topic` exchange.
   * @param routingKey - Routing key for the message.
   * @param message - Any JSON-serializable payload.
   * @throws Rejects if the exchange assertion fails or the broker rejects the message.
   */
  async sendTransactionalMessage(exchange: string, routingKey: string, message: any) {
    const channel = await this.getChannel()
    await channel.assertExchange(exchange, 'topic', { durable: true })

    channel.publish(
      exchange,
      routingKey,
      Buffer.from(JSON.stringify(message)),
      { persistent: true }
    )
    // Resolves when all outstanding publishes are acked; rejects on a nack.
    await channel.waitForConfirms()
  }
}

Kafka Consumer Implementation:

// kafka.consumer.ts
/**
 * Intended to react to messages on the `user.created` Kafka topic using the
 * client injected under the `KAFKA_CLIENT` token.
 *
 * NOTE(review): `consume()` and `on('message', ...)` do not look like part of
 * the documented `ClientKafka` API, and `subscribeToResponseOf` is meant for
 * reply topics of request-response calls — confirm. Incoming events are
 * presumably better handled by an `@EventPattern('user.created')` handler on a
 * controller of a Kafka microservice.
 */
@Injectable()
export class KafkaConsumer {
  constructor(
    @Inject('KAFKA_CLIENT') private client: ClientKafka
  ) {}

  /**
   * Subscribes to `user.created` and registers a message callback.
   */
  async listen() {
    this.client.subscribeToResponseOf('user.created')
    this.client.consume()

    this.client.on('message', async (message) => {
      if (message.topic === 'user.created') {
        // The payload is parsed as JSON; a malformed value would throw here.
        const user = JSON.parse(message.value.toString())
        // Handle user creation event
      }
    })
  }
}

Microservices Monitoring and Logging

Prometheus Monitoring Integration:

// metrics.module.ts
// Provider that supplies a fresh metrics registry under METRICS_INSTANCE.
const metricsRegistryProvider = {
  provide: 'METRICS_INSTANCE',
  useFactory: () => new client.Registry()
}

/**
 * Provides the metrics registry and `MetricsService`;
 * only `MetricsService` is exported to importing modules.
 */
@Module({
  providers: [metricsRegistryProvider, MetricsService],
  exports: [MetricsService]
})
export class MetricsModule {}

// metrics.middleware.ts
/**
 * Records the duration of every HTTP request in a histogram labelled by
 * request path and method.
 *
 * A registry only stores metrics; timers belong to a `Histogram`, so one is
 * created here and registered in the injected registry.
 */
@Injectable()
export class MetricsMiddleware implements NestMiddleware {
  private static readonly METRIC_NAME = 'http_request_duration_seconds'

  private readonly requestDuration: client.Histogram<string>

  constructor(@Inject('METRICS_INSTANCE') private registry: client.Registry) {
    // Reuse an already-registered histogram so that a second middleware
    // instance does not fail on a duplicate metric name.
    const existing = this.registry.getSingleMetric(MetricsMiddleware.METRIC_NAME) as
      | client.Histogram<string>
      | undefined

    this.requestDuration =
      existing ??
      new client.Histogram({
        name: MetricsMiddleware.METRIC_NAME,
        help: 'Duration of HTTP requests in seconds',
        labelNames: ['path', 'method'],
        registers: [this.registry]
      })
  }

  /**
   * Starts a timer when the request arrives and stops it when the response
   * emits `finish`.
   *
   * NOTE(review): `req.path` is used as a label value as-is; paths containing
   * ids would create one series per id — confirm this is acceptable.
   */
  use(req: Request, res: Response, next: NextFunction) {
    const end = this.requestDuration.startTimer({ path: req.path, method: req.method })
    res.on('finish', () => {
      end()
    })
    next()
  }
}

ELK Log Aggregation:

// elk.logger.ts
/**
 * Ships structured log entries to the endpoint configured as `LOGSTASH_URL`.
 */
export class ElkLogger {
  constructor(private config: ConfigService) {}

  /**
   * Builds a log entry and posts it without waiting for the result.
   *
   * @param message - Log message.
   * @param context - Optional logical source of the message.
   * @param metadata - Optional extra fields; spread last, so keys named
   *   `timestamp`, `message` or `context` override the built-in ones.
   */
  log(message: string, context?: string, metadata?: any) {
    const logEntry = {
      timestamp: new Date().toISOString(),
      message,
      context,
      ...metadata
    }

    // Fire-and-forget, but never leave the rejection unhandled: a Logstash
    // outage must not surface as an unhandled promise rejection.
    axios.post(this.config.get('LOGSTASH_URL'), logEntry).catch((err: unknown) => {
      const reason = err instanceof Error ? err.message : String(err)
      console.error(`Failed to ship log entry to Logstash: ${reason}`)
    })
  }
}

Real-Time Application Development

WebSocket Integration

WebSocketGateway Advanced Usage:

/**
 * WebSocket gateway on port 3001, restricted to the `websocket` transport.
 *
 * The port is the first decorator argument; a `port` key inside the options
 * object is not what selects the gateway port.
 * NOTE(review): `origin: '*'` accepts connections from any origin — confirm
 * this is intended outside of local development.
 */
@WebSocketGateway(3001, {
  transports: ['websocket'],
  cors: {
    origin: '*'
  }
})
export class EventsGateway {
  @WebSocketServer()
  server: Server

  /**
   * Echoes the payload back as an `events` message.
   *
   * @param client - Socket that sent the message (unused).
   * @param payload - Data to echo.
   */
  @SubscribeMessage('events')
  handleEvent(client: Socket, payload: any): WsResponse<any> {
    return { event: 'events', data: payload }
  }

  /**
   * Returns the received number unchanged.
   *
   * @param client - Socket that sent the message (unused).
   * @param payload - Value to return.
   */
  @SubscribeMessage('identity')
  async identity(client: Socket, payload: number): Promise<number> {
    return payload
  }
}

Rooms and Namespaces:

/**
 * Chat gateway: sockets join rooms and relay messages to the room's other members.
 */
@WebSocketGateway()
export class ChatGateway {
  @WebSocketServer()
  server: Server

  /**
   * Adds the socket to a room and announces it to that room via the
   * sender's own socket.
   *
   * @param client - Socket joining the room.
   * @param roomId - Room to join.
   */
  @SubscribeMessage('joinRoom')
  handleJoinRoom(client: Socket, roomId: string) {
    client.join(roomId)

    const announcement = { userId: client.id }
    client.to(roomId).emit('userJoined', announcement)
  }

  /**
   * Relays a chat message to the given room via the sender's own socket.
   *
   * @param client - Socket sending the message.
   * @param payload - Target room and message text.
   */
  @SubscribeMessage('sendMessage')
  handleMessage(client: Socket, payload: { roomId: string; message: string }) {
    const { roomId, message } = payload
    const room = client.to(roomId)
    room.emit('newMessage', { userId: client.id, message })
  }
}

GraphQL Integration

GraphQL Module Configuration:

/**
 * Root module: configures GraphQL with the Apollo driver, an auto-generated
 * schema, and a plugin that adds a custom header to every response.
 */
@Module({
  imports: [
    GraphQLModule.forRoot<ApolloDriverConfig>({
      driver: ApolloDriver,
      autoSchemaFile: true,
      // Expose the incoming request to resolvers.
      context: ({ req }) => ({ req }),
      plugins: [
        {
          // Plugin hooks are async so they return promises.
          async requestDidStart() {
            return {
              async willSendResponse({ response }) {
                // Add custom response header.
                // The header goes on the GraphQL response itself: the context
                // built above only carries `req`, so `context.res` is undefined.
                response.http?.headers.set('X-Custom-Header', 'value')
              }
            }
          }
        }
      ]
    })
  ],
  providers: [UserResolver]
})
export class AppModule {}

Subscription Implementation:

/**
 * Resolver for the `User` type: a `userCreated` subscription and the `posts` field.
 */
@Resolver('User')
export class UserResolver {
  // posts() relies on this service, so it has to be injected. PostService
  // must also be registered as a provider in the enclosing module.
  constructor(private readonly postService: PostService) {}

  /**
   * Handler for the `userCreated` subscription; returns its argument unchanged.
   *
   * NOTE(review): a subscription handler is presumably expected to return an
   * async iterator from a PubSub instance rather than a plain payload —
   * confirm against the NestJS GraphQL subscriptions documentation.
   */
  @Subscription('userCreated')
  userCreated(@Root() payload: any) {
    return payload
  }

  /**
   * Resolves the `posts` field of a user.
   *
   * @param user - Parent user object whose `id` is used for the lookup.
   * @returns Whatever `PostService.findByUserId` yields for that id.
   */
  @ResolveField('posts')
  async posts(@Parent() user: User) {
    return this.postService.findByUserId(user.id)
  }
}

Server-Sent Events

SSE Implementation:

/**
 * Streams a timestamp once per second as Server-Sent Events.
 */
@Controller('events')
export class EventsController {
  /**
   * Writes one SSE `data:` frame per second on the raw response until the
   * connection is closed.
   *
   * @param res - Raw response object used as the event stream.
   */
  @Get()
  @Header('Content-Type', 'text/event-stream')
  @Header('Cache-Control', 'no-cache')
  @Header('Connection', 'keep-alive')
  async getEvents(@Res() res: Response) {
    const interval = setInterval(() => {
      res.write(`data: ${JSON.stringify({ time: new Date() })}\n\n`)
    }, 1000)

    // Listen on the injected response: no `req` is in scope in this handler.
    // Stopping the timer on close avoids writing to a dead connection.
    res.on('close', () => {
      clearInterval(interval)
    })
  }
}

Real-Time Collaborative Editing

CRDT Implementation Example:

// crdt.service.ts
/**
 * In-memory store of Yjs documents keyed by document id.
 */
@Injectable()
export class CrdtService {
  private documents: Map<string, Y.Doc> = new Map()

  /**
   * Returns the document for `docId`, creating an empty one on first access.
   *
   * @param docId - Document identifier.
   */
  getDocument(docId: string): Y.Doc {
    let doc = this.documents.get(docId)
    if (doc === undefined) {
      doc = new Y.Doc()
      this.documents.set(docId, doc)
    }
    return doc
  }

  /**
   * Applies an update to the document, creating the document if needed.
   *
   * @param docId - Document identifier.
   * @param update - Binary update passed to `Y.applyUpdate`.
   */
  applyRemoteUpdate(docId: string, update: Uint8Array) {
    Y.applyUpdate(this.getDocument(docId), update)
  }

  /**
   * Encodes the document's state as an update, creating the document if needed.
   *
   * @param docId - Document identifier.
   * @returns The result of `Y.encodeStateAsUpdate` for that document.
   */
  generateLocalUpdate(docId: string): Uint8Array {
    return Y.encodeStateAsUpdate(this.getDocument(docId))
  }
}

Real-Time Application Security

WebSocket Authentication Guard:

/**
 * Guard that authenticates WebSocket clients with the JWT sent in the
 * connection handshake (`handshake.auth.token`).
 */
@Injectable()
export class WsJwtGuard implements CanActivate {
  /**
   * Verifies the handshake token and stores the decoded result on the client
   * under the `user` key.
   *
   * @returns `true` when the token verifies.
   * @throws WsException('Unauthorized') when the token is missing or invalid.
   */
  canActivate(context: ExecutionContext): boolean {
    const client = context.switchToWs().getClient()

    // Optional chaining: a handshake without `auth` must be rejected as
    // unauthorized instead of failing with a TypeError.
    const token = client.handshake?.auth?.token
    if (!token) {
      throw new WsException('Unauthorized')
    }

    try {
      const decoded = verifyJwt(token)
      client['user'] = decoded
      return true
    } catch (err) {
      throw new WsException('Unauthorized')
    }
  }
}

// Using guard
/**
 * Example gateway protected by `WsJwtGuard`, applied at class level.
 * The class body is intentionally empty.
 */
@WebSocketGateway()
@UseGuards(WsJwtGuard)
export class SecureGateway {}

CLI Tool Development

Custom Command-Line Tools

Commander.js Implementation:

#!/usr/bin/env node
import { Command } from 'commander'
import { generateModule } from './commands/generate-module'
import { generateService } from './commands/generate-service'

// Root command of the CLI.
const program = new Command()

program.name('nestjs-cli').description('NestJS Custom CLI Tool').version('1.0.0')

// One entry per generator sub-command, registered in declaration order.
const generators = [
  {
    signature: 'generate:module <name>',
    summary: 'Generate NestJS module',
    handler: generateModule
  },
  {
    signature: 'generate:service <name>',
    summary: 'Generate NestJS service',
    handler: generateService
  }
]

for (const { signature, summary, handler } of generators) {
  program.command(signature).description(summary).action(handler)
}

// Parse the real command line and dispatch to the matching action.
program.parse(process.argv)

NestJS and CLI Integration

Custom NestJS Command:

// cli.service.ts
/**
 * File generators used by the CLI commands.
 */
@Injectable()
export class CliService {
  /**
   * Writes `src/<name>/<name>.module.ts`, relative to the current working
   * directory, containing an empty Nest module class.
   *
   * @param name - Module name; used for the directory, the file name and
   *   (capitalized) the class name.
   */
  async generateModule(name: string) {
    const modulePath = join(process.cwd(), 'src', `${name}`, `${name}.module.ts`)
    const content = `
      import { Module } from '@nestjs/common'

      @Module({})
      export class ${capitalize(name)}Module {}
    `

    // The target directory usually does not exist yet; create it first,
    // otherwise writeFileSync fails with ENOENT.
    // (mkdirSync comes from node:fs, dirname from node:path.)
    mkdirSync(dirname(modulePath), { recursive: true })
    writeFileSync(modulePath, content)
    console.log(`Module ${name} has been generated`)
  }
}

// Usage in CLI tool
// Action for `generate:module`: resolve CliService and delegate to it.
const runGenerateModule = async (name: string) => {
  const service = container.resolve(CliService)
  await service.generateModule(name)
}

program.command('generate:module <name>').action(runGenerateModule)

Interactive Command Line

Inquirer.js Implementation:

// interactive-cli.ts
import inquirer from 'inquirer'

/**
 * Asks which action to run and, for the two generate actions, which name to use.
 *
 * @returns The inquirer answers: `action`, plus `name` when it was asked.
 */
async function promptOptions() {
  // Actions that need a follow-up name question.
  const generateActions = ['Generate module', 'Generate service']

  const actionQuestion = {
    type: 'list',
    name: 'action',
    message: 'Select an action:',
    choices: ['Generate module', 'Generate service', 'Exit']
  }

  const nameQuestion = {
    type: 'input',
    name: 'name',
    message: 'Enter name:',
    // Skipped when the user picked "Exit".
    when: (soFar) => generateActions.includes(soFar.action)
  }

  return inquirer.prompt([actionQuestion, nameQuestion])
}

// Main loop
/**
 * Interactive loop: keeps prompting and running the chosen generator
 * until the user selects "Exit".
 */
async function main() {
  for (;;) {
    const answers = await promptOptions()

    switch (answers.action) {
      case 'Exit':
        // Leaving the function ends the loop.
        return
      case 'Generate module':
        await generateModule(answers.name)
        break
      case 'Generate service':
        await generateService(answers.name)
        break
    }
  }
}

Plugin System Design

Plugin Interface Definition:

// plugin.interface.ts
/**
 * Contract a CLI plugin object must satisfy.
 */
export interface NestCliPlugin {
  /** Plugin name. */
  name: string
  /** Commands contributed by the plugin. */
  commands: Array<{
    /** Command name. */
    name: string
    /** Human-readable description of the command. */
    description: string
    /** Asynchronous handler; the shape of `args` is not constrained here. */
    action: (args: any) => Promise<void>
  }>
  /**
   * Optional lifecycle callbacks, each receiving a command string.
   * NOTE(review): presumably invoked around command execution — the caller
   * is not shown here; confirm.
   */
  hooks?: {
    beforeCommand?: (command: string) => Promise<void>
    afterCommand?: (command: string) => Promise<void>
  }
}

Plugin Loader Implementation:

// plugin.loader.ts
/**
 * Discovers `*.plugin.js` files in a directory and keeps the loaded plugins
 * indexed by name.
 */
@Injectable()
export class PluginLoader {
  private plugins: Map<string, NestCliPlugin> = new Map()

  /**
   * Loads every `*.plugin.js` file found directly inside `pluginsDir`.
   * A later plugin with the same name replaces an earlier one.
   *
   * @param pluginsDir - Directory to scan; may be relative to the working directory.
   * @throws Error when a file does not export an object with a string `name`
   *   and a `commands` array.
   */
  async loadPlugins(pluginsDir: string) {
    // Absolute paths are required: require() would treat a relative
    // 'plugins/x.plugin.js' as a package name instead of a file.
    const files = glob.sync('*.plugin.js', { cwd: pluginsDir, absolute: true })

    for (const file of files) {
      const pluginModule = require(file)
      // Accept both `export default plugin` and `module.exports = plugin`.
      const plugin = (pluginModule?.default ?? pluginModule) as NestCliPlugin | undefined

      if (!plugin || typeof plugin.name !== 'string' || !Array.isArray(plugin.commands)) {
        throw new Error(`Invalid plugin module: ${file}`)
      }

      this.plugins.set(plugin.name, plugin)
    }
  }

  /**
   * Looks up a loaded plugin.
   *
   * @param name - Plugin name.
   * @returns The plugin, or `undefined` when none with that name was loaded.
   */
  getPlugin(name: string): NestCliPlugin | undefined {
    return this.plugins.get(name)
  }
}

Global and Local Installation

package.json Configuration:

{
  "bin": {
    "nestjs-cli": "./dist/cli.js"
  },
  "files": [
    "dist/**/*"
  ],
  "scripts": {
    "build": "tsc",
    "prepublishOnly": "npm run build"
  }
}

Local Installation Detection:

// cli.service.ts
/**
 * Variant of the CLI service that branches on how the tool appears to be installed.
 */
@Injectable()
export class CliService {
  private isGlobal: boolean

  constructor() {
    // Treated as global whenever npm_config_local_prefix is unset or empty.
    // NOTE(review): confirm this variable reliably distinguishes the two cases.
    const localPrefix = process.env.npm_config_local_prefix
    this.isGlobal = !localPrefix
  }

  /**
   * Logs which mode is active; the generation logic itself is a placeholder.
   *
   * @param name - Module name (not used yet).
   */
  async generateModule(name: string) {
    if (!this.isGlobal) {
      console.log('Generating module in local mode')
      // Local installation logic
      return
    }

    console.log('Generating module in global mode')
    // Global installation logic
  }
}
Share your love