Percy3822 commited on
Commit
00b91e0
·
verified ·
1 Parent(s): 2ef1a2f

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +128 -79
app.py CHANGED
@@ -1,109 +1,158 @@
1
- import os, json
2
- from typing import Dict, Any
3
- from fastapi import FastAPI, Body, Request
4
- from fastapi.middleware.cors import CORSMiddleware
 
 
 
 
 
5
  from fastapi.responses import JSONResponse, PlainTextResponse
 
6
  import httpx
7
 
8
- # Stable AsyncClient with explicit 4-part timeout
9
- _client = httpx.AsyncClient(
10
- timeout=httpx.Timeout(
11
- connect=15.0, # max time to connect
12
- read=30.0, # max time to wait for response data
13
- write=15.0, # max time to send request body
14
- pool=15.0 # max time to get a connection from the pool
15
- )
16
- )
17
 
18
- # ========= Config (set via Secrets) =========
19
- TTS_BASE = os.getenv("TTS_BASE", "https://Percy3822-ActualTTS.hf.space")
20
- PY_AI_URL = os.getenv("PY_AI_URL", "https://Percy3822-Python_ai.hf.space") # optional/future
21
- GODOT_AI_URL = os.getenv("GODOT_AI_URL", "https://Percy3822-Godot_ai.hf.space") # optional/future
22
- SECRET = os.getenv("BRAIN_SHARED_SECRET") # same header "x-auth" for internal calls
23
 
24
- ROUTES = {
25
- "python": PY_AI_URL,
26
- "godot": GODOT_AI_URL,
27
- "chat": None, # handled client-side (or future Chat module)
28
- }
29
 
30
- app = FastAPI(title="Brain_v2 (Router-Only)", version="2.0.0")
 
 
 
31
 
32
- # CORS for your local app; restrict later if desired
33
  app.add_middleware(
34
  CORSMiddleware,
35
- allow_origins=["*"],
 
36
  allow_methods=["*"],
37
  allow_headers=["*"],
38
  )
39
 
40
- _client = httpx.AsyncClient(timeout=httpx.Timeout(connect=15, read=None, write=15))
 
 
 
 
 
 
 
 
 
 
41
 
42
- def _authorized(req: Request) -> bool:
43
- return not SECRET or req.headers.get("x-auth") == SECRET
 
44
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45
  @app.get("/")
46
  def root():
47
- return PlainTextResponse("Brain_v2 router — POST /process to get stream URLs")
48
 
49
  @app.get("/health")
50
- async def health():
51
  return {
52
  "ok": True,
53
- "routes": {k: (v or "client") for k, v in ROUTES.items()},
54
- "tts_ws": f"{TTS_BASE}/ws/tts",
55
- "version": "2.0.0"
 
 
 
 
 
56
  }
57
 
 
 
 
58
  @app.post("/process")
59
- async def process(payload: Dict[str, Any] = Body(...), request: Request = None):
60
- """
61
- Input: { "text": "...", "module":"auto|python|godot|chat" }
62
- Output: { ok, route, stream_url, tts_url }
63
- - Client connects directly to stream_url for live TEXT tokens
64
- - Client connects directly to tts_url for AUDIO (WS) when needed
65
- """
66
- if request and not _authorized(request):
67
- return JSONResponse({"error": "unauthorized"}, status_code=401)
68
-
69
- text = (payload.get("text") or "").strip()
70
- module = (payload.get("module") or "auto").lower()
71
- if not text:
72
- return JSONResponse({"error": "empty text"}, status_code=400)
73
 
74
- # Lightweight routing
75
- low = text.lower()
76
- route = "chat"
77
- if module in ("python", "godot", "chat"):
78
- route = module
79
- elif low.startswith(("python:", "py:")):
80
- route = "python"
81
- elif low.startswith(("godot:", "gd:")):
82
- route = "godot"
83
 
84
- target = ROUTES.get(route)
85
- stream_url = f"{target}/ws/stream" if target else None
 
 
86
 
87
- return {
88
- "ok": True,
89
- "route": route,
90
- "stream_url": stream_url, # direct model → client (text)
91
- "tts_url": f"{TTS_BASE}/ws/tts", # direct TTS → client (audio)
92
- "note": "Connect directly to these URLs for streaming. Brain does not proxy."
93
  }
 
 
 
 
 
94
 
95
- # Optional debug: say text via TTS to test wiring (non-streaming HTTP)
96
- @app.post("/say")
97
- async def say(payload: Dict[str, Any] = Body(...), request: Request = None):
98
- if request and not _authorized(request):
99
- return JSONResponse({"error": "unauthorized"}, status_code=401)
100
- txt = (payload.get("text") or "").strip()
101
- if not txt:
102
- return JSONResponse({"error": "empty text"}, status_code=400)
103
  try:
104
- async with _client.stream("POST", f"{TTS_BASE}/speak.wav", json={"text": txt}) as r:
105
- r.raise_for_status()
106
- # we don’t proxy bytes here; caller just checks 200 OK for warmup
107
- return {"ok": True}
108
- except Exception as e:
109
- return JSONResponse({"error": str(e)}, status_code=502)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ ---
2
+
3
+ # app.py
4
+ ```python
5
+ # Brain_v2 — lightweight router/control-plane (no audio proxy)
6
+ import os, json, time, asyncio
7
+ from typing import Optional, Dict, Any, List, Set
8
+
9
+ from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, Header
10
  from fastapi.responses import JSONResponse, PlainTextResponse
11
+ from fastapi.middleware.cors import CORSMiddleware
12
  import httpx
13
 
14
+ APP_START_TS = time.time()
15
+
16
+ # -----------------------
17
+ # Config
18
+ # -----------------------
19
+ BRAIN_SECRET = (os.getenv("BRAIN_SHARED_SECRET") or "").strip()
20
+ ALLOW_ORIGINS = [o.strip() for o in (os.getenv("ALLOW_ORIGINS") or "*").split(",") if o.strip()]
21
+ LOG_EVENTS = (os.getenv("LOG_EVENTS") or "0").strip() == "1"
 
22
 
23
+ # Optional hint included in control-plane messages to clients.
24
+ # (We do NOT proxy audio; models/clients connect directly.)
25
+ FORWARD_TTS_URL = (os.getenv("FORWARD_TTS_URL") or "").strip()
 
 
26
 
27
+ # httpx client (used only if you add out-calls later)
28
+ http = httpx.AsyncClient(
29
+ timeout=httpx.Timeout(connect=10.0, read=20.0, write=10.0, pool=10.0)
30
+ )
 
31
 
32
+ # -----------------------
33
+ # App & CORS
34
+ # -----------------------
35
+ app = FastAPI(title="Brain_v2")
36
 
 
37
  app.add_middleware(
38
  CORSMiddleware,
39
+ allow_origins=ALLOW_ORIGINS if ALLOW_ORIGINS else ["*"],
40
+ allow_credentials=True,
41
  allow_methods=["*"],
42
  allow_headers=["*"],
43
  )
44
 
45
+ # -----------------------
46
+ # Broadcast Hub
47
+ # -----------------------
48
+ class Hub:
49
+ def _init_(self):
50
+ self._peers: Set[WebSocket] = set()
51
+ self._lock = asyncio.Lock()
52
+
53
+ async def join(self, ws: WebSocket):
54
+ async with self._lock:
55
+ self._peers.add(ws)
56
 
57
+ async def leave(self, ws: WebSocket):
58
+ async with self._lock:
59
+ self._peers.discard(ws)
60
 
61
+ async def broadcast(self, payload: Dict[str, Any]):
62
+ data = json.dumps(payload, ensure_ascii=False)
63
+ async with self._lock:
64
+ peers = list(self._peers)
65
+ if LOG_EVENTS:
66
+ print(f"[BRAIN] broadcast → {len(peers)} peer(s): {data[:160]}{'...' if len(data)>160 else ''}", flush=True)
67
+ for ws in peers:
68
+ try:
69
+ await ws.send_text(data)
70
+ except Exception:
71
+ # drop dead peers silently
72
+ await self.leave(ws)
73
+
74
+ hub = Hub()
75
+
76
+ # -----------------------
77
+ # Health
78
+ # -----------------------
79
  @app.get("/")
80
  def root():
81
+ return PlainTextResponse("Brain_v2 — POST /process (x-auth), WS /ws/brain, GET /health")
82
 
83
  @app.get("/health")
84
+ def health():
85
  return {
86
  "ok": True,
87
+ "uptime_sec": round(time.time() - APP_START_TS, 3),
88
+ "bus_peers": len(hub._peers),
89
+ "config": {
90
+ "allow_origins": ALLOW_ORIGINS or ["*"],
91
+ "log_events": LOG_EVENTS,
92
+ "forward_tts_url": bool(FORWARD_TTS_URL),
93
+ },
94
+ "tip": "Subscribe via WS /ws/brain; producers POST to /process with x-auth",
95
  }
96
 
97
+ # -----------------------
98
+ # Process (Notify in)
99
+ # -----------------------
100
  @app.post("/process")
101
+ async def process_event(request: Request, x_auth: Optional[str] = Header(None)):
102
+ # Secure the router
103
+ if not BRAIN_SECRET:
104
+ return JSONResponse({"ok": False, "error": "server not configured (missing BRAIN_SHARED_SECRET)"}, status_code=500)
105
+ if (x_auth or "").strip() != BRAIN_SECRET:
106
+ return JSONResponse({"ok": False, "error": "unauthorized"}, status_code=401)
 
 
 
 
 
 
 
 
107
 
108
+ try:
109
+ body = await request.json()
110
+ except Exception:
111
+ return JSONResponse({"ok": False, "error": "invalid json"}, status_code=400)
 
 
 
 
 
112
 
113
+ text = (body.get("text") or "").strip()
114
+ meta = body.get("meta") or {}
115
+ if not text:
116
+ return JSONResponse({"ok": False, "error": "empty text"}, status_code=400)
117
 
118
+ payload = {
119
+ "event": "final",
120
+ "text": text,
121
+ "meta": meta,
122
+ "ts": time.time(),
 
123
  }
124
+ if FORWARD_TTS_URL:
125
+ payload["tts_hint"] = {"ws": FORWARD_TTS_URL}
126
+
127
+ await hub.broadcast(payload)
128
+ return {"ok": True, "delivered_to": len(hub._peers)}
129
 
130
+ # -----------------------
131
+ # Brain bus (WS)
132
+ # -----------------------
133
+ @app.websocket("/ws/brain")
134
+ async def ws_brain(ws: WebSocket):
135
+ await ws.accept()
136
+ await hub.join(ws)
 
137
  try:
138
+ # passive channel: we don’t require client messages; keepalive pings are fine
139
+ while True:
140
+ try:
141
+ # wait for client pings/messages but time out to stay responsive
142
+ msg = await asyncio.wait_for(ws.receive_text(), timeout=60.0)
143
+ if msg and LOG_EVENTS:
144
+ print(f"[BRAIN] client msg: {msg[:160]}{'...' if len(msg)>160 else ''}", flush=True)
145
+ except asyncio.TimeoutError:
146
+ # send keepalive ping
147
+ await ws.send_text(json.dumps({"event":"ping","ts": time.time()}))
148
+ except WebSocketDisconnect:
149
+ pass
150
+ except Exception:
151
+ # swallow unexpected during client disconnects
152
+ pass
153
+ finally:
154
+ await hub.leave(ws)
155
+ try:
156
+ await ws.close()
157
+ except Exception:
158
+ pass