• Модуль: pull
  • Путь к файлу: ~/bitrix/modules/pull/lib/protobuftransport.php
  • Класс: Bitrix\Pull\ProtobufTransport
  • Вызов: ProtobufTransport::sendMessages
/**
 * Sends messages to the queue server in protobuf (binary) format.
 *
 * The messages are converted, wrapped into requests and grouped into batches
 * by the class's own helpers; every batch is then POSTed to the publish URL
 * with `binaryMode=true` appended. A failed batch does not abort the loop:
 * its error is recorded in the returned result and the next batch is sent.
 *
 * @param array $messages Messages to send; handed to static::convertMessages().
 * @param array $options Optional settings. Recognized key: 'serverUrl' - overrides
 *                       the URL otherwise taken from Config::getPublishUrl().
 * @return Result Holds one error for every batch whose HTTP query failed.
 * @throws SystemException When Config::isProtobufUsed() reports that protobuf is not in use.
 */
public static function sendMessages(array $messages, array $options = []): Result
{
	$result = new Result();
	if(!Config::isProtobufUsed())
	{
		throw new SystemException("Sending messages in protobuf format is not supported by queue server");
	}

	$protobufMessages = static::convertMessages($messages);
	$requests = static::createRequests($protobufMessages);
	$requestBatches = static::createRequestBatches($requests);

	$queueServerUrl = $options['serverUrl'] ?? Config::getPublishUrl();
	// Legacy global classes are referenced fully qualified so that they resolve
	// correctly regardless of the namespace this file is declared in.
	$queueServerUrl = \CHTTP::urlAddParams($queueServerUrl, ["binaryMode" => "true"]);
	foreach ($requestBatches as $requestBatch)
	{
		$urlWithSignature = $queueServerUrl;
		// A fresh client is created for every batch.
		$httpClient = new HttpClient(["streamTimeout" => 1]);
		$bodyStream = $requestBatch->toStream();
		if(\CPullOptions::IsServerShared())
		{
			// On a shared server the request carries a signature computed over the batch body.
			// NOTE(review): getContents() is called on the same stream that is sent below;
			// presumably the stream is rewound before sending - confirm.
			$signature = \CPullChannel::GetSignature($bodyStream->getContents());
			$urlWithSignature = \CHTTP::urlAddParams($urlWithSignature, ["signature" => $signature]);
		}

		// NOTE(review): SSL certificate verification is turned off for this request;
		// confirm this is intentional for the queue server connection.
		$httpClient->disableSslVerification();
		$sendResult = $httpClient->query(HttpClient::HTTP_POST, $urlWithSignature, $bodyStream);
		if (!$sendResult)
		{
			// Report the first error of the client; fall back to neutral values
			// if the client reported a failure without any error details.
			$errors = $httpClient->getError();
			$errorCode = array_key_first($errors) ?? 0;
			$errorMsg = $errors[$errorCode] ?? '';
			$result->addError(new \Bitrix\Main\Error($errorMsg, $errorCode));
		}
	}

	return $result;
}