File size: 19,397 Bytes
f487b74
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c3f5c19
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f487b74
9dd6dab
f487b74
 
 
a196e34
f487b74
 
a196e34
f487b74
 
 
c3f5c19
f487b74
c3f5c19
 
f487b74
 
 
c3f5c19
 
 
 
 
 
 
 
 
f487b74
9dd6dab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f487b74
 
 
9dd6dab
f487b74
 
 
 
 
 
 
 
 
 
 
 
 
a2ca0e0
 
 
 
 
 
 
f487b74
 
 
c3f5c19
f487b74
9dd6dab
f487b74
 
 
 
 
c3f5c19
f487b74
 
c3f5c19
 
f487b74
c3f5c19
 
 
 
f487b74
 
c3f5c19
f487b74
 
 
c3f5c19
 
f487b74
 
c3f5c19
f487b74
c3f5c19
 
 
9dd6dab
c3f5c19
 
 
 
a2ca0e0
c3f5c19
 
f487b74
c3f5c19
 
 
 
 
f487b74
 
 
 
 
c3f5c19
f487b74
c3f5c19
 
f487b74
 
 
 
 
c3f5c19
f487b74
 
a2ca0e0
 
 
 
 
 
 
f487b74
 
a196e34
 
 
 
 
 
 
a2ca0e0
a196e34
a2ca0e0
 
 
 
 
a196e34
 
 
 
 
 
 
 
 
 
 
 
 
 
f487b74
 
c3f5c19
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a2ca0e0
 
 
 
 
c3f5c19
 
 
 
a2ca0e0
 
 
 
 
c3f5c19
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a2ca0e0
 
c3f5c19
 
 
 
a2ca0e0
 
 
 
 
c3f5c19
 
f487b74
 
9dd6dab
f487b74
 
 
 
 
 
 
 
 
 
 
a2ca0e0
a71301e
 
f487b74
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a2ca0e0
a71301e
 
f487b74
 
 
 
 
 
c3f5c19
f487b74
c3f5c19
 
 
f487b74
 
 
 
 
 
 
 
 
 
9dd6dab
f487b74
 
 
 
 
c3f5c19
 
 
f487b74
c3f5c19
 
 
 
 
 
 
f487b74
 
c3f5c19
 
 
 
 
f487b74
c3f5c19
 
 
 
 
 
 
 
 
f487b74
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
"""Retrieval index for the salience *relevance* term β€” a derived ledger lens.

The append-only event ledger is the single source of truth (ADR-0005). This
module adds an optional **semantic** retrieval index *over* that ledger: it does
not store anything the ledger does not already own, and it can be wiped and
rebuilt from the ledger at any time. It is a faster lens on the same events, not
a second store (ADR-0018).

Two pieces:

  * :class:`MemoryIndex` — a tiny protocol the salience layer can lean on:
    ``index(events)`` derives vector entries from ledger events (idempotent,
    keyed by ``event.id``) and ``search(query, k)`` returns the most relevant
    events back. Any backend that satisfies this protocol can supply semantic
    relevance — a vector service, a local embedding store, or a fake in tests.

  * Two concrete backends behind that protocol, both **lazy-imported and
    env-gated** so nothing is imported and :class:`~src.core.memory.SalienceMemory`
    stays on its keyword-Jaccard relevance unless an index is configured:

      - :class:`Mem0MemoryIndex` — the default, **off the grid**. Wraps the
        ``mem0`` OSS ``Memory`` with a local sentence-transformers embedder; no
        API key, nothing leaves the machine (ADR-0019).
      - :class:`Mem0CloudIndex` — opt-in hosted backend. Wraps the ``mem0``
        platform ``MemoryClient`` (api.mem0.ai). **Activating it sends ledger
        event text to mem0's servers** and needs ``MEM0_API_KEY`` — a deliberate
        departure from the off-the-grid default, so it is never the default
        (ADR-0020).

Because the index is derived, both backends upsert each event under its
``event.id`` (process-local dedup) so re-indexing the same events is a no-op (no
duplicates) — this is what makes the index rebuildable rather than authoritative.
"""

from __future__ import annotations

import os
import time
from typing import TYPE_CHECKING, Protocol, runtime_checkable

from src import observability as obs
from src.core.events import Event

if TYPE_CHECKING:  # pragma: no cover - typing only
    from mem0 import Memory, MemoryClient

#: Env gate. A truthy value activates the local semantic index and a cloud
#: spelling (see :data:`_CLOUD_VALUES`) activates the hosted backend. Left unset
#: (the default), no index is built and nothing from mem0 is imported.
INDEX_ENV = "MEMORY_INDEX"

#: Optional explicit backend selector (``local`` | ``cloud``), read by
#: :func:`memory_index_from_env` alongside ``MEMORY_INDEX``.
BACKEND_ENV = "MEMORY_INDEX_BACKEND"

#: Spellings of ``MEMORY_INDEX`` that count as "on" (→ local backend).
_TRUTHY: frozenset[str] = frozenset(
    ("1", "true", "yes", "on", "mem0", "local"),
)

#: Spellings of ``MEMORY_INDEX`` that select the hosted mem0 platform backend.
_CLOUD_VALUES: frozenset[str] = frozenset(
    ("cloud", "mem0-cloud", "platform", "hosted"),
)

#: mem0 config used when ``MEMORY_INDEX_CONFIG`` is unset. The embedder is the
#: local sentence-transformers MiniLM model, so no API key is needed for
#: embedding. Events are stored with ``infer=False``; per the original author's
#: note the ``"EMPTY"`` key is only a placeholder so that building the LLM
#: section does not demand a cloud credential. A ``MEMORY_INDEX_CONFIG`` JSON
#: blob replaces this whole mapping (it is passed to mem0 as-is).
_LOCAL_INDEX_CONFIG: dict = {
    "embedder": {
        "provider": "huggingface",
        "config": {
            "model": "sentence-transformers/all-MiniLM-L6-v2",
        },
    },
    "llm": {
        "provider": "openai",
        "config": {
            "api_key": "EMPTY",
        },
    },
}


# ── protocol ──────────────────────────────────────────────────────────────────


@runtime_checkable
class MemoryIndex(Protocol):
    """Structural contract for a derived, rebuildable semantic index.

    The ledger stays authoritative. An implementation's ``index`` must be an
    idempotent upsert keyed by ``event.id`` (indexing the same event twice never
    creates a duplicate), and ``search`` must only hand back events that were
    indexed earlier.
    """

    def index(self, events: tuple[Event, ...]) -> None:
        """Create or refresh the index entries for *events*.

        Must be idempotent with respect to ``event.id``.
        """

    def search(self, query: str, k: int, run_id: str | None = None) -> list[Event]:
        """Return at most *k* indexed events that best match *query*.

        If *run_id* is supplied, results MUST be restricted to that run: the
        ledger contains every run, so unscoped recall would leak one show's (or
        one user's) discussion into another's context.
        """


# ── shared event ⇄ entry helpers ──────────────────────────────────────────────


def _event_text(event: Event) -> str:
    """Pick the text of *event* that gets embedded and recalled.

    Preference order: a truthy ``payload["text"]``, then a truthy
    ``payload["summary"]``, and finally the string form of the whole payload.
    """
    payload = event.payload
    for key in ("text", "summary"):
        candidate = payload.get(key)
        if candidate:
            return str(candidate)
    return str(payload)


# ── shared mem0 backend base ──────────────────────────────────────────────────


class _Mem0BackendBase:
    """Common :class:`MemoryIndex` plumbing for the mem0-backed indexes.

    A subclass fills in three hooks — :meth:`_build_memory` (construct the
    client), :meth:`_store` (write one event) and :meth:`_query` (run one raw
    search). Everything that keeps the index *derived* lives here: the
    process-local dedup keyed by ``event.id`` and the rebuilding of
    :class:`Event` objects from the metadata carried on each search hit.
    """

    #: Fixed identifier the backends pass to mem0 as ``user_id`` so that every
    #: event lands in a single searchable space.
    _NAMESPACE = "ledger"

    def __init__(self) -> None:
        # Ids already pushed to the backend by *this* instance.
        self._indexed: set[str] = set()
        # The mem0 client; created on first use by :meth:`_memory`.
        self._mem: object | None = None

    # ── variation points (subclass) ───────────────────────────────────────────

    def _build_memory(self) -> object:
        """Create the underlying mem0 client; subclasses import mem0 lazily here."""
        raise NotImplementedError

    def _store(self, mem: object, event: Event) -> None:
        """Write a single *event* into *mem*; implemented by each backend."""
        raise NotImplementedError

    def _query(self, mem: object, query: str, k: int, run_id: str | None) -> list[dict]:
        """Search *mem* and return the raw hit dicts (each carrying ``metadata``)."""
        raise NotImplementedError

    # ── lazy construction ─────────────────────────────────────────────────────

    def _memory(self) -> object:
        """Return the client, building it through :meth:`_build_memory` once."""
        client = self._mem
        if client is None:
            client = self._mem = self._build_memory()
        return client

    # ── MemoryIndex protocol ──────────────────────────────────────────────────

    def index(self, events: tuple[Event, ...]) -> None:
        """Store every event whose id this instance has not stored yet.

        The already-seen filter runs *before* the client is requested, so a call
        containing only known events returns without building the client and
        without calling :meth:`_store`.
        """
        pending = [event for event in events if event.id not in self._indexed]
        if pending:
            client = self._memory()
            for event in pending:
                self._store(client, event)
                # Recorded only after a successful store, so a failed event
                # is retried by the next ``index`` call.
                self._indexed.add(event.id)

    def search(self, query: str, k: int, run_id: str | None = None) -> list[Event]:
        """Run a semantic search and rebuild :class:`Event` objects from the hits.

        *run_id* is handed to :meth:`_query` so the backend can filter natively,
        and it is checked again here on every rebuilt event — a second guard in
        case a backend ignores the filter. Hits whose metadata cannot be turned
        back into an event are skipped. An empty *query* or a non-positive *k*
        short-circuits to ``[]`` before any client or telemetry call.
        """
        if not query or k <= 0:
            return []

        backend_name = type(self).__name__
        span_attrs = {
            "memory.query": query,
            "memory.k": k,
            "memory.backend": backend_name,
        }
        with obs.span("memory.index.search", **span_attrs):
            started = time.perf_counter()
            client = self._memory()

            matches: list[Event] = []
            for hit in self._query(client, query, k, run_id):
                candidate = _event_from_metadata(hit.get("metadata"))
                if candidate is None:
                    continue
                if run_id is None or candidate.run_id == run_id:
                    matches.append(candidate)

            elapsed_ms = (time.perf_counter() - started) * 1000
            rounded_ms = round(elapsed_ms, 2)
            hit_count = len(matches)

            obs.add_span_attrs(**{"memory.hits": hit_count, "memory.latency_ms": rounded_ms})
            obs.observe("memory.index.hits", hit_count)
            obs.observe("memory.index.latency_ms", elapsed_ms)
            obs.log(
                "memory.index.search",
                level="debug",
                backend=backend_name,
                query=query,
                k=k,
                hits=hit_count,
                latency_ms=rounded_ms,
            )
            return matches


# ── local (off-the-grid) backend ──────────────────────────────────────────────


class Mem0MemoryIndex(_Mem0BackendBase):
    """Local :class:`MemoryIndex` built on the ``mem0`` OSS ``Memory`` class.

    The index is derived, never authoritative. Every ledger event becomes one
    raw memory written with ``infer=False`` (text stored verbatim, **no model
    extraction**), and the whole event rides along in ``metadata`` so that a
    search hit can be turned back into an :class:`Event` without a second
    lookup. Unless a config is supplied, the client is built from
    :data:`_LOCAL_INDEX_CONFIG` (local sentence-transformers embedder).

    Configuration (env, read by :func:`memory_index_from_env`):

      * ``MEMORY_INDEX`` — gate; a truthy spelling (``1``/``true``/``local``/…)
        activates this backend, leaving it unset disables it.
      * ``MEMORY_INDEX_CONFIG`` — optional JSON config handed to
        ``mem0.Memory.from_config`` in place of the local default (e.g. to pick
        another embedder or persist vectors in Postgres/pgvector, ADR-0014).

    ``mem0`` is imported inside :meth:`_build_memory` only, so ``import src.*``
    and ``import app`` keep working when the package is not installed.
    """

    def __init__(self, config: dict | None = None) -> None:
        super().__init__()
        self._config = config

    def _build_memory(self) -> "Memory":
        """Import mem0 and build a ``Memory`` from the chosen config."""
        from mem0 import Memory  # lazy: offline import must not require mem0

        # A missing *or empty* config falls back to the local default.
        chosen = self._config if self._config else _LOCAL_INDEX_CONFIG
        return Memory.from_config(chosen)

    def _store(self, mem: object, event: Event) -> None:
        """Add *event* to *mem* as one verbatim memory carrying its metadata."""
        text = _event_text(event)
        entry_metadata = _event_metadata(event)
        # run_id and agent_id are passed as mem0's own identity scopes so that
        # per-run / per-actor filtering can happen in the vector store instead
        # of post-hoc in Python (mem0 best practice).
        mem.add(  # type: ignore[attr-defined]
            text,
            user_id=self._NAMESPACE,
            run_id=event.run_id,
            agent_id=event.actor or None,
            metadata=entry_metadata,
            infer=False,  # store verbatim; the ledger, not a model, is truth
        )

    def _query(self, mem: object, query: str, k: int, run_id: str | None) -> list[dict]:
        """Search within the shared namespace, narrowed to *run_id* when given."""
        scope: dict = {"user_id": self._NAMESPACE}
        if run_id:
            scope["run_id"] = run_id
        raw_hits = mem.search(query, top_k=k, filters=scope)  # type: ignore[attr-defined]
        return _result_items(raw_hits)


# ── hosted (opt-in) backend ────────────────────────────────────────────────────


class Mem0CloudIndex(_Mem0BackendBase):
    """Hosted :class:`MemoryIndex` built on the ``mem0`` platform client.

    Wraps ``mem0.MemoryClient`` (api.mem0.ai), so embedding, vector storage and
    retrieval happen in mem0's managed service. The :class:`MemoryIndex`
    contract matches the local backend — derived, idempotent, ledger is truth —
    and events are still written verbatim (``infer=False``) with the whole event
    in ``metadata`` for reconstruction. Only the *location* of the work differs.

    **Off-the-grid caveat (ADR-0019/0020).** Using this backend sends ledger
    event text to mem0's servers and requires a ``MEM0_API_KEY``. It is strictly
    opt-in and never the default; the local backend remains the engine's
    off-the-grid default.

    Configuration (env, read by :func:`memory_index_from_env`):

      * ``MEMORY_INDEX=cloud`` (or ``MEMORY_INDEX_BACKEND=cloud``) — selects this
        backend.
      * ``MEM0_API_KEY`` — required platform key (when not passed explicitly the
        client is left to read ``MEM0_API_KEY`` from the environment itself).
      * ``MEM0_ORG_ID`` / ``MEM0_PROJECT_ID`` / ``MEM0_HOST`` — optional scoping.

    ``mem0`` is imported inside :meth:`_build_memory` only, so the offline path
    needs neither the package nor a key.
    """

    def __init__(
        self,
        api_key: str | None = None,
        org_id: str | None = None,
        project_id: str | None = None,
        host: str | None = None,
    ) -> None:
        super().__init__()
        self._api_key = api_key
        self._org_id = org_id
        self._project_id = project_id
        self._host = host

    def _build_memory(self) -> "MemoryClient":
        """Import mem0 and build a ``MemoryClient`` from the options that are set."""
        from mem0 import MemoryClient  # lazy: offline import must not require mem0

        # Forward only the truthy options. Per the original note, MemoryClient
        # falls back to MEM0_API_KEY from the environment and raises loudly here
        # (not at import) if no key is found.
        kwargs: dict = {}
        for name, value in (
            ("api_key", self._api_key),
            ("org_id", self._org_id),
            ("project_id", self._project_id),
            ("host", self._host),
        ):
            if value:
                kwargs[name] = value
        return MemoryClient(**kwargs)

    def _store(self, mem: object, event: Event) -> None:
        """Add *event* to *mem* as a single verbatim user message."""
        # The platform `add` takes chat-style messages: one user turn per event,
        # with inference disabled so nothing but the ledger text is stored.
        messages = [{"role": "user", "content": _event_text(event)}]
        entry_metadata = _event_metadata(event)
        mem.add(  # type: ignore[attr-defined]
            messages,
            user_id=self._NAMESPACE,
            run_id=event.run_id,  # native per-run scope, as in the local backend
            agent_id=event.actor or None,
            metadata=entry_metadata,
            infer=False,
        )

    def _query(self, mem: object, query: str, k: int, run_id: str | None) -> list[dict]:
        """Search within the shared namespace, narrowed to *run_id* when given."""
        scope: dict = {"user_id": self._NAMESPACE}
        if run_id:
            scope["run_id"] = run_id
        raw_hits = mem.search(query, top_k=k, filters=scope)  # type: ignore[attr-defined]
        return _result_items(raw_hits)


# ── metadata round-trip (event ⇄ vector entry) ────────────────────────────────


def _event_metadata(event: Event) -> dict:
    """Flatten *event* into the metadata dict attached to its vector entry.

    ``id`` is stored under ``"event_id"`` and ``created_at`` as its ISO-8601
    string; every other field is copied under its own attribute name.
    """
    flattened: dict = {"event_id": event.id}
    for field_name in ("run_id", "turn", "kind", "actor", "payload"):
        flattened[field_name] = getattr(event, field_name)
    # A datetime object is not JSON-safe, so keep its ISO string instead.
    flattened["created_at"] = event.created_at.isoformat()
    for field_name in ("schema_version", "session_id", "model_profile", "model_id"):
        flattened[field_name] = getattr(event, field_name)
    return flattened


def _event_from_metadata(metadata: dict | None) -> Event | None:
    """Rebuild an :class:`Event` from the metadata stored on a vector entry.

    This is the inverse of :func:`_event_metadata`. ``created_at`` is restored
    from its ISO-8601 string so a recalled event keeps the timestamp of the
    ledger event it was derived from. When the key is missing or cannot be
    parsed it is simply not passed, and :class:`Event` applies its own default.

    Args:
        metadata: The ``metadata`` dict of a search hit, or ``None``.

    Returns:
        The reconstructed event, or ``None`` when *metadata* is empty, has no
        ``"event_id"``, or holds a value that cannot be coerced (for example a
        missing ``"kind"`` or a non-numeric ``"turn"``).
    """
    if not metadata or "event_id" not in metadata:
        return None

    from datetime import datetime  # local: keeps the module's import block untouched

    optional: dict = {}
    raw_created_at = metadata.get("created_at")
    if raw_created_at:
        try:
            optional["created_at"] = datetime.fromisoformat(raw_created_at)
        except (TypeError, ValueError):
            # Unreadable timestamp: deliberately fall back to Event's default
            # rather than dropping an otherwise valid hit.
            pass

    try:
        return Event(
            id=str(metadata["event_id"]),
            run_id=str(metadata.get("run_id", "")),
            turn=int(metadata.get("turn", 0)),
            kind=str(metadata["kind"]),
            actor=str(metadata.get("actor", "")),
            payload=dict(metadata.get("payload") or {}),
            schema_version=int(metadata.get("schema_version", 1)),
            session_id=metadata.get("session_id") or None,
            model_profile=metadata.get("model_profile") or None,
            model_id=metadata.get("model_id") or None,
            **optional,
        )
    except (KeyError, ValueError, TypeError):  # pragma: no cover - defensive
        return None


def _result_items(hits: object) -> list[dict]:
    """Reduce whatever mem0 ``search`` returned to a plain list of hit dicts.

    Two shapes are accepted: a ``{"results": [...]}`` envelope or a bare list.
    Anything else yields ``[]``, and non-dict entries inside the list are
    dropped.
    """
    payload = hits.get("results", []) if isinstance(hits, dict) else hits
    if not isinstance(payload, list):
        return []
    return [item for item in payload if isinstance(item, dict)]


# ── env gate ───────────────────────────────────────────────────────────────────


def _is_truthy(value: str | None) -> bool:
    """Return whether *value*, trimmed and lower-cased, is one of :data:`_TRUTHY`."""
    normalised = value.strip().lower() if value else ""
    return normalised in _TRUTHY


def memory_index_from_env(env: dict[str, str] | None = None) -> MemoryIndex | None:
    """Build a mem0-backed :class:`MemoryIndex` from the env, or ``None`` if unset.

    Selection (``mem0`` itself is only imported later, on first use):

      * ``MEMORY_INDEX`` unset / falsey and no ``MEMORY_INDEX_BACKEND=cloud`` →
        ``None`` (no index).
      * ``MEMORY_INDEX`` truthy (``1``/``true``/``local``/…) →
        :class:`Mem0MemoryIndex` (local).
      * ``MEMORY_INDEX`` ∈ {cloud, mem0-cloud, platform, hosted}, or
        ``MEMORY_INDEX_BACKEND=cloud`` → :class:`Mem0CloudIndex` (hosted; sends
        ledger text to mem0).
      * An explicit ``MEMORY_INDEX_BACKEND`` wins over the ``MEMORY_INDEX``
        spelling in *both* directions: ``MEMORY_INDEX=cloud`` together with
        ``MEMORY_INDEX_BACKEND=local`` yields the local backend, so an explicit
        "local" can never result in ledger text leaving the machine.
        ``MEMORY_INDEX_BACKEND=local`` alone does not activate an index.

    Args:
        env: Mapping to read instead of ``os.environ`` (useful in tests).

    Returns:
        The configured index, or ``None`` when no index is enabled.

    Raises:
        ValueError: ``MEMORY_INDEX_CONFIG`` is not valid JSON
            (``json.JSONDecodeError``) or decodes to a non-empty value that is
            not a JSON object.
    """
    source = os.environ if env is None else env
    gate = (source.get(INDEX_ENV) or "").strip().lower()
    backend = (source.get(BACKEND_ENV) or "").strip().lower()

    gate_is_cloud = gate in _CLOUD_VALUES
    if backend == "cloud":
        is_cloud = True
    elif backend == "local":
        # The explicit selector overrides a cloud spelling of the gate.
        is_cloud = False
    else:
        is_cloud = gate_is_cloud

    if not (is_cloud or gate_is_cloud or _is_truthy(gate)):
        return None

    if is_cloud:
        return Mem0CloudIndex(
            api_key=source.get("MEM0_API_KEY") or None,
            org_id=source.get("MEM0_ORG_ID") or None,
            project_id=source.get("MEM0_PROJECT_ID") or None,
            host=source.get("MEM0_HOST") or None,
        )

    raw_config = (source.get("MEMORY_INDEX_CONFIG") or "").strip()
    config: dict | None = None
    if raw_config:
        import json

        config = json.loads(raw_config)
        # Empty/falsy values still fall back to the local default downstream;
        # anything else must be an object, so fail here with a clear message
        # instead of obscurely at first use.
        if config and not isinstance(config, dict):
            raise ValueError(
                "MEMORY_INDEX_CONFIG must be a JSON object, got "
                + type(config).__name__
            )
    return Mem0MemoryIndex(config=config)