| """Semantic memory index tests (ADR-0018). |
| |
| Three tiers, mirroring the optional-dependency tests elsewhere: |
| |
| * A FAKE in-memory ``MemoryIndex`` (no ``mem0`` required) proves the layering: |
| when an index is attached, ``SalienceMemory`` retrieves by semantic rank; |
| with none it falls back to keyword Jaccard. Indexing is idempotent. |
| * The env gate returns ``None`` when unset and a backend when set β provable |
| with no ``mem0`` installed (construction is lazy). |
| * A guarded real-``mem0`` round-trip (skipped without the package or an |
| embedder configured) asserts an event survives index β search. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import os |
|
|
| import pytest |
|
|
| from src.agents.base import ManifestAgent |
| from src.core.events import Event |
| from src.core.manifest import AgentManifest, MemoryConfig |
| from src.core.memory import SalienceMemory |
| from src.core.memory_index import ( |
| Mem0CloudIndex, |
| Mem0MemoryIndex, |
| MemoryIndex, |
| memory_index_from_env, |
| ) |
| from src.models.router import ModelRouter |
|
|
|
|
| def _event(kind: str, actor: str = "x", turn: int = 1, text: str = "hello", eid: str | None = None) -> Event: |
| kwargs = {"run_id": "r", "turn": turn, "kind": kind, "actor": actor, "payload": {"text": text}} |
| if eid is not None: |
| kwargs["id"] = eid |
| return Event(**kwargs) |
|
|
|
|
| class _FakeIndex: |
| """A deterministic in-memory ``MemoryIndex`` β no ``mem0``, no embeddings. |
| |
| ``search`` ranks indexed events by substring/word overlap so a test can steer |
| *which* event the salience layer treats as most relevant, independently of |
| the keyword-Jaccard the offline path would compute. Records calls so a test |
| can assert idempotent indexing. |
| """ |
|
|
| def __init__(self) -> None: |
| self.store: dict[str, Event] = {} |
| self.add_calls: list[str] = [] |
| self.search_run_ids: list[str | None] = [] |
|
|
| def index(self, events: tuple[Event, ...]) -> None: |
| for e in events: |
| self.add_calls.append(e.id) |
| self.store[e.id] = e |
|
|
| def search(self, query: str, k: int, run_id: str | None = None) -> list[Event]: |
| self.search_run_ids.append(run_id) |
| q = set(query.lower().split()) |
| pool = [e for e in self.store.values() if run_id is None or e.run_id == run_id] |
| scored = [(len(q & set(str(e.payload.get("text", "")).lower().split())), e) for e in pool] |
| scored.sort(key=lambda t: t[0], reverse=True) |
| return [e for _, e in scored[:k]] |
|
|
|
|
| |
|
|
|
|
| class TestProtocol: |
| def test_fake_is_memory_index(self): |
| assert isinstance(_FakeIndex(), MemoryIndex) |
|
|
| def test_mem0_backend_is_memory_index(self): |
| |
| assert isinstance(Mem0MemoryIndex(), MemoryIndex) |
|
|
| def test_cloud_backend_is_memory_index(self): |
| |
| assert isinstance(Mem0CloudIndex(), MemoryIndex) |
|
|
|
|
| |
|
|
|
|
| class TestSalienceUsesIndex: |
| def test_semantic_hit_outranks_keyword_irrelevant(self): |
| """An event the index ranks top wins even with no keyword overlap to the |
| query β proving the relevance term came from the index, not Jaccard.""" |
| idx = _FakeIndex() |
| |
| |
| target = _event("world.observed", turn=2, text="beacon glow signal", eid="hit") |
| other = _event("world.observed", turn=2, text="quiet empty room", eid="miss") |
| mem = SalienceMemory("x", top_k=1, index=idx) |
|
|
| |
| recalled = mem.visible((other, target), current_turn=3, query="beacon glow") |
| assert [e.id for e in recalled] == ["hit"] |
|
|
| def test_falls_back_to_keyword_without_index(self): |
| match = _event("world.observed", turn=5, text="golden spores drift upward") |
| miss = _event("world.observed", turn=5, text="completely unrelated content") |
| mem = SalienceMemory("a") |
| s_match = mem.score(match, current_turn=6, query="golden spores") |
| s_miss = mem.score(miss, current_turn=6, query="golden spores") |
| assert s_match > s_miss |
|
|
| def test_index_is_populated_from_visible_events_only(self): |
| """The index is DERIVED from the ledger: only events that pass the |
| visibility filter are indexed, never another agent's private thoughts.""" |
| idx = _FakeIndex() |
| mine = _event("agent.thought", actor="a", turn=1, text="my secret", eid="mine") |
| theirs = _event("agent.thought", actor="b", turn=1, text="their secret", eid="theirs") |
| glob = _event("world.observed", actor="narrator", turn=1, text="the stage", eid="glob") |
| mem = SalienceMemory("a", index=idx) |
| mem.visible((mine, theirs, glob), current_turn=2, query="stage") |
| assert set(idx.store) == {"mine", "glob"} |
|
|
| def test_search_is_scoped_to_the_candidates_run(self): |
| """The index spans every run in the shared store; recall must be scoped to |
| the run the candidates came from, so one show's (or one user's) discussion |
| never crowds another's relevance budget.""" |
| idx = _FakeIndex() |
| |
| foreign = Event(run_id="other-run", turn=1, kind="world.observed", actor="n", payload={"text": "beacon glow"}) |
| idx.index((foreign,)) |
|
|
| ours = _event("world.observed", turn=2, text="beacon glow signal", eid="ours") |
| mem = SalienceMemory("x", top_k=2, index=idx) |
| recalled = mem.visible((ours,), current_turn=3, query="beacon glow") |
|
|
| assert idx.search_run_ids == ["r"] |
| assert [e.id for e in recalled] == ["ours"] |
|
|
| def test_recency_still_applies_with_index(self): |
| """Relevance is one term; recency must still separate equally-relevant |
| events so the index does not flatten the composite score.""" |
| idx = _FakeIndex() |
| old = _event("world.observed", turn=1, text="same words here", eid="old") |
| new = _event("world.observed", turn=10, text="same words here", eid="new") |
| mem = SalienceMemory("x", top_k=2, index=idx) |
| recalled = mem.visible((old, new), current_turn=12, query="same words here") |
| |
| s_old = mem.score(old, current_turn=12, query="x", relevance=1.0) |
| s_new = mem.score(new, current_turn=12, query="x", relevance=1.0) |
| assert s_new > s_old |
| assert {e.id for e in recalled} == {"old", "new"} |
|
|
| def test_format_for_prompt_shape_with_index(self): |
| idx = _FakeIndex() |
| e = _event("world.observed", turn=1, text="something", eid="e1") |
| out = SalienceMemory("x", index=idx).format_for_prompt((e,), current_turn=2, query="something") |
| assert isinstance(out, str) |
| assert "something" in out and "sal=" in out |
|
|
|
|
| |
|
|
|
|
| class TestIdempotentIndexing: |
| def test_reindex_does_not_duplicate(self): |
| idx = _FakeIndex() |
| events = ( |
| _event("world.observed", turn=1, text="a", eid="e1"), |
| _event("world.observed", turn=2, text="b", eid="e2"), |
| ) |
| idx.index(events) |
| idx.index(events) |
| assert len(idx.store) == 2 |
|
|
| def test_mem0_backend_skips_already_indexed_ids(self): |
| """The real backend dedupes by id before touching mem0, so a process that |
| re-indexes the same ledger slice each turn does not re-embed it.""" |
| backend = Mem0MemoryIndex() |
| backend._indexed.add("e1") |
| |
| |
| backend.index((_event("world.observed", eid="e1"),)) |
|
|
| def test_cloud_backend_skips_already_indexed_ids(self): |
| """Cloud dedup is identical and happens before the client is built β so a |
| repeat id is a no-op with mem0 absent and no MEM0_API_KEY set.""" |
| backend = Mem0CloudIndex() |
| backend._indexed.add("e1") |
| backend.index((_event("world.observed", eid="e1"),)) |
|
|
|
|
| |
|
|
|
|
| class TestEnvGate: |
| def test_none_when_unset(self): |
| assert memory_index_from_env({}) is None |
|
|
| def test_none_when_falsey(self): |
| assert memory_index_from_env({"MEMORY_INDEX": "0"}) is None |
|
|
| def test_backend_when_truthy(self): |
| idx = memory_index_from_env({"MEMORY_INDEX": "1"}) |
| assert isinstance(idx, Mem0MemoryIndex) |
|
|
| def test_config_blob_is_parsed(self): |
| idx = memory_index_from_env({"MEMORY_INDEX": "true", "MEMORY_INDEX_CONFIG": '{"version": "v1.1"}'}) |
| assert isinstance(idx, Mem0MemoryIndex) |
| assert idx._config == {"version": "v1.1"} |
|
|
| def test_truthy_gate_selects_local_not_cloud(self): |
| assert isinstance(memory_index_from_env({"MEMORY_INDEX": "1"}), Mem0MemoryIndex) |
|
|
| def test_cloud_spelling_selects_hosted_backend(self): |
| for spelling in ("cloud", "mem0-cloud", "platform", "hosted"): |
| idx = memory_index_from_env({"MEMORY_INDEX": spelling}) |
| assert isinstance(idx, Mem0CloudIndex), spelling |
|
|
| def test_backend_env_overrides_local_gate(self): |
| |
| idx = memory_index_from_env({"MEMORY_INDEX": "1", "MEMORY_INDEX_BACKEND": "cloud"}) |
| assert isinstance(idx, Mem0CloudIndex) |
|
|
| def test_cloud_reads_credentials_from_env(self): |
| idx = memory_index_from_env( |
| { |
| "MEMORY_INDEX": "cloud", |
| "MEM0_API_KEY": "k-123", |
| "MEM0_ORG_ID": "org-1", |
| "MEM0_PROJECT_ID": "proj-1", |
| } |
| ) |
| assert isinstance(idx, Mem0CloudIndex) |
| assert (idx._api_key, idx._org_id, idx._project_id) == ("k-123", "org-1", "proj-1") |
|
|
|
|
| |
|
|
|
|
| class _SalienceAgent(ManifestAgent): |
| manifest = AgentManifest( |
| name="recaller", |
| persona="p", |
| may_emit=["agent.spoke"], |
| memory=MemoryConfig(use_salience=True, salience_top_k=1), |
| ) |
|
|
|
|
| class TestRecallWiring: |
| def test_recall_uses_attached_index(self): |
| idx = _FakeIndex() |
| agent = _SalienceAgent(ModelRouter(offline=True), memory_index=idx) |
| from src.core.projections import StageProjection |
|
|
| events = ( |
| _event("world.observed", actor="n", turn=1, text="beacon glow signal", eid="hit"), |
| _event("world.observed", actor="n", turn=1, text="quiet empty room", eid="miss"), |
| ) |
| proj = StageProjection(current_scene="beacon glow") |
| out = agent._recall(turn=2, projection=proj, recent_events=events) |
| assert "beacon" in out |
| assert idx.store |
|
|
| def test_recall_without_index_is_keyword_path(self): |
| agent = _SalienceAgent(ModelRouter(offline=True)) |
| from src.core.projections import StageProjection |
|
|
| e = _event("world.observed", actor="n", turn=1, text="golden spores") |
| out = agent._recall(turn=2, projection=StageProjection(current_scene="golden spores"), recent_events=(e,)) |
| assert isinstance(out, str) and "golden" in out |
|
|
|
|
| |
|
|
|
|
| class TestMem0RoundTrip: |
| def test_index_then_search_recovers_event(self): |
| pytest.importorskip("mem0") |
| pytest.importorskip("sentence_transformers") |
| |
| |
| if not os.getenv("MEMORY_INDEX_E2E"): |
| pytest.skip("set MEMORY_INDEX_E2E=1 to run the local-embedder round-trip (downloads a model)") |
|
|
| backend = Mem0MemoryIndex() |
| ev = _event("world.observed", turn=1, text="golden spores drift over the glass forest", eid="rt1") |
| try: |
| backend.index((ev,)) |
| hits = backend.search("golden spores", k=5) |
| except Exception as exc: |
| pytest.skip(f"mem0 backend unavailable: {exc}") |
|
|
| assert any(h.id == "rt1" for h in hits) |
| |
| hit = next(h for h in hits if h.id == "rt1") |
| assert hit.kind == "world.observed" |
| assert hit.payload.get("text", "").startswith("golden spores") |
|
|
|
|
| |
|
|
|
|
| class TestMem0CloudRoundTrip: |
| def test_index_then_search_recovers_event(self): |
| pytest.importorskip("mem0") |
| |
| |
| if not (os.getenv("MEM0_API_KEY") and os.getenv("MEM0_CLOUD_E2E")): |
| pytest.skip("set MEM0_API_KEY and MEM0_CLOUD_E2E=1 to run the hosted round-trip") |
|
|
| backend = Mem0CloudIndex() |
| ev = _event("world.observed", turn=1, text="golden spores drift over the glass forest", eid="rtc1") |
| try: |
| backend.index((ev,)) |
| hits = backend.search("golden spores", k=5) |
| except Exception as exc: |
| pytest.skip(f"mem0 cloud unavailable: {exc}") |
|
|
| assert any(h.id == "rtc1" for h in hits) |
| hit = next(h for h in hits if h.id == "rtc1") |
| assert hit.kind == "world.observed" |
| assert hit.payload.get("text", "").startswith("golden spores") |
|
|