Beanstalkd for jobs

This commit is contained in:
Juan Pablo Vial
2025-07-14 14:29:41 -04:00
parent e6e7470bb2
commit 501151a90e
26 changed files with 693 additions and 390 deletions

View File

@ -22,20 +22,4 @@ class Queues extends Ideal\Controller
}
return $this->withJson($response, $output);
}
public function jobs(ServerRequestInterface $request, ResponseInterface $response,
Service\Queue $queueService): ResponseInterface
{
$output = [
'jobs' => array_column($queueService->getPendingJobs(), 'id')
];
return $this->withJson($response, $output);
}
public function run(ServerRequestInterface $request, ResponseInterface $response, Service\Queue $queueService,
int $job_id): ResponseInterface
{
if ($queueService->runJob($job_id, $request)) {
return $response->withStatus(200);
}
return $response->withStatus(422);
}
}

View File

@ -0,0 +1,18 @@
<?php
namespace Incoviba\Exception;
use Throwable;
use Exception;
abstract class MQTT extends Exception
{
public function __construct($message = "", $code = 0, ?Throwable $previous = null)
{
$baseCode = 700;
$code = $baseCode + $code;
if ($message == "") {
$message = "MQTT Exception";
}
parent::__construct($message, $code, $previous);
}
}

View File

@ -0,0 +1,18 @@
<?php
namespace Incoviba\Exception\MQTT;
use Throwable;
use Incoviba\Exception\MQTT;
class MissingClient extends MQTT
{
public function __construct(string $host = '', ?Throwable $previous = null)
{
$message = 'Missing MQTT client';
if ($host !== '') {
$message = "{$message} for host {$host}";
}
$code = 1;
parent::__construct($message, $code, $previous);
}
}

View File

@ -0,0 +1,15 @@
<?php
namespace Incoviba\Exception\MQTT;
use Throwable;
use Incoviba\Exception\MQTT;
class MissingJob extends MQTT
{
public function __construct(?Throwable $previous = null)
{
$message = 'Missing MQTT job';
$code = 10;
parent::__construct($message, $code, $previous);
}
}

View File

@ -0,0 +1,15 @@
<?php
namespace Incoviba\Exception\MQTT;
use Throwable;
use Incoviba\Exception\MQTT;
class RemoveJob extends MQTT
{
public function __construct(int $jobId, ?Throwable $previous = null)
{
$message = "Could not remove job {$jobId}";
$code = 13;
parent::__construct($message, $code, $previous);
}
}

View File

@ -0,0 +1,15 @@
<?php
namespace Incoviba\Exception\MQTT;
use Throwable;
use Incoviba\Exception\MQTT;
class SetJob extends MQTT
{
public function __construct(string $payload, ?Throwable $previous = null)
{
$message = "Could not set job with {$payload}";
$code = 11;
parent::__construct($message, $code, $previous);
}
}

View File

@ -5,60 +5,43 @@ use DateInvalidTimeZoneException;
use DateMalformedStringException;
use DateTimeImmutable;
use DateTimeZone;
use InvalidArgumentException;
use OutOfRangeException;
use Psr\Log\LoggerInterface;
use Predis\Connection\ConnectionException;
use Incoviba\Common\Ideal;
use Incoviba\Common\Implement\Exception\EmptyRedis;
use Incoviba\Common\Implement\Exception\EmptyResult;
use Incoviba\Exception\MQTT as MQTTException;
use Incoviba\Exception\ServiceAction\{Create, Delete, Read, Update};
use Incoviba\Repository;
use Incoviba\Model;
use Incoviba\Repository;
class Job extends Ideal\Service
{
public function __construct(LoggerInterface $logger, protected Redis $redisService,
public function __construct(LoggerInterface $logger, protected MQTT $mqttService,
protected Repository\Job $jobRepository)
{
parent::__construct($logger);
}
protected string $redisKey = 'jobs';
public function getPending(null|string|array $orderBy = null): array
public function isPending(): bool
{
try {
$jobs = $this->redisService->get($this->redisKey);
if ($jobs === null) {
return [];
}
$jobs = json_decode($jobs, true);
if ($orderBy !== null) {
uksort($jobs, function($a, $b) use ($orderBy) {
return $a[$orderBy] <=> $b[$orderBy];
});
}
return array_map([$this, 'load'], $jobs);
} catch (ConnectionException | EmptyRedis) {
return [];
return $this->mqttService->exists();
} catch (MQTTException $exception) {
$this->logger->error($exception->getMessage(), ['exception' => $exception]);
return false;
}
}
/**
* @param int $id
* @return Model\Job
* @throws Read
*/
public function getPendingById(int $id): Model\Job
public function get(): Model\Job
{
$jobs = $this->getJobs();
try {
$idx = $this->findJob($jobs, $id);
} catch (EmptyResult $exception) {
$exception = new OutOfRangeException('Job not found', count($jobs), $exception);
return $this->load(json_decode($this->mqttService->get(), true));
} catch (MQTTException $exception) {
$this->logger->error($exception->getMessage(), ['exception' => $exception]);
throw new Read(__CLASS__, $exception);
}
return $this->load($jobs[$idx]);
}
/**
@ -71,6 +54,7 @@ class Job extends Ideal\Service
try {
$now = (new DateTimeImmutable('now', new DateTimeZone($_ENV['TZ'] ?? 'America/Santiago')));
} catch (DateMalformedStringException | DateInvalidTimeZoneException $exception) {
$this->logger->warning($exception->getMessage(), ['exception' => $exception]);
$now = new DateTimeImmutable();
}
$data = [
@ -81,17 +65,9 @@ class Job extends Ideal\Service
'updated_at' => null,
'retries' => 0
];
$jobs = [];
try {
$jobs = $this->redisService->get($this->redisKey);
if ($jobs !== null) {
$jobs = json_decode($jobs, true);
}
} catch (EmptyRedis) {}
$jobs []= $data;
try {
$this->redisService->set($this->redisKey, json_encode($jobs), -1);
} catch (ConnectionException $exception) {
$this->mqttService->set(json_encode($data));
} catch (MQTTException $exception) {
throw new Create(__CLASS__, $exception);
}
return $this->load($data);
@ -99,50 +75,35 @@ class Job extends Ideal\Service
/**
* @param Model\Job $job
* @return Model\Job
* @return void
* @throws Update
* @throws Read
*/
public function update(Model\Job $job): Model\Job
public function update(Model\Job $job): void
{
try {
$now = (new DateTimeImmutable('now', new DateTimeZone($_ENV['TZ'] ?? 'America/Santiago')));
} catch (DateMalformedStringException | DateInvalidTimeZoneException) {
$now = new DateTimeImmutable();
}
$jobs = $this->getJobs();
$data = json_decode(json_encode($job), true);
$data['updated_at'] = $now->format('Y-m-d H:i:s');
try {
$idx = $this->findJob($jobs, $job->id);
} catch (EmptyResult $exception) {
throw new Read(__CLASS__, $exception);
}
$jobs[$idx]['updated_at'] = $now->format('Y-m-d H:i:s');
$jobs[$idx]['retries'] = $job->retries;
try {
$this->redisService->set($this->redisKey, json_encode($jobs), -1);
} catch (ConnectionException $exception) {
$this->mqttService->update(json_encode($data));
} catch (MQTTException $exception) {
throw new Update(__CLASS__, $exception);
}
return $this->load($jobs[$idx]);
}
/**
* @param Model\Job $job
* @throws Read
* @throws Delete
*/
public function remove(Model\Job $job): void
{
$jobs = $this->getJobs();
try {
$idx = $this->findJob($jobs, $job->id);
} catch (EmptyResult $exception) {
throw new Read(__CLASS__, $exception);
}
unset($jobs[$idx]);
try {
$this->redisService->set($this->redisKey, json_encode(array_values($jobs)), -1);
} catch (ConnectionException $exception) {
$this->mqttService->remove();
} catch (MQTTException $exception) {
throw new Delete(__CLASS__, $exception);
}
}
@ -150,59 +111,18 @@ class Job extends Ideal\Service
/**
* @param Model\Job $job
* @return bool
* @throws Read | Create
*/
public function execute(Model\Job $job): bool
{
$jobs = $this->getJobs();
try {
$idx = $this->findJob($jobs, $job->id);
} catch (EmptyResult $exception) {
$exception = new OutOfRangeException('Job not found', count($jobs), $exception);
throw new Read(__CLASS__, $exception);
$this->mqttService->remove();
return true;
} catch (MQTTException $exception) {
$this->logger->error($exception->getMessage(), ['exception' => $exception]);
return false;
}
unset($jobs[$idx]);
try {
$this->redisService->set($this->redisKey, json_encode(array_values($jobs)), -1);
} catch (ConnectionException $exception) {
throw new Create(__CLASS__, $exception);
}
return true;
}
/**
* @return array
* @throws Read
*/
protected function getJobs(): array
{
try {
$jobs = $this->redisService->get($this->redisKey);
} catch (EmptyRedis $exception) {
throw new Read(__CLASS__, $exception);
}
if ($jobs === null) {
$exception = new InvalidArgumentException("Redis Key {$this->redisKey} not found");
throw new Read(__CLASS__, $exception);
}
return json_decode($jobs, true);
}
/**
* @param array $jobs
* @param int $id
* @return int
* @throws EmptyResult
*/
protected function findJob(array $jobs, int $id): int
{
$idx = array_find_key($jobs, function($job) use ($id) {
return (int) $job['id'] === $id;
});
if ($idx === null) {
throw new EmptyResult("SELECT * FROM jobs WHERE id = ?");
}
return $idx;
}
protected function load(array $data, ?int $id = null): Model\Job
{
$job = new Model\Job();

102
app/src/Service/MQTT.php Normal file
View File

@ -0,0 +1,102 @@
<?php
namespace Incoviba\Service;
use Incoviba\Exception\MQTT as MQTTException;
use Incoviba\Service\MQTT\MQTTInterface;
class MQTT implements MQTTInterface
{
protected array $clients = [];
public function register(string $name, MQTTInterface $client): self
{
$this->clients[$name] = $client;
return $this;
}
public function clientExists(string $name): bool
{
return isset($this->clients[$name]);
}
/**
* @param string|null $name
* @return MQTTInterface
* @throws MQTTException/MissingClient
*/
public function getClient(?string $name = null): MQTTInterface
{
if ($name === null) {
$name = array_keys($this->clients)[0];
}
if (!$this->clientExists($name)) {
throw new MQTTException\MissingClient($name);
}
return $this->clients[$name];
}
/**
* @param string|null $host
* @return bool
* @throws MQTTException/MissingClient
* @throws MQTTException/MissingJob
*/
public function exists(?string $host = null): bool
{
$client = $this->getClient($host);
return $client->exists();
}
/**
* @param string|null $host
* @return string
* @throws MQTTException/MissingClient
* @throws MQTTException/MissingJob
*/
public function get(?string $host = null): string
{
$client = $this->getClient($host);
return $client->get();
}
/**
* @param string $value
* @param int $delay
* @param string|null $host
* @return $this
* @throws MQTTException/MissingClient
* @throws MQTTException/SetJob
*/
public function set(string $value, int $delay = 0, ?string $host = null): self
{
$client = $this->getClient($host);
$client->set($value, $delay);
return $this;
}
/**
* @param int|null $jobId
* @param string|null $host
* @return $this
* @throws MQTTException/MissingJob
* @throws MQTTException/RemoveJob
*/
public function remove(?int $jobId = null, ?string $host = null): self
{
$this->getClient($host)->remove($jobId);
return $this;
}
/**
* @param string $newPayload
* @param int|null $jobId
* @param string|null $host
* @return $this
* @throws MQTTException/MissingJob
* @throws MQTTException/RemoveJob
* @throws MQTTException/SetJob
*/
public function update(string $newPayload, ?int $jobId = null, ?string $host = null): self
{
$this->getClient($host)->update($newPayload, $jobId);
return $this;
}
}

View File

@ -0,0 +1,111 @@
<?php
namespace Incoviba\Service\MQTT;
use Exception;
use Incoviba\Exception\MQTT\MissingJob;
use Incoviba\Exception\MQTT\RemoveJob;
use Incoviba\Exception\MQTT\SetJob;
use Psr\Log\LoggerInterface;
use xobotyi\beansclient;
use xobotyi\beansclient\Exception\ClientException;
use xobotyi\beansclient\Exception\CommandException;
use xobotyi\beansclient\Exception\JobException;
class Beanstalkd implements MQTTInterface
{
/**
* @throws JobException
* @throws ClientException
* @throws CommandException
*/
public function __construct(protected LoggerInterface $logger, protected beansclient\BeansClient $client,
protected string $tube = 'default', protected int $ttr = beansclient\BeansClient::DEFAULT_TTR,
protected int $priority = 1)
{
$this->client->watchTube($this->tube);
}
public function exists(): bool
{
try {
$stats = $this->client->statsTube($this->tube);
return $stats['current-jobs-ready'] > 0;
} catch (Exception $exception) {
$this->logger->error($exception->getMessage(), ['exception' => $exception]);
return false;
}
}
protected ?beansclient\Job $currentJob = null;
public function get(): string
{
if (!$this->exists()) {
throw new MissingJob();
}
try {
$job = $this->client->watchTube($this->tube)->reserve();
$this->currentJob = $job;
return $job->payload;
} catch (Exception $exception) {
$this->logger->error($exception->getMessage(), ['exception' => $exception]);
throw new MissingJob($exception);
}
}
/**
* @param string $value
* @param int $delay
* @return $this
* @throws SetJob
*/
public function set(string $value, int $delay = 0): self
{
try {
$this->client->useTube($this->tube)->put($value, $this->priority, $delay, $this->ttr ?? 0);
} catch (Exception $exception) {
$this->logger->error($exception->getMessage(), ['payload' => $value, 'delay' => $delay, 'exception' => $exception]);
throw new SetJob($value, $exception);
}
return $this;
}
/**
* @param string $newPayload
* @param int|null $jobId
* @return self
* @throws RemoveJob
* @throws SetJob
*/
public function update(string $newPayload, ?int $jobId = null): self
{
if ($jobId === null) {
$jobId = $this->currentJob->id;
}
return $this->remove($jobId)
->set($newPayload);
}
/**
* @param int|null $jobId
* @return $this
* @throws RemoveJob
*/
public function remove(?int $jobId = null): self
{
if ($jobId === null) {
$jobId = $this->currentJob->id;
}
try {
if (!$this->client->useTube($this->tube)->delete($jobId)) {
throw new JobException("Failed to delete job {$jobId}");
}
if ($this->currentJob !== null && $this->currentJob->id === $jobId) {
$this->currentJob = null;
}
} catch (Exception $exception) {
$this->logger->error($exception->getMessage(), ['jobId' => $jobId, 'exception' => $exception]);
throw new RemoveJob($jobId, $exception);
}
return $this;
}
}

View File

@ -0,0 +1,25 @@
<?php
namespace Incoviba\Service\MQTT;
use Incoviba\Exception\MQTT\MissingJob;
interface MQTTInterface {
/**
* @return bool
*/
public function exists(): bool;
/**
* @return string
* @throws MissingJob
*/
public function get(): string;
/**
* @param string $value
* @param int $delay
* @return self
*/
public function set(string $value, int $delay = 0): self;
public function remove(?int $jobId = null): self;
}

View File

@ -7,6 +7,7 @@ use Psr\Log\LoggerInterface;
use Incoviba\Common\Ideal;
use Incoviba\Exception\ServiceAction\{Create, Delete, Read, Update};
use Incoviba\Service;
use Incoviba\Model;
class Queue extends Ideal\Service
{
@ -29,7 +30,7 @@ class Queue extends Ideal\Service
try {
$this->jobService->add($configuration);
return true;
} catch (Read $exception) {
} catch (Create $exception) {
$final = new Exception("Could not enqueue job", 0, $exception);
$this->logger->warning($final);
return false;
@ -40,22 +41,8 @@ class Queue extends Ideal\Service
return $this->enqueue($configuration);
}
/**
* @return array
*/
public function getPendingJobs(): array
public function runJob(Model\Job $job, ?RequestInterface $request = null): bool
{
return $this->jobService->getPending();
}
public function runJob(int $job_id, ?RequestInterface $request = null): bool
{
try {
$job = $this->jobService->getPendingById($job_id);
} catch (Read $exception) {
$this->logger->debug($exception);
return false;
}
$type = 'default';
if (isset($job->configuration['type'])) {
$type = strtolower($job->configuration['type']);
@ -71,50 +58,49 @@ class Queue extends Ideal\Service
try {
if (!$worker->execute($job)) {
$this->logger->debug("Could not execute job {$job_id}");
$this->logger->debug("Could not execute job {$job->id}");
return false;
}
if (!$this->jobService->execute($job)) {
$this->logger->debug("Could not remove job {$job_id}");
$this->logger->debug("Could not remove job {$job->id}");
return false;
}
} catch (Exception $exception) {
$final = new Exception("Could not run job", 0, $exception);
$this->logger->warning($final);
$this->logger->warning("Could not run job {$job->id}", ['exception' => $exception]);
return false;
}
return true;
}
public function run(?RequestInterface $request = null): bool
{
$jobs = $this->jobService->getPending();
if (count($jobs) === 0) {
$this->logger->debug("No pending jobs");
if (!$this->jobService->isPending()) {
return true;
}
$errors = [];
foreach ($jobs as $job) {
if ($job->retries >= $this->maxRetries) {
try {
$this->jobService->remove($job);
} catch (Read | Delete $exception) {
$this->logger->error($exception->getMessage(), ['exception' => $exception]);
}
continue;
}
try {
$this->runJob($job->id, $request);
} catch (Exception) {
$job->retries ++;
try {
$this->jobService->update($job);
} catch (Read | Update $exception) {
$this->logger->error($exception->getMessage(), ['exception' => $exception]);
}
$errors []= $job->id;
}
try {
$job = $this->jobService->get();
} catch (Read $exception) {
$this->logger->error($exception->getMessage(), ['exception' => $exception]);
return false;
}
return count($errors) === 0;
if ($job->retries >= $this->maxRetries) {
try {
$this->jobService->remove($job);
} catch (Delete $exception) {
$this->logger->error($exception->getMessage(), ['job' => $job, 'exception' => $exception]);
}
return true;
}
try {
$this->runJob($job, $request);
} catch (Exception) {
$job->retries ++;
try {
$this->jobService->update($job);
} catch (Update $exception) {
$this->logger->error($exception->getMessage(), ['job' => $job, 'exception' => $exception]);
}
return false;
}
return true;
}
}