sentinel-env / app.py
XcodeAddy's picture
Expose cluster trust API endpoints
939dba8
from __future__ import annotations
import asyncio
import html
import json
import os
import time
from collections import OrderedDict
from dataclasses import dataclass
from pathlib import Path
from threading import RLock
from typing import Any
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, StreamingResponse
from pydantic import BaseModel
from cluster_trust_env import CLUSTER_TASK_CONFIG, ClusterTrustEnv
from difficulty_controller import GLOBAL_DIFFICULTY_CONTROLLER
from environment import SentinelEnv
from mission_context import build_orchestrator_prompt, mission_for_task, problem_statement
from scenarios import scenario_summary
from sentinel_config import SESSION_BACKEND, SESSION_MAX_ACTIVE, SESSION_TTL_SECONDS
# ---------------------------------------------------------------------------
# App + session store
# ---------------------------------------------------------------------------
app = FastAPI(
    title="SENTINEL — Multi-Agent Trust Calibration Environment",
    description=(
        "OpenEnv-compatible RL environment where an orchestrator agent learns "
        "dynamic trust calibration across adversarial long-horizon tasks."
    ),
    version="1.0.0",
)

# Add CORS middleware to allow browser requests from frontend.
# Credentials stay disabled: a wildcard origin must not be combined with
# credentialed requests, and the endpoints in this file identify sessions via
# a `session_id` parameter rather than cookies.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins (use specific domains in production)
    allow_credentials=False,
    allow_methods=["*"],  # Allow all HTTP methods (GET, POST, OPTIONS, etc.)
    allow_headers=["*"],  # Allow all headers
)
@dataclass
class SessionEntry:
    """A stored environment together with the timestamps the session store keeps for it."""

    # Environment instance held for the session.
    env: SentinelEnv | ClusterTrustEnv
    # Clock reading taken when the entry was stored.
    created_at: float
    # Clock reading of the latest store access; compared against the TTL.
    last_access_at: float
class SessionStore:
    """
    Memory-backed session registry with idle-time expiry and LRU eviction.

    Entries are held in an ``OrderedDict`` whose order runs from least to most
    recently used. ``set``, ``get`` and ``stats`` first discard entries whose
    last access is older than the TTL; ``set`` then evicts from the
    least-recently-used end until at most ``max_active`` entries remain.

    The store lives in one process only (``stats`` reports
    ``multi_worker_safe: False``). With several workers, use sticky routing or
    swap in a shared backend such as Redis.
    """

    def __init__(self, ttl_seconds: int, max_active: int) -> None:
        self._lock = RLock()
        self._items: OrderedDict[str, SessionEntry] = OrderedDict()
        self._max_active = max_active
        self._ttl_seconds = ttl_seconds

    def set(self, session_id: str, env: SentinelEnv | ClusterTrustEnv) -> None:
        """Store ``env`` under ``session_id`` as the most recently used entry."""
        stamp = time.monotonic()
        with self._lock:
            self._prune_locked(stamp)
            self._items[session_id] = SessionEntry(
                env=env,
                created_at=stamp,
                last_access_at=stamp,
            )
            self._items.move_to_end(session_id)
            # Drop least-recently-used entries until the cap is respected.
            overflow = len(self._items) - self._max_active
            for _ in range(max(overflow, 0)):
                self._items.popitem(last=False)

    def get(self, session_id: str) -> SentinelEnv | ClusterTrustEnv | None:
        """Return the environment for ``session_id`` and refresh its recency, or ``None``."""
        stamp = time.monotonic()
        with self._lock:
            self._prune_locked(stamp)
            try:
                entry = self._items[session_id]
            except KeyError:
                return None
            entry.last_access_at = stamp
            self._items.move_to_end(session_id)
            return entry.env

    def pop(self, session_id: str) -> SentinelEnv | ClusterTrustEnv | None:
        """Remove ``session_id`` and return its environment, or ``None`` if absent."""
        with self._lock:
            removed = self._items.pop(session_id, None)
        if removed:
            return removed.env
        return None

    def stats(self) -> dict[str, int | str | bool]:
        """Return a snapshot of store configuration and the current entry count."""
        with self._lock:
            self._prune_locked(time.monotonic())
            active = len(self._items)
            return {
                "backend": SESSION_BACKEND,
                "active_sessions": active,
                "ttl_seconds": self._ttl_seconds,
                "max_active": self._max_active,
                "multi_worker_safe": False,
            }

    def _prune_locked(self, now: float) -> None:
        """Delete every entry idle for longer than the TTL. Caller must hold the lock."""
        # Iterate over a copy so entries can be deleted while scanning.
        for sid, entry in list(self._items.items()):
            idle_for = now - entry.last_access_at
            if idle_for > self._ttl_seconds:
                del self._items[sid]
# Process-wide registry of live environments, sized from sentinel_config.
_sessions = SessionStore(ttl_seconds=SESSION_TTL_SECONDS, max_active=SESSION_MAX_ACTIVE)

# Asset locations, all resolved relative to the directory holding this file.
_APP_DIR = Path(__file__).resolve().parent
_STATIC_DIR = _APP_DIR / "static"
_OUTPUTS_DIR = _APP_DIR / "outputs"
_FRONTEND_OUT_DIR = _APP_DIR / "ui" / "out"
_FRONTEND_NEXT_DIR = _FRONTEND_OUT_DIR / "_next"

# Mount the `_next` asset directory only when it is present on disk.
if _FRONTEND_NEXT_DIR.exists():
    app.mount("/_next", StaticFiles(directory=_FRONTEND_NEXT_DIR), name="next-assets")
def _get_env(session_id: str) -> SentinelEnv | ClusterTrustEnv:
    """Look up the environment stored for ``session_id``.

    Raises:
        HTTPException: 404 when the session store has no entry for the id.
    """
    found = _sessions.get(session_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail=f"Session '{session_id}' not found. Call /reset first.")
def _get_cluster_env(session_id: str) -> ClusterTrustEnv:
    """Look up ``session_id`` and require that it holds a ``ClusterTrustEnv``.

    Raises:
        HTTPException: 404 when the session is unknown (via ``_get_env``),
            400 when the session holds a non-cluster environment.
    """
    env = _get_env(session_id)
    if isinstance(env, ClusterTrustEnv):
        return env
    wrong_mode_detail = (
        "Session is in abstract SentinelEnv mode. Start a cluster session via "
        "POST /cluster/reset (or POST /reset with mode='cluster')."
    )
    raise HTTPException(status_code=400, detail=wrong_mode_detail)
def _resolve_env_mode(task_type: str | None, mode: str | None = None) -> tuple[str, str]:
requested_task = task_type or "task3"
requested_mode = (mode or "").lower()
if requested_task.startswith("cluster_"):
return "cluster", requested_task.removeprefix("cluster_")
if requested_mode in {"cluster", "gpu", "gpu_cluster"}:
return "cluster", requested_task
return "abstract", requested_task
def _state_for(env: SentinelEnv | ClusterTrustEnv, session_id: str) -> dict[str, Any]:
    """Return ``env.state(...)`` using the call signature each environment type is given here.

    ``ClusterTrustEnv.state`` is called without arguments; any other
    environment receives ``session_id`` as a keyword argument.
    """
    if not isinstance(env, ClusterTrustEnv):
        return env.state(session_id=session_id)
    return env.state()
def _add_demo_context(result: dict[str, Any], env: SentinelEnv | ClusterTrustEnv) -> dict[str, Any]:
    """Attach mission text, an orchestrator prompt and the mode label to ``result["info"]``.

    ``result`` is mutated in place and also returned. It must already contain
    the ``"observation"`` and ``"info"`` keys.
    """
    observation = result["observation"]
    info = result["info"]

    if not isinstance(env, ClusterTrustEnv):
        info["mission"] = mission_for_task(observation["task_type"])
        info["orchestrator_prompt"] = build_orchestrator_prompt(observation)
        info["environment_mode"] = "abstract"
        return result

    info["mission"] = {
        "name": "GPU Cluster Trust Mission",
        "real_life_example": (
            "Schedule AI training jobs across GPUs while unreliable workers, "
            "false completions, and adversarial reports try to corrupt state."
        ),
        "what_the_model_learns": [
            "Allocate scarce GPU memory under deadlines.",
            "Calibrate trust from worker behavior, not fixed identity.",
            "Detect reward hacking and false progress reports.",
            "Keep long-horizon cluster health aligned with the original goal.",
        ],
    }
    info["orchestrator_prompt"] = _build_cluster_prompt(observation)
    info["environment_mode"] = "cluster"
    return result
def _build_cluster_prompt(observation: dict[str, Any]) -> str:
coverage = observation.get("ai_failure_coverage", {})
return (
"You are the SENTINEL orchestrator inside a simulated AI GPU cluster.\n\n"
"Mission: keep GPU utilization useful, finish jobs before deadlines, and "
"route around unreliable or adversarial worker reports.\n\n"
f"Step count: {observation.get('step_count', 0)} / {observation.get('max_steps', 0)}\n"
f"Cluster health: {observation.get('cluster_health', 0.0):.3f}\n"
f"GPU utilization: {observation.get('utilization_rate', 0.0):.3f}\n"
f"Trust snapshot: {json.dumps(observation.get('trust_snapshot', {}))}\n"
f"Audit anomaly scores: {json.dumps(observation.get('audit_anomaly_scores', {}))}\n"
f"AI reliability modifier: {coverage.get('ai_reliability_modifier', 1.0)}\n\n"
"Valid JSON actions include:\n"
'{"action_type":"allocate","job_id":"JOB-001","gpu_id":"GPU-00","worker_id":"S2"}\n'
'{"action_type":"verify","job_id":"JOB-001"}\n'
'{"action_type":"preempt","job_id":"JOB-001"}\n'
'{"action_type":"request_info","job_id":"JOB-001"}\n'
'{"action_type":"tick"}\n\n'
"Prefer high-trust workers, verify suspicious/high-impact reports, and avoid "
"repeating failed actions without progress."
)
# ---------------------------------------------------------------------------
# Request / Response models
# ---------------------------------------------------------------------------
class ResetRequest(BaseModel):
    # Body of POST /reset. Every field is optional; the endpoint falls back to
    # a default-constructed instance when no body is sent.
    # A missing task_type is treated as "task3"; a "cluster_" prefix selects cluster mode.
    task_type: str | None = None
    # Forwarded to SentinelEnv.reset only (not passed to the cluster environment).
    scenario_id: str | None = None
    seed: int | None = None
    adaptive: bool = False
    # "cluster", "gpu" or "gpu_cluster" (case-insensitive) selects cluster mode.
    mode: str | None = None
class StepRequest(BaseModel):
    # Body of POST /step. The endpoint dumps this model with None fields
    # dropped and hands the resulting dict to env.step().
    # Required in the body; note the /step endpoint looks the session up via
    # its separate `session_id` query parameter.
    session_id: str
    task_type: str | None = None
    action_type: str # delegate | verify | solve_independently | skip
    specialist_id: str | None = None
    # worker_id / job_id / gpu_id / force_flag mirror the fields of
    # ClusterStepRequest — presumably used when the session is a cluster
    # environment; confirm against ClusterTrustEnv.step.
    worker_id: str | None = None
    job_id: str | None = None
    gpu_id: str | None = None
    force_flag: bool | None = None
    subtask_response: str | None = None
    reasoning: str | None = None
# Cluster-only request shapes. Kept separate from ResetRequest/StepRequest so
# the OpenAPI schema makes the GPU-cluster contract explicit.
# Closed set of action_type values that POST /cluster/step accepts; anything
# else is rejected with HTTP 400 before the environment is touched.
CLUSTER_ACTION_TYPES = ("allocate", "preempt", "request_info", "verify", "tick")
class ClusterResetRequest(BaseModel):
    # Body of POST /cluster/reset. Every field is optional.
    # A missing task_type is treated as "task3" and a "cluster_" prefix is
    # stripped before validation against CLUSTER_TASK_CONFIG.
    task_type: str | None = None # "task1" | "task2" | "task3" (also accepts "cluster_task*")
    seed: int | None = None
    adaptive: bool = False
class ClusterStepRequest(BaseModel):
    # Body of POST /cluster/step. The session id travels as a query parameter,
    # not in this body. Fields left as None are dropped before the action dict
    # is passed to env.step().
    # Must be one of CLUSTER_ACTION_TYPES; checked by the endpoint, not by pydantic.
    action_type: str # allocate | preempt | request_info | verify | tick
    job_id: str | None = None
    gpu_id: str | None = None
    worker_id: str | None = None
    force_flag: bool | None = None
    reasoning: str | None = None
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@app.get("/health")
def health():
    """Report service liveness together with current session-store statistics."""
    payload = {
        "status": "ok",
        "environment": "sentinel-env",
        "version": "1.0.0",
    }
    payload["session_store"] = _sessions.stats()
    return payload
@app.get("/")
def root():
    """Serve the landing page, falling back to the JSON service summary.

    The first existing file wins, in this order: ``ui/out/index.html``, then
    ``static/index.html``. When neither exists the response is the same
    payload that ``GET /api`` returns, so the two route listings cannot drift
    apart.
    """
    candidates = (
        _FRONTEND_OUT_DIR / "index.html",
        _STATIC_DIR / "index.html",
    )
    for index_path in candidates:
        if index_path.exists():
            return FileResponse(index_path)
    return JSONResponse(api_root())
@app.get("/assets/baseline_comparison.png")
def baseline_comparison_chart():
    """Serve ``baseline_comparison.png`` from the outputs directory, or 404 if it is absent."""
    chart_path = _OUTPUTS_DIR / "baseline_comparison.png"
    if chart_path.exists():
        return FileResponse(chart_path, media_type="image/png")
    raise HTTPException(status_code=404, detail="Baseline comparison chart not found.")
@app.get("/assets/evaluation_results.json")
def evaluation_results():
    """Serve ``evaluation_results.json`` from the outputs directory, or 404 if it is absent."""
    results_path = _OUTPUTS_DIR / "evaluation_results.json"
    if results_path.exists():
        return FileResponse(results_path, media_type="application/json")
    raise HTTPException(status_code=404, detail="Evaluation results not found.")
@app.get("/assets/trained_policy_replay.jsonl")
def trained_policy_replay():
    """Serve ``trained_policy_replay.jsonl`` from the outputs directory, or 404 if it is absent."""
    replay_path = _OUTPUTS_DIR / "trained_policy_replay.jsonl"
    if replay_path.exists():
        return FileResponse(replay_path, media_type="application/x-ndjson")
    raise HTTPException(status_code=404, detail="Trained policy replay not found.")
@app.get("/assets/charts/{filename}")
def chart_asset(filename: str):
    """Serve a PNG chart from ``outputs/charts``.

    Args:
        filename: Bare file name ending in ``.png``; it must not contain any
            path separator.

    Raises:
        HTTPException: 400 when ``filename`` is not a bare ``.png`` name,
            404 when no regular file with that name exists.
    """
    # `filename` is client-controlled. Reject both separator styles so the
    # lookup cannot leave the charts directory on any platform ("\\" is a
    # separator on Windows but an ordinary character to a "/" check).
    has_separator = "/" in filename or "\\" in filename
    is_bare_name = not has_separator and Path(filename).name == filename
    if not is_bare_name or not filename.endswith(".png"):
        raise HTTPException(status_code=400, detail="Invalid chart filename.")

    chart_path = _OUTPUTS_DIR / "charts" / filename
    # is_file() (rather than exists()) keeps a directory named "*.png" from
    # reaching FileResponse.
    if not chart_path.is_file():
        raise HTTPException(status_code=404, detail="Chart not found.")
    return FileResponse(chart_path, media_type="image/png")
@app.get("/api")
def api_root():
    """Return a JSON summary of the service and a listing of its main routes."""
    core_routes = [
        "/health", "/problem", "/mission", "/metadata", "/tasks", "/schema",
        "/grader", "/reward-report", "/difficulty", "/stream", "/trust-dashboard",
        "/cluster-dashboard",
        "/reset", "/step", "/state",
    ]
    cluster_routes = [
        "/cluster", "/cluster/metadata", "/cluster/tasks",
        "/cluster/reset", "/cluster/step", "/cluster/state",
        "/cluster/gpus", "/cluster/jobs", "/cluster/workers",
        "/cluster/audit", "/cluster/audit/investigate",
        "/cluster/ai-failure-coverage", "/cluster/reward-report", "/cluster/stream",
    ]
    summary_text = (
        "SENTINEL trains an orchestrator to calibrate trust, verify risky "
        "outputs, recover from failures, and finish long multi-agent tasks."
    )
    return {
        "name": "sentinel-env",
        "status": "ok",
        "summary": summary_text,
        "routes": core_routes + cluster_routes,
    }
@app.get("/problem")
def problem():
    """Return the judge-readable statement of the problem this environment addresses."""
    statement = problem_statement()
    return statement
@app.get("/mission")
def mission(task_type: str = Query("task3", pattern="^task[123]$")):
    """Return the real-world mission wrapper for one abstract task (task1, task2 or task3)."""
    usage_hint = (
        "Call /reset to get an observation, then ask an orchestrator model to "
        "emit one JSON action for /step."
    )
    mission_text = mission_for_task(task_type)
    return {"task_type": task_type, "mission": mission_text, "how_to_use": usage_hint}
@app.get("/metadata")
def metadata():
    """Return static environment metadata plus live scenario and curriculum state."""
    summary = scenario_summary()

    abstract_tasks = {
        "task1": {"name": "Single-Step Trust Decision", "difficulty": "easy", "subtasks": 10, "max_steps": 15},
        "task2": {"name": "Multi-Step Delegation Chain", "difficulty": "medium", "subtasks": 15, "max_steps": 30},
        "task3": {"name": "Full Adversarial Episode", "difficulty": "hard", "subtasks": 20, "max_steps": 45},
    }
    cluster_task_entries = {
        "cluster_task1": {"name": "Cluster Basics", "difficulty": "easy", "jobs": 10, "gpus": 8, "max_steps": 30},
        "cluster_task2": {"name": "Unreliable Workers", "difficulty": "medium", "jobs": 20, "gpus": 12, "max_steps": 60},
        "cluster_task3": {"name": "Full Adversarial Cluster", "difficulty": "hard", "jobs": 30, "gpus": 16, "max_steps": 120},
    }
    specialists = [
        "S0 (AccurateSlow)",
        "S1 (OverconfidentFast)",
        "S2 (DomainBound)",
        "S3 (Adversarial)",
        "S4 (Degrading)",
    ]
    observation_features = [
        "trust_snapshot",
        "behavioral_fingerprints.confidence_accuracy_gap",
        "behavioral_fingerprints.domain_hit_rate",
        "behavioral_fingerprints.stakes_volatility",
        "difficulty_profile",
    ]
    real_world_bridge = problem_statement()["problem"]["not_a_simple_prompt_solver"]
    deployment_contract = {
        "session_backend": SESSION_BACKEND,
        "single_worker_required": True,
        "reason": "Active SentinelEnv objects live in one process memory with TTL/LRU cleanup.",
        "ttl_seconds": SESSION_TTL_SECONDS,
        "max_active_sessions": SESSION_MAX_ACTIVE,
    }
    curriculum_state = GLOBAL_DIFFICULTY_CONTROLLER.state()
    cluster_mode = {
        "how_to_enable": (
            'POST /cluster/reset with {"task_type":"task3"} (preferred), '
            'or POST /reset with {"mode":"cluster","task_type":"task3"} '
            'or {"task_type":"cluster_task3"}.'
        ),
        "live_dashboard": "/cluster-dashboard?session_id=<session_id>",
        "api_root": "/cluster",
    }

    return {
        "name": "sentinel-env",
        "version": "1.0.0",
        "description": "Multi-agent trust calibration RL environment.",
        "tasks": {**abstract_tasks, **cluster_task_entries},
        "specialists": specialists,
        "action_types": ["delegate", "verify", "solve_independently", "skip"],
        "cluster_action_types": list(CLUSTER_ACTION_TYPES),
        "scenarios": summary,
        "reward_range": "(0.01, 0.99) boundary-exclusive",
        "observation_features": observation_features,
        "real_world_bridge": real_world_bridge,
        "deployment_contract": deployment_contract,
        "adaptive_curriculum": curriculum_state,
        "cluster_mode": cluster_mode,
    }
@app.get("/tasks")
def tasks():
    """Describe each task: difficulty, description, adversary flag, reward summary and mission.

    The reward strings use a real multiplication sign ("×"); earlier they
    carried mis-decoded bytes in its place.
    """
    return {
        "task1": {
            "difficulty": "easy",
            "description": "10-subtask linear chain. Choose which specialist to delegate each subtask to.",
            "adversary_active": False,
            "reward": "0.99 correct delegation + stakes awareness | 0.02 skip",
            "mission": mission_for_task("task1"),
        },
        "task2": {
            "difficulty": "medium",
            "description": "15-subtask branching DAG. Build trust profile across honest specialists.",
            "adversary_active": False,
            "reward": "0.99 completion × calibration bonus",
            "mission": mission_for_task("task2"),
        },
        "task3": {
            "difficulty": "hard",
            "description": "20-subtask full DAG. Adversarial specialist active. Detect and route around poison.",
            "adversary_active": True,
            "reward": "0.35×completion + 0.30×detection + 0.25×calibration + 0.10×efficiency",
            "mission": mission_for_task("task3"),
        },
        "cluster_task3": {
            "difficulty": "hard",
            "description": "30-job, 16-GPU cluster. Allocate jobs under unreliable workers, reward hacking, and adversarial false reports.",
            "adversary_active": True,
            "reward": "global_agent_score × cluster_health × ai_reliability_modifier + terminal cluster score",
            "mission": "Full GPU-cluster trust mission.",
        },
    }
@app.get("/schema")
def schema():
    """Return the JSON schemas of the /reset and /step request bodies."""
    request_models = {"reset_request": ResetRequest, "step_request": StepRequest}
    return {key: model.model_json_schema() for key, model in request_models.items()}
@app.get("/grader")
def grader():
    """Return the human-readable step and terminal scoring formulas for each task.

    The formulas use a real multiplication sign ("×"); earlier they carried
    mis-decoded bytes in its place.
    """
    return {
        "task1": {
            "step": "accuracy×0.43 + stakes×0.30 + efficiency×0.12 + confidence×0.07 + domain×0.04 + verify×0.04",
            "terminal": "same as last step",
        },
        "task2": {
            "step": "accuracy×0.55 + efficiency×0.25 + confidence×0.10 + domain×0.10",
            "terminal": "completion_rate×0.65 + trust_calibration×0.35",
        },
        "task3": {
            "step": "accuracy×0.32 + stakes×0.33 + efficiency×0.10 + confidence×0.10 + verify×0.10 + domain×0.05",
            "terminal": "completion×0.35 + detection×0.30 + calibration×0.25 + efficiency×0.10",
        },
        "cluster_task3": {
            "step": "weighted(orchestrator, resource_manager, auditor, worker) × cluster_health × ai_reliability_modifier",
            "terminal": "jobs×0.30 + adversarial_detection×0.25 + reward_hack_detection×0.20 + plan_coherence×0.15 + efficiency×0.10",
        },
    }
@app.get("/reward-report")
def reward_report(session_id: str = Query(...)):
    """Return ``env.reward_report()`` for the given session (404 if the session is unknown)."""
    return _get_env(session_id).reward_report()
@app.get("/difficulty")
def difficulty():
    """Return the global difficulty controller's state and a hint on enabling adaptive mode."""
    controller_state = GLOBAL_DIFFICULTY_CONTROLLER.state()
    return {
        "controller": controller_state,
        "how_to_enable": 'POST /reset with {"task_type":"task3","adaptive":true}.',
    }
@app.post("/difficulty/reset")
def reset_difficulty():
    """Reset the global difficulty controller and return its state afterwards."""
    controller = GLOBAL_DIFFICULTY_CONTROLLER
    controller.reset()
    return {"controller": controller.state()}
@app.get("/stream")
async def stream(session_id: str = Query(...)):
    """Stream session snapshots as server-sent events, polling every 0.5 seconds.

    The stream ends after the first snapshot taken while ``env.done`` is true.
    If the session cannot be found, a single ``close`` event is sent instead.
    """

    async def event_gen():
        env = _sessions.get(session_id)
        while env is not None:
            yield f"data: {json.dumps(env.stream_snapshot())}\n\n"
            if env.done:
                return
            await asyncio.sleep(0.5)
            # Look the session up again on every tick: it may have been removed.
            env = _sessions.get(session_id)
        yield 'event: close\ndata: {"reason":"session_not_found"}\n\n'

    sse_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    return StreamingResponse(event_gen(), media_type="text/event-stream", headers=sse_headers)
@app.get("/trust-dashboard")
def trust_dashboard(session_id: str = Query("")):
    """Render the live dashboard page, pre-filling the session id field when one is given."""
    page = _trust_dashboard_html(session_id)
    return HTMLResponse(page)
@app.get("/cluster-dashboard")
def cluster_dashboard(session_id: str = Query("")):
    """Render the same dashboard page as ``/trust-dashboard`` under the cluster route."""
    page = _trust_dashboard_html(session_id)
    return HTMLResponse(page)
@app.post("/reset")
def reset(req: ResetRequest = ResetRequest()):
    """Start a new session in abstract or cluster mode and register it in the store.

    The mode is chosen by ``_resolve_env_mode`` from ``task_type`` and
    ``mode``. The response is the environment's reset result, with demo
    context added under ``info``.

    Raises:
        HTTPException: 400 when a cluster task type is not in
            ``CLUSTER_TASK_CONFIG`` or when the environment rejects the reset
            arguments with ``ValueError``/``RuntimeError``.
    """
    env_mode, task_type = _resolve_env_mode(req.task_type, req.mode)
    if env_mode == "cluster":
        # Apply the same task validation as POST /cluster/reset so an unknown
        # task yields a 400 instead of an unhandled error inside the env.
        task_type = _cluster_task_type(task_type)

    try:
        if env_mode == "cluster":
            env = ClusterTrustEnv()
            result = env.reset(task_type=task_type, seed=req.seed, adaptive=req.adaptive)
        else:
            env = SentinelEnv()
            result = env.reset(
                task_type=task_type,
                scenario_id=req.scenario_id,
                seed=req.seed,
                adaptive=req.adaptive,
            )
    except (RuntimeError, ValueError) as exc:
        # Mirror /step: a rejected request is a client error, not a 500.
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    session_id = result["info"]["session_id"]
    _sessions.set(session_id, env)
    return _add_demo_context(result, env)
@app.post("/step")
def step(req: StepRequest, session_id: str = Query(...)):
    """Apply one action to the session's environment and return the step result.

    ``RuntimeError``/``ValueError`` raised by the environment become HTTP 400.
    A finished episode is removed from the session store; an unfinished one
    gets demo context added under ``info``.
    """
    env = _get_env(session_id)
    try:
        action = req.model_dump(exclude_none=True)
        result = env.step(action)
    except (RuntimeError, ValueError) as e:
        raise HTTPException(status_code=400, detail=str(e))

    if not result["done"]:
        _add_demo_context(result, env)
        return result

    # Clean up completed sessions to avoid memory leak
    _sessions.pop(session_id)
    return result
@app.get("/state")
def state(session_id: str = Query(...)):
    """Return the current state of the session's environment (404 if the session is unknown)."""
    return _state_for(_get_env(session_id), session_id)
@app.post("/mcp")
def mcp(body: dict[str, Any]):
    """MCP-compatible endpoint for tool-calling agents.

    ``body["method"]`` selects the operation (``reset``, ``step`` or
    ``state``) and ``body["params"]`` carries its arguments. For ``step`` and
    ``state`` the session id is read from ``params`` first, then from the body
    itself. Successful responses are wrapped as ``{"result": ...}``.

    Raises:
        HTTPException: 400 for an unknown method, a non-object ``params``, a
            missing ``session_id``, an unknown cluster task type, or arguments
            the environment rejects; 404 for an unknown session.
    """
    method = body.get("method", "")
    # `params` may be absent or an explicit JSON null; treat both as empty.
    params = body.get("params") or {}
    if not isinstance(params, dict):
        raise HTTPException(status_code=400, detail="params must be a JSON object.")

    if method == "reset":
        env_mode, task_type = _resolve_env_mode(params.get("task_type"), params.get("mode"))
        if env_mode == "cluster":
            # Same validation as POST /cluster/reset.
            task_type = _cluster_task_type(task_type)
        try:
            if env_mode == "cluster":
                env = ClusterTrustEnv()
                result = env.reset(
                    task_type=task_type,
                    seed=params.get("seed"),
                    adaptive=bool(params.get("adaptive", False)),
                )
            else:
                env = SentinelEnv()
                clean_params = dict(params)
                clean_params["task_type"] = task_type
                clean_params.pop("mode", None)
                # Client keys are splatted into keyword arguments, so an
                # unexpected key surfaces as TypeError: report it as a 400.
                result = env.reset(**clean_params)
        except (RuntimeError, ValueError, TypeError) as exc:
            raise HTTPException(status_code=400, detail=str(exc)) from exc
        session_id = result["info"]["session_id"]
        _sessions.set(session_id, env)
        return {"result": _add_demo_context(result, env)}

    if method == "step":
        session_id = params.get("session_id") or body.get("session_id")
        if not session_id:
            raise HTTPException(status_code=400, detail="session_id required for step.")
        env = _get_env(session_id)
        try:
            result = env.step(params)
        except (RuntimeError, ValueError) as exc:
            # Match POST /step, which maps these errors to 400.
            raise HTTPException(status_code=400, detail=str(exc)) from exc
        if result["done"]:
            _sessions.pop(session_id)
        else:
            _add_demo_context(result, env)
        return {"result": result}

    if method == "state":
        # Accept the body-level session_id too, as `step` does.
        session_id = params.get("session_id") or body.get("session_id")
        if not session_id:
            raise HTTPException(status_code=400, detail="session_id required for state.")
        return {"result": _state_for(_get_env(session_id), session_id)}

    raise HTTPException(status_code=400, detail=f"Unknown method: {method}")
# ---------------------------------------------------------------------------
# Cluster API (GPU cluster trust mission, namespaced under /cluster/*)
# ---------------------------------------------------------------------------
def _cluster_task_type(raw: str | None) -> str:
    """Normalise and validate a cluster task type.

    A missing/empty value becomes ``"task3"`` and a leading ``"cluster_"`` is
    stripped. The result must be a key of ``CLUSTER_TASK_CONFIG``.

    Raises:
        HTTPException: 400 listing the accepted task types when validation fails.
    """
    candidate = raw if raw else "task3"
    candidate = candidate.removeprefix("cluster_")
    if candidate in CLUSTER_TASK_CONFIG:
        return candidate

    accepted = ", ".join(sorted(CLUSTER_TASK_CONFIG))
    raise HTTPException(
        status_code=400,
        detail=f"Unknown cluster task_type '{raw}'. Expected one of: {accepted}.",
    )
@app.get("/cluster")
def cluster_root():
    """Return an overview of the cluster API: summary, session lifecycle and route list."""
    overview = (
        "GPU cluster trust calibration API. The orchestrator schedules jobs across "
        "GPUs, audits worker reports, and routes around adversarial false completions "
        "while keeping cluster health and AI reliability high."
    )
    lifecycle = [
        "POST /cluster/reset -> {info.session_id}",
        "POST /cluster/step?session_id=...",
        "GET /cluster/state?session_id=... (or /cluster/stream for SSE)",
    ]
    post_routes = ["POST /cluster/reset", "POST /cluster/step"]
    get_routes = [
        "GET /cluster/state",
        "GET /cluster/gpus",
        "GET /cluster/jobs",
        "GET /cluster/workers",
        "GET /cluster/audit",
        "GET /cluster/audit/investigate",
        "GET /cluster/ai-failure-coverage",
        "GET /cluster/reward-report",
        "GET /cluster/stream",
        "GET /cluster/metadata",
        "GET /cluster/tasks",
        "GET /cluster-dashboard",
    ]
    return {
        "name": "sentinel-cluster",
        "summary": overview,
        "session_lifecycle": lifecycle,
        "routes": post_routes + get_routes,
    }
@app.get("/cluster/metadata")
def cluster_metadata():
    """Return cluster task configs, the action catalogue, worker ids and scoring summaries.

    Each task entry is the matching ``CLUSTER_TASK_CONFIG`` record with a
    display ``name`` added. The scoring string uses a real multiplication sign
    ("×"); earlier it carried mis-decoded bytes in its place.
    """
    task_names = {
        "task1": "Cluster Basics",
        "task2": "Unreliable Workers",
        "task3": "Full Adversarial Cluster",
    }
    return {
        "tasks": {
            tid: {**CLUSTER_TASK_CONFIG[tid], "name": name}
            for tid, name in task_names.items()
        },
        "action_types": {
            "allocate": {
                "description": "Place a queued job on a GPU and assign a worker.",
                "fields": ["job_id?", "gpu_id?", "worker_id?"],
            },
            "preempt": {
                "description": "Free a running job from its GPU.",
                "fields": ["job_id?"],
            },
            "request_info": {
                "description": "Ask the assigned worker for a fresh progress report.",
                "fields": ["job_id?", "worker_id?"],
            },
            "verify": {
                "description": "Audit a worker's report. Catches false completions and lying.",
                "fields": ["job_id?", "worker_id?", "force_flag?"],
            },
            "tick": {
                "description": "Advance the cluster clock without acting.",
                "fields": [],
            },
        },
        "workers": ["S0", "S1", "S2", "S3", "S4"],
        "scoring": "global_reward = weighted(orchestrator, resource_manager, auditor, worker) × cluster_health × ai_reliability_modifier",
        "terminal": "task1: jobs+util | task2: jobs+calibration+deadlines | task3: jobs+detection+plan_coherence+efficiency",
        "controller": GLOBAL_DIFFICULTY_CONTROLLER.state(),
    }
@app.get("/cluster/tasks")
def cluster_tasks():
    """Describe every task in ``CLUSTER_TASK_CONFIG``.

    Each entry combines a difficulty label and description with the task's
    configured adversary flag, job/GPU counts, step limit and failure
    probability. A configured task without a local label or description is
    reported as ``"unknown"`` / ``""`` instead of failing the request.
    """
    # Both lookup tables are loop-invariant, so build them once up front.
    difficulties = {"task1": "easy", "task2": "medium", "task3": "hard"}
    descriptions = {
        "task1": "10-job warmup. No adversary, no GPU failures. Learn the allocate/preempt/tick loop.",
        "task2": "20-job stream with unreliable/slow/degrading workers and rare GPU failures.",
        "task3": "30-job adversarial cluster: false memory reports, false completions, poisoned reward claims.",
    }
    return {
        tid: {
            "difficulty": difficulties.get(tid, "unknown"),
            "description": descriptions.get(tid, ""),
            "adversary_active": cfg["adversary"],
            "jobs": cfg["jobs"],
            "gpus": cfg["gpus"],
            "max_steps": cfg["max_steps"],
            "failure_probability": cfg["failure_probability"],
        }
        for tid, cfg in CLUSTER_TASK_CONFIG.items()
    }
@app.post("/cluster/reset")
def cluster_reset(req: ClusterResetRequest = ClusterResetRequest()):
    """Start a cluster session for a validated task type and register it in the store."""
    env = ClusterTrustEnv()
    result = env.reset(
        task_type=_cluster_task_type(req.task_type),
        seed=req.seed,
        adaptive=req.adaptive,
    )
    _sessions.set(result["info"]["session_id"], env)
    return _add_demo_context(result, env)
@app.post("/cluster/step")
def cluster_step(req: ClusterStepRequest, session_id: str = Query(...)):
    """Apply one cluster action to a cluster session and return the step result.

    The action type is checked against ``CLUSTER_ACTION_TYPES`` before the
    session is looked up. ``RuntimeError``/``ValueError`` from the environment
    become HTTP 400. A finished episode is removed from the session store; an
    unfinished one gets demo context added under ``info``.
    """
    if req.action_type not in CLUSTER_ACTION_TYPES:
        allowed = ", ".join(CLUSTER_ACTION_TYPES)
        raise HTTPException(
            status_code=400,
            detail=f"Unknown cluster action_type '{req.action_type}'. Expected one of: {allowed}.",
        )

    env = _get_cluster_env(session_id)
    try:
        action = req.model_dump(exclude_none=True)
        result = env.step(action)
    except (RuntimeError, ValueError) as exc:
        raise HTTPException(status_code=400, detail=str(exc))

    if not result["done"]:
        _add_demo_context(result, env)
        return result
    _sessions.pop(session_id)
    return result
@app.get("/cluster/state")
def cluster_state(session_id: str = Query(...)):
    """Return ``env.state()`` for a cluster session."""
    return _get_cluster_env(session_id).state()
@app.get("/cluster/gpus")
def cluster_gpus(session_id: str = Query(...), include_hidden: bool = Query(False)):
    """Return the GPU pool summary and per-GPU snapshot for a cluster session."""
    pool = _get_cluster_env(session_id)._pool
    summary = pool.summary()
    snapshot = pool.snapshot(include_hidden=include_hidden)
    return {"summary": summary, "gpus": snapshot}
@app.get("/cluster/jobs")
def cluster_jobs(
    session_id: str = Query(...),
    include_hidden: bool = Query(False),
    deadline_window: int = Query(10, ge=1, le=240),
):
    """Return the job summary, job snapshot and ids of jobs under deadline pressure.

    ``deadline_window`` is passed as ``window`` to the job store's
    ``deadline_pressure`` together with the environment's current step count.
    """
    env = _get_cluster_env(session_id)
    job_store = env._jobs
    summary = job_store.summary()
    snapshot = job_store.snapshot(include_hidden=include_hidden)
    pressured = job_store.deadline_pressure(env.step_count, window=deadline_window)
    return {
        "summary": summary,
        "jobs": snapshot,
        "deadline_pressure": [job.job_id for job in pressured],
    }
@app.get("/cluster/workers")
def cluster_workers(session_id: str = Query(...)):
    """Return worker availability, trust data and public reliability for a cluster session."""
    env = _get_cluster_env(session_id)
    workers, trust = env._workers, env._trust
    available = workers.available_ids()
    trust_snapshot = trust.snapshot()
    fingerprints = trust.behavioral_fingerprints()
    reliability = workers.public_ground_truth_reliability()
    return {
        "available": available,
        "trust_snapshot": trust_snapshot,
        "behavioral_fingerprints": fingerprints,
        "public_ground_truth_reliability": reliability,
    }
@app.get("/cluster/audit")
def cluster_audit(session_id: str = Query(...)):
    """Return the audit component's snapshot for a cluster session."""
    audit = _get_cluster_env(session_id)._audit
    return audit.snapshot()
@app.get("/cluster/audit/investigate")
def cluster_audit_investigate(
    session_id: str = Query(...),
    agent_id: str = Query(..., description="Worker public id (S0..S4) or 'cluster'/'adversary'/'auditor'."),
    window: int = Query(10, ge=1, le=240),
):
    """Return the audit component's investigation of ``agent_id`` over ``window``."""
    audit = _get_cluster_env(session_id)._audit
    return audit.investigate(agent_id, window=window)
@app.get("/cluster/ai-failure-coverage")
def cluster_ai_failure_coverage(session_id: str = Query(...)):
    """Return ``env.ai_failure_coverage()`` for a cluster session."""
    return _get_cluster_env(session_id).ai_failure_coverage()
@app.get("/cluster/reward-report")
def cluster_reward_report(session_id: str = Query(...)):
    """Return ``env.reward_report()`` for a cluster session."""
    return _get_cluster_env(session_id).reward_report()
@app.get("/cluster/stream")
async def cluster_stream(session_id: str = Query(...)):
    """Stream cluster session snapshots as server-sent events, polling every 0.5 seconds.

    The stream ends after the first snapshot taken while ``env.done`` is true.
    If the session is missing or is not a cluster session, a single ``close``
    event is sent instead.
    """

    async def event_gen():
        env = _sessions.get(session_id)
        # isinstance() is False for None, so this covers a missing session too.
        while isinstance(env, ClusterTrustEnv):
            yield f"data: {json.dumps(env.stream_snapshot())}\n\n"
            if env.done:
                return
            await asyncio.sleep(0.5)
            env = _sessions.get(session_id)
        yield 'event: close\ndata: {"reason":"session_not_found_or_not_cluster"}\n\n'

    sse_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    return StreamingResponse(event_gen(), media_type="text/event-stream", headers=sse_headers)
def _trust_dashboard_html(session_id: str) -> str:
escaped_session = html.escape(session_id, quote=True)
return f"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>SENTINEL Trust Dashboard</title>
<style>
:root {{
color-scheme: dark;
font-family: Inter, ui-sans-serif, system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif;
background: #0b0f14;
color: #e5eef8;
}}
body {{ margin: 0; min-height: 100vh; display: grid; place-items: center; background: #0b0f14; }}
main {{ width: min(1180px, calc(100vw - 32px)); }}
header {{ display: flex; justify-content: space-between; gap: 24px; align-items: end; margin-bottom: 28px; }}
h1 {{ margin: 0; font-size: clamp(28px, 5vw, 56px); letter-spacing: 0; }}
p {{ color: #94a3b8; line-height: 1.6; margin: 8px 0 0; max-width: 640px; }}
input {{ width: 360px; max-width: 100%; background: #111827; color: #e5eef8; border: 1px solid #263241; border-radius: 8px; padding: 11px 12px; }}
button {{ background: #e5eef8; color: #0b0f14; border: 0; border-radius: 8px; padding: 11px 14px; font-weight: 700; cursor: pointer; }}
.controls {{ display: flex; gap: 8px; flex-wrap: wrap; justify-content: end; }}
.panel {{ border: 1px solid #223043; background: #0f1722; border-radius: 8px; padding: 24px; box-shadow: 0 24px 80px rgba(0,0,0,.32); }}
.bar {{ display: grid; grid-template-columns: 56px 1fr 74px; align-items: center; gap: 16px; margin: 18px 0; }}
.id {{ font-weight: 800; font-size: 22px; }}
.track {{ height: 28px; background: #182231; border-radius: 6px; overflow: hidden; border: 1px solid #263241; }}
.fill {{ height: 100%; width: 50%; background: linear-gradient(90deg, #ef4444, #f59e0b, #10b981); transition: width .35s ease; }}
.score {{ font-variant-numeric: tabular-nums; text-align: right; color: #d9f99d; font-size: 22px; font-weight: 800; }}
.meta {{ display: grid; grid-template-columns: repeat(4, minmax(0, 1fr)); gap: 12px; margin-top: 22px; }}
.stat {{ border: 1px solid #223043; background: #0b111a; border-radius: 8px; padding: 14px; }}
.label {{ color: #94a3b8; font-size: 12px; text-transform: uppercase; letter-spacing: .08em; }}
.value {{ margin-top: 8px; font-size: 18px; font-weight: 800; }}
@media (max-width: 760px) {{
header, .meta {{ display: block; }}
.controls {{ justify-content: stretch; margin-top: 18px; }}
input, button {{ width: 100%; }}
.stat {{ margin-top: 12px; }}
}}
</style>
</head>
<body>
<main>
<header>
<div>
<h1>SENTINEL Live Trust</h1>
<p>Watch the orchestrator's trust ledger move in real time as specialists prove reliable, degrade, or get caught poisoning high-stakes work.</p>
</div>
<div class="controls">
<input id="sid" placeholder="session_id" value="{escaped_session}" />
<button onclick="connect()">Connect</button>
</div>
</header>
<section class="panel" id="bars"></section>
</main>
<script>
const ids = ["S0", "S1", "S2", "S3", "S4"];
const bars = document.getElementById("bars");
bars.innerHTML = ids.map(id => `
<div class="bar">
<div class="id">${{id}}</div>
<div class="track"><div class="fill" id="fill-${{id}}"></div></div>
<div class="score" id="score-${{id}}">0.500</div>
</div>
`).join("") + `
<div class="meta">
<div class="stat"><div class="label">step</div><div class="value" id="step">0 / 0</div></div>
<div class="stat"><div class="label">last reward</div><div class="value" id="reward">0.000</div></div>
<div class="stat"><div class="label">cluster health</div><div class="value" id="health">β€”</div></div>
<div class="stat"><div class="label">gpu utilization</div><div class="value" id="util">β€”</div></div>
<div class="stat"><div class="label">jobs complete</div><div class="value" id="jobs">β€”</div></div>
<div class="stat"><div class="label">attacks caught</div><div class="value" id="attacks">β€”</div></div>
<div class="stat"><div class="label">ai reliability</div><div class="value" id="airel">β€”</div></div>
<div class="stat"><div class="label">adaptive threshold</div><div class="value" id="threshold">0.700</div></div>
</div>`;
let source = null;
function connect() {{
if (source) source.close();
const sid = document.getElementById("sid").value.trim();
if (!sid) return;
source = new EventSource(`/stream?session_id=${{encodeURIComponent(sid)}}`);
source.onmessage = event => {{
const data = JSON.parse(event.data);
ids.forEach(id => {{
const value = data.trust_snapshot?.[id] ?? 0.5;
document.getElementById(`fill-${{id}}`).style.width = `${{Math.round(value * 100)}}%`;
document.getElementById(`score-${{id}}`).textContent = Number(value).toFixed(3);
}});
document.getElementById("step").textContent = `${{data.step_count}} / ${{data.max_steps}}`;
document.getElementById("reward").textContent = Number(data.last_reward || 0).toFixed(3);
const cluster = data.cluster || {{}};
const jobs = data.jobs || {{}};
const coverage = data.ai_failure_coverage || {{}};
document.getElementById("health").textContent = cluster.cluster_health_score == null ? "β€”" : Number(cluster.cluster_health_score).toFixed(3);
document.getElementById("util").textContent = cluster.utilization_rate == null ? "β€”" : `${{Math.round(Number(cluster.utilization_rate) * 100)}}%`;
const doneJobs = jobs.statuses?.complete;
document.getElementById("jobs").textContent = doneJobs == null ? "β€”" : `${{doneJobs}} / ${{jobs.jobs_total}}`;
const detections = data.attack_detections;
document.getElementById("attacks").textContent = detections == null ? "β€”" : `${{detections}} / ${{data.attack_attempts || 0}}`;
document.getElementById("airel").textContent = coverage.ai_reliability_modifier == null ? "β€”" : Number(coverage.ai_reliability_modifier).toFixed(3);
document.getElementById("threshold").textContent = Number(data.difficulty_profile?.adversarial_threshold || 0.7).toFixed(3);
}};
}}
if (document.getElementById("sid").value.trim()) connect();
</script>
</body>
</html>"""
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Imported lazily so the module can be loaded without uvicorn when it is
    # served by an external ASGI runner.
    import uvicorn

    # PORT may be overridden through the environment; 7860 is the fallback.
    listen_port = int(os.environ.get("PORT", 7860))
    uvicorn.run("app:app", host="0.0.0.0", port=listen_port, reload=False)