Brain / app.py
Percy3822's picture
Update app.py
26cb7ed verified
raw
history blame
1.91 kB
import os, time, json, asyncio
from typing import Any, Dict
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
# Filesystem layout for the service. Everything lives under BASE_DIR:
#   BASE_DIR/files        -> FILES_DIR
#   BASE_DIR/files/logs   -> LOGS_DIR (event log written by log_event)
BASE_DIR = "/tmp/brain_app"
FILES_DIR = os.path.join(BASE_DIR, "files")
LOGS_DIR = os.path.join(FILES_DIR, "logs")

# Make sure the whole tree exists at import time; already-existing
# directories are left alone (exist_ok=True).
for d in [BASE_DIR, FILES_DIR, LOGS_DIR]:
    os.makedirs(d, exist_ok=True)
def log_event(kind: str, data: Dict[str, Any]):
    """Append one event record to ``events.jsonl`` inside ``LOGS_DIR``.

    Each record is written as a single JSON line with three keys:
    ``ts`` (current ``time.time()``), ``kind`` and ``data``.

    Args:
        kind: Short label identifying the event type.
        data: Payload stored under the ``data`` key; must be serializable
            by ``json.dumps``.
    """
    entry = {"ts": time.time(), "kind": kind, "data": data}
    log_path = os.path.join(LOGS_DIR, "events.jsonl")
    # Append mode keeps earlier events; ensure_ascii=False stores non-ASCII
    # text as-is, which is why the file is opened explicitly as UTF-8.
    with open(log_path, "a", encoding="utf-8") as log_file:
        serialized = json.dumps(entry, ensure_ascii=False)
        log_file.write(serialized + "\n")
# ASGI application object; the route decorators below register against it.
app = FastAPI(title="Brain (Skeleton)")

# Fully permissive CORS so any front end can call the service.
# The method/header lists must use the "*" wildcard: an empty-string entry
# names no real HTTP method or header, so browser preflight requests would
# be rejected.
# NOTE(review): allow_credentials=True together with a wildcard origin is
# very permissive -- confirm this is intended before exposing the service
# beyond development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/health")
def health():
    """Report a basic liveness payload.

    Returns:
        A dict with ``ok`` set to True, the fixed service name, the current
        ``time.time()`` value, and the configured files/logs directories.
    """
    status = {
        "ok": True,
        "service": "brain-skeleton",
        "time": time.time(),
        "files_dir": FILES_DIR,
        "logs_dir": LOGS_DIR,
    }
    return status
@app.post("/echo")
async def echo(request: Request):
    """Echo the JSON request body back to the caller.

    The parsed payload is logged as ``echo_http_in`` and the response as
    ``echo_http_out`` before it is returned.

    Args:
        request: Incoming HTTP request whose body is expected to be JSON.

    Returns:
        A ``JSONResponse`` with ``ok``, ``ts`` and ``echo`` (the parsed
        payload) on success, or a 400 response with ``ok`` set to False when
        the body cannot be parsed as JSON.
    """
    try:
        payload = await request.json()
    except ValueError as exc:
        # json.JSONDecodeError (empty or malformed body) is a ValueError
        # subclass; answer with a client error instead of an unhandled 500.
        return JSONResponse(
            {"ok": False, "ts": time.time(), "error": "invalid JSON body", "detail": str(exc)},
            status_code=400,
        )

    log_event("echo_http_in", payload)
    out = {"ok": True, "ts": time.time(), "echo": payload}
    log_event("echo_http_out", out)
    return JSONResponse(out)
@app.websocket("/ws/echo")
async def ws_echo(ws: WebSocket):
    """Echo every text frame received on the socket back to the client.

    After accepting the connection a ``hello`` event is sent. Each received
    text message is logged as ``echo_ws_in`` and returned in an ``echo``
    event. The loop ends when the client disconnects or an error occurs; on
    error an ``error`` event is attempted before the socket is closed.

    Args:
        ws: The WebSocket connection handed in by the framework.
    """
    await ws.accept()
    await ws.send_json({"event": "hello", "msg": "echo socket ready"})
    try:
        while True:
            text = await ws.receive_text()
            log_event("echo_ws_in", {"text": text})
            await ws.send_json({"event": "echo", "text": text})
    except WebSocketDisconnect:
        # Normal termination: the client went away.
        pass
    except Exception as e:
        # Best-effort error report; the socket may already be unusable.
        # Catch Exception rather than using a bare except so task
        # cancellation and interpreter-exit signals still propagate.
        try:
            await ws.send_json({"event": "error", "detail": str(e)})
        except Exception:
            pass
    finally:
        # Best-effort close; closing a socket that is already closed may
        # raise, which is deliberately ignored here.
        try:
            await ws.close()
        except Exception:
            pass