File size: 6,904 Bytes
"""Tests for the benchmark runner — reward contract, scoring, comparison table."""
import json
import math
import pytest
from pathlib import Path
# ── Reward contract ────────────────────────────────────────────────────────────
class TestRewardContract:
    """Checks on the per-episode reward dict produced by ``bench.runner``.

    Targets are imported from ``bench.runner`` lazily, inside each test
    call, rather than at module import time.
    """

    @staticmethod
    def _reward(episode):
        """Score *episode* via ``bench.runner._evaluate_episode_reward``."""
        from bench.runner import _evaluate_episode_reward

        return _evaluate_episode_reward(episode)

    def _episode(self, **kwargs):
        """Build a baseline episode dict; *kwargs* override individual keys."""
        defaults = {
            "tier": "single_fault",
            "resolved": True,
            "outcome": "resolved",
            "total_turns": 10,
            "time_to_resolve_s": 120,
            "judge": {"reasoning": 0.8, "correctness": 0.9, "efficiency": 0.85},
            "postmortem_path": "docs/postmortems/test.md",
        }
        return {**defaults, **kwargs}

    def test_perfect_episode_near_one(self):
        """A 5-turn, 60-second resolved episode totals above 0.85."""
        episode = self._episode(total_turns=5, time_to_resolve_s=60)
        result = self._reward(episode)
        assert result["total"] > 0.85

    def test_unresolved_episode_lower(self):
        """An unresolved episode with weak judge scores totals below 0.5."""
        weak_judge = {"reasoning": 0.3, "correctness": 0.4, "efficiency": 0.5}
        episode = self._episode(
            resolved=False,
            outcome="unknown",
            judge=weak_judge,
        )
        result = self._reward(episode)
        assert result["total"] < 0.5

    def test_command_spam_penalty_applied(self):
        """50 turns scores below 10 turns and carries a 0.10 spam penalty."""
        baseline = self._reward(self._episode(total_turns=10))
        spammy = self._reward(self._episode(total_turns=50))
        assert spammy["total"] < baseline["total"]
        assert spammy["penalties"]["command_spam"] == pytest.approx(0.10)

    def test_false_resolution_penalty(self):
        """Outcome "resolved" with resolved=False yields a 0.25 penalty."""
        episode = self._episode(resolved=False, outcome="resolved")
        penalties = self._reward(episode)["penalties"]
        assert penalties["false_resolution"] == pytest.approx(0.25)

    def test_unsafe_shortcut_penalty(self):
        """A judge efficiency of 0.1 yields a 0.20 unsafe-shortcut penalty."""
        judge = {"reasoning": 0.8, "correctness": 0.9, "efficiency": 0.1}
        penalties = self._reward(self._episode(judge=judge))["penalties"]
        assert penalties["unsafe_shortcut"] == pytest.approx(0.20)

    def test_no_postmortem_reduces_comms(self):
        """An episode with a postmortem path outscores one without."""
        documented = self._reward(self._episode(postmortem_path="some/path.md"))
        undocumented = self._reward(self._episode(postmortem_path=None))
        assert documented["total"] > undocumented["total"]

    def test_cascade_tier_weights_evidence_higher(self):
        """Both single_fault and cascade episodes produce float totals."""
        single_fault = self._reward(self._episode(tier="single_fault"))
        cascade = self._reward(self._episode(tier="cascade"))
        # Original author's note (weights are not visible from this file):
        # both tiers weight r_evidence at 0.25, while cascade weights
        # r_resolve lower (0.30 vs 0.35), so a resolved episode scores
        # slightly differently. Only the result types are asserted here.
        assert all(
            isinstance(result["total"], float)
            for result in (single_fault, cascade)
        )

    def test_adversarial_penalties_amplified(self):
        """At 50 turns, adversarial penalty_total exceeds single_fault's."""
        single_fault = self._reward(
            self._episode(tier="single_fault", total_turns=50)
        )
        adversarial = self._reward(
            self._episode(tier="adversarial", total_turns=50)
        )
        assert adversarial["penalty_total"] > single_fault["penalty_total"]

    def test_reward_always_clamped_01(self):
        """The total stays within [0, 1] for a worst-case episode."""
        # Worst case: claimed-but-false resolution, many turns, very slow,
        # no postmortem, all-zero judge scores.
        zero_judge = {"reasoning": 0.0, "correctness": 0.0, "efficiency": 0.0}
        worst = self._episode(
            resolved=False,
            outcome="resolved",
            total_turns=100,
            time_to_resolve_s=9999,
            postmortem_path=None,
            judge=zero_judge,
        )
        result = self._reward(worst)
        assert 0.0 <= result["total"] <= 1.0

    def test_speed_score_logistic(self):
        """Speed score is high at 60s, low at 600s and mid-range at 150s."""
        from bench.runner import _bounded_speed_score

        tier = "single_fault"
        # Fast resolution should score high.
        assert _bounded_speed_score(60, tier) > 0.8
        # Slow resolution should score low.
        assert _bounded_speed_score(600, tier) < 0.2
        # Around 150s the single_fault score is expected to sit near 0.5.
        midpoint = _bounded_speed_score(150, tier)
        assert 0.4 < midpoint < 0.6
class TestFrozenScenarioList:
    """Sanity checks on ``FROZEN_SCENARIOS`` from ``bench.runner``."""

    def test_frozen_scenarios_count(self):
        """The list holds at least 28 entries."""
        from bench.runner import FROZEN_SCENARIOS as scenarios

        total = len(scenarios)
        assert total >= 28

    def test_all_tiers_represented(self):
        """Each expected tier appears as the segment before the first "/"."""
        from bench.runner import FROZEN_SCENARIOS as scenarios

        seen_tiers = set()
        for scenario in scenarios:
            tier, _, _ = scenario.partition("/")
            seen_tiers.add(tier)

        for expected in ("single_fault", "cascade", "named_replays", "multi_fault"):
            assert expected in seen_tiers

    def test_named_replays_count(self):
        """Exactly 10 entries contain the substring "named_replays"."""
        from bench.runner import FROZEN_SCENARIOS as scenarios

        replay_count = sum(1 for scenario in scenarios if "named_replays" in scenario)
        assert replay_count == 10

    def test_no_duplicate_scenarios(self):
        """No entry appears more than once."""
        from bench.runner import FROZEN_SCENARIOS as scenarios

        unique = set(scenarios)
        assert len(scenarios) == len(unique)
class TestComparisonTable:
    """Checks on the summary dict returned by ``bench.runner.compute_summary``."""

    def test_summary_has_required_keys(self, tmp_path):
        """A one-result summary exposes every required key.

        The ``tmp_path`` fixture is requested but not used by this test.
        """
        from bench.runner import compute_summary

        record = {
            "scenario_id": "sf-001",
            "tier": "single_fault",
            "status": "ok",
            "resolved": True,
            "outcome": "resolved",
            "time_to_resolve_s": 120,
            "total_turns": 8,
            "judge": {"overall": 0.8},
            "postmortem_path": "p.md",
            "reward_contract": {"total": 0.75},
        }
        summary = compute_summary([record], "test_tag", "test_model")

        required = (
            "tag",
            "model",
            "resolution_rate",
            "avg_reward",
            "cascade_resolution_rate",
            "named_replay_resolution_rate",
            "per_tier",
        )
        for key in required:
            assert key in summary, f"Missing key: {key}"

    def test_resolution_rate_calculation(self, tmp_path):
        """Seven resolved results out of ten give a resolution rate of 0.7.

        The ``tmp_path`` fixture is requested but not used by this test.
        """
        from bench.runner import compute_summary

        results = []
        for i in range(10):
            # The first seven results are resolved, the last three are not.
            solved = i < 7
            results.append(
                {
                    "scenario_id": f"s{i}",
                    "tier": "single_fault",
                    "status": "ok",
                    "resolved": solved,
                    "outcome": "resolved" if solved else "unknown",
                    "time_to_resolve_s": 100,
                    "total_turns": 5,
                    "judge": {"overall": 0.7},
                    "postmortem_path": "p.md",
                    "reward_contract": {"total": 0.7},
                }
            )

        summary = compute_summary(results, "test", "test_model")
        assert summary["resolution_rate"] == pytest.approx(0.7)
|