| """FastAPI server for the Cloud-Native DevOps Debug Environment.""" |
|
|
| from pathlib import Path |
| from typing import Optional |
|
|
| import uvicorn |
| from fastapi import FastAPI, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import HTMLResponse |
| from fastapi.staticfiles import StaticFiles |
|
|
| from server.environment import CloudNativeDebugEnvironment |
| from server.graders import run_grader |
| from server.models import ( |
| Action, |
| BaselineRequest, |
| BaselineResponse, |
| EnvironmentInfo, |
| GraderRequest, |
| GraderResponse, |
| Observation, |
| ResetRequest, |
| ResetResponse, |
| StateResponse, |
| StepRequest, |
| StepResponse, |
| TaskInfo, |
| ) |
| from server.tasks.task_registry import TASK_REGISTRY |
|
|
| STATIC_DIR = Path(__file__).resolve().parent / "static" |
|
|
| app = FastAPI( |
| title="Cloud-Native Debug Environment", |
| description="OpenEnv-style environment for Docker + GitHub Actions + Kubernetes debugging", |
| version="1.0.0", |
| ) |
|
|
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| |
| app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static") |
|
|
| env: Optional[CloudNativeDebugEnvironment] = None |
|
|
|
|
| @app.get("/", response_class=HTMLResponse) |
| async def root(): |
| html_path = STATIC_DIR / "index.html" |
| return HTMLResponse(content=html_path.read_text(encoding="utf-8"), status_code=200) |
|
|
|
|
| @app.get("/health") |
| async def health(): |
| return {"status": "healthy"} |
|
|
|
|
| @app.get("/metadata") |
| async def metadata(): |
| return { |
| "name": "cloud-native-devops-env", |
| "description": "Debug broken GitHub Actions workflows, Dockerfiles, and Kubernetes manifests. AI agents identify and fix cloud-native deployment pipeline issues.", |
| "version": "1.0.0", |
| "author": "Krishna", |
| "tags": ["devops", "docker", "github-actions", "kubernetes", "debugging", "infrastructure", "cloud-native"], |
| } |
|
|
|
|
| @app.get("/schema") |
| async def schema(): |
| return { |
| "action": Action.model_json_schema(), |
| "observation": Observation.model_json_schema(), |
| "state": StateResponse.model_json_schema(), |
| } |
|
|
|
|
| @app.post("/mcp") |
| async def mcp(request: dict = None): |
| """JSON-RPC 2.0 MCP endpoint.""" |
| request = request or {} |
| method = request.get("method", "") |
| req_id = request.get("id", 1) |
|
|
| if method == "initialize": |
| return { |
| "jsonrpc": "2.0", |
| "id": req_id, |
| "result": { |
| "protocolVersion": "2024-11-05", |
| "capabilities": {"tools": {}}, |
| "serverInfo": {"name": "cloud-native-devops-env", "version": "1.0.0"}, |
| }, |
| } |
| elif method == "tools/list": |
| return { |
| "jsonrpc": "2.0", |
| "id": req_id, |
| "result": { |
| "tools": [ |
| { |
| "name": "reset", |
| "description": "Reset the environment and start a new episode", |
| "inputSchema": ResetRequest.model_json_schema(), |
| }, |
| { |
| "name": "step", |
| "description": "Take an action in the environment", |
| "inputSchema": Action.model_json_schema(), |
| }, |
| { |
| "name": "get_state", |
| "description": "Get the current environment state", |
| "inputSchema": {"type": "object", "properties": {}}, |
| }, |
| ] |
| }, |
| } |
| else: |
| return { |
| "jsonrpc": "2.0", |
| "id": req_id, |
| "error": {"code": -32601, "message": f"Method not found: {method}"}, |
| } |
|
|
|
|
| @app.post("/reset", response_model=ResetResponse) |
| async def reset(request: Optional[ResetRequest] = None): |
| global env |
|
|
| request = request or ResetRequest() |
| env = CloudNativeDebugEnvironment() |
| try: |
| observation = env.reset( |
| task_id=request.task_id, |
| scenario_id=request.scenario_id, |
| seed=request.seed, |
| ) |
| except ValueError as exc: |
| raise HTTPException(status_code=400, detail=str(exc)) from exc |
|
|
| return ResetResponse( |
| observation=observation, |
| info={ |
| "task_id": env.current_task_id, |
| "scenario_id": env.current_scenario_id, |
| "difficulty": env.current_difficulty, |
| }, |
| ) |
|
|
|
|
| @app.post("/step", response_model=StepResponse) |
| async def step(request: StepRequest): |
| global env |
|
|
| if env is None: |
| raise HTTPException(status_code=400, detail="Environment not initialized. Call /reset first.") |
|
|
| observation, reward, done, info = env.step(request.action) |
| return StepResponse(observation=observation, reward=reward, done=done, info=info) |
|
|
|
|
| @app.get("/state", response_model=StateResponse) |
| async def get_state(): |
| global env |
|
|
| if env is None: |
| raise HTTPException(status_code=400, detail="Environment not initialized. Call /reset first.") |
|
|
| return StateResponse( |
| observation=env.get_observation(), |
| episode_reward=env.episode_reward, |
| steps_taken=env.step_count, |
| done=env.done, |
| ) |
|
|
|
|
| @app.get("/info", response_model=EnvironmentInfo) |
| async def get_info(): |
| tasks = [ |
| TaskInfo( |
| id=task_id, |
| name=task_cls.NAME, |
| description=task_cls.DESCRIPTION, |
| difficulty=task_cls.DIFFICULTY, |
| num_scenarios=len(task_cls.SCENARIOS), |
| ) |
| for task_id, task_cls in TASK_REGISTRY.items() |
| ] |
| return EnvironmentInfo( |
| tasks=tasks, |
| max_steps=10, |
| action_space=Action.model_json_schema(), |
| observation_space=Observation.model_json_schema(), |
| ) |
|
|
|
|
| @app.get("/tasks") |
| async def get_tasks(): |
| return { |
| "tasks": [ |
| { |
| "id": task_id, |
| "name": task_cls.NAME, |
| "description": task_cls.DESCRIPTION, |
| "difficulty": task_cls.DIFFICULTY.value, |
| } |
| for task_id, task_cls in TASK_REGISTRY.items() |
| ] |
| } |
|
|
|
|
| @app.post("/grader", response_model=GraderResponse) |
| async def grade(request: GraderRequest): |
| result = run_grader(task_id=request.task_id, trajectory=request.trajectory) |
| return GraderResponse(result=result) |
|
|
|
|
| @app.post("/baseline", response_model=BaselineResponse) |
| async def run_baseline(request: Optional[BaselineRequest] = None): |
| request = request or BaselineRequest() |
|
|
| from baseline_runner import run_baseline_episodes |
|
|
| results = run_baseline_episodes(task_id=request.task_id, num_episodes=request.num_episodes) |
| aggregate = sum(r.score for r in results) / len(results) if results else 0.0 |
| return BaselineResponse(results=results, aggregate_score=aggregate) |
|
|
|
|
| def main(): |
| uvicorn.run(app, host="0.0.0.0", port=7860) |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|