Symfony Messenger Cheat Sheet
By https://andreiabohner.org
Messenger
Install
$ composer require symfony/messenger
4.4
Middleware
Can access the message and its wrapper (the envelope) while it's dispatched through the bus.
You can create your own middleware! Implement MiddlewareInterface.

HEADS UP!
Middleware are called twice:
- when a message is dispatched
- when the worker receives a message from the transport, it passes that message back into the bus and the middleware are called again

Built-in middlewares:
SendMessageMiddleware: enables async processing, logs the processing of messages if a logger is passed.
HandleMessageMiddleware: calls the registered handler(s).

Envelope
Wraps messages in the message bus, allowing you to add useful info through envelope stamps (metadata or some config for the message).
When you pass your message to the bus, internally, it gets wrapped inside an Envelope.

Put a message in an Envelope:
$envelope = new Envelope($message, [
    new DelayStamp(9000), // add stamps (this delay is in milliseconds)
]);
$messageBus->dispatch($envelope);

Stamp
Attaches extra config to the envelope. Implement StampInterface.
Piece of info to attach to the message: any sort of metadata/config that the middleware or transport layer may use.
E.g.: ReceivedStamp, BusNameStamp (command.bus).
To see on the web debug toolbar which stamps have been applied to the envelope, use dump():
dump($messageBus->dispatch($envelope));
Message Bus
The bus is a collection of middleware. It dispatches messages to the handler and can have multiple middleware.
Can be:
- Command Bus: usually doesn't provide any results. Should have exactly one handler.
- Query Bus: used to get info back from the handler. Rarely async.
- Event Bus: dispatched after something happens. Can have zero to many handlers.

Handler
A PHP callable. Handles messages using the business logic you defined. Called by HandleMessageMiddleware.

Transport
Used to send & receive messages that will not be handled immediately. Usually a queue & will be responsible for communicating with a message broker or 3rd parties.

Worker
Consumes messages from a transport (reads a message off of a queue). The transport's serializer transforms it into an Envelope object with the message object inside. The worker then dispatches the Envelope (with the message) back to the message bus, which executes the middleware again!
$ php bin/console messenger:consume async
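The idea that the bus is a collection of middleware, and that the middleware run once on dispatch and again when the worker hands the message back to the bus, can be pictured with a toy model in plain PHP. This is only a sketch: the closures and the array-based envelope are stand-ins invented for illustration, not Symfony's MiddlewareInterface or Envelope classes.

```php
<?php
declare(strict_types=1);

/**
 * Toy bus: returns a callable that passes an envelope through every
 * middleware in order. Each middleware receives the envelope and $next.
 */
function buildBus(array $middlewares): callable
{
    // Innermost step: nothing left to call, return the envelope unchanged.
    $chain = function (array $envelope): array {
        return $envelope;
    };

    // Wrap from last to first so the first middleware runs first.
    foreach (array_reverse($middlewares) as $middleware) {
        $next = $chain;
        $chain = function (array $envelope) use ($middleware, $next): array {
            return $middleware($envelope, $next);
        };
    }

    return $chain;
}

$log = [];

// Hypothetical middleware: records every message it sees.
$logging = function (array $envelope, callable $next) use (&$log): array {
    $log[] = 'saw ' . $envelope['message'];
    return $next($envelope);
};

// Hypothetical middleware: attaches a "stamp" (here just a string).
$stamping = function (array $envelope, callable $next): array {
    $envelope['stamps'][] = 'seen';
    return $next($envelope);
};

$bus = buildBus([$logging, $stamping]);

// First pass: the message is dispatched.
$dispatched = $bus(['message' => 'hello', 'stamps' => []]);

// Second pass: the worker received it and passes it back into the bus.
$consumed = $bus($dispatched);
```

After the second pass the envelope carries two 'seen' stamps and the log has two entries, which is why real middleware usually check stamps (such as ReceivedStamp) to tell the two passes apart.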
Built-in Stamps
SerializerStamp: configures the serialization groups used by the transport
ValidationStamp: configures the validation groups used when the validation middleware is enabled
ReceivedStamp: added when a message is received from a transport
SentStamp: marks the message as sent by a specific sender. Allows accessing the sender FQCN & the alias if available from the SendersLocator
SentToFailureTransportStamp: applied when a message is sent to the failure transport
HandledStamp: marks the message as handled by a specific handler. Allows accessing the handler's returned value and the handler name
DelayStamp: delays delivery of your message on a transport
RedeliveryStamp: applied when a message needs to be redelivered
BusNameStamp: used to identify which bus the message was passed to
TransportMessageIdStamp: added by a sender or receiver to indicate the id of this message in that transport
Run async code
with queues & workers
Create a Message
A class that holds data.
HEADS UP!
Always pass the smallest amount of info to the message!
E.g.: if you need to pass a Doctrine entity in a message, pass the entity's primary key (or any other relevant info, e.g. email).
// src/Message/SmsNotification.php
namespace App\Message;

class SmsNotification
{
    private $content;
    private $userId;

    public function __construct(string $content, int $userId)
    {
        $this->content = $content;
        $this->userId = $userId;
    }

    public function getUserId(): int
    {
        return $this->userId;
    }

    public function getContent(): string
    {
        return $this->content;
    }
}
Create a Handler
Class that will be called when the message is dispatched. Reads the message class & performs some task.
The handler should:
1 - implement MessageHandlerInterface (tells Symfony that this is a message handler)
2 - create an __invoke() method with one argument that's type-hinted with the message class (or interface)
Follow these 2 rules & Symfony will find & register the handler automatically.
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\SmsNotification;
use App\Repository\UserRepository;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class SmsNotificationHandler implements MessageHandlerInterface
{
    private $userRepository;

    public function __construct(UserRepository $userRepository)
    {
        $this->userRepository = $userRepository;
    }

    public function __invoke(SmsNotification $message)
    {
        // Query for a fresh object here, in the handler
        $user = $this->userRepository->find($message->getUserId());
        // ...
    }
}
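To make rule 2 more concrete, here is a minimal sketch of how a message class can be read from the __invoke() type-hint using PHP's reflection API. It illustrates the idea only and is not Symfony's registration code; guessHandledMessage() is a hypothetical helper, and the two classes are trimmed-down stand-ins (no namespace, no repository) so the snippet runs on its own.

```php
<?php
declare(strict_types=1);

// Stand-in message class, reduced to a single field.
final class SmsNotification
{
    private $content;

    public function __construct(string $content)
    {
        $this->content = $content;
    }

    public function getContent(): string
    {
        return $this->content;
    }
}

// Stand-in handler: one __invoke() argument, type-hinted with the message class.
final class SmsNotificationHandler
{
    public function __invoke(SmsNotification $message): string
    {
        return 'sent: ' . $message->getContent();
    }
}

/**
 * Hypothetical helper: returns the class named in the __invoke() type-hint,
 * or null when the handler does not follow the "one typed argument" rule.
 */
function guessHandledMessage(object $handler): ?string
{
    $parameters = (new ReflectionMethod($handler, '__invoke'))->getParameters();
    if (count($parameters) !== 1) {
        return null;
    }

    $type = $parameters[0]->getType();
    if ($type instanceof ReflectionNamedType && !$type->isBuiltin()) {
        return $type->getName();
    }

    return null;
}

$handler = new SmsNotificationHandler();
$handled = guessHandledMessage($handler);          // class name taken from the type-hint
$output = $handler(new SmsNotification('Hi'));     // a handler is just a PHP callable
```

When the type-hint cannot be used this way, the "handles" tag option covers the same need explicitly.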
Manually Configuring a Handler
# config/services.yaml
services:
    App\MessageHandler\SmsNotificationHandler:
        # Restrict this handler to the command bus
        tags: [{ name: messenger.message_handler, bus: command.bus }]

        # or configure with options
        tags:
            -
                name: messenger.message_handler
                # Only needed if it can't be guessed by type-hint
                handles: App\Message\SmsNotification
                bus: command.bus

        # Prevent handlers from being registered twice
        autoconfigure: false
Options:
- bus
- from_transport
- handles
- method
- priority
HEADS UP!
By default all messages are sent sync (i.e. handled as soon as they are dispatched). To send async/queued messages, configure a transport and route the message to it.
Dispatch the Message
Call the bus: inject the message_bus service (via MessageBusInterface), e.g. in a controller.
use App\Message\SmsNotification;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\Messenger\MessageBusInterface;

class DefaultController extends AbstractController
{
    public function index(MessageBusInterface $bus)
    {
        // If you pass a raw message here, by default, the dispatch() method wraps it in an Envelope.
        // With no transport routed for this message, this calls SmsNotificationHandler.
        $bus->dispatch(new SmsNotification('A message!', 1)); // 1 is an example user id

        // or use the shortcut
        $this->dispatchMessage(new SmsNotification('A message!', 1));

        // ...
    }
}
Route Messages to a Transport
For async/queued messages! If you don't want the message to be handled immediately, configure a transport and route the message to it (a transport tells where to send/read messages).
A transport is registered using a "DSN" in your .env file:
# .env
# MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
# MESSENGER_TRANSPORT_DSN=doctrine://default
# MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            # "async" could be any desired name
            async: '%env(MESSENGER_TRANSPORT_DSN)%'
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    # queue_name is specific to the doctrine transport
                    queue_name: high
                    # For amqp, send to a separate exchange, then queue
                    #exchange:
                    #    name: high
                    #queues:
                    #    messages_high: ~
                    # or for redis, try "group"
            async_priority_low:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: low
        # Send messages to transports & their handler(s).
        # Add all the message classes that need to be async here.
        routing:
            'App\Message\SmsNotification': async
            # Route all messages that extend this base class to the async transport
            'App\Message\AbstractAsyncMessage': async
            # Route all messages that implement this interface to the async transport
            'App\Message\AsyncMessageInterface': async
            # Send messages to multiple transports
            'My\Message\ToBeSentToTwoSenders': [async, async_priority_high]

Consume Messages (Run the Worker)
A command that "handles" messages from a queue is called a "worker". This is your worker:
$ php bin/console messenger:consume async -vv
async: name of the transport you defined.
-vv: show details about what is happening.
By default, the command will run forever: looking for new messages on your transport and handling them.
A worker can read from one or many transports. This is how to prioritize transports:
$ php bin/console messenger:consume async_priority_high async_priority_low
This instructs the worker to handle messages in a priority order: the worker will always first look for messages waiting on async_priority_high. If there are none, then it will consume messages from async_priority_low.
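The priority order described above can be sketched as a small simulation. This is a toy model under stated assumptions, not the worker's actual code: each transport is represented by a plain PHP array acting as a FIFO queue, and nextMessage() is a hypothetical helper.

```php
<?php
declare(strict_types=1);

/**
 * Returns the next message from the first non-empty queue, checking the
 * queues in the order the transports were listed on the command line.
 * $queues maps a transport name to a FIFO array of messages (an assumption
 * of this sketch; real transports talk to a broker or a database).
 */
function nextMessage(array &$queues, array $transportOrder): ?string
{
    foreach ($transportOrder as $name) {
        if (!empty($queues[$name])) {
            return array_shift($queues[$name]);
        }
    }

    return null; // nothing waiting on any transport
}

$queues = [
    'async_priority_low' => ['low-1', 'low-2'],
    'async_priority_high' => ['high-1'],
];

// Same order as: messenger:consume async_priority_high async_priority_low
$order = ['async_priority_high', 'async_priority_low'];

$consumed = [];
while (($message = nextMessage($queues, $order)) !== null) {
    $consumed[] = $message;
}
// $consumed is ['high-1', 'low-1', 'low-2']
```

Because the high-priority queue is re-checked before every read, a steady stream of high-priority messages can keep low-priority ones waiting.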
Use CQRS - Command Query Responsibility Segregation
Messenger Configuration
# config/packages/messenger.yaml
framework:
    messenger:
        # Send failed messages to the transport defined here (for later handling)
        failure_transport: failed
        # The bus that is going to be injected when injecting MessageBusInterface
        default_bus: command.bus
        buses:
            # autowireable with the MessageBusInterface type-hint (because this is the default_bus)
            command.bus:
                middleware:
                    # In case of error, a new connection is opened
                    - doctrine_ping_connection
                    # The connection is opened before your handler & closed immediately afterwards instead of keeping it open forever
                    - doctrine_close_connection
                    # Wraps your handler in a single Doctrine transaction so your handler doesn't need to call flush().
                    # An error will roll back automatically
                    - doctrine_transaction
                    # Cleans the entity manager before sending the message to your handler
                    - doctrine_clear_entity_manager
            # autowireable with MessageBusInterface $queryBus
            query.bus:
                middleware:
                    - validation
            # autowireable with MessageBusInterface $eventBus
            event.bus:
                default_middleware: allow_no_handlers
                middleware:
                    # Validates the message object itself through Symfony's validator
                    - validation
        # Route messages that have to go through the message queue
        routing:
            'App\Message\MyMessage': amqp
        # Send messages (e.g. to a queueing system) & receive them via a worker.
        # Supports multiple transports, e.g.: doctrine, in-memory, redis, sync
        transports:
            amqp: enqueue://default
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
            # If handling a message fails 3 times (default max_retries), it will then be sent to the failed transport
            failed:
                dsn: 'doctrine://default?queue_name=failed'
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                # Configuring Retries & Failures for this transport
                retry_strategy:
                    # If a message fails it's retried multiple times (max_retries) & then will be discarded
                    max_retries: 3
                    # Delay in milliseconds
                    delay: 1000
                    # Causes the delay to be higher before each retry. E.g. 1 second delay, 2 seconds, 4 seconds
                    multiplier: 2
                    max_delay: 0
                    # Override all of this with a service that implements Symfony\Component\Messenger\Retry\RetryStrategyInterface
                    # service: null
                # Define the serializer for this transport
                serializer: messenger.transport.symfony_serializer
        # When messages are sent/received to a transport, they're serialized using
        # PHP's native serialize() & unserialize() functions by default
        serializer:
            # Define the global serializer. This is a built-in service that uses the Serializer component
            default_serializer: messenger.transport.symfony_serializer
            symfony_serializer:
                format: json
                context: { }

Handling Messages Synchronously using a Transport
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            # ... other transports
            sync: 'sync://'
        routing:
            App\Message\SmsNotification: sync
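The retry_strategy settings (delay, multiplier, max_delay) describe an exponential backoff: 1 second, 2 seconds, 4 seconds. The arithmetic can be sketched in plain PHP as below. This is an illustration of the formula only, not the built-in retry strategy's code; retryDelay() is a hypothetical helper, and treating max_delay: 0 as "no cap" is an assumption of this sketch.

```php
<?php
declare(strict_types=1);

/**
 * Delay in milliseconds before retry number $retryCount (0 = first retry):
 * delay * multiplier^retryCount, optionally capped by $maxDelay.
 * A $maxDelay of 0 is treated as "no cap" (assumption of this sketch).
 */
function retryDelay(int $retryCount, int $delay = 1000, float $multiplier = 2.0, int $maxDelay = 0): int
{
    $computed = (int) ($delay * ($multiplier ** $retryCount));

    if ($maxDelay > 0 && $computed > $maxDelay) {
        return $maxDelay;
    }

    return $computed;
}

// With max_retries: 3, delay: 1000, multiplier: 2, max_delay: 0
$delays = array_map('retryDelay', [0, 1, 2]);
// $delays is [1000, 2000, 4000]

// With a cap of 3 seconds, the third retry waits 3000 ms instead of 4000 ms.
$capped = retryDelay(2, 1000, 2.0, 3000);
```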
Transports
Send & receive messages to/from other apps or via message queues
Each transport has a number of different connection options and there are 2 ways to pass them:
1 - via the DSN, as query parameters
2 - via the options key under the transport in messenger.yaml
# .env
MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
# or with options:
MESSENGER_TRANSPORT_DSN=redis://password@localhost:6379/messages/symfony/consumer?auto_setup=true&serializer=1&stream_max_entries=0&dbindex=0
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"
            # or with options:
            async:
                dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                options:
                    auto_setup: true
                    serializer: 1
                    stream_max_entries: 0
                    dbindex: 0
HEADS UP!
Options defined under the "options" key take precedence over ones defined in the DSN.
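The precedence rule between the two ways of passing options can be illustrated with a short sketch. It only models the merge; resolveOptions() is a hypothetical helper and not how any transport factory is actually implemented.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: merges the DSN query parameters with the array from
 * the "options" key, letting "options" win when a key appears in both.
 */
function resolveOptions(string $dsn, array $options): array
{
    $fromDsn = [];
    $query = parse_url($dsn, PHP_URL_QUERY);
    if (is_string($query)) {
        parse_str($query, $fromDsn); // query values arrive as strings
    }

    // Array union keeps the left-hand value when a key exists on both sides.
    return $options + $fromDsn;
}

$dsn = 'redis://localhost:6379/messages?auto_setup=true&dbindex=0';
$resolved = resolveOptions($dsn, ['dbindex' => 2]);
// $resolved['dbindex'] is 2 (from "options");
// $resolved['auto_setup'] is the string 'true' (only present in the DSN)
```

Note that values parsed from a DSN are strings, so a real transport still has to cast them to the expected types.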
AMQP
Needs the AMQP PHP extension.
# .env
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: messages
                        type: direct
                        default_publish_routing_key: normal
                    queues:
                        messages_normal:
                            binding_keys: [normal]

AmqpStamp
It's possible to configure AMQP-specific settings on your message by adding AmqpStamp to your Envelope:
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
// ...
$attributes = [];
$bus->dispatch(new SmsNotification('A message!', 1), [
    new AmqpStamp('routing-key-name', AMQP_NOPARAM, $attributes)
]);

Options
host: Hostname of the AMQP service.
port: Port of the AMQP service.
vhost: Virtual Host to use with the AMQP service.
user: Username to connect to the AMQP service.
password: Password to connect to the AMQP service.
auto_setup: Enable or not the auto-setup of queues & exchanges. (Default: true)
prefetch_count: Set channel prefetch count.
queues:
    queues[name]: An array of queues, keyed by the name.
    binding_keys: The binding keys (if any) to bind to this queue.
    binding_arguments: Arguments to be used while binding the queue.
    flags: Queue flags. (Default: AMQP_DURABLE)
    arguments: Extra arguments.
exchange:
    name: Name of the exchange. (Default: messages)
    type: Type of exchange. Possible types: fanout, direct, topic, header. (Default: fanout)
    default_publish_routing_key: Routing key to use when publishing, if none is specified on the message.
    flags: Exchange flags. Possible flags: AMQP_DURABLE, AMQP_PASSIVE, AMQP_AUTODELETE, ... (Default: AMQP_DURABLE)
    arguments: Extra arguments.
delay:
    queue_name_pattern: Pattern to use to create the queues. (Default: "delay_%exchange_name%_%routing_key%_%delay%")
    exchange_name: Name of the exchange to be used for the delayed/retried messages. (Default: "delays")
Doctrine
Use to store messages in a database table. When the transport is first used it will create a table named messenger_messages.
# .env
# "default" is the connection name
MESSENGER_TRANSPORT_DSN=doctrine://default
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_high: '%env(MESSENGER_TRANSPORT_DSN)%?queue_name=high_priority'
            async_normal:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: normal_priority
Options
table_name: Name of the table. (Default: messenger_messages)
queue_name: Name of the queue (a column in the table, to use one table for multiple transports). (Default: default)
redeliver_timeout: Timeout, in seconds, before retrying a message that's in the queue but in the "handling" state (this happens if a worker died for some reason; eventually the message should be retried). (Default: 3600)
auto_setup: Whether the table "messenger_messages" should be created automatically during send/get. (Default: true)
Redis
Use streams to queue messages. Needs the Redis PHP extension (>=4.3) & a Redis server (^5.0).
# .env
MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
# Full DSN Example
MESSENGER_TRANSPORT_DSN=redis://password@localhost:6379/messages/symfony/consumer?auto_setup=true&serializer=1&stream_max_entries=0&dbindex=0
Options
stream: Redis stream name. (Default: messages)
group: Redis consumer group name. (Default: symfony)
consumer: Consumer name used in Redis. (Default: consumer)
auto_setup: Create the Redis group automatically? (Default: true)
auth: Redis password.
serializer: How to serialize the final payload in Redis (Redis::OPT_SERIALIZER option). (Default: Redis::SERIALIZER_PHP)
stream_max_entries: Maximum number of entries which the stream will be trimmed to. Set it to a large enough number to avoid losing pending messages. (Default: 0, which means "no trimming")
dbindex
In-memory
Useful for tests! Use this config only in the test environment.
Doesn't actually deliver messages. Instead, it holds them in memory during the request.
All in-memory transports will be reset automatically after each test in test classes extending KernelTestCase or WebTestCase.
# config/packages/test/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_normal: 'in-memory://'
use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
use Symfony\Component\Messenger\Transport\InMemoryTransport;

class ImagePostControllerTest extends WebTestCase
{
    public function testCreate()
    {
        /** @var InMemoryTransport $transport */
        $transport = self::$container->get('messenger.transport.async_priority_normal');
        // get(): method to call on a transport to get the sent, or "queued" messages
        $this->assertCount(1, $transport->get());
    }
}
Sync
Useful when developing! Use this config only in the dev environment.
Instead of sending each message to an external queue, it just handles them immediately. They're handled synchronously.
# config/packages/dev/messenger.yaml
framework:
    messenger:
        transports:
            async: 'sync://'
            async_priority_high: 'sync://'
Console
$ php bin/console messenger:consume async
Run the consumer (worker). Fetch & deserialize each message back into PHP, then pass it to the message bus to be handled.
    --time-limit=3600    Run the command for the given number of seconds (here, 1 hour) and then exit.
    --memory-limit=128M  Exit once its memory usage is above a certain level.
    --limit=20           Handle a specific number of messages and then exit.
$ php bin/console debug:messenger
List available messages and handlers per bus.
$ php bin/console messenger:setup-transports
Configure the transports. E.g. table name in doctrine transport.
$ php bin/console messenger:stop-workers
Sends a signal to stop any messenger:consume processes that are running. Each worker command will finish the message it is currently processing and then exit. Worker commands are not automatically restarted.

Failed transport
These commands are available when the "failure_transport" is configured.
$ php bin/console messenger:failed:show
See all messages in the failure transport.
$ php bin/console messenger:failed:show 20 -vv
See details about a specific failure.
$ php bin/console messenger:failed:retry -vv
View and retry messages one-by-one.
$ php bin/console messenger:failed:retry -vv --force
View and retry messages one-by-one without asking.
$ php bin/console messenger:failed:retry 20 30 --force
Retry specific messages.
$ php bin/console messenger:failed:remove 20
Remove a message without retrying it.
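The three messenger:consume limits listed under Console (--limit, --time-limit, --memory-limit) all answer the same question after each message: should the worker exit now? A toy version of that check, written as plain PHP, might look like the following. stopReason() and its parameters are hypothetical names for this sketch, and a value of 0 stands for "limit not set".

```php
<?php
declare(strict_types=1);

/**
 * Returns which limit was reached, or null when the worker should keep going.
 * A limit of 0 means "not set" in this sketch.
 */
function stopReason(
    int $handledMessages,
    int $elapsedSeconds,
    int $memoryBytes,
    int $limit = 0,
    int $timeLimit = 0,
    int $memoryLimit = 0
): ?string {
    if ($limit > 0 && $handledMessages >= $limit) {
        return 'limit';
    }
    if ($timeLimit > 0 && $elapsedSeconds >= $timeLimit) {
        return 'time-limit';
    }
    if ($memoryLimit > 0 && $memoryBytes > $memoryLimit) {
        return 'memory-limit';
    }

    return null;
}

$megabyte = 1024 * 1024;

// Equivalent of --limit=20 --time-limit=3600 --memory-limit=128M
$keepGoing = stopReason(5, 120, 64 * $megabyte, 20, 3600, 128 * $megabyte);
$afterTwenty = stopReason(20, 120, 64 * $megabyte, 20, 3600, 128 * $megabyte);
$afterAnHour = stopReason(5, 3600, 64 * $megabyte, 20, 3600, 128 * $megabyte);
```

Since workers are not restarted automatically once they exit, these limits are normally paired with a process manager that starts a fresh worker.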
Other Transports (Available via Enqueue)
Install Enqueue
$ composer require sroze/messenger-enqueue-transport
Enable Enqueue
// config/bundles.php
return [
    // ...
    // Enable the bundle
    Enqueue\MessengerAdapter\Bundle\EnqueueAdapterBundle::class => ['all' => true],
];
Enqueue extra options
enqueue://default
    ?queue[name]=queue_name
    &topic[name]=topic_name
    &deliveryDelay=1800
    &delayStrategy=Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy
    &timeToLive=3600
    &receiveTimeout=1000
    &priority=1
Default configuration
# config/packages/enqueue.yaml
enqueue:
    # Accepts a string DSN, an array with DSN key, or null
    transport:
        # MQ broker DSN, e.g. amqp, sqs, gps, ...
        dsn: ~
        # Should implement "Interop\Queue\ConnectionFactory"
        connection_factory_class: ~
        # Should implement "Enqueue\ConnectionFactoryFactoryInterface"
        factory_service: ~
        factory_class: ~
    consumption:
        # Time in milliseconds the queue consumer waits for a message (default: 100ms)
        receive_timeout: 10000
    client:
        traceable_producer: true
        prefix: enqueue
        separator: .
        app_name: app
        router_topic: default
        router_queue: default
        router_processor: null
        redelivered_delay_time: 0
        default_queue: default
        # Contains driver specific options
        driver_options: []
    # Accepts a string DSN, an array with DSN key, or null
    monitoring:
        # Stats storage DSN. Schemes supported: "wamp", "ws", "influxdb"
        dsn: ~
        # Should implement "Enqueue\Monitoring\StatsStorageFactory"
        storage_factory_service: ~
        storage_factory_class: ~
    async_commands:
        enabled: false
        timeout: 60
        command_name: ~
        queue_name: ~
    job:
        enabled: false
    async_events:
        enabled: false
    extensions:
        doctrine_ping_connection_extension: false
        doctrine_clear_identity_map_extension: false
        doctrine_odm_clear_identity_map_extension: false
        doctrine_closed_entity_manager_extension: false
        reset_services_extension: false
        signal_extension: true
        reply_extension: true
Amazon SQS
Install Enqueue SQS
$ composer require enqueue/sqs
Add the SQS DSN
# .env
SQS_DSN=sqs:?key=<SQS_KEY>&secret=<SQS_SECRET>&region=<SQS_REGION>
# Add the queue name as an env var
SQS_QUEUE_NAME=my-sqs-queue-name
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async:
                dsn: 'enqueue://default'
                # Add custom settings for the SQS transport
                options:
                    receiveTimeout: 20
                    queue:
                        name: '%env(resolve:SQS_QUEUE_NAME)%'
Options
key: AWS credentials. If no credentials are provided, the SDK will attempt to load them from the environment. (Default: null)
secret: AWS credentials. If no credentials are provided, the SDK will attempt to load them from the environment. (Default: null)
token: AWS credentials. If no credentials are provided, the SDK will attempt to load them from the environment. (Default: null)
region: (string, required) Region to connect to. See http://docs.aws.amazon.com/general/latest/gr/rande.html for a list of available regions. (Default: null)
retries: (int) Configures the maximum number of allowed retries for a client (pass 0 to disable retries). (Default: 3)
version: (string, required) The version of the webservice to utilize. (Default: '2012-11-05')
lazy: Enable lazy connection. (Default: true)
endpoint: (string) The full URI of the webservice. This is only required when connecting to a custom endpoint, e.g. localstack. (Default: null)
queue_owner_aws_account_id: The AWS account ID of the account that created the queue.
Google Pub/Sub
Install Enqueue Google Pub/Sub
$ composer require enqueue/gps
# .env
MESSENGER_TRANSPORT_DSN=enqueue://gps?projectId=projdev&emulatorHost=http%3A%2F%2Fgoogle-pubsub%3A8085
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async:
                dsn: 'enqueue://gps'
                options:
                    projectId: '%env(GOOGLE_PROJECT_ID)%'
                    keyFilePath: '%env(GOOGLE_APPLICATION_CREDENTIALS)%'
Options
projectId: The project ID from the Google Developer's Console.
keyFilePath: The full path to your service account credentials.json file retrieved from the Google Developers Console.
retries: Number of retries for a failed request. (Default: 3)
scopes: Scopes to be used for the request.
emulatorHost: The endpoint used to emulate communication with GooglePubSub.
lazy: If set to true, the connection will be established as late as possible.
Apache Kafka
Install Enqueue Kafka
$ composer require enqueue/rdkafka
# .env
MESSENGER_TRANSPORT_DSN=enqueue://default
KAFKA_BROKER_LIST=node-1.kafka.host:9092,node-2.kafka.host:9092,node-3.kafka.host:9092