| <?php |
|
|
| namespace SimpleQueue\Adapter; |
|
|
| use DateTime; |
| use PhpAmqpLib\Channel\AMQPChannel; |
| use PhpAmqpLib\Message\AMQPMessage; |
| use PhpAmqpLib\Wire\AMQPTable; |
| use SimpleQueue\Job; |
| use SimpleQueue\QueueAdapterInterface; |
|
|
| |
| |
| |
| |
| |
/**
 * Queue adapter that stores jobs on an AMQP 0-9-1 broker through php-amqplib.
 *
 * Jobs are published to the configured exchange and read back from the
 * configured queue. The id of a pulled job is the delivery tag of the message
 * it was read from; completed() and failed() acknowledge against that tag.
 */
class AmqpQueueAdapter implements QueueAdapterInterface
{
    /**
     * Channel used for every broker operation.
     *
     * @var AMQPChannel
     */
    protected $channel;

    /**
     * Name of the exchange jobs are published to ('' is the default exchange).
     *
     * @var string
     */
    protected $exchange = '';

    /**
     * Name of the queue jobs are consumed from.
     *
     * @var string
     */
    protected $queue = '';

    /**
     * @param AMQPChannel $channel  Open channel to the broker.
     * @param string      $queue    Queue that pull() consumes from.
     * @param string      $exchange Exchange that push() and schedule() publish to.
     */
    public function __construct(AMQPChannel $channel, $queue, $exchange)
    {
        $this->channel = $channel;
        $this->exchange = $exchange;
        $this->queue = $queue;
    }

    /**
     * Publishes a job for immediate delivery.
     *
     * @param Job $job Job whose serialized form becomes the message body.
     *
     * @return $this
     */
    public function push(Job $job)
    {
        $message = new AMQPMessage($job->serialize(), array('content_type' => 'text/plain'));
        $this->channel->basic_publish($message, $this->exchange, $this->routingKey());

        return $this;
    }

    /**
     * Publishes a job that should not be delivered before the given time.
     *
     * The delay is carried in the "x-delay" header, which RabbitMQ's
     * delayed-message exchange reads as a number of milliseconds.
     * NOTE(review): this only has an effect when the configured exchange is of
     * type "x-delayed-message" - confirm the exchange declaration elsewhere.
     *
     * @param Job      $job      Job whose serialized form becomes the message body.
     * @param DateTime $dateTime Earliest moment the job should be delivered.
     *
     * @return $this
     */
    public function schedule(Job $job, DateTime $dateTime)
    {
        // A date in the past means "deliver now"; never send a negative delay.
        $delaySeconds = max(0, $dateTime->getTimestamp() - time());

        $message = new AMQPMessage(
            $job->serialize(),
            array(
                'content_type' => 'text/plain',
                'delivery_mode' => 2, // persistent
            )
        );
        // x-delay is expressed in milliseconds, timestamps in seconds.
        $message->set('application_headers', new AMQPTable(array('x-delay' => $delaySeconds * 1000)));

        $this->channel->basic_publish($message, $this->exchange, $this->routingKey());

        return $this;
    }

    /**
     * Blocks until one message is delivered from the queue and returns it as a job.
     *
     * A temporary consumer is registered and cancelled again as soon as the
     * first message arrives. The message is not acknowledged here; call
     * completed() or failed() with the returned job.
     * NOTE(review): without a prefetch limit the broker may deliver further
     * messages before the cancel takes effect - presumably basic_qos is set by
     * whoever creates the channel; verify.
     *
     * @return Job|null The pulled job, or null if the consumer went away
     *                  before any message was delivered.
     */
    public function pull()
    {
        $message = null;

        // An empty consumer tag makes the broker generate a unique one, so this
        // consumer cannot clash with another consumer on the same channel.
        $this->channel->basic_consume(
            $this->queue,
            '',
            false, // no_local
            false, // no_ack: the job is acknowledged later via completed()/failed()
            false, // exclusive
            false, // nowait
            function ($msg) use (&$message) {
                $message = $msg;
                $msg->delivery_info['channel']->basic_cancel($msg->delivery_info['consumer_tag']);
            }
        );

        // Stop as soon as a message has been captured, even if other consumers
        // are still registered on the channel.
        while ($message === null && count($this->channel->callbacks) > 0) {
            $this->channel->wait();
        }

        if ($message === null) {
            return null;
        }

        $job = new Job();
        $job->setId($message->get('delivery_tag'));
        $job->unserialize($message->getBody());

        return $job;
    }

    /**
     * Acknowledges the message behind a successfully processed job.
     *
     * @param Job $job Job previously returned by pull().
     *
     * @return $this
     */
    public function completed(Job $job)
    {
        $this->channel->basic_ack($job->getId());

        return $this;
    }

    /**
     * Negatively acknowledges the message behind a job that could not be processed.
     *
     * NOTE(review): basic_nack is called with the library's default arguments,
     * which in php-amqplib do not requeue the message - confirm that dropping
     * (or dead-lettering) failed jobs is intended.
     *
     * @param Job $job Job previously returned by pull().
     *
     * @return $this
     */
    public function failed(Job $job)
    {
        $this->channel->basic_nack($job->getId());

        return $this;
    }

    /**
     * Routing key used when publishing.
     *
     * The default exchange ('') routes by queue name, so an empty key would
     * make the message unroutable there. Named exchanges keep the empty key.
     *
     * @return string
     */
    private function routingKey()
    {
        return $this->exchange === '' ? $this->queue : '';
    }
}
|
|