"""
osint_core.scheduler
====================

Latency-conscious scheduler for the Enterprise Drift-Aware OSINT Control Fabric.

The scheduler allocates time, trust, and authority. It does not execute actions.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from typing import Literal


# Closed string vocabularies used in the annotations of the scheduler's types.
RiskLabel = Literal["low", "medium", "high", "critical"]
TrustState = Literal["normal", "degraded", "suspicious", "contested", "unsafe"]
RouteName = Literal["FAST", "DELIBERATIVE", "CONTAINMENT", "FAIL_CLOSED"]


class ScheduleReason(str, Enum):
    """Machine-readable reason recorded on a scheduling decision.

    Members also subclass ``str``, so each member compares equal to its
    literal string value.
    """

    FAST_PATH_AVAILABLE = "fast_path_available"
    DELIBERATIVE_PATH_AVAILABLE = "deliberative_path_available"
    DEADLINE_TOO_TIGHT = "deadline_too_tight"
    TRUST_STATE_DEGRADED = "trust_state_degraded"
    SHORTCUT_DEBT_TOO_HIGH = "shortcut_debt_too_high"
    INVARIANT_VIOLATION = "invariant_violation"
    NO_SAFE_ACTION_FITS = "no_safe_action_fits"
    MISSING_ROLLBACK = "missing_rollback"
    LOW_CONFIDENCE = "low_confidence"


@dataclass(frozen=True)
class DecisionPacket:
    """Immutable description of one proposed action handed to the scheduler."""

    # Carried on the packet; not read by the routing functions visible in this module.
    intent_id: str
    action: str
    # Drives the rollback requirement, the low-confidence gate and FAST eligibility.
    risk_label: RiskLabel
    # Below 0.30 on a high/critical packet, schedule_decision routes to containment.
    confidence: float
    # Compared against 0.50 (FAST eligibility) and 0.75 (containment on a missed deadline).
    reversibility: float
    # Budget the three cost fields below must fit into (presumably milliseconds,
    # judging by the ``_ms`` suffix).
    deadline_ms: int
    verification_cost_ms: int
    execution_cost_ms: int
    rollback_cost_ms: int
    # Subtracted, weighted by 0.10, inside safe_utility.
    expected_utility_decay: float
    # Must contain every name in INVARIANT_CHECKS or scheduling fails closed.
    required_checks: tuple[str, ...]
    # Must be non-empty for high/critical packets (see has_required_rollback).
    rollback_plan: str
    # Not read by the routing functions visible in this module.
    uncertainty_notes: tuple[str, ...] = ()


@dataclass(frozen=True)
class ShortcutDebt:
    """Counters of shortcuts taken, reducible to a single capped score."""

    reduced_checks: int = 0
    cached_policy_uses: int = 0
    skipped_deep_analysis: int = 0
    emergency_overrides: int = 0

    @property
    def score(self) -> float:
        """Return the weighted sum of the counters, capped at 1.0.

        Weights per occurrence: reduced check 0.10, cached policy use 0.05,
        skipped deep analysis 0.15, emergency override 0.40.
        """
        weighted_total = (
            (self.reduced_checks * 0.10)
            + (self.cached_policy_uses * 0.05)
            + (self.skipped_deep_analysis * 0.15)
            + (self.emergency_overrides * 0.40)
        )
        return min(1.0, weighted_total)


@dataclass(frozen=True)
class SystemState:
    """Snapshot of system-wide conditions consulted when routing a packet."""

    # schedule_decision routes to containment when this is "contested" or "unsafe".
    trust_state: TrustState = "normal"
    # Accumulated shortcut counters; its ``score`` is compared with the limit below.
    shortcut_debt: ShortcutDebt = field(default_factory=ShortcutDebt)
    # Containment is chosen once ``shortcut_debt.score >= shortcut_debt_limit``.
    shortcut_debt_limit: float = 0.70
    # Not read by the routing functions visible in this module.
    queue_depth: int = 0
    # Treated exactly like ``trust_state`` by schedule_decision.
    hardware_state: TrustState = "normal"


@dataclass(frozen=True)
class ScheduleDecision:
    """Immutable outcome of scheduling one decision packet."""

    # Which path the packet was routed to.
    route: RouteName
    # Why that path was chosen.
    reason: ScheduleReason
    # False on every FAIL_CLOSED decision built in this module; True otherwise.
    allowed: bool
    # 1.0 on FAST, 0.75 on DELIBERATIVE, 0.25 on CONTAINMENT, 0.0 on FAIL_CLOSED.
    authority_scale: float
    # Checks that still apply on the chosen route.
    required_checks: tuple[str, ...]
    # Checks that were dropped (containment) or found missing (invariant violation).
    skipped_checks: tuple[str, ...]
    # Free-form human-readable explanations.
    notes: tuple[str, ...] = ()


# Checks every packet must list in ``required_checks``; invariant_violations
# reports any that are absent and schedule_decision then fails closed.
INVARIANT_CHECKS: tuple[str, ...] = (
    "hash_salt_present",
    "intent_signature_verified",
    "scope_validated",
    "policy_evaluated",
    "forbidden_modules_blocked",
    "raw_indicators_excluded",
    "conditional_modules_authorized",
)


# Checks that containment_decision drops from a packet's required checks.
ADAPTIVE_CHECKS: tuple[str, ...] = (
    "deep_log_correlation",
    "long_horizon_analysis",
    "full_counterfactual_simulation",
    "secondary_model_review",
)


def total_required_time_ms(packet: DecisionPacket) -> int:
    """Return the sum of the packet's verification, execution and rollback costs."""
    return (
        packet.verification_cost_ms
        + packet.execution_cost_ms
        + packet.rollback_cost_ms
    )


def fits_deadline(packet: DecisionPacket) -> bool:
    """Return True when the packet's total required time is within its deadline.

    A total exactly equal to ``deadline_ms`` still fits.
    """
    return total_required_time_ms(packet) <= packet.deadline_ms


def has_required_rollback(packet: DecisionPacket) -> bool:
    """Return whether the packet satisfies its rollback requirement.

    Only ``high`` and ``critical`` packets carry a requirement: they need a
    rollback plan with real content and a positive ``rollback_cost_ms``.
    Every other risk label passes unconditionally.
    """
    if packet.risk_label not in {"high", "critical"}:
        return True

    plan = packet.rollback_plan
    # A plan that is empty or only whitespace describes nothing, so it counts
    # as missing.
    has_plan = bool(plan and plan.strip())
    return has_plan and packet.rollback_cost_ms > 0


def invariant_violations(packet: DecisionPacket) -> tuple[str, ...]:
    """Return the invariant checks absent from ``packet.required_checks``.

    The result keeps the order of ``INVARIANT_CHECKS`` and is empty when the
    packet lists every invariant check.
    """
    declared = set(packet.required_checks)
    return tuple(name for name in INVARIANT_CHECKS if name not in declared)


# Numeric weight per risk label, looked up by ``risk_weight``.
RISK_WEIGHTS: dict[str, float] = {
    "low": 0.25,
    "medium": 0.50,
    "high": 0.75,
    "critical": 1.00,
}


def risk_weight(risk_label: RiskLabel) -> float:
    """Return the numeric weight for ``risk_label``.

    An unrecognised label falls back to 1.00, the same weight as ``critical``.
    """
    return RISK_WEIGHTS.get(risk_label, 1.00)


def safe_utility(packet: DecisionPacket) -> float:
    """Return a non-negative utility score for the packet.

    The score is a weighted sum, floored at 0.0:

    * ``confidence`` x 0.40
    * ``reversibility`` x 0.30
    * ``1 - risk_weight(risk_label)`` x 0.20
    * ``1 - time_ratio`` x 0.10
    * minus ``expected_utility_decay`` x 0.10

    ``time_ratio`` is the total required time over the deadline, capped at 1.0.
    """
    # Dividing by at least 1 avoids a ZeroDivisionError on a zero deadline.
    budget_ms = max(packet.deadline_ms, 1)
    time_ratio = min(1.0, total_required_time_ms(packet) / budget_ms)

    raw_score = (
        (packet.confidence * 0.40)
        + (packet.reversibility * 0.30)
        + ((1.0 - risk_weight(packet.risk_label)) * 0.20)
        + ((1.0 - time_ratio) * 0.10)
        - (packet.expected_utility_decay * 0.10)
    )
    return max(0.0, raw_score)


def schedule_decision(packet: DecisionPacket, state: SystemState | None = None) -> ScheduleDecision:
    """Route a decision packet to FAST, DELIBERATIVE, CONTAINMENT or FAIL_CLOSED.

    Gates are evaluated in a fixed order and the first one that triggers wins:

    1. An invariant check missing from ``packet.required_checks`` -> FAIL_CLOSED.
    2. ``has_required_rollback`` is false -> FAIL_CLOSED.
    3. Shortcut-debt score at or above ``state.shortcut_debt_limit`` -> CONTAINMENT.
    4. Trust or hardware state is ``contested``/``unsafe`` -> CONTAINMENT.
    5. Confidence below 0.30 on a high/critical packet -> CONTAINMENT.
    6. The packet fits its deadline -> FAST when risk is low/medium and
       reversibility is at least 0.50, otherwise DELIBERATIVE.
    7. The packet does not fit -> CONTAINMENT when reversibility is at least
       0.75, otherwise FAIL_CLOSED.

    Args:
        packet: The proposed action and its cost/risk metadata.
        state: Current system conditions; a default ``SystemState()`` is used
            when omitted.

    Returns:
        The routing decision, including the authority scale and the checks
        that remain required or were skipped.
    """
    if state is None:
        state = SystemState()

    # Gate 1: invariants are checked before anything else.
    missing_invariants = invariant_violations(packet)
    if missing_invariants:
        return ScheduleDecision(
            route="FAIL_CLOSED",
            reason=ScheduleReason.INVARIANT_VIOLATION,
            allowed=False,
            authority_scale=0.0,
            required_checks=tuple(packet.required_checks),
            skipped_checks=missing_invariants,
            notes=("Invariant checks cannot be skipped under deadline pressure.",),
        )

    # Gate 2: rollback requirement for high/critical packets.
    if not has_required_rollback(packet):
        return ScheduleDecision(
            route="FAIL_CLOSED",
            reason=ScheduleReason.MISSING_ROLLBACK,
            allowed=False,
            authority_scale=0.0,
            required_checks=tuple(packet.required_checks),
            skipped_checks=(),
            notes=("High-impact action requires rollback or containment plan.",),
        )

    # Gates 3-5: conditions that reduce authority via containment.
    if state.shortcut_debt.score >= state.shortcut_debt_limit:
        return containment_decision(
            packet,
            ScheduleReason.SHORTCUT_DEBT_TOO_HIGH,
            "Shortcut debt exceeded configured limit.",
        )

    compromised = {"contested", "unsafe"}
    if state.trust_state in compromised or state.hardware_state in compromised:
        return containment_decision(
            packet,
            ScheduleReason.TRUST_STATE_DEGRADED,
            "Trust or hardware state is contested/unsafe.",
        )

    if packet.confidence < 0.30 and packet.risk_label in {"high", "critical"}:
        return containment_decision(
            packet,
            ScheduleReason.LOW_CONFIDENCE,
            "Confidence too low for high-impact decision.",
        )

    # Gate 6: the whole verify/execute/rollback budget fits the deadline.
    if fits_deadline(packet):
        if packet.risk_label in {"low", "medium"} and packet.reversibility >= 0.50:
            return ScheduleDecision(
                route="FAST",
                reason=ScheduleReason.FAST_PATH_AVAILABLE,
                allowed=True,
                authority_scale=1.0,
                required_checks=tuple(packet.required_checks),
                skipped_checks=(),
                notes=("Low/medium risk action fits available decision window.",),
            )

        return ScheduleDecision(
            route="DELIBERATIVE",
            reason=ScheduleReason.DELIBERATIVE_PATH_AVAILABLE,
            allowed=True,
            authority_scale=0.75,
            required_checks=tuple(packet.required_checks),
            skipped_checks=(),
            notes=("High-impact or lower-reversibility action fits full verification window.",),
        )

    # Gate 7: deadline missed; only highly reversible packets get containment.
    if packet.reversibility >= 0.75:
        return containment_decision(
            packet,
            ScheduleReason.DEADLINE_TOO_TIGHT,
            "Full verification/execution/rollback does not fit deadline.",
        )

    return ScheduleDecision(
        route="FAIL_CLOSED",
        reason=ScheduleReason.NO_SAFE_ACTION_FITS,
        allowed=False,
        authority_scale=0.0,
        required_checks=tuple(packet.required_checks),
        skipped_checks=(),
        notes=("No safe action fits inside the useful decision window.",),
    )


def containment_decision(packet: DecisionPacket, reason: ScheduleReason, note: str) -> ScheduleDecision:
    """Build a CONTAINMENT decision with authority reduced to 0.25.

    Any of the packet's required checks that appear in ``ADAPTIVE_CHECKS`` are
    moved to ``skipped_checks``; the remaining checks stay required.

    Args:
        packet: The packet being routed.
        reason: Why containment was chosen.
        note: Human-readable explanation, placed first in ``notes``.

    Returns:
        An allowed ``ScheduleDecision`` on the CONTAINMENT route.
    """
    requested = set(packet.required_checks)
    # Skipped checks follow ADAPTIVE_CHECKS order; kept checks follow packet order.
    skipped = tuple(name for name in ADAPTIVE_CHECKS if name in requested)
    dropped = set(skipped)
    effective = tuple(name for name in packet.required_checks if name not in dropped)
    return ScheduleDecision(
        route="CONTAINMENT",
        reason=reason,
        allowed=True,
        authority_scale=0.25,
        required_checks=effective,
        skipped_checks=skipped,
        notes=(note, "Authority reduced; prefer reversible, bounded action."),
    )