<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace GatewayWorker;
use GatewayWorker\Lib\Context;
use Workerman\Connection\TcpConnection;
use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\Autoloader;
use Workerman\Connection\AsyncTcpConnection;
use GatewayWorker\Protocols\GatewayProtocol;
/**
*
* Gateway,基于Worker 开发
* 用于转发客户端的数据给Worker处理,以及转发Worker的数据给客户端
*
* @author walkor<walkor@workerman.net>
*
*/
class Gateway extends Worker
{
const VERSION = '3.0.12';
public $lanIp = '127.0.0.1';
public $lanPort = 0;
public $startPort = 2000;
public $registerAddress = '127.0.0.1:1236';
public $reloadable = false;
public $pingInterval = 0;
public $pingNotResponseLimit = 0;
public $pingData = '';
public $secretKey = '';
public $router = null;
public $sendToWorkerBufferSize = 10240000;
public $sendToClientBufferSize = 1024000;
public $protocolAccelerate = false;
protected $_clientConnections = array();
protected $_uidConnections = array();
protected $_groupConnections = array();
protected $_workerConnections = array();
protected $_innerTcpWorker = null;
protected $_onWorkerStart = null;
protected $_onConnect = null;
protected $_onMessage = null;
protected $_onClose = null;
protected $_onWorkerStop = null;
protected $_startTime = 0;
protected $_gatewayPort = 0;
protected static $_connectionIdRecorder = 0;
const PERSISTENCE_CONNECTION_PING_INTERVAL = 25;
public function __construct($socket_name, $context_option = array())
{
parent::__construct($socket_name, $context_option);
$this->_gatewayPort = substr(strrchr($socket_name,':'),1);
$this->router = array("\\GatewayWorker\\Gateway", 'routerBind');
$backtrace = debug_backtrace();
$this->_autoloadRootPath = dirname($backtrace[0]['file']);
}
public function run()
{
$this->_onWorkerStart = $this->onWorkerStart;
$this->onWorkerStart = array($this, 'onWorkerStart');
$this->_onConnect = $this->onConnect;
$this->onConnect = array($this, 'onClientConnect');
$this->onMessage = array($this, 'onClientMessage');
$this->_onClose = $this->onClose;
$this->onClose = array($this, 'onClientClose');
$this->_onWorkerStop = $this->onWorkerStop;
$this->onWorkerStop = array($this, 'onWorkerStop');
if (!is_array($this->registerAddress)) {
$this->registerAddress = array($this->registerAddress);
}
$this->_startTime = time();
parent::run();
}
public function onClientMessage($connection, $data)
{
$connection->pingNotResponseCount = -1;
$this->sendToWorker(GatewayProtocol::CMD_ON_MESSAGE, $connection, $data);
}
public function onClientConnect($connection)
{
$connection->id = self::generateConnectionId();
$connection->gatewayHeader = array(
'local_ip' => ip2long($this->lanIp),
'local_port' => $this->lanPort,
'client_ip' => ip2long($connection->getRemoteIp()),
'client_port' => $connection->getRemotePort(),
'gateway_port' => $this->_gatewayPort,
'connection_id' => $connection->id,
'flag' => 0,
);
$connection->session = '';
$connection->pingNotResponseCount = -1;
$connection->maxSendBufferSize = $this->sendToClientBufferSize;
$this->_clientConnections[$connection->id] = $connection;
if ($this->_onConnect) {
call_user_func($this->_onConnect, $connection);
} elseif ($connection->protocol === '\Workerman\Protocols\Websocket') {
$connection->onWebSocketConnect = array($this, 'onWebsocketConnect');
}
$this->sendToWorker(GatewayProtocol::CMD_ON_CONNECT, $connection);
}
public function onWebsocketConnect($connection, $http_buffer)
{
$this->sendToWorker(GatewayProtocol::CMD_ON_WEBSOCKET_CONNECT, $connection, array('get' => $_GET, 'server' => $_SERVER, 'cookie' => $_COOKIE));
}
protected function generateConnectionId()
{
$max_unsigned_int = 4294967295;
if (self::$_connectionIdRecorder >= $max_unsigned_int) {
self::$_connectionIdRecorder = 0;
}
while(++self::$_connectionIdRecorder <= $max_unsigned_int) {
if(!isset($this->_clientConnections[self::$_connectionIdRecorder])) {
break;
}
}
return self::$_connectionIdRecorder;
}
protected function sendToWorker($cmd, $connection, $body = '')
{
$gateway_data = $connection->gatewayHeader;
$gateway_data['cmd'] = $cmd;
$gateway_data['body'] = $body;
$gateway_data['ext_data'] = $connection->session;
if ($this->_workerConnections) {
$worker_connection = call_user_func($this->router, $this->_workerConnections, $connection, $cmd, $body);
if (false === $worker_connection->send($gateway_data)) {
$msg = "SendBufferToWorker fail. May be the send buffer are overflow. See http://wiki.workerman.net/Error2";
static::log($msg);
return false;
}
} else {
$time_diff = 2;
if (time() - $this->_startTime >= $time_diff) {
$msg = 'SendBufferToWorker fail. The connections between Gateway and BusinessWorker are not ready. See http://wiki.workerman.net/Error3';
static::log($msg);
}
$connection->destroy();
return false;
}
return true;
}
public static function routerRand($worker_connections, $client_connection, $cmd, $buffer)
{
return $worker_connections[array_rand($worker_connections)];
}
public static function routerBind($worker_connections, $client_connection, $cmd, $buffer)
{
if (!isset($client_connection->businessworker_address) || !isset($worker_connections[$client_connection->businessworker_address])) {
$client_connection->businessworker_address = array_rand($worker_connections);
}
return $worker_connections[$client_connection->businessworker_address];
}
public function onClientClose($connection)
{
$this->sendToWorker(GatewayProtocol::CMD_ON_CLOSE, $connection);
unset($this->_clientConnections[$connection->id]);
if (!empty($connection->uid)) {
$uid = $connection->uid;
unset($this->_uidConnections[$uid][$connection->id]);
if (empty($this->_uidConnections[$uid])) {
unset($this->_uidConnections[$uid]);
}
}
if (!empty($connection->groups)) {
foreach ($connection->groups as $group) {
unset($this->_groupConnections[$group][$connection->id]);
if (empty($this->_groupConnections[$group])) {
unset($this->_groupConnections[$group]);
}
}
}
if ($this->_onClose) {
call_user_func($this->_onClose, $connection);
}
}
public function onWorkerStart()
{
$this->lanPort = $this->startPort + $this->id;
if ($this->pingInterval > 0) {
$timer_interval = $this->pingNotResponseLimit > 0 ? $this->pingInterval / 2 : $this->pingInterval;
Timer::add($timer_interval, array($this, 'ping'));
}
if ($this->lanIp !== '127.0.0.1') {
Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, array($this, 'pingBusinessWorker'));
}
if (!class_exists('\Protocols\GatewayProtocol')) {
class_alias('GatewayWorker\Protocols\GatewayProtocol', 'Protocols\GatewayProtocol');
}
$this->_innerTcpWorker = new Worker("GatewayProtocol://{$this->lanIp}:{$this->lanPort}");
$this->_innerTcpWorker->listen();
$this->_innerTcpWorker->name = 'GatewayInnerWorker';
Autoloader::setRootPath($this->_autoloadRootPath);
$this->_innerTcpWorker->onMessage = array($this, 'onWorkerMessage');
$this->_innerTcpWorker->onConnect = array($this, 'onWorkerConnect');
$this->_innerTcpWorker->onClose = array($this, 'onWorkerClose');
$this->registerAddress();
if ($this->_onWorkerStart) {
call_user_func($this->_onWorkerStart, $this);
}
}
public function onWorkerConnect($connection)
{
$connection->maxSendBufferSize = $this->sendToWorkerBufferSize;
$connection->authorized = $this->secretKey ? false : true;
}
public function onWorkerMessage($connection, $data)
{
$cmd = $data['cmd'];
if (empty($connection->authorized) && $cmd !== GatewayProtocol::CMD_WORKER_CONNECT && $cmd !== GatewayProtocol::CMD_GATEWAY_CLIENT_CONNECT) {
self::log("Unauthorized request from " . $connection->getRemoteIp() . ":" . $connection->getRemotePort());
$connection->close();
return;
}
switch ($cmd) {
case GatewayProtocol::CMD_WORKER_CONNECT:
$worker_info = json_decode($data['body'], true);
if ($worker_info['secret_key'] !== $this->secretKey) {
self::log("Gateway: Worker key does not match ".var_export($this->secretKey, true)." !== ". var_export($this->secretKey));
$connection->close();
return;
}
$key = $connection->getRemoteIp() . ':' . $worker_info['worker_key'];
if (isset($this->_workerConnections[$key])) {
self::log("Gateway: Worker->name conflict. Key:{$key}");
$connection->close();
return;
}
$connection->key = $key;
$this->_workerConnections[$key] = $connection;
$connection->authorized = true;
return;
case GatewayProtocol::CMD_GATEWAY_CLIENT_CONNECT:
$worker_info = json_decode($data['body'], true);
if ($worker_info['secret_key'] !== $this->secretKey) {
self::log("Gateway: GatewayClient key does not match ".var_export($this->secretKey, true)." !== ".var_export($this->secretKey, true));
$connection->close();
return;
}
$connection->authorized = true;
return;
case GatewayProtocol::CMD_SEND_TO_ONE:
if (isset($this->_clientConnections[$data['connection_id']])) {
$this->_clientConnections[$data['connection_id']]->send($data['body']);
}
return;
case GatewayProtocol::CMD_KICK:
if (isset($this->_clientConnections[$data['connection_id']])) {
$this->_clientConnections[$data['connection_id']]->close($data['body']);
}
return;
case GatewayProtocol::CMD_DESTROY:
if (isset($this->_clientConnections[$data['connection_id']])) {
$this->_clientConnections[$data['connection_id']]->destroy();
}
return;
case GatewayProtocol::CMD_SEND_TO_ALL:
$raw = (bool)($data['flag'] & GatewayProtocol::FLAG_NOT_CALL_ENCODE);
$body = $data['body'];
if (!$raw && $this->protocolAccelerate && $this->protocol) {
$body = $this->preEncodeForClient($body);
$raw = true;
}
$ext_data = $data['ext_data'] ? json_decode($data['ext_data'], true) : '';
if (isset($ext_data['connections'])) {
foreach ($ext_data['connections'] as $connection_id) {
if (isset($this->_clientConnections[$connection_id])) {
$this->_clientConnections[$connection_id]->send($body, $raw);
}
}
} else {
$exclude_connection_id = !empty($ext_data['exclude']) ? $ext_data['exclude'] : null;
foreach ($this->_clientConnections as $client_connection) {
if (!isset($exclude_connection_id[$client_connection->id])) {
$client_connection->send($body, $raw);
}
}
}
return;
case GatewayProtocol::CMD_SELECT:
$client_info_array = array();
$ext_data = json_decode($data['ext_data'], true);
if (!$ext_data) {
echo 'CMD_SELECT ext_data=' . var_export($data['ext_data'], true) . '\r\n';
$buffer = serialize($client_info_array);
$connection->send(pack('N', strlen($buffer)) . $buffer, true);
return;
}
$fields = $ext_data['fields'];
$where = $ext_data['where'];
if ($where) {
$connection_box_map = array(
'groups' => $this->_groupConnections,
'uid' => $this->_uidConnections
);
foreach ($where as $key => $items) {
if ($key !== 'connection_id') {
$connections_box = $connection_box_map[$key];
foreach ($items as $item) {
if (isset($connections_box[$item])) {
foreach ($connections_box[$item] as $connection_id => $client_connection) {
if (!isset($client_info_array[$connection_id])) {
$client_info_array[$connection_id] = array();
foreach ($fields as $field) {
$client_info_array[$connection_id][$field] = isset($client_connection->$field) ? $client_connection->$field : null;
}
}
}
}
}
} else {
foreach ($items as $connection_id) {
if (isset($this->_clientConnections[$connection_id])) {
$client_connection = $this->_clientConnections[$connection_id];
$client_info_array[$connection_id] = array();
foreach ($fields as $field) {
$client_info_array[$connection_id][$field] = isset($client_connection->$field) ? $client_connection->$field : null;
}
}
}
}
}
} else {
foreach ($this->_clientConnections as $connection_id => $client_connection) {
foreach ($fields as $field) {
$client_info_array[$connection_id][$field] = isset($client_connection->$field) ? $client_connection->$field : null;
}
}
}
$buffer = serialize($client_info_array);
$connection->send(pack('N', strlen($buffer)) . $buffer, true);
return;
case GatewayProtocol::CMD_GET_GROUP_ID_LIST:
$buffer = serialize(array_keys($this->_groupConnections));
$connection->send(pack('N', strlen($buffer)) . $buffer, true);
return;
case GatewayProtocol::CMD_SET_SESSION:
if (isset($this->_clientConnections[$data['connection_id']])) {
$this->_clientConnections[$data['connection_id']]->session = $data['ext_data'];
}
return;
case GatewayProtocol::CMD_UPDATE_SESSION:
if (!isset($this->_clientConnections[$data['connection_id']])) {
return;
} else {
if (!$this->_clientConnections[$data['connection_id']]->session) {
$this->_clientConnections[$data['connection_id']]->session = $data['ext_data'];
return;
}
$session = Context::sessionDecode($this->_clientConnections[$data['connection_id']]->session);
$session_for_merge = Context::sessionDecode($data['ext_data']);
$session = array_replace_recursive($session, $session_for_merge);
$this->_clientConnections[$data['connection_id']]->session = Context::sessionEncode($session);
}
return;
case GatewayProtocol::CMD_GET_SESSION_BY_CLIENT_ID:
if (!isset($this->_clientConnections[$data['connection_id']])) {
$session = serialize(null);
} else {
if (!$this->_clientConnections[$data['connection_id']]->session) {
$session = serialize(array());
} else {
$session = $this->_clientConnections[$data['connection_id']]->session;
}
}
$connection->send(pack('N', strlen($session)) . $session, true);
return;
case GatewayProtocol::CMD_GET_ALL_CLIENT_SESSIONS:
$client_info_array = array();
foreach ($this->_clientConnections as $connection_id => $client_connection) {
$client_info_array[$connection_id] = $client_connection->session;
}
$buffer = serialize($client_info_array);
$connection->send(pack('N', strlen($buffer)) . $buffer, true);
return;
case GatewayProtocol::CMD_IS_ONLINE:
$buffer = serialize((int)isset($this->_clientConnections[$data['connection_id']]));
$connection->send(pack('N', strlen($buffer)) . $buffer, true);
return;
case GatewayProtocol::CMD_BIND_UID:
$uid = $data['ext_data'];
if (empty($uid)) {
echo "bindUid(client_id, uid) uid empty, uid=" . var_export($uid, true);
return;
}
$connection_id = $data['connection_id'];
if (!isset($this->_clientConnections[$connection_id])) {
return;
}
$client_connection = $this->_clientConnections[$connection_id];
if (isset($client_connection->uid)) {
$current_uid = $client_connection->uid;
unset($this->_uidConnections[$current_uid][$connection_id]);
if (empty($this->_uidConnections[$current_uid])) {
unset($this->_uidConnections[$current_uid]);
}
}
$client_connection->uid = $uid;
$this->_uidConnections[$uid][$connection_id] = $client_connection;
return;
case GatewayProtocol::CMD_UNBIND_UID:
$connection_id = $data['connection_id'];
if (!isset($this->_clientConnections[$connection_id])) {
return;
}
$client_connection = $this->_clientConnections[$connection_id];
if (isset($client_connection->uid)) {
$current_uid = $client_connection->uid;
unset($this->_uidConnections[$current_uid][$connection_id]);
if (empty($this->_uidConnections[$current_uid])) {
unset($this->_uidConnections[$current_uid]);
}
$client_connection->uid_info = '';
$client_connection->uid = null;
}
return;
case GatewayProtocol::CMD_SEND_TO_UID:
$uid_array = json_decode($data['ext_data'], true);
foreach ($uid_array as $uid) {
if (!empty($this->_uidConnections[$uid])) {
foreach ($this->_uidConnections[$uid] as $connection) {
$connection->send($data['body']);
}
}
}
return;
case GatewayProtocol::CMD_JOIN_GROUP:
$group = $data['ext_data'];
if (empty($group)) {
echo "join(group) group empty, group=" . var_export($group, true);
return;
}
$connection_id = $data['connection_id'];
if (!isset($this->_clientConnections[$connection_id])) {
return;
}
$client_connection = $this->_clientConnections[$connection_id];
if (!isset($client_connection->groups)) {
$client_connection->groups = array();
}
$client_connection->groups[$group] = $group;
$this->_groupConnections[$group][$connection_id] = $client_connection;
return;
case GatewayProtocol::CMD_LEAVE_GROUP:
$group = $data['ext_data'];
if (empty($group)) {
echo "leave(group) group empty, group=" . var_export($group, true);
return;
}
$connection_id = $data['connection_id'];
if (!isset($this->_clientConnections[$connection_id])) {
return;
}
$client_connection = $this->_clientConnections[$connection_id];
if (!isset($client_connection->groups[$group])) {
return;
}
unset($client_connection->groups[$group], $this->_groupConnections[$group][$connection_id]);
if (empty($this->_groupConnections[$group])) {
unset($this->_groupConnections[$group]);
}
return;
case GatewayProtocol::CMD_UNGROUP:
$group = $data['ext_data'];
if (empty($group)) {
echo "leave(group) group empty, group=" . var_export($group, true);
return;
}
if (empty($this->_groupConnections[$group])) {
return;
}
foreach ($this->_groupConnections[$group] as $client_connection) {
unset($client_connection->groups[$group]);
}
unset($this->_groupConnections[$group]);
return;
case GatewayProtocol::CMD_SEND_TO_GROUP:
$raw = (bool)($data['flag'] & GatewayProtocol::FLAG_NOT_CALL_ENCODE);
$body = $data['body'];
if (!$raw && $this->protocolAccelerate && $this->protocol) {
$body = $this->preEncodeForClient($body);
$raw = true;
}
$ext_data = json_decode($data['ext_data'], true);
$group_array = $ext_data['group'];
$exclude_connection_id = $ext_data['exclude'];
foreach ($group_array as $group) {
if (!empty($this->_groupConnections[$group])) {
foreach ($this->_groupConnections[$group] as $connection) {
if(!isset($exclude_connection_id[$connection->id]))
{
$connection->send($body, $raw);
}
}
}
}
return;
case GatewayProtocol::CMD_GET_CLIENT_SESSIONS_BY_GROUP:
$group = $data['ext_data'];
if (!isset($this->_groupConnections[$group])) {
$buffer = serialize(array());
$connection->send(pack('N', strlen($buffer)) . $buffer, true);
return;
}
$client_info_array = array();
foreach ($this->_groupConnections[$group] as $connection_id => $client_connection) {
$client_info_array[$connection_id] = $client_connection->session;
}
$buffer = serialize($client_info_array);
$connection->send(pack('N', strlen($buffer)) . $buffer, true);
return;
case GatewayProtocol::CMD_GET_CLIENT_COUNT_BY_GROUP:
$group = $data['ext_data'];
$count = 0;
if ($group !== '') {
if (isset($this->_groupConnections[$group])) {
$count = count($this->_groupConnections[$group]);
}
} else {
$count = count($this->_clientConnections);
}
$buffer = serialize($count);
$connection->send(pack('N', strlen($buffer)) . $buffer, true);
return;
case GatewayProtocol::CMD_GET_CLIENT_ID_BY_UID:
$uid = $data['ext_data'];
if (empty($this->_uidConnections[$uid])) {
$buffer = serialize(array());
} else {
$buffer = serialize(array_keys($this->_uidConnections[$uid]));
}
$connection->send(pack('N', strlen($buffer)) . $buffer, true);
return;
default :
$err_msg = "gateway inner pack err cmd=$cmd";
echo $err_msg;
}
}
public function onWorkerClose($connection)
{
if (isset($connection->key)) {
unset($this->_workerConnections[$connection->key]);
}
}
public function registerAddress()
{
$address = $this->lanIp . ':' . $this->lanPort;
foreach ($this->registerAddress as $register_address) {
$register_connection = new AsyncTcpConnection("text://{$register_address}");
$secret_key = $this->secretKey;
$register_connection->onConnect = function($register_connection) use ($address, $secret_key, $register_address){
$register_connection->send('{"event":"gateway_connect", "address":"' . $address . '", "secret_key":"' . $secret_key . '"}');
if (strpos($register_address, '127.0.0.1') !== 0) {
$register_connection->ping_timer = Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, function () use ($register_connection) {
$register_connection->send('{"event":"ping"}');
});
}
};
$register_connection->onClose = function ($register_connection) {
if(!empty($register_connection->ping_timer)) {
Timer::del($register_connection->ping_timer);
}
$register_connection->reconnect(1);
};
$register_connection->connect();
}
}
public function ping()
{
$ping_data = $this->pingData ? (string)$this->pingData : null;
$raw = false;
if ($this->protocolAccelerate && $ping_data && $this->protocol) {
$ping_data = $this->preEncodeForClient($ping_data);
$raw = true;
}
foreach ($this->_clientConnections as $connection) {
if ($this->pingNotResponseLimit > 0 &&
$connection->pingNotResponseCount >= $this->pingNotResponseLimit * 2
) {
$connection->destroy();
continue;
}
$connection->pingNotResponseCount++;
if ($ping_data) {
if ($connection->pingNotResponseCount === 0 ||
($this->pingNotResponseLimit > 0 && $connection->pingNotResponseCount % 2 === 1)
) {
continue;
}
$connection->send($ping_data, $raw);
}
}
}
public function pingBusinessWorker()
{
$gateway_data = GatewayProtocol::$empty;
$gateway_data['cmd'] = GatewayProtocol::CMD_PING;
foreach ($this->_workerConnections as $connection) {
$connection->send($gateway_data);
}
}
protected function preEncodeForClient($data)
{
foreach ($this->_clientConnections as $client_connection) {
return call_user_func(array($client_connection->protocol, 'encode'), $data, $client_connection);
}
}
public function onWorkerStop()
{
if ($this->_onWorkerStop) {
call_user_func($this->_onWorkerStop, $this);
}
}
public static function log($msg){
Timer::add(1, function() use ($msg) {
Worker::log($msg);
}, null, false);
}
}