<?php<liu21st@gmail.com>declare (strict_types = 1);
namespace think\db;
use Closure;
use PDO;
use PDOStatement;
use think\db\exception\BindParamException;
use think\db\exception\DbException;
use think\db\exception\PDOException;
use think\Model;
abstract class PDOConnection extends Connection
{
const PARAM_FLOAT = 21;
protected $config = [
'type' => '',
'hostname' => '',
'database' => '',
'username' => '',
'password' => '',
'hostport' => '',
'dsn' => '',
'params' => [],
'charset' => 'utf8',
'prefix' => '',
'deploy' => 0,
'rw_separate' => false,
'master_num' => 1,
'slave_no' => '',
'read_master' => false,
'fields_strict' => true,
'fields_cache' => false,
'trigger_sql' => true,
'builder' => '',
'query' => '',
'break_reconnect' => false,
'break_match_str' => [],
];
protected $PDOStatement;
protected $queryStr = '';
protected $transTimes = 0;
protected $reConnectTimes = 0;
protected $fetchType = PDO::FETCH_ASSOC;
protected $attrCase = PDO::CASE_LOWER;
protected $info = [];
protected $queryStartTime;
protected $params = [
PDO::ATTR_CASE => PDO::CASE_NATURAL,
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
PDO::ATTR_ORACLE_NULLS => PDO::NULL_NATURAL,
PDO::ATTR_STRINGIFY_FETCHES => false,
PDO::ATTR_EMULATE_PREPARES => false,
];
protected $bindType = [
'string' => PDO::PARAM_STR,
'str' => PDO::PARAM_STR,
'integer' => PDO::PARAM_INT,
'int' => PDO::PARAM_INT,
'boolean' => PDO::PARAM_BOOL,
'bool' => PDO::PARAM_BOOL,
'float' => self::PARAM_FLOAT,
'datetime' => PDO::PARAM_STR,
'timestamp' => PDO::PARAM_STR,
];
protected $breakMatchStr = [
'server has gone away',
'no connection to the server',
'Lost connection',
'is dead or not enabled',
'Error while sending',
'decryption failed or bad record mac',
'server closed the connection unexpectedly',
'SSL connection has been closed unexpectedly',
'Error writing data to the connection',
'Resource deadlock avoided',
'failed with errno',
'child connection forced to terminate due to client_idle_limit',
'query_wait_timeout',
'reset by peer',
'Physical connection is not usable',
'TCP Provider: Error code 0x68',
'ORA-03114',
'Packets out of order. Expected',
'Adaptive Server connection failed',
'Communication link failure',
'connection is no longer usable',
'Login timeout expired',
'SQLSTATE[HY000] [2002] Connection refused',
'running with the --read-only option so it cannot execute this statement',
'The connection is broken and recovery is not possible. The connection is marked by the client driver as unrecoverable. No attempt was made to restore the connection.',
'SQLSTATE[HY000] [2002] php_network_getaddresses: getaddrinfo failed: Try again',
'SQLSTATE[HY000] [2002] php_network_getaddresses: getaddrinfo failed: Name or service not known',
'SQLSTATE[HY000]: General error: 7 SSL SYSCALL error: EOF detected',
'SQLSTATE[HY000] [2002] Connection timed out',
'SSL: Connection timed out',
'SQLSTATE[HY000]: General error: 1105 The last transaction was aborted due to Seamless Scaling. Please retry.',
];
protected $bind = [];
public function getQueryClass(): string
{
return $this->getConfig('query') ?: Query::class;
}
public function getBuilderClass(): string
{
return $this->getConfig('builder') ?: '\\think\\db\\builder\\' . ucfirst($this->getConfig('type'));
}
abstract protected function parseDsn(array $config): string;
abstract public function getFields(string $tableName): array;
abstract public function getTables(string $dbName = ''): array;
public function fieldCase(array $info): array
{
switch ($this->attrCase) {
case PDO::CASE_LOWER:
$info = array_change_key_case($info);
break;
case PDO::CASE_UPPER:
$info = array_change_key_case($info, CASE_UPPER);
break;
case PDO::CASE_NATURAL:
default:
}
return $info;
}
protected function getFieldType(string $type): string
{
if (0 === strpos($type, 'set') || 0 === strpos($type, 'enum')) {
$result = 'string';
} elseif (preg_match('/(double|float|decimal|real|numeric)/is', $type)) {
$result = 'float';
} elseif (preg_match('/(int|serial|bit)/is', $type)) {
$result = 'int';
} elseif (preg_match('/bool/is', $type)) {
$result = 'bool';
} elseif (0 === strpos($type, 'timestamp')) {
$result = 'timestamp';
} elseif (0 === strpos($type, 'datetime')) {
$result = 'datetime';
} elseif (0 === strpos($type, 'date')) {
$result = 'date';
} else {
$result = 'string';
}
return $result;
}
public function getFieldBindType(string $type): int
{
if (in_array($type, ['integer', 'string', 'float', 'boolean', 'bool', 'int', 'str'])) {
$bind = $this->bindType[$type];
} elseif (0 === strpos($type, 'set') || 0 === strpos($type, 'enum')) {
$bind = PDO::PARAM_STR;
} elseif (preg_match('/(double|float|decimal|real|numeric)/is', $type)) {
$bind = self::PARAM_FLOAT;
} elseif (preg_match('/(int|serial|bit)/is', $type)) {
$bind = PDO::PARAM_INT;
} elseif (preg_match('/bool/is', $type)) {
$bind = PDO::PARAM_BOOL;
} else {
$bind = PDO::PARAM_STR;
}
return $bind;
}
protected function getSchemaCacheKey(string $schema): string
{
return $this->getConfig('hostname') . ':' . $this->getConfig('hostport') . '@' . $schema;
}
public function getSchemaInfo(string $tableName, $force = false)
{
if (!strpos($tableName, '.')) {
$schema = $this->getConfig('database') . '.' . $tableName;
} else {
$schema = $tableName;
}
if (!isset($this->info[$schema]) || $force) {
$cacheKey = $this->getSchemaCacheKey($schema);
$cacheField = $this->config['fields_cache'] && !empty($this->cache);
if ($cacheField && !$force) {
$info = $this->cache->get($cacheKey);
}
if (empty($info)) {
$info = $this->getTableFieldsInfo($tableName);
if ($cacheField) {
$this->cache->set($cacheKey, $info);
}
}
$pk = $info['_pk'] ?? null;
$autoinc = $info['_autoinc'] ?? null;
unset($info['_pk'], $info['_autoinc']);
$bind = [];
foreach ($info as $name => $val) {
$bind[$name] = $this->getFieldBindType($val);
}
$this->info[$schema] = [
'fields' => array_keys($info),
'type' => $info,
'bind' => $bind,
'pk' => $pk,
'autoinc' => $autoinc,
];
}
return $this->info[$schema];
}
public function getTableInfo($tableName, string $fetch = '')
{
if (is_array($tableName)) {
$tableName = key($tableName) ?: current($tableName);
}
if (strpos($tableName, ',') || strpos($tableName, ')')) {
return [];
}
[$tableName] = explode(' ', $tableName);
$info = $this->getSchemaInfo($tableName);
return $fetch ? $info[$fetch] : $info;
}
public function getTableFieldsInfo(string $tableName): array
{
$fields = $this->getFields($tableName);
$info = [];
foreach ($fields as $key => $val) {
$info[$key] = $this->getFieldType($val['type']);
if (!empty($val['primary'])) {
$pk[] = $key;
}
if (!empty($val['autoinc'])) {
$autoinc = $key;
}
}
if (isset($pk)) {
$pk = count($pk) > 1 ? $pk : $pk[0];
$info['_pk'] = $pk;
}
if (isset($autoinc)) {
$info['_autoinc'] = $autoinc;
}
return $info;
}
public function getPk($tableName)
{
return $this->getTableInfo($tableName, 'pk');
}
public function getAutoInc($tableName)
{
return $this->getTableInfo($tableName, 'autoinc');
}
public function getTableFields($tableName): array
{
return $this->getTableInfo($tableName, 'fields');
}
public function getFieldsType($tableName, string $field = null)
{
$result = $this->getTableInfo($tableName, 'type');
if ($field && isset($result[$field])) {
return $result[$field];
}
return $result;
}
public function getFieldsBind($tableName): array
{
return $this->getTableInfo($tableName, 'bind');
}
public function connect(array $config = [], $linkNum = 0, $autoConnection = false): PDO
{
if (isset($this->links[$linkNum])) {
return $this->links[$linkNum];
}
if (empty($config)) {
$config = $this->config;
} else {
$config = array_merge($this->config, $config);
}
if (isset($config['params']) && is_array($config['params'])) {
$params = $config['params'] + $this->params;
} else {
$params = $this->params;
}
$this->attrCase = $params[PDO::ATTR_CASE];
if (!empty($config['break_match_str'])) {
$this->breakMatchStr = array_merge($this->breakMatchStr, (array) $config['break_match_str']);
}
try {
if (empty($config['dsn'])) {
$config['dsn'] = $this->parseDsn($config);
}
$startTime = microtime(true);
$this->links[$linkNum] = $this->createPdo($config['dsn'], $config['username'], $config['password'], $params);
if (!empty($config['trigger_sql'])) {
$this->trigger('CONNECT:[ UseTime:' . number_format(microtime(true) - $startTime, 6) . 's ] ' . $config['dsn']);
}
return $this->links[$linkNum];
} catch (\PDOException $e) {
if ($autoConnection) {
$this->db->log($e->getMessage(), 'error');
return $this->connect($autoConnection, $linkNum);
} else {
throw $e;
}
}
}
public function view(...$args)
{
return $this->newQuery()->view(...$args);
}
protected function createPdo($dsn, $username, $password, $params)
{
return new PDO($dsn, $username, $password, $params);
}
public function free(): void
{
$this->PDOStatement = null;
}
public function getPdo()
{
if (!$this->linkID) {
return false;
}
return $this->linkID;
}
public function getCursor(BaseQuery $query, string $sql, array $bind = [], $model = null, $condition = null)
{
$this->queryPDOStatement($query, $sql, $bind);
while ($result = $this->PDOStatement->fetch($this->fetchType)) {
if ($model) {
yield $model->newInstance($result, $condition);
} else {
yield $result;
}
}
}
public function query(string $sql, array $bind = [], bool $master = false): array
{
return $this->pdoQuery($this->newQuery(), $sql, $bind, $master);
}
public function execute(string $sql, array $bind = []): int
{
return $this->pdoExecute($this->newQuery(), $sql, $bind, true);
}
protected function pdoQuery(BaseQuery $query, $sql, array $bind = [], bool $master = null): array
{
$query->parseOptions();
if ($query->getOptions('cache')) {
$cacheItem = $this->parseCache($query, $query->getOptions('cache'));
$key = $cacheItem->getKey();
$data = $this->cache->get($key);
if (null !== $data) {
return $data;
}
}
if ($sql instanceof Closure) {
$sql = $sql($query);
$bind = $query->getBind();
}
if (!isset($master)) {
$master = $query->getOptions('master') ? true : false;
}
$procedure = $query->getOptions('procedure') ? true : in_array(strtolower(substr(trim($sql), 0, 4)), ['call', 'exec']);
$this->getPDOStatement($sql, $bind, $master, $procedure);
$resultSet = $this->getResult($procedure);
if (isset($cacheItem) && $resultSet) {
$cacheItem->set($resultSet);
$this->cacheData($cacheItem);
}
return $resultSet;
}
public function pdo(BaseQuery $query): PDOStatement
{
$bind = $query->getBind();
$sql = $this->builder->select($query);
return $this->queryPDOStatement($query, $sql, $bind);
}
public function getPDOStatement(string $sql, array $bind = [], bool $master = false, bool $procedure = false): PDOStatement
{
try {
$this->initConnect($this->readMaster ?: $master);
$this->queryStr = $sql;
$this->bind = $bind;
$this->db->updateQueryTimes();
$this->queryStartTime = microtime(true);
$this->PDOStatement = $this->linkID->prepare($sql);
if ($procedure) {
$this->bindParam($bind);
} else {
$this->bindValue($bind);
}
$this->PDOStatement->execute();
if (!empty($this->config['trigger_sql'])) {
$this->trigger('', $master);
}
$this->reConnectTimes = 0;
return $this->PDOStatement;
} catch (\Throwable | \Exception $e) {
if ($this->transTimes > 0) {
if ($this->isBreak($e)) {
$this->transTimes = 0;
}
} else {
if ($this->reConnectTimes < 4 && $this->isBreak($e)) {
++$this->reConnectTimes;
return $this->close()->getPDOStatement($sql, $bind, $master, $procedure);
}
}
if ($e instanceof \PDOException) {
throw new PDOException($e, $this->config, $this->getLastsql());
} else {
throw $e;
}
}
}
protected function pdoExecute(BaseQuery $query, string $sql, array $bind = [], bool $origin = false): int
{
if ($origin) {
$query->parseOptions();
}
$this->queryPDOStatement($query->master(true), $sql, $bind);
if (!$origin && !empty($this->config['deploy']) && !empty($this->config['read_master'])) {
$this->readMaster = true;
}
$this->numRows = $this->PDOStatement->rowCount();
if ($query->getOptions('cache')) {
$cacheItem = $this->parseCache($query, $query->getOptions('cache'));
$key = $cacheItem->getKey();
$tag = $cacheItem->getTag();
if (isset($key) && $this->cache->has($key)) {
$this->cache->delete($key);
} elseif (!empty($tag) && method_exists($this->cache, 'tag')) {
$this->cache->tag($tag)->clear();
}
}
return $this->numRows;
}
protected function queryPDOStatement(BaseQuery $query, string $sql, array $bind = []): PDOStatement
{
$options = $query->getOptions();
$master = !empty($options['master']) ? true : false;
$procedure = !empty($options['procedure']) ? true : in_array(strtolower(substr(trim($sql), 0, 4)), ['call', 'exec']);
return $this->getPDOStatement($sql, $bind, $master, $procedure);
}
public function find(BaseQuery $query): array
{
$result = $this->db->trigger('before_find', $query);
if (!$result) {
$resultSet = $this->pdoQuery($query, function ($query) {
return $this->builder->select($query, true);
});
$result = $resultSet[0] ?? [];
}
return $result;
}
public function cursor(BaseQuery $query)
{
$options = $query->parseOptions();
$sql = $this->builder->select($query);
$condition = $options['where']['AND'] ?? null;
return $this->getCursor($query, $sql, $query->getBind(), $query->getModel(), $condition);
}
public function select(BaseQuery $query): array
{
$resultSet = $this->db->trigger('before_select', $query);
if (!$resultSet) {
$resultSet = $this->pdoQuery($query, function ($query) {
return $this->builder->select($query);
});
}
return $resultSet;
}
public function insert(BaseQuery $query, bool $getLastInsID = false)
{
$options = $query->parseOptions();
$sql = $this->builder->insert($query);
$result = '' == $sql ? 0 : $this->pdoExecute($query, $sql, $query->getBind());
if ($result) {
$sequence = $options['sequence'] ?? null;
$lastInsId = $this->getLastInsID($query, $sequence);
$data = $options['data'];
if ($lastInsId) {
$pk = $query->getAutoInc();
if ($pk) {
$data[$pk] = $lastInsId;
}
}
$query->setOption('data', $data);
$this->db->trigger('after_insert', $query);
if ($getLastInsID && $lastInsId) {
return $lastInsId;
}
}
return $result;
}
public function insertAll(BaseQuery $query, array $dataSet = [], int $limit = 0): int
{
if (!is_array(reset($dataSet))) {
return 0;
}
$options = $query->parseOptions();
$replace = !empty($options['replace']);
if (0 === $limit && count($dataSet) >= 5000) {
$limit = 1000;
}
if ($limit) {
$this->startTrans();
try {
$array = array_chunk($dataSet, $limit, true);
$count = 0;
foreach ($array as $item) {
$sql = $this->builder->insertAll($query, $item, $replace);
$count += $this->pdoExecute($query, $sql, $query->getBind());
}
$this->commit();
} catch (\Exception | \Throwable $e) {
$this->rollback();
throw $e;
}
return $count;
}
$sql = $this->builder->insertAll($query, $dataSet, $replace);
return $this->pdoExecute($query, $sql, $query->getBind());
}
public function selectInsert(BaseQuery $query, array $fields, string $table): int
{
$query->parseOptions();
$sql = $this->builder->selectInsert($query, $fields, $table);
return $this->pdoExecute($query, $sql, $query->getBind());
}
public function update(BaseQuery $query): int
{
$query->parseOptions();
$sql = $this->builder->update($query);
$result = '' == $sql ? 0 : $this->pdoExecute($query, $sql, $query->getBind());
if ($result) {
$this->db->trigger('after_update', $query);
}
return $result;
}
public function delete(BaseQuery $query): int
{
$query->parseOptions();
$sql = $this->builder->delete($query);
$result = $this->pdoExecute($query, $sql, $query->getBind());
if ($result) {
$this->db->trigger('after_delete', $query);
}
return $result;
}
public function value(BaseQuery $query, string $field, $default = null, bool $one = true)
{
$options = $query->parseOptions();
if (isset($options['field'])) {
$query->removeOption('field');
}
if (isset($options['group'])) {
$query->group('');
}
$query->setOption('field', (array) $field);
if (!empty($options['cache'])) {
$cacheItem = $this->parseCache($query, $options['cache'], 'value');
$key = $cacheItem->getKey();
if ($this->cache->has($key)) {
return $this->cache->get($key);
}
}
$sql = $this->builder->select($query, $one);
if (isset($options['field'])) {
$query->setOption('field', $options['field']);
} else {
$query->removeOption('field');
}
if (isset($options['group'])) {
$query->setOption('group', $options['group']);
}
$pdo = $this->getPDOStatement($sql, $query->getBind(), $options['master']);
$result = $pdo->fetchColumn();
if (isset($cacheItem)) {
$cacheItem->set($result);
$this->cacheData($cacheItem);
}
return false !== $result ? $result : $default;
}
public function aggregate(BaseQuery $query, string $aggregate, $field, bool $force = false)
{
if (is_string($field) && 0 === stripos($field, 'DISTINCT ')) {
[$distinct, $field] = explode(' ', $field);
}
$field = $aggregate . '(' . (!empty($distinct) ? 'DISTINCT ' : '') . $this->builder->parseKey($query, $field, true) . ') AS think_' . strtolower($aggregate);
$result = $this->value($query, $field, 0, false);
return $force ? (float) $result : $result;
}
public function column(BaseQuery $query, $column, string $key = ''): array
{
$options = $query->parseOptions();
if (isset($options['field'])) {
$query->removeOption('field');
}
if (empty($key) || trim($key) === '') {
$key = null;
}
if (\is_string($column)) {
$column = \trim($column);
if ('*' !== $column) {
$column = \array_map('\trim', \explode(',', $column));
}
} elseif (\is_array($column)) {
if (\in_array('*', $column)) {
$column = '*';
}
} else {
throw new DbException('not support type');
}
$field = $column;
if ('*' !== $column && $key && !\in_array($key, $column)) {
$field[] = $key;
}
$query->setOption('field', $field);
if (!empty($options['cache'])) {
$cacheItem = $this->parseCache($query, $options['cache'], 'column');
$name = $cacheItem->getKey();
if ($this->cache->has($name)) {
return $this->cache->get($name);
}
}
$sql = $this->builder->select($query);
if (isset($options['field'])) {
$query->setOption('field', $options['field']);
} else {
$query->removeOption('field');
}
$pdo = $this->getPDOStatement($sql, $query->getBind(), $options['master']);
$resultSet = $pdo->fetchAll(PDO::FETCH_ASSOC);
if (is_string($key) && strpos($key, '.')) {
[$alias, $key] = explode('.', $key);
}
if (empty($resultSet)) {
$result = [];
} elseif ('*' !== $column && \count($column) === 1) {
$column = \array_shift($column);
if (\strpos($column, ' ')) {
$column = \substr(\strrchr(\trim($column), ' '), 1);
}
if (\strpos($column, '.')) {
[$alias, $column] = \explode('.', $column);
}
$result = \array_column($resultSet, $column, $key);
} elseif ($key) {
$result = \array_column($resultSet, null, $key);
} else {
$result = $resultSet;
}
if (isset($cacheItem)) {
$cacheItem->set($result);
$this->cacheData($cacheItem);
}
return $result;
}
public function getRealSql(string $sql, array $bind = []): string
{
foreach ($bind as $key => $val) {
$value = strval(is_array($val) ? $val[0] : $val);
$type = is_array($val) ? $val[1] : PDO::PARAM_STR;
if (self::PARAM_FLOAT == $type || PDO::PARAM_STR == $type) {
$value = '\'' . addslashes($value) . '\'';
} elseif (PDO::PARAM_INT == $type && '' === $value) {
$value = '0';
}
$sql = is_numeric($key) ?
substr_replace($sql, $value, strpos($sql, '?'), 1) :
substr_replace($sql, $value, strpos($sql, ':' . $key), strlen(':' . $key));
}
return rtrim($sql);
}
protected function bindValue(array $bind = []): void
{
foreach ($bind as $key => $val) {
$param = is_numeric($key) ? $key + 1 : ':' . $key;
if (is_array($val)) {
if (PDO::PARAM_INT == $val[1] && '' === $val[0]) {
$val[0] = 0;
} elseif (self::PARAM_FLOAT == $val[1]) {
$val[0] = is_string($val[0]) ? (float) $val[0] : $val[0];
$val[1] = PDO::PARAM_STR;
}
$result = $this->PDOStatement->bindValue($param, $val[0], $val[1]);
} else {
$result = $this->PDOStatement->bindValue($param, $val);
}
if (!$result) {
throw new BindParamException(
"Error occurred when binding parameters '{$param}'",
$this->config,
$this->getLastsql(),
$bind
);
}
}
}
protected function bindParam(array $bind): void
{
foreach ($bind as $key => $val) {
$param = is_numeric($key) ? $key + 1 : ':' . $key;
if (is_array($val)) {
array_unshift($val, $param);
$result = call_user_func_array([$this->PDOStatement, 'bindParam'], $val);
} else {
$result = $this->PDOStatement->bindValue($param, $val);
}
if (!$result) {
$param = array_shift($val);
throw new BindParamException(
"Error occurred when binding parameters '{$param}'",
$this->config,
$this->getLastsql(),
$bind
);
}
}
}
protected function getResult(bool $procedure = false): array
{
if ($procedure) {
return $this->procedure();
}
$result = $this->PDOStatement->fetchAll($this->fetchType);
$this->numRows = count($result);
return $result;
}
protected function procedure(): array
{
$item = [];
do {
$result = $this->getResult();
if (!empty($result)) {
$item[] = $result;
}
} while ($this->PDOStatement->nextRowset());
$this->numRows = count($item);
return $item;
}
public function transaction(callable $callback)
{
$this->startTrans();
try {
$result = null;
if (is_callable($callback)) {
$result = $callback($this);
}
$this->commit();
return $result;
} catch (\Exception | \Throwable $e) {
$this->rollback();
throw $e;
}
}
public function startTrans(): void
{
try {
$this->initConnect(true);
++$this->transTimes;
if (1 == $this->transTimes) {
$this->linkID->beginTransaction();
} elseif ($this->transTimes > 1 && $this->supportSavepoint()) {
$this->linkID->exec(
$this->parseSavepoint('trans' . $this->transTimes)
);
}
$this->reConnectTimes = 0;
} catch (\Throwable | \Exception $e) {
if ($this->transTimes === 1 && $this->reConnectTimes < 4 && $this->isBreak($e)) {
--$this->transTimes;
++$this->reConnectTimes;
$this->close()->startTrans();
} else {
if ($this->isBreak($e)) {
$this->transTimes = 0;
}
throw $e;
}
}
}
public function commit(): void
{
$this->initConnect(true);
if (1 == $this->transTimes) {
$this->linkID->commit();
}
--$this->transTimes;
}
public function rollback(): void
{
$this->initConnect(true);
if (1 == $this->transTimes) {
$this->linkID->rollBack();
} elseif ($this->transTimes > 1 && $this->supportSavepoint()) {
$this->linkID->exec(
$this->parseSavepointRollBack('trans' . $this->transTimes)
);
}
$this->transTimes = max(0, $this->transTimes - 1);
}
protected function supportSavepoint(): bool
{
return false;
}
protected function parseSavepoint(string $name): string
{
return 'SAVEPOINT ' . $name;
}
protected function parseSavepointRollBack(string $name): string
{
return 'ROLLBACK TO SAVEPOINT ' . $name;
}
public function batchQuery(BaseQuery $query, array $sqlArray = [], array $bind = []): bool
{
$this->startTrans();
try {
foreach ($sqlArray as $sql) {
$this->pdoExecute($query, $sql, $bind);
}
$this->commit();
} catch (\Exception $e) {
$this->rollback();
throw $e;
}
return true;
}
public function close()
{
$this->linkID = null;
$this->linkWrite = null;
$this->linkRead = null;
$this->links = [];
$this->transTimes = 0;
$this->free();
return $this;
}
protected function isBreak($e): bool
{
if (!$this->config['break_reconnect']) {
return false;
}
$error = $e->getMessage();
foreach ($this->breakMatchStr as $msg) {
if (false !== stripos($error, $msg)) {
return true;
}
}
return false;
}
public function getLastSql(): string
{
return $this->getRealSql($this->queryStr, $this->bind);
}
public function getLastInsID(BaseQuery $query, string $sequence = null)
{
try {
$insertId = $this->linkID->lastInsertId($sequence);
} catch (\Exception $e) {
$insertId = '';
}
return $this->autoInsIDType($query, $insertId);
}
protected function autoInsIDType(BaseQuery $query, string $insertId)
{
$pk = $query->getAutoInc();
if ($pk) {
$type = $this->getFieldBindType($pk);
if (PDO::PARAM_INT == $type) {
$insertId = (int) $insertId;
} elseif (self::PARAM_FLOAT == $type) {
$insertId = (float) $insertId;
}
}
return $insertId;
}
public function getError(): string
{
if ($this->PDOStatement) {
$error = $this->PDOStatement->errorInfo();
$error = $error[1] . ':' . $error[2];
} else {
$error = '';
}
if ('' != $this->queryStr) {
$error .= "\n [ SQL语句 ] : " . $this->getLastsql();
}
return $error;
}
protected function initConnect(bool $master = true): void
{
if (!empty($this->config['deploy'])) {
if ($master || $this->transTimes) {
if (!$this->linkWrite) {
$this->linkWrite = $this->multiConnect(true);
}
$this->linkID = $this->linkWrite;
} else {
if (!$this->linkRead) {
$this->linkRead = $this->multiConnect(false);
}
$this->linkID = $this->linkRead;
}
} elseif (!$this->linkID) {
$this->linkID = $this->connect();
}
}
protected function multiConnect(bool $master = false): PDO
{
$config = [];
foreach (['username', 'password', 'hostname', 'hostport', 'database', 'dsn', 'charset'] as $name) {
$config[$name] = is_string($this->config[$name]) ? explode(',', $this->config[$name]) : $this->config[$name];
}
$m = floor(mt_rand(0, $this->config['master_num'] - 1));
if ($this->config['rw_separate']) {
if ($master) {
$r = $m;
} elseif (is_numeric($this->config['slave_no'])) {
$r = $this->config['slave_no'];
} else {
$r = floor(mt_rand($this->config['master_num'], count($config['hostname']) - 1));
}
} else {
$r = floor(mt_rand(0, count($config['hostname']) - 1));
}
$dbMaster = false;
if ($m != $r) {
$dbMaster = [];
foreach (['username', 'password', 'hostname', 'hostport', 'database', 'dsn', 'charset'] as $name) {
$dbMaster[$name] = $config[$name][$m] ?? $config[$name][0];
}
}
$dbConfig = [];
foreach (['username', 'password', 'hostname', 'hostport', 'database', 'dsn', 'charset'] as $name) {
$dbConfig[$name] = $config[$name][$r] ?? $config[$name][0];
}
return $this->connect($dbConfig, $r, $r == $m ? false : $dbMaster);
}
public function startTransXa(string $xid)
{}
public function prepareXa(string $xid)
{}
public function commitXa(string $xid)
{}
public function rollbackXa(string $xid)
{}
}