Spaces:
Running
Running
| from __future__ import annotations | |
| import os | |
| from typing import Any | |
| from fastapi import FastAPI, HTTPException, Query | |
| from fastapi.responses import JSONResponse | |
| from pydantic import BaseModel | |
| from environment import SentinelEnv | |
| from scenarios import scenario_summary | |
| # --------------------------------------------------------------------------- | |
| # App + session store | |
| # --------------------------------------------------------------------------- | |
| app = FastAPI( | |
| title="SENTINEL — Multi-Agent Trust Calibration Environment", | |
| description=( | |
| "OpenEnv-compatible RL environment where an orchestrator agent learns " | |
| "dynamic trust calibration across adversarial long-horizon tasks." | |
| ), | |
| version="1.0.0", | |
| ) | |
| # One env instance per session_id | |
| _sessions: dict[str, SentinelEnv] = {} | |
| def _get_env(session_id: str) -> SentinelEnv: | |
| if session_id not in _sessions: | |
| raise HTTPException(status_code=404, detail=f"Session '{session_id}' not found. Call /reset first.") | |
| return _sessions[session_id] | |
| # --------------------------------------------------------------------------- | |
| # Request / Response models | |
| # --------------------------------------------------------------------------- | |
| class ResetRequest(BaseModel): | |
| task_type: str | None = None | |
| scenario_id: str | None = None | |
| seed: int | None = None | |
| class StepRequest(BaseModel): | |
| session_id: str | |
| task_type: str | |
| action_type: str # delegate | verify | solve_independently | skip | |
| specialist_id: str | None = None | |
| subtask_response: str | None = None | |
| reasoning: str | None = None | |
| # --------------------------------------------------------------------------- | |
| # Endpoints | |
| # --------------------------------------------------------------------------- | |
| def health(): | |
| return {"status": "ok", "environment": "sentinel-env", "version": "1.0.0"} | |
| def root(): | |
| return { | |
| "name": "sentinel-env", | |
| "status": "ok", | |
| "summary": ( | |
| "SENTINEL trains an orchestrator to calibrate trust, verify risky " | |
| "outputs, recover from failures, and finish long multi-agent tasks." | |
| ), | |
| "routes": ["/health", "/metadata", "/tasks", "/schema", "/grader", "/reset", "/step", "/state"], | |
| } | |
| def metadata(): | |
| summary = scenario_summary() | |
| return { | |
| "name": "sentinel-env", | |
| "version": "1.0.0", | |
| "description": "Multi-agent trust calibration RL environment.", | |
| "tasks": { | |
| "task1": {"name": "Single-Step Trust Decision", "difficulty": "easy", "subtasks": 10, "max_steps": 15}, | |
| "task2": {"name": "Multi-Step Delegation Chain","difficulty": "medium","subtasks": 15, "max_steps": 30}, | |
| "task3": {"name": "Full Adversarial Episode", "difficulty": "hard", "subtasks": 20, "max_steps": 45}, | |
| }, | |
| "specialists": ["S0 (AccurateSlow)", "S1 (OverconfidentFast)", | |
| "S2 (DomainBound)", "S3 (Adversarial)", "S4 (Degrading)"], | |
| "action_types": ["delegate", "verify", "solve_independently", "skip"], | |
| "scenarios": summary, | |
| "reward_range": "(0.01, 0.99) boundary-exclusive", | |
| } | |
| def tasks(): | |
| return { | |
| "task1": { | |
| "difficulty": "easy", | |
| "description": "10-subtask linear chain. Choose which specialist to delegate each subtask to.", | |
| "adversary_active": False, | |
| "reward": "0.99 correct delegation + stakes awareness | 0.02 skip", | |
| }, | |
| "task2": { | |
| "difficulty": "medium", | |
| "description": "15-subtask branching DAG. Build trust profile across honest specialists.", | |
| "adversary_active": False, | |
| "reward": "0.99 completion × calibration bonus", | |
| }, | |
| "task3": { | |
| "difficulty": "hard", | |
| "description": "20-subtask full DAG. Adversarial specialist active. Detect and route around poison.", | |
| "adversary_active": True, | |
| "reward": "0.35×completion + 0.30×detection + 0.25×calibration + 0.10×efficiency", | |
| }, | |
| } | |
| def schema(): | |
| return { | |
| "reset_request": ResetRequest.model_json_schema(), | |
| "step_request": StepRequest.model_json_schema(), | |
| } | |
| def grader(): | |
| return { | |
| "task1": { | |
| "step": "task_accuracy×0.50 + stakes_awareness×0.35 + efficiency×0.15", | |
| "terminal": "same as last step", | |
| }, | |
| "task2": { | |
| "step": "task_accuracy×0.65 + efficiency×0.35", | |
| "terminal": "completion_rate×0.65 + trust_calibration×0.35", | |
| }, | |
| "task3": { | |
| "step": "task_accuracy×0.40 + stakes_awareness×0.45 + efficiency×0.15", | |
| "terminal": "completion×0.35 + detection×0.30 + calibration×0.25 + efficiency×0.10", | |
| }, | |
| } | |
| def reset(req: ResetRequest = ResetRequest()): | |
| env = SentinelEnv() | |
| result = env.reset( | |
| task_type=req.task_type, | |
| scenario_id=req.scenario_id, | |
| seed=req.seed, | |
| ) | |
| session_id = result["info"]["session_id"] | |
| _sessions[session_id] = env | |
| return result | |
| def step(req: StepRequest, session_id: str = Query(...)): | |
| env = _get_env(session_id) | |
| try: | |
| result = env.step(req.model_dump()) | |
| except (RuntimeError, ValueError) as e: | |
| raise HTTPException(status_code=400, detail=str(e)) | |
| # Clean up completed sessions to avoid memory leak | |
| if result["done"]: | |
| _sessions.pop(session_id, None) | |
| return result | |
| def state(session_id: str = Query(...)): | |
| env = _get_env(session_id) | |
| return env.state(session_id=session_id) | |
| def mcp(body: dict[str, Any]): | |
| """MCP-compatible endpoint for tool-calling agents.""" | |
| method = body.get("method", "") | |
| params = body.get("params", {}) | |
| if method == "reset": | |
| env = SentinelEnv() | |
| result = env.reset(**params) | |
| session_id = result["info"]["session_id"] | |
| _sessions[session_id] = env | |
| return {"result": result} | |
| elif method == "step": | |
| session_id = params.get("session_id") or body.get("session_id") | |
| if not session_id: | |
| raise HTTPException(status_code=400, detail="session_id required for step.") | |
| env = _get_env(session_id) | |
| result = env.step(params) | |
| if result["done"]: | |
| _sessions.pop(session_id, None) | |
| return {"result": result} | |
| elif method == "state": | |
| session_id = params.get("session_id") | |
| if not session_id: | |
| raise HTTPException(status_code=400, detail="session_id required for state.") | |
| return {"result": _get_env(session_id).state(session_id)} | |
| else: | |
| raise HTTPException(status_code=400, detail=f"Unknown method: {method}") | |
| # --------------------------------------------------------------------------- | |
| # Entry point | |
| # --------------------------------------------------------------------------- | |
| if __name__ == "__main__": | |
| import uvicorn | |
| port = int(os.environ.get("PORT", 7860)) | |
| uvicorn.run("app:app", host="0.0.0.0", port=port, reload=False) | |