import asyncio
import json
import logging
from typing import Any, Dict, Set

from fastapi import APIRouter, WebSocket, WebSocketDisconnect

LOGGER = logging.getLogger(__name__)


class UnityBridge:
    """Bi-directional bridge for humanoid motion commands to Unity clients.

    Exposes a WebSocket endpoint at ``/unity/ws`` through ``self.router`` and
    keeps the set of accepted sockets so motion commands and heartbeats can
    be fanned out to all of them.
    """

    def __init__(self) -> None:
        """Create the empty client registry and register the WebSocket route."""
        # Sockets that have been accepted and not yet seen to disconnect/fail.
        self._clients: Set[WebSocket] = set()
        self.router = APIRouter(prefix="/unity", tags=["unity"])
        self.router.add_api_websocket_route("/ws", self.unity_ws)

    async def unity_ws(self, websocket: WebSocket) -> None:
        """Accept a Unity client and log its inbound messages until it leaves.

        The socket is registered for broadcasts while the connection is open
        and is always removed from the registry on exit, whether the loop
        ends through a disconnect or another exception.
        """
        await websocket.accept()
        self._clients.add(websocket)
        try:
            while True:
                inbound = await websocket.receive_text()
                LOGGER.debug("Unity ack: %s", inbound)
        except WebSocketDisconnect:
            LOGGER.info("Unity client disconnected")
        finally:
            self._clients.discard(websocket)

    async def _send_to_all(self, message: str) -> None:
        """Send *message* to every registered client, dropping any that fail."""
        # Iterate over a snapshot: each ``await`` yields to the event loop, and
        # ``unity_ws`` may add or remove clients meanwhile. Iterating the live
        # set would raise "Set changed size during iteration".
        for client in tuple(self._clients):
            try:
                await client.send_text(message)
            except Exception as exc:
                # Best effort: a failed send means the client is dropped, but
                # the failure is logged instead of being swallowed silently.
                LOGGER.warning("Dropping Unity client after failed send: %s", exc)
                self._clients.discard(client)

    async def broadcast_motion(self, action: Dict[str, Any]) -> None:
        """Broadcast a motion command to all connected Unity clients.

        Args:
            action: Mapping that may carry ``gesture``, ``body_motion`` and
                ``gaze_target``. Missing keys default to ``"idle"``,
                ``"stand"`` and ``"student"`` respectively.

        Does nothing when no client is connected. Clients whose send fails
        are removed from the registry.
        """
        if not self._clients:
            return
        payload = {
            "type": "mcp_motion",
            "gesture": action.get("gesture", "idle"),
            "body_motion": action.get("body_motion", "stand"),
            "gaze_target": action.get("gaze_target", "student"),
        }
        # Serialise once; the message is identical for every client.
        await self._send_to_all(json.dumps(payload))

    async def heartbeat(self, interval: float = 5.0) -> None:
        """Send a heartbeat message to every client forever.

        Args:
            interval: Seconds to sleep between heartbeat rounds.

        Runs until the surrounding task is cancelled. Clients whose send
        fails are removed from the registry.
        """
        # The heartbeat message never changes, so build it outside the loop.
        message = json.dumps({"type": "heartbeat"})
        while True:
            await asyncio.sleep(interval)
            await self._send_to_all(message)