<?php
namespace Yurun\Util\YurunHttp\Http2;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use Yurun\Util\YurunHttp\Attributes;
use Yurun\Util\YurunHttp\Http\Psr7\Uri;
/**
 * HTTP/2 client bound to one host / port / ssl endpoint, built on Swoole coroutines.
 *
 * A single background coroutine (see startRecvCo()) reads every response from
 * the connection and dispatches it into a per-stream channel; recv() pops
 * from those channels. Responses on even stream ids that nobody waits for
 * explicitly are routed to the shared channel stored under key -1.
 */
class SwooleClient implements IHttp2Client
{
    /**
     * Host this client is bound to.
     *
     * @var string
     */
    private $host;

    /**
     * Port this client is bound to.
     *
     * @var int
     */
    private $port;

    /**
     * Whether the connection uses SSL (https / wss).
     *
     * @var bool
     */
    private $ssl;

    /**
     * YurunHttp handler used to obtain connections and build requests / responses.
     *
     * @var \Yurun\Util\YurunHttp\Handler\Swoole
     */
    private $handler;

    /**
     * Connection returned by the handler's HTTP/2 connection manager,
     * or null while not connected.
     *
     * @var object|null
     */
    private $http2Client;

    /**
     * Receive channels indexed by stream id; key -1 holds the shared channel
     * used for even (server initiated) stream ids.
     *
     * @var \Swoole\Coroutine\Channel[]
     */
    private $recvChannels = [];

    /**
     * Capacity of the channel stored under key -1.
     *
     * @var int
     */
    private $serverPushQueueLength = 16;

    /**
     * Requests waiting for a response, indexed by stream id.
     *
     * @var array
     */
    private $requestMap = [];

    /**
     * Timeout forwarded to the underlying client (presumably seconds, as in
     * Swoole - confirm); a falsy value means "not configured".
     *
     * @var float|int|null
     */
    private $timeout;

    /**
     * Id of the receiving coroutine; `true` while it is being created and
     * null / false when it has never been started successfully.
     *
     * @var int|bool|null
     */
    private $recvCo;

    /**
     * @param string                                    $host
     * @param int                                       $port
     * @param bool                                      $ssl
     * @param \Yurun\Util\YurunHttp\Handler\Swoole|null $handler a default Swoole handler is created when omitted
     */
    public function __construct($host, $port, $ssl, $handler = null)
    {
        $this->host = $host;
        $this->port = $port;
        $this->ssl = $ssl;
        if($handler)
        {
            $this->handler = $handler;
        }
        else
        {
            $this->handler = new \Yurun\Util\YurunHttp\Handler\Swoole;
        }
    }

    /**
     * Obtains the connection for the configured endpoint from the handler's
     * connection manager and applies the configured timeout, if any.
     *
     * @return bool true when a connection was obtained, false otherwise
     */
    public function connect()
    {
        $client = $this->handler->getHttp2ConnectionManager()->getConnection($this->host, $this->port, $this->ssl);
        if(!$client)
        {
            return false;
        }
        $this->http2Client = $client;
        if($this->timeout)
        {
            $this->http2Client->set([
                'timeout' => $this->timeout,
            ]);
        }
        return true;
    }

    /**
     * Returns the handler this client works with.
     *
     * @return \Yurun\Util\YurunHttp\Handler\Swoole
     */
    public function getHttpHandler()
    {
        return $this->handler;
    }

    /**
     * Drops the connection and closes every pending receive channel, which
     * wakes up coroutines blocked in recv().
     *
     * @return void
     */
    public function close()
    {
        // Cleared first so that isConnected() is already false while the
        // connection manager closes the connection.
        $this->http2Client = null;
        $this->handler->getHttp2ConnectionManager()->closeConnection($this->host, $this->port, $this->ssl);
        foreach($this->recvChannels as $channel)
        {
            $channel->close();
        }
        $this->recvChannels = [];
    }

    /**
     * Sends a request over the connection.
     *
     * @param \Yurun\Util\YurunHttp\Http\Request $request
     * @param bool                               $pipeline         stored on the request as the HTTP2_PIPELINE attribute
     * @param bool                               $dropRecvResponse when true no receive channel is registered for the stream
     *
     * @return int|bool the stream id, or a falsy value when sending failed or the client is not connected
     *
     * @throws \RuntimeException when the request targets another scheme / host / port than this client
     */
    public function send($request, $pipeline = false, $dropRecvResponse = false)
    {
        if('2.0' !== $request->getProtocolVersion())
        {
            $request = $request->withProtocolVersion('2.0');
        }
        $uri = $request->getUri();
        if($this->host != $uri->getHost() || $this->port != Uri::getServerPort($uri) || $this->ssl != ('https' === $uri->getScheme() || 'wss' === $uri->getScheme()))
        {
            throw new \RuntimeException(sprintf('Current http2 connection instance just support %s://%s:%s, does not support %s', $this->ssl ? 'https' : 'http', $this->host, $this->port, $uri->__toString()));
        }
        if(!$this->isConnected())
        {
            // Nothing to send on: report failure instead of dereferencing null.
            return false;
        }
        $request = $request->withAttribute(Attributes::HTTP2_PIPELINE, $pipeline);
        // $http2Request is produced by buildRequest().
        $this->handler->buildRequest($request, $this->http2Client, $http2Request);
        $streamId = $this->http2Client->send($http2Request);
        if(!$streamId)
        {
            $this->close();
            // Stop here: close() has just emptied the channel list, so
            // registering a channel under the falsy id would only leak it.
            return $streamId;
        }
        if(!$dropRecvResponse)
        {
            $this->recvChannels[$streamId] = new Channel(1);
            $this->requestMap[$streamId] = $request;
        }
        return $streamId;
    }

    /**
     * Writes data to an open stream.
     *
     * @param int    $streamId
     * @param mixed  $data
     * @param bool   $end      whether this write ends the stream
     *
     * @return bool false when the client is not connected, otherwise the
     *              result of the underlying client's write()
     */
    public function write($streamId, $data, $end = false)
    {
        if(!$this->isConnected())
        {
            return false;
        }
        return $this->http2Client->write($streamId, $data, $end);
    }

    /**
     * Ends a stream by writing an empty final chunk.
     *
     * @param int $streamId
     *
     * @return bool
     */
    public function end($streamId)
    {
        return $this->write($streamId, '', true);
    }

    /**
     * Waits for the response of a stream and builds the response object.
     *
     * @param int            $streamId stream id returned by send(); -1 reads from the shared channel for even stream ids
     * @param float|int|null $timeout  passed to the channel pop; null waits without timeout
     *
     * @return mixed whatever the handler's buildHttp2Response() returns
     */
    public function recv($streamId = -1, $timeout = null)
    {
        // (Re)start the receiving coroutine when it never ran or has exited.
        if(!$this->recvCo || (true !== $this->recvCo && !Coroutine::exists($this->recvCo)))
        {
            $this->startRecvCo();
        }
        if(isset($this->recvChannels[$streamId]))
        {
            $channel = $this->recvChannels[$streamId];
        }
        else
        {
            $this->recvChannels[$streamId] = $channel = new Channel(-1 === $streamId ? $this->serverPushQueueLength : 1);
        }
        // Channel::pop() takes a non-nullable float; -1 means "no timeout".
        // Passing null directly is deprecated since PHP 8.1.
        $swooleResponse = $channel->pop(null === $timeout ? -1 : $timeout);
        if(-1 !== $streamId)
        {
            // Per-stream channels are single use; the shared one stays open.
            unset($this->recvChannels[$streamId]);
            $channel->close();
        }
        $request = null;
        if(isset($this->requestMap[$streamId]))
        {
            $request = $this->requestMap[$streamId];
            unset($this->requestMap[$streamId]);
        }
        return $this->handler->buildHttp2Response($request, $this->http2Client, $swooleResponse);
    }

    /**
     * Tells whether a connection is currently held.
     *
     * @return bool
     */
    public function isConnected()
    {
        return null !== $this->http2Client;
    }

    /**
     * Starts the coroutine that reads responses from the connection and
     * pushes each one into the channel registered for its stream.
     *
     * @return int|bool coroutine id, or false when not connected or creation failed
     */
    private function startRecvCo()
    {
        if(!$this->isConnected())
        {
            return false;
        }
        // Marker value used while the coroutine is being created.
        $this->recvCo = true;
        return $this->recvCo = Coroutine::create(function(){
            while($this->isConnected())
            {
                if($this->timeout > 0)
                {
                    $swooleResponse = $this->http2Client->recv($this->timeout);
                }
                else
                {
                    $swooleResponse = $this->http2Client->recv();
                }
                if(!$swooleResponse)
                {
                    // A failed read ends the loop and closes the client,
                    // which also wakes every coroutine waiting in recv().
                    $this->close();
                    return;
                }
                $streamId = $swooleResponse->streamId;
                // Even stream ids without a dedicated waiter go to the
                // shared channel (-1).
                if(!isset($this->recvChannels[$streamId]) && 0 === $streamId % 2)
                {
                    $streamId = -1;
                }
                // Responses nobody is waiting for are discarded.
                if(isset($this->recvChannels[$streamId]))
                {
                    $this->recvChannels[$streamId]->push($swooleResponse);
                }
            }
        });
    }

    /**
     * @return string
     */
    public function getHost()
    {
        return $this->host;
    }

    /**
     * @return int
     */
    public function getPort()
    {
        return $this->port;
    }

    /**
     * @return bool
     */
    public function isSSL()
    {
        return $this->ssl;
    }

    /**
     * Number of registered receive channels (the shared -1 channel included
     * once it has been created).
     *
     * @return int
     */
    public function getRecvingCount()
    {
        return count($this->recvChannels);
    }

    /**
     * @return int
     */
    public function getServerPushQueueLength()
    {
        return $this->serverPushQueueLength;
    }

    /**
     * Sets the capacity used when the shared -1 channel is created.
     *
     * @param int $serverPushQueueLength
     *
     * @return static
     */
    public function setServerPushQueueLength($serverPushQueueLength)
    {
        $this->serverPushQueueLength = $serverPushQueueLength;
        return $this;
    }

    /**
     * Stores the timeout and applies it to the current connection, if any.
     *
     * @param float|int|null $timeout
     *
     * @return void
     */
    public function setTimeout($timeout)
    {
        $this->timeout = $timeout;
        if($this->http2Client)
        {
            $this->http2Client->set([
                'timeout' => $timeout,
            ]);
        }
    }

    /**
     * @return float|int|null
     */
    public function getTimeout()
    {
        return $this->timeout;
    }
}