File size: 4,975 Bytes
d8005d7
00b91e0
 
 
 
06ad053
00b91e0
e3a9d8c
 
00b91e0
 
 
 
 
 
 
 
06ad053
00b91e0
 
 
06ad053
00b91e0
 
 
 
06ad053
00b91e0
 
 
 
06ad053
 
 
00b91e0
 
06ad053
 
 
 
00b91e0
 
 
 
bf6d293
00b91e0
 
 
 
 
 
06ad053
00b91e0
 
 
06ad053
00b91e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
06ad053
 
00b91e0
06ad053
 
00b91e0
06ad053
 
00b91e0
 
 
 
 
 
 
 
06ad053
 
00b91e0
 
 
06ad053
00b91e0
 
 
 
 
 
06ad053
00b91e0
 
 
 
06ad053
00b91e0
 
 
 
06ad053
00b91e0
 
 
 
 
06ad053
00b91e0
 
 
 
 
06ad053
00b91e0
 
 
 
 
 
 
06ad053
00b91e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# Brain_v2 — lightweight router/control-plane (no audio proxy)
import os, json, time, asyncio
from typing import Optional, Dict, Any, List, Set

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, Header
from fastapi.responses import JSONResponse, PlainTextResponse
from fastapi.middleware.cors import CORSMiddleware
import httpx

# Wall-clock time at import; /health subtracts it from "now" to report uptime.
APP_START_TS = time.time()

# -----------------------
# Config
# -----------------------
def _env(name: str, fallback: str = "") -> str:
    """Return env var *name* with surrounding whitespace stripped.

    An unset or empty variable is replaced by *fallback* before stripping.
    """
    return (os.getenv(name) or fallback).strip()


# Shared secret compared against the x-auth header of POST /process.
BRAIN_SECRET = _env("BRAIN_SHARED_SECRET")

# Comma-separated CORS origins; blank entries are dropped, default is "*".
ALLOW_ORIGINS = [
    origin
    for origin in (chunk.strip() for chunk in _env("ALLOW_ORIGINS", "*").split(","))
    if origin
]

# Event logging is enabled only when LOG_EVENTS is exactly "1".
LOG_EVENTS = _env("LOG_EVENTS", "0") == "1"

# Optional hint included in control-plane messages to clients.
# (We do NOT proxy audio; models/clients connect directly.)
FORWARD_TTS_URL = _env("FORWARD_TTS_URL")

# httpx client (used only if you add out-calls later)
_HTTP_TIMEOUT = httpx.Timeout(connect=10.0, read=20.0, write=10.0, pool=10.0)
http = httpx.AsyncClient(timeout=_HTTP_TIMEOUT)

# -----------------------
# App & CORS
# -----------------------
app = FastAPI(title="Brain_v2")

# CORS does not permit a wildcard origin together with credentials, and the
# FastAPI CORS docs require explicit origins whenever allow_credentials=True.
# Credentialed cross-origin requests are therefore enabled only when
# ALLOW_ORIGINS names concrete origins; with "*" (the default) they are off.
# The x-auth header still passes because allow_headers stays permissive.
_cors_origins = ALLOW_ORIGINS if ALLOW_ORIGINS else ["*"]
_cors_is_wildcard = "*" in _cors_origins

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=not _cors_is_wildcard,
    allow_methods=["*"],
    allow_headers=["*"],
)

# -----------------------
# Broadcast Hub
# -----------------------
class Hub:
    """In-process registry of subscriber WebSockets with JSON fan-out.

    Peers live in a set guarded by an ``asyncio.Lock``. ``broadcast`` takes a
    snapshot of the set under the lock and sends outside it, so ``join`` and
    ``leave`` never wait on a network send.
    """

    def __init__(self):
        self._peers: Set["WebSocket"] = set()
        self._lock = asyncio.Lock()

    @property
    def peer_count(self) -> int:
        """Number of peers currently registered."""
        return len(self._peers)

    async def join(self, ws: "WebSocket") -> None:
        """Register *ws* so it receives subsequent broadcasts."""
        async with self._lock:
            self._peers.add(ws)

    async def leave(self, ws: "WebSocket") -> None:
        """Unregister *ws*; does nothing if it is not registered."""
        async with self._lock:
            self._peers.discard(ws)

    async def broadcast(self, payload: Dict[str, Any]) -> int:
        """Send *payload* as JSON text to every registered peer.

        Sends are issued concurrently so a single slow peer does not delay
        delivery to the others. Any peer whose send raises is removed from
        the registry.

        Args:
            payload: JSON-serialisable mapping to deliver.

        Returns:
            The number of peers the message was sent to without error.
        """
        data = json.dumps(payload, ensure_ascii=False)
        async with self._lock:
            peers = list(self._peers)
        if LOG_EVENTS:
            print(f"[BRAIN] broadcast → {len(peers)} peer(s): {data[:160]}{'...' if len(data)>160 else ''}", flush=True)
        if not peers:
            return 0

        # return_exceptions=True keeps one failing peer from cancelling the
        # remaining sends; failures come back as values, in the order of `peers`.
        outcomes = await asyncio.gather(
            *(ws.send_text(data) for ws in peers),
            return_exceptions=True,
        )

        delivered = 0
        for ws, outcome in zip(peers, outcomes):
            if isinstance(outcome, BaseException):
                # Dead or broken peer: drop it, leaving a trace when logging is on.
                if LOG_EVENTS:
                    print(f"[BRAIN] dropping peer after failed send: {outcome!r}", flush=True)
                await self.leave(ws)
            else:
                delivered += 1
        return delivered


# Single process-wide hub shared by the HTTP and WebSocket handlers.
hub = Hub()

# -----------------------
# Health
# -----------------------
@app.get("/")
def root():
    """Return a one-line plain-text summary of the service's endpoints."""
    banner = "Brain_v2 — POST /process (x-auth), WS /ws/brain, GET /health"
    return PlainTextResponse(banner)

@app.get("/health")
def health():
    """Report liveness, uptime, subscriber count and a summary of the config.

    The TTS forward URL is exposed only as a boolean (set / not set).
    """
    uptime = round(time.time() - APP_START_TS, 3)
    config_view = {
        "allow_origins": ALLOW_ORIGINS or ["*"],
        "log_events": LOG_EVENTS,
        "forward_tts_url": bool(FORWARD_TTS_URL),
    }

    # Keys are inserted in the same order as the response has always used.
    report = {"ok": True}
    report["uptime_sec"] = uptime
    report["bus_peers"] = len(hub._peers)
    report["config"] = config_view
    report["tip"] = "Subscribe via WS /ws/brain; producers POST to /process with x-auth"
    return report

# -----------------------
# Process (Notify in)
# -----------------------
@app.post("/process")
async def process_event(request: Request, x_auth: Optional[str] = Header(None)):
    """Authenticate a producer, validate its JSON event and broadcast it.

    Args:
        request: Incoming request. Its body must be a JSON object with a
            non-empty string ``text`` and an optional ``meta`` value.
        x_auth: Value of the ``x-auth`` header; must equal ``BRAIN_SECRET``.

    Returns:
        ``{"ok": True, "delivered_to": <registered peer count>}`` on success.
        Otherwise a ``JSONResponse`` with ``ok: False`` and status 500
        (secret not configured), 401 (missing/wrong x-auth) or 400
        (body is not valid JSON, not an object, or has no usable ``text``).
    """
    import hmac  # local import: only this handler needs it

    # Secure the router
    if not BRAIN_SECRET:
        return JSONResponse({"ok": False, "error": "server not configured (missing BRAIN_SHARED_SECRET)"}, status_code=500)

    # Constant-time comparison so response timing does not reveal how much of
    # the secret matched. Compare bytes: compare_digest rejects non-ASCII str.
    supplied = (x_auth or "").strip()
    if not hmac.compare_digest(supplied.encode("utf-8"), BRAIN_SECRET.encode("utf-8")):
        return JSONResponse({"ok": False, "error": "unauthorized"}, status_code=401)

    try:
        body = await request.json()
    except Exception:
        return JSONResponse({"ok": False, "error": "invalid json"}, status_code=400)

    # Valid JSON may still be a list/string/number; only an object has fields.
    if not isinstance(body, dict):
        return JSONResponse({"ok": False, "error": "json body must be an object"}, status_code=400)

    # Missing/null/falsy text falls through to the "empty text" error below;
    # a truthy non-string (e.g. a number or list) is rejected explicitly.
    raw_text = body.get("text") or ""
    if not isinstance(raw_text, str):
        return JSONResponse({"ok": False, "error": "text must be a string"}, status_code=400)

    text = raw_text.strip()
    meta = body.get("meta") or {}
    if not text:
        return JSONResponse({"ok": False, "error": "empty text"}, status_code=400)

    payload = {
        "event": "final",
        "text": text,
        "meta": meta,
        "ts": time.time(),
    }
    if FORWARD_TTS_URL:
        # Tell subscribers where the TTS endpoint is; audio itself is not proxied.
        payload["tts_hint"] = {"ws": FORWARD_TTS_URL}

    await hub.broadcast(payload)
    # Counted after the broadcast, which removes peers whose send failed.
    return {"ok": True, "delivered_to": len(hub._peers)}

# -----------------------
# Brain bus (WS)
# -----------------------
@app.websocket("/ws/brain")
async def ws_brain(ws: WebSocket):
    """Subscribe a WebSocket client to hub broadcasts until it disconnects.

    The channel is passive: inbound text messages are only logged (when
    LOG_EVENTS is on). If the client stays silent for ``idle_timeout``
    seconds, a JSON ``ping`` event is sent to it. The handler never raises;
    on any exit path the socket is removed from the hub and closed.
    """
    idle_timeout = 60.0  # seconds of client silence before a ping is sent

    await ws.accept()
    await hub.join(ws)
    try:
        # passive channel: we don't require client messages; keepalive pings are fine
        while True:
            try:
                # wait for client pings/messages but time out to stay responsive
                msg = await asyncio.wait_for(ws.receive_text(), timeout=idle_timeout)
            except asyncio.TimeoutError:
                # send keepalive ping
                await ws.send_text(json.dumps({"event":"ping","ts": time.time()}))
                continue
            if msg and LOG_EVENTS:
                print(f"[BRAIN] client msg: {msg[:160]}{'...' if len(msg)>160 else ''}", flush=True)
    except WebSocketDisconnect:
        pass
    except Exception as exc:
        # Still best-effort (errors around client disconnects must not crash
        # the handler), but leave a trace when event logging is enabled.
        if LOG_EVENTS:
            print(f"[BRAIN] ws error: {exc!r}", flush=True)
    finally:
        await hub.leave(ws)
        try:
            await ws.close()
        except Exception:
            # presumably the socket is already closed here; nothing left to do
            pass