"""Tests for population-guided mutation selection policy."""

from __future__ import annotations

import asyncio
import json
import os
import random
import subprocess
import sys
from pathlib import Path
from types import SimpleNamespace

import pytest

from open_range.builder.mutation_policy import (
    MutationPolicySettings,
    PopulationMutationPolicy,
    load_mutation_policy_settings,
)
from open_range.builder.snapshot_store import SnapshotStore
from open_range.protocols import BuildContext, MutationOp


def _seed_sqli_op() -> MutationOp:
    """Build a fresh ``seed_vuln`` op that seeds SQL injection on the web host."""
    return MutationOp(
        mutation_id="seed_sqli",
        op_type="seed_vuln",
        target_selector={"host": "web"},
        params={"vuln_type": "sqli"},
    )


def _benign_noise_op(mutation_id: str) -> MutationOp:
    """Build a fresh ``add_benign_noise`` op aimed at the SIEM noise log."""
    # Two separate dict literals so selector and params never share an object.
    return MutationOp(
        mutation_id=mutation_id,
        op_type="add_benign_noise",
        target_selector={"location": "siem:noise.log"},
        params={"location": "siem:noise.log"},
    )


def test_policy_selects_structural_and_security_when_both_available(sample_snapshot_spec):
    """With both candidate pools populated, the selection spans both kinds.

    The chosen ops must include the ``add_service`` op and at least one of
    the security-side op types.
    """
    policy = PopulationMutationPolicy()
    structural_pool = [
        MutationOp(
            mutation_id="add_service_web",
            op_type="add_service",
            target_selector={"host": "web"},
            params={"service": "redis"},
        )
    ]
    security_pool = [_seed_sqli_op(), _benign_noise_op("noise1")]

    chosen, _total, _detail = policy.choose_mutations(
        structural_candidates=structural_pool,
        security_candidates=security_pool,
        snapshot=sample_snapshot_spec,
        context=BuildContext(seed=1, tier=1),
        rng=random.Random(7),
    )

    selected_types = {op.op_type for op in chosen}
    assert "add_service" in selected_types
    assert not selected_types.isdisjoint({"seed_vuln", "add_benign_noise"})


def test_policy_best_effort_when_only_security_available(sample_snapshot_spec):
    """With no structural candidates, exactly one security op is returned."""
    policy = PopulationMutationPolicy()
    security_pool = [_seed_sqli_op(), _benign_noise_op("noise1")]

    chosen, _total, _detail = policy.choose_mutations(
        structural_candidates=[],
        security_candidates=security_pool,
        snapshot=sample_snapshot_spec,
        context=BuildContext(seed=1, tier=1),
        rng=random.Random(11),
    )

    assert len(chosen) == 1
    assert chosen[0].op_type in {"seed_vuln", "add_benign_noise"}


def test_policy_best_effort_when_only_structural_available(sample_snapshot_spec):
    """With no security candidates, exactly one structural op is returned."""
    policy = PopulationMutationPolicy()
    trust_edge = MutationOp(
        mutation_id="add_trust_edge_1",
        op_type="add_trust_edge",
        target_selector={"source": "alice", "target": "bob"},
        params={"type": "delegation"},
    )
    dependency_edge = MutationOp(
        mutation_id="add_dep_1",
        op_type="add_dependency_edge",
        target_selector={"source": "web", "target": "db"},
        params={},
    )

    chosen, _total, _detail = policy.choose_mutations(
        structural_candidates=[trust_edge, dependency_edge],
        security_candidates=[],
        snapshot=sample_snapshot_spec,
        context=BuildContext(seed=1, tier=1),
        rng=random.Random(21),
    )

    assert len(chosen) == 1
    assert chosen[0].op_type in {"add_trust_edge", "add_dependency_edge"}


def test_load_policy_settings_from_yaml(tmp_path: Path):
    """Values written to a YAML file are read back by the settings loader.

    ``structural_gains.add_service`` is not set in the file and is expected
    to come back as 1.0 (presumably the built-in default).
    """
    yaml_text = (
        "profile_name: tuned_policy\n"
        "parent:\n"
        " frontier_weight: 0.5\n"
        "mutation:\n"
        " structural_gain_weight: 0.6"
    )
    settings_path = tmp_path / "policy.yaml"
    settings_path.write_text(yaml_text, encoding="utf-8")

    loaded = load_mutation_policy_settings(settings_path)

    assert loaded.profile_name == "tuned_policy"
    assert loaded.parent.frontier_weight == 0.5
    assert loaded.mutation.structural_gain_weight == 0.6
    assert loaded.structural_gains.add_service == 1.0


def test_parent_scores_expose_weighted_contributions(sample_snapshot_spec):
    """A parent score reports weights, signals and their products consistently.

    Checks that the ``frontier`` weight mirrors the policy settings, that the
    ``frontier`` contribution is signal times weight, and that the total is
    the sum of all contributions (each within a relative tolerance of 1e-3).
    """
    policy = PopulationMutationPolicy()
    # Work on a deep copy so the shared fixture object is not mutated.
    snapshot = sample_snapshot_spec.model_copy(deep=True)
    snapshot.lineage.root_snapshot_id = "root_a"
    entry = SimpleNamespace(snapshot_id="snap_a", snapshot=snapshot)
    stats_by_snapshot = {
        "snap_a": {
            "plays": 2,
            "plays_recent": 1,
            "red_solve_rate": 0.5,
            "blue_detect_rate": 0.25,
        }
    }

    parent_scores = policy.score_parents(
        [entry],
        context=BuildContext(seed=1, tier=1, weak_areas=["sqli"]),
        snapshot_stats=stats_by_snapshot,
    )
    score = parent_scores[0]

    assert score.weights["frontier"] == pytest.approx(
        policy.settings.parent.frontier_weight
    )
    expected_frontier = score.signals["frontier"] * score.weights["frontier"]
    assert score.contributions["frontier"] == pytest.approx(
        expected_frontier,
        rel=1e-3,
    )
    assert score.total == pytest.approx(sum(score.contributions.values()), rel=1e-3)


def test_custom_settings_change_candidate_ranking(sample_snapshot_spec):
    """Custom weights and gains reorder the candidate ranking.

    Only the structural-gain weight is non-zero and ``add_benign_noise`` has
    the largest gain, so the noise op must rank first and its
    ``structural_gain`` contribution must match its total score.
    """
    mutation_weights = {
        "curriculum_weight": 0.0,
        "novelty_weight": 0.0,
        "structural_gain_weight": 1.0,
        "lineage_weight": 0.0,
    }
    gain_table = {
        "add_service": 0.2,
        "add_dependency_edge": 0.2,
        "add_trust_edge": 0.2,
        "add_user": 0.2,
        "seed_vuln": 0.1,
        "add_benign_noise": 2.5,
        "default_gain": 0.0,
    }
    settings = MutationPolicySettings(
        profile_name="structural_gain_only",
        mutation=mutation_weights,
        structural_gains=gain_table,
    )
    policy = PopulationMutationPolicy(settings=settings)

    ranked = policy._rank_candidates(
        [_seed_sqli_op(), _benign_noise_op("noise_1")],
        snapshot=sample_snapshot_spec,
        context=BuildContext(seed=1, tier=1),
    )

    top = ranked[0]
    assert top.op.op_type == "add_benign_noise"
    assert top.contributions["structural_gain"] == pytest.approx(
        top.total,
        rel=1e-3,
    )


def test_calibration_script_compares_default_and_custom_settings(
    tmp_path: Path,
    sample_snapshot_spec,
):
    """The calibration script reports the default and a named custom policy.

    Stores one snapshot, writes stats/context/settings files, then runs
    ``scripts/calibrate_mutation_policy.py`` in a subprocess and inspects the
    JSON it prints on stdout.
    """
    store_dir = tmp_path / "snapshots"
    asyncio.run(SnapshotStore(str(store_dir)).store(sample_snapshot_spec, "snap_demo"))

    stats_payload = {
        "snap_demo": {
            "plays": 3,
            "plays_recent": 1,
            "red_solve_rate": 0.5,
            "blue_detect_rate": 0.0,
        }
    }
    stats_path = tmp_path / "snapshot_stats.json"
    stats_path.write_text(json.dumps(stats_payload), encoding="utf-8")

    context_path = tmp_path / "context.json"
    context_path.write_text(
        BuildContext(seed=7, tier=2, weak_areas=["sqli"]).model_dump_json(indent=2),
        encoding="utf-8",
    )

    tuned_settings = MutationPolicySettings(
        profile_name="tuned",
        parent={"frontier_weight": 0.5},
    )
    settings_path = tmp_path / "tuned.json"
    settings_path.write_text(
        tuned_settings.model_dump_json(indent=2),
        encoding="utf-8",
    )

    command = [
        sys.executable,
        "scripts/calibrate_mutation_policy.py",
        "--store-dir",
        str(store_dir),
        "--stats",
        str(stats_path),
        "--context",
        str(context_path),
        "--settings",
        f"tuned={settings_path}",
    ]
    # The script path and PYTHONPATH are relative, so the child runs from the
    # directory one level above this test file's directory.
    result = subprocess.run(
        command,
        capture_output=True,
        check=False,
        cwd=Path(__file__).resolve().parents[1],
        env=dict(os.environ, PYTHONPATH="src"),
        text=True,
    )

    assert result.returncode == 0, result.stderr
    payload = json.loads(result.stdout)
    assert payload["snapshot_count"] == 1
    labels = [entry["label"] for entry in payload["policies"]]
    assert labels == ["default", "tuned"]
    assert payload["policies"][0]["top_parents"][0]["snapshot_id"] == "snap_demo"