Spaces:
Running
Running
| """Long-term memory layer (semantic / procedural / episodic). | |
| Uses stdlib ``sqlite3`` so the project ships with no extra dependencies. | |
| The interface mirrors the three-tier taxonomy from the recent agent-memory | |
| literature, so an alternative backend (Mem0 / Letta / sqlite-vec) can | |
| replace this one without touching the call sites. | |
| Tiers | |
| ----- | |
| * **Working** — held in the LangGraph state (untouched by this module). | |
| * **Semantic** — atomic facts about the user (likes, dislikes, hard | |
| constraints, lab results). Survives across sessions. | |
| * **Procedural** — verdicts the validator produced. Lets the system note | |
| "this user rejected high-carb breakfasts twice" without re-asking. | |
| * **Episodic** — JSON snapshot of past sessions for replay / audit. | |
| The schema is three tables, one row per fact / verdict / session. SQL | |
| ``LIKE`` over short text is sufficient at the demo's scale; a vector | |
| backend can be added when retrieval recall becomes the bottleneck. | |
| """ | |
from __future__ import annotations

import json
import sqlite3
import threading
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
| _SCHEMA = """ | |
| CREATE TABLE IF NOT EXISTS semantic_facts ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| user_id TEXT NOT NULL, | |
| fact_type TEXT NOT NULL, -- e.g. 'dislike', 'allergy', 'preference' | |
| content TEXT NOT NULL, | |
| source TEXT NOT NULL DEFAULT '', -- e.g. 'user_stated', 'inferred', 'validator' | |
| created_at TEXT NOT NULL | |
| ); | |
| CREATE INDEX IF NOT EXISTS idx_facts_user ON semantic_facts(user_id, fact_type); | |
| CREATE TABLE IF NOT EXISTS procedural_records ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| user_id TEXT NOT NULL, | |
| plan_summary TEXT NOT NULL, | |
| verdict TEXT NOT NULL, -- 'pass' | 'revise' | 'reject' | |
| issues_json TEXT NOT NULL, -- JSON list of ValidationIssue | |
| created_at TEXT NOT NULL | |
| ); | |
| CREATE INDEX IF NOT EXISTS idx_proc_user ON procedural_records(user_id, created_at); | |
| CREATE TABLE IF NOT EXISTS episodic_sessions ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| user_id TEXT NOT NULL, | |
| session_id TEXT NOT NULL, | |
| payload_json TEXT NOT NULL, -- JSON snapshot of session state | |
| created_at TEXT NOT NULL | |
| ); | |
| CREATE INDEX IF NOT EXISTS idx_episodic_user ON episodic_sessions(user_id, created_at); | |
| """ | |
| class LongTermMemory: | |
| """SQLite-backed three-tier long-term memory. | |
| Pass a file path for persistence across runs, or ``None`` (default) for an | |
| in-memory database useful in tests / ephemeral demos. | |
| """ | |
| def __init__(self, db_path: Optional[str] = None) -> None: | |
| self.db_path = db_path or ":memory:" | |
| # SQLite connections are not thread-safe by default; one connection per | |
| # thread is the standard pattern. The demo is single-process so a single | |
| # connection + lock is enough. | |
| self.conn = sqlite3.connect(self.db_path, check_same_thread=False) | |
| self.conn.row_factory = sqlite3.Row | |
| self._lock = threading.Lock() | |
| self._init_schema() | |
| def _init_schema(self) -> None: | |
| with self._lock: | |
| self.conn.executescript(_SCHEMA) | |
| self.conn.commit() | |
| def close(self) -> None: | |
| with self._lock: | |
| self.conn.close() | |
| # ------------------------------------------------------------------ | |
| # Semantic facts | |
| # ------------------------------------------------------------------ | |
| def remember_fact( | |
| self, | |
| user_id: str, | |
| fact_type: str, | |
| content: str, | |
| source: str = "user_stated", | |
| ) -> int: | |
| """Insert a semantic fact. Returns the row id.""" | |
| now = datetime.utcnow().isoformat() | |
| with self._lock: | |
| cur = self.conn.execute( | |
| "INSERT INTO semantic_facts (user_id, fact_type, content, source, created_at) " | |
| "VALUES (?, ?, ?, ?, ?)", | |
| (user_id, fact_type, content, source, now), | |
| ) | |
| self.conn.commit() | |
| return int(cur.lastrowid or 0) | |
| def recall_facts( | |
| self, | |
| user_id: str, | |
| fact_type: Optional[str] = None, | |
| contains: Optional[str] = None, | |
| limit: int = 50, | |
| ) -> List[Dict[str, Any]]: | |
| """List facts for a user, optionally filtered by type / substring.""" | |
| sql = "SELECT * FROM semantic_facts WHERE user_id = ?" | |
| params: List[Any] = [user_id] | |
| if fact_type: | |
| sql += " AND fact_type = ?" | |
| params.append(fact_type) | |
| if contains: | |
| sql += " AND content LIKE ?" | |
| params.append(f"%{contains}%") | |
| sql += " ORDER BY created_at DESC LIMIT ?" | |
| params.append(limit) | |
| with self._lock: | |
| cur = self.conn.execute(sql, params) | |
| return [dict(row) for row in cur.fetchall()] | |
| def forget_fact(self, fact_id: int) -> None: | |
| with self._lock: | |
| self.conn.execute("DELETE FROM semantic_facts WHERE id = ?", (fact_id,)) | |
| self.conn.commit() | |
| # ------------------------------------------------------------------ | |
| # Procedural records (validator history) | |
| # ------------------------------------------------------------------ | |
| def remember_validation( | |
| self, | |
| user_id: str, | |
| plan_summary: str, | |
| verdict: str, | |
| issues: List[Dict[str, Any]], | |
| ) -> int: | |
| now = datetime.utcnow().isoformat() | |
| with self._lock: | |
| cur = self.conn.execute( | |
| "INSERT INTO procedural_records (user_id, plan_summary, verdict, issues_json, created_at) " | |
| "VALUES (?, ?, ?, ?, ?)", | |
| (user_id, plan_summary, verdict, json.dumps(issues), now), | |
| ) | |
| self.conn.commit() | |
| return int(cur.lastrowid or 0) | |
| def recall_validations(self, user_id: str, limit: int = 10) -> List[Dict[str, Any]]: | |
| with self._lock: | |
| cur = self.conn.execute( | |
| "SELECT * FROM procedural_records WHERE user_id = ? ORDER BY created_at DESC LIMIT ?", | |
| (user_id, limit), | |
| ) | |
| return [ | |
| {**dict(row), "issues": json.loads(row["issues_json"])} | |
| for row in cur.fetchall() | |
| ] | |
| # ------------------------------------------------------------------ | |
| # Episodic sessions | |
| # ------------------------------------------------------------------ | |
| def remember_session(self, user_id: str, session_id: str, payload: Dict[str, Any]) -> int: | |
| now = datetime.utcnow().isoformat() | |
| with self._lock: | |
| cur = self.conn.execute( | |
| "INSERT INTO episodic_sessions (user_id, session_id, payload_json, created_at) " | |
| "VALUES (?, ?, ?, ?)", | |
| (user_id, session_id, json.dumps(payload, default=str), now), | |
| ) | |
| self.conn.commit() | |
| return int(cur.lastrowid or 0) | |
| def recall_sessions(self, user_id: str, limit: int = 5) -> List[Dict[str, Any]]: | |
| with self._lock: | |
| cur = self.conn.execute( | |
| "SELECT * FROM episodic_sessions WHERE user_id = ? ORDER BY created_at DESC LIMIT ?", | |
| (user_id, limit), | |
| ) | |
| return [ | |
| {**dict(row), "payload": json.loads(row["payload_json"])} | |
| for row in cur.fetchall() | |
| ] | |
| __all__ = ["LongTermMemory"] | |