Hugo Soltys

Symfony developer

Since 2013

Queue your Doctrine insertions with RabbitMQ in your Symfony application

Posted by Hugo


RabbitMQ is an open source message broker software (sometimes called message-oriented middleware) that implements the Advanced Message Queuing Protocol (AMQP). The RabbitMQ server is written in the Erlang programming language and is built on the Open Telecom Platform framework for clustering and failover.

In a Symfony project, you can use RabbitMQ to queue your database insertions, for example.

In this article, we will see how to connect your Symfony project to a RabbitMQ server and use it to queue Doctrine insertions.


Here we go.

Installing RabbitMQ on a Symfony project is quite simple since there is a bundle dedicated to RabbitMQ and AMQP messaging.

All you have to do is follow the steps below.

 

INSTALLING RABBITMQ

 

Open a new terminal and, in your working directory, run the following command to download the latest stable version of the bundle:

composer require php-amqplib/rabbitmq-bundle

 

Then enable the bundle by adding it to your AppKernel.php:

// app/AppKernel.php

public function registerBundles()
{
    $bundles = array(
        // ... your other bundles
        new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(),
    );

    return $bundles;
}

 

CONFIGURE RABBITMQ

 

In your config.yml file, add the old_sound_rabbit_mq section:

old_sound_rabbit_mq:
    connections:
        default:
            host:     'localhost'
            port:     5672
            user:     'guest'
            password: 'guest'
            vhost:    '/'
            lazy:     false
            connection_timeout: 3
            read_write_timeout: 3

            # requires php-amqplib v2.4.1+ and PHP5.4+
            keepalive: false

            # requires php-amqplib v2.4.1+
            heartbeat: 0

            #requires php_sockets.dll
            use_socket: true # default false
    producers:
        comment:
            connection:       default
            exchange_options: {name: 'comment', type: direct}
    consumers:
        comment:
            connection:       default
            exchange_options: {name: 'comment', type: direct}
            queue_options:    {name: 'comment'}
            callback:         comment_service

 

If you want to see the bundle configuration reference and all the possible options with their explanations, take a look at the bundle's GitHub repository.

 

SOME EXPLANATIONS

 

In a messaging application, the process that sends messages to the broker is called a producer, while the process that receives those messages is called a consumer. Your application can have several of each, and you list them under their respective entries (producers and consumers) in the configuration.
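
To make the two roles concrete, here is a minimal in-memory sketch in plain PHP. It does not talk to RabbitMQ at all: an SplQueue stands in for the broker, and the produce() and consume() functions are hypothetical names chosen for this illustration only.

```php
<?php

// Stand-in for the broker: a simple FIFO queue held in memory.
$queue = new SplQueue();

// Producer: pushes a message onto the queue and returns immediately.
function produce(SplQueue $queue, string $message): void
{
    $queue->enqueue($message);
}

// Consumer: takes messages off the queue one by one, oldest first,
// and hands each of them to a callback. Returns the number handled.
function consume(SplQueue $queue, callable $callback): int
{
    $handled = 0;
    while (!$queue->isEmpty()) {
        $callback($queue->dequeue());
        $handled++;
    }

    return $handled;
}

produce($queue, 'first comment');
produce($queue, 'second comment');

// Nothing has been processed yet: the messages simply wait in the queue.
$received = [];
$count = consume($queue, function (string $message) use (&$received) {
    $received[] = $message;
});
```

The point of the sketch is the decoupling: the producer never waits for the processing, and the consumer can run later, at its own pace. With RabbitMQ the queue lives in the broker instead of in the PHP process, so the two sides can be separate processes.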

 

USAGE

In our example, we will assume that you have a blog with articles and that each article receives a lot of comments. This constant flow of insertions slows down your pages, so you would like to use RabbitMQ to queue them.

 

In the controller that saves a new comment, we will replace the direct database write with a message published to the queue.

public function commentAction()
{
    $comment = new Comment();
    $comment->setContent('This blog is awesome.');
    $comment->setDate(new \DateTime());
    $comment->setAuthor('Hugo');

    $this->get('old_sound_rabbit_mq.comment_producer')->publish(serialize($comment));
}

 

Your comment is now queued, as you can see in the RabbitMQ management interface (http://localhost:15672, provided by the management plugin).
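
The controller above publishes serialize($comment), which ties the message to the PHP class definition. As an alternative sketch (this is not what the article uses), the message body could be a small JSON document holding only the fields the consumer needs. The two helper functions below are hypothetical, and the date is an arbitrary example value.

```php
<?php

// Hypothetical helper: turn the comment data into a JSON message body.
function encodeCommentMessage(string $content, string $author, DateTimeInterface $date): string
{
    return json_encode([
        'content' => $content,
        'author'  => $author,
        'date'    => $date->format(DateTime::ATOM),
    ]);
}

// Hypothetical helper: read the fields back on the consumer side,
// rejecting bodies that do not have the expected shape.
function decodeCommentMessage(string $body): array
{
    $data = json_decode($body, true);
    if (!is_array($data) || !isset($data['content'], $data['author'], $data['date'])) {
        throw new InvalidArgumentException('Malformed comment message.');
    }
    $data['date'] = new DateTime($data['date']);

    return $data;
}

// Arbitrary example values.
$date = new DateTime('2017-01-15 10:00:00', new DateTimeZone('UTC'));
$body = encodeCommentMessage('This blog is awesome.', 'Hugo', $date);
$fields = decodeCommentMessage($body);
```

With this approach, the string returned by encodeCommentMessage() would be passed to publish() in place of serialize($comment), and the consumer would build the Comment entity from the decoded fields.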

 

You must now consume the queued messages to store the comments in your database. To do this we will create our own consumer class like the following one.

 

<?php

namespace AppBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Doctrine\Bundle\DoctrineBundle\Registry;

class CommentConsumer implements ConsumerInterface
{
    private $em;

    public function __construct(Registry $doctrine)
    {
        $this->em = $doctrine->getManager();
    }

    public function execute(AMQPMessage $msg)
    {
        return $this->processMessage($msg);
    }

    public function processMessage(AMQPMessage $msg)
    {
        // The serialized comment is carried in the body of the AMQP message
        $comment = unserialize($msg->body);

        $this->em->persist($comment);
        $this->em->flush();
    }
}
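
The consumer rebuilds the comment from the string carried in the message body. The following sketch shows that round trip in isolation, in plain PHP and without RabbitMQ. The Comment class here is a hypothetical stand-in for the Doctrine entity, and the allowed_classes option (available since PHP 7) is an extra precaution that the article's consumer does not use: it limits which classes unserialize() may instantiate.

```php
<?php

// Hypothetical stand-in for the Comment entity, for illustration only.
class Comment
{
    private $content;
    private $author;

    public function __construct(string $content, string $author)
    {
        $this->content = $content;
        $this->author = $author;
    }

    public function getContent(): string
    {
        return $this->content;
    }

    public function getAuthor(): string
    {
        return $this->author;
    }
}

// Producer side: the object becomes a plain string,
// which is what travels in the message body.
$body = serialize(new Comment('This blog is awesome.', 'Hugo'));

// Consumer side: rebuild the object, allowing only the class we expect.
$comment = unserialize($body, ['allowed_classes' => [Comment::class]]);
```

Restricting the allowed classes is worth considering whenever the queue could receive messages from anything other than your own producer, since unserialize() on untrusted input can instantiate arbitrary classes.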

 

We must now declare this consumer as our comment_service as we configured it.

<?xml version="1.0" ?>

<container xmlns="http://symfony.com/schema/dic/services"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd">
    <services>
        <service id="comment_service" class="AppBundle\Consumer\CommentConsumer">
            <argument type="service" id="doctrine" />
        </service>
    </services>
</container>

 

At this point, every comment you consume will go through your custom consumer. All that remains is to launch the consumer with the following Symfony commands:

bin/console rabbitmq:setup-fabric
bin/console rabbitmq:consumer comment

 

And that's all. You can now post as many comments as you want: they will automatically be queued and consumed later.
Also, if for some reason you have to stop the consumption, just stop the consumer and launch it again once the problem is solved. The messages published in the meantime wait in the queue, so you do not lose your data.

 

To conclude, RabbitMQ can be a very useful helper for high-traffic applications. You can do a lot of things with it, and we'll see another use case in a future article.

 

Leave a comment if you liked this article or if you have any questions.

Thanks for reading,

Hugo.


Hugo Soltys

My name is Hugo, I'm 25 and I have been a Symfony developer since 2013. I love creating websites on my own to learn new technologies or improve my skills.
I love movies, books, music and video games. I also like to drink a few beers with my friends. I'm from Lille (France) and I have worked as a Symfony developer at Decathlon since 2016. Before that, I worked as a Symfony developer for the IT Room company, in Roubaix, France.