|
|
|
|
|
import os, json, time, asyncio |
|
|
from typing import Optional, Dict, Any, List, Set |
|
|
|
|
|
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, Header |
|
|
from fastapi.responses import JSONResponse, PlainTextResponse |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
import httpx |
|
|
|
|
|
APP_START_TS = time.time() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
BRAIN_SECRET = (os.getenv("BRAIN_SHARED_SECRET") or "").strip() |
|
|
ALLOW_ORIGINS = [o.strip() for o in (os.getenv("ALLOW_ORIGINS") or "*").split(",") if o.strip()] |
|
|
LOG_EVENTS = (os.getenv("LOG_EVENTS") or "0").strip() == "1" |
|
|
|
|
|
|
|
|
|
|
|
FORWARD_TTS_URL = (os.getenv("FORWARD_TTS_URL") or "").strip() |
|
|
|
|
|
|
|
|
http = httpx.AsyncClient( |
|
|
timeout=httpx.Timeout(connect=10.0, read=20.0, write=10.0, pool=10.0) |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
app = FastAPI(title="Brain_v2") |
|
|
|
|
|
app.add_middleware( |
|
|
CORSMiddleware, |
|
|
allow_origins=ALLOW_ORIGINS if ALLOW_ORIGINS else ["*"], |
|
|
allow_credentials=True, |
|
|
allow_methods=["*"], |
|
|
allow_headers=["*"], |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Hub: |
|
|
def __init__(self): |
|
|
self._peers: Set[WebSocket] = set() |
|
|
self._lock = asyncio.Lock() |
|
|
|
|
|
async def join(self, ws: WebSocket): |
|
|
async with self._lock: |
|
|
self._peers.add(ws) |
|
|
|
|
|
async def leave(self, ws: WebSocket): |
|
|
async with self._lock: |
|
|
self._peers.discard(ws) |
|
|
|
|
|
async def broadcast(self, payload: Dict[str, Any]): |
|
|
data = json.dumps(payload, ensure_ascii=False) |
|
|
async with self._lock: |
|
|
peers = list(self._peers) |
|
|
if LOG_EVENTS: |
|
|
print(f"[BRAIN] broadcast → {len(peers)} peer(s): {data[:160]}{'...' if len(data)>160 else ''}", flush=True) |
|
|
for ws in peers: |
|
|
try: |
|
|
await ws.send_text(data) |
|
|
except Exception: |
|
|
|
|
|
await self.leave(ws) |
|
|
|
|
|
hub = Hub() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/") |
|
|
def root(): |
|
|
return PlainTextResponse("Brain_v2 — POST /process (x-auth), WS /ws/brain, GET /health") |
|
|
|
|
|
@app.get("/health") |
|
|
def health(): |
|
|
return { |
|
|
"ok": True, |
|
|
"uptime_sec": round(time.time() - APP_START_TS, 3), |
|
|
"bus_peers": len(hub._peers), |
|
|
"config": { |
|
|
"allow_origins": ALLOW_ORIGINS or ["*"], |
|
|
"log_events": LOG_EVENTS, |
|
|
"forward_tts_url": bool(FORWARD_TTS_URL), |
|
|
}, |
|
|
"tip": "Subscribe via WS /ws/brain; producers POST to /process with x-auth", |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/process") |
|
|
async def process_event(request: Request, x_auth: Optional[str] = Header(None)): |
|
|
|
|
|
if not BRAIN_SECRET: |
|
|
return JSONResponse({"ok": False, "error": "server not configured (missing BRAIN_SHARED_SECRET)"}, status_code=500) |
|
|
if (x_auth or "").strip() != BRAIN_SECRET: |
|
|
return JSONResponse({"ok": False, "error": "unauthorized"}, status_code=401) |
|
|
|
|
|
try: |
|
|
body = await request.json() |
|
|
except Exception: |
|
|
return JSONResponse({"ok": False, "error": "invalid json"}, status_code=400) |
|
|
|
|
|
text = (body.get("text") or "").strip() |
|
|
meta = body.get("meta") or {} |
|
|
if not text: |
|
|
return JSONResponse({"ok": False, "error": "empty text"}, status_code=400) |
|
|
|
|
|
payload = { |
|
|
"event": "final", |
|
|
"text": text, |
|
|
"meta": meta, |
|
|
"ts": time.time(), |
|
|
} |
|
|
if FORWARD_TTS_URL: |
|
|
payload["tts_hint"] = {"ws": FORWARD_TTS_URL} |
|
|
|
|
|
await hub.broadcast(payload) |
|
|
return {"ok": True, "delivered_to": len(hub._peers)} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.websocket("/ws/brain") |
|
|
async def ws_brain(ws: WebSocket): |
|
|
await ws.accept() |
|
|
await hub.join(ws) |
|
|
try: |
|
|
|
|
|
while True: |
|
|
try: |
|
|
|
|
|
msg = await asyncio.wait_for(ws.receive_text(), timeout=60.0) |
|
|
if msg and LOG_EVENTS: |
|
|
print(f"[BRAIN] client msg: {msg[:160]}{'...' if len(msg)>160 else ''}", flush=True) |
|
|
except asyncio.TimeoutError: |
|
|
|
|
|
await ws.send_text(json.dumps({"event":"ping","ts": time.time()})) |
|
|
except WebSocketDisconnect: |
|
|
pass |
|
|
except Exception: |
|
|
|
|
|
pass |
|
|
finally: |
|
|
await hub.leave(ws) |
|
|
try: |
|
|
await ws.close() |
|
|
except Exception: |
|
|
pass |