Microservices in NestJS
- Published on
- Hamed Gholami--35 min read
Overview
Creating a simple NestJS microservices project (with gRPC)
nest-microservices-project/
├── src/
│ ├── microservices/
│ │ ├── redis/
│ │ │ ├── redis.service.ts
│ │ │ ├── redis.controller.ts
│ │ │ ├── redis.module.ts
│ │ │ └── redis-client/
│ │ │ ├── redis-client.controller.ts
│ │ │ ├── redis-client.service.ts
│ │ │ └── redis-client.module.ts
│ │ ├── mqtt/
│ │ │ ├── mqtt.service.ts
│ │ │ ├── mqtt.controller.ts
│ │ │ ├── mqtt.module.ts
│ │ │ └── mqtt-client/
│ │ │ ├── mqtt-client.controller.ts
│ │ │ ├── mqtt-client.service.ts
│ │ │ └── mqtt-client.module.ts
│ │ ├── nats/
│ │ │ ├── nats.service.ts
│ │ │ ├── nats.controller.ts
│ │ │ ├── nats.module.ts
│ │ │ └── nats-client/
│ │ │ ├── nats-client.controller.ts
│ │ │ ├── nats-client.service.ts
│ │ │ └── nats-client.module.ts
│ │ ├── rabbitmq/
│ │ │ ├── rabbitmq.service.ts
│ │ │ ├── rabbitmq.controller.ts
│ │ │ ├── rabbitmq.module.ts
│ │ │ └── rabbitmq-client/
│ │ │ ├── rabbitmq-client.controller.ts
│ │ │ ├── rabbitmq-client.service.ts
│ │ │ └── rabbitmq-client.module.ts
│ │ ├── kafka/
│ │ │ ├── kafka.service.ts
│ │ │ ├── kafka.controller.ts
│ │ │ ├── kafka.module.ts
│ │ │ └── kafka-client/
│ │ │ ├── kafka-client.controller.ts
│ │ │ ├── kafka-client.service.ts
│ │ │ └── kafka-client.module.ts
│ │ ├── custom-transport/
│ │ │ ├── custom-transport.service.ts
│ │ │ ├── custom-transport.controller.ts
│ │ │ ├── custom-transport.module.ts
│ │ │ └── custom-transport-client/
│ │ │ ├── custom-transport-client.controller.ts
│ │ │ ├── custom-transport-client.service.ts
│ │ │ └── custom-transport-client.module.ts
│ │ ├── exception-filters/
│ │ │ ├── exception-filters.controller.ts
│ │ │ ├── exception-filters.service.ts
│ │ │ ├── exception-filters.module.ts
│ │ │ └── exception-filters-client/
│ │ │ ├── exception-filters-client.controller.ts
│ │ │ ├── exception-filters-client.service.ts
│ │ │ └── exception-filters-client.module.ts
│ │ ├── pipes/
│ │ │ ├── pipes.service.ts
│ │ │ ├── pipes.controller.ts
│ │ │ ├── pipes.module.ts
│ │ │ └── pipes-client/
│ │ │ ├── pipes-client.controller.ts
│ │ │ ├── pipes-client.service.ts
│ │ │ └── pipes-client.module.ts
│ │ ├── guards/
│ │ │ ├── guards.service.ts
│ │ │ ├── guards.controller.ts
│ │ │ ├── guards.module.ts
│ │ │ └── guards-client/
│ │ │ ├── guards-client.controller.ts
│ │ │ ├── guards-client.service.ts
│ │ │ └── guards-client.module.ts
│ │ ├── interceptors/
│ │ ├── interceptors.service.ts
│ │ ├── interceptors.controller.ts
│ │ ├── interceptors.module.ts
│ │ └── interceptors-client/
│ │ ├── interceptors-client.controller.ts
│ │ ├── interceptors-client.service.ts
│ │ └── interceptors-client.module.ts
│ ├── app.module.ts (Root module)
│ ├── main.ts (Entry point)
├── node_modules/ (npm dependencies)
├── test/ (test suites)
├── nest-cli.json (Nest CLI configuration)
├── tsconfig.json (TypeScript compiler options)
├── tsconfig.build.json (TypeScript compiler options for build)
├── package.json (npm package definition and scripts)
└── package-lock.json (locked versions of npm dependencies)
redis
//redis.service.ts
import { Injectable } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';
import { RedisOptions } from 'nestjs-redis';
/**
 * Thin wrapper around a Nest `ClientProxy` built for the Redis transport.
 *
 * Each method forwards a pattern ('set', 'get', 'del', 'incr') plus a payload
 * through the proxy and resolves with whatever the resulting observable yields.
 *
 * NOTE(review): `RedisOptions` is used as a constructor dependency here —
 * confirm that a provider is registered for it, otherwise it cannot be injected.
 */
@Injectable()
export class RedisService {
  private readonly redisClient: ClientProxy;

  /**
   * @param redisOptions Connection options handed straight to the client factory.
   */
  constructor(private readonly redisOptions: RedisOptions) {
    this.redisClient = ClientProxyFactory.create({
      options: redisOptions,
      transport: Transport.REDIS,
    });
  }

  /**
   * Emits a 'set' event carrying the key/value pair.
   * @param key Key to write.
   * @param value Value to store under the key.
   */
  async setValue(key: string, value: string): Promise<void> {
    const payload = { key, value };
    await this.redisClient.emit('set', payload).toPromise();
  }

  /**
   * Sends a 'get' request for the key.
   * @param key Key to look up.
   * @returns The reply produced for the 'get' pattern.
   */
  async getValue(key: string): Promise<string | null> {
    return this.redisClient.send<string | null, string>('get', key).toPromise();
  }

  /**
   * Sends a 'del' request for the key.
   * @param key Key to remove.
   * @returns The reply produced for the 'del' pattern (callers treat 1 as "deleted").
   */
  async deleteValue(key: string): Promise<number> {
    return this.redisClient.send<number, string>('del', key).toPromise();
  }

  /**
   * Sends an 'incr' request for the key.
   * @param key Key whose value should be incremented.
   * @returns The reply produced for the 'incr' pattern.
   */
  async increment(key: string): Promise<number> {
    return this.redisClient.send<number, string>('incr', key).toPromise();
  }
}
//redis.controller.ts
import { Controller, Get, Post, Body, Param, Delete, Put } from '@nestjs/common';
import { RedisService } from './redis.service';
/**
 * HTTP endpoints under `/redis` that delegate to {@link RedisService} and
 * answer with a human-readable status string.
 */
@Controller('redis')
export class RedisController {
  constructor(private readonly redisService: RedisService) {}

  /** POST /redis/set/:key — stores the `value` field of the body under `key`. */
  @Post('set/:key')
  async setValue(@Param('key') key: string, @Body('value') value: string) {
    await this.redisService.setValue(key, value);
    return `Key "${key}" set with value "${value}"`;
  }

  /** GET /redis/get/:key — reports the stored value, or that the key is missing. */
  @Get('get/:key')
  async getValue(@Param('key') key: string) {
    const stored = await this.redisService.getValue(key);
    // Only an explicit `null` counts as "missing".
    if (stored === null) {
      return `Key "${key}" not found in Redis`;
    }
    return `Value for key "${key}": "${stored}"`;
  }

  /** DELETE /redis/delete/:key — reports whether exactly one key was removed. */
  @Delete('delete/:key')
  async deleteValue(@Param('key') key: string) {
    const removed = await this.redisService.deleteValue(key);
    return removed === 1
      ? `Key "${key}" deleted from Redis`
      : `Key "${key}" not found in Redis`;
  }

  /** PUT /redis/increment/:key — increments the key and reports the new value. */
  @Put('increment/:key')
  async incrementValue(@Param('key') key: string) {
    const updated = await this.redisService.increment(key);
    return `Key "${key}" incremented. New value: "${updated}"`;
  }
}
//redis.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { RedisService } from './redis.service';
import { RedisController } from './redis.controller';
import { RedisOptions } from 'nestjs-redis';
/**
 * Feature module for the Redis example: registers a Redis-transport client
 * under the 'REDIS_CLIENT' token and declares the HTTP controller and service.
 */
@Module({
  imports: [
    ClientsModule.register([
      {
        // Token under which the client proxy is registered.
        name: 'REDIS_CLIENT',
        // Messages travel over the Redis transport.
        transport: Transport.REDIS,
        // Connection settings — replace the URL with your own Redis server.
        options: {
          url: 'redis://localhost:6379',
        } as RedisOptions,
      },
    ]),
  ],
  // Redis-related operations live in the service.
  providers: [RedisService],
  // HTTP entry points.
  controllers: [RedisController],
})
export class RedisModule {}
//redis-client.service.ts
import { Injectable, NotFoundException } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';
import { RedisOptions } from 'nestjs-redis';
/**
 * Client-side counterpart of the Redis example service. It builds its own
 * Redis-transport `ClientProxy` and exposes get / set / delete / increment
 * helpers; `setValue` only overwrites keys that already have a value.
 */
@Injectable()
export class RedisClientService {
  private readonly redisClient: ClientProxy;

  /**
   * @param redisOptions Connection options handed straight to the client factory.
   */
  constructor(private readonly redisOptions: RedisOptions) {
    this.redisClient = ClientProxyFactory.create({
      options: redisOptions,
      transport: Transport.REDIS,
    });
  }

  /**
   * Overwrites the value of an existing key.
   * @param key The Redis key.
   * @param value The value to be set.
   * @throws NotFoundException when the lookup for `key` yields `null`.
   */
  async setValue(key: string, value: string): Promise<void> {
    // Look the key up first; a `null` reply means there is nothing to overwrite.
    const keyIsMissing = (await this.getValue(key)) === null;
    if (keyIsMissing) {
      throw new NotFoundException(`Key '${key}' does not exist in Redis.`);
    }
    const payload = { key, value };
    await this.redisClient.emit('set', payload).toPromise();
  }

  /**
   * Looks up the value stored for a key.
   * @param key The Redis key.
   * @returns The reply produced for the 'get' pattern.
   */
  async getValue(key: string): Promise<string | null> {
    return this.redisClient.send<string | null, string>('get', key).toPromise();
  }

  /**
   * Removes a key.
   * @param key The Redis key to be deleted.
   * @returns The reply produced for the 'del' pattern.
   */
  async deleteValue(key: string): Promise<number> {
    return this.redisClient.send<number, string>('del', key).toPromise();
  }

  /**
   * Increments the value stored for a key.
   * @param key The Redis key to be incremented.
   * @returns The reply produced for the 'incr' pattern.
   */
  async increment(key: string): Promise<number> {
    return this.redisClient.send<number, string>('incr', key).toPromise();
  }
}
//redis-client.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { RedisService } from './redis.service';
import { RedisClientController } from './redis-client.controller';
import { RedisClientService } from './redis-client.service';
import { RedisOptions } from 'nestjs-redis';
/**
 * Module for the Redis client example. Registers a Redis-transport client
 * under the 'REDIS_SERVICE' token and exposes {@link RedisClientService} to
 * any module that imports this one.
 */
@Module({
  imports: [
    ClientsModule.register([
      {
        // Unique token for this microservice client.
        name: 'REDIS_SERVICE',
        // Communicate over the Redis transport.
        transport: Transport.REDIS,
        // Connection options; keep them in sync with your Redis server setup.
        options: {
          // Example URL, update as needed.
          url: 'redis://localhost:6379',
          // Further options can be added here, for example:
          // password: 'your-redis-password',
          // db: 0, // Redis database index
          // maxRetriesPerRequest: 3, // Number of maximum retries for requests
        } as RedisOptions,
      },
    ]),
  ],
  // The service holds the business logic that talks to the Redis client.
  providers: [RedisClientService],
  controllers: [RedisClientController],
  // Re-exported so importing modules can inject the service.
  exports: [RedisClientService],
})
export class RedisClientModule {}
mqtt
//mqtt.service.ts
import { Injectable } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';
// Wraps a Nest `ClientProxy` created for the MQTT transport and exposes
// publish / subscribe / unsubscribe helpers on top of it.
@Injectable()
export class MqttService {
private readonly mqttClient: ClientProxy;
constructor() {
// Build the MQTT client proxy. No broker options are set here; the example
// values below are commented out.
this.mqttClient = ClientProxyFactory.create({
transport: Transport.MQTT,
options: {
// Specify your MQTT broker connection details here.
// host: 'mqtt://localhost', // Example MQTT broker host
// port: 1883, // Example MQTT broker port
},
});
}
// Publish a message to an MQTT topic.
// The topic is used as the event pattern and the message as its payload.
async publish(topic: string, message: any): Promise<void> {
// `emit` returns an observable; converting it to a promise lets the caller await it.
await this.mqttClient.emit(topic, message).toPromise();
}
// Subscribe to an MQTT topic and invoke `callback` for every incoming message.
// NOTE(review): `subscribe(topic)` and `observe(topic)` are called on a
// `ClientProxy` here; I could not find them in the documented ClientProxy
// API (connect/close/send/emit) — confirm this compiles against the installed version.
async subscribe(topic: string, callback: (message: any) => void): Promise<void> {
// Connect to the MQTT broker before subscribing to a topic.
await this.mqttClient.connect();
// Subscribe to the specified MQTT topic.
await this.mqttClient.subscribe(topic);
// The subscription created below is not stored, so it cannot be cancelled from this class.
this.mqttClient.observe(topic).subscribe((message) => {
// Forward each incoming message to the caller-supplied callback.
callback(message);
});
}
// Unsubscribe from an MQTT topic.
// NOTE(review): `unsubscribe(topic)` is likewise not part of the documented
// ClientProxy API — confirm.
async unsubscribe(topic: string): Promise<void> {
// Unsubscribe from the specified MQTT topic.
await this.mqttClient.unsubscribe(topic);
}
}
//mqtt.controller.ts
import { Controller, Get, Post, Delete, Param, Body, Res } from '@nestjs/common';
import { MqttService } from './mqtt.service'; // Import the MQTT service
import { Response } from 'express'; // Import the Express Response object for HTTP responses
/**
 * HTTP facade over {@link MqttService}, mounted under `/mqtt`.
 *
 * Every handler wraps failures of the underlying service call in an `Error`
 * whose message carries the original failure description.
 */
@Controller('mqtt')
export class MqttController {
  constructor(private readonly mqttService: MqttService) {}

  /**
   * Extracts a printable description from a caught value. Catch variables are
   * `unknown` under strict settings, so `.message` must not be read blindly.
   */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * POST /mqtt/publish/:topic — publishes the request body to `topic`.
   * @param topic Topic taken from the route.
   * @param message Request body, forwarded as the message payload.
   * @returns A confirmation string once the publish call has resolved.
   * @throws Error when the service call fails.
   */
  @Post('publish/:topic')
  async publishMessage(@Param('topic') topic: string, @Body() message: any): Promise<string> {
    try {
      await this.mqttService.publish(topic, message);
      return 'Message published to MQTT topic successfully';
    } catch (error) {
      throw new Error(`Failed to publish message: ${MqttController.describeError(error)}`);
    }
  }

  /**
   * GET /mqtt/subscribe/:topic — subscribes to `topic` and answers the HTTP
   * request with the first message received. Every message is logged.
   * @param topic Topic taken from the route.
   * @param response Express response used to deliver the first message.
   * @throws Error when the subscription cannot be established.
   */
  @Get('subscribe/:topic')
  async subscribeToTopic(@Param('topic') topic: string, @Res() response: Response): Promise<void> {
    try {
      await this.mqttService.subscribe(topic, (message) => {
        console.log(`Received MQTT message on topic '${topic}':`, message);
        // An HTTP response can only be sent once; answering again for later
        // messages would fail because the headers were already sent.
        if (!response.headersSent) {
          response.status(200).json(message);
        }
      });
    } catch (error) {
      throw new Error(`Failed to subscribe to topic: ${MqttController.describeError(error)}`);
    }
  }

  /**
   * DELETE /mqtt/unsubscribe/:topic — cancels the subscription to `topic`.
   * @param topic Topic taken from the route.
   * @returns A confirmation string once the unsubscribe call has resolved.
   * @throws Error when the service call fails.
   */
  @Delete('unsubscribe/:topic')
  async unsubscribeFromTopic(@Param('topic') topic: string): Promise<string> {
    try {
      await this.mqttService.unsubscribe(topic);
      return 'Unsubscribed from MQTT topic successfully';
    } catch (error) {
      throw new Error(`Failed to unsubscribe from topic: ${MqttController.describeError(error)}`);
    }
  }
}
//mqtt.module.ts
import { Module } from '@nestjs/common';
import { MqttService } from './mqtt.service'; // Import the MQTT service
import { MqttController } from './mqtt.controller'; // Import the MQTT controller
/** Bundles the MQTT example: one service and the HTTP controller that uses it. */
@Module({
  // Service that talks to the MQTT client proxy.
  providers: [MqttService],
  // HTTP endpoints under /mqtt.
  controllers: [MqttController],
})
export class MqttModule {}
//mqtt-client.service.ts
import { Injectable } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';
/**
 * Client-side MQTT helper: owns a `ClientProxy` created for the MQTT
 * transport and exposes publish / subscribe / unsubscribe on top of it.
 */
@Injectable()
export class MqttClientService {
private readonly mqttClient: ClientProxy;
constructor() {
// Build the MQTT client proxy. No broker options are set here; the example
// values below are commented out.
this.mqttClient = ClientProxyFactory.create({
transport: Transport.MQTT,
options: {
// Specify your MQTT broker connection details here.
// host: 'mqtt://localhost', // Example MQTT broker host
// port: 1883, // Example MQTT broker port
},
});
}
/**
 * Publish a message to an MQTT topic.
 * @param topic The MQTT topic to publish to (used as the event pattern).
 * @param message The message to publish (used as the event payload).
 */
async publish(topic: string, message: any): Promise<void> {
// `emit` returns an observable; converting it to a promise lets the caller await it.
await this.mqttClient.emit(topic, message).toPromise();
}
/**
 * Subscribe to an MQTT topic and handle incoming messages.
 * @param topic The MQTT topic to subscribe to.
 * @param callback A callback function invoked with every incoming message.
 *
 * NOTE(review): `subscribe(topic)` and `observe(topic)` are called on a
 * `ClientProxy`; I could not find them in the documented ClientProxy API
 * (connect/close/send/emit) — confirm against the installed version.
 */
async subscribe(topic: string, callback: (message: any) => void): Promise<void> {
// Connect to the MQTT broker before subscribing to a topic.
await this.mqttClient.connect();
// Subscribe to the specified MQTT topic.
await this.mqttClient.subscribe(topic);
// The subscription created below is not stored, so it cannot be cancelled from this class.
this.mqttClient.observe(topic).subscribe((message) => {
// Forward each incoming message to the caller-supplied callback.
callback(message);
});
}
/**
 * Unsubscribe from an MQTT topic.
 * @param topic The MQTT topic to unsubscribe from.
 *
 * NOTE(review): `unsubscribe(topic)` is likewise not part of the documented
 * ClientProxy API — confirm.
 */
async unsubscribe(topic: string): Promise<void> {
// Unsubscribe from the specified MQTT topic.
await this.mqttClient.unsubscribe(topic);
}
}
//mqtt-client.controller.ts
import { Controller, Inject, Post, Body } from '@nestjs/common';
import { MqttClientService } from './mqtt.service';
/**
 * JSON-body driven MQTT endpoints under `/mqtt`. Unlike the route-parameter
 * variant, every handler reads the topic (and message) from the request body.
 */
@Controller('mqtt')
export class MqttClientController {
  constructor(@Inject(MqttClientService) private readonly mqttService: MqttClientService) {}

  /**
   * POST /mqtt/publish
   *
   * Publishes `message` to `topic`.
   * Request body: { "topic": "exampleTopic", "message": "Hello, MQTT!" }
   */
  @Post('publish')
  async publishMessage(@Body() messageData: { topic: string; message: any }): Promise<void> {
    const { topic, message } = messageData;
    await this.mqttService.publish(topic, message);
  }

  /**
   * POST /mqtt/subscribe
   *
   * Subscribes to `topic`; each incoming message is written to the console.
   * Request body: { "topic": "exampleTopic" }
   */
  @Post('subscribe')
  async subscribeToTopic(@Body() topicData: { topic: string }): Promise<void> {
    const { topic } = topicData;
    await this.mqttService.subscribe(topic, (message: any) => {
      // Incoming messages are only logged in this example.
      console.log(`Received MQTT message on topic ${topic}:`, message);
    });
  }

  /**
   * POST /mqtt/unsubscribe
   *
   * Cancels the subscription to `topic`.
   * Request body: { "topic": "exampleTopic" }
   */
  @Post('unsubscribe')
  async unsubscribeFromTopic(@Body() topicData: { topic: string }): Promise<void> {
    const { topic } = topicData;
    await this.mqttService.unsubscribe(topic);
  }
}
//mqtt-client.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport, ClientProxyFactory } from '@nestjs/microservices';
import { MqttService } from './mqtt.service';
/**
 * Module for the MQTT client example: registers an MQTT-transport client
 * under the 'MQTT_CLIENT' token and provides/exports `MqttService`.
 */
@Module({
  imports: [
    // `ClientsModule.register` creates the client instance; `name` is the
    // token that identifies it and `transport` selects MQTT.
    ClientsModule.register([
      {
        name: 'MQTT_CLIENT',
        transport: Transport.MQTT,
        options: {
          // Specify MQTT broker connection details here.
          // host: 'mqtt://localhost', // Example MQTT broker host
          // port: 1883, // Example MQTT broker port
        },
      },
    ]),
  ],
  // Re-exported so that importing modules can inject the service.
  exports: [MqttService],
  // The MQTT service made available by this module.
  providers: [MqttService],
})
export class MqttClientModule {}
NATS
//nats.service.ts
import { Injectable } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';
/**
 * Wraps a Nest `ClientProxy` created for the NATS transport and exposes
 * publish / subscribe / unsubscribe / close helpers on top of it.
 */
@Injectable()
export class NatsService {
private readonly natsClient: ClientProxy;
constructor() {
// Build the NATS client proxy. No server options are set here; the example
// value below is commented out.
this.natsClient = ClientProxyFactory.create({
transport: Transport.NATS,
options: {
// Specify NATS server connection options here.
// servers: ['nats://localhost:4222'], // Example NATS server URL
},
});
}
/**
 * Publishes a message to a NATS subject.
 * @param subject - The NATS subject to which the message will be published (used as the event pattern).
 * @param message - The message payload to be sent.
 */
async publish(subject: string, message: any): Promise<void> {
// `emit` returns an observable; converting it to a promise lets the caller await it.
await this.natsClient.emit(subject, message).toPromise();
}
/**
 * Subscribes to a NATS subject and handles incoming messages with a callback function.
 * @param subject - The NATS subject to subscribe to.
 * @param callback - A callback function invoked with every incoming message.
 *
 * NOTE(review): `subscribe(subject)` and `observe(subject)` are called on a
 * `ClientProxy`; I could not find them in the documented ClientProxy API
 * (connect/close/send/emit) — confirm against the installed version.
 */
async subscribe(subject: string, callback: (message: any) => void): Promise<void> {
// Connect to the NATS server before subscribing to a subject.
await this.natsClient.connect();
// Subscribe to the specified NATS subject.
await this.natsClient.subscribe(subject);
// The subscription created below is not stored, so it cannot be cancelled from this class.
this.natsClient.observe(subject).subscribe((message) => {
// Forward each incoming message to the caller-supplied callback.
callback(message);
});
}
/**
 * Unsubscribes from a NATS subject.
 * @param subject - The NATS subject to unsubscribe from.
 *
 * NOTE(review): `unsubscribe(subject)` is likewise not part of the documented
 * ClientProxy API — confirm.
 */
async unsubscribe(subject: string): Promise<void> {
// Unsubscribe from the specified NATS subject.
await this.natsClient.unsubscribe(subject);
}
/**
 * Closes the NATS connection held by this service's client proxy.
 */
async closeConnection(): Promise<void> {
// Close the NATS connection when it's no longer needed.
await this.natsClient.close();
}
}
//nats.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern, ClientProxy, Transport, ClientProxyFactory } from '@nestjs/microservices';
import { NatsService } from './nats.service';
/**
 * Microservice controller that consumes messages from a NATS subject,
 * hands them to {@link NatsService} and emits the result on a response subject.
 */
@Controller()
export class NatsController {
  private readonly natsClient: ClientProxy;

  constructor(private readonly natsService: NatsService) {
    // Dedicated client proxy used only for emitting the processing result.
    this.natsClient = ClientProxyFactory.create({
      transport: Transport.NATS,
      options: {
        // Specify NATS server connection options here.
        // servers: ['nats://localhost:4222'], // Example NATS server URL
      },
    });
  }

  /**
   * Message handler for incoming messages on the specified NATS subject.
   *
   * Failures are logged rather than rethrown, so this handler itself never
   * rejects; previously they were discarded without any trace.
   *
   * NOTE(review): `processMessage` must exist on `NatsService` — confirm it is
   * defined, it is not visible alongside the service's other methods.
   *
   * @param message - The incoming message payload.
   */
  @MessagePattern('your-nats-subject') // Replace with your NATS subject
  async handleMessage(message: any): Promise<void> {
    try {
      const result = await this.natsService.processMessage(message);
      // Optionally send the outcome back to interested parties.
      this.natsClient.emit('response-nats-subject', result);
    } catch (error) {
      // Keep the best-effort behaviour, but leave a trace of what went wrong.
      console.error(`Failed to handle message on 'your-nats-subject':`, error);
    }
  }
}
//nats.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport, ClientProxyFactory } from '@nestjs/microservices';
import { NatsService } from './nats.service';
/**
 * Module for the NATS example. Registers a NATS-transport client under the
 * 'NATS_CLIENT' token, provides/exports `NatsService`, and additionally opens
 * its own connection to the NATS server when the module is instantiated.
 */
@Module({
  imports: [
    // Create a client connection to the NATS server
    ClientsModule.register([
      {
        name: 'NATS_CLIENT', // Unique name for the NATS client
        transport: Transport.NATS, // Use NATS transport
        options: {
          // Specify NATS server connection options here
          // For example, servers: ['nats://localhost:4222']
        },
      },
    ]),
  ],
  providers: [NatsService], // Declare NatsService as a provider
  exports: [NatsService], // Export NatsService for use in other modules
})
export class NatsModule {
  // Kept as a field (instead of a constructor-local) so the connection can be
  // closed again when the module is torn down.
  private readonly natsClient = ClientProxyFactory.create({
    transport: Transport.NATS,
    options: {
      // Specify NATS server connection options here
      // For example, servers: ['nats://localhost:4222']
    },
  });

  constructor() {
    // `connect()` returns a promise. A constructor cannot await it, so attach a
    // rejection handler; otherwise an unreachable server surfaces as an
    // unhandled promise rejection.
    this.natsClient.connect().catch((error: unknown) => {
      console.error('Failed to connect to the NATS server:', error);
    });
  }

  /**
   * Lifecycle hook: releases the connection opened in the constructor.
   */
  async onModuleDestroy(): Promise<void> {
    await this.natsClient.close();
  }
}
//nats-client.service.ts
import { Injectable } from '@nestjs/common';
import { ClientProxyFactory, Transport, ClientProxy } from '@nestjs/microservices';
/**
 * Client-side NATS helper: owns a `ClientProxy` created for the NATS
 * transport and exposes publish / subscribe / unsubscribe / close on top of it.
 */
@Injectable()
export class NatsClientService {
private readonly natsClient: ClientProxy;
constructor() {
// Build the NATS client proxy. No server options are set here; the example
// value below is commented out.
this.natsClient = ClientProxyFactory.create({
transport: Transport.NATS,
options: {
// Specify NATS server connection options here.
// servers: ['nats://localhost:4222'], // Example NATS server URL
},
});
}
/**
 * Publishes a message to a NATS subject.
 * @param subject - The NATS subject to which the message will be published (used as the event pattern).
 * @param message - The message payload to be sent.
 */
async publish(subject: string, message: any): Promise<void> {
// `emit` returns an observable; converting it to a promise lets the caller await it.
await this.natsClient.emit(subject, message).toPromise();
}
/**
 * Subscribes to a NATS subject and handles incoming messages with a callback function.
 * @param subject - The NATS subject to subscribe to.
 * @param callback - A callback function invoked with every incoming message.
 *
 * NOTE(review): `subscribe(subject)` and `observe(subject)` are called on a
 * `ClientProxy`; I could not find them in the documented ClientProxy API
 * (connect/close/send/emit) — confirm against the installed version.
 */
async subscribe(subject: string, callback: (message: any) => void): Promise<void> {
// Connect to the NATS server before subscribing to a subject.
await this.natsClient.connect();
// Subscribe to the specified NATS subject.
await this.natsClient.subscribe(subject);
// The subscription created below is not stored, so it cannot be cancelled from this class.
this.natsClient.observe(subject).subscribe((message) => {
// Forward each incoming message to the caller-supplied callback.
callback(message);
});
}
/**
 * Unsubscribes from a NATS subject.
 * @param subject - The NATS subject to unsubscribe from.
 *
 * NOTE(review): `unsubscribe(subject)` is likewise not part of the documented
 * ClientProxy API — confirm.
 */
async unsubscribe(subject: string): Promise<void> {
// Unsubscribe from the specified NATS subject.
await this.natsClient.unsubscribe(subject);
}
/**
 * Closes the NATS connection held by this service's client proxy.
 */
async closeConnection(): Promise<void> {
// Close the NATS connection when it's no longer needed.
await this.natsClient.close();
}
}
//nats-client.controller.ts
import { Body, Controller, InternalServerErrorException, Param, Post } from '@nestjs/common';
import { NatsService } from './nats.service';
/**
 * HTTP endpoints under `/nats` that drive {@link NatsService}. Every handler
 * answers with a short status string and maps failures to HTTP 500.
 */
@Controller('nats')
export class NatsClientController {
  constructor(private readonly natsService: NatsService) {}

  /**
   * Logs the underlying failure and builds the HTTP exception to throw.
   * Without the log the original cause would be lost, because the exception
   * only carries the generic message.
   */
  private toHttpError(message: string, cause: unknown): InternalServerErrorException {
    console.error(`${message}:`, cause);
    return new InternalServerErrorException(message);
  }

  /**
   * POST /nats/publish/:subject — publishes the request body to `subject`.
   * @throws InternalServerErrorException when publishing fails.
   */
  @Post('publish/:subject')
  async publishMessage(@Param('subject') subject: string, @Body() message: any) {
    try {
      await this.natsService.publish(subject, message);
      return 'Message published successfully';
    } catch (error) {
      throw this.toHttpError('Failed to publish message to NATS', error);
    }
  }

  /**
   * POST /nats/subscribe/:subject — subscribes to `subject`.
   * @throws InternalServerErrorException when subscribing fails.
   */
  @Post('subscribe/:subject')
  async subscribeToSubject(@Param('subject') subject: string) {
    try {
      await this.natsService.subscribe(subject, (message) => {
        // Handle incoming messages here
      });
      return 'Subscribed to NATS subject';
    } catch (error) {
      throw this.toHttpError('Failed to subscribe to NATS subject', error);
    }
  }

  /**
   * POST /nats/unsubscribe/:subject — cancels the subscription to `subject`.
   * @throws InternalServerErrorException when unsubscribing fails.
   */
  @Post('unsubscribe/:subject')
  async unsubscribeFromSubject(@Param('subject') subject: string) {
    try {
      await this.natsService.unsubscribe(subject);
      return 'Unsubscribed from NATS subject';
    } catch (error) {
      throw this.toHttpError('Failed to unsubscribe from NATS subject', error);
    }
  }

  /**
   * POST /nats/close-connection — closes the service's NATS connection.
   * @throws InternalServerErrorException when closing fails.
   */
  @Post('close-connection')
  async closeNatsConnection() {
    try {
      await this.natsService.closeConnection();
      return 'NATS connection closed';
    } catch (error) {
      throw this.toHttpError('Failed to close NATS connection', error);
    }
  }
}
//nats-client.module.ts
import { Module, Global } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';
import { NatsService } from './nats.service';
/**
 * Global module for the NATS client example. Provides `NatsService` plus a
 * factory-built NATS client proxy under the 'NATS_CLIENT' token; only
 * `NatsService` is exported.
 */
@Global() // Global: shared across the whole application without re-importing
@Module({
  providers: [
    NatsService,
    {
      // Injection token for the client proxy.
      provide: 'NATS_CLIENT',
      // Builds the proxy with the NATS transport and the options below.
      useFactory: (): ClientProxy =>
        ClientProxyFactory.create({
          transport: Transport.NATS,
          options: {
            // Specify NATS server connection options here.
            // servers: ['nats://localhost:4222'], // Example NATS server URL
          },
        }),
    },
  ],
  // Makes NatsService injectable from other modules.
  exports: [NatsService],
})
export class NatsClientModule {}
rabbitmq
//rabbitmq.service.ts
import { Injectable } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';
/**
 * Wraps a Nest `ClientProxy` created for the RabbitMQ (RMQ) transport and
 * exposes messaging helpers plus a set of queue/exchange management calls.
 *
 * NOTE(review): apart from `send`, `emit`, `connect` and `close`, the methods
 * invoked on the proxy below (`subscribe`, `bindQueue`, `unbindQueue`,
 * `assertQueue`, `assertExchange`, `deleteQueue`, `deleteExchange`,
 * `purgeQueue`) are not ones I could find in the documented ClientProxy API —
 * confirm against the installed version.
 */
@Injectable()
export class RabbitmqService {
private readonly rabbitmqClient: ClientProxy;
constructor() {
// Build the RMQ client proxy with a fixed local broker, queue and exchange.
this.rabbitmqClient = ClientProxyFactory.create({
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'], // RabbitMQ server URL
queue: 'my_queue', // Queue name
queueOptions: {
durable: true, // Make the queue durable
},
exchange: 'my_exchange', // Exchange name
exchangeType: 'fanout', // Exchange type
exchangeOptions: {
durable: true, // Make the exchange durable
},
},
});
}
// Sends `data` under `pattern` using `send` and awaits the resulting promise;
// the resolved value is discarded.
async sendToQueue(pattern: string, data: any): Promise<void> {
await this.rabbitmqClient.send(pattern, data).toPromise();
}
// Emits `data` under `pattern`, passing the exchange name as a third argument.
// NOTE(review): the documented `emit` signature is (pattern, data) — confirm
// the `{ exchange }` argument is accepted and has an effect.
async publishToExchange(exchange: string, pattern: string, data: any): Promise<void> {
await this.rabbitmqClient.emit(pattern, data, { exchange }).toPromise();
}
// Connects the proxy, then registers `callback` for `pattern`.
async receive(pattern: string, callback: (data: any) => void): Promise<void> {
await this.rabbitmqClient.connect();
await this.rabbitmqClient.subscribe(pattern, callback);
}
// Binds `queue` to `exchange` for `pattern`.
async bindQueueToExchange(queue: string, exchange: string, pattern: string): Promise<void> {
await this.rabbitmqClient.bindQueue(queue, exchange, pattern);
}
// Removes the binding of `queue` to `exchange` for `pattern`.
async unbindQueueFromExchange(queue: string, exchange: string, pattern: string): Promise<void> {
await this.rabbitmqClient.unbindQueue(queue, exchange, pattern);
}
// Asserts `queue` with the given durability option.
async assertQueue(queue: string, options: { durable: boolean }): Promise<void> {
await this.rabbitmqClient.assertQueue(queue, options);
}
// Asserts `exchange` with the given durability and type options.
async assertExchange(
exchange: string,
options: { durable: boolean; type: string }
): Promise<void> {
await this.rabbitmqClient.assertExchange(exchange, options);
}
// Deletes `queue`.
async deleteQueue(queue: string): Promise<void> {
await this.rabbitmqClient.deleteQueue(queue);
}
// Deletes `exchange`.
async deleteExchange(exchange: string): Promise<void> {
await this.rabbitmqClient.deleteExchange(exchange);
}
// Purges `queue`.
async purgeQueue(queue: string): Promise<void> {
await this.rabbitmqClient.purgeQueue(queue);
}
// Closes the connection held by the client proxy.
async closeConnection(): Promise<void> {
await this.rabbitmqClient.close();
}
}
//rabbitmq.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload, Ctx, RmqContext } from '@nestjs/microservices';
import { RabbitmqService } from './rabbitmq.service';
/**
 * Microservice controller for the RabbitMQ example. Each message pattern maps
 * onto one {@link RabbitmqService} call using the fixed queue 'my_queue',
 * exchange 'my_exchange' and pattern/type 'fanout'.
 */
@Controller()
export class RabbitmqController {
  // Names shared by all handlers below.
  private static readonly QUEUE = 'my_queue';
  private static readonly EXCHANGE = 'my_exchange';
  private static readonly FANOUT = 'fanout';

  constructor(private readonly rabbitmqService: RabbitmqService) {}

  /** Forwards the payload to the queue. */
  @MessagePattern('send_to_queue')
  async sendToQueue(@Payload() data: any): Promise<void> {
    await this.rabbitmqService.sendToQueue(RabbitmqController.QUEUE, data);
  }

  /** Publishes the payload to the exchange, then acknowledges the source message. */
  @MessagePattern('publish_to_exchange')
  async publishToExchange(@Payload() data: any, @Ctx() context: RmqContext): Promise<void> {
    const amqpChannel = context.getChannelRef();
    const incoming = context.getMessage();
    await this.rabbitmqService.publishToExchange(
      RabbitmqController.EXCHANGE,
      RabbitmqController.FANOUT,
      data,
    );
    // Acknowledge only after the publish call has resolved.
    amqpChannel.ack(incoming);
  }

  /** Logs whatever arrives on the 'receive' pattern. */
  @MessagePattern('receive')
  async receive(@Payload() data: any): Promise<void> {
    console.log('Received:', data);
  }

  /** Binds the queue to the exchange. The payload is ignored. */
  @MessagePattern('bind_queue_to_exchange')
  async bindQueueToExchange(@Payload() data: any): Promise<void> {
    await this.rabbitmqService.bindQueueToExchange(
      RabbitmqController.QUEUE,
      RabbitmqController.EXCHANGE,
      RabbitmqController.FANOUT,
    );
  }

  /** Removes the queue/exchange binding. The payload is ignored. */
  @MessagePattern('unbind_queue_from_exchange')
  async unbindQueueFromExchange(@Payload() data: any): Promise<void> {
    await this.rabbitmqService.unbindQueueFromExchange(
      RabbitmqController.QUEUE,
      RabbitmqController.EXCHANGE,
      RabbitmqController.FANOUT,
    );
  }

  /** Asserts the durable queue. The payload is ignored. */
  @MessagePattern('assert_queue')
  async assertQueue(@Payload() data: any): Promise<void> {
    const queueOptions = { durable: true };
    await this.rabbitmqService.assertQueue(RabbitmqController.QUEUE, queueOptions);
  }

  /** Asserts the durable fanout exchange. The payload is ignored. */
  @MessagePattern('assert_exchange')
  async assertExchange(@Payload() data: any): Promise<void> {
    const exchangeOptions = { durable: true, type: RabbitmqController.FANOUT };
    await this.rabbitmqService.assertExchange(RabbitmqController.EXCHANGE, exchangeOptions);
  }

  /** Deletes the queue. The payload is ignored. */
  @MessagePattern('delete_queue')
  async deleteQueue(@Payload() data: any): Promise<void> {
    await this.rabbitmqService.deleteQueue(RabbitmqController.QUEUE);
  }

  /** Deletes the exchange. The payload is ignored. */
  @MessagePattern('delete_exchange')
  async deleteExchange(@Payload() data: any): Promise<void> {
    await this.rabbitmqService.deleteExchange(RabbitmqController.EXCHANGE);
  }

  /** Purges the queue. The payload is ignored. */
  @MessagePattern('purge_queue')
  async purgeQueue(@Payload() data: any): Promise<void> {
    await this.rabbitmqService.purgeQueue(RabbitmqController.QUEUE);
  }
}
//rabbitmq.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport, RmqOptions } from '@nestjs/microservices';
import { RabbitmqController } from './rabbitmq.controller';
import { RabbitmqService } from './rabbitmq.service';
/**
 * Registers the `RABBITMQ_SERVICE` client together with the RabbitMQ
 * controller and service of this example.
 *
 * The transport options are asserted as `RmqOptions['options']` (the inner
 * options shape). The previous `as RmqOptions` named the outer
 * `{ transport, options }` wrapper, which is not what this object is.
 */
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'RABBITMQ_SERVICE', // Injection token for the client
        transport: Transport.RMQ,
        options: {
          urls: ['amqp://localhost:5672'], // RabbitMQ server URL
          queue: 'my_queue', // Queue name
          queueOptions: {
            durable: true, // Make the queue durable
          },
          exchange: 'my_exchange', // Exchange name
          exchangeType: 'fanout', // Exchange type
          exchangeOptions: {
            durable: true, // Make the exchange durable
          },
          // NOTE(review): the exchange* keys are kept as written; confirm the
          // installed @nestjs/microservices version recognises them.
        } as RmqOptions['options'],
      },
    ]),
  ],
  controllers: [RabbitmqController],
  providers: [RabbitmqService],
})
export class RabbitmqModule {}
//rabbitmq-client.service.ts
import { Injectable } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport, RmqOptions } from '@nestjs/microservices';
/**
 * Client-side helper that owns a RabbitMQ `ClientProxy` and exposes
 * promise-based wrappers around it.
 *
 * NOTE(review): `subscribe`, `bindQueue`, `unbindQueue`, `assertQueue`,
 * `assertExchange`, `deleteQueue`, `deleteExchange` and `purgeQueue` are not
 * part of the documented `ClientProxy` API (send / emit / connect / close).
 * Those wrappers are left as written; confirm which client object they are
 * meant to call before relying on them.
 */
@Injectable()
export class RabbitmqClientService {
  private readonly rabbitmqClient: ClientProxy;

  constructor() {
    this.rabbitmqClient = ClientProxyFactory.create({
      transport: Transport.RMQ,
      options: {
        urls: ['amqp://localhost:5672'], // RabbitMQ server URL
        queue: 'my_queue', // Queue name
        queueOptions: {
          durable: true, // Make the queue durable
        },
        exchange: 'my_exchange', // Exchange name
        exchangeType: 'fanout', // Exchange type
        exchangeOptions: {
          durable: true, // Make the exchange durable
        },
        // Assert the inner options shape; `RmqOptions` itself is the outer
        // `{ transport, options }` wrapper.
      } as RmqOptions['options'],
    });
  }

  /**
   * Sends a request-style message and waits for the response stream to finish.
   *
   * @param pattern Message pattern to send under.
   * @param data Payload to send.
   */
  async sendToQueue(pattern: string, data: any): Promise<void> {
    await this.rabbitmqClient.send(pattern, data).toPromise();
  }

  /**
   * Emits an event-style message.
   *
   * @param exchange Kept for interface compatibility. `ClientProxy.emit`
   *   accepts only `(pattern, data)`, so the former third argument was never
   *   consumed; the client uses the exchange configured in the constructor.
   * @param pattern Event pattern to emit under.
   * @param data Payload to emit.
   */
  async publishToExchange(exchange: string, pattern: string, data: any): Promise<void> {
    await this.rabbitmqClient.emit(pattern, data).toPromise();
  }

  /**
   * Connects the client, then registers `callback` for `pattern`.
   *
   * @param pattern Pattern to subscribe to.
   * @param callback Invoked with each received message.
   */
  async receive(pattern: string, callback: (data: any) => void): Promise<void> {
    await this.rabbitmqClient.connect();
    await this.rabbitmqClient.subscribe(pattern, callback);
  }

  /** Delegates a queue-to-exchange binding to the client. */
  async bindQueueToExchange(queue: string, exchange: string, pattern: string): Promise<void> {
    await this.rabbitmqClient.bindQueue(queue, exchange, pattern);
  }

  /** Delegates removal of a queue-to-exchange binding to the client. */
  async unbindQueueFromExchange(queue: string, exchange: string, pattern: string): Promise<void> {
    await this.rabbitmqClient.unbindQueue(queue, exchange, pattern);
  }

  /** Delegates a queue assertion to the client. */
  async assertQueue(queue: string, options: { durable: boolean }): Promise<void> {
    await this.rabbitmqClient.assertQueue(queue, options);
  }

  /** Delegates an exchange assertion to the client. */
  async assertExchange(
    exchange: string,
    options: { durable: boolean; type: string }
  ): Promise<void> {
    await this.rabbitmqClient.assertExchange(exchange, options);
  }

  /** Delegates queue deletion to the client. */
  async deleteQueue(queue: string): Promise<void> {
    await this.rabbitmqClient.deleteQueue(queue);
  }

  /** Delegates exchange deletion to the client. */
  async deleteExchange(exchange: string): Promise<void> {
    await this.rabbitmqClient.deleteExchange(exchange);
  }

  /** Delegates a queue purge to the client. */
  async purgeQueue(queue: string): Promise<void> {
    await this.rabbitmqClient.purgeQueue(queue);
  }

  /** Closes the underlying client. */
  async closeConnection(): Promise<void> {
    await this.rabbitmqClient.close();
  }
}
//rabbitmq-client.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { RabbitmqClientService } from './rabbitmq-client.service';
/**
 * Message-pattern handlers that drive `RabbitmqClientService`.
 *
 * All operations use the fixed queue and exchange names of this example.
 * Only the two publishing handlers forward the incoming payload.
 */
@Controller()
export class RabbitmqClientController {
  private static readonly QUEUE = 'my_queue';
  private static readonly EXCHANGE = 'my_exchange';
  private static readonly EXCHANGE_TYPE = 'fanout';

  constructor(private readonly rabbitmqClientService: RabbitmqClientService) {}

  /** Sends the payload through the client under the queue name. */
  @MessagePattern('send-to-queue')
  async sendToQueue(@Payload() data: any): Promise<void> {
    await this.rabbitmqClientService.sendToQueue(RabbitmqClientController.QUEUE, data);
  }

  /** Publishes the payload to the example exchange. */
  @MessagePattern('publish-to-exchange')
  async publishToExchange(@Payload() data: any): Promise<void> {
    await this.rabbitmqClientService.publishToExchange(
      RabbitmqClientController.EXCHANGE,
      RabbitmqClientController.EXCHANGE_TYPE,
      data,
    );
  }

  /** Starts receiving from the example queue with a no-op message callback. */
  @MessagePattern('receive')
  async receive(@Payload() data: any): Promise<void> {
    const onMessage = (message: any): void => {
      // Handle the received message
    };
    await this.rabbitmqClientService.receive(RabbitmqClientController.QUEUE, onMessage);
  }

  /** Binds the example queue to the example exchange. */
  @MessagePattern('bind-queue-to-exchange')
  async bindQueueToExchange(@Payload() data: any): Promise<void> {
    await this.rabbitmqClientService.bindQueueToExchange(
      RabbitmqClientController.QUEUE,
      RabbitmqClientController.EXCHANGE,
      RabbitmqClientController.EXCHANGE_TYPE,
    );
  }

  /** Removes the binding between the example queue and exchange. */
  @MessagePattern('unbind-queue-from-exchange')
  async unbindQueueFromExchange(@Payload() data: any): Promise<void> {
    await this.rabbitmqClientService.unbindQueueFromExchange(
      RabbitmqClientController.QUEUE,
      RabbitmqClientController.EXCHANGE,
      RabbitmqClientController.EXCHANGE_TYPE,
    );
  }

  /** Asserts the example queue with `durable: true`. */
  @MessagePattern('assert-queue')
  async assertQueue(@Payload() data: any): Promise<void> {
    await this.rabbitmqClientService.assertQueue(RabbitmqClientController.QUEUE, {
      durable: true,
    });
  }

  /** Asserts the example exchange as a durable fanout exchange. */
  @MessagePattern('assert-exchange')
  async assertExchange(@Payload() data: any): Promise<void> {
    await this.rabbitmqClientService.assertExchange(RabbitmqClientController.EXCHANGE, {
      durable: true,
      type: RabbitmqClientController.EXCHANGE_TYPE,
    });
  }

  /** Deletes the example queue. */
  @MessagePattern('delete-queue')
  async deleteQueue(@Payload() data: any): Promise<void> {
    await this.rabbitmqClientService.deleteQueue(RabbitmqClientController.QUEUE);
  }

  /** Deletes the example exchange. */
  @MessagePattern('delete-exchange')
  async deleteExchange(@Payload() data: any): Promise<void> {
    await this.rabbitmqClientService.deleteExchange(RabbitmqClientController.EXCHANGE);
  }

  /** Purges the example queue. */
  @MessagePattern('purge-queue')
  async purgeQueue(@Payload() data: any): Promise<void> {
    await this.rabbitmqClientService.purgeQueue(RabbitmqClientController.QUEUE);
  }

  /** Closes the client's connection. */
  @MessagePattern('close-connection')
  async closeConnection(@Payload() data: any): Promise<void> {
    await this.rabbitmqClientService.closeConnection();
  }
}
//rabbitmq-client.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport, ClientProxyFactory } from '@nestjs/microservices';
import { RabbitmqClientController } from './rabbitmq-client.controller';
import { RabbitmqClientService } from './rabbitmq-client.service';
/**
 * Module for the RabbitMQ client example: registers the `RABBITMQ_SERVICE`
 * client and declares the client controller and service.
 */
@Module({
  providers: [RabbitmqClientService],
  controllers: [RabbitmqClientController],
  imports: [
    ClientsModule.register([
      {
        transport: Transport.RMQ,
        name: 'RABBITMQ_SERVICE',
        options: {
          // Broker address
          urls: ['amqp://localhost:5672'],
          // Durable queue the client talks to
          queue: 'my_queue',
          queueOptions: { durable: true },
          // Durable fanout exchange
          exchange: 'my_exchange',
          exchangeType: 'fanout',
          exchangeOptions: { durable: true },
        },
      },
    ]),
  ],
})
export class RabbitmqClientModule {}
kafka
//kafka.service.ts
import { Injectable } from '@nestjs/common';
import { ClientKafka, ClientProxyFactory, Transport } from '@nestjs/microservices';
/**
 * Owns a Kafka client and exposes promise-based send / emit helpers.
 *
 * The client is built with `new ClientKafka(...)` so the field really has the
 * `ClientKafka` type. `ClientProxyFactory.create` is typed as returning a
 * generic `ClientProxy`, which lacks `subscribeToResponseOf`.
 */
@Injectable()
export class KafkaService {
  private readonly kafkaClient: ClientKafka;

  constructor() {
    this.kafkaClient = new ClientKafka({
      client: {
        clientId: 'kafka-client',
        brokers: ['localhost:9092'], // Kafka broker(s) URL
      },
      consumer: {
        groupId: 'my-group', // Consumer group ID
      },
    });
  }

  /**
   * Sends a request-style message and waits for the reply stream to finish.
   * The reply value is discarded.
   *
   * @param topic Topic (pattern) to send to.
   * @param message Payload to send.
   */
  async send(topic: string, message: any): Promise<void> {
    await this.kafkaClient.send(topic, message).toPromise();
  }

  /**
   * Emits an event-style message.
   *
   * @param topic Topic (pattern) to emit to.
   * @param message Payload to emit.
   */
  async emit(topic: string, message: any): Promise<void> {
    await this.kafkaClient.emit(topic, message).toPromise();
  }

  /**
   * Registers interest in the reply topic of `pattern`.
   *
   * NOTE(review): Nest's Kafka documentation asks for reply-topic
   * subscriptions to be registered before the client connects; confirm that
   * callers invoke this early enough.
   *
   * @param pattern Request pattern whose replies should be consumed.
   */
  async subscribeToResponseOf(pattern: string): Promise<void> {
    await this.kafkaClient.subscribeToResponseOf(pattern);
  }

  /** Closes the Kafka client. */
  async close(): Promise<void> {
    await this.kafkaClient.close();
  }
}
//kafka.controller.ts
import { Controller, Post, Body, Get, Query } from '@nestjs/common';
import { KafkaService } from './kafka.service';
import { MessagePattern, Payload, Ctx, KafkaContext } from '@nestjs/microservices';
/**
 * HTTP endpoints under `/kafka` that produce messages through `KafkaService`,
 * plus one Kafka message handler that logs what it receives.
 */
@Controller('kafka')
export class KafkaController {
  constructor(private readonly kafkaService: KafkaService) {}

  /** POST /kafka/send: forwards `topic` and `message` from the body to `send`. */
  @Post('send')
  async sendMessage(@Body('topic') topic: string, @Body('message') message: any) {
    const pending = this.kafkaService.send(topic, message);
    await pending;
  }

  /** POST /kafka/emit: forwards `topic` and `message` from the body to `emit`. */
  @Post('emit')
  async emitMessage(@Body('topic') topic: string, @Body('message') message: any) {
    const pending = this.kafkaService.emit(topic, message);
    await pending;
  }

  /** Logs the value and topic of each message received on `topic-name`. */
  @MessagePattern('topic-name') // Replace 'topic-name' with your topic
  async handleMessage(@Payload() message: any, @Ctx() context: KafkaContext) {
    const kafkaMessage = context.getMessage();
    const topic = context.getTopic();
    const serialisedValue = JSON.stringify(kafkaMessage.value);
    console.log(`Received message: ${serialisedValue} on topic: ${topic}`);
    // Handle the message
  }

  /**
   * GET /kafka/request-response: subscribes to the reply topic of `pattern`,
   * then returns whatever `send` resolves with.
   */
  @Get('request-response')
  async requestResponse(@Query('pattern') pattern: string, @Query('message') message: string) {
    const { kafkaService } = this;
    await kafkaService.subscribeToResponseOf(pattern);
    return kafkaService.send(pattern, message);
  }
}
//kafka.module.ts
import { Module, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { KafkaService } from './kafka.service';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { KafkaOptions, TransportStrategy } from '@nestjs/microservices/interfaces';
/**
 * Registers the `KAFKA_SERVICE` client asynchronously and exports
 * `KafkaService`. On shutdown the module closes the service's Kafka client.
 *
 * The factory is annotated as returning `KafkaOptions` because it builds the
 * full `{ transport, options }` object. The previous `KafkaOptions['options']`
 * annotation described only the inner part and did not match the value.
 */
@Module({
  imports: [
    ClientsModule.registerAsync([
      {
        name: 'KAFKA_SERVICE',
        useFactory: (): KafkaOptions => ({
          transport: Transport.KAFKA,
          options: {
            client: {
              clientId: 'kafka-client',
              brokers: ['localhost:9092'],
            },
            consumer: {
              groupId: 'my-group',
            },
            subscribe: {
              // Kafka consumer subscribe options
            },
            run: {
              // Kafka consumer run options
            },
          },
        }),
      },
    ]),
  ],
  providers: [KafkaService],
  exports: [KafkaService],
})
export class KafkaModule implements OnModuleInit, OnModuleDestroy {
  constructor(private readonly kafkaService: KafkaService) {}

  /** Placeholder for start-up logic; currently does nothing. */
  async onModuleInit() {
    // Logic to be executed during module initialization
  }

  /** Closes the Kafka client when the module is torn down. */
  async onModuleDestroy() {
    await this.kafkaService.close();
  }
}
//kafka-client.service.ts
import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ClientKafka, ClientProxyFactory, Transport } from '@nestjs/microservices';
/**
 * Kafka client wrapper tied to the module lifecycle. It subscribes to the
 * configured reply topics and connects on init, and closes on destroy.
 *
 * The client is built with `new ClientKafka(...)` so the field really has the
 * `ClientKafka` type. `ClientProxyFactory.create` is typed as returning a
 * generic `ClientProxy`, which lacks `subscribeToResponseOf`.
 */
@Injectable()
export class KafkaClientService implements OnModuleInit, OnModuleDestroy {
  private readonly kafkaClient: ClientKafka;

  constructor() {
    this.kafkaClient = new ClientKafka({
      client: {
        clientId: 'kafka-client',
        brokers: ['localhost:9092'],
      },
      consumer: {
        groupId: 'kafka-client-consumer',
      },
    });
  }

  /** Registers the reply-topic subscriptions, then connects the client. */
  async onModuleInit() {
    const requestPatterns = ['pattern1', 'pattern2']; // Add your request patterns here
    for (const pattern of requestPatterns) {
      this.kafkaClient.subscribeToResponseOf(pattern);
    }
    await this.kafkaClient.connect();
  }

  /**
   * Sends a request-style message.
   *
   * @param topic Topic (pattern) to send to.
   * @param message Payload to send.
   */
  async send(topic: string, message: any): Promise<void> {
    return this.kafkaClient.send(topic, message).toPromise();
  }

  /**
   * Emits an event-style message.
   *
   * @param topic Topic (pattern) to emit to.
   * @param message Payload to emit.
   */
  async emit(topic: string, message: any): Promise<void> {
    return this.kafkaClient.emit(topic, message).toPromise();
  }

  /** Closes the Kafka client when the module is torn down. */
  async onModuleDestroy() {
    await this.kafkaClient.close();
  }
}
//kafka-client.controller.ts
import { Controller, Post, Body, Get, Query } from '@nestjs/common';
import { KafkaService } from './kafka.service';
import { Observable } from 'rxjs';
/**
 * HTTP endpoints under `/kafka-client` that delegate to `KafkaService`.
 */
@Controller('kafka-client')
export class KafkaClientController {
  constructor(private readonly kafkaService: KafkaService) {}

  /** POST /kafka-client/send: forwards `topic` and `message` to `send`. */
  @Post('send')
  async sendMessage(@Body('topic') topic: string, @Body('message') message: any): Promise<void> {
    const pending = this.kafkaService.send(topic, message);
    await pending;
  }

  /** POST /kafka-client/emit: forwards `topic` and `message` to `emit`. */
  @Post('emit')
  async emitMessage(@Body('topic') topic: string, @Body('message') message: any): Promise<void> {
    const pending = this.kafkaService.emit(topic, message);
    await pending;
  }

  /** GET /kafka-client/subscribe: subscribes to the reply topic of `pattern`. */
  @Get('subscribe')
  async subscribeToTopic(@Query('pattern') pattern: string): Promise<void> {
    const pending = this.kafkaService.subscribeToResponseOf(pattern);
    await pending;
  }

  // Additional methods can be added here to handle more Kafka operations as needed
}
//kafka-client.module.ts
import { Module } from '@nestjs/common';
import { KafkaService } from './kafka.service';
import { KafkaClientController } from './kafka-client.controller';
import { ClientsModule, Transport } from '@nestjs/microservices';
/**
 * Module for the Kafka client example: registers the `KAFKA_SERVICE` client
 * and declares the client controller together with `KafkaService`.
 */
@Module({
  providers: [KafkaService],
  controllers: [KafkaClientController],
  imports: [
    ClientsModule.register([
      {
        transport: Transport.KAFKA,
        name: 'KAFKA_SERVICE',
        options: {
          // Consumer group used by this client
          consumer: { groupId: 'my-group' },
          // Broker connection settings
          client: {
            brokers: ['localhost:9092'],
            clientId: 'kafka-client',
          },
        },
      },
    ]),
  ],
})
export class KafkaClientModule {}
custom-transport
//custom-transport.service.ts
import { Injectable } from '@nestjs/common';
import {
Server,
CustomTransportStrategy,
PacketId,
ReadPacket,
WritePacket,
} from '@nestjs/microservices';
/**
 * Skeleton of a custom transport server: it creates a placeholder server
 * object, routes incoming packets to the registered handlers and sends the
 * handler result back.
 *
 * NOTE(review): Nest's `Server` base class already provides handler lookup
 * helpers; confirm that the private `getHandlerByPattern` redeclaration below
 * is compatible with the installed version.
 */
@Injectable()
export class CustomTransportService extends Server implements CustomTransportStrategy {
  private server: any; // This should be replaced with your actual server type.

  /**
   * Creates the server, wires its message callback and starts listening.
   *
   * @param callback Invoked once the server reports it is listening.
   */
  public async listen(callback: () => void) {
    this.server = this.createCustomServer();
    this.bindEvents(this.server);
    this.server.listen(() => {
      console.log('Custom transport server is listening');
      callback();
    });
  }

  /** Closes the server if `listen` has created one. */
  public close() {
    // `listen` may never have run, in which case there is nothing to close.
    this.server?.close();
  }

  /** Builds the placeholder server object used by this example. */
  private createCustomServer(): any {
    // Replace this with actual server initialization logic
    return {
      listen: (callback: () => void) => callback(),
      close: () => {},
      onMessage: (callback: (packet: ReadPacket) => void) => {},
      sendMessage: (packet: WritePacket) => {},
    };
  }

  /**
   * Registers the packet callback: look up the handler for the packet's
   * pattern, run it, and send the result back tagged with the packet id.
   */
  private bindEvents(server: any) {
    server.onMessage(async (packet: ReadPacket) => {
      // Normalise before the lookup, exactly as `dispatchEvent` and
      // `handleMessage` do, so all three paths agree on the key format.
      const pattern = this.normalizePattern(packet.pattern);
      const handler = this.getHandlerByPattern(pattern);
      if (!handler) {
        return;
      }
      const response = await handler(packet.data);
      server.sendMessage({ id: packet.id, data: response });
    });
  }

  /** Runs the handler registered for the packet's pattern, if there is one. */
  protected async dispatchEvent(packet: ReadPacket): Promise<any> {
    const pattern = this.normalizePattern(packet.pattern);
    const handler = this.getHandlerByPattern(pattern);
    if (handler) {
      await handler(packet.data);
    }
  }

  /** Returns the handler stored under `pattern`, or `undefined`. */
  private getHandlerByPattern(pattern: string): Function | undefined {
    return this.messageHandlers.get(pattern);
  }

  /**
   * Converts a pattern to its lookup key by JSON-serialising it.
   *
   * The original body referenced the undefined identifier `patter`.
   */
  protected normalizePattern(pattern: string): string {
    return JSON.stringify(pattern);
  }

  /**
   * Runs the handler for a request packet and reports the outcome through
   * `callback`: the handler result on success, or an error packet when no
   * handler is registered for the pattern.
   */
  protected async handleMessage(
    packet: ReadPacket,
    callback: (packet: WritePacket) => void
  ): Promise<void> {
    const pattern = this.normalizePattern(packet.pattern);
    const handler = this.getHandlerByPattern(pattern);
    if (handler) {
      const response = await handler(packet.data);
      const responsePacket: WritePacket = { id: packet.id, data: response, err: null };
      callback(responsePacket);
    } else {
      const noHandlerPacket: WritePacket = {
        id: packet.id,
        err: new Error('No handler for pattern'),
        data: null,
      };
      callback(noHandlerPacket);
    }
  }
}
//custom-transport.controller.ts
import { Controller, Inject } from '@nestjs/common';
import { MessagePattern, EventPattern, Ctx, Payload } from '@nestjs/microservices';
import { CustomTransportService } from './custom-transport.service';
/**
 * Handlers served over the custom transport: two request patterns and one
 * event pattern. The processing helpers are placeholders to be filled in.
 */
@Controller()
export class CustomTransportController {
  constructor(private readonly customTransportService: CustomTransportService) {}

  /**
   * Handles `pattern1` requests. If processing throws, the error is logged
   * and a generic error object is returned instead.
   */
  @MessagePattern('pattern1')
  async handleMessagePattern1(@Payload() data: any, @Ctx() context: any): Promise<any> {
    let outcome: any;
    try {
      outcome = this.processDataForPattern1(data, context);
    } catch (error) {
      outcome = this.handleError(error, 'pattern1');
    }
    return outcome;
  }

  /** Handles `pattern2` requests; not implemented yet. */
  @MessagePattern('pattern2')
  async handleMessagePattern2(@Payload() data: any, @Ctx() context: any): Promise<any> {
    // Similar implementation for pattern2
    // ...
  }

  /** Handles `event1` events; no response is produced. */
  @EventPattern('event1')
  async handleEventPattern1(@Payload() data: any, @Ctx() context: any): Promise<void> {
    this.processEventForPattern1(data, context);
  }

  /** Placeholder for the `pattern1` business logic. */
  private processDataForPattern1(data: any, context: any): any {
    // Implement according to your business logic
    // ...
  }

  /** Placeholder for the `event1` side effects (logging, triggers, ...). */
  private processEventForPattern1(data: any, context: any): void {
    // ...
  }

  /** Logs the error with its pattern and returns a generic error object. */
  private handleError(error: any, pattern: string): any {
    console.error(`Error handling message for ${pattern}:`, error);
    const failure = { error: 'An error occurred' };
    return failure;
  }

  // Additional utility methods or business logic can be added here
}
//custom-transport.module.ts
import { Module } from '@nestjs/common';
import { CustomTransportService } from './custom-transport.service';
import { CustomTransportController } from './custom-transport.controller';
/**
 * Declares the custom transport controller and service, and exports the
 * service so other modules can use it.
 */
@Module({
  providers: [CustomTransportService],
  exports: [CustomTransportService],
  controllers: [CustomTransportController],
})
export class CustomTransportModule {}
//custom-transport-client.service.ts
import { Injectable } from '@nestjs/common';
import {
Client,
CustomTransportStrategy,
PacketId,
ReadPacket,
WritePacket,
} from '@nestjs/microservices';
/**
 * Callback-style client for the custom transport, backed by a placeholder
 * client object.
 *
 * The class no longer extends `Client`: in `@nestjs/microservices` that name
 * is a property decorator, not a base class. It also no longer claims to
 * implement `CustomTransportStrategy`, the server-side contract, whose
 * `listen` method this class never had. The public methods are unchanged.
 */
@Injectable()
export class CustomTransportClientService {
  private client: any; // Replace with your actual client type

  /** Whether `connect` has succeeded and `close` has not been called since. */
  public isConnected = false;

  constructor() {
    this.client = this.createCustomClient();
  }

  /**
   * Connects the underlying client.
   *
   * @returns A promise that resolves once connected and rejects with the
   *   error reported by the client's connect callback.
   */
  public async connect(): Promise<any> {
    return new Promise<void>((resolve, reject) => {
      // Implement the connection logic for your custom client
      this.client.connect((err: unknown) => {
        if (err) {
          return reject(err);
        }
        this.isConnected = true;
        resolve();
      });
    });
  }

  /** Closes the underlying client and marks the service as disconnected. */
  public async close(): Promise<any> {
    this.client.close();
    this.isConnected = false;
  }

  /**
   * Sends a request packet and hands the client's reply to `callback`,
   * wrapped as `{ err: null, response, isDisposed: true }`.
   *
   * @param packet Packet to send.
   * @param callback Receives the wrapped reply.
   */
  public async send(packet: ReadPacket, callback: (packet: WritePacket) => void): Promise<any> {
    // Implement the send logic of your custom client
    this.client.sendMessage(packet, (response: unknown) => {
      callback({ err: null, response, isDisposed: true });
    });
  }

  /**
   * Emits a fire-and-forget packet through the underlying client.
   *
   * @param packet Packet to emit.
   */
  public async emit(packet: ReadPacket): Promise<any> {
    // Implement the emit logic of your custom client
    this.client.emitMessage(packet);
  }

  /** Builds the placeholder client object used by this example. */
  private createCustomClient(): any {
    // Replace with your custom client initialization logic
    return {
      connect: (callback: (err: any) => void) => callback(null),
      close: () => {},
      sendMessage: (packet: ReadPacket, callback: (response: WritePacket) => void) =>
        callback({ response: 'ok' }),
      emitMessage: (packet: ReadPacket) => {},
    };
  }
}
//custom-transport-client.controller.ts
import { Controller, Get, Post, Put, Delete, Body, Param, Query } from '@nestjs/common';
import { CustomTransportClientService } from './custom-transport-client.service';
/**
 * HTTP facade under `/custom-transport`. Each route turns the request into a
 * `{ pattern, data }` packet and sends it through
 * `CustomTransportClientService`.
 */
@Controller('custom-transport')
export class CustomTransportClientController {
  constructor(private readonly customTransportClientService: CustomTransportClientService) {}

  /** GET query/:pattern: sends the query-string object as the payload. */
  @Get('query/:pattern')
  async queryData(@Param('pattern') pattern: string, @Query() query: any): Promise<any> {
    return this.sendRequest(pattern, query);
  }

  /** GET data/:pattern: sends a `null` payload. */
  @Get('data/:pattern')
  async getData(@Param('pattern') pattern: string): Promise<any> {
    return this.sendRequest(pattern, null);
  }

  /** POST data/:pattern: sends the request body as the payload. */
  @Post('data/:pattern')
  async postData(@Param('pattern') pattern: string, @Body() data: any): Promise<any> {
    return this.sendRequest(pattern, data);
  }

  /** PUT data/:pattern: sends the request body as the payload. */
  @Put('data/:pattern')
  async updateData(@Param('pattern') pattern: string, @Body() data: any): Promise<any> {
    return this.sendRequest(pattern, data);
  }

  /** DELETE data/:pattern: sends a `null` payload. */
  @Delete('data/:pattern')
  async deleteData(@Param('pattern') pattern: string): Promise<any> {
    return this.sendRequest(pattern, null);
  }

  /**
   * Adapts the callback-style `send` to a promise.
   *
   * @param pattern Pattern to send under.
   * @param data Payload to send.
   * @returns Resolves with `response.response`; rejects with `response.err`
   *   or with whatever `send` itself rejects with.
   */
  private sendRequest(pattern: string, data: any): Promise<any> {
    return new Promise((resolve, reject) => {
      this.customTransportClientService
        .send({ pattern, data }, (response) => {
          if (response.err) {
            reject(response.err);
          } else {
            resolve(response.response);
          }
        })
        // `send` is async: without this, a failure inside it would leave the
        // request pending forever and surface as an unhandled rejection.
        .catch(reject);
    });
  }
}
//custom-transport-client.module.ts
import { Module } from '@nestjs/common';
import { CustomTransportClientService } from './custom-transport-client.service';
import { CustomTransportClientController } from './custom-transport-client.controller';
/**
 * Declares the HTTP controller and the client service of the custom
 * transport client example.
 */
@Module({
  providers: [CustomTransportClientService],
  controllers: [CustomTransportClientController],
})
export class CustomTransportClientModule {}
exception-filters
//exception-filters.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { RpcException } from '@nestjs/microservices';
/**
 * Logs caught exceptions and converts them into `RpcException` instances.
 *
 * Thrown values that are not `Error`-like (strings, `null`, plain objects
 * without a `message`) are handled without crashing. Their string form is
 * used as the message.
 */
@Injectable()
export class ExceptionFiltersService {
  private readonly logger = new Logger(ExceptionFiltersService.name);

  /**
   * Logs the exception and returns it as an `RpcException`.
   *
   * @param exception Any thrown value.
   * @returns The same instance when it already is an `RpcException`,
   *   otherwise a new `RpcException` describing it.
   */
  handleRpcException(exception: any): RpcException {
    this.logException(exception);
    return this.formatRpcException(exception);
  }

  /** Writes the exception message (and stack, when present) to the logger. */
  private logException(exception: any): void {
    this.logger.error(`Exception caught: ${this.describe(exception)}`, exception?.stack);
  }

  /** Wraps a non-RPC exception in an `RpcException` payload. */
  private formatRpcException(exception: any): RpcException {
    // Already in the target shape: pass it through untouched.
    if (exception instanceof RpcException) {
      return exception;
    }
    return new RpcException({
      message: this.describe(exception),
      status: 'error',
      data: exception?.response || null,
    });
  }

  /** Best-effort message: `message` when present, else the string form. */
  private describe(exception: any): string {
    return exception?.message ?? String(exception);
  }

  // Additional methods for handling other types of exceptions can be added here
}
// exception-filters.controller.ts
import { Controller, Get, UseFilters, HttpException, HttpStatus } from '@nestjs/common';
import { BaseRpcExceptionFilter, RpcException, RpcExceptionFilter, Transport } from '@nestjs/microservices';
import { ExceptionFiltersService } from './exception-filters.service';
/**
 * Endpoints under `/exception-filters` that deliberately throw, to
 * demonstrate different exception-handling paths.
 *
 * `RpcExceptionFilter` is exported by `@nestjs/microservices` as an interface
 * and cannot be constructed. The routes use the concrete
 * `BaseRpcExceptionFilter` instead.
 *
 * NOTE(review): `BaseRpcExceptionFilter` reports errors through an
 * Observable, which suits RPC handlers. Confirm that this is the intended
 * behaviour for these HTTP `@Get` routes.
 */
@Controller('exception-filters')
export class ExceptionFiltersController {
  constructor(private exceptionFiltersService: ExceptionFiltersService) {}

  /** Throws a plain `RpcException` behind the base RPC filter. */
  @Get()
  @UseFilters(new BaseRpcExceptionFilter())
  getDefaultException() {
    throw new RpcException('Default exception');
  }

  /** Throws an `HttpException` and rethrows it converted by the service. */
  @Get('custom-exception')
  getCustomException() {
    try {
      throw new HttpException('Custom exception', HttpStatus.BAD_REQUEST);
    } catch (exception) {
      throw this.exceptionFiltersService.handleRpcException(exception);
    }
  }

  /** Rejects after one second and rethrows via the service. */
  @Get('async-exception')
  async getAsyncException() {
    try {
      await new Promise((_, reject) => {
        setTimeout(() => reject(new RpcException('Async exception')), 1000);
      });
    } catch (exception) {
      throw this.exceptionFiltersService.handleRpcException(exception);
    }
  }

  /**
   * Throws an `RpcException` behind the base RPC filter.
   *
   * NOTE(review): the original passed `{ transport: Transport.TCP }` to the
   * filter constructor, but `BaseRpcExceptionFilter` takes no such option.
   * Transport-specific behaviour would need a custom filter.
   */
  @Get('transport-specific-exception')
  @UseFilters(new BaseRpcExceptionFilter())
  getTransportSpecificException() {
    throw new RpcException('Transport-specific exception');
  }

  // Additional endpoints to demonstrate different exception scenarios can be added here
}
// exception-filters.module.ts
import { Module } from '@nestjs/common';
import { APP_FILTER } from '@nestjs/core';
import { RpcExceptionFilter } from '@nestjs/microservices';
import { ExceptionFiltersService } from './exception-filters.service';
/**
 * Registers a global exception filter (`APP_FILTER`) whose `catch` delegates
 * to `ExceptionFiltersService.handleRpcException`.
 *
 * `RpcExceptionFilter` is an interface and cannot be instantiated with `new`,
 * so the factory returns a plain object that exposes the `catch` method.
 *
 * NOTE(review): the filter returns the converted `RpcException`, as the
 * original did. Confirm what the active transport expects a filter to
 * return; RPC filters normally return an Observable.
 */
@Module({
  providers: [
    {
      provide: APP_FILTER,
      useFactory: (exceptionFiltersService: ExceptionFiltersService) => ({
        catch: (exception: unknown, _host: unknown) =>
          exceptionFiltersService.handleRpcException(exception),
      }),
      inject: [ExceptionFiltersService],
    },
    ExceptionFiltersService,
  ],
})
export class ExceptionFiltersModule {}
// exception-filters-client.service.ts
import { Injectable, Catch, ArgumentsHost, ExceptionFilter } from '@nestjs/common';
import { RpcException, ClientProxy } from '@nestjs/microservices';
import { ExceptionFiltersService } from './exception-filters.service';
/**
 * Catch-all exception filter. `RpcException`s are first passed through
 * `ExceptionFiltersService` and then routed to the RPC hook; everything else
 * goes to the general hook. Both hooks are empty extension points.
 */
@Injectable()
@Catch()
export class ExceptionFiltersClientService implements ExceptionFilter {
  constructor(private exceptionFiltersService: ExceptionFiltersService) {}

  /**
   * Entry point called with every caught exception.
   *
   * @param exception The thrown value.
   * @param host Execution context of the failing handler.
   */
  catch(exception: any, host: ArgumentsHost) {
    const isRpc = exception instanceof RpcException;
    if (!isRpc) {
      // General exception handling
      this.handleGeneralException(exception, host);
      return;
    }
    // Specialized handling for RPC exceptions
    const normalised = this.exceptionFiltersService.handleRpcException(exception);
    this.handleRpcException(normalised, host);
  }

  /** Extension point for RPC exceptions (logging, response formatting, ...). */
  private handleRpcException(exception: RpcException, host: ArgumentsHost) {
    // Use host.switchToHttp() or host.switchToWs() for http/ws specific context
  }

  /** Extension point for every other exception type. */
  private handleGeneralException(exception: any, host: ArgumentsHost) {
    // Use host.switchToHttp() or host.switchToWs() for http/ws specific context
  }

  // Additional methods for more specific exception handling can be added here
}
// exception-filters-client.controller.ts
import { Controller, Get, Param, Post, Body, UseFilters } from '@nestjs/common';
import { RpcException } from '@nestjs/microservices';
import { ExceptionFiltersClientService } from './exception-filters-client.service';
/**
 * Endpoints under `/exceptions` that throw on purpose so that
 * `ExceptionFiltersClientService` can be exercised.
 */
@Controller('exceptions')
@UseFilters(ExceptionFiltersClientService)
export class ExceptionFiltersClientController {
  /** GET /exceptions/rpc/:message: always throws an `RpcException`. */
  @Get('/rpc/:message')
  triggerRpcException(@Param('message') message: string) {
    const text = `RPC error: ${message}`;
    throw new RpcException(text);
  }

  /** GET /exceptions/standard/:message: always throws a plain `Error`. */
  @Get('/standard/:message')
  triggerStandardException(@Param('message') message: string) {
    const text = `Standard error: ${message}`;
    throw new Error(text);
  }

  /**
   * POST /exceptions/custom: throws an `RpcException` when `body.type` is
   * `'rpc'`, otherwise a plain `Error`, both carrying `body.message`.
   */
  @Post('/custom')
  triggerCustomException(@Body() body: { type: string; message: string }) {
    const { type, message } = body;
    if (type !== 'rpc') {
      throw new Error(message);
    }
    throw new RpcException(message);
  }

  // Add more endpoints here to trigger various types of exceptions
}
// exception-filters-client.module.ts
import { Module } from '@nestjs/common';
import { ExceptionFiltersClientController } from './exception-filters-client.controller';
import { ExceptionFiltersClientService } from './exception-filters-client.service';
import { ExceptionFiltersService } from './exception-filters.service';
/**
 * Module for the exception-filter client example: declares the controller,
 * provides `ExceptionFiltersService`, and exposes the filter class under the
 * `'EXCEPTION_FILTERS_CLIENT'` token.
 */
@Module({
  providers: [
    {
      useClass: ExceptionFiltersClientService,
      provide: 'EXCEPTION_FILTERS_CLIENT',
    },
    ExceptionFiltersService,
  ],
  controllers: [ExceptionFiltersClientController],
})
export class ExceptionFiltersClientModule {}
Previous Blog: ← nest-mongoose
Next Blog: nest-typeorm →