Spaces:
Sleeping
Sleeping
| """Persistent 'grows-with-you' agent memory. | |
| Durable facts and preferences that personalize extraction over time: | |
| people->roles ("Dana is the soccer coach"), rules ("you decline Mondays"), | |
| default locations. Stored as JSON at MEMORY_PATH. | |
| - recall() -> a compact block injected into the agent prompt (server/agent.py) | |
| - remember() -> add/strengthen a fact (Memory tab, or a Hermes `remember` tool-call) | |
| - forget() -> drop a fact (Memory tab) | |
| - observe_plan() -> conservatively learn recurring contacts from extracted events | |
| This is the "memory" half of a Hermes-style grows-with-you agent; the model | |
| (served via INFERENCE_BASE_URL) is the reasoning half. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import threading | |
| from pathlib import Path | |
# Location of the JSON memory file; override with the MEMORY_PATH env var.
MEMORY_PATH = Path(os.getenv("MEMORY_PATH", "/tmp/agent_memory.json"))

# Upper bound on stored facts: fact lists are trimmed to their last MAX_FACTS
# entries whenever a new fact is appended.
MAX_FACTS = 200

# Recognised fact categories; any other kind is stored as "note".
KINDS = ("contact", "preference", "location", "note")

# Serialises load-modify-save cycles on the memory file within this process.
_lock = threading.Lock()
| def _norm(text: str) -> str: | |
| return " ".join(text.lower().split()) | |
def _load() -> dict:
    """Read the memory state from MEMORY_PATH.

    Returns:
        A dict with a ``"facts"`` list and an integer ``"seq"`` id counter.
        A missing, unreadable, or malformed file yields a fresh empty state
        rather than an error.
    """
    try:
        data = json.loads(MEMORY_PATH.read_text(encoding="utf-8"))
    except Exception:  # noqa: BLE001  missing/corrupt -> empty
        return {"facts": [], "seq": 0}

    if not (isinstance(data, dict) and isinstance(data.get("facts"), list)):
        return {"facts": [], "seq": 0}

    # remember() does `state["seq"] += 1`, so a file that has facts but no
    # usable counter would raise KeyError/TypeError there. Rebuild the counter
    # from the highest stored id so new ids never collide with existing ones.
    if not isinstance(data.get("seq"), int):
        known_ids = [
            f["id"]
            for f in data["facts"]
            if isinstance(f, dict) and isinstance(f.get("id"), int)
        ]
        data["seq"] = max(known_ids, default=0)
    return data
def _save(state: dict) -> None:
    """Persist *state* to MEMORY_PATH as indented JSON, replacing it atomically.

    The payload is written to a sibling ``<name>.tmp`` file and then renamed
    over the real file. Writing in place would truncate the file first, so a
    reader that does not hold ``_lock`` (e.g. ``list_facts()``) or a crash
    mid-write could observe a partial file, which ``_load()`` would then treat
    as empty memory.

    Callers in this module invoke this while holding ``_lock``, so the fixed
    temp-file name is not contended within the process.
    """
    MEMORY_PATH.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(state, indent=2)  # serialise before touching disk
    tmp_path = MEMORY_PATH.with_name(MEMORY_PATH.name + ".tmp")
    tmp_path.write_text(payload, encoding="utf-8")
    tmp_path.replace(MEMORY_PATH)  # same-directory rename -> atomic swap
def remember(text: str, kind: str = "note") -> dict | None:
    """Store a fact, or reinforce an existing fact with the same normalized text.

    Args:
        text: The fact to store. Blank or empty text is ignored.
        kind: One of KINDS; any other value is stored as "note".

    Returns:
        The stored fact dict (new, or the existing one with its weight bumped
        by one), or None when *text* is blank.
    """
    cleaned = (text or "").strip()
    if not cleaned:
        return None
    category = kind if kind in KINDS else "note"
    wanted = _norm(cleaned)

    with _lock:
        state = _load()

        # Dedup on normalized text: a repeat only strengthens the first match.
        known = next((f for f in state["facts"] if _norm(f["text"]) == wanted), None)
        if known is not None:
            known["weight"] = known.get("weight", 1) + 1
            _save(state)
            return known

        state["seq"] += 1
        created = {"id": state["seq"], "kind": category, "text": cleaned, "weight": 1}
        # Keep only the newest MAX_FACTS entries.
        state["facts"] = (state["facts"] + [created])[-MAX_FACTS:]
        _save(state)
        return created
def forget(fact_id: int) -> bool:
    """Delete the fact whose id equals ``int(fact_id)``.

    Returns:
        True if a fact was removed (and the file rewritten), False if no
        stored fact had that id.
    """
    with _lock:
        state = _load()
        survivors = []
        for fact in state["facts"]:
            # Coerce per comparison so an empty store never touches fact_id.
            if fact["id"] != int(fact_id):
                survivors.append(fact)

        if len(survivors) == len(state["facts"]):
            return False

        state["facts"] = survivors
        _save(state)
        return True
def list_facts() -> list[dict]:
    """Return the stored facts, in the order they appear in the memory file."""
    state = _load()
    return state["facts"]
def recall(limit: int = 20) -> str:
    """Compact 'what I know about you' block for the prompt; '' if empty.

    Strongest (most-reinforced) facts come first so the prompt stays small but
    useful.

    Args:
        limit: Maximum number of facts to include.

    Returns:
        A header line followed by one "- <text>" bullet per fact, or "" when
        nothing is stored.
    """
    # Delegate to the shared formatter instead of duplicating it. The output is
    # the same for well-formed facts, and a stored entry without "text" is
    # skipped rather than raising KeyError.
    return facts_to_recall(list_facts(), limit)
def observe_plan(plan) -> None:
    """Record every attendee of the plan's events as a "contact" fact.

    Reads ``plan.events`` and each event's ``attendees`` (a missing or empty
    attribute is treated as no items). Names are stripped; blank names and
    names longer than 40 characters are ignored. Each call to remember()
    reinforces a contact that is already known.

    Any exception is swallowed on purpose: memory must never break extraction.
    """
    try:
        events = getattr(plan, "events", []) or []
        for event in events:
            attendees = getattr(event, "attendees", []) or []
            for raw_name in attendees:
                person = (raw_name or "").strip()
                if not person or len(person) > 40:
                    continue
                remember(f"{person} is a contact you make plans with", kind="contact")
    except Exception:  # noqa: BLE001  memory must never break extraction
        pass
def reset() -> None:
    """Wipe all stored facts and restart the id counter at 0 (used by tests)."""
    blank_state = {"facts": [], "seq": 0}
    with _lock:
        _save(blank_state)
| # --------------------------------------------------------------------------- # | |
| # Client-owned memory (per-user, browser localStorage). These are PURE helpers | |
| # that operate on a passed-in facts list — no global file — so each visitor's | |
| # memory can live on their device and be threaded through the agent per request. | |
| # --------------------------------------------------------------------------- # | |
def facts_to_recall(facts: list[dict], limit: int = 20) -> str:
    """Format a passed-in facts list as the 'what I know about you' prompt block.

    The list comes from the client, so malformed entries are tolerated:
    anything that is not a dict with non-empty "text" is ignored, and a
    missing or non-numeric "weight" counts as 1.

    Args:
        facts: Fact dicts (may be None or empty).
        limit: Maximum number of facts to include.

    Returns:
        A header line followed by one "- <text>" bullet per fact, strongest
        first (ties keep their input order), or "" when nothing is usable.
    """

    def _weight(fact: dict):
        value = fact.get("weight", 1)
        return value if isinstance(value, (int, float)) else 1

    # Filter BEFORE ranking/limiting so unusable entries can neither crash the
    # sort nor use up slots that real facts should get.
    usable = [f for f in (facts or []) if isinstance(f, dict) and f.get("text")]
    strongest = sorted(usable, key=_weight, reverse=True)[:limit]
    if not strongest:
        return ""
    bullets = "\n".join(f"- {f['text']}" for f in strongest)
    return "What I know about you (memory):\n" + bullets
def merge_facts(facts: list[dict], texts, kind: str = "note") -> list[dict]:
    """Merge new fact texts into a facts list without mutating the input.

    A text whose normalized form matches an existing fact bumps that fact's
    weight by one; otherwise a new fact is appended with the next free id, so
    existing ids stay stable. The caller persists the returned list.

    The facts list is client-owned, so malformed data is tolerated: non-dict
    entries are dropped, unparseable ids count as 0 when choosing the next id,
    and a non-numeric weight counts as 1 before it is bumped.

    Args:
        facts: Existing fact dicts (may be None).
        texts: A single string, or an iterable of strings and/or (text, kind)
            pairs. Blank, empty, or non-string texts are skipped.
        kind: Kind for items that do not carry their own; values outside
            KINDS become "note".

    Returns:
        A new list holding at most the last MAX_FACTS facts.
    """

    def _as_int(value) -> int:
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return 0

    merged = [dict(f) for f in (facts or []) if isinstance(f, dict)]
    by_key = {
        _norm(f["text"]): f
        for f in merged
        if isinstance(f.get("text"), str) and f["text"]
    }
    next_id = max((_as_int(f.get("id", 0)) for f in merged), default=0) + 1

    # A bare string is itself an iterable of 1-char strings; treat it as ONE
    # text instead of silently storing a fact per character.
    if isinstance(texts, str):
        texts = [texts]

    for item in texts or []:
        if isinstance(item, (tuple, list)):
            if not item:
                continue
            text, item_kind = item[0], (item[1] if len(item) > 1 else kind)
        else:
            text, item_kind = item, kind
        if not isinstance(text, str):
            continue
        text = text.strip()
        if not text:
            continue
        if item_kind not in KINDS:
            item_kind = "note"

        key = _norm(text)
        existing = by_key.get(key)
        if existing is not None:
            weight = existing.get("weight", 1)
            existing["weight"] = (weight if isinstance(weight, (int, float)) else 1) + 1
        else:
            fact = {"id": next_id, "kind": item_kind, "text": text, "weight": 1}
            next_id += 1
            merged.append(fact)
            by_key[key] = fact
    return merged[-MAX_FACTS:]
def learn_from_plan(plan) -> list[str]:
    """Return contact-fact texts for the attendees of an extracted plan.

    Reads ``plan.events`` and each event's ``attendees`` (a missing or empty
    attribute is treated as no items). Nothing is written anywhere: the texts
    are returned so the caller can merge them into client-side memory.

    Args:
        plan: Any object; an ``events`` attribute is looked up with getattr.

    Returns:
        One "<name> is a contact you make plans with" string per attendee whose
        stripped name is non-empty and at most 40 characters. Repeated names
        are kept so each occurrence reinforces the contact when merged.
    """
    out: list[str] = []
    try:
        for ev in getattr(plan, "events", []) or []:
            for raw_name in getattr(ev, "attendees", []) or []:
                # Skip None / non-text entries individually. Letting .strip()
                # raise would abort the loop and silently drop every valid
                # attendee that comes after the bad one.
                if not isinstance(raw_name, str):
                    continue
                name = raw_name.strip()
                if name and len(name) <= 40:
                    out.append(f"{name} is a contact you make plans with")
    except Exception:  # noqa: BLE001  memory must never break extraction
        pass
    return out