<?php
declare(strict_types=1);
namespace App\System\Queue\Consumer;
use App\System\Model\SystemQueueMessage;
use App\System\Service\SystemQueueMessageService;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Di\Annotation\Inject;
use PhpAmqpLib\Message\AMQPMessage;
class MessageConsumer extends ConsumerMessage
{
protected $service;
public function consumeMessage($data, AMQPMessage $message): string
{
parent::consumeMessage($data,$message);
$data = $data['data'];
$messageIdArr = $data['messageId'] ?? [];
if(!$messageIdArr){
return Result::DROP;
}
array_map(function($messageId){
$this->service->update($messageId,['send_status'=>SystemQueueMessage::STATUS_SENDING]);
$this->service->update($messageId,['send_status'=>SystemQueueMessage::STATUS_SEND_SUCCESS]);
},$messageIdArr);
return Result::ACK;
}
public function isEnable(): bool
{
return env('AMQP_ENABLE', false);
}
}