Eight scenarios of escalating operational complexity
Loading tasks...
ARIA Features
What makes this environment unique
🎓
Curriculum Engine
Tracks agent performance per task with rolling averages. Promotes when mastered (avg > 0.75). Scaffolds with hints when struggling (avg < 0.30). Agents always train at the edge of their capability.
Procedural incidents from seeds 0–99,999. Six failure modes × eight services × variable noise yield 100,000 unique training scenarios. Same seed always produces the same incident.
Split observability between two agents. Observer sees logs and alerts. Responder sees metrics and dependencies. Neither can solve the incident alone — they must coordinate via share_finding.
"""
return html
@app.get("/health")
def health():
"""
Health check endpoint.
Returns a simple status object confirming the server is running.
Returns:
{"status": "ok", "env": "devops-incident-response", "version": "2.0.0"}
"""
return {"status": "ok", "env": "devops-incident-response", "version": "2.0.0"}
@app.get("/generate/preview")
def preview_incident(seed: int = 42):
"""
Preview a procedurally generated incident without starting an episode.
Uses ARIA's IncidentFactory to generate a deterministic incident description
from the given integer seed. Same seed always produces the same incident.
Args:
seed: Integer seed in range 0–99999 (default: 42)
Returns:
Incident object with: failure_mode, severity, affected_service,
description, noise_alerts, difficulty_score
"""
return _factory.generate(seed)
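# Illustrative client sketch (not used by the server): shows that /generate/preview is
# deterministic for a fixed seed. The base URL and the `requests` dependency are
# assumptions; adjust them for your deployment.
def _example_preview_determinism(base_url: str = "http://localhost:8000") -> None:
    import requests

    first = requests.get(f"{base_url}/generate/preview", params={"seed": 7}).json()
    second = requests.get(f"{base_url}/generate/preview", params={"seed": 7}).json()
    # Same seed, same incident: the two payloads should be identical.
    assert first == second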
@app.post("/reset", response_model=Observation)
async def reset(req: Optional[ResetRequest] = None):
"""
Start a new episode.
Initializes the environment for the specified task and seed.
Same seed always produces the same episode (deterministic).
Args:
task_id: One of easy/medium/hard/bonus/security/database/failover/generated
seed: Integer seed for reproducibility (optional, random if not provided)
Returns:
Observation with: services, active_alerts, recent_logs,
service_dependencies, evidence_log, sla_status, available_runbooks
"""
if req is None:
req = ResetRequest()
if req.task_id not in VALID_TASKS and req.task_id != "generated":
raise HTTPException(
status_code=400,
detail=f"task_id must be one of {VALID_TASKS} or 'generated'. Got: {req.task_id}",
)
current_episode_steps.clear()
return await _env.reset(seed=req.seed, task_id=req.task_id)
@app.post("/step", response_model=StepResult)
async def step(action: Action):
"""
Take one action in the current episode.
Must call /reset first. Accepts any of the 14 action types with their
corresponding parameters. Returns the new observation, reward signal,
and done flag.
Args:
action_type: One of diagnose/read_logs/read_metrics/read_runbook/
search_logs/restart_service/rollback/scale_up/
alert_oncall/acknowledge/noop/block_ip_range/
create_index/failover
service: Target service name (required for most actions)
root_cause: Diagnosis string (required for diagnose action)
runbook: Runbook filename (required for read_runbook)
version: Target version (required for rollback)
reason: Reason string (required for alert_oncall)
ip_range: CIDR range (required for block_ip_range)
table: Table name (required for create_index)
column: Column name (required for create_index)
target_region: Target region (required for failover)
Returns:
StepResult with: observation (new state), reward (float), done (bool), info (dict)
Side effects:
On done=True, records the episode in the leaderboard and metrics history.
"""
global replay_counter
if _env._logic is None:
raise HTTPException(status_code=400, detail="Call /reset before /step")
res = await _env.step(action)
step_data = {
"step": len(current_episode_steps),
"action": action.dict(),
"reward": res.reward,
"observation_summary": {
"failing_services": [s.name for s in res.observation.services if s.status in ("down", "degraded")],
"alert_count": len(res.observation.active_alerts),
"evidence_count": len(res.observation.evidence_log),
},
}
current_episode_steps.append(step_data)
if res.done:
track_episode(_env.state)
state = _env.state
replay_store[str(replay_counter)] = {
"episode_id": str(replay_counter),
"task_id": state.task_id,
"seed": state.info.get("seed", 0),
"final_score": round(state.total_reward, 3),
"resolved": state.incident_resolved,
"total_steps": state.step,
"timestamp": datetime.utcnow().isoformat(),
"steps": list(current_episode_steps),
}
replay_counter += 1
if len(replay_store) > 20:
oldest = min(replay_store.keys(), key=int)
del replay_store[oldest]
return res
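# Illustrative client sketch (not used by the server): runs one episode end to end over
# HTTP. The base URL and the `requests` dependency are assumptions, and the fixed
# read_logs policy is only meant to show the reset-then-step call pattern.
def _example_run_episode(base_url: str = "http://localhost:8000") -> float:
    import requests

    obs = requests.post(f"{base_url}/reset", json={"task_id": "easy", "seed": 42}).json()
    total_reward, done = 0.0, False
    while not done:
        # Trivial policy: keep reading logs for the first reported service.
        action = {"action_type": "read_logs", "service": obs["services"][0]["name"]}
        result = requests.post(f"{base_url}/step", json=action).json()
        obs, done = result["observation"], result["done"]
        total_reward += result["reward"]
    return total_reward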
@app.get("/state", response_model=State)
def state():
"""
Return the full current environment state including ground truth.
Unlike /step which returns partial observations, /state reveals the
ground truth root cause, fix, and full action history. Useful for
evaluation and debugging.
Returns:
State with: all Observation fields plus ground_truth_root_cause,
ground_truth_fix, incident_resolved, total_reward, action_history,
episode_id, task_id, step count
"""
if _env._logic is None:
raise HTTPException(status_code=400, detail="Call /reset before /state")
return _env.state
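# Illustrative evaluation sketch (not used by the server): once an episode has been
# started, /state exposes the ground truth so an external harness can score a proposed
# diagnosis. The base URL and the `requests` dependency are assumptions.
def _example_check_diagnosis(base_url: str, proposed_root_cause: str) -> bool:
    import requests

    state = requests.get(f"{base_url}/state").json()
    return proposed_root_cause == state["ground_truth_root_cause"]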
@app.get("/tasks")
def list_tasks():
"""
List all 8 tasks with metadata.
Returns all available task IDs with their name, difficulty, max_steps,
and description. Use the task_id values in POST /reset to start an episode.
Returns:
{"tasks": [...]} — list of 8 task objects (7 curated + 1 procedural)
"""
return {
"tasks": [
{
"id": "easy",
"name": "Single Service OOM",
"difficulty": "easy",
"max_steps": 15,
"description": "One service crash-loops from a memory leak. Which service varies by seed.",
},
{
"id": "medium",
"name": "Cascading Multi-Service Failure",
"difficulty": "medium",
"max_steps": 20,
"description": (
"Bad deployment causes connection pool exhaustion cascading through 3 services. "
"One red-herring alert included."
),
},
{
"id": "hard",
"name": "Silent Data Corruption",
"difficulty": "hard",
"max_steps": 25,
"description": (
"No error-rate alerts fire. Signals are WARN-level logs and a business metric anomaly. "
"Requires rollback + on-call alert for full credit."
),
},
{
"id": "bonus",
"name": "Simultaneous Dual Failure",
"difficulty": "hard",
"max_steps": 25,
"description": (
"Two independent failures at once: disk full on log aggregator + "
"model reload CPU loop on ml-inference. Both must be fixed for full credit."
),
},
{
"id": "security",
"name": "Security Incident (DDoS)",
"difficulty": "hard",
"max_steps": 20,
"description": (
"A botnet is performing a DDoS and credential stuffing attack against the login endpoint. "
"The agent must read access logs, diagnose the attack IP range, block the CIDR, and alert the security team."
),
},
{
"id": "database",
"name": "Database Performance Degradation",
"difficulty": "hard",
"max_steps": 20,
"description": (
"A recent migration added a user_segment column to the orders table without an index. "
"Sequential table scans are spiking DB CPU. Discovered via read_metrics and the slow query log."
),
},
{
"id": "failover",
"name": "Multi-Region Failover",
"difficulty": "hard",
"max_steps": 25,
"description": (
"A primary datacenter region (us-east-1) is degraded due to a network partition. "
"The agent must correctly identify which services support automatic multi-region failover "
"and which do not. Failing over the wrong services causes severe data inconsistency penalties."
),
},
{
"id": "generated",
"name": "Procedural Incident",
"difficulty": "variable",
"max_steps": 20,
"description": "A seed-based procedural incident generated by ARIA. Deterministic and reproducible.",
},
]
}
@app.get("/validate")
def validate():
"""
Self-validation endpoint — runs all 7 curated tasks and returns per-task scores.
Instantiates each task environment with seed=42 and runs a random agent
for up to 30 steps. Verifies that: the environment runs without errors,
scores stay within [0.0, 1.0], and grading completes successfully.
This endpoint is safe to call at any time — it does not affect the current
episode state (the active _env._logic is restored after validation).
    Returns:
        {
            "validation": "passed" | "failed",
            "summary": "X/Y tasks passed validation",
            "tasks_checked": N,
            "tasks_passed": N,
            "details": {
                "easy": {"status": "passed", "score": 0.12, "resolved": false},
                ...
            },
            "environment": "devops-incident-response",
            "version": "2.0.0",
            "note": "..."
        }
"""
    import random

    from graders.grader import grade_episode

    results = []
    old_logic = _env._logic
    for task_id in VALID_TASKS:
        try:
            from env import DevOpsIncidentEnv as LogicClass

            env_logic = LogicClass(task_id=task_id, seed=42)
            env_logic.reset()
            done = False
            steps = 0
            while not done and steps < 30:
                # Random policy: sample any action type; action parameters are left unset.
                action = Action(action_type=random.choice(list(ActionType)))
                result = env_logic.step(action)
                done = result.done
                steps += 1
s = env_logic.state()
score = grade_episode(
task_id, s.action_history, s.ground_truth_root_cause,
s.ground_truth_fix, s.incident_resolved, s.total_reward,
)
results.append({
"task_id": task_id,
"score": round(float(score), 4),
"in_range": 0.0 <= score <= 1.0,
"resolved": s.incident_resolved,
"steps": steps,
"status": "ok",
})
except Exception as e:
results.append({"task_id": task_id, "status": "error", "error": str(e)})
_env._logic = old_logic
passed_count = sum(1 for r in results if r.get("status") == "ok" and r.get("in_range"))
total_count = len(results)
all_ok = passed_count == total_count
details = {}
for r in results:
details[r["task_id"]] = {
"status": "passed" if r.get("status") == "ok" and r.get("in_range") else r.get("status", "failed"),
"score": r.get("score"),
"resolved": r.get("resolved")
}
return {
"validation": "passed" if all_ok else "failed",
"summary": f"{passed_count}/{total_count} tasks passed validation",
"tasks_checked": total_count,
"tasks_passed": passed_count,
"details": details,
"environment": "devops-incident-response",
"version": "2.0.0",
"note": "Generated task excluded — procedural tasks require fixed parameters"
}
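# Illustrative CI gate (not used by the server): fail a build when self-validation does
# not pass. Assumes the server is already running at the given base URL and that
# `requests` is installed.
def _example_ci_gate(base_url: str = "http://localhost:8000") -> None:
    import requests

    report = requests.get(f"{base_url}/validate").json()
    if report["validation"] != "passed":
        raise SystemExit(report["summary"])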
@app.get("/metrics")
def get_metrics():
"""
Aggregate episode statistics across all completed episodes.
Statistics are computed in-memory and reset when the server restarts.
Returns:
{
"total_episodes": N,
"overall_avg_score": 0.XX,
"by_task": {
"easy": {"count", "avg_score", "max_score", "min_score",
"resolution_rate", "avg_steps_to_diagnosis",
"avg_info_gathering_ratio"},
...
},
"last_updated": "ISO timestamp"
}
"""
total_episodes = len(episode_history)
by_task = {}
total_score = 0.0
if total_episodes == 0:
return {
"total_episodes": 0,
"by_task": {},
"overall_avg_score": 0.0,
"last_updated": datetime.utcnow().isoformat() + "Z"
}
for rec in episode_history:
tid = rec["task_id"]
if tid not in by_task:
by_task[tid] = {"scores": [], "resolved": 0, "steps_to_diag": [], "info_ratios": []}
by_task[tid]["scores"].append(rec["final_score"])
if rec["incident_resolved"]:
by_task[tid]["resolved"] += 1
if rec["steps_to_diagnosis"] is not None:
by_task[tid]["steps_to_diag"].append(rec["steps_to_diagnosis"])
by_task[tid]["info_ratios"].append(rec["info_gathering_ratio"])
total_score += rec["final_score"]
out_by_task = {}
for tid, agg in by_task.items():
cnt = len(agg["scores"])
out_by_task[tid] = {
"count": cnt,
"avg_score": round(sum(agg["scores"]) / cnt, 3),
"max_score": round(max(agg["scores"]), 3),
"min_score": round(min(agg["scores"]), 3),
"resolution_rate": round(agg["resolved"] / cnt, 3),
"avg_steps_to_diagnosis": round(sum(agg["steps_to_diag"]) / len(agg["steps_to_diag"]), 1) if agg["steps_to_diag"] else None,
"avg_info_gathering_ratio": round(sum(agg["info_ratios"]) / len(agg["info_ratios"]), 2) if agg["info_ratios"] else 0.0
}
return {
"total_episodes": total_episodes,
"by_task": out_by_task,
"overall_avg_score": round(total_score / total_episodes, 3),
"last_updated": datetime.utcnow().isoformat() + "Z"
}
@app.get("/leaderboard")
def get_leaderboard():
"""
Top-10 episodes ranked by score (ties broken by fewer steps).
Returns:
{"leaderboard": [{"rank", "task_id", "score", "steps", "timestamp"}, ...]}
"""
sorted_eps = sorted(episode_history, key=lambda x: (x["final_score"], -x["steps_taken"]), reverse=True)
top_10 = []
for i, rec in enumerate(sorted_eps[:10]):
top_10.append({
"rank": i + 1,
"task_id": rec["task_id"],
"score": rec["final_score"],
"steps": rec["steps_taken"],
"timestamp": rec["timestamp"]
})
return {"leaderboard": top_10}
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
# Independent environment instance for this connection
ws_env = DevOpsEnvironment()
try:
while True:
data = await websocket.receive_json()
command = data.get("command")
print(f"WebSocket received: {data}")
if command == "reset":
task_id = data.get("task_id", "easy")
seed = data.get("seed")
obs = await ws_env.reset(seed=seed, task_id=task_id)
await websocket.send_json({
"type": "observation",
"data": obs.model_dump() if hasattr(obs, "model_dump") else obs.dict()
})
elif command == "step":
if ws_env._logic is None:
await websocket.send_json({
"type": "error",
"message": "Call reset before step"
})
continue
action_data = data.get("action", {})
try:
action = Action(**action_data)
step_result = await ws_env.step(action)
if step_result.done:
track_episode(ws_env.state)
await websocket.send_json({
"type": "step_result",
"data": {
"observation": step_result.observation.model_dump() if hasattr(step_result.observation, "model_dump") else step_result.observation.dict(),
"reward": step_result.reward,
"done": step_result.done,
"info": step_result.info
}
})
except Exception as e:
await websocket.send_json({
"type": "error",
"message": str(e)
})
elif command == "state":
if ws_env._logic is None:
await websocket.send_json({
"type": "error",
"message": "Call reset before state"
})
continue
state = ws_env.state
await websocket.send_json({
"type": "state",
"data": state.model_dump() if hasattr(state, "model_dump") else state.dict()
})
else:
await websocket.send_json({
"type": "error",
"message": f"Unrecognized command: {command}"
})
except WebSocketDisconnect:
print("WebSocket client disconnected")
except Exception as e:
print(f"WebSocket error: {e}")
try:
await websocket.send_json({
"type": "error",
"message": str(e)
})
        except Exception:
            pass
await websocket.close()
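# Illustrative WebSocket client sketch (not used by the server): the protocol is a JSON
# message with a "command" key ("reset", "step", or "state"). The ws:// URL and the
# third-party `websockets` package are assumptions.
async def _example_ws_client(url: str = "ws://localhost:8000/ws") -> None:
    import json

    import websockets

    async with websockets.connect(url) as ws:
        await ws.send(json.dumps({"command": "reset", "task_id": "easy", "seed": 42}))
        print(json.loads(await ws.recv())["type"])  # "observation"
        await ws.send(json.dumps({"command": "step", "action": {"action_type": "noop"}}))
        print(json.loads(await ws.recv())["type"])  # "step_result" or "error"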
# ─── Multi-Agent Routes ────────────────────────────────────────────────────────
@app.post("/multi-agent/reset")
def multi_agent_reset(body: MultiAgentResetRequest):
"""
Start a new dual-agent session with split observability.
Creates two views of the same incident:
- Agent A (Observer): sees logs and active alerts only
- Agent B (Responder): sees metrics and service dependencies only
Args:
task_id: Task to run (same valid values as POST /reset)
seed: Deterministic seed (default: 42)
Returns:
session_id, agent roles, step instructions, and initial observations
for both agents.
"""
session = DualAgentSession(task_id=body.task_id, seed=body.seed)
multi_agent_sessions[session.session_id] = session
return {
"session_id": session.session_id,
"task_id": body.task_id,
"seed": body.seed,
"agent_a_role": "observer — sees logs and alerts only",
"agent_b_role": "responder — sees metrics and dependencies only",
"instructions": {
"agent_a": "POST /multi-agent/step/a/{session_id} body: {\"finding\": \"your observation\"}",
"agent_b": "POST /multi-agent/step/b/{session_id} body: Action JSON (same schema as POST /step)",
},
"observation_a": session.get_observation_a(),
"observation_b": session.get_observation_b(),
}
@app.post("/multi-agent/step/a/{session_id}")
def multi_agent_step_a(session_id: str, body: AgentAStepRequest):
"""
Agent A (Observer) shares a finding with Agent B.
Agent A sees logs and alerts only. Findings are appended to the shared
findings log that Agent B can see when deciding its next action.
Args:
session_id: Session ID from POST /multi-agent/reset
finding: Text description of what Agent A observed
Returns:
Updated findings log and current Observer-view observation.
"""
session = multi_agent_sessions.get(session_id)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
return session.step_a(body.finding)
@app.post("/multi-agent/step/b/{session_id}")
def multi_agent_step_b(session_id: str, body: Action):
"""
Agent B (Responder) takes an action in the environment.
Agent B sees metrics and service dependencies. It receives all findings
shared by Agent A, then executes an action. Action schema is identical
to POST /step.
Args:
session_id: Session ID from POST /multi-agent/reset
body: Action object (same schema as POST /step)
Returns:
StepResult with reward, done flag, and updated Responder-view observation.
"""
session = multi_agent_sessions.get(session_id)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
return session.step_b(body)
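# Illustrative dual-agent round (not used by the server): Agent A shares a finding, then
# Agent B acts on it. The base URL, the `requests` dependency, the finding text, and the
# noop action are all placeholders.
def _example_dual_agent_round(base_url: str = "http://localhost:8000") -> None:
    import requests

    session = requests.post(
        f"{base_url}/multi-agent/reset", json={"task_id": "easy", "seed": 42}
    ).json()
    sid = session["session_id"]
    requests.post(
        f"{base_url}/multi-agent/step/a/{sid}",
        json={"finding": "Error-rate alerts are firing for one service."},
    )
    requests.post(f"{base_url}/multi-agent/step/b/{sid}", json={"action_type": "noop"})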
@app.get("/multi-agent/state/{session_id}")
def multi_agent_state(session_id: str):
"""
Full state for a dual-agent session including both agent perspectives.
Returns:
Session state with findings_log, step count, done flag,
and both Observer and Responder observations.
"""
session = multi_agent_sessions.get(session_id)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
return session.get_state()
@app.get("/multi-agent/sessions")
def list_multi_agent_sessions():
"""
List all active dual-agent sessions.
Returns:
List of active sessions with session_id, task_id, current step,
done flag, and number of findings shared by Agent A.
"""
return [
{
"session_id": s.session_id,
"task_id": s.task_id,
"step": s.step_count,
"done": s.done,
"findings_count": len(s.findings_log),
}
for s in multi_agent_sessions.values()
]
# ─── Curriculum Routes ─────────────────────────────────────────────────────────
@app.get("/curriculum/status")
def get_curriculum_status():
"""
Agent mastery levels across all tasks.
Returns the curriculum engine's current view of agent performance:
rolling average score, mastery level (0–3), whether scaffolding is
needed, and a diagnostic hint per task.
Returns:
{"tasks": {"easy": {"rolling_avg", "mastery_level", "scaffold_needed", "hint"}, ...},
"recommended_task": "easy"}
"""
return curriculum_engine.get_status()
@app.get("/curriculum/next")
def get_next_curriculum_task():
"""
Recommended next task for adaptive training.
Returns the task with the lowest rolling average score among non-mastered
tasks. Training loops should call this between episodes to implement
curriculum learning automatically.
Returns:
{"recommended_task": "medium", "reasoning": "..."}
"""
return {
"recommended_task": curriculum_engine.get_next_curriculum_task(),
"reasoning": "Lowest rolling average among non-mastered tasks.",
}
@app.post("/curriculum/record")
def record_curriculum_episode(req: CurriculumRecordRequest):
"""
Record an episode result to update the curriculum engine.
Training loops should call this after each episode to keep the
curriculum engine's rolling averages and mastery levels current.
Args:
task_id: Task that was just run
score: Episode score (float, typically 0.0–1.0)
Returns:
{"recorded": true, "new_status": {...}} — updated task status
"""
try:
curriculum_engine.record_episode(req.task_id, req.score)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
return {
"recorded": True,
"new_status": curriculum_engine.get_status()["tasks"][req.task_id],
}
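# Illustrative training-loop sketch (not used by the server): between episodes, ask the
# curriculum engine which task to run next, then report the score back. The base URL,
# the `requests` dependency, and the caller-supplied run_episode callable are assumptions.
def _example_curriculum_loop(base_url: str, run_episode, episodes: int = 10) -> None:
    import requests

    for _ in range(episodes):
        task_id = requests.get(f"{base_url}/curriculum/next").json()["recommended_task"]
        score = run_episode(task_id)  # runs one episode and returns a float score
        requests.post(f"{base_url}/curriculum/record", json={"task_id": task_id, "score": score})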
@app.get("/curriculum/hint/{task_id}")
def get_curriculum_hint(task_id: str):
"""
Get a diagnostic hint and scaffold flag for a specific task.
If an agent is repeatedly failing a task, this returns a structured hint
explaining what the agent should try (e.g., "read logs before acting").
Args:
task_id: One of easy/medium/hard/bonus/security/database/failover
Returns:
{"task_id", "hint", "scaffold_needed": bool, "mastery_level": 0–3}
"""
try:
return {
"task_id": task_id,
"hint": curriculum_engine.get_hint(task_id),
"scaffold_needed": curriculum_engine.should_scaffold(task_id),
"mastery_level": curriculum_engine.get_mastery(task_id),
}
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
# ─── Feature 1: Episode Replay System ────────────────────────────────────────
@app.get("/replays")
def list_replays():
"""List available episode replays, newest first."""
items = []
for ep_id, r in replay_store.items():
items.append({
"episode_id": r["episode_id"],
"task_id": r["task_id"],
"score": r["final_score"],
"resolved": r["resolved"],
"total_steps": r["total_steps"],
"timestamp": r["timestamp"],
})
items.sort(key=lambda x: int(x["episode_id"]), reverse=True)
return items
@app.get("/replay/{episode_id}")
def get_replay(episode_id: str):
"""Return full replay data for an episode."""
if episode_id not in replay_store:
raise HTTPException(status_code=404, detail="Episode not found")
return replay_store[episode_id]
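# Illustrative sketch (not used by the server): fetch the most recent replay and print a
# one-line summary per step. The base URL and the `requests` dependency are assumptions.
def _example_print_latest_replay(base_url: str = "http://localhost:8000") -> None:
    import requests

    replays = requests.get(f"{base_url}/replays").json()
    if not replays:
        return
    replay = requests.get(f"{base_url}/replay/{replays[0]['episode_id']}").json()
    for step in replay["steps"]:
        print(step["step"], step["action"]["action_type"], step["reward"])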
@app.get("/replay/{episode_id}/html", response_class=HTMLResponse)
def get_replay_html(episode_id: str):
"""Return an HTML timeline visualization of an episode replay."""
if episode_id not in replay_store:
raise HTTPException(status_code=404, detail="Episode not found")
r = replay_store[episode_id]
    def reward_color(reward: float) -> str:
        if reward > 0.2:
            return "#00ff88"
        if reward > 0:
            return "#ffaa00"
        return "#ff3355"
def reward_bg(reward: float) -> str:
if reward > 0.2:
return "rgba(0,255,136,0.08)"
if reward > 0:
return "rgba(255,170,0,0.08)"
return "rgba(255,51,85,0.08)"
steps_html = ""
running_score = 0.0
for s in r["steps"]:
running_score += s["reward"]
act = s["action"]
act_type = act.get("action_type", "unknown")
service = act.get("service") or ""
rc = act.get("root_cause") or ""
obs = s["observation_summary"]
col = reward_color(s["reward"])
bg = reward_bg(s["reward"])
reward_sign = "+" if s["reward"] > 0 else ""
failing = ", ".join(obs["failing_services"]) if obs["failing_services"] else "none"
steps_html += f"""