<?php
// <liu21st@gmail.com>
declare (strict_types = 1);
namespace think\db;
use Closure;
use PDO;
use PDOStatement;
use think\db\exception\BindParamException;
use think\db\exception\DbException;
use think\db\exception\PDOException;
abstract class PDOConnection extends Connection
{
// Custom bind-type marker for floating point values. It is not handed to PDO
// as-is: bindValue() casts string values to float and binds them as PDO::PARAM_STR.
const PARAM_FLOAT = 21;
// Default connection configuration; connect() merges a per-call config over it.
protected $config = [
// Driver type; used to derive the default builder class name (see getBuilderClass()).
'type' => '',
'hostname' => '',
'database' => '',
'username' => '',
'password' => '',
'hostport' => '',
// Explicit DSN; when empty, connect() builds one via parseDsn().
'dsn' => '',
// Extra PDO options; they take precedence over $this->params in connect().
'params' => [],
'charset' => 'utf8',
'prefix' => '',
// Non-empty enables the multi-server path in initConnect().
'deploy' => 0,
// When true, multiConnect() picks separate servers for writes and reads.
'rw_separate' => false,
// Number of master servers; masters occupy the first indexes of the host list.
'master_num' => 1,
// Fixed slave index; when numeric, multiConnect() uses it instead of a random one.
'slave_no' => '',
// When set (with deploy), a write through pdoExecute() switches later reads to the master.
'read_master' => false,
'fields_strict' => true,
// Enables caching of table field info in getSchemaInfo() when a cache object is present.
'fields_cache' => false,
// Enables trigger() notifications for connects and executed statements.
'trigger_sql' => true,
// Custom builder / query class names (see getBuilderClass() / getQueryClass()).
'builder' => '',
'query' => '',
// Enables reconnect-on-broken-connection handling (see isBreak()).
'break_reconnect' => false,
// Additional error-message fragments merged into $breakMatchStr by connect().
'break_match_str' => [],
];
// Statement produced by the most recent getPDOStatement() call; cleared by free().
protected $PDOStatement;
// SQL text of the most recent statement (used by getLastSql()).
protected $queryStr = '';
// Current transaction nesting depth.
protected $transTimes = 0;
// Number of reconnect attempts made so far; retries stop once it reaches 4.
protected $reConnectTimes = 0;
// Fetch mode passed to PDOStatement::fetch()/fetchAll().
protected $fetchType = PDO::FETCH_ASSOC;
// Key-case mode applied by fieldCase(); overwritten from the PDO params in connect().
protected $attrCase = PDO::CASE_LOWER;
// Per-schema table info already resolved by getSchemaInfo().
protected $info = [];
// microtime(true) captured right before a statement is prepared.
protected $queryStartTime;
// Default PDO options used when creating a connection.
protected $params = [
PDO::ATTR_CASE => PDO::CASE_NATURAL,
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
PDO::ATTR_ORACLE_NULLS => PDO::NULL_NATURAL,
PDO::ATTR_STRINGIFY_FETCHES => false,
PDO::ATTR_EMULATE_PREPARES => false,
];
// Maps type names to bind-type constants (see getFieldBindType()).
protected $bindType = [
'string' => PDO::PARAM_STR,
'str' => PDO::PARAM_STR,
'integer' => PDO::PARAM_INT,
'int' => PDO::PARAM_INT,
'boolean' => PDO::PARAM_BOOL,
'bool' => PDO::PARAM_BOOL,
'float' => self::PARAM_FLOAT,
'datetime' => PDO::PARAM_STR,
'timestamp' => PDO::PARAM_STR,
];
// Error-message fragments that isBreak() treats as a broken connection
// (matched case-insensitively as substrings).
protected $breakMatchStr = [
'server has gone away',
'no connection to the server',
'Lost connection',
'is dead or not enabled',
'Error while sending',
'decryption failed or bad record mac',
'server closed the connection unexpectedly',
'SSL connection has been closed unexpectedly',
'Error writing data to the connection',
'Resource deadlock avoided',
'failed with errno',
];
// Bound parameters of the most recent statement (used by getLastSql()).
protected $bind = [];
/**
 * Resolve the query class name for this connection.
 *
 * @return string the configured 'query' class, or Query::class when none is configured
 */
public function getQueryClass(): string
{
    $configured = $this->getConfig('query');
    if ($configured) {
        return $configured;
    }

    return Query::class;
}
/**
 * Resolve the SQL builder class name for this connection.
 *
 * @return string the configured 'builder' class, or a name derived from the
 *                configured 'type' under the \think\db\builder namespace
 */
public function getBuilderClass(): string
{
    $configured = $this->getConfig('builder');
    if ($configured) {
        return $configured;
    }

    return '\\think\\db\\builder\\' . ucfirst($this->getConfig('type'));
}
/**
 * Build the DSN for the given connection config.
 * connect() calls this when $config['dsn'] is empty and passes the result to createPdo().
 *
 * @param array $config merged connection configuration
 */
abstract protected function parseDsn(array $config);
/**
 * Return the field definitions of a table.
 * getTableFieldsInfo() iterates the result as field name => definition and reads
 * the 'type' entry plus the optional 'primary' and 'autoinc' flags of each definition.
 *
 * @param string $tableName table name
 */
abstract public function getFields(string $tableName);
/**
 * Return table information for a database.
 * NOTE(review): not called within this file; presumably returns the list of
 * table names of $dbName - confirm against the driver implementations.
 *
 * @param string $dbName database name
 */
abstract public function getTables(string $dbName);
/**
 * Normalize the key case of a field-info array according to the connection's
 * PDO::ATTR_CASE setting.
 *
 * @param array $info field info keyed by field name
 * @return array the array with lower-cased, upper-cased or untouched keys
 */
public function fieldCase(array $info): array
{
    if (PDO::CASE_LOWER == $this->attrCase) {
        return array_change_key_case($info, CASE_LOWER);
    }

    if (PDO::CASE_UPPER == $this->attrCase) {
        return array_change_key_case($info, CASE_UPPER);
    }

    // PDO::CASE_NATURAL and anything else: keys are left as they are.
    return $info;
}
/**
 * Map a column type declaration to a simplified type name.
 *
 * The checks are ordered: the first matching rule wins.
 *
 * @param string $type column type as reported by the driver
 * @return string one of 'string', 'float', 'int', 'bool', 'timestamp', 'datetime', 'date'
 */
protected function getFieldType(string $type): string
{
    // set/enum declarations may list arbitrary words, so they are checked first.
    if (0 === strpos($type, 'set') || 0 === strpos($type, 'enum')) {
        return 'string';
    }

    if (preg_match('/(double|float|decimal|real|numeric)/is', $type)) {
        return 'float';
    }

    if (preg_match('/(int|serial|bit)/is', $type)) {
        return 'int';
    }

    if (preg_match('/bool/is', $type)) {
        return 'bool';
    }

    // 'datetime' must be tested before 'date' because it shares the prefix.
    foreach (['timestamp', 'datetime', 'date'] as $prefix) {
        if (0 === strpos($type, $prefix)) {
            return $prefix;
        }
    }

    return 'string';
}
/**
 * Map a type name or column type declaration to a parameter bind type.
 *
 * @param string $type simplified type name or raw column type
 * @return int a PDO::PARAM_* constant or self::PARAM_FLOAT
 */
public function getFieldBindType(string $type): int
{
    $simpleTypes = ['integer', 'string', 'float', 'boolean', 'bool', 'int', 'str'];
    if (in_array($type, $simpleTypes)) {
        return $this->bindType[$type];
    }

    if (0 === strpos($type, 'set') || 0 === strpos($type, 'enum')) {
        return PDO::PARAM_STR;
    }

    if (preg_match('/(double|float|decimal|real|numeric)/is', $type)) {
        return self::PARAM_FLOAT;
    }

    if (preg_match('/(int|serial|bit)/is', $type)) {
        return PDO::PARAM_INT;
    }

    if (preg_match('/bool/is', $type)) {
        return PDO::PARAM_BOOL;
    }

    return PDO::PARAM_STR;
}
/**
 * Build the cache key under which a schema's field info is stored.
 *
 * @param string $schema schema identifier (database.table)
 * @return string key of the form "hostname:hostport@schema"
 */
protected function getSchemaCacheKey(string $schema): string
{
    $host = $this->getConfig('hostname');
    $port = $this->getConfig('hostport');

    return $host . ':' . $port . '@' . $schema;
}
/**
 * Return the resolved field information of a table, loading it on first use.
 *
 * Results are memoized in $this->info; when 'fields_cache' is enabled and a
 * cache object is available, the raw field info is also read from / written to it.
 *
 * @param string $tableName table name, optionally qualified as database.table
 * @param bool   $force     reload from the database, bypassing both caches
 * @return array with keys 'fields', 'type', 'bind', 'pk' and 'autoinc'
 */
public function getSchemaInfo(string $tableName, $force = false)
{
    // Unqualified names are prefixed with the configured database.
    $schema = strpos($tableName, '.')
        ? $tableName
        : $this->getConfig('database') . '.' . $tableName;

    if ($force || !isset($this->info[$schema])) {
        $cacheKey = $this->getSchemaCacheKey($schema);
        $useCache = $this->config['fields_cache'] && !empty($this->cache);

        $fieldInfo = null;
        if ($useCache && !$force) {
            $fieldInfo = $this->cache->get($cacheKey);
        }

        if (empty($fieldInfo)) {
            $fieldInfo = $this->getTableFieldsInfo($tableName);
            if ($useCache) {
                $this->cache->set($cacheKey, $fieldInfo);
            }
        }

        // '_pk' and '_autoinc' are meta entries, not fields: pull them out.
        $primaryKey = $fieldInfo['_pk'] ?? null;
        $autoIncrement = $fieldInfo['_autoinc'] ?? null;
        unset($fieldInfo['_pk'], $fieldInfo['_autoinc']);

        $bindTypes = [];
        foreach ($fieldInfo as $fieldName => $fieldType) {
            $bindTypes[$fieldName] = $this->getFieldBindType($fieldType);
        }

        $this->info[$schema] = [
            'fields' => array_keys($fieldInfo),
            'type' => $fieldInfo,
            'bind' => $bindTypes,
            'pk' => $primaryKey,
            'autoinc' => $autoIncrement,
        ];
    }

    return $this->info[$schema];
}
/**
 * Return table information, or a single entry of it.
 *
 * @param string|array $tableName table name; for an array the first key (or,
 *                                if that is falsy, the current value) is used
 * @param string       $fetch     entry to return ('fields', 'type', 'bind', 'pk', 'autoinc');
 *                                empty returns the whole info array
 * @return mixed an empty array for names containing ',' or ')' after the first character
 */
public function getTableInfo($tableName, string $fetch = '')
{
    if (is_array($tableName)) {
        $tableName = key($tableName) ?: current($tableName);
    }

    // Multi-table lists and sub-queries have no single schema to describe.
    $isComposite = strpos($tableName, ',') || strpos($tableName, ')');
    if ($isComposite) {
        return [];
    }

    // Drop anything after the first space (e.g. an alias).
    $tableName = explode(' ', $tableName)[0];
    $schemaInfo = $this->getSchemaInfo($tableName);

    if ($fetch) {
        return $schemaInfo[$fetch];
    }

    return $schemaInfo;
}
/**
 * Load a table's fields and reduce them to simplified types.
 *
 * @param string $tableName table name
 * @return array field name => simplified type, plus the meta entries '_pk'
 *               (string, or array for composite keys) and '_autoinc' when present
 */
public function getTableFieldsInfo(string $tableName): array
{
    $types = [];
    $primaryKeys = [];
    $autoIncField = null;

    foreach ($this->getFields($tableName) as $name => $definition) {
        $types[$name] = $this->getFieldType($definition['type']);

        if (!empty($definition['primary'])) {
            $primaryKeys[] = $name;
        }

        if (!empty($definition['autoinc'])) {
            // The last auto-increment field seen wins.
            $autoIncField = $name;
        }
    }

    if ($primaryKeys) {
        $types['_pk'] = count($primaryKeys) > 1 ? $primaryKeys : $primaryKeys[0];
    }

    if (null !== $autoIncField) {
        $types['_autoinc'] = $autoIncField;
    }

    return $types;
}
/**
 * Return the primary key of a table.
 *
 * @param string|array $tableName table name
 * @return mixed the 'pk' entry of the table info
 */
public function getPk($tableName)
{
    $primaryKey = $this->getTableInfo($tableName, 'pk');

    return $primaryKey;
}
/**
 * Return the auto-increment field of a table.
 *
 * @param string|array $tableName table name
 * @return mixed the 'autoinc' entry of the table info
 */
public function getAutoInc($tableName)
{
    $autoIncrement = $this->getTableInfo($tableName, 'autoinc');

    return $autoIncrement;
}
/**
 * Return the field names of a table.
 *
 * @param string|array $tableName table name
 * @return array the 'fields' entry of the table info
 */
public function getTableFields($tableName): array
{
    $fields = $this->getTableInfo($tableName, 'fields');

    return $fields;
}
/**
 * Return the simplified types of a table's fields, or of a single field.
 *
 * @param string|array $tableName table name
 * @param string|null  $field     field to look up; when omitted, or when the
 *                                field is unknown, the whole type map is returned
 * @return mixed the type of $field, or the field name => type map
 */
public function getFieldsType($tableName, ?string $field = null)
{
    // The nullable type is spelled out: relying on "= null" to make the
    // parameter implicitly nullable is deprecated as of PHP 8.4.
    $types = $this->getTableInfo($tableName, 'type');

    if ($field && isset($types[$field])) {
        return $types[$field];
    }

    return $types;
}
/**
 * Return the parameter bind types of a table's fields.
 *
 * @param string|array $tableName table name
 * @return array the 'bind' entry of the table info
 */
public function getFieldsBind($tableName): array
{
    $bindTypes = $this->getTableInfo($tableName, 'bind');

    return $bindTypes;
}
/**
 * Open (or reuse) the PDO connection stored under a link number.
 *
 * @param array       $config         overrides merged on top of the default config
 * @param int         $linkNum        slot in $this->links
 * @param array|false $autoConnection fallback config tried once when the connection
 *                                    fails; false rethrows the failure
 * @return PDO
 * @throws \PDOException when connecting fails and no fallback is given
 */
public function connect(array $config = [], $linkNum = 0, $autoConnection = false): PDO
{
    if (isset($this->links[$linkNum])) {
        return $this->links[$linkNum];
    }

    $config = empty($config) ? $this->config : array_merge($this->config, $config);

    // Configured PDO options take precedence over the class defaults.
    $hasCustomParams = isset($config['params']) && is_array($config['params']);
    $params = $hasCustomParams ? $config['params'] + $this->params : $this->params;

    $this->attrCase = $params[PDO::ATTR_CASE];

    if (!empty($config['break_match_str'])) {
        $this->breakMatchStr = array_merge($this->breakMatchStr, (array) $config['break_match_str']);
    }

    try {
        if (empty($config['dsn'])) {
            $config['dsn'] = $this->parseDsn($config);
        }

        $startedAt = microtime(true);
        $this->links[$linkNum] = $this->createPdo($config['dsn'], $config['username'], $config['password'], $params);

        if (!empty($config['trigger_sql'])) {
            $this->trigger('CONNECT:[ UseTime:' . number_format(microtime(true) - $startedAt, 6) . 's ] ' . $config['dsn']);
        }

        return $this->links[$linkNum];
    } catch (\PDOException $e) {
        if (!$autoConnection) {
            throw $e;
        }

        // Log the failure and retry once with the fallback config; the retry
        // is made without a further fallback, so a second failure propagates.
        $this->db->log($e->getMessage(), 'error');

        return $this->connect($autoConnection, $linkNum);
    }
}
/**
 * Start a view query; all arguments are forwarded to the new query's view().
 *
 * @param mixed ...$args arguments for the query's view() method
 * @return mixed whatever the query's view() returns
 */
public function view(...$args)
{
    $query = $this->newQuery();

    return $query->view(...$args);
}
/**
 * Instantiate the PDO object for a connection.
 *
 * @param string $dsn      data source name
 * @param string $username user name
 * @param string $password password
 * @param array  $params   PDO driver options
 * @return PDO
 */
protected function createPdo($dsn, $username, $password, $params)
{
    $pdo = new PDO($dsn, $username, $password, $params);

    return $pdo;
}
/**
 * Release the statement of the last query by dropping the reference to it.
 */
public function free(): void
{
    unset($this->PDOStatement);
    $this->PDOStatement = null;
}
/**
 * Return the currently active connection handle.
 *
 * @return mixed the active link, or false when no connection is active
 */
public function getPdo()
{
    return $this->linkID ?: false;
}
/**
 * Execute a query and yield its rows one at a time.
 *
 * This is a generator: the statement is only executed once iteration starts.
 *
 * @param BaseQuery $query     query object (its options select master/procedure mode)
 * @param string    $sql       SQL to execute
 * @param array     $bind      bound parameters
 * @param mixed     $model     optional model; when given, each row is wrapped via newInstance()
 * @param mixed     $condition condition passed along to newInstance()
 * @return \Generator
 */
public function getCursor(BaseQuery $query, string $sql, array $bind = [], $model = null, $condition = null)
{
    $this->queryPDOStatement($query, $sql, $bind);

    while (true) {
        $row = $this->PDOStatement->fetch($this->fetchType);
        if (!$row) {
            break;
        }

        yield $model ? $model->newInstance($row, $condition) : $row;
    }
}
/**
 * Run a raw read query on a fresh query object.
 *
 * @param string $sql    SQL to execute
 * @param array  $bind   bound parameters
 * @param bool   $master whether to read from the master server
 * @return array result rows
 */
public function query(string $sql, array $bind = [], bool $master = false): array
{
    $query = $this->newQuery();

    return $this->pdoQuery($query, $sql, $bind, $master);
}
/**
 * Run a raw write statement on a fresh query object.
 *
 * @param string $sql  SQL to execute
 * @param array  $bind bound parameters
 * @return int number of affected rows
 */
public function execute(string $sql, array $bind = []): int
{
    $query = $this->newQuery();

    // The final "true" marks this as an original (raw) statement for pdoExecute().
    return $this->pdoExecute($query, $sql, $bind, true);
}
/**
 * Execute a read query, honouring the query's cache option.
 *
 * @param BaseQuery      $query  query object
 * @param string|Closure $sql    SQL text, or a closure that receives the query and
 *                               returns the SQL (bindings are then taken from the query)
 * @param array          $bind   bound parameters
 * @param bool|null      $master force master (true) / allow slave (false); null
 *                               falls back to the query's 'master' option
 * @return array result rows, or the list of row sets for a stored procedure
 */
protected function pdoQuery(BaseQuery $query, $sql, array $bind = [], ?bool $master = null): array
{
    // The nullable type is spelled out: relying on "= null" to make the
    // parameter implicitly nullable is deprecated as of PHP 8.4.
    $query->parseOptions();

    if ($query->getOptions('cache')) {
        // Serve from the cache when an entry exists.
        $cacheItem = $this->parseCache($query, $query->getOptions('cache'));
        $key = $cacheItem->getKey();
        $data = $this->cache->get($key);

        if (null !== $data) {
            return $data;
        }
    }

    if ($sql instanceof Closure) {
        // Building the SQL also populates the query's bindings.
        $sql = $sql($query);
        $bind = $query->getBind();
    }

    if (!isset($master)) {
        $master = $query->getOptions('master') ? true : false;
    }

    // Stored procedures are detected either by option or by a call/exec prefix.
    $procedure = $query->getOptions('procedure') ? true : in_array(strtolower(substr(trim($sql), 0, 4)), ['call', 'exec']);

    $this->getPDOStatement($sql, $bind, $master, $procedure);
    $resultSet = $this->getResult($procedure);

    // Empty results are not written to the cache.
    if (isset($cacheItem) && $resultSet) {
        $cacheItem->set($resultSet);
        $this->cacheData($cacheItem);
    }

    return $resultSet;
}
/**
 * Build the SELECT for a query, execute it and return the raw statement.
 *
 * @param BaseQuery $query query object
 * @return PDOStatement the executed statement
 */
public function pdo(BaseQuery $query): PDOStatement
{
    // Bindings are captured before the builder runs, as in the original order.
    $params = $query->getBind();
    $selectSql = $this->builder->select($query);

    return $this->queryPDOStatement($query, $selectSql, $params);
}
/**
 * Prepare, bind and execute a statement on the appropriate connection.
 *
 * On a recognised broken-connection error the connection is closed and the
 * statement retried, up to four times.
 *
 * @param string $sql       SQL to execute
 * @param array  $bind      bound parameters
 * @param bool   $master    whether to use the master connection
 * @param bool   $procedure whether the statement is a stored procedure call
 *                          (parameters are then bound with bindParam)
 * @return PDOStatement the executed statement
 * @throws PDOException wrapping any \PDOException that is not retried
 * @throws \Throwable   any other failure is rethrown unchanged
 */
public function getPDOStatement(string $sql, array $bind = [], bool $master = false, bool $procedure = false): PDOStatement
{
    $this->initConnect($this->readMaster ?: $master);

    // Remember the statement so getLastSql() can report it.
    $this->queryStr = $sql;
    $this->bind = $bind;

    $this->db->updateQueryTimes();

    try {
        $this->queryStartTime = microtime(true);
        $this->PDOStatement = $this->linkID->prepare($sql);

        if ($procedure) {
            $this->bindParam($bind);
        } else {
            $this->bindValue($bind);
        }

        $this->PDOStatement->execute();

        if (!empty($this->config['trigger_sql'])) {
            $this->trigger('', $master);
        }

        // A successful run resets the retry budget.
        $this->reConnectTimes = 0;

        return $this->PDOStatement;
    } catch (\Throwable | \Exception $e) {
        $shouldRetry = $this->reConnectTimes < 4 && $this->isBreak($e);
        if ($shouldRetry) {
            ++$this->reConnectTimes;

            return $this->close()->getPDOStatement($sql, $bind, $master, $procedure);
        }

        if (!($e instanceof \PDOException)) {
            throw $e;
        }

        throw new PDOException($e, $this->config, $this->getLastSql());
    }
}
/**
 * Execute a write statement on the master and invalidate the query's cache entry.
 *
 * @param BaseQuery $query  query object
 * @param string    $sql    SQL to execute
 * @param array     $bind   bound parameters
 * @param bool      $origin true for raw statements (options are parsed here and
 *                          the read-master switch is skipped)
 * @return int number of affected rows
 */
protected function pdoExecute(BaseQuery $query, string $sql, array $bind = [], bool $origin = false): int
{
    if ($origin) {
        $query->parseOptions();
    }

    $this->queryPDOStatement($query->master(true), $sql, $bind);

    // After a write in a distributed setup, optionally pin later reads to the master.
    $pinReadsToMaster = !$origin
        && !empty($this->config['deploy'])
        && !empty($this->config['read_master']);
    if ($pinReadsToMaster) {
        $this->readMaster = true;
    }

    $this->numRows = $this->PDOStatement->rowCount();

    if ($query->getOptions('cache')) {
        $cacheItem = $this->parseCache($query, $query->getOptions('cache'));
        $cacheKey = $cacheItem->getKey();
        $cacheTag = $cacheItem->getTag();

        // Remove the exact entry when it exists, otherwise clear by tag.
        if (isset($cacheKey) && $this->cache->has($cacheKey)) {
            $this->cache->delete($cacheKey);
        } elseif (!empty($cacheTag) && method_exists($this->cache, 'tag')) {
            $this->cache->tag($cacheTag)->clear();
        }
    }

    return $this->numRows;
}
/**
 * Execute a statement using the master/procedure settings of a query.
 *
 * @param BaseQuery $query query object whose 'master' and 'procedure' options are read
 * @param string    $sql   SQL to execute
 * @param array     $bind  bound parameters
 * @return PDOStatement the executed statement
 */
protected function queryPDOStatement(BaseQuery $query, string $sql, array $bind = []): PDOStatement
{
    $options = $query->getOptions();

    $useMaster = !empty($options['master']);

    // Stored procedures are detected either by option or by a call/exec prefix.
    $isProcedure = !empty($options['procedure'])
        || in_array(strtolower(substr(trim($sql), 0, 4)), ['call', 'exec']);

    return $this->getPDOStatement($sql, $bind, $useMaster, $isProcedure);
}
/**
 * Fetch a single row.
 *
 * A truthy result from the 'before_find' trigger is returned as-is without
 * querying the database.
 *
 * @param BaseQuery $query query object
 * @return array the first row, or an empty array when nothing matches
 */
public function find(BaseQuery $query): array
{
    $row = $this->db->trigger('before_find', $query);
    if ($row) {
        return $row;
    }

    $rows = $this->pdoQuery($query, function ($q) {
        // The second argument asks the builder for a single-row select.
        return $this->builder->select($q, true);
    });

    return $rows[0] ?? [];
}
/**
 * Run a SELECT and return a generator over its rows.
 *
 * @param BaseQuery $query query object
 * @return \Generator rows, wrapped in model instances when the query has a model
 */
public function cursor(BaseQuery $query)
{
    $options = $query->parseOptions();
    $selectSql = $this->builder->select($query);

    // The AND conditions are handed to the model instances, if any.
    $andCondition = $options['where']['AND'] ?? null;

    return $this->getCursor($query, $selectSql, $query->getBind(), $query->getModel(), $andCondition);
}
/**
 * Fetch all rows of a query.
 *
 * A truthy result from the 'before_select' trigger is returned as-is without
 * querying the database.
 *
 * @param BaseQuery $query query object
 * @return array result rows
 */
public function select(BaseQuery $query): array
{
    $rows = $this->db->trigger('before_select', $query);
    if ($rows) {
        return $rows;
    }

    return $this->pdoQuery($query, function ($q) {
        return $this->builder->select($q);
    });
}
/**
 * Insert a record.
 *
 * After a successful insert the generated id is written into the query's
 * 'data' option under the auto-increment field and 'after_insert' is triggered.
 *
 * @param BaseQuery $query        query object
 * @param bool      $getLastInsID return the generated id instead of the row count
 * @return mixed affected row count, or the last insert id when requested and available
 */
public function insert(BaseQuery $query, bool $getLastInsID = false)
{
    $options = $query->parseOptions();
    $sql = $this->builder->insert($query);

    // An empty SQL string means there is nothing to insert.
    $affected = '' == $sql ? 0 : $this->pdoExecute($query, $sql, $query->getBind());
    if (!$affected) {
        return $affected;
    }

    $sequence = $options['sequence'] ?? null;
    $lastInsId = $this->getLastInsID($query, $sequence);
    $data = $options['data'];

    if ($lastInsId) {
        $autoIncField = $query->getAutoInc();
        if ($autoIncField) {
            $data[$autoIncField] = $lastInsId;
        }
    }

    $query->setOption('data', $data);
    $this->db->trigger('after_insert', $query);

    if ($getLastInsID && $lastInsId) {
        return $lastInsId;
    }

    return $affected;
}
/**
 * Insert multiple records, optionally in chunks inside a transaction.
 *
 * @param BaseQuery $query   query object
 * @param array     $dataSet list of rows; returns 0 when the first element is not an array
 * @param int       $limit   rows per statement; 0 inserts everything at once unless
 *                           there are 5000 or more rows, in which case 1000 is used
 * @return int number of affected rows
 * @throws \Throwable any failure during a chunked insert, after rolling back
 */
public function insertAll(BaseQuery $query, array $dataSet = [], int $limit = 0): int
{
    if (!is_array(reset($dataSet))) {
        return 0;
    }

    $options = $query->parseOptions();
    $replace = !empty($options['replace']);

    // Large data sets are chunked automatically.
    if (0 === $limit && count($dataSet) >= 5000) {
        $limit = 1000;
    }

    if (!$limit) {
        $sql = $this->builder->insertAll($query, $dataSet, $replace);

        return $this->pdoExecute($query, $sql, $query->getBind());
    }

    // Chunked mode: all chunks succeed or none do.
    $this->startTrans();

    try {
        $total = 0;
        foreach (array_chunk($dataSet, $limit, true) as $chunk) {
            $sql = $this->builder->insertAll($query, $chunk, $replace);
            $total += $this->pdoExecute($query, $sql, $query->getBind());
        }

        $this->commit();
    } catch (\Exception | \Throwable $e) {
        $this->rollback();
        throw $e;
    }

    return $total;
}
/**
 * Insert the rows selected by a query into another table.
 *
 * @param BaseQuery $query  query object providing the SELECT
 * @param array     $fields target fields
 * @param string    $table  target table
 * @return int number of affected rows
 */
public function selectInsert(BaseQuery $query, array $fields, string $table): int
{
    $query->parseOptions();

    $statement = $this->builder->selectInsert($query, $fields, $table);
    $params = $query->getBind();

    return $this->pdoExecute($query, $statement, $params);
}
/**
 * Update records; triggers 'after_update' when rows were affected.
 *
 * @param BaseQuery $query query object
 * @return int number of affected rows (0 when the builder produced no SQL)
 */
public function update(BaseQuery $query): int
{
    $query->parseOptions();
    $sql = $this->builder->update($query);

    if ('' == $sql) {
        $affected = 0;
    } else {
        $affected = $this->pdoExecute($query, $sql, $query->getBind());
    }

    if ($affected) {
        $this->db->trigger('after_update', $query);
    }

    return $affected;
}
/**
 * Delete records; triggers 'after_delete' when rows were affected.
 *
 * @param BaseQuery $query query object
 * @return int number of affected rows
 */
public function delete(BaseQuery $query): int
{
    $query->parseOptions();

    $statement = $this->builder->delete($query);
    $affected = $this->pdoExecute($query, $statement, $query->getBind());

    if ($affected) {
        $this->db->trigger('after_delete', $query);
    }

    return $affected;
}
/**
 * Fetch the first column of the first row of a query.
 *
 * The query's 'field' and 'group' options are replaced only while the SQL is
 * built and are restored afterwards.
 *
 * @param BaseQuery $query   query object
 * @param string    $field   field (or expression) to select
 * @param mixed     $default value returned when no row is found
 * @param bool      $one     passed to the builder's select() as its second argument
 * @return mixed the fetched value, a cached value, or $default
 */
public function value(BaseQuery $query, string $field, $default = null, bool $one = true)
{
    $options = $query->parseOptions();
    $hadField = isset($options['field']);
    $hadGroup = isset($options['group']);

    if ($hadField) {
        $query->removeOption('field');
    }

    if ($hadGroup) {
        $query->group('');
    }

    $query->setOption('field', (array) $field);

    if (!empty($options['cache'])) {
        $cacheItem = $this->parseCache($query, $options['cache'], 'value');
        $cacheKey = $cacheItem->getKey();

        if ($this->cache->has($cacheKey)) {
            return $this->cache->get($cacheKey);
        }
    }

    $sql = $this->builder->select($query, $one);

    // Put the caller's field/group options back now that the SQL is built.
    if ($hadField) {
        $query->setOption('field', $options['field']);
    } else {
        $query->removeOption('field');
    }

    if ($hadGroup) {
        $query->setOption('group', $options['group']);
    }

    $statement = $this->getPDOStatement($sql, $query->getBind(), $options['master']);
    $fetched = $statement->fetchColumn();

    if (isset($cacheItem)) {
        $cacheItem->set($fetched);
        $this->cacheData($cacheItem);
    }

    // fetchColumn() reports "no row" as false.
    return false !== $fetched ? $fetched : $default;
}
/**
 * Run an aggregate function over a field.
 *
 * @param BaseQuery $query     query object
 * @param string    $aggregate aggregate function name (e.g. COUNT, SUM)
 * @param mixed     $field     field; a string starting with "DISTINCT " enables DISTINCT
 * @param bool      $force     cast the result to float
 * @return mixed the aggregate value (0 when no row is returned)
 */
public function aggregate(BaseQuery $query, string $aggregate, $field, bool $force = false)
{
    if (is_string($field) && 0 === stripos($field, 'DISTINCT ')) {
        [$distinct, $field] = explode(' ', $field);
    }

    $distinctPrefix = !empty($distinct) ? 'DISTINCT ' : '';
    $parsedField = $this->builder->parseKey($query, $field, true);
    $expression = $aggregate . '(' . $distinctPrefix . $parsedField . ') AS think_' . strtolower($aggregate);

    $result = $this->value($query, $expression, 0, false);

    if ($force) {
        return (float) $result;
    }

    return $result;
}
/**
 * Fetch one or more columns, optionally indexed by a key column.
 *
 * The query's 'field' option is replaced only while the SQL is built and is
 * restored afterwards.
 *
 * @param BaseQuery $query  query object
 * @param string    $column column name, comma-separated list of columns, or '*'
 * @param string    $key    column whose values become the result keys
 * @return array values (single column) or rows (several columns / '*')
 */
public function column(BaseQuery $query, string $column, string $key = ''): array
{
    $options = $query->parseOptions();
    $hadField = isset($options['field']);

    if ($hadField) {
        $query->removeOption('field');
    }

    // The key column has to be selected too, unless everything is selected anyway.
    $fieldList = ($key && '*' != $column) ? $key . ',' . $column : $column;
    $query->setOption('field', array_map('trim', explode(',', $fieldList)));

    if (!empty($options['cache'])) {
        $cacheItem = $this->parseCache($query, $options['cache'], 'column');
        $cacheKey = $cacheItem->getKey();

        if ($this->cache->has($cacheKey)) {
            return $this->cache->get($cacheKey);
        }
    }

    $sql = $this->builder->select($query);

    // Put the caller's field option back now that the SQL is built.
    if ($hadField) {
        $query->setOption('field', $options['field']);
    } else {
        $query->removeOption('field');
    }

    $statement = $this->getPDOStatement($sql, $query->getBind(), $options['master']);
    $rows = $statement->fetchAll(PDO::FETCH_ASSOC);

    if (empty($rows)) {
        $result = [];
    } elseif (('*' == $column || strpos($column, ',')) && $key) {
        // Several columns with a key: whole rows indexed by the key column.
        $result = array_column($rows, null, $key);
    } else {
        if (empty($key)) {
            $key = null;
        }

        if (strpos($column, ',')) {
            // Several columns without a key: whole rows.
            $column = null;
        } elseif (strpos($column, ' ')) {
            // "expr alias" form: the result column is named after the last word.
            $column = substr(strrchr(trim($column), ' '), 1);
        } elseif (strpos($column, '.')) {
            // "table.column" form: the result column carries no table prefix.
            $column = explode('.', $column)[1];
        }

        if (is_string($key) && strpos($key, '.')) {
            $key = explode('.', $key)[1];
        }

        $result = array_column($rows, $column, $key);
    }

    if (isset($cacheItem)) {
        $cacheItem->set($result);
        $this->cacheData($cacheItem);
    }

    return $result;
}
/**
 * Render a statement with its bound parameters substituted, for logging/debugging.
 *
 * The result is meant for display only; it is not a safely escaped statement.
 *
 * @param string $sql  SQL containing '?' or ':name' placeholders
 * @param array  $bind parameters; each entry is a value or a [value, type] pair
 * @return string the SQL with placeholders replaced, right-trimmed
 */
public function getRealSql(string $sql, array $bind = []): string
{
    // Where to resume looking for the next '?', so that a '?' inside an
    // already substituted value is never mistaken for a placeholder.
    $searchFrom = 0;

    foreach ($bind as $key => $val) {
        $value = is_array($val) ? $val[0] : $val;
        $type = is_array($val) ? $val[1] : PDO::PARAM_STR;

        if ((self::PARAM_FLOAT == $type || PDO::PARAM_STR == $type) && is_string($value)) {
            $value = '\'' . addslashes($value) . '\'';
        } elseif (PDO::PARAM_INT == $type && '' === $value) {
            $value = '0';
        } else {
            // substr_replace() only accepts strings/arrays; under strict_types
            // an int, float, bool or null replacement raises a TypeError on PHP 8.
            $value = (string) $value;
        }

        if (is_numeric($key)) {
            $position = strpos($sql, '?', $searchFrom);
            if (false === $position) {
                // No placeholder left for this parameter: leave the SQL alone.
                continue;
            }

            $sql = substr_replace($sql, $value, $position, 1);
            $searchFrom = $position + strlen($value);
        } else {
            $placeholder = ':' . $key;
            $position = strpos($sql, $placeholder);
            if (false === $position) {
                continue;
            }

            $sql = substr_replace($sql, $value, $position, strlen($placeholder));
        }
    }

    return rtrim($sql);
}
/**
 * Bind parameters by value to the current statement.
 *
 * @param array $bind parameters; each entry is a value or a [value, type] pair,
 *                    keyed by position (numeric) or by name
 * @throws BindParamException when PDO refuses a binding
 */
protected function bindValue(array $bind = []): void
{
    foreach ($bind as $key => $val) {
        // Positional placeholders are 1-based, named ones carry a colon.
        $param = is_numeric($key) ? $key + 1 : ':' . $key;

        if (!is_array($val)) {
            $bound = $this->PDOStatement->bindValue($param, $val);
        } else {
            if (PDO::PARAM_INT == $val[1] && '' === $val[0]) {
                // An empty string bound as integer becomes 0.
                $val[0] = 0;
            } elseif (self::PARAM_FLOAT == $val[1]) {
                // PDO has no float type: normalise the value and bind it as a string.
                $val[0] = is_string($val[0]) ? (float) $val[0] : $val[0];
                $val[1] = PDO::PARAM_STR;
            }

            $bound = $this->PDOStatement->bindValue($param, $val[0], $val[1]);
        }

        if (!$bound) {
            throw new BindParamException(
                "Error occurred when binding parameters '{$param}'",
                $this->config,
                $this->getLastSql(),
                $bind
            );
        }
    }
}
/**
 * Bind parameters for a stored procedure call to the current statement.
 *
 * Array entries are passed to PDOStatement::bindParam() as its remaining
 * arguments; scalar entries are bound by value.
 *
 * @param array $bind parameters keyed by position (numeric) or by name
 * @throws BindParamException when PDO refuses a binding
 */
protected function bindParam(array $bind): void
{
    foreach ($bind as $key => $val) {
        // Positional placeholders are 1-based, named ones carry a colon.
        $param = is_numeric($key) ? $key + 1 : ':' . $key;

        if (is_array($val)) {
            array_unshift($val, $param);
            $result = call_user_func_array([$this->PDOStatement, 'bindParam'], $val);
        } else {
            $result = $this->PDOStatement->bindValue($param, $val);
        }

        if (!$result) {
            // $param already names the failing placeholder. It must not be
            // re-derived with array_shift($val): $val is a scalar on the
            // bindValue path, which would raise a TypeError instead of
            // reporting the binding failure.
            throw new BindParamException(
                "Error occurred when binding parameters '{$param}'",
                $this->config,
                $this->getLastSql(),
                $bind
            );
        }
    }
}
/**
 * Collect the result of the current statement.
 *
 * @param bool $procedure collect every row set of a stored procedure instead
 * @return array rows of the current row set, or the list of row sets
 */
protected function getResult(bool $procedure = false): array
{
    if (!$procedure) {
        $rows = $this->PDOStatement->fetchAll($this->fetchType);
        $this->numRows = count($rows);

        return $rows;
    }

    return $this->procedure();
}
/**
 * Collect all non-empty row sets returned by a stored procedure.
 *
 * @return array list of row sets; numRows is set to the number of sets collected
 */
protected function procedure(): array
{
    $rowSets = [];

    do {
        $rows = $this->getResult();
        if ($rows) {
            $rowSets[] = $rows;
        }
    } while ($this->PDOStatement->nextRowset());

    $this->numRows = count($rowSets);

    return $rowSets;
}
/**
 * Run a callback inside a transaction.
 *
 * The transaction is committed when the callback returns and rolled back when
 * it throws.
 *
 * @param callable $callback receives this connection as its only argument
 * @return mixed the callback's return value
 * @throws \Throwable whatever the callback (or the commit) throws, after rollback
 */
public function transaction(callable $callback)
{
    $this->startTrans();

    try {
        $result = is_callable($callback) ? $callback($this) : null;

        $this->commit();

        return $result;
    } catch (\Exception | \Throwable $e) {
        $this->rollback();
        throw $e;
    }
}
/**
 * Start a transaction, or a savepoint when already inside one.
 *
 * The nesting depth is tracked in $transTimes: depth 1 begins a real
 * transaction, deeper levels create a savepoint when the driver supports it.
 *
 * @throws \Exception when the transaction cannot be started
 */
public function startTrans(): void
{
    $this->initConnect(true);

    ++$this->transTimes;

    try {
        if (1 == $this->transTimes) {
            $this->linkID->beginTransaction();
        } elseif ($this->transTimes > 1 && $this->supportSavepoint()) {
            $this->linkID->exec(
                $this->parseSavepoint('trans' . $this->transTimes)
            );
        }

        $this->reConnectTimes = 0;
    } catch (\Exception $e) {
        // Reconnecting is only safe for the outermost transaction: a dropped
        // connection has already lost any enclosing transaction, so a nested
        // level must not silently continue on a fresh connection.
        if (1 === $this->transTimes && $this->reConnectTimes < 4 && $this->isBreak($e)) {
            --$this->transTimes;
            ++$this->reConnectTimes;
            $this->close()->startTrans();

            // The retry succeeded (otherwise it would have thrown), so the
            // original failure must not be rethrown.
            return;
        }

        if ($this->isBreak($e)) {
            // The connection is gone and with it every open transaction level.
            $this->transTimes = 0;
        }

        throw $e;
    }
}
/**
 * Commit the current transaction level.
 *
 * Only the outermost level (depth 1) issues a real COMMIT; inner levels just
 * decrease the nesting depth.
 */
public function commit(): void
{
    $this->initConnect(true);

    if (1 == $this->transTimes) {
        $this->linkID->commit();
    }

    // Clamp at zero, as rollback() does: an unbalanced commit() must not push
    // the depth negative, or the next startTrans() would reach depth 0 instead
    // of 1 and never begin a transaction.
    $this->transTimes = max(0, $this->transTimes - 1);
}
/**
 * Roll back the current transaction level.
 *
 * Depth 1 issues a real ROLLBACK; deeper levels roll back to their savepoint
 * when the driver supports savepoints. The depth never drops below zero.
 */
public function rollback(): void
{
    $this->initConnect(true);

    $depth = $this->transTimes;

    if (1 == $depth) {
        $this->linkID->rollBack();
    } elseif ($depth > 1 && $this->supportSavepoint()) {
        $savepointSql = $this->parseSavepointRollBack('trans' . $depth);
        $this->linkID->exec($savepointSql);
    }

    $this->transTimes = max(0, $this->transTimes - 1);
}
/**
 * Whether nested transactions may use savepoints.
 *
 * @return bool always false in this base class
 */
protected function supportSavepoint(): bool
{
    $supported = false;

    return $supported;
}
/**
 * Build the SQL that creates a savepoint.
 *
 * @param string $name savepoint name
 * @return string SQL statement
 */
protected function parseSavepoint(string $name): string
{
    return "SAVEPOINT {$name}";
}
/**
 * Build the SQL that rolls back to a savepoint.
 *
 * @param string $name savepoint name
 * @return string SQL statement
 */
protected function parseSavepointRollBack(string $name): string
{
    return "ROLLBACK TO SAVEPOINT {$name}";
}
/**
 * Execute several statements inside one transaction.
 *
 * @param BaseQuery $query    query object
 * @param array     $sqlArray SQL statements, executed in order
 * @param array     $bind     parameters bound to every statement
 * @return bool true when all statements were executed and committed
 * @throws \Throwable the first failure, after the transaction is rolled back
 */
public function batchQuery(BaseQuery $query, array $sqlArray = [], array $bind = []): bool
{
    $this->startTrans();

    try {
        foreach ($sqlArray as $sql) {
            $this->pdoExecute($query, $sql, $bind);
        }

        $this->commit();
    } catch (\Throwable $e) {
        // \Throwable rather than \Exception: an \Error (e.g. a TypeError for a
        // non-string statement) must also roll back, otherwise the transaction
        // stays open. This matches transaction() and insertAll().
        $this->rollback();
        throw $e;
    }

    return true;
}
/**
 * Drop every connection handle and the current statement.
 *
 * @return $this
 */
public function close()
{
    $this->links = [];
    $this->linkID = $this->linkWrite = $this->linkRead = null;

    $this->free();

    return $this;
}
/**
 * Decide whether an error indicates a broken connection worth reconnecting for.
 *
 * @param \Throwable $e the error to inspect
 * @return bool true when 'break_reconnect' is enabled and the message contains
 *              one of the known fragments (case-insensitive)
 */
protected function isBreak($e): bool
{
    if (!$this->config['break_reconnect']) {
        return false;
    }

    $message = $e->getMessage();

    foreach ($this->breakMatchStr as $fragment) {
        if (false === stripos($message, $fragment)) {
            continue;
        }

        return true;
    }

    return false;
}
/**
 * Return the most recent statement with its parameters substituted.
 *
 * @return string
 */
public function getLastSql(): string
{
    $sql = $this->queryStr;
    $params = $this->bind;

    return $this->getRealSql($sql, $params);
}
/**
 * Return the id generated by the last insert, cast to the key's type.
 *
 * @param BaseQuery   $query    query object (used to determine the key's type)
 * @param string|null $sequence sequence name, for drivers that need one
 * @return mixed the id as int, float or string; '' when none is available
 */
public function getLastInsID(BaseQuery $query, ?string $sequence = null)
{
    // The nullable type is spelled out: relying on "= null" to make the
    // parameter implicitly nullable is deprecated as of PHP 8.4.
    try {
        $insertId = $this->linkID->lastInsertId($sequence);
    } catch (\Exception $e) {
        // Best effort: the driver failing here is treated as "no id".
        $insertId = '';
    }

    // lastInsertId() may report failure as false; autoInsIDType() requires a
    // string and would raise a TypeError under strict_types.
    if (false === $insertId) {
        $insertId = '';
    }

    return $this->autoInsIDType($query, (string) $insertId);
}
/**
 * Cast an insert id to the type of the table's auto-increment field.
 *
 * @param BaseQuery $query    query object
 * @param string    $insertId id as reported by the driver
 * @return mixed the id as int or float when the field is numeric, otherwise unchanged
 */
protected function autoInsIDType(BaseQuery $query, string $insertId)
{
    $pk = $query->getAutoInc();

    if ($pk) {
        // Look the bind type up by field name in the table's bind map.
        // getFieldBindType() expects a column TYPE; feeding it the field NAME
        // classified the key by whatever its name happened to contain.
        // NOTE(review): BaseQuery::getTable() is defined outside this file - confirm.
        $bindTypes = $this->getFieldsBind($query->getTable());
        $type = $bindTypes[$pk] ?? PDO::PARAM_STR;

        if (PDO::PARAM_INT == $type) {
            $insertId = (int) $insertId;
        } elseif (self::PARAM_FLOAT == $type) {
            $insertId = (float) $insertId;
        }
    }

    return $insertId;
}
/**
 * Describe the error state of the last statement.
 *
 * @return string "code:message" from the statement's errorInfo() (empty when
 *                there is no statement), followed by the last SQL when one was run
 */
public function getError(): string
{
    $message = '';

    if ($this->PDOStatement) {
        $errorInfo = $this->PDOStatement->errorInfo();
        $message = $errorInfo[1] . ':' . $errorInfo[2];
    }

    if ('' != $this->queryStr) {
        $message .= "\n [ SQL语句 ] : " . $this->getLastSql();
    }

    return $message;
}
/**
 * Make sure $this->linkID points at the connection to use.
 *
 * Without 'deploy' a single connection is opened lazily. With 'deploy', the
 * write connection is used for master requests and inside transactions, the
 * read connection otherwise.
 *
 * @param bool $master whether the master connection is required
 */
protected function initConnect(bool $master = true): void
{
    if (empty($this->config['deploy'])) {
        if (!$this->linkID) {
            $this->linkID = $this->connect();
        }

        return;
    }

    // Inside a transaction everything must go through the write connection.
    if ($master || $this->transTimes) {
        if (!$this->linkWrite) {
            $this->linkWrite = $this->multiConnect(true);
        }

        $this->linkID = $this->linkWrite;

        return;
    }

    if (!$this->linkRead) {
        $this->linkRead = $this->multiConnect(false);
    }

    $this->linkID = $this->linkRead;
}
/**
 * Pick a server from the comma-separated server lists and connect to it.
 *
 * @param bool $master whether a master server is required
 * @return PDO
 */
protected function multiConnect(bool $master = false): PDO
{
    $names = ['username', 'password', 'hostname', 'hostport', 'database', 'dsn', 'charset'];

    // Each setting may hold one value per server, separated by commas.
    $servers = [];
    foreach ($names as $name) {
        $raw = $this->config[$name];
        $servers[$name] = is_string($raw) ? explode(',', $raw) : $raw;
    }

    // Index of the master to use; masters come first in the lists.
    $m = floor(mt_rand(0, $this->config['master_num'] - 1));

    if (!$this->config['rw_separate']) {
        // No read/write separation: any server will do.
        $r = floor(mt_rand(0, count($servers['hostname']) - 1));
    } elseif ($master) {
        $r = $m;
    } elseif (is_numeric($this->config['slave_no'])) {
        $r = $this->config['slave_no'];
    } else {
        // Random slave: any index after the masters.
        $r = floor(mt_rand($this->config['master_num'], count($servers['hostname']) - 1));
    }

    // When a non-master server is chosen, the master serves as fallback
    // should the connection fail; false means "no fallback".
    $dbMaster = false;
    if ($m != $r) {
        $dbMaster = [];
        foreach ($names as $name) {
            $dbMaster[$name] = $servers[$name][$m] ?? $servers[$name][0];
        }
    }

    // Settings with fewer entries than servers fall back to their first entry.
    $dbConfig = [];
    foreach ($names as $name) {
        $dbConfig[$name] = $servers[$name][$r] ?? $servers[$name][0];
    }

    return $this->connect($dbConfig, $r, $dbMaster);
}
// XA (distributed transaction) hooks. All four are empty in this base class.
// NOTE(review): presumably overridden by drivers that support XA - confirm.

/**
 * Start an XA transaction. No-op here.
 *
 * @param string $xid XA transaction id
 */
public function startTransXa(string $xid)
{}
/**
 * Prepare an XA transaction. No-op here.
 *
 * @param string $xid XA transaction id
 */
public function prepareXa(string $xid)
{}
/**
 * Commit an XA transaction. No-op here.
 *
 * @param string $xid XA transaction id
 */
public function commitXa(string $xid)
{}
/**
 * Roll back an XA transaction. No-op here.
 *
 * @param string $xid XA transaction id
 */
public function rollbackXa(string $xid)
{}
}