File size: 1,953 Bytes
bf292d9
 
2c8368f
bf292d9
2c8368f
bf292d9
2c8368f
 
 
 
 
 
bf292d9
 
 
 
2c8368f
bf292d9
2c8368f
bf292d9
 
 
 
 
2c8368f
 
 
bf292d9
 
 
 
 
2c8368f
bf292d9
 
 
 
2c8368f
 
 
bf292d9
 
2c8368f
 
bf292d9
2c8368f
bf292d9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import json
import aio_pika
from typing import Callable, Awaitable, Dict, Any, List, Optional

from models import CloudEvent

Handler = Callable[[Any], Awaitable[None]]  # data can be dict / list / str

class RabbitListenerBase:
    def __init__(self, base, instance_name: str, handlers: Dict[str, Handler]):
        self._base = base
        self._instance_name = instance_name
        self._handlers = handlers
        self._consumers: List[aio_pika.abc.AbstractRobustQueue] = []

    def _qname(self, exchange: str, routing_keys: List[str]) -> str:
        rk_part = "-".join(sorted([rk for rk in (routing_keys or [""]) if rk])) or ""
        suffix = f"-{rk_part}" if rk_part else ""
        return f"{self._instance_name}-{exchange}{suffix}"

    async def start(self, declarations: List[dict]):
        for d in declarations:
            exch = d["ExchangeName"]
            ttl = d.get("MessageTimeout") or None
            rks = d.get("RoutingKeys") or [""]
            qname = self._qname(exch, rks)
            q = await self._base.declare_queue_bind(exchange=exch, queue_name=qname, routing_keys=rks, ttl_ms=ttl)
            await q.consume(self._make_consumer(d["FuncName"]))
            self._consumers.append(q)

    def _make_consumer(self, func_name: str):
        handler = self._handlers.get(func_name)

        async def _on_msg(msg: aio_pika.IncomingMessage):
            async with msg.process():
                try:
                    envelope = json.loads(msg.body.decode("utf-8"))
                    # Validate basic CloudEvent shape without being strict
                    # (C# side doesn’t require strict validation either)
                    data = envelope.get("data", None)
                    if handler:
                        await handler(data)
                except Exception:
                    # swallow to avoid redelivery storms; log if you wire a logger
                    pass

        return _on_msg