# Ladybug / unity_bridge.py
# Adedoyinjames's picture
# Upload 6 files
# 5374858 verified
import asyncio
import json
import logging
from typing import Any, Dict, Set
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
# Module-level logger named after this module; UnityBridge uses it to log
# inbound client messages and client disconnects.
LOGGER = logging.getLogger(__name__)
class UnityBridge:
    """Bi-directional bridge for humanoid motion commands to Unity clients.

    Unity clients connect over the ``/unity/ws`` WebSocket route registered on
    ``self.router``. Every accepted socket is kept in an internal registry so
    motion payloads and heartbeats can be pushed to all of them.
    """

    def __init__(self) -> None:
        """Create the empty client registry and register the WebSocket route."""
        # Sockets that have been accepted and have not yet been dropped.
        self._clients: Set[WebSocket] = set()
        self.router = APIRouter(prefix="/unity", tags=["unity"])
        self.router.add_api_websocket_route("/ws", self.unity_ws)

    async def unity_ws(self, websocket: WebSocket) -> None:
        """Accept a Unity client and read its messages until it disconnects.

        Inbound text frames are only logged at debug level. The socket is
        removed from the registry on every exit path.

        Args:
            websocket: The incoming WebSocket connection.
        """
        await websocket.accept()
        self._clients.add(websocket)
        try:
            while True:
                inbound = await websocket.receive_text()
                LOGGER.debug("Unity ack: %s", inbound)
        except WebSocketDisconnect:
            LOGGER.info("Unity client disconnected")
        finally:
            # Also runs when an unexpected error propagates, so a broken
            # socket is never left registered.
            self._clients.discard(websocket)

    async def _send_to_all(self, message: str) -> None:
        """Send ``message`` to each registered client, dropping failed ones."""
        # Iterate over a snapshot: every send awaits, and clients may connect
        # or disconnect in the meantime. Iterating the live set would raise
        # "RuntimeError: Set changed size during iteration".
        for client in tuple(self._clients):
            try:
                await client.send_text(message)
            except Exception:
                # Best-effort delivery: a client we cannot write to is treated
                # as gone, but the failure is logged rather than hidden.
                LOGGER.warning(
                    "Dropping Unity client after failed send", exc_info=True
                )
                self._clients.discard(client)

    async def broadcast_motion(self, action: Dict[str, Any]) -> None:
        """Send a motion command to every connected Unity client.

        Clients whose send fails are removed from the registry. If the payload
        cannot be serialized to JSON, nothing is sent and no client is
        removed; the error is logged.

        Args:
            action: Motion description. Its ``gesture``, ``body_motion`` and
                ``gaze_target`` entries are forwarded; missing entries default
                to ``"idle"``, ``"stand"`` and ``"student"`` respectively.
        """
        if not self._clients:
            return
        payload = {
            "type": "mcp_motion",
            "gesture": action.get("gesture", "idle"),
            "body_motion": action.get("body_motion", "stand"),
            "gaze_target": action.get("gaze_target", "student"),
        }
        # Serialize once, outside the per-client error handling: a bad payload
        # is the caller's problem and must not evict healthy clients.
        try:
            message = json.dumps(payload)
        except (TypeError, ValueError):
            LOGGER.error(
                "Motion payload is not JSON-serializable; broadcast skipped",
                exc_info=True,
            )
            return
        await self._send_to_all(message)

    async def heartbeat(self, interval: float = 5) -> None:
        """Send a ``{"type": "heartbeat"}`` message to all clients forever.

        Clients whose send fails are removed from the registry. This coroutine
        never returns on its own; it loops until it is cancelled.

        Args:
            interval: Seconds to sleep before each heartbeat round.
        """
        message = json.dumps({"type": "heartbeat"})
        while True:
            await asyncio.sleep(interval)
            await self._send_to_all(message)