• Модуль: messageservice
  • Путь к файлу: ~/bitrix/modules/messageservice/lib/queue.php
  • Класс: Bitrix\MessageService\Queue
  • Вызов: Queue::sendMessages
/**
 * Sends one batch of queued messages while holding the 'b_messageservice_message' lock.
 *
 * Flow:
 *  1. Acquire the named connection lock; give up immediately if it is not obtained.
 *  2. Select up to "queue_limit" (module option, default 5) messages that are due.
 *  3. Mark the selected rows with SUCCESS_EXEC = 'P' and push NEXT_EXEC two minutes ahead.
 *  4. For each message enforce the per-sender daily limit, then send it and store the result.
 *  5. Release the lock, even when an exception escapes from any of the steps above.
 *
 * NOTE(review): several class references below (InternalEntityMessageTable, ConfigOption,
 * SenderLimitation, Throwable) look like their namespace separators were lost, e.g.
 * Internal\Entity\MessageTable — confirm against the real file before applying.
 *
 * @return string|null Empty string when the lock could not be acquired, null otherwise.
 */
static function sendMessages()
{
	$lockTag = 'b_messageservice_message';
	if (!Application::getConnection()->lock($lockTag))
	{
		// Lock was not obtained: leave the queue untouched.
		return "";
	}

	// Everything that runs under the lock is wrapped in try/finally so the lock is
	// released even if a query or an update throws; the exception still propagates.
	try
	{
		$counts = InternalEntityMessageTable::getAllDailyCount();

		// Batch size; a zero or non-numeric option value falls back to 5.
		$queueLimit = abs((int)ConfigOption::get("messageservice", "queue_limit", 5));
		if (!$queueLimit)
		{
			$queueLimit = 5;
		}

		// Due messages are either:
		//  - SUCCESS_EXEC = 'N' with NEXT_EXEC in the past or not set, or
		//  - SUCCESS_EXEC = 'P' with NEXT_EXEC more than two minutes in the past
		//    ('P' is the value this method writes when it claims a batch, see below).
		$query =
			MessageTable::query()
				->addSelect('ID')
				->addSelect('TYPE')
				->addSelect('SENDER_ID')
				->addSelect('AUTHOR_ID')
				->addSelect('MESSAGE_FROM')
				->addSelect('MESSAGE_TO')
				->addSelect('MESSAGE_HEADERS')
				->addSelect('MESSAGE_BODY')
				->addSelect('EXTERNAL_ID')
				->where(Query::filter()
					->logic('or')
					->where(Query::filter()
						->logic('and')
						->where('SUCCESS_EXEC', 'N')
						->where(Query::filter()
							->logic('or')
							->where('NEXT_EXEC', '<', new DateTime())
							->whereNull('NEXT_EXEC')
						)
					)
					->where(Query::filter()
						->logic('and')
						->where('SUCCESS_EXEC', 'P')
						->where('NEXT_EXEC', '<', (new DateTime())->add('-2 MINUTE'))
					)
				)
				->addOrder('ID')
				->setLimit($queueLimit)
		;

		// Restrict the batch to this cluster group when the constant is defined.
		if (defined('BX_CLUSTER_GROUP'))
		{
			$query->where('CLUSTER_GROUP', BX_CLUSTER_GROUP);
		}
		$messageFieldsList = $query->fetchAll();

		if (!empty($messageFieldsList))
		{
			// Claim the whole batch up front: mark it 'P' and move NEXT_EXEC forward.
			$idList = array_column($messageFieldsList, 'ID');
			MessageTable::updateMulti(
				$idList,
				[
					'SUCCESS_EXEC' => 'P',
					'NEXT_EXEC' => (new DateTime())->add('+2 MINUTE'),
				],
				true
			);
		}

		$nextDay = static::getNextExecTime();
		foreach ($messageFieldsList as $messageFields)
		{
			// Daily counters are keyed by "<sender id>:<from>".
			$serviceId = $messageFields['SENDER_ID'] . ':' . $messageFields['MESSAGE_FROM'];
			$message = Message::createFromFields($messageFields);

			if (!isset($counts[$serviceId]))
			{
				$counts[$serviceId] = 0;
			}

			$sender = $message->getSender();
			if ($sender)
			{
				// Kept separate from $queueLimit: this is the per-sender daily cap.
				$dailyLimit = SenderLimitation::getDailyLimit($sender->getId(), $messageFields['MESSAGE_FROM']);

				// A non-positive daily limit disables the check.
				if ($dailyLimit > 0 && $counts[$serviceId] >= $dailyLimit)
				{
					// Cap reached: defer the message to the time returned by getNextExecTime().
					$message->update([
						'STATUS_ID' => MessageStatus::DEFERRED,
						'NEXT_EXEC' => $nextDay,
					]);
					continue;
				}
				++$counts[$serviceId];
			}

			try
			{
				$result = static::sendMessage($messageFields);
				$message->updateWithSendResult($result, $nextDay);
			}
			catch (Throwable $e)
			{
				Application::getInstance()->getExceptionHandler()->writeToLog($e);

				$message->update([
					'STATUS_ID' => MessageStatus::EXCEPTION,
					'SUCCESS_EXEC' => 'E',
					'DATE_EXEC' => new DateTime(),
					'EXEC_ERROR' => $e->getMessage(),
				]);
				// Stop processing the rest of the batch after the first failure.
				break;
			}
		}
	}
	finally
	{
		Application::getConnection()->unlock($lockTag);
	}

	return null;
}