register('default', $defaultWorker); }

    /**
     * Registered workers, keyed by lower-cased name (see register()).
     *
     * @var array<string, Worker>
     */
    protected array $workers = [];

    /**
     * Stores a worker under the given name, overwriting any worker already stored there.
     *
     * The name is lower-cased before it is used as the key.
     *
     * @param string $name   Name to store the worker under.
     * @param Worker $worker Worker to store.
     * @return self This instance.
     */
    public function register(string $name, Worker $worker): self
    {
        $this->workers[strtolower($name)] = $worker;

        return $this;
    }

    /**
     * Passes a job configuration to the job service's add().
     *
     * @param array $configuration Configuration forwarded unchanged to the job service.
     * @return bool True when add() completed; false when it threw Create, in which case
     *              a wrapping exception is logged as a warning.
     */
    public function enqueue(array $configuration): bool
    {
        try {
            $this->jobService->add($configuration);
        } catch (Create $exception) {
            $this->logger->warning(new Exception("Could not enqueue job", 0, $exception));

            return false;
        }

        return true;
    }

    /**
     * Returns the job service's pending jobs.
     *
     * Unlike run(), this method does not catch exceptions thrown by the job service.
     *
     * @return array
     */
    public function getPendingJobs(): array
    {
        return $this->jobService->getPending();
    }

    /**
     * Runs one pending job with the worker registered for its type.
     *
     * The worker key is the lower-cased 'type' entry of the job's configuration. When that
     * entry is not set, or no worker is registered under it, the 'default' worker is used.
     *
     * @param int                   $job_id  Id used to fetch the pending job from the job service.
     * @param RequestInterface|null $request Given to the worker through setRequest() when the
     *                                       worker is a Service\Worker\Request and a request
     *                                       was supplied.
     * @return bool True when the worker's execute() and then the job service's execute() both
     *              returned a truthy value; false when the job could not be fetched, either
     *              call returned a falsy value, or an Exception was thrown (each case is logged).
     */
    public function runJob(int $job_id, ?RequestInterface $request = null): bool
    {
        try {
            $job = $this->jobService->getPendingById($job_id);
        } catch (Read $exception) {
            $this->logger->debug($exception);

            return false;
        }

        $type = strtolower($job->configuration['type'] ?? 'default');
        $worker = $this->workers[$type] ?? $this->workers['default'];

        if ($worker instanceof Service\Worker\Request && $request !== null) {
            $worker->setRequest($request);
        }

        try {
            if (!$worker->execute($job)) {
                $this->logger->debug("Could not execute job {$job_id}");

                return false;
            }
            if (!$this->jobService->execute($job)) {
                $this->logger->debug("Could not remove job {$job_id}");

                return false;
            }
        } catch (Exception $exception) {
            $this->logger->warning(new Exception("Could not run job", 0, $exception));

            return false;
        }

        return true;
    }

    /**
     * Runs every pending job in the order returned by the job service.
     *
     * A failing job does not stop the remaining jobs from being attempted.
     *
     * @param RequestInterface|null $request Forwarded to runJob() for every job.
     * @return bool True only when the pending jobs could be fetched and every runJob() call
     *              returned true; false otherwise.
     */
    public function run(?RequestInterface $request = null): bool
    {
        try {
            $jobs = $this->jobService->getPending();
        } catch (Read $exception) {
            $this->logger->warning(new Exception("Could not get pending jobs", 0, $exception));

            return false;
        }

        $failedIds = [];
        foreach ($jobs as $job) {
            try {
                // runJob() signals failure through its return value, not by throwing,
                // so a false result has to be recorded as well.
                if (!$this->runJob($job->id, $request)) {
                    $failedIds[] = $job->id;
                }
            } catch (Exception) {
                $failedIds[] = $job->id;
            }
        }

        return $failedIds === [];
    }
}