from __future__ import annotations from typing import Iterable import pytest from pydantic import ValidationError from code_security_auditor_env.models import CodeSecurityAction from code_security_auditor_env.server.security_environment import CodeSecurityAuditorEnvironment def _action(**kwargs) -> CodeSecurityAction: return CodeSecurityAction(**kwargs) def _run_actions(task_id: str, actions: Iterable[CodeSecurityAction]) -> tuple[float, list[float]]: env = CodeSecurityAuditorEnvironment(default_task_id=task_id) obs = env.reset(task_id=task_id) rewards: list[float] = [float(obs.reward or 0.0)] for action in actions: obs = env.step(action) rewards.append(float(obs.reward or 0.0)) if obs.done: break if not obs.done: obs = env.step(_action(action_type="submit_final_report")) rewards.append(float(obs.reward or 0.0)) return float(obs.reward or 0.0), rewards @pytest.mark.parametrize( "task_id,expected_file_count", [ ("easy", 3), ("medium", 3), ("hard", 4), ], ) def test_reset_exposes_task_specific_observation_space(task_id: str, expected_file_count: int) -> None: env = CodeSecurityAuditorEnvironment(default_task_id=task_id) obs = env.reset(task_id=task_id) assert obs.task_id == task_id assert len(obs.available_files) == expected_file_count assert obs.steps_remaining > 0 assert obs.file_excerpt == "" assert obs.focused_file is None assert 0.0 <= float(obs.score_hint) <= 1.0 def test_action_space_validation_rejects_invalid_values() -> None: with pytest.raises(ValidationError): _action(action_type="not_valid") with pytest.raises(ValidationError): _action(action_type="submit_finding", confidence=1.5) with pytest.raises(ValidationError): _action(action_type="submit_finding", line_start=0) def test_inspect_file_returns_numbered_excerpt() -> None: env = CodeSecurityAuditorEnvironment(default_task_id="easy") env.reset(task_id="easy") obs = env.step(_action(action_type="inspect_file", filename="app/routes.py")) assert obs.focused_file == "app/routes.py" assert " 1:" in obs.file_excerpt assert "SELECT id, email, role" in obs.file_excerpt def test_partial_progress_reward_for_near_miss_finding() -> None: env = CodeSecurityAuditorEnvironment(default_task_id="easy") env.reset(task_id="easy") obs = env.step( _action( action_type="submit_finding", filename="app/routes.py", line_start=11, line_end=11, vuln_type="sql_injection", severity="high", confidence=0.8, evidence="nearby SQL line", summary="line slightly off", ) ) assert 0.0 < float(obs.reward or 0.0) <= 0.2 assert "Partial progress" in obs.last_feedback def test_easy_task_high_quality_trajectory_scores_high() -> None: actions = [ _action(action_type="inspect_file", filename="app/routes.py"), _action(action_type="inspect_file", filename="app/config.py"), _action( action_type="submit_finding", filename="app/routes.py", line_start=8, vuln_type="sql_injection", severity="high", confidence=0.8, evidence="f-string SQL query with request arg", summary="SQL injection", ), _action( action_type="submit_finding", filename="app/config.py", line_start=5, vuln_type="hardcoded_secret", severity="high", confidence=0.85, evidence="secret embedded in config", summary="hardcoded secret", ), _action( action_type="submit_finding", filename="app/routes.py", line_start=15, vuln_type="weak_authentication", severity="medium", confidence=0.65, evidence="static token auth bypass", summary="weak authentication", ), _action(action_type="submit_final_report"), ] score, rewards = _run_actions("easy", actions) assert score >= 0.75 assert all(0.0 <= r <= 1.0 for r in rewards) def test_reward_hacking_by_spam_and_duplicates_is_penalized() -> None: strong_actions = [ _action(action_type="inspect_file", filename="app/routes.py"), _action(action_type="inspect_file", filename="app/config.py"), _action( action_type="submit_finding", filename="app/routes.py", line_start=8, vuln_type="sql_injection", severity="high", confidence=0.8, evidence="sql injection", summary="sql injection", ), _action( action_type="submit_finding", filename="app/config.py", line_start=5, vuln_type="hardcoded_secret", severity="high", confidence=0.85, evidence="hardcoded secret", summary="hardcoded secret", ), _action( action_type="submit_finding", filename="app/routes.py", line_start=15, vuln_type="weak_authentication", severity="medium", confidence=0.65, evidence="static token", summary="weak auth", ), _action(action_type="submit_final_report"), ] spam_actions = [ _action(action_type="inspect_file", filename="app/routes.py"), _action( action_type="submit_finding", filename="app/routes.py", line_start=8, vuln_type="sql_injection", severity="high", confidence=0.8, evidence="sql injection", summary="sql injection", ), _action( action_type="submit_finding", filename="app/routes.py", line_start=8, vuln_type="sql_injection", severity="high", confidence=0.95, evidence="duplicate #1", summary="duplicate #1", ), _action( action_type="submit_finding", filename="app/routes.py", line_start=8, vuln_type="sql_injection", severity="high", confidence=0.99, evidence="duplicate #2", summary="duplicate #2", ), _action( action_type="submit_finding", filename="app/routes.py", line_start=2, vuln_type="xss", severity="critical", confidence=1.0, evidence="intentional false positive", summary="intentional false positive", ), _action(action_type="submit_final_report"), ] strong_score, _ = _run_actions("easy", strong_actions) spam_score, _ = _run_actions("easy", spam_actions) assert strong_score > spam_score assert spam_score < 0.6 def test_medium_and_hard_tasks_support_successful_completion() -> None: medium_actions = [ _action(action_type="inspect_file", filename="service/webhook.py"), _action(action_type="inspect_file", filename="service/export.py"), _action(action_type="inspect_file", filename="service/serializers.py"), _action( action_type="submit_finding", filename="service/webhook.py", line_start=10, vuln_type="weak_authentication", severity="medium", confidence=0.65, evidence="timing unsafe compare", summary="signature compare", ), _action( action_type="submit_finding", filename="service/webhook.py", line_start=22, vuln_type="weak_authentication", severity="high", confidence=0.8, evidence="debug bypass", summary="debug bypass", ), _action( action_type="submit_finding", filename="service/export.py", line_start=8, vuln_type="command_injection", severity="critical", confidence=0.92, evidence="os.system with user input", summary="command injection", ), _action( action_type="submit_finding", filename="service/serializers.py", line_start=4, vuln_type="insecure_deserialization", severity="high", confidence=0.83, evidence="yaml.Loader unsafe", summary="unsafe yaml load", ), _action(action_type="submit_final_report"), ] hard_actions = [ _action(action_type="inspect_file", filename="api/auth.py"), _action(action_type="inspect_file", filename="api/files.py"), _action(action_type="inspect_file", filename="api/fetcher.py"), _action(action_type="inspect_file", filename="api/storage.py"), _action( action_type="submit_finding", filename="api/auth.py", line_start=12, vuln_type="weak_authentication", severity="critical", confidence=0.9, evidence="alg=none token acceptance", summary="jwt none alg", ), _action( action_type="submit_finding", filename="api/files.py", line_start=11, vuln_type="weak_authentication", severity="high", confidence=0.8, evidence="tenant param controls authorization", summary="idor cross tenant", ), _action( action_type="submit_finding", filename="api/fetcher.py", line_start=4, vuln_type="ssrf", severity="high", confidence=0.8, evidence="requests.get arbitrary URL", summary="ssrf", ), _action( action_type="submit_finding", filename="api/storage.py", line_start=6, vuln_type="path_traversal", severity="critical", confidence=0.9, evidence="path join without normalization", summary="path traversal", ), _action(action_type="submit_final_report"), ] medium_score, medium_rewards = _run_actions("medium", medium_actions) hard_score, hard_rewards = _run_actions("hard", hard_actions) assert medium_score >= 0.7 assert hard_score >= 0.7 assert all(0.0 <= r <= 1.0 for r in medium_rewards) assert all(0.0 <= r <= 1.0 for r in hard_rewards) def test_confidence_miscalibration_reduces_partial_progress_rewards() -> None: # Use line offsets that produce partial (not confirmed) matches so confidence # calibration impacts component score and therefore shaped reward. overconfident_actions = [ _action(action_type="inspect_file", filename="app/routes.py"), _action( action_type="submit_finding", filename="app/routes.py", line_start=13, vuln_type="sql_injection", severity="high", confidence=1.0, evidence="near miss with inflated confidence #1", summary="near miss #1", ), _action( action_type="submit_finding", filename="app/config.py", line_start=1, vuln_type="hardcoded_secret", severity="high", confidence=1.0, evidence="near miss with inflated confidence #2", summary="near miss #2", ), _action( action_type="submit_finding", filename="app/routes.py", line_start=20, vuln_type="weak_authentication", severity="medium", confidence=1.0, evidence="near miss with inflated confidence #3", summary="near miss #3", ), _action(action_type="submit_final_report"), ] calibrated_actions = [ _action(action_type="inspect_file", filename="app/routes.py"), _action( action_type="submit_finding", filename="app/routes.py", line_start=13, vuln_type="sql_injection", severity="high", confidence=0.8, evidence="near miss with calibrated confidence #1", summary="near miss #1", ), _action( action_type="submit_finding", filename="app/config.py", line_start=1, vuln_type="hardcoded_secret", severity="high", confidence=0.8, evidence="near miss with calibrated confidence #2", summary="near miss #2", ), _action( action_type="submit_finding", filename="app/routes.py", line_start=20, vuln_type="weak_authentication", severity="medium", confidence=0.65, evidence="near miss with calibrated confidence #3", summary="near miss #3", ), _action(action_type="submit_final_report"), ] overconf_score, overconf_rewards = _run_actions("easy", overconfident_actions) calibrated_score, calibrated_rewards = _run_actions("easy", calibrated_actions) assert sum(calibrated_rewards) > sum(overconf_rewards) assert calibrated_score >= overconf_score def test_step_limit_stalling_strategy_auto_finalizes_with_low_score() -> None: env = CodeSecurityAuditorEnvironment(default_task_id="easy") obs = env.reset(task_id="easy") # Repeatedly inspect the same non-critical pattern to simulate stalling. while not obs.done: obs = env.step(_action(action_type="inspect_file", filename="app/db.py")) assert obs.done is True assert 0.0 <= float(obs.reward or 0.0) <= 1.0 assert float(obs.reward or 0.0) < 0.5 assert "Max steps reached" in obs.last_feedback def test_repeated_duplicate_confirmed_findings_reduce_quality_multiplier() -> None: env = CodeSecurityAuditorEnvironment(default_task_id="easy") env.reset(task_id="easy") first = env.step( _action( action_type="submit_finding", filename="app/routes.py", line_start=8, vuln_type="sql_injection", severity="high", confidence=0.8, evidence="correct first finding", summary="correct first finding", ) ) qm_after_first = float(first.metadata["quality_multiplier"]) second = env.step( _action( action_type="submit_finding", filename="app/routes.py", line_start=8, vuln_type="sql_injection", severity="high", confidence=0.95, evidence="duplicate second", summary="duplicate second", ) ) qm_after_second = float(second.metadata["quality_multiplier"]) third = env.step( _action( action_type="submit_finding", filename="app/routes.py", line_start=8, vuln_type="sql_injection", severity="high", confidence=1.0, evidence="duplicate third", summary="duplicate third", ) ) qm_after_third = float(third.metadata["quality_multiplier"]) assert qm_after_second < qm_after_first assert qm_after_third < qm_after_second assert int(third.metadata["duplicate_submission_count"]) >= 2