diff --git a/CLI.Dockerfile b/CLI.Dockerfile index 4ee997b..5844682 100644 --- a/CLI.Dockerfile +++ b/CLI.Dockerfile @@ -3,7 +3,8 @@ FROM php:8.4-cli ENV TZ "${TZ}" ENV APP_NAME "${APP_NAME}" -RUN apt-get update && apt-get install -y --no-install-recommends cron rsyslog nano && rm -r /var/lib/apt/lists/* +RUN apt-get update && apt-get install -y --no-install-recommends cron rsyslog nano beanstalkd \ + && rm -r /var/lib/apt/lists/* RUN pecl install xdebug-3.4.2 \ && docker-php-ext-enable xdebug \ diff --git a/Dockerfile b/Dockerfile index 021a147..091b51d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,7 +3,7 @@ FROM php:8.4-fpm ENV TZ=America/Santiago RUN apt-get update && apt-get install -y --no-install-recommends libzip-dev libicu-dev git \ - libpng-dev unzip tzdata libxml2-dev \ + libpng-dev unzip tzdata libxml2-dev beanstalkd \ && rm -r /var/lib/apt/lists/* \ && docker-php-ext-install pdo pdo_mysql zip intl gd bcmath dom \ && pecl install xdebug-3.4.2 \ diff --git a/app/composer.json b/app/composer.json index 2d2a50c..7f97281 100644 --- a/app/composer.json +++ b/app/composer.json @@ -8,11 +8,13 @@ "ext-gd": "*", "ext-openssl": "*", "ext-pdo": "*", + "ext-sockets": "*", "berrnd/slim-blade-view": "^1", "guzzlehttp/guzzle": "^7", "monolog/monolog": "^3", "nyholm/psr7": "^1", "nyholm/psr7-server": "^1", + "pda/pheanstalk": "^7.0", "php-di/php-di": "^7", "php-di/slim-bridge": "^3", "phpoffice/phpspreadsheet": "^3", diff --git a/app/resources/database/migrations/20250701192525_create_toku_accounts.php b/app/resources/database/migrations/20250701192525_create_toku_accounts.php index 2d7270d..dbfce11 100644 --- a/app/resources/database/migrations/20250701192525_create_toku_accounts.php +++ b/app/resources/database/migrations/20250701192525_create_toku_accounts.php @@ -19,14 +19,18 @@ final class CreateTokuAccounts extends AbstractMigration */ public function change(): void { + $this->execute('SET unique_checks=0; SET foreign_key_checks=0;'); + $this->table('toku_accounts') ->addColumn('sociedad_rut', 'integer', ['limit' => 8, 'signed' => false, 'null' => false]) ->addColumn('toku_id', 'string', ['length' => 255, 'null' => false]) ->addColumn('account_key', 'string', ['length' => 255, 'null' => false]) ->addColumn('enabled', 'boolean', ['default' => true]) - ->addIndex(['toku_id'], ['unique' => true]) - ->addForeignKey('sociedad_rut', 'inmobiliaria', 'rut', ['delete' => 'CASCADE', 'update' => 'CASCADE']) ->addTimestamps() + #->addForeignKey('sociedad_rut', 'inmobiliaria', 'rut', ['delete' => 'CASCADE', 'update' => 'CASCADE']) + ->addIndex(['toku_id'], ['unique' => true]) ->create(); + + $this->execute('SET unique_checks=1; SET foreign_key_checks=1;'); } } diff --git a/app/resources/routes/api/queue.php b/app/resources/routes/api/queue.php index 37c36ce..43a94d1 100644 --- a/app/resources/routes/api/queue.php +++ b/app/resources/routes/api/queue.php @@ -2,9 +2,9 @@ use Incoviba\Controller\API\Queues; $app->group('/queue', function($app) { - $app->get('/jobs[/]', [Queues::class, 'jobs']); + #$app->get('/jobs[/]', [Queues::class, 'jobs']); $app->group('/run', function($app) { - $app->get('/{job_id:[0-9]+}[/]', [Queues::class, 'run']); - #$app->get('[/]', Queues::class); + #$app->get('/{job_id:[0-9]+}[/]', [Queues::class, 'run']); + $app->get('[/]', Queues::class); }); }); diff --git a/app/setup/setups/services.php b/app/setup/setups/services.php index 728f77c..9cdecfc 100644 --- a/app/setup/setups/services.php +++ b/app/setup/setups/services.php @@ -162,6 +162,16 @@ return [ ->register('subscription', $container->get(Incoviba\Service\Venta\MediosPago\Toku\Subscription::class)) ->register('invoice', $container->get(Incoviba\Service\Venta\MediosPago\Toku\Invoice::class)); }, + Pheanstalk\Pheanstalk::class => function(ContainerInterface $container) { + return Pheanstalk\Pheanstalk::create( + $container->get('BEANSTALKD_HOST'), + $container->has('BEANSTALKD_PORT') ? $container->get('BEANSTALKD_PORT') : 11300 + ); + }, + Incoviba\Service\MQTT::class => function(ContainerInterface $container) { + return new Incoviba\Service\MQTT() + ->register('default', $container->get(Incoviba\Service\MQTT\Pheanstalk::class)); + }, Incoviba\Service\Queue::class => function(ContainerInterface $container) { return new Incoviba\Service\Queue( $container->get(Psr\Log\LoggerInterface::class), diff --git a/app/src/Controller/API/Queues.php b/app/src/Controller/API/Queues.php index 8c2720c..52bae56 100644 --- a/app/src/Controller/API/Queues.php +++ b/app/src/Controller/API/Queues.php @@ -22,20 +22,4 @@ class Queues extends Ideal\Controller } return $this->withJson($response, $output); } - public function jobs(ServerRequestInterface $request, ResponseInterface $response, - Service\Queue $queueService): ResponseInterface - { - $output = [ - 'jobs' => array_column($queueService->getPendingJobs(), 'id') - ]; - return $this->withJson($response, $output); - } - public function run(ServerRequestInterface $request, ResponseInterface $response, Service\Queue $queueService, - int $job_id): ResponseInterface - { - if ($queueService->runJob($job_id, $request)) { - return $response->withStatus(200); - } - return $response->withStatus(422); - } } diff --git a/app/src/Exception/MQTT.php b/app/src/Exception/MQTT.php new file mode 100644 index 0000000..2d38e64 --- /dev/null +++ b/app/src/Exception/MQTT.php @@ -0,0 +1,18 @@ +redisService->get($this->redisKey); - if ($jobs === null) { - return []; - } - $jobs = json_decode($jobs, true); - if ($orderBy !== null) { - uksort($jobs, function($a, $b) use ($orderBy) { - return $a[$orderBy] <=> $b[$orderBy]; - }); - } - return array_map([$this, 'load'], $jobs); - } catch (ConnectionException | EmptyRedis) { - return []; + return $this->mqttService->exists(); + } catch (MQTTException $exception) { + $this->logger->error($exception->getMessage(), ['exception' => $exception]); + return false; } } /** - * @param int $id * @return Model\Job * @throws Read */ - public function getPendingById(int $id): Model\Job + public function get(): Model\Job { - $jobs = $this->getJobs(); try { - $idx = $this->findJob($jobs, $id); - } catch (EmptyResult $exception) { - $exception = new OutOfRangeException('Job not found', count($jobs), $exception); + return $this->load(json_decode($this->mqttService->get(), true)); + } catch (MQTTException $exception) { + $this->logger->error($exception->getMessage(), ['exception' => $exception]); throw new Read(__CLASS__, $exception); } - return $this->load($jobs[$idx]); } /** @@ -71,6 +54,7 @@ class Job extends Ideal\Service try { $now = (new DateTimeImmutable('now', new DateTimeZone($_ENV['TZ'] ?? 'America/Santiago'))); } catch (DateMalformedStringException | DateInvalidTimeZoneException $exception) { + $this->logger->warning($exception->getMessage(), ['exception' => $exception]); $now = new DateTimeImmutable(); } $data = [ @@ -81,17 +65,9 @@ class Job extends Ideal\Service 'updated_at' => null, 'retries' => 0 ]; - $jobs = []; try { - $jobs = $this->redisService->get($this->redisKey); - if ($jobs !== null) { - $jobs = json_decode($jobs, true); - } - } catch (EmptyRedis) {} - $jobs []= $data; - try { - $this->redisService->set($this->redisKey, json_encode($jobs), -1); - } catch (ConnectionException $exception) { + $this->mqttService->set(json_encode($data)); + } catch (MQTTException $exception) { throw new Create(__CLASS__, $exception); } return $this->load($data); @@ -99,50 +75,35 @@ class Job extends Ideal\Service /** * @param Model\Job $job - * @return Model\Job + * @return void * @throws Update - * @throws Read */ - public function update(Model\Job $job): Model\Job + public function update(Model\Job $job): void { try { $now = (new DateTimeImmutable('now', new DateTimeZone($_ENV['TZ'] ?? 'America/Santiago'))); } catch (DateMalformedStringException | DateInvalidTimeZoneException) { $now = new DateTimeImmutable(); } - $jobs = $this->getJobs(); + $data = json_decode(json_encode($job), true); + $data['updated_at'] = $now->format('Y-m-d H:i:s'); + try { - $idx = $this->findJob($jobs, $job->id); - } catch (EmptyResult $exception) { - throw new Read(__CLASS__, $exception); - } - $jobs[$idx]['updated_at'] = $now->format('Y-m-d H:i:s'); - $jobs[$idx]['retries'] = $job->retries; - try { - $this->redisService->set($this->redisKey, json_encode($jobs), -1); - } catch (ConnectionException $exception) { + $this->mqttService->update(json_encode($data)); + } catch (MQTTException $exception) { throw new Update(__CLASS__, $exception); } - return $this->load($jobs[$idx]); } /** * @param Model\Job $job - * @throws Read * @throws Delete */ public function remove(Model\Job $job): void { - $jobs = $this->getJobs(); try { - $idx = $this->findJob($jobs, $job->id); - } catch (EmptyResult $exception) { - throw new Read(__CLASS__, $exception); - } - unset($jobs[$idx]); - try { - $this->redisService->set($this->redisKey, json_encode(array_values($jobs)), -1); - } catch (ConnectionException $exception) { + $this->mqttService->remove(); + } catch (MQTTException $exception) { throw new Delete(__CLASS__, $exception); } } @@ -150,59 +111,18 @@ class Job extends Ideal\Service /** * @param Model\Job $job * @return bool - * @throws Read | Create */ public function execute(Model\Job $job): bool { - $jobs = $this->getJobs(); try { - $idx = $this->findJob($jobs, $job->id); - } catch (EmptyResult $exception) { - $exception = new OutOfRangeException('Job not found', count($jobs), $exception); - throw new Read(__CLASS__, $exception); + $this->mqttService->remove(); + return true; + } catch (MQTTException $exception) { + $this->logger->error($exception->getMessage(), ['exception' => $exception]); + return false; } - unset($jobs[$idx]); - try { - $this->redisService->set($this->redisKey, json_encode(array_values($jobs)), -1); - } catch (ConnectionException $exception) { - throw new Create(__CLASS__, $exception); - } - return true; } - /** - * @return array - * @throws Read - */ - protected function getJobs(): array - { - try { - $jobs = $this->redisService->get($this->redisKey); - } catch (EmptyRedis $exception) { - throw new Read(__CLASS__, $exception); - } - if ($jobs === null) { - $exception = new InvalidArgumentException("Redis Key {$this->redisKey} not found"); - throw new Read(__CLASS__, $exception); - } - return json_decode($jobs, true); - } - /** - * @param array $jobs - * @param int $id - * @return int - * @throws EmptyResult - */ - protected function findJob(array $jobs, int $id): int - { - $idx = array_find_key($jobs, function($job) use ($id) { - return (int) $job['id'] === $id; - }); - if ($idx === null) { - throw new EmptyResult("SELECT * FROM jobs WHERE id = ?"); - } - return $idx; - } protected function load(array $data, ?int $id = null): Model\Job { $job = new Model\Job(); diff --git a/app/src/Service/MQTT.php b/app/src/Service/MQTT.php new file mode 100644 index 0000000..de653cb --- /dev/null +++ b/app/src/Service/MQTT.php @@ -0,0 +1,102 @@ +clients[$name] = $client; + return $this; + } + public function clientExists(string $name): bool + { + return isset($this->clients[$name]); + } + + /** + * @param string|null $name + * @return MQTTInterface + * @throws MQTTException/MissingClient + */ + public function getClient(?string $name = null): MQTTInterface + { + if ($name === null) { + $name = array_keys($this->clients)[0]; + } + if (!$this->clientExists($name)) { + throw new MQTTException\MissingClient($name); + } + return $this->clients[$name]; + } + + /** + * @param string|null $host + * @return bool + * @throws MQTTException/MissingClient + * @throws MQTTException/MissingJob + */ + public function exists(?string $host = null): bool + { + $client = $this->getClient($host); + return $client->exists(); + } + + /** + * @param string|null $host + * @return string + * @throws MQTTException/MissingClient + * @throws MQTTException/MissingJob + */ + public function get(?string $host = null): string + { + $client = $this->getClient($host); + return $client->get(); + } + + /** + * @param string $value + * @param int $delay + * @param string|null $host + * @return $this + * @throws MQTTException/MissingClient + * @throws MQTTException/SetJob + */ + public function set(string $value, int $delay = 0, ?string $host = null): self + { + $client = $this->getClient($host); + $client->set($value, $delay); + return $this; + } + + /** + * @param int|null $jobId + * @param string|null $host + * @return $this + * @throws MQTTException/MissingJob + * @throws MQTTException/RemoveJob + */ + public function remove(?int $jobId = null, ?string $host = null): self + { + $this->getClient($host)->remove($jobId); + return $this; + } + + /** + * @param string $newPayload + * @param int|null $jobId + * @param string|null $host + * @return $this + * @throws MQTTException/MissingJob + * @throws MQTTException/RemoveJob + * @throws MQTTException/SetJob + */ + public function update(string $newPayload, ?int $jobId = null, ?string $host = null): self + { + $this->getClient($host)->update($newPayload, $jobId); + return $this; + } +} diff --git a/app/src/Service/MQTT/Beanstalkd.php b/app/src/Service/MQTT/Beanstalkd.php new file mode 100644 index 0000000..2f30b0c --- /dev/null +++ b/app/src/Service/MQTT/Beanstalkd.php @@ -0,0 +1,111 @@ +client->watchTube($this->tube); + } + + public function exists(): bool + { + try { + $stats = $this->client->statsTube($this->tube); + return $stats['current-jobs-ready'] > 0; + } catch (Exception $exception) { + $this->logger->error($exception->getMessage(), ['exception' => $exception]); + return false; + } + } + + protected ?beansclient\Job $currentJob = null; + public function get(): string + { + if (!$this->exists()) { + throw new MissingJob(); + } + try { + $job = $this->client->watchTube($this->tube)->reserve(); + $this->currentJob = $job; + return $job->payload; + } catch (Exception $exception) { + $this->logger->error($exception->getMessage(), ['exception' => $exception]); + throw new MissingJob($exception); + } + } + + /** + * @param string $value + * @param int $delay + * @return $this + * @throws SetJob + */ + public function set(string $value, int $delay = 0): self + { + try { + $this->client->useTube($this->tube)->put($value, $this->priority, $delay, $this->ttr ?? 0); + } catch (Exception $exception) { + $this->logger->error($exception->getMessage(), ['payload' => $value, 'delay' => $delay, 'exception' => $exception]); + throw new SetJob($value, $exception); + } + return $this; + } + + /** + * @param string $newPayload + * @param int|null $jobId + * @return self + * @throws RemoveJob + * @throws SetJob + */ + public function update(string $newPayload, ?int $jobId = null): self + { + if ($jobId === null) { + $jobId = $this->currentJob->id; + } + return $this->remove($jobId) + ->set($newPayload); + } + + /** + * @param int|null $jobId + * @return $this + * @throws RemoveJob + */ + public function remove(?int $jobId = null): self + { + if ($jobId === null) { + $jobId = $this->currentJob->id; + } + try { + if (!$this->client->useTube($this->tube)->delete($jobId)) { + throw new JobException("Failed to delete job {$jobId}"); + } + if ($this->currentJob !== null && $this->currentJob->id === $jobId) { + $this->currentJob = null; + } + } catch (Exception $exception) { + $this->logger->error($exception->getMessage(), ['jobId' => $jobId, 'exception' => $exception]); + throw new RemoveJob($jobId, $exception); + } + return $this; + } +} diff --git a/app/src/Service/MQTT/MQTTInterface.php b/app/src/Service/MQTT/MQTTInterface.php new file mode 100644 index 0000000..a6ce092 --- /dev/null +++ b/app/src/Service/MQTT/MQTTInterface.php @@ -0,0 +1,25 @@ +tube = new PBA\Values\TubeName($tubeName); + } + + protected PBA\Values\TubeName $tube; + + public function set(string $value, int $delay = 0): self + { + $this->client->useTube($this->tube); + $this->client->put($value, self::DEFAULT_PRIORITY, $delay, self::DEFAULT_TTR); + return $this; + } + public function exists(): bool + { + $stats = $this->client->statsTube($this->tube); + return $stats->currentJobsReady > 0; + } + protected int $currentJobId; + public function get(): string + { + $this->client->useTube($this->tube); + $job = $this->client->reserve(); + $this->currentJobId = $job->getId(); + return $job->getData(); + } + public function update(string $newPayload, ?int $jobId = null): self + { + if ($jobId === null) { + $jobId = $this->currentJobId; + } + $this->remove($jobId); + $this->set($newPayload); + return $this; + } + public function remove(?int $jobId = null): self + { + if ($jobId === null) { + $jobId = $this->currentJobId; + } + $this->client->useTube($this->tube); + $this->client->delete(new PBA\Values\JobId($jobId)); + return $this; + } +} diff --git a/app/src/Service/Queue.php b/app/src/Service/Queue.php index aae7a3a..755e6dc 100644 --- a/app/src/Service/Queue.php +++ b/app/src/Service/Queue.php @@ -7,6 +7,7 @@ use Psr\Log\LoggerInterface; use Incoviba\Common\Ideal; use Incoviba\Exception\ServiceAction\{Create, Delete, Read, Update}; use Incoviba\Service; +use Incoviba\Model; class Queue extends Ideal\Service { @@ -29,7 +30,7 @@ class Queue extends Ideal\Service try { $this->jobService->add($configuration); return true; - } catch (Read $exception) { + } catch (Create $exception) { $final = new Exception("Could not enqueue job", 0, $exception); $this->logger->warning($final); return false; @@ -40,22 +41,8 @@ class Queue extends Ideal\Service return $this->enqueue($configuration); } - /** - * @return array - */ - public function getPendingJobs(): array + public function runJob(Model\Job $job, ?RequestInterface $request = null): bool { - return $this->jobService->getPending(); - } - public function runJob(int $job_id, ?RequestInterface $request = null): bool - { - try { - $job = $this->jobService->getPendingById($job_id); - } catch (Read $exception) { - $this->logger->debug($exception); - return false; - } - $type = 'default'; if (isset($job->configuration['type'])) { $type = strtolower($job->configuration['type']); @@ -71,50 +58,49 @@ class Queue extends Ideal\Service try { if (!$worker->execute($job)) { - $this->logger->debug("Could not execute job {$job_id}"); + $this->logger->debug("Could not execute job {$job->id}"); return false; } if (!$this->jobService->execute($job)) { - $this->logger->debug("Could not remove job {$job_id}"); + $this->logger->debug("Could not remove job {$job->id}"); return false; } } catch (Exception $exception) { - $final = new Exception("Could not run job", 0, $exception); - $this->logger->warning($final); + $this->logger->warning("Could not run job {$job->id}", ['exception' => $exception]); return false; } return true; } public function run(?RequestInterface $request = null): bool { - $jobs = $this->jobService->getPending(); - if (count($jobs) === 0) { - $this->logger->debug("No pending jobs"); + if (!$this->jobService->isPending()) { return true; } - - $errors = []; - foreach ($jobs as $job) { - if ($job->retries >= $this->maxRetries) { - try { - $this->jobService->remove($job); - } catch (Read | Delete $exception) { - $this->logger->error($exception->getMessage(), ['exception' => $exception]); - } - continue; - } - try { - $this->runJob($job->id, $request); - } catch (Exception) { - $job->retries ++; - try { - $this->jobService->update($job); - } catch (Read | Update $exception) { - $this->logger->error($exception->getMessage(), ['exception' => $exception]); - } - $errors []= $job->id; - } + try { + $job = $this->jobService->get(); + } catch (Read $exception) { + $this->logger->error($exception->getMessage(), ['exception' => $exception]); + return false; } - return count($errors) === 0; + if ($job->retries >= $this->maxRetries) { + try { + $this->jobService->remove($job); + } catch (Delete $exception) { + $this->logger->error($exception->getMessage(), ['job' => $job, 'exception' => $exception]); + } + return true; + } + try { + $this->runJob($job, $request); + } catch (Exception) { + $job->retries ++; + try { + $this->jobService->update($job); + } catch (Update $exception) { + $this->logger->error($exception->getMessage(), ['job' => $job, 'exception' => $exception]); + } + return false; + } + return true; } } diff --git a/app/tests/unit/src/Service/MQTT/BeanstalkdTest.php b/app/tests/unit/src/Service/MQTT/BeanstalkdTest.php new file mode 100644 index 0000000..5daf585 --- /dev/null +++ b/app/tests/unit/src/Service/MQTT/BeanstalkdTest.php @@ -0,0 +1,101 @@ +logger = $this->getMockBuilder(LoggerInterface::class)->getMock(); + $this->client = $this->getMockBuilder(BeansClient::class)->disableOriginalConstructor()->getMock(); + } + + public function testExists(): void + { + $stats = ['current-jobs-ready' => 1]; + $this->client->method('watchTube')->willReturn($this->client); + $this->client->method('statsTube')->willReturn($stats); + $service = new Beanstalkd($this->logger, $this->client); + $this->assertTrue($service->exists()); + } + public function testNotExists(): void + { + $stats = ['current-jobs-ready' => 0]; + $this->client->method('watchTube')->willReturn($this->client); + $this->client->method('statsTube')->willReturn($stats); + $service = new Beanstalkd($this->logger, $this->client); + $this->assertFalse($service->exists()); + } + public function testGet(): void + { + $jobData = [ + 'id' => 1, + 'configuration' => [ + 'type' => 'service', + ], + 'created_at' => '2020-01-01 00:00:00', + ]; + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('isActive')->willReturn(true); + $this->client->method('getConnection')->willReturn($connection); + $this->client->method('watchTube')->willReturn($this->client); + $this->client->method('statsTube')->willReturn(['current-jobs-ready' => 1]); + $job = new Job($this->client, 1, 'ready', json_encode($jobData)); + $this->client->method('reserve')->willReturn($job); + $service = new Beanstalkd($this->logger, $this->client); + $this->assertEquals(json_encode($jobData), $service->get()); + } + public function testGetException(): void + { + $this->client->method('watchTube')->willReturn($this->client); + $this->client->method('statsTube')->willReturn(['current-jobs-ready' => 0]); + $service = new Beanstalkd($this->logger, $this->client); + $this->expectException(MissingJob::class); + $service->get(); + + $this->client->method('statsTube')->willReturn(['current-jobs-ready' => 1]); + $exception = new JobException(); + $this->client->method('reserve')->willThrowException($exception); + $this->expectException(MissingJob::class); + $service->get(); + } + public function testSet(): void + { + $this->client->method('useTube')->willReturn($this->client); + $this->client->method('put'); + $service = new Beanstalkd($this->logger, $this->client); + $service->set('test'); + $this->assertTrue(true); + } + public function testSetException(): void + { + $this->client->method('useTube')->willReturn($this->client); + $exception = new JobException(); + $this->client->method('put')->willThrowException($exception); + $service = new Beanstalkd($this->logger, $this->client); + $service->set('test'); + $this->assertTrue(true); + } + public function testRemove(): void + { + $this->client->method('useTube')->willReturn($this->client); + $this->client->method('delete')->willReturn(true); + $service = new Beanstalkd($this->logger, $this->client); + $service->remove(1); + $this->assertTrue(true); + + $this->client->method('delete')->willReturn(false); + $service->remove(1); + $this->assertTrue(true); + } +} \ No newline at end of file diff --git a/app/tests/unit/src/Service/MQTTTest.php b/app/tests/unit/src/Service/MQTTTest.php new file mode 100644 index 0000000..33059f9 --- /dev/null +++ b/app/tests/unit/src/Service/MQTTTest.php @@ -0,0 +1,69 @@ +getMockBuilder(Beanstalkd::class)->disableOriginalConstructor()->getMock(); + $mqtt->register('beanstalkd', $beanstalkd); + $this->assertTrue($mqtt->clientExists('beanstalkd')); + } + public function testGetClient(): void + { + $mqtt = new MQTT(); + $logger = $this->getMockBuilder(LoggerInterface::class)->disableOriginalConstructor()->getMock(); + $client = $this->getMockBuilder(BeansClient::class)->disableOriginalConstructor()->getMock(); + $beanstalkd = new Beanstalkd($logger, $client); + $mqtt->register('beanstalkd', $beanstalkd); + $this->assertEquals($beanstalkd, $mqtt->getClient('beanstalkd')); + } + public function testGetClientException(): void + { + $mqtt = new MQTT(); + $this->expectException(MissingClient::class); + $mqtt->getClient('test'); + } + public function testExists(): void + { + $mqtt = new MQTT(); + $beanstalkd = $this->getMockBuilder(Beanstalkd::class)->disableOriginalConstructor()->getMock(); + $beanstalkd->method('exists')->willReturn(true); + $mqtt->register('beanstalkd', $beanstalkd); + $this->assertTrue($mqtt->exists('beanstalkd')); + } + public function testGet(): void + { + $mqtt = new MQTT(); + $beanstalkd = $this->getMockBuilder(Beanstalkd::class)->disableOriginalConstructor()->getMock(); + $beanstalkd->method('get')->willReturn('test'); + $mqtt->register('beanstalkd', $beanstalkd); + $this->assertEquals('test', $mqtt->get('beanstalkd')); + } + public function testSet(): void + { + $mqtt = new MQTT(); + $beanstalkd = $this->getMockBuilder(Beanstalkd::class)->disableOriginalConstructor()->getMock(); + $beanstalkd->method('set'); + $mqtt->register('beanstalkd', $beanstalkd); + $mqtt->set('test', 0, 'beanstalkd'); + $this->assertTrue(true); + } + public function testRemove(): void + { + $mqtt = new MQTT(); + $beanstalkd = $this->getMockBuilder(Beanstalkd::class)->disableOriginalConstructor()->getMock(); + $beanstalkd->method('remove'); + $mqtt->register('beanstalkd', $beanstalkd); + $mqtt->remove(0, 'beanstalkd'); + $this->assertTrue(true); + } +} diff --git a/app/tests/unit/src/Service/QueueTest.php b/app/tests/unit/src/Service/QueueTest.php new file mode 100644 index 0000000..af0bc34 --- /dev/null +++ b/app/tests/unit/src/Service/QueueTest.php @@ -0,0 +1,66 @@ +logger = $this->getMockBuilder(LoggerInterface::class) + ->disableOriginalConstructor() + ->getMock(); + $this->jobService = $this->getMockBuilder(Job::class) + ->disableOriginalConstructor() + ->getMock(); + $this->defaultWorker = $this->getMockBuilder(Worker::class) + ->disableOriginalConstructor() + ->getMock(); + } + + public function testRegister(): void + { + $queue = new Queue($this->logger, $this->jobService, $this->defaultWorker); + $worker = $this->getMockBuilder(Worker::class) + ->disableOriginalConstructor() + ->getMock(); + $queue->register('test', $worker); + $this->assertTrue(true); + } + public function testEnqueue(): void + { + $queue = new Queue($this->logger, $this->jobService, $this->defaultWorker); + $jobData = ['test' => 'test']; + $result = $queue->enqueue($jobData); + $this->assertTrue($result); + + $result = $queue->push($jobData); + $this->assertTrue($result); + } + public function testRun(): void + { + $queue = new Queue($this->logger, $this->jobService, $this->defaultWorker); + $result = $queue->run(); + $this->assertTrue($result); + + + $jobData = [ + 'type' => 'test', + ]; + $job = new Model\Job(); + $job->id = 1; + $job->configuration = $jobData; + $this->jobService->method('isPending')->willReturn(true); + $this->jobService->method('get')->willReturn($job); + $result = $queue->run(); + $this->assertTrue($result); + } +} \ No newline at end of file diff --git a/cli/composer.json b/cli/composer.json index a54ff9b..c22433f 100644 --- a/cli/composer.json +++ b/cli/composer.json @@ -2,10 +2,12 @@ "name": "incoviba/cli", "type": "project", "require": { + "ext-sockets": "*", "dragonmantank/cron-expression": "^3.4", "guzzlehttp/guzzle": "^7.8", "hollodotme/fast-cgi-client": "^3.1", "monolog/monolog": "^3.5", + "pda/pheanstalk": "^7.0", "php-di/php-di": "^7.0", "predis/predis": "^3.0", "symfony/console": "^6.3" diff --git a/cli/entrypoint b/cli/entrypoint index c74d879..d7fa9bb 100755 --- a/cli/entrypoint +++ b/cli/entrypoint @@ -6,8 +6,13 @@ then then CMD=$1 shift - $CMD -c "$@" - exit + if [[ $# -gt 0 ]] + then + $CMD -c "$@" + exit 0 + fi + $CMD + exit 0 fi fi diff --git a/cli/setup/setups/services.php b/cli/setup/setups/services.php index 2a9bc57..b666a8b 100644 --- a/cli/setup/setups/services.php +++ b/cli/setup/setups/services.php @@ -16,4 +16,15 @@ return [ } return new Predis\Client($options); }, + Pheanstalk\Pheanstalk::class => function(ContainerInterface $container) { + return Pheanstalk\Pheanstalk::create( + $container->get('BEANSTALKD_HOST'), + $container->has('BEANSTALKD_PORT') ? $container->get('BEANSTALKD_PORT') : 11300 + ); + }, + Incoviba\Service\MQTT\MQTTInterface::class => function(ContainerInterface $container) { + $service = new Incoviba\Service\MQTT($container->get(Psr\Log\LoggerInterface::class)); + $service->register('default', $container->get(Incoviba\Service\MQTT\Pheanstalk::class)); + return $service; + } ]; diff --git a/cli/src/Command/Job/Pending.php b/cli/src/Command/Job/Pending.php index d3a79a4..718b375 100644 --- a/cli/src/Command/Job/Pending.php +++ b/cli/src/Command/Job/Pending.php @@ -12,37 +12,11 @@ class Pending extends Console\Command\Command parent::__construct($name); } - protected function configure(): void - { - $this->addOption('full', 'f', Console\Input\InputOption::VALUE_NONE, 'Full output'); - } - protected function execute(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): int { - $jobs = $this->jobService->getPending(); - $jobCount = count($jobs); + $jobCount = $this->jobService->getPending(); $output->writeln("Found {$jobCount} pending jobs"); - if ($input->getOption('full') and $jobCount > 0) { - $io = new Console\Style\SymfonyStyle($input, $output); - - $rows = []; - foreach ($jobs as $job) { - $retries = $job['retries'] ?? 0; - $updated = $job['updated_at'] ?? ''; - - $rows[] = [ - $job['id'], - $job['created_at'], - $job['configuration']['type'], - $retries, - $updated - ]; - } - - $io->table(['ID', 'Created', 'Type', 'Retries', 'Updated'], $rows); - } - return self::SUCCESS; } } diff --git a/cli/src/Command/Job/Run.php b/cli/src/Command/Job/Run.php index c99486f..5222c14 100644 --- a/cli/src/Command/Job/Run.php +++ b/cli/src/Command/Job/Run.php @@ -9,22 +9,16 @@ use Incoviba\Service; use Psr\Log\LoggerInterface; use Symfony\Component\Console; -#[Console\Attribute\AsCommand(name: 'jobs:run', description: 'Run jobs')] +#[Console\Attribute\AsCommand(name: 'jobs:run', description: 'Run job')] class Run extends Console\Command\Command { public function __construct(protected Service\FastCGI $fastcgi, protected LoggerInterface $logger, + protected Service\Job $jobService, protected DateTimeZone $timeZone, ?string $name = null) { parent::__construct($name); } - protected function configure(): void - { - $this->addArgument('job_ids', - Console\Input\InputArgument::IS_ARRAY | Console\Input\InputArgument::REQUIRED, 'Job IDs'); - } - - protected array $output = []; public function execute(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): int { try { @@ -33,44 +27,18 @@ class Run extends Console\Command\Command $now = new DateTimeImmutable(); } - $jobIds = $input->getArgument('job_ids'); - $jobCount = count($jobIds); - - $this->pushOutput('top', ['message' => "[{$now->format('Y-m-d H:i:s e')}] Running {$jobCount} jobs..."]); - $this->pushOutput('bottom', ['table' => [ - ['Job IDs'], - array_map(function($row) {return [$row];},$jobIds) - ]]); - $this->pushOutput('top', ['progress' => $jobCount]); - $result = $this->runJobs($jobIds); - $this->pushOutput('top', ['progress' => 'finish']); - - $this->writeOutput($input, $output); - - return $result; - } - - protected function runJobs(array $jobIds): int - { - $pendingJobs = []; - foreach ($jobIds as $jobId) { - if (!$this->runJob($jobId)) { - $pendingJobs []= $jobId; - } + if ($this->jobService->getPending() === 0) { + $output->writeln("[{$now->format('Y-m-d H:i:s e')}] No pending jobs to run."); + return self::SUCCESS; } - $result = $this->getResponses(); - if (count($pendingJobs) > 0) { - if ($this->runJobs($pendingJobs) === self::FAILURE) { - $result = self::FAILURE; - } - } - return $result; + $output->writeln("[{$now->format('Y-m-d H:i:s e')}] Running Ready Job..."); + $this->runJob(); + return $this->getResponses(); } - protected function runJob(int $jobId): bool + protected function runJob(): bool { - $uri = "/api/queue/run/{$jobId}"; - $this->pushOutput('bottom', ['message' => "GET {$uri}"]); + $uri = "/api/queue/run"; try { $this->fastcgi->get($uri); @@ -85,7 +53,6 @@ class Run extends Console\Command\Command $result = self::SUCCESS; $responses = $this->fastcgi->awaitResponses(); foreach ($responses as $response) { - $this->pushOutput('top', ['progress' => 'advance']); if ($response->getError() !== '') { $this->logger->error("Error running job", [ 'error' => $response->getError(), @@ -93,100 +60,8 @@ class Run extends Console\Command\Command 'headers' => $response->getHeaders(), ]); $result = self::FAILURE; - continue; } - $this->pushOutput('bottom', ['message' => $response->getBody()]); } return $result; } - - protected function pushOutput(string $section, array $configuration): void - { - if (!isset($this->output[$section])) { - $this->output[$section] = []; - } - foreach ($configuration as $key => $value) { - if (!isset($this->output[$section][$key])) { - $this->output[$section][$key] = []; - } - $this->output[$section][$key] []= $value; - } - if (isset($this->output[$section]['progress'])) { - usort($this->output[$section]['progress'], function($a, $b) { - if ($a === $b) { - return 0; - } - if (is_int($a)) { - return -1; - } - if (is_int($b)) { - return 1; - } - if ($a === 'finish') { - return 1; - } - if ($b === 'finish') { - return -1; - } - return 0; - }); - } - } - protected function writeOutput(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): void - { - $sectionNames = array_keys($this->output); - $ios = []; - foreach ($sectionNames as $sectionName) { - $section = $output->section(); - $ios[$sectionName] = new Console\Style\SymfonyStyle($input, $section); - } - - foreach ($this->output as $sectionName => $configurations) { - $io = $ios[$sectionName]; - $this->writeSection($io, $configurations); - } - } - protected function writeSection(Console\Style\SymfonyStyle $io, array $configurations): void - { - if (array_key_exists('table', $configurations)) { - $this->writeTables($io, $configurations['table']); - } - if (array_key_exists('progress', $configurations)) { - $this->writeProgress($io, $configurations['progress']); - } - if (array_key_exists('message', $configurations)) { - $this->writeMessages($io, $configurations['message']); - } - } - protected function writeTables(Console\Style\SymfonyStyle $io, array $tableConfigurations): void - { - foreach ($tableConfigurations as $tableData) { - $io->table(...$tableData); - } - } - protected function writeMessages(Console\Style\SymfonyStyle $io, array $messages): void - { - foreach ($messages as $message) { - $io->writeln($message); - } - } - protected function writeProgress(Console\Style\SymfonyStyle $io, array $progresses): void - { - $progressBar = null; - foreach ($progresses as $progress) { - if ($progress === 'advance' and $progressBar !== null) { - $progressBar->advance(); - continue; - } - if ($progress === 'finish' and $progressBar !== null) { - $progressBar->finish(); - continue; - } - if (in_array($progress, ['finish', 'advance'])) { - continue; - } - $progressBar = $io->createProgressBar($progress); - } - $io->newLine(); - } } diff --git a/cli/src/Command/Queue.php b/cli/src/Command/Queue.php index faf19cd..302ee0c 100644 --- a/cli/src/Command/Queue.php +++ b/cli/src/Command/Queue.php @@ -35,85 +35,22 @@ class Queue extends Command ]; $io = new Console\Style\SymfonyStyle($input, $this->sections['top']); $now = new DateTimeImmutable('now', $this->timezone); - $io->title("[{$now->format('Y-m-d H:i:s e')}] Running Queue..."); - $jobs = $this->getJobs(); - $jobCount = count($jobs); - if ($jobCount === 0) { - return Console\Command\Command::SUCCESS; + if ($this->jobService->getPending() === 0) { + $io->success("[{$now->format('Y-m-d H:i:s e')}] Queue is empty"); + return self::SUCCESS; } - $io->writeln("Found {$jobCount} jobs to run"); - $result = $this->runJob($jobs[0]); - /*$result = $this->runJobs($io, $jobs); - foreach ($this->outputs as $output) { - $this->sections['bottom']->writeln($output); - }*/ - return $result; + $io->title("[{$now->format('Y-m-d H:i:s e')}] Running Queue..."); + return $this->runJob(); } protected array $sections; - - protected function getJobs(): array - { - $this->logger->debug("Getting jobs"); - $jobs = $this->jobService->getPending(); - $jobCount = count($jobs); - if ($jobCount === 0) { - $this->logger->debug("No jobs to run"); - return []; - } - $this->logger->debug("Found {$jobCount} jobs"); - return array_column($jobs, 'id'); - } - protected function runJobs(Console\Style\SymfonyStyle $io, array $jobs): int - { - $chunks = array_chunk($jobs, $this->batchSize); - $chunkCount = count($chunks); - $result = self::SUCCESS; - $progress1 = $io->createProgressBar($chunkCount); - $progress1->start(); - foreach ($chunks as $chunk) { - if ($this->runJobBatch($chunk) === self::FAILURE) { - $result = self::FAILURE; - } - $progress1->advance(); - } - $progress1->finish(); - return $result; - } - protected array $outputs = []; - protected function runJobBatch(array $jobIds): int + protected function runJob(): int { $baseCommand = "{$this->baseCommand} jobs:run"; - - $jobsLine = implode(' ', $jobIds); - $command = "{$baseCommand} {$jobsLine}"; - - try { - exec($command, $output, $resultCode); - $this->outputs []= $output; - } catch (Throwable $exception) { - $this->logger->error("Failed to run command", [ - 'command' => $command, - 'exception' => $exception - ]); - return self::FAILURE; - } - if ($resultCode !== 0) { - $this->logger->error("Failed to run command", [ - 'command' => $command, - 'result_code' => $resultCode - ]); - return self::FAILURE; - } - return self::SUCCESS; - } - protected function runJob(int $jobId): int - { - $baseCommand = "{$this->baseCommand} jobs:run"; - $command = "{$baseCommand} {$jobId}"; + $command = "{$baseCommand}"; try { exec($command, $output, $resultCode); $this->outputs []= $output; diff --git a/cli/src/Exception/MQTT.php b/cli/src/Exception/MQTT.php new file mode 100644 index 0000000..0e59b39 --- /dev/null +++ b/cli/src/Exception/MQTT.php @@ -0,0 +1,18 @@ +redisKey = 'jobs'; - } + public function __construct(protected LoggerInterface $logger, protected MQTTInterface $mqttService) {} protected string $redisKey; - public function getPending(): array + public function getPending(): int { try { - $jobs = $this->redisService->get($this->redisKey); - return json_decode($jobs, true); - } catch (ConnectionException|Exception $exception) { - $exception = new Exception("Could not read {$this->redisKey} from Redis", $exception->getCode(), $exception); + return $this->mqttService->pending(); + } catch (MQTTException $exception) { $this->logger->warning($exception->getMessage(), ['exception' => $exception]); - return []; + return 0; } } @@ -44,9 +39,11 @@ class Job 'updated_at' => null, 'retries' => 0 ]; - $jobs = $this->getPending(); - $jobs []= $data; - $this->redisService->set($this->redisKey, json_encode($jobs), -1); + try { + $this->mqttService->set(json_encode($data)); + } catch (MQTTException $exception) { + $this->logger->warning($exception->getMessage(), ['exception' => $exception]); + } return $data; } } diff --git a/cli/src/Service/MQTT.php b/cli/src/Service/MQTT.php new file mode 100644 index 0000000..9b4980b --- /dev/null +++ b/cli/src/Service/MQTT.php @@ -0,0 +1,124 @@ +transports[$name] = $transport; + return $this; + } + + /** + * @param string $payload + * @param int $delay + * @param string|null $transportName + * @return $this + * @throws MQTTException\UnknownTransport + * @throws MQTTException\Create + */ + public function set(string $payload, int $delay = 0, ?string $transportName = null): self + { + $transport = $this->getTransport($transportName); + $transport->set($payload, $delay); + return $this; + } + + /** + * @param string|null $transportName + * @return int + * @throws MQTTException\UnknownTransport + * @throws MQTTException\Read + */ + public function pending(?string $transportName = null): int + { + $transport = $this->getTransport($transportName); + return $transport->pending(); + } + + /** + * @param int|null $jobId + * @param string|null $transportName + * @return bool + * @throws MQTTException\UnknownTransport + * @throws MQTTException\Read + */ + public function exists(?int $jobId = null, ?string $transportName = null): bool + { + $transport = $this->getTransport($transportName); + return $transport->exists($jobId); + } + + /** + * @param int|null $jobId + * @param string|null $transportName + * @return string + * @throws MQTTException\UnknownTransport + * @throws MQTTException\Read + */ + public function get(?int $jobId = null, ?string $transportName = null): string + { + $transport = $this->getTransport($transportName); + return $transport->get($jobId); + } + + /** + * @param string $newPayload + * @param int|null $jobId + * @param string|null $transportName + * @return $this + * @throws MQTTException\UnknownTransport + * @throws MQTTException\Update + */ + public function update(string $newPayload, ?int $jobId = null, ?string $transportName = null): self + { + $transport = $this->getTransport($transportName); + $transport->update($newPayload, $jobId); + return $this; + } + + /** + * @param int|null $jobId + * @param string|null $transportName + * @return $this + * @throws MQTTException\UnknownTransport + * @throws MQTTException\Delete + */ + public function remove(?int $jobId = null, ?string $transportName = null): self + { + $transport = $this->getTransport($transportName); + $transport->remove($jobId); + return $this; + } + + /** + * @param string|null $transportName + * @return mixed + * @throws MQTTException\UnknownTransport + */ + protected function getTransport(?string $transportName): mixed + { + if (count($this->transports) === 0) { + throw new MQTTException\UnknownTransport(''); + } + if ($transportName === null) { + if (array_key_exists('default', $this->transports)) { + $transportName = 'default'; + } else { + $transportName = array_keys($this->transports)[0]; + } + } + if (!array_key_exists($transportName, $this->transports)) { + if ($transportName === null) { + $transportName = ''; + } + throw new MQTTException\UnknownTransport($transportName); + } + return $this->transports[$transportName]; + } +} diff --git a/cli/src/Service/MQTT/Beanstalkd.php b/cli/src/Service/MQTT/Beanstalkd.php new file mode 100644 index 0000000..6330ed6 --- /dev/null +++ b/cli/src/Service/MQTT/Beanstalkd.php @@ -0,0 +1,127 @@ +client->put($payload, $this->ttr, $this->priority, $delay); + } catch (Exception $exception) { + throw new MQTT\Create($this->tube, $payload, $exception); + } + return $this; + } + + /** + * @return int + * @throws MQTT\Read + */ + public function pending(): int + { + try { + $stats = $this->client + ->statsTube($this->tube); + } catch (Exception $exception) { + throw new MQTT\Read($this->tube, $exception); + } + if (!array_key_exists('current-jobs-ready', $stats)) { + throw new MQTT\Read($this->tube); + } + return $stats['current-jobs-ready']; + } + + /** + * @param int|null $jobId + * @return bool + * @throws MQTT\Read + */ + public function exists(?int $jobId = null): bool + { + return $this->pending() > 0; + } + + protected int $currentJobId; + + /** + * @param int|null $jobId + * @return string + * @throws MQTT\Read + */ + public function get(?int $jobId = null): string + { + try { + if ($jobId !== null) { + $job = (object) $this->client + ->reserveJob($jobId); + } else { + $job = (object) $this->client + ->reserve(); + } + } catch (Exception $exception) { + throw new MQTT\Read($this->tube, $exception); + } + $this->currentJobId = $job->id; + return $job->payload; + } + + /** + * @param string $newPayload + * @param int|null $jobId + * @return self + * @throws MQTT\Update + */ + public function update(string $newPayload, ?int $jobId = null): self + { + try { + $this->remove($jobId); + $this->set($newPayload); + } catch (MQTT\Delete | MQTT\Create $exception) { + throw new MQTT\Update($this->tube, $newPayload, $jobId, $exception); + } + return $this; + } + + /** + * @param int|null $jobId + * @return self + * @throws MQTT\Delete + */ + public function remove(?int $jobId = null): self + { + try { + if ($jobId === null) { + $jobId = $this->currentJobId; + } + $this->client + ->delete($jobId); + } catch (Exception $exception) { + throw new MQTT\Delete($this->tube, $jobId, $exception); + } + return $this; + } +} diff --git a/cli/src/Service/MQTT/MQTTInterface.php b/cli/src/Service/MQTT/MQTTInterface.php new file mode 100644 index 0000000..6e32de0 --- /dev/null +++ b/cli/src/Service/MQTT/MQTTInterface.php @@ -0,0 +1,12 @@ +tube = new PBA\Values\TubeName($tubeName); + } + + protected PBA\Values\TubeName $tube; + + public function set(string $payload, int $delay = 0): self + { + $this->client->useTube($this->tube); + $this->client->put($payload, $delay); + return $this; + } + + public function pending(): int + { + $stats = $this->client->statsTube($this->tube); + return $stats->currentJobsReady; + } + + public function exists(?int $jobId = null): bool + { + return $this->pending() > 0; + } + protected int $currentJobId; + public function get(?int $jobId = null): string + { + $this->client->watch($this->tube); + if ($jobId !== null) { + $jobId = new PBA\Values\JobId($jobId); + $job = $this->client->reserveJob($jobId); + } else { + $job = $this->client->reserve(); + } + $this->currentJobId = $job->getId(); + return $job->getData(); + } + + public function update(string $newPayload, ?int $jobId = null): self + { + $this->remove($jobId); + $this->set($newPayload); + return $this; + } + + public function remove(?int $jobId = null): self + { + if ($jobId === null) { + $jobId = $this->currentJobId; + } + $this->client->watch($this->tube); + $this->client->delete(new PBA\Values\JobId($jobId)); + return $this; + } +} diff --git a/mqtt.compose.yml b/mqtt.compose.yml new file mode 100644 index 0000000..c56fa15 --- /dev/null +++ b/mqtt.compose.yml @@ -0,0 +1,21 @@ +services: + mqtt: + profiles: + - mqtt + container_name: incoviba_mqtt + image: maateen/docker-beanstalkd + restart: unless-stopped + volumes: + - incoviba_mqtt:/var/lib/beanstalkd + + mqtt-admin: + profiles: + - mqtt + container_name: incoviba_mqtt_admin + image: mitulislam/beanstalkd-aurora:latest + restart: unless-stopped + ports: + - "8093:3000" + +volumes: + incoviba_mqtt: {} diff --git a/testing.compose.yml b/testing.compose.yml index 5f851cb..acf3810 100644 --- a/testing.compose.yml +++ b/testing.compose.yml @@ -18,6 +18,8 @@ services: condition: service_healthy test-redis: condition: service_healthy + test-mqtt: + condition: service_started test-db: profiles: @@ -48,6 +50,19 @@ services: networks: - testing + test-mqtt: + profiles: + - testing + image: maateen/docker-beanstalkd + container_name: incoviba_test_mqtt + healthcheck: + test: [ "CMD", "nc", "-z", "localhost", "11300" ] + interval: 5s + timeout: 5s + retries: 5 + networks: + - testing + volumes: test-db: {}