Spaces:
Runtime error
Runtime error
| import asyncio | |
| from aiokafka import AIOKafkaConsumer, AIOKafkaProducer | |
| from functools import wraps | |
| from typing import Callable, Dict | |
class AsyncKafkaPubSub:
    """Minimal asyncio publish/subscribe facade over aiokafka.

    Handlers are registered per topic with the :meth:`subscribe` decorator and
    are called with each message value decoded to ``str``.  One consumer is
    shared by every registered topic and one producer is shared by every
    :meth:`publish` call; both are created lazily.

    Handlers may be registered while no event loop is running (for example at
    import time).  In that case nothing is scheduled; await :meth:`start` once
    the loop is up.  Await :meth:`stop` on shutdown to release the Kafka
    connections.
    """

    def __init__(self, bootstrap_servers="ubuntu:9092", group_id="default-py-group"):
        """Store connection settings; no network I/O happens here.

        Args:
            bootstrap_servers: Kafka bootstrap address passed to aiokafka.
            group_id: Consumer group used by the shared consumer.
        """
        self.bootstrap_servers = bootstrap_servers
        self.group_id = group_id
        # topic -> handler; a later registration for the same topic replaces
        # the earlier one.
        self.handlers: Dict[str, Callable[[str], None]] = {}
        self._consumer_started = False
        self._consumer = None
        self._producer = None
        # Topics the running consumer was last subscribed to.
        self._subscribed = frozenset()
        # Locks are created on first use inside a running loop, so that an
        # instance built at import time is never tied to a different loop.
        self._lock = None
        self._producer_lock = None
        # Strong references to background tasks; the event loop itself only
        # keeps weak references, so unreferenced tasks can be collected.
        self._tasks = set()

    def subscribe(self, topic: str):
        """Return a decorator that registers a handler for ``topic``.

        The handler may be a plain function or a coroutine function taking the
        decoded message text.  When an event loop is already running, consumer
        start-up (or a subscription refresh) is scheduled immediately;
        otherwise it is deferred until :meth:`start` is awaited.
        """
        print(f"Subscribing to topic: {topic}")

        def decorator(func: Callable[[str], None]):
            self.handlers[topic] = func
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                # No running loop (e.g. decorating at import time): only
                # record the handler instead of failing.
                return func
            self._spawn(self._ensure_consumer())
            return func

        return decorator

    async def start(self):
        """Start the consumer for all registered topics.

        Does nothing when no handler is registered.  When the consumer is
        already running, its subscription is refreshed if topics were added.
        """
        await self._ensure_consumer()

    async def stop(self):
        """Cancel background tasks and stop the consumer and the producer."""
        current = asyncio.current_task()
        # Never cancel/await the caller itself (stop() may be invoked from a
        # handler running inside the listen task).
        pending = [task for task in self._tasks if task is not current]
        for task in pending:
            task.cancel()
        if pending:
            # The listen task stops its consumer in its ``finally`` clause.
            await asyncio.gather(*pending, return_exceptions=True)
        producer, self._producer = self._producer, None
        if producer is not None:
            await producer.stop()

    async def publish(self, topic: str, message: str):
        """Encode ``message`` and send it to ``topic``, waiting for delivery.

        The shared producer is created and started on the first call.
        """
        producer = self._producer
        if producer is None:
            producer = await self._ensure_producer()
        await producer.send_and_wait(topic, message.encode())

    async def _ensure_producer(self):
        """Return the shared producer, starting it exactly once."""
        if self._producer_lock is None:
            self._producer_lock = asyncio.Lock()
        async with self._producer_lock:
            if self._producer is None:
                producer = AIOKafkaProducer(bootstrap_servers=self.bootstrap_servers)
                await producer.start()
                # Publish the producer only after start() succeeded so that
                # concurrent callers never see a half-started instance.
                self._producer = producer
            return self._producer

    async def _ensure_consumer(self):
        """Start the consumer once, or refresh its topics if they changed."""
        if self._lock is None:
            self._lock = asyncio.Lock()
        async with self._lock:
            wanted = frozenset(self.handlers)
            if not wanted:
                return
            if not self._consumer_started:
                await self._start_consumer()
            elif self._consumer is not None and wanted != self._subscribed:
                # Subscriptions are not incremental: pass the complete list.
                self._consumer.subscribe(topics=sorted(wanted))
                self._subscribed = wanted

    async def _start_consumer(self):
        """Create and start the consumer, then spawn the listen loop."""
        topics = tuple(self.handlers)
        consumer = AIOKafkaConsumer(
            *topics,
            bootstrap_servers=self.bootstrap_servers,
            group_id=self.group_id,
            auto_offset_reset='latest',
        )
        await consumer.start()
        # Mark as started only after start() succeeded so a failure can be
        # retried by a later subscribe()/start().
        self._consumer = consumer
        self._subscribed = frozenset(topics)
        self._consumer_started = True
        print(f"Subscribed to: {', '.join(topics)}")

        async def listen():
            try:
                async for msg in consumer:
                    await self._dispatch(msg.topic, msg.value)
            finally:
                if self._consumer is consumer:
                    # Allow a fresh consumer to be started later.
                    self._consumer = None
                    self._consumer_started = False
                    self._subscribed = frozenset()
                await consumer.stop()

        self._spawn(listen())

    async def _dispatch(self, topic, payload):
        """Decode one message value and pass it to the topic's handler.

        Failures are reported and swallowed so that a single bad message or
        a faulty handler does not terminate consumption for every topic.
        """
        if payload is None:
            # Record without a value: there is no text to hand over.
            return
        try:
            data = payload.decode()
            print(f"Received message on topic '{topic}': {data}")
            handler = self.handlers.get(topic)
            if handler:
                await self._maybe_async(handler, data)
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            print(f"Error handling message on topic '{topic}': {exc!r}")

    def _spawn(self, coro):
        """Run ``coro`` as a background task and keep a reference to it."""
        task = asyncio.get_running_loop().create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._on_task_done)
        return task

    def _on_task_done(self, task):
        """Drop a finished background task and report its failure, if any."""
        self._tasks.discard(task)
        if not task.cancelled() and task.exception() is not None:
            print(f"Kafka background task failed: {task.exception()!r}")

    async def _maybe_async(self, func: Callable, *args, **kwargs):
        """Call ``func`` and await its result when it is a coroutine/future."""
        result = func(*args, **kwargs)
        if asyncio.iscoroutine(result) or asyncio.isfuture(result):
            await result
# Shared module-level pub/sub instance, constructed with the default
# bootstrap address and consumer group.
kafka_pubsub = AsyncKafkaPubSub()
def handle_chat(msg):
    """Print ``msg`` to stdout, prefixed with the ``[chat kafka]`` tag."""
    line = f"[chat kafka] {msg}"
    print(line)