Percy3822 commited on
Commit
18bb6e6
·
verified ·
1 Parent(s): c59dbf3

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +141 -62
app.py CHANGED
@@ -1,50 +1,68 @@
1
- # app.py
2
- import json, time, os
3
  from pathlib import Path
4
- from fastapi import FastAPI, Request
5
- from fastapi.responses import JSONResponse, StreamingResponse
6
 
7
- app = FastAPI(title="Brain", version="0.1.0")
 
 
8
 
9
- # -----------------------------
10
- # Writable paths (with fallback)
11
- # -----------------------------
12
- def _mk_dir(path: Path) -> Path:
13
- try:
14
- path.mkdir(parents=True, exist_ok=True)
15
- return path
16
- except Exception as e:
17
- # fallback to /tmp which is always writable on HF Spaces
18
- fallback = Path("/tmp/brain_app/files") if "files" in str(path) else Path("/tmp/brain_app")
19
- try:
20
- fallback.mkdir(parents=True, exist_ok=True)
21
- except Exception:
22
- pass
23
- print(f"[WARN] Could not create {path} ({e}); using {fallback}")
24
- return fallback
25
-
26
- # Allow overrides via env, else use /tmp
27
- BASE_DIR = Path(os.getenv("BRAIN_BASE_DIR", "/tmp/brain_app"))
28
- FILES_DIR = _mk_dir(Path(os.getenv("FILES_DIR", str(BASE_DIR / "files"))))
29
- LOG_DIR = _mk_dir(Path(os.getenv("LOG_DIR", str(FILES_DIR / "logs"))))
30
-
31
- EVENTS_FILE = LOG_DIR / "events.jsonl"
32
- ERRORS_FILE = LOG_DIR / "errors.jsonl"
33
-
34
- # -----------------------------
35
- # Safe file append
36
- # -----------------------------
37
- def _append_jsonl(path: Path, obj: dict):
38
  try:
39
- path.parent.mkdir(parents=True, exist_ok=True)
40
  with path.open("a", encoding="utf-8") as f:
41
  f.write(json.dumps(obj, ensure_ascii=False) + "\n")
42
- except Exception as e:
43
- print(f"[WARN] log write failed: {e}")
 
 
 
 
 
 
 
 
 
 
44
 
45
- # -----------------------------
46
- # Endpoints
47
- # -----------------------------
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
48
  @app.get("/health")
49
  def health():
50
  return {
@@ -53,40 +71,101 @@ def health():
53
  "time": time.time(),
54
  "base_dir": str(BASE_DIR),
55
  "files_dir": str(FILES_DIR),
56
- "logs_dir": str(LOG_DIR),
57
  "events_file": str(EVENTS_FILE),
58
  }
59
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
60
  @app.post("/process")
61
- async def process(req: Request):
62
- payload = await req.json()
63
  evt = {"ts": time.time(), "type": "process", "data": payload}
64
- _append_jsonl(EVENTS_FILE, evt)
 
65
  return {"ok": True, "received": payload}
66
 
 
67
  @app.post("/log_error")
68
- async def log_error(req: Request):
69
- payload = await req.json()
70
- err = {"ts": time.time(), "type": "error", "data": payload}
71
- _append_jsonl(ERRORS_FILE, err)
72
- _append_jsonl(EVENTS_FILE, {"ts": err["ts"], "type": "error_ref"})
73
  return {"ok": True}
74
 
75
- def _sse_from_file(path: Path):
76
- # generator for SSE streaming of an existing file (no tailing daemon)
77
- def gen():
78
- if not path.exists():
79
- yield "data: {}\n\n"
80
- return
81
- with path.open("r", encoding="utf-8") as f:
82
- for line in f:
83
- yield f"data: " + line.strip() + "\n\n"
84
- return gen
85
-
86
  @app.get("/stream/logs")
87
  def stream_logs():
88
- return StreamingResponse(_sse_from_file(EVENTS_FILE)(), media_type="text/event-stream")
89
 
90
  @app.get("/stream/errors")
91
  def stream_errors():
92
- return StreamingResponse(_sse_from_file(ERRORS_FILE)(), media_type="text/event-stream")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os, json, time, uuid, io
 
2
  from pathlib import Path
3
+ from typing import Dict, Any, AsyncIterator
4
+ from queue import Queue, Empty
5
 
6
+ import httpx
7
+ from fastapi import FastAPI, File, UploadFile, Request, Response
8
+ from fastapi.responses import StreamingResponse, JSONResponse
9
 
10
+ # ---------- CONFIG ----------
11
+ BASE_DIR = Path(os.getenv("BRAIN_BASE_DIR", "/tmp/brain_app"))
12
+ FILES_DIR = BASE_DIR / "files"
13
+ LOGS_DIR = FILES_DIR / "logs"
14
+ FILES_DIR.mkdir(parents=True, exist_ok=True)
15
+ LOGS_DIR.mkdir(parents=True, exist_ok=True)
16
+
17
+ EVENTS_FILE = LOGS_DIR / "events.jsonl"
18
+ ERRORS_FILE = LOGS_DIR / "errors.jsonl"
19
+ TELEM_FILE = LOGS_DIR / "telemetry.jsonl"
20
+
21
+ ACTUAL_TTS = os.getenv("ACTUAL_TTS", "https://Percy3822-ActualTTS.hf.space")
22
+ ACTUAL_STT = os.getenv("ACTUAL_STT", "https://Percy3822-ActualSTT.hf.space")
23
+ PY_AI = os.getenv("PY_AI", "https://Percy3822-Python-ai.hf.space")
24
+
25
+ # ---------- APP ----------
26
+ app = FastAPI(title="Brain")
27
+
28
+ # simple fanout queues for SSE
29
+ _log_subs, _err_subs, _telem_subs = set(), set(), set()
30
+
31
+ def _append(path: Path, obj: Dict[str, Any]):
 
 
 
 
 
 
 
32
  try:
 
33
  with path.open("a", encoding="utf-8") as f:
34
  f.write(json.dumps(obj, ensure_ascii=False) + "\n")
35
+ except Exception:
36
+ pass
37
+
38
+ def _broadcast(subs, event):
39
+ dead = []
40
+ for q in list(subs):
41
+ try:
42
+ q.put_nowait(event)
43
+ except Exception:
44
+ dead.append(q)
45
+ for q in dead:
46
+ subs.discard(q)
47
 
48
+ def _sse_stream(subs) -> StreamingResponse:
49
+ def make() -> AsyncIterator[str]:
50
+ q: Queue = Queue(maxsize=1000)
51
+ subs.add(q)
52
+ try:
53
+ yield f"data: {json.dumps({'ts': time.time(), 'hello': True})}\n\n"
54
+ while True:
55
+ try:
56
+ item = q.get(timeout=30)
57
+ except Empty:
58
+ yield "data: {}\n\n" # keepalive
59
+ continue
60
+ yield f"data: {json.dumps(item)}\n\n"
61
+ finally:
62
+ subs.discard(q)
63
+ return StreamingResponse(make(), media_type="text/event-stream")
64
+
65
+ # ---------- HEALTH ----------
66
  @app.get("/health")
67
  def health():
68
  return {
 
71
  "time": time.time(),
72
  "base_dir": str(BASE_DIR),
73
  "files_dir": str(FILES_DIR),
74
+ "logs_dir": str(LOGS_DIR),
75
  "events_file": str(EVENTS_FILE),
76
  }
77
 
78
+ # ---------- WARMUP ----------
79
+ @app.post("/warmup")
80
+ async def warmup():
81
+ notes = {}
82
+ async with httpx.AsyncClient(timeout=20) as s:
83
+ try:
84
+ r = await s.get(f"{PY_AI}/health")
85
+ notes["python_ai"] = "ok" if r.status_code == 200 else f"status {r.status_code}"
86
+ except Exception:
87
+ notes["python_ai"] = "down"
88
+ try:
89
+ r = await s.get(f"{ACTUAL_TTS}/health")
90
+ notes["tts"] = "ok" if r.status_code == 200 else f"status {r.status_code}"
91
+ except Exception:
92
+ notes["tts"] = "down"
93
+ try:
94
+ r = await s.get(f"{ACTUAL_STT}/health")
95
+ notes["stt"] = "ok" if r.status_code == 200 else f"status {r.status_code}"
96
+ except Exception:
97
+ notes["stt"] = "down"
98
+ return {"ok": True, "notes": notes}
99
+
100
+ # ---------- GENERIC PROCESS (demo) ----------
101
  @app.post("/process")
102
+ async def process(payload: Dict[str, Any], request: Request):
 
103
  evt = {"ts": time.time(), "type": "process", "data": payload}
104
+ _append(EVENTS_FILE, evt)
105
+ _broadcast(_log_subs, evt)
106
  return {"ok": True, "received": payload}
107
 
108
+ # ---------- ERRORS ----------
109
  @app.post("/log_error")
110
+ async def log_error(payload: Dict[str, Any], request: Request):
111
+ evt = {"ts": time.time(), "type": "error", "data": payload}
112
+ _append(ERRORS_FILE, evt)
113
+ _broadcast(_err_subs, evt)
 
114
  return {"ok": True}
115
 
 
 
 
 
 
 
 
 
 
 
 
116
  @app.get("/stream/logs")
117
  def stream_logs():
118
+ return _sse_stream(_log_subs)
119
 
120
  @app.get("/stream/errors")
121
  def stream_errors():
122
+ return _sse_stream(_err_subs)
123
+
124
+ # ---------- TELEMETRY ----------
125
+ @app.post("/telemetry")
126
+ async def ingest_telemetry(payload: Dict[str, Any], request: Request):
127
+ evt = {
128
+ "ts": time.time(),
129
+ "type": "telemetry",
130
+ "client_ip": request.client.host if request and request.client else None,
131
+ "data": payload,
132
+ }
133
+ _append(TELEM_FILE, evt)
134
+ _broadcast(_telem_subs, evt)
135
+ return {"ok": True}
136
+
137
+ @app.get("/stream/telemetry")
138
+ def stream_telemetry():
139
+ return _sse_stream(_telem_subs)
140
+
141
+ # ---------- PYTHON AI PROXY ----------
142
+ @app.post("/code_help")
143
+ async def code_help_proxy(payload: Dict[str, Any]):
144
+ async with httpx.AsyncClient(timeout=60) as s:
145
+ r = await s.post(f"{PY_AI}/code_help", json=payload)
146
+ return JSONResponse(status_code=r.status_code, content=r.json())
147
+
148
+ # ---------- SPEAK (TTS) ----------
149
+ @app.post("/speak")
150
+ async def speak(payload: Dict[str, Any]):
151
+ # payload: { text, voice? }
152
+ async with httpx.AsyncClient(timeout=120) as s:
153
+ r = await s.post(f"{ACTUAL_TTS}/speak", json=payload)
154
+ data = r.json()
155
+ # Always return fully-qualified URL
156
+ if "audio_url" in data:
157
+ url = data["audio_url"]
158
+ if url.startswith("/"):
159
+ data["audio_url"] = f"{ACTUAL_TTS}{url}"
160
+ elif url.startswith("http") is False:
161
+ data["audio_url"] = f"{ACTUAL_TTS}/{url.lstrip('/')}"
162
+ return JSONResponse(status_code=r.status_code, content=data)
163
+
164
+ # ---------- TRANSCRIBE (STT) ----------
165
+ @app.post("/transcribe")
166
+ async def transcribe(file: UploadFile = File(...)):
167
+ raw = await file.read()
168
+ files = {"file": (file.filename, raw, file.content_type or "audio/wav")}
169
+ async with httpx.AsyncClient(timeout=120) as s:
170
+ r = await s.post(f"{ACTUAL_STT}/transcribe", files=files)
171
+ return JSONResponse(status_code=r.status_code, content=r.json())