Percy3822 commited on
Commit
26cb7ed
·
verified ·
1 Parent(s): 290f34e

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +44 -142
app.py CHANGED
@@ -1,161 +1,63 @@
1
- import os, json, time, asyncio
2
- from typing import Dict, Any
3
- import httpx, websockets
4
- from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Body
5
  from fastapi.middleware.cors import CORSMiddleware
6
- from fastapi.responses import JSONResponse, StreamingResponse
7
 
8
- FILES_DIR = os.environ.get("FILES_DIR", "/tmp/brain_app/files")
9
- TTS_WS_TARGET = os.environ.get("TTS_WS_TARGET", "")
10
- TTS_HTTP_TARGET = os.environ.get("TTS_HTTP_TARGET", "")
11
- STT_WS_TARGET = os.environ.get("STT_WS_TARGET", "")
12
- AGENT_WS_TARGET = os.environ.get("AGENT_WS_TARGET", "")
13
 
14
- os.makedirs(FILES_DIR, exist_ok=True)
 
 
 
15
 
16
- app = FastAPI(title="Brain (Proxy/Orchestrator)")
17
  app.add_middleware(
18
  CORSMiddleware,
19
- allow_origins=[""], allow_methods=[""], allow_headers=["*"], allow_credentials=True,
 
20
  )
21
 
22
- telemetry_subs = set() # Server-Sent Events subscribers
23
- logs_subs = set() # SSE for generic logs/events
24
-
25
- def _sse_pack(d: Dict[str, Any]) -> bytes:
26
- return f"data: {json.dumps(d, ensure_ascii=False)}\n\n".encode("utf-8")
27
-
28
- async def _sse_broadcast(pool, payload):
29
- dead = []
30
- for q in list(pool):
31
- try:
32
- await q.put(payload)
33
- except Exception:
34
- dead.append(q)
35
- for q in dead:
36
- pool.discard(q)
37
-
38
  @app.get("/health")
39
  def health():
40
  return {
41
- "ok": True, "service": "brain",
42
- "tts_ws_target": TTS_WS_TARGET, "tts_http_target": TTS_HTTP_TARGET,
43
- "stt_ws_target": STT_WS_TARGET, "agent_ws_target": AGENT_WS_TARGET,
44
- "files_dir": FILES_DIR, "time": time.time()
 
45
  }
46
 
47
- # ---------- SSE streams ----------
48
- @app.get("/stream/telemetry")
49
- async def stream_telemetry():
50
- q: asyncio.Queue = asyncio.Queue()
51
- telemetry_subs.add(q)
52
- async def gen():
53
- try:
54
- while True:
55
- item = await q.get()
56
- yield item
57
- finally:
58
- telemetry_subs.discard(q)
59
- return StreamingResponse(gen(), media_type="text/event-stream")
60
-
61
- @app.get("/stream/logs")
62
- async def stream_logs():
63
- q: asyncio.Queue = asyncio.Queue()
64
- logs_subs.add(q)
65
- async def gen():
66
- try:
67
- while True:
68
- item = await q.get()
69
- yield item
70
- finally:
71
- logs_subs.discard(q)
72
- return StreamingResponse(gen(), media_type="text/event-stream")
73
-
74
- # ---------- Telemetry ingest ----------
75
- @app.post("/telemetry")
76
- async def telemetry(body: dict = Body(...)):
77
- body["ts"] = time.time()
78
- await _sse_broadcast(telemetry_subs, _sse_pack(body))
79
- return {"ok": True}
80
-
81
- # ---------- Generic log/event ----------
82
- @app.post("/log")
83
- async def log(body: dict = Body(...)):
84
- body["ts"] = time.time()
85
- await _sse_broadcast(logs_subs, _sse_pack(body))
86
- return {"ok": True}
87
-
88
- # ---------- HTTP proxy to TTS /speak ----------
89
- @app.post("/speak")
90
- async def speak_proxy(body: dict = Body(...)):
91
- if not TTS_HTTP_TARGET:
92
- return JSONResponse({"ok": False, "error": "TTS HTTP target not configured"}, 500)
93
- try:
94
- async with httpx.AsyncClient(timeout=120) as client:
95
- r = await client.post(TTS_HTTP_TARGET, json=body)
96
- r.raise_for_status()
97
- out = r.json()
98
- await _sse_broadcast(logs_subs, _sse_pack({"kind":"tts_http", "req": body, "resp": out}))
99
- return JSONResponse(out, r.status_code)
100
- except Exception as e:
101
- await _sse_broadcast(logs_subs, _sse_pack({"kind":"error", "where":"speak_proxy", "error": repr(e)}))
102
- return JSONResponse({"ok": False, "error": f"TTS proxy failed: {e}"}, 502)
103
 
104
- # ---------- bidirectional WS bridge ----------
105
- async def _bridge(client_ws: WebSocket, upstream_url: str):
 
 
106
  try:
107
- upstream = await websockets.connect(upstream_url, ping_interval=None, max_size=8_000_000)
 
 
 
 
 
108
  except Exception as e:
109
- await client_ws.send_json({"event":"error","error":f"Upstream connect failed: {repr(e)}"})
110
- await client_ws.close(); return
111
-
112
- async def c2u():
113
  try:
114
- while True:
115
- msg = await client_ws.receive()
116
- if "bytes" in msg and msg["bytes"] is not None:
117
- await upstream.send(msg["bytes"])
118
- elif "text" in msg and msg["text"] is not None:
119
- await upstream.send(msg["text"])
120
- except WebSocketDisconnect:
121
  pass
122
- finally:
123
- try: await upstream.close()
124
- except: pass
125
-
126
- async def u2c():
127
  try:
128
- while True:
129
- msg = await upstream.recv()
130
- if isinstance(msg, (bytes, bytearray)):
131
- await client_ws.send_bytes(msg)
132
- else:
133
- await client_ws.send_text(msg)
134
- except Exception:
135
- pass
136
- finally:
137
- try: await client_ws.close()
138
- except: pass
139
-
140
- await asyncio.gather(c2u(), u2c())
141
-
142
- @app.websocket("/ws/tts")
143
- async def ws_tts(ws: WebSocket):
144
- await ws.accept()
145
- if not TTS_WS_TARGET:
146
- await ws.send_json({"event":"error","error":"TTS WS target not configured"}); await ws.close(); return
147
- await _bridge(ws, TTS_WS_TARGET)
148
-
149
- @app.websocket("/ws/stt")
150
- async def ws_stt(ws: WebSocket):
151
- await ws.accept()
152
- if not STT_WS_TARGET:
153
- await ws.send_json({"event":"error","error":"STT WS target not configured"}); await ws.close(); return
154
- await _bridge(ws, STT_WS_TARGET)
155
-
156
- @app.websocket("/ws/agent")
157
- async def ws_agent(ws: WebSocket):
158
- await ws.accept()
159
- if not AGENT_WS_TARGET:
160
- await ws.send_json({"event":"error","error":"Agent WS target not configured"}); await ws.close(); return
161
- await _bridge(ws, AGENT_WS_TARGET)
 
1
+ import os, time, json, asyncio
2
+ from typing import Any, Dict
3
+ from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
4
+ from fastapi.responses import JSONResponse
5
  from fastapi.middleware.cors import CORSMiddleware
 
6
 
7
+ BASE_DIR = "/tmp/brain_app"
8
+ FILES_DIR = os.path.join(BASE_DIR, "files")
9
+ LOGS_DIR = os.path.join(FILES_DIR, "logs")
10
+ for d in (BASE_DIR, FILES_DIR, LOGS_DIR):
11
+ os.makedirs(d, exist_ok=True)
12
 
13
+ def log_event(kind: str, data: Dict[str, Any]):
14
+ rec = {"ts": time.time(), "kind": kind, "data": data}
15
+ with open(os.path.join(LOGS_DIR, "events.jsonl"), "a", encoding="utf-8") as f:
16
+ f.write(json.dumps(rec, ensure_ascii=False) + "\n")
17
 
18
+ app = FastAPI(title="Brain (Skeleton)")
19
  app.add_middleware(
20
  CORSMiddleware,
21
+ allow_origins=["*"], allow_credentials=True,
22
+ allow_methods=[""], allow_headers=[""],
23
  )
24
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
25
  @app.get("/health")
26
  def health():
27
  return {
28
+ "ok": True,
29
+ "service": "brain-skeleton",
30
+ "time": time.time(),
31
+ "files_dir": FILES_DIR,
32
+ "logs_dir": LOGS_DIR,
33
  }
34
 
35
+ @app.post("/echo")
36
+ async def echo(request: Request):
37
+ payload = await request.json()
38
+ log_event("echo_http_in", payload)
39
+ out = {"ok": True, "ts": time.time(), "echo": payload}
40
+ log_event("echo_http_out", out)
41
+ return JSONResponse(out)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
42
 
43
+ @app.websocket("/ws/echo")
44
+ async def ws_echo(ws: WebSocket):
45
+ await ws.accept()
46
+ await ws.send_json({"event": "hello", "msg": "echo socket ready"})
47
  try:
48
+ while True:
49
+ text = await ws.receive_text()
50
+ log_event("echo_ws_in", {"text": text})
51
+ await ws.send_json({"event": "echo", "text": text})
52
+ except WebSocketDisconnect:
53
+ pass
54
  except Exception as e:
 
 
 
 
55
  try:
56
+ await ws.send_json({"event": "error", "detail": str(e)})
57
+ except:
 
 
 
 
 
58
  pass
59
+ finally:
 
 
 
 
60
  try:
61
+ await ws.close()
62
+ except:
63
+ pass