Toun / prediction_engine.py
babaTEEpe's picture
Upload 17 files
513d6d1 verified
import math
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Any
@dataclass
class EntityState:
"""Snapshot of an entity's current physical and behavioral state."""
entity_id: str
position: Tuple[float, float] # (x, y) coordinates (normalized or pixel center)
velocity: Tuple[float, float] # (vx, vy) change per frame
behavior_vector: Dict[str, Any] = field(default_factory=dict)
context: Dict[str, Any] = field(default_factory=dict)
confidence: float = 1.0
@dataclass
class PredictionResult:
"""The output of the anticipatory logic layer."""
entity_id: str
risk_score: float
predicted_motion: Tuple[float, float]
uncertainty: float
explanation: str
class PredictionEngine:
"""
Anticipatory Intelligence Layer (Sentinel Phase 5).
Estimates future risk based on temporal trends, behavior, and context.
"""
def __init__(self, risk_threshold: float = 0.7):
self.risk_threshold = risk_threshold
def predict(self, state: EntityState) -> PredictionResult:
"""Estimate the future risk for a single entity."""
# 1. Motion Anomaly (Velocity and strange paths)
motion_risk = self._motion_anomaly(state)
# 2. Behavioral Risk (Sequential pattern matching)
behavior_risk = self._behavior_risk(state)
# 3. Contextual Weight (Situational priors)
context_weight = self._context_weight(state.context)
# 4. Weighted Fusion of Risk Signals
# 40% Motion, 40% Behavior, 20% Context
risk_score = (
0.4 * motion_risk +
0.4 * behavior_risk +
0.2 * context_weight
)
# 5. Uncertainty (Entropy or sensor confidence inverse)
uncertainty = self._estimate_uncertainty(state)
return PredictionResult(
entity_id=state.entity_id,
risk_score=round(risk_score, 3),
predicted_motion=self._predict_motion(state),
uncertainty=round(uncertainty, 3),
explanation=self._generate_explanation(motion_risk, behavior_risk, context_weight)
)
def _predict_motion(self, state: EntityState) -> Tuple[float, float]:
"""Simple linear extrapolation (Kalman baseline)."""
x, y = state.position
vx, vy = state.velocity
return (x + vx, y + vy)
def _motion_anomaly(self, state: EntityState) -> float:
"""High velocity or rapid acceleration increases risk."""
vx, vy = state.velocity
speed = math.sqrt(vx**2 + vy**2)
# Normalize: 10 units/frame is 'critically high speed'
return min(speed / 10.0, 1.0)
def _behavior_risk(self, state: EntityState) -> float:
"""Detect pre-threat behavioral sequences."""
actions = state.behavior_vector.get("recent_actions", [])
# High-risk pre-event patterns
risky_patterns = {
"approaching": 0.2,
"reaching": 0.4,
"hiding": 0.3,
"agitated": 0.4,
"running": 0.3,
"loitering": 0.1,
"drawn_weapon": 0.9 # High but still probabilistic
}
score = 0.0
for action in actions:
score += risky_patterns.get(action.lower(), 0.0)
return min(score, 1.0)
def _context_weight(self, context: Dict[str, Any]) -> float:
"""Prior adjustment based on location and environment."""
location = context.get("location", "unknown")
lighting = context.get("lighting", "normal")
# Simple prior map
if location == "kitchen":
return 0.2 # Lower suspicion for sharp objects
if location == "street" and lighting == "low":
return 0.8 # Higher suspicion in dark streets
if location == "entryway" or location == "perimeter":
return 0.7 # High sensitivity zones
return 0.5 # Neutral prior
def _estimate_uncertainty(self, state: EntityState) -> float:
"""Inversely proportional to detection confidence."""
return max(0.0, 1.0 - state.confidence)
def _generate_explanation(self, m_risk: float, b_risk: float, c_weight: float) -> str:
factors = []
if m_risk > 0.5: factors.append("unusual_motion")
if b_risk > 0.5: factors.append("risky_sequence")
if c_weight > 0.7: factors.append("sensitive_context")
if not factors: return "Steady state observation."
return "Warning: Risk elevated by " + " & ".join(factors)
# Global singleton
prediction_engine = PredictionEngine()