File size: 4,653 Bytes
513d6d1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import math
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Any

@dataclass
class EntityState:
    """Snapshot of an entity's current physical and behavioral state."""
    entity_id: str
    position: Tuple[float, float]  # (x, y) coordinates (normalized or pixel center)
    velocity: Tuple[float, float]   # (vx, vy) change per frame
    behavior_vector: Dict[str, Any] = field(default_factory=dict)
    context: Dict[str, Any] = field(default_factory=dict)
    confidence: float = 1.0

@dataclass
class PredictionResult:
    """The output of the anticipatory logic layer."""
    entity_id: str
    risk_score: float
    predicted_motion: Tuple[float, float]
    uncertainty: float
    explanation: str

class PredictionEngine:
    """
    Anticipatory Intelligence Layer (Sentinel Phase 5).
    Estimates future risk based on temporal trends, behavior, and context.
    """
    def __init__(self, risk_threshold: float = 0.7):
        self.risk_threshold = risk_threshold

    def predict(self, state: EntityState) -> PredictionResult:
        """Estimate the future risk for a single entity."""
        
        # 1. Motion Anomaly (Velocity and strange paths)
        motion_risk = self._motion_anomaly(state)
        
        # 2. Behavioral Risk (Sequential pattern matching)
        behavior_risk = self._behavior_risk(state)
        
        # 3. Contextual Weight (Situational priors)
        context_weight = self._context_weight(state.context)

        # 4. Weighted Fusion of Risk Signals
        # 40% Motion, 40% Behavior, 20% Context
        risk_score = (
            0.4 * motion_risk +
            0.4 * behavior_risk +
            0.2 * context_weight
        )

        # 5. Uncertainty (Entropy or sensor confidence inverse)
        uncertainty = self._estimate_uncertainty(state)

        return PredictionResult(
            entity_id=state.entity_id,
            risk_score=round(risk_score, 3),
            predicted_motion=self._predict_motion(state),
            uncertainty=round(uncertainty, 3),
            explanation=self._generate_explanation(motion_risk, behavior_risk, context_weight)
        )

    def _predict_motion(self, state: EntityState) -> Tuple[float, float]:
        """Simple linear extrapolation (Kalman baseline)."""
        x, y = state.position
        vx, vy = state.velocity
        return (x + vx, y + vy)

    def _motion_anomaly(self, state: EntityState) -> float:
        """High velocity or rapid acceleration increases risk."""
        vx, vy = state.velocity
        speed = math.sqrt(vx**2 + vy**2)
        # Normalize: 10 units/frame is 'critically high speed'
        return min(speed / 10.0, 1.0)

    def _behavior_risk(self, state: EntityState) -> float:
        """Detect pre-threat behavioral sequences."""
        actions = state.behavior_vector.get("recent_actions", [])
        
        # High-risk pre-event patterns
        risky_patterns = {
            "approaching": 0.2,
            "reaching": 0.4,
            "hiding": 0.3,
            "agitated": 0.4,
            "running": 0.3,
            "loitering": 0.1,
            "drawn_weapon": 0.9 # High but still probabilistic
        }

        score = 0.0
        for action in actions:
            score += risky_patterns.get(action.lower(), 0.0)

        return min(score, 1.0)

    def _context_weight(self, context: Dict[str, Any]) -> float:
        """Prior adjustment based on location and environment."""
        location = context.get("location", "unknown")
        lighting = context.get("lighting", "normal")
        
        # Simple prior map
        if location == "kitchen": 
            return 0.2 # Lower suspicion for sharp objects
        if location == "street" and lighting == "low":
            return 0.8 # Higher suspicion in dark streets
        if location == "entryway" or location == "perimeter":
            return 0.7 # High sensitivity zones
            
        return 0.5 # Neutral prior

    def _estimate_uncertainty(self, state: EntityState) -> float:
        """Inversely proportional to detection confidence."""
        return max(0.0, 1.0 - state.confidence)

    def _generate_explanation(self, m_risk: float, b_risk: float, c_weight: float) -> str:
        factors = []
        if m_risk > 0.5: factors.append("unusual_motion")
        if b_risk > 0.5: factors.append("risky_sequence")
        if c_weight > 0.7: factors.append("sensitive_context")
        
        if not factors: return "Steady state observation."
        return "Warning: Risk elevated by " + " & ".join(factors)

# Global singleton
prediction_engine = PredictionEngine()