Queue command with direct redis access so it's faster
This commit is contained in:
@ -6,6 +6,7 @@
|
||||
"guzzlehttp/guzzle": "^7.8",
|
||||
"monolog/monolog": "^3.5",
|
||||
"php-di/php-di": "^7.0",
|
||||
"predis/predis": "^3.0",
|
||||
"symfony/console": "^6.3"
|
||||
},
|
||||
"require-dev": {
|
||||
|
@ -4,7 +4,7 @@ use Psr\Container\ContainerInterface;
|
||||
return [
|
||||
Psr\Log\LoggerInterface::class => function(ContainerInterface $container) {
|
||||
$minLogLevel = Monolog\Level::Debug;
|
||||
if ($container->has('DEBUG') and !$container->get('DEBUG')) {
|
||||
if ($container->has('DEBUG') and $container->get('DEBUG') === 'false') {
|
||||
$minLogLevel = Monolog\Level::Warning;
|
||||
}
|
||||
$handlers = [];
|
||||
|
19
cli/setup/setups/services.php
Normal file
19
cli/setup/setups/services.php
Normal file
@ -0,0 +1,19 @@
|
||||
<?php
|
||||
use Psr\Container\ContainerInterface;
|
||||
|
||||
return [
|
||||
Predis\ClientInterface::class => function(ContainerInterface $container) {
|
||||
$options = [
|
||||
'scheme' => 'tcp',
|
||||
'host' => $container->get('REDIS_HOST'),
|
||||
'port' => $container->get('REDIS_PORT')
|
||||
];
|
||||
if ($container->has('REDIS_USER')) {
|
||||
$options['username'] = $container->get('REDIS_USER');
|
||||
}
|
||||
if ($container->has('REDIS_PASSWORD')) {
|
||||
$options['password'] = $container->get('REDIS_PASSWORD');
|
||||
}
|
||||
return new Predis\Client($options);
|
||||
},
|
||||
];
|
@ -2,7 +2,10 @@
|
||||
namespace Incoviba\Command;
|
||||
|
||||
use DateTimeImmutable;
|
||||
use Psr\Http\Client\ClientInterface;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Console;
|
||||
use Incoviba\Service\Job;
|
||||
use Incoviba\Common\Alias\Command;
|
||||
|
||||
#[Console\Attribute\AsCommand(
|
||||
@ -11,6 +14,12 @@ use Incoviba\Common\Alias\Command;
|
||||
)]
|
||||
class Queue extends Command
|
||||
{
|
||||
public function __construct(ClientInterface $client, LoggerInterface $logger,
|
||||
protected Job $jobService, ?string $name = null)
|
||||
{
|
||||
parent::__construct($client, $logger, $name);
|
||||
}
|
||||
|
||||
public function execute(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): int
|
||||
{
|
||||
$this->logger->debug("Running {$this->getName()}");
|
||||
@ -19,29 +28,18 @@ class Queue extends Command
|
||||
$io->title("[{$now->format('Y-m-d H:i:s e')}] Running Queue...");
|
||||
|
||||
$jobs = $this->getJobs($output);
|
||||
if (is_int($jobs)) {
|
||||
return $jobs;
|
||||
if (count($jobs) === 0) {
|
||||
return Console\Command\Command::SUCCESS;
|
||||
}
|
||||
|
||||
return $this->runJobs($output, $jobs);
|
||||
}
|
||||
|
||||
protected function getJobs(Console\Output\OutputInterface $output): int|array
|
||||
protected function getJobs(Console\Output\OutputInterface $output): array
|
||||
{
|
||||
$uri = '/api/queue/jobs';
|
||||
$output->writeln("GET {$uri}");
|
||||
$response = $this->client->get($uri);
|
||||
$output->writeln("Response Code: {$response->getStatusCode()}");
|
||||
if ($response->getStatusCode() !== 200) {
|
||||
return Console\Command\Command::FAILURE;
|
||||
}
|
||||
|
||||
$contents = $response->getBody()->getContents();
|
||||
if (empty($contents)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return json_decode($contents, true)['jobs'];
|
||||
$this->logger->debug("Getting jobs");
|
||||
$jobs = $this->jobService->getPending();
|
||||
return array_column($jobs, 'id');
|
||||
}
|
||||
protected function runJobs(Console\Output\OutputInterface $output, array $jobs): int
|
||||
{
|
||||
|
26
cli/src/Service/Job.php
Normal file
26
cli/src/Service/Job.php
Normal file
@ -0,0 +1,26 @@
|
||||
<?php
|
||||
namespace Incoviba\Service;
|
||||
|
||||
use Exception;
|
||||
use Predis\Connection\ConnectionException;
|
||||
use Psr\Log\LoggerInterface;
|
||||
|
||||
class Job
|
||||
{
|
||||
public function __construct(protected LoggerInterface $logger, protected Redis $redisService)
|
||||
{
|
||||
$this->redisKey = 'jobs';
|
||||
}
|
||||
protected string $redisKey;
|
||||
|
||||
public function getPending(): array
|
||||
{
|
||||
try {
|
||||
$jobs = $this->redisService->get($this->redisKey);
|
||||
return json_decode($jobs, true);
|
||||
} catch (ConnectionException|Exception $exception) {
|
||||
$this->logger->warning($exception);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
}
|
41
cli/src/Service/Redis.php
Normal file
41
cli/src/Service/Redis.php
Normal file
@ -0,0 +1,41 @@
|
||||
<?php
|
||||
namespace Incoviba\Service;
|
||||
|
||||
use Exception;
|
||||
use Predis\ClientInterface;
|
||||
use Predis\Connection\ConnectionException;
|
||||
|
||||
class Redis
|
||||
{
|
||||
public function __construct(protected ClientInterface $client) {}
|
||||
|
||||
/**
|
||||
* @param string $name
|
||||
* @return string|null
|
||||
* @throws Exception|ConnectionException
|
||||
*/
|
||||
public function get(string $name): ?string
|
||||
{
|
||||
if (!$this->client->exists($name)) {
|
||||
throw new Exception($name);
|
||||
}
|
||||
return $this->client->get($name);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $name
|
||||
* @param mixed $value
|
||||
* @param int $expirationTTL
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public function set(string $name, mixed $value, int $expirationTTL = 60 * 60 * 24): void
|
||||
{
|
||||
$resolution = 'EX';
|
||||
if ($expirationTTL === -1) {
|
||||
$resolution = null;
|
||||
$expirationTTL = null;
|
||||
}
|
||||
$this->client->set($name, $value, $resolution, $expirationTTL);
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user