| | <?php |
| |
|
| | namespace SimpleQueue\Adapter; |
| |
|
| | use DateTime; |
| | use PhpAmqpLib\Channel\AMQPChannel; |
| | use PhpAmqpLib\Message\AMQPMessage; |
| | use PhpAmqpLib\Wire\AMQPTable; |
| | use SimpleQueue\Job; |
| | use SimpleQueue\QueueAdapterInterface; |
| |
|
| | |
| | |
| | |
| | |
| | |
/**
 * Queue adapter that stores and retrieves jobs through an AMQP channel.
 *
 * Jobs are published to the configured exchange and consumed from the
 * configured queue. The delivery tag of a consumed message is used as the
 * job id so the job can later be acknowledged or rejected.
 */
class AmqpQueueAdapter implements QueueAdapterInterface
{
    /**
     * Channel used for every publish, consume, ack and nack call.
     *
     * @var AMQPChannel
     */
    protected $channel;

    /**
     * Name of the exchange jobs are published to.
     *
     * @var string
     */
    protected $exchange = '';

    /**
     * Name of the queue jobs are consumed from.
     *
     * @var string
     */
    protected $queue = '';

    /**
     * @param AMQPChannel $channel  Open channel to operate on.
     * @param string      $queue    Queue name used by pull().
     * @param string      $exchange Exchange name used by push() and schedule().
     */
    public function __construct(AMQPChannel $channel, $queue, $exchange)
    {
        $this->channel = $channel;
        $this->exchange = $exchange;
        $this->queue = $queue;
    }

    /**
     * Publishes a job to the exchange for immediate delivery.
     *
     * No routing key is passed, so delivery to a queue depends on how the
     * exchange is bound.
     *
     * @param Job $job Job whose serialized form becomes the message body.
     *
     * @return $this
     */
    public function push(Job $job)
    {
        $message = new AMQPMessage($job->serialize(), array('content_type' => 'text/plain'));
        $this->channel->basic_publish($message, $this->exchange);

        return $this;
    }

    /**
     * Publishes a job that should not be delivered before the given time.
     *
     * The delay is carried in the "x-delay" application header.
     * NOTE(review): this assumes the exchange is of the "x-delayed-message"
     * type provided by the RabbitMQ delayed message exchange plugin, which
     * reads "x-delay" as a number of milliseconds.
     *
     * @param Job      $job      Job whose serialized form becomes the message body.
     * @param DateTime $dateTime Earliest moment the job should be delivered.
     *
     * @return $this
     */
    public function schedule(Job $job, DateTime $dateTime)
    {
        // A date in the past must not yield a negative delay; deliver right away instead.
        $delaySeconds = max(0, $dateTime->getTimestamp() - time());

        // Timestamps are in seconds while the header is expected in milliseconds.
        $delayMilliseconds = $delaySeconds * 1000;

        // delivery_mode 2 marks the message as persistent.
        $message = new AMQPMessage($job->serialize(), array('delivery_mode' => 2));
        $message->set('application_headers', new AMQPTable(array('x-delay' => $delayMilliseconds)));

        $this->channel->basic_publish($message, $this->exchange);

        return $this;
    }

    /**
     * Consumes a single message from the queue and turns it into a job.
     *
     * A temporary consumer is registered; its callback stores the first
     * delivery and cancels the consumer again. The method waits on the
     * channel for as long as the channel has registered callbacks, so it
     * blocks until a message has been delivered.
     *
     * @return Job|null The received job (its id is the delivery tag), or
     *                  null when no message was captured.
     */
    public function pull()
    {
        $received = null;

        // An empty consumer tag lets the broker generate a unique one, which avoids
        // clashing with another consumer that is still registered on this channel.
        $this->channel->basic_consume(
            $this->queue,
            '',
            false,
            false,
            false,
            false,
            function ($delivery) use (&$received) {
                $received = $delivery;

                // Only one message is wanted: stop consuming as soon as it arrives.
                $delivery->delivery_info['channel']->basic_cancel($delivery->delivery_info['consumer_tag']);
            }
        );

        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }

        if ($received === null) {
            return null;
        }

        $job = new Job();
        // The delivery tag doubles as the job id so completed()/failed() can ack/nack it.
        $job->setId($received->get('delivery_tag'));
        $job->unserialize($received->getBody());

        return $job;
    }

    /**
     * Acknowledges the message the job was created from.
     *
     * @param Job $job Job previously returned by pull(); its id is used as delivery tag.
     *
     * @return $this
     */
    public function completed(Job $job)
    {
        $this->channel->basic_ack($job->getId());

        return $this;
    }

    /**
     * Negatively acknowledges the message the job was created from.
     *
     * Only the delivery tag is passed, so the library defaults for the
     * "multiple" and "requeue" flags apply.
     *
     * @param Job $job Job previously returned by pull(); its id is used as delivery tag.
     *
     * @return $this
     */
    public function failed(Job $job)
    {
        $this->channel->basic_nack($job->getId());

        return $this;
    }
}
| |
|