PRobe / tests /test_dynamic_world.py
Thakur, Mahipal
refactor: remove legacy architecture, promote clean structure to repo root
85fab7b
"""
Tests for the dynamic world features:
- server/mutator.py (code mutation engine)
- Task 6 (causal chain / progressive observation)
- GET_CONTEXT action (line-context probing)
- Causal unlock chain (context_hints injected into observation)
- Tasks 3 & 5 unlocks (causal chains across tasks)
- EpisodeMemory (cross-episode persistence)
- RUN_SCANNER action (live tool interaction, noisy results)
"""
import sys
import os
import copy
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from environment.mutator import mutate_task
from environment.probe_environment import EpisodeState
from environment.tasks import TASKS
from environment.graders import CodeReviewGrader
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
TASK6 = TASKS[6] # causal chain task
def _grader(task):
return CodeReviewGrader(task)
# ===========================================================================
# MUTATOR TESTS
# ===========================================================================
class TestMutator:
def test_returns_deep_copy(self):
"""mutate_task must not modify the original TASKS entry."""
original_code = TASKS[1]["code"]
_ = mutate_task(TASKS[1], seed=0)
assert TASKS[1]["code"] == original_code
def test_mutation_seed_tag(self):
"""Mutated task carries _mutation_seed matching the supplied seed."""
t = mutate_task(TASKS[1], seed=42)
assert t["_mutation_seed"] == 42
def test_different_seeds_differ(self):
"""Two different seeds should (almost always) produce different code."""
t1 = mutate_task(TASKS[1], seed=0)
t2 = mutate_task(TASKS[1], seed=1)
# At minimum the blank-line insert shifts are different; codes differ
assert t1["code"] != TASKS[1]["code"] or t2["code"] != TASKS[1]["code"]
def test_same_seed_is_deterministic(self):
"""Same seed must always produce identical output."""
t1 = mutate_task(TASKS[2], seed=99)
t2 = mutate_task(TASKS[2], seed=99)
assert t1["code"] == t2["code"]
assert t1["issues"] == t2["issues"]
def test_line_shift_applied(self):
"""Line shift must move every issue line_range down by exactly 1."""
original = copy.deepcopy(TASKS[1])
mutated = mutate_task(TASKS[1], seed=7)
orig_ranges = [iss["line_range"] for iss in original["issues"]]
mut_ranges = [iss["line_range"] for iss in mutated["issues"]]
for orig_r, mut_r in zip(orig_ranges, mut_ranges):
assert mut_r[0] == orig_r[0] + 1
assert mut_r[1] == orig_r[1] + 1
def test_issue_count_preserved(self):
"""Mutation must not add or remove issues."""
for task in TASKS[:6]: # skip task 6 here, tested separately
mutated = mutate_task(task, seed=5)
assert len(mutated["issues"]) == len(task["issues"])
def test_issue_ids_preserved(self):
"""Issue ids must be unchanged after mutation."""
original_ids = [i["id"] for i in TASKS[2]["issues"]]
mutated_ids = [i["id"] for i in mutate_task(TASKS[2], seed=3)["issues"]]
assert original_ids == mutated_ids
def test_grader_still_matches_after_mutation(self):
"""
The grader must still award credit after mutation.
Use the off-by-one issue in task 1 β€” keyword 'range' is always present
and line_range shifts by exactly 1.
"""
mutated = mutate_task(TASKS[1], seed=10)
g = _grader(mutated)
off_by_one = next(i for i in mutated["issues"] if i["id"] == "off_by_one")
target_line = off_by_one["line_range"][0]
score, found, _ = g.score_comment(
line_number=target_line,
comment="off-by-one error: range(len + 1) causes IndexError on the last iteration",
already_found=[],
)
assert "off_by_one" in found
assert score > 0.0
def test_correct_decision_preserved(self):
"""correct_decision must be unchanged by mutation."""
for task in TASKS:
mutated = mutate_task(task, seed=1)
assert mutated["correct_decision"] == task["correct_decision"]
# ===========================================================================
# TASK 6 STRUCTURE TESTS
# ===========================================================================
class TestTask6Structure:
def test_task6_exists(self):
assert len(TASKS) >= 7, "Task 6 (causal chain) must exist in TASKS"
def test_task6_has_context_hints(self):
assert "context_hints" in TASK6
assert len(TASK6["context_hints"]) >= 2
def test_task6_unlock_keys_present(self):
"""Every 'unlocks' key in an issue must exist in context_hints dict."""
hints = TASK6["context_hints"]
for issue in TASK6["issues"]:
key = issue.get("unlocks")
if key:
assert key in hints, f"Issue {issue['id']} unlocks '{key}' but key not in context_hints"
def test_task6_total_weight_positive(self):
g = _grader(TASK6)
assert g.total_weight > 0.0
def test_task6_has_chained_issues(self):
"""At least two issues must have an 'unlocks' field."""
unlocking = [i for i in TASK6["issues"] if i.get("unlocks")]
assert len(unlocking) >= 2
def test_task6_correct_decision(self):
assert TASK6["correct_decision"] == "request_changes"
# ===========================================================================
# CAUSAL UNLOCK CHAIN TESTS (environment layer)
# ===========================================================================
class TestCausalUnlock:
"""
Test the unlock mechanic via the environment's _unlock_causal_hints helper
and _handle_add_comment pipeline.
"""
def _make_env(self):
"""Return a fresh environment instance fast-forwarded to task 6."""
import asyncio
try:
from environment.probe_environment import ProbeEnvironment
except ImportError:
from probe_environment import ProbeEnvironment # type: ignore
env = ProbeEnvironment()
# force-set episode to task 6 (bypass cycling for test speed)
from environment.mutator import mutate_task as _mt
from environment.probe_environment import EpisodeState
task = _mt(TASK6, seed=0)
from environment.graders import CodeReviewGrader as _G
env._grader = _G(task)
env._episode = EpisodeState(task=task)
return env
def test_no_hints_at_start(self):
env = self._make_env()
assert env._episode.context_hints == []
def test_unlock_fires_after_finding_trigger_issue(self):
"""Finding hardcoded_jwt_secret must append db_schema_hint."""
env = self._make_env()
jwt_issue = next(i for i in env._episode.task["issues"] if i["id"] == "hardcoded_jwt_secret")
target_line = jwt_issue["line_range"][0]
env._step_count = 1
reward = env._handle_add_comment(
type("A", (), {
"line_number": target_line,
"comment": "JWT_SECRET is hardcoded β€” must be loaded from environment variable to prevent token forgery",
"severity": type("S", (), {"value": "critical"})(),
"category": type("C", (), {"value": "security"})(),
})()
)
assert "hardcoded_jwt_secret" in env._episode.issues_found
assert len(env._episode.context_hints) == 1
assert "db_schema_hint" in env._episode.hints_unlocked
assert "Database Schema" in env._episode.context_hints[0]
def test_unlock_fires_only_once(self):
"""The same hint must not be appended twice even if issue found again."""
env = self._make_env()
jwt_issue = next(i for i in env._episode.task["issues"] if i["id"] == "hardcoded_jwt_secret")
target_line = jwt_issue["line_range"][0]
for _ in range(3):
env._step_count += 1
env._handle_add_comment(
type("A", (), {
"line_number": target_line,
"comment": "JWT_SECRET is hardcoded β€” must be loaded from environment variable",
"severity": type("S", (), {"value": "critical"})(),
"category": type("C", (), {"value": "security"})(),
})()
)
assert len(env._episode.context_hints) == 1
def test_second_unlock_fires_independently(self):
"""Finding no_rate_limit must append nginx_config_hint independently."""
env = self._make_env()
rate_issue = next(i for i in env._episode.task["issues"] if i["id"] == "no_rate_limit")
target_line = rate_issue["line_range"][0]
env._step_count = 1
env._handle_add_comment(
type("A", (), {
"line_number": target_line,
"comment": "No rate limiting on /auth endpoint β€” susceptible to brute-force attacks",
"severity": type("S", (), {"value": "error"})(),
"category": type("C", (), {"value": "security"})(),
})()
)
assert "nginx_config_hint" in env._episode.hints_unlocked
assert any("nginx" in h.lower() for h in env._episode.context_hints)
def test_both_unlocks_can_fire_in_same_episode(self):
"""Both hints can be unlocked within one episode."""
env = self._make_env()
task = env._episode.task
jwt_issue = next(i for i in task["issues"] if i["id"] == "hardcoded_jwt_secret")
rate_issue = next(i for i in task["issues"] if i["id"] == "no_rate_limit")
for step, (issue, kw) in enumerate([
(jwt_issue, "JWT_SECRET is hardcoded β€” must be loaded from environment variable to prevent forgery"),
(rate_issue, "No rate limiting on /auth endpoint β€” susceptible to brute-force attacks"),
], start=1):
env._step_count = step
env._handle_add_comment(
type("A", (), {
"line_number": issue["line_range"][0],
"comment": kw,
"severity": type("S", (), {"value": "critical"})(),
"category": type("C", (), {"value": "security"})(),
})()
)
assert len(env._episode.context_hints) == 2
assert env._episode.hints_unlocked == {"db_schema_hint", "nginx_config_hint"}
def test_context_hints_appear_in_observation(self):
"""context_hints list must be non-empty in the observation after an unlock."""
env = self._make_env()
jwt_issue = next(i for i in env._episode.task["issues"] if i["id"] == "hardcoded_jwt_secret")
env._step_count = 1
env._handle_add_comment(
type("A", (), {
"line_number": jwt_issue["line_range"][0],
"comment": "JWT_SECRET is hardcoded β€” must be loaded from environment variable",
"severity": type("S", (), {"value": "critical"})(),
"category": type("C", (), {"value": "security"})(),
})()
)
obs = env._build_observation(reward=0.0, done=False)
assert len(obs.context_hints) == 1
assert "Database Schema" in obs.context_hints[0]
# ===========================================================================
# GET_CONTEXT ACTION TESTS
# ===========================================================================
class TestGetContext:
def _make_env(self):
try:
from environment.probe_environment import ProbeEnvironment
except ImportError:
from probe_environment import ProbeEnvironment # type: ignore
from environment.mutator import mutate_task as _mt
from environment.graders import CodeReviewGrader as _G
env = ProbeEnvironment()
task = _mt(TASKS[1], seed=0)
env._grader = _G(task)
env._episode = EpisodeState(task=task)
return env
def test_get_context_near_issue_no_penalty(self):
"""Probing a line near a real issue must cost 0.0."""
env = self._make_env()
issue_line = env._episode.task["issues"][0]["line_range"][0]
env._step_count = 1
reward = env._handle_get_context(
type("A", (), {"line_number": issue_line})()
)
assert reward.total == 0.0
assert reward.passed is True
def test_get_context_far_from_issue_costs_penalty(self):
"""Probing a line far from any issue must cost -0.01."""
env = self._make_env()
env._step_count = 1
reward = env._handle_get_context(
type("A", (), {"line_number": 999})()
)
assert reward.total == pytest.approx(-0.01, abs=0.001)
assert reward.passed is False
def test_get_context_no_line_number_penalised(self):
"""GET_CONTEXT with no line_number must return -0.02."""
env = self._make_env()
env._step_count = 1
reward = env._handle_get_context(
type("A", (), {"line_number": None})()
)
assert reward.total == pytest.approx(-0.02, abs=0.001)
def test_get_context_snippet_stored_in_history(self):
"""The context probe must be recorded in review_comments."""
env = self._make_env()
env._step_count = 1
env._handle_get_context(
type("A", (), {"line_number": 4})()
)
probes = [c for c in env._episode.review_comments if c.get("type") == "context_probe"]
assert len(probes) == 1
assert probes[0]["line"] == 4
assert "context" in probes[0]
def test_get_context_snippet_contains_requested_line(self):
"""The returned snippet must reference the requested line number."""
env = self._make_env()
env._step_count = 1
reward = env._handle_get_context(
type("A", (), {"line_number": 4})()
)
# explanation contains the formatted snippet with line numbers
assert "4:" in reward.explanation or "4 :" in reward.explanation
# ===========================================================================
# TASK 3 & 5 CAUSAL UNLOCK TESTS
# ===========================================================================
class TestTask3CausalUnlocks:
"""Task 3 (data_pipeline) should unlock context hints via issue findings."""
TASK3 = TASKS[3]
def _make_env(self):
try:
from environment.probe_environment import ProbeEnvironment
except ImportError:
from probe_environment import ProbeEnvironment # type: ignore
env = ProbeEnvironment()
task = copy.deepcopy(self.TASK3)
from environment.graders import CodeReviewGrader as _G
env._grader = _G(task)
env._episode = EpisodeState(task=task)
return env
def test_task3_has_context_hints(self):
"""Task 3 must declare a context_hints dict with both expected keys."""
hints = self.TASK3.get("context_hints", {})
assert "api_docs_hint" in hints
assert "network_topology_hint" in hints
def test_task3_hardcoded_api_key_has_unlocks(self):
"""hardcoded_api_key issue must carry unlocks='api_docs_hint'."""
issue = next(i for i in self.TASK3["issues"] if i["id"] == "hardcoded_api_key")
assert issue.get("unlocks") == "api_docs_hint"
def test_task3_ssl_disabled_has_unlocks(self):
"""ssl_disabled issue must carry unlocks='network_topology_hint'."""
issue = next(i for i in self.TASK3["issues"] if i["id"] == "ssl_disabled")
assert issue.get("unlocks") == "network_topology_hint"
def test_task3_api_key_unlock_fires(self):
"""Finding hardcoded_api_key must append api_docs_hint to context_hints."""
env = self._make_env()
api_issue = next(i for i in env._episode.task["issues"] if i["id"] == "hardcoded_api_key")
env._step_count = 1
env._handle_add_comment(
type("A", (), {
"line_number": api_issue["line_range"][0],
"comment": "API key is hardcoded in source β€” move to os.environ",
"severity": type("S", (), {"value": "critical"})(),
"category": type("C", (), {"value": "security"})(),
})()
)
assert "api_docs_hint" in env._episode.hints_unlocked
assert any("batch" in h for h in env._episode.context_hints)
def test_task3_ssl_unlock_fires(self):
"""Finding ssl_disabled must append network_topology_hint to context_hints."""
env = self._make_env()
ssl_issue = next(i for i in env._episode.task["issues"] if i["id"] == "ssl_disabled")
env._step_count = 1
env._handle_add_comment(
type("A", (), {
"line_number": ssl_issue["line_range"][0],
"comment": "SSL certificate verification disabled (verify=False) β€” MITM risk",
"severity": type("S", (), {"value": "error"})(),
"category": type("C", (), {"value": "security"})(),
})()
)
assert "network_topology_hint" in env._episode.hints_unlocked
assert any("internet" in h.lower() for h in env._episode.context_hints)
def test_task3_hints_not_duplicated(self):
"""The same unlock key must not fire twice even if the issue is found twice."""
env = self._make_env()
api_issue = next(i for i in env._episode.task["issues"] if i["id"] == "hardcoded_api_key")
for step in range(1, 4):
env._step_count = step
env._handle_add_comment(
type("A", (), {
"line_number": api_issue["line_range"][0],
"comment": "Hardcoded API key β€” use environment variable",
"severity": type("S", (), {"value": "critical"})(),
"category": type("C", (), {"value": "security"})(),
})()
)
api_hints = [h for h in env._episode.context_hints if "batch" in h]
assert len(api_hints) == 1
class TestTask5CausalUnlocks:
"""Task 5 (Flask API) should unlock context hints via issue findings."""
TASK5 = TASKS[5]
def _make_env(self):
try:
from environment.probe_environment import ProbeEnvironment
except ImportError:
from probe_environment import ProbeEnvironment # type: ignore
env = ProbeEnvironment()
task = copy.deepcopy(self.TASK5)
from environment.graders import CodeReviewGrader as _G
env._grader = _G(task)
env._episode = EpisodeState(task=task)
return env
def test_task5_has_context_hints(self):
"""Task 5 must declare a context_hints dict with both expected keys."""
hints = self.TASK5.get("context_hints", {})
assert "server_config_hint" in hints
assert "client_usage_hint" in hints
def test_task5_command_injection_has_unlocks(self):
"""command_injection issue must carry unlocks='server_config_hint'."""
issue = next(i for i in self.TASK5["issues"] if i["id"] == "command_injection")
assert issue.get("unlocks") == "server_config_hint"
def test_task5_insecure_deserialization_has_unlocks(self):
"""insecure_deserialization issue must carry unlocks='client_usage_hint'."""
issue = next(i for i in self.TASK5["issues"] if i["id"] == "insecure_deserialization")
assert issue.get("unlocks") == "client_usage_hint"
def test_task5_command_injection_unlock_fires(self):
"""Finding command_injection must append server_config_hint."""
env = self._make_env()
ci_issue = next(i for i in env._episode.task["issues"] if i["id"] == "command_injection")
env._step_count = 1
env._handle_add_comment(
type("A", (), {
"line_number": ci_issue["line_range"][0],
"comment": "Command injection via shell=True with unsanitised user input",
"severity": type("S", (), {"value": "critical"})(),
"category": type("C", (), {"value": "security"})(),
})()
)
assert "server_config_hint" in env._episode.hints_unlocked
assert any("root" in h or "privileged" in h for h in env._episode.context_hints)
def test_task5_deserialization_unlock_fires(self):
"""Finding insecure_deserialization must append client_usage_hint."""
env = self._make_env()
deser_issue = next(i for i in env._episode.task["issues"] if i["id"] == "insecure_deserialization")
env._step_count = 1
env._handle_add_comment(
type("A", (), {
"line_number": deser_issue["line_range"][0],
"comment": "pickle.loads on untrusted data β€” insecure deserialization RCE",
"severity": type("S", (), {"value": "critical"})(),
"category": type("C", (), {"value": "security"})(),
})()
)
assert "client_usage_hint" in env._episode.hints_unlocked
assert any("pickle" in h for h in env._episode.context_hints)
# ===========================================================================
# EPISODE MEMORY TESTS
# ===========================================================================
class TestEpisodeMemory:
"""Cross-episode memory β€” records findings and injects prior hints."""
def _fresh_memory(self, tmp_path):
from environment.episode_memory import EpisodeMemory
return EpisodeMemory(memory_dir=str(tmp_path), instance_id="test")
def test_empty_memory_returns_no_hint(self, tmp_path):
"""New memory store must return None for any task."""
mem = self._fresh_memory(tmp_path)
assert mem.prior_hint(1, TASKS[1]) is None
def test_record_and_retrieve(self, tmp_path):
"""After recording, prior_hint must return a non-None string."""
mem = self._fresh_memory(tmp_path)
mem.record(1, ["off_by_one", "assignment_not_update"])
hint = mem.prior_hint(1, TASKS[1])
assert hint is not None
assert isinstance(hint, str)
assert len(hint) > 20
def test_hint_mentions_category(self, tmp_path):
"""Prior hint must mention the category of the recorded issue."""
mem = self._fresh_memory(tmp_path)
mem.record(1, ["off_by_one"]) # category='bug' in Task 1
hint = mem.prior_hint(1, TASKS[1])
assert "bug" in hint
def test_hint_mentions_task_name(self, tmp_path):
"""Prior hint must mention the task name."""
mem = self._fresh_memory(tmp_path)
mem.record(1, ["off_by_one"])
hint = mem.prior_hint(1, TASKS[1])
assert TASKS[1]["name"] in hint
def test_record_persists_across_instances(self, tmp_path):
"""Memory written by one instance must be readable by a fresh instance."""
mem1 = self._fresh_memory(tmp_path)
mem1.record(2, ["sql_injection", "eval_use"])
mem2 = self._fresh_memory(tmp_path)
hint = mem2.prior_hint(2, TASKS[2])
assert hint is not None
def test_record_deduplicates(self, tmp_path):
"""Recording the same issue_id twice must not inflate the stored list."""
mem = self._fresh_memory(tmp_path)
mem.record(1, ["off_by_one"])
mem.record(1, ["off_by_one"])
assert mem._data["1"].count("off_by_one") == 1
def test_record_merges_across_calls(self, tmp_path):
"""Findings across two episodes must be merged, not overwritten."""
mem = self._fresh_memory(tmp_path)
mem.record(1, ["off_by_one"])
mem.record(1, ["assignment_not_update"])
assert set(mem._data["1"]) == {"off_by_one", "assignment_not_update"}
def test_clear_single_task(self, tmp_path):
"""clear(task_id) must remove only that task's memory."""
mem = self._fresh_memory(tmp_path)
mem.record(1, ["off_by_one"])
mem.record(2, ["sql_injection"])
mem.clear(1)
assert mem.prior_hint(1, TASKS[1]) is None
assert mem.prior_hint(2, TASKS[2]) is not None
def test_clear_all(self, tmp_path):
"""clear() with no args must wipe all memory."""
mem = self._fresh_memory(tmp_path)
mem.record(1, ["off_by_one"])
mem.record(2, ["sql_injection"])
mem.clear()
assert mem.prior_hint(1, TASKS[1]) is None
assert mem.prior_hint(2, TASKS[2]) is None
def test_env_injects_prior_hint_on_second_reset(self, tmp_path):
"""After a full episode, the next reset for the same task_id must inject a hint."""
try:
from environment.probe_environment import ProbeEnvironment
except ImportError:
from probe_environment import ProbeEnvironment # type: ignore
import asyncio
env = ProbeEnvironment(memory_dir=str(tmp_path))
# reset_count starts at 0; task_id = reset_count % len(TASKS)
# Do one reset to consume task 0, then seed task-1 memory.
asyncio.run(env.async_reset()) # reset_count β†’ 1; ran task 0
# Manually seed task-1 memory so the next task-1 reset gets a hint.
task1_id = TASKS[1]["id"] # == 1
env._memory.record(task1_id, ["off_by_one"])
# Cycle through tasks 1..6 (6 resets) so reset_count reaches 7 (≑ task 0).
# Then one more reset puts us at reset_count=8 (≑ task 1) with prior memory.
for _ in range(len(TASKS)):
asyncio.run(env.async_reset())
# reset_count is now (1 + len(TASKS) + 1) % len(TASKS) == 1 β†’ task 1
obs = asyncio.run(env.async_reset())
assert any("PRIOR KNOWLEDGE" in h for h in obs.context_hints)
def test_env_records_memory_after_submit(self, tmp_path):
"""Submitting a review with findings must persist them in EpisodeMemory."""
try:
from environment.probe_environment import ProbeEnvironment
except ImportError:
from probe_environment import ProbeEnvironment # type: ignore
import asyncio
from agent.models import ProbeAction, ActionType
env = ProbeEnvironment(memory_dir=str(tmp_path))
asyncio.run(env.async_reset()) # task 0
# Add a correct comment on task 0 bootstrap issue
bootstrap_issue = next(
i for i in env._episode.task["issues"] if i["id"] == "bootstrap_off_by_one"
)
add_action = ProbeAction(
action_type=ActionType.ADD_COMMENT,
line_number=bootstrap_issue["line_range"][0],
comment="Off-by-one error: range(len+1) causes IndexError on the last iteration",
severity=None,
category=None,
)
asyncio.run(env.async_step(add_action))
# Submit review
from agent.models import ActionType as AT
submit_action = ProbeAction(
action_type=AT.SUBMIT_REVIEW,
line_number=None,
comment=None,
severity=None,
category=None,
)
asyncio.run(env.async_step(submit_action))
# Memory for task 0 must now be non-empty
assert env._memory._data.get("0") is not None
assert len(env._memory._data["0"]) > 0
# ===========================================================================
# RUN_SCANNER TESTS
# ===========================================================================
class TestRunScanner:
"""Tests for the scanner module and RUN_SCANNER action handler."""
def _make_env(self, task_index: int = 1):
try:
from environment.probe_environment import ProbeEnvironment
except ImportError:
from probe_environment import ProbeEnvironment # type: ignore
env = ProbeEnvironment()
import copy
task = copy.deepcopy(TASKS[task_index])
from environment.graders import CodeReviewGrader as _G
from environment.mutator import mutate_task as _mt
task = _mt(task, seed=7)
env._grader = _G(task)
env._episode = EpisodeState(task=task)
return env
# ── scanner module unit tests ────────────────────────────────────────
def test_scanner_returns_required_keys(self):
"""run_scanner must return dict with tool, findings, missed_count, note."""
from environment.scanner import run_scanner
result = run_scanner(TASKS[1], seed=0)
assert "tool" in result
assert "findings" in result
assert "missed_count" in result
assert "note" in result
def test_scanner_findings_are_list(self):
"""findings must be a list."""
from environment.scanner import run_scanner
result = run_scanner(TASKS[1], seed=0)
assert isinstance(result["findings"], list)
def test_scanner_finding_has_required_fields(self):
"""Every finding dict must have line, rule, message, category, severity, verified."""
from environment.scanner import run_scanner
result = run_scanner(TASKS[2], seed=42)
for f in result["findings"]:
for key in ("line", "rule", "message", "category", "severity", "verified"):
assert key in f, f"Missing key '{key}' in finding: {f}"
def test_scanner_verified_always_false(self):
"""All scanner findings start unverified β€” agent must confirm them."""
from environment.scanner import run_scanner
result = run_scanner(TASKS[2], seed=99)
for f in result["findings"]:
assert f["verified"] is False
def test_scanner_recall_below_100_percent(self):
"""With enough seeds, at least some issues must be missed (recall < 1.0)."""
from environment.scanner import run_scanner
total_issues = len(TASKS[2]["issues"]) # 5 issues in Task 2
missed_any = any(
run_scanner(TASKS[2], seed=s)["missed_count"] > 0
for s in range(20)
)
assert missed_any, "Scanner should miss at least one issue across 20 seeds"
def test_scanner_deterministic_per_seed(self):
"""Same seed must produce identical results."""
from environment.scanner import run_scanner
r1 = run_scanner(TASKS[3], seed=123)
r2 = run_scanner(TASKS[3], seed=123)
assert r1["findings"] == r2["findings"]
assert r1["missed_count"] == r2["missed_count"]
def test_scanner_different_seeds_differ(self):
"""Different seeds should (almost always) produce different findings."""
from environment.scanner import run_scanner
results = {
tuple(f["line"] for f in run_scanner(TASKS[3], seed=s)["findings"])
for s in range(10)
}
assert len(results) > 1, "Scanner findings should vary across seeds"
def test_scanner_line_numbers_within_code(self):
"""All reported line numbers must be within the code's line count."""
from environment.scanner import run_scanner
task = TASKS[2]
total_lines = len(task["code"].split("\n"))
result = run_scanner(task, seed=5)
for f in result["findings"]:
assert 1 <= f["line"] <= total_lines, (
f"Finding line {f['line']} out of range [1, {total_lines}]"
)
def test_scanner_tool_is_known_string(self):
"""tool field must be a non-empty string."""
from environment.scanner import run_scanner
result = run_scanner(TASKS[1], seed=0)
assert isinstance(result["tool"], str)
assert len(result["tool"]) > 0
# ── RUN_SCANNER action handler tests ────────────────────────────────
def test_run_scanner_first_call_free(self):
"""First RUN_SCANNER in an episode must cost 0.0."""
env = self._make_env()
env._step_count = 1
reward = env._handle_run_scanner()
assert reward.total == 0.0
assert reward.passed is True
def test_run_scanner_repeated_penalised(self):
"""Second RUN_SCANNER call must cost -0.02."""
env = self._make_env()
env._step_count = 1
env._handle_run_scanner() # first β€” free
env._step_count = 2
reward = env._handle_run_scanner() # second β€” penalised
assert reward.total == pytest.approx(-0.02, abs=0.001)
assert reward.passed is False
def test_run_scanner_stored_in_history(self):
"""Scanner result must be stored as 'scanner_result' in review_comments."""
env = self._make_env()
env._step_count = 1
env._handle_run_scanner()
scanner_entries = [
c for c in env._episode.review_comments if c.get("type") == "scanner_result"
]
assert len(scanner_entries) == 1
entry = scanner_entries[0]
assert "tool" in entry
assert "findings" in entry
assert "note" in entry
def test_run_scanner_sets_scanner_used_flag(self):
"""scanner_used flag must be False before, True after first call."""
env = self._make_env()
assert env._episode.scanner_used is False
env._step_count = 1
env._handle_run_scanner()
assert env._episode.scanner_used is True
def test_run_scanner_result_appears_in_obs_history(self):
"""After RUN_SCANNER, the next observation's review_history must contain the result."""
env = self._make_env()
env._step_count = 1
env._handle_run_scanner()
obs = env._build_observation(reward=0.0, done=False)
scanner_entries = [
e for e in obs.review_history if e.get("type") == "scanner_result"
]
assert len(scanner_entries) == 1
def test_run_scanner_via_async_step(self):
"""RUN_SCANNER dispatched through async_step must return a valid reward."""
import asyncio
try:
from environment.probe_environment import ProbeEnvironment
except ImportError:
from probe_environment import ProbeEnvironment # type: ignore
from agent.models import ProbeAction, ActionType
env = ProbeEnvironment()
asyncio.run(env.async_reset())
action = ProbeAction(
action_type=ActionType.RUN_SCANNER,
line_number=None,
comment=None,
severity=None,
category=None,
)
obs, reward, done, info = asyncio.run(env.async_step(action))
assert reward.total == 0.0 # first use is free
assert done is False
assert any(
e.get("type") == "scanner_result" for e in obs.review_history
)
def test_scanner_used_tracked_in_async_state(self):
"""async_state must reflect scanner_used after the action fires."""
import asyncio
try:
from environment.probe_environment import ProbeEnvironment
except ImportError:
from probe_environment import ProbeEnvironment # type: ignore
from agent.models import ProbeAction, ActionType
env = ProbeEnvironment()
asyncio.run(env.async_reset())
state_before = asyncio.run(env.async_state())
assert state_before["scanner_used"] is False
action = ProbeAction(
action_type=ActionType.RUN_SCANNER,
line_number=None, comment=None, severity=None, category=None,
)
asyncio.run(env.async_step(action))
state_after = asyncio.run(env.async_state())
assert state_after["scanner_used"] is True