Percy3822 commited on
Commit
79405ca
·
verified ·
1 Parent(s): 4ee14aa

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +50 -130
app.py CHANGED
@@ -1,68 +1,31 @@
1
- import os, json, time, uuid, io
 
 
 
2
  from pathlib import Path
3
- from typing import Dict, Any, AsyncIterator
4
- from queue import Queue, Empty
5
 
6
- import httpx
7
- from fastapi import FastAPI, File, UploadFile, Request, Response
8
- from fastapi.responses import StreamingResponse, JSONResponse
9
 
10
- # ---------- CONFIG ----------
11
- BASE_DIR = Path(os.getenv("BRAIN_BASE_DIR", "/tmp/brain_app"))
12
  FILES_DIR = BASE_DIR / "files"
13
  LOGS_DIR = FILES_DIR / "logs"
14
  FILES_DIR.mkdir(parents=True, exist_ok=True)
15
  LOGS_DIR.mkdir(parents=True, exist_ok=True)
16
 
17
  EVENTS_FILE = LOGS_DIR / "events.jsonl"
18
- ERRORS_FILE = LOGS_DIR / "errors.jsonl"
19
  TELEM_FILE = LOGS_DIR / "telemetry.jsonl"
20
 
21
- ACTUAL_TTS = os.getenv("ACTUAL_TTS", "https://Percy3822-ActualTTS.hf.space")
22
- ACTUAL_STT = os.getenv("ACTUAL_STT", "https://Percy3822-ActualSTT.hf.space")
23
- PY_AI = os.getenv("PY_AI", "https://Percy3822-Python-ai.hf.space")
24
-
25
- # ---------- APP ----------
26
  app = FastAPI(title="Brain")
 
27
 
28
- # simple fanout queues for SSE
29
- _log_subs, _err_subs, _telem_subs = set(), set(), set()
30
-
31
- def _append(path: Path, obj: Dict[str, Any]):
32
- try:
33
- with path.open("a", encoding="utf-8") as f:
34
- f.write(json.dumps(obj, ensure_ascii=False) + "\n")
35
- except Exception:
36
- pass
37
-
38
- def _broadcast(subs, event):
39
- dead = []
40
- for q in list(subs):
41
- try:
42
- q.put_nowait(event)
43
- except Exception:
44
- dead.append(q)
45
- for q in dead:
46
- subs.discard(q)
47
-
48
- def _sse_stream(subs) -> StreamingResponse:
49
- def make() -> AsyncIterator[str]:
50
- q: Queue = Queue(maxsize=1000)
51
- subs.add(q)
52
- try:
53
- yield f"data: {json.dumps({'ts': time.time(), 'hello': True})}\n\n"
54
- while True:
55
- try:
56
- item = q.get(timeout=30)
57
- except Empty:
58
- yield "data: {}\n\n" # keepalive
59
- continue
60
- yield f"data: {json.dumps(item)}\n\n"
61
- finally:
62
- subs.discard(q)
63
- return StreamingResponse(make(), media_type="text/event-stream")
64
 
65
- # ---------- HEALTH ----------
66
  @app.get("/health")
67
  def health():
68
  return {
@@ -73,99 +36,56 @@ def health():
73
  "files_dir": str(FILES_DIR),
74
  "logs_dir": str(LOGS_DIR),
75
  "events_file": str(EVENTS_FILE),
 
76
  }
77
 
78
- # ---------- WARMUP ----------
79
- @app.post("/warmup")
80
- async def warmup():
81
- notes = {}
82
- async with httpx.AsyncClient(timeout=20) as s:
83
- try:
84
- r = await s.get(f"{PY_AI}/health")
85
- notes["python_ai"] = "ok" if r.status_code == 200 else f"status {r.status_code}"
86
- except Exception:
87
- notes["python_ai"] = "down"
88
- try:
89
- r = await s.get(f"{ACTUAL_TTS}/health")
90
- notes["tts"] = "ok" if r.status_code == 200 else f"status {r.status_code}"
91
- except Exception:
92
- notes["tts"] = "down"
93
- try:
94
- r = await s.get(f"{ACTUAL_STT}/health")
95
- notes["stt"] = "ok" if r.status_code == 200 else f"status {r.status_code}"
96
- except Exception:
97
- notes["stt"] = "down"
98
- return {"ok": True, "notes": notes}
99
-
100
- # ---------- GENERIC PROCESS (demo) ----------
101
  @app.post("/process")
102
- async def process(payload: Dict[str, Any], request: Request):
103
- evt = {"ts": time.time(), "type": "process", "data": payload}
104
- _append(EVENTS_FILE, evt)
105
- _broadcast(_log_subs, evt)
106
  return {"ok": True, "received": payload}
107
 
108
- # ---------- ERRORS ----------
109
  @app.post("/log_error")
110
- async def log_error(payload: Dict[str, Any], request: Request):
111
- evt = {"ts": time.time(), "type": "error", "data": payload}
112
- _append(ERRORS_FILE, evt)
113
- _broadcast(_err_subs, evt)
114
  return {"ok": True}
115
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
116
  @app.get("/stream/logs")
117
  def stream_logs():
118
- return _sse_stream(_log_subs)
119
 
120
  @app.get("/stream/errors")
121
  def stream_errors():
122
- return _sse_stream(_err_subs)
123
-
124
- # ---------- TELEMETRY ----------
125
- @app.post("/telemetry")
126
- async def ingest_telemetry(payload: Dict[str, Any], request: Request):
127
- evt = {
128
- "ts": time.time(),
129
- "type": "telemetry",
130
- "client_ip": request.client.host if request and request.client else None,
131
- "data": payload,
132
- }
133
- _append(TELEM_FILE, evt)
134
- _broadcast(_telem_subs, evt)
135
- return {"ok": True}
136
 
137
  @app.get("/stream/telemetry")
138
  def stream_telemetry():
139
- return _sse_stream(_telem_subs)
140
-
141
- # ---------- PYTHON AI PROXY ----------
142
- @app.post("/code_help")
143
- async def code_help_proxy(payload: Dict[str, Any]):
144
- async with httpx.AsyncClient(timeout=60) as s:
145
- r = await s.post(f"{PY_AI}/code_help", json=payload)
146
- return JSONResponse(status_code=r.status_code, content=r.json())
147
-
148
- # ---------- SPEAK (TTS) ----------
149
- @app.post("/speak")
150
- async def speak(payload: Dict[str, Any]):
151
- # payload: { text, voice? }
152
- async with httpx.AsyncClient(timeout=120) as s:
153
- r = await s.post(f"{ACTUAL_TTS}/speak", json=payload)
154
- data = r.json()
155
- # Always return fully-qualified URL
156
- if "audio_url" in data:
157
- url = data["audio_url"]
158
- if url.startswith("/"):
159
- data["audio_url"] = f"{ACTUAL_TTS}{url}"
160
- elif url.startswith("http") is False:
161
- data["audio_url"] = f"{ACTUAL_TTS}/{url.lstrip('/')}"
162
- return JSONResponse(status_code=r.status_code, content=data)
163
 
164
- # ---------- TRANSCRIBE (STT) ----------
165
- @app.post("/transcribe")
166
- async def transcribe(file: UploadFile = File(...)):
167
- raw = await file.read()
168
- files = {"file": (file.filename, raw, file.content_type or "audio/wav")}
169
- async with httpx.AsyncClient(timeout=120) as s:
170
- r = await s.post(f"{ACTUAL_STT}/transcribe", files=files)
171
- return JSONResponse(status_code=r.status_code, content=r.json())
 
1
+ import os
2
+ import json
3
+ import time
4
+ import asyncio
5
  from pathlib import Path
6
+ from typing import Dict, Any
 
7
 
8
+ from fastapi import FastAPI, Body, Request
9
+ from fastapi.responses import JSONResponse, StreamingResponse
10
+ from fastapi.staticfiles import StaticFiles
11
 
12
+ BASE_DIR = Path(os.environ.get("BASE_DIR", "/tmp/brain_app"))
 
13
  FILES_DIR = BASE_DIR / "files"
14
  LOGS_DIR = FILES_DIR / "logs"
15
  FILES_DIR.mkdir(parents=True, exist_ok=True)
16
  LOGS_DIR.mkdir(parents=True, exist_ok=True)
17
 
18
  EVENTS_FILE = LOGS_DIR / "events.jsonl"
 
19
  TELEM_FILE = LOGS_DIR / "telemetry.jsonl"
20
 
 
 
 
 
 
21
  app = FastAPI(title="Brain")
22
+ app.mount("/files", StaticFiles(directory=str(FILES_DIR)), name="files")
23
 
24
+ def append_jsonl(path: Path, obj: Dict[str, Any]):
25
+ obj = {"ts": time.time(), **obj}
26
+ with path.open("a", encoding="utf-8") as f:
27
+ f.write(json.dumps(obj, ensure_ascii=False) + "\n")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28
 
 
29
  @app.get("/health")
30
  def health():
31
  return {
 
36
  "files_dir": str(FILES_DIR),
37
  "logs_dir": str(LOGS_DIR),
38
  "events_file": str(EVENTS_FILE),
39
+ "telemetry_file": str(TELEM_FILE),
40
  }
41
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
42
  @app.post("/process")
43
+ async def process(payload: Dict[str, Any] = Body(...)):
44
+ append_jsonl(EVENTS_FILE, {"type": "process", "data": payload})
 
 
45
  return {"ok": True, "received": payload}
46
 
 
47
  @app.post("/log_error")
48
+ async def log_error(payload: Dict[str, Any] = Body(...)):
49
+ append_jsonl(EVENTS_FILE, {"type": "error", "data": payload})
 
 
50
  return {"ok": True}
51
 
52
+ @app.post("/telemetry")
53
+ async def telemetry(payload: Dict[str, Any] = Body(...)):
54
+ append_jsonl(TELEM_FILE, {"type": "telemetry", "data": payload})
55
+ return {"ok": True}
56
+
57
+ def sse_lines(path: Path, event_filter: str | None = None):
58
+ # Tail-like SSE; yields lines as they are appended
59
+ pos = 0
60
+ while True:
61
+ try:
62
+ with path.open("r", encoding="utf-8") as f:
63
+ f.seek(pos)
64
+ for line in f:
65
+ pos = f.tell()
66
+ try:
67
+ obj = json.loads(line.strip())
68
+ if event_filter and obj.get("type") != event_filter:
69
+ continue
70
+ yield f"data: {json.dumps(obj, ensure_ascii=False)}\n\n"
71
+ except Exception:
72
+ continue
73
+ except FileNotFoundError:
74
+ pass
75
+ time.sleep(0.5)
76
+
77
  @app.get("/stream/logs")
78
  def stream_logs():
79
+ return StreamingResponse(sse_lines(EVENTS_FILE, None), media_type="text/event-stream")
80
 
81
  @app.get("/stream/errors")
82
  def stream_errors():
83
+ return StreamingResponse(sse_lines(EVENTS_FILE, "error"), media_type="text/event-stream")
 
 
 
 
 
 
 
 
 
 
 
 
 
84
 
85
  @app.get("/stream/telemetry")
86
  def stream_telemetry():
87
+ return StreamingResponse(sse_lines(TELEM_FILE, "telemetry"), media_type="text/event-stream")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88
 
89
+ @app.get("/")
90
+ def index():
91
+ return {"ok": True, "service": "brain", "streams": ["/stream/logs", "/stream/telemetry"]}