Command queue

This commit is contained in:
2022-03-25 15:03:49 -03:00
parent e3737aba27
commit dbad283e14
7 changed files with 222 additions and 1 deletions

View File

@ -0,0 +1,47 @@
<?php
namespace Contabilidad\Common\Controller;
use Psr\Http\Message\ServerRequestInterface as Request;
use Psr\Http\Message\ResponseInterface as Response;
use ProVM\Common\Factory\Model as Factory;
use ProVM\Common\Define\Controller\Json;
use Contabilidad\Queue;
class Queues {
use Json;
public function __invoke(Request $request, Response $response, Factory $factory): Response {
$queues = $factory->find(Queue::class)->many();
$output = [
'queues' => array_map(function($item) {return $item->toArray();}, $queues)
];
return $this->withJson($response, $output);
}
public function pending(Request $request, Response $response, Factory $factory): Response {
$pending = $factory->find(Queue::class)->where([['processed', 0]])->many();
$output = [
'pending' => array_map(function($item) {return $item->toArray();}, $pending)
];
return $this->withJson($response, $output);
}
public function processed(Request $request, Response $response, Factory $factory): Response {
$input = json_decode($request->getBody()->getContents());
$output = [
'input' => $input,
'queues' => []
];
if (!is_array($input->processed)) {
$input->processed = [$input->processed];
}
foreach ($input->processed as $id) {
$queue = $factory->find(Queue::class)->one($id);
$queue->setProcessed(true);
$status = $queue->save();
$output['queues'] []= [
'queue' => $queue->toArray(),
'processed' => $status
];
}
return $this->withJson($response, $output);
}
}

View File

@ -0,0 +1,8 @@
<?php
use Contabilidad\Common\Controller\Queues;
$app->group('/queues', function($app) {
$app->get('/pending', [Queues::class, 'pending']);
$app->post('/processed', [Queues::class, 'processed']);
$app->get('[/]', Queues::class);
});

83
api/src/Queue.php Normal file
View File

@ -0,0 +1,83 @@
<?php
namespace Contabilidad;
use DateTimeInterface;
use Carbon\Carbon;
use ProVM\Common\Alias\Model;
use ProVM\Common\Factory\Model as Factory;
/**
* @property int $id
* @property string $command
* @property DateTimeInterface $created
* @property bool $processed
*/
class Queue extends Model {
public static $_table = 'queue';
protected static $fields = ['command', 'created', 'processed'];
public function created(DateTimeInterface $fecha = null) {
if ($fecha === null) {
return Carbon::parse($this->created);
}
$this->created = $fecha->format('Y-m-d H:i:s');
return $this;
}
public function hasArguments() {
return Model::factory(QueueArgument::class)
->whereEqual('queue_id', $this->id)
->groupBy('queue_id')
->count('id') > 0;
}
protected $arguments;
public function arguments() {
if ($this->arguments === null) {
$this->arguments = $this->parentOf(QueueArgument::class, [Model::CHILD_KEY => 'queue_id']);
}
return $this->arguments;
}
public function isProcessed() {
return $this->processed > 0;
}
public function setProcessed(bool $processed) {
$this->processed = $processed ? 1 : 0;
return $this;
}
public static function find(Factory $factory, $data) {
$where = [
'command' => $data['command'],
'processed' => $data['processed'] ?? 0
];
array_walk($where, function(&$item, $key) {
$item = [$key, $item];
});
$where = array_values($where);
return $factory->find(Queue::class)->where($where)->one();
}
public function toArray(): array
{
$arr = parent::toArray();
$cmd = [(string) $this];
$arr['arguments'] = [];
if ($this->hasArguments()) {
$arr['arguments'] = array_map(function($item) use (&$cmd) {
$cmd []= (string) $item;
return $item->toArray();
}, $this->arguments());
}
$arr['cmd'] = implode(' ', $cmd);
return $arr;
}
public function __toString(): string {
$str = "{$this->command}";
$arguments = $this->arguments();
if ($arguments !== null and count($arguments) > 0) {
$arguments = implode(' ', array_map(function($item) {return (string) $item;}, $arguments));
$str .= " {$arguments}";
}
return $str;
}
}

26
api/src/QueueArgument.php Normal file
View File

@ -0,0 +1,26 @@
<?php
namespace Contabilidad;
use ProVM\Common\Alias\Model;
/**
* @property int $id
* @property Queue $queue_id
* @property string $argument
* @property string $value
*/
class QueueArgument extends Model {
public static $_table = 'queue_arguments';
protected $queue;
public function queue() {
if ($this->queue === null) {
$this->queue = $this->childOf(Queue::class, [Model::SELF_KEY => 'queue_id']);
}
return $this->queue;
}
public function __toString(): string {
return "{$this->argument}='{$this->value}'";
}
}

View File

@ -0,0 +1,53 @@
<?php
namespace Contabilidad\Common\Command;
use Psr\Http\Client\ClientInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
class Queue extends Command {
protected $client;
public function __construct(ClientInterface $client = null, string $name = null) {
parent::__construct($name);
$this->setClient($client);
}
public function setClient(ClientInterface $client) {
$this->client = $client;
return $this;
}
public function getClient(): ClientInterface {
return $this->client;
}
public function execute(InputInterface $input, OutputInterface $output)
{
$response = $this->getClient()->get('/queues/pending');
if ($response->getStatusCode() !== 200) {
return Command::FAILURE;
}
$input = json_decode($response->getBody()->getContents());
$output = [
'input' => $input,
'processed' => []
];
foreach ($input->pending as $queue) {
$log = "Running {$queue->command} from queue. Created in {$queue->created}.";
error_log($log);
$cmd = '/usr/local/bin/php /app/bin/console ' . $queue->cmd;
exec($cmd, $result, $code);
if ($code != Command::SUCCESS) {
error_log(var_export($queue, true));
error_log(var_export($result, true));
continue;
}
$output['processed'] []= $queue->id;
}
$response = $this->getClient()->post('/queues/processed', ['json' => $output]);
if ($response->getStatusCode() !== 200) {
return Command::FAILURE;
}
return Command::SUCCESS;
}
}

View File

@ -21,4 +21,4 @@
# For more information see the manual pages of crontab(5) and cron(8)
#
# m h dom mon dow command
0 2 1 * * /usr/local/bin/php /app/bin/console consolidar
0 2 * * * /usr/local/bin/php /app/bin/console queue

View File

@ -0,0 +1,4 @@
<?php
use Contabilidad\Common\Command\Queue;
$app->add(new Queue($app->getContainer()->get(\Psr\Http\Client\ClientInterface::class), 'queue'));