How to use Symfony messenger to consume RabbitMQ messages (with example)
Coding (Symfony)
A practice approach to Event-Driven Architecture in PHP
I have already explained in more detail what Event-Driven architecture is.
If you are new to this concept and need to read more about it visit A Quick Guide to Event-Driven Architecture.
If you already know enough and want to put your hand in some Symfony code this is the post for you.
In this article, you will learn how to consume messages stored in RabbitMQ using PHP.
We will consume it via Symfony Messenger.
What is Event-Driven Architecture?
The traditional architecture of a system is a classic request/response one.
A service performs an action that involves some other service.
While the second one does its stuff the first need to wait.
That works well but it is not the best way to structure a system.
In an Event-Driven architecture, the flow follows the pattern of events.
When something happens, for instance, an event that requires an action, we store it rather than process it.
In this way, the service we are interacting with does need to stop and wait.
And the second can retrieve the event and perform the action whenever it is ready.
To be able to use this architecture we need to store this event somewhere.
There are several event brokers out there.
The most famous are Apache Kafka, Google Cloud Pub/Sub, and RabbitMQ.
RabbitMQ
RabbitMQ is one of the most popular open-source message brokers.
It supports many messaging protocols.
We can deploy it with many configurations.
It also runs on many operating systems and cloud environments.
The way it works is by creating different channels that can contain many queues.
In turn, these queues can store many events or messages.
The way we are going to use RabbitMQ in this article is to store messages.
Then we are going to retrieve those messages using a PHP application.
If you want to learn more about the language read The Basics of PHP
Since programmers do not want to reinvent the wheel all the time.
We are going to use a PHP framework for our application.
The framework of choice for this article is Symfony.
Symfony
Symfony is one of the most popular PHP frameworks out there.
It consists of sets of components that have their own purpose.
By putting all these components together we do not need to worry about configurations.
Things such as authorization, authentication, routing, etc.
Symfony has already it all built-in inside the application.
I have written a few guides about Symfony, if you want to start with it here is how to Install Symfony 6 on your machine
One of these components and the one we are going to need for this article is Symfony Messenger.
How to Consume RabbitMQ Messages Using Symfony
Symfony makes it easy for a developer to create a structure capable of retrieving messages from a broker.
In our case, we only need 4 pieces to complete the task.
We need:
- the message class is the class that holds the message’s data;
- the handler class is in charge of reading the Message. It also will delegate the action to one or more services;
- the serializer class is where the information gets translated into a readable version for our application to use it;
- transport is what we need for sending and receiving messages;
The Message Class
First thing first, what we want is our message from RabbitMQ.
To make this example clear for everyone we are going to use an item inside our object,
Which will be an order_id and its value.
{'order_id': 1234567}
To deal with this format we need a class with a setter and a getter for the order.
namespace App\Message; class PurchasesNotification { private int $order_id; public function getOrderId(): int { return $this--->order_id; } public function setOrderId(int $order_id) { $this->order_id = $order_id; } }
Pretty straightforward, isn’t it?
The Handler Class
The handler is where the magic happens.
In Symfony, the Handler is a PHP callable.
It is the class that is called when that message is dispatched.
The handler class reads the PurchasesNotification class above.
It then can perform the tasks or delegate them to appropriate services.
namespace App\MessageHandler; use App\Message\PurchasesNotification; use Symfony\Component\Messenger\Attribute\AsMessageHandler; use App\Service\PurchasesService; #[AsMessageHandler] class PurchasesHandler { public function __construct(public PurchasesService $service) {} public function __invoke(PurchasesNotification $message) { $orderId = json_decode((string) $message--->getOrderId(), true); try { $this->service->getPurchases($orderId); } catch (\Exception $exception) { echo $exception->getMessage(); } } }
In this case, what I am doing is getting the order ID from the getOrderId()
method of the Message class.
Then I am invoking a PurchasesService class to deal with it.
The Serializer Class
Unfortunately, this code is not ready to work.
The reason is that PHP cannot serialize it with its standard serializer.
Could not decode message using PHP serialization {...}
To solve this issue we need to create our own serializer class.
This class must have the MessageSerializerInterface and contain 2 methods:
- decode()
- encode()
The first will get the body and the headers from the incoming Rabbit messages and create an Envelope that our application can read.
The latter uses this instance of Envelope to get the message and return an array with the data in a usable format.
namespace App\Serializer; use App\Message\PurchasesNotification; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface as MessageSerializerInterface; use Symfony\Component\Serializer\SerializerInterface; class ExternalMessageSerializer implements MessageSerializerInterface { public function __construct(private SerializerInterface $serializer) {} public function decode(array $encodedEnvelope): Envelope { $body = $encodedEnvelope['body']; $headers = $encodedEnvelope['headers']; try { $message = $this--->serializer->deserialize($body, PurchasesNotification::class, 'json'); } catch (\Throwable $throwable) { throw new MessageDecodingFailedException($throwable->getMessage()); } $stamps = []; if (!empty($headers['stamps'])) { $stamps = unserialize($headers['stamps']); } return new Envelope($message, $stamps); } public function encode(Envelope $envelope): array { $message = $envelope->getMessage(); $stamps = $envelope->all(); if ($message instanceof PurchasesNotification) { $data = [ 'order_id' => $message->getOrderId() ]; } else { throw new \Exception(sprintf('Serializer does not support message of type %d.', $message::class)); } return [ 'body' => json_encode($data), 'headers' => [ 'stamps' => serialize($stamps) ] ]; } }
The Transport
The Transport consists of the configuration with RabbitMQ.
We told Symfony how to deal with the incoming message and what to do once that message became an instance of a class we can manage in our system.
What Symfony does not know yet is how to get this message.
In short, we need to tell it that RabbitMQ exists and it is ready to send messages.
The way we do this is via a messenger.yaml file inside the config/packages
directory.
framework: messenger: serializer: default_serializer: App\Serializer\ExternalMessageSerializer transports: external_messages: '%env(MESSENGER_TRANSPORT_DSN)%/queue_order' serializer: App\Serializer\ExternalMessageSerializer routing: 'App\Message\PurchasesNotification': external_messages
Now, Symfony knows that an external message with that DNS is waiting for it.
The MESSENGER_TRANSPORT_DSN
environment variable would look something like this below.
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@rabbit:5672/%2f
Symfony now also knows that after a message has arrived it has to use our bespoke ExternalMessageSerializer
to translate it.
And that the message’s format is equal to the one of the PurchasesNotification
class.
There would be more information we can add from here but for this basic article, I’ll prefer to keep it simple.
Listen to messages
Now that all the pieces of the puzzle have been completed we are ready to run the command that listens to RabbitMQ.
Given our transport name is external_messages this is what we need to consume.
From the terminal, cd at the root of your project and type the following command:
bin/console messenger:consume external_messages
Everything should have worked.
Your messages should have disappeared from the broker queue.
In fact, the service has processed them.
Conclusion
You have seen how to make a Symfony application receive messages from a message broker.
An Event-driven architecture has many pros in them of security and reliability of the data.
And using this PHP framework is also very easy to put in place.
After all, we need to create or update only 4 files.
Have you ever used Event-driven architecture in your application?
If you want to know more about this topic have a read at A Quick Guide to Event-Driven Architecture.