<?php
declare(strict_types = 1);
namespace App\System\Service;
use App\System\Mapper\SystemQueueMessageMapper;
use App\System\Model\SystemQueueMessage;
use App\System\Model\SystemUser;
use App\System\Queue\Producer\MessageProducer;
use Hyperf\Di\Annotation\Inject;
use Mine\Abstracts\AbstractService;
use Mine\Amqp\DelayProducer;
class SystemQueueMessageService extends AbstractService
{
public $mapper;
public $userService;
protected $producer;
public function __construct(SystemQueueMessageMapper $mapper)
{
$this->mapper = $mapper;
}
public function send(array $data): int
{
$this->setAttributes($data);
$userIdArr = [$this->receive_by];
if(!$this->receive_by){
$userIdArr = $this->userService->pluck(['status'=>SystemUser::USER_NORMAL],'id');
}
$messageId = array_map(function($userId) use ($data){
$data['receive_by'] = $userId;
return $this->mapper->save($data);
},$userIdArr);
return $this->push($messageId);
}
public function look($messageId = 0): int
{
$condition = $messageId;
if(!$messageId){
$condition = [
'receive_by'=>user()->getId()
];
}
return $this->mapper->updateByCondition($condition, [
'read_status'=>SystemQueueMessage::READ_STATUS_YES
]) ? 1 : 0;
}
protected function push(array $messageId): int
{
$message = new MessageProducer(['messageId'=>$messageId]);
return $this->producer->produce($message,false,5,0) ? 1 : 0;
}
}