yashash045's picture
Upload folder using huggingface_hub
83ecd75 verified
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""FastAPI application for the DevOps Pipeline Environment."""
from openenv.core.env_server.http_server import create_app
from devops_pipeline_env.models import PipelineAction, PipelineObservation
from server.pipeline_environment import PipelineEnvironment
app = create_app(
PipelineEnvironment,
PipelineAction,
PipelineObservation,
env_name="devops_pipeline_env",
max_concurrent_envs=1,
)
# Store active env on app.state so /grader can access it without class singletons.
# PipelineEnvironment.reset() calls _register_callback if set.
app.state.active_env = None
PipelineEnvironment._register_callback = lambda env: setattr(app.state, "active_env", env)
# --- Additional Required Endpoints -------------------------------------------
@app.get("/tasks")
def get_tasks():
"""Returns list of tasks and the action schema."""
return {
"tasks": [
{
"name": "clean_deploy",
"difficulty": "easy",
"description": "Deploy 2 services with all tests passing. No complications.",
"max_steps": 15,
},
{
"name": "broken_pipeline",
"difficulty": "medium",
"description": "Diagnose test failures, fix config errors, run migrations.",
"max_steps": 20,
},
{
"name": "judgment_call",
"difficulty": "hard",
"description": "Production incident with cascading failures. Hotfix breaks downstream service. 12-step time limit with degrading health.",
"max_steps": 12,
},
{
"name": "cascading_failure",
"difficulty": "medium-hard",
"description": "Root cause analysis across dependency chain. cache-service down, dragging api-gateway and web-frontend. Fix root cause first.",
"max_steps": 15,
},
{
"name": "capacity_crisis",
"difficulty": "medium-hard",
"description": "Peak traffic 4x normal. database-primary connection pool nearly full. Stabilize before tipping points trigger cascading collapse.",
"max_steps": 15,
},
{
"name": "random_incident",
"difficulty": "variable",
"description": "Procedurally generated incident. Service, failure type, and severity are randomized from seed. Infinite variation for curriculum learning.",
"max_steps": 15,
},
],
"action_schema": PipelineAction.model_json_schema(),
}
@app.get("/health")
def health_check():
"""Health check endpoint."""
return {"status": "healthy"}
@app.post("/baseline")
async def run_baseline():
"""Return pre-recorded baseline scores. Does NOT run inference.py."""
return {
"scores": {
"clean_deploy": 0.700,
"broken_pipeline": 0.482,
"judgment_call": 0.184,
"cascading_failure": 0.280,
"capacity_crisis": 0.250,
"random_incident": 0.350,
},
"model": "Qwen/Qwen2.5-72B-Instruct",
"note": "Baselines re-calibrated after environment tuning for clean_deploy (v2). Recorded 2026-04-08.",
}
@app.post("/grader")
async def run_grader(task_name: str = ""):
"""Score from active session's episode history."""
from server.graders import grade_task as _grade_task
env = getattr(app.state, "active_env", None)
if env is None or env.get_engine() is None:
return {"task": task_name, "score": 0.001, "error": "No active session. Call /reset first."}
if not env.get_episode_history():
return {"task": env.get_task_name(), "score": 0.001, "error": "No steps taken. Call /step first."}
active_task = env.get_task_name()
if task_name and task_name != active_task:
return {"task": task_name, "score": 0.001, "error": f"Task mismatch: requested '{task_name}' but active task is '{active_task}'."}
if not task_name:
task_name = active_task
score = _grade_task(
env.get_task_name(),
env.get_episode_history(),
env.get_engine(),
)
return {"task": env.get_task_name(), "score": score}
def main(host: str = "0.0.0.0", port: int = 8000):
import uvicorn
uvicorn.run(app, host=host, port=port)
if __name__ == "__main__":
main()