File size: 2,545 Bytes
4b63644 4e9265f 4b63644 5a95d9a 4b63644 4e9265f 4b63644 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 | from typing import Callable
import pika
from pika import BlockingConnection, spec
from pika.adapters.blocking_connection import BlockingChannel
from pika.exceptions import StreamLostError
class MessageQueue:
__channel: BlockingChannel
__connection: BlockingConnection
__callback: Callable[[BlockingChannel, spec.Basic.Deliver, spec.BasicProperties, bytes], None]
def __init__(self, host: str, user: str, password: str, queue_name: str) -> None:
self.__host = host
self.__user = user
self.__password = password
self.__queue_name = queue_name
self.connect()
# create queue if necessary
self.__channel.queue_declare(queue=self.__queue_name, durable=True)
def close(self) -> None:
self.__connection.close()
def connect(self) -> None:
# print(f"Connecting to {self.__queue_name}")
self.__connection = pika.BlockingConnection(
pika.ConnectionParameters(
credentials=pika.PlainCredentials(self.__user, self.__password),
host=self.__host
))
self.__channel = self.__connection.channel()
def send_message(self, message: bytes) -> None:
# message = ' '.join(sys.argv[1:]) or "Hello World!"
def __send():
self.__channel.basic_publish(
exchange='',
routing_key=self.__queue_name,
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
))
print(f"[PUBLISH] Queued message in {self.__queue_name}")
# try repeating once, if connection was lost
retry = True
while retry:
try:
__send()
retry = False
except StreamLostError as e:
print("Reconnecting...")
if retry:
self.connect()
__send()
retry = False
else:
raise e
# print(f" [x] Sent {message}")
# connection.close()
def __ack_and_call_callback(
self,
ch: BlockingChannel,
method: spec.Basic.Deliver,
properties: spec.BasicProperties,
body: bytes) -> None:
# acknowledge processing directly
ch.basic_ack(delivery_tag=method.delivery_tag)
# print(f" [x] Received {body.decode()}")
self.__callback(ch, method, properties, body)
# print(" [x] Done")
|