Percy3822 commited on
Commit
0bbd045
·
verified ·
1 Parent(s): f772dc7

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +68 -228
app.py CHANGED
@@ -1,233 +1,73 @@
1
- # app.py — Brain (Milestone 1: streaming + bus + telemetry)
2
- import asyncio, json, os, time, uuid
3
- from datetime import datetime, timezone
4
- from typing import AsyncGenerator, Dict, List, Optional
5
-
6
- import httpx
7
- from fastapi import FastAPI, Form, UploadFile, File, Request
8
- from fastapi.middleware.cors import CORSMiddleware
9
- from fastapi.responses import StreamingResponse, JSONResponse
10
-
11
- APP_START = time.time()
12
-
13
- # ---- Config -----------------------------------------------------------------
14
- ACTUAL_TTS = os.getenv("ACTUAL_TTS_URL", "https://Percy3822-ActualTTS.hf.space")
15
- ACTUAL_STT = os.getenv("ACTUAL_STT_URL", "https://Percy3822-ActualSTT.hf.space")
16
- PY_AI = os.getenv("PYTHON_AI_URL", "https://Percy3822-Python-ai.hf.space")
17
- FILES_DIR = os.getenv("FILES_DIR", "/home/user/files")
18
- LOG_DIR = os.path.join(FILES_DIR, "logs")
19
- os.makedirs(LOG_DIR, exist_ok=True)
20
-
21
- # ---- App --------------------------------------------------------------------
22
- app = FastAPI(title="Brain Streaming")
23
- app.add_middleware(
24
- CORSMiddleware,
25
- allow_origins=["*"],
26
- allow_methods=["*"],
27
- allow_headers=["*"],
28
- )
29
-
30
- # ---- Simple pub/sub for SSE -------------------------------------------------
31
- class Bus:
32
- def __init__(self):
33
- self.subs: List[asyncio.Queue] = []
34
-
35
- def sub(self) -> asyncio.Queue:
36
- q = asyncio.Queue()
37
- self.subs.append(q)
38
- return q
39
-
40
- async def pub(self, event: Dict):
41
- # Ensure an iso timestamp and monotonic ms
42
- event.setdefault("ts_iso", datetime.now(timezone.utc).isoformat())
43
- event.setdefault("ts_ms", int(time.time() * 1000))
44
- # log to jsonl
45
- with open(os.path.join(LOG_DIR, "events.jsonl"), "a", encoding="utf-8") as f:
46
- f.write(json.dumps(event, ensure_ascii=False) + "\n")
47
- # fan out
48
- for q in list(self.subs):
49
- try:
50
- q.put_nowait(event)
51
- except asyncio.QueueFull:
52
- pass
53
-
54
- BUS = Bus()
55
-
56
- def _sse_encode(ev: Dict) -> bytes:
57
- return f"data: {json.dumps(ev, ensure_ascii=False)}\n\n".encode("utf-8")
58
-
59
- async def _sse_stream(q: asyncio.Queue) -> AsyncGenerator[bytes, None]:
60
- # Heartbeat to keep connections alive
61
  try:
62
- while True:
63
- try:
64
- ev = await asyncio.wait_for(q.get(), timeout=15)
65
- yield _sse_encode(ev)
66
- except asyncio.TimeoutError:
67
- yield b": hb\n\n"
68
- except asyncio.CancelledError:
69
- return
70
-
71
- @app.get("/events")
72
- async def events():
73
- q = BUS.sub()
74
- return StreamingResponse(_sse_stream(q), media_type="text/event-stream")
75
 
76
- # ---- Health + warmup --------------------------------------------------------
77
- @app.post("/warmup")
78
- async def warmup():
79
- out = {}
80
- async with httpx.AsyncClient(timeout=15) as client:
81
- try:
82
- r = await client.get(f"{ACTUAL_TTS}/health")
83
- out["tts"] = ("ok" if r.status_code == 200 else f"status {r.status_code}")
84
- except Exception as e:
85
- out["tts"] = f"err: {e!r}"
86
- try:
87
- r = await client.get(f"{ACTUAL_STT}/health")
88
- out["stt"] = ("ok" if r.status_code == 200 else f"status {r.status_code}")
89
- except Exception as e:
90
- out["stt"] = f"err: {e!r}"
91
- try:
92
- r = await client.post(f"{PY_AI}/code_help", json={"utterance":"ping"})
93
- out["python_ai"] = "ok" if r.status_code in (200,422) else f"status {r.status_code}"
94
- except Exception as e:
95
- out["python_ai"] = f"err: {e!r}"
96
- return {"ok": True, "notes": out}
97
 
98
  @app.get("/health")
99
  async def health():
100
- up = time.time() - APP_START
101
- return {"ok": True, "uptime_sec": round(up,2)}
102
-
103
- # ---- Telemetry intake -------------------------------------------------------
104
- @app.post("/telemetry")
105
- async def telemetry(payload: Dict):
106
- # payload example: { kind, cpu, mem, active_app, net, meter, etc. }
107
- await BUS.pub({"type":"telemetry", "payload":payload})
108
- return {"ok": True}
109
-
110
- # ---- Streaming NLM (demo) ---------------------------------------------------
111
- @app.post("/stream/nlm")
112
- async def stream_nlm(body: Dict):
113
- """
114
- Body:
115
- { "id": "...(optional)", "prompt": "text to answer", "answer": "(optional: if you want me to stream this string)" }
116
- For now we stream the provided 'answer' (or a canned one) token-by-token to let you wire the UI.
117
- """
118
- req_id = body.get("id") or str(uuid.uuid4())
119
- prompt = body.get("prompt", "")
120
- answer = body.get("answer") or f"Ok! I received your prompt: {prompt[:100]}"
121
-
122
- async def gen():
123
- start = time.time()
124
- await BUS.pub({"type":"nlm_start", "id":req_id, "prompt":prompt})
125
- buf = []
126
- for token in answer.split(" "):
127
- buf.append(token)
128
- chunk = " ".join(buf)
129
- ev = {"type":"nlm_token", "id":req_id, "delta":token, "text":chunk}
130
- yield _sse_encode(ev)
131
- await asyncio.sleep(0.03) # tiny delay so UI shows a stream
132
- dur = time.time() - start
133
- await BUS.pub({"type":"nlm_done", "id":req_id, "duration_ms":int(dur*1000)})
134
- yield _sse_encode({"type":"nlm_done", "id":req_id})
135
- return StreamingResponse(gen(), media_type="text/event-stream")
136
-
137
- # ---- Streaming TTS orchestration (status stream) ----------------------------
138
- @app.post("/stream/tts")
139
- async def stream_tts(body: Dict):
140
- """
141
- Body: { "text": "...", "voice": "en_US-amy-medium", "description": "optional" }
142
- Streams status events; ends with {type: "tts_audio", audio_url: "..."} when ready.
143
- """
144
- req_id = str(uuid.uuid4())
145
- text = body.get("text","").strip()
146
- voice = body.get("voice")
147
- desc = body.get("description")
148
-
149
- async def gen():
150
- start = time.time()
151
- yield _sse_encode({"type":"tts_status","id":req_id,"phase":"queued"})
152
- await BUS.pub({"type":"tts_start","id":req_id,"text":text,"voice":voice})
153
-
154
- try:
155
- async with httpx.AsyncClient(timeout=120) as client:
156
- yield _sse_encode({"type":"tts_status","id":req_id,"phase":"synthesizing"})
157
- r = await client.post(f"{ACTUAL_TTS}/speak", json={"text":text, **({"voice":voice} if voice else {}), **({"description":desc} if desc else {})})
158
- r.raise_for_status()
159
- data = r.json()
160
- if not data.get("ok"):
161
- raise RuntimeError(data.get("error") or "TTS failed")
162
- audio_url = data.get("audio_url")
163
- yield _sse_encode({"type":"tts_audio","id":req_id,"audio_url":audio_url})
164
- dur = time.time() - start
165
- await BUS.pub({"type":"tts_done","id":req_id,"audio_url":audio_url,"duration_ms":int(dur*1000)})
166
- yield _sse_encode({"type":"tts_done","id":req_id})
167
- except Exception as e:
168
- err = {"type":"error","scope":"tts","id":req_id,"msg":repr(e)}
169
- await BUS.pub(err)
170
- yield _sse_encode(err)
171
-
172
- return StreamingResponse(gen(), media_type="text/event-stream")
173
-
174
- # ---- Streaming STT orchestration (status stream) ----------------------------
175
- @app.post("/stream/stt")
176
- async def stream_stt(file: UploadFile = File(...), beam_size: int = 1, vad_filter: bool = True):
177
- """
178
- Uploads one file, streams status, ends with text result.
179
- """
180
- req_id = str(uuid.uuid4())
181
-
182
- async def gen():
183
- start = time.time()
184
- yield _sse_encode({"type":"stt_status","id":req_id,"phase":"received","filename":file.filename})
185
- await BUS.pub({"type":"stt_start","id":req_id,"filename":file.filename})
186
-
187
- try:
188
- # forward to ActualSTT
189
- files = {"file": (file.filename, await file.read(), file.content_type or "audio/wav")}
190
- params = {"beam_size":beam_size, "vad_filter":"true" if vad_filter else "false"}
191
- async with httpx.AsyncClient(timeout=120) as client:
192
- yield _sse_encode({"type":"stt_status","id":req_id,"phase":"processing"})
193
- r = await client.post(f"{ACTUAL_STT}/transcribe", params=params, files=files)
194
- r.raise_for_status()
195
- data = r.json()
196
- text = data.get("text","")
197
- lang = data.get("language","")
198
- dur_a = data.get("duration",0)
199
- yield _sse_encode({"type":"stt_text","id":req_id,"text":text,"language":lang,"audio_duration":dur_a})
200
- dur = time.time() - start
201
- await BUS.pub({"type":"stt_done","id":req_id,"text":text,"duration_ms":int(dur*1000)})
202
- yield _sse_encode({"type":"stt_done","id":req_id})
203
- except Exception as e:
204
- err = {"type":"error","scope":"stt","id":req_id,"msg":repr(e)}
205
- await BUS.pub(err)
206
- yield _sse_encode(err)
207
-
208
- return StreamingResponse(gen(), media_type="text/event-stream")
209
-
210
- # ---- Existing compatibility endpoints (non-stream) --------------------------
211
- @app.post("/speak")
212
- async def speak(body: Dict):
213
- text = body.get("text","")
214
- voice = body.get("voice")
215
- desc = body.get("description")
216
- async with httpx.AsyncClient(timeout=120) as client:
217
- r = await client.post(f"{ACTUAL_TTS}/speak", json={"text":text, **({"voice":voice} if voice else {}), **({"description":desc} if desc else {})})
218
- return JSONResponse(r.json(), status_code=r.status_code)
219
-
220
- @app.post("/transcribe")
221
- async def transcribe(file: UploadFile = File(...), beam_size: int = 1, vad_filter: bool = True):
222
- files = {"file": (file.filename, await file.read(), file.content_type or "audio/wav")}
223
- params = {"beam_size":beam_size, "vad_filter":"true" if vad_filter else "false"}
224
- async with httpx.AsyncClient(timeout=120) as client:
225
- r = await client.post(f"{ACTUAL_STT}/transcribe", params=params, files=files)
226
- return JSONResponse(r.json(), status_code=r.status_code)
227
-
228
- # (Optional) passthrough to python_ai/code_help so your tests still work
229
- @app.post("/code_help")
230
- async def code_help(body: Dict):
231
- async with httpx.AsyncClient(timeout=60) as client:
232
- r = await client.post(f"{PY_AI}/code_help", json=body)
233
- return JSONResponse(r.json(), status_code=r.status_code)
 
1
+ import os
2
+ import time
3
+ import json
4
+ from pathlib import Path
5
+ from fastapi import FastAPI, Request
6
+ from fastapi.responses import JSONResponse, StreamingResponse
7
+
8
+ # ------------------
9
+ # Setup paths
10
+ # ------------------
11
+ BASE_DIR = Path(__file__).parent
12
+ FILES_DIR = BASE_DIR / "files"
13
+ LOG_DIR = FILES_DIR / "logs"
14
+
15
+ FILES_DIR.mkdir(parents=True, exist_ok=True)
16
+ LOG_DIR.mkdir(parents=True, exist_ok=True)
17
+
18
+ # ------------------
19
+ # Safe logger
20
+ # ------------------
21
+ def _safe_append_jsonl(path: Path, event: dict):
22
+ """Safely append JSON event to a file without crashing."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
23
  try:
24
+ path.parent.mkdir(parents=True, exist_ok=True)
25
+ with open(path, "a", encoding="utf-8") as f:
26
+ f.write(json.dumps(event, ensure_ascii=False) + "\n")
27
+ except Exception as e:
28
+ print(f"[WARN] Could not log event: {e}")
 
 
 
 
 
 
 
 
29
 
30
+ # ------------------
31
+ # FastAPI app
32
+ # ------------------
33
+ app = FastAPI(title="Brain", version="0.1")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
34
 
35
  @app.get("/health")
36
  async def health():
37
+ return {
38
+ "ok": True,
39
+ "service": "brain",
40
+ "time": time.time(),
41
+ "files_dir": str(FILES_DIR),
42
+ "logs": str(LOG_DIR),
43
+ }
44
+
45
+ @app.post("/process")
46
+ async def process(request: Request):
47
+ payload = await request.json()
48
+
49
+ event = {
50
+ "ts": time.time(),
51
+ "type": "process",
52
+ "data": payload,
53
+ }
54
+ _safe_append_jsonl(LOG_DIR / "events.jsonl", event)
55
+
56
+ # Echo back
57
+ return JSONResponse({"ok": True, "received": payload})
58
+
59
+ @app.get("/stream/logs")
60
+ async def stream_logs():
61
+ """Stream logs line-by-line to client (SSE style)."""
62
+
63
+ async def event_stream():
64
+ path = LOG_DIR / "events.jsonl"
65
+ if not path.exists():
66
+ yield "data: {}\n\n"
67
+ return
68
+
69
+ with open(path, "r", encoding="utf-8") as f:
70
+ for line in f:
71
+ yield f"data: {line.strip()}\n\n"
72
+
73
+ return StreamingResponse(event_stream(), media_type="text/event-stream")