| from typing import Dict, Optional, Tuple
|
| import asyncio
|
| from loguru import logger
|
| from collections import defaultdict
|
|
|
|
|
| class MessageHandler:
|
| def __init__(self):
|
| self._response_events: Dict[
|
| str, Dict[Tuple[str, Optional[str]], asyncio.Event]
|
| ] = defaultdict(dict)
|
| self._response_data: Dict[str, Dict[Tuple[str, Optional[str]], dict]] = (
|
| defaultdict(dict)
|
| )
|
|
|
| async def wait_for_response(
|
| self,
|
| client_uid: str,
|
| response_type: str,
|
| request_id: str | None = None,
|
| timeout: float | None = None,
|
| ) -> Optional[dict]:
|
| """
|
| Wait for a response of specific type and optional request_id from a client.
|
|
|
| Args:
|
| client_uid: Client identifier
|
| response_type: Type of response to wait for
|
| request_id: Optional identifier for the specific request
|
| timeout: Optional timeout in seconds. If None, wait indefinitely
|
|
|
| Returns:
|
| Optional[dict]: Response data if received, None if timeout
|
| """
|
| event = asyncio.Event()
|
| response_key = (response_type, request_id)
|
| self._response_events[client_uid][response_key] = event
|
|
|
| try:
|
| if timeout is not None:
|
|
|
| await asyncio.wait_for(event.wait(), timeout)
|
| else:
|
|
|
| await event.wait()
|
|
|
| return self._response_data[client_uid].pop(response_key, None)
|
| except asyncio.TimeoutError:
|
| logger.warning(
|
| f"Timeout waiting for {response_type} (ID: {request_id}) from {client_uid}"
|
| )
|
| return None
|
| finally:
|
| self._response_events[client_uid].pop(response_key, None)
|
|
|
| def handle_message(self, client_uid: str, message: dict) -> None:
|
| """
|
| Process an incoming message, potentially matching a response event waiting.
|
|
|
| Args:
|
| client_uid: Client identifier
|
| message: Message data dictionary, expected to contain 'type' and optionally 'request_id'
|
| """
|
| msg_type = message.get("type")
|
| request_id = message.get("request_id")
|
| if not msg_type:
|
| return
|
|
|
| response_key = (msg_type, request_id)
|
|
|
| if (
|
| client_uid in self._response_events
|
| and response_key in self._response_events[client_uid]
|
| ):
|
| self._response_data[client_uid][response_key] = message
|
| self._response_events[client_uid][response_key].set()
|
|
|
| def cleanup_client(self, client_uid: str) -> None:
|
| """
|
| Cleanup all events and cached data for a given client.
|
|
|
| Args:
|
| client_uid: Client identifier
|
| """
|
| if client_uid in self._response_events:
|
| for event in self._response_events[client_uid].values():
|
| event.set()
|
| self._response_events.pop(client_uid)
|
| self._response_data.pop(client_uid, None)
|
|
|
|
|
| message_handler = MessageHandler()
|
|
|