Spaces:
Runtime error
Runtime error
| """ | |
| Tests for the dynamic world features: | |
| - environment/mutator.py (code mutation engine) | |
| - Task 6 (causal chain / progressive observation) | |
| - GET_CONTEXT action (line-context probing) | |
| - Causal unlock chain (context_hints injected into observation) | |
| - Tasks 3 & 5 unlocks (causal chains across tasks) | |
| - EpisodeMemory (cross-episode persistence) | |
| - RUN_SCANNER action (live tool interaction, noisy results) | |
| """ | |
| import sys | |
| import os | |
| import copy | |
| import pytest | |
| sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) | |
| from environment.mutator import mutate_task | |
| from environment.probe_environment import EpisodeState | |
| from environment.tasks import TASKS | |
| from environment.graders import CodeReviewGrader | |
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
# Index 6 of TASKS is the causal-chain task used by the unlock tests.
TASK6 = TASKS[6]


def _grader(task):
    """Build a CodeReviewGrader for the given task dict."""
    grader = CodeReviewGrader(task)
    return grader
| # =========================================================================== | |
| # MUTATOR TESTS | |
| # =========================================================================== | |
class TestMutator:
    """Checks for mutate_task, run against entries of TASKS."""

    def test_returns_deep_copy(self):
        """Mutating a task must leave the code of the TASKS entry untouched."""
        code_before = TASKS[1]["code"]
        mutate_task(TASKS[1], seed=0)
        assert TASKS[1]["code"] == code_before

    def test_mutation_seed_tag(self):
        """The mutated task records the seed it was built with under _mutation_seed."""
        mutated = mutate_task(TASKS[1], seed=42)
        assert mutated["_mutation_seed"] == 42

    def test_different_seeds_differ(self):
        """At least one of two differently-seeded mutations must differ from the original code."""
        baseline = TASKS[1]["code"]
        first = mutate_task(TASKS[1], seed=0)
        second = mutate_task(TASKS[1], seed=1)
        # Fails only when both variants are still identical to the unmutated code.
        assert not (first["code"] == baseline and second["code"] == baseline)

    def test_same_seed_is_deterministic(self):
        """Two runs with the same seed must agree on both code and issues."""
        first = mutate_task(TASKS[2], seed=99)
        second = mutate_task(TASKS[2], seed=99)
        for field in ("code", "issues"):
            assert first[field] == second[field]

    def test_line_shift_applied(self):
        """Both ends of every issue's line_range must move down by exactly one line."""
        before = copy.deepcopy(TASKS[1])
        after = mutate_task(TASKS[1], seed=7)
        for old_issue, new_issue in zip(before["issues"], after["issues"]):
            old_range = old_issue["line_range"]
            new_range = new_issue["line_range"]
            assert new_range[0] == old_range[0] + 1
            assert new_range[1] == old_range[1] + 1

    def test_issue_count_preserved(self):
        """The number of issues is the same before and after mutation."""
        # Only the first six tasks (indices 0-5) are covered here.
        for source in TASKS[:6]:
            result = mutate_task(source, seed=5)
            assert len(result["issues"]) == len(source["issues"])

    def test_issue_ids_preserved(self):
        """Issue ids keep their values and order through mutation."""
        ids_before = [issue["id"] for issue in TASKS[2]["issues"]]
        mutated = mutate_task(TASKS[2], seed=3)
        ids_after = [issue["id"] for issue in mutated["issues"]]
        assert ids_before == ids_after

    def test_grader_still_matches_after_mutation(self):
        """
        A grader built from the mutated task must still credit a correct comment.

        The comment is placed on the first line of the mutated task's
        off_by_one issue, so the check follows whatever shift was applied.
        """
        mutated = mutate_task(TASKS[1], seed=10)
        grader = _grader(mutated)
        issue = next(iss for iss in mutated["issues"] if iss["id"] == "off_by_one")
        first_line = issue["line_range"][0]
        score, found, _unused = grader.score_comment(
            line_number=first_line,
            comment="off-by-one error: range(len + 1) causes IndexError on the last iteration",
            already_found=[],
        )
        assert "off_by_one" in found
        assert score > 0.0

    def test_correct_decision_preserved(self):
        """Mutation leaves correct_decision unchanged for every task."""
        for source in TASKS:
            result = mutate_task(source, seed=1)
            assert result["correct_decision"] == source["correct_decision"]
| # =========================================================================== | |
| # TASK 6 STRUCTURE TESTS | |
| # =========================================================================== | |
class TestTask6Structure:
    """Structural checks on the TASK6 definition."""

    def test_task6_exists(self):
        """TASKS needs at least seven entries for index 6 to be valid."""
        task_count = len(TASKS)
        assert task_count >= 7, "Task 6 (causal chain) must exist in TASKS"

    def test_task6_has_context_hints(self):
        """TASK6 declares context_hints with at least two entries."""
        assert "context_hints" in TASK6
        hints = TASK6["context_hints"]
        assert len(hints) >= 2

    def test_task6_unlock_keys_present(self):
        """Each issue's 'unlocks' value, when set, must be a key of context_hints."""
        known_hints = TASK6["context_hints"]
        for issue in TASK6["issues"]:
            key = issue.get("unlocks")
            if not key:
                # Issues without an unlock target have nothing to verify.
                continue
            assert key in known_hints, f"Issue {issue['id']} unlocks '{key}' but key not in context_hints"

    def test_task6_total_weight_positive(self):
        """A grader built for TASK6 reports a positive total_weight."""
        assert _grader(TASK6).total_weight > 0.0

    def test_task6_has_chained_issues(self):
        """Two or more issues must carry a truthy 'unlocks' field."""
        chained = sum(1 for issue in TASK6["issues"] if issue.get("unlocks"))
        assert chained >= 2

    def test_task6_correct_decision(self):
        """The expected review decision for TASK6 is 'request_changes'."""
        expected = "request_changes"
        assert TASK6["correct_decision"] == expected
| # =========================================================================== | |
| # CAUSAL UNLOCK CHAIN TESTS (environment layer) | |
| # =========================================================================== | |
class TestCausalUnlock:
    """
    Exercise the causal unlock mechanic through the environment's
    _handle_add_comment pipeline, using a mutated copy of task 6.
    """

    def _make_env(self):
        """Return a fresh environment whose episode is pinned to a mutated task 6."""
        try:
            from environment.probe_environment import ProbeEnvironment
        except ImportError:
            from probe_environment import ProbeEnvironment  # type: ignore
        env = ProbeEnvironment()
        # Install grader and episode directly instead of cycling resets until
        # task 6 comes up (keeps the tests fast).
        task = mutate_task(TASK6, seed=0)
        env._grader = CodeReviewGrader(task)
        env._episode = EpisodeState(task=task)
        return env

    @staticmethod
    def _issue(env, issue_id):
        """Return the issue dict with the given id from the env's current task."""
        return next(i for i in env._episode.task["issues"] if i["id"] == issue_id)

    @staticmethod
    def _comment_on(env, issue, comment, *, step, severity="critical"):
        """
        Post a security comment on the first line of ``issue``.

        Sets ``env._step_count`` to ``step``, builds a lightweight stand-in
        action exposing line_number, comment, severity.value and
        category.value, and returns whatever _handle_add_comment returns.
        """
        env._step_count = step
        action = type("A", (), {
            "line_number": issue["line_range"][0],
            "comment": comment,
            "severity": type("S", (), {"value": severity})(),
            "category": type("C", (), {"value": "security"})(),
        })()
        return env._handle_add_comment(action)

    def test_no_hints_at_start(self):
        """A fresh episode starts with an empty context_hints list."""
        env = self._make_env()
        assert env._episode.context_hints == []

    def test_unlock_fires_after_finding_trigger_issue(self):
        """Finding hardcoded_jwt_secret must append db_schema_hint."""
        env = self._make_env()
        jwt_issue = self._issue(env, "hardcoded_jwt_secret")
        self._comment_on(
            env,
            jwt_issue,
            "JWT_SECRET is hardcoded β must be loaded from environment variable to prevent token forgery",
            step=1,
        )
        assert "hardcoded_jwt_secret" in env._episode.issues_found
        assert len(env._episode.context_hints) == 1
        assert "db_schema_hint" in env._episode.hints_unlocked
        assert "Database Schema" in env._episode.context_hints[0]

    def test_unlock_fires_only_once(self):
        """The same hint must not be appended twice even if issue found again."""
        env = self._make_env()
        jwt_issue = self._issue(env, "hardcoded_jwt_secret")
        # Explicit step numbers, matching the other tests, so this test does
        # not depend on the counter's initial value.
        for step in range(1, 4):
            self._comment_on(
                env,
                jwt_issue,
                "JWT_SECRET is hardcoded β must be loaded from environment variable",
                step=step,
            )
        assert len(env._episode.context_hints) == 1

    def test_second_unlock_fires_independently(self):
        """Finding no_rate_limit must append nginx_config_hint independently."""
        env = self._make_env()
        rate_issue = self._issue(env, "no_rate_limit")
        self._comment_on(
            env,
            rate_issue,
            "No rate limiting on /auth endpoint β susceptible to brute-force attacks",
            step=1,
            severity="error",
        )
        assert "nginx_config_hint" in env._episode.hints_unlocked
        assert any("nginx" in h.lower() for h in env._episode.context_hints)

    def test_both_unlocks_can_fire_in_same_episode(self):
        """Both hints can be unlocked within one episode."""
        env = self._make_env()
        findings = [
            (
                self._issue(env, "hardcoded_jwt_secret"),
                "JWT_SECRET is hardcoded β must be loaded from environment variable to prevent forgery",
            ),
            (
                self._issue(env, "no_rate_limit"),
                "No rate limiting on /auth endpoint β susceptible to brute-force attacks",
            ),
        ]
        for step, (issue, comment) in enumerate(findings, start=1):
            self._comment_on(env, issue, comment, step=step)
        assert len(env._episode.context_hints) == 2
        assert env._episode.hints_unlocked == {"db_schema_hint", "nginx_config_hint"}

    def test_context_hints_appear_in_observation(self):
        """context_hints list must be non-empty in the observation after an unlock."""
        env = self._make_env()
        jwt_issue = self._issue(env, "hardcoded_jwt_secret")
        self._comment_on(
            env,
            jwt_issue,
            "JWT_SECRET is hardcoded β must be loaded from environment variable",
            step=1,
        )
        obs = env._build_observation(reward=0.0, done=False)
        assert len(obs.context_hints) == 1
        assert "Database Schema" in obs.context_hints[0]
| # =========================================================================== | |
| # GET_CONTEXT ACTION TESTS | |
| # =========================================================================== | |
class TestGetContext:
    """Checks for the environment's _handle_get_context handler on a mutated task 1."""

    def _make_env(self):
        """Create an environment whose episode runs on task 1 mutated with seed 0."""
        from environment.graders import CodeReviewGrader as grader_cls
        from environment.mutator import mutate_task as mutate
        try:
            from environment.probe_environment import ProbeEnvironment
        except ImportError:
            from probe_environment import ProbeEnvironment  # type: ignore
        environment = ProbeEnvironment()
        mutated = mutate(TASKS[1], seed=0)
        environment._grader = grader_cls(mutated)
        environment._episode = EpisodeState(task=mutated)
        return environment

    def test_get_context_near_issue_no_penalty(self):
        """A probe on the first line of a real issue is free and marked as passed."""
        env = self._make_env()
        first_issue = env._episode.task["issues"][0]
        probe = type("A", (), {"line_number": first_issue["line_range"][0]})()
        env._step_count = 1
        outcome = env._handle_get_context(probe)
        assert outcome.total == 0.0
        assert outcome.passed is True

    def test_get_context_far_from_issue_costs_penalty(self):
        """A probe at line 999 costs about -0.01 and is marked as not passed."""
        env = self._make_env()
        probe = type("A", (), {"line_number": 999})()
        env._step_count = 1
        outcome = env._handle_get_context(probe)
        assert outcome.total == pytest.approx(-0.01, abs=0.001)
        assert outcome.passed is False

    def test_get_context_no_line_number_penalised(self):
        """A probe whose line_number is None costs about -0.02."""
        env = self._make_env()
        probe = type("A", (), {"line_number": None})()
        env._step_count = 1
        outcome = env._handle_get_context(probe)
        assert outcome.total == pytest.approx(-0.02, abs=0.001)

    def test_get_context_snippet_stored_in_history(self):
        """One probe leaves exactly one 'context_probe' entry in review_comments."""
        env = self._make_env()
        probe = type("A", (), {"line_number": 4})()
        env._step_count = 1
        env._handle_get_context(probe)
        recorded = [
            entry for entry in env._episode.review_comments
            if entry.get("type") == "context_probe"
        ]
        assert len(recorded) == 1
        only = recorded[0]
        assert only["line"] == 4
        assert "context" in only

    def test_get_context_snippet_contains_requested_line(self):
        """The reward explanation must show the requested line number as a prefix."""
        env = self._make_env()
        probe = type("A", (), {"line_number": 4})()
        env._step_count = 1
        outcome = env._handle_get_context(probe)
        # Accept either "4:" or "4 :" as the line-number marker in the snippet.
        assert any(marker in outcome.explanation for marker in ("4:", "4 :"))
| # =========================================================================== | |
| # TASK 3 & 5 CAUSAL UNLOCK TESTS | |
| # =========================================================================== | |
class TestTask3CausalUnlocks:
    """Causal-unlock checks for task 3 (data_pipeline): issue findings reveal context hints."""

    TASK3 = TASKS[3]

    def _make_env(self):
        """Create an environment whose episode runs on an unmutated deep copy of task 3."""
        from environment.graders import CodeReviewGrader as grader_cls
        try:
            from environment.probe_environment import ProbeEnvironment
        except ImportError:
            from probe_environment import ProbeEnvironment  # type: ignore
        environment = ProbeEnvironment()
        task_copy = copy.deepcopy(self.TASK3)
        environment._grader = grader_cls(task_copy)
        environment._episode = EpisodeState(task=task_copy)
        return environment

    def test_task3_has_context_hints(self):
        """context_hints of task 3 contains both expected hint keys."""
        declared = self.TASK3.get("context_hints", {})
        assert "api_docs_hint" in declared
        assert "network_topology_hint" in declared

    def test_task3_hardcoded_api_key_has_unlocks(self):
        """The hardcoded_api_key issue points at 'api_docs_hint'."""
        target = next(item for item in self.TASK3["issues"] if item["id"] == "hardcoded_api_key")
        assert target.get("unlocks") == "api_docs_hint"

    def test_task3_ssl_disabled_has_unlocks(self):
        """The ssl_disabled issue points at 'network_topology_hint'."""
        target = next(item for item in self.TASK3["issues"] if item["id"] == "ssl_disabled")
        assert target.get("unlocks") == "network_topology_hint"

    def test_task3_api_key_unlock_fires(self):
        """Commenting on hardcoded_api_key unlocks api_docs_hint and adds its text."""
        env = self._make_env()
        target = next(
            item for item in env._episode.task["issues"] if item["id"] == "hardcoded_api_key"
        )
        severity = type("S", (), {"value": "critical"})()
        category = type("C", (), {"value": "security"})()
        action = type("A", (), {
            "line_number": target["line_range"][0],
            "comment": "API key is hardcoded in source β move to os.environ",
            "severity": severity,
            "category": category,
        })()
        env._step_count = 1
        env._handle_add_comment(action)
        assert "api_docs_hint" in env._episode.hints_unlocked
        assert any("batch" in hint for hint in env._episode.context_hints)

    def test_task3_ssl_unlock_fires(self):
        """Commenting on ssl_disabled unlocks network_topology_hint and adds its text."""
        env = self._make_env()
        target = next(
            item for item in env._episode.task["issues"] if item["id"] == "ssl_disabled"
        )
        severity = type("S", (), {"value": "error"})()
        category = type("C", (), {"value": "security"})()
        action = type("A", (), {
            "line_number": target["line_range"][0],
            "comment": "SSL certificate verification disabled (verify=False) β MITM risk",
            "severity": severity,
            "category": category,
        })()
        env._step_count = 1
        env._handle_add_comment(action)
        assert "network_topology_hint" in env._episode.hints_unlocked
        assert any("internet" in hint.lower() for hint in env._episode.context_hints)

    def test_task3_hints_not_duplicated(self):
        """Repeating the same finding three times yields the hint only once."""
        env = self._make_env()
        target = next(
            item for item in env._episode.task["issues"] if item["id"] == "hardcoded_api_key"
        )
        for step in (1, 2, 3):
            env._step_count = step
            action = type("A", (), {
                "line_number": target["line_range"][0],
                "comment": "Hardcoded API key β use environment variable",
                "severity": type("S", (), {"value": "critical"})(),
                "category": type("C", (), {"value": "security"})(),
            })()
            env._handle_add_comment(action)
        matching = sum(1 for hint in env._episode.context_hints if "batch" in hint)
        assert matching == 1
class TestTask5CausalUnlocks:
    """Causal-unlock checks for task 5 (Flask API): issue findings reveal context hints."""

    TASK5 = TASKS[5]

    def _make_env(self):
        """Create an environment whose episode runs on an unmutated deep copy of task 5."""
        from environment.graders import CodeReviewGrader as grader_cls
        try:
            from environment.probe_environment import ProbeEnvironment
        except ImportError:
            from probe_environment import ProbeEnvironment  # type: ignore
        environment = ProbeEnvironment()
        task_copy = copy.deepcopy(self.TASK5)
        environment._grader = grader_cls(task_copy)
        environment._episode = EpisodeState(task=task_copy)
        return environment

    def test_task5_has_context_hints(self):
        """context_hints of task 5 contains both expected hint keys."""
        declared = self.TASK5.get("context_hints", {})
        assert "server_config_hint" in declared
        assert "client_usage_hint" in declared

    def test_task5_command_injection_has_unlocks(self):
        """The command_injection issue points at 'server_config_hint'."""
        target = next(item for item in self.TASK5["issues"] if item["id"] == "command_injection")
        assert target.get("unlocks") == "server_config_hint"

    def test_task5_insecure_deserialization_has_unlocks(self):
        """The insecure_deserialization issue points at 'client_usage_hint'."""
        target = next(
            item for item in self.TASK5["issues"] if item["id"] == "insecure_deserialization"
        )
        assert target.get("unlocks") == "client_usage_hint"

    def test_task5_command_injection_unlock_fires(self):
        """Commenting on command_injection unlocks server_config_hint and adds its text."""
        env = self._make_env()
        target = next(
            item for item in env._episode.task["issues"] if item["id"] == "command_injection"
        )
        severity = type("S", (), {"value": "critical"})()
        category = type("C", (), {"value": "security"})()
        action = type("A", (), {
            "line_number": target["line_range"][0],
            "comment": "Command injection via shell=True with unsanitised user input",
            "severity": severity,
            "category": category,
        })()
        env._step_count = 1
        env._handle_add_comment(action)
        assert "server_config_hint" in env._episode.hints_unlocked
        assert any(
            "root" in hint or "privileged" in hint for hint in env._episode.context_hints
        )

    def test_task5_deserialization_unlock_fires(self):
        """Commenting on insecure_deserialization unlocks client_usage_hint and adds its text."""
        env = self._make_env()
        target = next(
            item for item in env._episode.task["issues"]
            if item["id"] == "insecure_deserialization"
        )
        severity = type("S", (), {"value": "critical"})()
        category = type("C", (), {"value": "security"})()
        action = type("A", (), {
            "line_number": target["line_range"][0],
            "comment": "pickle.loads on untrusted data β insecure deserialization RCE",
            "severity": severity,
            "category": category,
        })()
        env._step_count = 1
        env._handle_add_comment(action)
        assert "client_usage_hint" in env._episode.hints_unlocked
        assert any("pickle" in hint for hint in env._episode.context_hints)
| # =========================================================================== | |
| # EPISODE MEMORY TESTS | |
| # =========================================================================== | |
class TestEpisodeMemory:
    """Cross-episode memory: records findings and injects prior hints."""

    def _fresh_memory(self, tmp_path):
        """Return an EpisodeMemory backed by ``tmp_path`` with instance_id 'test'."""
        from environment.episode_memory import EpisodeMemory
        return EpisodeMemory(memory_dir=str(tmp_path), instance_id="test")

    def test_empty_memory_returns_no_hint(self, tmp_path):
        """New memory store must return None for any task."""
        mem = self._fresh_memory(tmp_path)
        assert mem.prior_hint(1, TASKS[1]) is None

    def test_record_and_retrieve(self, tmp_path):
        """After recording, prior_hint must return a non-None string."""
        mem = self._fresh_memory(tmp_path)
        mem.record(1, ["off_by_one", "assignment_not_update"])
        hint = mem.prior_hint(1, TASKS[1])
        assert hint is not None
        assert isinstance(hint, str)
        assert len(hint) > 20

    def test_hint_mentions_category(self, tmp_path):
        """Prior hint must mention the category of the recorded issue."""
        mem = self._fresh_memory(tmp_path)
        mem.record(1, ["off_by_one"])  # category='bug' in Task 1
        hint = mem.prior_hint(1, TASKS[1])
        # Guard first so a missing hint fails as an assertion, not a TypeError.
        assert hint is not None
        assert "bug" in hint

    def test_hint_mentions_task_name(self, tmp_path):
        """Prior hint must mention the task name."""
        mem = self._fresh_memory(tmp_path)
        mem.record(1, ["off_by_one"])
        hint = mem.prior_hint(1, TASKS[1])
        # Guard first so a missing hint fails as an assertion, not a TypeError.
        assert hint is not None
        assert TASKS[1]["name"] in hint

    def test_record_persists_across_instances(self, tmp_path):
        """Memory written by one instance must be readable by a fresh instance."""
        mem1 = self._fresh_memory(tmp_path)
        mem1.record(2, ["sql_injection", "eval_use"])
        mem2 = self._fresh_memory(tmp_path)
        hint = mem2.prior_hint(2, TASKS[2])
        assert hint is not None

    def test_record_deduplicates(self, tmp_path):
        """Recording the same issue_id twice must not inflate the stored list."""
        mem = self._fresh_memory(tmp_path)
        mem.record(1, ["off_by_one"])
        mem.record(1, ["off_by_one"])
        # _data is keyed by the task id as a string.
        assert mem._data["1"].count("off_by_one") == 1

    def test_record_merges_across_calls(self, tmp_path):
        """Findings across two episodes must be merged, not overwritten."""
        mem = self._fresh_memory(tmp_path)
        mem.record(1, ["off_by_one"])
        mem.record(1, ["assignment_not_update"])
        assert set(mem._data["1"]) == {"off_by_one", "assignment_not_update"}

    def test_clear_single_task(self, tmp_path):
        """clear(task_id) must remove only that task's memory."""
        mem = self._fresh_memory(tmp_path)
        mem.record(1, ["off_by_one"])
        mem.record(2, ["sql_injection"])
        mem.clear(1)
        assert mem.prior_hint(1, TASKS[1]) is None
        assert mem.prior_hint(2, TASKS[2]) is not None

    def test_clear_all(self, tmp_path):
        """clear() with no args must wipe all memory."""
        mem = self._fresh_memory(tmp_path)
        mem.record(1, ["off_by_one"])
        mem.record(2, ["sql_injection"])
        mem.clear()
        assert mem.prior_hint(1, TASKS[1]) is None
        assert mem.prior_hint(2, TASKS[2]) is None

    def test_env_injects_prior_hint_on_second_reset(self, tmp_path):
        """After a full episode, the next reset for the same task_id must inject a hint."""
        try:
            from environment.probe_environment import ProbeEnvironment
        except ImportError:
            from probe_environment import ProbeEnvironment  # type: ignore
        import asyncio
        env = ProbeEnvironment(memory_dir=str(tmp_path))
        # NOTE(review): the step counts below assume the environment selects
        # task (reset_count % len(TASKS)) with reset_count starting at 0, as
        # the original author's notes state - confirm against ProbeEnvironment.
        asyncio.run(env.async_reset())  # reset #1 runs task 0
        # Manually seed task-1 memory so a later task-1 reset gets a hint.
        task1_id = TASKS[1]["id"]  # == 1
        env._memory.record(task1_id, ["off_by_one"])
        # len(TASKS) more resets walk one full cycle: tasks 1..N-1, then 0.
        for _ in range(len(TASKS)):
            asyncio.run(env.async_reset())
        # 1 + len(TASKS) resets are done, so the next reset lands on task 1.
        obs = asyncio.run(env.async_reset())
        assert any("PRIOR KNOWLEDGE" in h for h in obs.context_hints)

    def test_env_records_memory_after_submit(self, tmp_path):
        """Submitting a review with findings must persist them in EpisodeMemory."""
        try:
            from environment.probe_environment import ProbeEnvironment
        except ImportError:
            from probe_environment import ProbeEnvironment  # type: ignore
        import asyncio
        from agent.models import ProbeAction, ActionType
        env = ProbeEnvironment(memory_dir=str(tmp_path))
        asyncio.run(env.async_reset())  # task 0
        # Add a correct comment on task 0 bootstrap issue
        bootstrap_issue = next(
            i for i in env._episode.task["issues"] if i["id"] == "bootstrap_off_by_one"
        )
        add_action = ProbeAction(
            action_type=ActionType.ADD_COMMENT,
            line_number=bootstrap_issue["line_range"][0],
            comment="Off-by-one error: range(len+1) causes IndexError on the last iteration",
            severity=None,
            category=None,
        )
        asyncio.run(env.async_step(add_action))
        # Submit review
        submit_action = ProbeAction(
            action_type=ActionType.SUBMIT_REVIEW,
            line_number=None,
            comment=None,
            severity=None,
            category=None,
        )
        asyncio.run(env.async_step(submit_action))
        # Memory for task 0 must now be non-empty
        assert env._memory._data.get("0") is not None
        assert len(env._memory._data["0"]) > 0
| # =========================================================================== | |
| # RUN_SCANNER TESTS | |
| # =========================================================================== | |
class TestRunScanner:
    """Tests for the scanner module and RUN_SCANNER action handler."""

    def _make_env(self, task_index: int = 1):
        """
        Return an environment whose episode runs on TASKS[task_index],
        deep-copied and then mutated with seed 7.
        """
        try:
            from environment.probe_environment import ProbeEnvironment
        except ImportError:
            from probe_environment import ProbeEnvironment  # type: ignore
        env = ProbeEnvironment()
        # copy, mutate_task, CodeReviewGrader and EpisodeState are already
        # imported at module level, so no function-local imports are needed.
        task = mutate_task(copy.deepcopy(TASKS[task_index]), seed=7)
        env._grader = CodeReviewGrader(task)
        env._episode = EpisodeState(task=task)
        return env

    # -- scanner module unit tests ------------------------------------------

    def test_scanner_returns_required_keys(self):
        """run_scanner must return dict with tool, findings, missed_count, note."""
        from environment.scanner import run_scanner
        result = run_scanner(TASKS[1], seed=0)
        assert "tool" in result
        assert "findings" in result
        assert "missed_count" in result
        assert "note" in result

    def test_scanner_findings_are_list(self):
        """findings must be a list."""
        from environment.scanner import run_scanner
        result = run_scanner(TASKS[1], seed=0)
        assert isinstance(result["findings"], list)

    def test_scanner_finding_has_required_fields(self):
        """Every finding dict must have line, rule, message, category, severity, verified."""
        from environment.scanner import run_scanner
        result = run_scanner(TASKS[2], seed=42)
        for f in result["findings"]:
            for key in ("line", "rule", "message", "category", "severity", "verified"):
                assert key in f, f"Missing key '{key}' in finding: {f}"

    def test_scanner_verified_always_false(self):
        """All scanner findings start unverified; the agent must confirm them."""
        from environment.scanner import run_scanner
        result = run_scanner(TASKS[2], seed=99)
        for f in result["findings"]:
            assert f["verified"] is False

    def test_scanner_recall_below_100_percent(self):
        """With enough seeds, at least some issues must be missed (recall < 1.0)."""
        from environment.scanner import run_scanner
        missed_any = any(
            run_scanner(TASKS[2], seed=s)["missed_count"] > 0
            for s in range(20)
        )
        assert missed_any, "Scanner should miss at least one issue across 20 seeds"

    def test_scanner_deterministic_per_seed(self):
        """Same seed must produce identical results."""
        from environment.scanner import run_scanner
        r1 = run_scanner(TASKS[3], seed=123)
        r2 = run_scanner(TASKS[3], seed=123)
        assert r1["findings"] == r2["findings"]
        assert r1["missed_count"] == r2["missed_count"]

    def test_scanner_different_seeds_differ(self):
        """Different seeds should (almost always) produce different findings."""
        from environment.scanner import run_scanner
        # Each element is the tuple of reported lines for one seed; more than
        # one distinct tuple means the output varies with the seed.
        results = {
            tuple(f["line"] for f in run_scanner(TASKS[3], seed=s)["findings"])
            for s in range(10)
        }
        assert len(results) > 1, "Scanner findings should vary across seeds"

    def test_scanner_line_numbers_within_code(self):
        """All reported line numbers must be within the code's line count."""
        from environment.scanner import run_scanner
        task = TASKS[2]
        total_lines = len(task["code"].split("\n"))
        result = run_scanner(task, seed=5)
        for f in result["findings"]:
            assert 1 <= f["line"] <= total_lines, (
                f"Finding line {f['line']} out of range [1, {total_lines}]"
            )

    def test_scanner_tool_is_known_string(self):
        """tool field must be a non-empty string."""
        from environment.scanner import run_scanner
        result = run_scanner(TASKS[1], seed=0)
        assert isinstance(result["tool"], str)
        assert len(result["tool"]) > 0

    # -- RUN_SCANNER action handler tests -----------------------------------

    def test_run_scanner_first_call_free(self):
        """First RUN_SCANNER in an episode must cost 0.0."""
        env = self._make_env()
        env._step_count = 1
        reward = env._handle_run_scanner()
        assert reward.total == 0.0
        assert reward.passed is True

    def test_run_scanner_repeated_penalised(self):
        """Second RUN_SCANNER call must cost -0.02."""
        env = self._make_env()
        env._step_count = 1
        env._handle_run_scanner()  # first call: free
        env._step_count = 2
        reward = env._handle_run_scanner()  # second call: penalised
        assert reward.total == pytest.approx(-0.02, abs=0.001)
        assert reward.passed is False

    def test_run_scanner_stored_in_history(self):
        """Scanner result must be stored as 'scanner_result' in review_comments."""
        env = self._make_env()
        env._step_count = 1
        env._handle_run_scanner()
        scanner_entries = [
            c for c in env._episode.review_comments if c.get("type") == "scanner_result"
        ]
        assert len(scanner_entries) == 1
        entry = scanner_entries[0]
        assert "tool" in entry
        assert "findings" in entry
        assert "note" in entry

    def test_run_scanner_sets_scanner_used_flag(self):
        """scanner_used flag must be False before, True after first call."""
        env = self._make_env()
        assert env._episode.scanner_used is False
        env._step_count = 1
        env._handle_run_scanner()
        assert env._episode.scanner_used is True

    def test_run_scanner_result_appears_in_obs_history(self):
        """After RUN_SCANNER, the next observation's review_history must contain the result."""
        env = self._make_env()
        env._step_count = 1
        env._handle_run_scanner()
        obs = env._build_observation(reward=0.0, done=False)
        scanner_entries = [
            e for e in obs.review_history if e.get("type") == "scanner_result"
        ]
        assert len(scanner_entries) == 1

    def test_run_scanner_via_async_step(self):
        """RUN_SCANNER dispatched through async_step must return a valid reward."""
        import asyncio
        try:
            from environment.probe_environment import ProbeEnvironment
        except ImportError:
            from probe_environment import ProbeEnvironment  # type: ignore
        from agent.models import ProbeAction, ActionType
        env = ProbeEnvironment()
        asyncio.run(env.async_reset())
        action = ProbeAction(
            action_type=ActionType.RUN_SCANNER,
            line_number=None,
            comment=None,
            severity=None,
            category=None,
        )
        # async_step yields a 4-tuple; the trailing info value is not inspected.
        obs, reward, done, _info = asyncio.run(env.async_step(action))
        assert reward.total == 0.0  # first use is free
        assert done is False
        assert any(
            e.get("type") == "scanner_result" for e in obs.review_history
        )

    def test_scanner_used_tracked_in_async_state(self):
        """async_state must reflect scanner_used after the action fires."""
        import asyncio
        try:
            from environment.probe_environment import ProbeEnvironment
        except ImportError:
            from probe_environment import ProbeEnvironment  # type: ignore
        from agent.models import ProbeAction, ActionType
        env = ProbeEnvironment()
        asyncio.run(env.async_reset())
        state_before = asyncio.run(env.async_state())
        assert state_before["scanner_used"] is False
        action = ProbeAction(
            action_type=ActionType.RUN_SCANNER,
            line_number=None, comment=None, severity=None, category=None,
        )
        asyncio.run(env.async_step(action))
        state_after = asyncio.run(env.async_state())
        assert state_after["scanner_used"] is True