Cambio en queue para que no quede pegado esperando respuesta en cli.
Chequeo de servicios externos para agregar elementos pendientes.
This commit is contained in:
@ -1,9 +1,13 @@
|
||||
<?php
|
||||
namespace Incoviba\Service;
|
||||
|
||||
use DateTimeImmutable;
|
||||
use InvalidArgumentException;
|
||||
use OutOfRangeException;
|
||||
use PDOException;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Incoviba\Common\Ideal;
|
||||
use Incoviba\Common\Implement\Exception\EmptyRedis;
|
||||
use Incoviba\Common\Implement\Exception\EmptyResult;
|
||||
use Incoviba\Exception\ServiceAction\{Create, Read, Update};
|
||||
use Incoviba\Repository;
|
||||
@ -11,59 +15,132 @@ use Incoviba\Model;
|
||||
|
||||
class Job extends Ideal\Service
|
||||
{
|
||||
public function __construct(LoggerInterface $logger, protected Repository\Job $jobRepository)
|
||||
public function __construct(LoggerInterface $logger, protected Redis $redisService,
|
||||
protected Repository\Job $jobRepository)
|
||||
{
|
||||
parent::__construct($logger);
|
||||
}
|
||||
protected string $redisKey = 'jobs';
|
||||
|
||||
public function getPending(null|string|array $orderBy = null): array
|
||||
{
|
||||
try {
|
||||
$jobs = $this->redisService->get($this->redisKey);
|
||||
if ($jobs === null) {
|
||||
return [];
|
||||
}
|
||||
$jobs = json_decode($jobs, true);
|
||||
if ($orderBy !== null) {
|
||||
uksort($jobs, function($a, $b) use ($orderBy) {
|
||||
return $a[$orderBy] <=> $b[$orderBy];
|
||||
});
|
||||
}
|
||||
return array_map([$this, 'load'], $jobs);
|
||||
} catch (EmptyRedis) {
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int $id
|
||||
* @return Model\Job
|
||||
* @throws Read
|
||||
*/
|
||||
public function getPendingById(int $id): Model\Job
|
||||
{
|
||||
$jobs = $this->getJobs();
|
||||
try {
|
||||
$idx = $this->findJob($jobs, $id);
|
||||
} catch (EmptyResult $exception) {
|
||||
$exception = new OutOfRangeException('Job not found', count($jobs), $exception);
|
||||
throw new Read(__CLASS__, $exception);
|
||||
}
|
||||
return $this->load($jobs[$idx]);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array $configuration
|
||||
* @return Model\Job
|
||||
* @throws Create
|
||||
* @throws Read
|
||||
*/
|
||||
public function add(array $configuration): Model\Job
|
||||
{
|
||||
$now = (new DateTimeImmutable());
|
||||
$data = [
|
||||
'id' => $now->getTimestamp(),
|
||||
'configuration' => $configuration,
|
||||
'executed' => false,
|
||||
'created_at' => $now->format('Y-m-d H:i:s'),
|
||||
'updated_at' => null
|
||||
];
|
||||
$jobs = [];
|
||||
try {
|
||||
$data = [
|
||||
'configuration' => json_encode($configuration)
|
||||
];
|
||||
$job = $this->jobRepository->create($data);
|
||||
return $this->process($this->jobRepository->save($job));
|
||||
} catch (PDOException $exception) {
|
||||
throw new Create(__CLASS__, $exception);
|
||||
$jobs = $this->redisService->get($this->redisKey);
|
||||
if ($jobs !== null) {
|
||||
$jobs = json_decode($jobs, true);
|
||||
}
|
||||
} catch (EmptyRedis) {}
|
||||
$jobs []= $data;
|
||||
$this->redisService->set($this->redisKey, json_encode($jobs), -1);
|
||||
return $this->load($data);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Model\Job $job
|
||||
* @return bool
|
||||
* @throws Read
|
||||
*/
|
||||
public function execute(Model\Job $job): bool
|
||||
{
|
||||
$jobs = $this->getJobs();
|
||||
try {
|
||||
$idx = $this->findJob($jobs, $job->id);
|
||||
} catch (EmptyResult $exception) {
|
||||
$exception = new OutOfRangeException('Job not found', count($jobs), $exception);
|
||||
throw new Read(__CLASS__, $exception);
|
||||
}
|
||||
unset($jobs[$idx]);
|
||||
$this->redisService->set($this->redisKey, json_encode(array_values($jobs)), -1);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array
|
||||
* @throws Read
|
||||
*/
|
||||
public function getPending(): array
|
||||
protected function getJobs(): array
|
||||
{
|
||||
try {
|
||||
return array_map([$this, 'process'],$this->jobRepository->fetchPending());
|
||||
} catch (EmptyResult $exception) {
|
||||
$jobs = $this->redisService->get($this->redisKey);
|
||||
} catch (EmptyRedis $exception) {
|
||||
throw new Read(__CLASS__, $exception);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Model\Job $job
|
||||
* @return bool
|
||||
* @throws Update
|
||||
*/
|
||||
public function execute(Model\Job $job): bool
|
||||
{
|
||||
try {
|
||||
$this->jobRepository->edit($job, ['executed' => true]);
|
||||
return true;
|
||||
} catch (EmptyResult | PDOException $exception) {
|
||||
throw new Update(__CLASS__, $exception);
|
||||
if ($jobs === null) {
|
||||
$exception = new InvalidArgumentException("Redis Key {$this->redisKey} not found");
|
||||
throw new Read(__CLASS__, $exception);
|
||||
}
|
||||
return json_decode($jobs, true);
|
||||
}
|
||||
|
||||
protected function process(Model\Job $job): Model\Job
|
||||
/**
|
||||
* @param array $jobs
|
||||
* @param int $id
|
||||
* @return int
|
||||
* @throws EmptyResult
|
||||
*/
|
||||
protected function findJob(array $jobs, int $id): int
|
||||
{
|
||||
$idx = array_find_key($jobs, fn($job) => $job['id'] === $id);
|
||||
if ($idx === false) {
|
||||
throw new EmptyResult("SELECT * FROM jobs WHERE id = ?");
|
||||
}
|
||||
return $idx;
|
||||
}
|
||||
protected function load(array $data, ?int $id = null): Model\Job
|
||||
{
|
||||
$job = new Model\Job();
|
||||
$job->id = $id ?? $data['id'] ?? null;
|
||||
$job->configuration = $data['configuration'] ?? [];
|
||||
$job->executed = $data['executed'] ?? false;
|
||||
return $job;
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user