Alessandro Lai / @AlessandroLai@phpc.social
phpday 2024 - 16th-17th May 2024
Verona
SELECT FOR UPDATE
query...
But....
ext-amqp
was not stable
$ composer require symfony/messenger
With Symfony Flex, you'll get the recipe
packages/messenger.yaml
framework:
messenger:
failure_transport: failed
transports:
sync: 'sync://'
async: '%env(MESSENGER_TRANSPORT_DSN)%'
failed: 'doctrine://default?queue_name=failed'
routing:
'App\Message\YourMessage': async
# when@test:
# framework:
# messenger:
# transports:
# # replace with your transport name here (e.g., my_transport: 'in-memory://')
# # For more Messenger testing tools, see https://github.com/zenstruck/messenger-test
# async: 'in-memory://'
symfony/amqp-messenger
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
Doctrine/DBAL: symfony/doctrine-messenger
MESSENGER_TRANSPORT_DSN=doctrine://default
MongoDB: facile-it/mongodb-messenger-transport
MESSENGER_TRANSPORT_DSN=facile-it-mongodb://my-connection
And many more... you can also use Enqueue sroze/messenger-enqueue-transport
class CompressImage
{
public function __construct(
public readonly int $imageId,
) {}
}
#[AsController]
class FooController
{
public function __construct(
private readonly MessageBusInterface $messageBus,
) { }
#[Route(name: 'dispatch', path: '/dispatch', methods: ['POST'])]
public function dispatch(): Response
{
$this->messageBus->dispatch(new CompressImage(random_int(1, 9999)));
return new Response('Ok');
}
}
#[AsMessageHandler]
class CompressImageHandler
{
public function __invoke(CompressImage $message): void
{
$image = $this->imageRepository->find($message->imageId);
if ($image === null) {
throw new UnrecoverableMessageHandlingException('Image does not exists');
}
// ...
}
}
$ bin/console messenger:consume async -vv
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\SerializerStamp;
$envelope = (new Envelope($message))
->with(new SerializerStamp([
'groups' => ['my_serialization_groups'],
]));
$bus->dispatch($envelope);
use Symfony\Component\Messenger\Stamp\DelayStamp;
$bus->dispatch($message, [new DelayStamp(3000)]);
# config/packages/messenger.yaml
framework:
messenger:
transports:
async_priority_high:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
# default configuration
retry_strategy:
max_retries: 3
# milliseconds delay
delay: 1000
# causes the delay to be higher before each retry
# e.g. 1 second delay, 2 seconds, 4 seconds
multiplier: 2
max_delay: 0
# override all of this with a service that implements
# Symfony\Component\Messenger\Retry\RetryStrategyInterface
service: Facile\Messenger\CustomRetryStrategy
# config/packages/messenger.yaml
framework:
messenger:
# after retrying, messages will be sent to the "failed" transport
failure_transport: failed
transports:
# ... other transports
failed: 'doctrine://default?queue_name=failed'
$ bin/console messenger:failed:show
$ bin/console messenger:failed:retry
$ bin/console messenger:consume failed
$ bin/console messenger:failed:remove 20 30
# config/packages/messenger.yaml
framework:
messenger:
routing:
'Facile\Messenger\Priority0Message': 'priority_0'
'Facile\Messenger\Priority1Message': 'priority_1'
'Facile\Messenger\Priority2Message': 'priority_2'
'Facile\Messenger\Priority3Message': 'priority_3'
'*': 'async'
$ bin/console messenger:consume -vv priority_0 priority_1 priority_2 priority_3
$ bin/console messenger:consume -vv queue_1 queue_2
$ bin/console messenger:consume -vv queue_2 queue_1
$ bin/console messenger:consume -vv async \
--limit=10 \ # after 10 consumed messages
--failure-limit=3 \ # after 3 failures
--time-limit=3600 \ # after 1 hour
--memory-limit=128M # when reaching memory peak
facile-it/terminable-loop-command
command:
- bin/terminable-loop-command.sh
- php
- '-d=memory_limit=256M'
- bin/console
- messenger:consume
- --memory-limit=200M
- -v
- messages
# config/packages/messenger.yaml
framework:
messenger:
serializer:
symfony_serializer:
format: json
context: { }
# by default on all transports
default_serializer: messenger.transport.symfony_serializer
transports:
async_priority_normal:
# on a specific transport
serializer: messenger.transport.symfony_serializer
Force serialization with inmemory
transport
# config/packages/test/messenger.yaml
framework:
messenger:
transports:
async_priority_normal: 'in-memory://?serialize=true'
Check what's being dispatched in functional tests
/** @var InMemoryTransport $transport */
$transport = $this->getContainer()->get('messenger.transport.async_priority_normal');
$this->assertCount(1, $transport->getSent());
$this->assertContainsOnlyInstancesOf(MyMessage::class, $transport->getSent());
abstract class BaseFunctionalTestCase extends WebTestCase
{
protected function ackMessage(): Envelope
{
$transport = $this->getContainer()->get('messenger.transport.async');
$firstEnvelope = array_shift($transport->get());
$transport->ack($firstEnvelope);
return $firstEnvelope;
}
protected function assertPostConditions(): void
{
foreach ($this->getAllTransports() as $name => $transport) {
$this->assertEmpty($transport->get(), 'Messages in transport ' . $name);
}
}
}
By default, after 3 retries from the failure transport, the message will be discarded
# config/packages/test/messenger.yaml
framework:
messenger:
transports:
failed:
dsn: 'doctrine://default?queue_name=failed'
retry_strategy:
max_retries: 999999
# or
service: Facile\Messenger\AlwaysRetryStrategy
https://joind.in/talk/92ac5
|
Contacts
|