The Message Bus is coming
And everybody's queuing
New York to San Francisco
An intercity Disco

- Loosely based on Vengaboys

A new gem can be found in TYPO3 since version 12: The integration of the symfony/messenger component (Feature RST, Patch, Official TYPO3 docs, Official symfony docs). The component enables us to emit messages from our code that get picked up by handlers, which then process the messages.

Granted, this sounds a lot like the PSR-14 events we already have. But with symfony/messenger we can have the messages queued and then have the handlers process the messages asynchronously, which can be a very powerful approach.


Let's see what parts are needed to get a message queue going, what the benefits and challenges are, and how we do it in TYPO3.

Use Cases

Before we have a look at the technical stuff regarding the message bus and message queue and all that, let's try to understand the problem(s) that this approach is designed to solve.

There is one outstanding property of the message queue: It can be processed asynchronously.

This means that whenever our application has to do something expensive (in terms of performance) that does not necessarily need to be finished while the current (web) request is waiting for its response, the message queue is worth considering. Instead of doing the expensive task right away, we just send a message to the message bus and another process (a so-called "consumer" task) will pick it up and hand it to the responsible handler.

Some common examples of such tasks are:

  • Sending emails
  • Generating images, PDFs or any kind of files
  • Sending web requests to another system and processing the response
  • Doing complex calculations

Since the processing of the messages in the queue is decoupled from the requests that create the messages, it can be scaled quite well and handle huge amounts of messages. This might not come up often, but when we need to handle many tasks, the message queue is here to help us.

By default, the transport is set to be synchronous. This means there is no message queue. Instead, messages are processed immediately. With a synchronous transport, messages behave very much like events. To benefit from asynchronous transport we need to change the default configuration (and provide a way to consume the queue).


Important Parts

Without going into too much detail, we will have a quick look at the core components: messages, message handlers, the bus, the transport, the queue and the consumer.

For a deeper dive, please read the documentation on how to use symfony/messenger in symfony projects, as most of those concepts and examples apply to TYPO3 as well.

Let's start with the message.


Message

A message is a PHP object that is passed around (technically, it is passed around wrapped in an envelope object with stamps, but for our purposes we don't need to get into that). It holds all information that a handler might need to process the message. If the transport is configured to happen asynchronously, a message will temporarily be stored in a queue. For this purpose it needs to be serializable.

Other than that we can freely design our own message objects. Here is an example:

class TellTheInternet
{
    public function __construct(
        private readonly bool $areYouSure,
        private readonly string $youWillNotBelieveWhatJustHappened,
    ) {
    }

    // Getter and/or helper methods
}

We just need to make sure the class is serializable. That's it.
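To get a feeling for what "serializable" means in practice, here is a minimal sketch of the round trip a message takes through a queue. It uses PHP's native serialize() and unserialize() as a stand-in for whatever serializer the configured transport actually uses, and the getters isSure() and getNews() are my own assumptions for this illustration.

```php
<?php

declare(strict_types=1);

// Same shape as the message above; the two getters are hypothetical helpers.
final class TellTheInternet
{
    public function __construct(
        private readonly bool $areYouSure,
        private readonly string $youWillNotBelieveWhatJustHappened,
    ) {
    }

    public function isSure(): bool
    {
        return $this->areYouSure;
    }

    public function getNews(): string
    {
        return $this->youWillNotBelieveWhatJustHappened;
    }
}

$message = new TellTheInternet(true, 'A notable thing');

// Roughly what happens before the message is stored in a queue ...
$stored = serialize($message);

// ... and what happens after a consumer has fetched it again.
$restored = unserialize($stored, ['allowed_classes' => [TellTheInternet::class]]);
```

The restored object is a new instance carrying the same data. This is also the reason why messages should only hold plain values: resources such as open file handles or database connections would not survive this trip.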


Message Handler

A message handler is a PHP class that handles a specific message. Here is where the magic happens. An email is sent, a PDF is created or a URL is called.

An example:

class InternetCommunicationHandler
{
    public function __invoke(TellTheInternet $message) {
        // business code to tell the internet about a thing.
    }
}

Instead of the FQCN of a specific message, we could also specify an interface that a handler is responsible for, to bundle multiple different messages to the same handling. There is no limit to the number of handlers that can process a single message.
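The interface approach could look like the following sketch. All names in it (InternetAnnouncement, BlogPostPublished, ReleaseTagged, AnnouncementHandler) are hypothetical, and the handler is called directly here instead of going through the message bus, just to show that one handler accepts every message implementing the interface.

```php
<?php

declare(strict_types=1);

// Hypothetical interface shared by several message classes.
interface InternetAnnouncement
{
    public function getText(): string;
}

final class BlogPostPublished implements InternetAnnouncement
{
    public function __construct(private readonly string $title)
    {
    }

    public function getText(): string
    {
        return 'New post: ' . $this->title;
    }
}

final class ReleaseTagged implements InternetAnnouncement
{
    public function __construct(private readonly string $version)
    {
    }

    public function getText(): string
    {
        return 'Released ' . $this->version;
    }
}

// One handler that is responsible for every InternetAnnouncement.
final class AnnouncementHandler
{
    /** @var list<string> Collected texts, a stand-in for real business code */
    public array $announced = [];

    public function __invoke(InternetAnnouncement $message): void
    {
        $this->announced[] = $message->getText();
    }
}

$handler = new AnnouncementHandler();
$handler(new BlogPostPublished('The Message Bus is coming'));
$handler(new ReleaseTagged('1.0.0'));
```

In a real project the handler would still have to be registered as a message handler, which is covered further down.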

TYPO3 allows for the ordering of handlers, as we already know from middlewares and event listeners (read the article PSR-14 Events in TYPO3 if you want to find out more about events).


Message Bus

The message bus is the dispatcher for messages. Whenever we want to emit a message, we inject the MessageBusInterface and hand over our message. The message bus will find the matching transport for the message and dispatch it accordingly.

Here is an example:

class ThingController extends ActionController
{
    public function __construct(
        protected readonly MessageBusInterface $bus,
    ) {
    }

    public function notableAction(): ResponseInterface
    {
        $this->bus->dispatch(
            new TellTheInternet(true, 'A notable thing')
        );

        // Do notable things

        return $this->htmlResponse();
    }
}

Note that the behavior of the message bus can be altered. Internally, the message bus executes a stack of middlewares.

What happens when you dispatch a message to a message bus depends on its collection of middleware and their order.

Official symfony documentation

Of course, we can change that stack and add our own middleware. We can even register multiple busses with individual middleware stacks, each tailored to our needs.

But for now (and for our example below), we keep things simple and stick to the default setup: one bus with the default set of middlewares.
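If the idea of a middleware stack is new to you, the following sketch may help. It is a simplified illustration in plain PHP and deliberately does not use the classes of symfony/messenger; the function buildStack() and the closures are made up for this example. Each middleware receives the message and a $next callable, so it can act before and after the rest of the stack.

```php
<?php

declare(strict_types=1);

/**
 * Builds one callable that runs the given middlewares in order.
 *
 * @param list<callable(object, callable): object> $middlewares
 */
function buildStack(array $middlewares): callable
{
    // The innermost step simply returns the message.
    $next = static fn (object $message): object => $message;

    // Wrap from the last middleware to the first, so that the first one runs first.
    foreach (array_reverse($middlewares) as $middleware) {
        $next = static fn (object $message): object => $middleware($message, $next);
    }

    return $next;
}

$log = new ArrayObject();

$stack = buildStack([
    static function (object $message, callable $next) use ($log): object {
        $log->append('first: before');
        $result = $next($message);
        $log->append('first: after');

        return $result;
    },
    static function (object $message, callable $next) use ($log): object {
        $log->append('second');

        return $next($message);
    },
]);

$stack(new stdClass());
```

The order of the entries in $log shows why the order of the middlewares matters: the first middleware wraps everything that comes after it.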


Transport

Another piece to the (simplified) puzzle is the transport. Depending on the configuration, the messages are processed either synchronously or asynchronously. When processed synchronously the message bus will hand the message directly to all responsible message handlers, the handlers do their thing and the world moves on.

When processed asynchronously, the message bus hands the message to the configured transport. This could for example be the doctrine transport, that will then store the (serialized) message in the database. Other transport methods could be redis, AMQP or many more.

Of course, we could add our own transport, but that is out of scope for this article. For this, please refer once again to the symfony documentation.


Message Queue and Consumer

An asynchronous transport that stores messages somewhere (for example in the database) creates a message queue. Messages in the queue have not been processed yet and therefore we need another part to get the messages out of the queue and on their way to their handlers. This is done by the consumer.

A consumer (also known as a worker) is a process (usually a service run on the server) that constantly checks the queue for messages, determines the next message in line and pipes it into the message handler stack. The fact that such a consumer must be set up on the server is the reason why the default transport configuration is synchronous: if it were set to an asynchronous transport by default, the queue would fill up and no message would be processed as long as a consumer is absent.

It is also possible to run multiple workers simultaneously to increase the number of messages that can be processed in a given time, or to spawn them whenever the queue fills up.
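The interplay of queue and consumer can be boiled down to a few lines of plain PHP. This is a toy model under generous assumptions: an in-memory SplQueue stands in for the transport, the SendMail message and the handler closure are hypothetical, and the loop stops once the queue is empty, whereas a real worker keeps polling.

```php
<?php

declare(strict_types=1);

// Hypothetical message for this sketch.
final class SendMail
{
    public function __construct(public readonly string $to)
    {
    }
}

// Stand-in for an asynchronous transport: it only stores serialized messages.
$queue = new SplQueue();

// "Dispatching" just stores the message, which is cheap for the web request.
foreach (['a@example.org', 'b@example.org'] as $recipient) {
    $queue->enqueue(serialize(new SendMail($recipient)));
}

$sent = [];
$handler = static function (SendMail $message) use (&$sent): void {
    // The expensive work would happen here.
    $sent[] = $message->to;
};

// The consumer: take the next message in line and hand it to the handler.
while (!$queue->isEmpty()) {
    $message = unserialize($queue->dequeue(), ['allowed_classes' => [SendMail::class]]);
    $handler($message);
}
```

Nothing in the first half of the script knows about the handler, and nothing in the second half knows about the request that created the messages. That decoupling is what makes it possible to run the consumer as a separate process, or several of them.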


Example in TYPO3

Now that we have the vocabulary straight, we can move to an example in TYPO3.

I created a demo extension "messenger_example" that includes the whole example. Feel free to check it out on GitHub.

Our demo application is a minimalist shop. We can order things, and we only have to provide an email address and the amount of things we want to order.

We want to dispatch a message when the order is submitted, so that everything can be updated in the database (a synchronous task) and then dispatch another message after the database operation to send out an email (an asynchronous task).


Message Objects

Here are our two message objects:

class OrderReceivedMessage
{
    public function __construct(private readonly string $email, private readonly int $amount)
    {
    }

    public static function fromOrder(Order $order): self
    {
        return new self($order->getEmail(), $order->getAmount());
    }

    // getter
}

class OrderSavedMessage
{
    public function __construct(private readonly string $email, private readonly int $amount)
    {
    }

    public static function fromOrderReceivedMessage(OrderReceivedMessage $message): self
    {
        return new self($message->getEmail(), $message->getAmount());
    }

    // getter
}

Now, we need to configure the routing of the OrderSavedMessage, so that an asynchronous transport is used. We achieve this in the ext_localconf.php of our extension:

// Unset the default, so that it no longer applies
unset($GLOBALS['TYPO3_CONF_VARS']['SYS']['messenger']['routing']['*']);
// Set Webhook-Messages and OrderSavedMessage to asynchronous transport via doctrine
foreach ([WebhookMessageInterface::class, OrderSavedMessage::class] as $className) {
    $GLOBALS['TYPO3_CONF_VARS']['SYS']['messenger']['routing'][$className] = 'doctrine';
}

This code also routes all webhook messages through the (asynchronous) doctrine transport (read more about this in the article about EXT:webhooks).

We have to unset the default configuration of '*' => 'default' because if we don't, the default transport is applied as well as the doctrine transport. This is most likely a bug - I will remove this warning once this has been resolved in the TYPO3 core. Note that our OrderReceivedMessage object is still routed through the default transport, despite unsetting the explicit configuration.


Message Handlers

Next, we need the handlers for the messages. Here they are:

class OrderReceivedHandler
{
    public function __construct(private readonly MessageBusInterface $bus)
    {
    }

    public function __invoke(OrderReceivedMessage $message): void
    {
        // Write Order to database or do other synchronous stuff

        // Dispatch new event message
        $this->bus->dispatch(OrderSavedMessage::fromOrderReceivedMessage($message));
    }
}


class OrderCreatedHandler
{
    public function __construct(private readonly MailerInterface $mailer)
    {
    }

    public function __invoke(OrderSavedMessage $message): void
    {
        $mail = GeneralUtility::makeInstance(MailMessage::class);
        $mail->to($message->getEmail())
            ->from(MailUtility::getSystemFromAddress())
            ->subject('Thank you for your order')
            ->html("<h1>Thank you for your order</h1>
<p>You ordered {$message->getAmount()} things!</p>");
        $this->mailer->send($mail);
    }
}

These handlers must be registered as message handlers, and we can do that by tagging them as messenger.message_handler in the Services.yaml:

DanielGoerz\MessengerExample\MessageHandler\OrderReceivedHandler:
  tags:
    - name: 'messenger.message_handler'

DanielGoerz\MessengerExample\MessageHandler\OrderCreatedHandler:
  tags:
    - name: 'messenger.message_handler'

With TYPO3 v13 we have a new option.


#[AsMessageHandler] Attribute

Since TYPO3 13

symfony/messenger comes with a PHP Attribute AsMessageHandler. This attribute is not supported with TYPO3 12, but support was added to TYPO3 v13 (Feature RST, Patch). If we add this attribute to our handler class, it is tagged automatically, so that we can omit the manual tagging in the Services.yaml.

Here is an example:

use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
final class OrderReceivedHandler
{
}

Now that our message handlers are registered, we can look at dispatching a message.


Dispatching

To learn about dispatching, we can have a look at the controller, where the first message is dispatched:

class OrderController extends ActionController
{
    public function __construct(private readonly MessageBusInterface $bus)
    {
    }

    public function createAction(): ResponseInterface
    {
        $this->view->assign('order', Order::createNew());
        return $this->htmlResponse();
    }

    public function finishAction(?Order $order = null): ResponseInterface
    {
        if ($order === null) {
            return $this->redirect('create');
        }
        $this->bus->dispatch(OrderReceivedMessage::fromOrder($order));
        return $this->htmlResponse();
    }
}

Once the finishAction of the OrderController is called, the messages are dispatched and the doctrine transport writes the serialized OrderSavedMessage alongside some metadata into the database. The table is called sys_messenger_messages.


Consumer task

Now we need to create a consumer task that constantly checks the queue. The TYPO3 core provides a CLI command for this:

./vendor/bin/typo3 messenger:consume doctrine


 [OK] Consuming messages from transport "doctrine".


 // Quit the worker with CONTROL-C.

[info] DanielGoerz\MessengerExample\Message\OrderSavedMessage was handled successfully (acknowledging to transport).

Note that doctrine is the name of the transport that will be checked.

This command will run for one hour by default.

To quote the official TYPO3 docs:

As this command is running as a worker, it is stopped after 1 hour to avoid memory leaks. Therefore, the command should be run from a service manager like systemd to restart automatically after the command exits due to the time limit.

Official Docs

There is one last caveat when it comes to the message queue and the consumer.

Exceptions thrown by the message handlers are ignored by the consumer. Currently, there is no "retry" or "failed queue" concept, so errors have to be handled in the message handlers.
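Handling errors inside the handler could look roughly like this. The sketch is plain PHP with hypothetical names (NotifyCustomer, NotifyCustomerHandler and the injected $send closure), and the public $failures array is only a stand-in for a proper logger or a custom "failed" table.

```php
<?php

declare(strict_types=1);

// Hypothetical message for this sketch.
final class NotifyCustomer
{
    public function __construct(public readonly string $email)
    {
    }
}

final class NotifyCustomerHandler
{
    /** @var list<string> Stand-in for a logger or a custom "failed" table */
    public array $failures = [];

    public function __construct(private readonly Closure $send)
    {
    }

    public function __invoke(NotifyCustomer $message): void
    {
        try {
            ($this->send)($message->email);
        } catch (Throwable $e) {
            // Deal with the failure right here, since nothing downstream will retry.
            $this->failures[] = sprintf('%s: %s', $message->email, $e->getMessage());
        }
    }
}

$handler = new NotifyCustomerHandler(
    static function (string $email): void {
        if (!str_contains($email, '@')) {
            throw new InvalidArgumentException('not an email address');
        }
        // The actual sending would happen here.
    }
);

$handler(new NotifyCustomer('jane@example.org'));
$handler(new NotifyCustomer('broken'));
```

Whether a failed message should be logged, stored for a later attempt or dispatched again is a decision that depends on the task at hand.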

And with that, we covered not quite everything, but a lot. symfony/messenger is a welcome addition to the toolkit of TYPO3 developers. The core itself provides an ambitious integration of the new component with the introduction of support for outgoing webhooks. But more about that in the dedicated article Outgoing Webhooks in TYPO3.


Thanks for reading until the end. I hope you caught the bus and that there was something in it for you. Thanks to all supporters!

Keep queuing!


Further Information