PHPFixing
Showing posts with label amqp. Show all posts

Wednesday, September 21, 2022

[FIXED] How to configure RabbitMq so that multiple apps can manage their own queues and bindings to a shared exchange

 September 21, 2022     amqp, rabbitmq, virtualhost     No comments   

Issue

We have an app that publishes to a single exchange (using AMQP). A number of other apps want to consume messages from this exchange, so each of them creates its own queues and binds them to the exchange.

We would like to ensure that each app's queues and bindings can only be managed by that app, through the user it connects with. I envisaged using virtual hosts: the exchange would sit in a /common virtual host to which each app's user has read access, while each app's queues and bindings would live in its own /<app> virtual host, to which that user has full access.

The documentation, however, suggests that a user cannot access more than one virtual host simultaneously within a channel and the API doesn't provide an option to specify virtual host as part of bindQueue().

Is there a way to achieve this?


Solution

I think you need federation to achieve this. I have the same problem in my domain: one exchange, with different applications consuming from different queues. If I am right, the steps are:

  1. Federate Exchange A to Exchange 1, 2, 3, 4, ..., where each of these downstream exchanges lives in its own vhost.

  2. Create a separate vhost and user for each application, and give each user access only to its own vhost (and therefore only to its own exchange among Exchange 1, 2, 3, 4, ...).

  3. In each vhost, create the application's queues and bind them to that vhost's exchange.

Is this clear?
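
As a rough illustration of the federation setup, the sketch below builds the two management HTTP API requests that would typically configure one application's vhost: a federation upstream pointing at the shared vhost, and a policy that federates the local exchange. It assumes the federation and management plugins are enabled. The function name federationRequests, the host broker.example, the vhosts common and billing, the exchange events, and the credentials are all hypothetical. The code only prints the request descriptions and does not contact a broker.

```javascript
// Sketch only: builds request descriptions, it does not talk to RabbitMQ.
// All names (host, vhosts, exchange, credentials) are hypothetical.
function federationRequests({ upstreamUri, exchange, appVhost }) {
  const vhost = encodeURIComponent(appVhost); // "/" becomes "%2F"
  return [
    {
      // Upstream definition, stored in the application's own vhost.
      method: "PUT",
      path: `/api/parameters/federation-upstream/${vhost}/shared-upstream`,
      body: { value: { uri: upstreamUri, exchange } },
    },
    {
      // Policy that marks the local exchange as federated.
      method: "PUT",
      path: `/api/policies/${vhost}/federate-${exchange}`,
      body: {
        pattern: `^${exchange}$`, // assumes the name has no regex metacharacters
        definition: { "federation-upstream-set": "all" },
        "apply-to": "exchanges",
      },
    },
  ];
}

const requests = federationRequests({
  upstreamUri: "amqp://reader:secret@broker.example/common",
  exchange: "events",
  appVhost: "billing",
});
console.log(JSON.stringify(requests, null, 2));
```

Each application would still declare its own exchange and queues inside its vhost. The upstream user only needs access to the shared vhost, so the applications never get permissions on each other's queues.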



Answered By - Danilo
Answer Checked By - Robin (PHPFixing Admin)

Tuesday, September 20, 2022

[FIXED] What is an elegant way of halting consumption of messages gracefully in the C# client for RabbitMQ?

 September 20, 2022     amqp, c#, consumer, messaging, rabbitmq     No comments   

Issue

I am setting up a standard standalone thread listening to RabbitMQ in C#. Suppose the method for listening in the thread looks like this:

public void Listen()
{
    using (var channel = connection.CreateModel())
    {
        var consumer = SetupQueues(channel);
        while (true)
        {
            var ea = consumer.Queue.Dequeue();    // blocking call
            handler.HandleMessage(channel, ea);
        }
    }
}

What is an elegant way of halting consumption of messages gracefully in the C# client for RabbitMQ? Keep in mind I have found nothing of use in the RabbitMQ examples/docs or these SO questions:

  • How to stop consuming message from selective queue - RabbitMQ
  • How to pause and resume consumption gracefully in rabbitmq, pika python
  • What is the best way to safely end a java application with running RabbitMQ consumers

The issue here is consumer.Queue.Dequeue() is a blocking call. I have tried these options:

  • Calling channel.BasicCancel(string tag). This causes a System.IO.EndOfStreamException in the blocking call. I do not want to use this exception as part of the control flow for obvious reasons.

  • Calling consumer.Queue.Dequeue(int millisecondsTimeout, out T result) and checking a flag in between loop iterations. This can work but seems hacky.

I want to let the thread exit gracefully and clean up any unmanaged resources I might have, so no thread aborting, etc.

Any help is appreciated. Thanks


Solution

Calling Dequeue with a timeout and checking a flag is the way to do it. It's a very common pattern, and it is the reason many blocking calls come with timeout-enabled overloads.

Alternatively, throwing a (known) exception isn't necessarily bad for control flow. Shutting down gracefully could mean catching the exception, adding a comment such as "this is thrown when the channel is asked to shut down", and then returning cleanly. This is how part of the TPL works with CancellationToken.
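
A minimal sketch of the timeout-and-flag loop, written in JavaScript rather than C# so that it runs without a broker. TimedQueue and listen are hypothetical names and are not part of any RabbitMQ client. The in-memory queue only stands in for consumer.Queue, and its dequeue(timeoutMs) plays the role of the Dequeue overload with a timeout.

```javascript
// Hypothetical in-memory queue standing in for the client's blocking queue.
class TimedQueue {
  constructor() {
    this.items = [];
    this.waiters = [];
  }

  enqueue(item) {
    const waiter = this.waiters.shift();
    if (waiter) waiter(item);
    else this.items.push(item);
  }

  // Resolves with { ok: true, item }, or { ok: false } once timeoutMs elapses.
  dequeue(timeoutMs) {
    if (this.items.length > 0) {
      return Promise.resolve({ ok: true, item: this.items.shift() });
    }
    return new Promise((resolve) => {
      const waiter = (item) => {
        clearTimeout(timer);
        resolve({ ok: true, item });
      };
      const timer = setTimeout(() => {
        this.waiters = this.waiters.filter((w) => w !== waiter);
        resolve({ ok: false });
      }, timeoutMs);
      this.waiters.push(waiter);
    });
  }
}

// Consume until shouldStop() returns true; the flag is checked between waits.
async function listen(queue, handle, shouldStop, timeoutMs = 50) {
  let handled = 0;
  while (!shouldStop()) {
    const result = await queue.dequeue(timeoutMs);
    if (result.ok) {
      handle(result.item);
      handled += 1;
    }
  }
  return handled; // the caller can now release its resources
}

// Demo: handle two messages, then request shutdown.
const demoQueue = new TimedQueue();
let stopRequested = false;
demoQueue.enqueue("first");
demoQueue.enqueue("second");
setTimeout(() => { stopRequested = true; }, 100);
listen(demoQueue, (m) => console.log("handled", m), () => stopRequested)
  .then((count) => console.log("stopped cleanly after", count, "messages"));
```

The loop notices a shutdown request at most one timeout interval late, so the timeout sets the trade-off between shutdown latency and idle wake-ups.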



Answered By - Bryan Boettcher
Answer Checked By - Gilberto Lyons (PHPFixing Admin)

Monday, July 11, 2022

[FIXED] What are the limits of messages, queues and exchanges?

 July 11, 2022     amqp, message, message-queue, rabbitmq, rabbitmq-exchange     No comments   

Issue

  1. What are the allowed types of messages (strings, bytes, integers, etc.)?
  2. What is the maximum size of a message?
  3. What is the maximum number of queues and exchanges?

Solution

  1. Theoretically anything can be stored/sent as a message. You actually don't want to store anything on the queues. The system works most efficiently if the queues are empty most of the time. You can send anything you want to the queue with two preconditions:

    • The thing you are sending can be converted to and from a bytestring
    • The consumer knows exactly what it is getting and how to convert it to the original object

    Strings are pretty easy: they have a built-in method for converting to and from bytes, and if you know the payload is a string then you know how to convert it back. The best option is a text format such as XML, JSON, or YAML. That way you can convert objects to strings and back to the original objects, and because these formats work across programming languages, your consumer can be written in a different language from your producer, as long as it knows how to interpret the object.

    I work in Java and want to send complex messages with sub-objects in the fields, so I use my own message object. It is Serializable and has two additional methods, toBytes and fromBytes, that convert it to and from a byte stream. I use routing keys that leave no doubt as to what type of message the consumer is receiving. This works fine, but it is limiting, as I can only use it with other Java programs.

  2. The size of a message is limited by the memory on the server and, if the message is persistent, also by the free disk space. Recent RabbitMQ releases additionally enforce a configurable max_message_size limit. You probably do not want to send messages that are too big; it is usually better to send a reference to a file or a database record.

    You might also want to read up on their performance measures: http://www.rabbitmq.com/blog/2012/04/17/rabbitmq-performance-measurements-part-1/ http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/

  3. Queues are pretty lightweight; you will most likely be limited by the number of connections you have, and the exact limits depend on the server. Here is some info on a similar question: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2009-February/003042.html
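
To make the first point concrete, here is a small sketch of the "convert to and from a bytestring" idea using JSON, which is the language-neutral alternative to Java serialization mentioned above. The names toBytes and fromBytes mirror the answer's description. The order object and the routing key order.created are made up for illustration.

```javascript
// Convert a plain object to the bytes that would be published.
function toBytes(message) {
  return Buffer.from(JSON.stringify(message), "utf8");
}

// Convert received bytes back into an object.
function fromBytes(bytes) {
  return JSON.parse(bytes.toString("utf8"));
}

// Hypothetical message with a nested sub-object.
const order = { id: 42, customer: { name: "Ada" }, items: ["book", "pen"] };
const routingKey = "order.created"; // tells the consumer which type to expect

const payload = toBytes(order);     // what the producer would send
const decoded = fromBytes(payload); // what the consumer would rebuild

console.log(routingKey, decoded.customer.name, decoded.items.length);
```

Any consumer that can parse JSON can read this payload, whatever language it is written in.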



Answered By - robthewolf
Answer Checked By - Dawn Plyler (PHPFixing Volunteer)

Thursday, May 12, 2022

[FIXED] How to manage Messenger component memory in Symfony 5.3

 May 12, 2022     amqp, php, rabbitmq, symfony     No comments   

Issue

For my project on Symfony 5.3, I use the Messenger component with a RabbitMQ server. I want to control the memory used by my MessageHandler, because my code is taking too much memory (Fatal error: Allowed memory size of 2147483648 bytes exhausted (tried to allocate 33554440 bytes)).

For each message consumed, I feel like the MessageHandler retains the memory of the previous MessageHandler. This is my class where I run a command:

class MessageHandler implements MessageHandlerInterface
{

    private KernelInterface $kernel;

    public function __construct(KernelInterface $kernel)
    {
        $this->kernel = $kernel;
    }

    /**
     * @param RequestMessage $requestMessage
     * @throws \Exception
     */
    public function __invoke(RequestMessage $requestMessage)
    {
        $application = new Application($this->kernel);
        $application->setAutoExit(false);

        $input = new ArrayInput([
            'command' => 'app:my-command',
            'userId' => $requestMessage->getUserId(),
            '--no-debug' => ''
        ]);


        $output = new BufferedOutput();
        $application->run($input, $output);
    }
}

And I consume my messages with this command:

$ php bin/console messenger:consume -vv

I am looking for a way to consume each message with its own, independent memory. I don't know where the problem is, so any help is welcome.

I suspect a memory leak, but I don't understand why the memory used for one message is not freed before the next one.


Solution

First of all, I am aware that my command has an underlying memory issue, but I needed a working solution in the meantime.

I now use Symfony's Process component to run the command in a separate process, so that it does not accumulate memory in the worker and each run is independent. This is the code that resolved it:

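use Symfony\Component\Process\Exception\ProcessFailedException;
use Symfony\Component\Process\Process;

// Inside MessageHandler::__invoke(): run the command in its own PHP process,
// so its memory is released when that process exits.
// Passing the command as an array avoids shell quoting issues.
$process = new Process([
    'php', 'bin/console', 'app:my-command',
    (string) $requestMessage->getUserId(),
    '--no-debug',
]);
$process->run();

if (!$process->isSuccessful()) {
    // Example: throw so the failure is not silently ignored.
    throw new ProcessFailedException($process);
}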
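
The same isolation idea can be sketched outside PHP. The Node.js example below is only an analogy to the Symfony Process approach and is not a port of it. It runs a hypothetical job in a child process, so that whatever memory the job allocates is returned to the operating system when the child exits. The names jobScript and runJobInChildProcess are invented for illustration, and the inline script stands in for a real command.

```javascript
const { spawnSync } = require("child_process");

// Hypothetical job; in a real worker this would be a separate script file.
const jobScript = `
  const userId = process.argv[process.argv.length - 1];
  const big = new Array(100000).fill(userId); // freed when this process exits
  console.log("processed user " + userId + " (" + big.length + " items)");
`;

// Run one job in its own process and return its output.
function runJobInChildProcess(userId) {
  const result = spawnSync(process.execPath, ["-e", jobScript, String(userId)], {
    encoding: "utf8",
  });
  if (result.status !== 0) {
    throw new Error(`job failed for user ${userId}: ${result.stderr}`);
  }
  return result.stdout.trim();
}

console.log(runJobInChildProcess(7));
```

The long-running worker only keeps the short output string, so its own memory footprint does not grow with the size of each job.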


Answered By - Thomas Dulcamara
Answer Checked By - Mildred Charles (PHPFixing Admin)

Wednesday, April 20, 2022

[FIXED] How to share a single promise based RabbitMQ connection across files or controllers in Node js instead of creating a new Connection each time?

 April 20, 2022     amqp, async-await, connection, node.js, rabbitmq     No comments   

Issue

The amqplib library lets you create a RabbitMQ connection, and that connection object is the gateway to everything else, such as creating a channel.

Suppose I'm going for a producer/consumer pattern where, each time a user hits a specific route, a job is produced and sent to the RabbitMQ server, where it is processed by certain consumers (workers).

app.post("/routethatdelegatesheavywork", async (req, res) => {
  const amqpServerLink = "link-to-cloudmq";
  const connection = await amqp.connect(amqpServerLink);
  const channel = await connection.createChannel();
  // do other stuff with channel
});

While this works, I don't want to re-create the connection every time the controller is invoked: it makes the producer very slow, and it's really not how it's supposed to be done.

Here is my problem:

How do I initialize one connection and re-use it every time I need it?

I have tried to create a connection outside the controllers and use it when necessary, but the connection is promise-based, and await doesn't work at the top level of the entry point; it has to be inside an async function.

Although top-level await is available in ES modules (ESM), I don't want to go that way: the whole application is written in CommonJS (require("package")), and switching would mean going through a lot of files and changing every import/export to ESM syntax.

So, is there any other way to create one promise-based connection and re-use it without having to migrate to ESM syntax?


Solution

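Yes. Remember that modules loaded with require in Node.js are cached, so they behave like singletons. Make a new amqpServerInterface module, and do

const amqp = require("amqplib")

const amqpServerLink = "link-to-cloudmq"
// The promise is created once, when the module is first required
const connection = amqp.connect(amqpServerLink)

function connect() {
  return connection
}

module.exports = {
  connect
}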

Then in your controllers

const amqpServerInterface = require('./amqpServerInterface')

app.post("/routethatdelegatesheavywork", async (req, res) => {
  const connection = await amqpServerInterface.connect();
  const channel = await connection.createChannel();
  // do other stuff with channel
})

This always returns the same connection promise, which resolves to the same connection.
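
One caveat of caching the promise at module load is that a failed first connection attempt stays cached as a rejected promise. The sketch below is a lazy variant of the same idea and is not the answer's original code. It creates the promise on first use and forgets it if it rejects, so a later call can retry. shareConnection is a hypothetical helper, and connectFn stands in for something like () => amqp.connect(amqpServerLink). A fake connector is used here so the example runs without a broker.

```javascript
// connectFn is a hypothetical stand-in for () => amqp.connect(url).
function shareConnection(connectFn) {
  let pending = null;
  return function connect() {
    if (pending === null) {
      pending = connectFn().catch((err) => {
        pending = null; // forget the failure so the next call retries
        throw err;
      });
    }
    return pending; // every caller gets the same promise
  };
}

// Demo with a fake connector that fails once, then succeeds.
let attempts = 0;
const connect = shareConnection(async () => {
  attempts += 1;
  if (attempts === 1) throw new Error("broker unreachable");
  return { id: attempts };
});

async function demo() {
  const first = await connect().catch((err) => err.message);
  const [a, b] = await Promise.all([connect(), connect()]);
  return { first, sameConnection: a === b, attempts };
}

demo().then((result) => console.log(result));
```

Exporting the function returned by shareConnection from the module keeps the controllers unchanged: they still call connect() and await the result.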



Answered By - Rani Sharim
Answer Checked By - Willingham (PHPFixing Volunteer)
Copyright © PHPFixing