sentinel / context_engine.py
kswffs's picture
Upload folder using huggingface_hub
b96103d verified
Raw
History Blame Contribute Delete
16.4 kB
"""
Sentinel Context Engine — Live state vector from multi-sensor fusion.
Maintains a real-time understanding of the user's world by fusing events
from all sensors into a single coherent state object. Every frame, detection,
pose reading, audio event, and sensor update incrementally refines the context.
The ContextState is the single source of truth for "what is happening right now."
It feeds the VLM prompt, the Prediction Engine, and the UI telemetry.
Usage:
bus = EventBus()
context = ContextEngine(bus) # auto-subscribes to bus events
# ... events published to bus automatically update context.state ...
prompt = context.state.to_prompt_context() # string for VLM
"""
import math
import time
from dataclasses import dataclass, field
try:
import structlog
logger = structlog.get_logger()
except ImportError:
import logging
logger = logging.getLogger("sentinel.context")
from event_bus import EventBus, Event
def _compass_direction(deg: float) -> str:
"""Convert degrees to compass heading string."""
dirs = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"]
idx = int(((deg + 22.5) % 360) / 45)
return dirs[idx]
@dataclass
class ContextState:
    """
    Live state vector representing the system's current understanding
    of the user's world. Updated incrementally from sensor events.

    All fields have safe defaults so the system works even when
    individual sensors are unavailable (e.g., iOS Safari).
    """

    # ── Location ─────────────────────────────────────────────────────
    location_type: str = "unknown"
    """Inferred environment: "unknown", "indoors", "outdoors", "transit"."""
    # Last GPS fix; (0.0, 0.0) is the "no fix yet" default.
    gps: tuple = (0.0, 0.0)
    # Ground speed; rendered as m/s by to_prompt_context().
    speed: float = 0.0
    # Heading in degrees and its compass label.
    heading: float = 0.0
    heading_dir: str = "N"

    # ── Activity ─────────────────────────────────────────────────────
    activity: str = "idle"
    """Current activity: "idle", "walking", "standing", "sitting", "running", "fallen"."""
    movement_speed: float = 0.0
    """Estimated movement speed from accelerometer variance (0.0 = stationary)."""

    # ── Environment ──────────────────────────────────────────────────
    light_level: float = 0.0
    """Ambient light in lux (0 = dark, 500+ = bright outdoor)."""
    light_condition: str = "unknown"
    """Derived label: "dark", "dim", "normal", "bright"."""
    noise_level: float = 0.0
    """Audio RMS level (0.0 = silence, 1.0 = maximum)."""
    noise_condition: str = "unknown"
    """Derived label: "quiet", "moderate", "loud"."""
    environment: str = "unknown"
    """Combined environment descriptor: "dark quiet", "bright noisy", etc."""

    # ── Social ───────────────────────────────────────────────────────
    nearby_people: int = 0
    closest_person_distance: float = 999.0
    """Estimated distance in meters from bounding box area (larger bbox = closer)."""
    closest_person_trend: str = "none"
    """Distance trend: "approaching", "receding", "stationary", "none"."""
    detected_objects: list = field(default_factory=list)
    """Current frame object class names: ["person", "vehicle", ...]."""

    # ── Risk ─────────────────────────────────────────────────────────
    risk_level: float = 0.0
    """Composite risk score 0.0 (safe) to 1.0 (critical)."""
    risk_factors: list = field(default_factory=list)
    """Active risk factor labels: ["fall_detected", "vehicle_nearby", ...]."""

    # ── System ───────────────────────────────────────────────────────
    battery: float = 100.0
    frames_analyzed: int = 0
    vlm_calls: int = 0
    last_trigger_time: float = 0.0

    # ── Formatters ───────────────────────────────────────────────────
    def to_prompt_context(self) -> str:
        """
        Format this state as a natural language string for VLM prompts.

        Designed to be ~60-80 tokens to stay within budget. At most the
        first three risk factors are listed.

        Returns:
            A single-line, sentence-per-topic description of the state.
        """
        # Each fragment carries its own terminating period so the
        # "no people" sentence is not punctuated twice.
        if self.nearby_people > 0:
            people_info = (
                f"{self.nearby_people} people nearby, "
                f"closest ~{self.closest_person_distance:.1f}m "
                f"({self.closest_person_trend})."
            )
        else:
            people_info = "No people detected."

        risk_info = f"Risk: {self.risk_level:.2f}"
        if self.risk_factors:
            risk_info += f" [{', '.join(self.risk_factors[:3])}]"

        return (
            f"Activity: {self.activity} in {self.environment} {self.location_type}. "
            f"Light: {self.light_level:.0f} lux ({self.light_condition}). "
            f"Speed: {self.speed:.1f} m/s, "
            f"heading {self.heading:.0f}\u00b0 {self.heading_dir}. "
            f"{people_info} "
            f"{risk_info}."
        )

    def to_dict(self) -> dict:
        """
        Serializable snapshot for logging, storage, or API responses.

        List-valued fields are copied so that later updates to this state
        do not alter a snapshot that has already been handed out.

        Returns:
            A dict of plain values keyed by field name. Not every field is
            included (e.g. ``heading_dir`` and ``vlm_calls`` are omitted).
        """
        return {
            "location_type": self.location_type,
            "gps": self.gps,
            "speed": self.speed,
            "heading": self.heading,
            "activity": self.activity,
            "light_level": self.light_level,
            "light_condition": self.light_condition,
            "noise_level": self.noise_level,
            "noise_condition": self.noise_condition,
            "environment": self.environment,
            "nearby_people": self.nearby_people,
            "closest_person_distance": self.closest_person_distance,
            "closest_person_trend": self.closest_person_trend,
            "detected_objects": list(self.detected_objects),
            "risk_level": self.risk_level,
            "risk_factors": list(self.risk_factors),
            "battery": self.battery,
            "frames_analyzed": self.frames_analyzed,
        }

    def to_ui_summary(self) -> str:
        """Compact one-line summary for HUD/UI display."""
        return (
            f"{self.activity.upper()} | {self.environment} | "
            f"{self.nearby_people}ppl | "
            f"risk:{self.risk_level:.1f} | "
            f"bat:{self.battery:.0f}%"
        )
class ContextEngine:
    """
    Subscribes to EventBus events and incrementally updates a ContextState.

    When constructed with a bus, auto-subscribes to these event types:
        "frame.changed"      -> frame counter + movement-speed decay
        "detection.objects"  -> people count + distance + object risk
        "pose.update"        -> activity classification + fall detection
        "audio.class"        -> noise classification + alert sounds
        "sensor.telemetry"   -> light, battery, heading, GPS, motion
        "alert.generated"    -> VLM call counter + trigger timestamp

    The engine applies domain heuristics:
        - Person distance from bounding box area (inverse relationship)
        - Person trend from rolling distance history (approaching/receding)
        - Risk accumulation from detections + pose + audio alerts
        - Risk decay via decay_risk(), which the owner must call; nothing
          in this class schedules it
        - Indoor/outdoor inference from light levels
        - Activity classification from pose + motion speed
    """

    # Number of recent closest-person distances kept for trend estimation.
    _PERSON_HISTORY_MAX = 5
    # NOTE(review): not referenced inside this class; decay_risk() uses its
    # own default of 0.01. Kept for any external users.
    _RISK_DECAY_RATE = 0.02
    # Scale used to turn a coordinate delta in degrees into metres.
    _METERS_PER_DEGREE = 111_000

    def __init__(self, event_bus: "EventBus | None" = None):
        """
        Create an engine with a fresh ContextState.

        Args:
            event_bus: Optional bus to subscribe to immediately. When omitted,
                call subscribe_to() later.
        """
        self.state = ContextState()
        self._person_distances: list[float] = []
        self._last_pose_time: float = 0.0
        self._fall_start_time: float = 0.0
        # Timestamp of the last accepted GPS fix; None until one arrives.
        self._last_gps_time: float | None = None
        if event_bus is not None:
            self.subscribe_to(event_bus)

    def subscribe_to(self, bus: "EventBus") -> None:
        """Register this engine's handlers on an event bus."""
        bus.subscribe("frame.changed", self._on_frame)
        bus.subscribe("detection.objects", self._on_detection)
        bus.subscribe("pose.update", self._on_pose)
        bus.subscribe("audio.class", self._on_audio)
        bus.subscribe("sensor.telemetry", self._on_sensor)
        bus.subscribe("alert.generated", self._on_alert)

    # ── Event Handlers ───────────────────────────────────────────────
    def _on_frame(self, event: "Event") -> None:
        """Count the frame and decay movement speed on near-static frames."""
        self.state.frames_analyzed += 1
        if event.data.get("change_pct", 0.0) < 1.0:
            self.state.movement_speed *= 0.95

    def _on_detection(self, event: "Event") -> None:
        """Update social context and object-driven risk."""
        detections = event.data.get("detections", [])
        self.state.detected_objects = [d.get("class", "") for d in detections]
        self._update_person_tracking(detections)
        self._update_object_risk(detections)

    def _update_person_tracking(self, detections: list[dict]) -> None:
        """
        Track people count and closest person distance with trend.

        The closest person is the one with the largest ``area_pct``; the
        distance estimate is inversely related to that area and clamped to
        the range 0.3-30.0. A trend is only computed once three samples
        have been collected; until then the previous label is kept.

        Args:
            detections: Detection dicts; only "class" and "area_pct" are read.
        """
        state = self.state
        persons = [d for d in detections if d.get("class") == "person"]
        state.nearby_people = len(persons)
        if not persons:
            state.closest_person_distance = 999.0
            state.closest_person_trend = "none"
            self._person_distances.clear()
            return

        # Larger bounding box means closer, so pick the largest one rather
        # than whichever detection happens to be listed first.
        largest_area = max(p.get("area_pct", 1.0) for p in persons)
        # Clamp at zero so a negative area cannot cancel the +0.1 offset.
        est_distance = max(0.3, min(30.0, 8.0 / (max(largest_area, 0.0) + 0.1)))
        state.closest_person_distance = est_distance

        history = self._person_distances
        history.append(est_distance)
        # Keep only the newest _PERSON_HISTORY_MAX samples.
        del history[:-self._PERSON_HISTORY_MAX]

        if len(history) >= 3:
            change = history[-1] - history[0]
            if change < -0.5:
                state.closest_person_trend = "approaching"
            elif change > 0.5:
                state.closest_person_trend = "receding"
            else:
                state.closest_person_trend = "stationary"

    def _update_object_risk(self, detections: list[dict]) -> None:
        """
        Update risk factors based on detected object classes.

        Vehicles and animals only count once their ``area_pct`` exceeds a
        threshold; fire always counts.
        """
        for det in detections:
            cls = det.get("class", "")
            area_pct = det.get("area_pct", 0.0)
            if cls == "vehicle" and area_pct > 5.0:
                self._add_risk("vehicle_nearby", 0.3)
            elif cls == "fire":
                self._add_risk("fire_detected", 0.7)
            elif cls == "animal" and area_pct > 10.0:
                self._add_risk("animal_nearby", 0.15)

    def _on_pose(self, event: "Event") -> None:
        """
        Update activity classification and fall state.

        A fall takes precedence over every other activity. Otherwise the
        activity is "sitting" when flagged, else derived from movement_speed.
        """
        data = event.data
        state = self.state
        self._last_pose_time = event.timestamp

        if data.get("is_fall"):
            state.activity = "fallen"
            self._add_risk("fall_detected", 0.6)
            # Remember when the current fall episode began.
            if self._fall_start_time == 0.0:
                self._fall_start_time = event.timestamp
            return

        self._fall_start_time = 0.0
        if data.get("is_sitting"):
            state.activity = "sitting"
        elif state.movement_speed > 1.5:
            state.activity = "running"
        elif state.movement_speed > 0.3:
            state.activity = "walking"
        else:
            state.activity = "standing"

    def _on_audio(self, event: "Event") -> None:
        """Update noise environment and audio-driven risk."""
        data = event.data
        state = self.state
        # Follow peaks immediately; otherwise decay 10% per event.
        state.noise_level = max(state.noise_level * 0.9, data.get("rms", 0.0))

        if state.noise_level > 0.5:
            state.noise_condition = "loud"
        elif state.noise_level > 0.1:
            state.noise_condition = "moderate"
        else:
            state.noise_condition = "quiet"
        self._update_environment_label()

        alert = data.get("alert_level", "")
        if alert == "critical":
            self._add_risk("alarm_sound", 0.4)
        elif alert == "warning":
            self._add_risk("alert_sound", 0.2)

    def _on_sensor(self, event: "Event") -> None:
        """
        Update environmental readings and location inference.

        Readings that are absent or None leave the corresponding state
        untouched, so a device without a given sensor keeps its defaults.
        """
        data = event.data
        state = self.state

        battery = data.get("battery_pct")
        if battery is not None:
            state.battery = battery

        heading = data.get("heading")
        if heading is not None:
            state.heading = heading
        state.heading_dir = _compass_direction(state.heading)

        self._update_gps(data.get("gps"), event.timestamp)
        self._update_motion(data.get("accelerometer"))
        self._update_light(data.get("light_level"))
        self._update_environment_label()

    def _update_gps(self, gps, timestamp: float) -> None:
        """
        Store a GPS fix and derive speed from the previous fix.

        Missing fixes and (0, 0) are ignored. Speed is the planar
        coordinate delta scaled by _METERS_PER_DEGREE and divided by the
        time since the previous fix; no latitude correction is applied.

        Args:
            gps: Two-element sequence of coordinates in degrees, or None.
            timestamp: Time of the event carrying the fix.
                NOTE(review): assumed to be in seconds; confirm against
                the EventBus implementation.
        """
        if not gps:
            return
        # Normalise to a float tuple so list-shaped fixes compare correctly
        # against the (0.0, 0.0) "no fix" sentinel.
        fix = (float(gps[0]), float(gps[1]))
        if fix == (0.0, 0.0):
            return

        previous = tuple(self.state.gps)
        previous_time = self._last_gps_time
        self.state.gps = fix
        self._last_gps_time = timestamp

        if previous == (0.0, 0.0) or previous_time is None:
            return
        elapsed = timestamp - previous_time
        if elapsed <= 0:
            # No usable time base; keep the last speed estimate.
            return
        degrees = math.hypot(fix[0] - previous[0], fix[1] - previous[1])
        self.state.speed = degrees * self._METERS_PER_DEGREE / elapsed

    def _update_motion(self, accel) -> None:
        """
        Refresh movement_speed from an accelerometer sample.

        Uses the deviation of the vector magnitude from 9.81 as the motion
        signal and lets the previous estimate decay by 20% per sample.
        """
        if not accel:
            return
        magnitude = math.sqrt(sum(a ** 2 for a in accel))
        deviation = abs(magnitude - 9.81)
        self.state.movement_speed = max(
            self.state.movement_speed * 0.8,
            deviation * 0.5,
        )

    def _update_light(self, lux) -> None:
        """
        Store an ambient-light reading and derive light/location labels.

        With no reading (None) nothing changes, so a device without a light
        sensor stays "unknown" instead of being classified as dark/indoors.
        Readings in the "normal" band leave location_type as it was.
        """
        if lux is None:
            return
        state = self.state
        state.light_level = lux
        if lux > 300:
            state.light_condition = "bright"
            state.location_type = "outdoors"
        elif lux > 50:
            state.light_condition = "normal"
        elif lux > 5:
            state.light_condition = "dim"
            state.location_type = "indoors"
        else:
            state.light_condition = "dark"
            state.location_type = "indoors"

    def _on_alert(self, event: "Event") -> None:
        """Track VLM call metrics."""
        self.state.vlm_calls += 1
        self.state.last_trigger_time = event.timestamp

    # ── Internal Helpers ─────────────────────────────────────────────
    def _add_risk(self, factor: str, amount: float) -> None:
        """
        Add a risk factor and accumulate risk score (capped at 1.0).

        The label is recorded once, but the score grows on every call.
        """
        if factor not in self.state.risk_factors:
            self.state.risk_factors.append(factor)
        self.state.risk_level = min(1.0, self.state.risk_level + amount)

    def _remove_risk(self, factor: str) -> None:
        """Remove a risk factor label; the risk score is left unchanged."""
        if factor in self.state.risk_factors:
            self.state.risk_factors.remove(factor)

    def decay_risk(self, amount: float = 0.01) -> None:
        """
        Gradually reduce risk level when no new triggers arrive.

        Call this periodically (e.g., every frame) to prevent stale risk
        accumulation. Once the score falls below 0.05 all risk factor
        labels are cleared.

        Args:
            amount: How much to subtract from the risk score.
        """
        self.state.risk_level = max(0.0, self.state.risk_level - amount)
        if self.state.risk_level < 0.05:
            self.state.risk_factors.clear()

    def _update_environment_label(self) -> None:
        """
        Recompute the combined environment descriptor.

        Only noteworthy conditions are included: "unknown" and the neutral
        labels ("normal" light, "moderate" noise) are skipped. If nothing
        remains the descriptor is "normal".
        """
        parts = []
        if self.state.light_condition not in ("unknown", "normal"):
            parts.append(self.state.light_condition)
        if self.state.noise_condition not in ("unknown", "moderate"):
            parts.append(self.state.noise_condition)
        self.state.environment = " ".join(parts) if parts else "normal"

    def reset(self) -> None:
        """Reset to initial state."""
        self.state = ContextState()
        self._person_distances.clear()
        self._last_pose_time = 0.0
        self._fall_start_time = 0.0
        self._last_gps_time = None