File size: 5,593 Bytes
36a2dfc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import pytest

from osint_core.policy import (
    ALLOWED_CORRECTION_VERBS,
    PolicyDecision,
    PolicyErrorCode,
    PolicyViolationException,
    allowed_ui_modules,
    canonicalize_module_name,
    enforce_audit_payload,
    enforce_correction_verb,
    enforce_policy_mutation_gate,
    evaluate_modules,
    get_module_policy,
    may_mutate_policy,
    module_catalog,
)


def test_canonicalize_module_aliases():
    assert canonicalize_module_name("Resource Links") == "resource_links"
    assert canonicalize_module_name("robots.txt") == "robots_txt"
    assert canonicalize_module_name("nmap") == "port_scan"


def test_low_risk_modules_allowed_in_passive_mode():
    result = evaluate_modules(
        ["Resource Links", "DNS Records", "Local URL Parse"],
        authorized_target=False,
        passive_only=True,
    )

    assert result.decision == PolicyDecision.ALLOW
    assert result.allowed_modules == [
        "resource_links",
        "dns_records",
        "local_url_parse",
    ]
    assert result.blocked_modules == []
    assert result.violations == []


def test_conditional_module_blocked_without_authorization():
    result = evaluate_modules(
        ["HTTP Headers"],
        authorized_target=False,
        passive_only=False,
    )

    assert result.decision == PolicyDecision.CONSTRAIN
    assert result.allowed_modules == []
    assert result.blocked_modules == ["http_headers"]
    assert result.violations[0].code == PolicyErrorCode.AUTHORIZATION_REQUIRED


def test_conditional_module_blocked_in_passive_only_even_with_authorization():
    result = evaluate_modules(
        ["HTTP Headers"],
        authorized_target=True,
        passive_only=True,
    )

    assert result.decision == PolicyDecision.CONSTRAIN
    assert result.allowed_modules == []
    assert result.blocked_modules == ["http_headers"]
    assert result.violations[0].code == PolicyErrorCode.AUTHORIZATION_REQUIRED


def test_conditional_module_allowed_when_authorized_and_not_passive_only():
    result = evaluate_modules(
        ["HTTP Headers", "Robots.txt"],
        authorized_target=True,
        passive_only=False,
    )

    assert result.decision == PolicyDecision.ALLOW
    assert result.allowed_modules == ["http_headers", "robots_txt"]
    assert result.blocked_modules == []


def test_forbidden_module_is_always_blocked():
    result = evaluate_modules(
        ["Resource Links", "nmap", "Credential Testing"],
        authorized_target=True,
        passive_only=False,
    )

    assert result.decision == PolicyDecision.CONSTRAIN
    assert "resource_links" in result.allowed_modules
    assert "port_scan" in result.blocked_modules
    assert "credential_testing" in result.blocked_modules
    assert {v.code for v in result.violations} == {PolicyErrorCode.FORBIDDEN_MODULE}


def test_unknown_module_blocked_by_default():
    result = evaluate_modules(["Unknown Thing"])

    assert result.decision == PolicyDecision.CONSTRAIN
    assert result.allowed_modules == []
    assert result.blocked_modules == ["unknown_thing"]
    assert result.violations[0].code == PolicyErrorCode.UNKNOWN_MODULE


def test_unknown_module_can_be_allowed_only_when_explicitly_enabled():
    result = evaluate_modules(["Experimental"], allow_unknown_modules=True)

    assert result.decision == PolicyDecision.ALLOW
    assert result.allowed_modules == ["experimental"]
    assert result.blocked_modules == []


def test_correction_verbs_are_closed():
    assert set(ALLOWED_CORRECTION_VERBS) == {"ADAPT", "CONSTRAIN", "REVERT", "OBSERVE"}
    assert enforce_correction_verb("adapt") == "ADAPT"
    assert enforce_correction_verb(" OBSERVE ") == "OBSERVE"

    with pytest.raises(PolicyViolationException) as exc:
        enforce_correction_verb("EXPAND")

    assert exc.value.violation.code == PolicyErrorCode.INVALID_CORRECTION_VERB


def test_policy_cannot_mutate_without_out_of_band_approval():
    assert may_mutate_policy(out_of_band_approval=False) is False
    assert may_mutate_policy(out_of_band_approval=True) is True

    with pytest.raises(PolicyViolationException) as exc:
        enforce_policy_mutation_gate(out_of_band_approval=False)

    assert exc.value.violation.code == PolicyErrorCode.POLICY_MUTATION_BLOCKED

    enforce_policy_mutation_gate(out_of_band_approval=True)


def test_audit_payload_blocks_raw_indicator_fields():
    safe_payload = {
        "run_id": "run_123",
        "indicator_hash": "abc123",
        "modules": ["resource_links"],
    }
    enforce_audit_payload(safe_payload)

    with pytest.raises(PolicyViolationException) as exc:
        enforce_audit_payload({"raw_indicator": "example.com", "indicator_hash": "abc"})

    assert exc.value.violation.code == PolicyErrorCode.RAW_LOGGING_BLOCKED


def test_catalog_and_ui_modules_exclude_forbidden_by_default():
    catalog = module_catalog()
    names = {item["canonical_name"] for item in catalog}
    assert "resource_links" in names
    assert "port_scan" in names

    ui_modules = allowed_ui_modules()
    assert "Resource Links" in ui_modules
    assert "HTTP Headers" in ui_modules
    assert "Port Scan" not in ui_modules
    assert "Credential Testing" not in ui_modules

    passive_ui_modules = allowed_ui_modules(include_conditional=False)
    assert "Resource Links" in passive_ui_modules
    assert "HTTP Headers" not in passive_ui_modules


def test_get_module_policy_returns_registered_policy():
    policy = get_module_policy("Resource Links")
    assert policy is not None
    assert policy.canonical_name == "resource_links"
    assert policy.risk == "low"