| """ |
| tests/test_drift.py |
| =================== |
| |
| Contract tests for osint_core.drift. |
| |
| These tests define the expected behavior of the drift layer before implementation. |
| |
| Core invariants: |
| - Drift is represented as a vector, not a scalar. |
| - Drift detection is pure: it does not mutate baseline, manifest, telemetry, or policy input. |
| - Policy drift outranks all other drift. |
| - Structural and behavioral drift are revert-class. |
| - Adversarial drift constrains before the system adapts. |
| - Statistical drift may adapt only when higher-priority drift classes are absent. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import copy |
| from dataclasses import asdict |
| from typing import Any |
|
|
| import pytest |
|
|
| from osint_core.drift import ( |
| DriftAssessment, |
| DriftSignal, |
| DriftType, |
| DriftVector, |
| TelemetrySnapshot, |
| aggregate_signals, |
| assess_drift, |
| choose_dominant_drift_type, |
| estimate_confidence, |
| recommend_correction, |
| ) |
|
|
|
|
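

# A minimal sketch of the precedence the invariants above describe: the
# highest-priority drift class with any nonzero score dominates, regardless of
# magnitude. This is only an assumption about how osint_core.drift could rank
# classes, not the real implementation; the _SKETCH_*/_sketch_* names are
# hypothetical and unused by the tests. The relative order of OPERATIONAL and
# STATISTICAL is a guess -- no test below distinguishes them.
_SKETCH_PRIORITY: tuple[DriftType, ...] = (
    DriftType.POLICY,       # outranks all other drift
    DriftType.STRUCTURAL,   # revert-class
    DriftType.BEHAVIORAL,   # revert-class
    DriftType.ADVERSARIAL,  # constrains before the system adapts
    DriftType.OPERATIONAL,
    DriftType.STATISTICAL,  # may adapt only when everything above is absent
)


def _sketch_choose_dominant(vector: DriftVector) -> DriftType | None:
    scores = {
        DriftType.POLICY: vector.policy,
        DriftType.STRUCTURAL: vector.structural,
        DriftType.BEHAVIORAL: vector.behavioral,
        DriftType.ADVERSARIAL: vector.adversarial,
        DriftType.OPERATIONAL: vector.operational,
        DriftType.STATISTICAL: vector.statistical,
    }
    # The first class in priority order with a nonzero score wins; an all-zero
    # vector has no dominant type (cf. test_clean_execution_* below).
    for drift_type in _SKETCH_PRIORITY:
        if scores[drift_type] > 0.0:
            return drift_type
    return None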


def make_telemetry(**overrides: Any) -> TelemetrySnapshot:
    """Build a clean-run TelemetrySnapshot; keyword overrides tweak individual fields."""
    data: dict[str, Any] = {
        "run_id": "run_test_001",
        "manifest_hash": "manifest_good",
        "dependency_hash": "deps_good",
        "runtime_python_version": "3.13.0",
        "indicator_hash": "hmac_abc123",
        "indicator_type": "domain",
        "input_rejected": False,
        "rejection_reason": "",
        "sanitized_input_trace": "",
        "modules_requested": ["resource_links"],
        "modules_executed": ["resource_links"],
        "modules_blocked": [],
        "authorized_target": False,
        "duration_ms": 100,
        "error_count": 0,
        "timeout_count": 0,
        "output_hash": "output_good",
        "output_schema_valid": True,
    }
    data.update(overrides)
    return TelemetrySnapshot(**data)


def make_baseline(**overrides: Any) -> dict[str, Any]:
    """Build a baseline that matches the make_telemetry() defaults exactly."""
    data: dict[str, Any] = {
        "runtime_p95_ms": 500,
        "error_rate_threshold": 2,
        "timeout_threshold": 1,
        "expected_manifest_hash": "manifest_good",
        "expected_dependency_hash": "deps_good",
        "expected_runtime_python_version": "3.13.0",
        "known_output_hashes": {
            "hmac_abc123": "output_good",
        },
        "input_type_distribution": {
            "domain": 0.8,
            "username": 0.2,
        },
        "module_usage_distribution": {
            "resource_links": 1.0,
        },
        "input_entropy_avg": 3.2,
    }
    data.update(overrides)
    return data


def make_policy_result(**overrides: Any) -> dict[str, Any]:
    """Build an all-clear policy result; override to simulate violations."""
    data: dict[str, Any] = {
        "decision": "allow",
        "allowed_modules": ["resource_links"],
        "blocked_modules": [],
        "violations": [],
    }
    data.update(overrides)
    return data


@pytest.fixture
def telemetry() -> TelemetrySnapshot:
    return make_telemetry()


@pytest.fixture
def baseline() -> dict[str, Any]:
    return make_baseline()


@pytest.fixture
def policy_result() -> dict[str, Any]:
    return make_policy_result()


def test_drift_vector_defaults_to_zero() -> None:
    vector = DriftVector()

    assert vector.statistical == 0.0
    assert vector.behavioral == 0.0
    assert vector.structural == 0.0
    assert vector.adversarial == 0.0
    assert vector.operational == 0.0
    assert vector.policy == 0.0


def test_aggregate_signals_empty_returns_zero_vector() -> None:
    assert aggregate_signals([]) == DriftVector()


def test_aggregate_signals_uses_max_score_per_type() -> None:
    signals = [
        DriftSignal(
            name="weak_adversarial_signal",
            drift_type=DriftType.ADVERSARIAL,
            score=0.2,
            reason="weak suspicious pattern",
            tier="T2",
            evidence={"pattern": ";"},
        ),
        DriftSignal(
            name="strong_adversarial_signal",
            drift_type=DriftType.ADVERSARIAL,
            score=0.7,
            reason="strong suspicious pattern",
            tier="T2",
            evidence={"pattern": "169.254.169.254"},
        ),
        DriftSignal(
            name="operational_signal",
            drift_type=DriftType.OPERATIONAL,
            score=0.4,
            reason="runtime elevated",
            tier="T3",
            evidence={"duration_ms": 1500},
        ),
    ]

    vector = aggregate_signals(signals)

    assert vector.adversarial == 0.7
    assert vector.operational == 0.4
    assert vector.policy == 0.0


def test_dominant_type_prefers_adversarial_over_statistical() -> None:
    vector = DriftVector(
        statistical=0.9,
        adversarial=0.4,
        policy=0.0,
    )

    assert choose_dominant_drift_type(vector) == DriftType.ADVERSARIAL


def test_dominant_type_prefers_policy_over_all() -> None:
    vector = DriftVector(
        statistical=0.9,
        adversarial=0.4,
        policy=0.6,
    )

    assert choose_dominant_drift_type(vector) == DriftType.POLICY


def test_dominant_type_respects_structural_over_behavioral_over_operational() -> None:
    # Structural outranks behavioral and operational, even at a low score.
    vector = DriftVector(structural=0.1, behavioral=0.9, operational=1.0)
    assert choose_dominant_drift_type(vector) == DriftType.STRUCTURAL

    # Behavioral likewise outranks adversarial and operational.
    vector = DriftVector(behavioral=0.2, adversarial=0.9, operational=1.0)
    assert choose_dominant_drift_type(vector) == DriftType.BEHAVIORAL


@pytest.mark.parametrize(
    ("vector", "expected"),
    [
        (DriftVector(policy=0.6, statistical=1.0, adversarial=0.2), "REVERT"),
        (DriftVector(structural=0.5), "REVERT"),
        (DriftVector(behavioral=0.7), "REVERT"),
        (DriftVector(adversarial=0.3, statistical=0.9), "CONSTRAIN"),
        (DriftVector(statistical=0.5), "ADAPT"),
        (DriftVector(statistical=0.1, operational=0.1), "OBSERVE"),
    ],
    ids=[
        "policy_revert",
        "structural_revert",
        "behavioral_revert",
        "adversarial_constrain",
        "statistical_adapt",
        "default_observe",
    ],
)
def test_recommend_correction(vector: DriftVector, expected: str) -> None:
    assert recommend_correction(vector) == expected
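

# A correction ladder consistent with the six cases above. This is an
# illustrative assumption, not the contract: the 0.3 and 0.5 cutoffs are
# bounded but not uniquely determined by the cases, and the _sketch_* name is
# hypothetical and unused by the tests.
def _sketch_recommend_correction(vector: DriftVector) -> str:
    if max(vector.policy, vector.structural, vector.behavioral) > 0.0:
        return "REVERT"  # any revert-class drift wins outright
    if vector.adversarial >= 0.3:
        return "CONSTRAIN"  # constrain before the system adapts
    if vector.statistical >= 0.5:
        return "ADAPT"  # adapt only when nothing above fired
    return "OBSERVE"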


def test_policy_violation_creates_policy_signal_and_revert_recommendation(
    telemetry: TelemetrySnapshot,
    baseline: dict[str, Any],
) -> None:
    policy_result = make_policy_result(
        decision="constrain",
        blocked_modules=["port_scan"],
        violations=[
            {
                "code": "forbidden_module",
                "message": "Forbidden module blocked: Port Scan",
                "module": "port_scan",
            }
        ],
    )

    assessment = assess_drift(
        telemetry=telemetry,
        baseline=baseline,
        policy_result=policy_result,
    )

    assert isinstance(assessment, DriftAssessment)
    assert assessment.drift_vector.policy == 1.0
    assert assessment.dominant_type == DriftType.POLICY
    assert assessment.recommended_correction == "REVERT"
    assert any(signal.drift_type == DriftType.POLICY for signal in assessment.signals)


def test_authorization_gate_trigger_creates_policy_signal(
    baseline: dict[str, Any],
) -> None:
    telemetry = make_telemetry(
        modules_requested=["http_headers"],
        modules_blocked=["http_headers"],
        authorized_target=False,
    )
    policy_result = make_policy_result(
        decision="constrain",
        blocked_modules=["http_headers"],
        violations=[
            {
                "code": "authorization_required",
                "message": "Authorization required for module: HTTP Headers",
                "module": "http_headers",
            }
        ],
    )

    assessment = assess_drift(
        telemetry=telemetry,
        baseline=baseline,
        policy_result=policy_result,
    )

    assert assessment.drift_vector.policy >= 0.6
    assert assessment.recommended_correction == "REVERT"


def test_adversarial_patterns_create_constrain_recommendation(
    baseline: dict[str, Any],
    policy_result: dict[str, Any],
) -> None:
    telemetry = make_telemetry(
        input_rejected=True,
        rejection_reason="Input contains a blocked pattern.",
        sanitized_input_trace="https://example.com/?next=http://169.254.169.254/latest",
    )

    assessment = assess_drift(
        telemetry=telemetry,
        baseline=baseline,
        policy_result=policy_result,
    )

    assert assessment.drift_vector.adversarial >= 0.7
    assert assessment.dominant_type == DriftType.ADVERSARIAL
    assert assessment.recommended_correction == "CONSTRAIN"


def test_input_rejected_without_trace_does_not_trigger_adversarial_drift(
    baseline: dict[str, Any],
    policy_result: dict[str, Any],
) -> None:
    telemetry = make_telemetry(
        input_rejected=True,
        rejection_reason="",
        sanitized_input_trace="",
    )

    assessment = assess_drift(
        telemetry=telemetry,
        baseline=baseline,
        policy_result=policy_result,
    )

    assert assessment.drift_vector.adversarial == 0.0
    assert not any(s.drift_type == DriftType.ADVERSARIAL for s in assessment.signals)


def test_operational_runtime_drift_detected(
    baseline: dict[str, Any],
    policy_result: dict[str, Any],
) -> None:
    telemetry = make_telemetry(duration_ms=1200)

    assessment = assess_drift(
        telemetry=telemetry,
        baseline=baseline,
        policy_result=policy_result,
    )

    assert assessment.drift_vector.operational >= 0.5
    assert any(signal.name == "runtime_boundary_exceeded" for signal in assessment.signals)


def test_operational_error_drift_detected(
    baseline: dict[str, Any],
    policy_result: dict[str, Any],
) -> None:
    telemetry = make_telemetry(error_count=3)

    assessment = assess_drift(
        telemetry=telemetry,
        baseline=baseline,
        policy_result=policy_result,
    )

    assert assessment.drift_vector.operational >= 0.6
    assert any(signal.name == "error_threshold_exceeded" for signal in assessment.signals)


def test_operational_timeout_drift_detected(
    policy_result: dict[str, Any],
) -> None:
    telemetry = make_telemetry(timeout_count=2)
    baseline = make_baseline(timeout_threshold=1)

    assessment = assess_drift(
        telemetry=telemetry,
        baseline=baseline,
        policy_result=policy_result,
    )

    assert assessment.drift_vector.operational > 0.0
    assert any(signal.name == "timeout_threshold_exceeded" for signal in assessment.signals)


def test_structural_manifest_mismatch_reverts(
    baseline: dict[str, Any],
    policy_result: dict[str, Any],
) -> None:
    telemetry = make_telemetry(manifest_hash="manifest_changed")

    assessment = assess_drift(
        telemetry=telemetry,
        baseline=baseline,
        policy_result=policy_result,
    )

    assert assessment.drift_vector.structural == 1.0
    assert assessment.dominant_type == DriftType.STRUCTURAL
    assert assessment.recommended_correction == "REVERT"


def test_structural_dependency_mismatch_reverts(
    baseline: dict[str, Any],
    policy_result: dict[str, Any],
) -> None:
    telemetry = make_telemetry(dependency_hash="deps_changed")

    assessment = assess_drift(
        telemetry=telemetry,
        baseline=baseline,
        policy_result=policy_result,
    )

    assert assessment.drift_vector.structural >= 0.9
    assert assessment.recommended_correction == "REVERT"


def test_structural_runtime_python_version_mismatch_reverts(
    baseline: dict[str, Any],
    policy_result: dict[str, Any],
) -> None:
    telemetry = make_telemetry(runtime_python_version="3.13.1")

    assessment = assess_drift(
        telemetry=telemetry,
        baseline=baseline,
        policy_result=policy_result,
    )

    assert assessment.drift_vector.structural > 0.0
    assert assessment.recommended_correction == "REVERT"
    assert any(signal.name == "runtime_python_version_changed" for signal in assessment.signals)


def test_behavioral_same_input_different_output_reverts(
    policy_result: dict[str, Any],
) -> None:
    telemetry = make_telemetry(
        indicator_hash="hmac_abc123",
        output_hash="output_changed",
    )
    baseline = make_baseline(
        known_output_hashes={"hmac_abc123": "output_good"},
    )

    assessment = assess_drift(
        telemetry=telemetry,
        baseline=baseline,
        policy_result=policy_result,
    )

    assert assessment.drift_vector.behavioral >= 0.9
    assert assessment.dominant_type == DriftType.BEHAVIORAL
    assert assessment.recommended_correction == "REVERT"


def test_behavioral_invalid_schema_reverts(
    baseline: dict[str, Any],
    policy_result: dict[str, Any],
) -> None:
    telemetry = make_telemetry(output_schema_valid=False)

    assessment = assess_drift(
        telemetry=telemetry,
        baseline=baseline,
        policy_result=policy_result,
    )

    assert assessment.drift_vector.behavioral >= 0.8
    assert assessment.recommended_correction == "REVERT"


def test_statistical_shift_can_adapt_when_no_higher_priority_signal(
    policy_result: dict[str, Any],
) -> None:
    telemetry = make_telemetry(indicator_type="ip")
    baseline = make_baseline(
        input_type_distribution={"domain": 0.9, "username": 0.1},
    )

    assessment = assess_drift(
        telemetry=telemetry,
        baseline=baseline,
        policy_result=policy_result,
    )

    assert assessment.drift_vector.statistical >= 0.5
    assert assessment.dominant_type == DriftType.STATISTICAL
    assert assessment.recommended_correction == "ADAPT"


def test_statistical_module_usage_shift_detected(
    policy_result: dict[str, Any],
) -> None:
    telemetry = make_telemetry(
        modules_executed=["resource_links", "dns_lookup"],
    )
    baseline = make_baseline(
        module_usage_distribution={"resource_links": 1.0},
    )

    assessment = assess_drift(
        telemetry=telemetry,
        baseline=baseline,
        policy_result=policy_result,
    )

    assert assessment.drift_vector.statistical > 0.0
    assert any(signal.name == "module_usage_distribution_shifted" for signal in assessment.signals)


def test_policy_drift_overrides_statistical_adaptation() -> None:
    telemetry = make_telemetry(indicator_type="ip")
    baseline = make_baseline(
        input_type_distribution={"domain": 0.9, "username": 0.1},
    )
    policy_result = make_policy_result(
        decision="constrain",
        blocked_modules=["port_scan"],
        violations=[
            {
                "code": "forbidden_module",
                "message": "Forbidden module blocked",
                "module": "port_scan",
            }
        ],
    )

    assessment = assess_drift(
        telemetry=telemetry,
        baseline=baseline,
        policy_result=policy_result,
    )

    assert assessment.drift_vector.statistical >= 0.5
    assert assessment.drift_vector.policy == 1.0
    assert assessment.dominant_type == DriftType.POLICY
    assert assessment.recommended_correction == "REVERT"


def test_adversarial_drift_overrides_statistical_adaptation(
    policy_result: dict[str, Any],
) -> None:
    telemetry = make_telemetry(
        indicator_type="ip",
        sanitized_input_trace="http://169.254.169.254/latest",
    )
    baseline = make_baseline(
        input_type_distribution={"domain": 0.9, "username": 0.1},
    )

    assessment = assess_drift(
        telemetry=telemetry,
        baseline=baseline,
        policy_result=policy_result,
    )

    assert assessment.drift_vector.statistical >= 0.5
    assert assessment.drift_vector.adversarial >= 0.7
    assert assessment.dominant_type == DriftType.ADVERSARIAL
    assert assessment.recommended_correction == "CONSTRAIN"


def test_estimate_confidence_increases_with_signal_count_and_tier() -> None:
    low_signal = DriftSignal(
        name="weak",
        drift_type=DriftType.STATISTICAL,
        score=0.3,
        reason="weak distribution shift",
        tier="T4",
        evidence={},
    )
    high_signal = DriftSignal(
        name="policy",
        drift_type=DriftType.POLICY,
        score=1.0,
        reason="forbidden module",
        tier="T1",
        evidence={},
    )

    assert estimate_confidence([]) == 0.0
    assert estimate_confidence([high_signal]) > estimate_confidence([low_signal])

    # Adding even a weak signal on top of a strong one strictly raises confidence.
    assert estimate_confidence([low_signal, high_signal]) > estimate_confidence([high_signal])
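

# A toy heuristic consistent with the ordering asserted above: confidence grows
# with signal count and tier severity. The tier weights are invented for
# illustration (the contract fixes only the ordering, not the formula), and the
# sum is deliberately uncapped so that adding a weak signal strictly raises
# confidence. Both _SKETCH_* names are hypothetical and unused by the tests.
_SKETCH_TIER_WEIGHT = {"T1": 1.0, "T2": 0.75, "T3": 0.5, "T4": 0.25}


def _sketch_estimate_confidence(signals: list[DriftSignal]) -> float:
    return sum(_SKETCH_TIER_WEIGHT[s.tier] * s.score for s in signals)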


def test_assess_drift_is_pure_and_does_not_mutate_inputs(
    telemetry: TelemetrySnapshot,
    baseline: dict[str, Any],
    policy_result: dict[str, Any],
) -> None:
    telemetry_before = copy.deepcopy(asdict(telemetry))
    baseline_before = copy.deepcopy(baseline)
    policy_before = copy.deepcopy(policy_result)

    assess_drift(
        telemetry=telemetry,
        baseline=baseline,
        policy_result=policy_result,
    )

    assert asdict(telemetry) == telemetry_before
    assert baseline == baseline_before
    assert policy_result == policy_before


def test_clean_execution_observes_without_significant_drift(
    telemetry: TelemetrySnapshot,
    baseline: dict[str, Any],
    policy_result: dict[str, Any],
) -> None:
    assessment = assess_drift(
        telemetry=telemetry,
        baseline=baseline,
        policy_result=policy_result,
    )

    assert assessment.drift_vector == DriftVector()
    assert assessment.signals == []
    assert assessment.dominant_type is None
    assert assessment.recommended_correction == "OBSERVE"
    assert assessment.confidence == pytest.approx(0.0)