<?php
declare(strict_types=1);
namespace Mine\Amqp\Listener;
use App\System\Mapper\SystemQueueMapper;
use App\System\Model\SystemQueue;
use App\System\Service\SystemQueueService;
use Mine\Helper\Str;
use Mine\Amqp\Event\AfterProduce;
use Mine\Amqp\Event\BeforeProduce;
use Mine\Amqp\Event\FailToProduce;
use Mine\Amqp\Event\ProduceEvent;
use Mine\Amqp\Event\WaitTimeout;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Event\Annotation\Listener;
/**
 * Records AMQP producer events through SystemQueueService.
 *
 * A record is saved when a BeforeProduce event arrives and is marked as
 * failed when a FailToProduce event arrives. The other subscribed events are
 * accepted but currently do nothing.
 *
 * NOTE(review): the `Listener` annotation class is imported by this file but
 * is not applied to the class here; confirm how the listener is registered.
 *
 * NOTE(review): the uuid generated in beforeProduce() is kept on the instance
 * and reused by failToProduce(). If one listener instance is shared by
 * concurrent producers, a failure could be attributed to the wrong record;
 * verify against the container/dispatcher lifecycle.
 *
 * NOTE(review): listen() subscribes to ProduceEvent as well as the concrete
 * events. If the concrete events extend ProduceEvent and the dispatcher
 * matches with instanceof, handlers may run twice per event; verify.
 */
class QueueProduceListener implements ListenerInterface
{
    /**
     * Handler method names process() is allowed to dispatch to. Each name is
     * the lcfirst() short class name of an event returned by listen().
     */
    private const HANDLERS = [
        'afterProduce',
        'beforeProduce',
        'produceEvent',
        'failToProduce',
        'waitTimeout',
    ];

    /** @var SystemQueueService|null Service used to save/update queue records. */
    private $service;

    /** @var mixed Exchange reported by the producer in beforeProduce(). */
    private $exchangeName;

    /** @var mixed Routing key reported by the producer in beforeProduce(). */
    private $routingKeyName;

    /** @var string|null Queue name derived from the routing key. */
    private $queueName;

    /** @var \Throwable|null Throwable carried by the event being processed, if any. */
    private $throwable;

    /** @var mixed Identifier generated in beforeProduce(); null until then. */
    private $uuid;

    /**
     * @return string[] Event class names this listener subscribes to.
     */
    public function listen(): array
    {
        return [
            AfterProduce::class,
            BeforeProduce::class,
            ProduceEvent::class,
            FailToProduce::class,
            WaitTimeout::class,
        ];
    }

    /**
     * Dispatches an event to the handler named after its short class name
     * (e.g. `...\BeforeProduce` is handled by beforeProduce()).
     *
     * Events whose derived handler name is not in HANDLERS are ignored rather
     * than triggering a call to an arbitrary or undefined method.
     *
     * @param object $event The dispatched event; `producer`, `delayTime` and
     *                      `throwable` are read from it when present.
     */
    public function process(object $event)
    {
        $this->service = new SystemQueueService(new SystemQueueMapper());
        $this->throwable = $event->throwable ?? null;

        // Take the part after the last namespace separator. strrpos() is used
        // instead of strrchr()+trim() so a class without a namespace does not
        // pass `false` to a string function under strict_types.
        $class = get_class($event);
        $separator = strrpos($class, '\\');
        $shortName = $separator === false ? $class : substr($class, $separator + 1);
        $handler = lcfirst($shortName);

        if (!in_array($handler, self::HANDLERS, true)) {
            return;
        }

        // Not every subscribed event is guaranteed (from this file) to expose
        // these properties, so both are read defensively.
        $producer = $event->producer ?? null;
        $delayTime = $event->delayTime ?? 0;

        $this->$handler($producer, $delayTime);
    }

    /**
     * Wraps the producer payload with a fresh uuid and saves a queue record.
     *
     * The payload is replaced by `['uuid' => ..., 'data' => <decoded payload>]`
     * and the record is stored with PRODUCE_STATUS_SUCCESS; failToProduce()
     * changes the status later if the send fails.
     *
     * @param mixed $producer  Producer message taken from the event.
     * @param mixed $delayTime Delay taken from the event (0 when absent).
     */
    public function beforeProduce($producer, $delayTime)
    {
        $routingKey = $producer->getRoutingKey();

        $this->exchangeName = $producer->getExchange();
        $this->routingKeyName = $routingKey;
        // Queue name is the routing-key segment before the first '.', plus
        // '.queue'. When the key contains no '.', strstr() yields false and
        // the result is just '.queue' (unchanged from the original behavior).
        $this->queueName = strstr($routingKey, '.', true) . '.queue';
        $this->uuid = Str::getUUID();

        $producer->setPayload([
            'uuid' => $this->uuid,
            'data' => json_decode($producer->payload()),
        ]);

        $this->service->save([
            'uuid' => $this->uuid,
            'exchange_name' => $this->exchangeName,
            'routing_key_name' => $this->routingKeyName,
            'queue_name' => $this->queueName,
            // Read back after setPayload() so the stored content is the
            // wrapped payload.
            'queue_content' => $producer->payload(),
            'delay_time' => $delayTime,
            'produce_status' => SystemQueue::PRODUCE_STATUS_SUCCESS,
        ]);
    }

    /**
     * Handler for ProduceEvent; intentionally does nothing.
     *
     * @param mixed $producer
     * @param mixed $delayTime
     */
    public function produceEvent($producer, $delayTime)
    {
    }

    /**
     * Handler for AfterProduce; intentionally does nothing.
     *
     * @param mixed $producer
     * @param mixed $delayTime
     */
    public function afterProduce($producer, $delayTime)
    {
    }

    /**
     * Handler for WaitTimeout; intentionally does nothing.
     *
     * Exists so that the WaitTimeout subscription in listen() has a matching
     * handler instead of failing with an undefined-method error.
     *
     * @param mixed $producer
     * @param mixed $delayTime
     */
    public function waitTimeout($producer, $delayTime)
    {
    }

    /**
     * Marks the record created in beforeProduce() as failed and stores the
     * throwable's message when one accompanied the event.
     *
     * @param mixed $producer
     * @param mixed $delayTime
     */
    public function failToProduce($producer, $delayTime)
    {
        // Without a uuid there is no record to target; skip the update rather
        // than issuing one with a null condition.
        if ($this->uuid === null) {
            return;
        }

        $data = ['produce_status' => SystemQueue::PRODUCE_STATUS_FAIL];
        if ($this->throwable instanceof \Throwable) {
            $data['log_content'] = $this->throwable->getMessage();
        }

        $this->service->update(['uuid' => $this->uuid], $data);
    }
}