File size: 4,813 Bytes
83ecd75
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""FastAPI application for the DevOps Pipeline Environment."""

from openenv.core.env_server.http_server import create_app

from devops_pipeline_env.models import PipelineAction, PipelineObservation
from server.pipeline_environment import PipelineEnvironment

# Build the HTTP application around the pipeline environment and its
# action/observation models (max_concurrent_envs is pinned to 1).
app = create_app(
    PipelineEnvironment,
    PipelineAction,
    PipelineObservation,
    env_name="devops_pipeline_env",
    max_concurrent_envs=1,
)

# The active environment is kept on app.state so that /grader can reach it
# without relying on a class-level singleton. Nothing is active at startup.
app.state.active_env = None


def _remember_active_env(env):
    """Record *env* on ``app.state`` as the currently active environment."""
    app.state.active_env = env


# PipelineEnvironment.reset() is expected to invoke this hook when it is set
# (NOTE(review): reset() is defined outside this file — confirm).
PipelineEnvironment._register_callback = _remember_active_env


# --- Additional Required Endpoints -------------------------------------------

@app.get("/tasks")
def get_tasks():
    """List every available task along with the JSON schema for actions."""
    # One row per task, in the order the fields appear in the response.
    field_names = ("name", "difficulty", "description", "max_steps")
    task_rows = (
        (
            "clean_deploy",
            "easy",
            "Deploy 2 services with all tests passing. No complications.",
            15,
        ),
        (
            "broken_pipeline",
            "medium",
            "Diagnose test failures, fix config errors, run migrations.",
            20,
        ),
        (
            "judgment_call",
            "hard",
            "Production incident with cascading failures. Hotfix breaks downstream service. 12-step time limit with degrading health.",
            12,
        ),
        (
            "cascading_failure",
            "medium-hard",
            "Root cause analysis across dependency chain. cache-service down, dragging api-gateway and web-frontend. Fix root cause first.",
            15,
        ),
        (
            "capacity_crisis",
            "medium-hard",
            "Peak traffic 4x normal. database-primary connection pool nearly full. Stabilize before tipping points trigger cascading collapse.",
            15,
        ),
        (
            "random_incident",
            "variable",
            "Procedurally generated incident. Service, failure type, and severity are randomized from seed. Infinite variation for curriculum learning.",
            15,
        ),
    )
    task_catalog = [dict(zip(field_names, row)) for row in task_rows]
    return {
        "tasks": task_catalog,
        "action_schema": PipelineAction.model_json_schema(),
    }


@app.get("/health")
def health_check():
    """Liveness probe: always answers with a fixed "healthy" status."""
    liveness = {"status": "healthy"}
    return liveness


@app.post("/baseline")
async def run_baseline():
    """Serve the stored baseline scores; inference.py is NOT executed here."""
    # Pre-recorded per-task scores, keyed by task name.
    recorded_scores = {
        "clean_deploy": 0.700,
        "broken_pipeline": 0.482,
        "judgment_call": 0.184,
        "cascading_failure": 0.280,
        "capacity_crisis": 0.250,
        "random_incident": 0.350,
    }
    baseline_model = "Qwen/Qwen2.5-72B-Instruct"
    calibration_note = "Baselines re-calibrated after environment tuning for clean_deploy (v2). Recorded 2026-04-08."
    return {
        "scores": recorded_scores,
        "model": baseline_model,
        "note": calibration_note,
    }


@app.post("/grader")
async def run_grader(task_name: str = ""):
    """Score the active session from its recorded episode history.

    Args:
        task_name: Optional name of the task to grade. When empty, the active
            session's task is graded. When provided, it must equal the active
            task's name.

    Returns:
        A dict with "task" and "score" keys. The failure cases (no active
        session, no steps taken yet, or a task-name mismatch) do not raise;
        they return a score of 0.001 plus an "error" message.
    """
    # Imported lazily, as in the original, so the grader module is only
    # loaded when this endpoint is actually hit.
    from server.graders import grade_task as _grade_task

    env = getattr(app.state, "active_env", None)
    engine = env.get_engine() if env is not None else None
    if engine is None:
        return {"task": task_name, "score": 0.001, "error": "No active session. Call /reset first."}

    # Read history and task name once so the values that are validated are
    # exactly the values that get graded and reported.
    history = env.get_episode_history()
    active_task = env.get_task_name()
    if not history:
        return {"task": active_task, "score": 0.001, "error": "No steps taken. Call /step first."}
    if task_name and task_name != active_task:
        return {"task": task_name, "score": 0.001, "error": f"Task mismatch: requested '{task_name}' but active task is '{active_task}'."}

    score = _grade_task(active_task, history, engine)
    return {"task": active_task, "score": score}


def main(host: str = "0.0.0.0", port: int = 8000):
    """Serve the module-level ``app`` with uvicorn on the given host and port."""
    # uvicorn is imported here rather than at module import time.
    import uvicorn

    bind_options = {"host": host, "port": port}
    uvicorn.run(app, **bind_options)


# When executed as a script, start the server with main()'s default host/port.
if __name__ == "__main__":
    main()