"""HTTP server for the DevOps Incident Response OpenEnv environment.

Exposes a FastAPI application with:

* single-agent endpoints (``/reset``, ``/step``, ``/state``),
* dual-agent endpoints under ``/multi-agent``,
* curriculum endpoints under ``/curriculum``,
* reporting endpoints (``/metrics``, ``/leaderboard``, ``/validate``).

All state (active environment, sessions, episode history) is held in
module-level globals, i.e. it is per-process and lost on restart.
"""
from __future__ import annotations

import logging
import random
import statistics
import uuid
from collections import deque
from datetime import datetime, timezone
from typing import Optional

from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from curriculum import CurriculumEngine
from env import DevOpsIncidentEnv
from models import Action, ActionType, Observation, StepResult, State
from multi_agent import DualAgentSession
from generator import IncidentFactory
from tasks.task_generated import GeneratedTask

logger = logging.getLogger(__name__)

curriculum_engine = CurriculumEngine()
episode_tracker: dict = {}  # tracks active episode task_id per session
episode_history = deque(maxlen=1000)  # most recent finished episodes (bounded)
_factory = IncidentFactory()

# Action types counted as "information gathering" in the episode metrics.
_INFO_ACTIONS = frozenset({"read_logs", "read_metrics", "read_runbook", "search_logs"})


def _utc_timestamp() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SS[.ffffff]Z``.

    Uses an aware ``datetime.now(timezone.utc)`` (``datetime.utcnow()`` is
    deprecated) and drops the tzinfo before formatting so the emitted string
    keeps the same ``...Z`` shape the API has always returned.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"


def track_episode(
    state_obj: State,
    *,
    task_id: Optional[str] = None,
    score: Optional[float] = None,
) -> None:
    """Append a summary record for a finished episode to ``episode_history``.

    Args:
        state_obj: Final environment state of the episode.
        task_id: Task id to record. Defaults to ``state_obj.task_id``.
        score: Already-computed final score. When omitted the episode is
            graded here with ``grade_episode``.
    """
    resolved_task_id = task_id if task_id is not None else state_obj.task_id

    if score is None:
        # Imported lazily, as in the rest of this module.
        from graders.grader import grade_episode

        score = grade_episode(
            task_id=resolved_task_id,
            action_history=state_obj.action_history,
            ground_truth_root_cause=state_obj.ground_truth_root_cause,
            ground_truth_fix=state_obj.ground_truth_fix,
            incident_resolved=state_obj.incident_resolved,
            total_reward=state_obj.total_reward,
        )

    info_count = 0
    diag_step = None  # step of the first "diagnose" action, if any
    for entry in state_obj.action_history:
        action_type = entry["action"].get("action_type")
        if action_type in _INFO_ACTIONS:
            info_count += 1
        if action_type == "diagnose" and diag_step is None:
            diag_step = entry["step"]

    history_len = len(state_obj.action_history)
    info_ratio = info_count / history_len if history_len else 0.0

    episode_history.append(
        {
            "episode_id": state_obj.episode_id or str(uuid.uuid4()),
            "task_id": resolved_task_id,
            "seed": state_obj.info.get("seed", 42),
            "steps_taken": state_obj.step,
            "incident_resolved": state_obj.incident_resolved,
            "final_score": float(score),
            "steps_to_diagnosis": diag_step,
            "info_gathering_ratio": float(info_ratio),
            "timestamp": _utc_timestamp(),
        }
    )


app = FastAPI(
    title="DevOps Incident Response — OpenEnv",
    description=(
        "An OpenEnv-compliant RL environment where AI agents diagnose and remediate "
        "production software incidents across a simulated microservices architecture. "
        "Seven tasks of escalating difficulty: OOM crash-loop, cascading failure, "
        "silent data corruption, dual simultaneous failure, DDoS attack, database "
        "degradation, and multi-region failover."
    ),
    version="1.0.0",
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

VALID_TASKS = ("easy", "medium", "hard", "bonus", "security", "database", "failover")

# The single environment shared by /reset, /step and /state.
_env: Optional[DevOpsIncidentEnv] = None
# Dual-agent sessions keyed by session id. NOTE(review): never pruned.
multi_agent_sessions: dict[str, DualAgentSession] = {}


class ResetRequest(BaseModel):
    """Body of ``POST /reset``."""

    task_id: str = "easy"
    seed: Optional[int] = None


class CurriculumRecordRequest(BaseModel):
    """Body of ``POST /curriculum/record``."""

    task_id: str
    score: float


class MultiAgentResetRequest(BaseModel):
    """Body of ``POST /multi-agent/reset``."""

    task_id: str
    seed: int = 42


class AgentAStepRequest(BaseModel):
    """Body of ``POST /multi-agent/step/a/{session_id}``."""

    finding: str


@app.get("/", response_class=HTMLResponse)
def dashboard():
    """Serve the static landing page."""
    # NOTE(review): the page body below contains no HTML tags in this copy of
    # the file — confirm the markup was not stripped somewhere upstream.
    # Doubled braces are f-string escapes for literal "{" / "}".
    html = f""" ARIA — DevOps Incident Response | OpenEnv

ARIA

Adaptive Reward & Incident Architecture

DevOps Incident Response · OpenEnv · Meta × PyTorch × HuggingFace Hackathon

LOADING...
Docs  |  Validate

Environment — 7 Tasks

EASY
Single Service OOM
One service crash-loops from a memory leak. Varies by seed.
max_steps: 15
MEDIUM
Cascading Failure
Bad deployment cascades through 3 services + red-herring.
max_steps: 20
HARD
Silent Data Corruption
No alerts. Signal in WARN logs and business metric slippage.
max_steps: 25
BONUS
Dual Simultaneous Failure
Two independent failures at once. Both must be fixed.
max_steps: 25
SECURITY
Security Incident (DDoS)
Botnet DDoS + credential stuffing. CIDR block required.
max_steps: 20
DATABASE
Database Degradation
Missing schema index causing DB CPU spike and slow queries.
max_steps: 20
FAILOVER
Multi-Region Failover
Region failure. Escalate or failover selective services.
max_steps: 25
GENERATED
Procedural Incident
Seed-based procedural incidents. Infinite unique scenarios.
max_steps: 20

Curriculum Engine

Adapts training difficulty to agent performance

Loading curriculum data...

Feed your training loop: POST /curriculum/record with {{"task_id", "score"}}

Incident Generator

Procedural incidents — infinite unique scenarios from seeds

Noise alerts:
POST /reset {{"task_id":"generated","seed":}}

Dual-Agent Mode

Split observability — Observer sees logs, Responder sees metrics

┌──────────────────┐ share_finding ┌──────────────────┐ │ AGENT A │ ────────────────▶ │ AGENT B │ │ Observer │ │ Responder │ │ │ │ │ │ Sees: │ │ Sees: │ │ · alerts │ │ · cpu/memory │ │ · logs │ │ · error_rate │ │ · evidence │ │ · dependencies │ └──────────────────┘ └──────────────────┘ │ ▲ └──────── Shared Findings Log ─────────┘

                

Quick Start

Single Agent
# Install
pip install openenv

# Reset environment  
curl -X POST .../reset \\
  -H "Content-Type: application/json" \\
  -d '{{"task_id":"easy","seed":42}}'

# Take an action
curl -X POST .../step \\
  -d '{{"action_type":"read_logs",
       "service":"payment-service"}}'

# Check curriculum
curl .../curriculum/next
Dual Agent
# Start session
curl -X POST .../multi-agent/reset \\
  -d '{{"task_id":"easy","seed":42}}'

# Agent A shares finding
curl -X POST .../multi-agent/step/a/{{id}} \\
  -d '{{"finding":"payment-service OOM"}}'

# Agent B responds
curl -X POST .../multi-agent/step/b/{{id}} \\
  -d '{{"action_type":"restart_service",
       "service":"payment-service"}}'
"""
    return html


@app.get("/health")
def health():
    """Liveness probe."""
    return {"status": "ok", "env": "devops-incident-response", "version": "1.0.0"}


@app.get("/generate/preview")
def preview_incident(seed: int = 42):
    """Return the procedurally generated incident for ``seed``."""
    return _factory.generate(seed)


@app.post("/reset", response_model=Observation)
def reset(req: Optional[ResetRequest] = None):
    """Start a new episode and return its first observation.

    ``task_id == "generated"`` builds a procedural incident from the seed
    (42 when omitted); any other id must be one of ``VALID_TASKS``.

    Raises:
        HTTPException: 400 when ``task_id`` is unknown.
    """
    global _env
    if req is None:
        req = ResetRequest()

    # Build the environment in a local and publish it only once it is fully
    # set up, so a failure part-way does not replace the active environment.
    if req.task_id == "generated":
        seed = req.seed if req.seed is not None else 42
        incident = _factory.generate(seed)
        task = GeneratedTask(incident_dict=incident)
        initial_state = task.initialize()
        # An "easy" env is created as a shell, then its task and internal
        # state are swapped for the generated ones.
        env = DevOpsIncidentEnv(task_id="easy", seed=seed)
        env.task_id = "generated"
        env._task = task
        env._internal_state = initial_state
        observation = initial_state._build_observation()
    else:
        if req.task_id not in VALID_TASKS:
            raise HTTPException(
                status_code=400,
                detail=f"task_id must be one of {VALID_TASKS}. Got: {req.task_id}",
            )
        env = DevOpsIncidentEnv(task_id=req.task_id, seed=req.seed)
        observation = env.reset()

    _env = env
    episode_tracker[_env.state().episode_id] = req.task_id
    return observation


def _finalize_episode(final_state: State) -> None:
    """Grade a finished episode and record it for the curriculum and metrics."""
    from graders.grader import grade_episode

    # Prefer the task id captured at /reset; fall back to the state's own.
    task_id = episode_tracker.get(final_state.episode_id, final_state.task_id)
    final_score = float(
        grade_episode(
            task_id=task_id,
            action_history=final_state.action_history,
            ground_truth_root_cause=final_state.ground_truth_root_cause,
            ground_truth_fix=final_state.ground_truth_fix,
            incident_resolved=final_state.incident_resolved,
            total_reward=final_state.total_reward,
        )
    )

    try:
        curriculum_engine.record_episode(task_id, final_score)
    except ValueError as exc:
        # /curriculum/record treats ValueError from record_episode as a bad
        # request; presumably it is raised for task ids the curriculum does
        # not track (e.g. "generated") — TODO confirm. Either way it must not
        # turn the agent's final /step into a 500 or drop the metrics record.
        logger.warning("Curriculum did not record task %r: %s", task_id, exc)

    episode_tracker.pop(final_state.episode_id, None)
    # Reuse the task id and score so both stores agree and the episode is
    # graded only once.
    track_episode(final_state, task_id=task_id, score=final_score)


@app.post("/step", response_model=StepResult)
def step(action: Action):
    """Apply ``action`` to the active environment.

    When the step ends the episode, the episode is graded and recorded.

    Raises:
        HTTPException: 400 when ``/reset`` has not been called yet.
    """
    if _env is None:
        raise HTTPException(status_code=400, detail="Call /reset before /step")
    res = _env.step(action)
    if res.done:
        _finalize_episode(_env.state())
    return res


@app.get("/state", response_model=State)
def state():
    """Return the full state of the active environment.

    Raises:
        HTTPException: 400 when ``/reset`` has not been called yet.
    """
    if _env is None:
        raise HTTPException(status_code=400, detail="Call /reset before /state")
    return _env.state()


def _get_session_or_404(session_id: str) -> DualAgentSession:
    """Look up a dual-agent session, raising a 404 when it is not found."""
    session = multi_agent_sessions.get(session_id)
    if not session:
        raise HTTPException(status_code=404, detail="Session not found")
    return session


@app.post("/multi-agent/reset")
def multi_agent_reset(body: MultiAgentResetRequest):
    """Create a dual-agent session and return both agents' first observations."""
    session = DualAgentSession(task_id=body.task_id, seed=body.seed)
    multi_agent_sessions[session.session_id] = session
    return {
        "session_id": session.session_id,
        "task_id": body.task_id,
        "seed": body.seed,
        "agent_a_role": "observer — sees logs and alerts only",
        "agent_b_role": "responder — sees metrics and dependencies only",
        "instructions": {
            "agent_a": (
                "POST /multi-agent/step/a/{session_id} "
                "body: {\"finding\": \"your observation\"}"
            ),
            "agent_b": (
                "POST /multi-agent/step/b/{session_id} "
                "body: Action JSON (same schema as POST /step)"
            ),
        },
        "observation_a": session.get_observation_a(),
        "observation_b": session.get_observation_b(),
    }


@app.post("/multi-agent/step/a/{session_id}")
def multi_agent_step_a(session_id: str, body: AgentAStepRequest):
    """Submit agent A's finding to the session. 404 if the session is unknown."""
    return _get_session_or_404(session_id).step_a(body.finding)


@app.post("/multi-agent/step/b/{session_id}")
def multi_agent_step_b(session_id: str, body: Action):
    """Submit agent B's action to the session. 404 if the session is unknown."""
    return _get_session_or_404(session_id).step_b(body)


@app.get("/multi-agent/state/{session_id}")
def multi_agent_state(session_id: str):
    """Return the session's state. 404 if the session is unknown."""
    return _get_session_or_404(session_id).get_state()


@app.get("/multi-agent/sessions")
def list_multi_agent_sessions():
    """List a short summary of every dual-agent session held in memory."""
    return [
        {
            "session_id": session.session_id,
            "task_id": session.task_id,
            "step": session.step_count,
            "done": session.done,
            "findings_count": len(session.findings_log),
        }
        for session in multi_agent_sessions.values()
    ]


@app.get("/tasks")
def list_tasks():
    """Return the static catalogue of available tasks."""
    return {
        "tasks": [
            {
                "id": "easy",
                "name": "Single Service OOM",
                "difficulty": "easy",
                "max_steps": 15,
                "description": "One service crash-loops from a memory leak. Which service varies by seed.",
            },
            {
                "id": "medium",
                "name": "Cascading Multi-Service Failure",
                "difficulty": "medium",
                "max_steps": 20,
                "description": (
                    "Bad deployment causes connection pool exhaustion cascading through 3 services. "
                    "One red-herring alert included."
                ),
            },
            {
                "id": "hard",
                "name": "Silent Data Corruption",
                "difficulty": "hard",
                "max_steps": 25,
                "description": (
                    "No error-rate alerts fire. Signals are WARN-level logs and a business metric anomaly. "
                    "Requires rollback + on-call alert for full credit."
                ),
            },
            {
                "id": "bonus",
                "name": "Simultaneous Dual Failure",
                "difficulty": "hard",
                "max_steps": 25,
                "description": (
                    "Two independent failures at once: disk full on log aggregator + "
                    "model reload CPU loop on ml-inference. Both must be fixed for full credit."
                ),
            },
            {
                "id": "security",
                "name": "Security Incident (DDoS)",
                "difficulty": "hard",
                "max_steps": 20,
                "description": (
                    "A botnet is performing a DDoS and credential stuffing attack against the login endpoint. "
                    "The agent must read access logs, diagnose the attack IP range, block the CIDR, and alert the security team."
                ),
            },
            {
                "id": "database",
                "name": "Database Performance Degradation",
                "difficulty": "hard",
                "max_steps": 20,
                "description": (
                    "A recent migration added a user_segment column to the orders table without an index. "
                    "Sequential table scans are spiking DB CPU. Discovered via read_metrics and the slow query log."
                ),
            },
            {
                "id": "failover",
                "name": "Multi-Region Failover",
                "difficulty": "hard",
                "max_steps": 25,
                "description": (
                    "A primary datacenter region (us-east-1) is degraded due to a network partition. "
                    "The agent must correctly identify which services support automatic multi-region failover "
                    "and which do not. Failing over the wrong services causes severe data inconsistency penalties."
                ),
            },
            {
                "id": "generated",
                "name": "Procedural Incident",
                "difficulty": "variable",
                "max_steps": 20,
                "description": "Procedurally generated incident. Use with any seed 0-99999 for infinite unique scenarios.",
            },
        ]
    }


@app.get("/curriculum/status")
def get_curriculum_status():
    """Return the curriculum engine's status report."""
    return curriculum_engine.get_status()


@app.get("/curriculum/next")
def get_next_curriculum_task():
    """Return the task the curriculum engine recommends next."""
    return {
        "recommended_task": curriculum_engine.get_next_curriculum_task(),
        "reasoning": "Lowest rolling average among non-mastered tasks.",
    }


@app.post("/curriculum/record")
def record_curriculum_episode(req: CurriculumRecordRequest):
    """Record an externally computed score for a task.

    Raises:
        HTTPException: 400 when the curriculum engine rejects the input.
    """
    try:
        curriculum_engine.record_episode(req.task_id, req.score)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return {
        "recorded": True,
        "new_status": curriculum_engine.get_status()["tasks"][req.task_id],
    }


@app.get("/curriculum/hint/{task_id}")
def get_curriculum_hint(task_id: str):
    """Return the hint, scaffold flag and mastery level for ``task_id``.

    Raises:
        HTTPException: 400 when the curriculum engine rejects ``task_id``.
    """
    try:
        return {
            "task_id": task_id,
            "hint": curriculum_engine.get_hint(task_id),
            "scaffold_needed": curriculum_engine.should_scaffold(task_id),
            "mastery_level": curriculum_engine.get_mastery(task_id),
        }
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc


@app.get("/validate")
def validate():
    """
    Self-validation endpoint for judges.
    Runs a quick episode on each task and confirms graders return [0.0, 1.0].
    """
    from graders.grader import grade_episode

    action_types = list(ActionType)
    results = []
    for task_id in VALID_TASKS:
        try:
            env = DevOpsIncidentEnv(task_id=task_id, seed=42)
            env.reset()
            # Seeded per task so every validation run replays the same
            # action sequence and is therefore reproducible.
            rng = random.Random(7)
            done = False
            steps = 0
            while not done and steps < 30:
                action = Action(action_type=rng.choice(action_types))
                done = env.step(action).done
                steps += 1
            s = env.state()
            score = grade_episode(
                task_id,
                s.action_history,
                s.ground_truth_root_cause,
                s.ground_truth_fix,
                s.incident_resolved,
                s.total_reward,
            )
            results.append(
                {
                    "task_id": task_id,
                    "score": score,
                    "in_range": 0.0 <= score <= 1.0,
                    "resolved": s.incident_resolved,
                    "steps": steps,
                    "status": "ok",
                }
            )
        except Exception as e:
            # Report the failure for this task and keep validating the rest.
            results.append({"task_id": task_id, "status": "error", "error": str(e)})

    all_ok = all(r.get("status") == "ok" and r.get("in_range") for r in results)
    return {"validation": "passed" if all_ok else "failed", "tasks": results}


@app.get("/metrics")
def get_metrics():
    """Aggregate ``episode_history`` into overall and per-task statistics."""
    # Work on a snapshot so episodes appended while we aggregate cannot
    # disturb the iteration or skew the totals.
    records = list(episode_history)
    total_episodes = len(records)
    if total_episodes == 0:
        return {
            "total_episodes": 0,
            "by_task": {},
            "overall_avg_score": 0.0,
            "last_updated": _utc_timestamp(),
        }

    by_task: dict = {}
    total_score = 0.0
    for rec in records:
        agg = by_task.setdefault(
            rec["task_id"],
            {"scores": [], "resolved": 0, "steps_to_diag": [], "info_ratios": []},
        )
        agg["scores"].append(rec["final_score"])
        if rec["incident_resolved"]:
            agg["resolved"] += 1
        # Episodes that never issued a diagnosis are left out of the average.
        if rec["steps_to_diagnosis"] is not None:
            agg["steps_to_diag"].append(rec["steps_to_diagnosis"])
        agg["info_ratios"].append(rec["info_gathering_ratio"])
        total_score += rec["final_score"]

    out_by_task = {}
    for tid, agg in by_task.items():
        scores = agg["scores"]
        diag_steps = agg["steps_to_diag"]
        ratios = agg["info_ratios"]
        cnt = len(scores)
        out_by_task[tid] = {
            "count": cnt,
            "avg_score": round(sum(scores) / cnt, 3),
            "max_score": round(max(scores), 3),
            "min_score": round(min(scores), 3),
            "resolution_rate": round(agg["resolved"] / cnt, 3),
            "avg_steps_to_diagnosis": (
                round(sum(diag_steps) / len(diag_steps), 1) if diag_steps else None
            ),
            "avg_info_gathering_ratio": (
                round(sum(ratios) / len(ratios), 2) if ratios else 0.0
            ),
        }

    return {
        "total_episodes": total_episodes,
        "by_task": out_by_task,
        "overall_avg_score": round(total_score / total_episodes, 3),
        "last_updated": _utc_timestamp(),
    }


@app.get("/leaderboard")
def get_leaderboard():
    """Return the ten best recorded episodes: highest score, then fewest steps."""
    ranked = sorted(
        episode_history,
        # Negated steps so that, under reverse=True, fewer steps ranks higher.
        key=lambda rec: (rec["final_score"], -rec["steps_taken"]),
        reverse=True,
    )
    top_10 = [
        {
            "rank": rank,
            "task_id": rec["task_id"],
            "score": rec["final_score"],
            "steps": rec["steps_taken"],
            "timestamp": rec["timestamp"],
        }
        for rank, rec in enumerate(ranked[:10], start=1)
    ]
    return {"leaderboard": top_10}