<?php
declare(strict_types=1);
namespace Worker\Built;
use Core\FileSystem\FileException;
use Core\Output;
use Exception;
use Facade\JsonRpc;
use Facade\Process;
use Protocol\Slice;
use stdClass;
use Worker\Built\JsonRpc\Attribute\RPC;
use Worker\Built\JsonRpc\JsonRpcPublisher;
use Worker\Prop\Build;
use Worker\Socket\TCPConnection;
/**
 * Built-in RPC service that keeps track of worker processes and relays
 * signals to them.
 *
 * It records which "observer" (guard) process is responsible for each
 * process, reaps exited children through SIGCHLD, and broadcasts newly
 * registered RPC services to the other connected clients.
 */
class ProcessManager extends BuiltRpc
{
    /**
     * Maps a process ID to the ID of the process that observes (guards) it.
     *
     * @var array<int, int>
     */
    public array $processObserverIdMap = [];

    public static string $facadeClass = Process::class;

    /**
     * Process IDs of this process's children. Nothing in this class adds to
     * the list; it is presumably filled by the code that forks children.
     *
     * @var int[]
     */
    public array $childrenProcessIds = [];

    /**
     * Re-installs the signal handlers and forgets the recorded children.
     *
     * NOTE(review): judging by the name this runs in a freshly forked process,
     * where the parent's children are not this process's children - confirm.
     */
    public function forkPassive(): void
    {
        parent::forkPassive();
        $this->registerSignalHandler();
        $this->childrenProcessIds = [];
    }

    /**
     * Installs the signal handlers, selects the Slice protocol and binds the
     * RPC service address. A bind failure is printed, not rethrown.
     */
    public function initialize(): void
    {
        parent::initialize();
        $this->registerSignalHandler();
        $this->protocol(Slice::class);

        try {
            $this->bind($this->getRpcServiceAddress());
        } catch (Exception $exception) {
            Output::printException($exception);
        }
    }

    /**
     * Enables asynchronous signal delivery and installs the handlers.
     *
     * SIGCHLD reaps every exited child without blocking and reports each exit
     * via isDie() - through JsonRpc::call() when isFork() is true, directly
     * otherwise. When isFork() is true, SIGINT, SIGTERM, SIGQUIT and SIGUSR2
     * are additionally routed to processSignalHandler().
     */
    public function registerSignalHandler(): void
    {
        pcntl_async_signals(true);

        pcntl_signal(SIGCHLD, function () {
            // Several children may have exited before the handler runs, so
            // keep reaping until pcntl_waitpid() reports nothing more.
            while (($childrenProcessId = pcntl_waitpid(-1, $status, WNOHANG)) > 0) {
                if ($this->isFork()) {
                    JsonRpc::call([ProcessManager::class, 'isDie'], $childrenProcessId);
                } else {
                    $this->isDie($childrenProcessId);
                }

                // array_search() returns false for an unknown PID; using that
                // as a key would address index 0 and drop an unrelated child.
                $index = array_search($childrenProcessId, $this->childrenProcessIds, true);
                if ($index !== false) {
                    unset($this->childrenProcessIds[$index]);
                }
            }
        });

        if ($this->isFork()) {
            foreach ([SIGINT, SIGTERM, SIGQUIT, SIGUSR2] as $signalNumber) {
                pcntl_signal($signalNumber, [$this, 'processSignalHandler']);
            }
        }
    }

    /**
     * Sends SIGUSR2 to every recorded child, then exits with status 0 when
     * isFork() is true. When isFork() is false the exit is only logged.
     */
    public function processSignalHandler(): void
    {
        $isFork = $this->isFork();

        if (!$isFork) {
            Output::info('Process:', 'Exit:' . posix_getpid());
        }

        foreach ($this->childrenProcessIds as $childrenProcessId) {
            $this->signal($childrenProcessId, SIGUSR2);
        }

        if ($isFork) {
            exit(0);
        }
    }

    /**
     * Performs the protocol handshake with a newly connected client.
     */
    public function onConnect(TCPConnection $client): void
    {
        $client->handshake($this->protocol);
    }

    /**
     * Records which observer (guard) process is responsible for a process.
     *
     * @param int $processId         ID of the observed process.
     * @param int $observerProcessId ID of the process observing it.
     */
    #[RPC('设置守护进程ID')]
    public function setObserverProcessId(int $processId, int $observerProcessId): void
    {
        $this->processObserverIdMap[$processId] = $observerProcessId;
        Output::info('Process running: ', $processId . ' [Guard:' . $observerProcessId . ']');
    }

    /**
     * Names the caller's connection "process:{id}" and returns the RPC
     * services currently listed on the JsonRpc instance.
     *
     * @param int              $processId        ID of the calling process.
     * @param JsonRpcPublisher $jsonRpcPublisher Carries the caller's connection.
     *
     * @return array<int, array{name: mixed, address: mixed, type: mixed}>
     */
    #[RPC('设置进程ID')]
    public function setProcessId(int $processId, JsonRpcPublisher $jsonRpcPublisher): array
    {
        $this->setClientName($jsonRpcPublisher->tcpConnection, "process:{$processId}");

        $rpcServiceList = [];
        foreach (JsonRpc::getInstance()->rpcServiceAddressList as $name => $info) {
            $rpcServiceList[] = [
                'name' => $name,
                'address' => $info['address'],
                'type' => $info['type']
            ];
        }

        return $rpcServiceList;
    }

    /**
     * Delivers a signal to a process through the process that observes it.
     *
     * If the observer is the current process the signal is sent directly with
     * posix_kill(); otherwise a "posix_kill" request is forwarded over the
     * observer's connection.
     *
     * @param int $processId Target process ID.
     * @param int $signal    Signal number.
     *
     * @return bool The posix_kill() result for a local target; true when the
     *              request was handed to the observer's connection; false when
     *              no observer is registered, the observer has no connection,
     *              or sending failed.
     */
    #[RPC('向指定进程发送信号')]
    public function signal(int $processId, int $signal): bool
    {
        $observerProcessId = $this->processObserverIdMap[$processId] ?? null;
        if (!$observerProcessId) {
            // Unknown process: nothing can be delivered, so do not claim success.
            return false;
        }

        if ($observerProcessId === posix_getpid()) {
            return posix_kill($processId, $signal);
        }

        $tcpConnection = $this->getClientByName("process:{$observerProcessId}");
        if (!$tcpConnection) {
            return false;
        }

        try {
            $this->slice->send($tcpConnection, json_encode([
                'version' => '2.0',
                'method' => 'posix_kill',
                'params' => [$processId, $signal]
            ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES));
        } catch (FileException $exception) {
            Output::printException($exception);
            return false;
        }

        return true;
    }

    /**
     * Sends SIGUSR2 to a process and drops its observer mapping.
     *
     * @param int $processId Target process ID.
     *
     * @return bool Result of signal().
     */
    #[RPC('关闭进程')]
    public function kill(int $processId): bool
    {
        $result = $this->signal($processId, SIGUSR2);
        unset($this->processObserverIdMap[$processId]);

        return $result;
    }

    /**
     * Records that a process has exited: drops its observer mapping and logs it.
     *
     * @param int $processId ID of the exited process.
     */
    #[RPC('进程退出')]
    public function isDie(int $processId): void
    {
        unset($this->processObserverIdMap[$processId]);
        Output::info('Process:', 'Exit:' . $processId);
    }

    /**
     * Announces a newly available RPC service.
     *
     * Publishes an "rpcServiceOnline" event and sends a
     * "noticeRpcServiceOnline" message to every connected client except the
     * one that registered the service. Failures are printed, not rethrown.
     *
     * @param string           $name             Service name.
     * @param string           $address          Service address.
     * @param string           $type             Service type.
     * @param JsonRpcPublisher $jsonRpcPublisher Carries the caller's connection.
     */
    #[RPC('RPC服务上线')]
    public function registerRpcService(string $name, string $address, string $type, JsonRpcPublisher $jsonRpcPublisher): void
    {
        $connection = $jsonRpcPublisher->tcpConnection;

        try {
            $this->publishAsync(Build::new('rpcServiceOnline', [
                'name' => $name,
                'address' => $address,
                'type' => $type
            ], $this->name));
        } catch (Exception $exception) {
            Output::printException($exception);
        }

        // The notification is identical for every client, so encode it once.
        $notice = json_encode([
            'version' => '2.0',
            'method' => 'noticeRpcServiceOnline',
            'params' => [$name, $address, $type]
        ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);

        foreach ($this->getClients() as $client) {
            if ($client === $connection) {
                continue;
            }

            try {
                $this->slice->send($client, $notice);
            } catch (FileException $exception) {
                Output::printException($exception);
            }
        }
    }

    /**
     * Forwards output to Output::info().
     *
     * The last argument is discarded before forwarding.
     * NOTE(review): presumably the JsonRpcPublisher that the RPC layer appends
     * to every call - confirm against the dispatcher.
     */
    #[RPC('子进程输出')]
    protected function outputInfo(...$arguments): void
    {
        array_pop($arguments);
        Output::info(...$arguments);
    }

    /**
     * Publishes the given event asynchronously.
     */
    #[RPC('子进程发布事件')]
    protected function event(Build $event): void
    {
        $this->publishAsync($event);
    }
}