Очереди сообщений
Очереди в Bitrix Framework выполняют задачи в фоне. Это помогает разгрузить систему, если какие-то операции требуют много времени или ресурсов.
Функционал очередей находится в альфа-версии разработки. Очереди можно использовать с версии 25.100.300 главного модуля, но без гарантий обратной совместимости. Разработчики могут изменить функционал в следующих обновлениях.
Как работают очереди
Очередь — это канал передачи данных от отправителя к обработчику. Сообщение помещается в брокер, а обработчик забирает его и выполняет задачу.
Основные компоненты:
-
сообщение — контейнер данных,
-
обработчик — класс, который обрабатывает сообщение,
-
брокер — хранилище сообщений,
-
очередь — логическая группа сообщений, привязанная к обработчику.
Быстрый старт
Чтобы использовать очереди:
-
создайте класс сообщения,
-
создайте класс обработчика сообщения,
-
добавьте информацию об очереди в конфигурацию
.settings.php.
Дополнительно можно настроить сложную логику сообщений, переключить обработку на консольный скрипт и использовать свою таблицу для хранения сообщений.
Пример настройки очереди сообщений
1. Создать класс сообщения
Сообщение должно поддерживать JSON-сериализацию.
-
Используйте поля простых типов для автоматической сериализации:
string,int,float,bool,array. -
Реализуйте метод
jsonSerialize(), если нужно передать объект. Метод вернет массив простых типов.
use Bitrix\Main\Messenger\Entity\AbstractMessage;
use Bitrix\Main\Messenger\Entity\MessageInterface;
class MyMessage extends AbstractMessage
{
public function __construct(
public readonly string $text, // string — допустимый тип, сериализуется автоматически
public readonly int $superId // int — допустимый тип, сериализуется автоматически
) {}
public function jsonSerialize(): mixed // метод вернет массив простых типов
{
return [
'text' => $this->text,
'superId' => $this->superId,
];
}
public static function createFromData(array $data): MessageInterface
{
return new static(...$data);
}
}
2. Создать класс обработчика
Обработчик получает сообщения из очереди и выполняет нужные действия.
use Bitrix\Main\Messenger\Entity\MessageInterface;
use Bitrix\Main\Messenger\Receiver\AbstractReceiver;
class MyReceiver extends AbstractReceiver
{
/**
* @param MyMessage $message
*/
protected function process(MessageInterface $message): void
{
$this->superNotifier->send($message->text);
}
}
3. Добавить конфигурацию в .settings.php
Добавьте очередь в конфигурацию модуля — в handler укажите путь к классу обработчику.
<?php
return [
... // другие настройки, в том числе брокеров
'messenger' => [
'value' => [
'queues' => [ // список очередей
'first_queue' => [ // очередь с именем 'first_queue'
'handler' => \Bitrix\MyModule\Internals\Messenger\Receiver\MyReceiver::class,
],
],
],
'readonly' => true, // защита от перезаписи настроек через API
],
];
Убедитесь, что в конфигурации есть настройки брокера. Подробнее в разделе Конфигурация очередей.
4. Отправить сообщение
Сообщение поступит в очередь и обработается в фоне.
$message = new MyMessage('Мое первое сообщение!', 12345);
$message->send('first_queue');
Конфигурация очередей
Систему очередей нужно настроить в файле .settings.php. Конфигурация может быть:
-
глобальной — в файле
bitrix/.settings.phpдля общих настроек и межмодульных очередей, -
модульной — в файле
.settings.phpмодуля, для очередей из этого модуля.
Конфигурацию очередей нужно указывать в файле .settings.php того модуля, к которому относятся очереди. В глобальном файле указываются только те очереди, которые одновременно используются в нескольких модулях.
Глобальная конфигурация
Глобальную конфигурацию настраивают, когда очереди и брокер планируют использовать в разных модулях, например, для интеграции между модулями.
По умолчанию в /bitrix/.settings.php создается секция messenger с параметрами run_mode и brokers, где описан один брокер по умолчанию default.
return [
'messenger' => [
'value' => [
'run_mode' => 'web', // web, cli
'brokers' => [
'default' => [
'type' => DbBroker::TYPE_CODE, // сейчас один тип — db
'params' => [
'table' => MessengerMessageTable::class,
]
],
'db_calendar' => [
'type' => 'db',
'params' => [
'table' => 'b_calendar_messages',
'module' => 'calendar',
],
],
],
'queues' => [
'first_queue' => [
'handler' => MyReceiver::class,
],
'google_section_sync' => [
'broker' => 'db_calendar',
'handler' => GoogleSectionReceiver::class,
],
],
],
'readonly' => true,
];
Параметры
Режим run_mode — определяет как обрабатывать сообщения:
-
web— в фоновых задачах, -
cli— в консольном скрипте.
При значении cli нужно настроить запуск скрипта вручную командой:
php bitrix.php messenger:consume
Брокеры brokers — доступные хранилища сообщений. Ключ — имя брокера, а значение — его параметры.
-
type— тип брокера. Сейчас поддерживается только значениеdb, то есть хранение в базе данных. -
params— параметры брокера. Для типаdb:-
table— таблица для хранения сообщений, -
module— модуль, с которым связана таблица. По умолчанию имеет значениеmain.
-
Брокер по умолчанию default должен всегда быть в глобальной конфигурации.
Очереди queues — список доступных очередей с указанием обработчиков.
Модульная конфигурация
В .settings.php модуля указываются очереди, используемые этим модулем.
return [
...
'messenger' => [
'value' => [
'queues' => [
'google_section_sync' => [
'broker' => 'calendar_db',
'handler' => \Bitrix\Calendar\Synchronization\Internals\Messenger\Receiver\GoogleSectionReceiver::class,
],
'google_event_sync' => [
'handler' => \Bitrix\Calendar\Synchronization\Internals\Messenger\Receiver\GoogleEventReceiver::class,
],
],
],
'readonly' => true,
],
];
Параметры
Брокеры brokers — доступные хранилища сообщений. Структура объекта полностью идентична глобальной конфигурации.
Очереди queues — доступные очереди. Ключ — идентификатор очереди, а значение — параметры очереди:
-
handler— класс-обработчик сообщений, -
broker— брокер очереди. Если его не указать, используется брокер по умолчанию, -
retry_strategy— настройки повторной обработки сообщений в случае неудачной попытки обработки, -
module— модуль, к которому относится очередь. Всегда имеет значение идентификатора модуля.
Своя таблица для сообщений
При работе с DBBroker сообщения сохраняются в базу данных. По умолчанию используется таблица b_main_messenger_message, но можно создать отдельную таблицу для своего модуля.
-
Создайте класс-наследник
MessengerMessageTable.use Bitrix\Main\Messenger\Internals\Storage\Db\Model\MessengerMessageTable; class CalendarMessagesTable extends MessengerMessageTable { public static function getTableName(): string { return 'b_calendar_messenger_message'; } } -
Укажите свою таблицу в конфигурации брокера.
return [ 'messenger' => [ 'value' => [ 'brokers' => [ 'db_calendar' => [ 'type' => 'db', 'params' => [ 'table' => CalendarMessagesTable::class, ], ], ], ], 'readonly' => true, ], ]; -
Используйте имя брокера
db_calendarв настройках очередей. Сообщения будут сохраняться в вашу таблицу, а работа с ними не изменится.
За создание и удаление собственной таблицы отвечает модуль, к которому эта таблица относится. Поэтому обязательно добавьте запрос создания таблицы в установщик модуля.
Повторная обработка сообщений
Сообщение может не обработаться с первой попытки, например, из-за временной недоступности сервиса. Тогда сообщение нужно обработать повторно.
Используйте повторную обработку:
-
при работе с внешними API,
-
при нестабильных соединениях,
-
для ресурсоемких задач.
Параметры
Для каждой очереди можно указать в конфигурации параметр retry_strategy с настройками повторной обработки.
-
max_retries— максимальное количество попыток. По умолчанию —3. -
delay— базовая задержка между попытками в секундах. По умолчанию —1. -
multiplier— множитель задержки между попытками. По умолчанию —2. -
max_delay— максимальное значение задержки между повторами в секундах. По умолчанию —0, то есть без ограничения.
Пример конфигурации
return [
'messenger' => [
'value' => [
'queues' => [
'second_queue' => [
'handler' => MyReceiver::class,
'retry_strategy' => [
'max_retries' => 10,
'delay' => 5, // базовая задержка — пять секунд
'multiplier' => 2, // удваиваем с каждой попыткой
'max_delay' => 300 // максимум пять минут между попытками
],
],
],
],
'readonly' => true,
],
];
-
Первая повторная попытка сработает через 5 секунд.
-
Вторая — через 10 секунд.
-
Третья — через 20 секунд, и так далее.
-
Шестая попытка не выполнится, так как через 320 секунд сработает ограничение
max_delay.
Запуск обработчика очередей
Система поддерживает два режима обработки.
-
web— обработка через фоновые задачи. Работает по умолчанию и без дополнительных настроек. -
cli— консольный режим. Требует настройки, но предоставляет больше возможностей и контроля. Для production-окружения рекомендуется использовать cli-режим с супервизором.
Консольный режим
Чтобы использовать консольный режим, установите в конфигурации 'run_mode' => 'cli'.
Основная команда для обработки:
php bitrix.php messenger:consume
Она запускает обработчики для всех очередей в бесконечном цикле.
Параметры команды:
-
Выбор очередей. Можно указать несколько:
php bitrix.php messenger:consume first_queue second_queue -
Дополнительные опции:
// Обрабатывает очередь second_queue 60 секунд с паузой 2 секунды между проверками php bitrix.php messenger:consume second_queue -t 60 —sleep 2-
-t,—time-limit— ограничить время работы, в секундах, -
—sleep— установить паузу между итерациями. По умолчанию — 1 секунда.
-
Обработчики сообщений
Каждая очередь использует свой обработчик. Он должен:
-
обработать сообщение,
-
выбросить исключение при ошибке.
Чтобы создать обработчик, наследуйтесь от класса AbstractReceiver и реализуйте метод process. Метод возвращает void, когда сообщение успешно обработано, или бросает исключение.
class SomeReceiver extends AbstractReceiver
{
protected function process(MessageInterface $message): void
{
if ($message instanceof SectionMessage) {
$section = $this->sectionRepository->getById($message->getSectionId());
(new SendSectionCommand($section))->run();
return;
}
throw new UnprocessableMessageException($message->getId(), $this->queueId);
}
}
Типы исключений
Используйте разные исключения для разных сценариев.
-
UnprocessableMessageException— сообщение не подходит для этого обработчика. Пример: в очередь для обработки задач пришло сообщение о пользователе. -
UnrecoverableMessageException— сообщение устарело и повторная обработка не нужна. Пример: сообщение содержитIDудаленного объекта. -
RecoverableMessageException— временная ошибка, стоит повторить попытку. Можно указать задержку в секундах через методgetRetryDelay(). Если возвращаетnull, используются настройки повтора из конфигурации очереди. -
Другие исключения — критические ошибки обработки. Система применит к ним настройки повторной обработки
retry_strategyиз конфигурации очереди.
Пример обработчика сообщений в консольном режиме
Создается обработчик SomeReceiver для конкретной очереди. При получении сообщения выводится его ID. В cli-режиме запускаем воркер $worker. Воркер в бесконечном цикле проверяет очередь и передает сообщения обработчику.
class SomeReceiver extends \Bitrix\Main\Messenger\Receiver\AbstractReceiver
{
protected function process(MessageInterface $message): void
{
echo "Message {$message->getId()} received\n";
}
}
// Для cli-режима нужно запускать воркер вручную:
$worker = new \Bitrix\Main\Messenger\Internals\Worker();
$worker->process();
Собственная реализация
Для особых сценариев обработки можно реализовать собственный обработчик через интерфейс ReceiverInterface.
interface ReceiverInterface
{
/**
* Основной метод обработки
* @throws ReceiverException
*/
public function run(): void;
// Установка лимита сообщений за одну итерацию
public function setLimit(int $limit): self;
// Назначение очереди
public function setQueueId(string $queueId): self;
// Выбор брокера
public function setBroker(BrokerInterface $broker): self;
}
При реализации интерфейса вы берете на себя полный контроль над процессом обработки. Необходимо самостоятельно управлять:
-
получением сообщений из брокера,
-
подтверждением успешной обработки
ack, -
обработкой ошибок
reject.
Методы интерфейса:
-
run()— получает сообщения из очереди, выполняет обработку и выбрасываетReceiverExceptionпри неудачной обработке. -
setLimit(int $limit)— устанавливает лимит сообщений для одновременной обработки, например,setLimit(10)— обрабатывать по 10 сообщений. -
setQueueId(string $queueId)— назначает очередь для обработки. -
setBroker(BrokerInterface $broker)— подключает брокера сообщений, например,setBroker(new RedisBroker()).
Работа с сообщениями
Сообщения — это контейнеры данных для передачи через очередь. Они должны реализовывать \Bitrix\Main\Messenger\Entity\MessageInterface, но лучше расширять класс \Bitrix\Main\Messenger\Entity\AbstractMessage.
Ключевые требования к сообщениям:
-
Данные должны преобразовываться в JSON, то есть быть сериализуемыми.
-
Можно использовать любые поля и структуры данных.
-
Необходимо учитывать возможную задержку обработки.
Отправить сообщение
Создайте объект сообщения и отправьте в очередь.
$message = new MyMessage(...);
$message->send('first_queue');
Отложить обработку сообщения
Параметр DelayParam(3600) откладывает обработку на 1 час (3600 секунд). Сообщение появится в очереди, но обработчик получит его только через указанное время.
use Bitrix\Main\Messenger\Entity\ProcessingParam\DelayParam;
$message = new MyMessage(...);
$message->send('first_queue', [new DelayParam(3600)]);
Отправить сообщение с дополнительным параметром
Сообщение отправляется с двумя параметрами, которые применятся при постановке в очередь:
-
DelayParam— отложить обработку на 1 час, -
ItemIdParam— отметить сообщение идентификаторомmyid-123.
use Bitrix\Main\Messenger\Entity\ProcessingParam\DelayParam;
use Bitrix\Main\Messenger\Entity\ProcessingParam\ItemIdParam;
$message = new MyMessage(...);
$message->send('first_queue', [new DelayParam(3600), new ItemIdParam('myid-123')]);
Создать свой параметр
Чтобы добавить новый параметр, создайте класс, реализующий интерфейс \Bitrix\Main\Messenger\Entity\ProcessingParam\ProcessingParamInterface:
interface ProcessingParamInterface
{
public function applyTo(MessageBox $messageBox): MessageBox;
}
Интерфейс требует всего один метод — applyTo. Он принимает объект класса \Bitrix\Main\Messenger\Entity\MessageBox — обертку для сообщения, которое отправляется в брокер очередей.
Особенности работы с отложенной обработкой
Сообщения могут обрабатываться не сразу, что требует особого подхода к их формированию.
Пример. В системе есть задача с ID = 123, которая имеет связь с внешним сервисом ID = trk-042. При удалении задачи нужно уведомить внешний сервис. Если отправить только task_id, то к моменту обработки система уже удалит задачу из базы данных и внешний идентификатор будет недоступен.
Решение — включать в сообщение все необходимые данные сразу.
$message->setData([
'task_id' => 123,
'external_id' => 'trk-042'
]);
Такой подход гарантирует, что обработчик получит все данные даже после удаления исходной сущности. Это особенно важно для операций удаления, синхронизации с внешними системами и сценариев, где важна согласованность данных.