""" Sentinel Memory Engine — Temporal context storage and trend analysis. Stores periodic snapshots of the ContextState, creating a timeline of what the system has observed. This enables: - Trend detection: "situation is getting worse over the last 30 seconds" - Historical queries: "what was happening 2 minutes ago?" - VLM context enrichment: "previous alerts show a person approaching" - Escalation logic: "3 warnings in 30 seconds → auto-escalate to critical" Usage: memory = MemoryEngine(max_entries=100) memory.store(context_engine.state, alert_level="warning", alert_text="Person nearby") summary = memory.get_trend_summary() # for VLM prompt trend = memory.analyze_trend() # "deteriorating" / "improving" / "stable" """ import time from dataclasses import dataclass, field try: import structlog logger = structlog.get_logger() except ImportError: import logging logger = logging.getLogger("sentinel.memory") from context_engine import ContextState @dataclass class MemoryEntry: """ A point-in-time snapshot of the system's understanding. Combines the context state with any alert that was generated, creating a complete record of "what happened at this moment." """ timestamp: float context_summary: str activity: str environment: str location_type: str nearby_people: int closest_person_distance: float closest_person_trend: str risk_level: float risk_factors: list = field(default_factory=list) alert_level: str = "ok" alert_text: str = "" battery: float = 100.0 frames_analyzed: int = 0 @classmethod def from_context( cls, ctx: ContextState, alert_level: str = "ok", alert_text: str = "" ) -> "MemoryEntry": """Create a MemoryEntry from a live ContextState snapshot.""" return cls( timestamp=time.time(), context_summary=ctx.to_prompt_context(), activity=ctx.activity, environment=ctx.environment, location_type=ctx.location_type, nearby_people=ctx.nearby_people, closest_person_distance=ctx.closest_person_distance, closest_person_trend=ctx.closest_person_trend, risk_level=ctx.risk_level, risk_factors=list(ctx.risk_factors), alert_level=alert_level, alert_text=alert_text, battery=ctx.battery, frames_analyzed=ctx.frames_analyzed, ) def age_seconds(self) -> float: return time.time() - self.timestamp def is_alert(self) -> bool: return self.alert_level in ("warning", "critical") class MemoryEngine: """ Bounded timeline of context snapshots for temporal reasoning. Stores up to max_entries (default 100) MemoryEntry objects. Provides trend analysis, historical queries, and formatted output for VLM prompt enrichment. Designed for Gradio per-session state (gr.State). All attributes are plain Python types — fully serializable. """ def __init__(self, max_entries: int = 100): self.entries: list[MemoryEntry] = [] self.max_entries = max_entries self._last_store_time: float = 0.0 def store( self, context: ContextState, alert_level: str = "ok", alert_text: str = "", min_interval: float = 2.0 ) -> MemoryEntry | None: """ Store a context snapshot. Args: context: Current ContextState to snapshot. alert_level: Alert level if one was generated this frame. alert_text: Alert text if one was generated. min_interval: Minimum seconds between stores (prevents flooding). Alerts (warning/critical) bypass this throttle. Returns: The created MemoryEntry, or None if throttled. """ now = time.time() is_alert = alert_level in ("warning", "critical") if not is_alert and (now - self._last_store_time) < min_interval: return None entry = MemoryEntry.from_context(context, alert_level, alert_text) self.entries.append(entry) if len(self.entries) > self.max_entries: self.entries = self.entries[-self.max_entries:] self._last_store_time = now return entry def recent(self, seconds: float = 60.0) -> list[MemoryEntry]: """Retrieve entries from the last N seconds.""" cutoff = time.time() - seconds return [e for e in self.entries if e.timestamp >= cutoff] def recent_alerts(self, seconds: float = 120.0) -> list[MemoryEntry]: """Retrieve only alert entries (warning/critical) from last N seconds.""" cutoff = time.time() - seconds return [e for e in self.entries if e.timestamp >= cutoff and e.is_alert()] def alert_count(self, seconds: float = 60.0, level: str | None = None) -> int: """Count alerts in the last N seconds, optionally filtered by level.""" cutoff = time.time() - seconds count = 0 for e in self.entries: if e.timestamp < cutoff: continue if not e.is_alert(): continue if level is None or e.alert_level == level: count += 1 return count # ── Trend Analysis ────────────────────────────────────────────────────── def analyze_trend(self, window: float = 30.0) -> dict: """ Compare recent entries against older entries to detect trends. Returns a dict: { "direction": "deteriorating" | "improving" | "stable", "risk_delta": float, # positive = risk increasing "alert_rate": float, # alerts per minute "warning_count": int, # warnings in window "critical_count": int, # criticals in window "person_approaching": bool, # is someone closing in? "activity_change": str | None, # e.g. "walking → fallen" } """ now = time.time() recent = [e for e in self.entries if e.timestamp > now - window] older = [ e for e in self.entries if now - window * 2 < e.timestamp <= now - window ] result = { "direction": "stable", "risk_delta": 0.0, "alert_rate": 0.0, "warning_count": 0, "critical_count": 0, "person_approaching": False, "activity_change": None, } if len(recent) < 2: return result recent_risk = sum(e.risk_level for e in recent) / len(recent) older_risk = ( sum(e.risk_level for e in older) / len(older) if older else 0.0 ) risk_delta = recent_risk - older_risk result["risk_delta"] = risk_delta if risk_delta > 0.15: result["direction"] = "deteriorating" elif risk_delta < -0.15: result["direction"] = "improving" alerts_in_window = [e for e in recent if e.is_alert()] result["warning_count"] = sum( 1 for e in alerts_in_window if e.alert_level == "warning" ) result["critical_count"] = sum( 1 for e in alerts_in_window if e.alert_level == "critical" ) result["alert_rate"] = len(alerts_in_window) / (window / 60.0) if any(e.closest_person_trend == "approaching" for e in recent): result["person_approaching"] = True if older and recent: old_activity = max(set(e.activity for e in older), key=lambda a: sum(1 for e in older if e.activity == a)) new_activity = max(set(e.activity for e in recent), key=lambda a: sum(1 for e in recent if e.activity == a)) if old_activity != new_activity: result["activity_change"] = f"{old_activity} → {new_activity}" return result # ── Formatted Output ──────────────────────────────────────────────────── def get_trend_summary(self, max_entries: int = 5) -> str: """ Format recent history for VLM prompt enrichment. Returns a compact multi-line string with timestamps and alerts. """ recent = self.entries[-max_entries:] if not recent: return "No previous context available." lines = ["Recent observations:"] for entry in recent: t = time.strftime("%H:%M:%S", time.localtime(entry.timestamp)) alert_tag = f" [{entry.alert_level.upper()}]" if entry.is_alert() else "" alert_msg = f" — {entry.alert_text}" if entry.alert_text else "" lines.append( f" [{t}]{alert_tag} {entry.activity} in {entry.environment} " f"{entry.location_type}, risk={entry.risk_level:.2f}{alert_msg}" ) return "\n".join(lines) def get_situation_summary(self, seconds: float = 120.0) -> str: """ High-level narrative of what happened in the last N seconds. Suitable for incident reports or caregiver dashboards. """ entries = self.recent(seconds) if not entries: return "No activity recorded." alerts = [e for e in entries if e.is_alert()] avg_risk = sum(e.risk_level for e in entries) / len(entries) activities = list(set(e.activity for e in entries)) trend = self.analyze_trend(seconds) parts = [ f"Over the last {seconds:.0f}s:", f" {len(entries)} observations, {len(alerts)} alerts", f" Average risk: {avg_risk:.2f}", f" Activities: {', '.join(activities)}", f" Trend: {trend['direction']}", ] if trend["activity_change"]: parts.append(f" Activity changed: {trend['activity_change']}") if trend["person_approaching"]: parts.append(" Person detected approaching") if trend["critical_count"] > 0: parts.append(f" {trend['critical_count']} critical alerts fired") return "\n".join(parts) def reset(self) -> None: """Clear all stored entries.""" self.entries.clear() self._last_store_time = 0.0 @property def total_entries(self) -> int: return len(self.entries) @property def total_alerts(self) -> int: return sum(1 for e in self.entries if e.is_alert())