File size: 6,412 Bytes
0366d65
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
"""Persistent 'grows-with-you' agent memory.

Durable facts and preferences that personalize extraction over time:
people->roles ("Dana is the soccer coach"), rules ("you decline Mondays"),
default locations. Stored as JSON at MEMORY_PATH.

- recall()      -> a compact block injected into the agent prompt (server/agent.py)
- remember()    -> add/strengthen a fact (Memory tab, or a Hermes `remember` tool-call)
- forget()      -> drop a fact (Memory tab)
- observe_plan()-> conservatively learn recurring contacts from extracted events

This is the "memory" half of a Hermes-style grows-with-you agent; the model
(served via INFERENCE_BASE_URL) is the reasoning half.
"""
from __future__ import annotations

import json
import os
import threading
from pathlib import Path

MEMORY_PATH = Path(os.environ.get("MEMORY_PATH", "/tmp/agent_memory.json"))
MAX_FACTS = 200
KINDS = ("contact", "preference", "location", "note")

_lock = threading.Lock()


def _norm(text: str) -> str:
    return " ".join(text.lower().split())


def _load() -> dict:
    try:
        data = json.loads(MEMORY_PATH.read_text())
        if isinstance(data, dict) and isinstance(data.get("facts"), list):
            return data
    except Exception:  # noqa: BLE001  missing/corrupt -> empty
        pass
    return {"facts": [], "seq": 0}


def _save(state: dict) -> None:
    MEMORY_PATH.parent.mkdir(parents=True, exist_ok=True)
    MEMORY_PATH.write_text(json.dumps(state, indent=2))


def remember(text: str, kind: str = "note") -> dict | None:
    """Add a fact, or strengthen (bump weight) an existing one with the same text."""
    text = (text or "").strip()
    if not text:
        return None
    if kind not in KINDS:
        kind = "note"
    key = _norm(text)
    with _lock:
        state = _load()
        for f in state["facts"]:
            if _norm(f["text"]) == key:
                f["weight"] = f.get("weight", 1) + 1
                _save(state)
                return f
        state["seq"] += 1
        fact = {"id": state["seq"], "kind": kind, "text": text, "weight": 1}
        state["facts"].append(fact)
        state["facts"] = state["facts"][-MAX_FACTS:]
        _save(state)
        return fact


def forget(fact_id: int) -> bool:
    with _lock:
        state = _load()
        before = len(state["facts"])
        state["facts"] = [f for f in state["facts"] if f["id"] != int(fact_id)]
        changed = len(state["facts"]) != before
        if changed:
            _save(state)
        return changed


def list_facts() -> list[dict]:
    return _load()["facts"]


def recall(limit: int = 20) -> str:
    """Compact 'what I know about you' block for the prompt; '' if empty.

    Strongest (most-reinforced) facts first so the prompt stays small but useful.
    """
    facts = sorted(list_facts(), key=lambda f: f.get("weight", 1), reverse=True)[:limit]
    if not facts:
        return ""
    lines = "\n".join(f"- {f['text']}" for f in facts)
    return "What I know about you (memory):\n" + lines


def observe_plan(plan) -> None:
    """Conservatively learn from an extracted ActionPlan: record event attendees as
    contacts (reinforced over time). Cheap, deterministic 'growth' without an LLM
    round-trip; explicit facts still come via remember()/the Memory tab/tool-calls."""
    try:
        for ev in getattr(plan, "events", []) or []:
            for name in getattr(ev, "attendees", []) or []:
                name = (name or "").strip()
                if name and len(name) <= 40:
                    remember(f"{name} is a contact you make plans with", kind="contact")
    except Exception:  # noqa: BLE001  memory must never break extraction
        pass


def reset() -> None:
    """Clear memory (used by tests)."""
    with _lock:
        _save({"facts": [], "seq": 0})


# --------------------------------------------------------------------------- #
# Client-owned memory (per-user, browser localStorage). These are PURE helpers
# that operate on a passed-in facts list — no global file — so each visitor's
# memory can live on their device and be threaded through the agent per request.
# --------------------------------------------------------------------------- #
def facts_to_recall(facts: list[dict], limit: int = 20) -> str:
    """Same compact 'what I know about you' block as recall(), but for a passed
    facts list (client/localStorage memory). '' if empty."""
    facts = sorted(facts or [], key=lambda f: f.get("weight", 1), reverse=True)[:limit]
    if not facts:
        return ""
    lines = "\n".join(f"- {f['text']}" for f in facts if (f or {}).get("text"))
    return "What I know about you (memory):\n" + lines if lines else ""


def merge_facts(facts: list[dict], texts, kind: str = "note") -> list[dict]:
    """Add texts to a facts list (dedup by normalized text → bump weight), keeping
    ids stable. Returns a NEW list (caller persists it). `texts` is an iterable of
    strings, or of (text, kind) pairs."""
    facts = [dict(f) for f in (facts or [])]
    by_key = {_norm(f["text"]): f for f in facts if f.get("text")}
    next_id = max((int(f.get("id", 0)) for f in facts), default=0) + 1
    for item in texts or []:
        if isinstance(item, (tuple, list)):
            text, k = item[0], (item[1] if len(item) > 1 else kind)
        else:
            text, k = item, kind
        text = (text or "").strip()
        if not text:
            continue
        if k not in KINDS:
            k = "note"
        key = _norm(text)
        if key in by_key:
            f = by_key[key]
            f["weight"] = f.get("weight", 1) + 1
        else:
            f = {"id": next_id, "kind": k, "text": text, "weight": 1}
            next_id += 1
            facts.append(f)
            by_key[key] = f
    return facts[-MAX_FACTS:]


def learn_from_plan(plan) -> list[str]:
    """Contact texts to learn from an ActionPlan (the observe_plan() logic, but
    RETURNED for client-side merge instead of written to the global file)."""
    out: list[str] = []
    try:
        for ev in getattr(plan, "events", []) or []:
            for name in getattr(ev, "attendees", []) or []:
                name = (name or "").strip()
                if name and len(name) <= 40:
                    out.append(f"{name} is a contact you make plans with")
    except Exception:  # noqa: BLE001  memory must never break extraction
        pass
    return out