| | from typing import Callable |
| |
|
| | import pika |
| | from pika import BlockingConnection, spec |
| | from pika.adapters.blocking_connection import BlockingChannel |
| | from pika.exceptions import StreamLostError |
| |
|
| |
|
| | class MessageQueue: |
| | __channel: BlockingChannel |
| | __connection: BlockingConnection |
| | __callback: Callable[[BlockingChannel, spec.Basic.Deliver, spec.BasicProperties, bytes], None] |
| |
|
def __init__(self, host: str, user: str, password: str, queue_name: str) -> None:
    """Store the broker settings, connect, and declare the queue.

    Args:
        host: Broker host handed to ``pika.ConnectionParameters``.
        user: User name for the plain credentials.
        password: Password for the plain credentials.
        queue_name: Name of the queue declared here and kept for later use.
    """
    # The settings are kept on the instance because connect() reads them
    # every time it is called, not only during construction.
    self.__queue_name = queue_name
    self.__password = password
    self.__user = user
    self.__host = host

    self.connect()

    # Declare the queue on the freshly opened channel, passing durable=True.
    channel = self.__channel
    channel.queue_declare(queue=self.__queue_name, durable=True)
| |
|
def close(self) -> None:
    """Close the connection currently held by this instance."""
    connection = self.__connection
    connection.close()
| |
|
def connect(self) -> None:
    """Open a new blocking connection and channel from the stored settings.

    The new connection and channel replace whatever the instance held
    before; a previously held connection is not closed by this method.
    """
    credentials = pika.PlainCredentials(self.__user, self.__password)
    parameters = pika.ConnectionParameters(
        credentials=credentials,
        host=self.__host,
    )

    connection = pika.BlockingConnection(parameters)
    self.__connection = connection
    self.__channel = connection.channel()
| |
|
def send_message(self, message: bytes) -> None:
    """Publish *message* to the configured queue, reconnecting once if needed.

    The message is published with the empty exchange name, the queue name
    as routing key, and ``pika.DeliveryMode.Persistent``. If the first
    attempt raises ``StreamLostError``, the connection is re-established
    via ``connect()`` and the publish is attempted exactly one more time.

    Args:
        message: Raw message body to publish.

    Raises:
        StreamLostError: If the publish fails again after reconnecting.
        Exception: Anything raised by ``connect()`` while reconnecting is
            propagated unchanged.
    """

    def publish() -> None:
        # Reads self.__channel at call time, so a call made after
        # connect() uses the newly created channel.
        self.__channel.basic_publish(
            exchange='',
            routing_key=self.__queue_name,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=pika.DeliveryMode.Persistent,
            ),
        )
        print(f"[PUBLISH] Queued message in {self.__queue_name}")

    try:
        publish()
    except StreamLostError:
        # One retry on a fresh connection. The previous implementation
        # wrapped this in a `while retry` loop with an `else: raise`
        # branch, but the flag was always True here, so the loop never
        # repeated and the branch was unreachable. A failure of the second
        # attempt simply propagates to the caller, as before.
        print("Reconnecting...")
        self.connect()
        publish()
| | |
| | |
| |
|
def __ack_and_call_callback(
        self,
        ch: BlockingChannel,
        method: spec.Basic.Deliver,
        properties: spec.BasicProperties,
        body: bytes) -> None:
    """Acknowledge the delivery, then hand it to the stored callback.

    The acknowledgement is sent before the callback runs, so an exception
    raised by the callback happens after the message was already acked.

    Args:
        ch: Channel the delivery arrived on; used to send the ack.
        method: Delivery metadata; its ``delivery_tag`` identifies the
            message being acknowledged.
        properties: Message properties, forwarded to the callback.
        body: Message payload, forwarded to the callback.
    """
    ch.basic_ack(delivery_tag=method.delivery_tag)

    handler = self.__callback
    handler(ch, method, properties, body)
| | |
| |
|