<?php
declare(strict_types=1);
namespace Mine\Amqp\Listener;
use App\System\Mapper\SystemQueueMapper;
use App\System\Model\SystemQueue;
use App\System\Service\SystemQueueService;
use Mine\Amqp\Event\AfterConsume;
use Mine\Amqp\Event\BeforeConsume;
use Mine\Amqp\Event\ConsumeEvent;
use Mine\Amqp\Event\FailToConsume;
use Mine\Amqp\Event\WaitTimeout;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Event\Annotation\Listener;
class QueueConsumeListener implements ListenerInterface
{
private $service;
private $uuid;
private $throwable;
public function listen(): array
{
return [
AfterConsume::class,
BeforeConsume::class,
ConsumeEvent::class,
FailToConsume::class,
WaitTimeout::class,
];
}
public function process(object $event)
{
$this->service = new SystemQueueService(new SystemQueueMapper());
$message = $event->message;
$this->throwable = $event->throwable ?? '';
if($message){
$this->uuid = $event->data['uuid'];
$class = get_class($event);
$func = lcfirst(trim(strrchr($class, '\\'),'\\'));
$this->$func($message);
}
}
public function beforeConsume($message){
$condition = ['uuid'=>$this->uuid];
$data = ['consume_status'=>SystemQueue::CONSUME_STATUS_DOING];
$this->service->update($condition,$data);
}
public function consumeEvent($message){ }
public function afterConsume($message){
$condition = ['uuid'=>$this->uuid];
$data = ['consume_status'=>SystemQueue::CONSUME_STATUS_SUCCESS];
$this->service->update($condition,$data);
}
public function failToConsume($message){
$condition = ['uuid'=>$this->uuid];
$data = ['consume_status'=>SystemQueue::CONSUME_STATUS_4];
if($this->throwable){
$data['log_content'] = $this->throwable->getMessage();
}
$this->service->update($condition,$data);
}
}