PHPFixing
  • Privacy Policy
  • TOS
  • Ask Question
  • Contact Us
  • Home
  • PHP
  • Programming
  • SQL Injection
  • Web3.0
Showing posts with label rabbitmq. Show all posts
Showing posts with label rabbitmq. Show all posts

Thursday, October 20, 2022

[FIXED] How to make waiting for the completion of actions, then receive a new message?

 October 20, 2022     microservices, nestjs, node.js, rabbitmq     No comments   

Issue

I'm creating a microservice with NestJS, using RabbitMQ as the transport. How can I make the microservice receive messages from the queue one at a time, waiting for the previous one to complete before taking the next?

  • main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { Transport } from '@nestjs/microservices';

/**
 * Creates the Nest microservice on top of the RabbitMQ transport and starts
 * listening on the `rmq_queue` queue with a prefetch count of 1.
 */
async function bootstrap(): Promise<void> {
  const brokerUrl = `amqp://localhost:5672`;

  const microservice = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.RMQ,
    options: {
      urls: [brokerUrl],
      queue: 'rmq_queue',
      queueOptions: { durable: false },
      prefetchCount: 1,
    },
  });

  await microservice.listenAsync();
}

bootstrap();

  • app.controller.ts
import { Controller, Logger } from '@nestjs/common';
import { EventPattern } from '@nestjs/microservices';

/** Controller whose only handler simulates five seconds of work per `hello` event. */
@Controller()
export class AppController {
  /** Logs on entry, waits 5000 ms, then logs on completion. */
  @EventPattern('hello')
  async handleHello(): Promise<void> {
    Logger.log('-handle-');

    // Stand-in for slow work: a promise that settles after five seconds.
    const fiveSeconds = new Promise(done => setTimeout(done, 5000));
    await fiveSeconds;

    Logger.log('---hello---');
  }
}
  • client.js
const { ClientRMQ } = require('@nestjs/microservices');

// Connects to the broker and emits three 'hello' events back to back.
(async () => {
  const rmqClient = new ClientRMQ({
    urls: ['amqp://localhost:5672'],
    queue: 'rmq_queue',
    queueOptions: { durable: false },
  });

  await rmqClient.connect();

  // Each emit is subscribed immediately; none of them waits for the previous one.
  let remaining = 3;
  while (remaining > 0) {
    rmqClient.emit('hello', 0).subscribe();
    remaining -= 1;
  }
})();

https://github.com/heySasha/nest-rmq

Actual output:

[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +2ms
[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +9ms
[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +12ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +4967ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +2ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +1ms

But I expect:

[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +2ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +5067ms
[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +2ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +5067ms
[Nest] 9560   - 05/14/2019, 1:53 PM   -handle- +2ms
[Nest] 9560   - 05/14/2019, 1:54 PM   ---hello--- +5067ms

Solution

I have written custom strategy.

import { isString, isUndefined } from '@nestjs/common/utils/shared.utils';
import { Observable } from 'rxjs';
import { CustomTransportStrategy, RmqOptions, Server } from '@nestjs/microservices';
import {
    CONNECT_EVENT, DISCONNECT_EVENT, DISCONNECTED_RMQ_MESSAGE, NO_MESSAGE_HANDLER,
    RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
    RQM_DEFAULT_PREFETCH_COUNT,
    RQM_DEFAULT_QUEUE, RQM_DEFAULT_QUEUE_OPTIONS,
    RQM_DEFAULT_URL,
} from '@nestjs/microservices/constants';

let rqmPackage: any = {};

/**
 * RabbitMQ transport strategy that consumes with `noAck: false` and
 * acknowledges each message only after its handler has finished.
 *
 * Combined with `prefetchCount: 1`, the broker hands over the next message
 * only once the previous one has been fully processed.
 */
export class ServerRMQ extends Server implements CustomTransportStrategy {
    private server: any = null;
    private channel: any = null;
    private readonly urls: string[];
    private readonly queue: string;
    private readonly prefetchCount: number;
    private readonly queueOptions: any;
    private readonly isGlobalPrefetchCount: boolean;

    /**
     * Reads the transport options, falling back to the Nest RMQ defaults,
     * and loads the `amqplib` / `amqp-connection-manager` packages.
     *
     * @param options RMQ options as passed to a Nest RMQ microservice.
     */
    constructor(private readonly options: RmqOptions['options']) {
        super();
        this.urls = this.getOptionsProp(this.options, 'urls') || [RQM_DEFAULT_URL];
        this.queue =
            this.getOptionsProp(this.options, 'queue') || RQM_DEFAULT_QUEUE;
        this.prefetchCount =
            this.getOptionsProp(this.options, 'prefetchCount') ||
            RQM_DEFAULT_PREFETCH_COUNT;
        this.isGlobalPrefetchCount =
            this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
            RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
        this.queueOptions =
            this.getOptionsProp(this.options, 'queueOptions') ||
            RQM_DEFAULT_QUEUE_OPTIONS;

        this.loadPackage('amqplib', ServerRMQ.name, () => require('amqplib'));
        rqmPackage = this.loadPackage(
            'amqp-connection-manager',
            ServerRMQ.name,
            () => require('amqp-connection-manager'),
        );
    }

    /**
     * Entry point called by Nest to start the transport.
     *
     * @param callback Invoked once the channel has been set up.
     */
    public async listen(callback: () => void): Promise<void> {
        await this.start(callback);
    }

    /** Closes the channel and the connection, if they were opened. */
    public close(): void {
        if (this.channel) {
            this.channel.close();
        }

        if (this.server) {
            this.server.close();
        }

        // Drop the references so a later start() builds a fresh channel
        // instead of reusing a closed one.
        this.channel = null;
        this.server = null;
    }

    /**
     * Connects to the broker and creates the consuming channel on connect.
     *
     * @param callback Optional; invoked after the channel setup has run.
     */
    public async start(callback?: () => void) {
        this.server = this.createClient();
        this.server.on(CONNECT_EVENT, () => {
            // The connect event fires again after every reconnect. The channel
            // wrapper re-runs its own setup, so creating a second wrapper here
            // would register a duplicate consumer.
            if (this.channel) {
                return;
            }
            this.channel = this.server.createChannel({
                json: false,
                setup: (channel: any) => this.setupChannel(channel, callback),
            });
        });
        this.server.on(DISCONNECT_EVENT, (err: any) => {
            this.logger.error(DISCONNECTED_RMQ_MESSAGE);
            // Also log the cause, which was previously discarded.
            this.logger.error(err);
        });
    }

    /** Opens the managed connection to the configured broker URLs. */
    public createClient<T = any>(): T {
        const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
        return rqmPackage.connect(this.urls, socketOptions);
    }

    /**
     * Declares the queue, applies the prefetch settings and starts consuming
     * with manual acknowledgements.
     *
     * @param channel  Raw channel handed over by the connection manager.
     * @param callback Optional; invoked once consumption has been requested.
     */
    public async setupChannel(channel: any, callback?: () => void) {
        await channel.assertQueue(this.queue, this.queueOptions);
        await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
        channel.consume(
            this.queue,
            (msg: any) => this.consumeMessage(msg),
            { noAck: false },
        );
        // start() may be called without a callback.
        if (callback) {
            callback();
        }
    }

    /**
     * Handles one delivery and acknowledges it exactly once when the handler
     * has settled, whether it succeeded or failed.
     */
    private async consumeMessage(msg: any): Promise<void> {
        // amqplib invokes the consumer with null when the broker cancels it;
        // there is nothing to handle or acknowledge in that case.
        if (msg === null || msg === undefined) {
            this.logger.warn('RMQ consumer was cancelled by the broker');
            return;
        }

        try {
            await this.handleMessage(msg);
        } catch (err) {
            // error handling
            this.logger.error(err);
        }

        try {
            this.channel.ack(msg); // Ack message after complete
        } catch (err) {
            // An ack failure must not become an unhandled rejection
            // or trigger a second ack.
            this.logger.error(err);
        }
    }

    /**
     * Parses the packet and dispatches it: packets without an `id` are events,
     * all others are request/response messages whose result is published to
     * the `replyTo` queue.
     *
     * @param message Raw delivery with `content` and `properties`.
     */
    public async handleMessage(message: any): Promise<void> {
        const { content, properties } = message;
        const packet = JSON.parse(content.toString());
        const pattern = isString(packet.pattern)
            ? packet.pattern
            : JSON.stringify(packet.pattern);

        if (isUndefined(packet.id)) {
            return this.handleEvent(pattern, packet);
        }

        const handler = this.getHandlerByPattern(pattern);

        if (!handler) {
            const status = 'error';

            return this.sendMessage(
                { status, err: NO_MESSAGE_HANDLER },
                properties.replyTo,
                properties.correlationId,
            );
        }

        const response$ = this.transformToObservable(
            await handler(packet.data),
        ) as Observable<any>;

        const publish = <T>(data: T) =>
            this.sendMessage(data, properties.replyTo, properties.correlationId);

        if (response$) {
            this.send(response$, publish);
        }

    }

    /**
     * Serialises `message` as JSON and sends it to the `replyTo` queue,
     * tagged with the request's correlation id.
     */
    public sendMessage<T = any>(
        message: T,
        replyTo: any,
        correlationId: string,
    ): void {
        const buffer = Buffer.from(JSON.stringify(message));
        this.channel.sendToQueue(replyTo, buffer, { correlationId });
    }
}

The core change from the standard ServerRMQ is in setupChannel(): we now pass noAck: false and acknowledge manually with this.channel.ack(msg) once this.handleMessage(msg) has completed, whether it succeeded or failed.



Answered By - Aleksandr Yatsenko
Answer Checked By - Robin (PHPFixing Admin)
Read More
  • Share This:  
  •  Facebook
  •  Twitter
  •  Stumble
  •  Digg

Wednesday, September 21, 2022

[FIXED] How to configure RabbitMq so that multiple apps can manage their own queues and bindings to a shared exchange

 September 21, 2022     amqp, rabbitmq, virtualhost     No comments   

Issue

We have an app that publishes to a single exchange (using AMQP). Additionally, we have a number of apps interested in consuming messages from this exchange. To that end, they create queues and bindings from the queues to the exchange.

We would like to ensure that each app's queues and bindings can only be managed by that app and the user that the app connects with. I envisaged using virtual hosts so that the exchange sat in a /common virtual host which each app's user had read access to and each app's queues and bindings lived in their own /<app> virtual host which the user had full access to.

The documentation, however, suggests that a user cannot access more than one virtual host simultaneously within a channel and the API doesn't provide an option to specify virtual host as part of bindQueue().

Is there a way to achieve this?


Solution

I think that to achieve what you asked you need to use federation. I have the same domain problem: one exchange and different applications consuming from different queues. If I am right, the steps are:

Exchange A => federation To Exchange 1/2/3/4/... these exchange have a different vhost each other.

create different vhost and user for different applications. Give the access to these vhost to different exchange (Exchange 1/2/3/4/)

create different queue to bind from the different Exchange 1/2/3/4/

Is this clear?



Answered By - Danilo
Answer Checked By - Robin (PHPFixing Admin)
Read More
  • Share This:  
  •  Facebook
  •  Twitter
  •  Stumble
  •  Digg

Tuesday, September 20, 2022

[FIXED] How to stop consuming message from selective queue - RabbitMQ

 September 20, 2022     consumer, queue, rabbitmq     No comments   

Issue

// One consumer object is reused for both subscriptions below.
QueueingConsumer consumer = new QueueingConsumer(channel);
// Print the consumer's tag before any subscription has been registered.
System.out.println(consumer.getConsumerTag());
// Subscribe the same consumer to two different queues.
channel.basicConsume("queue1", consumer);
channel.basicConsume("queue3", consumer);

Is it possible to stop consuming the messages from the queue "queue3" alone dynamically?


Solution

Yes you can, using channel.basicCancel(consumerTag); EDIT For example:

// Keep the tag returned for the queue3 subscription so that it alone can be cancelled.
String tag3 = channel.basicConsume("queue3", consumer);
channel.basicCancel(tag3);

Here is some code that unsubscribes a consumer after 5 seconds:

// Subscribe the same consumer to two queues, keeping the tag returned by each call.
String tag1 = channel.basicConsume(myQueue, autoAck, consumer);
String tag2 = channel.basicConsume(myQueue2, autoAck, consumer);
    // Read deliveries on an executor thread so the current thread is free to cancel later.
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            // Loops forever: any exception is logged and the loop continues.
            while (true) {
                Delivery delivery;
                try {
                    delivery = consumer.nextDelivery();
                    String message = new String(delivery.getBody());
                    System.out.println("Received: " + message);
                } catch (Exception ex) {
                    Logger.getLogger(TestMng.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
    });
    System.out.println("Consumers Ready");
    // Let both subscriptions run for five seconds before cancelling one of them.
    try {
        Thread.sleep(5000);
    } catch (InterruptedException ex) {
        Logger.getLogger(TestMng.class.getName()).log(Level.SEVERE, null, ex);
    }

    channel.basicCancel(tag2); // cancels only the subscription identified by tag2 (myQueue2)

I hope it can be useful.



Answered By - Gabriele Santomaggio
Answer Checked By - Mildred Charles (PHPFixing Admin)
Read More
  • Share This:  
  •  Facebook
  •  Twitter
  •  Stumble
  •  Digg

[FIXED] What is an elegant way of halting consumption of messages gracefully in the C# client for RabbitMQ?

 September 20, 2022     amqp, c#, consumer, messaging, rabbitmq     No comments   

Issue

I am setting up a standard standalone thread listening to RabbitMQ in C#. Suppose the method for listening in the thread looks like this:

/// <summary>
/// Opens a channel, sets up the queues, and then handles deliveries one by one
/// in an endless loop.
/// </summary>
public void Listen()
{
    using (var channel = connection.CreateModel())
    {
        var consumer = SetupQueues(channel);
        for (;;)
        {
            // blocking call
            var delivery = consumer.Queue.Dequeue();
            handler.HandleMessage(channel, delivery);
        }
    }
}

What is an elegant way of halting consumption of messages gracefully in the C# client for RabbitMQ? Keep in mind I have found nothing of use in the RabbitMQ examples/docs or these SO questions:

  • How to stop consuming message from selective queue - RabbitMQ
  • How to pause and resume consumption gracefully in rabbitmq, pika python
  • What is the best way to safely end a java application with running RabbitMQ consumers

The issue here is consumer.Queue.Dequeue() is a blocking call. I have tried these options:

  • Calling channel.BasicCancel(string tag). This causes a System.IO.EndOfStreamException in the blocking call. I do not want to use this exception as part of the control flow for obvious reasons.

  • Calling consumer.Queue.Dequeue(int millisecondsTimeout, out T result) and checking a flag in between loop iterations. This can work but seems hacky.

I want to let the thread exit gracefully and clean up any unmanaged resources I might have, so no thread aborting, etc.

Any help is appreciated. Thanks


Solution

The DeQueue with the timeout & flag is the way to do it. It's a very common pattern, and is why many blocking calls are provided with timeout-enabled versions.

Alternately, throwing a (known) exception isn't necessarily a bad thing for control flow. Gracefully shutting down could mean actually trapping the exception, commenting "this is thrown when requesting the channel shuts down", and then returning cleanly. This is how part of TPL works with the CancellationToken.



Answered By - Bryan Boettcher
Answer Checked By - Gilberto Lyons (PHPFixing Admin)
Read More
  • Share This:  
  •  Facebook
  •  Twitter
  •  Stumble
  •  Digg

[FIXED] How to redelivery messages that RabbitMQ already sent to consummers

 September 20, 2022     consumer, queue, rabbitmq, subscriber, timeout     No comments   

Issue

I created some consumers that connect to a queue in RabbitMQ and take X unacknowledged messages at a time (for example 10, 50 or 100) to avoid unnecessary round trips fetching them one by one. Sometimes the queue is almost empty and a single consumer gets all the messages. Unfortunately, one of those messages may be slow to process (a third-party web service timing out, for example), and all the other messages have to wait in line until it finishes, even if they would be faster. Meanwhile, the other consumers are idle and have nothing to do, but they can't take the messages that the first one still hasn't processed.

Is there a way to tell RabbitMQ to deliver a batch of messages to a consumer and, if they are not acked within a period of time, return them to the queue so that another consumer can take them? Does anybody know of a workaround?


Solution

Have a look at my answer to this question.

Consider the following scenario:

  • A queue has thousands of messages sitting in it
  • A single consumer subscribes to the queue with AutoAck=true and no pre-fetch count set

What is going to happen?

RabbitMQ's implementation is to deliver an arbitrary number of messages to a client that has no pre-fetch count set. Further, with Auto-Ack, the prefetch count is irrelevant, because messages are acknowledged upon delivery to the consumer.

Thus, every message in the queue at that point will be delivered to the consumer immediately and the consumer will be inundated with messages. Assuming each message is small, but takes 5 minutes to process, it is entirely possible that this one consumer will be able to drain the entire queue before any other consumers can attach to it. And since AutoAck is turned on, the broker will forget about these messages immediately after delivery.

Obviously this is not a good scenario if you'd like to get those messages processed, because they've left the relative safety of the broker and are now sitting in RAM at the consuming endpoint. Let's say an exception is encountered that crashes the consuming endpoint - poof, all the messages are gone.

I believe what you're seeing is a case where you have AutoAck set to true. When this happens, the first consumer to connect is going to drain the whole queue if no other consumers connect before it has the chance to do so. Try setting AutoAck to false, then select a reasonable pre-fetch count (0-1 maybe?) and you won't see this behavior continue.



Answered By - theMayer
Answer Checked By - Senaida (PHPFixing Volunteer)
Read More
  • Share This:  
  •  Facebook
  •  Twitter
  •  Stumble
  •  Digg

Monday, September 19, 2022

[FIXED] How to declare custom error exchange for each consumer in EasyNetQ?

 September 19, 2022     c#, consumer, easynetq, rabbitmq     No comments   

Issue

I have four consumers. When an error occurs, the message is published to the default EasyNetQ_Default_Error_Queue. Is it possible for each queue consumer to write to its own error exchange?

For example;

Queue Name : A    ErrorExchange :A_ErrorExchange
Queue Name : B    ErrorExchange :B_ErrorExchange

// Attempt 1: name the error exchange "A_DeadLetter".
bus.Advanced.Conventions.ErrorExchangeNamingConvention = new ErrorExchangeNameConvention(info => "A_DeadLetter");

// Attempt 2: assigns the same property on the same bus again, this time with "B_DeadLetter".
bus.Advanced.Conventions.ErrorExchangeNamingConvention = new ErrorExchangeNameConvention(info2 => "B_DeadLetter");

Solution

From the code you've provided, it looks like you're almost there -- you just need to override ErrorExchangeNamingConvention and ErrorQueueNamingConvention appropriately.

As an example, here's a method that will return an instance of IBus with these conventions overridden to incorporate the specified consumer name:

/// <summary>
/// Creates a bus whose error exchange and error queue names are derived
/// from the given consumer name.
/// </summary>
/// <param name="connectionString">Connection string passed to RabbitHutch.</param>
/// <param name="consumerName">Prefix used for the error exchange and queue names.</param>
/// <returns>The configured bus.</returns>
public IBus CreateBus(string connectionString, string consumerName)
{
    var bus = RabbitHutch.CreateBus(connectionString);

    // Error exchange name: adjust the suffix to your own naming scheme
    bus.Advanced.Container.Resolve<IConventions>().ErrorExchangeNamingConvention =
        info => $"{consumerName}_ErrorExchange";

    // Error queue name: adjust the suffix to your own naming scheme
    bus.Advanced.Container.Resolve<IConventions>().ErrorQueueNamingConvention =
        () => $"{consumerName}_ErrorQueue";

    return bus;
}


Answered By - Donut
Answer Checked By - Pedro (PHPFixing Volunteer)
Read More
  • Share This:  
  •  Facebook
  •  Twitter
  •  Stumble
  •  Digg

[FIXED] How to change RabbitMQ Heartbeat without restart

 September 19, 2022     consumer, heartbeat, rabbitmq, rabbitmqctl     No comments   

Issue

There are several questions here in SO about RabbitMQ heartbeat but I haven't found one addressing how to actually change the default heartbeat value of 60 seconds (580 seconds in previous versions).

In the case when a consumer is running for longer than 60 seconds and is incapable of producing any traffic that would count as heartbeat (for example PHP consumers), RabbitMQ will close the connection considering the consumer is dead, but the consumer might continue to run, and when it tries to produce the ACK the connection is closed and you get an error message like:

Broken pipe or closed connection

One can set the heartbeat on the consumer side to a higher value, for example 1800 seconds, but if the broker configuration is not changed, the lower value will be used, which is 60 seconds with the default configuration. From the RabbitMQ docs:

The broker and client will attempt to negotiate heartbeats by default. When both values are non-0, the lower of the requested values will be used. If one side uses a zero value (attempts to disable heartbeats) but the other does not, the non-zero value will be used.

To change the Heartbeat value one can add the following line in /etc/rabbitmq/rabbitmq.conf (using the new configuration format)

heartbeat = 1800

This requires a restart, so the question is: How to change the rabbitmq heartbeat value without a restart?


Solution

I'm answering my own question since it took me some time to find how to do this, the documentation on how to use eval was not very helpful.

It's possible to change RabbitMQ configuration values using eval:

Evaluate an arbitrary Erlang expression.

Using rabbitmqctl eval, it is then possible to change the heartbeat value without a restart, like so:

# Set the heartbeat value of the rabbit application to 1800 at runtime
rabbitmqctl eval 'application:set_env(rabbit, heartbeat, 1800).'

# Read the heartbeat value back to check it
rabbitmqctl eval 'application:get_env(rabbit, heartbeat).'


Answered By - lloiacono
Answer Checked By - Katrina (PHPFixing Volunteer)
Read More
  • Share This:  
  •  Facebook
  •  Twitter
  •  Stumble
  •  Digg

[FIXED] How do I set a number of retry attempts in RabbitMQ?

 September 19, 2022     .net, consumer, queue, rabbitmq     No comments   

Issue

I am using RabbitMQ and I have a queue that holds email messages. My consumer service de-queues messages and attempts to send them. If, for any reason, my consumer cannot send the message, I would like to re-queue the message to send again. I realize I can do a basicNack and set the requeue flag to be true, however, I don't want to requeue the message indefinitely (say, if our email system goes down, I don't want to continuously requeue unsent messages). I would like to define a finite number of times that I can requeue the message to be sent again. I can't set a field on the email message object, however, when I dequeue it and send a nack. The updated field is not present on the message in the queue. Is there any other way in which I can approach this? Thanks in advance.


Solution

There is no such feature as retry attempts in RabbitMQ (nor in the AMQP protocol).

Possible solution to implement retry attempts limit behavior:

  1. Redeliver message if it was not previously redelivered (check redelivered parameter on basic.deliver method - your library should have some interface for this) and drop it and then catch in dead letter exchange, then process somehow.

  2. Each time a message cannot be processed, publish it again but set or increment/decrement a header field, say x-redelivered-count (you can choose any name you like, though). To get control over redeliveries in this case, you have to check whether the field you set has reached some limit (top or bottom - 0 is my choice, à la TTL in the IP header from TCP/IP).

  3. Store message unique key (say uuid, but you have to set it manually when you publish message) in Redis, memcache or other storage, even in mysql alongside with redeliveries count and then on each redelivery increment/decrement this value until it reach the limit.

  4. (for real geeks) write plugin that will implement such behavior like you want.

The pro of #3 is that redelivered message stay in queue head. This is important if you have long queue or if message order is important for you (note, that redeliveries will break strict messages order, see official docs for details or this question on SO).

P.S.:

There is a similar answer in this topic, but in PHP. Look through it, maybe it helps you a bit (start reading it from the words "There are multiple techniques to deal with cycle redeliver problem").



Answered By - pinepain
Answer Checked By - David Marino (PHPFixing Volunteer)
Read More
  • Share This:  
  •  Facebook
  •  Twitter
  •  Stumble
  •  Digg

Tuesday, July 12, 2022

[FIXED] How does rabbitmq cluster store message that routing to different nodes

 July 12, 2022     message, rabbitmq, storage     No comments   

Issue

RabbitMQ can send one message to different queues through exchange. While queueA and queueB on different node accept same message, will these two nodes store the message respectively on their own disk or using a common database to store this message once for sharing between nodes?


Solution

RabbitMQ in cluster does not share the same database-messages.

Each node has its own local database.

If you want to learn more about that, I suggest reading: https://github.com/rabbitmq/internals/blob/master/queues_and_message_store.md



Answered By - Gabriele Santomaggio
Answer Checked By - Dawn Plyler (PHPFixing Volunteer)
Read More
  • Share This:  
  •  Facebook
  •  Twitter
  •  Stumble
  •  Digg

Monday, July 11, 2022

[FIXED] What are the limits of messages, queues and exchanges?

 July 11, 2022     amqp, message, message-queue, rabbitmq, rabbitmq-exchange     No comments   

Issue

  1. What are the allowed types of messages (strings, bytes, integers, etc.)?
  2. What is the maximum size of a message?
  3. What is the maximum number of queues and exchanges?

Solution

  1. Theoretically anything can be stored/sent as a message. You actually don't want to store anything on the queues. The system works most efficiently if the queues are empty most of the time. You can send anything you want to the queue with two preconditions:

    • The thing you are sending can be converted to and from a bytestring
    • The consumer knows exactly what it is getting and how to convert it to the original object

    Strings are pretty easy, they have a built in method for converting to and from bytes. If you know it is a string then you know how to convert it back. The best option is to use a markup string like XML, JSON, or YML. This way you can convert objects to Strings and back again to the original objects; they work across programming languages so your consumer can be written in a different language to your producer as long as it knows how to understand the object. I work in Java. I want to send complex messages with sub objects in the fields. I use my own message object. The message object has two additional methods toBytes and fromBytes that convert to and from the bytestream. I use routing keys that leave no doubt as to what type of message the consumer is receiving. The message is Serializable. This works fine, but is limiting as I can only use it with other Java programs.

  2. The size of the message is limited by the memory on the server, and if it is persistent then also the free HDD space too. You probably do not want to send messages that are too big; it might be better to send a reference to a file or DB.

    You might also want to read up on their performance measures: http://www.rabbitmq.com/blog/2012/04/17/rabbitmq-performance-measurements-part-1/ http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/

  3. Queues are pretty lightweight; you will most likely be limited by the number of connections you have. It will most likely depend on the server. Here is some info on a similar question: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2009-February/003042.html



Answered By - robthewolf
Answer Checked By - Dawn Plyler (PHPFixing Volunteer)
Read More
  • Share This:  
  •  Facebook
  •  Twitter
  •  Stumble
  •  Digg

[FIXED] How to ensure that messages get delivered?

 July 11, 2022     message, pika, python, rabbitmq     No comments   

Issue

How do you ensure that messages get delivered with Pika? By default it will not give you an error if the message was not delivered successfully.

In this example several messages can be sent before pika acknowledges that the connection was down.

import pika

# Open a blocking connection to the local broker and declare the target queue.
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
# range() works on both Python 2 and 3; xrange() exists only on Python 2.
for index in range(10):
    channel.basic_publish(exchange='', routing_key='hello',
                          body='Hello World #%s!' % index)
    # index is zero-based, so index + 1 messages have been published so far.
    print('Total Messages Sent: %s' % (index + 1))
connection.close()

Solution

When using Pika the channel.confirm_delivery() flag needs to be set before you start publishing messages. This is important so that Pika will confirm that each message has been sent successfully before sending the next message. This will however increase the time it takes to send messages to RabbitMQ, as delivery needs to be confirmed before the program can proceed with the next message.

# Enable publisher confirms before the first publish on this channel.
channel.confirm_delivery()

try:
    # range() works on both Python 2 and 3; xrange() exists only on Python 2.
    for index in range(10):
        channel.basic_publish(exchange='', routing_key='hello',
                              body='Hello World #%s!' % index)
        # index is zero-based, so index + 1 messages have been published so far.
        print('Total Messages Sent: %s' % (index + 1))
except pika.exceptions.ConnectionClosed:
    # The connection dropped mid-publish; the remaining messages are not sent.
    print('Error. Connection closed, and the message was never delivered.')

basic_publish will return a Boolean depending if the message was sent or not. But, it is important to catch potential exceptions in case the connection is closed during transfer and handle it appropriately. As in those cases the exception will interrupt the flow of the program.



Answered By - eandersson
Answer Checked By - Senaida (PHPFixing Volunteer)
Read More
  • Share This:  
  •  Facebook
  •  Twitter
  •  Stumble
  •  Digg

Sunday, May 15, 2022

[FIXED] How to use docker-compose links to access username and password details from inside another container

 May 15, 2022     docker, docker-compose, rabbitmq, ubuntu     No comments   

Issue

I have a docker-compose file which contains details of my container as well as rabbitmq. Here is a cut down version of my docker-compose.yml file where I am using the container_name and links keywords to access the IP address of rabbitmq from inside my container.

# Extension fields (top-level keys starting with "x-") need file format 3.4 or later.
version: "3.4"

# Shared environment variables, reused below through the my-env anchor.
# A top-level key that is not part of the Compose schema must start with "x-".
x-my-env: &my-env
  My_TEST_VAR1: 'test_1'
  My_TEST_VAR2: 'test_2'

# All containers must be declared under the top-level services: key.
services:
  rabbitmq:
    container_name: rabbitmq
    image: 'rabbitmq:3.6-management-alpine'
    ports:
      - '5672:5672'
      - '15672:15672'
    environment:
      AMQP_URL: 'amqp://rabbitmq?connection_attempts=5&retry_delay=5'
      RABBITMQ_DEFAULT_USER: "guest"
      RABBITMQ_DEFAULT_PASS: "guest"

  my-service:
    tty: true
    image: my_image_name:latest
    working_dir: /opt/services/my_service/
    command: python3.8 my_script.py
    ports:
      - 9000:9000
    links:
      # Makes the rabbitmq container reachable under the alias rabbitmq.server.
      - rabbitmq:rabbitmq.server
    environment:
      # Merge in the shared variables defined by the my-env anchor.
      <<: *my-env

From inside my container I can ping the rabbitmq server successfully via:

ping rabbitmq.server

Is there I way I can access the rabbitmq default username and password using this link? Or do I have to just pass them as separate environment variables? (I would like to avoid this duplication if possible)


Solution

You should pass them using environment variables. Docker links at this point are an obsolete feature, and I'd recommend just outright deleting any links: you have left in your docker-compose.yml file. Compose sets up networking for you so that the Compose service names rabbitmq and my-service can be used as host names between the containers without any special setup; the environment variables that links provided were confusing and could unintentionally leak data.

If you want to avoid repeating things, you can use YAML anchor syntax as you already have, or write the environment variables into a separate env_file:. Unless you have a lot of settings or a lot of containers, just writing them in the docker-compose.yml file is easiest.

version: "3.8"
services:
  rabbitmq:
    image: "rabbitmq:3.6-management-alpine"
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: "guest"
      RABBITMQ_DEFAULT_PASS: "guest"
    # Add volumes: here if the queue should be persisted.
    # RabbitMQ is a special case: it would also need a hostname: setting.

  my-service:
    image: "my_image_name:latest"
    ports:
      - "9000:9000"
    environment:
      # Spelled out in full rather than shared through an anchor.
      My_TEST_VAR1: "test_1"
      My_TEST_VAR2: "test_2"
      RABBITMQ_HOST: "rabbitmq"
      RABBITMQ_USER: "guest"
      RABBITMQ_PASSWORD: "guest"
    # working_dir: and command: belong in the Dockerfile as WORKDIR and CMD.
    # links: is obsolete and has been dropped.

In principle you can attach an anchor to any YAML node, though I'd find the syntax a little bit confusing if I was reading it. I'd tend to avoid syntax like this but it is technically an option.

# Fragment: share a single scalar value between two services with a YAML anchor.
services:
  rabbitmq:
    environment:
      # &rabbitmq_user attaches an anchor to the value "guest".
      RABBITMQ_DEFAULT_USER: &rabbitmq_user guest
  my-app:
    environment:
      # *rabbitmq_user is an alias that reuses the anchored value.
      RABBITMQ_USER: *rabbitmq_user

Finally, I hinted initially that the obsolete Docker links feature does republish environment variables. I wouldn't take advantage of this – it's an information leak in many ways, there are potential conflicts with the application's own environment variables, and the links feature in general is considered obsolete – but it is theoretically possible to use it:

# Fragment: the variable is set only on rabbitmq; my-app declares a link to that service.
services:
  rabbitmq:
    environment:
      RABBITMQ_DEFAULT_USER: guest
  my-app:
    # Obsolete links feature; shown only to illustrate the point.
    links: [rabbitmq]
docker-compose run my-app sh -c 'echo $RABBITMQ_DEFAULT_USER'

It'd be up to your application setup to understand the RabbitMQ image setup variables.



Answered By - David Maze
Answer Checked By - Clifford M. (PHPFixing Volunteer)
Read More
  • Share This:  
  •  Facebook
  •  Twitter
  •  Stumble
  •  Digg

Thursday, May 12, 2022

[FIXED] How to manage Messenger component memory in Symfony 5.3

 May 12, 2022     amqp, php, rabbitmq, symfony     No comments   

Issue

I use the Messenger component with a RabbitMQ server in my Symfony 5.3 project. I want to manage the memory of my MessageHandler because my code is taking too much memory (Fatal error: Allowed memory size of 2147483648 bytes exhausted (tried to allocate 33554440 bytes)).

For each message consumed, I feel like the MessageHandler retains the memory of the previous MessageHandler. This is my class where I run a command:

/**
 * Messenger handler that runs the `app:my-command` console command in-process
 * for each RequestMessage it is invoked with.
 */
class MessageHandler implements MessageHandlerInterface
{

    // Kernel handed to the console Application built in __invoke().
    private KernelInterface $kernel;

    public function __construct(KernelInterface $kernel)
    {
        $this->kernel = $kernel;
    }

    /**
     * Runs `app:my-command` for the user referenced by the message.
     *
     * @param RequestMessage $requestMessage message whose getUserId() value is passed as the `userId` argument
     * @throws \Exception
     */
    public function __invoke(RequestMessage $requestMessage)
    {
        // A new console Application is built on the injected kernel for every message.
        $application = new Application($this->kernel);
        // Auto-exit is switched off, presumably so run() returns to the consumer
        // instead of ending the PHP process.
        $application->setAutoExit(false);

        $input = new ArrayInput([
            'command' => 'app:my-command',
            'userId' => $requestMessage->getUserId(),
            '--no-debug' => ''
        ]);


        // Output is collected in memory; neither it nor run()'s return value is used afterwards.
        $output = new BufferedOutput();
        $application->run($input, $output);
    }
}

And I consume my messages with this command:

$ php bin/console messenger:consume -vv

I am looking for a solution to consume each of my messages with an independent memory. I don't know where the problem is, if someone can help me.

I can think of a memory leak but I don't understand why the memory of a message is not cleaned.


Solution

First of all, I am aware that I have a memory issue but I have to find a solution.

To run my command, I use Symfony's Process component, which starts it in a separate process so that it executes independently and does not overload the consumer's RAM. This is the code that resolved it:

use Symfony\Component\Process\Process;

// Run the command in its own PHP process, independently of the consumer.
// The command is given as an argument array rather than a shell string, so the
// user id is passed to the command verbatim and is never interpreted by a shell.
$process = new Process([
    'php',
    'bin/console',
    'app:my-command',
    (string) $user->getId(),
    '--no-debug',
]);
$process->run();

if (!$process->isSuccessful()) {
    // Example: Throw an exception...
}


Answered By - Thomas Dulcamara
Answer Checked By - Mildred Charles (PHPFixing Admin)
Read More
  • Share This:  
  •  Facebook
  •  Twitter
  •  Stumble
  •  Digg

Wednesday, April 20, 2022

[FIXED] How to share a single promise based RabbitMQ connection across files or controllers in Node js instead of creating a new Connection each time?

 April 20, 2022     amqp, async-await, connection, node.js, rabbitmq     No comments   

Issue

amqplib library lets you create a rabbitmq connection and that object will be a segue to doing other things such as creating a channel and etc.

suppose that I'm going for a Producer/Consumer pattern, where each time a user hits a specific route, a job is produced and sent to the rabbitmq server where it's processed by certain consumers(workers).

// Route handler from the question: every request opens a new AMQP connection
// and then a new channel on it before doing any work.
app.post("/routethatdelegatesheavywork", async (req,res) => { 
  const amqpServerLink =
        "link-to-cloudmq";
      // Connection is created inside the handler, i.e. once per incoming request.
      const connection = await amqp.connect(amqpServerLink);
      const channel = await connection.createChannel();
      //do other stuff with channel
})

While this "works", I don't want to re-create that connection every time the controller is invoked, since it makes the producer very slow and it's really not how it's supposed to be done.

here is where my problem comes:

how do i initialize one connection and re-use it every time i need it?

i have tried to create a connection outside controllers and use it when necessary but it's not possible since the connection is promise-based and await doesn't work on entry point and it has to be inside an async function to work.

although it is possible to run await without async using ESM (es modules) i don't want to do so since i have written all of the application using CommonJS (require("package")), changing that would require me to go through a lot of files and change every import/export according to ESM.

So, is there any other way to create one connection(that is promise based) and re-use it without having to migrate to ESM syntax?


Solution

Yes. Remember that modules loaded with require in Node.js are cached, so they effectively behave as singletons. Make a new amqpServerInterface module, and do:

const amqpServerLink = "link-to-cloudmq"
const connection = amqp.connect(amqpServerLink)

function connect() {
      return connection
}

module.exports = {
     connect
}

Then in your controllers

// Use a relative path: a bare 'amqpServerInterface' would be looked up in
// node_modules. Adjust the path to where the module lives relative to this file.
const amqpServerInterface = require('./amqpServerInterface')

app.post("/routethatdelegatesheavywork", async (req,res) => {

      // Resolves to the connection held by the amqpServerInterface module;
      // no new connection is opened per request.
      const connection = await amqpServerInterface.connect();
      const channel = await connection.createChannel();
      //do other stuff with channel
})

This will always return the same connection promise, which will resolve to the same connection.



Answered By - Rani Sharim
Answer Checked By - Willingham (PHPFixing Volunteer)
Read More
  • Share This:  
  •  Facebook
  •  Twitter
  •  Stumble
  •  Digg

Wednesday, March 2, 2022

[FIXED] How to configure multiple exchanges in laravel-queue-rabbitmq?

 March 02, 2022     laravel, laravel-5, laravel-queue, rabbitmq     No comments   

Issue

I have found famous library for using RabbitMQ in Laravel.

In configuration config/queue.php I can specify only one exchange and queue name. Does it support multiple exchanges?


Solution

For future readers searching for the answer to this question: there is one (bad) way to add multiple exchanges. You have to duplicate the rabbitmq connection with the new exchange, and when you want to publish a message to the new exchange, you just switch the connection.

This is a very ugly way, but I didn't find another one. Laravel queue doesn't provide a method to change the exchange, but there is the onConnection method, which will help you.

Here is a simple example

        // Two connections that are identical except for the exchange they
        // publish to. Both entries go under 'connections' in config/queue.php.

        // First connection: publishes to exchange_one.
        'conn_one' => [
            'driver' => 'rabbitmq',
            'queue' => env('RABBITMQ_QUEUE', 'default'),
            'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class,

            'hosts' => [
                [
                    'host' => env('RABBITMQ_HOST', '127.0.0.1'),
                    'port' => env('RABBITMQ_PORT', 5672),
                    'user' => env('RABBITMQ_USER', 'guest'),
                    'password' => env('RABBITMQ_PASSWORD', 'guest'),
                    'vhost' => env('RABBITMQ_VHOST', '/'),
                ],
            ],

            'options' => [
                'ssl_options' => [
                    'cafile' => env('RABBITMQ_SSL_CAFILE', null),
                    'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
                    'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
                    'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
                    'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
                ],
                'queue' => [
                    'job' => VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class,
                    // The exchange name is the only setting that differs from conn_two.
                    'exchange' => 'exchange_one',
                    'exchange_type' => 'fanout',
                ],
            ],
        ],

        // Second connection: same broker settings, publishes to exchange_two.
        'conn_two' => [
            'driver' => 'rabbitmq',
            'queue' => env('RABBITMQ_QUEUE', 'default'),
            'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class,

            'hosts' => [
                [
                    'host' => env('RABBITMQ_HOST', '127.0.0.1'),
                    'port' => env('RABBITMQ_PORT', 5672),
                    'user' => env('RABBITMQ_USER', 'guest'),
                    'password' => env('RABBITMQ_PASSWORD', 'guest'),
                    'vhost' => env('RABBITMQ_VHOST', '/'),
                ],
            ],

            'options' => [
                'ssl_options' => [
                    'cafile' => env('RABBITMQ_SSL_CAFILE', null),
                    'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
                    'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
                    'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
                    'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
                ],
                'queue' => [
                    'job' => VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class,
                    'exchange' => 'exchange_two',
                    'exchange_type' => 'fanout',
                ],
            ],
        ],

Use

// Choose the exchange by choosing the connection that is configured for it.
ExampleJob::dispatch($data)->onConnection('conn_one');
ExampleJob::dispatch($data)->onConnection('conn_two');


Answered By - Alexandru Gojinetchi
Read More
  • Share This:  
  •  Facebook
  •  Twitter
  •  Stumble
  •  Digg

Monday, February 28, 2022

[FIXED] RabbitMQ and Yii console callback error

 February 28, 2022     php, rabbitmq, yii     No comments   

Issue

I am trying to get RabbitMQ working with the Yii console in order to send transactional emails, but I am experiencing problems with getting the PHP-AMQPLib library to work within Yii. My code is below:

<?php
class RabbitMqCommand extends CConsoleCommand {

    public function actionSendMail() {
        require_once ('/htdocs/code/wwwroot/protected/components/php-amqplib/amqp.inc');

        $conn = new AMQPConnection ( 'localhost', 5672, 'guest', '123456', '/' );
        $ch = $conn->channel ();

        $ch->queue_declare ( 'msgs', false, true, false, false );

        $ch->exchange_declare ( 'router', 'direct', false, true, false );

        $ch->queue_bind ( 'msgs', 'router');

        $ch->basic_consume ( 'msgs', 'consumer', false, false, false, false, 'processMessage' );

            // Loop as long as the channel has callbacks registered
        while ( count ( $ch->callbacks ) ) {
            $ch->wait ();
        }

        $ch->close();
        $conn->close();


    }

    public function processMessage($msg) 
        {
           //process and send email
        }

When I try to execute this code from the command line as follows:

php -q /htdocs/code/wwwroot/protected/yiic RabbitMQ SendMail

I receive the following error message:

PHP Error[2]: call_user_func() expects parameter 1 to be a valid callback, function 'processMessage' not found or invalid function name
    in file /htdocs/code/wwwroot/protected/components/php-amqplib/amqp.inc at line 1390
#0 /htdocs/code/wwwroot/protected/components/php-amqplib/amqp.inc(1390): call_user_func()
#1 unknown(0): AMQPChannel->basic_deliver()
#2 /htdocs/code/wwwroot/protected/components/php-amqplib/amqp.inc(167): call_user_func()
#3 /htdocs/code/wwwroot/protected/components/php-amqplib/amqp.inc(275): AMQPChannel->dispatch()
#4 /htdocs/code/wwwroot/protected/commands/RabbitMqCommand.php(29): AMQPChannel->wait()
#5 unknown(0): RabbitMqCommand->actionSendMail()
#6 /htdocs/code/framework/console/CConsoleCommand.php(135): ReflectionMethod->invokeArgs()
#7 /htdocs/code/framework/console/CConsoleCommandRunner.php(63): RabbitMqCommand->run()
#8 /htdocs/code/framework/console/CConsoleApplication.php(88): CConsoleCommandRunner->run()
#9 /htdocs/code/framework/base/CApplication.php(158): CConsoleApplication->processRequest()
#10 /htdocs/code/framework/yiic.php(33): CConsoleApplication->run()
#11 /htdocs/code/wwwroot/protected/yiic.php(7): require_once()
#12 /htdocs/code/wwwroot/protected/yiic(4): require_once()

Any idea what I may be doing wrong?


Solution

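This error is happening because processMessage is an instance method of your object, but a plain string passed to basic_consume is treated as the name of a global PHP function. Pass a callable that includes the object instead, e.g. array($this, 'processMessage').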

See this thread for solutions: PHP Call a instance method with call_user_func within the same class



Answered By - Cody Caughlan
Read More
  • Share This:  
  •  Facebook
  •  Twitter
  •  Stumble
  •  Digg

Sunday, February 20, 2022

[FIXED] Symfony RabbitMQ Bundle in AWS (managed service): Any way to get this working?

 February 20, 2022     amazon-web-services, docker, php, rabbitmq, symfony     No comments   

Issue

We have tried a lot, but we still cannot get the Symfony RabbitMQ bundle (https://github.com/php-amqplib/RabbitMqBundle) running in AWS (with Docker). AWS only allows AMQPS and port 5671 to be opened in the AWS managed service.

This is our current configuration in detail:

# RabbitMqBundle configuration from the question: a lazy "secure" connection
# built from an amqps:// URL, and one producer (article_create) that uses it.
old_sound_rabbit_mq:
    connections:
        default:
            # nevermind
        secure:
            # Scheme, credentials, host and port all come from this URL; the
            # placeholders are filled from environment variables.
            url: 'amqps://%env(RABBITMQ_USER)%:%env(RABBITMQ_PASSWORD)%@%env(RABBITMQ_HOST)%:%env(RABBITMQ_PORT)%'
            vhost: '%env(RABBITMQ_VHOST)%'
            lazy: true
            connection_timeout: 6
            read_write_timeout: 6
            ssl_context:
                # Peer certificate verification is switched off here.
                verify_peer: false
            keepalive: true
            heartbeat: 60
            use_socket: true # default false
    producers:
        # use 'old_sound_rabbit_mq.[my-producer-name]_producer' service to send data.
        article_create:
            connection: secure
            exchange_options: { name: pimcore.article.create, type: topic }
            queue_options:
                name: article_create

Note that we use the "url" config value because it seems like it is the only way to set AMQPS for the bundle.

The relevant parts of docker-compose.yml:

    # Application container: its RABBITMQ_* variables point at the broker below
    # (RABBITMQ_HOST matches the rabbitmq service's container_name).
    # NOTE(review): RABBITMQ_USER/RABBITMQ_PASSWORD here (user/password) differ from
    # the broker's RABBITMQ_DEFAULT_USER/RABBITMQ_DEFAULT_PASS (pimcore/pimcore) - confirm.
    php-fpm:
        container_name: php-fpm
        environment:
            - RABBITMQ_HOST=my-rabbitmq # usually the docker container (otherwise localhost or server address)
            - RABBITMQ_PORT=5671 # locally it seems to work with 5672
            - RABBITMQ_USER=user
            - RABBITMQ_PASSWORD=password
            - RABBITMQ_VHOST=/
    # Local broker; all published ports are bound to 127.0.0.1 only.
    rabbitmq:
        container_name: my-rabbitmq
        image: rabbitmq:3.8-management
        ports:
            - 127.0.0.1:15672:15672
            - 127.0.0.1:5672:5672
            - 127.0.0.1:5671:5671
        environment:
            - RABBITMQ_DEFAULT_USER=pimcore
            - RABBITMQ_DEFAULT_PASS=pimcore
        volumes:
            - rabbitmq:/var/lib/rabbitmq

This is how the messages are sent from a Symfony Event Listener:

            // Fetch the article_create producer service from the container and publish
            // the serialized object with a persistent delivery mode and a unique message_id.
            // The second argument is presumably the routing key - confirm against the
            // bundle's Producer::publish() signature.
            $this->appKernel->getContainer()->get('old_sound_rabbit_mq.article_create_producer')->publish(
                serialize($objToSend),
                "article_create",
                [
                    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                    'message_id' => md5(uniqid("prefix", true))
                ]
            );

It all seems to work locally without any issues. In AWS we get the following errors:

Without sockets:

Broken pipe or closed connection

With sockets:

[2021-06-08 15:59:45] request.CRITICAL: Uncaught PHP Exception ErrorException: "Warning: socket_recv(): unable to read from socket [104]: Connection reset by peer" at /var/www/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/IO/SocketIO.php line 121 {"exception":"[object] (ErrorException(code: 0): Warning: socket_recv(): unable to read from socket [104]: Connection reset by peer at /var/www/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/IO/SocketIO.php:121)"} []

Solution

Finally solved - you have to define a custom AMQPChannel with a custom AMQPConnection with SSL options and then set this AMQPChannel to the producer:

// SSL: when the AMQPS port is used, build an SSL connection by hand, open a
// channel on it and give that channel to the producer.
// NOTE(review): the comparison is strict, so $port must be an int here; a string
// such as the raw result of getenv() would never match - confirm where $port comes from.
if ($port === 5671) {
    // CA certificate plus client certificate/key, all read from CERTS_PATH.
    // The peer certificate is verified, but the peer name check is turned off.
    $sslOptions = array(
        'cafile' => CERTS_PATH . '/ca_certificate.pem',
        'local_cert' => CERTS_PATH . '/client_certificate.pem',
        'local_pk' => CERTS_PATH . '/client_key.pem',
        'verify_peer' => true,
        'verify_peer_name' => false,
    );
    // Host, credentials and vhost are read from the environment.
    $amqpSslConnection = new AMQPSSLConnection(
        getenv('RABBITMQ_HOST'),
        $port,
        getenv('RABBITMQ_USER'),
        getenv('RABBITMQ_PASSWORD'),
        getenv('RABBITMQ_VHOST'),
        $sslOptions
    );
    // Replace the producer's channel with one opened on the SSL connection.
    $amqpChannel = new AMQPChannel($amqpSslConnection);
    $producer->setChannel($amqpChannel);
    return $producer;
}


Answered By - Blackbam
Read More
  • Share This:  
  •  Facebook
  •  Twitter
  •  Stumble
  •  Digg
Older Posts Home

Total Pageviews

Featured Post

Why Learn PHP Programming

Why Learn PHP Programming A widely-used open source scripting language PHP is one of the most popular programming languages in the world. It...

Subscribe To

Posts
Atom
Posts
All Comments
Atom
All Comments

Copyright © PHPFixing