- Модуль: messageservice
- Путь к файлу: ~/bitrix/modules/messageservice/lib/queue.php
- Класс: \Bitrix\MessageService\Queue
- Вызов: Queue::sendMessages
/**
 * Processes one batch of queued messages under a named connection lock.
 *
 * Steps:
 *  1. Acquire the 'b_messageservice_message' lock; give up immediately if it is not obtained.
 *  2. Select up to `queue_limit` (module option, default 5) messages that are either
 *     SUCCESS_EXEC = 'N' with NEXT_EXEC empty or in the past, or SUCCESS_EXEC = 'P'
 *     with NEXT_EXEC more than two minutes in the past.
 *  3. Mark the selected rows SUCCESS_EXEC = 'P' with NEXT_EXEC two minutes ahead.
 *  4. For each message: defer it to the next execution time when the sender's daily
 *     limit is reached, otherwise send it and store the send result.
 *
 * The lock is released in a `finally` block, so it is freed even when an exception
 * escapes the processing code (including a failing update inside the error handler).
 *
 * NOTE(review): `InternalEntityMessageTable`, `ConfigOption` and `SenderLimitation` look like
 * namespace-relative names whose separators were lost — confirm against the real file.
 *
 * @return string|null Empty string when the lock could not be acquired, null after a processing pass.
 */
static function sendMessages()
{
	$lockTag = 'b_messageservice_message';
	if (!Application::getConnection()->lock($lockTag))
	{
		return "";
	}

	try
	{
		// Counters of messages per "SENDER_ID:MESSAGE_FROM" key, incremented below as messages are sent.
		$counts = InternalEntityMessageTable::getAllDailyCount();

		// Batch size; a zero or non-numeric option value falls back to 5.
		$queueLimit = abs((int)ConfigOption::get("messageservice", "queue_limit", 5));
		if (!$queueLimit)
		{
			$queueLimit = 5;
		}

		$query =
			MessageTable::query()
				->addSelect('ID')
				->addSelect('TYPE')
				->addSelect('SENDER_ID')
				->addSelect('AUTHOR_ID')
				->addSelect('MESSAGE_FROM')
				->addSelect('MESSAGE_TO')
				->addSelect('MESSAGE_HEADERS')
				->addSelect('MESSAGE_BODY')
				->addSelect('EXTERNAL_ID')
				->where(Query::filter()
					->logic('or')
					// Not yet processed and due now (or never scheduled).
					->where(Query::filter()
						->logic('and')
						->where('SUCCESS_EXEC', 'N')
						->where(Query::filter()
							->logic('or')
							->where('NEXT_EXEC', '<', new DateTime())
							->whereNull('NEXT_EXEC')
						)
					)
					// Left in state 'P' (presumably "in progress") and overdue by more than two minutes.
					->where(Query::filter()
						->logic('and')
						->where('SUCCESS_EXEC', 'P')
						->where('NEXT_EXEC', '<', (new DateTime())->add('-2 MINUTE'))
					)
				)
				->addOrder('ID')
				->setLimit($queueLimit)
		;

		if (defined('BX_CLUSTER_GROUP'))
		{
			$query->where('CLUSTER_GROUP', BX_CLUSTER_GROUP);
		}

		$messageFieldsList = $query->fetchAll();

		if (!empty($messageFieldsList))
		{
			// Flag the whole batch as taken before sending anything.
			$idList = array_column($messageFieldsList, 'ID');
			MessageTable::updateMulti(
				$idList,
				[
					'SUCCESS_EXEC' => 'P',
					'NEXT_EXEC' => (new DateTime())->add('+2 MINUTE'),
				],
				true
			);
		}

		$nextDay = static::getNextExecTime();

		foreach ($messageFieldsList as $messageFields)
		{
			$serviceId = $messageFields['SENDER_ID'] . ':' . $messageFields['MESSAGE_FROM'];
			$message = Message::createFromFields($messageFields);

			if (!isset($counts[$serviceId]))
			{
				$counts[$serviceId] = 0;
			}

			$sender = $message->getSender();
			if ($sender)
			{
				// Kept separate from $queueLimit: this is the per-sender daily quota, not the batch size.
				$dailyLimit = SenderLimitation::getDailyLimit($sender->getId(), $messageFields['MESSAGE_FROM']);
				$current = $counts[$serviceId];

				// A non-positive limit disables the quota check.
				if ($dailyLimit > 0 && $current >= $dailyLimit)
				{
					$message->update([
						'STATUS_ID' => MessageStatus::DEFERRED,
						'NEXT_EXEC' => $nextDay,
					]);
					continue;
				}

				++$counts[$serviceId];
			}

			try
			{
				$result = static::sendMessage($messageFields);
				$message->updateWithSendResult($result, $nextDay);
			}
			catch (\Throwable $e)
			{
				// Fully qualified so the handler matches regardless of the file's namespace/imports.
				Application::getInstance()->getExceptionHandler()->writeToLog($e);

				$message->update([
					'STATUS_ID' => MessageStatus::EXCEPTION,
					'SUCCESS_EXEC' => 'E',
					'DATE_EXEC' => new DateTime(),
					'EXEC_ERROR' => $e->getMessage(),
				]);

				// Stop the batch after the first failure; the remaining rows stay in state 'P'.
				break;
			}
		}
	}
	finally
	{
		// Always release the lock, even if the code above throws.
		Application::getConnection()->unlock($lockTag);
	}

	return null;
}