File size: 2,437 Bytes
4390951
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from functools import wraps
from typing import Callable, Dict


class AsyncKafkaPubSub:
    def __init__(self, bootstrap_servers="ubuntu:9092", group_id="default-py-group"):
        self.bootstrap_servers = bootstrap_servers
        self.group_id = group_id
        self.handlers: Dict[str, Callable[[str], None]] = {}
        self._consumer_started = False
        self._lock = asyncio.Lock()
        self._producer = None

    def subscribe(self, topic: str):
        print(f"Subscribing to topic: {topic}")
        def decorator(func: Callable[[str], None]):
            self.handlers[topic] = func
            asyncio.create_task(self._ensure_consumer())
            return func
        return decorator

    async def publish(self, topic: str, message: str):
        if not self._producer:
            self._producer = AIOKafkaProducer(bootstrap_servers=self.bootstrap_servers)
            await self._producer.start()
        await self._producer.send_and_wait(topic, message.encode())

    async def _ensure_consumer(self):
        async with self._lock:
            if self._consumer_started:
                return
            self._consumer_started = True
            await self._start_consumer()

    async def _start_consumer(self):
        consumer = AIOKafkaConsumer(
            *self.handlers.keys(),
            bootstrap_servers=self.bootstrap_servers,
            group_id=self.group_id,
            auto_offset_reset='latest',
        )
        await consumer.start()
        print(f"Subscribed to: {', '.join(self.handlers.keys())}")

        async def listen():
            try:
                async for msg in consumer:
                    print(f"Received message on topic '{msg.topic}': {msg.value.decode()}")
                    topic = msg.topic
                    data = msg.value.decode()
                    handler = self.handlers.get(topic)
                    if handler:
                        await self._maybe_async(handler, data)
            finally:
                await consumer.stop()

        asyncio.create_task(listen())

    async def _maybe_async(self, func: Callable, *args, **kwargs):
        result = func(*args, **kwargs)
        if asyncio.iscoroutine(result):
            await result


kafka_pubsub = AsyncKafkaPubSub()


@kafka_pubsub.subscribe("chat")
def handle_chat(msg):
    print(f"[chat kafka] {msg}")