Symfony Messenger

The sharpest tool in your PHP toolbox

Alessandro Lai / @AlessandroLai@phpc.social
phpday 2024 - 16th-17th May 2024
Verona

Who am I?

Nearly 10 years in Facile.it..

First project...

  • Simple (but big) Symfony 2 app
  • Classic LAMP stack
  • Bare metal hosting, external sysadmins

We needed a queue

  • Long elaborations
  • Delaying an action/check
  • Handling external deps
  • Concatenation of HTTP calls leads to low reliability

The first experiment: MySQL

  • A dedicated table
  • One row, one job
  • A column acting as a "lock"
  • A SELECT FOR UPDATE query...
  • ...up to 2 hours of delay on replica nodes

A real queue system:

RabbitMQ logo

But....

  • PHP 5.4
  • ext-amqp was not stable
  • OldSoundBundle too
  • So, we wrote our own system

Other projects...

  • New, separate project
  • Different DB (MongoDB)
  • No time to extract a package
Ryan's tweet announcing he has brain cancer

Let's see code!

Installation

$ composer require symfony/messenger
With Symfony Flex, you'll get the recipe
packages/messenger.yaml
framework:
    messenger:
        failure_transport: failed

        transports:
            sync: 'sync://'
            async: '%env(MESSENGER_TRANSPORT_DSN)%'
            failed: 'doctrine://default?queue_name=failed'

        routing:
            'App\Message\YourMessage': async

# when@test:
#    framework:
#        messenger:
#            transports:
#                # replace with your transport name here (e.g., my_transport: 'in-memory://')
#                # For more Messenger testing tools, see https://github.com/zenstruck/messenger-test
#                async: 'in-memory://'

Many available Transports

RabbitMQ: symfony/amqp-messenger
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
Doctrine/DBAL: symfony/doctrine-messenger
MESSENGER_TRANSPORT_DSN=doctrine://default
MongoDB: facile-it/mongodb-messenger-transport
MESSENGER_TRANSPORT_DSN=facile-it-mongodb://my-connection
And many more... you can also use Enqueue
through sroze/messenger-enqueue-transport

Dispatching a message

class CompressImage
{
    public function __construct(
        public readonly int $imageId,
    ) {}
}
#[AsController]
class FooController
{
    public function __construct(
        private readonly MessageBusInterface $messageBus,
    ) { }

    #[Route(name: 'dispatch', path: '/dispatch', methods: ['POST'])]
    public function dispatch(): Response
    {
        $this->messageBus->dispatch(new CompressImage(random_int(1, 9999)));
        
        return new Response('Ok');
    }
}

Consuming a message

#[AsMessageHandler]
class CompressImageHandler
{
    public function __invoke(CompressImage $message): void
    {
        $image = $this->imageRepository->find($message->imageId);
        
        if ($image === null) {
            throw new UnrecoverableMessageHandlingException('Image does not exists');
        }
        
        // ...
    }
}
$ bin/console messenger:consume async -vv

Advanced features

Envelope and stamps

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\SerializerStamp;

$envelope = (new Envelope($message))
    ->with(new SerializerStamp([
        'groups' => ['my_serialization_groups'],
    ]));
                    
$bus->dispatch($envelope);
use Symfony\Component\Messenger\Stamp\DelayStamp;

$bus->dispatch($message, [new DelayStamp(3000)]);

Automatic retries

# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'

                # default configuration
                retry_strategy:
                    max_retries: 3

                    # milliseconds delay
                    delay: 1000

                    # causes the delay to be higher before each retry
                    # e.g. 1 second delay, 2 seconds, 4 seconds
                    multiplier: 2
                    max_delay: 0

                    # override all of this with a service that implements
                    # Symfony\Component\Messenger\Retry\RetryStrategyInterface
                    service: Facile\Messenger\CustomRetryStrategy

Failed transport

# config/packages/messenger.yaml
framework:
    messenger:
        # after retrying, messages will be sent to the "failed" transport
        failure_transport: failed

        transports:
            # ... other transports

            failed: 'doctrine://default?queue_name=failed'
$ bin/console messenger:failed:show

$ bin/console messenger:failed:retry

$ bin/console messenger:consume failed

$ bin/console messenger:failed:remove 20 30

Message routing

# config/packages/messenger.yaml
framework:
    messenger:
        routing:
            'Facile\Messenger\Priority0Message': 'priority_0'
            'Facile\Messenger\Priority1Message': 'priority_1'
            'Facile\Messenger\Priority2Message': 'priority_2'
            'Facile\Messenger\Priority3Message': 'priority_3'
            '*': 'async'

Advanced consumer usages

Using one consumer for multiple queues

One consumer for all queues
$ bin/console messenger:consume -vv priority_0 priority_1 priority_2 priority_3
Two consumers "helping each other"
$ bin/console messenger:consume -vv queue_1 queue_2
$ bin/console messenger:consume -vv queue_2 queue_1

Automatic terminations

$ bin/console messenger:consume -vv async \
    --limit=10 \         # after 10 consumed messages
    --failure-limit=3 \  # after 3 failures
    --time-limit=3600 \  # after 1 hour
    --memory-limit=128M  # when reaching memory peak
facile-it/terminable-loop-command
command:
 - bin/terminable-loop-command.sh
 - php
 - '-d=memory_limit=256M'
 - bin/console
 - messenger:consume
 - --memory-limit=200M
 - -v
 - messages

Pitfalls and limitations

Serialization by default is not JSON

# config/packages/messenger.yaml
framework:
    messenger:
        serializer:
            symfony_serializer:
                format: json
                context: { }

            # by default on all transports
            default_serializer: messenger.transport.symfony_serializer

        transports:
            async_priority_normal:
                # on a specific transport
                serializer: messenger.transport.symfony_serializer

Test tricks 1/2

Force serialization with inmemory transport

# config/packages/test/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_normal: 'in-memory://?serialize=true'

Check what's being dispatched in functional tests

/** @var InMemoryTransport $transport */
$transport = $this->getContainer()->get('messenger.transport.async_priority_normal');

$this->assertCount(1, $transport->getSent());
$this->assertContainsOnlyInstancesOf(MyMessage::class, $transport->getSent());

Test tricks 2/2

Assert that all transports are empty at the end of each (functional) test
abstract class BaseFunctionalTestCase extends WebTestCase
{                    
    protected function ackMessage(): Envelope
    {
        $transport = $this->getContainer()->get('messenger.transport.async');
        $firstEnvelope = array_shift($transport->get());
        $transport->ack($firstEnvelope);

        return $firstEnvelope;
    }

    protected function assertPostConditions(): void
    {
        foreach ($this->getAllTransports() as $name => $transport) {
            $this->assertEmpty($transport->get(), 'Messages in transport ' . $name);
        }
    }
}

A failure transport is still a transport

By default, after 3 retries from the failure transport, the message will be discarded

# config/packages/test/messenger.yaml
framework:
    messenger:
        transports:
            failed:
                dsn: 'doctrine://default?queue_name=failed'
                retry_strategy:
                    max_retries: 999999
                    # or
                    service: Facile\Messenger\AlwaysRetryStrategy

Abstraction hides transport limitations

  • Doctrine is not ideal for high contention
  • SQS has 15 minutes delay limitation
  • RabbitMQ does polling, not fetch

Thanks for your attention!

Questions?

https://joind.in/talk/92ac5 Rate this talk on Joind.in

Contacts