Spaces:
Running on Zero
Running on Zero
| """Retrieval index for the salience *relevance* term β a derived ledger lens. | |
| The append-only event ledger is the single source of truth (ADR-0005). This | |
| module adds an optional **semantic** retrieval index *over* that ledger: it does | |
| not store anything the ledger does not already own, and it can be wiped and | |
| rebuilt from the ledger at any time. It is a faster lens on the same events, not | |
| a second store (ADR-0018). | |
| Two pieces: | |
| * :class:`MemoryIndex` β a tiny protocol the salience layer can lean on: | |
| ``index(events)`` derives vector entries from ledger events (idempotent, | |
| keyed by ``event.id``) and ``search(query, k)`` returns the most relevant | |
| events back. Any backend that satisfies this protocol can supply semantic | |
| relevance β a vector service, a local embedding store, or a fake in tests. | |
| * Two concrete backends behind that protocol, both **lazy-imported and | |
| env-gated** so nothing is imported and :class:`~src.core.memory.SalienceMemory` | |
| stays on its keyword-Jaccard relevance unless an index is configured: | |
| - :class:`Mem0MemoryIndex` β the default, **off the grid**. Wraps the | |
| ``mem0`` OSS ``Memory`` with a local sentence-transformers embedder; no | |
| API key, nothing leaves the machine (ADR-0019). | |
| - :class:`Mem0CloudIndex` β opt-in hosted backend. Wraps the ``mem0`` | |
| platform ``MemoryClient`` (api.mem0.ai). **Activating it sends ledger | |
| event text to mem0's servers** and needs ``MEM0_API_KEY`` β a deliberate | |
| departure from the off-the-grid default, so it is never the default | |
| (ADR-0020). | |
| Because the index is derived, both backends upsert each event under its | |
| ``event.id`` (process-local dedup) so re-indexing the same events is a no-op (no | |
| duplicates) β this is what makes the index rebuildable rather than authoritative. | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import time | |
| from typing import TYPE_CHECKING, Protocol, runtime_checkable | |
| from src import observability as obs | |
| from src.core.events import Event | |
| if TYPE_CHECKING: # pragma: no cover - typing only | |
| from mem0 import Memory, MemoryClient | |
| #: Env gate. Set to a truthy value to activate the local semantic index, or to a | |
| #: cloud spelling (see :data:`_CLOUD_VALUES`) for the hosted backend; unset (the | |
| #: default) keeps memory on the offline keyword path with nothing imported. | |
| INDEX_ENV = "MEMORY_INDEX" | |
| #: Optional explicit backend selector (``local`` | ``cloud``). Takes precedence | |
| #: over the spelling of ``MEMORY_INDEX`` when set. | |
| BACKEND_ENV = "MEMORY_INDEX_BACKEND" | |
| #: Truthy spellings accepted for the gate and boolean sub-options (β local backend). | |
| _TRUTHY: frozenset[str] = frozenset({"1", "true", "yes", "on", "mem0", "local"}) | |
| #: Spellings of ``MEMORY_INDEX`` that select the hosted mem0 platform backend. | |
| _CLOUD_VALUES: frozenset[str] = frozenset({"cloud", "mem0-cloud", "platform", "hosted"}) | |
| #: Default mem0 config when ``MEMORY_INDEX_CONFIG`` is unset: embed LOCALLY with | |
| #: sentence-transformers (no API key; fully offline once the model is cached), so | |
| #: the active index stays off the grid like the rest of the engine. The index | |
| #: stores ledger events verbatim (``infer=False``) and search embeds the query | |
| #: locally, so mem0's generative LLM is never invoked β the placeholder key just | |
| #: keeps its construction from demanding a cloud credential. Any of this can be | |
| #: overridden with a ``MEMORY_INDEX_CONFIG`` JSON blob (passed verbatim to mem0). | |
| _LOCAL_INDEX_CONFIG: dict = { | |
| "embedder": { | |
| "provider": "huggingface", | |
| "config": {"model": "sentence-transformers/all-MiniLM-L6-v2"}, | |
| }, | |
| "llm": {"provider": "openai", "config": {"api_key": "EMPTY"}}, | |
| } | |
| # ββ protocol ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class MemoryIndex(Protocol): | |
| """A derived, rebuildable semantic index over ledger events. | |
| Implementations MUST treat the ledger as authoritative: ``index`` is an | |
| idempotent upsert keyed by ``event.id`` (re-indexing never duplicates), and | |
| ``search`` only ever returns events that were previously indexed. | |
| """ | |
| def index(self, events: tuple[Event, ...]) -> None: | |
| """Derive/refresh index entries for *events* (idempotent by ``event.id``).""" | |
| ... | |
| def search(self, query: str, k: int, run_id: str | None = None) -> list[Event]: | |
| """Return up to *k* indexed events most semantically relevant to *query*. | |
| When *run_id* is given, hits MUST be scoped to that run β the ledger holds | |
| every run, and recall across runs would leak one show's (or one user's) | |
| discussion into another's context. | |
| """ | |
| ... | |
| # ββ shared event β entry helpers ββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _event_text(event: Event) -> str: | |
| """The natural-language surface of an event used for embedding/recall.""" | |
| return str(event.payload.get("text") or event.payload.get("summary") or event.payload) | |
| # ββ shared mem0 backend base ββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class _Mem0BackendBase: | |
| """Shared :class:`MemoryIndex` machinery for the mem0-backed indexes. | |
| Subclasses supply the three variation points β how the client is built and | |
| how a single event is stored / queried β while this base owns the protocol | |
| surface that keeps the index *derived*: idempotent upsert keyed by | |
| ``event.id`` and search-hit β :class:`Event` reconstruction from metadata. | |
| """ | |
| #: mem0 scopes memories to an id; the index is engine-wide, so a fixed | |
| #: namespace keeps every event in one searchable space. | |
| _NAMESPACE = "ledger" | |
| def __init__(self) -> None: | |
| self._mem: object | None = None | |
| self._indexed: set[str] = set() | |
| # ββ variation points (subclass) βββββββββββββββββββββββββββββββββββββββββββ | |
| def _build_memory(self) -> object: | |
| """Construct the underlying mem0 client (lazy-imported by the subclass).""" | |
| raise NotImplementedError | |
| def _store(self, mem: object, event: Event) -> None: | |
| """Upsert one event verbatim into *mem* (``infer=False``; ledger is truth).""" | |
| raise NotImplementedError | |
| def _query(self, mem: object, query: str, k: int, run_id: str | None) -> list[dict]: | |
| """Run semantic search on *mem*; return raw hit dicts (carrying metadata).""" | |
| raise NotImplementedError | |
| # ββ lazy construction βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _memory(self) -> object: | |
| if self._mem is None: | |
| self._mem = self._build_memory() | |
| return self._mem | |
| # ββ MemoryIndex protocol ββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def index(self, events: tuple[Event, ...]) -> None: | |
| """Upsert *events*, keyed by ``event.id`` β idempotent within the process. | |
| Dedup happens *before* the client is built, so re-indexing the same | |
| ledger slice each turn never re-embeds and never forces a mem0 import.""" | |
| fresh = [e for e in events if e.id not in self._indexed] | |
| if not fresh: | |
| return | |
| mem = self._memory() | |
| for event in fresh: | |
| self._store(mem, event) | |
| self._indexed.add(event.id) | |
| def search(self, query: str, k: int, run_id: str | None = None) -> list[Event]: | |
| """Semantic search; map hits back to :class:`Event` via stored metadata. | |
| *run_id* scopes recall to one run, filtered both natively (mem0's own | |
| ``run_id`` identity, pushed down to the vector store) and defensively | |
| here on the reconstructed event β belt and suspenders against backends | |
| that ignore unknown filters.""" | |
| if not query or k <= 0: | |
| return [] | |
| with obs.span( | |
| "memory.index.search", | |
| **{"memory.query": query, "memory.k": k, "memory.backend": type(self).__name__}, | |
| ): | |
| started = time.perf_counter() | |
| mem = self._memory() | |
| events: list[Event] = [] | |
| for hit in self._query(mem, query, k, run_id): | |
| event = _event_from_metadata(hit.get("metadata")) | |
| if event is None: | |
| continue | |
| if run_id is not None and event.run_id != run_id: | |
| continue | |
| events.append(event) | |
| elapsed_ms = (time.perf_counter() - started) * 1000 | |
| obs.add_span_attrs(**{"memory.hits": len(events), "memory.latency_ms": round(elapsed_ms, 2)}) | |
| obs.observe("memory.index.hits", len(events)) | |
| obs.observe("memory.index.latency_ms", elapsed_ms) | |
| obs.log( | |
| "memory.index.search", | |
| level="debug", | |
| backend=type(self).__name__, | |
| query=query, | |
| k=k, | |
| hits=len(events), | |
| latency_ms=round(elapsed_ms, 2), | |
| ) | |
| return events | |
| # ββ local (off-the-grid) backend ββββββββββββββββββββββββββββββββββββββββββββββ | |
| class Mem0MemoryIndex(_Mem0BackendBase): | |
| """Local semantic :class:`MemoryIndex` backed by the ``mem0`` OSS ``Memory``. | |
| Derived, not authoritative, and **off the grid**: each ledger event is | |
| upserted as one raw memory (``infer=False`` β text stored verbatim, **no | |
| model extraction**) carrying the full event in ``metadata`` so a search hit | |
| reconstructs the :class:`Event` without a second lookup. Embeddings run | |
| locally via sentence-transformers by default (:data:`_LOCAL_INDEX_CONFIG`). | |
| Configuration (env, read by :func:`memory_index_from_env`): | |
| * ``MEMORY_INDEX`` β gate; truthy (``1``/``true``/``local``/β¦) activates this | |
| backend, unset disables it. | |
| * ``MEMORY_INDEX_CONFIG`` β optional JSON config forwarded verbatim to | |
| ``mem0.Memory.from_config``, replacing the local default (pick a different | |
| embedder, or persist vectors in the project's Postgres/pgvector, ADR-0014). | |
| ``mem0`` is imported lazily inside :meth:`_build_memory` so ``import src.*`` and | |
| ``import app`` work with the package not installed. | |
| """ | |
| def __init__(self, config: dict | None = None) -> None: | |
| super().__init__() | |
| self._config = config | |
| def _build_memory(self) -> "Memory": | |
| from mem0 import Memory # lazy: offline import must not require mem0 | |
| return Memory.from_config(self._config or _LOCAL_INDEX_CONFIG) | |
| def _store(self, mem: object, event: Event) -> None: | |
| mem.add( # type: ignore[attr-defined] | |
| _event_text(event), | |
| user_id=self._NAMESPACE, | |
| # mem0's native identity scopes: run_id partitions recall per run and | |
| # agent_id per actor, so filtering happens in the vector store rather | |
| # than post-hoc in Python (mem0 best practice). | |
| run_id=event.run_id, | |
| agent_id=event.actor or None, | |
| metadata=_event_metadata(event), | |
| infer=False, # store verbatim; the ledger, not a model, is truth | |
| ) | |
| def _query(self, mem: object, query: str, k: int, run_id: str | None) -> list[dict]: | |
| filters: dict = {"user_id": self._NAMESPACE} | |
| if run_id: | |
| filters["run_id"] = run_id | |
| return _result_items(mem.search(query, top_k=k, filters=filters)) # type: ignore[attr-defined] | |
| # ββ hosted (opt-in) backend ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class Mem0CloudIndex(_Mem0BackendBase): | |
| """Hosted semantic :class:`MemoryIndex` backed by the ``mem0`` platform. | |
| Wraps ``mem0.MemoryClient`` (api.mem0.ai): embeddings, the vector store, and | |
| retrieval all live in mem0's managed service. The :class:`MemoryIndex` | |
| contract is identical to the local backend β derived, idempotent, ledger is | |
| truth β and events are still stored verbatim (``infer=False``) with the full | |
| event in ``metadata`` for reconstruction. The only difference is *where* the | |
| work happens. | |
| **Off-the-grid caveat (ADR-0019/0020).** Activating this backend sends ledger | |
| event text to mem0's servers and requires a ``MEM0_API_KEY``. It is therefore | |
| strictly opt-in and never the default; the local backend remains the engine's | |
| off-the-grid default. | |
| Configuration (env, read by :func:`memory_index_from_env`): | |
| * ``MEMORY_INDEX=cloud`` (or ``MEMORY_INDEX_BACKEND=cloud``) β selects this | |
| backend. | |
| * ``MEM0_API_KEY`` β required platform key (falls back to the client's own | |
| ``MEM0_API_KEY`` env read if not passed explicitly). | |
| * ``MEM0_ORG_ID`` / ``MEM0_PROJECT_ID`` / ``MEM0_HOST`` β optional scoping. | |
| ``mem0`` is imported lazily inside :meth:`_build_memory`, so the offline path | |
| needs neither the package nor a key. | |
| """ | |
| def __init__( | |
| self, | |
| api_key: str | None = None, | |
| org_id: str | None = None, | |
| project_id: str | None = None, | |
| host: str | None = None, | |
| ) -> None: | |
| super().__init__() | |
| self._api_key = api_key | |
| self._org_id = org_id | |
| self._project_id = project_id | |
| self._host = host | |
| def _build_memory(self) -> "MemoryClient": | |
| from mem0 import MemoryClient # lazy: offline import must not require mem0 | |
| # Pass only what is set; MemoryClient falls back to MEM0_API_KEY from the | |
| # environment and raises loudly here (not at import) if no key is found. | |
| kwargs = { | |
| k: v | |
| for k, v in { | |
| "api_key": self._api_key, | |
| "org_id": self._org_id, | |
| "project_id": self._project_id, | |
| "host": self._host, | |
| }.items() | |
| if v | |
| } | |
| return MemoryClient(**kwargs) | |
| def _store(self, mem: object, event: Event) -> None: | |
| # The platform `add` takes chat-style messages; one verbatim user turn per | |
| # event, inference disabled so nothing but the ledger text is stored. | |
| mem.add( # type: ignore[attr-defined] | |
| [{"role": "user", "content": _event_text(event)}], | |
| user_id=self._NAMESPACE, | |
| run_id=event.run_id, # native per-run scope (see local backend) | |
| agent_id=event.actor or None, | |
| metadata=_event_metadata(event), | |
| infer=False, | |
| ) | |
| def _query(self, mem: object, query: str, k: int, run_id: str | None) -> list[dict]: | |
| filters: dict = {"user_id": self._NAMESPACE} | |
| if run_id: | |
| filters["run_id"] = run_id | |
| return _result_items(mem.search(query, top_k=k, filters=filters)) # type: ignore[attr-defined] | |
| # ββ metadata round-trip (event β vector entry) ββββββββββββββββββββββββββββββββ | |
| def _event_metadata(event: Event) -> dict: | |
| """Flatten an event into JSON-safe metadata for the vector entry.""" | |
| return { | |
| "event_id": event.id, | |
| "run_id": event.run_id, | |
| "turn": event.turn, | |
| "kind": event.kind, | |
| "actor": event.actor, | |
| "payload": event.payload, | |
| "created_at": event.created_at.isoformat(), | |
| "schema_version": event.schema_version, | |
| "session_id": event.session_id, | |
| "model_profile": event.model_profile, | |
| "model_id": event.model_id, | |
| } | |
| def _event_from_metadata(metadata: dict | None) -> Event | None: | |
| """Reconstruct an :class:`Event` from stored metadata, or ``None`` if absent.""" | |
| if not metadata or "event_id" not in metadata: | |
| return None | |
| try: | |
| return Event( | |
| id=str(metadata["event_id"]), | |
| run_id=str(metadata.get("run_id", "")), | |
| turn=int(metadata.get("turn", 0)), | |
| kind=str(metadata["kind"]), | |
| actor=str(metadata.get("actor", "")), | |
| payload=dict(metadata.get("payload") or {}), | |
| schema_version=int(metadata.get("schema_version", 1)), | |
| session_id=metadata.get("session_id") or None, | |
| model_profile=metadata.get("model_profile") or None, | |
| model_id=metadata.get("model_id") or None, | |
| ) | |
| except (KeyError, ValueError, TypeError): # pragma: no cover - defensive | |
| return None | |
| def _result_items(hits: object) -> list[dict]: | |
| """Normalise mem0 ``search`` output to a list of hit dicts. | |
| Both the OSS ``Memory`` and the platform ``MemoryClient`` return either | |
| ``{"results": [...]}`` or a bare list depending on version/config; accept both | |
| so the backends are version-tolerant. | |
| """ | |
| if isinstance(hits, dict): | |
| results = hits.get("results", []) | |
| else: | |
| results = hits | |
| return [h for h in results if isinstance(h, dict)] if isinstance(results, list) else [] | |
| # ββ env gate βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _is_truthy(value: str | None) -> bool: | |
| return (value or "").strip().lower() in _TRUTHY | |
| def memory_index_from_env(env: dict[str, str] | None = None) -> MemoryIndex | None: | |
| """Build a mem0-backed :class:`MemoryIndex` from the env, or ``None`` if unset. | |
| Selection (``mem0`` is only imported later, on first use): | |
| * gate unset / falsey β ``None`` (the offline keyword path the suite exercises). | |
| * ``MEMORY_INDEX`` truthy (``1``/``true``/``local``/β¦) β :class:`Mem0MemoryIndex` | |
| (local sentence-transformers; off the grid). | |
| * ``MEMORY_INDEX`` β {cloud, mem0-cloud, platform, hosted}, or | |
| ``MEMORY_INDEX_BACKEND=cloud`` β :class:`Mem0CloudIndex` (hosted; sends | |
| ledger text to mem0). An explicit ``MEMORY_INDEX_BACKEND`` wins over the | |
| ``MEMORY_INDEX`` spelling. | |
| """ | |
| source = os.environ if env is None else env | |
| gate = (source.get(INDEX_ENV) or "").strip().lower() | |
| backend = (source.get(BACKEND_ENV) or "").strip().lower() | |
| is_cloud = backend == "cloud" or gate in _CLOUD_VALUES | |
| if not (is_cloud or _is_truthy(gate)): | |
| return None | |
| if is_cloud: | |
| return Mem0CloudIndex( | |
| api_key=source.get("MEM0_API_KEY") or None, | |
| org_id=source.get("MEM0_ORG_ID") or None, | |
| project_id=source.get("MEM0_PROJECT_ID") or None, | |
| host=source.get("MEM0_HOST") or None, | |
| ) | |
| raw_config = (source.get("MEMORY_INDEX_CONFIG") or "").strip() | |
| config: dict | None = None | |
| if raw_config: | |
| import json | |
| config = json.loads(raw_config) | |
| return Mem0MemoryIndex(config=config) | |