<?php
namespace Imi\Process;
use Imi\App;
use Imi\Util\File;
use Imi\Event\Event;
use Imi\Bean\BeanFactory;
use Imi\Process\Parser\ProcessPoolParser;
use Imi\Process\Exception\ProcessAlreadyRunException;
use Imi\Util\Imi;
use Imi\Task\TaskManager;
abstract class ProcessPoolManager
{
public static function create($name, $workerNum = null, $args = [], $ipcType = 0, $msgQueueKey = null): \Swoole\Process\Pool
{
$processPoolOption = ProcessPoolParser::getInstance()->getProcessPool($name);
if(null === $processPoolOption)
{
return null;
}
if(null === $workerNum)
{
$workerNum = $processPoolOption['ProcessPool']->workerNum;
}
if(null === $ipcType)
{
$ipcType = $processPoolOption['ProcessPool']->ipcType;
}
if(null === $msgQueueKey)
{
$msgQueueKey = $processPoolOption['ProcessPool']->msgQueueKey;
}
$pool = new \Swoole\Process\Pool($workerNum, $ipcType, $msgQueueKey);
$pool->on('WorkerStart', function ($pool, $workerId) use($name, $workerNum, $args, $ipcType, $msgQueueKey, $processPoolOption) {
Imi::setProcessName('processPool', [
'processPoolName' => $name,
'workerId' => $workerId,
]);
\Swoole\Runtime::enableCoroutine(true);
mt_srand();
imigo(function() use($pool, $workerId, $name, $workerNum, $args, $ipcType, $msgQueueKey, $processPoolOption){
$processInstance = BeanFactory::newInstance($processPoolOption['className'], $args);
\Imi\Bean\Annotation::getInstance()->init(\Imi\Main\Helper::getAppMains());
App::initWorker();
Event::trigger('IMI.PROCESS_POOL.PROCESS.BEGIN', [
'name' => $name,
'pool' => $pool,
'workerId' => $workerId,
'workerNum' => $workerNum,
'args' => $args,
'ipcType' => $ipcType,
'msgQueueKey' => $msgQueueKey,
]);
$processInstance->run($pool, $workerId, $name, $workerNum, $args, $ipcType, $msgQueueKey);
});
\Swoole\Event::wait();
});
$pool->on('WorkerStop', imiCallable(function ($pool, $workerId) use($name, $workerNum, $args, $ipcType, $msgQueueKey) {
Event::trigger('IMI.PROCESS_POOL.PROCESS.END', [
'name' => $name,
'pool' => $pool,
'workerId' => $workerId,
'workerNum' => $workerNum,
'args' => $args,
'ipcType' => $ipcType,
'msgQueueKey' => $msgQueueKey,
]);
}, true));
return $pool;
}
}