Spaces:
Sleeping
Sleeping
| import asyncio | |
| import json | |
| import logging | |
| from typing import Any, Dict, Set | |
| from fastapi import APIRouter, WebSocket, WebSocketDisconnect | |
| LOGGER = logging.getLogger(__name__) | |
class UnityBridge:
    """Bi-directional bridge for humanoid motion commands to Unity clients.

    The bridge owns an ``APIRouter`` (``self.router``, prefix ``/unity``)
    exposing a single WebSocket route at ``/ws``.  Every accepted socket is
    tracked in ``self._clients`` so that motion commands and heartbeats can be
    pushed to all of them; text frames received from a client are only logged.
    """

    def __init__(self) -> None:
        """Create the empty client registry and register the ``/ws`` route."""
        # Sockets that are currently connected. Mutated by ``unity_ws`` on
        # connect/disconnect and pruned by ``_send_to_all`` on send failure.
        self._clients: Set[WebSocket] = set()
        self.router = APIRouter(prefix="/unity", tags=["unity"])
        self.router.add_api_websocket_route("/ws", self.unity_ws)

    async def unity_ws(self, websocket: WebSocket) -> None:
        """Serve one Unity client connection until it goes away.

        Accepts the socket, registers it, then logs every inbound text frame
        at debug level.  The socket is always unregistered on exit, whether
        the loop ends through ``WebSocketDisconnect`` or any other exception
        (which is left to propagate).
        """
        await websocket.accept()
        self._clients.add(websocket)
        try:
            while True:
                inbound = await websocket.receive_text()
                LOGGER.debug("Unity ack: %s", inbound)
        except WebSocketDisconnect:
            LOGGER.info("Unity client disconnected")
        finally:
            self._clients.discard(websocket)

    async def broadcast_motion(self, action: Dict[str, Any]) -> None:
        """Send a motion command to every connected client.

        Args:
            action: Mapping that may carry ``gesture``, ``body_motion`` and
                ``gaze_target``.  Missing keys fall back to ``"idle"``,
                ``"stand"`` and ``"student"`` respectively; any other keys
                are ignored.

        Does nothing when no client is connected.  Clients whose send fails
        are removed from the registry.
        """
        if not self._clients:
            return
        payload = {
            "type": "mcp_motion",
            "gesture": action.get("gesture", "idle"),
            "body_motion": action.get("body_motion", "stand"),
            "gaze_target": action.get("gaze_target", "student"),
        }
        await self._send_to_all(json.dumps(payload))

    async def heartbeat(self, interval: float = 5.0) -> None:
        """Send a ``{"type": "heartbeat"}`` frame to all clients forever.

        Args:
            interval: Seconds to sleep before each round of heartbeats.

        This coroutine never returns on its own; it loops until it is
        cancelled or an exception that is not an ``Exception`` subclass
        escapes.  Clients whose send fails are removed from the registry.
        """
        # The frame never changes, so serialise it once outside the loop.
        message = json.dumps({"type": "heartbeat"})
        while True:
            await asyncio.sleep(interval)
            await self._send_to_all(message)

    async def _send_to_all(self, message: str) -> None:
        """Send ``message`` to each registered client, pruning failed ones."""
        # Iterate over a snapshot: every ``await`` below can let ``unity_ws``
        # add or discard sockets, and mutating a set while iterating it
        # raises ``RuntimeError``.
        for client in tuple(self._clients):
            if client not in self._clients:
                # Unregistered while an earlier send was in flight.
                continue
            try:
                await client.send_text(message)
            except Exception:
                # Best effort: a socket that cannot be written to is dropped
                # rather than allowed to abort the whole broadcast.
                LOGGER.debug("Dropping unreachable Unity client", exc_info=True)
                self._clients.discard(client)