diff --git a/app/src/Model/Job.php b/app/src/Model/Job.php index d259e20..0110fc9 100644 --- a/app/src/Model/Job.php +++ b/app/src/Model/Job.php @@ -7,12 +7,14 @@ class Job extends Ideal\Model { public array $configuration; public bool $executed = false; + public int $retries = 0; protected function jsonComplement(): array { return [ 'configuration' => $this->configuration, - 'executed' => $this->executed + 'executed' => $this->executed, + 'retries' => $this->retries ]; } } diff --git a/app/src/Service/Job.php b/app/src/Service/Job.php index f0b2524..92450cf 100644 --- a/app/src/Service/Job.php +++ b/app/src/Service/Job.php @@ -1,6 +1,8 @@ $now->format('Uu'), 'configuration' => $configuration, 'executed' => false, 'created_at' => $now->format('Y-m-d H:i:s'), - 'updated_at' => null + 'updated_at' => null, + 'retries' => 0 ]; $jobs = []; try { @@ -82,15 +89,69 @@ class Job extends Ideal\Service } } catch (EmptyRedis) {} $jobs []= $data; - $this->redisService->set($this->redisKey, json_encode($jobs), -1); + try { + $this->redisService->set($this->redisKey, json_encode($jobs), -1); + } catch (ConnectionException $exception) { + throw new Create(__CLASS__, $exception); + } return $this->load($data); } /** * @param Model\Job $job - * @return bool + * @return Model\Job + * @throws Update * @throws Read */ + public function update(Model\Job $job): Model\Job + { + try { + $now = (new DateTimeImmutable('now', new DateTimeZone($_ENV['TZ'] ?? 'America/Santiago'))); + } catch (DateMalformedStringException | DateInvalidTimeZoneException) { + $now = new DateTimeImmutable(); + } + $jobs = $this->getJobs(); + try { + $idx = $this->findJob($jobs, $job->id); + } catch (EmptyResult $exception) { + throw new Read(__CLASS__, $exception); + } + $jobs[$idx]['updated_at'] = $now->format('Y-m-d H:i:s'); + $jobs[$idx]['retries'] = $job->retries; + try { + $this->redisService->set($this->redisKey, json_encode($jobs), -1); + } catch (ConnectionException $exception) { + throw new Update(__CLASS__, $exception); + } + return $this->load($jobs[$idx]); + } + + /** + * @param Model\Job $job + * @throws Read + * @throws Delete + */ + public function remove(Model\Job $job): void + { + $jobs = $this->getJobs(); + try { + $idx = $this->findJob($jobs, $job->id); + } catch (EmptyResult $exception) { + throw new Read(__CLASS__, $exception); + } + unset($jobs[$idx]); + try { + $this->redisService->set($this->redisKey, json_encode(array_values($jobs)), -1); + } catch (ConnectionException $exception) { + throw new Delete(__CLASS__, $exception); + } + } + + /** + * @param Model\Job $job + * @return bool + * @throws Read | Create + */ public function execute(Model\Job $job): bool { $jobs = $this->getJobs(); @@ -101,7 +162,11 @@ class Job extends Ideal\Service throw new Read(__CLASS__, $exception); } unset($jobs[$idx]); - $this->redisService->set($this->redisKey, json_encode(array_values($jobs)), -1); + try { + $this->redisService->set($this->redisKey, json_encode(array_values($jobs)), -1); + } catch (ConnectionException $exception) { + throw new Create(__CLASS__, $exception); + } return true; } @@ -144,6 +209,7 @@ class Job extends Ideal\Service $job->id = $id ?? $data['id'] ?? null; $job->configuration = $data['configuration'] ?? []; $job->executed = $data['executed'] ?? false; + $job->retries = $data['retries'] ?? 0; return $job; } } diff --git a/app/src/Service/Queue.php b/app/src/Service/Queue.php index 01921ab..aae7a3a 100644 --- a/app/src/Service/Queue.php +++ b/app/src/Service/Queue.php @@ -5,12 +5,13 @@ use Exception; use Psr\Http\Message\RequestInterface; use Psr\Log\LoggerInterface; use Incoviba\Common\Ideal; -use Incoviba\Exception\ServiceAction\{Create, Read}; +use Incoviba\Exception\ServiceAction\{Create, Delete, Read, Update}; use Incoviba\Service; class Queue extends Ideal\Service { - public function __construct(LoggerInterface $logger, protected Service\Job $jobService, Worker $defaultWorker) + public function __construct(LoggerInterface $logger, protected Service\Job $jobService, Worker $defaultWorker, + protected int $maxRetries = 5) { parent::__construct($logger); $this->register('default', $defaultWorker); @@ -94,9 +95,23 @@ class Queue extends Ideal\Service $errors = []; foreach ($jobs as $job) { + if ($job->retries >= $this->maxRetries) { + try { + $this->jobService->remove($job); + } catch (Read | Delete $exception) { + $this->logger->error($exception->getMessage(), ['exception' => $exception]); + } + continue; + } try { $this->runJob($job->id, $request); } catch (Exception) { + $job->retries ++; + try { + $this->jobService->update($job); + } catch (Read | Update $exception) { + $this->logger->error($exception->getMessage(), ['exception' => $exception]); + } $errors []= $job->id; } } diff --git a/cli/src/Service/Redis.php b/cli/src/Service/Redis.php index f091060..55d7358 100644 --- a/cli/src/Service/Redis.php +++ b/cli/src/Service/Redis.php @@ -12,14 +12,18 @@ class Redis /** * @param string $name * @return string|null - * @throws Exception|ConnectionException + * @throws Exception */ public function get(string $name): ?string { if (!$this->client->exists($name)) { throw new Exception($name); } - return $this->client->get($name); + try { + return $this->client->get($name); + } catch (ConnectionException $exception) { + throw new Exception($exception->getMessage(), $exception->getCode(), $exception); + } } /** @@ -27,7 +31,6 @@ class Redis * @param mixed $value * @param int $expirationTTL * @return void - * @throws ConnectionException */ public function set(string $name, mixed $value, int $expirationTTL = 60 * 60 * 24): void {