Spaces:
Paused
Paused
File size: 3,662 Bytes
4a2ab42 fa7464f 4a2ab42 fa7464f 4a2ab42 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 | import asyncio
import contextlib
import logging
import uuid
from abc import ABC, abstractmethod
from collections.abc import Callable
from datetime import datetime
from typing import Any
logger = logging.getLogger(__name__)
class MessageQueueInterface(ABC):
"""Abstract interface for Message Queue implementations (RabbitMQ, Kafka, etc.)"""
@abstractmethod
async def connect(self) -> bool:
"""Establish connection to the message broker."""
@abstractmethod
async def publish(self, topic: str, message: dict[str, Any]) -> bool:
"""Publish a message to a topic/queue."""
@abstractmethod
async def subscribe(
self, topic: str, handler: Callable[[dict[str, Any]], None]
) -> bool:
"""Subscribe to a topic with a handler function."""
@abstractmethod
async def close(self):
"""Close connection."""
class InMemoryMessageQueue(MessageQueueInterface):
"""
In-memory implementation for development/testing.
Simulates a message broker without external dependencies.
"""
def __init__(self):
self._subscribers: dict[str, list[Callable]] = {}
self._connected = False
self._queue: asyncio.Queue = asyncio.Queue()
self._worker_task: asyncio.Task | None = None
async def connect(self) -> bool:
self._connected = True
logger.info("[MQ] In-memory message queue connected")
self._worker_task = asyncio.create_task(self._process_queue())
return True
async def publish(self, topic: str, message: dict[str, Any]) -> bool:
if not self._connected:
logger.warning("[MQ] Cannot publish, not connected")
return False
payload = {
"id": str(uuid.uuid4()),
"topic": topic,
"timestamp": datetime.utcnow().isoformat(),
"data": message,
}
await self._queue.put(payload)
logger.debug(f"[MQ] Published to {topic}: {message.keys()}")
return True
async def subscribe(
self, topic: str, handler: Callable[[dict[str, Any]], None]
) -> bool:
if topic not in self._subscribers:
self._subscribers[topic] = []
self._subscribers[topic].append(handler)
logger.info(f"[MQ] Subscribed to {topic}")
return True
async def _process_queue(self):
"""Background worker to process messages"""
while self._connected:
try:
payload = await self._queue.get()
topic = payload["topic"]
data = payload["data"]
if topic in self._subscribers:
for handler in self._subscribers[topic]:
try:
# Support both async and sync handlers
if asyncio.iscoroutinefunction(handler):
await handler(data)
else:
handler(data)
except Exception as e:
logger.error(f"[MQ] Handler error for {topic}: {e}")
self._queue.task_done()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"[MQ] Worker error: {e}")
async def close(self):
self._connected = False
if self._worker_task:
self._worker_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._worker_task
logger.info("[MQ] In-memory MQ closed")
# Singleton instance
mq_service = InMemoryMessageQueue()
|