keyboard

Hugo Soltys

Symfony developer

Since 2013

Use RabbitMQ to send your emails asynchronously with Symfony

Posted on by Hugo - 1319 views - 0 Comments


Like I told you in a previous tutorialRabbitMQ can be a very useful tool if you have a website with a lot of traffic.

Today we will see another utility of this tool, which has helped us a lot my team and I at my workplace.

How to send your emails asynchronously with RabbitMQ and Symfony.


Let's get started !

I'm not going to talk about what is RabbitMQ or how to install it cause I already explained it in this tutorial, so I suggest you to read it before this one if you haven't did it yet, I am waiting for you.

 

Done ? Ok let's go.

A NEW RABBITMQ CONSUMER

 

To use RabbitMQ to queuing your emails, you will need a new consumer, so in your config.yml file add the following code.

old_sound_rabbit_mq:
    #...
    producers:
        #...
        mails_queue:
            connection:       default
            exchange_options: {name: 'mails_queue', type: direct}
    consumers:
        #...
        mails_queue: &consumer_mails_queue
            connection:       default
            exchange_options: {name: 'mails_queue', type: direct}
            queue_options:    {name: 'mails_queue'}
            callback:         app.consumer.mails_queue

 

Now that we have configured a new consumer, we have to create the corresponding class which we will declare as a Symfony service called app.consumer.mails_queue like in the bundle configuration.

<?php

namespace AppBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\DependencyInjection\ContainerInterface;

class EmailConsumer
{
    /** @var ContainerInterface $container */
    protected $container;

    /**
     * @param ContainerInterface $container
     */
    public function __construct(ContainerInterface $container)
    {
        $this->container = $container;
    }

    /**
     * @param AMQPMessage $msg
     * @return bool
     */
    public function execute(AMQPMessage $msg)
    {
        return $this->processMessage($msg);
    }

    /**
     * @param AMQPMessage $msg
     * @return int
     */
    public function processMessage(AMQPMessage $msg)
    {
        // To be completed later in the tutorial

        return ConsumerInterface::MSG_ACK;
    }
}

 

And now in your services.xml add our new service

<?xml version="1.0" ?>

<container xmlns="http://symfony.com/schema/dic/services"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd">

    <parameters>
        <parameter key="email_consumer_class">AppBundle\Consumer\EmailConsumer</parameter>
    </parameters>

    <services>
        <service id="app.consumer.mails_queue" class="%email_consumer_class%">
            <argument type="service" id="service_container" />
        </service>
    </services>
</container>

 

That's it ! Now we we'll have to create our own Swiftmailer spool class.

 

SWIFTMAILER CONFIGURATION

 

For those who don't know, Swiftmailer is the mailing PHP library used in Symfony to send emails. I will not explain in this tutorial what is Swiftmailer and how to use it. If you want to learn more about it, RTFM tongue-out

 

According to the documentation, Swiftmailer allows you to use your own spool class to queue your emails in the way you want. To do this, you just have to edit the Swiftmailer configuration in your config.yml file like below.

swiftmailer:
    # rest of the configuration
    spool:
        type: rabbitmq

 

Swiftmailer now knows that he has to search a service called swiftmailer.mailer.default.spool.rabbitmq to queue the emails that your website is sending.

Now we just have to create this service, and override the default queueMessage method to queue all the emails in our mails_queue consumer.

<?php

namespace AppBundle\Mailer;

use OldSound\RabbitMqBundle\RabbitMq\Consumer;
use OldSound\RabbitMqBundle\RabbitMq\Producer;
use Swift_ConfigurableSpool;
use Swift_Mime_Message;
use Swift_Transport;
use Symfony\Component\DependencyInjection\ContainerInterface;

class RabbitMqSpool extends Swift_ConfigurableSpool
{
    /** @var ContainerInterface $container */
    protected $container;

    public function __construct(ContainerInterface $container)
    {
        $this->container = $container;
    }

    /**
     * Starts this Spool mechanism.
     */
    public function start()
    {
    }

    /**
     * Stops this Spool mechanism.
     */
    public function stop()
    {
    }

    /**
     * Tests if this Spool mechanism has started.
     *
     * @return bool
     */
    public function isStarted()
    {
        return true;
    }

    /**
     * Queues a message.
     *
     * @param Swift_Mime_Message $message The message to store
     *
     * @return bool Whether the operation has succeeded
     */
    public function queueMessage(Swift_Mime_Message $message)
    {
        $serialized = serialize($message);
        $this->getMailProducer()->publish($serialized);
    }

    /**
     * Sends messages using the given transport instance.
     *
     * @param Swift_Transport $transport A transport instance
     * @param string[] $failedRecipients An array of failures by-reference
     *
     * @return int The number of sent emails
     */
    public function flushQueue(Swift_Transport $transport, &$failedRecipients = null)
    {
        return $this->getConsumer()->consume($this->getMessageLimit());
    }

    protected function getConsumer() {
        return $this->container->get('old_sound_rabbit_mq.mails_queue_consumer');
    }


    protected function getMailProducer() {
        return $this->container->get('old_sound_rabbit_mq.mails_queue_producer');
    }
}

 

Now that the PHP is written, let's declare it as a service in your service.xml file.

<?xml version="1.0" ?>

<container xmlns="http://symfony.com/schema/dic/services"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd">

    <parameters>
        <parameter key="email_consumer_class">AppBundle\Consumer\EmailConsumer</parameter>
    </parameters>

    <services>
        <service id="app.consumer.mails_queue" class="%email_consumer_class%">
            <argument type="service" id="service_container" />
        </service>

        <service id="swiftmailer.mailer.default.spool.rabbitmq" class="AppBundle\Mailer\RabbitMqSpool" >
            <argument type="service" id="service_container" />
        </service>
    </services>
</container>

 

From this point, every time you will send an email, it will be serialized and stored in your new RabbitMQ queue.

The last step will be to edit the EmailConsumer class to send the emails to their recipients once they are consumed.

 

SEND THE EMAILS

 

Back in your EmailConsumer with the processMessage method.

<?php

namespace AppBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\DependencyInjection\ContainerInterface;

class EmailConsumer
{
    /** @var ContainerInterface $container */
    protected $container;

    /**
     * @param ContainerInterface $container
     */
    public function __construct(ContainerInterface $container)
    {
        $this->container = $container;
    }

    /**
     * @param AMQPMessage $msg
     * @return bool
     */
    public function execute(AMQPMessage $msg)
    {
        return $this->processMessage($msg);
    }

    /**
     * @param AMQPMessage $msg
     * @return int
     */
    public function processMessage(AMQPMessage $msg)
    {
        $message = unserialize($msg->getBody());
        $transport = $this->getTransport();
        $transport->send($message);
        $transport->stop();

        return ConsumerInterface::MSG_ACK;
    }

    /** @return \Swift_Transport  */
    protected function getTransport()
    {
        /** @var \Swift_Transport $swiftTransport */
        $swiftTransport = $this->container->get('swiftmailer.transport.real');

        if (!$swiftTransport->isStarted()) {
            $swiftTransport->start();
        }

        return $swiftTransport;
    }
}

 

And finally, do not forget to create your new queue and to launch your new consumer with the two following Symfony commands.

bin/console rabbitmq:setup-fabric
bin/console rabbitmq:consumer emails_queue

 

As you can see, the code is quite simple. Each time a new message is consumed by our consumer, the processMessage function will unserialize it, call the Swiftmailer transport service and use it to send the message.

The main advantage of using this method to send your emails is the speed of treatment provided by RabbitMQ. To give you an idea, we are able to send approximatively 1500 emails in a few seconds with this trick.

 

I hope you enjoyed this tutorial, like always if you have any question you can ask it in the comment section below, I will answer you quickly laughing

See you soon for a new Symfony tutorial,

Hugo.

 


Hugo Soltys

My name is Hugo, I'm 25 and I'm a Symfony developer since 2013. I love to create websites by myself to learn new technologies or increase my skills.
I love movies, books, music and video games. I also like to drink a few beers with my friends. I'm from Lille (France) and I currently work as Symfony developer at Decathlon since 2016. Before that, I worked as Symfony developer for the IT Room company, in Roubaix, France.


Older articles