| """Tests for the benchmark runner — reward contract, scoring, comparison table.""" |
|
|
| import json |
| import math |
| import pytest |
| from pathlib import Path |
|
|
|
|
| |
|
|
class TestRewardContract:
    """Checks on the reward dict produced by ``_evaluate_episode_reward``.

    ``bench.runner`` is imported inside helper/test bodies, so the import
    happens when a test runs rather than when this module is loaded.
    """

    def _episode(self, **kwargs):
        """Return a baseline episode dict with ``kwargs`` overriding its fields.

        The baseline is a resolved ``single_fault`` episode: 10 turns, 120 s
        to resolve, judge scores between 0.8 and 0.9, and a postmortem path.
        """
        defaults = {
            "tier": "single_fault",
            "resolved": True,
            "outcome": "resolved",
            "total_turns": 10,
            "time_to_resolve_s": 120,
            "judge": {"reasoning": 0.8, "correctness": 0.9, "efficiency": 0.85},
            "postmortem_path": "docs/postmortems/test.md",
        }
        # Later keys win, so caller overrides replace the baseline values.
        return {**defaults, **kwargs}

    def _reward(self, **kwargs):
        """Build an episode via ``_episode`` and return its evaluated reward."""
        from bench.runner import _evaluate_episode_reward

        return _evaluate_episode_reward(self._episode(**kwargs))

    def test_perfect_episode_near_one(self):
        """A resolved episode with 5 turns and 60 s totals above 0.85."""
        reward = self._reward(total_turns=5, time_to_resolve_s=60)
        assert reward["total"] > 0.85

    def test_unresolved_episode_lower(self):
        """An unresolved episode with low judge scores totals below 0.5."""
        weak_judge = {"reasoning": 0.3, "correctness": 0.4, "efficiency": 0.5}
        reward = self._reward(resolved=False, outcome="unknown", judge=weak_judge)
        assert reward["total"] < 0.5

    def test_command_spam_penalty_applied(self):
        """50 turns scores below 10 turns and records a 0.10 spam penalty."""
        baseline = self._reward(total_turns=10)
        spammy = self._reward(total_turns=50)
        assert spammy["total"] < baseline["total"]
        assert spammy["penalties"]["command_spam"] == pytest.approx(0.10)

    def test_false_resolution_penalty(self):
        """Outcome "resolved" with ``resolved=False`` records a 0.25 penalty."""
        reward = self._reward(resolved=False, outcome="resolved")
        assert reward["penalties"]["false_resolution"] == pytest.approx(0.25)

    def test_unsafe_shortcut_penalty(self):
        """A judge efficiency of 0.1 records a 0.20 unsafe_shortcut penalty."""
        low_efficiency = {"reasoning": 0.8, "correctness": 0.9, "efficiency": 0.1}
        reward = self._reward(judge=low_efficiency)
        assert reward["penalties"]["unsafe_shortcut"] == pytest.approx(0.20)

    def test_no_postmortem_reduces_comms(self):
        """An episode with a postmortem path outscores one without."""
        documented = self._reward(postmortem_path="some/path.md")
        undocumented = self._reward(postmortem_path=None)
        assert documented["total"] > undocumented["total"]

    def test_cascade_tier_weights_evidence_higher(self):
        """Both ``single_fault`` and ``cascade`` episodes yield a float total.

        NOTE(review): despite the test name, nothing here compares evidence
        weighting between the two tiers; only the result types are checked.
        """
        single_fault = self._reward(tier="single_fault")
        cascade = self._reward(tier="cascade")
        assert isinstance(single_fault["total"], float)
        assert isinstance(cascade["total"], float)

    def test_adversarial_penalties_amplified(self):
        """At 50 turns, ``adversarial`` has a larger penalty_total than ``single_fault``."""
        single_fault = self._reward(tier="single_fault", total_turns=50)
        adversarial = self._reward(tier="adversarial", total_turns=50)
        assert adversarial["penalty_total"] > single_fault["penalty_total"]

    def test_reward_always_clamped_01(self):
        """A worst-case episode still produces a total within [0.0, 1.0]."""
        zero_judge = {"reasoning": 0.0, "correctness": 0.0, "efficiency": 0.0}
        reward = self._reward(
            resolved=False,
            outcome="resolved",
            total_turns=100,
            time_to_resolve_s=9999,
            postmortem_path=None,
            judge=zero_judge,
        )
        assert 0.0 <= reward["total"] <= 1.0

    def test_speed_score_logistic(self):
        """Speed score is high at 60 s, low at 600 s and near 0.5 at 150 s."""
        from bench.runner import _bounded_speed_score

        fast = _bounded_speed_score(60, "single_fault")
        assert fast > 0.8

        slow = _bounded_speed_score(600, "single_fault")
        assert slow < 0.2

        midpoint = _bounded_speed_score(150, "single_fault")
        assert 0.4 < midpoint < 0.6
|
|
|
|
class TestFrozenScenarioList:
    """Sanity checks on ``FROZEN_SCENARIOS`` imported from ``bench.runner``."""

    def test_frozen_scenarios_count(self):
        """The frozen list holds at least 28 scenarios."""
        from bench.runner import FROZEN_SCENARIOS

        scenario_count = len(FROZEN_SCENARIOS)
        assert scenario_count >= 28

    def test_all_tiers_represented(self):
        """Each expected tier is the first ``/``-separated segment of some entry."""
        from bench.runner import FROZEN_SCENARIOS

        seen_tiers = set()
        for scenario in FROZEN_SCENARIOS:
            seen_tiers.add(scenario.split("/")[0])

        assert "single_fault" in seen_tiers
        assert "cascade" in seen_tiers
        assert "named_replays" in seen_tiers
        assert "multi_fault" in seen_tiers

    def test_named_replays_count(self):
        """Exactly 10 entries contain the substring ``named_replays``."""
        from bench.runner import FROZEN_SCENARIOS

        # Substring match (not a prefix match) on each scenario identifier.
        replay_count = sum(
            1 for scenario in FROZEN_SCENARIOS if "named_replays" in scenario
        )
        assert replay_count == 10

    def test_no_duplicate_scenarios(self):
        """No scenario identifier appears more than once."""
        from bench.runner import FROZEN_SCENARIOS

        total = len(FROZEN_SCENARIOS)
        distinct = len(set(FROZEN_SCENARIOS))
        assert total == distinct
|
|
|
|
class TestComparisonTable:
    """Checks on the summary dict returned by ``compute_summary``.

    Both tests request the ``tmp_path`` fixture, but neither body uses it.
    """

    def test_summary_has_required_keys(self, tmp_path):
        """A one-result summary exposes every key the comparison table reads."""
        from bench.runner import compute_summary

        single_result = {
            "scenario_id": "sf-001",
            "tier": "single_fault",
            "status": "ok",
            "resolved": True,
            "outcome": "resolved",
            "time_to_resolve_s": 120,
            "total_turns": 8,
            "judge": {"overall": 0.8},
            "postmortem_path": "p.md",
            "reward_contract": {"total": 0.75},
        }
        summary = compute_summary([single_result], "test_tag", "test_model")

        required_keys = (
            "tag",
            "model",
            "resolution_rate",
            "avg_reward",
            "cascade_resolution_rate",
            "named_replay_resolution_rate",
            "per_tier",
        )
        for key in required_keys:
            assert key in summary, f"Missing key: {key}"

    def test_resolution_rate_calculation(self, tmp_path):
        """Seven resolved results out of ten give a resolution rate of 0.7."""
        from bench.runner import compute_summary

        results = []
        for i in range(10):
            # The first seven episodes are resolved; the last three are not.
            is_resolved = i < 7
            results.append(
                {
                    "scenario_id": f"s{i}",
                    "tier": "single_fault",
                    "status": "ok",
                    "resolved": is_resolved,
                    "outcome": "resolved" if is_resolved else "unknown",
                    "time_to_resolve_s": 100,
                    "total_turns": 5,
                    "judge": {"overall": 0.7},
                    "postmortem_path": "p.md",
                    "reward_contract": {"total": 0.7},
                }
            )

        summary = compute_summary(results, "test", "test_model")
        assert summary["resolution_rate"] == pytest.approx(0.7)
|
|