| """ |
| Memory V2 — Brain-inspired hierarchical memory engine (additive). |
| |
| Implements Working -> Consolidation -> Semantic memory with: |
| - Exponential decay (activation fades over time) |
| - Reinforcement (accessed/confirmed memories grow stronger) |
| - Consolidation (repeated working items promote to semantic) |
| - Pruning (low-activation + low-importance semantic items are forgotten) |
| - Persona Integrity Kernel isolation (never modifies persona identity) |
| |
| Memory types: |
| P = Pinned (user-approved "core memories"; never auto-pruned, tau=infinity) |
| S = Semantic (stable facts/preferences; slow decay tau~30 days; prunable) |
| W = Working (short-lived context traces; fast decay tau~6 hours) |
| A = Anchor (profile/persona kernel — not stored here; injected from profile.py) |
| |
| IMPORTANT: This module is additive-only. |
| - Reuses the existing persona_memory table |
| - Adds optional columns via safe ALTER TABLE (ignored by V1 code) |
| - V1 (ltm.py) continues to work unchanged |
| - V2 is opt-in via memoryEngine='v2' in settings |
| |
| IMPORTANT: Persona identity is NEVER learned/changed here. |
| This stores only user-overlay memories (facts about the user, preferences, |
| boundaries). The persona's voice/role/rules remain immutable in the |
| PersonalityAgent definitions. |
| """ |
| from __future__ import annotations |
|
|
| import hashlib |
| import math |
| import re |
| import sqlite3 |
| import time |
| from dataclasses import dataclass |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| from .storage import _get_db_path |
|
|
|
|
| |
| |
| |
|
|
@dataclass
class V2Config:
    """Tuning parameters for the V2 memory engine (decay, reinforcement,
    consolidation, pruning, and retrieval caps)."""

    # -- Decay time constants (seconds): activation = strength * exp(-dt/tau) --
    tau_working: float = 6 * 3600.0  # working traces fade over ~6 hours
    tau_semantic: float = 30 * 24 * 3600.0  # semantic facts fade over ~30 days

    # -- Reinforcement rates (eta): s <- 1 - (1-s) * exp(-eta) per touch --
    eta_user_confirmed: float = 0.25  # strong boost when the user confirms/pins
    eta_inferred: float = 0.05  # gentle boost for inferred or retrieved items

    # -- Consolidation thresholds (Working -> Semantic promotion) --
    consolidate_min_repeats: int = 2  # similar working traces required
    consolidate_min_importance: float = 0.45  # minimum heuristic importance
    consolidate_min_activation: float = 0.25  # trace must still be retrievable

    # -- Pruning thresholds: an S item is forgotten only when BOTH are under --
    prune_activation_thresh: float = 0.05
    prune_importance_thresh: float = 0.25

    # -- Retrieval caps per section of the built context block --
    top_pinned: int = 4
    top_semantic: int = 8
    top_working: int = 1

    # -- Maximum stored length of a memory value, in characters --
    max_value_chars: int = 600

    # -- Minimum seconds between consolidate/prune maintenance passes --
    maintenance_interval: float = 30.0
|
|
|
|
| |
| |
| |
|
|
| def _now() -> float: |
| return time.time() |
|
|
|
|
| def _clamp(x: float, lo: float, hi: float) -> float: |
| return max(lo, min(hi, x)) |
|
|
|
|
def _activation(strength: float, last_access_at: float, tau: float) -> float:
    """Retrievability A(t) = s * exp(-dt / tau).

    `dt` is seconds elapsed since the last access; a falsy
    `last_access_at` (0/None) is treated as "just now", i.e. no decay.
    """
    reference = last_access_at or _now()
    elapsed = _now() - reference
    if elapsed < 0.0:
        elapsed = 0.0  # clock skew / future timestamps decay nothing
    return float(strength) * math.exp(-elapsed / tau)
|
|
|
|
def _reinforce(strength: float, eta: float) -> float:
    """Saturating reinforcement toward 1.0: s <- 1 - (1-s) * exp(-eta)."""
    bounded = _clamp(float(strength), 0.0, 1.0)
    remaining = 1.0 - bounded  # distance left to full strength
    return 1.0 - remaining * math.exp(-float(eta))
|
|
|
|
| def _stable_hash(text: str) -> str: |
| """Deterministic hash (stable across Python restarts, unlike hash()).""" |
| return hashlib.md5(text.encode("utf-8", errors="replace")).hexdigest()[:12] |
|
|
|
|
| def _clean_value(s: str, max_chars: int) -> str: |
| s2 = (s or "").strip() |
| s2 = re.sub(r"\s+", " ", s2) |
| if len(s2) > max_chars: |
| s2 = s2[:max_chars - 1].rstrip() + "..." |
| return s2 |
|
|
|
|
| def _keyword_score(query: str, text: str) -> float: |
| """Lightweight relevance: normalized token overlap.""" |
| q = set(re.findall(r"[a-z0-9]{3,}", (query or "").lower())) |
| t = set(re.findall(r"[a-z0-9]{3,}", (text or "").lower())) |
| if not q or not t: |
| return 0.0 |
| overlap = len(q.intersection(t)) |
| return overlap / max(1, min(len(q), 8)) |
|
|
|
|
| |
| |
| |
|
|
# Phrases that mark an explicit "pin this" request from the user.
# Matched case-insensitively, word-boundary anchored, anywhere in the text;
# a hit routes the message to Pinned (P) storage instead of Working (W).
_RE_REMEMBER = re.compile(
    r"\b(remember this|remember that|don't forget|do not forget|please remember)\b",
    re.I,
)
|
|
|
|
| |
| |
| |
|
|
def ensure_v2_columns() -> None:
    """
    Add V2-specific columns to the persona_memory table.
    Safe to call on every startup — skips columns that already exist.
    V1 code ignores these columns (uses SELECT * with sqlite3.Row).

    Fix: close the connection in a finally block so it is not leaked
    when PRAGMA/ALTER raises.
    """
    con = sqlite3.connect(_get_db_path())
    try:
        cur = con.cursor()
        cur.execute("PRAGMA table_info(persona_memory)")
        existing = {row[1] for row in cur.fetchall()}  # row[1] = column name

        # (name, type + default) pairs; defaults keep V1 rows meaningful.
        additions = [
            ("mem_type", "TEXT DEFAULT 'S'"),
            ("strength", "REAL DEFAULT 0.5"),
            ("importance", "REAL DEFAULT 0.3"),
            ("last_access_at", "REAL DEFAULT 0"),
            ("access_count", "INTEGER DEFAULT 0"),
            ("last_seen_at", "REAL DEFAULT 0"),
        ]

        for col_name, col_def in additions:
            if col_name not in existing:
                cur.execute(f"ALTER TABLE persona_memory ADD COLUMN {col_name} {col_def}")

        con.commit()
    finally:
        con.close()
|
|
|
|
| |
| |
| |
|
|
def _connect() -> sqlite3.Connection:
    """Open a DB connection whose rows allow dict-like access by column name."""
    connection = sqlite3.connect(_get_db_path())
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
def _select_memories(project_id: str, user_id: Optional[str] = None) -> List[Dict[str, Any]]:
    """Fetch all V2 memories for a project, optionally scoped to one user.

    Returns each row as a plain dict (column name -> value).

    Fix: close the connection in a finally block so it is not leaked
    when the query raises.
    """
    con = _connect()
    try:
        cur = con.cursor()
        if user_id:
            cur.execute(
                "SELECT * FROM persona_memory WHERE project_id = ? AND user_id = ?",
                (project_id, user_id),
            )
        else:
            cur.execute("SELECT * FROM persona_memory WHERE project_id = ?", (project_id,))
        return [dict(r) for r in cur.fetchall()]
    finally:
        con.close()
|
|
|
|
def _upsert_memory(
    project_id: str,
    category: str,
    key: str,
    value: str,
    mem_type: str,
    source_type: str,
    confidence: float,
    strength: float,
    importance: float,
    seen_now: bool = True,
    reinforce_eta: Optional[float] = None,
    user_id: Optional[str] = None,
) -> None:
    """
    Additive upsert keyed on the existing UNIQUE(project_id, category, key).
    On conflict: updates value/confidence/source_type, preserves V2 fields.
    If `reinforce_eta` is given, the row is additionally reinforced after
    the write (strength grows toward 1.0, access_count bumps).

    Fixes:
    - SQLite only ships EXP() when built with math functions (3.35+ and
      SQLITE_ENABLE_MATH_FUNCTIONS); register a Python fallback so the
      reinforcement UPDATE works on every build.
    - Close the connection in a finally block (no leak on error).
    """
    con = sqlite3.connect(_get_db_path())
    # Function names are case-insensitive in SQLite, so this also backs EXP().
    con.create_function("exp", 1, math.exp)
    try:
        cur = con.cursor()
        now_ts = _now()
        now_dt = time.strftime("%Y-%m-%d %H:%M:%S")
        value = _clean_value(value, max_chars=600)

        # Try UPDATE first; COALESCE keeps previously-set V2 fields intact.
        cur.execute(
            """
            UPDATE persona_memory
            SET
                value = ?,
                confidence = ?,
                source_type = ?,
                updated_at = ?,
                mem_type = COALESCE(mem_type, ?),
                strength = COALESCE(strength, 0.5),
                importance = COALESCE(importance, ?),
                last_seen_at = CASE WHEN ? THEN ? ELSE COALESCE(last_seen_at, 0) END
            WHERE project_id = ? AND category = ? AND key = ?
            """,
            (value, float(confidence), source_type, now_dt,
             mem_type, float(importance),
             1 if seen_now else 0, now_ts,
             project_id, category, key),
        )

        # No matching row: INSERT a fresh one with V2 fields populated.
        if cur.rowcount == 0:
            cur.execute(
                """
                INSERT INTO persona_memory
                    (project_id, category, key, value, confidence, source_session,
                     source_type, created_at, updated_at,
                     mem_type, strength, importance, last_access_at, access_count, last_seen_at,
                     user_id)
                VALUES
                    (?, ?, ?, ?, ?, NULL,
                     ?, ?, ?,
                     ?, ?, ?, 0, 0, ?,
                     ?)
                """,
                (project_id, category, key, value, float(confidence),
                 source_type, now_dt, now_dt,
                 mem_type, float(_clamp(strength, 0.0, 1.0)),
                 float(_clamp(importance, 0.0, 1.0)),
                 now_ts if seen_now else 0.0,
                 user_id),
            )

        # Optional saturating reinforcement: s <- 1 - (1-s) * exp(-eta).
        if reinforce_eta is not None:
            cur.execute(
                """
                UPDATE persona_memory
                SET
                    strength = MIN(1.0, 1.0 - (1.0 - COALESCE(strength, 0.5)) * EXP(-?)),
                    last_access_at = ?,
                    access_count = COALESCE(access_count, 0) + 1
                WHERE project_id = ? AND category = ? AND key = ?
                """,
                (float(reinforce_eta), now_ts, project_id, category, key),
            )

        con.commit()
    finally:
        con.close()
|
|
|
|
def _touch_access(project_id: str, mem_id: int, eta: float) -> None:
    """Reinforce an existing memory entry by ID (light touch on retrieval):
    strength moves toward 1.0, last_access_at resets, access_count bumps.

    Fixes:
    - Register a Python EXP() fallback (SQLite only provides EXP when built
      with math functions, 3.35+ / SQLITE_ENABLE_MATH_FUNCTIONS).
    - Close the connection in a finally block (no leak on error).
    """
    con = sqlite3.connect(_get_db_path())
    con.create_function("exp", 1, math.exp)
    try:
        cur = con.cursor()
        now_ts = _now()
        cur.execute(
            """
            UPDATE persona_memory
            SET
                strength = MIN(1.0, 1.0 - (1.0 - COALESCE(strength, 0.5)) * EXP(-?)),
                last_access_at = ?,
                access_count = COALESCE(access_count, 0) + 1
            WHERE project_id = ? AND id = ?
            """,
            (float(eta), now_ts, project_id, int(mem_id)),
        )
        con.commit()
    finally:
        con.close()
|
|
|
|
def _delete_by_id(project_id: str, mem_id: int) -> None:
    """Permanently remove a single memory row by primary key.

    Fix: close the connection in a finally block (no leak on error).
    """
    con = sqlite3.connect(_get_db_path())
    try:
        con.execute(
            "DELETE FROM persona_memory WHERE project_id = ? AND id = ?",
            (project_id, int(mem_id)),
        )
        con.commit()
    finally:
        con.close()
|
|
|
|
def purge_memories_by_keyword(project_id: str, keyword: str) -> int:
    """
    Delete all Working and Semantic memories whose value mentions `keyword`
    (case-insensitive substring match). Called when an outfit/item is deleted
    from the wardrobe so stale references don't linger. Pinned (P) memories
    are preserved — the user explicitly asked to remember those.

    Returns the count of deleted entries.

    Fixes:
    - Close the connection in a finally block (no leak on error).
    - Batch the deletes with executemany instead of one DELETE per row.
    """
    if not keyword or not keyword.strip():
        return 0
    kw = keyword.strip().lower()
    con = sqlite3.connect(_get_db_path())
    con.row_factory = sqlite3.Row
    try:
        cur = con.cursor()
        cur.execute(
            "SELECT id, value, mem_type FROM persona_memory WHERE project_id = ?",
            (project_id,),
        )
        # Filter in Python: skip pinned rows, keep case-insensitive matches.
        to_delete: List[int] = [
            int(row["id"])
            for row in cur.fetchall()
            if (row["mem_type"] or "S").upper() != "P"
            and kw in (row["value"] or "").lower()
        ]
        cur.executemany(
            "DELETE FROM persona_memory WHERE id = ?",
            [(mid,) for mid in to_delete],
        )
        con.commit()
        return len(to_delete)
    finally:
        con.close()
|
|
|
|
| |
| |
| |
|
|
class MemoryV2Engine:
    """
    Orchestrates the Working -> Semantic memory lifecycle per project:
    ingestion of user text, throttled maintenance (consolidate + prune),
    and compact context retrieval for the system prompt.

    Persona identity is never learned or modified here — only user-overlay
    memories are stored (see module docstring).
    """

    def __init__(self, config: Optional[V2Config] = None) -> None:
        self.cfg = config or V2Config()
        # project_id -> timestamp of the last maintenance pass (throttling).
        self._last_maintenance: Dict[str, float] = {}

    def ingest_user_text(self, project_id: str, user_text: str, user_id: Optional[str] = None) -> None:
        """
        Ingest user text as memory. Two storage paths:
          1. Explicit "remember this" -> Pinned (P), never decays
          2. Default -> Working trace (W), fast decay
        Consolidation + pruning run on a throttled schedule (not every message).
        """
        t = _clean_value(user_text, self.cfg.max_value_chars)
        if not t or len(t) < 5:
            # Too short to be a meaningful memory; ignore.
            return

        # Path 1: explicit pin request -> permanent, high-importance entry.
        if _RE_REMEMBER.search(t):
            _upsert_memory(
                project_id=project_id,
                category="user",
                key=f"pinned:{_stable_hash(t)}",
                value=t,
                mem_type="P",
                source_type="user",
                confidence=1.0,
                strength=1.0,
                importance=0.95,
                reinforce_eta=self.cfg.eta_user_confirmed,
                user_id=user_id,
            )
            return

        # Path 2: default short-lived working trace.
        _upsert_memory(
            project_id=project_id,
            category="working",
            key=f"w:{_stable_hash(t)}",
            value=t,
            mem_type="W",
            source_type="user",
            confidence=0.5,
            strength=0.5,
            importance=0.25,
            reinforce_eta=None,
            user_id=user_id,
        )

        # Opportunistic, throttled housekeeping.
        self._maybe_maintain(project_id, user_id=user_id)

    def _maybe_maintain(self, project_id: str, user_id: Optional[str] = None) -> None:
        """Run consolidation + pruning at most once per maintenance_interval."""
        now = _now()
        last = self._last_maintenance.get(project_id, 0.0)
        if now - last < self.cfg.maintenance_interval:
            return
        self._last_maintenance[project_id] = now
        try:
            self.consolidate(project_id, user_id=user_id)
            self.prune(project_id, user_id=user_id)
        except Exception as e:
            # Maintenance is best-effort; never let it break message handling.
            print(f"[MEMORY_V2] Maintenance warning (non-fatal): {e}")

    def consolidate(self, project_id: str, user_id: Optional[str] = None) -> None:
        """
        Promote Working -> Semantic if:
          - Repetition (similar to 2+ other W items OR similar to existing S item)
          - Still active (activation >= threshold)
          - Important enough (importance >= threshold)

        Also: when W overlaps with an existing S entry, reinforce that S entry.
        """
        mem = _select_memories(project_id, user_id=user_id)
        working = [m for m in mem if (m.get("mem_type") or "S") == "W"]
        semantic = [m for m in mem if (m.get("mem_type") or "S") == "S"]

        if not working:
            return

        # Only consider the 15 most recently seen working traces.
        working_sorted = sorted(
            working,
            key=lambda x: float(x.get("last_seen_at") or 0),
            reverse=True,
        )[:15]

        for w in working_sorted:
            w_text = w.get("value") or ""
            w_strength = float(w.get("strength") or 0.5)
            w_last = float(w.get("last_access_at") or w.get("last_seen_at") or 0.0)
            if w_last <= 0:
                # Never-touched rows decay from "now", not from epoch 0.
                w_last = _now()

            # Current retrievability under the fast working-memory decay.
            act = _activation(w_strength, w_last, self.cfg.tau_working)

            # Count how many other working traces look like near-repeats.
            rep = 0
            for w2 in working_sorted:
                if w2["id"] == w["id"]:
                    continue
                if _keyword_score(w2.get("value") or "", w_text) >= 0.6:
                    rep += 1

            # Heuristic importance boost for preference/boundary language.
            imp = float(w.get("importance") or 0.25)
            if re.search(r"\b(prefer|always|never|important|boundary|hate|love)\b", w_text, re.I):
                imp = max(imp, 0.5)

            # Find the most similar existing semantic entry, if any.
            best_sem = None
            best_score = 0.0
            for s in semantic:
                sc = _keyword_score(w_text, s.get("value") or "")
                if sc > best_score:
                    best_score = sc
                    best_sem = s

            # Strong overlap with an existing S entry: reinforce it rather
            # than creating a near-duplicate; the W trace is left to decay.
            if best_sem and best_score >= 0.45:
                _touch_access(project_id, int(best_sem["id"]), self.cfg.eta_inferred)
                continue

            # Promote to Semantic when repeated, important, and still active.
            if (
                rep >= self.cfg.consolidate_min_repeats
                and imp >= self.cfg.consolidate_min_importance
                and act >= self.cfg.consolidate_min_activation
            ):
                _upsert_memory(
                    project_id=project_id,
                    category="semantic",
                    key=f"s:{_stable_hash(w_text)}",
                    value=f"Stable note: {w_text}",
                    mem_type="S",
                    source_type="inferred",
                    confidence=0.55,
                    strength=0.55,
                    importance=_clamp(imp, 0.0, 1.0),
                    reinforce_eta=self.cfg.eta_inferred,
                    user_id=user_id,
                )
                # The promoted trace is no longer needed in working memory.
                _delete_by_id(project_id, int(w["id"]))

    def prune(self, project_id: str, user_id: Optional[str] = None) -> None:
        """
        Prune low-activation + low-importance Semantic entries (forgetting).
        Never prune Pinned (P). Trim excessive Working noise to ~25 items.
        """
        mem = _select_memories(project_id, user_id=user_id)

        # Forget semantic entries that are both faded AND unimportant.
        for s in mem:
            if (s.get("mem_type") or "S") != "S":
                continue
            strength = float(s.get("strength") or 0.5)
            last = float(s.get("last_access_at") or s.get("last_seen_at") or 0.0)
            if last <= 0:
                last = _now()
            imp = float(s.get("importance") or 0.3)
            act = _activation(strength, last, self.cfg.tau_semantic)
            if act < self.cfg.prune_activation_thresh and imp < self.cfg.prune_importance_thresh:
                _delete_by_id(project_id, int(s["id"]))

        # Cap working memory at the 25 most recently seen traces.
        working = [m for m in mem if (m.get("mem_type") or "S") == "W"]
        if len(working) > 25:
            working_sorted = sorted(
                working,
                key=lambda x: float(x.get("last_seen_at") or 0.0),
                reverse=True,
            )
            for w in working_sorted[25:]:
                _delete_by_id(project_id, int(w["id"]))

    def build_context(self, project_id: str, query: str, user_id: Optional[str] = None) -> str:
        """
        Build a compact memory context block for the system prompt.
        Retrieves: top Pinned + top Semantic (scored by relevance+activation) + 1 Working.
        Reinforces retrieved Semantic entries (light touch).
        """
        mem = _select_memories(project_id, user_id=user_id)

        pinned = [m for m in mem if (m.get("mem_type") or "S") == "P"]
        sem = [m for m in mem if (m.get("mem_type") or "S") == "S"]
        working = [m for m in mem if (m.get("mem_type") or "S") == "W"]

        # Pinned: rank by importance, then recency.
        pinned_sorted = sorted(
            pinned,
            key=lambda x: (
                float(x.get("importance") or 0.9),
                float(x.get("last_seen_at") or 0.0),
            ),
            reverse=True,
        )[:self.cfg.top_pinned]

        # Semantic: blend query relevance, current activation, and importance.
        scored_sem: List[Tuple[float, Dict[str, Any]]] = []
        for s in sem:
            strength = float(s.get("strength") or 0.5)
            last = float(s.get("last_access_at") or s.get("last_seen_at") or 0.0)
            if last <= 0:
                last = _now()
            imp = float(s.get("importance") or 0.3)
            act = _activation(strength, last, self.cfg.tau_semantic)
            rel = _keyword_score(query, s.get("value") or "")
            score = 0.55 * rel + 0.30 * act + 0.15 * imp
            scored_sem.append((score, s))
        scored_sem.sort(key=lambda x: x[0], reverse=True)
        sem_top = [s for _, s in scored_sem[:self.cfg.top_semantic]]

        # Retrieval itself strengthens the retrieved semantic entries.
        for s in sem_top:
            try:
                _touch_access(project_id, int(s["id"]), self.cfg.eta_inferred)
            except Exception:
                # Best-effort reinforcement; never block context building.
                pass

        # Working: just the most recent trace(s).
        working_sorted = sorted(
            working,
            key=lambda x: float(x.get("last_seen_at") or 0.0),
            reverse=True,
        )[:self.cfg.top_working]

        # Render the three sections; empty sections are omitted entirely.
        lines: List[str] = []

        if pinned_sorted:
            lines.append("PINNED MEMORY (user-approved, permanent):")
            for m in pinned_sorted:
                lines.append(f" - {m.get('value')}")
            lines.append("")

        if sem_top:
            lines.append("LEARNED MEMORY (semantic, long-term):")
            for m in sem_top:
                conf = float(m.get("confidence") or 0.5)
                tag = "" if conf >= 0.8 else " (uncertain)"
                lines.append(f" - {m.get('value')}{tag}")
            lines.append("")

        if working_sorted:
            lines.append("WORKING CONTEXT (short-lived):")
            for m in working_sorted:
                lines.append(f" - {m.get('value')}")
            lines.append("")

        return "\n".join(lines).strip()
|
|
|
|
| |
| |
| |
|
|
# Process-wide singleton engine; created lazily by get_memory_v2().
_engine: Optional[MemoryV2Engine] = None
|
|
|
|
def get_memory_v2() -> MemoryV2Engine:
    """Return the shared MemoryV2Engine, constructing it on first use."""
    global _engine
    engine = _engine
    if engine is None:
        engine = MemoryV2Engine()
        _engine = engine
    return engine
|
|