Очереди сообщений

Очереди в Bitrix Framework выполняют задачи в фоне. Это помогает разгрузить систему, если какие-то операции требуют много времени или ресурсов.

Функционал очередей находится в альфа-версии разработки. Очереди можно использовать с версии 25.100.300 главного модуля, но без гарантий обратной совместимости. Разработчики могут изменить функционал в следующих обновлениях.

Как работают очереди

Очередь — это канал передачи данных от отправителя к обработчику. Сообщение помещается в брокер, а обработчик забирает его и выполняет задачу.

Основные компоненты:

  • сообщение — контейнер данных,

  • обработчик — класс, который обрабатывает сообщение,

  • брокер — хранилище сообщений,

  • очередь — логическая группа сообщений, привязанная к обработчику.

Быстрый старт

Чтобы использовать очереди:

  • создайте класс сообщения,

  • создайте класс обработчика сообщения,

  • добавьте информацию об очереди в конфигурацию .settings.php.

Дополнительно можно настроить сложную логику сообщений, переключить обработку на консольный скрипт и использовать свою таблицу для хранения сообщений.

Пример настройки очереди сообщений

1. Создать класс сообщения

Сообщение должно поддерживать JSON-сериализацию.

  • Используйте поля простых типов для автоматической сериализации: string, int, float, bool, array.

  • Реализуйте метод jsonSerialize(), если нужно передать объект. Метод вернет массив простых типов.

use Bitrix\Main\Messenger\Entity\AbstractMessage;
        use Bitrix\Main\Messenger\Entity\MessageInterface;
        
        class MyMessage extends AbstractMessage
        {
        	public function __construct(
        		public readonly string $text,  // string — допустимый тип, сериализуется автоматически
        		public readonly int $superId   // int — допустимый тип, сериализуется автоматически
        	) {}
        
        	public function jsonSerialize(): mixed // метод вернет массив простых типов
        	{
        		return [
        			'text' => $this->text,
        			'superId' => $this->superId,
        		];
        	}
        
        	public static function createFromData(array $data): MessageInterface
        	{
        		return new static(...$data);
        	}
        }
        

2. Создать класс обработчика

Обработчик получает сообщения из очереди и выполняет нужные действия.

use Bitrix\Main\Messenger\Entity\MessageInterface;
        use Bitrix\Main\Messenger\Receiver\AbstractReceiver;
        
        class MyReceiver extends AbstractReceiver
        {
        	/**
        	 * @param MyMessage $message
        	 */
        	protected function process(MessageInterface $message): void
        	{
        		$this->superNotifier->send($message->text);
        	}
        }
        

3. Добавить конфигурацию в .settings.php

Добавьте очередь в конфигурацию модуля — в handler укажите путь к классу обработчику.

<?php
        
        return [
        	... // другие настройки, в том числе брокеров
        	'messenger' => [
        		'value' => [
        			'queues' => [ // список очередей
        				'first_queue' => [ // очередь с именем 'first_queue'
        					'handler' => \Bitrix\MyModule\Internals\Messenger\Receiver\MyReceiver::class,
        				],
        			],
        		],
        		'readonly' => true, // защита от перезаписи настроек через API
        	],
        ];
        

Убедитесь, что в конфигурации есть настройки брокера. Подробнее в разделе Конфигурация очередей.

4. Отправить сообщение

Сообщение поступит в очередь и обработается в фоне.

$message = new MyMessage('Мое первое сообщение!', 12345);
        $message->send('first_queue');
        

Конфигурация очередей

Систему очередей нужно настроить в файле .settings.php. Конфигурация может быть:

  • глобальной — в файле bitrix/.settings.php для общих настроек и межмодульных очередей,

  • модульной — в файле .settings.php модуля, для очередей из этого модуля.

Конфигурацию очередей нужно указывать в файле .settings.php того модуля, к которому относятся очереди. В глобальном файле указываются только те очереди, которые одновременно используются в нескольких модулях.

Глобальная конфигурация

Глобальную конфигурацию настраивают, когда очереди и брокер планируют использовать в разных модулях, например, для интеграции между модулями.

По умолчанию в /bitrix/.settings.php создается секция messenger с параметрами run_mode и brokers, где описан один брокер по умолчанию default.

return [
        	'messenger' => [
        		'value' => [
        			'run_mode' => 'web', // web, cli
        			'brokers' => [
        				'default' => [
        					'type' => DbBroker::TYPE_CODE, // сейчас один тип — db
        					'params' => [
        						'table' => MessengerMessageTable::class,
        					]
        				],
        				'db_calendar' => [
        					'type' => 'db',
        					'params' => [
        						'table' => 'b_calendar_messages',
        						'module' => 'calendar',
        					],
        				],
        			],
        			'queues' => [
        				'first_queue' => [
        					'handler' => MyReceiver::class,
        				],
        				'google_section_sync' => [
        					'broker' => 'db_calendar',
        					'handler' => GoogleSectionReceiver::class,
        				],
        			],
        		],
        		'readonly' => true,
        ];
        

Параметры

Режим run_mode — определяет как обрабатывать сообщения:

  • web — в фоновых задачах,

  • cli — в консольном скрипте.

При значении cli нужно настроить запуск скрипта вручную командой:

php bitrix.php messenger:consume
        

Брокеры brokers — доступные хранилища сообщений. Ключ — имя брокера, а значение — его параметры.

  • type — тип брокера. Сейчас поддерживается только значение db, то есть хранение в базе данных.

  • params — параметры брокера. Для типа db:

    • table — таблица для хранения сообщений,

    • module — модуль, с которым связана таблица. По умолчанию имеет значение main.

Брокер по умолчанию default должен всегда быть в глобальной конфигурации.

Очереди queues — список доступных очередей с указанием обработчиков.

Модульная конфигурация

В .settings.php модуля указываются очереди, используемые этим модулем.

return [
        	...
        	'messenger' => [
        		'value' => [
        			'queues' => [
        				'google_section_sync' => [
        					'broker' => 'calendar_db',
        					'handler' => \Bitrix\Calendar\Synchronization\Internals\Messenger\Receiver\GoogleSectionReceiver::class,
        				],
        				'google_event_sync' => [
        					'handler' => \Bitrix\Calendar\Synchronization\Internals\Messenger\Receiver\GoogleEventReceiver::class,
        				],
        			],
        		],
        		'readonly' => true,
        	],
        ];
        

Параметры

Брокеры brokers — доступные хранилища сообщений. Структура объекта полностью идентична глобальной конфигурации.

Очереди queues — доступные очереди. Ключ — идентификатор очереди, а значение — параметры очереди:

  • handler — класс-обработчик сообщений,

  • broker — брокер очереди. Если его не указать, используется брокер по умолчанию,

  • retry_strategy — настройки повторной обработки сообщений в случае неудачной попытки обработки,

  • module — модуль, к которому относится очередь. Всегда имеет значение идентификатора модуля.

Своя таблица для сообщений

При работе с DBBroker сообщения сохраняются в базу данных. По умолчанию используется таблица b_main_messenger_message, но можно создать отдельную таблицу для своего модуля.

  1. Создайте класс-наследник MessengerMessageTable.

    use Bitrix\Main\Messenger\Internals\Storage\Db\Model\MessengerMessageTable;
            
            class CalendarMessagesTable extends MessengerMessageTable
            {
                public static function getTableName(): string
                {
                    return 'b_calendar_messenger_message';
                }
            }
            
  2. Укажите свою таблицу в конфигурации брокера.

    return [
                'messenger' => [
                    'value' => [
                        'brokers' => [
                            'db_calendar' => [
                                'type' => 'db',
                                'params' => [
                                    'table' => CalendarMessagesTable::class,
                                ],
                            ],
                        ],
                    ],
                    'readonly' => true,
                ],
            ];
            
  3. Используйте имя брокера db_calendar в настройках очередей. Сообщения будут сохраняться в вашу таблицу, а работа с ними не изменится.

За создание и удаление собственной таблицы отвечает модуль, к которому эта таблица относится. Поэтому обязательно добавьте запрос создания таблицы в установщик модуля.

Повторная обработка сообщений

Сообщение может не обработаться с первой попытки, например, из-за временной недоступности сервиса. Тогда сообщение нужно обработать повторно.

Используйте повторную обработку:

  • при работе с внешними API,

  • при нестабильных соединениях,

  • для ресурсоемких задач.

Параметры

Для каждой очереди можно указать в конфигурации параметр retry_strategy с настройками повторной обработки.

  • max_retries — максимальное количество попыток. По умолчанию — 3.

  • delay — базовая задержка между попытками в секундах. По умолчанию — 1.

  • multiplier — множитель задержки между попытками. По умолчанию — 2.

  • max_delay — максимальное значение задержки между повторами в секундах. По умолчанию — 0, то есть без ограничения.

Пример конфигурации

return [
            'messenger' => [
                'value' => [
                    'queues' => [
                        'second_queue' => [
                            'handler' => MyReceiver::class,
                            'retry_strategy' => [
                                'max_retries' => 10,
                                'delay' => 5,       // базовая задержка — пять секунд 
                                'multiplier' => 2,  // удваиваем с каждой попыткой
                                'max_delay' => 300  // максимум пять минут между попытками
                            ],
                        ],
                    ],
                ],
                'readonly' => true,
            ],
        ];
        
  • Первая повторная попытка сработает через 5 секунд.

  • Вторая — через 10 секунд.

  • Третья — через 20 секунд, и так далее.

  • Шестая попытка не выполнится, так как через 320 секунд сработает ограничение max_delay.

Запуск обработчика очередей

Система поддерживает два режима обработки.

  1. web — обработка через фоновые задачи. Работает по умолчанию и без дополнительных настроек.

  2. cli — консольный режим. Требует настройки, но предоставляет больше возможностей и контроля. Для production-окружения рекомендуется использовать cli-режим с супервизором.

Консольный режим

Чтобы использовать консольный режим, установите в конфигурации 'run_mode' => 'cli'.

Основная команда для обработки:

php bitrix.php messenger:consume
        

Она запускает обработчики для всех очередей в бесконечном цикле.

Параметры команды:

  • Выбор очередей. Можно указать несколько:

    php bitrix.php messenger:consume first_queue second_queue
            
  • Дополнительные опции:

    // Обрабатывает очередь second_queue 60 секунд с паузой 2 секунды между проверками
            php bitrix.php messenger:consume second_queue -t 60 —sleep 2
            
    • -t, —time-limit — ограничить время работы, в секундах,

    • —sleep — установить паузу между итерациями. По умолчанию — 1 секунда.

Обработчики сообщений

Каждая очередь использует свой обработчик. Он должен:

  • обработать сообщение,

  • выбросить исключение при ошибке.

Чтобы создать обработчик, наследуйтесь от класса AbstractReceiver и реализуйте метод process. Метод возвращает void, когда сообщение успешно обработано, или бросает исключение.

class SomeReceiver extends AbstractReceiver
        {
            protected function process(MessageInterface $message): void
            {
                if ($message instanceof SectionMessage) {
                    $section = $this->sectionRepository->getById($message->getSectionId());
                    (new SendSectionCommand($section))->run();
                    return;
                }
        
                throw new UnprocessableMessageException($message->getId(), $this->queueId);
            }
        }
        

Типы исключений

Используйте разные исключения для разных сценариев.

  • UnprocessableMessageException — сообщение не подходит для этого обработчика. Пример: в очередь для обработки задач пришло сообщение о пользователе.

  • UnrecoverableMessageException — сообщение устарело и повторная обработка не нужна. Пример: сообщение содержит ID удаленного объекта.

  • RecoverableMessageException — временная ошибка, стоит повторить попытку. Можно указать задержку в секундах через метод getRetryDelay(). Если возвращает null, используются настройки повтора из конфигурации очереди.

  • Другие исключения — критические ошибки обработки. Система применит к ним настройки повторной обработки retry_strategy из конфигурации очереди.

Пример обработчика сообщений в консольном режиме

Создается обработчик SomeReceiver для конкретной очереди. При получении сообщения выводится его ID. В cli-режиме запускаем воркер $worker. Воркер в бесконечном цикле проверяет очередь и передает сообщения обработчику.

class SomeReceiver extends \Bitrix\Main\Messenger\Receiver\AbstractReceiver
        {
            protected function process(MessageInterface $message): void
            {
                echo "Message {$message->getId()} received\n";
            }
        }
        
        // Для cli-режима нужно запускать воркер вручную:
        $worker = new \Bitrix\Main\Messenger\Internals\Worker();
        $worker->process();
        

Собственная реализация

Для особых сценариев обработки можно реализовать собственный обработчик через интерфейс ReceiverInterface.

interface ReceiverInterface
        {
            /**
             * Основной метод обработки
             * @throws ReceiverException
             */
            public function run(): void;
            
            // Установка лимита сообщений за одну итерацию
            public function setLimit(int $limit): self;
            
            // Назначение очереди
            public function setQueueId(string $queueId): self;
        
            // Выбор брокера
            public function setBroker(BrokerInterface $broker): self;
        }
        

При реализации интерфейса вы берете на себя полный контроль над процессом обработки. Необходимо самостоятельно управлять:

  • получением сообщений из брокера,

  • подтверждением успешной обработки ack,

  • обработкой ошибок reject.

Методы интерфейса:

  • run() — получает сообщения из очереди, выполняет обработку и выбрасывает ReceiverException при неудачной обработке.

  • setLimit(int $limit) — устанавливает лимит сообщений для одновременной обработки, например, setLimit(10) — обрабатывать по 10 сообщений.

  • setQueueId(string $queueId) — назначает очередь для обработки.

  • setBroker(BrokerInterface $broker) — подключает брокера сообщений, например, setBroker(new RedisBroker()).

Работа с сообщениями

Сообщения — это контейнеры данных для передачи через очередь. Они должны реализовывать \Bitrix\Main\Messenger\Entity\MessageInterface, но лучше расширять класс \Bitrix\Main\Messenger\Entity\AbstractMessage.

Ключевые требования к сообщениям:

  • Данные должны преобразовываться в JSON, то есть быть сериализуемыми.

  • Можно использовать любые поля и структуры данных.

  • Необходимо учитывать возможную задержку обработки.

Отправить сообщение

Создайте объект сообщения и отправьте в очередь.

$message = new MyMessage(...);
        $message->send('first_queue');
        

Отложить обработку сообщения

Параметр DelayParam(3600) откладывает обработку на 1 час (3600 секунд). Сообщение появится в очереди, но обработчик получит его только через указанное время.

use Bitrix\Main\Messenger\Entity\ProcessingParam\DelayParam;
        
        $message = new MyMessage(...);
        $message->send('first_queue', [new DelayParam(3600)]);
        

Отправить сообщение с дополнительным параметром

Сообщение отправляется с двумя параметрами, которые применятся при постановке в очередь:

  • DelayParam — отложить обработку на 1 час,

  • ItemIdParam — отметить сообщение идентификатором myid-123.

use Bitrix\Main\Messenger\Entity\ProcessingParam\DelayParam;
        use Bitrix\Main\Messenger\Entity\ProcessingParam\ItemIdParam;
        
        $message = new MyMessage(...);
        $message->send('first_queue', [new DelayParam(3600), new ItemIdParam('myid-123')]);
        

Создать свой параметр

Чтобы добавить новый параметр, создайте класс, реализующий интерфейс \Bitrix\Main\Messenger\Entity\ProcessingParam\ProcessingParamInterface:

interface ProcessingParamInterface
        {
            public function applyTo(MessageBox $messageBox): MessageBox;
        }
        

Интерфейс требует всего один метод — applyTo. Он принимает объект класса \Bitrix\Main\Messenger\Entity\MessageBox — обертку для сообщения, которое отправляется в брокер очередей.

Особенности работы с отложенной обработкой

Сообщения могут обрабатываться не сразу, что требует особого подхода к их формированию.

Пример. В системе есть задача с ID = 123, которая имеет связь с внешним сервисом ID = trk-042. При удалении задачи нужно уведомить внешний сервис. Если отправить только task_id, то к моменту обработки система уже удалит задачу из базы данных и внешний идентификатор будет недоступен.

Решение — включать в сообщение все необходимые данные сразу.

$message->setData([
            'task_id' => 123,
            'external_id' => 'trk-042'
        ]);
        

Такой подход гарантирует, что обработчик получит все данные даже после удаления исходной сущности. Это особенно важно для операций удаления, синхронизации с внешними системами и сценариев, где важна согласованность данных.

Предыдущая