test_ui / src /open_llm_vtuber /message_handler.py
britto224's picture
Upload 130 files
5669b22 verified
from typing import Dict, Optional, Tuple
import asyncio
from loguru import logger
from collections import defaultdict
class MessageHandler:
def __init__(self):
self._response_events: Dict[
str, Dict[Tuple[str, Optional[str]], asyncio.Event]
] = defaultdict(dict)
self._response_data: Dict[str, Dict[Tuple[str, Optional[str]], dict]] = (
defaultdict(dict)
)
async def wait_for_response(
self,
client_uid: str,
response_type: str,
request_id: str | None = None,
timeout: float | None = None,
) -> Optional[dict]:
"""
Wait for a response of specific type and optional request_id from a client.
Args:
client_uid: Client identifier
response_type: Type of response to wait for
request_id: Optional identifier for the specific request
timeout: Optional timeout in seconds. If None, wait indefinitely
Returns:
Optional[dict]: Response data if received, None if timeout
"""
event = asyncio.Event()
response_key = (response_type, request_id)
self._response_events[client_uid][response_key] = event
try:
if timeout is not None:
# Wait with timeout
await asyncio.wait_for(event.wait(), timeout)
else:
# Wait indefinitely
await event.wait()
return self._response_data[client_uid].pop(response_key, None)
except asyncio.TimeoutError:
logger.warning(
f"Timeout waiting for {response_type} (ID: {request_id}) from {client_uid}"
)
return None
finally:
self._response_events[client_uid].pop(response_key, None)
def handle_message(self, client_uid: str, message: dict) -> None:
"""
Process an incoming message, potentially matching a response event waiting.
Args:
client_uid: Client identifier
message: Message data dictionary, expected to contain 'type' and optionally 'request_id'
"""
msg_type = message.get("type")
request_id = message.get("request_id")
if not msg_type:
return
response_key = (msg_type, request_id)
if (
client_uid in self._response_events
and response_key in self._response_events[client_uid]
):
self._response_data[client_uid][response_key] = message
self._response_events[client_uid][response_key].set()
def cleanup_client(self, client_uid: str) -> None:
"""
Cleanup all events and cached data for a given client.
Args:
client_uid: Client identifier
"""
if client_uid in self._response_events:
for event in self._response_events[client_uid].values():
event.set()
self._response_events.pop(client_uid)
self._response_data.pop(client_uid, None)
message_handler = MessageHandler()