"""Adapter for MemVerse — uses build_memory for storage + cosine retrieval.""" from __future__ import annotations import json import os import sys import shutil import tempfile from pathlib import Path from typing import Any import numpy as np from dotenv import load_dotenv load_dotenv(Path(__file__).resolve().parents[2] / ".env") from eval_framework.datasets.schemas import ( MemoryDeltaRecord, MemorySnapshotRecord, NormalizedTurn, RetrievalItem, RetrievalRecord, ) from eval_framework.memory_adapters.base import MemoryAdapter _DEFAULT_SOURCE = Path("/data1/toby/nips26/baselines/MemVerse") class MemVerseAdapter(MemoryAdapter): """Adapter for MemVerse using build_memory + cosine retrieval. Bypasses the async orchestrator/LightRAG and uses the core memory building + embedding-based retrieval directly. """ def __init__( self, *, source_root: str | os.PathLike[str] | None = None, **kwargs: Any, ) -> None: root = Path(source_root or _DEFAULT_SOURCE).resolve() if str(root) not in sys.path: sys.path.insert(0, str(root)) from openai import OpenAI self._client = OpenAI( api_key=os.getenv("OPENAI_API_KEY"), base_url=os.getenv("OPENAI_BASE_URL"), ) self._model = os.getenv("OPENAI_MODEL") or "gpt-4o" # Working directory for memory files self._work_dir = Path(tempfile.mkdtemp(prefix="memverse_eval_")) self._root = root self._session_id = "" self._prev_snapshot_ids: set[str] = set() self._memories: list[dict[str, Any]] = [] # {id, text, embedding, output} self._conversation: list[dict[str, Any]] = [] self._turn_counter = 0 # Load system prompts for memory agents self._prompts: dict[str, str] = {} for name in ["core_memory_agent", "episodic_memory_agent", "semantic_memory_agent"]: prompt_path = root / "MemoryKB" / "Long_Term_Memory" / "system" / f"{name}.txt" if prompt_path.exists(): self._prompts[name] = prompt_path.read_text(encoding="utf-8").strip() def _get_embedding(self, text: str) -> np.ndarray: resp = self._client.embeddings.create( model="text-embedding-3-small", input=text, ) return np.array(resp.data[0].embedding) def _cosine_sim(self, a: np.ndarray, b: np.ndarray) -> float: norm = np.linalg.norm(a) * np.linalg.norm(b) if norm == 0: return 0.0 return float(np.dot(a, b) / norm) def reset(self) -> None: self._memories = [] self._conversation = [] self._prev_snapshot_ids = set() self._turn_counter = 0 if self._work_dir.exists(): shutil.rmtree(self._work_dir, ignore_errors=True) self._work_dir = Path(tempfile.mkdtemp(prefix="memverse_eval_")) def ingest_turn(self, turn: NormalizedTurn) -> None: self._session_id = turn.session_id text = f"{turn.role}: {turn.text}" for att in turn.attachments: text += f"\n[{att.type}] {att.caption}" entry_id = f"turn_{self._turn_counter}" self._turn_counter += 1 entry = { "id": entry_id, "query": text, "videocaption": None, "audiocaption": None, "imagecaption": None, } self._conversation.append(entry) # Build memory: get embedding + LLM extraction for each memory type embedding = self._get_embedding(text) # Use the first available prompt (core memory agent) for extraction prompt = next(iter(self._prompts.values()), "Extract key facts from this text.") try: resp = self._client.chat.completions.create( model=self._model, messages=[ {"role": "system", "content": prompt}, {"role": "user", "content": text}, ], temperature=0, max_tokens=512, ) output = resp.choices[0].message.content or "" except Exception: output = text self._memories.append({ "id": entry_id, "text": text, "output": output, "embedding": embedding, "session_id": turn.session_id, }) def end_session(self, session_id: str) -> None: self._session_id = session_id def snapshot_memories(self) -> list[MemorySnapshotRecord]: return [ MemorySnapshotRecord( memory_id=m["id"], text=m["output"], session_id=m.get("session_id", self._session_id), status="active", source="MemVerse", raw_backend_id=m["id"], raw_backend_type="memverse", metadata={}, ) for m in self._memories ] def export_memory_delta(self, session_id: str) -> list[MemoryDeltaRecord]: current = self.snapshot_memories() current_ids = {s.memory_id for s in current} deltas = [ MemoryDeltaRecord( session_id=session_id, op="add", text=s.text, linked_previous=(), raw_backend_id=s.raw_backend_id, metadata={"baseline": "MemVerse"}, ) for s in current if s.memory_id not in self._prev_snapshot_ids ] self._prev_snapshot_ids = current_ids return deltas def retrieve(self, query: str, top_k: int) -> RetrievalRecord: if not self._memories: return RetrievalRecord(query=query, top_k=top_k, items=[], raw_trace={}) query_emb = self._get_embedding(query) scored = [] for m in self._memories: sim = self._cosine_sim(query_emb, m["embedding"]) scored.append((sim, m)) scored.sort(key=lambda x: x[0], reverse=True) items = [ RetrievalItem( rank=i, memory_id=m["id"], text=m["output"], score=float(sim), raw_backend_id=m["id"], ) for i, (sim, m) in enumerate(scored[:top_k]) ] return RetrievalRecord( query=query, top_k=top_k, items=items, raw_trace={"baseline": "MemVerse"}, ) def get_capabilities(self) -> dict[str, Any]: return { "backend": "MemVerse", "baseline": "MemVerse", "available": True, "delta_granularity": "per_turn", "snapshot_mode": "full", }