File size: 3,662 Bytes
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fa7464f
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fa7464f
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import asyncio
import contextlib
import logging
import uuid
from abc import ABC, abstractmethod
from collections.abc import Callable
from datetime import datetime
from typing import Any

logger = logging.getLogger(__name__)


class MessageQueueInterface(ABC):
    """Abstract interface for Message Queue implementations (RabbitMQ, Kafka, etc.)"""

    @abstractmethod
    async def connect(self) -> bool:
        """Establish connection to the message broker."""

    @abstractmethod
    async def publish(self, topic: str, message: dict[str, Any]) -> bool:
        """Publish a message to a topic/queue."""

    @abstractmethod
    async def subscribe(
        self, topic: str, handler: Callable[[dict[str, Any]], None]
    ) -> bool:
        """Subscribe to a topic with a handler function."""

    @abstractmethod
    async def close(self):
        """Close connection."""


class InMemoryMessageQueue(MessageQueueInterface):
    """
    In-memory implementation for development/testing.
    Simulates a message broker without external dependencies.
    """

    def __init__(self):
        self._subscribers: dict[str, list[Callable]] = {}
        self._connected = False
        self._queue: asyncio.Queue = asyncio.Queue()
        self._worker_task: asyncio.Task | None = None

    async def connect(self) -> bool:
        self._connected = True
        logger.info("[MQ] In-memory message queue connected")
        self._worker_task = asyncio.create_task(self._process_queue())
        return True

    async def publish(self, topic: str, message: dict[str, Any]) -> bool:
        if not self._connected:
            logger.warning("[MQ] Cannot publish, not connected")
            return False

        payload = {
            "id": str(uuid.uuid4()),
            "topic": topic,
            "timestamp": datetime.utcnow().isoformat(),
            "data": message,
        }
        await self._queue.put(payload)
        logger.debug(f"[MQ] Published to {topic}: {message.keys()}")
        return True

    async def subscribe(
        self, topic: str, handler: Callable[[dict[str, Any]], None]
    ) -> bool:
        if topic not in self._subscribers:
            self._subscribers[topic] = []
        self._subscribers[topic].append(handler)
        logger.info(f"[MQ] Subscribed to {topic}")
        return True

    async def _process_queue(self):
        """Background worker to process messages"""
        while self._connected:
            try:
                payload = await self._queue.get()
                topic = payload["topic"]
                data = payload["data"]

                if topic in self._subscribers:
                    for handler in self._subscribers[topic]:
                        try:
                            # Support both async and sync handlers
                            if asyncio.iscoroutinefunction(handler):
                                await handler(data)
                            else:
                                handler(data)
                        except Exception as e:
                            logger.error(f"[MQ] Handler error for {topic}: {e}")

                self._queue.task_done()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"[MQ] Worker error: {e}")

    async def close(self):
        self._connected = False
        if self._worker_task:
            self._worker_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._worker_task
        logger.info("[MQ] In-memory MQ closed")


# Singleton instance
mq_service = InMemoryMessageQueue()