# sevzero/server/scenarios.py
# Mist-ic's picture
# Add core simulation engine, environment, grader, and app wiring
# 0e4dd30
"""
server/scenarios.py — Procedural scenario generation from seed + difficulty.
Maps difficulty to graph topology, failure count, and failure placement.
Same seed + same difficulty = identical scenario every time.
"""
from __future__ import annotations
import random
from dataclasses import dataclass, field
from typing import List, Optional
from server.failures import (
FailureSpec,
FailureType,
make_failure_spec,
select_failure_type,
select_multi_root_failures,
)
from server.graph import ServiceGraph, generate_graph
@dataclass
class ScenarioConfig:
    """Bundle of everything that defines a single episode.

    ``generate_scenario`` builds one of these from a seed and a task ID.
    Fields are positional in the order declared below.
    """

    # Difficulty label copied from the task definition.
    difficulty: str
    # Seed used to create the random generator for this scenario.
    seed: int
    # Service graph generated for the episode.
    graph: ServiceGraph
    # One spec per failure that was successfully placed on a service.
    failure_specs: List[FailureSpec]
    # Step budget copied from the task definition.
    max_steps: int
    # Human-readable task description copied from the task definition.
    description: str
# ---------------------------------------------------------------------------
# Task definitions (the 3 required tasks)
# ---------------------------------------------------------------------------
# Each entry describes one task and uses the same six keys:
#   task_id      - identifier used to look the task up
#   name         - short display title
#   difficulty   - difficulty label passed on to graph generation
#   description  - prose shown for the task
#   max_steps    - step budget for an episode
#   num_failures - how many failures the scenario generator tries to place
TASK_DEFINITIONS: List[dict] = [
    # Easy: one failure, 10 steps.
    {
        "task_id": "easy",
        "name": "Single Service Outage",
        "difficulty": "easy",
        "description": (
            "A single service in a small linear microservice chain is experiencing failures. "
            "Diagnose the root cause and apply the correct remediation within 10 steps."
        ),
        "max_steps": 10,
        "num_failures": 1,
    },
    # Medium: one failure, 20 steps.
    {
        "task_id": "medium",
        "name": "Cascading Failure",
        "difficulty": "medium",
        "description": (
            "A failure in a shared infrastructure service is cascading through a branching "
            "dependency graph. Trace the root cause upstream from symptomatic services and "
            "remediate within 20 steps."
        ),
        "max_steps": 20,
        "num_failures": 1,
    },
    # Hard: three failures, 50 steps.
    {
        "task_id": "hard",
        "name": "Multi-Root Sev-0 Incident",
        "difficulty": "hard",
        "description": (
            "Multiple simultaneous failures across a multi-region microservice architecture. "
            "Failures may have conflicting mitigations. Triage, diagnose, and resolve all "
            "root causes within 50 steps."
        ),
        "max_steps": 50,
        "num_failures": 3,
    },
]
def get_task_definition(task_id: str) -> dict:
    """Look up a task definition by its ``"task_id"``.

    Returns the first entry of ``TASK_DEFINITIONS`` whose ``"task_id"``
    equals *task_id*. The stored dict itself is returned, not a copy.

    Raises:
        ValueError: if no entry has the requested ID.
    """
    found = next(
        (entry for entry in TASK_DEFINITIONS if entry["task_id"] == task_id),
        None,
    )
    if found is None:
        raise ValueError(f"Unknown task_id: {task_id!r}. Must be one of: easy, medium, hard")
    return found
# ---------------------------------------------------------------------------
# Failure placement logic
# ---------------------------------------------------------------------------
def _pick_failure_target(
    graph: ServiceGraph,
    failure_type: FailureType,
    rng: random.Random,
    exclude: set,
) -> Optional[str]:
    """Choose the service that should receive a failure of ``failure_type``.

    Nodes whose ID is in *exclude* are never considered. Among the rest,
    nodes matching the per-type placement rule are preferred. If none
    match, any non-edge node outside *exclude* is eligible.

    Returns:
        The ID of one eligible node drawn with ``rng.choice``, or ``None``
        when no node is eligible. *rng* is not consumed in that case.
    """

    def _is_preferred(node) -> bool:
        """Return whether *node* satisfies the placement rule for ``failure_type``."""
        # Cache failures only make sense on cache services.
        if failure_type == FailureType.CACHE_FAILURE:
            return node.is_cache
        # DB degradation targets infra nodes whose ID mentions postgres.
        if failure_type == FailureType.DB_DEGRADATION:
            return node.layer == "infra" and "postgres" in node.id
        # Network and config errors go on any non-edge service.
        if failure_type in (
            FailureType.NETWORK_ERROR,
            FailureType.CONFIG_STARTUP,
            FailureType.CONFIG_RUNTIME,
        ):
            return node.layer != "edge"
        # Bad deploys and resource leaks go on business or identity services.
        if failure_type in (FailureType.BAD_DEPLOY, FailureType.RESOURCE_LEAK):
            return node.layer in ("business", "identity")
        # Crashes go on any non-edge service.
        if failure_type == FailureType.CRASH:
            return node.layer != "edge"
        # Cascading latency goes on hotspots or business services.
        if failure_type == FailureType.CASCADING_LATENCY:
            return node.is_hotspot or node.layer == "business"
        # No rule for this failure type: rely on the fallback below.
        return False

    eligible = [
        node.id
        for node in graph.nodes
        if node.id not in exclude and _is_preferred(node)
    ]
    if not eligible:
        # Fallback: any non-edge service that has not been used yet.
        eligible = [
            node.id
            for node in graph.nodes
            if node.layer != "edge" and node.id not in exclude
        ]
    return rng.choice(eligible) if eligible else None
# ---------------------------------------------------------------------------
# Scenario generation
# ---------------------------------------------------------------------------
def generate_scenario(seed: int, task_id: str) -> ScenarioConfig:
    """Build the scenario for *task_id* using randomness derived from *seed*.

    All random draws go through one ``random.Random(seed)`` instance, in a
    fixed order: graph generation, failure-type selection, then target
    choice and spec creation for each failure. The same seed and task ID
    are therefore intended to reproduce the same scenario (this relies on
    the helpers using only the generator they are handed).

    A failure for which no target service can be found is skipped, so
    ``failure_specs`` may hold fewer entries than the task's
    ``num_failures``.

    Raises:
        ValueError: propagated from ``get_task_definition`` for an unknown
            *task_id*.
    """
    task = get_task_definition(task_id)
    rng = random.Random(seed)

    # The graph must be generated before any failure draw to keep the
    # rng consumption order stable.
    difficulty = task["difficulty"]
    graph = generate_graph(difficulty, rng)

    # A single-failure task draws one type; otherwise a set is drawn together.
    num_failures = task["num_failures"]
    if num_failures == 1:
        failure_types = [select_failure_type(rng)]
    else:
        failure_types = select_multi_root_failures(rng, count=num_failures)

    # Place each failure on a distinct service.
    used_services: set = set()
    failure_specs: List[FailureSpec] = []
    for failure_type in failure_types:
        target = _pick_failure_target(graph, failure_type, rng, used_services)
        if not target:
            # No eligible service remains for this failure; skip it.
            continue
        failure_specs.append(make_failure_spec(target, failure_type, rng))
        used_services.add(target)

    return ScenarioConfig(
        difficulty=difficulty,
        seed=seed,
        graph=graph,
        failure_specs=failure_specs,
        max_steps=task["max_steps"],
        description=task["description"],
    )