| import math |
| from dataclasses import dataclass, field |
| from typing import Dict, List, Optional, Tuple, Any |
|
|
@dataclass
class EntityState:
    """Point-in-time record of one tracked entity, as consumed by PredictionEngine.

    Attributes:
        entity_id: Identifier copied verbatim into the resulting prediction.
        position: ``(x, y)`` pair; units are not defined in this module.
        velocity: ``(vx, vy)`` pair; it is added directly to ``position`` when
            extrapolating, so it is presumably expressed per prediction step
            (TODO confirm with the producer of these states).
        behavior_vector: Free-form behavioral data. The engine reads the
            ``"recent_actions"`` key (an iterable of action names).
        context: Free-form environment data. The engine reads the
            ``"location"`` and ``"lighting"`` keys.
        confidence: Detection confidence; the engine reports
            ``1.0 - confidence`` as uncertainty. Defaults to ``1.0``.
    """

    # Required fields (positional order is part of the constructor interface).
    entity_id: str
    position: Tuple[float, float]
    velocity: Tuple[float, float]

    # Optional fields; each instance gets its own fresh dict.
    behavior_vector: Dict[str, Any] = field(default_factory=dict)
    context: Dict[str, Any] = field(default_factory=dict)
    confidence: float = 1.0
|
|
@dataclass
class PredictionResult:
    """Risk assessment produced by the anticipatory logic layer for one entity.

    Attributes:
        entity_id: Identifier of the entity the prediction refers to.
        risk_score: Blended risk estimate for the entity.
        predicted_motion: Extrapolated ``(x, y)`` position.
        uncertainty: How unsure the estimate is (higher means less certain).
        explanation: Human-readable summary of the contributing factors.
    """

    entity_id: str
    risk_score: float
    predicted_motion: Tuple[float, float]
    uncertainty: float
    explanation: str
|
|
class PredictionEngine:
    """
    Anticipatory Intelligence Layer (Sentinel Phase 5).

    Estimates near-future risk for a single entity by blending three
    heuristic signals, each in the range [0, 1]:

    * motion   - speed relative to a saturation speed,
    * behavior - summed weights of recently observed risky actions,
    * context  - a prior derived from location and lighting.

    The blend is ``0.4 * motion + 0.4 * behavior + 0.2 * context``.
    """

    # Blend weights for the three signals (they sum to 1.0).
    _MOTION_WEIGHT = 0.4
    _BEHAVIOR_WEIGHT = 0.4
    _CONTEXT_WEIGHT = 0.2

    # Speed at (and above) which the motion signal saturates at 1.0.
    _SPEED_SATURATION = 10.0

    # Context prior used when no more specific rule matches.
    _NEUTRAL_CONTEXT = 0.5

    # Per-action contribution to the behavior signal (keys are lower-case).
    _RISKY_PATTERNS: Dict[str, float] = {
        "approaching": 0.2,
        "reaching": 0.4,
        "hiding": 0.3,
        "agitated": 0.4,
        "running": 0.3,
        "loitering": 0.1,
        "drawn_weapon": 0.9,
    }

    def __init__(self, risk_threshold: float = 0.7):
        """Create an engine.

        Args:
            risk_threshold: Stored on the instance as ``risk_threshold``.
                NOTE(review): no method of this class reads it; presumably
                callers compare ``risk_score`` against it - confirm.
        """
        self.risk_threshold = risk_threshold

    def predict(self, state: EntityState) -> PredictionResult:
        """Estimate the future risk for a single entity.

        Args:
            state: Current snapshot of the entity.

        Returns:
            A ``PredictionResult`` whose ``risk_score`` and ``uncertainty``
            are rounded to three decimals.
        """
        motion_risk = self._motion_anomaly(state)
        behavior_risk = self._behavior_risk(state)
        context_weight = self._context_weight(state.context)

        risk_score = (
            self._MOTION_WEIGHT * motion_risk
            + self._BEHAVIOR_WEIGHT * behavior_risk
            + self._CONTEXT_WEIGHT * context_weight
        )
        uncertainty = self._estimate_uncertainty(state)

        return PredictionResult(
            entity_id=state.entity_id,
            risk_score=round(risk_score, 3),
            predicted_motion=self._predict_motion(state),
            uncertainty=round(uncertainty, 3),
            explanation=self._generate_explanation(
                motion_risk, behavior_risk, context_weight
            ),
        )

    def _predict_motion(self, state: EntityState) -> Tuple[float, float]:
        """Linearly extrapolate the position by one velocity step."""
        x, y = state.position
        vx, vy = state.velocity
        return (x + vx, y + vy)

    def _motion_anomaly(self, state: EntityState) -> float:
        """Return a speed-based risk in [0, 1]; only speed is considered.

        The speed is divided by the saturation speed (10.0) and capped at 1.0.
        """
        vx, vy = state.velocity
        # math.hypot avoids the OverflowError that squaring very large
        # components by hand (vx**2 + vy**2) would raise.
        speed = math.hypot(vx, vy)
        return min(speed / self._SPEED_SATURATION, 1.0)

    def _behavior_risk(self, state: EntityState) -> float:
        """Sum the weights of recognised recent actions, capped at 1.0.

        Reads ``state.behavior_vector["recent_actions"]``. Matching is
        case-insensitive; unknown actions contribute nothing. A missing or
        ``None`` action list counts as empty, and entries that are not
        strings are ignored rather than raising.
        """
        actions = (state.behavior_vector or {}).get("recent_actions") or []
        if isinstance(actions, str):
            # A bare string is a single action, not a sequence of characters.
            actions = [actions]

        # Plain accumulation (not sum()) keeps the float result independent
        # of the interpreter's summation strategy.
        score = 0.0
        for action in actions:
            if isinstance(action, str):
                score += self._RISKY_PATTERNS.get(action.lower(), 0.0)

        return min(score, 1.0)

    def _context_weight(self, context: Dict[str, Any]) -> float:
        """Return a prior in [0, 1] based on location and lighting.

        Rules, first match wins: kitchen -> 0.2; street in low light -> 0.8;
        entryway or perimeter -> 0.7; anything else (including an empty or
        ``None`` context) -> 0.5.
        """
        if not context:
            return self._NEUTRAL_CONTEXT

        location = context.get("location", "unknown")
        lighting = context.get("lighting", "normal")

        if location == "kitchen":
            return 0.2
        if location == "street" and lighting == "low":
            return 0.8
        if location in ("entryway", "perimeter"):
            return 0.7

        return self._NEUTRAL_CONTEXT

    def _estimate_uncertainty(self, state: EntityState) -> float:
        """Return ``1 - confidence`` clamped to [0, 1]."""
        # Clamp both ends so out-of-range confidences (negative or > 1)
        # cannot produce an uncertainty outside [0, 1].
        return min(1.0, max(0.0, 1.0 - state.confidence))

    def _generate_explanation(self, m_risk: float, b_risk: float, c_weight: float) -> str:
        """Describe which signals exceeded their reporting thresholds.

        Args:
            m_risk: Motion signal; reported when above 0.5.
            b_risk: Behavior signal; reported when above 0.5.
            c_weight: Context prior; reported when above 0.7.

        Returns:
            ``"Steady state observation."`` when nothing is reported,
            otherwise a warning listing the factors joined by ``" & "``.
        """
        factors = []
        if m_risk > 0.5:
            factors.append("unusual_motion")
        if b_risk > 0.5:
            factors.append("risky_sequence")
        # NOTE(review): the comparison is strict, so the 0.7 prior returned
        # for entryway/perimeter never reports "sensitive_context" - confirm
        # this is intended.
        if c_weight > 0.7:
            factors.append("sensitive_context")

        if not factors:
            return "Steady state observation."
        return "Warning: Risk elevated by " + " & ".join(factors)
|
|
| |
# Module-level engine instance built with the default risk threshold.
prediction_engine: PredictionEngine = PredictionEngine()
|
|