112 lines
3.4 KiB
PHP
112 lines
3.4 KiB
PHP
<?php
|
|
namespace Incoviba\Service\MQTT;
|
|
|
|
use Exception;
|
|
use Incoviba\Exception\MQTT\MissingJob;
|
|
use Incoviba\Exception\MQTT\RemoveJob;
|
|
use Incoviba\Exception\MQTT\SetJob;
|
|
use Psr\Log\LoggerInterface;
|
|
use xobotyi\beansclient;
|
|
use xobotyi\beansclient\Exception\ClientException;
|
|
use xobotyi\beansclient\Exception\CommandException;
|
|
use xobotyi\beansclient\Exception\JobException;
|
|
|
|
class Beanstalkd implements MQTTInterface
|
|
{
|
|
/**
|
|
* @throws JobException
|
|
* @throws ClientException
|
|
* @throws CommandException
|
|
*/
|
|
public function __construct(protected LoggerInterface $logger, protected beansclient\BeansClient $client,
|
|
protected string $tube = 'default', protected int $ttr = beansclient\BeansClient::DEFAULT_TTR,
|
|
protected int $priority = 1)
|
|
{
|
|
$this->client->watchTube($this->tube);
|
|
}
|
|
|
|
public function exists(): bool
|
|
{
|
|
try {
|
|
$stats = $this->client->statsTube($this->tube);
|
|
return $stats['current-jobs-ready'] > 0;
|
|
} catch (Exception $exception) {
|
|
$this->logger->error($exception->getMessage(), ['exception' => $exception]);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
protected ?beansclient\Job $currentJob = null;
|
|
public function get(): string
|
|
{
|
|
if (!$this->exists()) {
|
|
throw new MissingJob();
|
|
}
|
|
try {
|
|
$job = $this->client->watchTube($this->tube)->reserve();
|
|
$this->currentJob = $job;
|
|
return $job->payload;
|
|
} catch (Exception $exception) {
|
|
$this->logger->error($exception->getMessage(), ['exception' => $exception]);
|
|
throw new MissingJob($exception);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @param string $value
|
|
* @param int $delay
|
|
* @return $this
|
|
* @throws SetJob
|
|
*/
|
|
public function set(string $value, int $delay = 0): self
|
|
{
|
|
try {
|
|
$this->client->useTube($this->tube)->put($value, $this->priority, $delay, $this->ttr ?? 0);
|
|
} catch (Exception $exception) {
|
|
$this->logger->error($exception->getMessage(), ['payload' => $value, 'delay' => $delay, 'exception' => $exception]);
|
|
throw new SetJob($value, $exception);
|
|
}
|
|
return $this;
|
|
}
|
|
|
|
/**
|
|
* @param string $newPayload
|
|
* @param int|null $jobId
|
|
* @return self
|
|
* @throws RemoveJob
|
|
* @throws SetJob
|
|
*/
|
|
public function update(string $newPayload, ?int $jobId = null): self
|
|
{
|
|
if ($jobId === null) {
|
|
$jobId = $this->currentJob->id;
|
|
}
|
|
return $this->remove($jobId)
|
|
->set($newPayload);
|
|
}
|
|
|
|
/**
|
|
* @param int|null $jobId
|
|
* @return $this
|
|
* @throws RemoveJob
|
|
*/
|
|
public function remove(?int $jobId = null): self
|
|
{
|
|
if ($jobId === null) {
|
|
$jobId = $this->currentJob->id;
|
|
}
|
|
try {
|
|
if (!$this->client->useTube($this->tube)->delete($jobId)) {
|
|
throw new JobException("Failed to delete job {$jobId}");
|
|
}
|
|
if ($this->currentJob !== null && $this->currentJob->id === $jobId) {
|
|
$this->currentJob = null;
|
|
}
|
|
} catch (Exception $exception) {
|
|
$this->logger->error($exception->getMessage(), ['jobId' => $jobId, 'exception' => $exception]);
|
|
throw new RemoveJob($jobId, $exception);
|
|
}
|
|
return $this;
|
|
}
|
|
}
|