File size: 5,593 Bytes
36a2dfc | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 | import pytest
from osint_core.policy import (
ALLOWED_CORRECTION_VERBS,
PolicyDecision,
PolicyErrorCode,
PolicyViolationException,
allowed_ui_modules,
canonicalize_module_name,
enforce_audit_payload,
enforce_correction_verb,
enforce_policy_mutation_gate,
evaluate_modules,
get_module_policy,
may_mutate_policy,
module_catalog,
)
def test_canonicalize_module_aliases():
assert canonicalize_module_name("Resource Links") == "resource_links"
assert canonicalize_module_name("robots.txt") == "robots_txt"
assert canonicalize_module_name("nmap") == "port_scan"
def test_low_risk_modules_allowed_in_passive_mode():
result = evaluate_modules(
["Resource Links", "DNS Records", "Local URL Parse"],
authorized_target=False,
passive_only=True,
)
assert result.decision == PolicyDecision.ALLOW
assert result.allowed_modules == [
"resource_links",
"dns_records",
"local_url_parse",
]
assert result.blocked_modules == []
assert result.violations == []
def test_conditional_module_blocked_without_authorization():
result = evaluate_modules(
["HTTP Headers"],
authorized_target=False,
passive_only=False,
)
assert result.decision == PolicyDecision.CONSTRAIN
assert result.allowed_modules == []
assert result.blocked_modules == ["http_headers"]
assert result.violations[0].code == PolicyErrorCode.AUTHORIZATION_REQUIRED
def test_conditional_module_blocked_in_passive_only_even_with_authorization():
result = evaluate_modules(
["HTTP Headers"],
authorized_target=True,
passive_only=True,
)
assert result.decision == PolicyDecision.CONSTRAIN
assert result.allowed_modules == []
assert result.blocked_modules == ["http_headers"]
assert result.violations[0].code == PolicyErrorCode.AUTHORIZATION_REQUIRED
def test_conditional_module_allowed_when_authorized_and_not_passive_only():
result = evaluate_modules(
["HTTP Headers", "Robots.txt"],
authorized_target=True,
passive_only=False,
)
assert result.decision == PolicyDecision.ALLOW
assert result.allowed_modules == ["http_headers", "robots_txt"]
assert result.blocked_modules == []
def test_forbidden_module_is_always_blocked():
result = evaluate_modules(
["Resource Links", "nmap", "Credential Testing"],
authorized_target=True,
passive_only=False,
)
assert result.decision == PolicyDecision.CONSTRAIN
assert "resource_links" in result.allowed_modules
assert "port_scan" in result.blocked_modules
assert "credential_testing" in result.blocked_modules
assert {v.code for v in result.violations} == {PolicyErrorCode.FORBIDDEN_MODULE}
def test_unknown_module_blocked_by_default():
result = evaluate_modules(["Unknown Thing"])
assert result.decision == PolicyDecision.CONSTRAIN
assert result.allowed_modules == []
assert result.blocked_modules == ["unknown_thing"]
assert result.violations[0].code == PolicyErrorCode.UNKNOWN_MODULE
def test_unknown_module_can_be_allowed_only_when_explicitly_enabled():
result = evaluate_modules(["Experimental"], allow_unknown_modules=True)
assert result.decision == PolicyDecision.ALLOW
assert result.allowed_modules == ["experimental"]
assert result.blocked_modules == []
def test_correction_verbs_are_closed():
assert set(ALLOWED_CORRECTION_VERBS) == {"ADAPT", "CONSTRAIN", "REVERT", "OBSERVE"}
assert enforce_correction_verb("adapt") == "ADAPT"
assert enforce_correction_verb(" OBSERVE ") == "OBSERVE"
with pytest.raises(PolicyViolationException) as exc:
enforce_correction_verb("EXPAND")
assert exc.value.violation.code == PolicyErrorCode.INVALID_CORRECTION_VERB
def test_policy_cannot_mutate_without_out_of_band_approval():
assert may_mutate_policy(out_of_band_approval=False) is False
assert may_mutate_policy(out_of_band_approval=True) is True
with pytest.raises(PolicyViolationException) as exc:
enforce_policy_mutation_gate(out_of_band_approval=False)
assert exc.value.violation.code == PolicyErrorCode.POLICY_MUTATION_BLOCKED
enforce_policy_mutation_gate(out_of_band_approval=True)
def test_audit_payload_blocks_raw_indicator_fields():
safe_payload = {
"run_id": "run_123",
"indicator_hash": "abc123",
"modules": ["resource_links"],
}
enforce_audit_payload(safe_payload)
with pytest.raises(PolicyViolationException) as exc:
enforce_audit_payload({"raw_indicator": "example.com", "indicator_hash": "abc"})
assert exc.value.violation.code == PolicyErrorCode.RAW_LOGGING_BLOCKED
def test_catalog_and_ui_modules_exclude_forbidden_by_default():
catalog = module_catalog()
names = {item["canonical_name"] for item in catalog}
assert "resource_links" in names
assert "port_scan" in names
ui_modules = allowed_ui_modules()
assert "Resource Links" in ui_modules
assert "HTTP Headers" in ui_modules
assert "Port Scan" not in ui_modules
assert "Credential Testing" not in ui_modules
passive_ui_modules = allowed_ui_modules(include_conditional=False)
assert "Resource Links" in passive_ui_modules
assert "HTTP Headers" not in passive_ui_modules
def test_get_module_policy_returns_registered_policy():
policy = get_module_policy("Resource Links")
assert policy is not None
assert policy.canonical_name == "resource_links"
assert policy.risk == "low"
|