| """ |
| tests/test_intent.py |
| ==================== |
| |
| Contract tests for osint_core.intent. |
| |
| Core invariants: |
| - Intent packets are immutable. |
| - Intent packets do not store raw indicators. |
| - Scope boundaries are explicit and validated. |
| - Forbidden operations cannot appear in allowed operations. |
| - Packets can be signed and verified. |
| - Signature tampering is detected. |
| - Risk and rollback helpers are deterministic. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from dataclasses import FrozenInstanceError, replace |
|
|
| import pytest |
|
|
| from osint_core.intent import ( |
| DEFAULT_FORBIDDEN_OPERATIONS, |
| IntentErrorCode, |
| IntentPacket, |
| IntentValidationError, |
| canonical_json, |
| create_intent_packet, |
| default_rollback_for_risk, |
| derive_risk_label, |
| find_raw_indicator_fields, |
| hash_manifest_payload, |
| intent_fingerprint, |
| make_scope, |
| risk_score, |
| sign_payload, |
| unsigned_intent_fingerprint, |
| validate_intent, |
| validate_scope, |
| verify_intent_signature, |
| ) |
|
|
|
|
| TEST_SECRET = "test-intent-signing-secret" |
| TARGET_HASH = "a" * 64 |
| MANIFEST_HASH = "b" * 64 |
|
|
|
|
| def make_valid_scope(**overrides): |
| data = { |
| "target_hash": TARGET_HASH, |
| "indicator_type": "domain", |
| "allowed_operations": ["resource_links"], |
| "success_criteria": ["links_generated"], |
| } |
| data.update(overrides) |
| return make_scope(**data) |
|
|
|
|
| def make_valid_packet(**overrides): |
| scope = overrides.pop("scope", make_valid_scope()) |
| data = { |
| "action": "enrich_indicator", |
| "purpose": "Generate passive OSINT source links for a validated indicator.", |
| "scope": scope, |
| "requested_modules": ["resource_links"], |
| "expected_side_effects": ["report_created", "audit_event_created"], |
| "rollback_strategy": "observe_only", |
| "risk_label": "low", |
| "manifest_hash": MANIFEST_HASH, |
| "signing_secret": TEST_SECRET, |
| } |
| data.update(overrides) |
| return create_intent_packet(**data) |
|
|
|
|
| def test_make_scope_adds_default_forbidden_operations(): |
| scope = make_valid_scope() |
| for operation in DEFAULT_FORBIDDEN_OPERATIONS: |
| assert operation in scope.forbidden_operations |
| assert scope.target_hash == TARGET_HASH |
| assert scope.indicator_type == "domain" |
| assert scope.allowed_operations == ("resource_links",) |
|
|
|
|
| def test_scope_rejects_missing_target_hash(): |
| result = validate_scope(make_valid_scope(target_hash="c" * 64)) |
| assert result.ok is True |
| with pytest.raises(IntentValidationError) as exc: |
| make_valid_scope(target_hash="") |
| assert exc.value.code == IntentErrorCode.MISSING_FIELD |
|
|
|
|
| def test_scope_rejects_non_hash_target_identity(): |
| with pytest.raises(IntentValidationError) as exc: |
| make_valid_scope(target_hash="example.com") |
| assert exc.value.code == IntentErrorCode.INVALID_SCOPE |
|
|
|
|
| def test_scope_rejects_empty_allowed_operations(): |
| with pytest.raises(IntentValidationError) as exc: |
| make_valid_scope(allowed_operations=[]) |
| assert exc.value.code == IntentErrorCode.MISSING_FIELD |
|
|
|
|
| def test_scope_rejects_forbidden_operation_overlap(): |
| with pytest.raises(IntentValidationError) as exc: |
| make_valid_scope(allowed_operations=["resource_links", "port_scan"]) |
| assert exc.value.code == IntentErrorCode.FORBIDDEN_OPERATION_REQUESTED |
|
|
|
|
| def test_scope_rejects_invalid_time_horizon(): |
| with pytest.raises(IntentValidationError) as exc: |
| make_valid_scope(time_horizon_seconds=0) |
| assert exc.value.code == IntentErrorCode.INVALID_SCOPE |
|
|
| with pytest.raises(IntentValidationError) as exc: |
| make_valid_scope(time_horizon_seconds=90_000) |
| assert exc.value.code == IntentErrorCode.INVALID_SCOPE |
|
|
|
|
| def test_create_intent_packet_signs_and_verifies(): |
| packet = make_valid_packet() |
| assert isinstance(packet, IntentPacket) |
| assert packet.signature is not None |
| assert verify_intent_signature(packet, secret=TEST_SECRET) is True |
|
|
|
|
| def test_intent_packet_is_immutable(): |
| packet = make_valid_packet() |
| with pytest.raises(FrozenInstanceError): |
| packet.purpose = "mutated" |
|
|
|
|
| def test_unsigned_payload_excludes_signature(): |
| packet = make_valid_packet() |
| payload = packet.unsigned_payload() |
| assert "signature" not in payload |
| assert packet.signature is not None |
|
|
|
|
| def test_signature_tampering_is_detected(): |
| packet = make_valid_packet() |
| tampered = replace(packet, purpose="Changed purpose after signing.") |
| with pytest.raises(IntentValidationError) as exc: |
| verify_intent_signature(tampered, secret=TEST_SECRET) |
| assert exc.value.code == IntentErrorCode.SIGNATURE_MISMATCH |
|
|
|
|
| def test_unsigned_packet_fails_verification(): |
| packet = create_intent_packet( |
| action="enrich_indicator", |
| purpose="Generate passive links.", |
| scope=make_valid_scope(), |
| requested_modules=["resource_links"], |
| expected_side_effects=["report_created"], |
| rollback_strategy="observe_only", |
| risk_label="low", |
| manifest_hash=MANIFEST_HASH, |
| sign=False, |
| ) |
| assert packet.signature is None |
| with pytest.raises(IntentValidationError) as exc: |
| verify_intent_signature(packet, secret=TEST_SECRET) |
| assert exc.value.code == IntentErrorCode.UNSIGNED_PACKET |
|
|
|
|
| def test_packet_rejects_invalid_action(): |
| with pytest.raises(IntentValidationError) as exc: |
| make_valid_packet(action="delete_everything") |
| assert exc.value.code == IntentErrorCode.INVALID_ACTION |
|
|
|
|
| def test_packet_rejects_invalid_risk_label(): |
| with pytest.raises(IntentValidationError) as exc: |
| make_valid_packet(risk_label="extreme") |
| assert exc.value.code == IntentErrorCode.INVALID_RISK |
|
|
|
|
| def test_packet_rejects_invalid_rollback_strategy(): |
| with pytest.raises(IntentValidationError) as exc: |
| make_valid_packet(rollback_strategy="YOLO") |
| assert exc.value.code == IntentErrorCode.INVALID_ROLLBACK |
|
|
|
|
| def test_packet_rejects_invalid_manifest_hash(): |
| with pytest.raises(IntentValidationError) as exc: |
| make_valid_packet(manifest_hash="not-a-hash") |
| assert exc.value.code == IntentErrorCode.MISSING_FIELD |
|
|
|
|
| def test_packet_rejects_empty_purpose(): |
| with pytest.raises(IntentValidationError) as exc: |
| make_valid_packet(purpose=" ") |
| assert exc.value.code == IntentErrorCode.MISSING_FIELD |
|
|
|
|
| def test_raw_indicator_field_detection(): |
| payload = { |
| "safe": {"target_hash": TARGET_HASH}, |
| "unsafe": { |
| "raw_indicator": "example.com", |
| "nested": {"email": "user@example.com"}, |
| }, |
| } |
| findings = find_raw_indicator_fields(payload) |
| assert "unsafe.raw_indicator" in findings |
| assert "unsafe.nested.email" in findings |
|
|
|
|
| def test_validate_intent_rejects_raw_indicator_like_fields(): |
| packet = make_valid_packet() |
| unsafe_dict = packet.to_dict() |
| unsafe_dict["raw_indicator"] = "example.com" |
| findings = find_raw_indicator_fields(unsafe_dict) |
| assert "raw_indicator" in findings |
|
|
|
|
| def test_canonical_json_is_deterministic(): |
| assert canonical_json({"b": 2, "a": 1}) == canonical_json({"a": 1, "b": 2}) |
|
|
|
|
| def test_sign_payload_is_deterministic_for_same_payload_and_secret(): |
| payload = {"a": 1, "b": 2} |
| assert sign_payload(payload, TEST_SECRET) == sign_payload(payload, TEST_SECRET) |
| assert sign_payload(payload, TEST_SECRET) != sign_payload(payload, "different-secret") |
|
|
|
|
| def test_hash_manifest_payload_is_stable(): |
| payload = {"artifact": "test", "version": "1.0.0"} |
| assert hash_manifest_payload(payload) == hash_manifest_payload(payload) |
| assert len(hash_manifest_payload(payload)) == 64 |
|
|
|
|
| def test_intent_fingerprints_are_stable_and_distinct(): |
| packet = make_valid_packet() |
| signed_fp = intent_fingerprint(packet) |
| unsigned_fp = unsigned_intent_fingerprint(packet) |
| assert len(signed_fp) == 64 |
| assert len(unsigned_fp) == 64 |
| assert signed_fp != unsigned_fp |
|
|
|
|
| def test_validate_intent_accepts_valid_packet(): |
| result = validate_intent(make_valid_packet()) |
| assert result.ok is True |
| assert result.errors == () |
| assert result.error_codes == () |
|
|
|
|
| def test_risk_score_mapping(): |
| assert risk_score("low") == 0.25 |
| assert risk_score("medium") == 0.5 |
| assert risk_score("high") == 0.75 |
| assert risk_score("critical") == 1.0 |
|
|
|
|
| def test_default_rollback_for_risk(): |
| assert default_rollback_for_risk("low") == "observe_only" |
| assert default_rollback_for_risk("medium") == "disable_module" |
| assert default_rollback_for_risk("high") == "sandbox" |
| assert default_rollback_for_risk("critical") == "revert" |
|
|
|
|
| def test_derive_risk_label_for_low_risk_passive_modules(): |
| assert derive_risk_label( |
| requested_modules=["resource_links"], |
| authorized_target=False, |
| ) == "low" |
|
|
|
|
| def test_derive_risk_label_for_conditional_authorized_modules(): |
| assert derive_risk_label( |
| requested_modules=["http_headers"], |
| authorized_target=True, |
| ) == "medium" |
|
|
|
|
| def test_derive_risk_label_for_conditional_unauthorized_modules(): |
| assert derive_risk_label( |
| requested_modules=["http_headers"], |
| authorized_target=False, |
| ) == "high" |
|
|
|
|
| def test_derive_risk_label_for_forbidden_modules(): |
| assert derive_risk_label( |
| requested_modules=["nmap"], |
| authorized_target=True, |
| ) == "critical" |
|
|