Spaces:
Running
Running
| """OpsGate FastAPI server — thin wrapper that holds a single env instance.""" | |
| import sys, os | |
| from fastapi import FastAPI | |
| from pydantic import BaseModel | |
| from typing import Optional | |
| from opsgate_environment import OpsGateEnvironment | |
| from models import ToolCall | |
# The FastAPI application object; a title is the only metadata configured here.
app = FastAPI(
    title="OpsGate Environment API",
)

# One environment, constructed once at import time. The reset, step and state
# functions in this module all operate on this same instance, so there is a
# single episode per process rather than one per client.
env = OpsGateEnvironment()
class StepRequest(BaseModel):
    """Request payload consumed by ``step``.

    ``step`` expands ``action`` into keyword arguments for ``ToolCall``, so
    the keys presumably have to match that model's fields (``ToolCall`` lives
    in ``models`` and is not visible here -- confirm against its definition).
    """

    # Free-form mapping: no per-key schema is enforced at this layer.
    action: dict
def health():
    """Return a fixed payload signalling that the process is up.

    The environment is not consulted; the result is always
    ``{"status": "healthy"}``.

    NOTE(review): no route decorator is visible on this function in the
    reviewed text -- confirm that it is registered on ``app``.
    """
    status_payload = {"status": "healthy"}
    return status_payload
def reset():
    """Reset the shared environment and return its first observation.

    Returns:
        A dict holding the dumped observation under ``"observation"``. The
        ``"reward"`` and ``"done"`` entries are hard-coded to ``0.0`` and
        ``False``; they are not read from the observation.

    NOTE(review): no route decorator is visible on this function in the
    reviewed text -- confirm that it is registered on ``app``.
    """
    initial_obs = env.reset()
    return {
        "observation": initial_obs.model_dump(),
        "reward": 0.0,
        "done": False,
    }
def step(req: StepRequest):
    """Apply one action to the shared environment and report the outcome.

    Args:
        req: Request whose ``action`` mapping is expanded into keyword
            arguments for ``ToolCall``.

    Returns:
        A dict with the dumped observation under ``"observation"`` plus the
        observation's own ``reward`` and ``done`` attributes.

    Raises:
        HTTPException: With status 422 when ``req.action`` cannot be turned
            into a ``ToolCall``.

    NOTE(review): no route decorator is visible on this function in the
    reviewed text -- confirm that it is registered on ``app``.
    """
    # Local import: the module's top-level imports only bring in ``FastAPI``.
    from fastapi import HTTPException

    # Only the construction is guarded, so errors raised by the environment
    # itself still propagate unchanged.
    try:
        action = ToolCall(**req.action)
    except (TypeError, ValueError) as exc:
        # A malformed action is a client mistake. Left uncaught, the
        # constructor error escapes the handler and is reported as a server
        # fault instead. ``TypeError`` covers bad keyword expansion;
        # ``ValueError`` presumably covers pydantic validation failures
        # (assumes ``ToolCall`` is a pydantic model -- confirm).
        raise HTTPException(status_code=422, detail=str(exc)) from exc

    obs = env.step(action)
    return {
        "observation": obs.model_dump(),
        "reward": obs.reward,
        "done": obs.done,
    }
def state():
    """Return the shared environment's current state as a plain dict.

    Returns:
        The dumped state when ``env.state`` is not ``None``. Otherwise a
        dict with a single ``"error"`` key telling the caller to reset
        first; this is an ordinary return value, not a raised error.

    NOTE(review): no route decorator is visible on this function in the
    reviewed text -- confirm that it is registered on ``app``.
    """
    # Read the attribute once so the check and the dump see the same object.
    current = env.state
    if current is not None:
        return current.model_dump()
    return {"error": "No active episode. Call /reset first."}