"""
Sentinel Memory Engine — Temporal context storage and trend analysis.

Stores periodic snapshots of the ContextState, creating a timeline of what
the system has observed. This enables:
- Trend detection: "situation is getting worse over the last 30 seconds"
- Historical queries: "what was happening 2 minutes ago?"
- VLM context enrichment: "previous alerts show a person approaching"
- Escalation logic: "3 warnings in 30 seconds → auto-escalate to critical"

Usage:
    memory = MemoryEngine(max_entries=100)
    memory.store(context_engine.state, alert_level="warning", alert_text="Person nearby")
    summary = memory.get_trend_summary()  # multi-line text for the VLM prompt
    trend = memory.analyze_trend()  # dict; trend["direction"] is "deteriorating" / "improving" / "stable"
"""
|
|
import time
from collections import Counter
from dataclasses import dataclass, field

try:
    import structlog
    logger = structlog.get_logger()
except ImportError:
    import logging
    logger = logging.getLogger("sentinel.memory")

from context_engine import ContextState
|
|
|
|
@dataclass
class MemoryEntry:
    """
    One timestamped record of what the system understood at a given moment.

    Pairs the fields copied out of a context state with the alert (if any)
    that was raised alongside it, so the record can be read on its own later.
    """

    # Wall-clock creation time, as returned by time.time().
    timestamp: float
    # Text produced by the context's to_prompt_context() at capture time.
    context_summary: str
    # Fields copied verbatim from the context state.
    activity: str
    environment: str
    location_type: str
    nearby_people: int
    closest_person_distance: float
    closest_person_trend: str
    risk_level: float
    risk_factors: list = field(default_factory=list)
    # Alert attached to this snapshot; "ok" means no alert was raised.
    alert_level: str = "ok"
    alert_text: str = ""
    battery: float = 100.0
    frames_analyzed: int = 0

    @classmethod
    def from_context(
        cls,
        ctx: ContextState,
        alert_level: str = "ok",
        alert_text: str = ""
    ) -> "MemoryEntry":
        """
        Build an entry from the current values of a context state.

        Args:
            ctx: Context state to copy fields from.
            alert_level: Alert level to record with the snapshot.
            alert_text: Alert message to record with the snapshot.

        Returns:
            A new MemoryEntry stamped with the current time.
        """
        snapshot = {
            "timestamp": time.time(),
            "context_summary": ctx.to_prompt_context(),
            "activity": ctx.activity,
            "environment": ctx.environment,
            "location_type": ctx.location_type,
            "nearby_people": ctx.nearby_people,
            "closest_person_distance": ctx.closest_person_distance,
            "closest_person_trend": ctx.closest_person_trend,
            "risk_level": ctx.risk_level,
            # Copied so later changes to the context's list do not alter
            # this record.
            "risk_factors": list(ctx.risk_factors),
            "alert_level": alert_level,
            "alert_text": alert_text,
            "battery": ctx.battery,
            "frames_analyzed": ctx.frames_analyzed,
        }
        return cls(**snapshot)

    def age_seconds(self) -> float:
        """Return the number of seconds elapsed since this entry's timestamp."""
        elapsed = time.time() - self.timestamp
        return elapsed

    def is_alert(self) -> bool:
        """Return True when the alert level is "warning" or "critical"."""
        level = self.alert_level
        return level == "warning" or level == "critical"
|
|
|
|
class MemoryEngine:
    """
    Bounded timeline of context snapshots for temporal reasoning.

    Stores up to max_entries (default 100) MemoryEntry objects, oldest first.
    Provides trend analysis, historical queries, and formatted output
    for VLM prompt enrichment.

    Designed for Gradio per-session state (gr.State). All attributes
    are plain Python types — fully serializable.
    """

    def __init__(self, max_entries: int = 100):
        """
        Create an empty timeline.

        Args:
            max_entries: Maximum number of entries retained; the oldest are
                dropped first. Values <= 0 retain nothing.
        """
        self.entries: list[MemoryEntry] = []
        self.max_entries = max_entries
        # Wall-clock time of the last successful store(); 0.0 = never.
        self._last_store_time: float = 0.0

    def store(
        self,
        context: ContextState,
        alert_level: str = "ok",
        alert_text: str = "",
        min_interval: float = 2.0
    ) -> MemoryEntry | None:
        """
        Store a context snapshot.

        Args:
            context: Current ContextState to snapshot.
            alert_level: Alert level if one was generated this frame.
            alert_text: Alert text if one was generated.
            min_interval: Minimum seconds between stores (prevents flooding).
                Alerts (warning/critical) bypass this throttle.

        Returns:
            The created MemoryEntry, or None if throttled.
        """
        now = time.time()
        is_alert = alert_level in ("warning", "critical")

        # Routine snapshots are rate-limited; alerts are always recorded.
        if not is_alert and (now - self._last_store_time) < min_interval:
            return None

        entry = MemoryEntry.from_context(context, alert_level, alert_text)
        self.entries.append(entry)

        # Drop the oldest entries beyond the cap. The overflow is computed
        # explicitly because slicing with ``[-max_entries:]`` keeps the whole
        # list when max_entries is 0 (``-0 == 0``), defeating the bound.
        overflow = len(self.entries) - max(self.max_entries, 0)
        if overflow > 0:
            self.entries = self.entries[overflow:]

        self._last_store_time = now
        return entry

    def recent(self, seconds: float = 60.0) -> list[MemoryEntry]:
        """Retrieve entries from the last N seconds."""
        cutoff = time.time() - seconds
        return [e for e in self.entries if e.timestamp >= cutoff]

    def recent_alerts(self, seconds: float = 120.0) -> list[MemoryEntry]:
        """Retrieve only alert entries (warning/critical) from last N seconds."""
        cutoff = time.time() - seconds
        return [e for e in self.entries if e.timestamp >= cutoff and e.is_alert()]

    def alert_count(self, seconds: float = 60.0, level: str | None = None) -> int:
        """
        Count alerts in the last N seconds, optionally filtered by level.

        Args:
            seconds: Size of the look-back window.
            level: When given, only alerts with exactly this alert_level are
                counted; None counts every warning/critical entry.
        """
        cutoff = time.time() - seconds
        return sum(
            1
            for e in self.entries
            if e.timestamp >= cutoff
            and e.is_alert()
            and (level is None or e.alert_level == level)
        )

    @staticmethod
    def _mean_risk(entries: list) -> float:
        """Return the average risk_level of a non-empty list of entries."""
        return sum(e.risk_level for e in entries) / len(entries)

    @staticmethod
    def _dominant_activity(entries: list) -> str:
        """Return the most frequent activity; ties go to the first one seen."""
        return Counter(e.activity for e in entries).most_common(1)[0][0]

    def analyze_trend(self, window: float = 30.0) -> dict:
        """
        Compare recent entries against older entries to detect trends.

        "Recent" is the last ``window`` seconds; "older" is the ``window``
        seconds before that. When no older entries exist (e.g. early in a
        session) the recent entries are split into an earlier and a later
        half and those are compared instead, so a steady risk level is not
        misreported as a rise from zero. Entries are assumed to be in
        chronological order, which is how store() appends them.

        Returns a dict:
        {
            "direction": "deteriorating" | "improving" | "stable",
            "risk_delta": float,             # positive = risk increasing
            "alert_rate": float,             # alerts per minute
            "warning_count": int,            # warnings in window
            "critical_count": int,           # criticals in window
            "person_approaching": bool,      # is someone closing in?
            "activity_change": str | None,   # e.g. "walking → fallen"
        }

        With fewer than two recent entries the neutral defaults are returned.
        """
        now = time.time()
        recent = [e for e in self.entries if e.timestamp > now - window]
        older = [
            e for e in self.entries
            if now - window * 2 < e.timestamp <= now - window
        ]

        result = {
            "direction": "stable",
            "risk_delta": 0.0,
            "alert_rate": 0.0,
            "warning_count": 0,
            "critical_count": 0,
            "person_approaching": False,
            "activity_change": None,
        }

        if len(recent) < 2:
            return result

        # Pick the two groups to compare. len(recent) >= 2 guarantees both
        # halves are non-empty in the fallback.
        if older:
            baseline, current = older, recent
        else:
            mid = len(recent) // 2
            baseline, current = recent[:mid], recent[mid:]

        risk_delta = self._mean_risk(current) - self._mean_risk(baseline)
        result["risk_delta"] = risk_delta

        if risk_delta > 0.15:
            result["direction"] = "deteriorating"
        elif risk_delta < -0.15:
            result["direction"] = "improving"

        # Alert statistics always cover the whole recent window.
        alerts_in_window = [e for e in recent if e.is_alert()]
        result["warning_count"] = sum(
            1 for e in alerts_in_window if e.alert_level == "warning"
        )
        result["critical_count"] = sum(
            1 for e in alerts_in_window if e.alert_level == "critical"
        )
        if window > 0:
            result["alert_rate"] = len(alerts_in_window) / (window / 60.0)

        if any(e.closest_person_trend == "approaching" for e in recent):
            result["person_approaching"] = True

        old_activity = self._dominant_activity(baseline)
        new_activity = self._dominant_activity(current)
        if old_activity != new_activity:
            result["activity_change"] = f"{old_activity} → {new_activity}"

        return result

    def get_trend_summary(self, max_entries: int = 5) -> str:
        """
        Format recent history for VLM prompt enrichment.

        Args:
            max_entries: Number of most recent entries to include. Values
                <= 0 include none.

        Returns:
            A compact multi-line string with timestamps and alerts, or a
            fixed placeholder sentence when there is nothing to show.
        """
        # Guard the slice: ``entries[-0:]`` would return the entire history.
        recent = self.entries[-max_entries:] if max_entries > 0 else []
        if not recent:
            return "No previous context available."

        lines = ["Recent observations:"]
        for entry in recent:
            t = time.strftime("%H:%M:%S", time.localtime(entry.timestamp))
            alert_tag = f" [{entry.alert_level.upper()}]" if entry.is_alert() else ""
            alert_msg = f" — {entry.alert_text}" if entry.alert_text else ""
            lines.append(
                f" [{t}]{alert_tag} {entry.activity} in {entry.environment} "
                f"{entry.location_type}, risk={entry.risk_level:.2f}{alert_msg}"
            )
        return "\n".join(lines)

    def get_situation_summary(self, seconds: float = 120.0) -> str:
        """
        High-level narrative of what happened in the last N seconds.
        Suitable for incident reports or caregiver dashboards.

        Returns "No activity recorded." when the window holds no entries.
        """
        entries = self.recent(seconds)
        if not entries:
            return "No activity recorded."

        alerts = [e for e in entries if e.is_alert()]
        avg_risk = sum(e.risk_level for e in entries) / len(entries)
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # report reads chronologically and is identical from run to run.
        activities = list(dict.fromkeys(e.activity for e in entries))
        trend = self.analyze_trend(seconds)

        parts = [
            f"Over the last {seconds:.0f}s:",
            f" {len(entries)} observations, {len(alerts)} alerts",
            f" Average risk: {avg_risk:.2f}",
            f" Activities: {', '.join(activities)}",
            f" Trend: {trend['direction']}",
        ]

        if trend["activity_change"]:
            parts.append(f" Activity changed: {trend['activity_change']}")
        if trend["person_approaching"]:
            parts.append(" Person detected approaching")
        if trend["critical_count"] > 0:
            parts.append(f" {trend['critical_count']} critical alerts fired")

        return "\n".join(parts)

    def reset(self) -> None:
        """Clear all stored entries and the store throttle."""
        self.entries.clear()
        self._last_store_time = 0.0

    @property
    def total_entries(self) -> int:
        """Number of entries currently held."""
        return len(self.entries)

    @property
    def total_alerts(self) -> int:
        """Number of held entries that are warning/critical alerts."""
        return sum(1 for e in self.entries if e.is_alert())
|
|