| import asyncio |
| from typing import List, AsyncGenerator |
|
|
| '''ObserverClient asynchronously handles events in queue populated by an ObserverServer''' |
| class BaseObserverClient(): |
| def __init__(self, server = None): |
| self.server = None |
| if server: |
| self.listen(server) |
| |
| self.queue = asyncio.Queue() |
| self.event_listener = None |
|
|
| def listen(self, server): |
| if self.server: |
| self.close() |
|
|
| self.server = server |
| self.server.join(self) |
| |
| self.event_listener = asyncio.create_task(self._event_listener()) |
|
|
| def close(self): |
| self.server.detach(self) |
| self.server = None |
| |
| async def _event_listener(self): |
| while True: |
| next_event = await self.queue.get() |
| await self.handle_event(next_event['event'], next_event['payload']) |
| |
| |
| async def handle_event(self, event_id: str, payload) -> None: |
| raise NotImplementedError |
|
|
| '''ObserverServer adds events and payloads to all listening client queues.''' |
| class ObserverServer(): |
| def __init__(self): |
| self.clients: List[ObserverClient] = [] |
|
|
| def join(self, client): |
| if client not in self.clients: |
| self.clients.append(client) |
|
|
| def detach(self, client): |
| if client in self.clients: |
| self.clients.remove(client) |
|
|
| async def broadcast_event(self, event_id: str, payload: dict = {}): |
| for client in self.clients: |
| await client.queue.put({ |
| "event": event_id, |
| "payload": payload |
| }) |
| |
| async def broadcast_stream(self, event_id: str, payload_stream: AsyncGenerator): |
| async for payload in payload_stream: |
| for client in self.clients: |
| await client.queue.put({ |
| "event": event_id, |
| "payload": payload |
| }) |