<?php
namespace Yesf\RD\Adapter;
use Swoole\Coroutine as co;
use Yesf\Yesf;
use Yesf\DI\Container;
use Yesf\Exception\DBException;
use Yesf\Connection\Pool;
use Yesf\Connection\PoolInterface;
use Yesf\RD\RDInterface;
use Latitude\QueryBuilder\QueryFactory;
use Latitude\QueryBuilder\Engine\MySqlEngine;
class Mysql implements RDInterface {
private $pool;
public function __construct(PoolInterface $pool) {
$this->pool = $pool;
}
public function query(string $sql, $data = null) {
$connection = $this->pool->getConnection();
$result = null;
$tryAgain = true;
SQL_START_EXECUTE:
if (is_array($data) && count($data) > 0) {
if (strpos($sql, ':') !== false) {
$bind = [];
$sql = preg_replace_callback('/:(\w+)\b/', function ($matches) use (&$data, &$bind) {
$bind[] = $data[$matches[1]];
return '?';
}, $sql);
$data = &$bind;
}
try {
$st = $connection->prepare($sql);
if ($st === false) {
goto SQL_TRY_AGAIN;
}
if (is_object($st)) {
$result = $st->execute($data);
} else {
$result = $connection->execute($data);
}
} catch (\Throwable $e) {
goto SQL_TRY_AGAIN;
}
if ($result === false) {
goto SQL_TRY_AGAIN;
}
goto SQL_SUCCESS_RETURN;
} else {
try {
$result = $connection->query($sql);
} catch (\Throwable $e) {
goto SQL_TRY_AGAIN;
}
if ($result === false) {
goto SQL_TRY_AGAIN;
}
goto SQL_SUCCESS_RETURN;
}
SQL_TRY_AGAIN:
if (($connection->errno === 2006 || $connection->errno === 2013) && $tryAgain) {
$tryAgain = false;
$connection = $this->pool->reconnect($connection);
goto SQL_START_EXECUTE;
} else {
$error = $connection->error;
$errno = $connection->errno;
$this->pool->freeConnection($connection);
throw new DBException($error, $errno);
}
SQL_SUCCESS_RETURN:
if ($result === true) {
$result = [
'_affected_rows' => $connection->affected_rows
];
if (stripos($sql, 'insert') === 0) {
$result['_insert_id'] = $connection->insert_id;
}
}
$this->pool->freeConnection($connection);
return $result;
}
public function get(string $sql, $data = null) {
if (stripos($sql, 'limit') === false) {
$sql .= ' LIMIT 0,1';
}
$r = $this->query($sql, $data);
return count($r) > 0 ? current($r) : null;
}
public function getColumn(string $sql, $data = null, $column = null) {
if ($column === null) {
if ($data === null) {
throw new DBException('$column can not be empty');
} else {
$column = $data;
}
}
$result = $this->get($sql, $data);
if ($result === null || !isset($result[$column])) {
throw new DBException("Column $column not exists");
} else {
return $result[$column];
}
}
public static function getBuilder() {
return new QueryFactory(Container::getInstance()->get(MySqlEngine::class));
}
}