# NishithP2004's picture
# Upload folder using huggingface_hub
# cfe6321 verified
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
FastAPI application for the Neurocaster Env Environment.
This module creates an HTTP server that exposes the NeurocasterEnvironment
over HTTP and WebSocket endpoints, compatible with EnvClient.
Endpoints:
- POST /reset: Reset the environment
- POST /step: Execute an action
- GET /state: Get current environment state
- GET /schema: Get action/observation schemas
- WS /ws: WebSocket endpoint for persistent sessions
Usage:
# Development (with auto-reload):
uvicorn server.app:app --reload --host 0.0.0.0 --port 8000
# Production:
uvicorn server.app:app --host 0.0.0.0 --port 8000 --workers 4
# Or run directly:
python -m server.app
"""
from __future__ import annotations
import csv
import json
import math
import os
import subprocess
import sys
import threading
import time
from collections import deque
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional
from fastapi import FastAPI, HTTPException, Query, Request
from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
try:
from openenv.core.env_server.http_server import create_app
except Exception as e: # pragma: no cover
raise ImportError(
"openenv is required for the web interface. Install dependencies with '\n uv sync\n'"
) from e
try:
from ..models import NeurocasterAction, NeurocasterObservation
from .neurocaster_env_environment import NeurocasterEnvironment
from .tribe_reward import TribeReward
except ImportError:
from models import NeurocasterAction, NeurocasterObservation
from server.neurocaster_env_environment import NeurocasterEnvironment
from server.tribe_reward import TribeReward
# Directory one level above the directory that contains this file.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
# "static" folder located next to this module; holds the custom web UI files.
STATIC_DIR = Path(__file__).resolve().parent / "static"
# Held by train() while it checks for and launches the training subprocess.
_training_lock = threading.Lock()
# Handle of the most recently launched training subprocess; None until one is started.
_training_process: Optional[subprocess.Popen[bytes]] = None
# Held by _generate_markdown() while it consults or refills the chat model cache.
_chat_lock = threading.Lock()
# Cached "model" and "tokenizer" plus the "key" (resolved checkpoint path) they were loaded from.
_chat_cache: Dict[str, Any] = {}
class TrainRequest(BaseModel):
    """Request body accepted by POST /api/train."""

    # Forwarded to the training script as the value of --model.
    model_name: str = Field(default="unsloth/Qwen2.5-1.5B-Instruct-bnb-4bit", min_length=1)
    # Forwarded to the training script as the value of --max-steps.
    max_episodes: int = Field(default=50, ge=1)
class ChatRequest(BaseModel):
    """Request body accepted by POST /api/chat."""

    # User request that is embedded into the generation prompt.
    prompt: str = Field(min_length=1)
    # Passed to model.generate() as max_new_tokens.
    max_new_tokens: int = Field(default=2048, ge=32, le=4096)
    # Sampling is enabled only when this is greater than zero.
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)
# Create the stock OpenEnv app, then mount it behind the custom NeuroCaster UI.
openenv_app = create_app(
    NeurocasterEnvironment,
    NeurocasterAction,
    NeurocasterObservation,
    env_name="neurocaster_env",
    max_concurrent_envs=1,  # increase this number to allow more concurrent WebSocket sessions
)
# Outer application that owns the /api and /web routes defined in this module.
app = FastAPI(title="NeuroCaster")
# Expose the OpenEnv app under an explicit /openenv prefix.
app.mount("/openenv", openenv_app)
@app.get("/api/telemetry")
def telemetry(n: int = Query(default=200, ge=1, le=5000)) -> List[Dict[str, Any]]:
    """Return the newest training telemetry rows in normalized form.

    The JSONL telemetry log is the primary source. When it yields no rows,
    the reward_curves.csv file in the training output directory is used
    instead. Each row's 1-based position is passed along as the fallback
    episode number.

    Args:
        n: Maximum number of trailing rows to return.

    Returns:
        The normalized rows, oldest first.
    """
    raw_rows = _read_jsonl_tail(_telemetry_path(), n)
    if not raw_rows:
        csv_path = _training_output_dir() / "reward_curves.csv"
        raw_rows = _read_reward_csv_tail(csv_path, n)

    normalized: List[Dict[str, Any]] = []
    for position, raw in enumerate(raw_rows, start=1):
        normalized.append(_normalize_telemetry_row(raw, position))
    return normalized
@app.post("/api/train")
def train(request: Request, payload: TrainRequest) -> Dict[str, Any]:
    """Start real GRPO training as a background subprocess.

    Only one training process is tracked at a time. If the tracked process
    is still running, nothing is launched and an "already_running" payload
    is returned. Otherwise the previous run's telemetry file and reward CSV
    are removed, the training script is started with stdout and stderr
    redirected to training_stdout.log, and the chat model cache is reset.

    Args:
        request: Incoming request, used to work out the loopback base URL
            handed to the training script.
        payload: Model name and step budget for the run.

    Returns:
        A dict with "ok", "status" and "pid", plus the telemetry path and,
        for a fresh start, the echoed settings and output directory.
    """
    global _training_process, _chat_cache
    with _training_lock:
        if _training_process and _training_process.poll() is None:
            return {
                "ok": False,
                "status": "already_running",
                "pid": _training_process.pid,
                "telemetry_path": str(_telemetry_path()),
            }

        output_dir = _training_output_dir()
        output_dir.mkdir(parents=True, exist_ok=True)
        telemetry_path = _telemetry_path()
        telemetry_path.parent.mkdir(parents=True, exist_ok=True)

        # Remove artifacts from the previous run. missing_ok avoids the
        # check-then-delete race of exists() followed by unlink().
        telemetry_path.unlink(missing_ok=True)
        log_path = output_dir / "training_stdout.log"
        reward_curve_path = output_dir / "reward_curves.csv"
        reward_curve_path.unlink(missing_ok=True)

        # Put the project root and its parent first on the child's PYTHONPATH,
        # keeping any value that was already set.
        env = os.environ.copy()
        pythonpath_parts = [str(PROJECT_ROOT.parent), str(PROJECT_ROOT)]
        if env.get("PYTHONPATH"):
            pythonpath_parts.append(env["PYTHONPATH"])
        env["PYTHONPATH"] = os.pathsep.join(pythonpath_parts)

        command = [
            sys.executable,
            "-u",  # unbuffered: tracebacks flush to training_stdout.log immediately
            str(PROJECT_ROOT / "train_grpo_colab.py"),
            "--base-url",
            _resolve_training_base_url(request),
            "--model",
            payload.model_name,
            "--output-dir",
            str(output_dir),
            "--max-steps",
            str(payload.max_episodes),
            "--dataset-size",
            os.getenv("NEUROCASTER_TRAINING_DATASET_SIZE", "32"),
            "--telemetry-log",
            str(telemetry_path),
        ]

        # The child inherits the log file descriptor, so the parent's handle
        # can be closed as soon as Popen returns (or fails).
        with log_path.open("wb") as stdout:
            _training_process = subprocess.Popen(
                command,
                cwd=PROJECT_ROOT,
                env=env,
                stdout=stdout,
                stderr=subprocess.STDOUT,
            )

        # Drop any cached chat model so the next chat reloads a checkpoint.
        _chat_cache = {}
        return {
            "ok": True,
            "status": "started",
            "pid": _training_process.pid if _training_process else None,
            "model_name": payload.model_name,
            "max_episodes": payload.max_episodes,
            "telemetry_path": str(telemetry_path),
            "output_dir": str(output_dir),
        }
@app.get("/api/train/status")
def train_status() -> Dict[str, Any]:
    """Report the state of the tracked training subprocess for UI polling.

    Returns:
        A dict with "status" ("idle", "running", "completed" or "failed"),
        "running", "pid", "exit_code", "telemetry_path" and "output_dir".
        "completed" means the process exited with code 0; any other exit
        code is reported as "failed".
    """
    process = _training_process
    # Start from the idle shape; the branches below only overwrite values,
    # so the key order of the response stays fixed.
    snapshot: Dict[str, Any] = {
        "status": "idle",
        "running": False,
        "pid": None,
        "exit_code": None,
        "telemetry_path": str(_telemetry_path()),
        "output_dir": str(_training_output_dir()),
    }
    if process is None:
        return snapshot

    exit_code = process.poll()
    snapshot["pid"] = process.pid
    if exit_code is None:
        snapshot["status"] = "running"
        snapshot["running"] = True
    else:
        snapshot["status"] = "completed" if exit_code == 0 else "failed"
        snapshot["exit_code"] = exit_code
    return snapshot
def _resolve_training_base_url(request: Request) -> str:
    """Pick the base URL the training child process uses to reach this app.

    The child must talk to this same server over loopback. The request's own
    base URL is not used because it may be a public HTTPS host (for example
    a Hugging Face Spaces URL), 0.0.0.0, or a hostname the child cannot
    reach.

    Resolution order:
        1. NEUROCASTER_TRAINING_BASE_URL, stripped of whitespace and any
           trailing slash, when it is set to a non-blank value.
        2. http://127.0.0.1 with the port of the incoming request URL.
        3. http://127.0.0.1 with the PORT environment variable, or 7860
           when PORT is unset or not an integer.
    """
    configured = os.getenv("NEUROCASTER_TRAINING_BASE_URL")
    configured = configured.strip() if configured else ""
    if configured:
        return configured.rstrip("/")

    port = request.url.port
    if port is None:
        raw_port = os.getenv("PORT", "7860")
        try:
            port = int(raw_port)
        except ValueError:
            port = 7860
    return "http://127.0.0.1:{}".format(port)
def _read_text_file_tail(path: Path, max_lines: int) -> str:
    """Return the last ``max_lines`` lines of a text file as one string.

    The file is decoded as UTF-8 with undecodable bytes replaced.

    Args:
        path: File to read.
        max_lines: Maximum number of trailing lines to keep.

    Returns:
        The trailing lines joined together (line endings preserved), or an
        empty string when ``path`` is not a regular file or ``max_lines`` is
        not positive.
    """
    # Slicing with [-0:] would return the whole file and a negative count
    # would drop leading lines, so non-positive requests are answered here.
    if max_lines <= 0 or not path.is_file():
        return ""
    with path.open(encoding="utf-8", errors="replace") as handle:
        # A bounded deque keeps only the newest lines while the file is
        # streamed, instead of loading every line into memory first.
        return "".join(deque(handle, maxlen=max_lines))
@app.get("/api/training/log")
def training_log(lines: int = Query(default=120, ge=1, le=500)) -> Dict[str, Any]:
    """Return the last N lines of training_stdout.log.

    The endpoint answers 404 unless NEUROCASTER_EXPOSE_TRAINING_LOG is set to
    "1", "true" or "yes" (case-insensitive). It is meant for debugging a
    failed run (for example exit code 1 on Hugging Face Spaces) without
    shell access.

    Args:
        lines: Number of trailing log lines to return.

    Returns:
        A dict with the log "path", the requested "lines" count and the
        log "content".

    Raises:
        HTTPException: 404 when the endpoint is not enabled.
    """
    flag = os.getenv("NEUROCASTER_EXPOSE_TRAINING_LOG", "").lower()
    if flag not in ("1", "true", "yes"):
        raise HTTPException(
            status_code=404,
            detail="Set NEUROCASTER_EXPOSE_TRAINING_LOG=1 to enable this endpoint.",
        )

    log_path = _training_output_dir() / "training_stdout.log"
    return {
        "path": str(log_path),
        "lines": lines,
        "content": _read_text_file_tail(log_path, lines),
    }
@app.get("/api/training/stream")
def training_stream() -> StreamingResponse:
    """Stream training stdout lines and process status as Server-Sent Events.

    The endpoint answers 404 unless NEUROCASTER_EXPOSE_TRAINING_LOG is set to
    "1", "true" or "yes" (case-insensitive).

    Events emitted:
        status: Snapshot from train_status(), sent once up front and once
            per polling round.
        log: One non-empty line appended to training_stdout.log.
        heartbeat: Current timestamp, sent once per round while the stream
            stays open.
        end: Final snapshot, sent once the status is "completed" or
            "failed"; the stream closes afterwards.

    Raises:
        HTTPException: 404 when the endpoint is not enabled.
    """
    if os.getenv("NEUROCASTER_EXPOSE_TRAINING_LOG", "").lower() not in ("1", "true", "yes"):
        raise HTTPException(
            status_code=404,
            detail="Set NEUROCASTER_EXPOSE_TRAINING_LOG=1 to enable this endpoint.",
        )
    log_path = _training_output_dir() / "training_stdout.log"

    def event_stream() -> Generator[str, None, None]:
        # Position in the log file up to which lines were already sent.
        offset = 0

        def drain_log() -> Generator[str, None, None]:
            """Yield a log event for every new non-empty line and advance offset."""
            nonlocal offset
            try:
                size = log_path.stat().st_size
            except FileNotFoundError:
                # No log yet (or it vanished between polls); nothing to send.
                return
            if size < offset:
                # The file shrank, e.g. a new run truncated it: start over.
                offset = 0
            with log_path.open(encoding="utf-8", errors="replace") as handle:
                handle.seek(offset)
                for line in handle:
                    line = line.rstrip("\n")
                    if line:
                        yield _sse_event("log", {"line": line})
                offset = handle.tell()

        yield _sse_event("status", train_status())
        while True:
            yield from drain_log()
            status_snapshot = train_status()
            finished = status_snapshot.get("status") in {"completed", "failed"}
            if finished:
                # The process may have written its last lines after the read
                # above but before it exited; read once more so the tail of
                # the log (often the traceback) is not lost.
                yield from drain_log()
            yield _sse_event("status", status_snapshot)
            if finished:
                yield _sse_event("end", status_snapshot)
                break
            yield _sse_event("heartbeat", {"ts": time.time()})
            time.sleep(1.0)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
@app.post("/api/chat")
def chat(payload: ChatRequest) -> Dict[str, Any]:
    """Generate markdown with the trained checkpoint and score it with TRIBE-v2.

    Args:
        payload: Prompt and generation settings.

    Returns:
        A dict with the generated "markdown", its "biological_reward", the
        scorer "metrics" (with *_Z_Score aliases added for the Z_* values)
        and the "model_dir" that was used.

    Raises:
        HTTPException: 409 while training is still running; 404 when no
            trained checkpoint can be found.
    """
    active = _training_process
    if active is not None and active.poll() is None:
        raise HTTPException(status_code=409, detail="Training is still running; wait for a checkpoint before chatting.")

    model_dir = _latest_model_dir(_training_output_dir())
    if model_dir is None:
        raise HTTPException(status_code=404, detail="No trained model checkpoint found. Start and finish training first.")

    markdown = _generate_markdown(
        model_dir=model_dir,
        prompt=payload.prompt,
        max_new_tokens=payload.max_new_tokens,
        temperature=payload.temperature,
    )

    # The scorer receives the generated deck plus the path of the reference audio clip.
    audio_anchor = _shared_root() / "audio" / "voices" / "reference.wav"
    metrics = TribeReward().score(markdown, str(audio_anchor))

    # Keep every scorer metric and add the *_Z_Score spellings next to the Z_* ones.
    normalized_metrics = {**metrics}
    for alias, source in (
        ("STS_Z_Score", "Z_STS"),
        ("TPJ_Z_Score", "Z_TPJ"),
        ("Broca_Z_Score", "Z_Broca"),
    ):
        normalized_metrics[alias] = float(metrics.get(source, 0.0))

    return {
        "markdown": markdown,
        "biological_reward": float(metrics.get("biological_reward", 0.0)),
        "metrics": normalized_metrics,
        "model_dir": str(model_dir),
    }
def _shared_root() -> Path:
    """Return SHARED_DATA_ROOT, defaulting to "shared_data" under the data root."""
    fallback = _data_root() / "shared_data"
    configured = os.getenv("SHARED_DATA_ROOT", str(fallback))
    return Path(configured)
def _training_output_dir() -> Path:
    """Return NEUROCASTER_TRAINING_OUTPUT_DIR, defaulting to "neurocaster-grpo" under the data root."""
    fallback = _data_root() / "neurocaster-grpo"
    configured = os.getenv("NEUROCASTER_TRAINING_OUTPUT_DIR", str(fallback))
    return Path(configured)
def _telemetry_path() -> Path:
    """Return NEUROCASTER_TELEMETRY_PATH, defaulting to training_telemetry.jsonl under the shared root."""
    fallback = _shared_root() / "training_telemetry.jsonl"
    configured = os.getenv("NEUROCASTER_TELEMETRY_PATH", str(fallback))
    return Path(configured)
def _data_root() -> Path:
    """Return the base directory under which training data is stored.

    A non-blank NEUROCASTER_DATA_ROOT wins. Otherwise /data is used when it
    exists, and the project root is the last resort.
    """
    override = os.getenv("NEUROCASTER_DATA_ROOT")
    override = override.strip() if override else ""
    if override:
        return Path(override)

    persistent = Path("/data")
    return persistent if persistent.exists() else PROJECT_ROOT
def _sse_event(event: str, payload: Dict[str, Any]) -> str:
    """Format one Server-Sent Events frame.

    Args:
        event: Event name placed on the "event:" line.
        payload: Data serialized as JSON (non-ASCII characters kept as-is)
            on the "data:" line.

    Returns:
        The frame text, terminated by a blank line.
    """
    body = json.dumps(payload, ensure_ascii=False)
    return "event: {}\ndata: {}\n\n".format(event, body)
def _read_jsonl_tail(path: Path, limit: int) -> List[Dict[str, Any]]:
    """Return the last ``limit`` JSON objects from a JSON Lines file.

    Blank lines, lines that are not valid JSON (for example a line that is
    still being written) and JSON values that are not objects are skipped.
    The file is decoded as UTF-8 with undecodable bytes replaced, so the
    result does not depend on the platform's default encoding.

    Args:
        path: JSONL file to read.
        limit: Maximum number of trailing objects to return.

    Returns:
        The parsed objects, oldest first. Empty when ``path`` is not a
        regular file or ``limit`` is not positive.
    """
    # deque(maxlen=...) rejects negative sizes, so answer those up front.
    if limit <= 0 or not path.is_file():
        return []
    rows: deque = deque(maxlen=limit)
    with path.open(encoding="utf-8", errors="replace") as handle:
        for line in handle:
            line = line.strip()
            if not line:
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(row, dict):
                rows.append(row)
    return list(rows)
def _read_reward_csv_tail(path: Path, limit: int) -> List[Dict[str, Any]]:
    """Return the last ``limit`` data rows of a CSV file as dicts.

    The first line of the file supplies the column names. The file is
    decoded as UTF-8 with undecodable bytes replaced, so the result does not
    depend on the platform's default encoding.

    Args:
        path: CSV file to read.
        limit: Maximum number of trailing rows to return.

    Returns:
        The rows, oldest first, with every value as text. Empty when
        ``path`` is not a regular file or ``limit`` is not positive.
    """
    # deque(maxlen=...) rejects negative sizes, so answer those up front.
    if limit <= 0 or not path.is_file():
        return []
    # newline="" lets the csv module handle line endings itself.
    with path.open(newline="", encoding="utf-8", errors="replace") as handle:
        return list(deque(csv.DictReader(handle), maxlen=limit))
def _normalize_telemetry_row(row: Dict[str, Any], fallback_episode: int) -> Dict[str, Any]:
    """Return a telemetry row with a consistent set of numeric fields.

    Every original key is kept. On top of that, the episode, the reward
    fields and the three z-scores are filled in under both spellings the
    code knows (``Z_STS`` / ``STS_Z_Score`` and so on), together with derived
    values.

    Args:
        row: Raw telemetry row as read from the JSONL or CSV source.
        fallback_episode: Episode number used when the row has none.

    Returns:
        A new dict; ``row`` itself is not modified.
    """

    def pick(primary: str, secondary: str, default: float) -> float:
        # Prefer the primary key, then the secondary key, then the default.
        return _number(row.get(primary), _number(row.get(secondary), default))

    episode = int(_number(row.get("episode"), fallback_episode))
    reward = pick("reward", "total_reward", 0.0)
    total_reward = pick("total_reward", "cumulative_reward", reward)
    sts = pick("STS_Z_Score", "Z_STS", 0.0)
    tpj = pick("TPJ_Z_Score", "Z_TPJ", 0.0)
    broca = pick("Broca_Z_Score", "Z_Broca", 0.0)
    # Without an explicit value the reward is derived as (STS + TPJ) - Broca.
    biological_reward = _number(row.get("biological_reward"), (sts + tpj) - broca)
    # The Broca score stands in for cognitive load when the row has none.
    cognitive_load = _number(row.get("cognitive_load"), broca)

    return {
        **row,
        "episode": episode,
        "reward": reward,
        "total_reward": total_reward,
        "cumulative_reward": total_reward,
        "Z_STS": sts,
        "Z_TPJ": tpj,
        "Z_Broca": broca,
        "STS_Z_Score": sts,
        "TPJ_Z_Score": tpj,
        "Broca_Z_Score": broca,
        "biological_reward": biological_reward,
        "cognitive_load": cognitive_load,
        "brain_activity": {
            "sts": sts,
            "tpj": tpj,
            "broca": broca,
        },
    }
def _number(value: Any, default: float) -> float:
    """Coerce ``value`` to a finite float, falling back to ``default``.

    Args:
        value: Raw value; may be None, an empty string, a number or numeric
            text.
        default: Value returned (as a float) when ``value`` is missing,
            cannot be converted, or converts to NaN or an infinity.

    Returns:
        ``float(value)`` when that is a finite number, otherwise
        ``float(default)``.
    """
    try:
        if value in (None, ""):
            return float(default)
        parsed = float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers integers too large to fit in a float.
        return float(default)
    # NaN and infinities convert without error, but int() rejects them and
    # strict JSON cannot represent them, so treat them like missing values.
    return parsed if math.isfinite(parsed) else float(default)
def _latest_model_dir(output_dir: Path) -> Optional[Path]:
    """Locate the model directory to load from a training output directory.

    ``output_dir`` itself is returned when it contains model files.
    Otherwise the most recently modified ``checkpoint-*`` subdirectory that
    contains model files is returned.

    Args:
        output_dir: Training output directory.

    Returns:
        The chosen directory, or None when nothing usable exists.
    """
    if _has_model_files(output_dir):
        return output_dir

    def modified_at(candidate: Path) -> float:
        return candidate.stat().st_mtime

    usable_checkpoints = [
        candidate
        for candidate in output_dir.glob("checkpoint-*")
        if candidate.is_dir() and _has_model_files(candidate)
    ]
    return max(usable_checkpoints, key=modified_at, default=None)
def _has_model_files(path: Path) -> bool:
    """Tell whether ``path`` holds at least one of the expected model files.

    The files checked are adapter_config.json, config.json and
    tokenizer_config.json; finding any one of them is enough.
    """
    for marker in ("adapter_config.json", "config.json", "tokenizer_config.json"):
        if (path / marker).exists():
            return True
    return False
def _generate_markdown(model_dir: Path, prompt: str, max_new_tokens: int, temperature: float) -> str:
    """Generate a markdown deck for ``prompt`` with the checkpoint in ``model_dir``.

    The model and tokenizer are cached per resolved checkpoint path and are
    reloaded only when a different checkpoint is requested.

    Args:
        model_dir: Directory holding the trained checkpoint.
        prompt: User request appended to the fixed instruction text.
        max_new_tokens: Upper bound on the number of generated tokens.
        temperature: Sampling temperature; sampling is used only when it is
            greater than zero.

    Returns:
        The decoded completion without the prompt tokens, stripped of
        surrounding whitespace.
    """
    with _chat_lock:
        resolved_dir = str(model_dir.resolve())
        if _chat_cache.get("key") != resolved_dir:
            # Drop the previous model before loading the replacement.
            _chat_cache.clear()
            loaded = _load_generation_model(model_dir)
            _chat_cache.update({"key": resolved_dir, **loaded})
        model = _chat_cache["model"]
        tokenizer = _chat_cache["tokenizer"]

    full_prompt = (
        "You are NeuroCaster. Return only polished Marp/Slidev markdown with concise on-screen "
        "content, <!-- narration --> speaker notes, the static audio anchor, and at least one Mermaid diagram.\n\n"
        f"User request:\n{prompt}"
    )

    # Tokenizers without a chat template get the plain prompt text.
    rendered_prompt = full_prompt
    if getattr(tokenizer, "chat_template", None):
        conversation = [
            {"role": "system", "content": "You generate NeuroCaster explainer decks as markdown."},
            {"role": "user", "content": full_prompt},
        ]
        rendered_prompt = tokenizer.apply_chat_template(
            conversation,
            tokenize=False,
            add_generation_prompt=True,
        )

    encoded = tokenizer(rendered_prompt, return_tensors="pt")
    target_device = getattr(model, "device", None)
    if target_device is not None:
        encoded = {name: tensor.to(target_device) for name, tensor in encoded.items()}

    import torch

    generation_options = {
        "max_new_tokens": max_new_tokens,
        "do_sample": temperature > 0,
        # Floor the value so a temperature of exactly zero is never passed on.
        "temperature": max(temperature, 1e-5),
        "pad_token_id": getattr(tokenizer, "eos_token_id", None),
    }
    with torch.inference_mode():
        output_ids = model.generate(**encoded, **generation_options)

    # The output starts with the prompt tokens; keep only what follows them.
    prompt_length = encoded["input_ids"].shape[-1]
    completion_ids = output_ids[0][prompt_length:]
    return tokenizer.decode(completion_ids, skip_special_tokens=True).strip()
def _load_generation_model(model_dir: Path) -> Dict[str, Any]:
    """Load the model and tokenizer stored in ``model_dir``.

    Three loaders are tried in order and the first one that succeeds wins:
    unsloth (4-bit, switched to inference mode), then a PEFT model, then a
    plain transformers causal LM.

    Args:
        model_dir: Directory holding the trained checkpoint.

    Returns:
        A dict with the keys "model" and "tokenizer".

    Raises:
        HTTPException: 500 listing every loader's error when all three fail.
    """
    checkpoint = str(model_dir)

    def via_unsloth() -> Dict[str, Any]:
        from unsloth import FastLanguageModel

        model, tokenizer = FastLanguageModel.from_pretrained(
            model_name=checkpoint,
            max_seq_length=int(os.getenv("NEUROCASTER_CHAT_MAX_SEQ_LENGTH", "4096")),
            load_in_4bit=True,
        )
        FastLanguageModel.for_inference(model)
        return {"model": model, "tokenizer": tokenizer}

    def via_peft() -> Dict[str, Any]:
        from peft import AutoPeftModelForCausalLM
        from transformers import AutoTokenizer

        tokenizer = AutoTokenizer.from_pretrained(checkpoint)
        model = AutoPeftModelForCausalLM.from_pretrained(checkpoint, device_map="auto")
        model.eval()
        return {"model": model, "tokenizer": tokenizer}

    def via_transformers() -> Dict[str, Any]:
        from transformers import AutoModelForCausalLM, AutoTokenizer

        tokenizer = AutoTokenizer.from_pretrained(checkpoint)
        model = AutoModelForCausalLM.from_pretrained(checkpoint, device_map="auto")
        model.eval()
        return {"model": model, "tokenizer": tokenizer}

    loaders = (
        ("unsloth", via_unsloth),
        ("peft", via_peft),
        ("transformers", via_transformers),
    )
    errors: List[str] = []
    for label, loader in loaders:
        try:
            return loader()
        except Exception as exc:  # pragma: no cover - depends on optional GPU stack / artifact shape
            errors.append(f"{label}: {exc}")
    raise HTTPException(status_code=500, detail="Could not load trained model: " + " | ".join(errors))
# The custom web UI routes are registered only when the static folder exists.
if STATIC_DIR.exists():

    @app.get("/", include_in_schema=False)
    def _root() -> RedirectResponse:
        """Redirect the site root to the web UI."""
        return RedirectResponse(url="/web")

    @app.get("/web", include_in_schema=False)
    def _web_index() -> FileResponse:
        """Serve the UI entry page."""
        return FileResponse(str(STATIC_DIR / "index.html"))

    @app.get("/web/index.html", include_in_schema=False)
    def _web_index_html() -> FileResponse:
        """Serve the UI entry page under its explicit file name."""
        return FileResponse(str(STATIC_DIR / "index.html"))

    @app.get("/web/assets/style.css", include_in_schema=False)
    def _static_css() -> FileResponse:
        """Serve the UI stylesheet."""
        return FileResponse(str(STATIC_DIR / "style.css"))

    @app.get("/web/assets/app.js", include_in_schema=False)
    def _static_js() -> FileResponse:
        """Serve the UI script."""
        return FileResponse(str(STATIC_DIR / "app.js"))

    # Any other file in the static folder is served from /web/assets.
    app.mount("/web/assets", StaticFiles(directory=STATIC_DIR, html=True), name="web_assets")

# Preserve stock OpenEnv paths (/reset, /step, /state, /schema, /ws, /mcp) after
# the custom UI and /api routes have had a chance to match.
app.mount("/", openenv_app)
def main(host: str = "0.0.0.0", port: int = 8000):
    """
    Run the server in-process with uvicorn.

    This is the entry point for direct execution via uv run or python -m,
    which makes it possible to run the server without Docker:
        uv run --project . server
        uv run --project . server --port 8001
        python -m neurocaster_env.server.app

    Args:
        host: Host address to bind to (default: "0.0.0.0")
        port: Port number to listen on (default: 8000)

    For production deployments, consider using uvicorn directly with
    multiple workers:
        uvicorn neurocaster_env.server.app:app --workers 4
    """
    import uvicorn

    bind_options = {"host": host, "port": port}
    uvicorn.run(app, **bind_options)
if __name__ == "__main__":
    import argparse

    # Only the port can be set from the command line; the host keeps main()'s default.
    cli = argparse.ArgumentParser()
    cli.add_argument("--port", type=int, default=8000)
    main(port=cli.parse_args().port)