Issue
I'm creating a microservice with NestJS that uses RabbitMQ as its transport. How can I make the microservice take messages from the queue one at a time, waiting for the previous message to finish processing before it receives the next one?
- main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { Transport } from '@nestjs/microservices';
/**
 * Creates the Nest microservice on the RabbitMQ transport and starts listening.
 * The consumer is configured with a prefetch count of 1 on the non-durable
 * 'rmq_queue' queue.
 */
async function bootstrap() {
  // Connection settings for the RabbitMQ transport.
  const rmqOptions = {
    urls: [`amqp://localhost:5672`],
    queue: 'rmq_queue',
    queueOptions: { durable: false },
    prefetchCount: 1,
  };
  const microservice = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.RMQ,
    options: rmqOptions,
  });
  await microservice.listenAsync();
}
bootstrap();
- app.controller.ts
import { Controller, Logger } from '@nestjs/common';
import { EventPattern } from '@nestjs/microservices';
/**
 * Controller used to demonstrate message ordering: each 'hello' event takes
 * five seconds to handle and logs once on entry and once on completion.
 */
@Controller()
export class AppController {
  /**
   * Handles the 'hello' event. Logs '-handle-' immediately, waits 5000 ms,
   * then logs '---hello---'.
   */
  @EventPattern('hello')
  async handleHello(): Promise<void> {
    Logger.log('-handle-');
    // Simulate slow work so overlapping handlers are visible in the log.
    const delayMs = 5000;
    await new Promise(done => setTimeout(done, delayMs));
    Logger.log('---hello---');
  }
}
- client.js
const { ClientRMQ } = require('@nestjs/microservices');
// Demo publisher: connects to the local broker and emits three 'hello'
// events to 'rmq_queue' back-to-back, without waiting between them.
(async () => {
  const connection = {
    urls: ['amqp://localhost:5672'],
    queue: 'rmq_queue',
    queueOptions: { durable: false },
  };
  const publisher = new ClientRMQ(connection);
  await publisher.connect();

  let sent = 0;
  while (sent < 3) {
    // emit() returns an observable; subscribing is what triggers the publish.
    publisher.emit('hello', 0).subscribe();
    sent += 1;
  }
})();
https://github.com/heySasha/nest-rmq
Actual output:
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +2ms
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +9ms
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +12ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +4967ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +2ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +1ms
But I expect:
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +2ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +5067ms
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +2ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +5067ms
[Nest] 9560 - 05/14/2019, 1:53 PM -handle- +2ms
[Nest] 9560 - 05/14/2019, 1:54 PM ---hello--- +5067ms
Solution
I have written a custom transport strategy:
import { isString, isUndefined } from '@nestjs/common/utils/shared.utils';
import { Observable } from 'rxjs';
import { CustomTransportStrategy, RmqOptions, Server } from '@nestjs/microservices';
import {
CONNECT_EVENT, DISCONNECT_EVENT, DISCONNECTED_RMQ_MESSAGE, NO_MESSAGE_HANDLER,
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
RQM_DEFAULT_PREFETCH_COUNT,
RQM_DEFAULT_QUEUE, RQM_DEFAULT_QUEUE_OPTIONS,
RQM_DEFAULT_URL,
} from '@nestjs/microservices/constants';
// Module-level holder for the 'amqp-connection-manager' package; it starts as an
// empty object and is assigned in the ServerRMQ constructor via loadPackage().
let rqmPackage: any = {};
/**
 * Custom RabbitMQ transport strategy that acknowledges each message manually,
 * and only after its handler promise has settled.
 *
 * The queue is consumed with `noAck: false`, so the broker keeps a delivered
 * message unacknowledged until `ack` is called here. Together with a prefetch
 * count of 1 this makes the broker hand over the next message only once the
 * previous one has been processed.
 */
export class ServerRMQ extends Server implements CustomTransportStrategy {
  /** Connection manager returned by `amqp-connection-manager`'s `connect()`. */
  private server: any = null;
  /** Channel wrapper created on the first connect event; used for ack and replies. */
  private channel: any = null;
  private readonly urls: string[];
  private readonly queue: string;
  private readonly prefetchCount: number;
  private readonly queueOptions: any;
  private readonly isGlobalPrefetchCount: boolean;

  /**
   * Reads the transport options, falling back to the RQM_DEFAULT_* constants
   * for any that are missing, and loads the AMQP packages.
   *
   * @param options RMQ transport options (urls, queue, prefetchCount, ...).
   */
  constructor(private readonly options: RmqOptions['options']) {
    super();
    this.urls = this.getOptionsProp(this.options, 'urls') || [RQM_DEFAULT_URL];
    this.queue =
      this.getOptionsProp(this.options, 'queue') || RQM_DEFAULT_QUEUE;
    this.prefetchCount =
      this.getOptionsProp(this.options, 'prefetchCount') ||
      RQM_DEFAULT_PREFETCH_COUNT;
    this.isGlobalPrefetchCount =
      this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
      RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
    this.queueOptions =
      this.getOptionsProp(this.options, 'queueOptions') ||
      RQM_DEFAULT_QUEUE_OPTIONS;

    // The return value is discarded for 'amqplib'.
    // NOTE(review): presumably loaded only to verify the peer dependency is installed - confirm.
    this.loadPackage('amqplib', ServerRMQ.name, () => require('amqplib'));
    rqmPackage = this.loadPackage(
      'amqp-connection-manager',
      ServerRMQ.name,
      () => require('amqp-connection-manager'),
    );
  }

  /**
   * Entry point of the custom transport strategy: starts the server.
   *
   * @param callback Invoked from `setupChannel` once the consumer is registered.
   */
  public async listen(callback: () => void): Promise<void> {
    await this.start(callback);
  }

  /**
   * Closes the channel and the connection if they exist, and drops the
   * references so a later `start()` creates fresh ones.
   */
  public close(): void {
    if (this.channel) {
      this.channel.close();
      this.channel = null;
    }
    if (this.server) {
      this.server.close();
      this.server = null;
    }
  }

  /**
   * Opens the connection and creates the channel on the first connect event.
   *
   * @param callback Optional; forwarded to `setupChannel`.
   */
  public async start(callback?: () => void) {
    this.server = this.createClient();
    this.server.on(CONNECT_EVENT, () => {
      // The channel wrapper re-runs its `setup` function after every
      // reconnect, so a second wrapper would register a duplicate consumer.
      if (this.channel) {
        return;
      }
      this.channel = this.server.createChannel({
        json: false,
        setup: (channel: any) => this.setupChannel(channel, callback),
      });
    });
    this.server.on(DISCONNECT_EVENT, (err: any) => {
      this.logger.error(DISCONNECTED_RMQ_MESSAGE);
      // Also log the underlying cause; the original received it but dropped it.
      this.logger.error(err);
    });
  }

  /**
   * Connects to the configured URLs through `amqp-connection-manager`.
   *
   * @returns The connection manager instance.
   */
  public createClient<T = any>(): T {
    const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
    return rqmPackage.connect(this.urls, socketOptions);
  }

  /**
   * Channel setup: asserts the queue, applies the prefetch settings and
   * registers the consumer with manual acknowledgements.
   *
   * @param channel  Raw channel passed in by the channel wrapper's `setup` hook.
   * @param callback Invoked after the consumer has been registered, if given.
   */
  public async setupChannel(channel: any, callback?: () => void) {
    await channel.assertQueue(this.queue, this.queueOptions);
    await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
    channel.consume(
      this.queue,
      (msg: any) => {
        // amqplib calls the consumer with `null` when the broker cancels it;
        // there is nothing to handle or acknowledge in that case.
        if (msg === null || msg === undefined) {
          return;
        }
        void this.handleAndAck(msg);
      },
      { noAck: false },
    );
    // `start()` declares the callback as optional, so do not assume it exists.
    if (callback) {
      callback();
    }
  }

  /**
   * Runs the handler for one message and acknowledges it exactly once when
   * the handler has settled, whether it succeeded or failed. Never rejects.
   */
  private async handleAndAck(msg: any): Promise<void> {
    try {
      await this.handleMessage(msg);
    } catch (err) {
      // A failed message is logged and still acknowledged (i.e. dropped),
      // as in the original then/catch chain.
      this.logger.error(err);
    }
    try {
      this.channel.ack(msg);
    } catch (err) {
      this.logger.error(err);
    }
  }

  /**
   * Parses a raw message and dispatches it.
   *
   * Packets without an `id` are treated as events and passed to
   * `handleEvent`. Packets with an `id` are requests: the handler is looked
   * up by pattern and its result is published to `properties.replyTo`.
   *
   * NOTE(review): the response stream is subscribed via `send()` but not
   * awaited here, so for request/response packets the returned promise (and
   * therefore the ack) may settle before the reply has been fully published -
   * confirm whether that matters for your use case.
   *
   * @param message Raw AMQP message with `content` and `properties`.
   */
  public async handleMessage(message: any): Promise<void> {
    const { content, properties } = message;
    const packet = JSON.parse(content.toString());
    const pattern = isString(packet.pattern)
      ? packet.pattern
      : JSON.stringify(packet.pattern);

    if (isUndefined(packet.id)) {
      return this.handleEvent(pattern, packet);
    }

    const handler = this.getHandlerByPattern(pattern);
    if (!handler) {
      const status = 'error';
      return this.sendMessage(
        { status, err: NO_MESSAGE_HANDLER },
        properties.replyTo,
        properties.correlationId,
      );
    }

    const response$ = this.transformToObservable(
      await handler(packet.data),
    ) as Observable<any>;
    const publish = <T>(data: T) =>
      this.sendMessage(data, properties.replyTo, properties.correlationId);
    if (response$) {
      this.send(response$, publish);
    }
  }

  /**
   * Serialises `message` as JSON and sends it to the reply queue.
   *
   * @param message       Payload to serialise.
   * @param replyTo       Name of the queue to reply to.
   * @param correlationId Correlation id copied from the request.
   */
  public sendMessage<T = any>(
    message: T,
    replyTo: any,
    correlationId: string,
  ): void {
    const buffer = Buffer.from(JSON.stringify(message));
    this.channel.sendToQueue(replyTo, buffer, { correlationId });
  }
}
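The key change from the standard ServerRMQ is in setupChannel(): the queue is now consumed with noAck: false, and each message is acknowledged manually with this.channel.ack(msg) only after this.handleMessage(msg) has settled, whether it succeeded or failed. Combined with prefetchCount: 1, RabbitMQ will not deliver the next message until the previous one has been acknowledged, so messages are processed one at a time.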
Answered By - Aleksandr Yatsenko Answer Checked By - Robin (PHPFixing Admin)
0 Comments:
Post a Comment
Note: Only a member of this blog may post a comment.