File size: 4,741 Bytes
0e4dd30 99b8b51 0e4dd30 14cf714 0e4dd30 99b8b51 0e4dd30 99b8b51 0e4dd30 99b8b51 0e4dd30 99b8b51 14cf714 99b8b51 14cf714 99b8b51 0e4dd30 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 | """
server/app.py — FastAPI application wiring.
Uses OpenEnv SDK's create_app() for WebSocket and standard endpoints
(/ws, /health, /schema, /metadata), then adds our own HTTP routes for
/reset, /step, /state, /tasks, /grader that use a singleton environment.
The SDK's HTTP /reset and /step are stateless (new env per request),
which doesn't work for our multi-step episodes. The WebSocket path
(used by the actual hackathon evaluation) handles sessions correctly.
We override the HTTP paths for testing and inference.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from fastapi import FastAPI, Request
from openenv.core.env_server import create_app
from openenv.core.env_server.serialization import serialize_observation
from pydantic import BaseModel
from models import SevZeroAction, SevZeroObservation
from server.environment import SevZeroEnvironment
from server.grader import grade_episode
from server.scenarios import TASK_DEFINITIONS
# Singleton environment for HTTP mode
_env = SevZeroEnvironment()
# Create the OpenEnv app (wires /ws, /health, /schema, /metadata, /mcp)
app = create_app(
SevZeroEnvironment,
SevZeroAction,
SevZeroObservation,
env_name="sevzero",
)
# ---------------------------------------------------------------------------
# Override HTTP endpoints with stateful versions
# ---------------------------------------------------------------------------
class ResetRequest(BaseModel):
seed: Optional[int] = None
episode_id: Optional[str] = None
task_id: str = "easy"
class StepRequest(BaseModel):
action: Dict[str, Any]
timeout_s: Optional[float] = None
# Remove SDK's stateless routes and replace with ours
_routes_to_remove = {"/reset", "/step", "/state"}
app.routes[:] = [r for r in app.routes if getattr(r, "path", None) not in _routes_to_remove]
@app.post("/reset")
async def reset_env(raw: Request) -> Dict[str, Any]:
"""Reset the environment and return initial observation.
Accepts any body: {}, null, missing, or {"task_id": "hard", "seed": 42}.
"""
try:
body = await raw.json()
except Exception:
body = {}
if not isinstance(body, dict):
body = {}
req = ResetRequest(**{k: v for k, v in body.items() if k in ResetRequest.model_fields})
obs = _env.reset(
seed=req.seed,
episode_id=req.episode_id,
task_id=req.task_id,
)
return serialize_observation(obs)
@app.post("/step")
async def step_env(request: StepRequest) -> Dict[str, Any]:
"""Execute an action and return the new observation."""
action = SevZeroAction(**request.action)
obs = _env.step(action, timeout_s=request.timeout_s)
return serialize_observation(obs)
@app.get("/state")
async def get_state() -> Dict[str, Any]:
"""Return the current environment state."""
state = _env.state
return state.model_dump()
# ---------------------------------------------------------------------------
# Custom routes
# ---------------------------------------------------------------------------
@app.get("/tasks")
async def list_tasks() -> List[Dict[str, Any]]:
"""Return the 3 task definitions (easy, medium, hard)."""
return [
{
"task_id": t["task_id"],
"name": t["name"],
"difficulty": t["difficulty"],
"description": t["description"],
"max_steps": t["max_steps"],
}
for t in TASK_DEFINITIONS
]
class GraderRequest(BaseModel):
final_slo_score: float
steps_taken: int
max_steps: int
actions_taken: List[Dict[str, Any]]
terminated: bool
termination_reason: Optional[str] = None
@app.post("/grader")
async def grade(request: GraderRequest) -> Dict[str, Any]:
"""
Deterministic grading endpoint.
Accepts episode results and returns a score 0.0–1.0 with breakdown.
"""
result = grade_episode(
final_slo_score=request.final_slo_score,
steps_taken=request.steps_taken,
max_steps=request.max_steps,
actions_taken=request.actions_taken,
terminated=request.terminated,
termination_reason=request.termination_reason,
)
return {
"score": result.score,
"slo_recovery": result.slo_recovery,
"action_efficiency": result.action_efficiency,
"time_efficiency": result.time_efficiency,
"details": result.details,
}
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main() -> None:
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)
if __name__ == "__main__":
main()
|