sentinel / memory_engine.py
kswffs's picture
Upload folder using huggingface_hub
b96103d verified
Raw
History Blame Contribute Delete
10.8 kB
"""
Sentinel Memory Engine — Temporal context storage and trend analysis.
Stores periodic snapshots of the ContextState, creating a timeline of what
the system has observed. This enables:
- Trend detection: "situation is getting worse over the last 30 seconds"
- Historical queries: "what was happening 2 minutes ago?"
- VLM context enrichment: "previous alerts show a person approaching"
- Escalation logic: "3 warnings in 30 seconds → auto-escalate to critical"
Usage:
memory = MemoryEngine(max_entries=100)
memory.store(context_engine.state, alert_level="warning", alert_text="Person nearby")
summary = memory.get_trend_summary() # for VLM prompt
trend = memory.analyze_trend() # "deteriorating" / "improving" / "stable"
"""
import time
from collections import Counter
from dataclasses import dataclass, field

try:
    import structlog
    logger = structlog.get_logger()
except ImportError:
    import logging
    logger = logging.getLogger("sentinel.memory")

from context_engine import ContextState
@dataclass
class MemoryEntry:
    """
    Record of what the system understood at one instant.

    Each entry pairs values copied from a context state with the alert
    (if any) raised at that moment, so it can be read on its own later.
    """

    # Capture time; from_context() fills this with time.time().
    timestamp: float
    # Text returned by the context's to_prompt_context().
    context_summary: str
    activity: str
    environment: str
    location_type: str
    nearby_people: int
    closest_person_distance: float
    closest_person_trend: str
    risk_level: float
    risk_factors: list = field(default_factory=list)
    # Only "warning" and "critical" are treated as alerts by is_alert().
    alert_level: str = "ok"
    alert_text: str = ""
    battery: float = 100.0
    frames_analyzed: int = 0

    @classmethod
    def from_context(
        cls,
        ctx: ContextState,
        alert_level: str = "ok",
        alert_text: str = ""
    ) -> "MemoryEntry":
        """
        Build an entry by copying the current values out of ``ctx``.

        Args:
            ctx: Live context state to snapshot.
            alert_level: Level of the alert attached to this snapshot.
            alert_text: Message of the alert attached to this snapshot.

        Returns:
            A new MemoryEntry stamped with the current wall-clock time.
        """
        snapshot = {
            "timestamp": time.time(),
            "context_summary": ctx.to_prompt_context(),
            "activity": ctx.activity,
            "environment": ctx.environment,
            "location_type": ctx.location_type,
            "nearby_people": ctx.nearby_people,
            "closest_person_distance": ctx.closest_person_distance,
            "closest_person_trend": ctx.closest_person_trend,
            "risk_level": ctx.risk_level,
            # Copied so later changes to the context's list do not leak in.
            "risk_factors": list(ctx.risk_factors),
            "alert_level": alert_level,
            "alert_text": alert_text,
            "battery": ctx.battery,
            "frames_analyzed": ctx.frames_analyzed,
        }
        return cls(**snapshot)

    def age_seconds(self) -> float:
        """Return how many seconds have elapsed since this entry was taken."""
        current = time.time()
        return current - self.timestamp

    def is_alert(self) -> bool:
        """Return True when the entry carries a warning or critical alert."""
        level = self.alert_level
        return level == "warning" or level == "critical"
class MemoryEngine:
    """
    Bounded timeline of context snapshots for temporal reasoning.

    Stores up to max_entries (default 100) MemoryEntry objects, dropping the
    oldest first. Provides trend analysis, historical queries, and formatted
    output for VLM prompt enrichment.

    Designed for Gradio per-session state (gr.State). All attributes
    are plain Python types — fully serializable.
    """

    def __init__(self, max_entries: int = 100):
        """
        Create an empty timeline.

        Args:
            max_entries: Maximum number of entries retained by store().
        """
        self.entries: list[MemoryEntry] = []
        self.max_entries = max_entries
        # 0.0 means "never stored", so the first store() is never throttled.
        self._last_store_time: float = 0.0

    def store(
        self,
        context: ContextState,
        alert_level: str = "ok",
        alert_text: str = "",
        min_interval: float = 2.0
    ) -> MemoryEntry | None:
        """
        Store a context snapshot.

        Args:
            context: Current ContextState to snapshot.
            alert_level: Alert level if one was generated this frame.
            alert_text: Alert text if one was generated.
            min_interval: Minimum seconds between stores (prevents flooding).
                Alerts (warning/critical) bypass this throttle.

        Returns:
            The created MemoryEntry, or None if throttled.
        """
        now = time.time()
        is_alert = alert_level in ("warning", "critical")
        if not is_alert and (now - self._last_store_time) < min_interval:
            return None

        entry = MemoryEntry.from_context(context, alert_level, alert_text)
        self.entries.append(entry)

        # Trim by explicit overflow count: the slice ``entries[-max_entries:]``
        # silently keeps everything when max_entries is 0. Deleting in place
        # also keeps any outside reference to ``self.entries`` valid.
        overflow = len(self.entries) - self.max_entries
        if overflow > 0:
            del self.entries[:overflow]

        self._last_store_time = now
        return entry

    def recent(self, seconds: float = 60.0) -> list[MemoryEntry]:
        """Retrieve entries from the last N seconds."""
        cutoff = time.time() - seconds
        return [e for e in self.entries if e.timestamp >= cutoff]

    def recent_alerts(self, seconds: float = 120.0) -> list[MemoryEntry]:
        """Retrieve only alert entries (warning/critical) from last N seconds."""
        cutoff = time.time() - seconds
        return [e for e in self.entries if e.timestamp >= cutoff and e.is_alert()]

    def alert_count(self, seconds: float = 60.0, level: str | None = None) -> int:
        """
        Count alerts in the last N seconds, optionally filtered by level.

        Args:
            seconds: Size of the look-back window.
            level: When given, only alerts with exactly this alert_level count.

        Returns:
            Number of matching alert entries.
        """
        cutoff = time.time() - seconds
        return sum(
            1
            for e in self.entries
            if e.timestamp >= cutoff
            and e.is_alert()
            and (level is None or e.alert_level == level)
        )

    # ── Trend Analysis ──────────────────────────────────────────────────────

    def analyze_trend(self, window: float = 30.0) -> dict:
        """
        Compare recent entries against older entries to detect trends.

        "Recent" is the last ``window`` seconds; "older" is the ``window``
        seconds immediately before that. With fewer than two recent entries
        the default (all-neutral) result is returned. When there is no older
        entry to act as a baseline, risk_delta stays 0.0 and direction stays
        "stable" rather than measuring the rise against an imaginary zero.

        Returns a dict:
            {
                "direction": "deteriorating" | "improving" | "stable",
                "risk_delta": float,          # positive = risk increasing
                "alert_rate": float,          # alerts per minute
                "warning_count": int,         # warnings in window
                "critical_count": int,        # criticals in window
                "person_approaching": bool,   # is someone closing in?
                "activity_change": str | None,  # e.g. "walking → fallen"
            }
        """
        now = time.time()
        recent = [e for e in self.entries if e.timestamp > now - window]
        older = [
            e for e in self.entries
            if now - window * 2 < e.timestamp <= now - window
        ]

        result = {
            "direction": "stable",
            "risk_delta": 0.0,
            "alert_rate": 0.0,
            "warning_count": 0,
            "critical_count": 0,
            "person_approaching": False,
            "activity_change": None,
        }
        if len(recent) < 2:
            return result

        if older:
            # Only claim a risk trend when a real baseline exists.
            recent_risk = sum(e.risk_level for e in recent) / len(recent)
            older_risk = sum(e.risk_level for e in older) / len(older)
            risk_delta = recent_risk - older_risk
            result["risk_delta"] = risk_delta
            if risk_delta > 0.15:
                result["direction"] = "deteriorating"
            elif risk_delta < -0.15:
                result["direction"] = "improving"

        level_counts = Counter(e.alert_level for e in recent if e.is_alert())
        result["warning_count"] = level_counts["warning"]
        result["critical_count"] = level_counts["critical"]
        # window is in seconds; dividing by window/60 yields alerts per minute.
        result["alert_rate"] = sum(level_counts.values()) / (window / 60.0)

        result["person_approaching"] = any(
            e.closest_person_trend == "approaching" for e in recent
        )

        if older:
            # most_common() breaks ties by first occurrence, so the dominant
            # activity is deterministic (iterating a set is not).
            old_activity = Counter(e.activity for e in older).most_common(1)[0][0]
            new_activity = Counter(e.activity for e in recent).most_common(1)[0][0]
            if old_activity != new_activity:
                result["activity_change"] = f"{old_activity} → {new_activity}"

        return result

    # ── Formatted Output ────────────────────────────────────────────────────

    def get_trend_summary(self, max_entries: int = 5) -> str:
        """
        Format recent history for VLM prompt enrichment.

        Args:
            max_entries: How many of the newest entries to include. Zero or
                a negative value yields the "no context" message.

        Returns:
            A compact multi-line string with timestamps and alerts.
        """
        # Guard: ``entries[-0:]`` would return the whole list, not nothing.
        recent = self.entries[-max_entries:] if max_entries > 0 else []
        if not recent:
            return "No previous context available."

        lines = ["Recent observations:"]
        for entry in recent:
            t = time.strftime("%H:%M:%S", time.localtime(entry.timestamp))
            alert_tag = f" [{entry.alert_level.upper()}]" if entry.is_alert() else ""
            alert_msg = f" — {entry.alert_text}" if entry.alert_text else ""
            lines.append(
                f" [{t}]{alert_tag} {entry.activity} in {entry.environment} "
                f"{entry.location_type}, risk={entry.risk_level:.2f}{alert_msg}"
            )
        return "\n".join(lines)

    def get_situation_summary(self, seconds: float = 120.0) -> str:
        """
        High-level narrative of what happened in the last N seconds.

        Suitable for incident reports or caregiver dashboards. The trend line
        comes from analyze_trend(seconds), i.e. the last N seconds compared
        with the N seconds before them.
        """
        entries = self.recent(seconds)
        if not entries:
            return "No activity recorded."

        alerts = [e for e in entries if e.is_alert()]
        avg_risk = sum(e.risk_level for e in entries) / len(entries)
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # rendered list is stable from run to run.
        activities = list(dict.fromkeys(e.activity for e in entries))
        trend = self.analyze_trend(seconds)

        parts = [
            f"Over the last {seconds:.0f}s:",
            f" {len(entries)} observations, {len(alerts)} alerts",
            f" Average risk: {avg_risk:.2f}",
            f" Activities: {', '.join(activities)}",
            f" Trend: {trend['direction']}",
        ]
        if trend["activity_change"]:
            parts.append(f" Activity changed: {trend['activity_change']}")
        if trend["person_approaching"]:
            parts.append(" Person detected approaching")
        if trend["critical_count"] > 0:
            parts.append(f" {trend['critical_count']} critical alerts fired")
        return "\n".join(parts)

    def reset(self) -> None:
        """Clear all stored entries and lift the store() throttle."""
        self.entries.clear()
        self._last_store_time = 0.0

    @property
    def total_entries(self) -> int:
        """Number of entries currently held."""
        return len(self.entries)

    @property
    def total_alerts(self) -> int:
        """Number of held entries that are warning/critical alerts."""
        return sum(1 for e in self.entries if e.is_alert())