| """ |
| Sentinel Context Engine β Live state vector from multi-sensor fusion. |
| |
| Maintains a real-time understanding of the user's world by fusing events |
| from all sensors into a single coherent state object. Every frame, detection, |
| pose reading, audio event, and sensor update incrementally refines the context. |
| |
| The ContextState is the single source of truth for "what is happening right now." |
| It feeds the VLM prompt, the Prediction Engine, and the UI telemetry. |
| |
| Usage: |
| bus = EventBus() |
| context = ContextEngine(bus) # auto-subscribes to bus events |
| # ... events published to bus automatically update context.state ... |
| prompt = context.state.to_prompt_context() # string for VLM |
| """ |
|
|
| import math |
| import time |
| from dataclasses import dataclass, field |
|
|
| try: |
| import structlog |
| logger = structlog.get_logger() |
| except ImportError: |
| import logging |
| logger = logging.getLogger("sentinel.context") |
|
|
| from event_bus import EventBus, Event |
|
|
|
|
| def _compass_direction(deg: float) -> str: |
| """Convert degrees to compass heading string.""" |
| dirs = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"] |
| idx = int(((deg + 22.5) % 360) / 45) |
| return dirs[idx] |
|
|
|
|
| @dataclass |
| class ContextState: |
| """ |
| Live state vector representing the system's current understanding |
| of the user's world. Updated incrementally from sensor events. |
| |
| All fields have safe defaults so the system works even when |
| individual sensors are unavailable (e.g., iOS Safari). |
| """ |
|
|
| |
| location_type: str = "unknown" |
| """Inferred environment: "unknown", "indoors", "outdoors", "transit".""" |
|
|
| gps: tuple = (0.0, 0.0) |
| speed: float = 0.0 |
| heading: float = 0.0 |
| heading_dir: str = "N" |
|
|
| |
| activity: str = "idle" |
| """Current activity: "idle", "walking", "standing", "sitting", "running", "fallen".""" |
|
|
| movement_speed: float = 0.0 |
| """Estimated movement speed from accelerometer variance (0.0 = stationary).""" |
|
|
| |
| light_level: float = 0.0 |
| """Ambient light in lux (0 = dark, 500+ = bright outdoor).""" |
|
|
| light_condition: str = "unknown" |
| """Derived label: "dark", "dim", "normal", "bright".""" |
|
|
| noise_level: float = 0.0 |
| """Audio RMS level (0.0 = silence, 1.0 = maximum).""" |
|
|
| noise_condition: str = "unknown" |
| """Derived label: "quiet", "moderate", "loud".""" |
|
|
| environment: str = "unknown" |
| """Combined environment descriptor: "dark quiet", "bright noisy", etc.""" |
|
|
| |
| nearby_people: int = 0 |
| closest_person_distance: float = 999.0 |
| """Estimated distance in meters from bounding box area (larger bbox = closer).""" |
|
|
| closest_person_trend: str = "none" |
| """Distance trend: "approaching", "receding", "stationary", "none".""" |
|
|
| detected_objects: list = field(default_factory=list) |
| """Current frame object class names: ["person", "vehicle", ...].""" |
|
|
| |
| risk_level: float = 0.0 |
| """Composite risk score 0.0 (safe) to 1.0 (critical).""" |
|
|
| risk_factors: list = field(default_factory=list) |
| """Active risk factor labels: ["fall_detected", "vehicle_nearby", ...].""" |
|
|
| |
| battery: float = 100.0 |
| frames_analyzed: int = 0 |
| vlm_calls: int = 0 |
| last_trigger_time: float = 0.0 |
|
|
| |
|
|
| def to_prompt_context(self) -> str: |
| """ |
| Format this state as a natural language string for VLM prompts. |
| Designed to be ~60-80 tokens to stay within budget. |
| """ |
| people_info = "No people detected." |
| if self.nearby_people > 0: |
| people_info = ( |
| f"{self.nearby_people} people nearby, " |
| f"closest ~{self.closest_person_distance:.1f}m " |
| f"({self.closest_person_trend})" |
| ) |
|
|
| risk_info = f"Risk: {self.risk_level:.2f}" |
| if self.risk_factors: |
| risk_info += f" [{', '.join(self.risk_factors[:3])}]" |
|
|
| return ( |
| f"Activity: {self.activity} in {self.environment} {self.location_type}. " |
| f"Light: {self.light_level:.0f} lux ({self.light_condition}). " |
| f"Speed: {self.speed:.1f} m/s, heading {self.heading:.0f}Β° {self.heading_dir}. " |
| f"{people_info}. " |
| f"{risk_info}." |
| ) |
|
|
| def to_dict(self) -> dict: |
| """Serializable snapshot for logging, storage, or API responses.""" |
| return { |
| "location_type": self.location_type, |
| "gps": self.gps, |
| "speed": self.speed, |
| "heading": self.heading, |
| "activity": self.activity, |
| "light_level": self.light_level, |
| "light_condition": self.light_condition, |
| "noise_level": self.noise_level, |
| "noise_condition": self.noise_condition, |
| "environment": self.environment, |
| "nearby_people": self.nearby_people, |
| "closest_person_distance": self.closest_person_distance, |
| "closest_person_trend": self.closest_person_trend, |
| "detected_objects": self.detected_objects, |
| "risk_level": self.risk_level, |
| "risk_factors": list(self.risk_factors), |
| "battery": self.battery, |
| "frames_analyzed": self.frames_analyzed, |
| } |
|
|
| def to_ui_summary(self) -> str: |
| """Compact one-line summary for HUD/UI display.""" |
| return ( |
| f"{self.activity.upper()} | {self.environment} | " |
| f"{self.nearby_people}ppl | " |
| f"risk:{self.risk_level:.1f} | " |
| f"bat:{self.battery:.0f}%" |
| ) |
|
|
|
|
| class ContextEngine: |
| """ |
| Subscribes to EventBus events and incrementally updates a ContextState. |
| |
| On construction, auto-subscribes to all relevant event types: |
| "frame.changed" β frame counter + change detection |
| "detection.objects" β people count + distance + object tracking |
| "pose.update" β activity classification + fall detection |
| "audio.class" β noise classification + alert sounds |
| "sensor.telemetry" β light, battery, heading, GPS, motion |
| "alert.generated" β VLM call counter + trigger timestamp |
| |
| The engine applies domain heuristics: |
| - Person distance from bounding box area (inverse relationship) |
| - Person trend from rolling distance history (approaching/receding) |
| - Risk accumulation from detections + pose + environment |
| - Risk decay when no new triggers arrive |
| - Indoor/outdoor inference from light levels |
| - Activity classification from pose + motion speed |
| """ |
|
|
| |
| _PERSON_HISTORY_MAX = 5 |
| _RISK_DECAY_RATE = 0.02 |
|
|
| def __init__(self, event_bus: EventBus | None = None): |
| self.state = ContextState() |
| self._person_distances: list[float] = [] |
| self._last_pose_time: float = 0.0 |
| self._fall_start_time: float = 0.0 |
|
|
| if event_bus is not None: |
| self.subscribe_to(event_bus) |
|
|
| def subscribe_to(self, bus: EventBus) -> None: |
| """Register this engine's handlers on an event bus.""" |
| bus.subscribe("frame.changed", self._on_frame) |
| bus.subscribe("detection.objects", self._on_detection) |
| bus.subscribe("pose.update", self._on_pose) |
| bus.subscribe("audio.class", self._on_audio) |
| bus.subscribe("sensor.telemetry", self._on_sensor) |
| bus.subscribe("alert.generated", self._on_alert) |
|
|
| |
|
|
| def _on_frame(self, event: Event) -> None: |
| """Update frame counter and motion metrics.""" |
| self.state.frames_analyzed += 1 |
| change_pct = event.data.get("change_pct", 0.0) |
| if change_pct < 1.0: |
| self.state.movement_speed *= 0.95 |
|
|
| def _on_detection(self, event: Event) -> None: |
| """Update social context and object-driven risk.""" |
| detections = event.data.get("detections", []) |
| self.state.detected_objects = [d.get("class", "") for d in detections] |
|
|
| self._update_person_tracking(detections) |
| self._update_object_risk(detections) |
|
|
| def _update_person_tracking(self, detections: list[dict]) -> None: |
| """Track people count and closest person distance with trend.""" |
| persons = [d for d in detections if d.get("class") == "person"] |
| self.state.nearby_people = len(persons) |
|
|
| if not persons: |
| self.state.closest_person_distance = 999.0 |
| self.state.closest_person_trend = "none" |
| self._person_distances.clear() |
| return |
|
|
| area_pct = persons[0].get("area_pct", 1.0) |
| est_distance = max(0.3, min(30.0, 8.0 / (area_pct + 0.1))) |
| self.state.closest_person_distance = est_distance |
|
|
| self._person_distances.append(est_distance) |
| if len(self._person_distances) > self._PERSON_HISTORY_MAX: |
| self._person_distances = self._person_distances[-self._PERSON_HISTORY_MAX:] |
|
|
| if len(self._person_distances) >= 3: |
| trend = self._person_distances[-1] - self._person_distances[0] |
| if trend < -0.5: |
| self.state.closest_person_trend = "approaching" |
| elif trend > 0.5: |
| self.state.closest_person_trend = "receding" |
| else: |
| self.state.closest_person_trend = "stationary" |
|
|
| def _update_object_risk(self, detections: list[dict]) -> None: |
| """Update risk factors based on detected object classes.""" |
| for det in detections: |
| cls = det.get("class", "") |
| dist = det.get("area_pct", 0.0) |
|
|
| if cls == "vehicle" and dist > 5.0: |
| self._add_risk("vehicle_nearby", 0.3) |
| elif cls == "fire": |
| self._add_risk("fire_detected", 0.7) |
| elif cls == "animal" and dist > 10.0: |
| self._add_risk("animal_nearby", 0.15) |
|
|
| def _on_pose(self, event: Event) -> None: |
| """Update activity classification and fall state.""" |
| data = event.data |
| self._last_pose_time = event.timestamp |
|
|
| if data.get("is_fall"): |
| self.state.activity = "fallen" |
| self._add_risk("fall_detected", 0.6) |
| if self._fall_start_time == 0.0: |
| self._fall_start_time = event.timestamp |
| else: |
| self._fall_start_time = 0.0 |
| if data.get("is_sitting"): |
| self.state.activity = "sitting" |
| elif self.state.movement_speed > 1.5: |
| self.state.activity = "running" |
| elif self.state.movement_speed > 0.3: |
| self.state.activity = "walking" |
| else: |
| self.state.activity = "standing" |
|
|
| def _on_audio(self, event: Event) -> None: |
| """Update noise environment from audio classification.""" |
| data = event.data |
| rms = data.get("rms", 0.0) |
| self.state.noise_level = max(self.state.noise_level * 0.9, rms) |
|
|
| if self.state.noise_level > 0.5: |
| self.state.noise_condition = "loud" |
| elif self.state.noise_level > 0.1: |
| self.state.noise_condition = "moderate" |
| else: |
| self.state.noise_condition = "quiet" |
|
|
| self._update_environment_label() |
|
|
| alert = data.get("alert_level", "") |
| if alert == "critical": |
| self._add_risk("alarm_sound", 0.4) |
| elif alert == "warning": |
| self._add_risk("alert_sound", 0.2) |
|
|
| def _on_sensor(self, event: Event) -> None: |
| """Update environmental readings and location inference.""" |
| data = event.data |
|
|
| self.state.light_level = data.get("light_level", self.state.light_level) |
| self.state.battery = data.get("battery_pct", self.state.battery) |
| self.state.heading = data.get("heading", self.state.heading) |
| self.state.heading_dir = _compass_direction(self.state.heading) |
|
|
| gps = data.get("gps") |
| if gps and gps != (0.0, 0.0): |
| old_gps = self.state.gps |
| self.state.gps = gps |
| if old_gps != (0.0, 0.0): |
| delta = math.sqrt( |
| (gps[0] - old_gps[0]) ** 2 + (gps[1] - old_gps[1]) ** 2 |
| ) |
| self.state.speed = delta * 111_000 |
|
|
| accel = data.get("accelerometer") |
| if accel: |
| magnitude = math.sqrt(sum(a ** 2 for a in accel)) |
| variance = abs(magnitude - 9.81) |
| self.state.movement_speed = max( |
| self.state.movement_speed * 0.8, |
| variance * 0.5 |
| ) |
|
|
| if self.state.light_level > 300: |
| self.state.light_condition = "bright" |
| self.state.location_type = "outdoors" |
| elif self.state.light_level > 50: |
| self.state.light_condition = "normal" |
| elif self.state.light_level > 5: |
| self.state.light_condition = "dim" |
| self.state.location_type = "indoors" |
| else: |
| self.state.light_condition = "dark" |
| self.state.location_type = "indoors" |
|
|
| self._update_environment_label() |
|
|
| def _on_alert(self, event: Event) -> None: |
| """Track VLM call metrics.""" |
| self.state.vlm_calls += 1 |
| self.state.last_trigger_time = event.timestamp |
|
|
| |
|
|
| def _add_risk(self, factor: str, amount: float) -> None: |
| """Add a risk factor and accumulate risk score (capped at 1.0).""" |
| if factor not in self.state.risk_factors: |
| self.state.risk_factors.append(factor) |
| self.state.risk_level = min(1.0, self.state.risk_level + amount) |
|
|
| def _remove_risk(self, factor: str) -> None: |
| """Remove a risk factor.""" |
| if factor in self.state.risk_factors: |
| self.state.risk_factors.remove(factor) |
|
|
| def decay_risk(self, amount: float = 0.01) -> None: |
| """ |
| Gradually reduce risk level when no new triggers arrive. |
| Call this periodically (e.g., every frame) to prevent |
| stale risk accumulation. |
| """ |
| self.state.risk_level = max(0.0, self.state.risk_level - amount) |
| if self.state.risk_level < 0.05: |
| self.state.risk_factors.clear() |
|
|
| def _update_environment_label(self) -> None: |
| """Recompute the combined environment descriptor.""" |
| parts = [] |
| if self.state.light_condition not in ("unknown", "normal"): |
| parts.append(self.state.light_condition) |
| if self.state.noise_condition not in ("unknown", "moderate"): |
| parts.append(self.state.noise_condition) |
| self.state.environment = " ".join(parts) if parts else "normal" |
|
|
| def reset(self) -> None: |
| """Reset to initial state.""" |
| self.state = ContextState() |
| self._person_distances.clear() |
| self._last_pose_time = 0.0 |
| self._fall_start_time = 0.0 |
|
|