Очереди сообщений

Очереди в Bitrix Framework выполняют задачи в фоне. Это помогает разгрузить систему, если какие-то операции требуют много времени или ресурсов.

Функционал очередей находится в альфа-версии разработки. Очереди можно использовать с версии 25.100.300 главного модуля, но без гарантий обратной совместимости. Разработчики могут изменить функционал в следующих обновлениях.

Как работают очереди

Очередь — это канал передачи данных от отправителя к обработчику. Сообщение помещается в брокер, а обработчик забирает его и выполняет задачу.

Основные компоненты:

  • сообщение — контейнер данных,

  • обработчик — класс, который обрабатывает сообщение,

  • брокер — хранилище сообщений,

  • очередь — логическая группа сообщений, привязанная к обработчику.

Быстрый старт

Чтобы использовать очереди:

  • создайте класс сообщения,

  • создайте класс обработчика сообщения,

  • добавьте информацию об очереди в конфигурацию .settings.php.

Дополнительно можно настроить сложную логику сообщений, переключить обработку на консольный скрипт и использовать свою таблицу для хранения сообщений.

Пример настройки очереди сообщений

1. Создать класс сообщения

Сообщение должно поддерживать JSON-сериализацию.

  • Используйте поля простых типов для автоматической сериализации: string, int, float, bool, array.

  • Реализуйте метод jsonSerialize(), если нужно передать объект. Метод вернет массив простых типов.

use Bitrix\Main\Messenger\Entity\AbstractMessage;
        use Bitrix\Main\Messenger\Entity\MessageInterface;
        
        class MyMessage extends AbstractMessage
        {
        	public function __construct(
        		public readonly string $text,  // string — допустимый тип, сериализуется автоматически
        		public readonly int $superId   // int — допустимый тип, сериализуется автоматически
        	) {}
        
        	public function jsonSerialize(): mixed // метод вернет массив простых типов
        	{
        		return [
        			'text' => $this->text,
        			'superId' => $this->superId,
        		];
        	}
        
        	public static function createFromData(array $data): MessageInterface
        	{
        		return new static(...$data);
        	}
        }
        

2. Создать класс обработчика

Обработчик получает сообщения из очереди и выполняет нужные действия.

use Bitrix\Main\Messenger\Entity\MessageInterface;
        use Bitrix\Main\Messenger\Receiver\AbstractReceiver;
        
        class MyReceiver extends AbstractReceiver
        {
        	/**
        	 * @param MyMessage $message
        	 */
        	protected function process(MessageInterface $message): void
        	{
        		$this->superNotifier->send($message->text);
        	}
        }
        

3. Добавить конфигурацию в .settings.php

Добавьте очередь в конфигурацию модуля — в handler укажите путь к классу обработчику.

<?php
        
        return [
        	... // другие настройки, в том числе брокеров
        	'messenger' => [
        		'value' => [
        			'queues' => [ // список очередей
        				'first_queue' => [ // очередь с именем 'first_queue'
        					'handler' => \Bitrix\MyModule\Internals\Messenger\Receiver\MyReceiver::class,
        				],
        			],
        		],
        		'readonly' => true, // защита от перезаписи настроек через API
        	],
        ];
        

Убедитесь, что в конфигурации есть настройки брокера. Подробнее в разделе Конфигурация очередей.

4. Отправить сообщение

Сообщение поступит в очередь и обработается в фоне.

$message = new MyMessage('Мое первое сообщение!', 12345);
        $message->send('first_queue');
        

Конфигурация очередей

Систему очередей нужно настроить в файле .settings.php. Конфигурация может быть:

  • глобальной — в файле bitrix/.settings.php для общих настроек и межмодульных очередей,

  • модульной — в файле .settings.php модуля, для очередей из этого модуля.

Конфигурацию очередей нужно указывать в файле .settings.php того модуля, к которому относятся очереди. В глобальном файле указываются только те очереди, которые одновременно используются в нескольких модулях.

Глобальная конфигурация

Глобальную конфигурацию настраивают, когда очереди и брокер планируют использовать в разных модулях, например, для интеграции между модулями.

По умолчанию в /bitrix/.settings.php создается секция messenger с параметрами run_mode и brokers, где описан один брокер по умолчанию default.

return [
        	'messenger' => [
        		'value' => [
        			'run_mode' => 'web', // web, cli
        			'brokers' => [
        				'default' => [
        					'type' => DbBroker::TYPE_CODE, // сейчас один тип — db
        					'params' => [
        						'table' => MessengerMessageTable::class,
        					]
        				],
        				'db_calendar' => [
        					'type' => 'db',
        					'params' => [
        						'table' => 'b_calendar_messages',
        						'module' => 'calendar',
        					],
        				],
        			],
        			'queues' => [
        				'first_queue' => [
        					'handler' => MyReceiver::class,
        				],
        				'google_section_sync' => [
        					'broker' => 'db_calendar',
        					'handler' => GoogleSectionReceiver::class,
        				],
        			],
        		],
        		'readonly' => true,
        ];
        

Параметры

Режим run_mode — определяет как обрабатывать сообщения:

  • web — в фоновых задачах,

  • cli — в консольном скрипте.

При значении cli нужно настроить запуск скрипта вручную командой:

php bitrix.php messenger:consume
        

Брокеры brokers — доступные хранилища сообщений. Ключ — имя брокера, а значение — его параметры.

  • type — тип брокера. Сейчас поддерживается только значение db, то есть хранение в базе данных.

  • params — параметры брокера. Для типа db:

    • table — таблица для хранения сообщений,

    • module — модуль, с которым связана таблица. По умолчанию имеет значение main.

Брокер по умолчанию default должен всегда быть в глобальной конфигурации.

Очереди queues — список доступных очередей с указанием обработчиков.

Модульная конфигурация

В .settings.php модуля указываются очереди, используемые этим модулем.

return [
        	...
            'messenger' => [
                'value' => [
                    'queues' => [
                        'google_section_sync' => [
                            'broker' => 'calendar_db',
                            'handler' => \Bitrix\Calendar\Synchronization\Internals\Messenger\Receiver\GoogleSectionReceiver::class,
                        ],
                        'google_event_sync' => [
                            'handler' => \Bitrix\Calendar\Synchronization\Internals\Messenger\Receiver\GoogleEventReceiver::class,
                            'limit' => 30,
                            'total_processing_limit' => 50,
                        ],
                    ],
                ],
                'readonly' => true,
            ],
        ];
        

Параметры

Брокеры brokers — доступные хранилища сообщений. Структура объекта полностью идентична глобальной конфигурации.

Очереди queues — доступные очереди. Ключ — идентификатор очереди, а значение — параметры очереди:

  • handler — класс-обработчик сообщений,

  • broker — брокер очереди. Если его не указать, используется брокер по умолчанию,

  • limit — количество обрабатываемых обработчиком сообщений за раз, по умолчанию — 50,

  • total_processing_limit — максимальное количество одновременно обрабатываемых сообщений в очереди, по умолчанию — 100,

    Значение не может быть меньше значения параметра limit, иначе при обработке очереди будет выброшено исключение.

  • retry_strategy — настройки повторной обработки сообщений в случае неудачной попытки обработки,

  • module — модуль, к которому относится очередь. Всегда имеет значение идентификатора модуля.

Ограничение количества обрабатываемых сообщений

Количество обрабатываемых сообщений настраивается двумя параметрами очереди.

  1. Параметр limit задает количество сообщений, которые обработчик выбирает из очереди за один раз.

    Пример. Если обработка выполняется на хите в режиме web и обработка одного сообщения занимает около 10 секунд, значение 50 может создавать лишнюю нагрузку. Достаточно обрабатывать по 2-3 сообщения за хит, чтобы очередь разбиралась равномерно и не влияла на работу другого функционала.

  2. Параметр total_processing_limit задает максимальное количество сообщений, которые могут одновременно обрабатываться в очереди всеми обработчиками.

    Пример. Если limit равен 30 и total_processing_limit50, то при наличии 100 готовых сообщений первый обработчик заберет 30, а второй сможет забрать только 20. Параметр помогает ограничить общую нагрузку на систему и избежать ситуации, когда длительная обработка сообщений блокирует работу портала.

Значение параметра total_processing_limit не должно быть меньше, чем значение параметра limit, иначе при обработке очереди будет выброшено исключение, чтобы подсветить проблему в конфигурации.

Повторная обработка сообщений

Сообщение может не обработаться с первой попытки, например, из-за временной недоступности сервиса. Тогда сообщение нужно обработать повторно.

Используйте повторную обработку:

  • при работе с внешними API,

  • при нестабильных соединениях,

  • для ресурсоемких задач.

Параметры

Для каждой очереди можно указать в конфигурации параметр retry_strategy с настройками повторной обработки.

  • max_retries — максимальное количество попыток. По умолчанию — 3.

  • delay — базовая задержка между попытками в секундах. По умолчанию — 1.

  • multiplier — множитель задержки между попытками. По умолчанию — 2.

  • max_delay — максимальное значение задержки между повторами в секундах. По умолчанию — 0, то есть без ограничения.

Пример конфигурации

return [
            'messenger' => [
                'value' => [
                    'queues' => [
                        'second_queue' => [
                            'handler' => MyReceiver::class,
                            'retry_strategy' => [
                                'max_retries' => 10,
                                'delay' => 5,       // базовая задержка — пять секунд 
                                'multiplier' => 2,  // удваиваем с каждой попыткой
                                'max_delay' => 300  // максимум пять минут между попытками
                            ],
                        ],
                    ],
                ],
                'readonly' => true,
            ],
        ];
        
  • Первая повторная попытка выполнится через 5 секунд.

  • Вторая — через 10 секунд.

  • Третья — через 20 секунд, далее задержка будет увеличиваться с учетом multiplier.

  • Шестая попытка не выполнится, так как через 300 секунд сработает ограничение max_delay.

Своя таблица для сообщений

При работе с DBBroker сообщения сохраняются в базу данных. По умолчанию используется таблица b_main_messenger_message. Можно создать отдельную таблицу для своего модуля.

  1. Создайте класс-наследник MessengerMessageTable.

    use Bitrix\Main\Messenger\Internals\Storage\Db\Model\MessengerMessageTable;
            
            class CalendarMessagesTable extends MessengerMessageTable
            {
                public static function getTableName(): string
                {
                    return 'b_calendar_messenger_message';
                }
            }
            
  2. Укажите свою таблицу в конфигурации брокера.

    return [
                'messenger' => [
                    'value' => [
                        'brokers' => [
                            'db_calendar' => [
                                'type' => 'db',
                                'params' => [
                                    'table' => CalendarMessagesTable::class,
                                ],
                            ],
                        ],
                    ],
                    'readonly' => true,
                ],
            ];
            
  3. Укажите имя брокера db_calendar в настройках очередей. После этого сообщения будут сохраняться в созданную таблицу.

За создание и удаление собственной таблицы отвечает модуль, к которому относится таблица. Поэтому обязательно добавьте запрос создания таблицы в установщик модуля.

Запуск обработчика очередей

Система поддерживает два режима обработки.

  1. web — обработка через фоновые задачи. Работает по умолчанию и без дополнительных настроек.

  2. cli — консольный режим. Требует дополнительной настройки и предоставляет больше возможностей для управления обработкой. Для производственной среды рекомендуется использовать cli-режим с супервизором.

Консольный режим

Чтобы использовать консольный режим, установите в конфигурации 'run_mode' => 'cli'.

Основная команда для обработки:

php bitrix.php messenger:consume
        

Она запускает обработчики для всех очередей в бесконечном цикле.

Параметры команды:

  • Выбор очередей. Можно указать несколько:

    php bitrix.php messenger:consume first_queue second_queue
            
  • Дополнительные опции:

    // Обрабатывает очередь second_queue 60 секунд с паузой 2 секунды между проверками
            php bitrix.php messenger:consume second_queue -t 60 —sleep 2
            
    • -t, —time-limit — ограничить время работы, в секундах,

    • —sleep — установить паузу между итерациями. По умолчанию — 1 секунда.

Обработчики сообщений

Каждая очередь использует свой обработчик. Он должен:

  • обработать сообщение,

  • выбросить исключение при ошибке.

Чтобы создать обработчик, наследуйтесь от класса AbstractReceiver и реализуйте метод process. Метод возвращает void, когда сообщение успешно обработано, или бросает исключение.

class SomeReceiver extends AbstractReceiver
        {
            protected function process(MessageInterface $message): void
            {
                if ($message instanceof SectionMessage) {
                    $section = $this->sectionRepository->getById($message->getSectionId());
                    (new SendSectionCommand($section))->run();
                    return;
                }
        
                throw new UnprocessableMessageException($message->getId(), $this->queueId);
            }
        }
        

Типы исключений

Используйте разные исключения для разных сценариев.

  • UnprocessableMessageException — сообщение не подходит для этого обработчика. Пример: в очередь для обработки задач пришло сообщение о пользователе.

  • UnrecoverableMessageException — сообщение устарело и повторная обработка не нужна. Пример: сообщение содержит ID удаленного объекта.

  • RecoverableMessageException — временная ошибка, стоит повторить попытку. Можно указать задержку в секундах через метод getRetryDelay(). Если возвращает null, используются настройки повтора из конфигурации очереди.

  • Другие исключения — критические ошибки обработки. Система применит к ним настройки повторной обработки retry_strategy из конфигурации очереди.

Пример обработчика сообщений в консольном режиме

В примере обработчик SomeReceiver используется для конкретной очереди. При получении сообщения он выводит его ID. В cli-режиме воркер нужно запускать вручную. Он проверяет очередь в цикле и передает сообщения обработчику.

class SomeReceiver extends \Bitrix\Main\Messenger\Receiver\AbstractReceiver
        {
            protected function process(MessageInterface $message): void
            {
                echo "Message {$message->getId()} received\n";
            }
        }
        
        // Для cli-режима нужно запускать воркер вручную:
        $worker = new \Bitrix\Main\Messenger\Internals\Worker();
        $worker->process();
        

Собственная реализация

Для особых сценариев обработки можно реализовать собственный обработчик через интерфейс ReceiverInterface.

interface ReceiverInterface
        {
            /**
             * Основной метод обработки
             * @throws ReceiverException
             */
            public function run(): void;
            
            // Установка лимита сообщений за одну итерацию
            public function setLimit(int $limit): self;
            
            // Назначение очереди
            public function setQueueId(string $queueId): self;
        
            // Выбор брокера
            public function setBroker(BrokerInterface $broker): self;
        }
        

При реализации интерфейса необходимо самостоятельно управлять:

  • получением сообщений из брокера,

  • подтверждением успешной обработки ack,

  • обработкой ошибок reject.

Методы интерфейса:

  • run() — получает сообщения из очереди, выполняет обработку и выбрасывает ReceiverException при неудачной обработке.

  • setLimit(int $limit) — устанавливает лимит сообщений для одновременной обработки. Например, setLimit(10) задает обработку по 10 сообщений.

  • setQueueId(string $queueId) — назначает очередь для обработки.

  • setBroker(BrokerInterface $broker) — подключает брокера сообщений, например, setBroker(new RedisBroker()).

Работа с сообщениями

Сообщения — это контейнеры данных для передачи через очередь. Они должны реализовывать \Bitrix\Main\Messenger\Entity\MessageInterface, но лучше расширять класс \Bitrix\Main\Messenger\Entity\AbstractMessage.

Ключевые требования к сообщениям:

  • Данные должны преобразовываться в JSON, то есть быть сериализуемыми.

  • Можно использовать любые поля и структуры данных.

  • Необходимо учитывать возможную задержку обработки.

Отправить сообщение

Создайте объект сообщения и отправьте в очередь.

$message = new MyMessage(...);
        $message->send('first_queue');
        

Отложить обработку сообщения

Параметр DelayParam(3600) откладывает обработку на 1 час (3600 секунд). Сообщение появится в очереди, но обработчик получит его только через указанное время.

use Bitrix\Main\Messenger\Entity\ProcessingParam\DelayParam;
        
        $message = new MyMessage(...);
        $message->send('first_queue', [new DelayParam(3600)]);
        

Отправить сообщение с дополнительным параметром

Сообщение отправляется с двумя параметрами, которые применятся при постановке в очередь:

  • DelayParam — отложить обработку на 1 час,

  • ItemIdParam — отметить сообщение идентификатором myid-123.

use Bitrix\Main\Messenger\Entity\ProcessingParam\DelayParam;
        use Bitrix\Main\Messenger\Entity\ProcessingParam\ItemIdParam;
        
        $message = new MyMessage(...);
        $message->send('first_queue', [new DelayParam(3600), new ItemIdParam('myid-123')]);
        

Создать свой параметр

Чтобы добавить новый параметр, создайте класс, реализующий интерфейс \Bitrix\Main\Messenger\Entity\ProcessingParam\ProcessingParamInterface:

interface ProcessingParamInterface
        {
            public function applyTo(MessageBox $messageBox): MessageBox;
        }
        

Интерфейс требует всего один метод — applyTo. Он принимает объект класса \Bitrix\Main\Messenger\Entity\MessageBox — обертку для сообщения, которое отправляется в брокер очередей.

Особенности работы с отложенной обработкой

Сообщения могут обрабатываться не сразу, что требует особого подхода к их формированию.

Если сообщение обрабатывается с задержкой, в него нужно включать все данные, которые потребуются обработчику.

Пример. В системе есть задача с ID = 123, связанная с внешним сервисом ID = trk-042. При удалении задачи нужно уведомить внешний сервис. Если передать только task_id, то к моменту обработки задача уже может быть удалена из базы данных, и внешний идентификатор станет недоступен.

В таком случае в сообщение нужно сразу включить все необходимые данные.

$message->setData([
            'task_id' => 123,
            'external_id' => 'trk-042'
        ]);
        

Такой подход гарантирует, что обработчик получит все данные даже после удаления исходной сущности. Это особенно важно для операций удаления, синхронизации с внешними системами и сценариев, в которых важна согласованность данных.

Предыдущая