""" tests/test_intent.py ==================== Contract tests for osint_core.intent. Core invariants: - Intent packets are immutable. - Intent packets do not store raw indicators. - Scope boundaries are explicit and validated. - Forbidden operations cannot appear in allowed operations. - Packets can be signed and verified. - Signature tampering is detected. - Risk and rollback helpers are deterministic. """ from __future__ import annotations from dataclasses import FrozenInstanceError, replace import pytest from osint_core.intent import ( DEFAULT_FORBIDDEN_OPERATIONS, IntentErrorCode, IntentPacket, IntentValidationError, canonical_json, create_intent_packet, default_rollback_for_risk, derive_risk_label, find_raw_indicator_fields, hash_manifest_payload, intent_fingerprint, make_scope, risk_score, sign_payload, unsigned_intent_fingerprint, validate_intent, validate_scope, verify_intent_signature, ) TEST_SECRET = "test-intent-signing-secret" TARGET_HASH = "a" * 64 MANIFEST_HASH = "b" * 64 def make_valid_scope(**overrides): data = { "target_hash": TARGET_HASH, "indicator_type": "domain", "allowed_operations": ["resource_links"], "success_criteria": ["links_generated"], } data.update(overrides) return make_scope(**data) def make_valid_packet(**overrides): scope = overrides.pop("scope", make_valid_scope()) data = { "action": "enrich_indicator", "purpose": "Generate passive OSINT source links for a validated indicator.", "scope": scope, "requested_modules": ["resource_links"], "expected_side_effects": ["report_created", "audit_event_created"], "rollback_strategy": "observe_only", "risk_label": "low", "manifest_hash": MANIFEST_HASH, "signing_secret": TEST_SECRET, } data.update(overrides) return create_intent_packet(**data) def test_make_scope_adds_default_forbidden_operations(): scope = make_valid_scope() for operation in DEFAULT_FORBIDDEN_OPERATIONS: assert operation in scope.forbidden_operations assert scope.target_hash == TARGET_HASH assert scope.indicator_type == "domain" assert scope.allowed_operations == ("resource_links",) def test_scope_rejects_missing_target_hash(): result = validate_scope(make_valid_scope(target_hash="c" * 64)) assert result.ok is True with pytest.raises(IntentValidationError) as exc: make_valid_scope(target_hash="") assert exc.value.code == IntentErrorCode.MISSING_FIELD def test_scope_rejects_non_hash_target_identity(): with pytest.raises(IntentValidationError) as exc: make_valid_scope(target_hash="example.com") assert exc.value.code == IntentErrorCode.INVALID_SCOPE def test_scope_rejects_empty_allowed_operations(): with pytest.raises(IntentValidationError) as exc: make_valid_scope(allowed_operations=[]) assert exc.value.code == IntentErrorCode.MISSING_FIELD def test_scope_rejects_forbidden_operation_overlap(): with pytest.raises(IntentValidationError) as exc: make_valid_scope(allowed_operations=["resource_links", "port_scan"]) assert exc.value.code == IntentErrorCode.FORBIDDEN_OPERATION_REQUESTED def test_scope_rejects_invalid_time_horizon(): with pytest.raises(IntentValidationError) as exc: make_valid_scope(time_horizon_seconds=0) assert exc.value.code == IntentErrorCode.INVALID_SCOPE with pytest.raises(IntentValidationError) as exc: make_valid_scope(time_horizon_seconds=90_000) assert exc.value.code == IntentErrorCode.INVALID_SCOPE def test_create_intent_packet_signs_and_verifies(): packet = make_valid_packet() assert isinstance(packet, IntentPacket) assert packet.signature is not None assert verify_intent_signature(packet, secret=TEST_SECRET) is True def test_intent_packet_is_immutable(): packet = make_valid_packet() with pytest.raises(FrozenInstanceError): packet.purpose = "mutated" # type: ignore[misc] def test_unsigned_payload_excludes_signature(): packet = make_valid_packet() payload = packet.unsigned_payload() assert "signature" not in payload assert packet.signature is not None def test_signature_tampering_is_detected(): packet = make_valid_packet() tampered = replace(packet, purpose="Changed purpose after signing.") with pytest.raises(IntentValidationError) as exc: verify_intent_signature(tampered, secret=TEST_SECRET) assert exc.value.code == IntentErrorCode.SIGNATURE_MISMATCH def test_unsigned_packet_fails_verification(): packet = create_intent_packet( action="enrich_indicator", purpose="Generate passive links.", scope=make_valid_scope(), requested_modules=["resource_links"], expected_side_effects=["report_created"], rollback_strategy="observe_only", risk_label="low", manifest_hash=MANIFEST_HASH, sign=False, ) assert packet.signature is None with pytest.raises(IntentValidationError) as exc: verify_intent_signature(packet, secret=TEST_SECRET) assert exc.value.code == IntentErrorCode.UNSIGNED_PACKET def test_packet_rejects_invalid_action(): with pytest.raises(IntentValidationError) as exc: make_valid_packet(action="delete_everything") # type: ignore[arg-type] assert exc.value.code == IntentErrorCode.INVALID_ACTION def test_packet_rejects_invalid_risk_label(): with pytest.raises(IntentValidationError) as exc: make_valid_packet(risk_label="extreme") # type: ignore[arg-type] assert exc.value.code == IntentErrorCode.INVALID_RISK def test_packet_rejects_invalid_rollback_strategy(): with pytest.raises(IntentValidationError) as exc: make_valid_packet(rollback_strategy="YOLO") # type: ignore[arg-type] assert exc.value.code == IntentErrorCode.INVALID_ROLLBACK def test_packet_rejects_invalid_manifest_hash(): with pytest.raises(IntentValidationError) as exc: make_valid_packet(manifest_hash="not-a-hash") assert exc.value.code == IntentErrorCode.MISSING_FIELD def test_packet_rejects_empty_purpose(): with pytest.raises(IntentValidationError) as exc: make_valid_packet(purpose=" ") assert exc.value.code == IntentErrorCode.MISSING_FIELD def test_raw_indicator_field_detection(): payload = { "safe": {"target_hash": TARGET_HASH}, "unsafe": { "raw_indicator": "example.com", "nested": {"email": "user@example.com"}, }, } findings = find_raw_indicator_fields(payload) assert "unsafe.raw_indicator" in findings assert "unsafe.nested.email" in findings def test_validate_intent_rejects_raw_indicator_like_fields(): packet = make_valid_packet() unsafe_dict = packet.to_dict() unsafe_dict["raw_indicator"] = "example.com" findings = find_raw_indicator_fields(unsafe_dict) assert "raw_indicator" in findings def test_canonical_json_is_deterministic(): assert canonical_json({"b": 2, "a": 1}) == canonical_json({"a": 1, "b": 2}) def test_sign_payload_is_deterministic_for_same_payload_and_secret(): payload = {"a": 1, "b": 2} assert sign_payload(payload, TEST_SECRET) == sign_payload(payload, TEST_SECRET) assert sign_payload(payload, TEST_SECRET) != sign_payload(payload, "different-secret") def test_hash_manifest_payload_is_stable(): payload = {"artifact": "test", "version": "1.0.0"} assert hash_manifest_payload(payload) == hash_manifest_payload(payload) assert len(hash_manifest_payload(payload)) == 64 def test_intent_fingerprints_are_stable_and_distinct(): packet = make_valid_packet() signed_fp = intent_fingerprint(packet) unsigned_fp = unsigned_intent_fingerprint(packet) assert len(signed_fp) == 64 assert len(unsigned_fp) == 64 assert signed_fp != unsigned_fp def test_validate_intent_accepts_valid_packet(): result = validate_intent(make_valid_packet()) assert result.ok is True assert result.errors == () assert result.error_codes == () def test_risk_score_mapping(): assert risk_score("low") == 0.25 assert risk_score("medium") == 0.5 assert risk_score("high") == 0.75 assert risk_score("critical") == 1.0 def test_default_rollback_for_risk(): assert default_rollback_for_risk("low") == "observe_only" assert default_rollback_for_risk("medium") == "disable_module" assert default_rollback_for_risk("high") == "sandbox" assert default_rollback_for_risk("critical") == "revert" def test_derive_risk_label_for_low_risk_passive_modules(): assert derive_risk_label( requested_modules=["resource_links"], authorized_target=False, ) == "low" def test_derive_risk_label_for_conditional_authorized_modules(): assert derive_risk_label( requested_modules=["http_headers"], authorized_target=True, ) == "medium" def test_derive_risk_label_for_conditional_unauthorized_modules(): assert derive_risk_label( requested_modules=["http_headers"], authorized_target=False, ) == "high" def test_derive_risk_label_for_forbidden_modules(): assert derive_risk_label( requested_modules=["nmap"], authorized_target=True, ) == "critical"