# atlasops/tests/test_bench_runner.py
# Harikishanth R
# fix: skip-kubectl + scroll + health — HF Space ready
# 7e9a520
"""Tests for the benchmark runner — reward contract, scoring, comparison table."""
import json
import math
import pytest
from pathlib import Path
# ── Reward contract ────────────────────────────────────────────────────────────
class TestRewardContract:
def _episode(self, **kwargs):
base = {
"tier": "single_fault", "resolved": True, "outcome": "resolved",
"total_turns": 10, "time_to_resolve_s": 120,
"judge": {"reasoning": 0.8, "correctness": 0.9, "efficiency": 0.85},
"postmortem_path": "docs/postmortems/test.md",
}
base.update(kwargs)
return base
def test_perfect_episode_near_one(self):
from bench.runner import _evaluate_episode_reward
ep = self._episode(total_turns=5, time_to_resolve_s=60)
r = _evaluate_episode_reward(ep)
assert r["total"] > 0.85
def test_unresolved_episode_lower(self):
from bench.runner import _evaluate_episode_reward
ep = self._episode(resolved=False, outcome="unknown",
judge={"reasoning": 0.3, "correctness": 0.4, "efficiency": 0.5})
r = _evaluate_episode_reward(ep)
assert r["total"] < 0.5
def test_command_spam_penalty_applied(self):
from bench.runner import _evaluate_episode_reward
ep_normal = self._episode(total_turns=10)
ep_spam = self._episode(total_turns=50)
r_normal = _evaluate_episode_reward(ep_normal)
r_spam = _evaluate_episode_reward(ep_spam)
assert r_spam["total"] < r_normal["total"]
assert r_spam["penalties"]["command_spam"] == pytest.approx(0.10)
def test_false_resolution_penalty(self):
from bench.runner import _evaluate_episode_reward
ep = self._episode(resolved=False, outcome="resolved")
r = _evaluate_episode_reward(ep)
assert r["penalties"]["false_resolution"] == pytest.approx(0.25)
def test_unsafe_shortcut_penalty(self):
from bench.runner import _evaluate_episode_reward
ep = self._episode(judge={"reasoning": 0.8, "correctness": 0.9, "efficiency": 0.1})
r = _evaluate_episode_reward(ep)
assert r["penalties"]["unsafe_shortcut"] == pytest.approx(0.20)
def test_no_postmortem_reduces_comms(self):
from bench.runner import _evaluate_episode_reward
ep_with = self._episode(postmortem_path="some/path.md")
ep_without = self._episode(postmortem_path=None)
r_with = _evaluate_episode_reward(ep_with)
r_without = _evaluate_episode_reward(ep_without)
assert r_with["total"] > r_without["total"]
def test_cascade_tier_weights_evidence_higher(self):
from bench.runner import _evaluate_episode_reward
ep_sf = self._episode(tier="single_fault")
ep_cs = self._episode(tier="cascade")
r_sf = _evaluate_episode_reward(ep_sf)
r_cs = _evaluate_episode_reward(ep_cs)
# cascade weights r_evidence at 0.25 vs single_fault at 0.25 too — both same
# but cascade weights r_resolve lower (0.30 vs 0.35) so a resolved episode
# scores slightly differently
assert isinstance(r_sf["total"], float) and isinstance(r_cs["total"], float)
def test_adversarial_penalties_amplified(self):
from bench.runner import _evaluate_episode_reward
ep_sf = self._episode(tier="single_fault", total_turns=50)
ep_adv = self._episode(tier="adversarial", total_turns=50)
r_sf = _evaluate_episode_reward(ep_sf)
r_adv = _evaluate_episode_reward(ep_adv)
assert r_adv["penalty_total"] > r_sf["penalty_total"]
def test_reward_always_clamped_01(self):
from bench.runner import _evaluate_episode_reward
# Worst possible episode
ep = self._episode(
resolved=False, outcome="resolved", total_turns=100,
time_to_resolve_s=9999, postmortem_path=None,
judge={"reasoning": 0.0, "correctness": 0.0, "efficiency": 0.0},
)
r = _evaluate_episode_reward(ep)
assert 0.0 <= r["total"] <= 1.0
def test_speed_score_logistic(self):
from bench.runner import _bounded_speed_score
# Fast resolution should score high
assert _bounded_speed_score(60, "single_fault") > 0.8
# Slow resolution should score low
assert _bounded_speed_score(600, "single_fault") < 0.2
# Midpoint ~150s for single_fault → score near 0.5
score_mid = _bounded_speed_score(150, "single_fault")
assert 0.4 < score_mid < 0.6
class TestFrozenScenarioList:
    """Sanity checks on ``bench.runner.FROZEN_SCENARIOS``.

    The entries are treated as strings whose first ``/``-separated segment is
    the tier name.
    """

    def test_frozen_scenarios_count(self):
        """The frozen list holds at least 28 scenarios."""
        from bench.runner import FROZEN_SCENARIOS

        scenario_count = len(FROZEN_SCENARIOS)
        assert scenario_count >= 28

    def test_all_tiers_represented(self):
        """Each of the four expected tiers appears as a leading path segment."""
        from bench.runner import FROZEN_SCENARIOS

        seen_tiers = {scenario.split("/", 1)[0] for scenario in FROZEN_SCENARIOS}

        # Checked in a fixed order so the first missing tier is the one reported.
        for expected_tier in ("single_fault", "cascade", "named_replays", "multi_fault"):
            assert expected_tier in seen_tiers

    def test_named_replays_count(self):
        """Exactly 10 scenarios mention ``named_replays`` anywhere in their id."""
        from bench.runner import FROZEN_SCENARIOS

        replay_count = sum(
            1 for scenario in FROZEN_SCENARIOS if "named_replays" in scenario
        )
        assert replay_count == 10

    def test_no_duplicate_scenarios(self):
        """No scenario id occurs more than once."""
        from bench.runner import FROZEN_SCENARIOS

        unique_scenarios = set(FROZEN_SCENARIOS)
        assert len(FROZEN_SCENARIOS) == len(unique_scenarios)
class TestComparisonTable:
    """Tests for ``bench.runner.compute_summary``.

    ``compute_summary`` is called with a list of per-scenario result dicts
    followed by two strings (presumably the run tag and the model name —
    confirm against ``bench.runner``).
    """

    def test_summary_has_required_keys(self, tmp_path):
        """The summary of a single result exposes every expected top-level key.

        ``tmp_path`` is requested as a fixture but not used by this test.
        """
        from bench.runner import compute_summary

        single_result = {
            "scenario_id": "sf-001",
            "tier": "single_fault",
            "status": "ok",
            "resolved": True,
            "outcome": "resolved",
            "time_to_resolve_s": 120,
            "total_turns": 8,
            "judge": {"overall": 0.8},
            "postmortem_path": "p.md",
            "reward_contract": {"total": 0.75},
        }
        summary = compute_summary([single_result], "test_tag", "test_model")

        required_keys = (
            "tag",
            "model",
            "resolution_rate",
            "avg_reward",
            "cascade_resolution_rate",
            "named_replay_resolution_rate",
            "per_tier",
        )
        for key in required_keys:
            assert key in summary, f"Missing key: {key}"

    def test_resolution_rate_calculation(self, tmp_path):
        """Seven resolved results out of ten give a resolution rate of 0.7.

        ``tmp_path`` is requested as a fixture but not used by this test.
        """
        from bench.runner import compute_summary

        results = []
        for index in range(10):
            # The first seven scenarios are resolved; the last three are not.
            is_resolved = index < 7
            results.append(
                {
                    "scenario_id": f"s{index}",
                    "tier": "single_fault",
                    "status": "ok",
                    "resolved": is_resolved,
                    "outcome": "resolved" if is_resolved else "unknown",
                    "time_to_resolve_s": 100,
                    "total_turns": 5,
                    "judge": {"overall": 0.7},
                    "postmortem_path": "p.md",
                    "reward_contract": {"total": 0.7},
                }
            )

        summary = compute_summary(results, "test", "test_model")

        assert summary["resolution_rate"] == pytest.approx(0.7)