| |
| |
| |
| |
| |
|
|
| """ |
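FastAPI application for the NeuroCaster environment.

This module builds the HTTP server that exposes NeurocasterEnvironment over the
OpenEnv HTTP and WebSocket API (compatible with EnvClient), together with the
NeuroCaster training/chat API and the static web dashboard.

OpenEnv endpoints (the OpenEnv app is mounted both under /openenv and at the root):
    - POST /reset: Reset the environment
    - POST /step: Execute an action
    - GET /state: Get current environment state
    - GET /schema: Get action/observation schemas
    - WS /ws: WebSocket endpoint for persistent sessions

NeuroCaster endpoints:
    - GET /api/telemetry: Latest training telemetry rows
    - POST /api/train: Start GRPO training as a background subprocess
    - GET /api/train/status: State of the training subprocess
    - GET /api/training/log: Tail of the training stdout log (opt-in via NEUROCASTER_EXPOSE_TRAINING_LOG)
    - GET /api/training/stream: Server-Sent Events stream of the training log (same opt-in)
    - POST /api/chat: Generate markdown with the trained checkpoint and score it
    - GET /web: Static dashboard (only when the static/ directory next to this module exists)

Usage:
    # Development (with auto-reload):
    uvicorn server.app:app --reload --host 0.0.0.0 --port 8000

    # Production: keep a single worker. The training subprocess handle and the
    # chat model cache are module-level state of one process, so with several
    # workers status/chat requests may hit a worker that never started the run.
    uvicorn server.app:app --host 0.0.0.0 --port 8000

    # Or run directly:
    python -m server.app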
| """ |
|
|
| from __future__ import annotations |
|
|
import csv
import json
import math
import os
import subprocess
import sys
import threading
import time
from collections import deque
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional

from fastapi import FastAPI, HTTPException, Query, Request
from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
|
|
| try: |
| from openenv.core.env_server.http_server import create_app |
| except Exception as e: |
| raise ImportError( |
| "openenv is required for the web interface. Install dependencies with '\n uv sync\n'" |
| ) from e |
|
|
| try: |
| from ..models import NeurocasterAction, NeurocasterObservation |
| from .neurocaster_env_environment import NeurocasterEnvironment |
| from .tribe_reward import TribeReward |
| except ImportError: |
| from models import NeurocasterAction, NeurocasterObservation |
| from server.neurocaster_env_environment import NeurocasterEnvironment |
| from server.tribe_reward import TribeReward |
|
|
# Parent of the directory that contains this module (the repository root).
PROJECT_ROOT = Path(__file__).resolve().parents[1]
# Dashboard assets (index.html, style.css, app.js) live next to this module.
STATIC_DIR = Path(__file__).resolve().parents[0] / "static"

# Serializes checks/launches of the training subprocess in the /api/train handler.
_training_lock = threading.Lock()
# Handle of the most recently started training subprocess (None until one starts).
_training_process: Optional[subprocess.Popen[bytes]] = None
# Serializes loading of the chat model into _chat_cache.
_chat_lock = threading.Lock()
# Loaded chat model/tokenizer plus the checkpoint path they came from ("key").
_chat_cache: Dict[str, Any] = dict()
|
|
|
|
class TrainRequest(BaseModel):
    # Request body of POST /api/train.

    # Base model passed to the trainer as --model.
    model_name: str = Field(min_length=1, default="unsloth/Qwen2.5-1.5B-Instruct-bnb-4bit")
    # Forwarded to the trainer as --max-steps.
    max_episodes: int = Field(ge=1, default=50)
|
|
|
|
class ChatRequest(BaseModel):
    # Request body of POST /api/chat.

    # User request embedded into the generation prompt.
    prompt: str = Field(min_length=1)
    # Upper bound on newly generated tokens.
    max_new_tokens: int = Field(le=4096, ge=32, default=2048)
    # 0 selects greedy decoding; positive values enable sampling.
    temperature: float = Field(le=2.0, ge=0.0, default=0.7)
|
|
|
|
| |
# OpenEnv HTTP/WebSocket API for the NeuroCaster environment, limited to at most
# one concurrent environment (max_concurrent_envs=1).
openenv_app = create_app(
    NeurocasterEnvironment,
    NeurocasterAction,
    NeurocasterObservation,
    env_name="neurocaster_env",
    max_concurrent_envs=1,
)

# Top-level application: the /api/* and /web routes are registered on it below,
# and the OpenEnv API is additionally reachable under /openenv.
app = FastAPI(title="NeuroCaster")
app.mount(path="/openenv", app=openenv_app)
|
|
|
|
@app.get("/api/telemetry")
def telemetry(n: int = Query(default=200, ge=1, le=5000)) -> List[Dict[str, Any]]:
    """Return up to ``n`` of the latest real training telemetry rows, normalized.

    Rows are read from the JSONL telemetry log; when it is missing or yields no
    usable rows, ``reward_curves.csv`` in the training output directory is used
    instead. Rows without an ``episode`` value are numbered by their 1-based
    position within the returned window.
    """
    source_rows = _read_jsonl_tail(_telemetry_path(), n) or _read_reward_csv_tail(
        _training_output_dir() / "reward_curves.csv", n
    )
    return [
        _normalize_telemetry_row(raw_row, position)
        for position, raw_row in enumerate(source_rows, start=1)
    ]
|
|
|
|
@app.post("/api/train")
def train(request: Request, payload: TrainRequest) -> Dict[str, Any]:
    """Start real GRPO training as a background subprocess.

    Only one run at a time: while the previous subprocess is alive this returns
    ``{"ok": False, "status": "already_running", ...}`` and starts nothing.
    Otherwise the previous telemetry log and ``reward_curves.csv`` are removed,
    ``train_grpo_colab.py`` is launched with stdout+stderr redirected to
    ``training_stdout.log`` in the output directory, and the cached chat model
    is dropped so the next /api/chat call reloads from disk.
    """
    global _training_process

    with _training_lock:
        if _training_process is not None and _training_process.poll() is None:
            return {
                "ok": False,
                "status": "already_running",
                "pid": _training_process.pid,
                "telemetry_path": str(_telemetry_path()),
            }

        output_dir = _training_output_dir()
        output_dir.mkdir(parents=True, exist_ok=True)
        telemetry_path = _telemetry_path()
        telemetry_path.parent.mkdir(parents=True, exist_ok=True)
        # Start every run from empty telemetry so the dashboard never mixes runs.
        telemetry_path.unlink(missing_ok=True)

        log_path = output_dir / "training_stdout.log"
        (output_dir / "reward_curves.csv").unlink(missing_ok=True)

        env = os.environ.copy()
        # Make both the package parent and the project root importable in the child.
        pythonpath_parts = [str(PROJECT_ROOT.parent), str(PROJECT_ROOT)]
        if env.get("PYTHONPATH"):
            pythonpath_parts.append(env["PYTHONPATH"])
        env["PYTHONPATH"] = os.pathsep.join(pythonpath_parts)

        command = [
            sys.executable,
            "-u",  # unbuffered, so the log/stream endpoints see output promptly
            str(PROJECT_ROOT / "train_grpo_colab.py"),
            "--base-url",
            _resolve_training_base_url(request),
            "--model",
            payload.model_name,
            "--output-dir",
            str(output_dir),
            "--max-steps",
            str(payload.max_episodes),
            "--dataset-size",
            os.getenv("NEUROCASTER_TRAINING_DATASET_SIZE", "32"),
            "--telemetry-log",
            str(telemetry_path),
        ]

        # The child gets its own copy of the descriptor, so the parent's handle
        # can be closed as soon as Popen returns.
        with log_path.open("wb") as stdout:
            _training_process = subprocess.Popen(
                command,
                cwd=PROJECT_ROOT,
                env=env,
                stdout=stdout,
                stderr=subprocess.STDOUT,
            )

        # Empty the shared cache in place, under its lock. Rebinding the global to a
        # new dict could make a concurrent _generate_markdown look up "model" in the
        # fresh, empty dict right after populating the old one.
        with _chat_lock:
            _chat_cache.clear()

        return {
            "ok": True,
            "status": "started",
            "pid": _training_process.pid,
            "model_name": payload.model_name,
            "max_episodes": payload.max_episodes,
            "telemetry_path": str(telemetry_path),
            "output_dir": str(output_dir),
        }
|
|
|
|
@app.get("/api/train/status")
def train_status() -> Dict[str, Any]:
    """Expose the state of the most recent training subprocess for UI polling.

    ``status`` is "idle" when this process has not started a run, "running"
    while the subprocess is alive, then "completed" (exit code 0) or "failed"
    (any other exit code).
    """
    snapshot = _training_process
    locations = {
        "telemetry_path": str(_telemetry_path()),
        "output_dir": str(_training_output_dir()),
    }
    if snapshot is None:
        return {"status": "idle", "running": False, "pid": None, "exit_code": None, **locations}

    code = snapshot.poll()
    if code is None:
        state, alive = "running", True
    elif code == 0:
        state, alive = "completed", False
    else:
        state, alive = "failed", False
    return {"status": state, "running": alive, "pid": snapshot.pid, "exit_code": code, **locations}
|
|
|
|
def _resolve_training_base_url(request: Request) -> str:
    """
    Return the loopback base URL the GRPO child process should use to call this app.

    request.base_url alone breaks when it is a public host (e.g. a Hugging Face
    Spaces HTTPS URL), 0.0.0.0, or a hostname the child cannot route to. The port
    in the request URL can also be wrong: behind Docker port mapping or a reverse
    proxy it is the client-facing port, not the port this server listens on.

    Resolution order:
      1. NEUROCASTER_TRAINING_BASE_URL, if non-blank (trailing slashes stripped).
      2. The local port of the accepted connection (ASGI ``scope["server"]``).
      3. The port in the request URL.
      4. The PORT environment variable, else 7860.
    """
    override = (os.getenv("NEUROCASTER_TRAINING_BASE_URL") or "").strip()
    if override:
        return override.rstrip("/")

    port: Optional[int] = None
    server = request.scope.get("server")
    # ASGI: ``server`` is (host, port), or (path, None) for unix sockets, or absent.
    if server and len(server) >= 2 and isinstance(server[1], int):
        port = server[1]
    if port is None:
        port = request.url.port
    if port is None:
        try:
            port = int(os.getenv("PORT", "7860"))
        except ValueError:
            port = 7860
    return f"http://127.0.0.1:{port}"
|
|
|
|
def _read_text_file_tail(path: Path, max_lines: int) -> str:
    """Return the last ``max_lines`` lines of a UTF-8 text file joined into one string.

    Returns ``""`` when ``path`` is not a regular file or ``max_lines`` is not
    positive. The original ``readlines()[-0:]`` returned the whole file for 0.
    Undecodable bytes are replaced instead of raising. Only ``max_lines`` lines
    are held in memory, so tailing a large log stays cheap.
    """
    if max_lines <= 0 or not path.is_file():
        return ""
    with path.open(encoding="utf-8", errors="replace") as handle:
        return "".join(deque(handle, maxlen=max_lines))
|
|
|
|
@app.get("/api/training/log")
def training_log(lines: int = Query(default=120, ge=1, le=500)) -> Dict[str, Any]:
    """
    Return the last ``lines`` lines of training_stdout.log from the training output directory.

    Opt-in: responds 404 unless NEUROCASTER_EXPOSE_TRAINING_LOG is "1", "true" or
    "yes" (case-insensitive). Set it e.g. on Hugging Face Spaces to debug a failed
    run (exit code 1) without shell access.
    """
    enabled = os.getenv("NEUROCASTER_EXPOSE_TRAINING_LOG", "").lower() in ("1", "true", "yes")
    if not enabled:
        raise HTTPException(
            status_code=404,
            detail="Set NEUROCASTER_EXPOSE_TRAINING_LOG=1 to enable this endpoint.",
        )
    log_file = _training_output_dir() / "training_stdout.log"
    content = _read_text_file_tail(log_file, lines)
    return dict(path=str(log_file), lines=lines, content=content)
|
|
|
|
@app.get("/api/training/stream")
def training_stream() -> StreamingResponse:
    """SSE stream for live training stdout lines and process status.

    Opt-in like /api/training/log: responds 404 unless
    NEUROCASTER_EXPOSE_TRAINING_LOG is "1", "true" or "yes" (case-insensitive).

    Events:
      * ``status``: a ``train_status()`` snapshot, once up front and once per poll.
      * ``log``: ``{"line": ...}`` for each complete, non-empty line appended to
        training_stdout.log.
      * ``heartbeat``: ``{"ts": <unix time>}`` after each poll while still running.
      * ``end``: the final snapshot once the run completed or failed; the stream
        then closes.
    The log is polled roughly once per second.
    """
    if os.getenv("NEUROCASTER_EXPOSE_TRAINING_LOG", "").lower() not in ("1", "true", "yes"):
        raise HTTPException(
            status_code=404,
            detail="Set NEUROCASTER_EXPOSE_TRAINING_LOG=1 to enable this endpoint.",
        )
    log_path = _training_output_dir() / "training_stdout.log"

    def read_new_lines(offset: int, final: bool) -> tuple[List[str], int]:
        """Return complete lines written after byte ``offset``, plus the offset to resume from.

        A trailing fragment without a line terminator is left for the next poll,
        because the trainer may still be writing it. When ``final`` is true it is
        emitted as well. Offsets are byte positions, so they compare directly with
        ``st_size``.
        """
        try:
            if log_path.stat().st_size < offset:
                # The file shrank: a new run reopened it with "wb" and truncated it.
                offset = 0
            with log_path.open("rb") as handle:
                handle.seek(offset)
                chunk = handle.read()
        except FileNotFoundError:
            return [], offset
        # bytes.splitlines breaks on \n, \r and \r\n, like text-mode universal
        # newlines, so \r-driven progress updates still arrive as separate lines.
        pieces = chunk.splitlines(keepends=True)
        if pieces and not final and not pieces[-1].endswith((b"\n", b"\r")):
            pieces.pop()
        next_offset = offset + sum(len(piece) for piece in pieces)
        decoded = [piece.decode("utf-8", errors="replace").rstrip("\r\n") for piece in pieces]
        return [line for line in decoded if line], next_offset

    def event_stream() -> Generator[str, None, None]:
        offset = 0
        yield _sse_event("status", train_status())
        while True:
            # Snapshot the status BEFORE draining the log. If the process had already
            # exited, all of its output is in the file and gets drained below, so the
            # last lines (e.g. a traceback) are never lost to a poll/exit race.
            status_snapshot = train_status()
            finished = status_snapshot.get("status") in {"completed", "failed"}
            lines, offset = read_new_lines(offset, final=finished)
            for line in lines:
                yield _sse_event("log", {"line": line})
            yield _sse_event("status", status_snapshot)
            if finished:
                yield _sse_event("end", status_snapshot)
                break
            yield _sse_event("heartbeat", {"ts": time.time()})
            time.sleep(1.0)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
|
|
|
|
@app.post("/api/chat")
def chat(payload: ChatRequest) -> Dict[str, Any]:
    """Generate markdown with the trained checkpoint and score it with TRIBE-v2.

    Raises:
        HTTPException(409): the training subprocess is still running.
        HTTPException(404): no directory with model files exists in the output dir.

    Returns the generated ``markdown``, its ``biological_reward``, the TribeReward
    ``metrics`` extended with ``STS_Z_Score``/``TPJ_Z_Score``/``Broca_Z_Score``,
    and the ``model_dir`` that produced the text.
    """
    process = _training_process
    if process is not None and process.poll() is None:
        raise HTTPException(status_code=409, detail="Training is still running; wait for a checkpoint before chatting.")

    model_dir = _latest_model_dir(_training_output_dir())
    if not model_dir:
        raise HTTPException(status_code=404, detail="No trained model checkpoint found. Start and finish training first.")

    markdown = _generate_markdown(
        model_dir=model_dir,
        prompt=payload.prompt,
        max_new_tokens=payload.max_new_tokens,
        temperature=payload.temperature,
    )
    audio_anchor = _shared_root() / "audio" / "voices" / "reference.wav"
    metrics = TribeReward().score(markdown, str(audio_anchor))
    # Coerce with _number (as _normalize_telemetry_row does) so a missing, None or
    # non-numeric metric falls back to 0.0 instead of failing the whole request.
    # The Z_* key still wins; an already-present *_Z_Score is kept rather than
    # overwritten with 0.0 when the Z_* key is absent.
    normalized_metrics = {
        **metrics,
        "STS_Z_Score": _number(metrics.get("Z_STS"), _number(metrics.get("STS_Z_Score"), 0.0)),
        "TPJ_Z_Score": _number(metrics.get("Z_TPJ"), _number(metrics.get("TPJ_Z_Score"), 0.0)),
        "Broca_Z_Score": _number(metrics.get("Z_Broca"), _number(metrics.get("Broca_Z_Score"), 0.0)),
    }
    return {
        "markdown": markdown,
        "biological_reward": _number(metrics.get("biological_reward"), 0.0),
        "metrics": normalized_metrics,
        "model_dir": str(model_dir),
    }
|
|
|
|
def _shared_root() -> Path:
    """Directory for data shared with the trainer (telemetry log, audio anchor).

    Uses SHARED_DATA_ROOT when it is set to a non-blank value (whitespace
    stripped), otherwise ``<data root>/shared_data``. A blank value is treated
    as unset, as _data_root does for NEUROCASTER_DATA_ROOT, instead of
    silently resolving to the current working directory.
    """
    configured = (os.getenv("SHARED_DATA_ROOT") or "").strip()
    if configured:
        return Path(configured)
    return _data_root() / "shared_data"
|
|
|
|
def _training_output_dir() -> Path:
    """Directory where training writes checkpoints, logs and reward curves.

    Uses NEUROCASTER_TRAINING_OUTPUT_DIR when it is set to a non-blank value
    (whitespace stripped), otherwise ``<data root>/neurocaster-grpo``. A blank
    value is treated as unset rather than becoming the current directory.
    """
    configured = (os.getenv("NEUROCASTER_TRAINING_OUTPUT_DIR") or "").strip()
    if configured:
        return Path(configured)
    return _data_root() / "neurocaster-grpo"
|
|
|
|
def _telemetry_path() -> Path:
    """Location of the JSONL training telemetry log.

    Uses NEUROCASTER_TELEMETRY_PATH when it is set to a non-blank value
    (whitespace stripped), otherwise ``training_telemetry.jsonl`` under
    ``_shared_root()``. A blank value is treated as unset. Previously it became
    ``Path("")`` (the current directory), which /api/train would then try to
    ``unlink()``.
    """
    configured = (os.getenv("NEUROCASTER_TELEMETRY_PATH") or "").strip()
    if configured:
        return Path(configured)
    return _shared_root() / "training_telemetry.jsonl"
|
|
|
|
def _data_root() -> Path:
    """Pick the base directory for persistent data.

    Precedence: a non-blank NEUROCASTER_DATA_ROOT (whitespace stripped), then
    ``/data`` if it exists, otherwise PROJECT_ROOT.
    """
    explicit = os.getenv("NEUROCASTER_DATA_ROOT", "").strip()
    if explicit:
        return Path(explicit)
    mounted_volume = Path("/data")
    return mounted_volume if mounted_volume.exists() else PROJECT_ROOT
|
|
|
|
def _sse_event(event: str, payload: Dict[str, Any]) -> str:
    """Format one Server-Sent Events message whose ``data`` field is ``payload`` as JSON.

    json.dumps escapes newlines inside strings, so the data stays on one line.
    """
    encoded = json.dumps(payload, ensure_ascii=False)
    return "event: {}\ndata: {}\n\n".format(event, encoded)
|
|
|
|
def _read_jsonl_tail(path: Path, limit: int) -> List[Dict[str, Any]]:
    """Return up to the last ``limit`` JSON-object rows of a JSON Lines file.

    Blank lines, lines that are not valid JSON (such as a partially written
    trailing row) and JSON values that are not objects are skipped. A missing
    file yields ``[]``. The file is decoded as UTF-8 regardless of the platform
    locale, and undecodable bytes are replaced rather than raising.
    """
    if not path.exists():
        return []
    rows: deque[Dict[str, Any]] = deque(maxlen=limit)
    with path.open(encoding="utf-8", errors="replace") as handle:
        for raw_line in handle:
            line = raw_line.strip()
            if not line:
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(row, dict):
                rows.append(row)
    return list(rows)
|
|
|
|
def _read_reward_csv_tail(path: Path, limit: int) -> List[Dict[str, Any]]:
    """Return up to the last ``limit`` rows of a CSV file as header-keyed dicts.

    A missing file yields ``[]``. Values stay strings, as produced by
    csv.DictReader. The file is decoded as UTF-8 regardless of the platform
    locale, with undecodable bytes replaced. Only ``limit`` rows are kept in
    memory at a time.
    """
    if not path.exists():
        return []
    with path.open(newline="", encoding="utf-8", errors="replace") as handle:
        return list(deque(csv.DictReader(handle), maxlen=limit))
|
|
|
|
def _finite_number(value: Any, default: float) -> float:
    """Coerce ``value`` like ``_number``, but also treat NaN and +/-inf as missing."""
    number = _number(value, default)
    return number if math.isfinite(number) else float(default)


def _json_safe(value: Any) -> Any:
    """Replace a non-finite float (NaN, +/-inf) with ``None``; return anything else unchanged."""
    if isinstance(value, float) and not math.isfinite(value):
        return None
    return value


def _normalize_telemetry_row(row: Dict[str, Any], fallback_episode: int) -> Dict[str, Any]:
    """Map a raw telemetry row (JSONL or CSV) onto the field set the dashboard reads.

    Alternative spellings are accepted, preferred name first:
    ``reward``/``total_reward``, ``total_reward``/``cumulative_reward``,
    ``STS_Z_Score``/``Z_STS``, ``TPJ_Z_Score``/``Z_TPJ`` and
    ``Broca_Z_Score``/``Z_Broca``. Missing values fall back as follows:
    ``biological_reward`` to ``(sts + tpj) - broca``, ``cognitive_load`` to
    ``broca``, ``episode`` to ``fallback_episode``, and the others to 0.0.

    Non-finite numbers (e.g. a NaN reward from a diverged run) count as missing.
    Previously ``int(nan)`` made the episode conversion raise and fail the whole
    telemetry request. Raw pass-through fields that are NaN/inf become ``None``,
    so the row stays strict-JSON serializable.
    """
    episode = int(_finite_number(row.get("episode"), fallback_episode))
    reward = _finite_number(row.get("reward"), _finite_number(row.get("total_reward"), 0.0))
    total_reward = _finite_number(row.get("total_reward"), _finite_number(row.get("cumulative_reward"), reward))
    sts = _finite_number(row.get("STS_Z_Score"), _finite_number(row.get("Z_STS"), 0.0))
    tpj = _finite_number(row.get("TPJ_Z_Score"), _finite_number(row.get("Z_TPJ"), 0.0))
    broca = _finite_number(row.get("Broca_Z_Score"), _finite_number(row.get("Z_Broca"), 0.0))
    biological_reward = _finite_number(row.get("biological_reward"), (sts + tpj) - broca)
    return {
        **{key: _json_safe(value) for key, value in row.items()},
        "episode": episode,
        "reward": reward,
        "total_reward": total_reward,
        "cumulative_reward": total_reward,
        "Z_STS": sts,
        "Z_TPJ": tpj,
        "Z_Broca": broca,
        "STS_Z_Score": sts,
        "TPJ_Z_Score": tpj,
        "Broca_Z_Score": broca,
        "biological_reward": biological_reward,
        "cognitive_load": _finite_number(row.get("cognitive_load"), broca),
        "brain_activity": {
            "sts": sts,
            "tpj": tpj,
            "broca": broca,
        },
    }
|
|
|
|
def _number(value: Any, default: float) -> float:
    """Best-effort float conversion.

    ``None``, ``""`` and values ``float()`` rejects with TypeError/ValueError
    all yield ``float(default)``.
    """
    try:
        return float(default) if value in (None, "") else float(value)
    except (TypeError, ValueError):
        return float(default)
|
|
|
|
def _latest_model_dir(output_dir: Path) -> Optional[Path]:
    """Locate the newest loadable model directory in a training output directory.

    A final model saved directly in ``output_dir`` wins. Otherwise the
    ``checkpoint-<step>`` subdirectory with the highest step number is chosen,
    the same rule as transformers' ``get_last_checkpoint``. Directory mtime only
    breaks ties or orders non-numeric suffixes, because it is unreliable after a
    copy/restore (e.g. to persistent storage) or a later write into an older
    checkpoint. Subdirectories without model files are ignored. Returns ``None``
    when nothing qualifies.
    """
    if _has_model_files(output_dir):
        return output_dir
    checkpoints = [
        path for path in output_dir.glob("checkpoint-*") if path.is_dir() and _has_model_files(path)
    ]
    if not checkpoints:
        return None
    return max(checkpoints, key=_checkpoint_sort_key)


def _checkpoint_sort_key(path: Path) -> tuple[int, float]:
    """Sort key for checkpoint dirs: numeric step suffix first (non-numeric sorts lowest), then mtime."""
    try:
        step = int(path.name[len("checkpoint-"):])
    except ValueError:
        step = -1
    return step, path.stat().st_mtime


def _has_model_files(path: Path) -> bool:
    """True if ``path`` holds an adapter config, a model config or a tokenizer config."""
    return any((path / name).exists() for name in ("adapter_config.json", "config.json", "tokenizer_config.json"))
|
|
|
|
def _generate_markdown(model_dir: Path, prompt: str, max_new_tokens: int, temperature: float) -> str:
    """Generate NeuroCaster markdown for ``prompt`` with the checkpoint in ``model_dir``.

    The model/tokenizer pair is cached in ``_chat_cache``, keyed by the resolved
    checkpoint path, and reloaded only when that path changes.
    ``temperature <= 0`` selects greedy decoding; otherwise sampling is used at
    that temperature (clamped to at least 1e-5). Returns only the newly
    generated text, with prompt tokens removed and whitespace stripped.
    """
    with _chat_lock:
        # Bind the dict once: re-reading the global after populating it could
        # observe a different dict if the module-level name is rebound meanwhile.
        cache = _chat_cache
        cache_key = str(model_dir.resolve())
        if cache.get("key") != cache_key:
            # Release our reference to the previous model before loading the next one.
            cache.clear()
            cache.update({"key": cache_key, **_load_generation_model(model_dir)})
        model = cache["model"]
        tokenizer = cache["tokenizer"]

    full_prompt = (
        "You are NeuroCaster. Return only polished Marp/Slidev markdown with concise on-screen "
        "content, <!-- narration --> speaker notes, the static audio anchor, and at least one Mermaid diagram.\n\n"
        f"User request:\n{prompt}"
    )
    uses_chat_template = bool(getattr(tokenizer, "chat_template", None))
    if uses_chat_template:
        messages = [
            {"role": "system", "content": "You generate NeuroCaster explainer decks as markdown."},
            {"role": "user", "content": full_prompt},
        ]
        rendered_prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    else:
        rendered_prompt = full_prompt

    # A rendered chat template already contains its special tokens (e.g. BOS).
    # Letting the tokenizer add them again would duplicate them.
    inputs = tokenizer(rendered_prompt, return_tensors="pt", add_special_tokens=not uses_chat_template)
    device = getattr(model, "device", None)
    if device is not None:
        inputs = {key: value.to(device) for key, value in inputs.items()}

    import torch

    sampling = temperature > 0
    generation_kwargs: Dict[str, Any] = {
        "max_new_tokens": max_new_tokens,
        "do_sample": sampling,
        "pad_token_id": getattr(tokenizer, "eos_token_id", None),
    }
    if sampling:
        # Only meaningful when sampling; passing it for greedy decoding just
        # triggers a transformers warning.
        generation_kwargs["temperature"] = max(temperature, 1e-5)

    with torch.inference_mode():
        output_ids = model.generate(**inputs, **generation_kwargs)
    prompt_length = inputs["input_ids"].shape[-1]
    return tokenizer.decode(output_ids[0][prompt_length:], skip_special_tokens=True).strip()
|
|
|
|
def _load_generation_model(model_dir: Path) -> Dict[str, Any]:
    """Load a ``{"model": ..., "tokenizer": ...}`` pair from ``model_dir``.

    Backends are tried in order:
      1. Unsloth in 4-bit, switched to inference mode; max sequence length comes
         from NEUROCASTER_CHAT_MAX_SEQ_LENGTH (default 4096).
      2. A PEFT adapter via AutoPeftModelForCausalLM.
      3. A plain transformers AutoModelForCausalLM.
    Any exception moves on to the next backend. If all fail, raises
    HTTPException(500) listing each backend's error.
    """

    def via_unsloth() -> Dict[str, Any]:
        from unsloth import FastLanguageModel

        model, tokenizer = FastLanguageModel.from_pretrained(
            model_name=str(model_dir),
            max_seq_length=int(os.getenv("NEUROCASTER_CHAT_MAX_SEQ_LENGTH", "4096")),
            load_in_4bit=True,
        )
        FastLanguageModel.for_inference(model)
        return {"model": model, "tokenizer": tokenizer}

    def via_peft() -> Dict[str, Any]:
        from peft import AutoPeftModelForCausalLM
        from transformers import AutoTokenizer

        tokenizer = AutoTokenizer.from_pretrained(str(model_dir))
        model = AutoPeftModelForCausalLM.from_pretrained(str(model_dir), device_map="auto")
        model.eval()
        return {"model": model, "tokenizer": tokenizer}

    def via_transformers() -> Dict[str, Any]:
        from transformers import AutoModelForCausalLM, AutoTokenizer

        tokenizer = AutoTokenizer.from_pretrained(str(model_dir))
        model = AutoModelForCausalLM.from_pretrained(str(model_dir), device_map="auto")
        model.eval()
        return {"model": model, "tokenizer": tokenizer}

    failures: List[str] = []
    for label, loader in (("unsloth", via_unsloth), ("peft", via_peft), ("transformers", via_transformers)):
        try:
            return loader()
        except Exception as exc:
            failures.append(f"{label}: {exc}")

    raise HTTPException(status_code=500, detail="Could not load trained model: " + " | ".join(failures))
|
|
|
|
if STATIC_DIR.exists():

    def _static_file(filename: str) -> FileResponse:
        """Serve ``filename`` from STATIC_DIR."""
        return FileResponse(str(STATIC_DIR / filename))

    @app.get("/", include_in_schema=False)
    def _root() -> RedirectResponse:
        # The dashboard lives under /web; send visitors of the bare root there.
        return RedirectResponse(url="/web")

    @app.get("/web", include_in_schema=False)
    def _web_index() -> FileResponse:
        return _static_file("index.html")

    @app.get("/web/index.html", include_in_schema=False)
    def _web_index_html() -> FileResponse:
        return _static_file("index.html")

    @app.get("/web/assets/style.css", include_in_schema=False)
    def _static_css() -> FileResponse:
        return _static_file("style.css")

    @app.get("/web/assets/app.js", include_in_schema=False)
    def _static_js() -> FileResponse:
        return _static_file("app.js")

    # Any other file in STATIC_DIR is served under /web/assets/.
    app.mount("/web/assets", StaticFiles(directory=STATIC_DIR, html=True), name="web_assets")
|
|
|
|
| |
| |
# Keep this as the last route registration: a mount at "/" matches every path,
# so routes added after it would be shadowed by the OpenEnv app.
app.mount(path="/", app=openenv_app)
|
|
|
|
def main(host: str = "0.0.0.0", port: int = 8000):
    """
    Entry point for direct execution via uv run or python -m.

    This function enables running the server without Docker:
        uv run --project . server
        uv run --project . server --port 8001
        python -m neurocaster_env.server.app

    Args:
        host: Host address to bind to (default: "0.0.0.0")
        port: Port number to listen on (default: 8000)

    Runs a single uvicorn process, and production deployments should stay
    single-worker too. The training subprocess handle (_training_process) and
    the chat model cache (_chat_cache) are module-level state of one process.
    With ``--workers N``, a status or chat request may reach a worker that never
    started the run, and a second run could be launched concurrently.
    """
    import uvicorn

    uvicorn.run(app, host=host, port=port)
|
|
|
|
if __name__ == "__main__":
    import argparse

    # Expose both of main()'s parameters; --port keeps its previous default.
    parser = argparse.ArgumentParser(description="Run the NeuroCaster FastAPI server.")
    parser.add_argument("--host", default="0.0.0.0", help="Host address to bind to (default: 0.0.0.0)")
    parser.add_argument("--port", type=int, default=8000, help="Port number to listen on (default: 8000)")
    args = parser.parse_args()
    main(host=args.host, port=args.port)
|
|