diff --git a/app/bin/console b/app/bin/console old mode 100644 new mode 100755 diff --git a/app/bin/integration_tests b/app/bin/integration_tests old mode 100644 new mode 100755 diff --git a/app/bin/performance_tests b/app/bin/performance_tests old mode 100644 new mode 100755 diff --git a/app/bin/unit_tests b/app/bin/unit_tests old mode 100644 new mode 100755 diff --git a/app/src/Service/Queue.php b/app/src/Service/Queue.php index 755e6dc..3d082a7 100644 --- a/app/src/Service/Queue.php +++ b/app/src/Service/Queue.php @@ -59,6 +59,8 @@ class Queue extends Ideal\Service try { if (!$worker->execute($job)) { $this->logger->debug("Could not execute job {$job->id}"); + $job->retries++; + $this->jobService->update($job); return false; } if (!$this->jobService->execute($job)) { @@ -67,6 +69,12 @@ class Queue extends Ideal\Service } } catch (Exception $exception) { $this->logger->warning("Could not run job {$job->id}", ['exception' => $exception]); + $job->retries++; + try { + $this->jobService->update($job); + } catch (Update $exception) { + $this->logger->error($exception->getMessage(), ['job' => $job, 'exception' => $exception]); + } return false; } return true; diff --git a/cli/entrypoint b/cli/entrypoint old mode 100644 new mode 100755 diff --git a/cli/src/Command/Queue.php b/cli/src/Command/Queue.php index 302ee0c..38df4d4 100644 --- a/cli/src/Command/Queue.php +++ b/cli/src/Command/Queue.php @@ -42,7 +42,14 @@ class Queue extends Command } $io->title("[{$now->format('Y-m-d H:i:s e')}] Running Queue..."); - return $this->runJob(); + $results = []; + for ($i = 0; $i < $this->batchSize; $i++) { + if ($this->jobService->getPending() === 0) { + break; + } + $results []= $this->runJob(); + } + return count(array_filter($results, fn ($result) => $result === self::FAILURE)) === 0 ? self::SUCCESS : self::FAILURE; } protected array $sections; diff --git a/cli/src/Command/Job/Pending.php b/cli/src/Command/Queue/Pending.php similarity index 79% rename from cli/src/Command/Job/Pending.php rename to cli/src/Command/Queue/Pending.php index 718b375..e85d6cd 100644 --- a/cli/src/Command/Job/Pending.php +++ b/cli/src/Command/Queue/Pending.php @@ -1,10 +1,10 @@ addOption('configurations', 'c', Console\Input\InputOption::VALUE_REQUIRED | Console\Input\InputOption::VALUE_IS_ARRAY, 'Job configuration, must be in valid JSON format'); + $this->addOption('configurations', 'c', Console\Input\InputOption::VALUE_REQUIRED | Console\Input\InputOption::VALUE_IS_ARRAY, 'Job configuration options array, each job configuration must be in valid JSON format'); + $this->addOption('files', 'f', Console\Input\InputOption::VALUE_REQUIRED | Console\Input\InputOption::VALUE_IS_ARRAY, 'Paths to jobs configurations files with JSON array content'); } protected function execute(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): int @@ -23,8 +25,8 @@ class Push extends Console\Command\Command $io = new Console\Style\SymfonyStyle($input, $output); $io->title("Pushing job"); - $configurations = $input->getOption('configurations'); - if ($configurations === null) { + $configurations = $this->getConfigurations($input); + if (count($configurations) === 0) { $io->error('Missing configurations'); return self::FAILURE; } @@ -46,4 +48,74 @@ class Push extends Console\Command\Command } return $result; } + + protected function getConfigurations(Console\Input\InputInterface $input): array + { + return [ + ...$this->getFilesConfigurations($input), + ...$this->getOptionConfigurations($input), + ]; + } + protected function getFilesConfigurations(Console\Input\InputInterface $input): array + { + $configurations = []; + $files = $input->getOption('files'); + if ($files === null) { + return $configurations; + } + foreach ($files as $filePath) { + if (!file_exists($filePath)) { + continue; + } + $configurations = array_merge($configurations, $this->getFileConfigurations($filePath)); + } + return $configurations; + } + protected function getFileConfigurations(string $filePath): array + { + $configurations = []; + if (!file_exists($filePath)) { + return $configurations; + } + $json = file_get_contents($filePath); + if (!json_validate($json)) { + return $configurations; + } + $tmp = json_decode($json, true); + foreach ($tmp as $config) { + try { + $configurations []= $this->processConfiguration(json_encode($config)); + } catch (Throwable $exception) { + $this->logger->warning($exception->getMessage(), ['exception' => $exception, 'config' => $config]); + } + } + return $configurations; + } + protected function getOptionConfigurations(Console\Input\InputInterface $input): array + { + $configurations = []; + $configOptions = $input->getOption('configurations'); + if ($configOptions === null) { + return $configurations; + } + foreach ($configOptions as $config) { + try { + $configurations []= $this->processConfiguration($config); + } catch (Throwable $exception) { + $this->logger->warning($exception->getMessage(), ['exception' => $exception, 'config' => $config]); + } + } + return $configurations; + } + protected function processConfiguration(string $configuration): string + { + $json = json_decode($configuration, true); + if (!array_key_exists('type', $json) and !array_key_exists('configuration', $json)) { + throw new Console\Exception\InvalidArgumentException('Missing type or configuration key in JSON'); + } + if (array_key_exists('type', $json)) { + return json_encode($json); + } + return json_encode($json['configuration']); + } } diff --git a/cli/start_command b/cli/start_command old mode 100644 new mode 100755