from __future__ import annotations

import statistics
import uuid
from collections import deque
from datetime import datetime, timezone
from typing import Optional

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from pydantic import BaseModel

from curriculum import CurriculumEngine
from env import DevOpsIncidentEnv
from generator import IncidentFactory
from models import Action, ActionType, Observation, StepResult, State
from multi_agent import DualAgentSession
from tasks.task_generated import GeneratedTask
# Module-level singletons: one curriculum engine and one incident factory per server process.
curriculum_engine = CurriculumEngine()
episode_tracker: dict = {} # tracks active episode task_id per session
# Rolling window of graded episode records consumed by /metrics and /leaderboard.
episode_history = deque(maxlen=1000)
_factory = IncidentFactory()
def track_episode(state_obj: State):
    """Grade a completed episode and append a summary record to ``episode_history``.

    Args:
        state_obj: Final environment ``State`` after the episode ended.

    Side effects:
        Appends one record (score, steps, diagnosis latency, info-gathering
        ratio, timestamp) to the module-level ``episode_history`` deque.
    """
    # Local import avoids a circular import at module load time.
    from graders.grader import grade_episode

    # NOTE(review): grades with state_obj.task_id directly, while /step resolves
    # the task id through episode_tracker (relevant for "generated" episodes) —
    # confirm the two stay consistent.
    score = grade_episode(
        task_id=state_obj.task_id,
        action_history=state_obj.action_history,
        ground_truth_root_cause=state_obj.ground_truth_root_cause,
        ground_truth_fix=state_obj.ground_truth_fix,
        incident_resolved=state_obj.incident_resolved,
        total_reward=state_obj.total_reward
    )
    # Single pass over the action history: count information-gathering actions
    # and find the step of the first "diagnose" action (None if never diagnosed).
    info_actions = {"read_logs", "read_metrics", "read_runbook", "search_logs"}
    info_count = 0
    diag_step = None
    for act in state_obj.action_history:
        at = act["action"].get("action_type")
        if at in info_actions:
            info_count += 1
        if at == "diagnose" and diag_step is None:
            diag_step = act["step"]
    info_ratio = info_count / len(state_obj.action_history) if state_obj.action_history else 0.0
    seed = state_obj.info.get("seed", 42)  # 42 = default seed when the env did not record one
    record = {
        "episode_id": state_obj.episode_id or str(uuid.uuid4()),
        "task_id": state_obj.task_id,
        "seed": seed,
        "steps_taken": state_obj.step,
        "incident_resolved": state_obj.incident_resolved,
        "final_score": float(score),
        "steps_to_diagnosis": diag_step,
        "info_gathering_ratio": float(info_ratio),
        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # produces the same "...Z"-suffixed ISO-8601 string.
        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    }
    episode_history.append(record)
# FastAPI application exposing the environment over HTTP.
app = FastAPI(
    title="DevOps Incident Response — OpenEnv",
    description=(
        "An OpenEnv-compliant RL environment where AI agents diagnose and remediate "
        "production software incidents across a simulated microservices architecture. "
        "Seven tasks of escalating difficulty: OOM crash-loop, cascading failure, "
        "silent data corruption, dual simultaneous failure, DDoS attack, database "
        "degradation, and multi-region failover."
    ),
    version="1.0.0",
)
# Permissive CORS so browser dashboards and external judges can call the API from anywhere.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Built-in task ids accepted by /reset ("generated" is handled as a special case).
VALID_TASKS = ("easy", "medium", "hard", "bonus", "security", "database", "failover")
# Single active single-agent environment — one episode at a time per server process.
_env: Optional[DevOpsIncidentEnv] = None
# Active dual-agent sessions keyed by session_id.
multi_agent_sessions: dict[str, DualAgentSession] = {}
class ResetRequest(BaseModel):
    """Body for POST /reset; defaults to the easy task with no fixed seed."""
    task_id: str = "easy"
    seed: Optional[int] = None
class CurriculumRecordRequest(BaseModel):
    """Body for POST /curriculum/record: an externally graded episode score."""
    task_id: str
    score: float
class MultiAgentResetRequest(BaseModel):
    """Body for POST /multi-agent/reset."""
    task_id: str
    seed: int = 42
class AgentAStepRequest(BaseModel):
    """Body for POST /multi-agent/step/a/{session_id}: Agent A's shared finding."""
    finding: str
@app.get("/", response_class=HTMLResponse)
def dashboard():
html = f"""
ARIA — DevOps Incident Response | OpenEnv
Environment — 7 Tasks
EASY
Single Service OOM
One service crash-loops from a memory leak. Varies by seed.
max_steps: 15
MEDIUM
Cascading Failure
Bad deployment cascades through 3 services + red-herring.
max_steps: 20
HARD
Silent Data Corruption
No alerts. Signal in WARN logs and business metric slippage.
max_steps: 25
BONUS
Dual Simultaneous Failure
Two independent failures at once. Both must be fixed.
max_steps: 25
SECURITY
Security Incident (DDoS)
Botnet DDoS + credential stuffing. CIDR block required.
max_steps: 20
DATABASE
Database Degradation
Missing schema index causing DB CPU spike and slow queries.
max_steps: 20
FAILOVER
Multi-Region Failover
Region failure. Escalate or failover selective services.
max_steps: 25
GENERATED
Procedural Incident
Seed-based procedural incidents. Infinite unique scenarios.
max_steps: 20
Curriculum Engine
Adapts training difficulty to agent performance
Loading curriculum data...
Feed your training loop: POST /curriculum/record with {{"task_id", "score"}}
Incident Generator
Procedural incidents — infinite unique scenarios from seeds
Noise alerts:
POST /reset {{"task_id":"generated","seed":}}
Dual-Agent Mode
Split observability — Observer sees logs, Responder sees metrics
┌──────────────────┐ share_finding ┌──────────────────┐
│ AGENT A │ ────────────────▶ │ AGENT B │
│ Observer │ │ Responder │
│ │ │ │
│ Sees: │ │ Sees: │
│ · alerts │ │ · cpu/memory │
│ · logs │ │ · error_rate │
│ · evidence │ │ · dependencies │
└──────────────────┘ └──────────────────┘
│ ▲
└──────── Shared Findings Log ─────────┘
Quick Start
# Install
pip install openenv
# Reset environment
curl -X POST .../reset \\
-H "Content-Type: application/json" \\
-d '{{"task_id":"easy","seed":42}}'
# Take an action
curl -X POST .../step \\
-d '{{"action_type":"read_logs",
"service":"payment-service"}}'
# Check curriculum
curl .../curriculum/next
# Start session
curl -X POST .../multi-agent/reset \\
-d '{{"task_id":"easy","seed":42}}'
# Agent A shares finding
curl -X POST .../multi-agent/step/a/{{id}} \\
-d '{{"finding":"payment-service OOM"}}'
# Agent B responds
curl -X POST .../multi-agent/step/b/{{id}} \\
-d '{{"action_type":"restart_service",
"service":"payment-service"}}'
"""
return html
@app.get("/health")
def health():
return {"status": "ok", "env": "devops-incident-response", "version": "1.0.0"}
@app.get("/generate/preview")
def preview_incident(seed: int = 42):
return _factory.generate(seed)
@app.post("/reset", response_model=Observation)
def reset(req: Optional[ResetRequest] = None):
if req is None:
req = ResetRequest()
global _env
if req.task_id == "generated":
seed = req.seed if req.seed is not None else 42
incident = _factory.generate(seed)
task = GeneratedTask(incident_dict=incident)
state = task.initialize()
_env = DevOpsIncidentEnv(task_id="easy", seed=seed)
_env.task_id = "generated"
_env._task = task
_env._internal_state = state
observation = state._build_observation()
else:
if req.task_id not in VALID_TASKS:
raise HTTPException(
status_code=400,
detail=f"task_id must be one of {VALID_TASKS}. Got: {req.task_id}",
)
_env = DevOpsIncidentEnv(task_id=req.task_id, seed=req.seed)
observation = _env.reset()
episode_tracker[_env.state().episode_id] = req.task_id
return observation
@app.post("/step", response_model=StepResult)
def step(action: Action):
if _env is None:
raise HTTPException(status_code=400, detail="Call /reset before /step")
res = _env.step(action)
if res.done:
from graders.grader import grade_episode
current_state = _env.state()
current_task_id = episode_tracker.get(current_state.episode_id, current_state.task_id)
final_score = grade_episode(
task_id=current_task_id,
action_history=current_state.action_history,
ground_truth_root_cause=current_state.ground_truth_root_cause,
ground_truth_fix=current_state.ground_truth_fix,
incident_resolved=current_state.incident_resolved,
total_reward=current_state.total_reward,
)
curriculum_engine.record_episode(current_task_id, float(final_score))
episode_tracker.pop(current_state.episode_id, None)
track_episode(current_state)
return res
@app.get("/state", response_model=State)
def state():
if _env is None:
raise HTTPException(status_code=400, detail="Call /reset before /state")
return _env.state()
@app.post("/multi-agent/reset")
def multi_agent_reset(body: MultiAgentResetRequest):
session = DualAgentSession(task_id=body.task_id, seed=body.seed)
multi_agent_sessions[session.session_id] = session
return {
"session_id": session.session_id,
"task_id": body.task_id,
"seed": body.seed,
"agent_a_role": "observer — sees logs and alerts only",
"agent_b_role": "responder — sees metrics and dependencies only",
"instructions": {
"agent_a": (
"POST /multi-agent/step/a/{session_id} "
"body: {\"finding\": \"your observation\"}"
),
"agent_b": (
"POST /multi-agent/step/b/{session_id} "
"body: Action JSON (same schema as POST /step)"
),
},
"observation_a": session.get_observation_a(),
"observation_b": session.get_observation_b(),
}
@app.post("/multi-agent/step/a/{session_id}")
def multi_agent_step_a(session_id: str, body: AgentAStepRequest):
session = multi_agent_sessions.get(session_id)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
return session.step_a(body.finding)
@app.post("/multi-agent/step/b/{session_id}")
def multi_agent_step_b(session_id: str, body: Action):
session = multi_agent_sessions.get(session_id)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
return session.step_b(body)
@app.get("/multi-agent/state/{session_id}")
def multi_agent_state(session_id: str):
session = multi_agent_sessions.get(session_id)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
return session.get_state()
@app.get("/multi-agent/sessions")
def list_multi_agent_sessions():
return [
{
"session_id": session.session_id,
"task_id": session.task_id,
"step": session.step_count,
"done": session.done,
"findings_count": len(session.findings_log),
}
for session in multi_agent_sessions.values()
]
@app.get("/tasks")
def list_tasks():
return {
"tasks": [
{
"id": "easy",
"name": "Single Service OOM",
"difficulty": "easy",
"max_steps": 15,
"description": "One service crash-loops from a memory leak. Which service varies by seed.",
},
{
"id": "medium",
"name": "Cascading Multi-Service Failure",
"difficulty": "medium",
"max_steps": 20,
"description": (
"Bad deployment causes connection pool exhaustion cascading through 3 services. "
"One red-herring alert included."
),
},
{
"id": "hard",
"name": "Silent Data Corruption",
"difficulty": "hard",
"max_steps": 25,
"description": (
"No error-rate alerts fire. Signals are WARN-level logs and a business metric anomaly. "
"Requires rollback + on-call alert for full credit."
),
},
{
"id": "bonus",
"name": "Simultaneous Dual Failure",
"difficulty": "hard",
"max_steps": 25,
"description": (
"Two independent failures at once: disk full on log aggregator + "
"model reload CPU loop on ml-inference. Both must be fixed for full credit."
),
},
{
"id": "security",
"name": "Security Incident (DDoS)",
"difficulty": "hard",
"max_steps": 20,
"description": (
"A botnet is performing a DDoS and credential stuffing attack against the login endpoint. "
"The agent must read access logs, diagnose the attack IP range, block the CIDR, and alert the security team."
),
},
{
"id": "database",
"name": "Database Performance Degradation",
"difficulty": "hard",
"max_steps": 20,
"description": (
"A recent migration added a user_segment column to the orders table without an index. "
"Sequential table scans are spiking DB CPU. Discovered via read_metrics and the slow query log."
),
},
{
"id": "failover",
"name": "Multi-Region Failover",
"difficulty": "hard",
"max_steps": 25,
"description": (
"A primary datacenter region (us-east-1) is degraded due to a network partition. "
"The agent must correctly identify which services support automatic multi-region failover "
"and which do not. Failing over the wrong services causes severe data inconsistency penalties."
),
},
{
"id": "generated",
"name": "Procedural Incident",
"difficulty": "variable",
"max_steps": 20,
"description": "Procedurally generated incident. Use with any seed 0-99999 for infinite unique scenarios.",
},
]
}
@app.get("/curriculum/status")
def get_curriculum_status():
return curriculum_engine.get_status()
@app.get("/curriculum/next")
def get_next_curriculum_task():
return {
"recommended_task": curriculum_engine.get_next_curriculum_task(),
"reasoning": "Lowest rolling average among non-mastered tasks.",
}
@app.post("/curriculum/record")
def record_curriculum_episode(req: CurriculumRecordRequest):
try:
curriculum_engine.record_episode(req.task_id, req.score)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
return {
"recorded": True,
"new_status": curriculum_engine.get_status()["tasks"][req.task_id],
}
@app.get("/curriculum/hint/{task_id}")
def get_curriculum_hint(task_id: str):
try:
return {
"task_id": task_id,
"hint": curriculum_engine.get_hint(task_id),
"scaffold_needed": curriculum_engine.should_scaffold(task_id),
"mastery_level": curriculum_engine.get_mastery(task_id),
}
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
@app.get("/validate")
def validate():
"""
Self-validation endpoint for judges.
Runs a quick episode on each task and confirms graders return [0.0, 1.0].
"""
import random
from graders.grader import grade_episode
results = []
for task_id in VALID_TASKS:
try:
env = DevOpsIncidentEnv(task_id=task_id, seed=42)
env.reset()
done = False
rng = random.Random(7)
steps = 0
import random as _random
while not done and steps < 30:
action = Action(action_type=_random.choice(list(ActionType)))
result = env.step(action)
done = result.done
steps += 1
s = env.state()
score = grade_episode(
task_id, s.action_history, s.ground_truth_root_cause,
s.ground_truth_fix, s.incident_resolved, s.total_reward,
)
results.append({
"task_id": task_id,
"score": score,
"in_range": 0.0 <= score <= 1.0,
"resolved": s.incident_resolved,
"steps": steps,
"status": "ok",
})
except Exception as e:
results.append({"task_id": task_id, "status": "error", "error": str(e)})
all_ok = all(r.get("status") == "ok" and r.get("in_range") for r in results)
return {"validation": "passed" if all_ok else "failed", "tasks": results}
@app.get("/metrics")
def get_metrics():
total_episodes = len(episode_history)
by_task = {}
total_score = 0.0
if total_episodes == 0:
return {
"total_episodes": 0,
"by_task": {},
"overall_avg_score": 0.0,
"last_updated": datetime.utcnow().isoformat() + "Z"
}
for rec in episode_history:
tid = rec["task_id"]
if tid not in by_task:
by_task[tid] = {"scores": [], "resolved": 0, "steps_to_diag": [], "info_ratios": []}
by_task[tid]["scores"].append(rec["final_score"])
if rec["incident_resolved"]:
by_task[tid]["resolved"] += 1
if rec["steps_to_diagnosis"] is not None:
by_task[tid]["steps_to_diag"].append(rec["steps_to_diagnosis"])
by_task[tid]["info_ratios"].append(rec["info_gathering_ratio"])
total_score += rec["final_score"]
out_by_task = {}
for tid, agg in by_task.items():
cnt = len(agg["scores"])
out_by_task[tid] = {
"count": cnt,
"avg_score": round(sum(agg["scores"]) / cnt, 3),
"max_score": round(max(agg["scores"]), 3),
"min_score": round(min(agg["scores"]), 3),
"resolution_rate": round(agg["resolved"] / cnt, 3),
"avg_steps_to_diagnosis": round(sum(agg["steps_to_diag"]) / len(agg["steps_to_diag"]), 1) if agg["steps_to_diag"] else None,
"avg_info_gathering_ratio": round(sum(agg["info_ratios"]) / len(agg["info_ratios"]), 2) if agg["info_ratios"] else 0.0
}
return {
"total_episodes": total_episodes,
"by_task": out_by_task,
"overall_avg_score": round(total_score / total_episodes, 3),
"last_updated": datetime.utcnow().isoformat() + "Z"
}
@app.get("/leaderboard")
def get_leaderboard():
sorted_eps = sorted(episode_history, key=lambda x: (x["final_score"], -x["steps_taken"]), reverse=True)
top_10 = []
for i, rec in enumerate(sorted_eps[:10]):
top_10.append({
"rank": i + 1,
"task_id": rec["task_id"],
"score": rec["final_score"],
"steps": rec["steps_taken"],
"timestamp": rec["timestamp"]
})
return {"leaderboard": top_10}