"""Brain_v2 — lightweight router/control-plane (no audio proxy).

Producers POST a JSON event to ``/process`` (authenticated with the ``x-auth``
header); the event is fanned out as a JSON text frame to every WebSocket
subscriber connected to ``/ws/brain``. ``GET /health`` reports basic status.
"""
import asyncio
import hmac
import json
import os
import time
from typing import Any, Dict, List, Optional, Set

import httpx
from fastapi import FastAPI, Header, Request, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, PlainTextResponse

APP_START_TS = time.time()

# -----------------------
# Config
# -----------------------
BRAIN_SECRET = (os.getenv("BRAIN_SHARED_SECRET") or "").strip()
ALLOW_ORIGINS = [
    o.strip() for o in (os.getenv("ALLOW_ORIGINS") or "*").split(",") if o.strip()
]
LOG_EVENTS = (os.getenv("LOG_EVENTS") or "0").strip() == "1"

# Optional hint included in control-plane messages to clients.
# (We do NOT proxy audio; models/clients connect directly.)
FORWARD_TTS_URL = (os.getenv("FORWARD_TTS_URL") or "").strip()

# Seconds to wait for a client frame on /ws/brain before sending a keepalive ping.
WS_KEEPALIVE_SEC = 60.0

# Maximum number of characters of a message echoed into the event log.
LOG_PREVIEW_CHARS = 160

# httpx client (used only if you add out-calls later)
http = httpx.AsyncClient(
    timeout=httpx.Timeout(connect=10.0, read=20.0, write=10.0, pool=10.0)
)

# -----------------------
# App & CORS
# -----------------------
app = FastAPI(title="Brain_v2")
# NOTE(review): allow_credentials=True combined with the default "*" origin is
# very permissive — confirm this is intended for production deployments.
app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOW_ORIGINS if ALLOW_ORIGINS else ["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


def _clip(text: str, limit: int = LOG_PREVIEW_CHARS) -> str:
    """Return *text* cut to *limit* characters, with '...' appended if cut."""
    return f"{text[:limit]}{'...' if len(text) > limit else ''}"


def _secret_matches(supplied: Optional[str]) -> bool:
    """Compare the supplied x-auth value with BRAIN_SECRET in constant time."""
    candidate = (supplied or "").strip()
    # Compare bytes: compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(
        candidate.encode("utf-8"), BRAIN_SECRET.encode("utf-8")
    )


# -----------------------
# Broadcast Hub
# -----------------------
class Hub:
    """In-process registry of WebSocket subscribers with fan-out broadcast."""

    def __init__(self) -> None:
        self._peers: Set[WebSocket] = set()
        self._lock = asyncio.Lock()

    @property
    def peer_count(self) -> int:
        """Number of currently registered subscribers."""
        return len(self._peers)

    async def join(self, ws: WebSocket) -> None:
        """Register *ws* as a subscriber."""
        async with self._lock:
            self._peers.add(ws)

    async def leave(self, ws: WebSocket) -> None:
        """Unregister *ws*; a no-op if it is not registered."""
        async with self._lock:
            self._peers.discard(ws)

    async def broadcast(self, payload: Dict[str, Any]) -> int:
        """Send *payload* as a JSON text frame to every subscriber.

        Peers whose send raises are removed from the hub.

        Returns:
            The number of peers the frame was sent to without error.
        """
        data = json.dumps(payload, ensure_ascii=False)
        # Snapshot under the lock, then send outside it so a slow peer does
        # not block join/leave.
        async with self._lock:
            peers = list(self._peers)
        if LOG_EVENTS:
            print(
                f"[BRAIN] broadcast → {len(peers)} peer(s): {_clip(data)}",
                flush=True,
            )
        delivered = 0
        for ws in peers:
            try:
                await ws.send_text(data)
            except Exception:
                # drop dead peers silently
                await self.leave(ws)
            else:
                delivered += 1
        return delivered


hub = Hub()


# -----------------------
# Health
# -----------------------
@app.get("/")
def root():
    """Return a one-line plain-text summary of the available endpoints."""
    return PlainTextResponse(
        "Brain_v2 — POST /process (x-auth), WS /ws/brain, GET /health"
    )


@app.get("/health")
def health():
    """Report uptime, subscriber count and the effective configuration."""
    return {
        "ok": True,
        "uptime_sec": round(time.time() - APP_START_TS, 3),
        "bus_peers": hub.peer_count,
        "config": {
            "allow_origins": ALLOW_ORIGINS or ["*"],
            "log_events": LOG_EVENTS,
            "forward_tts_url": bool(FORWARD_TTS_URL),
        },
        "tip": "Subscribe via WS /ws/brain; producers POST to /process with x-auth",
    }


# -----------------------
# Process (Notify in)
# -----------------------
@app.post("/process")
async def process_event(request: Request, x_auth: Optional[str] = Header(None)):
    """Validate an incoming event and broadcast it to all subscribers.

    Expects a JSON object with a non-empty string ``text`` and an optional
    ``meta`` value. Responds 500 if the server has no shared secret, 401 on a
    bad ``x-auth`` header, 400 on a malformed body, and otherwise
    ``{"ok": True, "delivered_to": <count>}``.
    """
    # Secure the router
    if not BRAIN_SECRET:
        return JSONResponse(
            {"ok": False, "error": "server not configured (missing BRAIN_SHARED_SECRET)"},
            status_code=500,
        )
    if not _secret_matches(x_auth):
        return JSONResponse({"ok": False, "error": "unauthorized"}, status_code=401)

    try:
        body = await request.json()
    except Exception:
        return JSONResponse({"ok": False, "error": "invalid json"}, status_code=400)

    # Valid JSON that is not an object (list, string, number, ...) has no
    # .get(); reject it instead of failing with a 500.
    if not isinstance(body, dict):
        return JSONResponse(
            {"ok": False, "error": "json body must be an object"}, status_code=400
        )

    raw_text = body.get("text") or ""
    if not isinstance(raw_text, str):
        return JSONResponse(
            {"ok": False, "error": "text must be a string"}, status_code=400
        )
    text = raw_text.strip()
    meta = body.get("meta") or {}

    if not text:
        return JSONResponse({"ok": False, "error": "empty text"}, status_code=400)

    payload = {
        "event": "final",
        "text": text,
        "meta": meta,
        "ts": time.time(),
    }
    if FORWARD_TTS_URL:
        payload["tts_hint"] = {"ws": FORWARD_TTS_URL}

    delivered = await hub.broadcast(payload)
    return {"ok": True, "delivered_to": delivered}


# -----------------------
# Brain bus (WS)
# -----------------------
@app.websocket("/ws/brain")
async def ws_brain(ws: WebSocket):
    """Accept a subscriber, keep the socket alive, and unregister on exit."""
    await ws.accept()
    await hub.join(ws)
    try:
        # Passive channel: client messages are not required; they are only
        # logged. A ping is sent whenever the client has been silent for
        # WS_KEEPALIVE_SEC seconds.
        while True:
            try:
                msg = await asyncio.wait_for(
                    ws.receive_text(), timeout=WS_KEEPALIVE_SEC
                )
            except asyncio.TimeoutError:
                # send keepalive ping
                await ws.send_text(json.dumps({"event": "ping", "ts": time.time()}))
            else:
                if msg and LOG_EVENTS:
                    print(f"[BRAIN] client msg: {_clip(msg)}", flush=True)
    except WebSocketDisconnect:
        pass
    except Exception as exc:
        # Unexpected errors during client disconnects end the session; record
        # them when event logging is enabled rather than hiding them entirely.
        if LOG_EVENTS:
            print(f"[BRAIN] ws session ended with error: {exc!r}", flush=True)
    finally:
        await hub.leave(ws)
        try:
            await ws.close()
        except Exception:
            # Socket may already be closed; closing is best-effort.
            pass