From 8b2de31e0288905d08e1c6672c5e2b1bd26d2259 Mon Sep 17 00:00:00 2001 From: Juan Pablo Vial Date: Sat, 10 May 2025 12:30:35 -0400 Subject: [PATCH] Job Queue --- app/src/Model/Job.php | 18 ++++++++ app/src/Repository/Job.php | 54 +++++++++++++++++++++++ app/src/Service/Job.php | 51 ++++++++++++++++++++++ app/src/Service/Queue.php | 69 ++++++++++++++++++++++++++++++ app/src/Service/Worker.php | 9 ++++ app/src/Service/Worker/Request.php | 50 ++++++++++++++++++++++ 6 files changed, 251 insertions(+) create mode 100644 app/src/Model/Job.php create mode 100644 app/src/Repository/Job.php create mode 100644 app/src/Service/Job.php create mode 100644 app/src/Service/Queue.php create mode 100644 app/src/Service/Worker.php create mode 100644 app/src/Service/Worker/Request.php diff --git a/app/src/Model/Job.php b/app/src/Model/Job.php new file mode 100644 index 0000000..d259e20 --- /dev/null +++ b/app/src/Model/Job.php @@ -0,0 +1,18 @@ + $this->configuration, + 'executed' => $this->executed + ]; + } +} diff --git a/app/src/Repository/Job.php b/app/src/Repository/Job.php new file mode 100644 index 0000000..83ed63b --- /dev/null +++ b/app/src/Repository/Job.php @@ -0,0 +1,54 @@ +register('configuration', (new Implement\Repository\Mapper()) + ->setFunction(function($data) { + return json_decode($data['configuration'], true); + })) + ->register('executed', new Implement\Repository\Mapper\Boolean('executed')); + return $this->parseData(new Model\Job(), $data, $map); + } + public function save(Define\Model $model): Model\Job + { + $model->id = $this->saveNew(['configuration', 'executed', 'created_at'], + [json_encode($model->configuration), $model->executed, (new DateTimeImmutable())->format('Y-m-d H:i:s.u')]); + return $model; + } + public function edit(Define\Model $model, array $new_data): Model\Job + { + if (isset($new_data['configuration']) and !is_string($new_data['configuration'])) { + $new_data['configuration'] = json_encode($new_data['configuration']); + } + return $this->update($model, ['configuration', 'executed', 'updated_at'], + array_merge($new_data, ['updated_at' => (new DateTimeImmutable())->format('Y-m-d H:i:s.u')])); + } + + /** + * @return array + * @throws Implement\Exception\EmptyResult + */ + public function fetchPending(): array + { + $query = $this->connection->getQueryBuilder() + ->select() + ->from($this->getTable()) + ->where('executed = ?'); + return $this->fetchMany($query, [false]); + } +} diff --git a/app/src/Service/Job.php b/app/src/Service/Job.php new file mode 100644 index 0000000..c72523a --- /dev/null +++ b/app/src/Service/Job.php @@ -0,0 +1,51 @@ +jobRepository->create(compact('configuration')); + return $this->process($this->jobRepository->save($job)); + } catch (PDOException $exception) { + throw new Create(__CLASS__, $exception); + } + } + + /** + * @return array + * @throws Read + */ + public function getPending(): array + { + try { + return array_merge([$this, 'process'],$this->jobRepository->fetchPending()); + } catch (EmptyResult $exception) { + throw new Read(__CLASS__, $exception); + } + } + + protected function process(Model\Job $job): Model\Job + { + return $job; + } +} diff --git a/app/src/Service/Queue.php b/app/src/Service/Queue.php new file mode 100644 index 0000000..0630319 --- /dev/null +++ b/app/src/Service/Queue.php @@ -0,0 +1,69 @@ +register('default', $defaultWorker); + } + + protected array $workers; + public function register(string $name, Worker $worker): self + { + $this->workers[strtolower($name)] = $worker; + return $this; + } + + public function enqueue(array $configuration): bool + { + try { + $this->jobService->add($configuration); + return true; + } catch (Create $exception) { + $final = new Exception("Could not enqueue job", 0, $exception); + $this->logger->warning($final); + return false; + } + } + + public function run(): bool + { + try { + $jobs = $this->jobService->getPending(); + } catch (Read $exception) { + $final = new Exception("Could not get pending jobs", 0, $exception); + $this->logger->warning($final); + return false; + } + + $status = true; + foreach ($jobs as $job) { + $type = 'default'; + if (isset($job->configuration['type'])) { + $type = strtolower($job->configuration['type']); + } + if (!isset($this->workers[$type])) { + $type = 'default'; + } + + $worker = $this->workers[$type]; + + try { + $status &= $worker->run($job); + } catch (Exception $exception) { + $final = new Exception("Could not run job", 0, $exception); + $this->logger->warning($final); + $status &= false; + } + } + return $status; + } +} diff --git a/app/src/Service/Worker.php b/app/src/Service/Worker.php new file mode 100644 index 0000000..6d2f48c --- /dev/null +++ b/app/src/Service/Worker.php @@ -0,0 +1,9 @@ +configuration['url']; + $method = strtolower($job->configuration['method']); + $body = $job->configuration['body']; + + try { + $response = $this->client->{$method}($url, [ + 'json' => $body, + ]); + } catch (ClientExceptionInterface $exception) { + throw new EmptyResponse($url, $exception); + } + + $statusCode = $response->getStatusCode(); + if ((int) floor($statusCode / 100) !== 2) { + throw new EmptyResponse($url); + } + if ($statusCode !== 204) { + $contents = $response->getBody()->getContents(); + $data = json_decode($contents, true); + if (!isset($data['success']) or !$data['success']) { + throw new EmptyResponse($url); + } + } + return true; + } +}