Symfony Messenger Cheat Sheet

如果无法正常显示,请先停止浏览器的去广告插件。
分享至:
1. By https://andreiabohner.org Messenger Install $ composer require symfony/messenger 4.4 HEADS UP! Middleware are called twice: Implement MiddlewareInterface You can create your own middleware! Middleware Can access the message and its wrapper (the envelope) while it’s dispatched through the bus Envelope Built-in middlewares: SendMessageMiddleware Enables async processing, logs the processing of messages if a logger is passed. HandleMessageMiddleware Calls the registered handler(s). Wrap messages into the message bus, allowing to add useful info inside through envelope stamps Add metadata or some config to the message When you pass your message to the bus, internally, it gets wrapped inside an Envelope e Messag - when a message is dispatched - when the worker receives a message from the transport, it passes that message back into the bus and the middleware are called again Put a message in a Envelope $envelope = new Envelope ($message, [ new DelayStamp (9000) Add Stamps ]); $messageBus->dispatch($envelope); Implement StampInterface Stamp Attach extra config to the envelope Receive dStamp eStamp BusNam d.bus comman Piece of info to attach to the message: any sort of metadata/config that the middleware or transport layer may use. To see on the web debug toolbar which stamps have been applied to the envelope, use dump(): dump($messageBus->dispatch($envelope)); Message Bus The bus is a collection of middleware Dispatch messages to the Handler Can be: - Command Bus Usually don't provide any results. Should have exactly one handler. Command Bus - Query Bus a PHP callable Dispatch the Envelope (with the message) back to the message bus Handle messages using the business logic you defined. Called by HandleMessageMiddleware Used to get info back from the handler. Rarely async. - Event Bus Dispatched after something happens. Can have zero to many handlers. Execute middleware again! Can have multiple middleware Transport Consume messages from a transport (reads a message off of a queue) Transport's serializer transforms into an Envelope object with the message object inside Used t o send & receive messages that will not be handled immediatly. Usually a queue & will be responsible for communicating with a message broker or 3rd parties Worker $ php bin/console messenger:consume async Built-in Stamps SerializerStamp configure the serialization groups used by the transport ValidationStamp configure the validation groups used when the validation middleware is enabled ReceivedStamp added when a message is received from a transport SentStamp marks the message as sent by a specific sender. Allows accessing the sender FQCN & the alias if available from the SendersLocator SentToFailureTransportStamp applied when a message is sent to the failure transport HandledStamp marks the message as handled by a specific handler. Allows accessing the handler returned value and the handler name DelayStamp delay delivery of your message on a transport RedeliveryStamp applied when a messages needs to be redelivered BusNameStamp used to identify which bus it was passed to TransportMessageIdStamp added by a sender or receiver to indicate the id of this message in that transport
2. Messenger Run async code with queues & workers By https://andreiabohner.org 4.4 HEADS UP! Pass always the smallest amount of info to the message! Class that holds data Create a Message E.g.: if you need to pass a Doctrine entity in a message, pass the entity's primary key (or any other relevant info, e.g. email) // src/Message/SmsNotification.php namespace App\Message; class SmsNotification { private $content; public function __construct(string $content, int $userId) { $this->content = $content; $this->userId = $userId; } public function getUserId(): int { return $this->userId; } public function getContent(): string { return $this->content; } } Create a Handler Class that will be called when the message is dispatched. Read the message class & perform some task. // src/MessageHandler/SmsNotificationHandler.php namespace App\MessageHandler; Tells Symfony that this use App\Message\SmsNotification; is a message handler use App\Repository\UserRepository; use Symfony\Component\Messenger\Handler\MessageHandlerInterface; class SmsNotificationHandler implements MessageHandlerInterface { private $userRepository; The handler should: public function __construct(UserRepository $userRepository) { $this->userRepository = $userRepository; } 1 - implement MessageHandlerInterface 2 - create an __invoke() method with one argument that's type-hinted with the message class (or interface) public function __invoke (SmsNotification $message) { $user = $this->userRepository->find($message->getUserId()); Follow these 2 rules & Symfony will find & register the handler automatically // ... } } Query for a fresh object here, on the handler Manually Configuring a Handler # config/services.yaml Restrict this handler services: to the command bus App\MessageHandler\SmsNotificationHandler: tags: [{ name: messenger.message_handler, bus: command.bus }] # or configure with options tags: - Only needed if name: messenger.message_handler can't be guessed handles: App\Message\SmsNotification by type-hint bus: command.bus Prevent handlers from being registered twice autoconfigure: false Options: - bus - from_transport - handles - method - priority
3. By https://andreiabohner.org Messenger HEADS UP! 4.4 By default all messages are sent sync (i.e: as soon as they are dispatched) unless you configure a route/transport for it so you can send async/queued messages Dispatch the Message Call the bus: inject the message_bus service (via MessageBusInterface ), e.g. in a controller use Symfony\Component\Messenger\ MessageBusInterface ; class DefaultController extends AbstractController { public function index( MessageBusInterface $bus) { $bus ->dispatch (new SmsNotification('A message!')); If you pass a raw message here, // or use the shortcut by default, the dispatch() method $this ->dispatchMessage (new SmsNotification('A message!')); wraps it in an Envelope // ... Call SmsNotificationHandler For async/queued messages! Route Messages to a Transport A transport is registered using a "DSN" in your .env file #.env # MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages # MESSENGER_TRANSPORT_DSN=doctrine://default # MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages # config/packages/messenger.yaml If you want the message not to be sent framework: immediately configure a transport to send to it messenger: (a transport tells where to send/read messages) transports: Could be any async: ' %env(MESSENGER_TRANSPORT_DSN)% ' desired name async_priority_high: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' options: queue_name is specific to queue_name: high the doctrine transport #exchange: For amqp send to a separate # name: high exchange then queue #queues: # messages_high: ~ # or redis try "group" async_priority_low: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' options: queue_name: low Send messages to transports & its Route all messages that extend routing: handler(s) this base class to async transport 'App\Message\SmsNotification' : async Add all your message class that need to be async here 'App\Message\AbstractAsyncMessage' : async 'App\Message\AsyncMessageInterface' : async 'My\Message\ToBeSentToTwoSenders' : [async, async_high_priority] Consume Messages (Run the Worker) A command that "handles" messages from a queue is called a "worker” This is your worker! Route all messages that implement this interface to async transport $ php bin/console messenger:consume async -vv Send messages to multiple transports By default, the command will run forever: looking for new messages on your transport and handling them -vv show details about what is happening Name of the transport you defined This is how to Prioritize Transports $ php bin/console messenger:consume async_priority_high async_priority_low Instruct the worker to handle messages in a priority order: The worker will always first look for messages waiting on async_priority_high If there are none, then it will consume messages from async_priority_low A worker can read from one or many transports
4. Use CQRS - Command Query Responsibility Segregation By https://andreiabohner.org Messenger 4.4 Messenger Configuration # config/packages/messenger.yaml framework: Send failed messages to the transport defined here (for later handling) messenger: failure_transport: failed The bus that is going to be injected default_bus: command.bus when injecting MessageBusInterface buses: command.bus: In case of error, a new connection is opened autowireable with the middleware: MessageBusInterface The connection is open before your handler & closed type-hint (because this - doctrine_ping_connection immediately afterwards instead of keeping it open forever is the default_bus) - doctrine_close_connection Wrap your handler in a single Doctrine transaction so your handler doesn’t need to call flush(). - doctrine_transaction An error will rollback automatically - doctrine_clear_entity_manager autowireable with Cleans the entity manager MessageBusInterface $queryBus query.bus: before sending it to your handler middleware: - validation autowireable with event.bus: MessageBusInterface $eventBus default_middleware: allow_no_handlers middleware: validate the message object itself - validation Route messages through Symfony's validator that have to go through the message queue routing: 'App\Message\MyMessage' : amqp Send messages (e.g. to a queueing system) & receive them via a worker Supports multiple transports, e.g.: - doctrine - inmemory - redis - sync transports: amqp: enqueue://default async: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' If handling a message fails 3 times failed : (default max_retries), dsn: 'doctrine://default?queue_name=failed' it will then be sent to async_priority_high: the failed transport dsn: '%env(MESSENGER_TRANSPORT_DSN)%' retry_strategy: If a message fails it’s retried multiple times (max_retries) & then will be discarded max_retries: 3 Configuring Milliseconds delay delay: 1000 Retries & Failures Causes the delay to be higher before each multiplier: 2 for this retry. E.g. 1 second delay, 2 seconds, 4 seconds transport max_delay: 0 Override all of this with a service that # service: null implements Symfony\Component\Messenger\Retry\RetryStrategyInterface Define the serializer for this transport serializer: messenger.transport.symfony_serializer serializer: Built-in service that uses the default_serializer: messenger.transport.symfony_serializer Define the global serializer. Serializer component symfony_serializer: When messages are sent/received to a transport, format: json they're serialized using context: { } Handling Messages Synchronously using a Transport PHP's native serialize() & unserialize() functions by default # config/packages/messenger.yaml framework: messenger: transports: # ... other transports sync: 'sync://' routing: App\Message\SmsNotification: sync
5. Messenger send & receive messages to/from other apps or via message queues By https://andreiabohner.org Transports 4.4 Each transport has a number of different connection options and there are 2 ways to pass them: 2 - via the options key under the transport in messenger.yaml 1 - via the DSN, as query parameters # .env MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages # config/packages/messenger.yaml framework: messenger: # or with options: transports: MESSENGER_TRANSPORT_DSN=redis://password@localhost:6379/messages/symfony/consumer? async: “%env(MESSENGER_TRANSPORT_DSN)%" auto_setup=true& dsn: "%env(MESSENGER_TRANSPORT_DSN)%" serializer=1& HEADS UP! options: stream_max_entries=0& auto_setup: true dbindex=0 Options defined under “options” serializer: 1 key take precedence over ones stream_max_entries: 0 dbindex: 0 defined in the DSN AMPQ # .env MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages Need the AMQP PHP extension Options # config/packages/messenger.yaml framework: messenger: It’s possible to configure AMQP-specific settings on your message by transports: adding AmqpStamp to your Envelope: async: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp; A mqpSta options: m p // ... exchange: name: messages $attributes = []; type: direct $bus->dispatch(new SmsNotification(), [ default_publish_routing_key: normal new AmqpStamp('routing-key-name’, AMQP_NOPARAM, $attributes) queues: ]); messages_normal: binding_keys: [normal] exchange: host Hostname of the AMQP service. port Port of the AMQP service. name Name of the exchange. (Default: messages) vhost Virtual Host to use with the AMQP service. type Type of exchange. Possible types: fanout, direct, user Username to connect the the AMQP service. password Password to connect to the AMQP service. auto_setup Enable or not the auto-setup of queues & exchanges. (Default: true) prefetch_count topic, header. (Default: fanout) default_publish_routing_key Routing key to use when publishing, if none is specified on the message. flags Set channel prefetch count. Exchange flags. Possible flags: AMQP_DURABLE, AMQP_PASSIVE, AMQP_AUTODELETE, ... (Default: AMQP_DURABLE) queues: arguments queues[name] An array of queues, keyed by the name. binding_keys The binding keys (if any) to bind to this queue. binding_arguments Arguments to be used while binding the queue. flags Queue flags. (Default: AMQP_DURABLE) arguments Extra arguments. Extra arguments. delay: queue_name_pattern Pattern to use to create the queues. (Default: “delay_%exchange_name%_%routing_key%_%delay%") exchange_name Name of the exchange to be used for the delayed/retried messages. (Default: "delays") Options Doctrine Use to store messages in a database table # .env MESSENGER_TRANSPORT_DSN=doctrine://default # config/packages/messenger.yaml Connection name framework: messenger: transports: async_priority_high: ' %env(MESSENGER_TRANSPORT_DSN)%?queue_name=high_priority ' async_normal: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' options: queue_name: normal_priority table_name Name of the table. (Default: messenger_messages) queue_name Name of the queue (a column in the table, to use one table for multiple transports). (Default: default) redeliver_timeout Timeout before retrying a message that's in the queue but in the "handling" state (if a worker died for some reason, this will occur, eventually you should retry the message) - in seconds. (Default: 3600) auto_setup When the transport is first used it will create a table named messenger_messages If the table “messenger_messages” should be created automatically during send/get. (Default: true)
6. By https://andreiabohner.org Messenger 4.4 Options Redis Use streams to queue messages. Need the Redis PHP extension (>=4.3) & a Redis server (^5.0) stream # .env MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages # Full DSN Example MESSENGER_TRANSPORT_DSN=redis://password@localhost:6379/messages/symfony/consumer? auto_setup=true& serializer=1& stream_max_entries=0& dbindex=0 Redis stream name. (Default: messages) group Redis consumer group name. (Default: symfony) consumer Consumer name used in Redis. (Default: consumer) auto_setup Create the Redis group automatically? (Default: true) auth Redis password. serializer How to serialize the final payload in Redis (Redis::OPT_SERIALIZER option). (Default: Redis::SERIALIZER_PHP) stream_max_entries Maximum number of entries which the stream will be trimmed to. Set it to a large enough number to avoid losing pending messages (which means "no trimming"). dbindex In-memory Useful for tests! Use this config only in test environment Doesn’t actually delivery messages. Instead, it holds them in memory during the request. All in-memory transports will be reset automatically after each test in test classes extending KernelTestCase or WebTestCase. # config/packages/test/messenger.yaml framework: messenger: transports: async_priority_normal: 'in-memory://' use Symfony\Component\Messenger\Transport\InMemoryTransport; class ImagePostControllerTest extends WebTestCase { Method to call on a transport to get public function testCreate() the sent, or "queued" messages { $transport = self::$container->get('messenger.transport.async_priority_normal’); $this->assertCount(1, $transport->get()); } } Sync Useful when developing! Instead of sending each message to an external queue, it just handles them immediately. They're handled synchronously. Use this config only in dev environment # config/packages/dev/messenger.yaml framework: messenger: transports: async: 'sync://' async_priority_high: 'sync://' Console $ php bin/console messenger:consume async Run the consumer (worker). Fetch & deserialize each message back into PHP, then pass it to the message bus to be handled. --time-limit=3600 Run the command for xx minutes and then exit. --memory-limit=128M Exit once its memory usage is above a certain level. --limit =20 Run a specific number of messages and then exit. $ php bin/console debug:messenger List available messages and handlers per bus. $ php bin/console messenger:setup-transports Configure the transports. E.g. table name in doctrine transport. $ php bin/console messenger:stop-workers Sends a signal to stop any messenger:consume processes that are running. Each worker command will finish the message they are currently processing and then exit. Failed transport These commands are available when the "failure_transport" is configured Worker commands are not automatically restarted. $ php bin/console messenger:failed:show See all messages in the failure transport. $ php bin/console messenger:failed:show 20 -vv See details about a specific failure. $ php bin/console messenger:failed:retry -vv View and retry messages one-by-one. $ php bin/console messenger:failed:retry -vv --force View and retry messages one-by-one without asking. $ php bin/console messenger:failed:retry 20 30 --force Retry specific messages. $ php bin/console messenger:failed:remove 20 Remove a message without retrying it.
7. By https://andreiabohner.org Messenger Install Enqueue $ composer require sroze/messenger-enqueue-transport 4.4 Enable Enqueue Other Transports (Available via Enqueue) // config/bundles.php return [ Enable the bundle // ... Enqueue\MessengerAdapter\Bundle\EnqueueAdapterBundle::class => ['all' => true], ]; enqueue://default ?queue[name]=queue_name &topic[name]=topic_name &deliveryDelay=1800 &delayStrategy=Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy &timeToLive=3600 &receiveTimeout=1000 &priority=1 Enqueue extra options Default configuration # config/packages/enqueue.yaml enqueue: transport: Accept a string DSN, an array with DSN key, or null dsn: ~ MQ broker DSN, Should implement "Interop\Queue\ConnectionFactory" connection_factory_class: ~ E.g. amqp, sqs, gps, ... Should implement factory_service: ~ “Enqueue\ConnectionFactoryFactoryInterface" factory_class: ~ consumption: Time in milliseconds receive_timeout: 10000 queue consumer waits for a message (default: 100ms) client: traceable_producer: true prefix: enqueue separator: . app_name: app router_topic: default router_queue: default router_processor: null redelivered_delay_time: 0 default_queue: default Contains driver specific options driver_options: [] monitoring: Accept a string DSN, an array with DSN key, or null. Stats storage DSN. dsn: ~ Schemes supported: storage_factory_service: ~ Should implement "wamp", "ws", "influxdb” “Enqueue\Monitoring\StatsStorageFactory" storage_factory_class: ~ async_commands: enabled: false timeout: 60 command_name: ~ queue_name: ~ job: enabled: false async_events: enabled: false extensions: doctrine_ping_connection_extension: false doctrine_clear_identity_map_extension: false doctrine_odm_clear_identity_map_extension: false doctrine_closed_entity_manager_extension: false reset_services_extension: false signal_extension: true reply_extension: true
8. By https://andreiabohner.org Messenger 4.4 Amazon SQS Add the SQS DSN // .env SQS_DSN=sqs:?key=<SQS_KEY>&secret=<SQS_SECRET>&region=<SQS_REGION> SQS_QUEUE_NAME=my-sqs-queue-name Add the queue name as an env var Install Enqueue SQS $ composer require enqueue/sqs Options // config/packages/messenger.yaml framework: messenger: Add custom settings transports for SQS transport async: dsn: ' enqueue://default ' options: receiveTimeout: 20 queue: name: ' %env(resolve:SQS_QUEUE_NAME)% ' key AWS credentials. If no credentials are provided, the SDK will attempt to load them from the environment. (Default: null) secret AWS credentials. If no credentials are provided, the SDK will attempt to load them from the environment. (Default: null) token AWS credentials. If no credentials are provided, the SDK will attempt to load them from the environment. (Default: null) region (string, required) Region to connect to. See http://docs.aws.amazon.com/general/latest/gr/rande.html for a list of available regions. (Default: null) retries (int) Configures the maximum number of allowed retries for a client (pass 0 to disable retries). (Default: 3) version (string, required) The version of the webservice to utilize. (Default: '2012-11-05') lazy Enable lazy connection. (Default: true) endpoint (string) The full URI of the webservice. This is only required when connecting to a custom endpoint e.g. localstack (Default: null) queue_owner_aws_account_id The AWS account ID of the account that created the queue. Google Pub/Sub Install Enqueue Google Pub/Sub $ composer require enqueue/gps // .env MESSENGER_TRANSPORT_DSN=enqueue://gps?projectId=projdev&emulatorHost=http%3A%2F%2Fgoogle-pubsub%3A8085 Options // config/packages/messenger.yaml framework: messenger: transports async: dsn: ' enqueue://gps ' options: projectId: '%env(GOOGLE_PROJECT_ID)%' keyFilePath: '%env(GOOGLE_APPLICATION_CREDENTIALS)%' projectId The project ID from the Google Developer's Console. keyFilePath The full path to your service account credentials.json file retrieved from the Google Developers Console. retries Number of retries for a failed request. (Default: 3) scopes Scopes to be used for the request. emulatorHost The endpoint used to emulate communication with GooglePubSub. lazy The connection will be performed as later as possible, if the option set to true. Apache Kafka Install Enqueue Kafka $ composer require enqueue/rdkafka // .env MESSENGER_TRANSPORT_DSN=enqueue://default KAFKA_BROKER_LIST=node-1.kafka.host:9092,node-2.kafka.host:9092,node-3.kafka.host:9092

- 위키
Copyright © 2011-2025 iteam. Current version is 2.139.1. UTC+08:00, 2025-01-17 03:53
浙ICP备14020137号-1 $방문자$