<?php
// <liu21st@gmail.com>
declare (strict_types = 1);
namespace think\db;
use MongoDB\Driver\Command;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Exception\AuthenticationException;
use MongoDB\Driver\Exception\ConnectionException;
use MongoDB\Driver\Exception\InvalidArgumentException;
use MongoDB\Driver\Exception\RuntimeException;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\WriteConcern;
use think\db\exception\DbException as Exception;
use think\Paginator;
class Mongo extends BaseQuery
{
/**
 * Connection object that this class delegates command(), cmd(), getCursor()
 * and the 'pk' config lookup to.
 *
 * @var mixed NOTE(review): presumably the Mongo connection implementation — confirm against the declaring package
 */
protected $connection;
/**
 * Execute a raw driver command by forwarding it to the connection.
 *
 * @param Command             $command        driver command object to run
 * @param string              $dbName         database name handed to the connection ('' by default)
 * @param ReadPreference|null $readPreference optional read preference, forwarded unchanged
 * @param mixed               $typeMap        optional type map, forwarded unchanged
 * @return mixed whatever the connection's command() returns
 */
public function command(Command $command, string $dbName = '', ?ReadPreference $readPreference = null, $typeMap = null)
{
    // The nullable type is spelled out explicitly: relying on "= null" to make
    // a typed parameter nullable is deprecated as of PHP 8.4.
    return $this->connection->command($command, $dbName, $readPreference, $typeMap);
}
/**
 * Run a named command for this query through the connection.
 *
 * @param mixed  $command command identifier passed to the connection
 * @param mixed  $extra   additional command argument, forwarded unchanged
 * @param string $db      database name, forwarded unchanged
 * @return array result returned by the connection's cmd()
 */
public function cmd($command, $extra = null, string $db = ''): array
{
    // parseOptions() writes the defaulted option set back onto this query
    // before the query object is handed to the connection.
    $this->parseOptions();

    $connection = $this->connection;

    return $connection->cmd($this, $command, $extra, $db);
}
/**
 * Run the 'distinct' command for a field and return its 'values' entry.
 *
 * @param string $field field name passed as the command's extra argument
 * @return mixed the 'values' element of the first result row
 */
public function getDistinct(string $field)
{
    $rows  = $this->cmd('distinct', $field);
    $first = $rows[0];

    return $first['values'];
}
/**
 * Run the 'listCollections' command and collect the 'name' of every entry.
 *
 * @param string $db database name forwarded to cmd()
 * @return array sequentially indexed list of the 'name' values
 */
public function listCollections(string $db = '')
{
    $collections = $this->cmd('listCollections', null, $db);

    $names = array_map(static function ($collection) {
        return $collection['name'];
    }, $collections);

    // array_map keeps the input keys; re-index to match a plain append loop.
    return array_values($names);
}
/**
 * Run the 'count' command for the current query and return the count.
 *
 * @param string|null $field accepted for signature compatibility; not used by this implementation
 * @return int the 'n' value of the first result row, or 0 when it is absent
 */
public function count(?string $field = null): int
{
    $result = $this->cmd('count');

    // strict_types is on and the return type is int, so a missing or
    // non-integer 'n' would raise a TypeError; normalise it explicitly.
    return (int) ($result[0]['n'] ?? 0);
}
/**
 * Run the 'aggregate' command for one aggregate function on one field.
 *
 * @param string $aggregate aggregate name; lower-cased before being sent
 * @param mixed  $field     field argument forwarded with the aggregate name
 * @param bool   $force     when true the value is coerced to a number by adding 0
 * @return mixed the 'aggregate' entry of the first result row, or 0 when absent
 */
public function aggregate(string $aggregate, $field, bool $force = false)
{
    $rows  = $this->cmd('aggregate', [strtolower($aggregate), $field]);
    $value = $rows[0]['aggregate'] ?? 0;

    return $force ? $value + 0 : $value;
}
/**
 * Run the 'multiAggregate' command and flatten each row's '_id' entry.
 *
 * For every row with a non-empty '_id', each key/value pair inside '_id' is
 * copied onto the row itself and the '_id' entry is then removed.
 *
 * @param array $aggregate aggregate definition, forwarded to the command
 * @param array $groupBy   grouping definition, forwarded to the command
 * @return array the result rows after flattening
 */
public function multiAggregate(array $aggregate, array $groupBy): array
{
    $result = $this->cmd('multiAggregate', [$aggregate, $groupBy]);

    foreach ($result as &$row) {
        // empty() already covers the "not set" case.
        if (empty($row['_id'])) {
            continue;
        }

        foreach ($row['_id'] as $k => $v) {
            $row[$k] = $v;
        }

        unset($row['_id']);
    }
    // Break the reference so the last element is not left aliased to $row.
    unset($row);

    return $result;
}
/**
 * Register an '$inc' operation for a field in the query's data options.
 *
 * @param string $field field name used as the key inside options['data']
 * @param float  $step  amount stored alongside the '$inc' marker
 * @return $this
 */
public function inc(string $field, float $step = 1)
{
    $operation = ['$inc', $step];

    $this->options['data'][$field] = $operation;

    return $this;
}
/**
 * Register a decrement by delegating to inc() with the negated step.
 *
 * @param string $field field name
 * @param float  $step  amount to subtract
 * @return $this
 */
public function dec(string $field, float $step = 1)
{
    $negated = -$step;

    return $this->inc($field, $negated);
}
/**
 * Set the 'table' option for this query.
 *
 * @param mixed $table value stored verbatim under options['table']
 * @return $this
 */
public function table($table)
{
    $this->options['table'] = $table;

    return $this;
}
/**
 * Alias of table(): selects the collection this query operates on.
 *
 * @param string $collection collection name, passed straight to table()
 * @return $this
 */
public function collection(string $collection)
{
    $query = $this->table($collection);

    return $query;
}
/**
 * Set the 'typeMap' option for this query.
 *
 * When it is not set, parseOptions() falls back to the 'type_map' config value.
 *
 * @param mixed $typeMap value stored verbatim under options['typeMap']
 * @return $this
 */
public function typeMap($typeMap)
{
    $this->options['typeMap'] = $typeMap;

    return $this;
}
/**
 * Set the 'awaitData' option for this query.
 *
 * @param bool $awaitData flag stored under options['awaitData']
 * @return $this
 */
public function awaitData(bool $awaitData)
{
    $this->options['awaitData'] = $awaitData;

    return $this;
}
/**
 * Set the 'batchSize' option for this query.
 *
 * @param int $batchSize value stored under options['batchSize']
 * @return $this
 */
public function batchSize(int $batchSize)
{
    $this->options['batchSize'] = $batchSize;

    return $this;
}
/**
 * Set the 'exhaust' option for this query.
 *
 * @param bool $exhaust flag stored under options['exhaust']
 * @return $this
 */
public function exhaust(bool $exhaust)
{
    $this->options['exhaust'] = $exhaust;

    return $this;
}
/**
 * Set the 'modifiers' option for this query.
 *
 * parseOptions() later adds '$comment' and '$maxTimeMS' entries to this
 * array when the corresponding options are set.
 *
 * @param array $modifiers modifier map stored under options['modifiers']
 * @return $this
 */
public function modifiers(array $modifiers)
{
    $this->options['modifiers'] = $modifiers;

    return $this;
}
/**
 * Set the 'noCursorTimeout' option for this query.
 *
 * @param bool $noCursorTimeout flag stored under options['noCursorTimeout']
 * @return $this
 */
public function noCursorTimeout(bool $noCursorTimeout)
{
    $this->options['noCursorTimeout'] = $noCursorTimeout;

    return $this;
}
/**
 * Set the 'oplogReplay' option for this query.
 *
 * @param bool $oplogReplay flag stored under options['oplogReplay']
 * @return $this
 */
public function oplogReplay(bool $oplogReplay)
{
    $this->options['oplogReplay'] = $oplogReplay;

    return $this;
}
/**
 * Set the 'partial' option for this query.
 *
 * @param bool $partial flag stored under options['partial']
 * @return $this
 */
public function partial(bool $partial)
{
    $this->options['partial'] = $partial;

    return $this;
}
/**
 * Set the 'maxTimeMS' option for this query.
 *
 * parseOptions() copies this value into the modifiers under '$maxTimeMS'.
 * NOTE(review): the parameter is typed string although the name suggests a
 * millisecond count — confirm what the command builder expects.
 *
 * @param string $maxTimeMS value stored under options['maxTimeMS']
 * @return $this
 */
public function maxTimeMS(string $maxTimeMS)
{
    $this->options['maxTimeMS'] = $maxTimeMS;

    return $this;
}
/**
 * Set the 'collation' option for this query.
 *
 * @param array $collation collation definition stored under options['collation']
 * @return $this
 */
public function collation(array $collation)
{
    $this->options['collation'] = $collation;

    return $this;
}
/**
 * No-op in this class: the flag is accepted and ignored.
 *
 * @param bool $replace unused
 * @return $this
 */
public function replace(bool $replace = true)
{
    // Intentionally does nothing with $replace.
    return $this;
}
/**
 * Choose the fields to include by building the 'projection' option.
 *
 * A comma-separated string is split and trimmed. In an array, entries with
 * numeric keys are treated as field names and mapped to 1; entries with
 * other keys are copied as key => value. An empty argument or '*' leaves
 * the options untouched. Any previously set projection is replaced.
 *
 * @param mixed $field comma-separated string or array of fields
 * @return $this
 */
public function field($field)
{
    if (empty($field) || '*' == $field) {
        return $this;
    }

    $fields = is_string($field)
        ? array_map('trim', explode(',', $field))
        : $field;

    $projection = [];
    foreach ($fields as $name => $flag) {
        if (is_numeric($name)) {
            // List-style entry: the value is the field name.
            $projection[$flag] = 1;
            continue;
        }
        $projection[$name] = $flag;
    }

    $this->options['projection'] = $projection;

    return $this;
}
/**
 * Choose the fields to exclude by building the 'projection' option.
 *
 * A comma-separated string is split and trimmed. In an array, entries with
 * numeric keys are treated as field names and mapped to 0; entries with
 * other keys are copied as key => value. An empty argument or '*' leaves
 * the options untouched. Any previously set projection is replaced.
 *
 * @param mixed $field comma-separated string or array of fields
 * @return $this
 */
public function withoutField($field)
{
    if (empty($field) || '*' == $field) {
        return $this;
    }

    $fields = is_string($field)
        ? array_map('trim', explode(',', $field))
        : $field;

    $projection = [];
    foreach ($fields as $name => $flag) {
        if (is_numeric($name)) {
            // List-style entry: the value is the field name.
            $projection[$flag] = 0;
            continue;
        }
        $projection[$name] = $flag;
    }

    $this->options['projection'] = $projection;

    return $this;
}
/**
 * Set the 'skip' option for this query.
 *
 * limit() and the page handling in parseOptions() write the same key.
 *
 * @param int $skip value stored under options['skip']
 * @return $this
 */
public function skip(int $skip)
{
    $this->options['skip'] = $skip;

    return $this;
}
/**
 * Set the 'slaveOk' option for this query.
 *
 * @param bool $slaveOk flag stored under options['slaveOk']
 * @return $this
 */
public function slaveOk(bool $slaveOk)
{
    $this->options['slaveOk'] = $slaveOk;

    return $this;
}
/**
 * Set the 'skip' and 'limit' options for this query.
 *
 * Called with one argument, the value is the row limit and skip becomes 0.
 * Called with two arguments, the first is the offset and the second the limit.
 *
 * @param int      $offset offset, or the limit when $length is omitted
 * @param int|null $length row limit; null selects the single-argument form
 * @return $this
 */
public function limit(int $offset, ?int $length = null)
{
    // The nullable type is explicit: implicit nullability via "= null" is
    // deprecated as of PHP 8.4.
    if (null === $length) {
        $length = $offset;
        $offset = 0;
    }

    $this->options['skip']  = $offset;
    $this->options['limit'] = $length;

    return $this;
}
/**
 * Set the sort specification, stored under options['sort'].
 *
 * An array replaces the whole sort option. Otherwise a single entry is added
 * for $field: 1 when $order is 'asc' (case-insensitive), -1 for any other
 * value, including the default empty string.
 *
 * @param mixed  $field field name, or a complete sort array
 * @param string $order sort direction keyword
 * @return $this
 */
public function order($field, string $order = '')
{
    if (!is_array($field)) {
        $direction = 'asc' === strtolower($order) ? 1 : -1;

        $this->options['sort'][$field] = $direction;

        return $this;
    }

    $this->options['sort'] = $field;

    return $this;
}
/**
 * Set the 'tailable' option for this query.
 *
 * @param bool $tailable flag stored under options['tailable']
 * @return $this
 */
public function tailable(bool $tailable)
{
    $this->options['tailable'] = $tailable;

    return $this;
}
/**
 * Set the 'writeConcern' option for this query.
 *
 * @param WriteConcern $writeConcern driver write-concern object stored under options['writeConcern']
 * @return $this
 */
public function writeConcern(WriteConcern $writeConcern)
{
    $this->options['writeConcern'] = $writeConcern;

    return $this;
}
/**
 * Return the primary key for this query.
 *
 * Uses the query's own pk when it is truthy, otherwise the connection's
 * 'pk' config value.
 *
 * @return mixed
 */
public function getPk()
{
    $pk = $this->pk;
    if ($pk) {
        return $pk;
    }

    return $this->connection->getConfig('pk');
}
/**
 * Obtain a driver cursor for the current query from the connection.
 *
 * @return Cursor
 */
public function getCursor(): Cursor
{
    // Options are defaulted and written back before the connection reads them.
    $this->parseOptions();

    $connection = $this->connection;

    return $connection->getCursor($this);
}
/**
 * Build an md5 identifier for a query.
 *
 * The hash input is the 'database' config value followed by the serialized
 * var_export() dump of $data, or of the current options when $data is falsy.
 *
 * @param mixed $data optional data to hash instead of the current options
 * @return string 32-character md5 hex digest
 */
public function getQueryGuid($data = null): string
{
    $database = $this->getConfig('database');
    $subject  = $data ?: $this->options;
    $dump     = var_export($subject, true);

    return md5($database . serialize($dump));
}
/**
 * Run the query for one page and wrap the rows in a Paginator.
 *
 * @param int|array|null $listRows rows per page, or a config array (merged over the
 *                                 defaults: query, fragment, var_page, list_rows)
 * @param bool|int       $simple   true for simple pagination (no total is computed);
 *                                 an int is taken as a pre-computed total
 * @return Paginator
 */
public function paginate($listRows = null, $simple = false): Paginator
{
    // An integer in the $simple slot is a caller-supplied total.
    if (is_int($simple)) {
        $total  = $simple;
        $simple = false;
    }

    $defaultConfig = [
        'query'     => [],
        'fragment'  => '',
        'var_page'  => 'page',
        'list_rows' => 15,
    ];

    if (is_array($listRows)) {
        $config   = array_merge($defaultConfig, $listRows);
        $listRows = intval($config['list_rows']);
    } else {
        $config   = $defaultConfig;
        $listRows = intval($listRows ?: $config['list_rows']);
    }

    $page = isset($config['page']) ? (int) $config['page'] : Paginator::getCurrentPage($config['var_page']);
    $page = $page < 1 ? 1 : $page;

    $config['path'] = $config['path'] ?? Paginator::getCurrentPath();

    if (!isset($total) && !$simple) {
        $options = $this->getOptions();

        // Strip paging/ordering/projection state before counting. In this class
        // order() writes 'sort', limit()/skip() write 'skip' and field() writes
        // 'projection', so those keys are cleared alongside the generic ones;
        // otherwise a stale 'skip' could be carried into the count command.
        unset(
            $this->options['order'],
            $this->options['sort'],
            $this->options['limit'],
            $this->options['skip'],
            $this->options['page'],
            $this->options['field'],
            $this->options['projection']
        );

        $total   = $this->count();
        // Restore the caller's options for the actual page query.
        $results = $this->options($options)->page($page, $listRows)->select();
    } elseif ($simple) {
        // One extra row is requested; it is passed on to the Paginator with the results.
        $results = $this->limit(($page - 1) * $listRows, $listRows + 1)->select();
        $total   = null;
    } else {
        $results = $this->page($page, $listRows)->select();
    }

    $this->removeOption('limit');
    $this->removeOption('skip');
    $this->removeOption('page');

    return Paginator::make($results, $listRows, $page, $total, $simple, $config);
}
/**
 * Process the result set in batches, calling $callback once per batch.
 *
 * With a string column, each following batch is fetched with a comparison
 * against the last row's value of that column ('>' for 'asc', '<' otherwise).
 * With an array column, batches are fetched page by page instead.
 *
 * @param int               $count    rows per batch
 * @param callable          $callback receives each batch; returning false stops processing
 * @param string|array|null $column   ordering column(s); defaults to getPk()
 * @param string            $order    'asc' or any other value for descending
 * @return bool false when the callback aborted, true once all batches were processed
 */
public function chunk(int $count, callable $callback, $column = null, string $order = 'asc'): bool
{
    $options = $this->getOptions();
    $column  = $column ?: $this->getPk();

    // Discard any caller-defined ordering so the chunk column alone drives the
    // sequence. order() in this class stores its state under 'sort', so that
    // key is cleared as well as the generic 'order' key.
    unset($options['order'], $options['sort']);

    if (is_array($column)) {
        $times = 1;
        $query = $this->options($options)->page($times, $count);
    } else {
        $query = $this->options($options)->limit($count);

        if (strpos($column, '.')) {
            // Only the second dot-separated segment is used as the row key.
            [, $key] = explode('.', $column);
        } else {
            $key = $column;
        }
    }

    $resultSet = $query->order($column, $order)->select();

    while (count($resultSet) > 0) {
        if (false === $callback($resultSet)) {
            return false;
        }

        if (isset($times)) {
            $times++;
            $query = $this->options($options)->page($times, $count);
        } else {
            // Continue after the last row of the batch just processed.
            $end    = $resultSet->pop();
            $lastId = is_array($end) ? $end[$key] : $end->getData($key);

            $query = $this->options($options)
                ->limit($count)
                ->where($column, 'asc' == strtolower($order) ? '>' : '<', $lastId);
        }

        $resultSet = $query->order($column, $order)->select();
    }

    return true;
}
/**
 * Fill in defaults for the query options, store them back and return them.
 *
 * Defaults applied when an option is missing: 'table' from getTable(),
 * empty arrays for 'where', 'data' and 'projection', 'typeMap' from the
 * 'type_map' config, 0 for 'limit', and false for 'master', 'fetch_sql'
 * and 'fetch_cursor'. 'comment' and 'maxTimeMS' are folded into the
 * modifiers, and a 'page' option is converted to 'skip'/'limit'.
 *
 * @return array the completed option set
 */
public function parseOptions(): array
{
    $opts = $this->options;

    if (empty($opts['table'])) {
        $opts['table'] = $this->getTable();
    }

    if (!isset($opts['where'])) {
        $opts['where'] = [];
    }
    if (!isset($opts['data'])) {
        $opts['data'] = [];
    }

    // Merge the standalone comment/maxTimeMS options into the modifier map.
    $modifiers = [];
    if (!empty($opts['modifiers'])) {
        $modifiers = $opts['modifiers'];
    }
    if (isset($opts['comment'])) {
        $modifiers['$comment'] = $opts['comment'];
    }
    if (isset($opts['maxTimeMS'])) {
        $modifiers['$maxTimeMS'] = $opts['maxTimeMS'];
    }
    if (!empty($modifiers)) {
        $opts['modifiers'] = $modifiers;
    }

    if (!isset($opts['projection'])) {
        $opts['projection'] = [];
    }
    if (!isset($opts['typeMap'])) {
        $opts['typeMap'] = $this->getConfig('type_map');
    }
    if (!isset($opts['limit'])) {
        $opts['limit'] = 0;
    }

    foreach (['master', 'fetch_sql', 'fetch_cursor'] as $flag) {
        if (!isset($opts[$flag])) {
            $opts[$flag] = false;
        }
    }

    if (isset($opts['page'])) {
        [$page, $listRows] = $opts['page'];

        if (!($page > 0)) {
            $page = 1;
        }
        if (!($listRows > 0)) {
            // Fall back to the current limit when numeric, otherwise to 20.
            $listRows = is_numeric($opts['limit']) ? $opts['limit'] : 20;
        }

        $opts['skip']  = intval($listRows * ($page - 1));
        $opts['limit'] = intval($listRows);
    }

    $this->options = $opts;

    return $opts;
}
/**
 * Return the field-type map stored under options['field_type'].
 *
 * @return array the stored map, or an empty array when none is set
 */
public function getFieldsType(): array
{
    return empty($this->options['field_type'])
        ? []
        : $this->options['field_type'];
}
/**
 * Look up the type recorded for a single field.
 *
 * @param string $field field name
 * @return mixed the entry from getFieldsType(), or null when the field has none
 */
public function getFieldType(string $field)
{
    $types = $this->getFieldsType();

    return isset($types[$field]) ? $types[$field] : null;
}
}