Alessandro Lai / @AlessandroLai@phpc.social
                
                        phpday 2024 - 16th-17th May 2024
                        Verona
                
            
 
                SELECT FOR UPDATE query...
                     
                     
                    But....
ext-amqp was not stable
                         
                 
                
            $ composer require symfony/messengerpackages/messenger.yaml
                framework:
    messenger:
        failure_transport: failed
        transports:
            sync: 'sync://'
            async: '%env(MESSENGER_TRANSPORT_DSN)%'
            failed: 'doctrine://default?queue_name=failed'
        routing:
            'App\Message\YourMessage': async
# when@test:
#    framework:
#        messenger:
#            transports:
#                # replace with your transport name here (e.g., my_transport: 'in-memory://')
#                # For more Messenger testing tools, see https://github.com/zenstruck/messenger-test
#                async: 'in-memory://'
symfony/amqp-messenger
                MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messagessymfony/doctrine-messenger
                MESSENGER_TRANSPORT_DSN=doctrine://defaultfacile-it/mongodb-messenger-transport
                MESSENGER_TRANSPORT_DSN=facile-it-mongodb://my-connectionsroze/messenger-enqueue-transport
            class CompressImage
{
    public function __construct(
        public readonly int $imageId,
    ) {}
}#[AsController]
class FooController
{
    public function __construct(
        private readonly MessageBusInterface $messageBus,
    ) { }
    #[Route(name: 'dispatch', path: '/dispatch', methods: ['POST'])]
    public function dispatch(): Response
    {
        $this->messageBus->dispatch(new CompressImage(random_int(1, 9999)));
        
        return new Response('Ok');
    }
}#[AsMessageHandler]
class CompressImageHandler
{
    public function __invoke(CompressImage $message): void
    {
        $image = $this->imageRepository->find($message->imageId);
        
        if ($image === null) {
            throw new UnrecoverableMessageHandlingException('Image does not exists');
        }
        
        // ...
    }
}$ bin/console messenger:consume async -vvuse Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\SerializerStamp;
$envelope = (new Envelope($message))
    ->with(new SerializerStamp([
        'groups' => ['my_serialization_groups'],
    ]));
                    
$bus->dispatch($envelope);use Symfony\Component\Messenger\Stamp\DelayStamp;
$bus->dispatch($message, [new DelayStamp(3000)]);# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                # default configuration
                retry_strategy:
                    max_retries: 3
                    # milliseconds delay
                    delay: 1000
                    # causes the delay to be higher before each retry
                    # e.g. 1 second delay, 2 seconds, 4 seconds
                    multiplier: 2
                    max_delay: 0
                    # override all of this with a service that implements
                    # Symfony\Component\Messenger\Retry\RetryStrategyInterface
                    service: Facile\Messenger\CustomRetryStrategy# config/packages/messenger.yaml
framework:
    messenger:
        # after retrying, messages will be sent to the "failed" transport
        failure_transport: failed
        transports:
            # ... other transports
            failed: 'doctrine://default?queue_name=failed'$ bin/console messenger:failed:show
$ bin/console messenger:failed:retry
$ bin/console messenger:consume failed
$ bin/console messenger:failed:remove 20 30# config/packages/messenger.yaml
framework:
    messenger:
        routing:
            'Facile\Messenger\Priority0Message': 'priority_0'
            'Facile\Messenger\Priority1Message': 'priority_1'
            'Facile\Messenger\Priority2Message': 'priority_2'
            'Facile\Messenger\Priority3Message': 'priority_3'
            '*': 'async'
$ bin/console messenger:consume -vv priority_0 priority_1 priority_2 priority_3$ bin/console messenger:consume -vv queue_1 queue_2
$ bin/console messenger:consume -vv queue_2 queue_1$ bin/console messenger:consume -vv async \
    --limit=10 \         # after 10 consumed messages
    --failure-limit=3 \  # after 3 failures
    --time-limit=3600 \  # after 1 hour
    --memory-limit=128M  # when reaching memory peakfacile-it/terminable-loop-command
                
                command:
 - bin/terminable-loop-command.sh
 - php
 - '-d=memory_limit=256M'
 - bin/console
 - messenger:consume
 - --memory-limit=200M
 - -v
 - messages
# config/packages/messenger.yaml
framework:
    messenger:
        serializer:
            symfony_serializer:
                format: json
                context: { }
            # by default on all transports
            default_serializer: messenger.transport.symfony_serializer
        transports:
            async_priority_normal:
                # on a specific transport
                serializer: messenger.transport.symfony_serializer
Force serialization with inmemory transport
# config/packages/test/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_normal: 'in-memory://?serialize=true'Check what's being dispatched in functional tests
/** @var InMemoryTransport $transport */
$transport = $this->getContainer()->get('messenger.transport.async_priority_normal');
$this->assertCount(1, $transport->getSent());
$this->assertContainsOnlyInstancesOf(MyMessage::class, $transport->getSent());abstract class BaseFunctionalTestCase extends WebTestCase
{                    
    protected function ackMessage(): Envelope
    {
        $transport = $this->getContainer()->get('messenger.transport.async');
        $firstEnvelope = array_shift($transport->get());
        $transport->ack($firstEnvelope);
        return $firstEnvelope;
    }
    protected function assertPostConditions(): void
    {
        foreach ($this->getAllTransports() as $name => $transport) {
            $this->assertEmpty($transport->get(), 'Messages in transport ' . $name);
        }
    }
}By default, after 3 retries from the failure transport, the message will be discarded
# config/packages/test/messenger.yaml
framework:
    messenger:
        transports:
            failed:
                dsn: 'doctrine://default?queue_name=failed'
                retry_strategy:
                    max_retries: 999999
                    # or
                    service: Facile\Messenger\AlwaysRetryStrategy| https://joind.in/talk/92ac5 | Contacts
 |