# IncidentMind / test_env_validation.py
# amrita8642's picture
# Deploying incidentmind on hugging face
# ec81028
"""
test_env_validation.py
======================
IncidentMind — Production-Grade Environment Validation Suite
============================================================
Test categories:
T1 Determinism — same seed → identical output every time
T2 Task constraints — alert counts, noise %, red-herring counts in-spec
T3 Temporal ordering — alerts sorted by timestamp; cascade after root cause
T4 Noise / red herrings — flags, severity rules enforced
T5 Grader correctness — perfect=high score; greedy/wrong=penalized
T6 Performance — full pipeline < 500ms per iteration
T7 Service graph — DAG, 12+ services, cascade hops, health states
T8 Partial observability — hidden fields masked until INVESTIGATE
T9 Runbook registry — 7 runbooks, applicability, effect correctness
T10 Multi-root scoring — task3 partial + full credit paths
Usage:
python test_env_validation.py
python test_env_validation.py -v # verbose sub-step detail
python test_env_validation.py --fast # skip performance test
"""
from __future__ import annotations
import argparse
import sys
import time
import traceback
from typing import Any, Callable, Dict, List
import numpy as np
import os as _os
sys.path.insert(0, _os.path.dirname(_os.path.abspath(__file__)))
from envs.service_graph import ServiceGraph, HealthState, health_state_from_score
from envs.incident_generator import IncidentGenerator, IncidentScenario
from envs.alert_generator import AlertGenerator, Alert, AlertSeverity
from envs.runbooks import RunbookRegistry, SimulatedState
from envs.grader import Grader, ActionType, GradeResult
from envs.tasks import get_task, list_tasks
# ─────────────────────────── runner infra ───────────────────────────────────
# Status labels (wrapped in ANSI colour escapes) printed before each test name.
_PASS = "\033[92m PASS\033[0m"
_FAIL = "\033[91m FAIL\033[0m"
_SKIP = "\033[93m SKIP\033[0m"
# Escape sequences placed around section banners by section().
_TITLE = "\033[1;96m"
_RESET = "\033[0m"
# One dict per executed test: "name", "status" and, for failures, "error".
_results: List[Dict[str, Any]] = []
# Set by main(); gates _log() output and traceback printing in run_test().
_verbose = False
def _log(msg: str) -> None:
    """Print *msg* as a detail line, but only when verbose mode is enabled."""
    if not _verbose:
        return
    print(f" {msg}")
def run_test(name: str, fn: Callable[[], None], skip: bool = False) -> bool:
    """Run a single test callable, print its status and record the outcome.

    Args:
        name: Label printed next to the status and stored in ``_results``.
        fn: Zero-argument callable; any exception it raises marks a failure.
        skip: When truthy the callable is not invoked and SKIP is recorded.

    Returns:
        True if the test passed or was skipped, False if it raised.
    """
    def _record(status: str, **extra: Any) -> None:
        # Every outcome is stored in the same shape for the final summary.
        _results.append({"name": name, "status": status, **extra})

    if skip:
        print(f"{_SKIP} {name}")
        _record("SKIP")
        return True

    try:
        fn()
        print(f"{_PASS} {name}")
        _record("PASS")
        return True
    except Exception as exc:
        assertion_failed = isinstance(exc, AssertionError)
        kind = "AssertionError" if assertion_failed else "Exception"
        print(f"{_FAIL} {name}")
        print(f" {kind}: {exc}")
        # Tracebacks are only shown for unexpected (non-assertion) errors.
        if _verbose and not assertion_failed:
            traceback.print_exc()
        _record("FAIL", error=str(exc))
        return False
def section(title: str) -> None:
    """Print a section banner: a horizontal rule, the title, another rule."""
    rule = f"{_TITLE}{'─'*65}{_RESET}"
    print(f"\n{rule}")
    print(f"{_TITLE} {title}{_RESET}")
    print(rule)
# ─────────────────────────── shared fixtures ────────────────────────────────
# Seed passed to the generator fixtures below and reused by individual tests.
SEED = 42
# Module-level fixtures, built once at import time and shared by the tests.
_sg = ServiceGraph()
_gen = IncidentGenerator(seed=SEED)
_ag = AlertGenerator(seed=SEED)
_reg = RunbookRegistry()
_grdr = Grader()
# One generated scenario, and the alerts generated from it, per task id 1-3.
_scenarios = {tid: _gen.generate(tid) for tid in (1, 2, 3)}
_alerts = {tid: _ag.generate(_scenarios[tid]) for tid in (1, 2, 3)}
def _build_gt(sc: IncidentScenario) -> Dict:
    """Build the ground-truth dict passed to the grader from a scenario.

    ``root_services`` comes from ``sc.metadata["root_services"]`` when present,
    otherwise from a one-element list holding ``metadata["root_service"]``
    (empty string if that is missing too). A bare string is wrapped in a list.
    """
    meta = sc.metadata
    roots = meta.get("root_services", [meta.get("root_service", "")])
    if isinstance(roots, str):
        roots = [roots]
    chain = [{"alert_id": stage.alert_id} for stage in sc.cascade_chain]
    return dict(
        task_id=sc.task_id,
        root_cause_alert_ids=sc.root_cause_alert_ids,
        cascade_chain=chain,
        noise_alert_ids=sc.noise_alert_ids,
        red_herring_alert_ids=sc.red_herring_alert_ids,
        correct_runbook_ids=sc.correct_runbook_ids,
        involved_services=sc.involved_services,
        root_services=roots,
        alerts_by_service={},
    )
def _perfect_actions(sc: IncidentScenario, al: List[Alert]) -> List[Dict]:
    """Return the action sequence of an ideal agent for the given scenario.

    Order: investigate the first three alerts, identify every root cause,
    apply every correct runbook, dismiss every noise alert, then resolve.
    Steps are numbered consecutively from 0.
    """
    plan = [(ActionType.INVESTIGATE, alert.id, None) for alert in al[:3]]
    plan += [(ActionType.IDENTIFY_ROOT_CAUSE, rc_id, None) for rc_id in sc.root_cause_alert_ids]
    plan += [(ActionType.APPLY_RUNBOOK, None, rb_id) for rb_id in sc.correct_runbook_ids]
    plan += [(ActionType.DISMISS_NOISE, noise_id, None) for noise_id in sc.noise_alert_ids]
    plan.append((ActionType.RESOLVE, None, None))
    return [
        {"type": kind, "alert_id": alert_id, "runbook_id": runbook_id, "step": idx}
        for idx, (kind, alert_id, runbook_id) in enumerate(plan)
    ]
def _greedy_actions(sc: IncidentScenario, al: List[Alert]) -> List[Dict]:
    """Return a two-step plan: blame the first CRITICAL alert, then resolve.

    Falls back to the first alert when none is CRITICAL.
    """
    target = al[0]
    for candidate in al:
        if candidate.severity == AlertSeverity.CRITICAL:
            target = candidate
            break
    blame = {"type": ActionType.IDENTIFY_ROOT_CAUSE, "alert_id": target.id, "runbook_id": None, "step": 0}
    finish = {"type": ActionType.RESOLVE, "alert_id": None, "runbook_id": None, "step": 1}
    return [blame, finish]
# ══════════════════════════════════════════════════════════════════════════════
# T1 — Determinism
# ══════════════════════════════════════════════════════════════════════════════
def test_determinism_scenario():
    """Two fresh generators with the same seed must produce matching scenarios."""
    for tid in (1, 2, 3):
        first = IncidentGenerator(seed=SEED).generate(tid)
        second = IncidentGenerator(seed=SEED).generate(tid)
        assert first.root_cause_alert_ids == second.root_cause_alert_ids, \
            f"Task {tid}: root_cause_alert_ids differ"
        assert len(first.cascade_chain) == len(second.cascade_chain), \
            f"Task {tid}: cascade chain length differs"
        assert first.noise_alert_ids == second.noise_alert_ids, \
            f"Task {tid}: noise_alert_ids differ"
        _log(f"Task {tid} scenario OK")
def test_determinism_alerts():
    """Regenerating alerts with the same seed must give the same ids and timestamps."""
    for tid in (1, 2, 3):
        scenario = _scenarios[tid]
        run_one = AlertGenerator(seed=SEED).generate(scenario)
        run_two = AlertGenerator(seed=SEED).generate(scenario)
        ids_one = [alert.id for alert in run_one]
        ids_two = [alert.id for alert in run_two]
        assert ids_one == ids_two, f"Task {tid}: alert IDs differ"
        times_one = [alert.timestamp_offset for alert in run_one]
        times_two = [alert.timestamp_offset for alert in run_two]
        assert times_one == times_two, f"Task {tid}: timestamps differ"
        _log(f"Task {tid} alerts OK ({len(run_one)} alerts)")
def test_determinism_failure_propagation():
    """Failure propagation from a root must be identical for identically seeded RNGs."""
    for root in ("payment-db", "redis-cache", "storage-node"):
        run_one = ServiceGraph().simulate_failure_impact(root, np.random.RandomState(SEED))
        run_two = ServiceGraph().simulate_failure_impact(root, np.random.RandomState(SEED))
        services_one = [hop.service for hop in run_one]
        services_two = [hop.service for hop in run_two]
        assert services_one == services_two, f"{root}: cascade services differ"
        for hop_one, hop_two in zip(run_one, run_two):
            gap = abs(hop_one.delay_seconds - hop_two.delay_seconds)
            assert gap < 1e-9, f"{root}: delay mismatch at {hop_one.service}"
        _log(f"{root}: {[hop.service for hop in run_one]}")
def test_determinism_grader():
    """Grading the same actions twice must yield identical scores."""
    for tid in (1, 2, 3):
        scenario = _scenarios[tid]
        truth = _build_gt(scenario)
        actions = _perfect_actions(scenario, _alerts[tid])
        first = _grdr.grade(truth, actions, tid)
        second = _grdr.grade(truth, actions, tid)
        assert first.total_score == second.total_score, \
            f"Task {tid}: total_score not deterministic"
        assert first.root_cause_score == second.root_cause_score, \
            f"Task {tid}: root_cause_score not deterministic"
        _log(f"Task {tid} grader OK (score={first.total_score:.4f})")
# ══════════════════════════════════════════════════════════════════════════════
# T2 — Task constraints
# ══════════════════════════════════════════════════════════════════════════════
def test_task_alert_counts():
    """The number of generated alerts must lie inside each task's declared range."""
    for tid in (1, 2, 3):
        lower, upper = get_task(tid).alert_count_range
        count = len(_alerts[tid])
        _log(f"Task {tid}: {count} alerts, range=[{lower},{upper}]")
        assert lower <= count <= upper, \
            f"Task {tid}: alert count {count} outside [{lower},{upper}]"
def test_task_noise_percentage():
    """Task 1 has no noise; other tasks stay within 15 points of their noise target."""
    for tid in (1, 2, 3):
        spec = get_task(tid)
        batch = _alerts[tid]
        noisy = len([alert for alert in batch if alert.is_noise])
        share = noisy / len(batch) if batch else 0.0
        _log(f"Task {tid}: noise={noisy}/{len(batch)} ({share:.0%}) expected≈{spec.noise_percentage:.0%}")
        if tid == 1:
            assert noisy == 0, f"Task 1 must have 0 noise alerts, got {noisy}"
            continue
        drift = abs(share - spec.noise_percentage)
        assert drift <= 0.15, (
            f"Task {tid}: noise% {share:.2%} deviates > 15% from expected {spec.noise_percentage:.2%}")
def test_task_red_herring_counts():
    """Each scenario must contain exactly the red-herring count its task specifies."""
    for tid in (1, 2, 3):
        expected = get_task(tid).red_herring_count
        actual = len(_scenarios[tid].red_herring_alert_ids)
        _log(f"Task {tid}: red_herrings={actual} expected={expected}")
        assert actual == expected, f"Task {tid}: expected {expected} RH, got {actual}"
def test_task_definitions_sane():
    """Sanity-check the three task definitions returned by list_tasks()."""
    definitions = list_tasks()
    assert len(definitions) == 3
    assert [d.task_id for d in definitions] == [1, 2, 3]
    for task in definitions:
        low, high = task.alert_count_range
        assert low < high
        assert 0.0 <= task.noise_percentage <= 1.0
        assert 0.0 < task.passing_score <= 1.0
        assert task.max_steps > 0
        _log(f"Task {task.task_id} ({task.difficulty}): max_steps={task.max_steps}, passing={task.passing_score}")
# ══════════════════════════════════════════════════════════════════════════════
# T3 — Temporal ordering
# ══════════════════════════════════════════════════════════════════════════════
def test_temporal_sort_order():
    """Alerts must already be ordered by ascending timestamp_offset."""
    for tid in (1, 2, 3):
        offsets = [alert.timestamp_offset for alert in _alerts[tid]]
        in_order = sorted(offsets)
        assert offsets == in_order, f"Task {tid}: alerts not time-sorted"
        _log(f"Task {tid}: {len(offsets)} alerts time-sorted ✓")
def test_root_cause_fires_first():
    """The earliest root-cause alert must not fire later than the mean alert time.

    Note that the comparison is against the average timestamp of all alerts,
    not against the minimum.
    """
    for tid in (1, 2, 3):
        root_ids = _scenarios[tid].root_cause_alert_ids
        alerts = _alerts[tid]
        root_times = [alert.timestamp_offset for alert in alerts if alert.id in root_ids]
        every_time = [alert.timestamp_offset for alert in alerts]
        assert root_times, f"Task {tid}: no root cause alerts found"
        earliest_root = min(root_times)
        mean_time = sum(every_time) / len(every_time)
        assert earliest_root <= mean_time, \
            f"Task {tid}: root cause T={earliest_root:.1f} > avg T={mean_time:.1f}"
        _log(f"Task {tid}: root cause T={earliest_root:.1f}s, avg all={mean_time:.1f}s")
def test_cascade_after_root_cause():
    """Every alert belonging to the cascade chain must have a timestamp above 0."""
    for tid in (1, 2, 3):
        chain_ids = {stage.alert_id for stage in _scenarios[tid].cascade_chain}
        for alert in (a for a in _alerts[tid] if a.id in chain_ids):
            assert alert.timestamp_offset > 0.0, \
                f"Task {tid}: cascade alert {alert.id} at T={alert.timestamp_offset} not > 0"
        _log(f"Task {tid}: all cascade alerts post-T0 ✓")
def test_cascade_delays_monotonic():
    """Cascade stage delays must never decrease from one stage to the next."""
    for tid in (1, 2, 3):
        delays = [stage.delay_seconds for stage in _scenarios[tid].cascade_chain]
        for idx, (earlier, later) in enumerate(zip(delays, delays[1:]), start=1):
            assert later >= earlier, \
                f"Task {tid}: cascade delay at stage {idx} ({later:.2f}) < stage {idx-1} ({earlier:.2f})"
        _log(f"Task {tid}: cascade delays {[round(value,1) for value in delays]}")
# ══════════════════════════════════════════════════════════════════════════════
# T4 — Noise / red-herring flags
# ══════════════════════════════════════════════════════════════════════════════
def test_noise_flags_correct():
    """Alerts listed as noise must carry is_noise and must not be root causes."""
    for tid in (1, 2, 3):
        listed = set(_scenarios[tid].noise_alert_ids)
        for alert in _alerts[tid]:
            if alert.id not in listed:
                continue
            assert alert.is_noise, f"Task {tid}: {alert.id} in noise_ids but is_noise=False"
            assert not alert.is_root_cause, f"Task {tid}: noise {alert.id} has is_root_cause=True"
def test_root_cause_flags_correct():
    """Alerts listed as root causes must carry is_root_cause and must not be noise."""
    for tid in (1, 2, 3):
        listed = set(_scenarios[tid].root_cause_alert_ids)
        for alert in _alerts[tid]:
            if alert.id not in listed:
                continue
            assert alert.is_root_cause, f"Task {tid}: {alert.id} in RC ids but is_root_cause=False"
            assert not alert.is_noise, f"Task {tid}: root cause {alert.id} has is_noise=True"
def test_red_herring_severity():
    """Every red-herring alert must be HIGH or CRITICAL severity."""
    allowed = {AlertSeverity.CRITICAL, AlertSeverity.HIGH}
    for tid in (1, 2, 3):
        herring_ids = set(_scenarios[tid].red_herring_alert_ids)
        for alert in _alerts[tid]:
            if alert.id not in herring_ids:
                continue
            assert alert.severity in allowed, \
                f"Task {tid}: red herring {alert.id} severity={alert.severity} (must be HIGH/CRITICAL)"
        _log(f"Task {tid}: {len(herring_ids)} red herrings — all HIGH/CRITICAL ✓")
def test_noise_not_critical():
    """No alert listed as noise may have CRITICAL severity."""
    for tid in (1, 2, 3):
        listed = set(_scenarios[tid].noise_alert_ids)
        for alert in (a for a in _alerts[tid] if a.id in listed):
            assert alert.severity != AlertSeverity.CRITICAL, \
                f"Task {tid}: noise alert {alert.id} is CRITICAL"
def test_partial_observability_masking():
    """The first alert's related_services must be masked until it is investigated."""
    for tid in (1, 2, 3):
        alert = _alerts[tid][0]
        masked = alert.to_observation(investigated=False)
        revealed = alert.to_observation(investigated=True)
        assert isinstance(revealed["related_services"], list), \
            f"Alert {alert.id}: related_services must be list when investigated"
        # Either an explicit REDACTED marker or any difference counts as masking.
        is_masked = (
            "REDACTED" in str(masked["related_services"])
            or masked["related_services"] != revealed["related_services"]
        )
        assert is_masked, f"Alert {alert.id}: hidden field not masked when uninvestigated"
        _log(f"Task {tid}: masking OK on alert {alert.id}")
# ══════════════════════════════════════════════════════════════════════════════
# T5 — Grader correctness
# ══════════════════════════════════════════════════════════════════════════════
def test_perfect_agent_passes():
    """The perfect action plan must pass with full root-cause and runbook scores."""
    for tid in (1, 2, 3):
        scenario = _scenarios[tid]
        alerts = _alerts[tid]
        result = _grdr.grade(_build_gt(scenario), _perfect_actions(scenario, alerts), tid)
        _log(f"Task {tid}: total={result.total_score:.4f} rc={result.root_cause_score:.4f} "
             f"rb={result.runbook_score:.4f} ns={result.noise_suppression_score:.4f} eff={result.efficiency_score:.4f}")
        assert result.passed, f"Task {tid}: perfect agent did not pass"
        assert result.root_cause_score == 1.0, f"Task {tid}: perfect RC should score 1.0"
        assert result.runbook_score == 1.0, f"Task {tid}: perfect runbook should score 1.0"
def test_greedy_agent_penalized():
    """The greedy plan must score below the perfect plan on efficiency and in total."""
    for tid in (1, 2, 3):
        scenario = _scenarios[tid]
        alerts = _alerts[tid]
        truth = _build_gt(scenario)
        perfect = _grdr.grade(truth, _perfect_actions(scenario, alerts), tid)
        greedy = _grdr.grade(truth, _greedy_actions(scenario, alerts), tid)
        _log(f"Task {tid}: perfect={perfect.total_score:.4f} greedy={greedy.total_score:.4f}")
        assert greedy.efficiency_score < perfect.efficiency_score, \
            f"Task {tid}: greedy efficiency not penalized"
        assert greedy.total_score < perfect.total_score, \
            f"Task {tid}: greedy total_score not below perfect"
def test_wrong_root_cause_penalized():
    """Blaming a non-root-cause alert must yield a root_cause_score below 1.0."""
    for tid in (1, 2, 3):
        scenario = _scenarios[tid]
        alerts = _alerts[tid]
        truth = _build_gt(scenario)
        # Fallback id is computed up front; it is used when every alert is a root cause.
        fallback_id = alerts[-1].id
        decoys = [alert.id for alert in alerts if not alert.is_root_cause]
        wrong_id = decoys[0] if decoys else fallback_id
        plan = [
            (ActionType.INVESTIGATE, alerts[0].id),
            (ActionType.IDENTIFY_ROOT_CAUSE, wrong_id),
            (ActionType.RESOLVE, None),
        ]
        actions = [
            {"type": kind, "alert_id": alert_id, "runbook_id": None, "step": idx}
            for idx, (kind, alert_id) in enumerate(plan)
        ]
        result = _grdr.grade(truth, actions, tid)
        _log(f"Task {tid}: wrong RC → rc_score={result.root_cause_score:.4f}")
        assert result.root_cause_score < 1.0, f"Task {tid}: wrong RC should reduce root_cause_score"
def test_wrong_runbook_penalized():
    """Applying rb_wrong_action after correct diagnosis must lower runbook_score."""
    for tid in (1, 2, 3):
        scenario = _scenarios[tid]
        alerts = _alerts[tid]
        truth = _build_gt(scenario)
        # (type, alert_id, runbook_id) triples; steps are numbered afterwards.
        plan = [(ActionType.INVESTIGATE, alert.id, None) for alert in alerts[:2]]
        plan += [(ActionType.IDENTIFY_ROOT_CAUSE, rc_id, None) for rc_id in scenario.root_cause_alert_ids]
        plan.append((ActionType.APPLY_RUNBOOK, None, "rb_wrong_action"))
        plan.append((ActionType.RESOLVE, None, None))
        actions = [
            {"type": kind, "alert_id": alert_id, "runbook_id": runbook_id, "step": idx}
            for idx, (kind, alert_id, runbook_id) in enumerate(plan)
        ]
        result = _grdr.grade(truth, actions, tid)
        _log(f"Task {tid}: wrong runbook → rb_score={result.runbook_score:.4f}")
        assert result.runbook_score < 1.0, f"Task {tid}: wrong runbook should reduce runbook_score"
def test_no_action_scores_zero():
    """Grading an empty action list must give a total score of exactly 0.0."""
    for tid in (1, 2, 3):
        truth = _build_gt(_scenarios[tid])
        result = _grdr.grade(truth, [], tid)
        assert result.total_score == 0.0, \
            f"Task {tid}: empty actions should score 0.0, got {result.total_score}"
        _log(f"Task {tid}: no-action → 0.0 ✓")
def test_grade_result_complete():
    """Every score on a GradeResult lies in [0, 1]; passed is bool; details has task_id."""
    score_fields = (
        "total_score",
        "root_cause_score",
        "runbook_score",
        "noise_suppression_score",
        "efficiency_score",
    )
    for tid in (1, 2, 3):
        scenario = _scenarios[tid]
        alerts = _alerts[tid]
        result = _grdr.grade(_build_gt(scenario), _perfect_actions(scenario, alerts), tid)
        for field in score_fields:
            value = getattr(result, field)
            assert 0.0 <= value <= 1.0, f"Task {tid}: {field}={value} out of [0,1]"
        assert isinstance(result.passed, bool)
        assert "task_id" in result.details
# ══════════════════════════════════════════════════════════════════════════════
# T6 — Performance
# ══════════════════════════════════════════════════════════════════════════════
def test_pipeline_performance():
    """Generate, alert and grade 10 times per task; the mean must stay under 500 ms."""
    limit_ms = 500.0
    rounds = 10
    for tid in (1, 2, 3):
        started = time.perf_counter()
        for offset in range(rounds):
            seed = SEED + offset
            scenario = IncidentGenerator(seed=seed).generate(tid)
            alerts = AlertGenerator(seed=seed).generate(scenario)
            _grdr.grade(_build_gt(scenario), _perfect_actions(scenario, alerts), tid)
        total_ms = (time.perf_counter() - started) * 1000
        each_ms = total_ms / rounds
        _log(f"Task {tid}: {rounds} iters in {total_ms:.1f}ms ({each_ms:.1f}ms each)")
        assert each_ms < limit_ms, f"Task {tid}: avg {each_ms:.1f}ms > {limit_ms}ms limit"
# ══════════════════════════════════════════════════════════════════════════════
# T7 — Service graph
# ══════════════════════════════════════════════════════════════════════════════
def test_service_count():
    """The shared service graph must expose at least 12 services."""
    services = _sg.get_all_services()
    total = len(services)
    _log(f"{total} services: {services}")
    assert total >= 12, f"Need >= 12 services, got {total}"
def test_graph_is_dag():
    """The service dependency graph must be a directed acyclic graph."""
    import networkx as nx

    graph = _sg.get_graph()
    acyclic = nx.is_directed_acyclic_graph(graph)
    assert acyclic, "Service graph contains a cycle"
    _log(f"DAG: {graph.number_of_nodes()} nodes, {graph.number_of_edges()} edges")
def test_all_tiers_present():
    """Services must cover the frontend, backend, data and infra tiers."""
    required = {"frontend", "backend", "data", "infra"}
    found = set()
    for service in _sg.get_all_services():
        found.add(_sg.get_metadata(service)["tier"])
    _log(f"Tiers found: {found}")
    assert required <= found, f"Missing tiers: {required - found}"
def test_criticality_range():
    """Criticality and sensitivity must be in [0, 1]; initial health must be 1.0."""
    for service in _sg.get_all_services():
        meta = _sg.get_metadata(service)
        assert 0.0 <= meta["criticality_score"] <= 1.0, f"{service}: criticality_score out of range"
        assert 0.0 <= meta["failure_sensitivity"] <= 1.0, f"{service}: failure_sensitivity out of range"
        assert meta["health_score"] == 1.0, f"{service}: initial health != 1.0"
def test_cascade_hops():
    """Failure simulation must start with the root itself at zero delay."""
    for root in ("payment-db", "redis-cache", "storage-node"):
        graph = ServiceGraph()
        hops = graph.simulate_failure_impact(root, np.random.RandomState(SEED))
        _log(f"{root}: {[hop.service for hop in hops]}")
        assert len(hops) >= 1, f"{root}: expected >= 1 hop"
        origin = hops[0]
        assert origin.service == root, f"{root}: first hop must be root itself"
        assert origin.delay_seconds == 0.0, f"{root}: root hop delay must be 0.0"
def test_health_states():
    """Check the score-to-HealthState mapping at three sample scores."""
    expectations = (
        (1.0, HealthState.NORMAL),
        (0.79, HealthState.DEGRADED),
        (0.39, HealthState.FAILING),
    )
    for score, expected in expectations:
        assert health_state_from_score(score) == expected
def test_graph_reset():
    """reset_all_health() must restore every service's health_score to 1.0.

    A private ServiceGraph is degraded by simulating a redis-cache failure and
    then reset. The assertions read the metadata of that same instance; reading
    the shared ``_sg`` fixture instead would pass even if the reset did nothing,
    because ``_sg`` is never degraded here.
    """
    sg = ServiceGraph()
    sg.simulate_failure_impact("redis-cache", np.random.RandomState(SEED))
    sg.reset_all_health()
    for svc in sg.get_all_services():
        assert sg.get_metadata(svc)["health_score"] == 1.0, f"{svc}: not reset"
# ══════════════════════════════════════════════════════════════════════════════
# T8 — Partial observability
# ══════════════════════════════════════════════════════════════════════════════
def test_investigation_reveals_fields():
    """Investigating the first alert must reveal related_services as a list."""
    for tid in (1, 2, 3):
        alert = _alerts[tid][0]
        before = alert.to_observation(investigated=False)
        after = alert.to_observation(investigated=True)
        assert isinstance(after["related_services"], list), \
            f"Alert {alert.id}: related_services not list when investigated"
        # Either an explicit REDACTED marker or any difference counts as masking.
        was_masked = (
            "REDACTED" in str(before["related_services"])
            or before["related_services"] != after["related_services"]
        )
        assert was_masked, f"Alert {alert.id}: field not masked when uninvestigated"
        _log(f"Task {tid}: {alert.id} masking ✓")
def test_observation_keys():
    """The first five alerts of each task must expose every required observation key."""
    required = {
        "id",
        "severity",
        "source_service",
        "alert_type",
        "message",
        "timestamp_offset",
        "is_noise",
        "is_root_cause",
    }
    for tid in (1, 2, 3):
        for alert in _alerts[tid][:5]:
            present = set(alert.to_observation().keys())
            missing = required - present
            assert not missing, f"Alert {alert.id} missing keys: {missing}"
# ══════════════════════════════════════════════════════════════════════════════
# T9 — Runbook registry
# ══════════════════════════════════════════════════════════════════════════════
def test_runbook_count():
    """The registry must contain exactly seven runbooks."""
    runbooks = _reg.get_all()
    _log(f"Runbooks: {[rb.id for rb in runbooks]}")
    total = len(runbooks)
    assert total == 7, f"Expected 7 runbooks, got {total}"
def test_runbook_ids_unique():
    """Runbook ids reported by the registry must not repeat."""
    ids = _reg.list_ids()
    distinct = set(ids)
    assert len(ids) == len(distinct), f"Duplicate runbook IDs: {ids}"
def test_runbook_task1_applicability():
    """rb_db_failover must be applicable to the task 1 ground truth."""
    truth = _build_gt(_scenarios[1])
    runbook = _reg.get("rb_db_failover")
    assert runbook.is_applicable(truth), "rb_db_failover must apply to task1"
def test_runbook_task2_applicability():
    """rb_cache_flush_restart must be applicable to the task 2 ground truth."""
    truth = _build_gt(_scenarios[2])
    runbook = _reg.get("rb_cache_flush_restart")
    assert runbook.is_applicable(truth), "rb_cache_flush_restart must apply to task2"
def test_runbook_task3_applicability():
    """Both the storage remount and ML rollback runbooks must apply to task 3."""
    truth = _build_gt(_scenarios[3])
    for runbook_id in ("rb_storage_volume_remount", "rb_ml_model_rollback"):
        assert _reg.get(runbook_id).is_applicable(truth)
def test_wrong_action_never_applicable():
    """rb_wrong_action must be reported as inapplicable for every task."""
    for tid in (1, 2, 3):
        truth = _build_gt(_scenarios[tid])
        applicable = _reg.get("rb_wrong_action").is_applicable(truth)
        assert not applicable, f"Task {tid}: rb_wrong_action must never be applicable"
def test_runbook_effect_heals():
    """Applying rb_db_failover must raise payment-db health and stop its cascade."""
    truth = _build_gt(_scenarios[1])
    truth["root_services"] = ["payment-db"]
    degraded = SimulatedState(
        service_health={"payment-db": 0.10, "payment-service": 0.20},
    )
    healed = _reg.get("rb_db_failover").apply(degraded, truth)
    assert healed.service_health["payment-db"] > 0.5, "rb_db_failover should restore payment-db health"
    assert "payment-db" in healed.stopped_cascades, "rb_db_failover should stop cascade"
# ══════════════════════════════════════════════════════════════════════════════
# T10 — Multi-root scoring (task3)
# ══════════════════════════════════════════════════════════════════════════════
def test_task3_two_root_causes():
    """The task 3 scenario must list exactly two root-cause alerts."""
    root_ids = _scenarios[3].root_cause_alert_ids
    assert len(root_ids) == 2, \
        f"Task 3 must have 2 root causes, got {root_ids}"
    _log(f"Task 3 root causes: {root_ids}")
def test_task3_full_credit():
    """Identifying both task 3 root causes must give a root_cause_score of 1.0."""
    scenario = _scenarios[3]
    alerts = _alerts[3]
    result = _grdr.grade(_build_gt(scenario), _perfect_actions(scenario, alerts), 3)
    score = result.root_cause_score
    _log(f"Task 3 full credit: rc_score={score:.4f}")
    assert score == 1.0, \
        f"Task 3: both RCs identified → expected 1.0, got {score:.4f}"
def test_task3_partial_credit():
    """Identifying only one of the two task 3 root causes must give partial credit."""
    scenario = _scenarios[3]
    alerts = _alerts[3]
    truth = _build_gt(scenario)
    # (type, alert_id, runbook_id) triples; steps are numbered afterwards.
    plan = [(ActionType.INVESTIGATE, alert.id, None) for alert in alerts[:3]]
    # Only the FIRST root cause
    plan.append((ActionType.IDENTIFY_ROOT_CAUSE, scenario.root_cause_alert_ids[0], None))
    plan += [(ActionType.APPLY_RUNBOOK, None, rb_id) for rb_id in scenario.correct_runbook_ids]
    plan.append((ActionType.RESOLVE, None, None))
    actions = [
        {"type": kind, "alert_id": alert_id, "runbook_id": runbook_id, "step": idx}
        for idx, (kind, alert_id, runbook_id) in enumerate(plan)
    ]
    result = _grdr.grade(truth, actions, 3)
    score = result.root_cause_score
    _log(f"Task 3 partial credit: rc_score={score:.4f}")
    assert 0.0 < score < 1.0, \
        f"Task 3: one of two RCs → expected partial (0,1), got {score:.4f}"
# ══════════════════════════════════════════════════════════════════════════════
# Main
# ══════════════════════════════════════════════════════════════════════════════
def main(fast: bool = False, verbose: bool = False) -> int:
    """Run the T1-T10 suites in order, print a summary and return an exit code.

    Args:
        fast: When truthy, the performance test (T6.1) is skipped.
        verbose: Enables ``_log`` detail lines and tracebacks for errors.

    Returns:
        0 when no test failed, 1 otherwise. The caller decides whether to exit.
    """
    global _verbose
    _verbose = verbose

    banner = "═" * 65
    print(f"\n{banner}")
    print(f" IncidentMind — Environment Validation Suite (seed={SEED})")
    print(f"{banner}")

    # Tests that are skipped when ``fast`` is set.
    skippable = (test_pipeline_performance,)
    suites = [
        ("T1 · Determinism", [
            ("T1.1 Scenario determinism (all tasks)", test_determinism_scenario),
            ("T1.2 Alert sequence determinism", test_determinism_alerts),
            ("T1.3 Failure propagation determinism", test_determinism_failure_propagation),
            ("T1.4 Grader output determinism", test_determinism_grader),
        ]),
        ("T2 · Task Constraints", [
            ("T2.1 Alert count within task ranges", test_task_alert_counts),
            ("T2.2 Noise percentage approximately correct", test_task_noise_percentage),
            ("T2.3 Red herring counts match spec", test_task_red_herring_counts),
            ("T2.4 Task definitions metadata sane", test_task_definitions_sane),
        ]),
        ("T3 · Temporal Ordering", [
            ("T3.1 Alerts sorted by timestamp", test_temporal_sort_order),
            ("T3.2 Root cause fires earliest", test_root_cause_fires_first),
            ("T3.3 Cascade alerts strictly post-T0", test_cascade_after_root_cause),
            ("T3.4 Cascade delays monotonically increasing", test_cascade_delays_monotonic),
        ]),
        ("T4 · Noise & Red Herring Flags", [
            ("T4.1 Noise alert flags correct", test_noise_flags_correct),
            ("T4.2 Root cause flags correct", test_root_cause_flags_correct),
            ("T4.3 Red herrings are HIGH/CRITICAL", test_red_herring_severity),
            ("T4.4 Noise alerts not CRITICAL", test_noise_not_critical),
            ("T4.5 Partial observability masking active", test_partial_observability_masking),
        ]),
        ("T5 · Grader Correctness", [
            ("T5.1 Perfect agent passes all tasks", test_perfect_agent_passes),
            ("T5.2 Greedy agent penalized", test_greedy_agent_penalized),
            ("T5.3 Wrong root cause penalized", test_wrong_root_cause_penalized),
            ("T5.4 Wrong runbook penalized", test_wrong_runbook_penalized),
            ("T5.5 No-action agent scores 0.0", test_no_action_scores_zero),
            ("T5.6 GradeResult fields complete & in range", test_grade_result_complete),
        ]),
        ("T6 · Performance", [
            ("T6.1 Full pipeline < 500ms per iteration", test_pipeline_performance),
        ]),
        ("T7 · Service Graph", [
            ("T7.1 12+ services defined", test_service_count),
            ("T7.2 Graph is valid DAG", test_graph_is_dag),
            ("T7.3 All 4 tiers represented", test_all_tiers_present),
            ("T7.4 Criticality/sensitivity in [0,1]", test_criticality_range),
            ("T7.5 Cascade propagation produces hops", test_cascade_hops),
            ("T7.6 HealthState transitions correct", test_health_states),
            ("T7.7 reset_all_health restores scores", test_graph_reset),
        ]),
        ("T8 · Partial Observability", [
            ("T8.1 Investigation reveals hidden fields", test_investigation_reveals_fields),
            ("T8.2 Observation has all required keys", test_observation_keys),
        ]),
        ("T9 · Runbook Registry", [
            ("T9.1 Exactly 7 runbooks registered", test_runbook_count),
            ("T9.2 All runbook IDs unique", test_runbook_ids_unique),
            ("T9.3 rb_db_failover applicable to task1", test_runbook_task1_applicability),
            ("T9.4 rb_cache_flush applicable to task2", test_runbook_task2_applicability),
            ("T9.5 Storage+ML runbooks apply to task3", test_runbook_task3_applicability),
            ("T9.6 rb_wrong_action never applicable", test_wrong_action_never_applicable),
            ("T9.7 Runbook effect restores service health", test_runbook_effect_heals),
        ]),
        ("T10 · Multi-Root Cause Scoring (Task 3)", [
            ("T10.1 Task 3 has exactly 2 root causes", test_task3_two_root_causes),
            ("T10.2 Both root causes → score 1.0", test_task3_full_credit),
            ("T10.3 One of two root causes → partial score", test_task3_partial_credit),
        ]),
    ]
    for heading, cases in suites:
        section(heading)
        for label, fn in cases:
            if fn in skippable:
                run_test(label, fn, skip=fast)
            else:
                run_test(label, fn)

    # ── summary ──────────────────────────────────────────────────────────────
    statuses = [entry["status"] for entry in _results]
    n_pass = statuses.count("PASS")
    n_fail = statuses.count("FAIL")
    n_skip = statuses.count("SKIP")
    n_total = len(_results)

    print(f"\n{banner}")
    print(f" Results: {n_pass}/{n_total} passed", end="")
    if n_skip:
        print(f" ({n_skip} skipped)", end="")
    if not n_fail:
        print()
    else:
        print(f" \033[91m{n_fail} FAILED\033[0m")
        print("\n Failed tests:")
        for entry in (e for e in _results if e["status"] == "FAIL"):
            print(f" ✗ {entry['name']}")
            if entry.get("error"):
                print(f" → {entry['error']}")
    print(f"{banner}\n")

    # Don't exit here - let the caller decide
    if n_fail:
        print(" \033[91m❌ SOME TESTS FAILED — review output above.\033[0m\n")
        return 1
    print(" \033[92m✅ ALL TESTS PASSED — IncidentMind env is hackathon-ready.\033[0m\n")
    return 0
# ============================================================================
# ── EXTENDED TEST SUITE (v2 upgrade validation) ─────────────────────────────
# T11 Fingerprint consistency
# T12 Duplicate / burst alert grouping
# T13 Burst alert simulation
# T14 Flapping alert simulation
# T15 Edge cases (zero noise, identical timestamps, alert storm, isolated failure)
# T16 Failure mode propagation & state machine
# ============================================================================
# ── shared v2 fixtures ───────────────────────────────────────────────────────
from envs.service_graph import FailureMode, ServiceState, service_state_from_score
from envs.grader import ActionType as _AT
# Fixtures for the extended (v2) tests: scenarios and alerts for task ids 1-3,
# each produced by a freshly constructed generator seeded with SEED.
_v2_scenarios = {tid: IncidentGenerator(seed=SEED).generate(tid) for tid in (1, 2, 3)}
_v2_alerts = {tid: AlertGenerator(seed=SEED).generate(_v2_scenarios[tid])
              for tid in (1, 2, 3)}
def _v2_perfect_actions(sc, al):
    """Perfect agent that also correctly deduplicates burst alerts.

    Action order: investigate the first three alerts, investigate every burst
    alert, identify each root cause, deduplicate each burst alert against the
    first root-cause alert, group that alert with all bursts, apply the correct
    runbooks, dismiss noise, resolve. Steps are numbered consecutively from 0.

    Args:
        sc: Scenario providing ``root_cause_alert_ids``, ``burst_alert_ids``,
            ``duplicate_group_map``, ``correct_runbook_ids`` and
            ``noise_alert_ids``.
        al: Alerts generated for the scenario; each needs ``id`` and ``is_burst``.

    Returns:
        List of action dicts, each ending with a ``"step"`` key.

    A scenario without any root-cause alert has no canonical alert to fold
    bursts into, so the dedup/group actions are skipped in that case instead of
    failing on an unbound loop variable or an empty-list index.
    """
    acts = []

    def _emit(action):
        # The step index is simply the action's position in the plan.
        action["step"] = len(acts)
        acts.append(action)

    for a in al[:3]:
        _emit({"type": _AT.INVESTIGATE, "alert_id": a.id, "runbook_id": None})
    # Investigate burst alerts before acting
    for a in al:
        if a.is_burst:
            _emit({"type": _AT.INVESTIGATE, "alert_id": a.id, "runbook_id": None})
    for rc in sc.root_cause_alert_ids:
        _emit({"type": _AT.IDENTIFY_ROOT_CAUSE, "alert_id": rc, "runbook_id": None})

    if sc.root_cause_alert_ids and sc.burst_alert_ids:
        canon_id = sc.root_cause_alert_ids[0]
        # Deduplicate burst alerts
        for bid in sc.burst_alert_ids:
            _emit({"type": _AT.DEDUPLICATE_ALERT, "alert_id": bid,
                   "canonical_id": canon_id, "runbook_id": None})
        # Group burst alerts
        _emit({"type": _AT.GROUP_ALERTS,
               "alert_ids": [canon_id] + sc.burst_alert_ids,
               "group_label": sc.duplicate_group_map.get(canon_id, "burst_group"),
               "runbook_id": None})

    for rb in sc.correct_runbook_ids:
        _emit({"type": _AT.APPLY_RUNBOOK, "alert_id": None, "runbook_id": rb})
    for nid in sc.noise_alert_ids:
        _emit({"type": _AT.DISMISS_NOISE, "alert_id": nid, "runbook_id": None})
    _emit({"type": _AT.RESOLVE, "alert_id": None, "runbook_id": None})
    return acts
def _v2_build_gt(sc):
    """Build the v2 ground-truth dict: the base fields plus burst/duplicate data.

    ``root_services`` is taken from ``sc.metadata["root_services"]`` when
    present, otherwise from a one-element list holding ``"root_service"``
    (empty string if absent); a bare string is wrapped in a list.
    """
    meta = sc.metadata
    roots = meta.get("root_services", [meta.get("root_service", "")])
    roots = [roots] if isinstance(roots, str) else roots

    gt = {}
    gt["task_id"] = sc.task_id
    gt["root_cause_alert_ids"] = sc.root_cause_alert_ids
    gt["cascade_chain"] = [{"alert_id": stage.alert_id} for stage in sc.cascade_chain]
    gt["noise_alert_ids"] = sc.noise_alert_ids
    gt["red_herring_alert_ids"] = sc.red_herring_alert_ids
    gt["correct_runbook_ids"] = sc.correct_runbook_ids
    gt["involved_services"] = sc.involved_services
    gt["root_services"] = roots
    gt["alerts_by_service"] = {}
    gt["burst_alert_ids"] = sc.burst_alert_ids
    gt["duplicate_group_map"] = sc.duplicate_group_map
    return gt
# ============================================================================
# T11 — Fingerprint consistency
# ============================================================================
def test_fingerprint_stable_across_seeds():
    """Same (service, alert_type, failure_mode) → same fingerprint_id regardless of seed.

    Calls the fingerprint helper twice with identical arguments and also
    checks the ``fp_`` prefix.
    """
    from envs.alert_generator import _make_fingerprint

    services = ("payment-db", "redis-cache", "storage-node", "ml-inference")
    for service in services:
        for mode in FailureMode:
            first = _make_fingerprint(service, "service_down", mode)
            second = _make_fingerprint(service, "service_down", mode)
            assert first == second, f"{service}/{mode}: fingerprint not stable"
            assert first.startswith("fp_"), f"{service}/{mode}: fingerprint format wrong"
            _log(f"{service}/{mode.value}: {first}")
def test_fingerprint_unique_per_service():
    """Different services must produce different fingerprints for same alert_type."""
    from envs.alert_generator import _make_fingerprint

    by_service = {}
    for service in ("payment-db", "redis-cache", "storage-node", "ml-inference", "api-gateway"):
        fingerprint = _make_fingerprint(service, "service_down", FailureMode.TIMEOUT)
        # Services seen earlier that already produced this fingerprint.
        clashes = [name for name, known in by_service.items() if known == fingerprint]
        assert not clashes, f"Fingerprint collision: {service} matches {clashes}"
        by_service[service] = fingerprint
def test_fingerprint_unique_per_failure_mode():
    """Same service with different failure modes → different fingerprints."""
    from envs.alert_generator import _make_fingerprint

    by_mode = {}
    for mode in FailureMode:
        by_mode[mode] = _make_fingerprint("payment-db", "service_down", mode)
    distinct = set(by_mode.values())
    assert len(distinct) == len(FailureMode), \
        "Some failure modes produce identical fingerprints for same service"
def test_alert_fingerprint_present():
    """Every generated alert must have a non-empty fingerprint_id and group_key."""
    for tid in (1, 2, 3):
        for alert in _v2_alerts[tid]:
            fingerprint = alert.fingerprint_id
            assert fingerprint, f"Alert {alert.id}: fingerprint_id is empty"
            assert alert.group_key, f"Alert {alert.id}: group_key is empty"
            assert fingerprint.startswith("fp_"), \
                f"Alert {alert.id}: fingerprint_id format '{alert.fingerprint_id}' unexpected"
            _log(f"Alert {alert.id}: fp={alert.fingerprint_id} gk={alert.group_key}")
def test_burst_alerts_share_fingerprint_with_canonical():
    """Check burst alerts reuse the canonical alert's fingerprint and group key.

    The canonical alert is taken to be the first entry of
    ``root_cause_alert_ids``. Tasks without burst alerts, and IDs that are
    absent from the generated alert list, are skipped rather than failed.
    """
    for tid in (1, 2, 3):
        scenario = _v2_scenarios[tid]
        by_id = {a.id: a for a in _v2_alerts[tid]}
        if not scenario.burst_alert_ids:
            _log(f"Task {tid}: no burst alerts — skip")
            continue

        canonical = by_id.get(scenario.root_cause_alert_ids[0])
        if canonical is None:
            continue

        for bid in scenario.burst_alert_ids:
            burst = by_id.get(bid)
            if burst is None:
                continue
            assert burst.fingerprint_id == canonical.fingerprint_id, (
                f"Task {tid}: burst {bid} fp={burst.fingerprint_id} != canonical fp={canonical.fingerprint_id}"
            )
            assert burst.group_key == canonical.group_key, (
                f"Task {tid}: burst {bid} gk={burst.group_key} != canonical gk={canonical.group_key}"
            )
            _log(f"Task {tid}: burst {bid} shares fp/gk with canonical ✓")
# ============================================================================
# T12 — Duplicate / burst alert grouping
# ============================================================================
def test_duplicate_group_map_present():
    """Check that burst alerts are all registered in ``duplicate_group_map``.

    For each task whose scenario lists burst alerts, the map must be
    non-empty and must contain every burst alert ID as a key.
    """
    for tid in (1, 2, 3):
        scenario = _v2_scenarios[tid]
        bursts = scenario.burst_alert_ids
        if bursts:
            assert scenario.duplicate_group_map, (
                f"Task {tid}: burst_alert_ids non-empty but duplicate_group_map is empty"
            )
            for bid in bursts:
                assert bid in scenario.duplicate_group_map, (
                    f"Task {tid}: burst alert {bid} not in duplicate_group_map"
                )
        _log(f"Task {tid}: dup_map={scenario.duplicate_group_map}")
def test_burst_alerts_in_all_tasks():
    """Check that each of tasks 1-3 declares at least one burst alert."""
    for tid in (1, 2, 3):
        bursts = _v2_scenarios[tid].burst_alert_ids
        count = len(bursts)
        assert count >= 1, \
            f"Task {tid}: expected >=1 burst alert, got {count}"
        _log(f"Task {tid}: {count} burst alerts: {bursts}")
def test_burst_occurrence_count_increments():
    """Check occurrence counters on burst alerts versus canonical root causes.

    Burst alerts must report ``occurrence_count > 1`` and ``is_burst`` set;
    every root-cause alert present in the generated list must report exactly
    one occurrence. IDs missing from the generated list are skipped.
    """
    for tid in (1, 2, 3):
        scenario = _v2_scenarios[tid]
        by_id = {a.id: a for a in _v2_alerts[tid]}

        for bid in scenario.burst_alert_ids:
            burst = by_id.get(bid)
            if burst is None:
                continue
            assert burst.occurrence_count > 1, (
                f"Task {tid}: burst {bid} has occurrence_count={burst.occurrence_count}, expected >1"
            )
            assert burst.is_burst, (
                f"Task {tid}: burst {bid} has is_burst=False"
            )

        # The canonical (first-seen) root cause is counted exactly once.
        for rc_id in scenario.root_cause_alert_ids:
            canonical = by_id.get(rc_id)
            if canonical is not None:
                assert canonical.occurrence_count == 1, (
                    f"Task {tid}: canonical RC {rc_id} has occurrence_count != 1"
                )
        _log(f"Task {tid}: burst occurrence_count verified ✓")
def test_dedup_score_perfect_agent():
    """Check that the perfect action sequence earns a dedup_score of at least 0.5.

    The ground truth and the action list come from the file's v2 helpers and
    are graded once per task.
    """
    for tid in (1, 2, 3):
        scenario = _v2_scenarios[tid]
        ground_truth = _v2_build_gt(scenario)
        actions = _v2_perfect_actions(scenario, _v2_alerts[tid])
        outcome = _grdr.grade(ground_truth, actions, tid)
        _log(f"Task {tid}: dedup_score={outcome.dedup_score:.4f}")
        assert outcome.dedup_score >= 0.5, (
            f"Task {tid}: perfect agent dedup_score={outcome.dedup_score:.4f} < 0.5"
        )
def test_dedup_score_penalises_burst_as_root_cause():
    """Check that naming a burst duplicate as root cause never beats the perfect run.

    The agent investigates the first alert, identifies the first burst alert
    as the root cause, applies every correct runbook and resolves. Its
    dedup_score must not exceed the one earned by the perfect action list.
    Tasks without burst alerts are skipped.
    """
    for tid in (1, 2, 3):
        scenario = _v2_scenarios[tid]
        alerts = _v2_alerts[tid]
        ground_truth = _v2_build_gt(scenario)
        if not scenario.burst_alert_ids:
            continue

        # (action type, alert_id, runbook_id) in execution order; the step
        # number is the position in this plan.
        plan = [
            (_AT.INVESTIGATE, alerts[0].id, None),
            (_AT.IDENTIFY_ROOT_CAUSE, scenario.burst_alert_ids[0], None),
        ]
        plan.extend((_AT.APPLY_RUNBOOK, None, rb) for rb in scenario.correct_runbook_ids)
        plan.append((_AT.RESOLVE, None, None))
        wrong_actions = [
            {"type": kind, "alert_id": alert_id, "runbook_id": runbook_id, "step": idx}
            for idx, (kind, alert_id, runbook_id) in enumerate(plan)
        ]

        wrong = _grdr.grade(ground_truth, wrong_actions, tid)
        _log(f"Task {tid}: burst-as-RC → dedup_score={wrong.dedup_score:.4f}")

        # Baseline: the perfect agent graded against the same ground truth.
        baseline = _grdr.grade(ground_truth, _v2_perfect_actions(scenario, alerts), tid)
        assert wrong.dedup_score <= baseline.dedup_score, (
            f"Task {tid}: burst-as-RC dedup_score not penalised"
        )
# ============================================================================
# T13 — Burst alert simulation
# ============================================================================
def test_burst_alerts_present_in_generated_list():
    """Check that every burst alert ID of a scenario appears among its alerts."""
    for tid in (1, 2, 3):
        bursts = _v2_scenarios[tid].burst_alert_ids
        generated_ids = {a.id for a in _v2_alerts[tid]}
        absent = [bid for bid in bursts if bid not in generated_ids]
        # Report the first missing ID, in scenario order.
        assert not absent, (
            f"Task {tid}: burst alert {absent[0]} not found in generated alerts"
        )
        _log(f"Task {tid}: all {len(bursts)} burst alerts present ✓")
def test_burst_alerts_near_root_cause_timestamp():
    """Check burst alerts have a timestamp_offset in the interval (0, 5] seconds.

    Burst IDs that are absent from the generated alert list are skipped.
    """
    for tid in (1, 2, 3):
        by_id = {a.id: a for a in _v2_alerts[tid]}
        for bid in _v2_scenarios[tid].burst_alert_ids:
            burst = by_id.get(bid)
            if burst is None:
                continue
            ts = burst.timestamp_offset
            assert ts <= 5.0, (
                f"Task {tid}: burst alert {bid} at T={ts:.1f}s > 5s rapid-fire window"
            )
            assert ts > 0.0, (
                f"Task {tid}: burst alert {bid} at T={ts} <= 0 (must be after root cause)"
            )
        _log(f"Task {tid}: burst timing verified ✓")
def test_burst_determinism():
    """Check that two generator runs with SEED agree on burst IDs and timestamps."""

    def burst_times(alerts):
        # Map burst alert id -> timestamp offset.
        return {a.id: a.timestamp_offset for a in alerts if a.is_burst}

    for tid in (1, 2, 3):
        first_sc = IncidentGenerator(seed=SEED).generate(tid)
        second_sc = IncidentGenerator(seed=SEED).generate(tid)
        assert first_sc.burst_alert_ids == second_sc.burst_alert_ids, (
            f"Task {tid}: burst_alert_ids not deterministic"
        )

        first_alerts = AlertGenerator(seed=SEED).generate(first_sc)
        second_alerts = AlertGenerator(seed=SEED).generate(second_sc)
        assert burst_times(first_alerts) == burst_times(second_alerts), \
            f"Task {tid}: burst timestamps not deterministic"
        _log(f"Task {tid}: burst determinism ✓")
# ============================================================================
# T14 — Flapping alert simulation
# ============================================================================
def test_flapping_alerts_generate_three_events():
    """Each flapping_alert_id must produce: fire, clear, refire (3 events per flap).

    For every flapping ID ``fid`` the generated alerts must contain exactly
    three events whose IDs are ``fid``, ``f"{fid}_clear"`` and
    ``f"{fid}_refire"`` — one of each, so a duplicated event cannot stand in
    for a missing one.
    """
    for tid in (1, 2, 3):
        sc = _v2_scenarios[tid]
        al = _v2_alerts[tid]
        for fid in sc.flapping_alert_ids:
            expected_ids = (fid, f"{fid}_clear", f"{fid}_refire")
            flap_events = [a for a in al if a.id in expected_ids]
            event_ids = [e.id for e in flap_events]
            assert len(flap_events) == 3, \
                (f"Task {tid}: flapping {fid} expected 3 events "
                 f"(fire/clear/refire), got {len(flap_events)}: {event_ids}")
            # The count alone would accept e.g. two fires and one clear;
            # require the three distinct phases explicitly.
            assert set(event_ids) == set(expected_ids), \
                (f"Task {tid}: flapping {fid} events are not exactly "
                 f"fire/clear/refire: {event_ids}")
            _log(f"Task {tid}: flap {fid} events: {event_ids}")
def test_flapping_clear_event_lower_severity():
    """Check the ``*_clear`` event of each flapping alert.

    When a clear event exists it must carry LOW severity and the alert type
    "clear"; flapping IDs with no clear event in the list are skipped.
    """
    for tid in (1, 2, 3):
        alerts = _v2_alerts[tid]
        for fid in _v2_scenarios[tid].flapping_alert_ids:
            clear_id = f"{fid}_clear"
            clear = next((a for a in alerts if a.id == clear_id), None)
            if clear is None:
                continue
            assert clear.severity == AlertSeverity.LOW, (
                f"Task {tid}: clear event severity={clear.severity}, expected LOW"
            )
            assert clear.alert_type == "clear", (
                f"Task {tid}: clear event alert_type={clear.alert_type}, expected 'clear'"
            )
def test_flapping_refire_higher_severity_than_clear():
    """Check that a refire event outranks the clear event of the same flap.

    Severities are ranked CRITICAL(0) < HIGH(1) < MEDIUM(2) < LOW(3), so a
    strictly smaller rank means a more severe alert. Flaps lacking either
    event are skipped.
    """
    order = {AlertSeverity.CRITICAL: 0, AlertSeverity.HIGH: 1,
             AlertSeverity.MEDIUM: 2, AlertSeverity.LOW: 3}

    for tid in (1, 2, 3):
        alerts = _v2_alerts[tid]
        for fid in _v2_scenarios[tid].flapping_alert_ids:
            clear = next((a for a in alerts if a.id == f"{fid}_clear"), None)
            refire = next((a for a in alerts if a.id == f"{fid}_refire"), None)
            if clear is None or refire is None:
                continue
            assert order[refire.severity] < order[clear.severity], (
                f"Task {tid}: refire sev={refire.severity} "
                f"not higher than clear sev={clear.severity}"
            )
def test_flapping_temporal_order():
    """Check strict ordering fire < clear < refire on ``timestamp_offset``.

    Flapping IDs for which any of the three events is missing are skipped.
    """
    for tid in (1, 2, 3):
        by_id = {a.id: a for a in _v2_alerts[tid]}
        for fid in _v2_scenarios[tid].flapping_alert_ids:
            fire, clear, refire = (
                by_id.get(key) for key in (fid, f"{fid}_clear", f"{fid}_refire")
            )
            if not (fire and clear and refire):
                continue
            t_fire = fire.timestamp_offset
            t_clear = clear.timestamp_offset
            t_refire = refire.timestamp_offset
            assert t_fire < t_clear, f"Task {tid}: {fid} fire.ts >= clear.ts"
            assert t_clear < t_refire, f"Task {tid}: {fid} clear.ts >= refire.ts"
            _log(f"Task {tid}: {fid} fire={t_fire:.1f} "
                 f"clear={t_clear:.1f} "
                 f"refire={t_refire:.1f}")
def test_flapping_share_fingerprint():
    """Check fire/clear/refire of one flap agree on fingerprint_id and group_key.

    Flapping IDs for which any of the three events is missing are skipped.
    """
    for tid in (1, 2, 3):
        by_id = {a.id: a for a in _v2_alerts[tid]}
        for fid in _v2_scenarios[tid].flapping_alert_ids:
            trio = [by_id.get(key) for key in (fid, f"{fid}_clear", f"{fid}_refire")]
            if not all(trio):
                continue
            fire, clear, refire = trio
            same_fp = fire.fingerprint_id == clear.fingerprint_id == refire.fingerprint_id
            assert same_fp, f"Task {tid}: flapping events have different fingerprints"
            same_gk = fire.group_key == clear.group_key == refire.group_key
            assert same_gk, f"Task {tid}: flapping events have different group_keys"
# ============================================================================
# T15 — Edge cases
# ============================================================================
def test_edge_zero_noise_task1():
    """Check task 1 carries no noise, in the scenario and in the alert list.

    ``noise_alert_ids`` must be empty and no generated alert may have
    ``is_noise`` set.
    """
    scenario = _v2_scenarios[1]
    assert len(scenario.noise_alert_ids) == 0, (
        f"Task 1: noise_alert_ids should be empty, got {scenario.noise_alert_ids}"
    )
    noisy = [a for a in _v2_alerts[1] if a.is_noise]
    assert not noisy, (
        f"Task 1: {len(noisy)} noise alerts found in generated list"
    )
    _log("Task 1: zero-noise verified ✓")
def test_edge_identical_timestamps():
    """
    Task 2 red herring fires at T≈0 alongside root cause.
    Multiple alerts at the same timestamp must all be present and ordered stably
    (CRITICAL before HIGH).

    "Near T=0" is implemented as ``timestamp_offset <= 10.0``. Only adjacent
    alerts with exactly equal offsets are compared, in list order.
    """
    al = _v2_alerts[2]
    # Lower rank == more severe; built once rather than per comparison.
    rank = {AlertSeverity.CRITICAL: 0, AlertSeverity.HIGH: 1,
            AlertSeverity.MEDIUM: 2, AlertSeverity.LOW: 3}

    early_alerts = [a for a in al if a.timestamp_offset <= 10.0]
    assert len(early_alerts) >= 2, \
        (f"Task 2: expected >=2 alerts near T=0 (root + red herring), "
         f"got {len(early_alerts)}: {[(a.id, a.timestamp_offset) for a in early_alerts]}")

    # Verify sort stability: among ties, CRITICAL before HIGH.
    for a, b in zip(early_alerts, early_alerts[1:]):
        if a.timestamp_offset == b.timestamp_offset:
            assert rank[a.severity] <= rank[b.severity], \
                (f"Task 2: timestamp tie {a.id}({a.severity}) before "
                 f"{b.id}({b.severity}) — sort not stable")
    _log(f"Task 2: {len(early_alerts)} early alerts, sort-stable ✓")
def test_edge_isolated_failure_no_cascade():
    """Check that failing ``metrics-collector`` affects only itself.

    ``simulate_failure_impact`` must return a single hop for that service,
    and ``is_isolated_failure`` must report it as isolated.
    """
    target = "metrics-collector"
    graph = ServiceGraph()
    hops = graph.simulate_failure_impact(target, np.random.RandomState(SEED))

    assert len(hops) == 1, (
        f"metrics-collector is isolated; expected 1 hop, "
        f"got {len(hops)}: {[h.service for h in hops]}"
    )
    assert hops[0].service == "metrics-collector"
    assert graph.is_isolated_failure(target), \
        "is_isolated_failure('metrics-collector') should return True"
    _log("Isolated failure (metrics-collector) → 1 hop ✓")
def test_edge_alert_storm_100_plus():
    """Check the alert generator copes with a 100-noise-alert storm scenario.

    A synthetic task-3 scenario with 100 noise IDs is generated; the result
    must hold at least 100 alerts, and the observation dict of each of the
    first ten must expose all required keys.
    """
    from envs.incident_generator import IncidentScenario, CascadeStage

    noise_ids = [f"storm_noise_{i:03d}" for i in range(100)]
    cascade = [CascadeStage("storm_cs_001", "api-gateway", 5.0, 0.5)]
    storm = IncidentScenario(
        task_id=3,
        scenario_name="storm_test",
        root_cause_alert_ids=["storm_rc_001"],
        cascade_chain=cascade,
        involved_services=["payment-db", "api-gateway"],
        noise_alert_ids=noise_ids,
        red_herring_alert_ids=[],
        correct_runbook_ids=["rb_db_failover"],
        metadata={"root_service": "payment-db"},
    )

    alerts = AlertGenerator(seed=SEED).generate(storm)
    assert len(alerts) >= 100, f"Storm scenario: expected >=100 alerts, got {len(alerts)}"

    required_keys = {"id", "severity", "source_service", "alert_type",
                     "message", "timestamp_offset", "fingerprint_id", "group_key"}
    # Spot-check only the first ten observations.
    for alert in alerts[:10]:
        missing = required_keys.difference(alert.to_observation().keys())
        assert not missing, f"Storm alert {alert.id} missing keys: {missing}"
    _log(f"Alert storm: {len(alerts)} alerts generated, spot-check passed ✓")
def test_edge_all_services_degraded_detection():
    """ServiceGraph.all_services_degraded() must return True after mass damage.

    Every service returned by ``get_all_services()`` receives
    ``apply_damage(0.30)``; the graph must then report all services degraded,
    and must stop reporting it once ``reset_all_health()`` has been called.
    """
    sg = ServiceGraph()
    # Damage every service enough to degrade
    for svc in sg.get_all_services():
        sg.get_service(svc).apply_damage(0.30)
    assert sg.all_services_degraded(), \
        "all_services_degraded() should return True after mass damage"
    sg.reset_all_health()
    assert not sg.all_services_degraded(), \
        "all_services_degraded() should return False after reset"
    _log("all_services_degraded detection ✓")
def test_edge_missing_metadata_fields_graceful():
    """
    Grader must handle ground_truth with missing optional v2 fields gracefully
    (backward-compat: burst_alert_ids and duplicate_group_map absent).

    Only the ``grade`` call is guarded: an exception raised by the grader is
    reported as such, while a negative score fails with its own message
    instead of being mislabelled as a grader crash.
    """
    sc = _v2_scenarios[1]
    al = _v2_alerts[1]
    # Deliberately omit v2 fields
    gt_minimal = {
        "task_id": 1,
        "root_cause_alert_ids": sc.root_cause_alert_ids,
        "cascade_chain": [{"alert_id": s.alert_id} for s in sc.cascade_chain],
        "noise_alert_ids": sc.noise_alert_ids,
        "red_herring_alert_ids": sc.red_herring_alert_ids,
        "correct_runbook_ids": sc.correct_runbook_ids,
        "involved_services": sc.involved_services,
        "root_services": ["payment-db"],
        "alerts_by_service": {},
        # burst_alert_ids and duplicate_group_map intentionally omitted
    }
    acts = _perfect_actions(sc, al)
    try:
        result = _grdr.grade(gt_minimal, acts, 1)
    except Exception as exc:
        # Raise explicitly (not ``assert False``) so the check survives -O
        # and the grader's traceback stays attached as the cause.
        raise AssertionError(f"Grader raised on missing v2 fields: {exc}") from exc
    assert result.total_score >= 0.0, \
        f"Grader returned negative total_score={result.total_score} on missing v2 fields"
    _log(f"Missing v2 fields handled gracefully, score={result.total_score:.4f} ✓")
# ============================================================================
# T16 — Failure mode propagation & state machine
# ============================================================================
def test_all_failure_modes_propagate():
    """Check every FailureMode yields hops when ``redis-cache`` fails.

    On a fresh graph with a freshly seeded RNG per mode, the hop list must be
    non-empty, start at the root service, and tag the first hop with the
    requested failure mode.
    """
    root = "redis-cache"
    for fmode in FailureMode:
        hops = ServiceGraph().simulate_failure_impact(
            root, np.random.RandomState(SEED), failure_mode=fmode
        )
        assert len(hops) >= 1, f"{fmode.value}: expected >=1 cascade hop"
        head = hops[0]
        assert head.service == "redis-cache", f"{fmode.value}: first hop must be root"
        assert head.failure_mode == fmode, (
            f"{fmode.value}: hop failure_mode mismatch {head.failure_mode}"
        )
        _log(f"{fmode.value}: {len(hops)} hops")
def test_network_partition_bypasses_sensitivity():
    """Check NETWORK_PARTITION reaches at least as many hops as TIMEOUT.

    Both modes are simulated from ``redis-cache`` on fresh graphs with
    identically seeded RNGs, and the hop counts are compared.
    """
    hop_counts = {}
    for fmode in (FailureMode.TIMEOUT, FailureMode.NETWORK_PARTITION):
        graph = ServiceGraph()
        seeded = np.random.RandomState(SEED)
        reached = len(graph.simulate_failure_impact("redis-cache", seeded, failure_mode=fmode))
        hop_counts[fmode] = reached
        _log(f"{fmode.value}: {reached} hops")

    partition = hop_counts[FailureMode.NETWORK_PARTITION]
    timeout = hop_counts[FailureMode.TIMEOUT]
    assert partition >= timeout, (
        "NETWORK_PARTITION should affect >= as many services as TIMEOUT "
        f"(partition={partition}, "
        f"timeout={timeout})"
    )
def test_memory_leak_lower_initial_damage():
    """Check MEMORY_LEAK leaves the root healthier than TIMEOUT does.

    For each root service, both modes are simulated on separate graphs with
    identically seeded RNGs; ``health_score_after`` of the first hop is then
    compared.
    """
    modes = (FailureMode.TIMEOUT, FailureMode.MEMORY_LEAK)
    for root in ("redis-cache", "storage-node"):
        # Build all graphs, then all RNGs, then simulate (TIMEOUT first).
        graphs = {mode: ServiceGraph() for mode in modes}
        rngs = {mode: np.random.RandomState(SEED) for mode in modes}
        hops = {
            mode: graphs[mode].simulate_failure_impact(root, rngs[mode], failure_mode=mode)
            for mode in modes
        }
        health_timeout = hops[FailureMode.TIMEOUT][0].health_score_after
        health_leak = hops[FailureMode.MEMORY_LEAK][0].health_score_after
        _log(f"{root}: timeout health={health_timeout:.3f}, leak health={health_leak:.3f}")
        assert health_leak > health_timeout, (
            f"{root}: MEMORY_LEAK should leave higher health than TIMEOUT "
            f"(leak={health_leak:.3f}, timeout={health_timeout:.3f})"
        )
def test_service_state_machine_transitions():
    """Check ``service_state_from_score`` against a table of score/state pairs.

    Without the ``recovering`` flag a score maps to HEALTHY, DEGRADED or
    FAILING; with ``recovering=True`` the tested mid and high scores map to
    RECOVERING while 0.35 still maps to FAILING.
    """
    from envs.service_graph import service_state_from_score

    steady_cases = (
        (1.0, ServiceState.HEALTHY),
        (0.9, ServiceState.HEALTHY),
        (0.79, ServiceState.DEGRADED),
        (0.5, ServiceState.DEGRADED),
        (0.39, ServiceState.FAILING),
        (0.0, ServiceState.FAILING),
    )
    for score, expected in steady_cases:
        assert service_state_from_score(score) == expected

    # RECOVERING is only reachable when the recovering flag is passed.
    recovering_cases = (
        (0.5, ServiceState.RECOVERING),
        (0.9, ServiceState.RECOVERING),
        (0.35, ServiceState.FAILING),  # still too low
    )
    for score, expected in recovering_cases:
        assert service_state_from_score(score, recovering=True) == expected

    _log("State machine transitions ✓")
def test_tick_recovery_increments_health():
    """tick_recovery() must increment service health_score.

    A ``redis-cache`` failure is simulated first; if that left
    ``auth-service`` below full health, one recovery tick of 0.10 must raise
    its health_score and return ``ServiceState.RECOVERING``. When
    ``auth-service`` was not damaged there is nothing to recover, so the
    check is skipped and the skip is logged.
    """
    sg = ServiceGraph()
    rng = np.random.RandomState(SEED)
    sg.simulate_failure_impact("redis-cache", rng)
    health_before = sg.get_service("auth-service").health_score
    if not health_before < 1.0:
        # Make the vacuous pass visible in verbose output.
        _log(f"tick_recovery: auth-service health={health_before:.3f}, nothing to recover — skip")
        return

    new_state = sg.tick_recovery("auth-service", recovery_per_tick=0.10)
    health_after = sg.get_service("auth-service").health_score
    assert health_after > health_before, \
        f"tick_recovery did not increase health: {health_before} → {health_after}"
    assert new_state == ServiceState.RECOVERING, \
        f"Expected RECOVERING state after tick, got {new_state}"
    _log(f"tick_recovery: {health_before:.3f} → {health_after:.3f}, state={new_state} ✓")
def test_scenario_failure_mode_set():
    """Check each cached scenario carries a ``FailureMode`` instance."""
    for tid in (1, 2, 3):
        mode = _v2_scenarios[tid].failure_mode
        assert mode is not None, f"Task {tid}: failure_mode is None"
        assert isinstance(mode, FailureMode), (
            f"Task {tid}: failure_mode is not a FailureMode instance"
        )
        _log(f"Task {tid}: failure_mode={mode.value}")
def test_cascade_stage_failure_mode_set():
    """Check every stage in every scenario's cascade chain has a ``FailureMode``."""
    for tid in (1, 2, 3):
        for stage in _v2_scenarios[tid].cascade_chain:
            mode = stage.failure_mode
            assert mode is not None, (
                f"Task {tid}: CascadeStage {stage.alert_id} has None failure_mode"
            )
            assert isinstance(mode, FailureMode), (
                f"Task {tid}: CascadeStage {stage.alert_id} failure_mode wrong type"
            )
# ============================================================================
# Extended runner (append to main)
# ============================================================================
def _run_extended(fast: bool = False, verbose: bool = False) -> int:
    """Run the T11-T16 suites and print a summary over all recorded results.

    Args:
        fast: Accepted for signature parity with ``main``; not consulted here.
        verbose: Stored in the module-level ``_verbose`` flag before running.

    Returns:
        0 when no entry in ``_results`` has status "FAIL", otherwise 1. The
        tally covers everything in ``_results``, not only the tests run here.
    """
    global _verbose
    _verbose = verbose

    # (section title, ((test label, test callable), ...)) in execution order.
    suites = (
        ("T11 · Fingerprint Consistency", (
            ("T11.1 Fingerprint stable across seeds", test_fingerprint_stable_across_seeds),
            ("T11.2 Fingerprint unique per service", test_fingerprint_unique_per_service),
            ("T11.3 Fingerprint unique per failure mode", test_fingerprint_unique_per_failure_mode),
            ("T11.4 All alerts have non-empty fingerprint+gk", test_alert_fingerprint_present),
            ("T11.5 Burst alerts share fp/gk with canonical", test_burst_alerts_share_fingerprint_with_canonical),
        )),
        ("T12 · Duplicate / Burst Alert Grouping", (
            ("T12.1 duplicate_group_map present when bursts exist", test_duplicate_group_map_present),
            ("T12.2 Every task has >= 1 burst alert", test_burst_alerts_in_all_tasks),
            ("T12.3 Burst occurrence_count increments correctly", test_burst_occurrence_count_increments),
            ("T12.4 Perfect agent dedup_score >= 0.5", test_dedup_score_perfect_agent),
            ("T12.5 Burst-as-root-cause penalised in dedup_score", test_dedup_score_penalises_burst_as_root_cause),
        )),
        ("T13 · Burst Alert Simulation", (
            ("T13.1 Burst alerts present in generated list", test_burst_alerts_present_in_generated_list),
            ("T13.2 Burst alerts within 5s rapid-fire window", test_burst_alerts_near_root_cause_timestamp),
            ("T13.3 Burst timestamps deterministic", test_burst_determinism),
        )),
        ("T14 · Flapping Alert Simulation", (
            ("T14.1 Each flap produces fire+clear+refire", test_flapping_alerts_generate_three_events),
            ("T14.2 CLEAR event has LOW severity", test_flapping_clear_event_lower_severity),
            ("T14.3 Refire severity > clear severity", test_flapping_refire_higher_severity_than_clear),
            ("T14.4 fire.ts < clear.ts < refire.ts", test_flapping_temporal_order),
            ("T14.5 Flapping events share fingerprint+gk", test_flapping_share_fingerprint),
        )),
        ("T15 · Edge Cases", (
            ("T15.1 Zero-noise scenario (task 1)", test_edge_zero_noise_task1),
            ("T15.2 Identical timestamps sorted stably", test_edge_identical_timestamps),
            ("T15.3 Isolated failure → 1 hop only", test_edge_isolated_failure_no_cascade),
            ("T15.4 Alert storm (100+ alerts) handled", test_edge_alert_storm_100_plus),
            ("T15.5 all_services_degraded detection", test_edge_all_services_degraded_detection),
            ("T15.6 Missing v2 fields handled gracefully", test_edge_missing_metadata_fields_graceful),
        )),
        ("T16 · Failure Mode Propagation & State Machine", (
            ("T16.1 All failure modes produce cascade hops", test_all_failure_modes_propagate),
            ("T16.2 NETWORK_PARTITION bypasses sensitivity", test_network_partition_bypasses_sensitivity),
            ("T16.3 MEMORY_LEAK lower initial damage than TIMEOUT", test_memory_leak_lower_initial_damage),
            ("T16.4 ServiceState machine transitions correct", test_service_state_machine_transitions),
            ("T16.5 tick_recovery increments health", test_tick_recovery_increments_health),
            ("T16.6 Scenario failure_mode populated", test_scenario_failure_mode_set),
            ("T16.7 CascadeStage failure_mode populated", test_cascade_stage_failure_mode_set),
        )),
    )
    for title, cases in suites:
        section(title)
        for label, test_fn in cases:
            run_test(label, test_fn)

    # Summary over every result recorded so far.
    statuses = [entry["status"] for entry in _results]
    n_pass = statuses.count("PASS")
    n_fail = statuses.count("FAIL")
    n_skip = statuses.count("SKIP")
    n_total = len(_results)
    rule = '═' * 65

    print(f"\n{rule}")
    print(f" Grand Total: {n_pass}/{n_total} passed", end="")
    if n_skip:
        print(f" ({n_skip} skipped)", end="")
    if n_fail:
        print(f" \033[91m{n_fail} FAILED\033[0m")
        print("\n Failed tests:")
        for entry in (e for e in _results if e["status"] == "FAIL"):
            print(f" ✗ {entry['name']}")
            if entry.get("error"):
                print(f" → {entry['error']}")
    else:
        # Terminate the "Grand Total" line that was printed with end="".
        print()
    print(f"{rule}\n")

    if n_fail:
        print(" \033[91m❌ SOME TESTS FAILED — review output above.\033[0m\n")
        return 1
    print(" \033[92m✅ ALL TESTS PASSED (v1 + v2) — IncidentMind env is production-grade.\033[0m\n")
    return 0
# Patch the original main to also run extended tests
_original_main = main


def main(fast: bool = False, verbose: bool = False) -> int:
    """Run the original suite, then the extended suite, and merge exit codes.

    Both suites always run, even if the first one fails.

    Returns:
        0 only when both runs returned 0, otherwise 1.
    """
    base_rc = _original_main(fast=fast, verbose=verbose)
    extended_rc = _run_extended(fast=fast, verbose=verbose)
    if base_rc == 0 and extended_rc == 0:
        return 0
    return 1
if __name__ == "__main__":
    # Command-line entry: parse the two flags and exit with main()'s code.
    parser = argparse.ArgumentParser(
        description="IncidentMind full validation suite (v1+v2)",
    )
    parser.add_argument(
        "--fast",
        action="store_true",
        help="Skip performance test",
    )
    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Show sub-step detail",
    )
    args = parser.parse_args()
    exit_code = main(fast=args.fast, verbose=args.verbose)
    sys.exit(exit_code)