static function Get($userId, $cache = true, $reOpen = false, $channelType = self::TYPE_PRIVATE)
{
global $DB, $CACHE_MANAGER;
$nginxStatus = CPullOptions::GetQueueServerStatus();
$arResult = false;
$userId = intval($userId);
$cache_id="b_pchc_".$userId.'_'.$channelType;
if ($nginxStatus && $cache)
{
$res = $CACHE_MANAGER->Read(self::CHANNEL_TTL, $cache_id, self::CACHE_TABLE);
if ($res)
{
$arResult = $CACHE_MANAGER->Get($cache_id);
}
}
if(!is_array($arResult) || !isset($arResult['CHANNEL_ID']) || ($userId > 0 && !isset($arResult['CHANNEL_PUBLIC_ID'])))
{
CTimeZone::Disable();
$strSql = "
SELECT C.CHANNEL_ID, C.CHANNEL_PUBLIC_ID, C.CHANNEL_TYPE, ".$DB->DatetimeToTimestampFunction('C.DATE_CREATE')." DATE_CREATE, C.LAST_ID
FROM b_pull_channel C
WHERE C.USER_ID = ".$userId." AND C.CHANNEL_TYPE = '".$DB->ForSQL($channelType)."'
";
CTimeZone::Enable();
$res = $DB->Query($strSql);
$arResult = $res->Fetch();
if ($arResult && $nginxStatus && $cache)
{
self::SaveToCache($cache_id, $arResult);
}
}
if (empty($arResult) || intval($arResult['DATE_CREATE'])+ self::CHANNEL_TTL < time() || ($userId > 0 && $arResult['CHANNEL_PUBLIC_ID'] == ''))
{
$arChannel = [
'CHANNEL_ID' => self::GetNewChannelId(),
'CHANNEL_PUBLIC_ID' => $userId>0? self::GetNewChannelId('public'): '',
'CHANNEL_TYPE' => $channelType,
'DATE_CREATE' => time(),
'LAST_ID' => 0,
];
self::SaveToCache($cache_id, $arChannel);
if (isset($arResult['CHANNEL_ID']))
{
$DB->Query("DELETE FROM b_pull_channel WHERE CHANNEL_ID = '".$DB->ForSQL($arResult['CHANNEL_ID'])."'");
$DB->Query("DELETE FROM b_pull_channel WHERE CHANNEL_PUBLIC_ID = '".$DB->ForSQL($arResult['CHANNEL_PUBLIC_ID'])."'");
}
$arChannelData = self::Add($userId, $arChannel['CHANNEL_ID'], $arChannel['CHANNEL_PUBLIC_ID'], $arChannel['CHANNEL_TYPE']);
if (!$arChannelData)
{
return false;
}
$channelId = $arChannelData['CHANNEL_ID'];
$publicChannelId = $arChannelData['CHANNEL_PUBLIC_ID'];
if (!is_string($channelId) || $channelId === '')
{
return false;
}
if (isset($arResult['CHANNEL_ID']) && $channelId != $arResult['CHANNEL_ID'])
{
$params = [
'action' => $channelType != self::TYPE_PRIVATE? 'reconnect': 'get_config',
'channel' => [
'id' => self::SignChannel($arResult['CHANNEL_ID']),
'type' => $channelType,
],
];
if ($userId == 0)
{
$params['new_channel'] = [
'id' => self::SignChannel($channelId),
'start' => date('c', time()),
'end' => date('c', time()+ self::CHANNEL_TTL),
'type' => $channelType,
];
}
$arMessage = [
'module_id' => 'pull',
'command' => 'channel_expire',
'params' => $params
];
CPullStack::AddByChannel($arResult['CHANNEL_ID'], $arMessage);
}
return [
'CHANNEL_ID' => $channelId,
'CHANNEL_PUBLIC_ID' => $publicChannelId,
'CHANNEL_TYPE' => $channelType,
'CHANNEL_DT' => time(),
'LAST_ID' => 0,
];
}
else
{
if ($nginxStatus && $reOpen && CPullOptions::GetQueueServerVersion() < 3 && !CPullOptions::IsServerShared())
{
self::Send($arResult['CHANNEL_ID'], BitrixPullCommon::jsonEncode(Array(
'module_id' => 'pull',
'command' => 'reopen',
'expiry' => 1,
'params' => Array(),
'extra' => Array(
'server_time' => date('c'),
'server_name' => COption::GetOptionString('main', 'server_name', $_SERVER['SERVER_NAME']),
'revision_web' => PULL_REVISION_WEB,
'revision_mobile' => PULL_REVISION_MOBILE,
),
)));
}
return [
'CHANNEL_ID' => $arResult['CHANNEL_ID'],
'CHANNEL_PUBLIC_ID' => $arResult['CHANNEL_PUBLIC_ID'],
'CHANNEL_TYPE' => $arResult['CHANNEL_TYPE'],
'CHANNEL_DT' => $arResult['DATE_CREATE'],
'LAST_ID' => $arResult['LAST_ID'],
];
}
}