Microservices in NestJS

Published on
Hamed Gholami-
35 min read

Overview

Creating a simple NestJS project (with gRPC)

nest-microservices-project/
├── src/
│   ├── microservices/
│   │   ├── redis/
│   │   │   ├── redis.service.ts
│   │   │   ├── redis.controller.ts
│   │   │   ├── redis.module.ts
│   │   │   └── redis-client/
│   │   │       ├── redis-client.controller.ts
│   │   │       ├── redis-client.service.ts
│   │   │       └── redis-client.module.ts
│   │   ├── mqtt/
│   │   │   ├── mqtt.service.ts
│   │   │   ├── mqtt.controller.ts
│   │   │   ├── mqtt.module.ts
│   │   │   └── mqtt-client/
│   │   │       ├── mqtt-client.controller.ts
│   │   │       ├── mqtt-client.service.ts
│   │   │       └── mqtt-client.module.ts
│   │   ├── nats/
│   │   │   ├── nats.service.ts
│   │   │   ├── nats.controller.ts
│   │   │   ├── nats.module.ts
│   │   │   └── nats-client/
│   │   │       ├── nats-client.controller.ts
│   │   │       ├── nats-client.service.ts
│   │   │       └── nats-client.module.ts
│   │   ├── rabbitmq/
│   │   │   ├── rabbitmq.service.ts
│   │   │   ├── rabbitmq.controller.ts
│   │   │   ├── rabbitmq.module.ts
│   │   │   └── rabbitmq-client/
│   │   │       ├── rabbitmq-client.controller.ts
│   │   │       ├── rabbitmq-client.service.ts
│   │   │       └── rabbitmq-client.module.ts
│   │   ├── kafka/
│   │   │   ├── kafka.service.ts
│   │   │   ├── kafka.controller.ts
│   │   │   ├── kafka.module.ts
│   │   │   └── kafka-client/
│   │   │       ├── kafka-client.controller.ts
│   │   │       ├── kafka-client.service.ts
│   │   │       └── kafka-client.module.ts
│   │   ├── custom-transport/
│   │   │   ├── custom-transport.service.ts
│   │   │   ├── custom-transport.controller.ts
│   │   │   ├── custom-transport.module.ts
│   │   │   └── custom-transport-client/
│   │   │       ├── custom-transport-client.controller.ts
│   │   │       ├── custom-transport-client.service.ts
│   │   │       └── custom-transport-client.module.ts
│   │   ├── exception-filters/
│   │   │   ├── exception-filters.controller.ts
│   │   │   ├── exception-filters.service.ts
│   │   │   ├── exception-filters.module.ts
│   │   │   └── exception-filters-client/
│   │   │       ├── exception-filters-client.controller.ts
│   │   │       ├── exception-filters-client.service.ts
│   │   │       └── exception-filters-client.module.ts
│   │   ├── pipes/
│   │   │   ├── pipes.service.ts
│   │   │   ├── pipes.controller.ts
│   │   │   ├── pipes.module.ts
│   │   │   └── pipes-client/
│   │   │       ├── pipes-client.controller.ts
│   │   │       ├── pipes-client.service.ts
│   │   │       └── pipes-client.module.ts
│   │   ├── guards/
│   │   │   ├── guards.service.ts
│   │   │   ├── guards.controller.ts
│   │   │   ├── guards.module.ts
│   │   │   └── guards-client/
│   │   │       ├── guards-client.controller.ts
│   │   │       ├── guards-client.service.ts
│   │   │       └── guards-client.module.ts
│   │   └── interceptors/
│   │       ├── interceptors.service.ts
│   │       ├── interceptors.controller.ts
│   │       ├── interceptors.module.ts
│   │       └── interceptors-client/
│   │           ├── interceptors-client.controller.ts
│   │           ├── interceptors-client.service.ts
│   │           └── interceptors-client.module.ts
│   ├── app.module.ts (Root module)
│   └── main.ts (Entry point)
├── node_modules/ (npm dependencies)
├── test/ (test suites)
├── nest-cli.json (Nest CLI configuration)
├── tsconfig.json (TypeScript compiler options)
├── tsconfig.build.json (TypeScript compiler options for build)
├── package.json (npm package definition and scripts)
└── package-lock.json (locked versions of npm dependencies)

redis

//redis.service.ts
import { Injectable } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';
import { RedisOptions } from 'nestjs-redis';

/**
 * Wraps a `ClientProxy` configured for the Redis transport and exposes
 * key/value style helpers on top of it.
 *
 * Every helper goes through the proxy's `emit`/`send`, i.e. it dispatches a
 * message pattern ('set', 'get', 'del', 'incr') over the transport.
 * NOTE(review): presumably a microservice handling these patterns must be
 * listening on the other side — these are not direct Redis commands; confirm.
 * NOTE(review): `RedisOptions` is injected by its type; confirm a provider for
 * it is registered, otherwise Nest cannot resolve this constructor.
 */
@Injectable()
export class RedisService {
  private readonly redisClient: ClientProxy;

  constructor(private readonly redisOptions: RedisOptions) {
    // The proxy is built once, from the injected options, and reused by every call.
    this.redisClient = ClientProxyFactory.create({
      transport: Transport.REDIS,
      options: redisOptions,
    });
  }

  /**
   * Emits a 'set' event carrying `{ key, value }` and waits for the emit to complete.
   * @param key The key to set.
   * @param value The value to associate with the key.
   */
  async setValue(key: string, value: string): Promise<void> {
    const payload = { key, value };
    await this.redisClient.emit('set', payload).toPromise();
  }

  /**
   * Sends a 'get' request with the key as payload.
   * @param key The key to look up.
   * @returns The reply to the request (a string, or null).
   */
  async getValue(key: string): Promise<string | null> {
    return this.request<string | null>('get', key);
  }

  /**
   * Sends a 'del' request with the key as payload.
   * @param key The key to delete.
   * @returns The reply to the request, typed as the number of keys deleted.
   */
  async deleteValue(key: string): Promise<number> {
    return this.request<number>('del', key);
  }

  /**
   * Sends an 'incr' request with the key as payload.
   * @param key The key to increment.
   * @returns The reply to the request, typed as the new numeric value.
   */
  async increment(key: string): Promise<number> {
    return this.request<number>('incr', key);
  }

  // Shared request path: send `key` under `pattern` and convert the reply stream to a promise.
  private request<TResult>(pattern: string, key: string) {
    return this.redisClient.send<TResult, string>(pattern, key).toPromise();
  }
}
//redis.controller.ts
import { Controller, Get, Post, Body, Param, Delete, Put } from '@nestjs/common';
import { RedisService } from './redis.service';

/**
 * HTTP facade over `RedisService`, mounted under `/redis`.
 * Each handler delegates to the service and answers with a human-readable string.
 */
@Controller('redis')
export class RedisController {
  constructor(private readonly redisService: RedisService) {}

  /**
   * POST /redis/set/:key — stores the `value` field of the request body under `key`.
   * @returns A confirmation message echoing the key and value.
   */
  @Post('set/:key')
  async setValue(@Param('key') key: string, @Body('value') value: string) {
    await this.redisService.setValue(key, value);
    return `Key "${key}" set with value "${value}"`;
  }

  /**
   * GET /redis/get/:key — looks up `key` through the service.
   * @returns The stored value in a message, or a "not found" message when the service yields null.
   */
  @Get('get/:key')
  async getValue(@Param('key') key: string) {
    const stored = await this.redisService.getValue(key);
    return stored === null
      ? `Key "${key}" not found in Redis`
      : `Value for key "${key}": "${stored}"`;
  }

  /**
   * DELETE /redis/delete/:key — deletes `key` through the service.
   * @returns A "deleted" message when exactly one key was removed, otherwise "not found".
   */
  @Delete('delete/:key')
  async deleteValue(@Param('key') key: string) {
    const removed = await this.redisService.deleteValue(key);
    return removed === 1
      ? `Key "${key}" deleted from Redis`
      : `Key "${key}" not found in Redis`;
  }

  /**
   * PUT /redis/increment/:key — increments `key` through the service.
   * @returns A message containing the value reported by the service.
   */
  @Put('increment/:key')
  async incrementValue(@Param('key') key: string) {
    const updated = await this.redisService.increment(key);
    return `Key "${key}" incremented. New value: "${updated}"`;
  }
}
//redis.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { RedisService } from './redis.service';
import { RedisController } from './redis.controller';
import { RedisOptions } from 'nestjs-redis';

/**
 * Feature module for the Redis example: registers a Redis-transport client
 * under the 'REDIS_CLIENT' token and wires up RedisController and RedisService.
 *
 * NOTE(review): the RedisService shown in this section builds its own client
 * from an injected `RedisOptions` instead of injecting 'REDIS_CLIENT' — confirm
 * that a provider for those options exists, or that the service should use this client.
 */
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'REDIS_CLIENT', // Injection token under which the client is registered
        transport: Transport.REDIS, // Use Redis as the transport layer
        options: {
          // Connection settings for the Redis server used by the transport.
          url: 'redis://localhost:6379', // Replace with your Redis server URL
        } as RedisOptions, // NOTE(review): the assertion bypasses checking against Nest's own option type — confirm `RedisOptions` from 'nestjs-redis' is the intended type
      },
    ]),
  ],
  controllers: [RedisController], // HTTP endpoints under /redis
  providers: [RedisService], // Service the controller delegates to
})
export class RedisModule {}
//redis-client.service.ts
import { Injectable, NotFoundException } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';
import { RedisOptions } from 'nestjs-redis';

/**
 * Client-side counterpart of the Redis example: owns a Redis-transport
 * `ClientProxy` and exposes key/value helpers that dispatch the 'set', 'get',
 * 'del' and 'incr' message patterns through it.
 *
 * NOTE(review): `RedisOptions` is injected by its type; confirm a provider for
 * it is registered, otherwise Nest cannot resolve this constructor.
 */
@Injectable()
export class RedisClientService {
  private readonly redisClient: ClientProxy;

  constructor(private readonly redisOptions: RedisOptions) {
    // One proxy per service instance, configured from the injected options.
    this.redisClient = ClientProxyFactory.create({
      transport: Transport.REDIS,
      options: redisOptions,
    });
  }

  /**
   * Overwrites the value of an existing key.
   *
   * The key is looked up first; only when that lookup yields a non-null value
   * is the 'set' event emitted.
   * NOTE(review): as written this can never create a new key — confirm that is intended.
   * @param key The key to update.
   * @param value The value to be set.
   * @throws NotFoundException if the lookup for `key` yields null.
   */
  async setValue(key: string, value: string): Promise<void> {
    if ((await this.getValue(key)) === null) {
      throw new NotFoundException(`Key '${key}' does not exist in Redis.`);
    }

    const payload = { key, value };
    await this.redisClient.emit('set', payload).toPromise();
  }

  /**
   * Sends a 'get' request with the key as payload.
   * @param key The key to look up.
   * @returns The reply to the request (a string, or null).
   */
  async getValue(key: string): Promise<string | null> {
    return this.request<string | null>('get', key);
  }

  /**
   * Sends a 'del' request with the key as payload.
   * @param key The key to be deleted.
   * @returns The reply to the request, typed as the number of keys deleted.
   */
  async deleteValue(key: string): Promise<number> {
    return this.request<number>('del', key);
  }

  /**
   * Sends an 'incr' request with the key as payload.
   * @param key The key to be incremented.
   * @returns The reply to the request, typed as the new numeric value.
   */
  async increment(key: string): Promise<number> {
    return this.request<number>('incr', key);
  }

  // Shared request path: send `key` under `pattern` and convert the reply stream to a promise.
  private request<TResult>(pattern: string, key: string) {
    return this.redisClient.send<TResult, string>(pattern, key).toPromise();
  }
}
//redis-client.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { RedisService } from './redis.service';
import { RedisClientController } from './redis-client.controller';
import { RedisClientService } from './redis-client.service';
import { RedisOptions } from 'nestjs-redis';

/**
 * Module for the Redis client example: registers a Redis-transport client
 * under the 'REDIS_SERVICE' token, declares RedisClientController, and
 * provides and exports RedisClientService.
 *
 * NOTE(review): the RedisClientService shown in this section builds its own
 * client from an injected `RedisOptions` rather than injecting 'REDIS_SERVICE' —
 * confirm a provider for those options exists.
 */
@Module({
  imports: [
    // Register the Redis microservice client with Nest's ClientsModule.
    ClientsModule.register([
      {
        name: 'REDIS_SERVICE', // Injection token for the Redis microservice client.
        transport: Transport.REDIS, // Use Redis as the transport method.

        // Connection options; these should match your Redis server setup.
        options: {
          url: 'redis://localhost:6379', // Example URL, update as needed.

          // Further options can be added here, for example:
          // password: 'your-redis-password',
          // db: 0, // Redis database index
          // maxRetriesPerRequest: 3, // Number of maximum retries for requests
        } as RedisOptions,
      },
    ]),
  ],
  // HTTP entry points for this module.
  controllers: [RedisClientController],

  // Service that talks to the Redis transport on behalf of the controller.
  providers: [RedisClientService],

  // Re-exported so that modules importing RedisClientModule can inject RedisClientService.
  exports: [RedisClientService],
})
export class RedisClientModule {}

mqtt

//mqtt.service.ts

import { Injectable } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';

/**
 * Wraps a `ClientProxy` configured for the MQTT transport and exposes
 * publish / subscribe / unsubscribe helpers on top of it.
 *
 * NOTE(review): `subscribe(topic)`, `observe(topic)` and `unsubscribe(topic)`
 * are not among the ClientProxy methods described in the NestJS microservices
 * docs (`connect`, `close`, `send`, `emit`) — confirm these calls compile
 * against the installed version before relying on this class.
 */
@Injectable()
export class MqttService {
  private readonly mqttClient: ClientProxy;

  constructor() {
    // Build the MQTT client once; no broker options are set, so the transport's defaults apply.
    this.mqttClient = ClientProxyFactory.create({
      transport: Transport.MQTT,
      options: {
        // Specify your MQTT broker connection details here.
        // host: 'mqtt://localhost', // Example MQTT broker host
        // port: 1883, // Example MQTT broker port
      },
    });
  }

  /**
   * Publishes a message to an MQTT topic.
   * @param topic The topic, used as the event pattern passed to `emit`.
   * @param message The payload to publish.
   */
  async publish(topic: string, message: any): Promise<void> {
    // `emit` dispatches the payload as an event; the promise settles when the emit stream completes.
    await this.mqttClient.emit(topic, message).toPromise();
  }

  /**
   * Subscribes to an MQTT topic and forwards every incoming message to `callback`.
   * @param topic The topic to subscribe to.
   * @param callback Invoked once per received message.
   */
  async subscribe(topic: string, callback: (message: any) => void): Promise<void> {
    // Connect to the MQTT broker before subscribing to a topic.
    await this.mqttClient.connect();

    // Subscribe to the specified MQTT topic.
    await this.mqttClient.subscribe(topic);

    // The stream subscription is not retained, so it cannot be torn down from here later.
    this.mqttClient.observe(topic).subscribe((message) => {
      // Hand each incoming message to the caller-supplied callback.
      callback(message);
    });
  }

  /**
   * Unsubscribes from an MQTT topic.
   * @param topic The topic to unsubscribe from.
   */
  async unsubscribe(topic: string): Promise<void> {
    // Unsubscribe from the specified MQTT topic.
    await this.mqttClient.unsubscribe(topic);
  }
}
//mqtt.controller.ts

import { Controller, Get, Post, Delete, Param, Body, Res } from '@nestjs/common';
import { MqttService } from './mqtt.service'; // Import the MQTT service
import { Response } from 'express'; // Import the Express Response object for HTTP responses

/**
 * HTTP facade over `MqttService`, mounted under `/mqtt`.
 *
 * Exposes endpoints to publish to, subscribe to and unsubscribe from MQTT
 * topics. Failures from the service are rethrown as `Error` with a message
 * that names the failed operation.
 */
@Controller('mqtt')
export class MqttController {
  constructor(private readonly mqttService: MqttService) {}

  /**
   * POST /mqtt/publish/:topic — publishes the request body to `topic`.
   * @param topic The MQTT topic taken from the route.
   * @param message The request body, forwarded unchanged as the payload.
   * @returns A confirmation message once the publish has completed.
   * @throws Error if the service fails to publish.
   */
  @Post('publish/:topic')
  async publishMessage(@Param('topic') topic: string, @Body() message: any): Promise<string> {
    try {
      await this.mqttService.publish(topic, message);
      return 'Message published to MQTT topic successfully';
    } catch (error) {
      throw new Error(`Failed to publish message: ${MqttController.reasonOf(error)}`);
    }
  }

  /**
   * GET /mqtt/subscribe/:topic — subscribes to `topic` and answers the HTTP
   * request with the first message received.
   *
   * Every message is logged. Only the first one is written to the HTTP
   * response, because a response can be sent once; without this guard the
   * second message would fail with "headers already sent".
   * @param topic The MQTT topic taken from the route.
   * @param response The Express response, completed by the first incoming message.
   * @throws Error if the service fails to subscribe.
   */
  @Get('subscribe/:topic')
  async subscribeToTopic(@Param('topic') topic: string, @Res() response: Response): Promise<void> {
    try {
      await this.mqttService.subscribe(topic, (message) => {
        console.log(`Received MQTT message on topic '${topic}':`, message);

        // The subscription outlives the request, so skip the write once the reply has gone out.
        if (!response.headersSent) {
          response.status(200).json(message);
        }
      });
    } catch (error) {
      throw new Error(`Failed to subscribe to topic: ${MqttController.reasonOf(error)}`);
    }
  }

  /**
   * DELETE /mqtt/unsubscribe/:topic — unsubscribes from `topic`.
   * @param topic The MQTT topic taken from the route.
   * @returns A confirmation message once the unsubscribe has completed.
   * @throws Error if the service fails to unsubscribe.
   */
  @Delete('unsubscribe/:topic')
  async unsubscribeFromTopic(@Param('topic') topic: string): Promise<string> {
    try {
      await this.mqttService.unsubscribe(topic);
      return 'Unsubscribed from MQTT topic successfully';
    } catch (error) {
      throw new Error(`Failed to unsubscribe from topic: ${MqttController.reasonOf(error)}`);
    }
  }

  // Caught values are `unknown`; only Error instances are guaranteed to carry a message.
  private static reasonOf(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}
//mqtt.module.ts

import { Module } from '@nestjs/common';
import { MqttService } from './mqtt.service'; // Import the MQTT service
import { MqttController } from './mqtt.controller'; // Import the MQTT controller

/**
 * Feature module for the MQTT example: declares MqttController and provides
 * the MqttService it depends on. Nothing is exported.
 */
@Module({
  controllers: [MqttController], // HTTP endpoints under /mqtt
  providers: [MqttService], // Service injected into the controller
})
export class MqttModule {}
//mqtt-client.service.ts

import { Injectable } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';

/**
 * Client-side counterpart of the MQTT example: owns an MQTT-transport
 * `ClientProxy` and exposes publish / subscribe / unsubscribe helpers.
 *
 * NOTE(review): `subscribe(topic)`, `observe(topic)` and `unsubscribe(topic)`
 * are not among the ClientProxy methods described in the NestJS microservices
 * docs (`connect`, `close`, `send`, `emit`) — confirm these calls compile
 * against the installed version before relying on this class.
 */
@Injectable()
export class MqttClientService {
  private readonly mqttClient: ClientProxy;

  constructor() {
    // Build the MQTT client once; no broker options are set, so the transport's defaults apply.
    this.mqttClient = ClientProxyFactory.create({
      transport: Transport.MQTT,
      options: {
        // Specify your MQTT broker connection details here.
        // host: 'mqtt://localhost', // Example MQTT broker host
        // port: 1883, // Example MQTT broker port
      },
    });
  }

  /**
   * Publish a message to an MQTT topic.
   * @param topic The MQTT topic, used as the event pattern passed to `emit`.
   * @param message The message to publish.
   */
  async publish(topic: string, message: any): Promise<void> {
    // `emit` dispatches the payload as an event; the promise settles when the emit stream completes.
    await this.mqttClient.emit(topic, message).toPromise();
  }

  /**
   * Subscribe to an MQTT topic and forward every incoming message to `callback`.
   * @param topic The MQTT topic to subscribe to.
   * @param callback Invoked once per received message.
   */
  async subscribe(topic: string, callback: (message: any) => void): Promise<void> {
    // Connect to the MQTT broker before subscribing to a topic.
    await this.mqttClient.connect();

    // Subscribe to the specified MQTT topic.
    await this.mqttClient.subscribe(topic);

    // The stream subscription is not retained, so it cannot be torn down from here later.
    this.mqttClient.observe(topic).subscribe((message) => {
      // Hand each incoming message to the caller-supplied callback.
      callback(message);
    });
  }

  /**
   * Unsubscribe from an MQTT topic.
   * @param topic The MQTT topic to unsubscribe from.
   */
  async unsubscribe(topic: string): Promise<void> {
    // Unsubscribe from the specified MQTT topic.
    await this.mqttClient.unsubscribe(topic);
  }
}
//mqtt-client.controller.ts

import { Controller, Inject, Post, Body } from '@nestjs/common';
import { MqttClientService } from './mqtt.service';

/**
 * HTTP facade over `MqttClientService`, mounted under `/mqtt`.
 * All three endpoints are POST routes that read their arguments from a JSON body.
 *
 * NOTE(review): `MqttClientService` is imported from './mqtt.service' above,
 * while the project tree places it in mqtt-client.service.ts — confirm the path.
 */
@Controller('mqtt')
export class MqttClientController {
  constructor(@Inject(MqttClientService) private readonly mqttService: MqttClientService) {}

  /**
   * POST /mqtt/publish — publishes `message` to `topic`.
   *
   * Example request body: { "topic": "exampleTopic", "message": "Hello, MQTT!" }
   * @param messageData JSON body carrying the target topic and the payload.
   */
  @Post('publish')
  async publishMessage(@Body() messageData: { topic: string; message: any }): Promise<void> {
    await this.mqttService.publish(messageData.topic, messageData.message);
  }

  /**
   * POST /mqtt/subscribe — subscribes to `topic`; each received message is
   * written to the console.
   *
   * Example request body: { "topic": "exampleTopic" }
   * @param topicData JSON body carrying the topic to subscribe to.
   */
  @Post('subscribe')
  async subscribeToTopic(@Body() topicData: { topic: string }): Promise<void> {
    const subscribedTopic = topicData.topic;

    // The handler only logs; extend it here to process incoming messages.
    await this.mqttService.subscribe(subscribedTopic, (incoming: any) => {
      console.log(`Received MQTT message on topic ${subscribedTopic}:`, incoming);
    });
  }

  /**
   * POST /mqtt/unsubscribe — unsubscribes from `topic`.
   *
   * Example request body: { "topic": "exampleTopic" }
   * @param topicData JSON body carrying the topic to unsubscribe from.
   */
  @Post('unsubscribe')
  async unsubscribeFromTopic(@Body() topicData: { topic: string }): Promise<void> {
    await this.mqttService.unsubscribe(topicData.topic);
  }
}
//mqtt-client.module.ts

import { Module } from '@nestjs/common';
import { ClientsModule, Transport, ClientProxyFactory } from '@nestjs/microservices';
import { MqttService } from './mqtt.service';

/**
 * Module for the MQTT client example: registers an MQTT-transport client under
 * the 'MQTT_CLIENT' token, and provides and exports MqttService.
 *
 * NOTE(review): this module provides `MqttService` and declares no controllers,
 * so the MqttClientController / MqttClientService shown in this section are not
 * wired in here — confirm whether they should be.
 */
@Module({
  imports: [
    // Register an MQTT client with Nest's ClientsModule.
    // `name` is the injection token; `transport` selects MQTT.
    ClientsModule.register([
      {
        name: 'MQTT_CLIENT', // Injection token for the client
        transport: Transport.MQTT, // Transport type (MQTT)
        options: {
          // Specify MQTT broker connection details here.
          // host: 'mqtt://localhost', // Example MQTT broker host
          // port: 1883, // Example MQTT broker port
        },
      },
    ]),
  ],
  providers: [MqttService], // Service made available inside this module
  exports: [MqttService], // Re-exported for modules that import MqttClientModule
})
export class MqttClientModule {}

NATS

//nats.service.ts

import { Injectable } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';

/**
 * Wraps a `ClientProxy` configured for the NATS transport and exposes
 * publish / subscribe / unsubscribe / close helpers on top of it.
 *
 * NOTE(review): `subscribe(subject)`, `observe(subject)` and
 * `unsubscribe(subject)` are not among the ClientProxy methods described in the
 * NestJS microservices docs (`connect`, `close`, `send`, `emit`) — confirm these
 * calls compile against the installed version before relying on this class.
 */
@Injectable()
export class NatsService {
  private readonly natsClient: ClientProxy;

  constructor() {
    // Build the NATS client once; no server options are set, so the transport's defaults apply.
    this.natsClient = ClientProxyFactory.create({
      transport: Transport.NATS,
      options: {
        // Specify NATS server connection options here.
        // servers: ['nats://localhost:4222'], // Example NATS server URL
      },
    });
  }

  /**
   * Publishes a message to a NATS subject.
   * @param subject - The NATS subject, used as the event pattern passed to `emit`.
   * @param message - The message payload to be sent.
   */
  async publish(subject: string, message: any): Promise<void> {
    // `emit` dispatches the payload as an event; the promise settles when the emit stream completes.
    await this.natsClient.emit(subject, message).toPromise();
  }

  /**
   * Subscribes to a NATS subject and forwards every incoming message to `callback`.
   * @param subject - The NATS subject to subscribe to.
   * @param callback - Invoked once per received message.
   */
  async subscribe(subject: string, callback: (message: any) => void): Promise<void> {
    // Connect to the NATS server before subscribing to a subject.
    await this.natsClient.connect();

    // Subscribe to the specified NATS subject.
    await this.natsClient.subscribe(subject);

    // The stream subscription is not retained, so it cannot be torn down from here later.
    this.natsClient.observe(subject).subscribe((message) => {
      // Hand each incoming message to the caller-supplied callback.
      callback(message);
    });
  }

  /**
   * Unsubscribes from a NATS subject.
   * @param subject - The NATS subject to unsubscribe from.
   */
  async unsubscribe(subject: string): Promise<void> {
    // Unsubscribe from the specified NATS subject.
    await this.natsClient.unsubscribe(subject);
  }

  /**
   * Closes the NATS connection held by this service.
   */
  async closeConnection(): Promise<void> {
    // Close the NATS connection when it's no longer needed.
    await this.natsClient.close();
  }
}
//nats.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern, ClientProxy, Transport, ClientProxyFactory } from '@nestjs/microservices';
import { NatsService } from './nats.service';

/**
 * Microservice controller that handles messages arriving on a NATS subject
 * and emits the processing result on a separate response subject.
 *
 * The controller owns its own NATS `ClientProxy`, created in the constructor,
 * which is used only for emitting the result.
 */
@Controller()
export class NatsController {
  private readonly natsClient: ClientProxy;

  constructor(private readonly natsService: NatsService) {
    // Dedicated client for outgoing events; no server options are set, so the transport's defaults apply.
    this.natsClient = ClientProxyFactory.create({
      transport: Transport.NATS,
      options: {
        // Specify NATS server connection options here.
        // servers: ['nats://localhost:4222'], // Example NATS server URL
      },
    });
  }

  /**
   * Message handler for the 'your-nats-subject' pattern.
   *
   * Processing failures are logged and not rethrown, so a failed message does
   * not propagate an error to the sender.
   * @param message - The incoming message payload.
   */
  @MessagePattern('your-nats-subject') // Replace with your NATS subject
  async handleMessage(message: any): Promise<void> {
    try {
      // NOTE(review): `processMessage` is not defined on the NatsService shown
      // alongside this controller — confirm it exists or add it.
      const result = await this.natsService.processMessage(message);

      // Publish the outcome for whoever listens on the response subject.
      this.natsClient.emit('response-nats-subject', result);
    } catch (error) {
      // Best-effort handler: record the failure instead of dropping it silently.
      console.error("Failed to handle message on 'your-nats-subject':", error);
    }
  }
}
//nats.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport, ClientProxyFactory } from '@nestjs/microservices';
import { NatsService } from './nats.service';

/**
 * Feature module for the NATS example: registers a NATS-transport client under
 * the 'NATS_CLIENT' token, and provides and exports NatsService.
 *
 * On construction the module also creates a separate NATS client and starts
 * connecting it.
 * NOTE(review): that extra client is local to the constructor, is never closed,
 * and duplicates the 'NATS_CLIENT' registration — confirm it is still needed.
 * NOTE(review): no controllers are declared, so the NatsController shown in this
 * section is not registered here — confirm whether it should be.
 */
@Module({
  imports: [
    // Register a NATS client with Nest's ClientsModule.
    ClientsModule.register([
      {
        name: 'NATS_CLIENT', // Injection token for the NATS client
        transport: Transport.NATS, // Use NATS transport
        options: {
          // Specify NATS server connection options here
          // For example, servers: ['nats://localhost:4222']
        },
      },
    ]),
  ],
  providers: [NatsService], // Service made available inside this module
  exports: [NatsService], // Re-exported for modules that import NatsModule
})
export class NatsModule {
  constructor() {
    // Create a NATS client using ClientProxyFactory
    const natsClient = ClientProxyFactory.create({
      transport: Transport.NATS,
      options: {
        // Specify NATS server connection options here
        // For example, servers: ['nats://localhost:4222']
      },
    });

    // A constructor cannot await, so the connection attempt runs in the
    // background; the rejection is handled here so an unreachable server does
    // not surface as an unhandled promise rejection.
    void natsClient.connect().catch((error: unknown) => {
      console.error('Failed to connect to the NATS server:', error);
    });
  }
}
//nats-client.service.ts
import { Injectable } from '@nestjs/common';
import { ClientProxyFactory, Transport, ClientProxy } from '@nestjs/microservices';

/**
 * Client-side counterpart of the NATS example: owns a NATS-transport
 * `ClientProxy` and exposes publish / subscribe / unsubscribe / close helpers.
 *
 * NOTE(review): `subscribe(subject)`, `observe(subject)` and
 * `unsubscribe(subject)` are not among the ClientProxy methods described in the
 * NestJS microservices docs (`connect`, `close`, `send`, `emit`) — confirm these
 * calls compile against the installed version before relying on this class.
 */
@Injectable()
export class NatsClientService {
  private readonly natsClient: ClientProxy;

  constructor() {
    // Build the NATS client once; no server options are set, so the transport's defaults apply.
    this.natsClient = ClientProxyFactory.create({
      transport: Transport.NATS,
      options: {
        // Specify NATS server connection options here.
        // servers: ['nats://localhost:4222'], // Example NATS server URL
      },
    });
  }

  /**
   * Publishes a message to a NATS subject.
   * @param subject - The NATS subject, used as the event pattern passed to `emit`.
   * @param message - The message payload to be sent.
   */
  async publish(subject: string, message: any): Promise<void> {
    // `emit` dispatches the payload as an event; the promise settles when the emit stream completes.
    await this.natsClient.emit(subject, message).toPromise();
  }

  /**
   * Subscribes to a NATS subject and forwards every incoming message to `callback`.
   * @param subject - The NATS subject to subscribe to.
   * @param callback - Invoked once per received message.
   */
  async subscribe(subject: string, callback: (message: any) => void): Promise<void> {
    // Connect to the NATS server before subscribing to a subject.
    await this.natsClient.connect();

    // Subscribe to the specified NATS subject.
    await this.natsClient.subscribe(subject);

    // The stream subscription is not retained, so it cannot be torn down from here later.
    this.natsClient.observe(subject).subscribe((message) => {
      // Hand each incoming message to the caller-supplied callback.
      callback(message);
    });
  }

  /**
   * Unsubscribes from a NATS subject.
   * @param subject - The NATS subject to unsubscribe from.
   */
  async unsubscribe(subject: string): Promise<void> {
    // Unsubscribe from the specified NATS subject.
    await this.natsClient.unsubscribe(subject);
  }

  /**
   * Closes the NATS connection held by this service.
   */
  async closeConnection(): Promise<void> {
    // Close the NATS connection when it's no longer needed.
    await this.natsClient.close();
  }
}
//nats-client.controller.ts
import { Body, Controller, InternalServerErrorException, Param, Post } from '@nestjs/common';
import { NatsService } from './nats.service';

/**
 * HTTP facade over `NatsService`, mounted under `/nats`.
 *
 * Every endpoint follows the same contract: run one service call, answer with
 * a fixed success string, and translate any failure into an
 * `InternalServerErrorException` carrying a fixed message.
 */
@Controller('nats')
export class NatsClientController {
  constructor(private readonly natsService: NatsService) {}

  /**
   * POST /nats/publish/:subject — publishes the request body to `subject`.
   * @throws InternalServerErrorException if publishing fails.
   */
  @Post('publish/:subject')
  async publishMessage(@Param('subject') subject: string, @Body() message: any) {
    return this.attempt(
      () => this.natsService.publish(subject, message),
      'Message published successfully',
      'Failed to publish message to NATS',
    );
  }

  /**
   * POST /nats/subscribe/:subject — subscribes to `subject` with a no-op message handler.
   * @throws InternalServerErrorException if subscribing fails.
   */
  @Post('subscribe/:subject')
  async subscribeToSubject(@Param('subject') subject: string) {
    return this.attempt(
      () =>
        this.natsService.subscribe(subject, (message) => {
          // Handle incoming messages here
        }),
      'Subscribed to NATS subject',
      'Failed to subscribe to NATS subject',
    );
  }

  /**
   * POST /nats/unsubscribe/:subject — unsubscribes from `subject`.
   * @throws InternalServerErrorException if unsubscribing fails.
   */
  @Post('unsubscribe/:subject')
  async unsubscribeFromSubject(@Param('subject') subject: string) {
    return this.attempt(
      () => this.natsService.unsubscribe(subject),
      'Unsubscribed from NATS subject',
      'Failed to unsubscribe from NATS subject',
    );
  }

  /**
   * POST /nats/close-connection — closes the service's NATS connection.
   * @throws InternalServerErrorException if closing fails.
   */
  @Post('close-connection')
  async closeNatsConnection() {
    return this.attempt(
      () => this.natsService.closeConnection(),
      'NATS connection closed',
      'Failed to close NATS connection',
    );
  }

  // Runs one service call; resolves with `successText` or rejects with a 500 carrying `failureText`.
  private async attempt(
    action: () => Promise<void>,
    successText: string,
    failureText: string,
  ): Promise<string> {
    try {
      await action();
      return successText;
    } catch (error) {
      throw new InternalServerErrorException(failureText);
    }
  }
}
//nats-client.module.ts

import { Module, Global } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';
import { NatsService } from './nats.service';

/**
 * Global module for the NATS client example.
 *
 * Provides NatsService plus a NATS-transport `ClientProxy` under the
 * 'NATS_CLIENT' token. Only NatsService is exported; because the module is
 * marked `@Global()`, that export is available application-wide.
 */
@Global()
@Module({
  providers: [
    NatsService,
    {
      // Token under which the NATS client can be injected inside this module.
      provide: 'NATS_CLIENT',
      // The factory builds a fresh NATS client; no server options are set.
      useFactory: (): ClientProxy =>
        ClientProxyFactory.create({
          transport: Transport.NATS,
          options: {
            // Specify NATS server connection options here.
            // servers: ['nats://localhost:4222'], // Example NATS server URL
          },
        }),
    },
  ],
  exports: [NatsService],
})
export class NatsClientModule {}

rabbitmq

//rabbitmq.service.ts

import { Injectable } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';

/**
 * Wrapper around a `ClientProxy` created for the RMQ transport.
 *
 * The proxy is built in the constructor with hard-coded settings
 * (`amqp://localhost:5672`, queue `my_queue`, exchange `my_exchange`).
 *
 * NOTE(review): besides `connect`, `close`, `send` and `emit`, the methods below call
 * `subscribe`, `bindQueue`, `unbindQueue`, `assertQueue`, `assertExchange`, `deleteQueue`,
 * `deleteExchange` and `purgeQueue` on the proxy. These look like AMQP channel operations
 * rather than `ClientProxy` members — confirm they exist on the client type actually used.
 */
@Injectable()
export class RabbitmqService {
  private readonly rabbitmqClient: ClientProxy;

  constructor() {
    this.rabbitmqClient = ClientProxyFactory.create({
      transport: Transport.RMQ,
      options: {
        urls: ['amqp://localhost:5672'], // RabbitMQ server URL
        queue: 'my_queue', // Queue name
        queueOptions: {
          durable: true, // Make the queue durable
        },
        exchange: 'my_exchange', // Exchange name
        exchangeType: 'fanout', // Exchange type
        exchangeOptions: {
          durable: true, // Make the exchange durable
        },
      },
    });
  }

  /**
   * Sends `data` under `pattern` via `send` and waits for the resulting observable
   * to finish. Whatever the observable yields is not returned to the caller.
   */
  async sendToQueue(pattern: string, data: any): Promise<void> {
    await this.rabbitmqClient.send(pattern, data).toPromise();
  }

  /**
   * Emits `data` under `pattern` and waits for the resulting observable to finish.
   *
   * NOTE(review): `exchange` is passed as a third argument to `emit`; verify the client
   * accepts (and uses) it, otherwise the exchange name is silently ignored.
   */
  async publishToExchange(exchange: string, pattern: string, data: any): Promise<void> {
    await this.rabbitmqClient.emit(pattern, data, { exchange }).toPromise();
  }

  /** Connects the client, then registers `callback` for `pattern` through `subscribe`. */
  async receive(pattern: string, callback: (data: any) => void): Promise<void> {
    await this.rabbitmqClient.connect();
    await this.rabbitmqClient.subscribe(pattern, callback);
  }

  /** Forwards to the client's `bindQueue(queue, exchange, pattern)`. */
  async bindQueueToExchange(queue: string, exchange: string, pattern: string): Promise<void> {
    await this.rabbitmqClient.bindQueue(queue, exchange, pattern);
  }

  /** Forwards to the client's `unbindQueue(queue, exchange, pattern)`. */
  async unbindQueueFromExchange(queue: string, exchange: string, pattern: string): Promise<void> {
    await this.rabbitmqClient.unbindQueue(queue, exchange, pattern);
  }

  /** Forwards to the client's `assertQueue(queue, options)`. */
  async assertQueue(queue: string, options: { durable: boolean }): Promise<void> {
    await this.rabbitmqClient.assertQueue(queue, options);
  }

  /** Forwards to the client's `assertExchange(exchange, options)`. */
  async assertExchange(
    exchange: string,
    options: { durable: boolean; type: string }
  ): Promise<void> {
    await this.rabbitmqClient.assertExchange(exchange, options);
  }

  /** Forwards to the client's `deleteQueue(queue)`. */
  async deleteQueue(queue: string): Promise<void> {
    await this.rabbitmqClient.deleteQueue(queue);
  }

  /** Forwards to the client's `deleteExchange(exchange)`. */
  async deleteExchange(exchange: string): Promise<void> {
    await this.rabbitmqClient.deleteExchange(exchange);
  }

  /** Forwards to the client's `purgeQueue(queue)`. */
  async purgeQueue(queue: string): Promise<void> {
    await this.rabbitmqClient.purgeQueue(queue);
  }

  /** Closes the underlying client. */
  async closeConnection(): Promise<void> {
    await this.rabbitmqClient.close();
  }
}
//rabbitmq.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload, Ctx, RmqContext } from '@nestjs/microservices';
import { RabbitmqService } from './rabbitmq.service';

/**
 * Message-pattern controller that maps each incoming pattern onto one
 * RabbitmqService call, always targeting the same queue and exchange.
 * Incoming payloads are only used by `sendToQueue`, `publishToExchange` and `receive`.
 */
@Controller()
export class RabbitmqController {
  // Fixed targets shared by every handler below.
  private static readonly QUEUE = 'my_queue';
  private static readonly EXCHANGE = 'my_exchange';
  private static readonly FANOUT = 'fanout';

  constructor(private readonly rabbitmqService: RabbitmqService) {}

  /** Forwards the payload to the service's `sendToQueue`, using the queue name as pattern. */
  @MessagePattern('send_to_queue')
  async sendToQueue(@Payload() data: any): Promise<void> {
    const { QUEUE } = RabbitmqController;
    await this.rabbitmqService.sendToQueue(QUEUE, data);
  }

  /**
   * Publishes the payload through the service, then acknowledges the message
   * that triggered this handler. The ack only happens if publishing succeeded.
   */
  @MessagePattern('publish_to_exchange')
  async publishToExchange(@Payload() data: any, @Ctx() context: RmqContext): Promise<void> {
    const amqpChannel = context.getChannelRef();
    const incoming = context.getMessage();
    const { EXCHANGE, FANOUT } = RabbitmqController;

    await this.rabbitmqService.publishToExchange(EXCHANGE, FANOUT, data);

    amqpChannel.ack(incoming);
  }

  /** Logs the received payload. */
  @MessagePattern('receive')
  async receive(@Payload() data: any): Promise<void> {
    console.log('Received:', data);
  }

  /** Binds the fixed queue to the fixed exchange. */
  @MessagePattern('bind_queue_to_exchange')
  async bindQueueToExchange(@Payload() data: any): Promise<void> {
    const { QUEUE, EXCHANGE, FANOUT } = RabbitmqController;
    await this.rabbitmqService.bindQueueToExchange(QUEUE, EXCHANGE, FANOUT);
  }

  /** Removes the binding between the fixed queue and the fixed exchange. */
  @MessagePattern('unbind_queue_from_exchange')
  async unbindQueueFromExchange(@Payload() data: any): Promise<void> {
    const { QUEUE, EXCHANGE, FANOUT } = RabbitmqController;
    await this.rabbitmqService.unbindQueueFromExchange(QUEUE, EXCHANGE, FANOUT);
  }

  /** Asserts the fixed queue as durable. */
  @MessagePattern('assert_queue')
  async assertQueue(@Payload() data: any): Promise<void> {
    const queueOptions = { durable: true };
    await this.rabbitmqService.assertQueue(RabbitmqController.QUEUE, queueOptions);
  }

  /** Asserts the fixed exchange as a durable fanout exchange. */
  @MessagePattern('assert_exchange')
  async assertExchange(@Payload() data: any): Promise<void> {
    const exchangeOptions = { durable: true, type: RabbitmqController.FANOUT };
    await this.rabbitmqService.assertExchange(RabbitmqController.EXCHANGE, exchangeOptions);
  }

  /** Deletes the fixed queue. */
  @MessagePattern('delete_queue')
  async deleteQueue(@Payload() data: any): Promise<void> {
    await this.rabbitmqService.deleteQueue(RabbitmqController.QUEUE);
  }

  /** Deletes the fixed exchange. */
  @MessagePattern('delete_exchange')
  async deleteExchange(@Payload() data: any): Promise<void> {
    await this.rabbitmqService.deleteExchange(RabbitmqController.EXCHANGE);
  }

  /** Purges the fixed queue. */
  @MessagePattern('purge_queue')
  async purgeQueue(@Payload() data: any): Promise<void> {
    await this.rabbitmqService.purgeQueue(RabbitmqController.QUEUE);
  }
}
//rabbitmq.module.ts

import { Module } from '@nestjs/common';
import { ClientsModule, Transport, RmqOptions } from '@nestjs/microservices';
import { RabbitmqController } from './rabbitmq.controller';
import { RabbitmqService } from './rabbitmq.service';

/**
 * Module that registers an RMQ client under the 'RABBITMQ_SERVICE' token and
 * wires RabbitmqController together with RabbitmqService.
 */
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'RABBITMQ_SERVICE', // Injection token for the registered client
        transport: Transport.RMQ,
        // This literal is the payload of the `options` field, so it is asserted as
        // RmqOptions['options'] rather than as the outer RmqOptions wrapper
        // (which itself only has `transport` and `options`).
        options: {
          urls: ['amqp://localhost:5672'], // RabbitMQ server URL
          queue: 'my_queue', // Queue name
          queueOptions: {
            durable: true, // Make the queue durable
          },
          exchange: 'my_exchange', // Exchange name
          exchangeType: 'fanout', // Exchange type
          exchangeOptions: {
            durable: true, // Make the exchange durable
          },
        } as RmqOptions['options'],
      },
    ]),
  ],
  controllers: [RabbitmqController],
  providers: [RabbitmqService],
})
export class RabbitmqModule {}
//rabbitmq-client.service.ts

import { Injectable } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport, RmqOptions } from '@nestjs/microservices';

/**
 * Client-side wrapper around a `ClientProxy` created for the RMQ transport,
 * using hard-coded connection settings.
 *
 * NOTE(review): besides `connect`, `close`, `send` and `emit`, the methods below call
 * `subscribe`, `bindQueue`, `unbindQueue`, `assertQueue`, `assertExchange`, `deleteQueue`,
 * `deleteExchange` and `purgeQueue` on the proxy. These look like AMQP channel operations
 * rather than `ClientProxy` members — confirm they exist on the client type actually used.
 */
@Injectable()
export class RabbitmqClientService {
  private readonly rabbitmqClient: ClientProxy;

  constructor() {
    this.rabbitmqClient = ClientProxyFactory.create({
      transport: Transport.RMQ,
      // This literal is the payload of the `options` field, so it is asserted as
      // RmqOptions['options'] rather than as the outer RmqOptions wrapper.
      options: {
        urls: ['amqp://localhost:5672'], // RabbitMQ server URL
        queue: 'my_queue', // Queue name
        queueOptions: {
          durable: true, // Make the queue durable
        },
        exchange: 'my_exchange', // Exchange name
        exchangeType: 'fanout', // Exchange type
        exchangeOptions: {
          durable: true, // Make the exchange durable
        },
      } as RmqOptions['options'],
    });
  }

  /**
   * Sends `data` under `pattern` via `send` and waits for the resulting observable
   * to finish. Whatever the observable yields is not returned to the caller.
   */
  async sendToQueue(pattern: string, data: any): Promise<void> {
    await this.rabbitmqClient.send(pattern, data).toPromise();
  }

  /**
   * Emits `data` under `pattern` and waits for the resulting observable to finish.
   *
   * NOTE(review): `exchange` is passed as a third argument to `emit`; verify the client
   * accepts (and uses) it, otherwise the exchange name is silently ignored.
   */
  async publishToExchange(exchange: string, pattern: string, data: any): Promise<void> {
    await this.rabbitmqClient.emit(pattern, data, { exchange }).toPromise();
  }

  /** Connects the client, then registers `callback` for `pattern` through `subscribe`. */
  async receive(pattern: string, callback: (data: any) => void): Promise<void> {
    await this.rabbitmqClient.connect();
    await this.rabbitmqClient.subscribe(pattern, callback);
  }

  /** Forwards to the client's `bindQueue(queue, exchange, pattern)`. */
  async bindQueueToExchange(queue: string, exchange: string, pattern: string): Promise<void> {
    await this.rabbitmqClient.bindQueue(queue, exchange, pattern);
  }

  /** Forwards to the client's `unbindQueue(queue, exchange, pattern)`. */
  async unbindQueueFromExchange(queue: string, exchange: string, pattern: string): Promise<void> {
    await this.rabbitmqClient.unbindQueue(queue, exchange, pattern);
  }

  /** Forwards to the client's `assertQueue(queue, options)`. */
  async assertQueue(queue: string, options: { durable: boolean }): Promise<void> {
    await this.rabbitmqClient.assertQueue(queue, options);
  }

  /** Forwards to the client's `assertExchange(exchange, options)`. */
  async assertExchange(
    exchange: string,
    options: { durable: boolean; type: string }
  ): Promise<void> {
    await this.rabbitmqClient.assertExchange(exchange, options);
  }

  /** Forwards to the client's `deleteQueue(queue)`. */
  async deleteQueue(queue: string): Promise<void> {
    await this.rabbitmqClient.deleteQueue(queue);
  }

  /** Forwards to the client's `deleteExchange(exchange)`. */
  async deleteExchange(exchange: string): Promise<void> {
    await this.rabbitmqClient.deleteExchange(exchange);
  }

  /** Forwards to the client's `purgeQueue(queue)`. */
  async purgeQueue(queue: string): Promise<void> {
    await this.rabbitmqClient.purgeQueue(queue);
  }

  /** Closes the underlying client. */
  async closeConnection(): Promise<void> {
    await this.rabbitmqClient.close();
  }
}
//rabbitmq-client.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { RabbitmqClientService } from './rabbitmq-client.service';

/**
 * Message-pattern controller for the RabbitMQ client side. Every handler delegates
 * to RabbitmqClientService against one fixed queue and one fixed exchange; only
 * `sendToQueue` and `publishToExchange` actually use the incoming payload.
 */
@Controller()
export class RabbitmqClientController {
  // Fixed targets shared by every handler below.
  private static readonly QUEUE_NAME = 'my_queue';
  private static readonly EXCHANGE_NAME = 'my_exchange';
  private static readonly EXCHANGE_KIND = 'fanout';

  constructor(private readonly rabbitmqClientService: RabbitmqClientService) {}

  /** Sends the payload to the fixed queue. */
  @MessagePattern('send-to-queue')
  async sendToQueue(@Payload() data: any): Promise<void> {
    const { QUEUE_NAME } = RabbitmqClientController;
    await this.rabbitmqClientService.sendToQueue(QUEUE_NAME, data);
  }

  /** Publishes the payload to the fixed exchange. */
  @MessagePattern('publish-to-exchange')
  async publishToExchange(@Payload() data: any): Promise<void> {
    const { EXCHANGE_NAME, EXCHANGE_KIND } = RabbitmqClientController;
    await this.rabbitmqClientService.publishToExchange(EXCHANGE_NAME, EXCHANGE_KIND, data);
  }

  /** Registers a (currently empty) message callback for the fixed queue. */
  @MessagePattern('receive')
  async receive(@Payload() data: any): Promise<void> {
    const onMessage = (message) => {
      // Handle the received message
    };
    await this.rabbitmqClientService.receive(RabbitmqClientController.QUEUE_NAME, onMessage);
  }

  /** Binds the fixed queue to the fixed exchange. */
  @MessagePattern('bind-queue-to-exchange')
  async bindQueueToExchange(@Payload() data: any): Promise<void> {
    const { QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_KIND } = RabbitmqClientController;
    await this.rabbitmqClientService.bindQueueToExchange(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_KIND);
  }

  /** Removes the binding between the fixed queue and the fixed exchange. */
  @MessagePattern('unbind-queue-from-exchange')
  async unbindQueueFromExchange(@Payload() data: any): Promise<void> {
    const { QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_KIND } = RabbitmqClientController;
    await this.rabbitmqClientService.unbindQueueFromExchange(
      QUEUE_NAME,
      EXCHANGE_NAME,
      EXCHANGE_KIND
    );
  }

  /** Asserts the fixed queue as durable. */
  @MessagePattern('assert-queue')
  async assertQueue(@Payload() data: any): Promise<void> {
    const queueOptions = { durable: true };
    await this.rabbitmqClientService.assertQueue(RabbitmqClientController.QUEUE_NAME, queueOptions);
  }

  /** Asserts the fixed exchange as a durable fanout exchange. */
  @MessagePattern('assert-exchange')
  async assertExchange(@Payload() data: any): Promise<void> {
    const exchangeOptions = { durable: true, type: RabbitmqClientController.EXCHANGE_KIND };
    await this.rabbitmqClientService.assertExchange(
      RabbitmqClientController.EXCHANGE_NAME,
      exchangeOptions
    );
  }

  /** Deletes the fixed queue. */
  @MessagePattern('delete-queue')
  async deleteQueue(@Payload() data: any): Promise<void> {
    await this.rabbitmqClientService.deleteQueue(RabbitmqClientController.QUEUE_NAME);
  }

  /** Deletes the fixed exchange. */
  @MessagePattern('delete-exchange')
  async deleteExchange(@Payload() data: any): Promise<void> {
    await this.rabbitmqClientService.deleteExchange(RabbitmqClientController.EXCHANGE_NAME);
  }

  /** Purges every message from the fixed queue. */
  @MessagePattern('purge-queue')
  async purgeQueue(@Payload() data: any): Promise<void> {
    await this.rabbitmqClientService.purgeQueue(RabbitmqClientController.QUEUE_NAME);
  }

  /** Closes the service's RabbitMQ connection. */
  @MessagePattern('close-connection')
  async closeConnection(@Payload() data: any): Promise<void> {
    await this.rabbitmqClientService.closeConnection();
  }
}
//rabbitmq-client.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport, ClientProxyFactory } from '@nestjs/microservices';
import { RabbitmqClientController } from './rabbitmq-client.controller';
import { RabbitmqClientService } from './rabbitmq-client.service';

/**
 * Module for the RabbitMQ client side: registers an RMQ client under the
 * 'RABBITMQ_SERVICE' token and wires RabbitmqClientController with
 * RabbitmqClientService.
 *
 * NOTE(review): RabbitmqClientService builds its own proxy in its constructor with the
 * same settings, so the client registered here does not appear to be injected anywhere
 * in this module — confirm whether both are needed.
 */
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'RABBITMQ_SERVICE',
        transport: Transport.RMQ,
        options: {
          urls: ['amqp://localhost:5672'], // RabbitMQ server URL
          queue: 'my_queue', // Queue name
          queueOptions: {
            durable: true, // Make the queue durable
          },
          exchange: 'my_exchange', // Exchange name
          exchangeType: 'fanout', // Exchange type
          exchangeOptions: {
            durable: true, // Make the exchange durable
          },
        },
      },
    ]),
  ],
  controllers: [RabbitmqClientController],
  providers: [RabbitmqClientService],
})
export class RabbitmqClientModule {}

kafka

//kafka.service.ts
import { Injectable } from '@nestjs/common';
import { ClientKafka, ClientProxyFactory, Transport } from '@nestjs/microservices';

/**
 * Wrapper around a Kafka client created through `ClientProxyFactory`, using a
 * hard-coded broker (`localhost:9092`) and consumer group (`my-group`).
 */
@Injectable()
export class KafkaService {
  private readonly kafkaClient: ClientKafka;

  constructor() {
    this.kafkaClient = ClientProxyFactory.create({
      transport: Transport.KAFKA,
      options: {
        client: {
          clientId: 'kafka-client',
          brokers: ['localhost:9092'], // Kafka broker(s) URL
        },
        consumer: {
          groupId: 'my-group', // Consumer group ID
        },
      },
    });
  }

  /**
   * Sends a request-style message to `topic` and resolves with the value the
   * response observable yields.
   *
   * The reply used to be awaited and then dropped, so request/response callers
   * always received `undefined`; it is now passed through to the caller.
   *
   * @param topic Topic (pattern) to send to.
   * @param message Payload to send.
   * @returns The reply produced for this request.
   */
  async send(topic: string, message: any): Promise<any> {
    return this.kafkaClient.send(topic, message).toPromise();
  }

  /**
   * Emits an event-style message to `topic` and waits for the emit observable
   * to finish. Nothing is returned.
   */
  async emit(topic: string, message: any): Promise<void> {
    await this.kafkaClient.emit(topic, message).toPromise();
  }

  /** Registers `pattern` with the client via `subscribeToResponseOf`. */
  async subscribeToResponseOf(pattern: string): Promise<void> {
    await this.kafkaClient.subscribeToResponseOf(pattern);
  }

  /** Closes the underlying Kafka client. */
  async close(): Promise<void> {
    await this.kafkaClient.close();
  }
}
//kafka.controller.ts
import { Controller, Post, Body, Get, Query } from '@nestjs/common';
import { KafkaService } from './kafka.service';
import { MessagePattern, Payload, Ctx, KafkaContext } from '@nestjs/microservices';

/**
 * Controller mounted at `kafka`: HTTP endpoints that forward to KafkaService,
 * plus one message-pattern handler that logs what it receives.
 */
@Controller('kafka')
export class KafkaController {
  constructor(private readonly kafkaService: KafkaService) {}

  /** POST `kafka/send` — forwards `topic` and `message` from the body to `kafkaService.send`. */
  @Post('send')
  async sendMessage(@Body('topic') topic: string, @Body('message') message: any) {
    await this.kafkaService.send(topic, message);
  }

  /** POST `kafka/emit` — forwards `topic` and `message` from the body to `kafkaService.emit`. */
  @Post('emit')
  async emitMessage(@Body('topic') topic: string, @Body('message') message: any) {
    await this.kafkaService.emit(topic, message);
  }

  /**
   * Handles messages arriving on 'topic-name' (replace with your own topic) by
   * logging the raw message value and the topic reported by the context.
   */
  @MessagePattern('topic-name')
  async handleMessage(@Payload() message: any, @Ctx() context: KafkaContext) {
    const { value } = context.getMessage();
    const topic = context.getTopic();
    const serialized = JSON.stringify(value);
    console.log(`Received message: ${serialized} on topic: ${topic}`);
    // Handle the message
  }

  /**
   * GET `kafka/request-response` — subscribes to the reply topic of `pattern`,
   * then sends `message` and returns whatever `kafkaService.send` resolves with.
   */
  @Get('request-response')
  async requestResponse(@Query('pattern') pattern: string, @Query('message') message: string) {
    await this.kafkaService.subscribeToResponseOf(pattern);
    const pendingReply = this.kafkaService.send(pattern, message);
    return pendingReply;
  }
}
//kafka.module.ts
import { Module, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { KafkaService } from './kafka.service';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { KafkaOptions, TransportStrategy } from '@nestjs/microservices/interfaces';

/**
 * Module that registers a Kafka client under the 'KAFKA_SERVICE' token and
 * provides/exports KafkaService. On shutdown it closes the service's client.
 */
@Module({
  imports: [
    ClientsModule.registerAsync([
      {
        name: 'KAFKA_SERVICE',
        // The factory returns the complete { transport, options } object, so its
        // return type is KafkaOptions — not KafkaOptions['options'], which only
        // describes the nested `options` payload.
        useFactory: (): KafkaOptions => ({
          transport: Transport.KAFKA,
          options: {
            client: {
              clientId: 'kafka-client',
              brokers: ['localhost:9092'],
            },
            consumer: {
              groupId: 'my-group',
            },
            subscribe: {
              // Kafka consumer subscribe options
            },
            run: {
              // Kafka consumer run options
            },
          },
        }),
      },
    ]),
  ],
  providers: [KafkaService],
  exports: [KafkaService],
})
export class KafkaModule implements OnModuleInit, OnModuleDestroy {
  constructor(private readonly kafkaService: KafkaService) {}

  /** Lifecycle hook; currently a placeholder with no behaviour. */
  async onModuleInit() {
    // Logic to be executed during module initialization
  }

  /** Lifecycle hook; closes the Kafka client held by KafkaService. */
  async onModuleDestroy() {
    // Ensure graceful Kafka client disconnection
    await this.kafkaService.close();
  }
}
//kafka-client.service.ts
import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ClientKafka, ClientProxyFactory, Transport } from '@nestjs/microservices';

/**
 * Kafka client service that owns its own client instance. Reply subscriptions are
 * registered and the client is connected when the module initialises; the client
 * is closed when the module is destroyed.
 */
@Injectable()
export class KafkaClientService implements OnModuleInit, OnModuleDestroy {
  private readonly kafkaClient: ClientKafka;

  constructor() {
    this.kafkaClient = ClientProxyFactory.create({
      transport: Transport.KAFKA,
      options: {
        client: { clientId: 'kafka-client', brokers: ['localhost:9092'] },
        consumer: { groupId: 'kafka-client-consumer' },
      },
    });
  }

  /** Registers every request pattern for replies, then connects the client. */
  async onModuleInit() {
    // Add your request patterns here
    const replyPatterns = ['pattern1', 'pattern2'];
    for (const replyPattern of replyPatterns) {
      this.kafkaClient.subscribeToResponseOf(replyPattern);
    }
    await this.kafkaClient.connect();
  }

  /** Sends a request-style message to `topic` and resolves when the reply observable finishes. */
  async send(topic: string, message: any): Promise<void> {
    const reply$ = this.kafkaClient.send(topic, message);
    return reply$.toPromise();
  }

  /** Emits an event-style message to `topic` and resolves when the emit observable finishes. */
  async emit(topic: string, message: any): Promise<void> {
    const emitted$ = this.kafkaClient.emit(topic, message);
    return emitted$.toPromise();
  }

  /** Closes the Kafka client. */
  async onModuleDestroy() {
    await this.kafkaClient.close();
  }
}
//kafka-client.controller.ts
import { Controller, Post, Body, Get, Query } from '@nestjs/common';
import { KafkaService } from './kafka.service';
import { Observable } from 'rxjs';

/**
 * HTTP controller mounted at `kafka-client` that forwards requests to the injected
 * Kafka service.
 *
 * NOTE(review): this controller injects `KafkaService` rather than a client-specific
 * service; `subscribeToTopic` relies on `subscribeToResponseOf` being available on the
 * injected service — confirm the intended service class and import path.
 */
@Controller('kafka-client')
export class KafkaClientController {
  constructor(private readonly kafkaService: KafkaService) {}

  /** POST `kafka-client/send` — forwards `topic` and `message` from the body to `send`. */
  @Post('send')
  async sendMessage(@Body('topic') topic: string, @Body('message') message: any): Promise<void> {
    await this.kafkaService.send(topic, message);
  }

  /** POST `kafka-client/emit` — forwards `topic` and `message` from the body to `emit`. */
  @Post('emit')
  async emitMessage(@Body('topic') topic: string, @Body('message') message: any): Promise<void> {
    await this.kafkaService.emit(topic, message);
  }

  /** GET `kafka-client/subscribe` — registers the `pattern` query value for replies. */
  @Get('subscribe')
  async subscribeToTopic(@Query('pattern') pattern: string): Promise<void> {
    await this.kafkaService.subscribeToResponseOf(pattern);
  }

  // Additional methods can be added here to handle more Kafka operations as needed
}
//kafka-client.module.ts
import { Module } from '@nestjs/common';
import { KafkaService } from './kafka.service';
import { KafkaClientController } from './kafka-client.controller';
import { ClientsModule, Transport } from '@nestjs/microservices';

/**
 * Module for the Kafka client side: registers a Kafka client under the
 * 'KAFKA_SERVICE' token and wires KafkaClientController with KafkaService.
 *
 * NOTE(review): the provider listed here is `KafkaService`; confirm whether the
 * client-specific service was meant instead.
 */
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'KAFKA_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'kafka-client',
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'my-group',
          },
        },
      },
    ]),
  ],
  controllers: [KafkaClientController],
  providers: [KafkaService],
})
export class KafkaClientModule {}

custom-transport

//custom-transport.service.ts
import { Injectable } from '@nestjs/common';
import {
  Server,
  CustomTransportStrategy,
  PacketId,
  ReadPacket,
  WritePacket,
} from '@nestjs/microservices';

/**
 * Skeleton of a custom transport strategy built on Nest's `Server` base class.
 *
 * The underlying "server" is a stub returned by `createCustomServer`; replace it
 * with a real implementation. Handlers are looked up in `messageHandlers` using
 * the key produced by `normalizePattern`.
 *
 * NOTE(review): `packet.id` is read from values typed as `ReadPacket`, and response
 * packets are built with `id`/`data` fields while typed as `WritePacket` — confirm
 * these shapes against the packet interfaces of the Nest version in use.
 */
@Injectable()
export class CustomTransportService extends Server implements CustomTransportStrategy {
  private server: any; // This should be replaced with your actual server type.

  /**
   * Creates the underlying server, attaches the message listener and starts
   * listening. `callback` is invoked once the server reports it is listening.
   */
  public async listen(callback: () => void) {
    this.server = this.createCustomServer();
    this.bindEvents(this.server);
    this.server.listen(() => {
      console.log('Custom transport server is listening');
      callback();
    });
  }

  /** Closes the underlying server, if one was created. */
  public close() {
    // close() can be called before listen() has created the server;
    // without this guard that would throw on `undefined.close()`.
    if (this.server) {
      this.server.close();
    }
  }

  /** Builds the stub server object; every member is a no-op except `listen`. */
  private createCustomServer(): any {
    // Replace this with actual server initialization logic
    return {
      listen: (callback: () => void) => callback(),
      close: () => {},
      onMessage: (callback: (packet: ReadPacket) => void) => {},
      sendMessage: (packet: WritePacket) => {},
    };
  }

  /**
   * Subscribes to incoming packets: finds the handler for the packet's pattern,
   * runs it and sends the result back tagged with the packet id. Packets without
   * a matching handler are ignored.
   */
  private bindEvents(server: any) {
    server.onMessage(async (packet: ReadPacket) => {
      // Look the handler up by the normalized key, exactly as dispatchEvent and
      // handleMessage do; using the raw pattern here would never match.
      const pattern = this.normalizePattern(packet.pattern);
      const handler = this.getHandlerByPattern(pattern);
      if (!handler) {
        return;
      }
      const response = await handler(packet.data);
      server.sendMessage({ id: packet.id, data: response });
    });
  }

  /** Runs the handler registered for the packet's pattern, if any; the result is discarded. */
  protected async dispatchEvent(packet: ReadPacket): Promise<any> {
    const pattern = this.normalizePattern(packet.pattern);
    const handler = this.getHandlerByPattern(pattern);
    if (handler) {
      await handler(packet.data);
    }
  }

  /** Returns the handler stored in `messageHandlers` under `pattern`, or `undefined`. */
  private getHandlerByPattern(pattern: string): Function | undefined {
    return this.messageHandlers.get(pattern);
  }

  /**
   * Produces the lookup key for a pattern by JSON-serialising it.
   * (Previously referenced an undefined identifier and threw a ReferenceError.)
   */
  protected normalizePattern(pattern: string): string {
    return JSON.stringify(pattern);
  }

  /**
   * Runs the handler for the packet's pattern and passes a response packet to
   * `callback`. When no handler is registered, the callback receives a packet
   * whose `err` is an `Error('No handler for pattern')` and whose `data` is null.
   */
  protected async handleMessage(
    packet: ReadPacket,
    callback: (packet: WritePacket) => void
  ): Promise<void> {
    const pattern = this.normalizePattern(packet.pattern);
    const handler = this.getHandlerByPattern(pattern);
    if (handler) {
      const response = await handler(packet.data);
      const responsePacket: WritePacket = { id: packet.id, data: response, err: null };
      callback(responsePacket);
    } else {
      const noHandlerPacket: WritePacket = {
        id: packet.id,
        err: new Error('No handler for pattern'),
        data: null,
      };
      callback(noHandlerPacket);
    }
  }
}
//custom-transport.controller.ts
import { Controller, Inject } from '@nestjs/common';
import { MessagePattern, EventPattern, Ctx, Payload } from '@nestjs/microservices';
import { CustomTransportService } from './custom-transport.service';

/**
 * Controller holding the handlers served over the custom transport:
 * two request/response patterns and one event pattern. The processing
 * helpers are placeholders to be filled with business logic.
 */
@Controller()
export class CustomTransportController {
  constructor(private readonly customTransportService: CustomTransportService) {}

  /**
   * Handles 'pattern1' requests. Returns the processing result, or the value
   * produced by `handleError` if processing throws.
   */
  @MessagePattern('pattern1')
  async handleMessagePattern1(@Payload() data: any, @Ctx() context: any): Promise<any> {
    try {
      const result = this.processDataForPattern1(data, context);
      return result;
    } catch (failure) {
      return this.handleError(failure, 'pattern1');
    }
  }

  /** Handles 'pattern2' requests; not implemented yet. */
  @MessagePattern('pattern2')
  async handleMessagePattern2(@Payload() data: any, @Ctx() context: any): Promise<any> {
    // Similar implementation for pattern2
    // ...
  }

  /** Handles 'event1' events by delegating to `processEventForPattern1`; no response is produced. */
  @EventPattern('event1')
  async handleEventPattern1(@Payload() data: any, @Ctx() context: any): Promise<void> {
    this.processEventForPattern1(data, context);
  }

  /** Placeholder for the 'pattern1' business logic; currently returns nothing. */
  private processDataForPattern1(data: any, context: any): any {
    // Implement according to your specific business logic
    // ...
  }

  /** Placeholder for the 'event1' side effects (logging, triggering other processes, ...). */
  private processEventForPattern1(data: any, context: any): void {
    // ...
  }

  /** Logs the error together with the pattern name and returns a generic error payload. */
  private handleError(error: any, pattern: string): any {
    const logPrefix = `Error handling message for ${pattern}:`;
    console.error(logPrefix, error);
    const genericFailure = { error: 'An error occurred' };
    return genericFailure;
  }

  // Additional utility methods or business logic can be added here
}
//custom-transport.module.ts
import { Module } from '@nestjs/common';
import { CustomTransportService } from './custom-transport.service';
import { CustomTransportController } from './custom-transport.controller';

// Providers owned by the custom-transport module.
const customTransportProviders = [CustomTransportService];

/**
 * Module bundling the custom transport controller and service.
 * CustomTransportService is also exported for use by other modules.
 */
@Module({
  providers: customTransportProviders,
  exports: [CustomTransportService],
  controllers: [CustomTransportController],
})
export class CustomTransportModule {}
//custom-transport-client.service.ts
import { Injectable } from '@nestjs/common';
import {
  Client,
  CustomTransportStrategy,
  PacketId,
  ReadPacket,
  WritePacket,
} from '@nestjs/microservices';

/**
 * Skeleton client for the custom transport. The underlying client is a stub
 * returned by `createCustomClient`; replace it with a real implementation.
 *
 * NOTE(review): the class extends `Client` and implements `CustomTransportStrategy`;
 * confirm that `Client` is a constructible base class in the Nest version in use and
 * that a server-side strategy interface is really intended for a client.
 */
@Injectable()
export class CustomTransportClientService extends Client implements CustomTransportStrategy {
  private client: any; // Replace with your actual client type

  // Connection flag toggled by connect()/close(). It was assigned without ever
  // being declared, which does not type-check; declare it explicitly.
  private isConnected = false;

  constructor() {
    super();
    this.client = this.createCustomClient();
  }

  /**
   * Connects the underlying client.
   *
   * @returns A promise that resolves (with no value) once the client's connect
   *   callback reports success, and rejects with the reported error otherwise.
   */
  public async connect(): Promise<any> {
    // Promise<void> lets resolve() be called without an argument.
    return new Promise<void>((resolve, reject) => {
      // Implement the connection logic for your custom client
      this.client.connect((err: unknown) => {
        if (err) {
          return reject(err);
        }
        this.isConnected = true;
        resolve();
      });
    });
  }

  /** Closes the underlying client and clears the connection flag. */
  public async close(): Promise<any> {
    this.client.close();
    this.isConnected = false;
  }

  /**
   * Sends `packet` through the underlying client and hands the reply to `callback`
   * wrapped as `{ err: null, response, isDisposed: true }`.
   *
   * NOTE(review): the stub client already replies with an object that has a
   * `response` field, so the value is wrapped twice — confirm the intended shape.
   */
  public async send(packet: ReadPacket, callback: (packet: WritePacket) => void): Promise<any> {
    // Implement the send logic of your custom client
    this.client.sendMessage(packet, (response) => {
      callback({ err: null, response, isDisposed: true });
    });
  }

  /** Emits `packet` through the underlying client without waiting for a reply. */
  public async emit(packet: ReadPacket): Promise<any> {
    // Implement the emit logic of your custom client
    this.client.emitMessage(packet);
  }

  /** Builds the stub client: connect succeeds immediately and every send replies 'ok'. */
  private createCustomClient(): any {
    // Replace with your custom client initialization logic
    return {
      connect: (callback: (err: any) => void) => callback(null),
      close: () => {},
      sendMessage: (packet: ReadPacket, callback: (response: WritePacket) => void) =>
        callback({ response: 'ok' }),
      emitMessage: (packet: ReadPacket) => {},
    };
  }
}
//custom-transport-client.controller.ts
import { Controller, Get, Post, Put, Delete, Body, Param, Query } from '@nestjs/common';
import { CustomTransportClientService } from './custom-transport-client.service';

/**
 * HTTP facade mounted at `custom-transport`. Each route turns the request into a
 * `{ pattern, data }` packet and relays it through CustomTransportClientService.
 */
@Controller('custom-transport')
export class CustomTransportClientController {
  constructor(private readonly customTransportClientService: CustomTransportClientService) {}

  /** GET `query/:pattern` — sends the query-string object as the packet data. */
  @Get('query/:pattern')
  async queryData(@Param('pattern') pattern: string, @Query() query: any): Promise<any> {
    return this.sendRequest(pattern, query);
  }

  /** GET `data/:pattern` — sends a packet with `null` data. */
  @Get('data/:pattern')
  async getData(@Param('pattern') pattern: string): Promise<any> {
    const noPayload = null;
    return this.sendRequest(pattern, noPayload);
  }

  /** POST `data/:pattern` — sends the request body as the packet data. */
  @Post('data/:pattern')
  async postData(@Param('pattern') pattern: string, @Body() data: any): Promise<any> {
    return this.sendRequest(pattern, data);
  }

  /** PUT `data/:pattern` — sends the request body as the packet data. */
  @Put('data/:pattern')
  async updateData(@Param('pattern') pattern: string, @Body() data: any): Promise<any> {
    return this.sendRequest(pattern, data);
  }

  /** DELETE `data/:pattern` — sends a packet with `null` data. */
  @Delete('data/:pattern')
  async deleteData(@Param('pattern') pattern: string): Promise<any> {
    const noPayload = null;
    return this.sendRequest(pattern, noPayload);
  }

  /**
   * Wraps the callback-based `send` in a promise: rejects with the reply's `err`
   * when it is set, otherwise resolves with the reply's `response`.
   */
  private sendRequest(pattern: string, data: any): Promise<any> {
    const request = { pattern, data };
    return new Promise((resolve, reject) => {
      this.customTransportClientService.send(request, (reply) => {
        if (!reply.err) {
          resolve(reply.response);
          return;
        }
        reject(reply.err);
      });
    });
  }
}
//custom-transport-client.module.ts
import { Module } from '@nestjs/common';
import { CustomTransportClientService } from './custom-transport-client.service';
import { CustomTransportClientController } from './custom-transport-client.controller';

// Providers owned by the custom-transport client module.
const customTransportClientProviders = [CustomTransportClientService];

/** Module bundling the custom transport client controller with its service. */
@Module({
  providers: customTransportClientProviders,
  controllers: [CustomTransportClientController],
})
export class CustomTransportClientModule {}

Exception-Filter

//exception-filters.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { RpcException } from '@nestjs/microservices';

/**
 * Helper that logs an exception and converts it into an `RpcException`.
 */
@Injectable()
export class ExceptionFiltersService {
  private readonly logger = new Logger(ExceptionFiltersService.name);

  /**
   * Logs `exception`, then returns it as an `RpcException`.
   *
   * @param exception Any thrown value exposing `message` (and optionally `stack` / `response`).
   * @returns The same instance when it already is an `RpcException`, otherwise a new one.
   */
  handleRpcException(exception: any): RpcException {
    this.logException(exception);
    const rpcException = this.formatRpcException(exception);
    return rpcException;
  }

  /** Writes the exception message and stack to the logger at error level. */
  private logException(exception: any): void {
    const summary = `Exception caught: ${exception.message}`;
    this.logger.error(summary, exception.stack);
  }

  /**
   * Returns RpcException instances unchanged; anything else is wrapped in a new
   * RpcException carrying its message, a fixed 'error' status and its `response`
   * (or null when that is falsy).
   */
  private formatRpcException(exception: any): RpcException {
    const isAlreadyRpc = exception instanceof RpcException;
    if (!isAlreadyRpc) {
      const payload = {
        message: exception.message,
        status: 'error',
        data: exception.response || null,
      };
      return new RpcException(payload);
    }
    return exception;
  }

  // Additional methods for handling other types of exceptions can be added here
}
// exception-filters.controller.ts

import { Controller, Get, UseFilters, HttpException, HttpStatus } from '@nestjs/common';
import { RpcException, RpcExceptionFilter, Transport } from '@nestjs/microservices';
import { ExceptionFiltersService } from './exception-filters.service';

/**
 * Demo controller mounted at `exception-filters`; each endpoint throws a
 * different kind of exception so the filters can be observed.
 *
 * NOTE(review): two endpoints instantiate `RpcExceptionFilter` with `new`. Confirm
 * that the symbol imported under this name is a constructible class (and accepts the
 * options object used below) in the Nest version in use.
 */
@Controller('exception-filters')
export class ExceptionFiltersController {
  constructor(private exceptionFiltersService: ExceptionFiltersService) {}

  /** GET `exception-filters` — throws an `RpcException('Default exception')`. */
  @Get()
  @UseFilters(new RpcExceptionFilter())
  getDefaultException() {
    // Simulate a default exception
    throw new RpcException('Default exception');
  }

  /**
   * GET `exception-filters/custom-exception` — throws an HttpException (400) and
   * rethrows it after conversion by `handleRpcException`.
   */
  @Get('custom-exception')
  getCustomException() {
    try {
      // Simulate a custom exception
      throw new HttpException('Custom exception', HttpStatus.BAD_REQUEST);
    } catch (exception) {
      throw this.exceptionFiltersService.handleRpcException(exception);
    }
  }

  /**
   * GET `exception-filters/async-exception` — rejects with an RpcException after a
   * one-second timer and rethrows the value returned by `handleRpcException`.
   */
  @Get('async-exception')
  async getAsyncException() {
    // Simulate an async exception
    await new Promise((_, reject) => {
      setTimeout(() => reject(new RpcException('Async exception')), 1000);
    }).catch((exception) => {
      throw this.exceptionFiltersService.handleRpcException(exception);
    });
  }

  /**
   * GET `exception-filters/transport-specific-exception` — throws an
   * `RpcException('Transport-specific exception')` under a filter built with a
   * TCP transport option.
   */
  @Get('transport-specific-exception')
  @UseFilters(new RpcExceptionFilter({ transport: Transport.TCP }))
  getTransportSpecificException() {
    // Simulate a transport-specific exception
    throw new RpcException('Transport-specific exception');
  }

  // Additional endpoints to demonstrate different exception scenarios can be added here
}
// exception-filters.module.ts

import { Module } from '@nestjs/common';
import { APP_FILTER } from '@nestjs/core';
import { RpcExceptionFilter } from '@nestjs/microservices';
import { ExceptionFiltersService } from './exception-filters.service';

/**
 * Module that provides ExceptionFiltersService and registers a filter under the
 * `APP_FILTER` token. The filter is built by a factory that receives the service
 * and routes every caught exception through `handleRpcException`.
 *
 * NOTE(review): the factory calls `new RpcExceptionFilter({ catch })`. Confirm that
 * the imported symbol is a constructible class accepting such an options object.
 */
@Module({
  providers: [
    {
      provide: APP_FILTER,
      useFactory: (exceptionFiltersService: ExceptionFiltersService) => {
        return new RpcExceptionFilter({
          // `host` is accepted but not used.
          catch: (exception, host) => {
            return exceptionFiltersService.handleRpcException(exception);
          },
        });
      },
      inject: [ExceptionFiltersService],
    },
    ExceptionFiltersService,
  ],
})
export class ExceptionFiltersModule {}
// exception-filters-client.service.ts

import { Injectable, Catch, ArgumentsHost, ExceptionFilter } from '@nestjs/common';
import { RpcException, ClientProxy } from '@nestjs/microservices';
import { ExceptionFiltersService } from './exception-filters.service';

/**
 * Catch-all exception filter. RpcException instances are first passed through
 * ExceptionFiltersService and then handed to the RPC-specific hook; everything
 * else goes to the general hook. Both hooks are empty placeholders.
 */
@Injectable()
@Catch()
export class ExceptionFiltersClientService implements ExceptionFilter {
  constructor(private exceptionFiltersService: ExceptionFiltersService) {}

  /** Entry point invoked with the caught exception and the current arguments host. */
  catch(exception: any, host: ArgumentsHost) {
    const isRpc = exception instanceof RpcException;
    if (!isRpc) {
      // General exception handling
      this.handleGeneralException(exception, host);
      return;
    }

    // Specialized handling for RPC exceptions
    const processed = this.exceptionFiltersService.handleRpcException(exception);
    this.handleRpcException(processed, host);
  }

  /** Placeholder hook for RpcException-specific handling (logging, response formatting, ...). */
  private handleRpcException(exception: RpcException, host: ArgumentsHost) {
    // host.switchToHttp() or host.switchToWs() give access to http/ws specific context
  }

  /** Placeholder hook for all other exceptions (logging, response formatting, ...). */
  private handleGeneralException(exception: any, host: ArgumentsHost) {
    // host.switchToHttp() or host.switchToWs() give access to http/ws specific context
  }

  // Additional methods for more specific exception handling can be added here
}
// exception-filters-client.controller.ts

import { Controller, Get, Param, Post, Body, UseFilters } from '@nestjs/common';
import { RpcException } from '@nestjs/microservices';
import { ExceptionFiltersClientService } from './exception-filters-client.service';

/**
 * Demo controller mounted at `exceptions`; every endpoint throws on purpose so
 * that ExceptionFiltersClientService (attached via `@UseFilters`) can be exercised.
 */
@Controller('exceptions')
@UseFilters(ExceptionFiltersClientService)
export class ExceptionFiltersClientController {
  /** GET `/rpc/:message` — always throws an RpcException built from the route parameter. */
  @Get('/rpc/:message')
  triggerRpcException(@Param('message') message: string) {
    const detail = `RPC error: ${message}`;
    throw new RpcException(detail);
  }

  /** GET `/standard/:message` — always throws a plain Error built from the route parameter. */
  @Get('/standard/:message')
  triggerStandardException(@Param('message') message: string) {
    const detail = `Standard error: ${message}`;
    throw new Error(detail);
  }

  /**
   * POST `/custom` — throws an RpcException when `body.type` is 'rpc',
   * otherwise a plain Error; both carry `body.message`.
   */
  @Post('/custom')
  triggerCustomException(@Body() body: { type: string; message: string }) {
    throw body.type === 'rpc' ? new RpcException(body.message) : new Error(body.message);
  }

  // Add more endpoints here to trigger various types of exceptions
}
// exception-filters-client.module.ts

import { Module } from '@nestjs/common';
import { ExceptionFiltersClientController } from './exception-filters-client.controller';
import { ExceptionFiltersClientService } from './exception-filters-client.service';
import { ExceptionFiltersService } from './exception-filters.service';

// Registers the client-side filter class under a string injection token.
const exceptionFiltersClientProvider = {
  provide: 'EXCEPTION_FILTERS_CLIENT',
  useClass: ExceptionFiltersClientService,
};

/**
 * Module for the exception-filter demo client: exposes the demo controller and
 * provides ExceptionFiltersService plus the filter under 'EXCEPTION_FILTERS_CLIENT'.
 */
@Module({
  providers: [ExceptionFiltersService, exceptionFiltersClientProvider],
  controllers: [ExceptionFiltersClientController],
})
export class ExceptionFiltersClientModule {}
Previous Blog: nest-mongoose