diff --git a/cli/src/Command/Job/Run.php b/cli/src/Command/Job/Run.php index cf52905..c99486f 100644 --- a/cli/src/Command/Job/Run.php +++ b/cli/src/Command/Job/Run.php @@ -42,10 +42,7 @@ class Run extends Console\Command\Command array_map(function($row) {return [$row];},$jobIds) ]]); $this->pushOutput('top', ['progress' => $jobCount]); - foreach ($jobIds as $jobId) { - $this->runJob($jobId); - } - $result = $this->getResponses(); + $result = $this->runJobs($jobIds); $this->pushOutput('top', ['progress' => 'finish']); $this->writeOutput($input, $output); @@ -53,15 +50,34 @@ class Run extends Console\Command\Command return $result; } - protected function runJob(int $jobId): void + protected function runJobs(array $jobIds): int + { + $pendingJobs = []; + foreach ($jobIds as $jobId) { + if (!$this->runJob($jobId)) { + $pendingJobs []= $jobId; + } + } + $result = $this->getResponses(); + + if (count($pendingJobs) > 0) { + if ($this->runJobs($pendingJobs) === self::FAILURE) { + $result = self::FAILURE; + } + } + return $result; + } + protected function runJob(int $jobId): bool { $uri = "/api/queue/run/{$jobId}"; $this->pushOutput('bottom', ['message' => "GET {$uri}"]); try { $this->fastcgi->get($uri); + return true; } catch (FastCGIException $exception) { $this->logger->error($exception->getMessage(), ['uri' => $uri, 'exception' =>$exception]); + return false; } } protected function getResponses(): int diff --git a/cli/src/Exception/Client/FastCGI.php b/cli/src/Exception/Client/FastCGI.php index 8ee3631..b62dbaf 100644 --- a/cli/src/Exception/Client/FastCGI.php +++ b/cli/src/Exception/Client/FastCGI.php @@ -7,7 +7,7 @@ use Psr\Http\Client\ClientExceptionInterface; class FastCGI implements ClientExceptionInterface { - public function __construct(protected ?Throwable $previous) {} + public function __construct(protected ?Throwable $previous = null) {} public function getMessage(): string { diff --git a/cli/src/Service/FastCGI.php b/cli/src/Service/FastCGI.php index 8500957..615ffc7 100644 --- a/cli/src/Service/FastCGI.php +++ b/cli/src/Service/FastCGI.php @@ -10,6 +10,7 @@ class FastCGI implements LoggerAwareInterface { public function __construct(protected Login $loginService, protected string $hostname, protected int $port, protected string $documentRoot, + protected int $maxRequests = 50, protected int $connectionTimeout = 5000, protected int $readTimeout = 5000) { $this->client = new FCGI\Client(); @@ -40,6 +41,9 @@ class FastCGI implements LoggerAwareInterface */ public function sendRequest(FCGI\Interfaces\ProvidesRequestData $request): self { + if (count($this->socketIds) >= $this->maxRequests) { + throw new FastCGIException(); + } if (!isset($this->socket)) { $this->connect(); } @@ -60,13 +64,15 @@ class FastCGI implements LoggerAwareInterface { $responses = []; $repeats = 0; - $maxRepeats = count($this->socketIds); + + $maxRepeats = min(count($this->socketIds), $this->maxRequests); while ($this->client->hasUnhandledResponses()) { if ($repeats >= $maxRepeats) { break; } try { + $readySocketIds = $this->client->getSocketIdsHavingResponse(); $readyResponses = $this->client->readReadyResponses(3000); } catch (FCGI\Exceptions\FastCGIClientException $exception) { $this->logger->error($exception->getMessage()); @@ -77,6 +83,7 @@ class FastCGI implements LoggerAwareInterface $responses []= $response; $repeats ++; } + $this->socketIds = array_diff($this->socketIds, $readySocketIds); } if ($this->client->hasUnhandledResponses()) { $this->logger->error("Unhandled responses");