|
|
<?php |
|
|
|
|
|
namespace Kanboard\Core\Queue; |
|
|
|
|
|
use Kanboard\Core\Base; |
|
|
use Kanboard\Job\BaseJob; |
|
|
use LogicException; |
|
|
use SimpleQueue\Queue; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class QueueManager extends Base |
|
|
{ |
|
|
|
|
|
|
|
|
|
|
|
protected $queue = null; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public function setQueue(Queue $queue) |
|
|
{ |
|
|
$this->queue = $queue; |
|
|
return $this; |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public function push(BaseJob $job) |
|
|
{ |
|
|
$jobClassName = get_class($job); |
|
|
|
|
|
if ($this->queue !== null) { |
|
|
$this->logger->debug(__METHOD__.': Job pushed in queue: '.$jobClassName); |
|
|
$this->queue->push(JobHandler::getInstance($this->container)->serializeJob($job)); |
|
|
} else { |
|
|
$this->logger->debug(__METHOD__.': Job executed synchronously: '.$jobClassName); |
|
|
call_user_func_array(array($job, 'execute'), $job->getJobParams()); |
|
|
} |
|
|
|
|
|
return $this; |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public function listen() |
|
|
{ |
|
|
if ($this->queue === null) { |
|
|
throw new LogicException('No queue driver defined or unable to connect to broker!'); |
|
|
} |
|
|
|
|
|
while ($job = $this->queue->pull()) { |
|
|
JobHandler::getInstance($this->container)->executeJob($job); |
|
|
$this->queue->completed($job); |
|
|
} |
|
|
} |
|
|
} |
|
|
|