from __future__ import annotations import asyncio import html import json import os import time from collections import OrderedDict from dataclasses import dataclass from pathlib import Path from threading import RLock from typing import Any from fastapi import FastAPI, HTTPException, Query from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, StreamingResponse from pydantic import BaseModel from cluster_trust_env import CLUSTER_TASK_CONFIG, ClusterTrustEnv from difficulty_controller import GLOBAL_DIFFICULTY_CONTROLLER from environment import SentinelEnv from mission_context import build_orchestrator_prompt, mission_for_task, problem_statement from scenarios import scenario_summary from sentinel_config import SESSION_BACKEND, SESSION_MAX_ACTIVE, SESSION_TTL_SECONDS # --------------------------------------------------------------------------- # App + session store # --------------------------------------------------------------------------- app = FastAPI( title="SENTINEL — Multi-Agent Trust Calibration Environment", description=( "OpenEnv-compatible RL environment where an orchestrator agent learns " "dynamic trust calibration across adversarial long-horizon tasks." ), version="1.0.0", ) # Add CORS middleware to allow browser requests from frontend app.add_middleware( CORSMiddleware, allow_origins=["*"], # Allow all origins (use specific domains in production) allow_credentials=True, allow_methods=["*"], # Allow all HTTP methods (GET, POST, OPTIONS, etc.) allow_headers=["*"], # Allow all headers ) @dataclass class SessionEntry: env: SentinelEnv | ClusterTrustEnv created_at: float last_access_at: float class SessionStore: """ Single-process TTL + LRU store for active SentinelEnv objects. This is intentionally memory-backed for OpenEnv/HF Space simplicity. It is safe for the Dockerfile's single-worker deployment. 
If you increase workers, use sticky routing or replace this with a shared backend such as Redis. """ def __init__(self, ttl_seconds: int, max_active: int) -> None: self._ttl_seconds = ttl_seconds self._max_active = max_active self._items: OrderedDict[str, SessionEntry] = OrderedDict() self._lock = RLock() def set(self, session_id: str, env: SentinelEnv | ClusterTrustEnv) -> None: now = time.monotonic() with self._lock: self._prune_locked(now) self._items[session_id] = SessionEntry(env=env, created_at=now, last_access_at=now) self._items.move_to_end(session_id) while len(self._items) > self._max_active: self._items.popitem(last=False) def get(self, session_id: str) -> SentinelEnv | ClusterTrustEnv | None: now = time.monotonic() with self._lock: self._prune_locked(now) entry = self._items.get(session_id) if entry is None: return None entry.last_access_at = now self._items.move_to_end(session_id) return entry.env def pop(self, session_id: str) -> SentinelEnv | ClusterTrustEnv | None: with self._lock: entry = self._items.pop(session_id, None) return entry.env if entry else None def stats(self) -> dict[str, int | str | bool]: with self._lock: self._prune_locked(time.monotonic()) return { "backend": SESSION_BACKEND, "active_sessions": len(self._items), "ttl_seconds": self._ttl_seconds, "max_active": self._max_active, "multi_worker_safe": False, } def _prune_locked(self, now: float) -> None: expired = [ sid for sid, entry in self._items.items() if now - entry.last_access_at > self._ttl_seconds ] for sid in expired: self._items.pop(sid, None) _sessions = SessionStore(ttl_seconds=SESSION_TTL_SECONDS, max_active=SESSION_MAX_ACTIVE) _STATIC_DIR = Path(__file__).resolve().parent / "static" _OUTPUTS_DIR = Path(__file__).resolve().parent / "outputs" _FRONTEND_OUT_DIR = Path(__file__).resolve().parent / "ui" / "out" _FRONTEND_NEXT_DIR = _FRONTEND_OUT_DIR / "_next" if _FRONTEND_NEXT_DIR.exists(): app.mount("/_next", StaticFiles(directory=_FRONTEND_NEXT_DIR), name="next-assets") 
def _get_env(session_id: str) -> SentinelEnv | ClusterTrustEnv:
    """Return the stored environment for ``session_id`` or raise HTTP 404."""
    env = _sessions.get(session_id)
    if env is None:
        raise HTTPException(status_code=404, detail=f"Session '{session_id}' not found. Call /reset first.")
    return env


def _get_cluster_env(session_id: str) -> ClusterTrustEnv:
    """Return the session's ``ClusterTrustEnv``; HTTP 400 if it is another env type."""
    env = _get_env(session_id)
    if not isinstance(env, ClusterTrustEnv):
        raise HTTPException(
            status_code=400,
            detail=(
                "Session is in abstract SentinelEnv mode. Start a cluster session via "
                "POST /cluster/reset (or POST /reset with mode='cluster')."
            ),
        )
    return env


def _resolve_env_mode(task_type: str | None, mode: str | None = None) -> tuple[str, str]:
    """Map a requested task/mode to ``(env_mode, task_type)``.

    A ``cluster_`` task prefix selects cluster mode and is stripped from the
    returned task; otherwise the mode string decides. ``task_type`` defaults
    to ``"task3"``.
    """
    requested_task = task_type or "task3"
    requested_mode = (mode or "").lower()
    if requested_task.startswith("cluster_"):
        return "cluster", requested_task.removeprefix("cluster_")
    if requested_mode in {"cluster", "gpu", "gpu_cluster"}:
        return "cluster", requested_task
    return "abstract", requested_task


def _state_for(env: SentinelEnv | ClusterTrustEnv, session_id: str) -> dict[str, Any]:
    """Call ``env.state`` with the signature matching the environment type."""
    if isinstance(env, ClusterTrustEnv):
        return env.state()
    return env.state(session_id=session_id)


def _add_demo_context(result: dict[str, Any], env: SentinelEnv | ClusterTrustEnv) -> dict[str, Any]:
    """Attach mission text, orchestrator prompt and mode to ``result["info"]``.

    Mutates ``result`` in place and also returns it.
    """
    obs = result["observation"]
    if isinstance(env, ClusterTrustEnv):
        result["info"]["mission"] = {
            "name": "GPU Cluster Trust Mission",
            "real_life_example": (
                "Schedule AI training jobs across GPUs while unreliable workers, "
                "false completions, and adversarial reports try to corrupt state."
            ),
            "what_the_model_learns": [
                "Allocate scarce GPU memory under deadlines.",
                "Calibrate trust from worker behavior, not fixed identity.",
                "Detect reward hacking and false progress reports.",
                "Keep long-horizon cluster health aligned with the original goal.",
            ],
        }
        result["info"]["orchestrator_prompt"] = _build_cluster_prompt(obs)
        result["info"]["environment_mode"] = "cluster"
    else:
        result["info"]["mission"] = mission_for_task(obs["task_type"])
        result["info"]["orchestrator_prompt"] = build_orchestrator_prompt(obs)
        result["info"]["environment_mode"] = "abstract"
    return result


def _build_cluster_prompt(observation: dict[str, Any]) -> str:
    """Render the cluster orchestrator prompt from an observation dict.

    Missing observation keys fall back to the defaults given in each ``get``.
    """
    coverage = observation.get("ai_failure_coverage", {})
    return (
        "You are the SENTINEL orchestrator inside a simulated AI GPU cluster.\n\n"
        "Mission: keep GPU utilization useful, finish jobs before deadlines, and "
        "route around unreliable or adversarial worker reports.\n\n"
        f"Step count: {observation.get('step_count', 0)} / {observation.get('max_steps', 0)}\n"
        f"Cluster health: {observation.get('cluster_health', 0.0):.3f}\n"
        f"GPU utilization: {observation.get('utilization_rate', 0.0):.3f}\n"
        f"Trust snapshot: {json.dumps(observation.get('trust_snapshot', {}))}\n"
        f"Audit anomaly scores: {json.dumps(observation.get('audit_anomaly_scores', {}))}\n"
        f"AI reliability modifier: {coverage.get('ai_reliability_modifier', 1.0)}\n\n"
        "Valid JSON actions include:\n"
        '{"action_type":"allocate","job_id":"JOB-001","gpu_id":"GPU-00","worker_id":"S2"}\n'
        '{"action_type":"verify","job_id":"JOB-001"}\n'
        '{"action_type":"preempt","job_id":"JOB-001"}\n'
        '{"action_type":"request_info","job_id":"JOB-001"}\n'
        '{"action_type":"tick"}\n\n'
        "Prefer high-trust workers, verify suspicious/high-impact reports, and avoid "
        "repeating failed actions without progress."
    )


def _api_index_payload() -> dict[str, Any]:
    """Build the service index payload shared by ``/`` (fallback) and ``/api``."""
    return {
        "name": "sentinel-env",
        "status": "ok",
        "summary": (
            "SENTINEL trains an orchestrator to calibrate trust, verify risky "
            "outputs, recover from failures, and finish long multi-agent tasks."
        ),
        "routes": [
            "/health",
            "/problem",
            "/mission",
            "/metadata",
            "/tasks",
            "/schema",
            "/grader",
            "/reward-report",
            "/difficulty",
            "/stream",
            "/trust-dashboard",
            "/cluster-dashboard",
            "/reset",
            "/step",
            "/state",
            "/cluster",
            "/cluster/metadata",
            "/cluster/tasks",
            "/cluster/reset",
            "/cluster/step",
            "/cluster/state",
            "/cluster/gpus",
            "/cluster/jobs",
            "/cluster/workers",
            "/cluster/audit",
            "/cluster/audit/investigate",
            "/cluster/ai-failure-coverage",
            "/cluster/reward-report",
            "/cluster/stream",
        ],
    }


# ---------------------------------------------------------------------------
# Request / Response models
# ---------------------------------------------------------------------------

class ResetRequest(BaseModel):
    task_type: str | None = None
    scenario_id: str | None = None
    seed: int | None = None
    adaptive: bool = False
    mode: str | None = None


class StepRequest(BaseModel):
    session_id: str
    task_type: str | None = None
    action_type: str  # delegate | verify | solve_independently | skip
    specialist_id: str | None = None
    worker_id: str | None = None
    job_id: str | None = None
    gpu_id: str | None = None
    force_flag: bool | None = None
    subtask_response: str | None = None
    reasoning: str | None = None


# Cluster-only request shapes. Kept separate from ResetRequest/StepRequest so
# the OpenAPI schema makes the GPU-cluster contract explicit.
CLUSTER_ACTION_TYPES = ("allocate", "preempt", "request_info", "verify", "tick")


class ClusterResetRequest(BaseModel):
    task_type: str | None = None  # "task1" | "task2" | "task3" (also accepts "cluster_task*")
    seed: int | None = None
    adaptive: bool = False


class ClusterStepRequest(BaseModel):
    action_type: str  # allocate | preempt | request_info | verify | tick
    job_id: str | None = None
    gpu_id: str | None = None
    worker_id: str | None = None
    force_flag: bool | None = None
    reasoning: str | None = None


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------

@app.get("/health")
def health():
    return {
        "status": "ok",
        "environment": "sentinel-env",
        "version": "1.0.0",
        "session_store": _sessions.stats(),
    }


@app.get("/")
def root():
    # Prefer the exported frontend, then the bundled static page, and only
    # fall back to the JSON index when neither file exists.
    frontend_index = _FRONTEND_OUT_DIR / "index.html"
    if frontend_index.exists():
        return FileResponse(frontend_index)
    index_path = _STATIC_DIR / "index.html"
    if index_path.exists():
        return FileResponse(index_path)
    return JSONResponse(_api_index_payload())


@app.get("/assets/baseline_comparison.png")
def baseline_comparison_chart():
    chart_path = _OUTPUTS_DIR / "baseline_comparison.png"
    if not chart_path.exists():
        raise HTTPException(status_code=404, detail="Baseline comparison chart not found.")
    return FileResponse(chart_path, media_type="image/png")


@app.get("/assets/evaluation_results.json")
def evaluation_results():
    results_path = _OUTPUTS_DIR / "evaluation_results.json"
    if not results_path.exists():
        raise HTTPException(status_code=404, detail="Evaluation results not found.")
    return FileResponse(results_path, media_type="application/json")


@app.get("/assets/trained_policy_replay.jsonl")
def trained_policy_replay():
    replay_path = _OUTPUTS_DIR / "trained_policy_replay.jsonl"
    if not replay_path.exists():
        raise HTTPException(status_code=404, detail="Trained policy replay not found.")
    return FileResponse(replay_path, media_type="application/x-ndjson")


@app.get("/assets/charts/{filename}")
def chart_asset(filename: str):
    # The name must be a single path component: reject both separator styles
    # (a backslash is a separator on Windows) and anything pathlib would split.
    has_separator = "/" in filename or "\\" in filename
    if has_separator or Path(filename).name != filename or not filename.endswith(".png"):
        raise HTTPException(status_code=400, detail="Invalid chart filename.")
    chart_path = _OUTPUTS_DIR / "charts" / filename
    if not chart_path.exists():
        raise HTTPException(status_code=404, detail="Chart not found.")
    return FileResponse(chart_path, media_type="image/png")


@app.get("/api")
def api_root():
    return _api_index_payload()


@app.get("/problem")
def problem():
    """Judge-readable explanation of what the environment solves."""
    return problem_statement()


@app.get("/mission")
def mission(task_type: str = Query("task3", pattern="^task[123]$")):
    """Real-world wrapper for each abstract OpenEnv task."""
    return {
        "task_type": task_type,
        "mission": mission_for_task(task_type),
        "how_to_use": (
            "Call /reset to get an observation, then ask an orchestrator model to "
            "emit one JSON action for /step."
        ),
    }


@app.get("/metadata")
def metadata():
    summary = scenario_summary()
    return {
        "name": "sentinel-env",
        "version": "1.0.0",
        "description": "Multi-agent trust calibration RL environment.",
        "tasks": {
            "task1": {"name": "Single-Step Trust Decision", "difficulty": "easy", "subtasks": 10, "max_steps": 15},
            "task2": {"name": "Multi-Step Delegation Chain", "difficulty": "medium", "subtasks": 15, "max_steps": 30},
            "task3": {"name": "Full Adversarial Episode", "difficulty": "hard", "subtasks": 20, "max_steps": 45},
            "cluster_task1": {"name": "Cluster Basics", "difficulty": "easy", "jobs": 10, "gpus": 8, "max_steps": 30},
            "cluster_task2": {"name": "Unreliable Workers", "difficulty": "medium", "jobs": 20, "gpus": 12, "max_steps": 60},
            "cluster_task3": {"name": "Full Adversarial Cluster", "difficulty": "hard", "jobs": 30, "gpus": 16, "max_steps": 120},
        },
        "specialists": ["S0 (AccurateSlow)", "S1 (OverconfidentFast)", "S2 (DomainBound)", "S3 (Adversarial)", "S4 (Degrading)"],
        "action_types": ["delegate", "verify", "solve_independently", "skip"],
        "cluster_action_types": ["allocate", "preempt", "request_info", "verify", "tick"],
        "scenarios": summary,
        "reward_range": "(0.01, 0.99) boundary-exclusive",
        "observation_features": [
            "trust_snapshot",
            "behavioral_fingerprints.confidence_accuracy_gap",
            "behavioral_fingerprints.domain_hit_rate",
            "behavioral_fingerprints.stakes_volatility",
            "difficulty_profile",
        ],
        "real_world_bridge": problem_statement()["problem"]["not_a_simple_prompt_solver"],
        "deployment_contract": {
            "session_backend": SESSION_BACKEND,
            "single_worker_required": True,
            "reason": "Active SentinelEnv objects live in one process memory with TTL/LRU cleanup.",
            "ttl_seconds": SESSION_TTL_SECONDS,
            "max_active_sessions": SESSION_MAX_ACTIVE,
        },
        "adaptive_curriculum": GLOBAL_DIFFICULTY_CONTROLLER.state(),
        "cluster_mode": {
            "how_to_enable": (
                "POST /cluster/reset with {\"task_type\":\"task3\"} (preferred), "
                "or POST /reset with {\"mode\":\"cluster\",\"task_type\":\"task3\"} "
                "or {\"task_type\":\"cluster_task3\"}."
            ),
            "live_dashboard": "/cluster-dashboard?session_id=",
            "api_root": "/cluster",
        },
    }


@app.get("/tasks")
def tasks():
    return {
        "task1": {
            "difficulty": "easy",
            "description": "10-subtask linear chain. Choose which specialist to delegate each subtask to.",
            "adversary_active": False,
            "reward": "0.99 correct delegation + stakes awareness | 0.02 skip",
            "mission": mission_for_task("task1"),
        },
        "task2": {
            "difficulty": "medium",
            "description": "15-subtask branching DAG. Build trust profile across honest specialists.",
            "adversary_active": False,
            "reward": "0.99 completion × calibration bonus",
            "mission": mission_for_task("task2"),
        },
        "task3": {
            "difficulty": "hard",
            "description": "20-subtask full DAG. Adversarial specialist active. Detect and route around poison.",
            "adversary_active": True,
            "reward": "0.35×completion + 0.30×detection + 0.25×calibration + 0.10×efficiency",
            "mission": mission_for_task("task3"),
        },
        "cluster_task3": {
            "difficulty": "hard",
            "description": "30-job, 16-GPU cluster. Allocate jobs under unreliable workers, reward hacking, and adversarial false reports.",
            "adversary_active": True,
            "reward": "global_agent_score × cluster_health × ai_reliability_modifier + terminal cluster score",
            "mission": "Full GPU-cluster trust mission.",
        },
    }


@app.get("/schema")
def schema():
    return {
        "reset_request": ResetRequest.model_json_schema(),
        "step_request": StepRequest.model_json_schema(),
    }


@app.get("/grader")
def grader():
    return {
        "task1": {
            "step": "accuracy×0.43 + stakes×0.30 + efficiency×0.12 + confidence×0.07 + domain×0.04 + verify×0.04",
            "terminal": "same as last step",
        },
        "task2": {
            "step": "accuracy×0.55 + efficiency×0.25 + confidence×0.10 + domain×0.10",
            "terminal": "completion_rate×0.65 + trust_calibration×0.35",
        },
        "task3": {
            "step": "accuracy×0.32 + stakes×0.33 + efficiency×0.10 + confidence×0.10 + verify×0.10 + domain×0.05",
            "terminal": "completion×0.35 + detection×0.30 + calibration×0.25 + efficiency×0.10",
        },
        "cluster_task3": {
            "step": "weighted(orchestrator, resource_manager, auditor, worker) × cluster_health × ai_reliability_modifier",
            "terminal": "jobs×0.30 + adversarial_detection×0.25 + reward_hack_detection×0.20 + plan_coherence×0.15 + efficiency×0.10",
        },
    }


@app.get("/reward-report")
def reward_report(session_id: str = Query(...)):
    env = _get_env(session_id)
    return env.reward_report()


@app.get("/difficulty")
def difficulty():
    return {
        "controller": GLOBAL_DIFFICULTY_CONTROLLER.state(),
        "how_to_enable": "POST /reset with {\"task_type\":\"task3\",\"adaptive\":true}.",
    }


@app.post("/difficulty/reset")
def reset_difficulty():
    GLOBAL_DIFFICULTY_CONTROLLER.reset()
    return {"controller": GLOBAL_DIFFICULTY_CONTROLLER.state()}


@app.get("/stream")
async def stream(session_id: str = Query(...)):
    # Server-sent events: one snapshot every 0.5 s until the session
    # disappears from the store or the environment reports ``done``.
    async def event_gen():
        while True:
            env = _sessions.get(session_id)
            if env is None:
                yield "event: close\ndata: {\"reason\":\"session_not_found\"}\n\n"
                break
            yield f"data: {json.dumps(env.stream_snapshot())}\n\n"
            if env.done:
                break
            await asyncio.sleep(0.5)

    return StreamingResponse(
        event_gen(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )


@app.get("/trust-dashboard")
def trust_dashboard(session_id: str = Query("")):
    return HTMLResponse(_trust_dashboard_html(session_id))


@app.get("/cluster-dashboard")
def cluster_dashboard(session_id: str = Query("")):
    return HTMLResponse(_trust_dashboard_html(session_id))


@app.post("/reset")
def reset(req: ResetRequest = ResetRequest()):
    env_mode, task_type = _resolve_env_mode(req.task_type, req.mode)
    if env_mode == "cluster":
        env = ClusterTrustEnv()
        result = env.reset(task_type=task_type, seed=req.seed, adaptive=req.adaptive)
    else:
        env = SentinelEnv()
        result = env.reset(
            task_type=task_type,
            scenario_id=req.scenario_id,
            seed=req.seed,
            adaptive=req.adaptive,
        )
    session_id = result["info"]["session_id"]
    _sessions.set(session_id, env)
    return _add_demo_context(result, env)


@app.post("/step")
def step(req: StepRequest, session_id: str = Query(...)):
    env = _get_env(session_id)
    try:
        result = env.step(req.model_dump(exclude_none=True))
    except (RuntimeError, ValueError) as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    # Clean up completed sessions to avoid memory leak
    if result["done"]:
        _sessions.pop(session_id)
    else:
        _add_demo_context(result, env)
    return result


@app.get("/state")
def state(session_id: str = Query(...)):
    env = _get_env(session_id)
    return _state_for(env, session_id)


@app.post("/mcp")
def mcp(body: dict[str, Any]):
    """MCP-compatible endpoint for tool-calling agents."""
    method = body.get("method", "")
    # Treat ``"params": null`` like a missing params object, and reject
    # non-object params up front instead of failing later on ``.get``.
    params = body.get("params") or {}
    if not isinstance(params, dict):
        raise HTTPException(status_code=400, detail="params must be a JSON object.")

    if method == "reset":
        env_mode, task_type = _resolve_env_mode(params.get("task_type"), params.get("mode"))
        if env_mode == "cluster":
            env = ClusterTrustEnv()
            result = env.reset(
                task_type=task_type,
                seed=params.get("seed"),
                adaptive=bool(params.get("adaptive", False)),
            )
        else:
            env = SentinelEnv()
            # Forward caller params as keyword arguments, minus the routing-only
            # ``mode`` key and with the resolved task type.
            clean_params = dict(params)
            clean_params["task_type"] = task_type
            clean_params.pop("mode", None)
            result = env.reset(**clean_params)
        session_id = result["info"]["session_id"]
        _sessions.set(session_id, env)
        return {"result": _add_demo_context(result, env)}

    if method == "step":
        session_id = params.get("session_id") or body.get("session_id")
        if not session_id:
            raise HTTPException(status_code=400, detail="session_id required for step.")
        env = _get_env(session_id)
        # Same error mapping as POST /step: invalid actions are client errors.
        try:
            result = env.step(params)
        except (RuntimeError, ValueError) as exc:
            raise HTTPException(status_code=400, detail=str(exc)) from exc
        if result["done"]:
            _sessions.pop(session_id)
        else:
            _add_demo_context(result, env)
        return {"result": result}

    if method == "state":
        # Accept the id at either level, matching the "step" method.
        session_id = params.get("session_id") or body.get("session_id")
        if not session_id:
            raise HTTPException(status_code=400, detail="session_id required for state.")
        return {"result": _state_for(_get_env(session_id), session_id)}

    raise HTTPException(status_code=400, detail=f"Unknown method: {method}")


# ---------------------------------------------------------------------------
# Cluster API (GPU cluster trust mission, namespaced under /cluster/*)
# ---------------------------------------------------------------------------

def _cluster_task_type(raw: str | None) -> str:
    """Normalize a cluster task id (default ``task3``) or raise HTTP 400.

    An optional ``cluster_`` prefix is stripped before the id is checked
    against ``CLUSTER_TASK_CONFIG``.
    """
    task_type = (raw or "task3").removeprefix("cluster_")
    if task_type not in CLUSTER_TASK_CONFIG:
        raise HTTPException(
            status_code=400,
            detail=(
                f"Unknown cluster task_type '{raw}'. "
                f"Expected one of: {', '.join(sorted(CLUSTER_TASK_CONFIG))}."
            ),
        )
    return task_type


@app.get("/cluster")
def cluster_root():
    return {
        "name": "sentinel-cluster",
        "summary": (
            "GPU cluster trust calibration API. The orchestrator schedules jobs across "
            "GPUs, audits worker reports, and routes around adversarial false completions "
            "while keeping cluster health and AI reliability high."
        ),
        "session_lifecycle": [
            "POST /cluster/reset -> {info.session_id}",
            "POST /cluster/step?session_id=...",
            "GET /cluster/state?session_id=... (or /cluster/stream for SSE)",
        ],
        "routes": [
            "POST /cluster/reset",
            "POST /cluster/step",
            "GET /cluster/state",
            "GET /cluster/gpus",
            "GET /cluster/jobs",
            "GET /cluster/workers",
            "GET /cluster/audit",
            "GET /cluster/audit/investigate",
            "GET /cluster/ai-failure-coverage",
            "GET /cluster/reward-report",
            "GET /cluster/stream",
            "GET /cluster/metadata",
            "GET /cluster/tasks",
            "GET /cluster-dashboard",
        ],
    }


@app.get("/cluster/metadata")
def cluster_metadata():
    return {
        "tasks": {
            "task1": {**CLUSTER_TASK_CONFIG["task1"], "name": "Cluster Basics"},
            "task2": {**CLUSTER_TASK_CONFIG["task2"], "name": "Unreliable Workers"},
            "task3": {**CLUSTER_TASK_CONFIG["task3"], "name": "Full Adversarial Cluster"},
        },
        "action_types": {
            "allocate": {"description": "Place a queued job on a GPU and assign a worker.", "fields": ["job_id?", "gpu_id?", "worker_id?"]},
            "preempt": {"description": "Free a running job from its GPU.", "fields": ["job_id?"]},
            "request_info": {"description": "Ask the assigned worker for a fresh progress report.", "fields": ["job_id?", "worker_id?"]},
            "verify": {"description": "Audit a worker's report. Catches false completions and lying.", "fields": ["job_id?", "worker_id?", "force_flag?"]},
            "tick": {"description": "Advance the cluster clock without acting.", "fields": []},
        },
        "workers": ["S0", "S1", "S2", "S3", "S4"],
        "scoring": "global_reward = weighted(orchestrator, resource_manager, auditor, worker) × cluster_health × ai_reliability_modifier",
        "terminal": "task1: jobs+util | task2: jobs+calibration+deadlines | task3: jobs+detection+plan_coherence+efficiency",
        "controller": GLOBAL_DIFFICULTY_CONTROLLER.state(),
    }


@app.get("/cluster/tasks")
def cluster_tasks():
    # Static per-task labels, built once rather than on every loop iteration.
    difficulty_by_task = {"task1": "easy", "task2": "medium", "task3": "hard"}
    descriptions = {
        "task1": "10-job warmup. No adversary, no GPU failures. Learn the allocate/preempt/tick loop.",
        "task2": "20-job stream with unreliable/slow/degrading workers and rare GPU failures.",
        "task3": "30-job adversarial cluster: false memory reports, false completions, poisoned reward claims.",
    }
    out: dict[str, Any] = {}
    for tid, cfg in CLUSTER_TASK_CONFIG.items():
        out[tid] = {
            "difficulty": difficulty_by_task[tid],
            "description": descriptions[tid],
            "adversary_active": cfg["adversary"],
            "jobs": cfg["jobs"],
            "gpus": cfg["gpus"],
            "max_steps": cfg["max_steps"],
            "failure_probability": cfg["failure_probability"],
        }
    return out


@app.post("/cluster/reset")
def cluster_reset(req: ClusterResetRequest = ClusterResetRequest()):
    task_type = _cluster_task_type(req.task_type)
    env = ClusterTrustEnv()
    result = env.reset(task_type=task_type, seed=req.seed, adaptive=req.adaptive)
    session_id = result["info"]["session_id"]
    _sessions.set(session_id, env)
    return _add_demo_context(result, env)


@app.post("/cluster/step")
def cluster_step(req: ClusterStepRequest, session_id: str = Query(...)):
    if req.action_type not in CLUSTER_ACTION_TYPES:
        raise HTTPException(
            status_code=400,
            detail=f"Unknown cluster action_type '{req.action_type}'. Expected one of: {', '.join(CLUSTER_ACTION_TYPES)}.",
        )
    env = _get_cluster_env(session_id)
    try:
        result = env.step(req.model_dump(exclude_none=True))
    except (RuntimeError, ValueError) as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    # Finished sessions are dropped from the store; live ones get demo context.
    if result["done"]:
        _sessions.pop(session_id)
    else:
        _add_demo_context(result, env)
    return result


@app.get("/cluster/state")
def cluster_state(session_id: str = Query(...)):
    env = _get_cluster_env(session_id)
    return env.state()


@app.get("/cluster/gpus")
def cluster_gpus(session_id: str = Query(...), include_hidden: bool = Query(False)):
    env = _get_cluster_env(session_id)
    return {
        "summary": env._pool.summary(),
        "gpus": env._pool.snapshot(include_hidden=include_hidden),
    }


@app.get("/cluster/jobs")
def cluster_jobs(
    session_id: str = Query(...),
    include_hidden: bool = Query(False),
    deadline_window: int = Query(10, ge=1, le=240),
):
    env = _get_cluster_env(session_id)
    return {
        "summary": env._jobs.summary(),
        "jobs": env._jobs.snapshot(include_hidden=include_hidden),
        "deadline_pressure": [
            job.job_id
            for job in env._jobs.deadline_pressure(env.step_count, window=deadline_window)
        ],
    }


@app.get("/cluster/workers")
def cluster_workers(session_id: str = Query(...)):
    env = _get_cluster_env(session_id)
    return {
        "available": env._workers.available_ids(),
        "trust_snapshot": env._trust.snapshot(),
        "behavioral_fingerprints": env._trust.behavioral_fingerprints(),
        "public_ground_truth_reliability": env._workers.public_ground_truth_reliability(),
    }


@app.get("/cluster/audit")
def cluster_audit(session_id: str = Query(...)):
    env = _get_cluster_env(session_id)
    return env._audit.snapshot()


@app.get("/cluster/audit/investigate")
def cluster_audit_investigate(
    session_id: str = Query(...),
    agent_id: str = Query(..., description="Worker public id (S0..S4) or 'cluster'/'adversary'/'auditor'."),
    window: int = Query(10, ge=1, le=240),
):
    env = _get_cluster_env(session_id)
    return env._audit.investigate(agent_id, window=window)
@app.get("/cluster/ai-failure-coverage")
def cluster_ai_failure_coverage(session_id: str = Query(...)):
    # Resolve the cluster session (the helper raises an HTTP error otherwise)
    # and return whatever the environment's ai_failure_coverage() produces.
    env = _get_cluster_env(session_id)
    return env.ai_failure_coverage()


@app.get("/cluster/reward-report")
def cluster_reward_report(session_id: str = Query(...)):
    # Cluster-only variant of the reward report: the session must hold a
    # ClusterTrustEnv for _get_cluster_env to return it.
    env = _get_cluster_env(session_id)
    return env.reward_report()


@app.get("/cluster/stream")
async def cluster_stream(session_id: str = Query(...)):
    # Server-sent event stream of env.stream_snapshot() for a cluster session.
    async def event_gen():
        while True:
            # Look the session up on every iteration so a session that was
            # removed from the store ends the stream.
            env = _sessions.get(session_id)
            if env is None or not isinstance(env, ClusterTrustEnv):
                # Emit a named "close" event with a reason, then stop.
                yield (
                    "event: close\n"
                    "data: {\"reason\":\"session_not_found_or_not_cluster\"}\n\n"
                )
                break
            yield f"data: {json.dumps(env.stream_snapshot())}\n\n"
            # The snapshot above is the last one sent once the env is done.
            if env.done:
                break
            # Poll interval between snapshots.
            await asyncio.sleep(0.5)

    return StreamingResponse(
        event_gen(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )


def _trust_dashboard_html(session_id: str) -> str:
    # Build the dashboard page body returned by the dashboard endpoints.
    # The session id is HTML-escaped (quotes included) before use.
    # NOTE(review): as visible here the template never interpolates
    # escaped_session and contains no markup; presumably the HTML/script
    # was stripped from this view, so confirm against the real template.
    escaped_session = html.escape(session_id, quote=True)
    return f""" SENTINEL Trust Dashboard

SENTINEL Live Trust

Watch the orchestrator's trust ledger move in real time as specialists prove reliable, degrade, or get caught poisoning high-stakes work.

"""


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    import uvicorn

    # PORT comes from the environment when set; 7860 is the fallback.
    port = int(os.environ.get("PORT", 7860))
    # Serve the "app" object from module "app" on all interfaces, no reload.
    uvicorn.run("app:app", host="0.0.0.0", port=port, reload=False)