# demo/src/wrapper/message_queue.py
# ElmiraManavi
# create evaluation page
# 5a95d9a
from typing import Callable
import pika
from pika import BlockingConnection, spec
from pika.adapters.blocking_connection import BlockingChannel
from pika.exceptions import StreamLostError
class MessageQueue:
    """Blocking pika wrapper bound to a single durable queue.

    On construction the instance opens a ``pika.BlockingConnection`` with
    plain credentials, opens a channel on it and declares the queue as
    durable.  Messages are published through the default exchange (``''``)
    with the queue name as routing key and the persistent delivery mode.
    """

    __channel: BlockingChannel
    __connection: BlockingConnection
    # Consumer callback invoked by ``__ack_and_call_callback``.
    # NOTE(review): nothing in the code visible here assigns this attribute;
    # presumably a consumer-registration method sets it - confirm.
    __callback: Callable[[BlockingChannel, spec.Basic.Deliver, spec.BasicProperties, bytes], None]

    def __init__(self, host: str, user: str, password: str, queue_name: str) -> None:
        """Store the connection settings, connect, and declare the queue.

        Args:
            host: Broker host passed to ``pika.ConnectionParameters``.
            user: User name for ``pika.PlainCredentials``.
            password: Password for ``pika.PlainCredentials``.
            queue_name: Queue that is declared here and later used as the
                routing key when publishing.
        """
        self.__host = host
        self.__user = user
        self.__password = password
        self.__queue_name = queue_name

        self.connect()
        # Create the queue if necessary.
        self.__channel.queue_declare(queue=self.__queue_name, durable=True)

    def close(self) -> None:
        """Close the connection; do nothing if it is no longer open."""
        # The connection may already be closed (e.g. after the stream loss
        # that send_message recovers from); closing it again would raise.
        if self.__connection.is_open:
            self.__connection.close()

    def connect(self) -> None:
        """Open a new blocking connection and a channel on it.

        Replaces any connection/channel previously stored on the instance;
        the queue itself is only declared in ``__init__``.
        """
        credentials = pika.PlainCredentials(self.__user, self.__password)
        parameters = pika.ConnectionParameters(
            credentials=credentials,
            host=self.__host,
        )
        self.__connection = pika.BlockingConnection(parameters)
        self.__channel = self.__connection.channel()

    def send_message(self, message: bytes) -> None:
        """Publish ``message`` to the queue, reconnecting once if needed.

        Args:
            message: Raw message body.

        Raises:
            StreamLostError: If publishing fails again after the reconnect.
                Errors raised by ``connect()`` also propagate unchanged.
        """
        try:
            self.__publish(message)
        except StreamLostError:
            # Retry exactly once on a fresh connection; a second failure
            # is deliberately left to propagate to the caller.
            print("Reconnecting...")
            self.connect()
            self.__publish(message)

    def __publish(self, message: bytes) -> None:
        """Publish ``message`` persistently via the default exchange."""
        self.__channel.basic_publish(
            exchange='',
            routing_key=self.__queue_name,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=pika.DeliveryMode.Persistent
            ))
        print(f"[PUBLISH] Queued message in {self.__queue_name}")

    def __ack_and_call_callback(
            self,
            ch: BlockingChannel,
            method: spec.Basic.Deliver,
            properties: spec.BasicProperties,
            body: bytes) -> None:
        """Acknowledge a delivery, then forward it to the stored callback.

        The arguments are passed through to ``self.__callback`` unchanged.
        The ack is sent *before* the callback runs, so an exception raised
        by the callback happens after the message was already acknowledged.
        """
        # Acknowledge processing directly.
        ch.basic_ack(delivery_tag=method.delivery_tag)
        self.__callback(ch, method, properties, body)