import math
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Any


@dataclass
class EntityState:
    """Snapshot of an entity's current physical and behavioral state."""

    entity_id: str
    position: Tuple[float, float]  # (x, y) coordinates (normalized or pixel center)
    velocity: Tuple[float, float]  # (vx, vy) change per frame
    # Read by PredictionEngine under the key "recent_actions" (iterable of
    # action-name strings).
    behavior_vector: Dict[str, Any] = field(default_factory=dict)
    # Read by PredictionEngine under the keys "location" and "lighting".
    context: Dict[str, Any] = field(default_factory=dict)
    confidence: float = 1.0


@dataclass
class PredictionResult:
    """The output of the anticipatory logic layer."""

    entity_id: str
    risk_score: float
    predicted_motion: Tuple[float, float]
    uncertainty: float
    explanation: str


class PredictionEngine:
    """
    Anticipatory Intelligence Layer (Sentinel Phase 5).

    Estimates future risk for an entity by fusing three signals: a motion
    score derived from speed, a behavior score derived from recent actions,
    and a contextual prior derived from location and lighting.
    """

    # Fusion weights for the three risk signals; they sum to 1.0.
    _MOTION_WEIGHT = 0.4
    _BEHAVIOR_WEIGHT = 0.4
    _CONTEXT_WEIGHT = 0.2

    # Speed (units per frame) at which the motion score saturates at 1.0.
    _CRITICAL_SPEED = 10.0

    # Context prior at or above which the context is reported as sensitive.
    _SENSITIVE_CONTEXT_PRIOR = 0.7

    # Per-action contribution to the behavior score (summed, capped at 1.0).
    _RISKY_PATTERNS = {
        "approaching": 0.2,
        "reaching": 0.4,
        "hiding": 0.3,
        "agitated": 0.4,
        "running": 0.3,
        "loitering": 0.1,
        "drawn_weapon": 0.9,  # High but still probabilistic
    }

    def __init__(self, risk_threshold: float = 0.7):
        """Store the risk threshold.

        The threshold is kept on the instance for callers; none of the
        methods in this class consult it.
        """
        self.risk_threshold = risk_threshold

    def predict(self, state: EntityState) -> PredictionResult:
        """Estimate the future risk for a single entity.

        Args:
            state: Current snapshot of the entity.

        Returns:
            A PredictionResult whose ``risk_score`` and ``uncertainty`` are
            rounded to three decimals.
        """
        motion_risk = self._motion_anomaly(state)
        behavior_risk = self._behavior_risk(state)
        context_weight = self._context_weight(state.context)

        # Weighted fusion: 40% motion, 40% behavior, 20% context.
        risk_score = (
            self._MOTION_WEIGHT * motion_risk
            + self._BEHAVIOR_WEIGHT * behavior_risk
            + self._CONTEXT_WEIGHT * context_weight
        )
        uncertainty = self._estimate_uncertainty(state)

        return PredictionResult(
            entity_id=state.entity_id,
            risk_score=round(risk_score, 3),
            predicted_motion=self._predict_motion(state),
            uncertainty=round(uncertainty, 3),
            explanation=self._generate_explanation(
                motion_risk, behavior_risk, context_weight
            ),
        )

    def _predict_motion(self, state: EntityState) -> Tuple[float, float]:
        """Extrapolate the position one frame ahead using the velocity."""
        x, y = state.position
        vx, vy = state.velocity
        return (x + vx, y + vy)

    def _motion_anomaly(self, state: EntityState) -> float:
        """Return a motion score in [0, 1] that grows linearly with speed."""
        vx, vy = state.velocity
        speed = math.hypot(vx, vy)
        # Normalize: 10 units/frame is 'critically high speed'.
        return min(speed / self._CRITICAL_SPEED, 1.0)

    def _behavior_risk(self, state: EntityState) -> float:
        """Return a behavior score in [0, 1] from the recent actions.

        Each recognised action (matched case-insensitively) adds its weight
        from ``_RISKY_PATTERNS``; unknown actions add nothing.
        """
        actions = state.behavior_vector.get("recent_actions")
        if actions is None:
            actions = []
        elif isinstance(actions, str):
            # A bare string is one action, not a sequence of characters.
            actions = [actions]

        # Non-string entries (e.g. None from an upstream detector) are
        # skipped rather than crashing on .lower().
        score = sum(
            (
                self._RISKY_PATTERNS.get(action.lower(), 0.0)
                for action in actions
                if isinstance(action, str)
            ),
            0.0,
        )
        return min(score, 1.0)

    def _context_weight(self, context: Dict[str, Any]) -> float:
        """Return the prior for the given location and lighting.

        Missing keys default to location "unknown" and lighting "normal".
        """
        location = context.get("location", "unknown")
        lighting = context.get("lighting", "normal")

        if location == "kitchen":
            return 0.2  # Lower suspicion for sharp objects
        if location == "street" and lighting == "low":
            return 0.8  # Higher suspicion in dark streets
        if location in ("entryway", "perimeter"):
            return 0.7  # High sensitivity zones
        return 0.5  # Neutral prior

    def _estimate_uncertainty(self, state: EntityState) -> float:
        """Return ``1 - confidence`` clamped to [0, 1]."""
        # Clamp both ends so an out-of-range confidence cannot produce an
        # uncertainty outside [0, 1].
        return min(1.0, max(0.0, 1.0 - state.confidence))

    def _generate_explanation(
        self, m_risk: float, b_risk: float, c_weight: float
    ) -> str:
        """Build a short explanation naming the elevated risk factors.

        Args:
            m_risk: Motion score.
            b_risk: Behavior score.
            c_weight: Context prior.

        Returns:
            "Steady state observation." when no factor is elevated, otherwise
            a warning listing the elevated factors joined by " & ".
        """
        factors = []
        if m_risk > 0.5:
            factors.append("unusual_motion")
        if b_risk > 0.5:
            factors.append("risky_sequence")
        # Inclusive: the high-sensitivity zones return exactly 0.7 and must
        # be reported as sensitive context.
        if c_weight >= self._SENSITIVE_CONTEXT_PRIOR:
            factors.append("sensitive_context")

        if not factors:
            return "Steady state observation."
        return "Warning: Risk elevated by " + " & ".join(factors)


# Global singleton
prediction_engine = PredictionEngine()