Spaces:
Runtime error
Runtime error
File size: 2,437 Bytes
4390951 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from functools import wraps
from typing import Callable, Dict
class AsyncKafkaPubSub:
def __init__(self, bootstrap_servers="ubuntu:9092", group_id="default-py-group"):
self.bootstrap_servers = bootstrap_servers
self.group_id = group_id
self.handlers: Dict[str, Callable[[str], None]] = {}
self._consumer_started = False
self._lock = asyncio.Lock()
self._producer = None
def subscribe(self, topic: str):
print(f"Subscribing to topic: {topic}")
def decorator(func: Callable[[str], None]):
self.handlers[topic] = func
asyncio.create_task(self._ensure_consumer())
return func
return decorator
async def publish(self, topic: str, message: str):
if not self._producer:
self._producer = AIOKafkaProducer(bootstrap_servers=self.bootstrap_servers)
await self._producer.start()
await self._producer.send_and_wait(topic, message.encode())
async def _ensure_consumer(self):
async with self._lock:
if self._consumer_started:
return
self._consumer_started = True
await self._start_consumer()
async def _start_consumer(self):
consumer = AIOKafkaConsumer(
*self.handlers.keys(),
bootstrap_servers=self.bootstrap_servers,
group_id=self.group_id,
auto_offset_reset='latest',
)
await consumer.start()
print(f"Subscribed to: {', '.join(self.handlers.keys())}")
async def listen():
try:
async for msg in consumer:
print(f"Received message on topic '{msg.topic}': {msg.value.decode()}")
topic = msg.topic
data = msg.value.decode()
handler = self.handlers.get(topic)
if handler:
await self._maybe_async(handler, data)
finally:
await consumer.stop()
asyncio.create_task(listen())
async def _maybe_async(self, func: Callable, *args, **kwargs):
result = func(*args, **kwargs)
if asyncio.iscoroutine(result):
await result
kafka_pubsub = AsyncKafkaPubSub()
@kafka_pubsub.subscribe("chat")
def handle_chat(msg):
print(f"[chat kafka] {msg}")
|