# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
FastAPI application for the Neurocaster Env Environment.

This module creates an HTTP server that exposes the NeurocasterEnvironment
over HTTP and WebSocket endpoints, compatible with EnvClient.

Endpoints:
    - POST /reset: Reset the environment
    - POST /step: Execute an action
    - GET /state: Get current environment state
    - GET /schema: Get action/observation schemas
    - WS /ws: WebSocket endpoint for persistent sessions

NeuroCaster UI / API endpoints defined in this module:
    - GET /api/telemetry: Latest training telemetry rows
    - POST /api/train: Start GRPO training as a background subprocess
    - GET /api/train/status: Training process state
    - GET /api/training/log: Tail of the training stdout log (opt-in)
    - GET /api/training/stream: SSE stream of the training stdout log (opt-in)
    - POST /api/chat: Generate markdown with the trained checkpoint and score it

Usage:
    # Development (with auto-reload):
    uvicorn server.app:app --reload --host 0.0.0.0 --port 8000

    # Production:
    uvicorn server.app:app --host 0.0.0.0 --port 8000 --workers 4

    # Or run directly:
    python -m server.app
"""

from __future__ import annotations

import csv
import json
import os
import time
from collections import deque
from pathlib import Path
import subprocess
import sys
import threading
from typing import Any, Dict, Generator, List, Optional

from fastapi import FastAPI, HTTPException, Query, Request
from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field

# openenv provides the stock environment app factory; fail with an actionable
# message when it cannot be imported.
try:
    from openenv.core.env_server.http_server import create_app
except Exception as e:  # pragma: no cover
    raise ImportError(
        "openenv is required for the web interface. Install dependencies with '\n uv sync\n'"
    ) from e

# Prefer package-relative imports; fall back to absolute imports when the
# relative form raises ImportError (e.g. the module is not loaded as part of
# its package).
try:
    from ..models import NeurocasterAction, NeurocasterObservation
    from .neurocaster_env_environment import NeurocasterEnvironment
    from .tribe_reward import TribeReward
except ImportError:
    from models import NeurocasterAction, NeurocasterObservation
    from server.neurocaster_env_environment import NeurocasterEnvironment
    from server.tribe_reward import TribeReward

# Directory one level above this file's directory.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
# Static UI assets live in a "static" directory next to this file.
STATIC_DIR = Path(__file__).resolve().parent / "static"

# Guards creation/replacement of the single background training subprocess.
_training_lock = threading.Lock()
# Handle of the most recently started training subprocess (None until one is started).
_training_process: Optional[subprocess.Popen[bytes]] = None
# Guards the cached generation model/tokenizer used by the chat endpoint.
_chat_lock = threading.Lock()
# Holds "key", "model" and "tokenizer" entries for the currently loaded checkpoint.
_chat_cache: Dict[str, Any] = {}


# Request body for POST /api/train.
class TrainRequest(BaseModel):
    # Model identifier forwarded to the training script via --model.
    model_name: str = Field(default="unsloth/Qwen2.5-1.5B-Instruct-bnb-4bit", min_length=1)
    # Forwarded to the training script via --max-steps; must be at least 1.
    max_episodes: int = Field(default=50, ge=1)


# Request body for POST /api/chat.
class ChatRequest(BaseModel):
    # Non-empty user request embedded into the generation prompt.
    prompt: str = Field(min_length=1)
    # Generation length budget, constrained to 32..4096.
    max_new_tokens: int = Field(default=2048, ge=32, le=4096)
    # Sampling temperature in 0.0..2.0; 0 disables sampling in the chat handler.
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)


# Create the stock OpenEnv app, then mount it behind the custom NeuroCaster UI.
openenv_app = create_app(
    NeurocasterEnvironment,
    NeurocasterAction,
    NeurocasterObservation,
    env_name="neurocaster_env",
    max_concurrent_envs=1,  # increase this number to allow more concurrent WebSocket sessions
)

app = FastAPI(title="NeuroCaster")
app.mount("/openenv", openenv_app)

# Values of NEUROCASTER_EXPOSE_TRAINING_LOG (case-insensitive) that enable the
# training log endpoints.
_TRUTHY_ENV_VALUES = ("1", "true", "yes")


@app.get("/api/telemetry")
def telemetry(n: int = Query(default=200, ge=1, le=5000)) -> List[Dict[str, Any]]:
    """Return the latest real training telemetry rows.

    Reads the last ``n`` rows of the telemetry JSONL file; when that yields
    nothing, falls back to the last ``n`` rows of ``reward_curves.csv`` in the
    training output directory. Every row is normalized before being returned.
    """
    rows = _read_jsonl_tail(_telemetry_path(), n)
    if not rows:
        rows = _read_reward_csv_tail(_training_output_dir() / "reward_curves.csv", n)
    # The 1-based position is used as the episode number when a row has none.
    return [_normalize_telemetry_row(row, index + 1) for index, row in enumerate(rows)]


@app.post("/api/train")
def train(request: Request, payload: TrainRequest) -> Dict[str, Any]:
    """Start real GRPO training as a background subprocess.

    Returns an ``already_running`` payload when a previously started process
    has not exited yet. Otherwise removes the previous telemetry file and
    reward curve, launches ``train_grpo_colab.py`` with stdout/stderr
    redirected to ``training_stdout.log`` and resets the chat model cache.
    """
    global _training_process, _chat_cache

    with _training_lock:
        if _training_process and _training_process.poll() is None:
            return {
                "ok": False,
                "status": "already_running",
                "pid": _training_process.pid,
                "telemetry_path": str(_telemetry_path()),
            }

        output_dir = _training_output_dir()
        output_dir.mkdir(parents=True, exist_ok=True)
        telemetry_path = _telemetry_path()
        telemetry_path.parent.mkdir(parents=True, exist_ok=True)
        # Start each run from a clean slate so the UI does not show stale rows.
        if telemetry_path.exists():
            telemetry_path.unlink()
        log_path = output_dir / "training_stdout.log"
        reward_curve_path = output_dir / "reward_curves.csv"
        if reward_curve_path.exists():
            reward_curve_path.unlink()

        # Prepend the project directories to PYTHONPATH for the child process.
        env = os.environ.copy()
        pythonpath_parts = [str(PROJECT_ROOT.parent), str(PROJECT_ROOT)]
        if env.get("PYTHONPATH"):
            pythonpath_parts.append(env["PYTHONPATH"])
        env["PYTHONPATH"] = os.pathsep.join(pythonpath_parts)

        command = [
            sys.executable,
            "-u",  # unbuffered: tracebacks flush to training_stdout.log immediately
            str(PROJECT_ROOT / "train_grpo_colab.py"),
            "--base-url",
            _resolve_training_base_url(request),
            "--model",
            payload.model_name,
            "--output-dir",
            str(output_dir),
            "--max-steps",
            str(payload.max_episodes),
            "--dataset-size",
            os.getenv("NEUROCASTER_TRAINING_DATASET_SIZE", "32"),
            "--telemetry-log",
            str(telemetry_path),
        ]
        # Opening with "wb" truncates the previous run's log. The parent's handle
        # is closed right after spawning; the child keeps its own copy.
        stdout = log_path.open("wb")
        try:
            _training_process = subprocess.Popen(
                command,
                cwd=PROJECT_ROOT,
                env=env,
                stdout=stdout,
                stderr=subprocess.STDOUT,
            )
        finally:
            stdout.close()
        # Drop any cached generation model: a new run will produce new checkpoints.
        _chat_cache = {}

    return {
        "ok": True,
        "status": "started",
        "pid": _training_process.pid if _training_process else None,
        "model_name": payload.model_name,
        "max_episodes": payload.max_episodes,
        "telemetry_path": str(telemetry_path),
        "output_dir": str(output_dir),
    }


@app.get("/api/train/status")
def train_status() -> Dict[str, Any]:
    """Expose current training process state for UI polling.

    ``status`` is ``idle`` when no process was started, ``running`` while the
    process has no exit code, and ``completed``/``failed`` for exit code
    zero/non-zero respectively.
    """
    process = _training_process
    paths = {
        "telemetry_path": str(_telemetry_path()),
        "output_dir": str(_training_output_dir()),
    }
    if process is None:
        return {"status": "idle", "running": False, "pid": None, "exit_code": None, **paths}

    exit_code = process.poll()
    if exit_code is None:
        return {"status": "running", "running": True, "pid": process.pid, "exit_code": None, **paths}

    return {
        "status": "completed" if exit_code == 0 else "failed",
        "running": False,
        "pid": process.pid,
        "exit_code": exit_code,
        **paths,
    }


def _resolve_training_base_url(request: Request) -> str:
    """
    The GRPO child process must call this same FastAPI app over loopback.

    Using request.base_url alone breaks when it is a public host (e.g. Hugging Face
    Spaces HTTPS URL), 0.0.0.0, or a hostname that is not routable from the child
    process. Override with NEUROCASTER_TRAINING_BASE_URL if needed.
    """
    override = (os.getenv("NEUROCASTER_TRAINING_BASE_URL") or "").strip()
    if override:
        return override.rstrip("/")

    port = request.url.port
    if port is None:
        # No explicit port in the request URL: fall back to $PORT, then 7860.
        try:
            port = int(os.getenv("PORT", "7860"))
        except ValueError:
            port = 7860
    return f"http://127.0.0.1:{port}"


def _read_text_file_tail(path: Path, max_lines: int) -> str:
    """Return the last ``max_lines`` lines of ``path`` as one string.

    Returns ``""`` when ``path`` is not a regular file or ``max_lines`` is not
    positive. Undecodable bytes are replaced rather than raising.
    """
    if max_lines <= 0 or not path.is_file():
        return ""
    with path.open(encoding="utf-8", errors="replace") as handle:
        # A bounded deque keeps only the tail in memory instead of every line.
        return "".join(deque(handle, maxlen=max_lines))


def _require_training_log_enabled() -> None:
    """Raise a 404 unless NEUROCASTER_EXPOSE_TRAINING_LOG is set to a truthy value."""
    if os.getenv("NEUROCASTER_EXPOSE_TRAINING_LOG", "").lower() not in _TRUTHY_ENV_VALUES:
        raise HTTPException(
            status_code=404,
            detail="Set NEUROCASTER_EXPOSE_TRAINING_LOG=1 to enable this endpoint.",
        )


@app.get("/api/training/log")
def training_log(lines: int = Query(default=120, ge=1, le=500)) -> Dict[str, Any]:
    """
    Last N lines of training_stdout.log. Disabled unless NEUROCASTER_EXPOSE_TRAINING_LOG=1
    (set on Hugging Face Spaces to debug exit code 1 without shell access).
    """
    _require_training_log_enabled()
    log_path = _training_output_dir() / "training_stdout.log"
    return {
        "path": str(log_path),
        "lines": lines,
        "content": _read_text_file_tail(log_path, lines),
    }


def _read_new_log_lines(path: Path, offset: int, *, flush_partial: bool = False) -> tuple[List[str], int]:
    """Read log content appended to ``path`` since byte ``offset``.

    Args:
        path: Log file to read; a missing file yields no lines.
        offset: Byte position up to which content was already consumed.
        flush_partial: When true, also return a trailing line that has no
            terminator yet (used once the writer has exited).

    Returns:
        ``(lines, new_offset)`` where ``lines`` are the non-empty complete
        lines found and ``new_offset`` is the byte position to resume from.
    """
    try:
        with path.open("rb") as handle:
            size = handle.seek(0, os.SEEK_END)
            if size < offset:
                # The file shrank (a new run truncated it): start over.
                offset = 0
            handle.seek(offset)
            chunk = handle.read()
    except FileNotFoundError:
        return [], offset

    if flush_partial:
        consumed = len(chunk)
    else:
        # Stop after the last line terminator so a line that is still being
        # written is delivered whole on a later poll. "\r" counts as a
        # terminator so carriage-return progress output is not held back.
        consumed = max(chunk.rfind(b"\n"), chunk.rfind(b"\r")) + 1
    if not consumed:
        return [], offset

    # Neither terminator byte occurs inside a multi-byte UTF-8 sequence, so the
    # cut above never splits a character.
    text = chunk[:consumed].decode("utf-8", errors="replace")
    normalized = text.replace("\r\n", "\n").replace("\r", "\n")
    return [line for line in normalized.split("\n") if line], offset + consumed


@app.get("/api/training/stream")
def training_stream() -> StreamingResponse:
    """SSE stream for live training stdout lines and process status.

    Emits an initial ``status`` event, then once per second: ``log`` events for
    new lines, a ``status`` event, and a ``heartbeat`` event. When the process
    is reported as ``completed`` or ``failed`` an ``end`` event is sent and the
    stream stops. Disabled unless NEUROCASTER_EXPOSE_TRAINING_LOG is truthy.
    """
    _require_training_log_enabled()
    log_path = _training_output_dir() / "training_stdout.log"

    def event_stream() -> Generator[str, None, None]:
        offset = 0
        yield _sse_event("status", train_status())
        while True:
            # Snapshot the status BEFORE reading the log: if the process is
            # already finished here, the read below sees everything it wrote,
            # so the final lines (often the traceback) are never skipped.
            status_snapshot = train_status()
            finished = status_snapshot.get("status") in {"completed", "failed"}
            new_lines, offset = _read_new_log_lines(log_path, offset, flush_partial=finished)
            for line in new_lines:
                yield _sse_event("log", {"line": line})
            yield _sse_event("status", status_snapshot)
            if finished:
                yield _sse_event("end", status_snapshot)
                break
            yield _sse_event("heartbeat", {"ts": time.time()})
            time.sleep(1.0)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )


@app.post("/api/chat")
def chat(payload: ChatRequest) -> Dict[str, Any]:
    """Generate markdown with the trained checkpoint and score it with TRIBE-v2.

    Raises:
        HTTPException: 409 while training is still running, 404 when no
            checkpoint with model files exists in the training output directory.
    """
    process = _training_process
    if process and process.poll() is None:
        raise HTTPException(status_code=409, detail="Training is still running; wait for a checkpoint before chatting.")

    model_dir = _latest_model_dir(_training_output_dir())
    if not model_dir:
        raise HTTPException(status_code=404, detail="No trained model checkpoint found. Start and finish training first.")

    markdown = _generate_markdown(
        model_dir=model_dir,
        prompt=payload.prompt,
        max_new_tokens=payload.max_new_tokens,
        temperature=payload.temperature,
    )
    audio_anchor = _shared_root() / "audio" / "voices" / "reference.wav"
    metrics = TribeReward().score(markdown, str(audio_anchor))
    # Expose the Z-scores under the *_Z_Score aliases alongside the raw metrics.
    normalized_metrics = {
        **metrics,
        "STS_Z_Score": float(metrics.get("Z_STS", 0.0)),
        "TPJ_Z_Score": float(metrics.get("Z_TPJ", 0.0)),
        "Broca_Z_Score": float(metrics.get("Z_Broca", 0.0)),
    }
    return {
        "markdown": markdown,
        "biological_reward": float(metrics.get("biological_reward", 0.0)),
        "metrics": normalized_metrics,
        "model_dir": str(model_dir),
    }


def _shared_root() -> Path:
    """Return SHARED_DATA_ROOT, defaulting to ``<data root>/shared_data``."""
    return Path(os.getenv("SHARED_DATA_ROOT", str(_data_root() / "shared_data")))


def _training_output_dir() -> Path:
    """Return NEUROCASTER_TRAINING_OUTPUT_DIR, defaulting to ``<data root>/neurocaster-grpo``."""
    return Path(os.getenv("NEUROCASTER_TRAINING_OUTPUT_DIR", str(_data_root() / "neurocaster-grpo")))


def _telemetry_path() -> Path:
    """Return NEUROCASTER_TELEMETRY_PATH, defaulting to ``<shared root>/training_telemetry.jsonl``."""
    return Path(os.getenv("NEUROCASTER_TELEMETRY_PATH", str(_shared_root() / "training_telemetry.jsonl")))


def _data_root() -> Path:
    """Return the base data directory.

    Order of precedence: a non-blank NEUROCASTER_DATA_ROOT, then ``/data`` when
    it exists, then the project root.
    """
    configured = (os.getenv("NEUROCASTER_DATA_ROOT") or "").strip()
    if configured:
        return Path(configured)
    if Path("/data").exists():
        return Path("/data")
    return PROJECT_ROOT


def _sse_event(event: str, payload: Dict[str, Any]) -> str:
    """Format one Server-Sent Events message with a JSON-encoded data field."""
    return f"event: {event}\ndata: {json.dumps(payload, ensure_ascii=False)}\n\n"


def _read_jsonl_tail(path: Path, limit: int) -> List[Dict[str, Any]]:
    """Return the last ``limit`` JSON objects from a JSONL file.

    Blank lines, lines that are not valid JSON, and JSON values that are not
    objects are skipped. A missing file yields an empty list.
    """
    rows: deque[Dict[str, Any]] = deque(maxlen=limit)
    try:
        # Explicit UTF-8 avoids depending on the platform default encoding.
        with path.open(encoding="utf-8", errors="replace") as handle:
            for raw_line in handle:
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    row = json.loads(line)
                except json.JSONDecodeError:
                    # Typically a line that is still being written; skip it.
                    continue
                if isinstance(row, dict):
                    rows.append(row)
    except FileNotFoundError:
        # The file may be removed by train() between polls; treat as empty.
        return []
    return list(rows)


def _read_reward_csv_tail(path: Path, limit: int) -> List[Dict[str, Any]]:
    """Return the last ``limit`` rows of a CSV file as dicts keyed by its header.

    A missing file yields an empty list.
    """
    try:
        with path.open(newline="", encoding="utf-8", errors="replace") as handle:
            return list(deque(csv.DictReader(handle), maxlen=limit))
    except FileNotFoundError:
        # The file may be removed by train() between polls; treat as empty.
        return []


def _normalize_telemetry_row(row: Dict[str, Any], fallback_episode: int) -> Dict[str, Any]:
    """Return ``row`` extended with a consistent set of numeric telemetry keys.

    Each metric is read from its primary key, then from its alias, then from a
    default. All original keys are preserved; the normalized keys override
    same-named originals.

    Args:
        row: Raw telemetry row (from JSONL or CSV).
        fallback_episode: Episode number used when the row has no usable ``episode``.
    """
    episode = int(_number(row.get("episode"), fallback_episode))
    reward = _number(row.get("reward"), _number(row.get("total_reward"), 0.0))
    total_reward = _number(row.get("total_reward"), _number(row.get("cumulative_reward"), reward))
    sts = _number(row.get("STS_Z_Score"), _number(row.get("Z_STS"), 0.0))
    tpj = _number(row.get("TPJ_Z_Score"), _number(row.get("Z_TPJ"), 0.0))
    broca = _number(row.get("Broca_Z_Score"), _number(row.get("Z_Broca"), 0.0))
    biological_reward = _number(row.get("biological_reward"), (sts + tpj) - broca)
    return {
        **row,
        "episode": episode,
        "reward": reward,
        "total_reward": total_reward,
        "cumulative_reward": total_reward,
        "Z_STS": sts,
        "Z_TPJ": tpj,
        "Z_Broca": broca,
        "STS_Z_Score": sts,
        "TPJ_Z_Score": tpj,
        "Broca_Z_Score": broca,
        "biological_reward": biological_reward,
        "cognitive_load": _number(row.get("cognitive_load"), broca),
        "brain_activity": {
            "sts": sts,
            "tpj": tpj,
            "broca": broca,
        },
    }


def _number(value: Any, default: float) -> float:
    """Coerce ``value`` to a finite float, falling back to ``default``.

    The fallback is used for ``None``, the empty string, values ``float()``
    rejects, and NaN/±Infinity. Non-finite values are rejected because
    ``int()`` cannot convert them (used for the episode number) and they are
    not valid JSON numbers.
    """
    from math import isfinite

    fallback = float(default)
    try:
        if value in (None, ""):
            return fallback
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: integers too large to be represented as a float.
        return fallback
    return number if isfinite(number) else fallback


def _latest_model_dir(output_dir: Path) -> Optional[Path]:
    """Return the directory holding the model to load, or ``None``.

    ``output_dir`` itself wins when it contains model files; otherwise the most
    recently modified ``checkpoint-*`` subdirectory containing model files.
    """
    if _has_model_files(output_dir):
        return output_dir
    checkpoints = [path for path in output_dir.glob("checkpoint-*") if path.is_dir() and _has_model_files(path)]
    if not checkpoints:
        return None
    return max(checkpoints, key=lambda path: path.stat().st_mtime)


def _has_model_files(path: Path) -> bool:
    """Return True when ``path`` contains at least one known model/tokenizer config file."""
    return any((path / name).exists() for name in ("adapter_config.json", "config.json", "tokenizer_config.json"))


def _generate_markdown(model_dir: Path, prompt: str, max_new_tokens: int, temperature: float) -> str:
    """Generate a markdown deck for ``prompt`` with the model stored in ``model_dir``.

    The model and tokenizer are cached per resolved ``model_dir`` and reloaded
    when a different directory is requested. Loading and generation run while
    holding ``_chat_lock``, so chat requests are served one at a time.

    Returns:
        The decoded completion (prompt tokens excluded), stripped of
        surrounding whitespace.
    """
    with _chat_lock:
        # Bind the cache dict once: train() may rebind the module-level name at
        # any time, and repeated global lookups could then hit an empty dict.
        cache = _chat_cache
        cache_key = str(model_dir.resolve())
        if cache.get("key") != cache_key:
            cache.clear()
            cache.update({"key": cache_key, **_load_generation_model(model_dir)})
        model = cache["model"]
        tokenizer = cache["tokenizer"]

        full_prompt = (
            "You are NeuroCaster. Return only polished Marp/Slidev markdown with concise on-screen "
            "content, speaker notes, the static audio anchor, and at least one Mermaid diagram.\n\n"
            f"User request:\n{prompt}"
        )
        if getattr(tokenizer, "chat_template", None):
            messages = [
                {"role": "system", "content": "You generate NeuroCaster explainer decks as markdown."},
                {"role": "user", "content": full_prompt},
            ]
            rendered_prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        else:
            rendered_prompt = full_prompt

        inputs = tokenizer(rendered_prompt, return_tensors="pt")
        device = getattr(model, "device", None)
        if device is not None:
            inputs = {key: value.to(device) for key, value in inputs.items()}

        import torch

        with torch.inference_mode():
            output_ids = model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                # temperature == 0 means greedy decoding; the clamp keeps the
                # value passed to generate() strictly positive in that case.
                do_sample=temperature > 0,
                temperature=max(temperature, 1e-5),
                pad_token_id=getattr(tokenizer, "eos_token_id", None),
            )
        # Slice off the prompt tokens so only the completion is decoded.
        prompt_length = inputs["input_ids"].shape[-1]
        return tokenizer.decode(output_ids[0][prompt_length:], skip_special_tokens=True).strip()
def _chat_max_seq_length() -> int:
    """Return NEUROCASTER_CHAT_MAX_SEQ_LENGTH as an int, or 4096 when unset/invalid."""
    try:
        return int(os.getenv("NEUROCASTER_CHAT_MAX_SEQ_LENGTH", "4096"))
    except ValueError:
        return 4096


def _load_with_unsloth(model_dir: Path, max_seq_length: int) -> Dict[str, Any]:
    """Load the checkpoint through unsloth's FastLanguageModel in 4-bit inference mode."""
    from unsloth import FastLanguageModel

    model, tokenizer = FastLanguageModel.from_pretrained(
        model_name=str(model_dir),
        max_seq_length=max_seq_length,
        load_in_4bit=True,
    )
    FastLanguageModel.for_inference(model)
    return {"model": model, "tokenizer": tokenizer}


def _load_with_peft(model_dir: Path) -> Dict[str, Any]:
    """Load the checkpoint as a PEFT adapter model with its tokenizer."""
    from peft import AutoPeftModelForCausalLM
    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained(str(model_dir))
    model = AutoPeftModelForCausalLM.from_pretrained(str(model_dir), device_map="auto")
    model.eval()
    return {"model": model, "tokenizer": tokenizer}


def _load_with_transformers(model_dir: Path) -> Dict[str, Any]:
    """Load the checkpoint as a plain transformers causal LM with its tokenizer."""
    from transformers import AutoModelForCausalLM, AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained(str(model_dir))
    model = AutoModelForCausalLM.from_pretrained(str(model_dir), device_map="auto")
    model.eval()
    return {"model": model, "tokenizer": tokenizer}


def _load_generation_model(model_dir: Path) -> Dict[str, Any]:
    """Load the model and tokenizer stored in ``model_dir``.

    Tries unsloth, then PEFT, then plain transformers, returning the first
    result that loads.

    Returns:
        A dict with ``"model"`` and ``"tokenizer"`` entries.

    Raises:
        HTTPException: 500 listing each backend's error when all of them fail.
    """
    # Parsed outside the try/except below so that a malformed environment value
    # is not misreported as an unsloth loading failure.
    max_seq_length = _chat_max_seq_length()
    loaders = (
        ("unsloth", lambda: _load_with_unsloth(model_dir, max_seq_length)),
        ("peft", lambda: _load_with_peft(model_dir)),
        ("transformers", lambda: _load_with_transformers(model_dir)),
    )

    errors: List[str] = []
    for backend, load in loaders:
        try:
            return load()
        except Exception as exc:  # pragma: no cover - depends on optional stack / trained artifact shape
            # Deliberate best-effort: record the failure and try the next backend.
            errors.append(f"{backend}: {exc}")

    raise HTTPException(status_code=500, detail="Could not load trained model: " + " | ".join(errors))


# The custom UI routes are only registered when the static assets directory exists.
if STATIC_DIR.exists():

    @app.get("/", include_in_schema=False)
    def _root() -> RedirectResponse:
        """Redirect the site root to the UI."""
        return RedirectResponse(url="/web")

    @app.get("/web", include_in_schema=False)
    def _web_index() -> FileResponse:
        """Serve the UI entry page."""
        return FileResponse(str(STATIC_DIR / "index.html"))

    @app.get("/web/index.html", include_in_schema=False)
    def _web_index_html() -> FileResponse:
        """Serve the UI entry page under its explicit file name."""
        return FileResponse(str(STATIC_DIR / "index.html"))

    @app.get("/web/assets/style.css", include_in_schema=False)
    def _static_css() -> FileResponse:
        """Serve the UI stylesheet."""
        return FileResponse(str(STATIC_DIR / "style.css"))

    @app.get("/web/assets/app.js", include_in_schema=False)
    def _static_js() -> FileResponse:
        """Serve the UI script."""
        return FileResponse(str(STATIC_DIR / "app.js"))

    app.mount("/web/assets", StaticFiles(directory=STATIC_DIR, html=True), name="web_assets")

# Preserve stock OpenEnv paths (/reset, /step, /state, /schema, /ws, /mcp) after
# the custom UI and /api routes have had a chance to match.
app.mount("/", openenv_app)


def main(host: str = "0.0.0.0", port: int = 8000) -> None:
    """
    Entry point for direct execution via uv run or python -m.

    This function enables running the server without Docker:
        uv run --project . server
        uv run --project . server --port 8001
        python -m neurocaster_env.server.app

    Args:
        host: Host address to bind to (default: "0.0.0.0")
        port: Port number to listen on (default: 8000)

    For production deployments, consider using uvicorn directly with
    multiple workers:
        uvicorn neurocaster_env.server.app:app --workers 4
    """
    import uvicorn

    uvicorn.run(app, host=host, port=port)


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    # --host mirrors main()'s host parameter; its default keeps the previous bind address.
    parser.add_argument("--host", default="0.0.0.0")
    parser.add_argument("--port", type=int, default=8000)
    args = parser.parse_args()
    main(host=args.host, port=args.port)