""" osint_core/drift.py Purpose: Convert execution telemetry into a drift vector. Principles: - Drift is a vector, not a scalar. - Detection does not mutate state. - Correction is separate. - Policy drift outranks statistical drift. - Adversarial drift constrains before adapting. """ DEFINE DriftType: statistical behavioral structural adversarial operational policy DEFINE DriftVector: statistical: float 0.0 to 1.0 behavioral: float 0.0 to 1.0 structural: float 0.0 to 1.0 adversarial: float 0.0 to 1.0 operational: float 0.0 to 1.0 policy: float 0.0 to 1.0 DEFINE DriftSignal: name drift_type score reason tier evidence DEFINE DriftAssessment: drift_vector signals dominant_type recommended_correction confidence FUNCTION assess_drift(telemetry, baseline, manifest, policy_result): signals = [] signals += check_policy_drift(policy_result) signals += check_adversarial_drift(telemetry) signals += check_operational_drift(telemetry, baseline) signals += check_structural_drift(telemetry, manifest) signals += check_behavioral_drift(telemetry, baseline) signals += check_statistical_drift(telemetry, baseline) drift_vector = aggregate_signals(signals) dominant_type = choose_dominant_drift_type(drift_vector) correction = recommend_correction(drift_vector, signals) confidence = estimate_confidence(signals) RETURN DriftAssessment( drift_vector=drift_vector, signals=signals, dominant_type=dominant_type, recommended_correction=correction, confidence=confidence ) FUNCTION check_adversarial_drift(telemetry): suspicious_patterns = [ "../", "%2e%2e", " baseline.runtime_p95 * 2: ADD signal( type=operational, score=0.5, tier=T3, reason="Runtime exceeded expected boundary" ) IF error_count > baseline.error_rate_threshold: ADD signal( type=operational, score=0.6, tier=T3, reason="Error rate exceeded baseline" ) IF timeout_count increased: ADD signal( type=operational, score=0.4, tier=T3, reason="Timeout rate elevated" ) RETURN signals FUNCTION check_structural_drift(telemetry, manifest): IF telemetry.manifest_hash != manifest.hash: RETURN signal( type=structural, score=1.0, tier=T1, reason="Execution manifest mismatch" ) IF dependency_hash changed without approved manifest: RETURN signal( type=structural, score=0.9, tier=T1, reason="Dependency graph changed" ) IF runtime_python_version changed: RETURN signal( type=structural, score=0.6, tier=T2, reason="Runtime version changed" ) RETURN [] FUNCTION check_behavioral_drift(telemetry, baseline): IF same_input_hash existed before: previous_output_hash = baseline.output_hash_for(input_hash) IF current_output_hash != previous_output_hash: ADD signal( type=behavioral, score=0.9, tier=T1, reason="Same input produced different output" ) IF output_schema_invalid: ADD signal( type=behavioral, score=0.8, tier=T1, reason="Output schema invalid" ) RETURN signals FUNCTION check_statistical_drift(telemetry, baseline): IF input_type_distribution changed: ADD signal( type=statistical, score=0.4, tier=T4, reason="Input type distribution shifted" ) IF module_usage_distribution changed: ADD signal( type=statistical, score=0.3, tier=T4, reason="Module usage distribution shifted" ) IF average_input_entropy changed: ADD signal( type=statistical, score=0.4, tier=T4, reason="Input entropy shifted" ) RETURN signals FUNCTION aggregate_signals(signals): vector = DriftVector(all zeros) FOR each drift_type: matching = signals where signal.type == drift_type IF no matching: vector[drift_type] = 0.0 ELSE: vector[drift_type] = max(signal.score for matching) RETURN vector FUNCTION choose_dominant_drift_type(vector): priority_order = [ policy, structural, behavioral, adversarial, operational, statistical ] FOR type IN priority_order: IF vector[type] > 0: RETURN type RETURN none FUNCTION recommend_correction(vector, signals): IF vector.policy >= 0.6: RETURN REVERT IF vector.structural >= 0.5: RETURN REVERT IF vector.behavioral >= 0.7: RETURN REVERT IF vector.adversarial >= 0.3: RETURN CONSTRAIN IF vector.operational >= 0.7: RETURN CONSTRAIN IF vector.statistical >= 0.5: RETURN ADAPT RETURN OBSERVE FUNCTION passes_noise_filter(signal, history): IF signal.tier == T1: RETURN True persistence = signal appears N times across M windows ensemble_agreement = at least two detectors agree causal_hypothesis = signal.reason is not empty IF persistence AND ensemble_agreement AND causal_hypothesis: RETURN True RETURN False