Percy3822 commited on
Commit
d07e459
·
verified ·
1 Parent(s): 076eacb

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +98 -48
app.py CHANGED
@@ -1,60 +1,115 @@
1
- import os, json, asyncio, time
2
- from typing import Optional
3
- import httpx
4
  from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Body
5
  from fastapi.middleware.cors import CORSMiddleware
6
- from fastapi.responses import JSONResponse
7
 
8
  FILES_DIR = os.environ.get("FILES_DIR", "/tmp/brain_app/files")
9
  TTS_WS_TARGET = os.environ.get("TTS_WS_TARGET", "")
10
  TTS_HTTP_TARGET = os.environ.get("TTS_HTTP_TARGET", "")
11
  STT_WS_TARGET = os.environ.get("STT_WS_TARGET", "")
 
12
 
13
- app = FastAPI(title="Brain Proxy")
 
 
14
  app.add_middleware(
15
  CORSMiddleware,
16
- allow_origins=[""], allow_headers=[""], allow_methods=["*"], allow_credentials=True,
17
  )
18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19
  @app.get("/health")
20
  def health():
21
  return {
22
- "ok": True,
23
- "service": "brain",
24
- "files_dir": FILES_DIR,
25
- "tts_ws_target": TTS_WS_TARGET,
26
- "tts_http_target": TTS_HTTP_TARGET,
27
- "stt_ws_target": STT_WS_TARGET,
28
- "time": time.time(),
29
  }
30
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
31
  # ---------- HTTP proxy to TTS /speak ----------
32
  @app.post("/speak")
33
  async def speak_proxy(body: dict = Body(...)):
34
  if not TTS_HTTP_TARGET:
35
  return JSONResponse({"ok": False, "error": "TTS HTTP target not configured"}, 500)
36
  try:
37
- async with httpx.AsyncClient(timeout=60) as client:
38
  r = await client.post(TTS_HTTP_TARGET, json=body)
39
  r.raise_for_status()
40
- return JSONResponse(r.json(), r.status_code)
41
- except httpx.HTTPError as e:
 
 
 
42
  return JSONResponse({"ok": False, "error": f"TTS proxy failed: {e}"}, 502)
43
 
44
- # ---------- WS proxy helpers ----------
45
- async def _bridge_ws(client_ws: WebSocket, upstream_url: str):
46
- import websockets
47
-
48
- # Establish upstream connection
49
  try:
50
  upstream = await websockets.connect(upstream_url, ping_interval=None, max_size=8_000_000)
51
  except Exception as e:
52
  await client_ws.send_json({"event":"error","error":f"Upstream connect failed: {repr(e)}"})
53
- await client_ws.close()
54
- return
55
 
56
  async def c2u():
57
- # client -> upstream (pass text or bytes)
58
  try:
59
  while True:
60
  msg = await client_ws.receive()
@@ -62,19 +117,13 @@ async def _bridge_ws(client_ws: WebSocket, upstream_url: str):
62
  await upstream.send(msg["bytes"])
63
  elif "text" in msg and msg["text"] is not None:
64
  await upstream.send(msg["text"])
65
- else:
66
- # ignore pings/others
67
- pass
68
  except WebSocketDisconnect:
69
  pass
70
  finally:
71
- try:
72
- await upstream.close()
73
- except Exception:
74
- pass
75
 
76
  async def u2c():
77
- # upstream -> client (pass text or binary)
78
  try:
79
  while True:
80
  msg = await upstream.recv()
@@ -85,27 +134,28 @@ async def _bridge_ws(client_ws: WebSocket, upstream_url: str):
85
  except Exception:
86
  pass
87
  finally:
88
- try:
89
- await client_ws.close()
90
- except Exception:
91
- pass
92
 
93
- # run both directions
94
  await asyncio.gather(c2u(), u2c())
95
 
96
- # ---------- WS endpoints the client will use ----------
97
- @app.websocket("/ws/tts_proxy")
98
- async def ws_tts_proxy(ws: WebSocket):
99
  await ws.accept()
100
  if not TTS_WS_TARGET:
101
- await ws.send_json({"event":"error","error":"TTS WS target not configured"})
102
- await ws.close(); return
103
- await _bridge_ws(ws, TTS_WS_TARGET)
104
 
105
- @app.websocket("/ws/stt_proxy")
106
- async def ws_stt_proxy(ws: WebSocket):
107
  await ws.accept()
108
  if not STT_WS_TARGET:
109
- await ws.send_json({"event":"error","error":"STT WS target not configured"})
110
- await ws.close(); return
111
- await _bridge_ws(ws, STT_WS_TARGET)
 
 
 
 
 
 
 
1
+ import os, json, time, asyncio
2
+ from typing import Dict, Any
3
+ import httpx, websockets
4
  from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Body
5
  from fastapi.middleware.cors import CORSMiddleware
6
+ from fastapi.responses import JSONResponse, StreamingResponse
7
 
8
  FILES_DIR = os.environ.get("FILES_DIR", "/tmp/brain_app/files")
9
  TTS_WS_TARGET = os.environ.get("TTS_WS_TARGET", "")
10
  TTS_HTTP_TARGET = os.environ.get("TTS_HTTP_TARGET", "")
11
  STT_WS_TARGET = os.environ.get("STT_WS_TARGET", "")
12
+ AGENT_WS_TARGET = os.environ.get("AGENT_WS_TARGET", "")
13
 
14
+ os.makedirs(FILES_DIR, exist_ok=True)
15
+
16
+ app = FastAPI(title="Brain (Proxy/Orchestrator)")
17
  app.add_middleware(
18
  CORSMiddleware,
19
+ allow_origins=[""], allow_methods=[""], allow_headers=["*"], allow_credentials=True,
20
  )
21
 
22
+ telemetry_subs = set() # Server-Sent Events subscribers
23
+ logs_subs = set() # SSE for generic logs/events
24
+
25
+ def _sse_pack(d: Dict[str, Any]) -> bytes:
26
+ return f"data: {json.dumps(d, ensure_ascii=False)}\n\n".encode("utf-8")
27
+
28
+ async def _sse_broadcast(pool, payload):
29
+ dead = []
30
+ for q in list(pool):
31
+ try:
32
+ await q.put(payload)
33
+ except Exception:
34
+ dead.append(q)
35
+ for q in dead:
36
+ pool.discard(q)
37
+
38
  @app.get("/health")
39
  def health():
40
  return {
41
+ "ok": True, "service": "brain",
42
+ "tts_ws_target": TTS_WS_TARGET, "tts_http_target": TTS_HTTP_TARGET,
43
+ "stt_ws_target": STT_WS_TARGET, "agent_ws_target": AGENT_WS_TARGET,
44
+ "files_dir": FILES_DIR, "time": time.time()
 
 
 
45
  }
46
 
47
+ # ---------- SSE streams ----------
48
+ @app.get("/stream/telemetry")
49
+ async def stream_telemetry():
50
+ q: asyncio.Queue = asyncio.Queue()
51
+ telemetry_subs.add(q)
52
+ async def gen():
53
+ try:
54
+ while True:
55
+ item = await q.get()
56
+ yield item
57
+ finally:
58
+ telemetry_subs.discard(q)
59
+ return StreamingResponse(gen(), media_type="text/event-stream")
60
+
61
+ @app.get("/stream/logs")
62
+ async def stream_logs():
63
+ q: asyncio.Queue = asyncio.Queue()
64
+ logs_subs.add(q)
65
+ async def gen():
66
+ try:
67
+ while True:
68
+ item = await q.get()
69
+ yield item
70
+ finally:
71
+ logs_subs.discard(q)
72
+ return StreamingResponse(gen(), media_type="text/event-stream")
73
+
74
+ # ---------- Telemetry ingest ----------
75
+ @app.post("/telemetry")
76
+ async def telemetry(body: dict = Body(...)):
77
+ body["ts"] = time.time()
78
+ await _sse_broadcast(telemetry_subs, _sse_pack(body))
79
+ return {"ok": True}
80
+
81
+ # ---------- Generic log/event ----------
82
+ @app.post("/log")
83
+ async def log(body: dict = Body(...)):
84
+ body["ts"] = time.time()
85
+ await _sse_broadcast(logs_subs, _sse_pack(body))
86
+ return {"ok": True}
87
+
88
  # ---------- HTTP proxy to TTS /speak ----------
89
  @app.post("/speak")
90
  async def speak_proxy(body: dict = Body(...)):
91
  if not TTS_HTTP_TARGET:
92
  return JSONResponse({"ok": False, "error": "TTS HTTP target not configured"}, 500)
93
  try:
94
+ async with httpx.AsyncClient(timeout=120) as client:
95
  r = await client.post(TTS_HTTP_TARGET, json=body)
96
  r.raise_for_status()
97
+ out = r.json()
98
+ await _sse_broadcast(logs_subs, _sse_pack({"kind":"tts_http", "req": body, "resp": out}))
99
+ return JSONResponse(out, r.status_code)
100
+ except Exception as e:
101
+ await _sse_broadcast(logs_subs, _sse_pack({"kind":"error", "where":"speak_proxy", "error": repr(e)}))
102
  return JSONResponse({"ok": False, "error": f"TTS proxy failed: {e}"}, 502)
103
 
104
+ # ---------- bidirectional WS bridge ----------
105
+ async def _bridge(client_ws: WebSocket, upstream_url: str):
 
 
 
106
  try:
107
  upstream = await websockets.connect(upstream_url, ping_interval=None, max_size=8_000_000)
108
  except Exception as e:
109
  await client_ws.send_json({"event":"error","error":f"Upstream connect failed: {repr(e)}"})
110
+ await client_ws.close(); return
 
111
 
112
  async def c2u():
 
113
  try:
114
  while True:
115
  msg = await client_ws.receive()
 
117
  await upstream.send(msg["bytes"])
118
  elif "text" in msg and msg["text"] is not None:
119
  await upstream.send(msg["text"])
 
 
 
120
  except WebSocketDisconnect:
121
  pass
122
  finally:
123
+ try: await upstream.close()
124
+ except: pass
 
 
125
 
126
  async def u2c():
 
127
  try:
128
  while True:
129
  msg = await upstream.recv()
 
134
  except Exception:
135
  pass
136
  finally:
137
+ try: await client_ws.close()
138
+ except: pass
 
 
139
 
 
140
  await asyncio.gather(c2u(), u2c())
141
 
142
+ @app.websocket("/ws/tts")
143
+ async def ws_tts(ws: WebSocket):
 
144
  await ws.accept()
145
  if not TTS_WS_TARGET:
146
+ await ws.send_json({"event":"error","error":"TTS WS target not configured"}); await ws.close(); return
147
+ await _bridge(ws, TTS_WS_TARGET)
 
148
 
149
+ @app.websocket("/ws/stt")
150
+ async def ws_stt(ws: WebSocket):
151
  await ws.accept()
152
  if not STT_WS_TARGET:
153
+ await ws.send_json({"event":"error","error":"STT WS target not configured"}); await ws.close(); return
154
+ await _bridge(ws, STT_WS_TARGET)
155
+
156
+ @app.websocket("/ws/agent")
157
+ async def ws_agent(ws: WebSocket):
158
+ await ws.accept()
159
+ if not AGENT_WS_TARGET:
160
+ await ws.send_json({"event":"error","error":"Agent WS target not configured"}); await ws.close(); return
161
+ await _bridge(ws, AGENT_WS_TARGET)