<?php<liu21st@gmail.com>
namespace think\mongo;
use MongoDB\BSON\ObjectID;
use MongoDB\Driver\BulkWrite;
use MongoDB\Driver\Command;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Exception\AuthenticationException;
use MongoDB\Driver\Exception\BulkWriteException;
use MongoDB\Driver\Exception\ConnectionException;
use MongoDB\Driver\Exception\InvalidArgumentException;
use MongoDB\Driver\Exception\RuntimeException;
use MongoDB\Driver\Manager;
use MongoDB\Driver\Query as MongoQuery;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\WriteConcern;
use think\Collection;
use think\Container;
use think\Db;
use think\Exception;
class Connection
{
protected static $instance = [];
protected $dbName = '';
protected $queryStr = '';
protected $typeMap = 'array';
protected $mongo; protected $cursor;
protected static $event = [];
protected $links = [];
protected $linkID;
protected $linkRead;
protected $linkWrite;
protected $builder;
protected $numRows = 0;
protected $error = '';
protected $options = [];
protected static $info = [];
protected $config = [
'type' => '',
'hostname' => '',
'database' => '',
'is_replica_set' => false,
'username' => '',
'password' => '',
'hostport' => '',
'dsn' => '',
'params' => [],
'charset' => 'utf8',
'pk' => '_id',
'pk_type' => 'ObjectID',
'prefix' => '',
'debug' => false,
'deploy' => 0,
'rw_separate' => false,
'master_num' => 1,
'slave_no' => '',
'fields_strict' => true,
'resultset_type' => 'array',
'auto_timestamp' => false,
'datetime_format' => 'Y-m-d H:i:s',
'sql_explain' => false,
'pk_convert_id' => false,
'type_map' => ['root' => 'array', 'document' => 'array'],
'query' => '\\think\\mongo\\Query',
];
public function __construct(array $config = [])
{
if (!class_exists('\MongoDB\Driver\Manager')) {
throw new Exception('require mongodb > 1.0');
}
if (!empty($config)) {
$this->config = array_merge($this->config, $config);
}
$this->builder = new Builder($this);
}
public static function instance($config = [], $name = false)
{
if (false === $name) {
$name = md5(serialize($config));
}
if (true === $name || !isset(self::$instance[$name])) {
$options = self::parseConfig($config);
if (true === $name) {
$name = md5(serialize($config));
}
self::$instance[$name] = new static($options);
}
return self::$instance[$name];
}
public function connect(array $config = [], $linkNum = 0)
{
if (!isset($this->links[$linkNum])) {
if (empty($config)) {
$config = $this->config;
} else {
$config = array_merge($this->config, $config);
}
$this->dbName = $config['database'];
$this->typeMap = $config['type_map'];
if ($config['pk_convert_id'] && '_id' == $config['pk']) {
$this->config['pk'] = 'id';
}
$host = 'mongodb://' . ($config['username'] ? "{$config['username']}" : '') . ($config['password'] ? ":{$config['password']}@" : '') . $config['hostname'] . ($config['hostport'] ? ":{$config['hostport']}" : '');
if ($config['debug']) {
$startTime = microtime(true);
}
$this->links[$linkNum] = new Manager($host, $this->config['params']);
if ($config['debug']) {
$this->logger('[ MongoDb ] CONNECT :[ UseTime:' . number_format(microtime(true) - $startTime, 6) . 's ] ' . $config['dsn']);
}
}
return $this->links[$linkNum];
}
public function getConfig($config = '')
{
return $config ? $this->config[$config] : $this->config;
}
public function setConfig($config, $value)
{
$this->config[$config] = $value;
}
public function getMongo()
{
if (!$this->mongo) {
return;
} else {
return $this->mongo;
}
}
public function db($db = null)
{
if (is_null($db)) {
return $this->dbName;
} else {
$this->dbName = $db;
}
}
public function parseSqlTable($sql)
{
if (false !== strpos($sql, '__')) {
$prefix = $this->getConfig('prefix');
$sql = preg_replace_callback("/__([A-Z0-9_-]+)__/sU", function ($match) use ($prefix) {
return $prefix . strtolower($match[1]);
}, $sql);
}
return $sql;
}
public function query($namespace, MongoQuery $query, ReadPreference $readPreference = null, $class = false, $typeMap = null)
{
$this->initConnect(false);
Db::$queryTimes++;
if (false === strpos($namespace, '.')) {
$namespace = $this->dbName . '.' . $namespace;
}
if ($this->config['debug'] && !empty($this->queryStr)) {
$this->queryStr = 'db' . strstr($namespace, '.') . '.' . $this->queryStr;
}
$this->debug(true);
$this->cursor = $this->mongo->executeQuery($namespace, $query, $readPreference);
$this->debug(false);
return $this->getResult($class, $typeMap);
}
public function command(Command $command, $dbName = '', ReadPreference $readPreference = null, $class = false, $typeMap = null)
{
$this->initConnect(false);
Db::$queryTimes++;
$this->debug(true);
$dbName = $dbName ?: $this->dbName;
if ($this->config['debug'] && !empty($this->queryStr)) {
$this->queryStr = 'db.' . $this->queryStr;
}
$this->cursor = $this->mongo->executeCommand($dbName, $command, $readPreference);
$this->debug(false);
return $this->getResult($class, $typeMap);
}
protected function getResult($class = '', $typeMap = null)
{
if (true === $class) {
return $this->cursor;
}
if (is_null($typeMap)) {
$typeMap = $this->typeMap;
}
$typeMap = is_string($typeMap) ? ['root' => $typeMap] : $typeMap;
$this->cursor->setTypeMap($typeMap);
$result = $this->cursor->toArray();
if ($this->getConfig('pk_convert_id')) {
foreach ($result as &$data) {
$this->convertObjectID($data);
}
}
$this->numRows = count($result);
return $result;
}
private function convertObjectID(&$data)
{
if (isset($data['_id'])) {
$data['id'] = $data['_id']->__toString();
unset($data['_id']);
}
}
public function execute($namespace, BulkWrite $bulk, WriteConcern $writeConcern = null)
{
$this->initConnect(true);
Db::$executeTimes++;
if (false === strpos($namespace, '.')) {
$namespace = $this->dbName . '.' . $namespace;
}
if ($this->config['debug'] && !empty($this->queryStr)) {
$this->queryStr = 'db' . strstr($namespace, '.') . '.' . $this->queryStr;
}
$this->debug(true);
$writeResult = $this->mongo->executeBulkWrite($namespace, $bulk, $writeConcern);
$this->debug(false);
$this->numRows = $writeResult->getMatchedCount();
return $writeResult;
}
public function log($type, $data, $options = [])
{
if (!$this->config['debug']) {
return;
}
if (is_array($data)) {
array_walk_recursive($data, function (&$value) {
if ($value instanceof ObjectID) {
$value = $value->__toString();
}
});
}
switch (strtolower($type)) {
case 'aggregate':
$this->queryStr = 'runCommand(' . ($data ? json_encode($data) : '') . ');';
break;
case 'find':
$this->queryStr = $type . '(' . ($data ? json_encode($data) : '') . ')';
if (isset($options['sort'])) {
$this->queryStr .= '.sort(' . json_encode($options['sort']) . ')';
}
if (isset($options['limit'])) {
$this->queryStr .= '.limit(' . $options['limit'] . ')';
}
$this->queryStr .= ';';
break;
case 'insert':
case 'remove':
$this->queryStr = $type . '(' . ($data ? json_encode($data) : '') . ');';
break;
case 'update':
$this->queryStr = $type . '(' . json_encode($options) . ',' . json_encode($data) . ');';
break;
case 'cmd':
$this->queryStr = $data . '(' . json_encode($options) . ');';
break;
}
$this->options = $options;
}
public function getLastSql()
{
return $this->queryStr;
}
public function listen($callback)
{
self::$event[] = $callback;
}
protected function triggerSql($sql, $runtime, $options = [])
{
if (!empty(self::$event)) {
foreach (self::$event as $callback) {
if (is_callable($callback)) {
call_user_func_array($callback, [$sql, $runtime, $options]);
}
}
} else {
$this->logger('[ SQL ] ' . $sql . ' [ RunTime:' . $runtime . 's ]');
}
}
public function logger($log, $type = 'sql')
{
$this->config['debug'] && Container::get('log')->record($log, $type);
}
protected function debug($start, $sql = '')
{
if (!empty($this->config['debug'])) {
$debug = Container::get('debug');
if ($start) {
$debug->remark('queryStartTime', 'time');
} else {
$debug->remark('queryEndTime', 'time');
$runtime = $debug->getRangeTime('queryStartTime', 'queryEndTime');
$sql = $sql ?: $this->queryStr;
$this->triggerSql($sql, $runtime, $this->options);
}
}
}
public function free()
{
$this->cursor = null;
}
public function close()
{
$this->mongo = null;
$this->cursor = null;
$this->linkRead = null;
$this->linkWrite = null;
$this->links = [];
}
protected function initConnect($master = true)
{
if (!empty($this->config['deploy'])) {
if ($master) {
if (!$this->linkWrite) {
$this->linkWrite = $this->multiConnect(true);
}
$this->mongo = $this->linkWrite;
} else {
if (!$this->linkRead) {
$this->linkRead = $this->multiConnect(false);
}
$this->mongo = $this->linkRead;
}
} elseif (!$this->mongo) {
$this->mongo = $this->connect();
}
}
protected function multiConnect($master = false)
{
$config = [];
foreach (['username', 'password', 'hostname', 'hostport', 'database', 'dsn'] as $name) {
$config[$name] = explode(',', $this->config[$name]);
}
$m = floor(mt_rand(0, $this->config['master_num'] - 1));
if ($this->config['rw_separate']) {
if ($master) {
if ($this->config['is_replica_set']) {
return $this->replicaSetConnect();
} else {
$r = $m;
}
} elseif (is_numeric($this->config['slave_no'])) {
$r = $this->config['slave_no'];
} else {
$r = floor(mt_rand($this->config['master_num'], count($config['hostname']) - 1));
}
} else {
$r = floor(mt_rand(0, count($config['hostname']) - 1));
}
$dbConfig = [];
foreach (['username', 'password', 'hostname', 'hostport', 'database', 'dsn'] as $name) {
$dbConfig[$name] = isset($config[$name][$r]) ? $config[$name][$r] : $config[$name][0];
}
return $this->connect($dbConfig, $r);
}
public function replicaSetConnect()
{
$this->dbName = $this->config['database'];
$this->typeMap = $this->config['type_map'];
if ($this->config['debug']) {
$startTime = microtime(true);
}
$this->config['params']['replicaSet'] = $this->config['database'];
$manager = new Manager($this->buildUrl(), $this->config['params']);
if ($this->config['debug']) {
$this->logger('[ MongoDB ] ReplicaSet CONNECT:[ UseTime:' . number_format(microtime(true) - $startTime, 6) . 's ] ' . $this->config['dsn']);
}
return $manager;
}
private function buildUrl()
{
$url = 'mongodb://' . ($this->config['username'] ? "{$this->config['username']}" : '') . ($this->config['password'] ? ":{$this->config['password']}@" : '');
$hostList = explode(',', $this->config['hostname']);
$portList = explode(',', $this->config['hostport']);
for ($i = 0; $i < count($hostList); $i++) {
$url = $url . $hostList[$i] . ':' . $portList[0] . ',';
}
return rtrim($url, ",") . '/';
}
public function insert(Query $query, $replace = null, $getLastInsID = false)
{
$options = $query->getOptions();
if (empty($options['data'])) {
throw new Exception('miss data to insert');
}
$bulk = $this->builder->insert($query, $replace);
$writeConcern = isset($options['writeConcern']) ? $options['writeConcern'] : null;
$writeResult = $this->execute($options['table'], $bulk, $writeConcern);
$result = $writeResult->getInsertedCount();
if ($result) {
$data = $options['data'];
$lastInsId = $this->getLastInsID();
if ($lastInsId) {
$pk = $query->getPk($options);
$data[$pk] = $lastInsId;
}
$query->setOption('data', $data);
$query->trigger('after_insert');
if ($getLastInsID) {
return $lastInsId;
}
}
return $result;
}
public function getLastInsID($sequence = null)
{
$id = $this->builder->getLastInsID();
if (is_array($id)) {
array_walk($id, function (&$item, $key) {
if ($item instanceof ObjectID) {
$item = $item->__toString();
}
});
} elseif ($id instanceof ObjectID) {
$id = $id->__toString();
}
return $id;
}
public function insertAll(Query $query, array $dataSet)
{
$options = $query->getOptions();
if (!is_array(reset($dataSet))) {
return false;
}
$bulk = $this->builder->insertAll($query, $dataSet);
$writeConcern = isset($options['writeConcern']) ? $options['writeConcern'] : null;
$writeResult = $this->execute($options['table'], $bulk, $writeConcern);
return $writeResult->getInsertedCount();
}
public function update(Query $query)
{
$options = $query->getOptions();
$data = $options['data'];
if (isset($options['cache']) && is_string($options['cache']['key'])) {
$key = $options['cache']['key'];
}
$pk = $query->getPk($options);
if (empty($options['where'])) {
if (is_string($pk) && isset($data[$pk])) {
$where[$pk] = $data[$pk];
$key = 'mongo:' . $options['table'] . '|' . $data[$pk];
unset($data[$pk]);
} elseif (is_array($pk)) {
foreach ($pk as $field) {
if (isset($data[$field])) {
$where[$field] = $data[$field];
} else {
throw new Exception('miss complex primary data');
}
unset($data[$field]);
}
}
if (!isset($where)) {
throw new Exception('miss update condition');
} else {
$options['where']['$and'] = $where;
}
} elseif (!isset($key) && is_string($pk) && isset($options['where']['$and'][$pk])) {
$key = $this->getCacheKey($options['where']['$and'][$pk], $options);
}
$bulk = $this->builder->update($query);
$writeConcern = isset($options['writeConcern']) ? $options['writeConcern'] : null;
$writeResult = $this->execute($options['table'], $bulk, $writeConcern);
if (isset($key) && Container::get('cache')->get($key)) {
Container::get('cache')->rm($key);
}
$result = $writeResult->getModifiedCount();
if ($result) {
if (isset($where[$pk])) {
$data[$pk] = $where[$pk];
} elseif (is_string($pk) && isset($key) && strpos($key, '|')) {
list($a, $val) = explode('|', $key);
$data[$pk] = $val;
}
$query->setOption('data', $data);
$query->trigger('after_update');
}
return $result;
}
public function delete(Query $query)
{
$options = $query->getOptions();
$pk = $query->getPk($options);
$data = $options['data'];
if (!is_null($data) && true !== $data) {
if (!is_array($data)) {
$key = 'mongo:' . $options['table'] . '|' . $data;
}
$query->parsePkWhere($data);
} elseif (!isset($key) && is_string($pk) && isset($options['where']['$and'][$pk])) {
$key = $this->getCacheKey($options['where']['$and'][$pk], $options);
}
if (true !== $data && empty($options['where'])) {
throw new Exception('delete without condition');
}
$bulk = $this->builder->delete($query);
$writeConcern = isset($options['writeConcern']) ? $options['writeConcern'] : null;
$writeResult = $this->execute($options['table'], $bulk, $writeConcern);
if (isset($key) && Container::get('cache')->get($key)) {
Container::get('cache')->rm($key);
}
$result = $writeResult->getDeletedCount();
if ($result) {
if (!is_array($data) && is_string($pk) && isset($key) && strpos($key, '|')) {
list($a, $val) = explode('|', $key);
$item[$pk] = $val;
$data = $item;
}
$query->setOption('data', $data);
$query->trigger('after_delete');
}
return $result;
}
public function getCursor(Query $query)
{
$options = $query->getOptions();
$mongoQuery = $this->builder->select($query);
$readPreference = isset($options['readPreference']) ? $options['readPreference'] : null;
return $this->query($options['table'], $mongoQuery, $readPreference, true, $options['typeMap']);
}
public function select(Query $query)
{
$options = $query->getOptions();
$resultSet = false;
if (!empty($options['cache'])) {
$cache = $options['cache'];
$key = is_string($cache['key']) ? $cache['key'] : md5(serialize($options));
$resultSet = Container::get('cache')->get($key);
}
if (!$resultSet) {
$mongoQuery = $this->builder->select($query);
if ($resultSet = $query->trigger('before_select')) {
} else {
$readPreference = isset($options['readPreference']) ? $options['readPreference'] : null;
$resultSet = $this->query($options['table'], $mongoQuery, $readPreference, $options['fetch_cursor'], $options['typeMap']);
if ($resultSet instanceof Cursor) {
return $resultSet;
}
}
if (isset($cache)) {
$this->cacheData($key, $resultSet, $cache);
}
}
return $resultSet;
}
public function find(Query $query)
{
$options = $query->getOptions();
$pk = $query->getPk($options);
$data = $options['data'];
if (!empty($options['cache']) && true === $options['cache']['key'] && is_string($pk) && isset($options['where']['$and'][$pk])) {
$key = $this->getCacheKey($options['where']['$and'][$pk], $options);
}
$result = false;
if (!empty($options['cache'])) {
$cache = $options['cache'];
if (true === $cache['key'] && !is_null($data) && !is_array($data)) {
$key = 'mongo:' . $options['table'] . '|' . $data;
} elseif (!isset($key)) {
$key = is_string($cache['key']) ? $cache['key'] : md5(serialize($options));
}
$result = Container::get('cache')->get($key);
}
if (false === $result) {
if (is_string($pk)) {
if (!is_array($data)) {
if (isset($key) && strpos($key, '|')) {
list($a, $val) = explode('|', $key);
$item[$pk] = $val;
} else {
$item[$pk] = $data;
}
$data = $item;
}
}
$query->setOption('data', $data);
$query->setOption('limit', 1);
$mongoQuery = $this->builder->select($query);
if ($result = $query->trigger('before_find')) {
} else {
$readPreference = isset($options['readPreference']) ? $options['readPreference'] : null;
$resultSet = $this->query($options['table'], $mongoQuery, $readPreference, $options['fetch_cursor'], $options['typeMap']);
if ($resultSet instanceof Cursor) {
return $resultSet;
}
$result = isset($resultSet[0]) ? $resultSet[0] : null;
}
if (isset($cache)) {
$this->cacheData($key, $result, $cache);
}
}
return $result;
}
protected function cacheData($key, $data, $config = [])
{
$cache = Container::get('cache');
if (isset($config['tag'])) {
$cache->tag($config['tag'])->set($key, $data, $config['expire']);
} else {
$cache->set($key, $data, $config['expire']);
}
}
protected function getCacheKey($value, $options)
{
if (is_scalar($value)) {
$data = $value;
} elseif (is_array($value) && 'eq' == strtolower($value[0])) {
$data = $value[1];
}
if (isset($data)) {
return 'mongo:' . $options['table'] . '|' . $data;
} else {
return md5(serialize($options));
}
}
public function getTableInfo($tableName, $fetch = '')
{
if (is_array($tableName)) {
$tableName = key($tableName) ?: current($tableName);
}
if (strpos($tableName, ',')) {
return false;
} else {
$tableName = $this->parseSqlTable($tableName);
}
$guid = md5($tableName);
if (!isset(self::$info[$guid])) {
$mongoQuery = new MongoQuery([], ['limit' => 1]);
$cursor = $this->query($tableName, $mongoQuery, null, true, ['root' => 'array', 'document' => 'array']);
$resultSet = $cursor->toArray();
$result = isset($resultSet[0]) ? (array) $resultSet[0] : [];
$fields = array_keys($result);
$type = [];
foreach ($result as $key => $val) {
$type[$key] = getType($val);
if ('_id' == $key) {
$pk = $key;
}
}
if (!isset($pk)) {
$pk = null;
}
$result = ['fields' => $fields, 'type' => $type, 'pk' => $pk];
self::$info[$guid] = $result;
}
return $fetch ? self::$info[$guid][$fetch] : self::$info[$guid];
}
public function value(Query $query, $field, $default = null)
{
$options = $query->getOptions();
$result = null;
if (!empty($options['cache'])) {
$cache = $options['cache'];
$key = is_string($cache['key']) ? $cache['key'] : md5($field . serialize($options));
$result = Container::get('cache')->get($key);
}
if (!$result) {
if (isset($options['field'])) {
$query->removeOption('field');
}
$query->setOption('field', $field);
$query->setOption('limit', 1);
$mongoQuery = $this->builder->select($query);
$readPreference = isset($options['readPreference']) ? $options['readPreference'] : null;
$cursor = $this->query($options['table'], $mongoQuery, $readPreference, true, ['root' => 'array']);
$resultSet = $cursor->toArray();
if (!empty($resultSet)) {
$data = (array) array_shift($resultSet);
if ($this->getConfig('pk_convert_id')) {
$data['id'] = $data['_id']->__toString();
}
$result = $data[$field];
} else {
$result = null;
}
if (isset($cache)) {
$this->cacheData($key, $result, $cache);
}
}
return !is_null($result) ? $result : $default;
}
public function column(Query $query, $field, $key = '')
{
$options = $query->getOptions();
$result = false;
if (!empty($options['cache'])) {
$cache = $options['cache'];
$guid = is_string($cache['key']) ? $cache['key'] : md5($field . serialize($options));
$result = Container::get('cache')->get($guid);
}
if (!$result) {
if (isset($options['projection'])) {
$query->removeOption('projection');
}
if ($key && '*' != $field) {
$field = $key . ',' . $field;
}
if (is_string($field)) {
$field = array_map('trim', explode(',', $field));
}
$query->field($field);
$mongoQuery = $this->builder->select($query);
$readPreference = isset($options['readPreference']) ? $options['readPreference'] : null;
$cursor = $this->query($options['table'], $mongoQuery, $readPreference, true, ['root' => 'array']);
$resultSet = $cursor->toArray();
if ($resultSet) {
$fields = array_keys(get_object_vars($resultSet[0]));
$count = count($fields);
$key1 = array_shift($fields);
$key2 = $fields ? array_shift($fields) : '';
$key = $key ?: $key1;
foreach ($resultSet as $val) {
$val = (array) $val;
if ($this->getConfig('pk_convert_id')) {
$val['id'] = $val['_id']->__toString();
unset($val['_id']);
}
$name = $val[$key];
if ($name instanceof ObjectID) {
$name = $name->__toString();
}
if (2 == $count) {
$result[$name] = $val[$key2];
} elseif (1 == $count) {
$result[$name] = $val[$key1];
} else {
$result[$name] = $val;
}
}
} else {
$result = [];
}
if (isset($cache) && isset($guid)) {
$this->cacheData($guid, $result, $cache);
}
}
return $result;
}
public function cmd(Query $query, $command, $extra = null, $db = null)
{
if (is_array($command) || is_object($command)) {
if ($this->getConfig('debug')) {
$this->log('cmd', 'cmd', $command);
}
$command = new Command($command);
} else {
$command = $this->builder->$command($query, $extra);
}
return $this->command($command, $db);
}
private static function parseConfig($config)
{
if (empty($config)) {
$config = Container::get('config')->pull('database');
} elseif (is_string($config) && false === strpos($config, '/')) {
$config = Container::get('config')->get('database.' . $config);
}
if (is_string($config)) {
return self::parseDsnConfig($config);
} else {
return $config;
}
}
private static function parseDsnConfig($dsnStr)
{
$info = parse_url($dsnStr);
if (!$info) {
return [];
}
$dsn = [
'type' => $info['scheme'],
'username' => isset($info['user']) ? $info['user'] : '',
'password' => isset($info['pass']) ? $info['pass'] : '',
'hostname' => isset($info['host']) ? $info['host'] : '',
'hostport' => isset($info['port']) ? $info['port'] : '',
'database' => !empty($info['path']) ? ltrim($info['path'], '/') : '',
'charset' => isset($info['fragment']) ? $info['fragment'] : 'utf8',
];
if (isset($info['query'])) {
parse_str($info['query'], $dsn['params']);
} else {
$dsn['params'] = [];
}
return $dsn;
}
public function getPk($tableName)
{
return $this->getTableInfo($tableName, 'pk');
}
public function getTableFields($tableName)
{
return $this->getTableInfo($tableName, 'fields');
}
public function getFieldsType($tableName)
{
return $this->getTableInfo($tableName, 'type');
}
public function startTrans()
{}
public function commit()
{}
public function rollback()
{}
public function __destruct()
{
$this->free();
$this->close();
}
}