""" osint_core.scheduler ==================== Latency-conscious scheduler for the Enterprise Drift-Aware OSINT Control Fabric. The scheduler allocates time, trust, and authority. It does not execute actions. """ from __future__ import annotations from dataclasses import dataclass, field from enum import Enum from typing import Literal RiskLabel = Literal["low", "medium", "high", "critical"] TrustState = Literal["normal", "degraded", "suspicious", "contested", "unsafe"] RouteName = Literal["FAST", "DELIBERATIVE", "CONTAINMENT", "FAIL_CLOSED"] class ScheduleReason(str, Enum): FAST_PATH_AVAILABLE = "fast_path_available" DELIBERATIVE_PATH_AVAILABLE = "deliberative_path_available" DEADLINE_TOO_TIGHT = "deadline_too_tight" TRUST_STATE_DEGRADED = "trust_state_degraded" SHORTCUT_DEBT_TOO_HIGH = "shortcut_debt_too_high" INVARIANT_VIOLATION = "invariant_violation" NO_SAFE_ACTION_FITS = "no_safe_action_fits" MISSING_ROLLBACK = "missing_rollback" LOW_CONFIDENCE = "low_confidence" @dataclass(frozen=True) class DecisionPacket: intent_id: str action: str risk_label: RiskLabel confidence: float reversibility: float deadline_ms: int verification_cost_ms: int execution_cost_ms: int rollback_cost_ms: int expected_utility_decay: float required_checks: tuple[str, ...] rollback_plan: str uncertainty_notes: tuple[str, ...] = field(default_factory=tuple) @dataclass(frozen=True) class ShortcutDebt: reduced_checks: int = 0 cached_policy_uses: int = 0 skipped_deep_analysis: int = 0 emergency_overrides: int = 0 @property def score(self) -> float: return min( 1.0, (self.reduced_checks * 0.10) + (self.cached_policy_uses * 0.05) + (self.skipped_deep_analysis * 0.15) + (self.emergency_overrides * 0.40), ) @dataclass(frozen=True) class SystemState: trust_state: TrustState = "normal" shortcut_debt: ShortcutDebt = field(default_factory=ShortcutDebt) shortcut_debt_limit: float = 0.70 queue_depth: int = 0 hardware_state: TrustState = "normal" @dataclass(frozen=True) class ScheduleDecision: route: RouteName reason: ScheduleReason allowed: bool authority_scale: float required_checks: tuple[str, ...] skipped_checks: tuple[str, ...] notes: tuple[str, ...] = field(default_factory=tuple) INVARIANT_CHECKS: tuple[str, ...] = ( "hash_salt_present", "intent_signature_verified", "scope_validated", "policy_evaluated", "forbidden_modules_blocked", "raw_indicators_excluded", "conditional_modules_authorized", ) ADAPTIVE_CHECKS: tuple[str, ...] = ( "deep_log_correlation", "long_horizon_analysis", "full_counterfactual_simulation", "secondary_model_review", ) def total_required_time_ms(packet: DecisionPacket) -> int: return packet.verification_cost_ms + packet.execution_cost_ms + packet.rollback_cost_ms def fits_deadline(packet: DecisionPacket) -> bool: return total_required_time_ms(packet) <= packet.deadline_ms def has_required_rollback(packet: DecisionPacket) -> bool: if packet.risk_label in {"high", "critical"}: return bool(packet.rollback_plan and packet.rollback_cost_ms > 0) return True def invariant_violations(packet: DecisionPacket) -> tuple[str, ...]: required = set(packet.required_checks) return tuple(check for check in INVARIANT_CHECKS if check not in required) def risk_weight(risk_label: RiskLabel) -> float: RISK_WEIGHTS = {"low": 0.25, "medium": 0.50, "high": 0.75, "critical": 1.00} def risk_weight(risk_label: RiskLabel) -> float: return RISK_WEIGHTS.get(risk_label, 1.00) def safe_utility(packet: DecisionPacket) -> float: time_ratio = min(1.0, total_required_time_ms(packet) / max(packet.deadline_ms, 1)) return max( 0.0, (packet.confidence * 0.40) + (packet.reversibility * 0.30) + ((1.0 - risk_weight(packet.risk_label)) * 0.20) + ((1.0 - time_ratio) * 0.10) - (packet.expected_utility_decay * 0.10), ) def schedule_decision(packet: DecisionPacket, state: SystemState | None = None) -> ScheduleDecision: state = state or SystemState() missing_invariants = invariant_violations(packet) if missing_invariants: return ScheduleDecision( route="FAIL_CLOSED", reason=ScheduleReason.INVARIANT_VIOLATION, allowed=False, authority_scale=0.0, required_checks=tuple(packet.required_checks), skipped_checks=missing_invariants, notes=("Invariant checks cannot be skipped under deadline pressure.",), ) if not has_required_rollback(packet): return ScheduleDecision( route="FAIL_CLOSED", reason=ScheduleReason.MISSING_ROLLBACK, allowed=False, authority_scale=0.0, required_checks=tuple(packet.required_checks), skipped_checks=(), notes=("High-impact action requires rollback or containment plan.",), ) if state.shortcut_debt.score >= state.shortcut_debt_limit: return containment_decision(packet, ScheduleReason.SHORTCUT_DEBT_TOO_HIGH, "Shortcut debt exceeded configured limit.") if state.trust_state in {"contested", "unsafe"} or state.hardware_state in {"contested", "unsafe"}: return containment_decision(packet, ScheduleReason.TRUST_STATE_DEGRADED, "Trust or hardware state is contested/unsafe.") if packet.confidence < 0.30 and packet.risk_label in {"high", "critical"}: return containment_decision(packet, ScheduleReason.LOW_CONFIDENCE, "Confidence too low for high-impact decision.") if fits_deadline(packet): if packet.risk_label in {"low", "medium"} and packet.reversibility >= 0.50: return ScheduleDecision( route="FAST", reason=ScheduleReason.FAST_PATH_AVAILABLE, allowed=True, authority_scale=1.0, required_checks=tuple(packet.required_checks), skipped_checks=(), notes=("Low/medium risk action fits available decision window.",), ) return ScheduleDecision( route="DELIBERATIVE", reason=ScheduleReason.DELIBERATIVE_PATH_AVAILABLE, allowed=True, authority_scale=0.75, required_checks=tuple(packet.required_checks), skipped_checks=(), notes=("High-impact or lower-reversibility action fits full verification window.",), ) if packet.reversibility >= 0.75: return containment_decision(packet, ScheduleReason.DEADLINE_TOO_TIGHT, "Full verification/execution/rollback does not fit deadline.") return ScheduleDecision( route="FAIL_CLOSED", reason=ScheduleReason.NO_SAFE_ACTION_FITS, allowed=False, authority_scale=0.0, required_checks=tuple(packet.required_checks), skipped_checks=(), notes=("No safe action fits inside the useful decision window.",), ) def containment_decision(packet: DecisionPacket, reason: ScheduleReason, note: str) -> ScheduleDecision: skipped = tuple(check for check in ADAPTIVE_CHECKS if check in packet.required_checks) effective = tuple(check for check in packet.required_checks if check not in skipped) return ScheduleDecision( route="CONTAINMENT", reason=reason, allowed=True, authority_scale=0.25, required_checks=effective, skipped_checks=skipped, notes=(note, "Authority reduced; prefer reversible, bounded action."), )