| import pytest |
|
|
| from osint_core.policy import ( |
| ALLOWED_CORRECTION_VERBS, |
| PolicyDecision, |
| PolicyErrorCode, |
| PolicyViolationException, |
| allowed_ui_modules, |
| canonicalize_module_name, |
| enforce_audit_payload, |
| enforce_correction_verb, |
| enforce_policy_mutation_gate, |
| evaluate_modules, |
| get_module_policy, |
| may_mutate_policy, |
| module_catalog, |
| ) |
|
|
|
|
| def test_canonicalize_module_aliases(): |
| assert canonicalize_module_name("Resource Links") == "resource_links" |
| assert canonicalize_module_name("robots.txt") == "robots_txt" |
| assert canonicalize_module_name("nmap") == "port_scan" |
|
|
|
|
| def test_low_risk_modules_allowed_in_passive_mode(): |
| result = evaluate_modules( |
| ["Resource Links", "DNS Records", "Local URL Parse"], |
| authorized_target=False, |
| passive_only=True, |
| ) |
|
|
| assert result.decision == PolicyDecision.ALLOW |
| assert result.allowed_modules == [ |
| "resource_links", |
| "dns_records", |
| "local_url_parse", |
| ] |
| assert result.blocked_modules == [] |
| assert result.violations == [] |
|
|
|
|
| def test_conditional_module_blocked_without_authorization(): |
| result = evaluate_modules( |
| ["HTTP Headers"], |
| authorized_target=False, |
| passive_only=False, |
| ) |
|
|
| assert result.decision == PolicyDecision.CONSTRAIN |
| assert result.allowed_modules == [] |
| assert result.blocked_modules == ["http_headers"] |
| assert result.violations[0].code == PolicyErrorCode.AUTHORIZATION_REQUIRED |
|
|
|
|
| def test_conditional_module_blocked_in_passive_only_even_with_authorization(): |
| result = evaluate_modules( |
| ["HTTP Headers"], |
| authorized_target=True, |
| passive_only=True, |
| ) |
|
|
| assert result.decision == PolicyDecision.CONSTRAIN |
| assert result.allowed_modules == [] |
| assert result.blocked_modules == ["http_headers"] |
| assert result.violations[0].code == PolicyErrorCode.AUTHORIZATION_REQUIRED |
|
|
|
|
| def test_conditional_module_allowed_when_authorized_and_not_passive_only(): |
| result = evaluate_modules( |
| ["HTTP Headers", "Robots.txt"], |
| authorized_target=True, |
| passive_only=False, |
| ) |
|
|
| assert result.decision == PolicyDecision.ALLOW |
| assert result.allowed_modules == ["http_headers", "robots_txt"] |
| assert result.blocked_modules == [] |
|
|
|
|
| def test_forbidden_module_is_always_blocked(): |
| result = evaluate_modules( |
| ["Resource Links", "nmap", "Credential Testing"], |
| authorized_target=True, |
| passive_only=False, |
| ) |
|
|
| assert result.decision == PolicyDecision.CONSTRAIN |
| assert "resource_links" in result.allowed_modules |
| assert "port_scan" in result.blocked_modules |
| assert "credential_testing" in result.blocked_modules |
| assert {v.code for v in result.violations} == {PolicyErrorCode.FORBIDDEN_MODULE} |
|
|
|
|
| def test_unknown_module_blocked_by_default(): |
| result = evaluate_modules(["Unknown Thing"]) |
|
|
| assert result.decision == PolicyDecision.CONSTRAIN |
| assert result.allowed_modules == [] |
| assert result.blocked_modules == ["unknown_thing"] |
| assert result.violations[0].code == PolicyErrorCode.UNKNOWN_MODULE |
|
|
|
|
| def test_unknown_module_can_be_allowed_only_when_explicitly_enabled(): |
| result = evaluate_modules(["Experimental"], allow_unknown_modules=True) |
|
|
| assert result.decision == PolicyDecision.ALLOW |
| assert result.allowed_modules == ["experimental"] |
| assert result.blocked_modules == [] |
|
|
|
|
| def test_correction_verbs_are_closed(): |
| assert set(ALLOWED_CORRECTION_VERBS) == {"ADAPT", "CONSTRAIN", "REVERT", "OBSERVE"} |
| assert enforce_correction_verb("adapt") == "ADAPT" |
| assert enforce_correction_verb(" OBSERVE ") == "OBSERVE" |
|
|
| with pytest.raises(PolicyViolationException) as exc: |
| enforce_correction_verb("EXPAND") |
|
|
| assert exc.value.violation.code == PolicyErrorCode.INVALID_CORRECTION_VERB |
|
|
|
|
| def test_policy_cannot_mutate_without_out_of_band_approval(): |
| assert may_mutate_policy(out_of_band_approval=False) is False |
| assert may_mutate_policy(out_of_band_approval=True) is True |
|
|
| with pytest.raises(PolicyViolationException) as exc: |
| enforce_policy_mutation_gate(out_of_band_approval=False) |
|
|
| assert exc.value.violation.code == PolicyErrorCode.POLICY_MUTATION_BLOCKED |
|
|
| enforce_policy_mutation_gate(out_of_band_approval=True) |
|
|
|
|
| def test_audit_payload_blocks_raw_indicator_fields(): |
| safe_payload = { |
| "run_id": "run_123", |
| "indicator_hash": "abc123", |
| "modules": ["resource_links"], |
| } |
| enforce_audit_payload(safe_payload) |
|
|
| with pytest.raises(PolicyViolationException) as exc: |
| enforce_audit_payload({"raw_indicator": "example.com", "indicator_hash": "abc"}) |
|
|
| assert exc.value.violation.code == PolicyErrorCode.RAW_LOGGING_BLOCKED |
|
|
|
|
| def test_catalog_and_ui_modules_exclude_forbidden_by_default(): |
| catalog = module_catalog() |
| names = {item["canonical_name"] for item in catalog} |
| assert "resource_links" in names |
| assert "port_scan" in names |
|
|
| ui_modules = allowed_ui_modules() |
| assert "Resource Links" in ui_modules |
| assert "HTTP Headers" in ui_modules |
| assert "Port Scan" not in ui_modules |
| assert "Credential Testing" not in ui_modules |
|
|
| passive_ui_modules = allowed_ui_modules(include_conditional=False) |
| assert "Resource Links" in passive_ui_modules |
| assert "HTTP Headers" not in passive_ui_modules |
|
|
|
|
| def test_get_module_policy_returns_registered_policy(): |
| policy = get_module_policy("Resource Links") |
| assert policy is not None |
| assert policy.canonical_name == "resource_links" |
| assert policy.risk == "low" |
|
|