# open-range / tests/test_mutation_policy.py
# Lars Talian
# Make mutation policy weights explicit
# 228ed67
"""Tests for population-guided mutation selection policy."""
from __future__ import annotations
import asyncio
import json
import os
import random
import subprocess
import sys
from pathlib import Path
from types import SimpleNamespace
import pytest
from open_range.builder.mutation_policy import (
MutationPolicySettings,
PopulationMutationPolicy,
load_mutation_policy_settings,
)
from open_range.builder.snapshot_store import SnapshotStore
from open_range.protocols import BuildContext, MutationOp
def test_policy_selects_structural_and_security_when_both_available(sample_snapshot_spec):
    """With both candidate pools populated, the selection draws from each.

    One structural candidate and two security candidates are offered; the
    chosen ops must include the structural op type and at least one of the
    security op types.
    """
    structural_pool = [
        MutationOp(
            mutation_id="add_service_web",
            op_type="add_service",
            target_selector={"host": "web"},
            params={"service": "redis"},
        ),
    ]
    sqli_seed = MutationOp(
        mutation_id="seed_sqli",
        op_type="seed_vuln",
        target_selector={"host": "web"},
        params={"vuln_type": "sqli"},
    )
    benign_noise = MutationOp(
        mutation_id="noise1",
        op_type="add_benign_noise",
        target_selector={"location": "siem:noise.log"},
        params={"location": "siem:noise.log"},
    )

    # Fixed RNG seed keeps the selection reproducible between runs.
    chosen, _, _ = PopulationMutationPolicy().choose_mutations(
        structural_candidates=structural_pool,
        security_candidates=[sqli_seed, benign_noise],
        snapshot=sample_snapshot_spec,
        context=BuildContext(seed=1, tier=1),
        rng=random.Random(7),
    )

    chosen_types = {op.op_type for op in chosen}
    assert "add_service" in chosen_types
    assert chosen_types & {"seed_vuln", "add_benign_noise"}
def test_policy_best_effort_when_only_security_available(sample_snapshot_spec):
    """An empty structural pool still yields exactly one security mutation."""
    sqli_seed = MutationOp(
        mutation_id="seed_sqli",
        op_type="seed_vuln",
        target_selector={"host": "web"},
        params={"vuln_type": "sqli"},
    )
    benign_noise = MutationOp(
        mutation_id="noise1",
        op_type="add_benign_noise",
        target_selector={"location": "siem:noise.log"},
        params={"location": "siem:noise.log"},
    )

    selection = PopulationMutationPolicy().choose_mutations(
        structural_candidates=[],
        security_candidates=[sqli_seed, benign_noise],
        snapshot=sample_snapshot_spec,
        context=BuildContext(seed=1, tier=1),
        rng=random.Random(11),
    )
    chosen, _, _ = selection

    assert len(chosen) == 1
    picked = chosen[0]
    assert picked.op_type in {"seed_vuln", "add_benign_noise"}
def test_policy_best_effort_when_only_structural_available(sample_snapshot_spec):
    """An empty security pool still yields exactly one structural mutation."""
    trust_edge = MutationOp(
        mutation_id="add_trust_edge_1",
        op_type="add_trust_edge",
        target_selector={"source": "alice", "target": "bob"},
        params={"type": "delegation"},
    )
    dependency_edge = MutationOp(
        mutation_id="add_dep_1",
        op_type="add_dependency_edge",
        target_selector={"source": "web", "target": "db"},
        params={},
    )

    selection = PopulationMutationPolicy().choose_mutations(
        structural_candidates=[trust_edge, dependency_edge],
        security_candidates=[],
        snapshot=sample_snapshot_spec,
        context=BuildContext(seed=1, tier=1),
        rng=random.Random(21),
    )
    chosen, _, _ = selection

    assert len(chosen) == 1
    picked = chosen[0]
    assert picked.op_type in {"add_trust_edge", "add_dependency_edge"}
def test_load_policy_settings_from_yaml(tmp_path: Path):
    """Settings loaded from a partial YAML file pick up the overridden values.

    The file sets only the profile name and one weight in each of the
    ``parent`` and ``mutation`` sections; ``structural_gains`` is not
    mentioned in the file and is still expected to expose ``add_service``.
    """
    yaml_lines = (
        "profile_name: tuned_policy",
        "parent:",
        " frontier_weight: 0.5",
        "mutation:",
        " structural_gain_weight: 0.6",
    )
    config_file = tmp_path / "policy.yaml"
    config_file.write_text("\n".join(yaml_lines), encoding="utf-8")

    loaded = load_mutation_policy_settings(config_file)

    # Values taken from the file.
    assert loaded.profile_name == "tuned_policy"
    assert loaded.parent.frontier_weight == 0.5
    assert loaded.mutation.structural_gain_weight == 0.6
    # Value for a section the file leaves out.
    assert loaded.structural_gains.add_service == 1.0
def test_parent_scores_expose_weighted_contributions(sample_snapshot_spec):
    """Parent scores expose weights, signals and contributions that agree.

    Checks, for the single scored parent, that the ``frontier`` weight matches
    the policy settings, that its contribution equals signal times weight,
    and that the total equals the sum of all contributions (within 1e-3
    relative tolerance).
    """
    parent_snapshot = sample_snapshot_spec.model_copy(deep=True)
    parent_snapshot.lineage.root_snapshot_id = "root_a"
    candidate = SimpleNamespace(snapshot_id="snap_a", snapshot=parent_snapshot)
    play_stats = {
        "snap_a": {
            "plays": 2,
            "plays_recent": 1,
            "red_solve_rate": 0.5,
            "blue_detect_rate": 0.25,
        }
    }

    policy = PopulationMutationPolicy()
    parent_scores = policy.score_parents(
        [candidate],
        context=BuildContext(seed=1, tier=1, weak_areas=["sqli"]),
        snapshot_stats=play_stats,
    )
    first = parent_scores[0]

    frontier_weight = first.weights["frontier"]
    assert frontier_weight == pytest.approx(policy.settings.parent.frontier_weight)

    expected_contribution = first.signals["frontier"] * frontier_weight
    assert first.contributions["frontier"] == pytest.approx(
        expected_contribution,
        rel=1e-3,
    )

    contribution_sum = sum(first.contributions.values())
    assert first.total == pytest.approx(contribution_sum, rel=1e-3)
def test_custom_settings_change_candidate_ranking(sample_snapshot_spec):
    """Custom weights and gains decide which candidate ranks first.

    Every mutation weight except ``structural_gain_weight`` is zeroed, and
    ``add_benign_noise`` is given the largest structural gain, so it must
    outrank ``seed_vuln`` and its total must come from the structural-gain
    contribution alone.
    """
    mutation_weights = {
        "curriculum_weight": 0.0,
        "novelty_weight": 0.0,
        "structural_gain_weight": 1.0,
        "lineage_weight": 0.0,
    }
    gain_table = {
        "add_service": 0.2,
        "add_dependency_edge": 0.2,
        "add_trust_edge": 0.2,
        "add_user": 0.2,
        "seed_vuln": 0.1,
        "add_benign_noise": 2.5,
        "default_gain": 0.0,
    }
    custom_settings = MutationPolicySettings(
        profile_name="structural_gain_only",
        mutation=mutation_weights,
        structural_gains=gain_table,
    )

    candidates = [
        MutationOp(
            mutation_id="seed_sqli",
            op_type="seed_vuln",
            target_selector={"host": "web"},
            params={"vuln_type": "sqli"},
        ),
        MutationOp(
            mutation_id="noise_1",
            op_type="add_benign_noise",
            target_selector={"location": "siem:noise.log"},
            params={"location": "siem:noise.log"},
        ),
    ]

    ranking = PopulationMutationPolicy(settings=custom_settings)._rank_candidates(
        candidates,
        snapshot=sample_snapshot_spec,
        context=BuildContext(seed=1, tier=1),
    )
    winner = ranking[0]

    assert winner.op.op_type == "add_benign_noise"
    assert winner.contributions["structural_gain"] == pytest.approx(
        winner.total,
        rel=1e-3,
    )
def test_calibration_script_compares_default_and_custom_settings(
    tmp_path: Path,
    sample_snapshot_spec,
):
    """Run the calibration script end to end and check its JSON report.

    Stores one snapshot, writes stats, build-context and tuned-settings files
    under ``tmp_path``, then runs ``scripts/calibrate_mutation_policy.py`` in
    a subprocess from the repository root. The script must exit with status 0
    and print JSON that reports one snapshot, lists the ``default`` and
    ``tuned`` policies in that order, and ranks ``snap_demo`` as the top
    parent for the default policy.
    """
    # This file lives in <repo>/tests, so parents[1] is the repository root.
    repo_root = Path(__file__).resolve().parents[1]

    store_dir = tmp_path / "snapshots"
    asyncio.run(SnapshotStore(str(store_dir)).store(sample_snapshot_spec, "snap_demo"))

    stats_path = tmp_path / "snapshot_stats.json"
    stats_path.write_text(
        json.dumps(
            {
                "snap_demo": {
                    "plays": 3,
                    "plays_recent": 1,
                    "red_solve_rate": 0.5,
                    "blue_detect_rate": 0.0,
                }
            }
        ),
        encoding="utf-8",
    )

    context_path = tmp_path / "context.json"
    context_path.write_text(
        BuildContext(seed=7, tier=2, weak_areas=["sqli"]).model_dump_json(indent=2),
        encoding="utf-8",
    )

    settings_path = tmp_path / "tuned.json"
    settings_path.write_text(
        MutationPolicySettings(
            profile_name="tuned",
            parent={"frontier_weight": 0.5},
        ).model_dump_json(indent=2),
        encoding="utf-8",
    )

    # Put <repo>/src first on the child's import path, but keep whatever
    # PYTHONPATH the parent process already relies on instead of replacing
    # it. An absolute entry also stays valid if the child changes its
    # working directory.
    python_path = str(repo_root / "src")
    inherited_python_path = os.environ.get("PYTHONPATH")
    if inherited_python_path:
        python_path = os.pathsep.join([python_path, inherited_python_path])

    result = subprocess.run(
        [
            sys.executable,
            "scripts/calibrate_mutation_policy.py",
            "--store-dir",
            str(store_dir),
            "--stats",
            str(stats_path),
            "--context",
            str(context_path),
            "--settings",
            f"tuned={settings_path}",
        ],
        capture_output=True,
        check=False,  # the return code is asserted below so stderr shows on failure
        cwd=repo_root,
        env={**os.environ, "PYTHONPATH": python_path},
        text=True,
    )

    assert result.returncode == 0, result.stderr
    payload = json.loads(result.stdout)
    assert payload["snapshot_count"] == 1
    assert [policy["label"] for policy in payload["policies"]] == ["default", "tuned"]
    assert payload["policies"][0]["top_parents"][0]["snapshot_id"] == "snap_demo"