| <?php |
|
|
| namespace SimpleQueue\Adapter; |
|
|
| use DateTime; |
| use Pheanstalk\Job as BeanstalkJob; |
| use Pheanstalk\Pheanstalk; |
| use Pheanstalk\PheanstalkInterface; |
| use SimpleQueue\Job; |
| use SimpleQueue\QueueAdapterInterface; |
|
|
| |
| |
| |
| |
| |
| class BeanstalkQueueAdapter implements QueueAdapterInterface |
| { |
| |
| |
| |
| protected $beanstalk; |
|
|
| |
| |
| |
| protected $queueName = ''; |
|
|
| |
| |
| |
| |
| |
| |
| public function __construct(PheanstalkInterface $beanstalk, $queueName) |
| { |
| $this->beanstalk = $beanstalk; |
| $this->queueName = $queueName; |
| } |
|
|
| |
| |
| |
| |
| |
| |
| |
| public function push(Job $job) |
| { |
| $this->beanstalk->putInTube($this->queueName, $job->serialize()); |
| return $this; |
| } |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| public function schedule(Job $job, DateTime $dateTime) |
| { |
| $now = new DateTime(); |
| $when = clone($dateTime); |
| $delay = $when->getTimestamp() - $now->getTimestamp(); |
| |
| $this->beanstalk->putInTube($this->queueName, $job->serialize(), Pheanstalk::DEFAULT_PRIORITY, $delay); |
| return $this; |
| } |
|
|
| |
| |
| |
| |
| |
| |
| public function pull() |
| { |
| $beanstalkJob = $this->beanstalk->reserveFromTube($this->queueName); |
|
|
| if ($beanstalkJob === false) { |
| return null; |
| } |
|
|
| $job = new Job(); |
| $job->setId($beanstalkJob->getId()); |
| $job->unserialize($beanstalkJob->getData()); |
|
|
| return $job; |
| } |
|
|
| |
| |
| |
| |
| |
| |
| |
| public function completed(Job $job) |
| { |
| $beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize()); |
| $this->beanstalk->delete($beanstalkJob); |
| return $this; |
| } |
|
|
| |
| |
| |
| |
| |
| |
| |
| public function failed(Job $job) |
| { |
| $beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize()); |
| $this->beanstalk->bury($beanstalkJob); |
| return $this; |
| } |
| } |
|
|