| """ |
| server/scenarios.py — Procedural scenario generation from seed + difficulty. |
| |
| Maps difficulty to graph topology, failure count, and failure placement. |
| Same seed + same difficulty = identical scenario every time. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import random |
| from dataclasses import dataclass, field |
| from typing import List, Optional |
|
|
| from server.failures import ( |
| FailureSpec, |
| FailureType, |
| make_failure_spec, |
| select_failure_type, |
| select_multi_root_failures, |
| ) |
| from server.graph import ServiceGraph, generate_graph |
|
|
|
|
| @dataclass |
| class ScenarioConfig: |
| """Complete scenario definition for one episode.""" |
| difficulty: str |
| seed: int |
| graph: ServiceGraph |
| failure_specs: List[FailureSpec] |
| max_steps: int |
| description: str |
|
|
|
|
| |
| |
| |
|
|
| TASK_DEFINITIONS = [ |
| { |
| "task_id": "easy", |
| "name": "Single Service Outage", |
| "difficulty": "easy", |
| "description": ( |
| "A single service in a small linear microservice chain is experiencing failures. " |
| "Diagnose the root cause and apply the correct remediation within 10 steps." |
| ), |
| "max_steps": 10, |
| "num_failures": 1, |
| }, |
| { |
| "task_id": "medium", |
| "name": "Cascading Failure", |
| "difficulty": "medium", |
| "description": ( |
| "A failure in a shared infrastructure service is cascading through a branching " |
| "dependency graph. Trace the root cause upstream from symptomatic services and " |
| "remediate within 20 steps." |
| ), |
| "max_steps": 20, |
| "num_failures": 1, |
| }, |
| { |
| "task_id": "hard", |
| "name": "Multi-Root Sev-0 Incident", |
| "difficulty": "hard", |
| "description": ( |
| "Multiple simultaneous failures across a multi-region microservice architecture. " |
| "Failures may have conflicting mitigations. Triage, diagnose, and resolve all " |
| "root causes within 50 steps." |
| ), |
| "max_steps": 50, |
| "num_failures": 3, |
| }, |
| ] |
|
|
|
|
| def get_task_definition(task_id: str) -> dict: |
| """Get a task definition by ID.""" |
| for t in TASK_DEFINITIONS: |
| if t["task_id"] == task_id: |
| return t |
| raise ValueError(f"Unknown task_id: {task_id!r}. Must be one of: easy, medium, hard") |
|
|
|
|
| |
| |
| |
|
|
|
|
| def _pick_failure_target( |
| graph: ServiceGraph, |
| failure_type: FailureType, |
| rng: random.Random, |
| exclude: set, |
| ) -> Optional[str]: |
| """Pick an appropriate service to inject this failure type into.""" |
| candidates = [] |
|
|
| for node in graph.nodes: |
| if node.id in exclude: |
| continue |
|
|
| |
| if failure_type == FailureType.CACHE_FAILURE: |
| if node.is_cache: |
| candidates.append(node.id) |
| continue |
|
|
| |
| if failure_type == FailureType.DB_DEGRADATION: |
| if node.layer == "infra" and "postgres" in node.id: |
| candidates.append(node.id) |
| continue |
|
|
| |
| if failure_type == FailureType.NETWORK_ERROR: |
| if node.layer != "edge": |
| candidates.append(node.id) |
| continue |
|
|
| |
| if failure_type in (FailureType.CONFIG_STARTUP, FailureType.CONFIG_RUNTIME): |
| if node.layer != "edge": |
| candidates.append(node.id) |
| continue |
|
|
| |
| if failure_type == FailureType.BAD_DEPLOY: |
| if node.layer in ("business", "identity"): |
| candidates.append(node.id) |
| continue |
|
|
| |
| if failure_type == FailureType.RESOURCE_LEAK: |
| if node.layer in ("business", "identity"): |
| candidates.append(node.id) |
| continue |
|
|
| |
| if failure_type == FailureType.CRASH: |
| if node.layer != "edge": |
| candidates.append(node.id) |
| continue |
|
|
| |
| if failure_type == FailureType.CASCADING_LATENCY: |
| if node.is_hotspot or node.layer == "business": |
| candidates.append(node.id) |
| continue |
|
|
| if not candidates: |
| |
| candidates = [n.id for n in graph.nodes if n.layer != "edge" and n.id not in exclude] |
|
|
| if not candidates: |
| return None |
|
|
| return rng.choice(candidates) |
|
|
|
|
| |
| |
| |
|
|
|
|
| def generate_scenario(seed: int, task_id: str) -> ScenarioConfig: |
| """ |
| Generate a complete scenario for the given task and seed. |
| Deterministic: same seed + same task_id = identical scenario. |
| """ |
| task = get_task_definition(task_id) |
| rng = random.Random(seed) |
|
|
| |
| difficulty = task["difficulty"] |
| graph = generate_graph(difficulty, rng) |
|
|
| |
| num_failures = task["num_failures"] |
| used_services: set = set() |
| failure_specs: List[FailureSpec] = [] |
|
|
| if num_failures == 1: |
| ft = select_failure_type(rng) |
| target = _pick_failure_target(graph, ft, rng, used_services) |
| if target: |
| spec = make_failure_spec(target, ft, rng) |
| failure_specs.append(spec) |
| used_services.add(target) |
| else: |
| failure_types = select_multi_root_failures(rng, count=num_failures) |
| for ft in failure_types: |
| target = _pick_failure_target(graph, ft, rng, used_services) |
| if target: |
| spec = make_failure_spec(target, ft, rng) |
| failure_specs.append(spec) |
| used_services.add(target) |
|
|
| return ScenarioConfig( |
| difficulty=difficulty, |
| seed=seed, |
| graph=graph, |
| failure_specs=failure_specs, |
| max_steps=task["max_steps"], |
| description=task["description"], |
| ) |
|
|