| """Adapter for MemVerse — uses build_memory for storage + cosine retrieval.""" |
|
|
| from __future__ import annotations |
|
|
| import json |
| import os |
| import sys |
| import shutil |
| import tempfile |
| from pathlib import Path |
| from typing import Any |
|
|
| import numpy as np |
| from dotenv import load_dotenv |
|
|
| load_dotenv(Path(__file__).resolve().parents[2] / ".env") |
|
|
| from eval_framework.datasets.schemas import ( |
| MemoryDeltaRecord, |
| MemorySnapshotRecord, |
| NormalizedTurn, |
| RetrievalItem, |
| RetrievalRecord, |
| ) |
| from eval_framework.memory_adapters.base import MemoryAdapter |
|
|
| _DEFAULT_SOURCE = Path("/data1/toby/nips26/baselines/MemVerse") |
|
|
|
|
class MemVerseAdapter(MemoryAdapter):
    """Adapter for MemVerse using build_memory + cosine retrieval.

    Bypasses the async orchestrator/LightRAG and uses the core
    memory building + embedding-based retrieval directly.

    Each ingested turn is stored three ways: in a raw conversation log,
    as an LLM-distilled memory entry, and as an embedding vector used
    for cosine-similarity retrieval.
    """

    def __init__(
        self,
        *,
        source_root: str | os.PathLike[str] | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialise the OpenAI client, scratch directory and prompts.

        Args:
            source_root: Root of the MemVerse checkout; prepended to
                ``sys.path`` so its modules are importable. Defaults to
                ``_DEFAULT_SOURCE``.
            **kwargs: Ignored; accepted for signature compatibility with
                the other adapters.
        """
        root = Path(source_root or _DEFAULT_SOURCE).resolve()
        if str(root) not in sys.path:
            sys.path.insert(0, str(root))

        # Imported lazily so merely importing this module does not
        # require the openai package to be installed.
        from openai import OpenAI

        self._client = OpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url=os.getenv("OPENAI_BASE_URL"),
        )
        self._model = os.getenv("OPENAI_MODEL") or "gpt-4o"
        # Kept as an attribute (mirroring ``_model``) so every stored
        # vector demonstrably comes from a single embedding space.
        self._embed_model = "text-embedding-3-small"

        self._work_dir = Path(tempfile.mkdtemp(prefix="memverse_eval_"))
        self._root = root
        self._session_id = ""
        self._prev_snapshot_ids: set[str] = set()
        self._memories: list[dict[str, Any]] = []
        self._conversation: list[dict[str, Any]] = []
        self._turn_counter = 0

        # Load the MemVerse long-term-memory agent system prompts;
        # missing prompt files are simply skipped.
        self._prompts: dict[str, str] = {}
        for name in ["core_memory_agent", "episodic_memory_agent", "semantic_memory_agent"]:
            prompt_path = root / "MemoryKB" / "Long_Term_Memory" / "system" / f"{name}.txt"
            if prompt_path.exists():
                self._prompts[name] = prompt_path.read_text(encoding="utf-8").strip()

    def _get_embedding(self, text: str) -> np.ndarray:
        """Return the embedding of *text* as a 1-D numpy array."""
        resp = self._client.embeddings.create(
            model=self._embed_model,
            input=text,
        )
        return np.array(resp.data[0].embedding)

    def _cosine_sim(self, a: np.ndarray, b: np.ndarray) -> float:
        """Cosine similarity of two vectors; 0.0 if either has zero norm."""
        norm = np.linalg.norm(a) * np.linalg.norm(b)
        if norm == 0:
            return 0.0
        return float(np.dot(a, b) / norm)

    def reset(self) -> None:
        """Drop all adapter state and start a fresh scratch directory."""
        self._memories = []
        self._conversation = []
        self._prev_snapshot_ids = set()
        self._turn_counter = 0
        # Clear the session id too; previously a stale id from the prior
        # run survived a reset and leaked into later snapshots.
        self._session_id = ""
        if self._work_dir.exists():
            shutil.rmtree(self._work_dir, ignore_errors=True)
        self._work_dir = Path(tempfile.mkdtemp(prefix="memverse_eval_"))

    def ingest_turn(self, turn: NormalizedTurn) -> None:
        """Store one turn: raw log entry, embedding, and distilled memory."""
        self._session_id = turn.session_id
        parts = [f"{turn.role}: {turn.text}"]
        parts.extend(f"[{att.type}] {att.caption}" for att in turn.attachments)
        text = "\n".join(parts)

        entry_id = f"turn_{self._turn_counter}"
        self._turn_counter += 1

        self._conversation.append({
            "id": entry_id,
            "query": text,
            "videocaption": None,
            "audiocaption": None,
            "imagecaption": None,
        })

        # The embedding is required for retrieval, so failures here are
        # deliberately allowed to propagate.
        embedding = self._get_embedding(text)

        # Distil the turn with the first available agent prompt; on any
        # API error fall back to the raw text (best effort -- retrieval
        # still works over the raw turn).
        prompt = next(iter(self._prompts.values()), "Extract key facts from this text.")
        try:
            resp = self._client.chat.completions.create(
                model=self._model,
                messages=[
                    {"role": "system", "content": prompt},
                    {"role": "user", "content": text},
                ],
                temperature=0,
                max_tokens=512,
            )
            output = resp.choices[0].message.content or ""
        except Exception:
            output = text

        self._memories.append({
            "id": entry_id,
            "text": text,
            "output": output,
            "embedding": embedding,
            "session_id": turn.session_id,
        })

    def end_session(self, session_id: str) -> None:
        """Record *session_id*; no per-session flush is performed."""
        self._session_id = session_id

    def snapshot_memories(self) -> list[MemorySnapshotRecord]:
        """Return every stored memory as a full snapshot record."""
        return [
            MemorySnapshotRecord(
                memory_id=m["id"],
                text=m["output"],
                session_id=m.get("session_id", self._session_id),
                status="active",
                source="MemVerse",
                raw_backend_id=m["id"],
                raw_backend_type="memverse",
                metadata={},
            )
            for m in self._memories
        ]

    def export_memory_delta(self, session_id: str) -> list[MemoryDeltaRecord]:
        """Return "add" deltas for memories new since the last export."""
        current = self.snapshot_memories()
        current_ids = {s.memory_id for s in current}
        deltas = [
            MemoryDeltaRecord(
                session_id=session_id, op="add", text=s.text,
                linked_previous=(), raw_backend_id=s.raw_backend_id,
                metadata={"baseline": "MemVerse"},
            )
            for s in current if s.memory_id not in self._prev_snapshot_ids
        ]
        self._prev_snapshot_ids = current_ids
        return deltas

    def retrieve(self, query: str, top_k: int) -> RetrievalRecord:
        """Return the *top_k* memories most cosine-similar to *query*."""
        if not self._memories:
            return RetrievalRecord(query=query, top_k=top_k, items=[], raw_trace={})

        query_emb = self._get_embedding(query)
        # Sort on the similarity alone so equal scores never fall
        # through to comparing the (unorderable) memory dicts.
        scored = sorted(
            ((self._cosine_sim(query_emb, m["embedding"]), m) for m in self._memories),
            key=lambda pair: pair[0],
            reverse=True,
        )

        items = [
            RetrievalItem(
                rank=i,
                memory_id=m["id"],
                text=m["output"],
                score=float(sim),
                raw_backend_id=m["id"],
            )
            for i, (sim, m) in enumerate(scored[:top_k])
        ]
        return RetrievalRecord(
            query=query, top_k=top_k, items=items,
            raw_trace={"baseline": "MemVerse"},
        )

    def get_capabilities(self) -> dict[str, Any]:
        """Describe this backend to the evaluation harness."""
        return {
            "backend": "MemVerse",
            "baseline": "MemVerse",
            "available": True,
            "delta_granularity": "per_turn",
            "snapshot_mode": "full",
        }
|
|