Microservices Architecture
Microservices Decomposition and Communication
gRPC Communication Implementation:
// grpc-client.service.ts
@Injectable()
export class GrpcClientService {
private client: ClientGrpc
constructor(@Inject('USER_PACKAGE') private clientProvider: ClientProxy) {
this.client = this.clientProvider.getClientByServiceName('UserService')
}
async getUser(id: string): Promise<User> {
return this.client.send<User>('getUser', { id }).toPromise()
}
}
// grpc-server.module.ts
@Module({
providers: [
{
provide: 'USER_PACKAGE',
useFactory: (config: ConfigService) => {
return ClientGrpc.create({
package: 'user',
protoPath: join(__dirname, 'user.proto'),
url: config.get('GRPC_SERVER_URL')
})
},
inject: [ConfigService]
}
]
})
export class GrpcServerModule {}MQTT Communication Implementation:
// mqtt-client.service.ts
@Injectable()
export class MqttClientService {
private client: MqttClient
constructor() {
this.client = connect(process.env.MQTT_BROKER_URL)
this.client.on('connect', () => {
this.client.subscribe('user/created')
})
this.client.on('message', (topic, message) => {
if (topic === 'user/created') {
const user = JSON.parse(message.toString())
// Handle user creation event
}
})
}
publishUserCreated(user: User) {
this.client.publish('user/created', JSON.stringify(user))
}
}NestJS Microservices Module
Transport Configuration:
// main.ts
async function bootstrap() {
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.TCP,
options: {
host: 'localhost',
port: 3001
}
})
await app.listen()
}
bootstrap()
// Or use hybrid application
const app = await NestFactory.create(AppModule)
app.connectMicroservice({
transport: Transport.NATS,
options: {
url: 'nats://localhost:4222'
}
})
await app.startAllMicroservices()
await app.listen(3000)ClientProxy Advanced Configuration:
// user.client.ts
@Injectable()
export class UserClient {
constructor(
@Inject('USER_SERVICE') private client: ClientProxy
) {}
async createUser(user: UserDto): Promise<User> {
return this.client.send<User>('createUser', user).pipe(
timeout(5000),
retry(3)
).toPromise()
}
}Service Discovery and Registration
Consul Integration:
// consul.module.ts
@Module({
providers: [
{
provide: 'CONSUL_CLIENT',
useFactory: () => {
return new Consul({
host: process.env.CONSUL_HOST,
port: process.env.CONSUL_PORT
})
}
},
ConsulService
],
exports: [ConsulService]
})
export class ConsulModule {}
// consul.service.ts
@Injectable()
export class ConsulService {
constructor(@Inject('CONSUL_CLIENT') private consul: Consul) {}
async registerService(service: ServiceRegistration) {
await this.consul.agent.service.register(service)
}
async discoverService(name: string): Promise<ServiceEntry[]> {
return this.consul.catalog.service.nodes(name)
}
}Dynamic Service Discovery:
// dynamic-client-proxy.ts
@Injectable()
export class DynamicClientProxy {
private clients: Map<string, ClientProxy> = new Map()
async getClient(serviceName: string): Promise<ClientProxy> {
if (this.clients.has(serviceName)) {
return this.clients.get(serviceName)
}
const instances = await this.consulService.discoverService(serviceName)
if (instances.length === 0) {
throw new Error(`Service ${serviceName} not found`)
}
const client = ClientProxyFactory.create({
transport: Transport.TCP,
options: {
host: instances[0].Address,
port: instances[0].ServicePort
}
})
this.clients.set(serviceName, client)
return client
}
}Distributed Transactions and Message Queues
RabbitMQ Transactional Messages:
// rabbitmq.service.ts
@Injectable()
export class RabbitMQService {
private channel: Channel
constructor(@Inject('RABBITMQ_CONNECTION') private connection: Connection) {
this.channel = this.connection.createChannel()
}
async sendTransactionalMessage(exchange: string, routingKey: string, message: any) {
await this.channel.assertExchange(exchange, 'topic', { durable: true })
const tx = await this.channel.txSelect()
try {
await this.channel.publish(
exchange,
routingKey,
Buffer.from(JSON.stringify(message)),
{ persistent: true }
)
await this.channel.txCommit()
} catch (err) {
await this.channel.txRollback()
throw err
}
}
}Kafka Consumer Implementation:
// kafka.consumer.ts
@Injectable()
export class KafkaConsumer {
constructor(
@Inject('KAFKA_CLIENT') private client: ClientKafka
) {}
async listen() {
this.client.subscribeToResponseOf('user.created')
this.client.consume()
this.client.on('message', async (message) => {
if (message.topic === 'user.created') {
const user = JSON.parse(message.value.toString())
// Handle user creation event
}
})
}
}Microservices Monitoring and Logging
Prometheus Monitoring Integration:
// metrics.module.ts
@Module({
providers: [
{
provide: 'METRICS_INSTANCE',
useFactory: () => {
return new client.Registry()
}
},
MetricsService
],
exports: [MetricsService]
})
export class MetricsModule {}
// metrics.middleware.ts
@Injectable()
export class MetricsMiddleware implements NestMiddleware {
constructor(@Inject('METRICS_INSTANCE') private registry: client.Registry) {}
use(req: Request, res: Response, next: NextFunction) {
const end = this.registry.startTimer({ path: req.path, method: req.method })
res.on('finish', () => {
end()
})
next()
}
}ELK Log Aggregation:
// elk.logger.ts
export class ElkLogger {
constructor(private config: ConfigService) {}
log(message: string, context?: string, metadata?: any) {
const logEntry = {
timestamp: new Date().toISOString(),
message,
context,
...metadata
}
axios.post(this.config.get('LOGSTASH_URL'), logEntry)
}
}Real-Time Application Development
WebSocket Integration
WebSocketGateway Advanced Usage:
@WebSocketGateway({
port: 3001,
transports: ['websocket'],
cors: {
origin: '*'
}
})
export class EventsGateway {
@WebSocketServer()
server: Server
@SubscribeMessage('events')
handleEvent(client: Socket, payload: any): WsResponse<any> {
return { event: 'events', data: payload }
}
@SubscribeMessage('identity')
async identity(client: Socket, payload: number): Promise<number> {
return payload
}
}Rooms and Namespaces:
@WebSocketGateway()
export class ChatGateway {
@WebSocketServer()
server: Server
@SubscribeMessage('joinRoom')
handleJoinRoom(client: Socket, roomId: string) {
client.join(roomId)
client.to(roomId).emit('userJoined', { userId: client.id })
}
@SubscribeMessage('sendMessage')
handleMessage(client: Socket, { roomId, message }: { roomId: string; message: string }) {
client.to(roomId).emit('newMessage', { userId: client.id, message })
}
}GraphQL Integration
GraphQL Module Configuration:
@Module({
imports: [
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
autoSchemaFile: true,
context: ({ req }) => ({ req }),
plugins: [
{
requestDidStart() {
return {
willSendResponse({ context, response }) {
// Add custom response header
context.res.setHeader('X-Custom-Header', 'value')
}
}
}
}
]
})
],
providers: [UserResolver]
})
export class AppModule {}Subscription Implementation:
@Resolver('User')
export class UserResolver {
@Subscription('userCreated')
userCreated(@Root() payload: any) {
return payload
}
@ResolveField('posts')
async posts(@Parent() user: User) {
return this.postService.findByUserId(user.id)
}
}Server-Sent Events
SSE Implementation:
@Controller('events')
export class EventsController {
@Get()
@Header('Content-Type', 'text/event-stream')
@Header('Cache-Control', 'no-cache')
@Header('Connection', 'keep-alive')
async getEvents(@Res() res: Response) {
const interval = setInterval(() => {
res.write(`data: ${JSON.stringify({ time: new Date() })}\n\n`)
}, 1000)
req.on('close', () => {
clearInterval(interval)
})
}
}Real-Time Collaborative Editing
CRDT Implementation Example:
// crdt.service.ts
@Injectable()
export class CrdtService {
private documents: Map<string, Y.Doc> = new Map()
getDocument(docId: string): Y.Doc {
if (!this.documents.has(docId)) {
this.documents.set(docId, new Y.Doc())
}
return this.documents.get(docId)
}
applyRemoteUpdate(docId: string, update: Uint8Array) {
const doc = this.getDocument(docId)
Y.applyUpdate(doc, update)
}
generateLocalUpdate(docId: string): Uint8Array {
const doc = this.getDocument(docId)
return Y.encodeStateAsUpdate(doc)
}
}Real-Time Application Security
WebSocket Authentication Middleware:
@Injectable()
export class WsJwtGuard implements CanActivate {
canActivate(context: ExecutionContext): boolean {
const client = context.switchToWs().getClient()
const token = client.handshake.auth.token
try {
const decoded = verifyJwt(token)
client['user'] = decoded
return true
} catch (err) {
throw new WsException('Unauthorized')
}
}
}
// Using guard
@WebSocketGateway()
@UseGuards(WsJwtGuard)
export class SecureGateway {}CLI Tool Development
Custom Command-Line Tools
Commander.js Implementation:
#!/usr/bin/env node
import { Command } from 'commander'
import { generateModule } from './commands/generate-module'
import { generateService } from './commands/generate-service'
const program = new Command()
program
.name('nestjs-cli')
.description('NestJS Custom CLI Tool')
.version('1.0.0')
program
.command('generate:module <name>')
.description('Generate NestJS module')
.action(generateModule)
program
.command('generate:service <name>')
.description('Generate NestJS service')
.action(generateService)
program.parse(process.argv)NestJS and CLI Integration
Custom NestJS Command:
// cli.service.ts
@Injectable()
export class CliService {
async generateModule(name: string) {
const modulePath = join(process.cwd(), 'src', `${name}`, `${name}.module.ts`)
const content = `
import { Module } from '@nestjs/common'
@Module({})
export class ${capitalize(name)}Module {}
`
writeFileSync(modulePath, content)
console.log(`Module ${name} has been generated`)
}
}
// Usage in CLI tool
program
.command('generate:module <name>')
.action(async (name) => {
const cliService = container.resolve(CliService)
await cliService.generateModule(name)
})Interactive Command Line
Inquirer.js Implementation:
// interactive-cli.ts
import inquirer from 'inquirer'
async function promptOptions() {
const answers = await inquirer.prompt([
{
type: 'list',
name: 'action',
message: 'Select an action:',
choices: ['Generate module', 'Generate service', 'Exit']
},
{
type: 'input',
name: 'name',
message: 'Enter name:',
when: (answers) => ['Generate module', 'Generate service'].includes(answers.action)
}
])
return answers
}
// Main loop
async function main() {
while (true) {
const { action, name } = await promptOptions()
if (action === 'Exit') break
if (action === 'Generate module') {
await generateModule(name)
} else if (action === 'Generate service') {
await generateService(name)
}
}
}Plugin System Design
Plugin Interface Definition:
// plugin.interface.ts
export interface NestCliPlugin {
name: string
commands: Array<{
name: string
description: string
action: (args: any) => Promise<void>
}>
hooks?: {
beforeCommand?: (command: string) => Promise<void>
afterCommand?: (command: string) => Promise<void>
}
}Plugin Loader Implementation:
// plugin.loader.ts
@Injectable()
export class PluginLoader {
private plugins: Map<string, NestCliPlugin> = new Map()
async loadPlugins(pluginsDir: string) {
const files = glob.sync('*.plugin.js', { cwd: pluginsDir })
for (const file of files) {
const pluginModule = require(join(pluginsDir, file))
const plugin = pluginModule.default as NestCliPlugin
this.plugins.set(plugin.name, plugin)
}
}
getPlugin(name: string): NestCliPlugin | undefined {
return this.plugins.get(name)
}
}Global and Local Installation
package.json Configuration:
{
"bin": {
"nestjs-cli": "./dist/cli.js"
},
"files": [
"dist/**/*"
],
"scripts": {
"build": "tsc",
"prepublishOnly": "npm run build"
}
}Local Installation Detection:
// cli.service.ts
@Injectable()
export class CliService {
private isGlobal: boolean
constructor() {
this.isGlobal = !process.env.npm_config_local_prefix
}
async generateModule(name: string) {
if (this.isGlobal) {
console.log('Generating module in global mode')
// Global installation logic
} else {
console.log('Generating module in local mode')
// Local installation logic
}
}
}



