from typing import Callable import pika from pika import BlockingConnection, spec from pika.adapters.blocking_connection import BlockingChannel from pika.exceptions import StreamLostError class MessageQueue: __channel: BlockingChannel __connection: BlockingConnection __callback: Callable[[BlockingChannel, spec.Basic.Deliver, spec.BasicProperties, bytes], None] def __init__(self, host: str, user: str, password: str, queue_name: str) -> None: self.__host = host self.__user = user self.__password = password self.__queue_name = queue_name self.connect() # create queue if necessary self.__channel.queue_declare(queue=self.__queue_name, durable=True) def close(self) -> None: self.__connection.close() def connect(self) -> None: # print(f"Connecting to {self.__queue_name}") self.__connection = pika.BlockingConnection( pika.ConnectionParameters( credentials=pika.PlainCredentials(self.__user, self.__password), host=self.__host )) self.__channel = self.__connection.channel() def send_message(self, message: bytes) -> None: # message = ' '.join(sys.argv[1:]) or "Hello World!" def __send(): self.__channel.basic_publish( exchange='', routing_key=self.__queue_name, body=message, properties=pika.BasicProperties( delivery_mode=pika.DeliveryMode.Persistent )) print(f"[PUBLISH] Queued message in {self.__queue_name}") # try repeating once, if connection was lost retry = True while retry: try: __send() retry = False except StreamLostError as e: print("Reconnecting...") if retry: self.connect() __send() retry = False else: raise e # print(f" [x] Sent {message}") # connection.close() def __ack_and_call_callback( self, ch: BlockingChannel, method: spec.Basic.Deliver, properties: spec.BasicProperties, body: bytes) -> None: # acknowledge processing directly ch.basic_ack(delivery_tag=method.delivery_tag) # print(f" [x] Received {body.decode()}") self.__callback(ch, method, properties, body) # print(" [x] Done")