<?php
// Contact: <448901948@qq.com>
namespace think\queue\driver;
use Exception;
use think\queue\job\Redis as RedisJob;
/**
 * Redis-backed queue driver built on the phpredis extension.
 *
 * Every queue is stored under three keys:
 *  - "queues:<name>"           a list of payloads that are ready to run;
 *  - "queues:<name>:delayed"   a sorted set of payloads scored by the Unix
 *                              time at which they become ready;
 *  - "queues:<name>:reserved"  a sorted set of payloads handed to a worker,
 *                              scored by the Unix time their reservation ends.
 *
 * A payload is a JSON object with the keys "job", "data", "id" and "attempts".
 */
class Redis
{
    /** @var \Redis Connected phpredis client. */
    protected $redis;

    /**
     * Driver options; values passed to the constructor override these.
     *
     * @var array
     */
    protected $options = [
        // Seconds a popped job stays reserved before it is put back on the
        // queue; null disables the migration of delayed/reserved jobs in pop().
        'expire'     => 60,
        // Queue name used whenever a caller does not pass one.
        'default'    => 'default',
        'host'       => '127.0.0.1',
        'port'       => 6379,
        // An empty password skips AUTH.
        'password'   => '',
        'timeout'    => 0,
        // true connects with pconnect() instead of connect().
        'persistent' => false
    ];

    /**
     * Connect to the Redis server and authenticate when a password is set.
     *
     * @param array $options Overrides for the default options.
     *
     * @throws Exception When the redis extension is missing, or the
     *                   connection or authentication is refused.
     */
    public function __construct($options)
    {
        if (!extension_loaded('redis')) {
            throw new Exception('redis扩展未安装');
        }
        if (!empty($options)) {
            $this->options = array_merge($this->options, $options);
        }

        $func        = $this->options['persistent'] ? 'pconnect' : 'connect';
        $this->redis = new \Redis;

        $connected = $this->redis->$func(
            $this->options['host'],
            $this->options['port'],
            $this->options['timeout']
        );
        // Fail here rather than on the first queue operation.
        if (false === $connected) {
            throw new Exception(
                'Unable to connect to redis at ' . $this->options['host'] . ':' . $this->options['port']
            );
        }

        if ('' != $this->options['password']) {
            if (false === $this->redis->auth($this->options['password'])) {
                throw new Exception('Redis authentication failed');
            }
        }
    }

    /**
     * Append a job to the end of a queue.
     *
     * @param mixed       $job   Job identifier stored under the "job" key.
     * @param mixed       $data  Job data stored under the "data" key.
     * @param string|null $queue Queue name; empty selects the default queue.
     *
     * @return mixed The id stored in the payload.
     *
     * @throws Exception When the payload cannot be JSON-encoded.
     */
    public function push($job, $data = '', $queue = null)
    {
        return $this->pushRaw($this->createPayload($job, $data), $queue);
    }

    /**
     * Schedule a job to become ready after a delay.
     *
     * @param int         $delay Seconds from now until the job becomes ready.
     * @param mixed       $job   Job identifier stored under the "job" key.
     * @param mixed       $data  Job data stored under the "data" key.
     * @param string|null $queue Queue name; empty selects the default queue.
     *
     * @return mixed The id stored in the payload, matching what push() returns.
     *
     * @throws Exception When the payload cannot be JSON-encoded.
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->zAdd($this->getQueue($queue) . ':delayed', time() + $delay, $payload);

        $decoded = json_decode($payload, true);

        return (is_array($decoded) && isset($decoded['id'])) ? $decoded['id'] : null;
    }

    /**
     * Take the next ready job off a queue and mark it as reserved.
     *
     * Unless the "expire" option is null, due delayed jobs and expired
     * reservations are first moved back onto the ready list.
     *
     * @param string|null $queue Queue name; empty selects the default queue.
     *
     * @return RedisJob|null The reserved job, or null when the queue is empty.
     */
    public function pop($queue = null)
    {
        // Unprefixed name, handed to the job object.
        $original = $queue ?: $this->options['default'];
        $queue    = $this->getQueue($queue);

        if (!is_null($this->options['expire'])) {
            $this->migrateAllExpiredJobs($queue);
        }

        $job = $this->redis->lPop($queue);
        if (false === $job) {
            return null;
        }

        $this->redis->zAdd($queue . ':reserved', time() + $this->options['expire'], $job);

        return new RedisJob($this, $job, $original);
    }

    /**
     * Put a job back on the delayed set with an updated attempt count.
     *
     * @param string|null $queue    Queue name; empty selects the default queue.
     * @param string      $payload  JSON payload of the job.
     * @param int         $delay    Seconds from now until the job becomes ready.
     * @param int         $attempts Value to store under the "attempts" key.
     */
    public function release($queue, $payload, $delay, $attempts)
    {
        $payload = $this->setMeta($payload, 'attempts', $attempts);
        $this->redis->zAdd($this->getQueue($queue) . ':delayed', time() + $delay, $payload);
    }

    /**
     * Append an already encoded payload to the end of a queue.
     *
     * @param string      $payload JSON payload.
     * @param string|null $queue   Queue name; empty selects the default queue.
     *
     * @return mixed The payload's "id", or null when the payload is not a
     *               JSON object/array or carries no id.
     */
    public function pushRaw($payload, $queue = null)
    {
        $this->redis->rPush($this->getQueue($queue), $payload);

        $decoded = json_decode($payload, true);

        return (is_array($decoded) && isset($decoded['id'])) ? $decoded['id'] : null;
    }

    /**
     * Build the JSON payload for a new job: job, data, a fresh id, attempts = 1.
     *
     * @param mixed $job  Job identifier.
     * @param mixed $data Job data.
     *
     * @return string JSON payload.
     *
     * @throws Exception When job/data cannot be JSON-encoded.
     */
    protected function createPayload($job, $data)
    {
        $payload = json_encode(['job' => $job, 'data' => $data]);
        // Without this check setMeta() would rebuild the payload from nothing
        // and a job lacking "job" and "data" would be queued silently.
        if (false === $payload) {
            throw new Exception('Unable to JSON-encode queue payload (json error ' . json_last_error() . ')');
        }

        $payload = $this->setMeta($payload, 'id', $this->getRandomId());

        return $this->setMeta($payload, 'attempts', 1);
    }

    /**
     * Remove a job from a queue's reserved set.
     *
     * @param string|null $queue Queue name; empty selects the default queue.
     * @param string      $job   Exact payload string that was reserved.
     */
    public function deleteReserved($queue, $job)
    {
        $this->redis->zRem($this->getQueue($queue) . ':reserved', $job);
    }

    /**
     * Move due delayed jobs and expired reservations back onto the ready list.
     *
     * @param string $queue Prefixed queue key, as returned by getQueue().
     */
    protected function migrateAllExpiredJobs($queue)
    {
        // Delayed jobs have not run yet, so their attempt count is left alone.
        $this->migrateExpiredJobs($queue . ':delayed', $queue, false);
        $this->migrateExpiredJobs($queue . ':reserved', $queue);
    }

    /**
     * Move every member of a sorted set whose score is at or below the
     * current time onto the end of a list.
     *
     * The source key is WATCHed so that the MULTI/EXEC block is abandoned if
     * the set changes between reading the due members and removing them.
     *
     * @param string $from    Sorted-set key to take due jobs from.
     * @param string $to      List key to push them onto.
     * @param bool   $attempt Whether to add one to each job's "attempts".
     */
    public function migrateExpiredJobs($from, $to, $attempt = true)
    {
        $this->redis->watch($from);

        $time = time();
        $jobs = $this->getExpiredJobs($from, $time);

        if (count($jobs) > 0) {
            $this->transaction(function () use ($from, $to, $time, $jobs, $attempt) {
                $this->removeExpiredJobs($from, $time);
                $this->pushExpiredJobsOntoNewQueue($to, $jobs, $attempt);
            });
        }

        $this->redis->unwatch();
    }

    /**
     * Run a closure between MULTI and EXEC.
     *
     * The transaction is discarded when EXEC reports failure or the closure
     * throws; the exception is intentionally not propagated.
     *
     * @param \Closure $closure Issues the commands to queue.
     */
    protected function transaction(\Closure $closure)
    {
        $this->redis->multi();
        try {
            call_user_func($closure);
            if (!$this->redis->exec()) {
                $this->redis->discard();
            }
        } catch (Exception $e) {
            $this->redis->discard();
        }
    }

    /**
     * Fetch the members of a sorted set whose score is at or below $time.
     *
     * @param string $from Sorted-set key.
     * @param int    $time Upper score bound (Unix timestamp).
     *
     * @return array Payload strings.
     */
    protected function getExpiredJobs($from, $time)
    {
        return $this->redis->zRangeByScore($from, '-inf', $time);
    }

    /**
     * Delete the members of a sorted set whose score is at or below $time.
     *
     * @param string $from Sorted-set key.
     * @param int    $time Upper score bound (Unix timestamp).
     */
    protected function removeExpiredJobs($from, $time)
    {
        $this->redis->zRemRangeByScore($from, '-inf', $time);
    }

    /**
     * Append a batch of payloads to a list, optionally bumping "attempts".
     *
     * @param string $to      List key.
     * @param array  $jobs    Payload strings.
     * @param bool   $attempt Whether to add one to each payload's "attempts".
     */
    protected function pushExpiredJobsOntoNewQueue($to, $jobs, $attempt = true)
    {
        // rPush needs at least one value.
        if (count($jobs) === 0) {
            return;
        }

        if ($attempt) {
            // Written back by index so no reference outlives the loop.
            foreach ($jobs as $index => $job) {
                $decoded  = json_decode($job, true);
                $attempts = (is_array($decoded) && isset($decoded['attempts'])) ? $decoded['attempts'] : 0;

                $jobs[$index] = $this->setMeta($job, 'attempts', $attempts + 1);
            }
        }

        call_user_func_array([$this->redis, 'rPush'], array_merge([$to], $jobs));
    }

    /**
     * Generate the id stored in a new payload.
     *
     * NOTE(review): uniqid() is time-based, so ids generated by different
     * processes at the same instant may collide - confirm this is acceptable.
     *
     * @return string
     */
    protected function getRandomId()
    {
        return uniqid();
    }

    /**
     * Set one key on a JSON payload and re-encode it.
     *
     * @param string $payload JSON payload.
     * @param string $key     Key to set.
     * @param mixed  $value   Value to store.
     *
     * @return string Updated JSON payload.
     */
    protected function setMeta($payload, $key, $value)
    {
        $payload       = json_decode($payload, true);
        $payload[$key] = $value;

        return json_encode($payload);
    }

    /**
     * Build the Redis key for a queue name.
     *
     * @param string|null $queue Queue name; empty selects the default queue.
     *
     * @return string "queues:<name>"
     */
    protected function getQueue($queue)
    {
        return 'queues:' . ($queue ?: $this->options['default']);
    }
}