# eval_framework/memory_adapters/memverse_adapter.py
# Source: "eval_framework" upload, commit 85b19cf
"""Adapter for MemVerse — uses build_memory for storage + cosine retrieval."""
from __future__ import annotations
import json
import os
import sys
import shutil
import tempfile
from pathlib import Path
from typing import Any
import numpy as np
from dotenv import load_dotenv
load_dotenv(Path(__file__).resolve().parents[2] / ".env")
from eval_framework.datasets.schemas import (
MemoryDeltaRecord,
MemorySnapshotRecord,
NormalizedTurn,
RetrievalItem,
RetrievalRecord,
)
from eval_framework.memory_adapters.base import MemoryAdapter
_DEFAULT_SOURCE = Path("/data1/toby/nips26/baselines/MemVerse")
class MemVerseAdapter(MemoryAdapter):
"""Adapter for MemVerse using build_memory + cosine retrieval.
Bypasses the async orchestrator/LightRAG and uses the core
memory building + embedding-based retrieval directly.
"""
def __init__(
    self,
    *,
    source_root: str | os.PathLike[str] | None = None,
    **kwargs: Any,
) -> None:
    """Set up the OpenAI client, scratch directory, and in-memory stores.

    Args:
        source_root: Path to the MemVerse checkout; falls back to
            ``_DEFAULT_SOURCE``. Added to ``sys.path`` so MemVerse code
            and prompt files are reachable.
        **kwargs: Accepted for interface compatibility; ignored here.
    """
    root = Path(source_root or _DEFAULT_SOURCE).resolve()
    # Make the MemVerse checkout importable; guard avoids duplicate entries.
    if str(root) not in sys.path:
        sys.path.insert(0, str(root))
    # Imported lazily so merely importing this module does not require openai.
    from openai import OpenAI
    # Credentials come from the .env loaded at module import time.
    self._client = OpenAI(
        api_key=os.getenv("OPENAI_API_KEY"),
        base_url=os.getenv("OPENAI_BASE_URL"),
    )
    self._model = os.getenv("OPENAI_MODEL") or "gpt-4o"
    # Working directory for memory files
    self._work_dir = Path(tempfile.mkdtemp(prefix="memverse_eval_"))
    self._root = root
    self._session_id = ""
    # IDs already reported by export_memory_delta (used to compute "add" deltas).
    self._prev_snapshot_ids: set[str] = set()
    self._memories: list[dict[str, Any]] = []  # {id, text, embedding, output}
    self._conversation: list[dict[str, Any]] = []
    self._turn_counter = 0
    # Load system prompts for memory agents
    self._prompts: dict[str, str] = {}
    for name in ["core_memory_agent", "episodic_memory_agent", "semantic_memory_agent"]:
        prompt_path = root / "MemoryKB" / "Long_Term_Memory" / "system" / f"{name}.txt"
        # Missing prompt files are skipped silently; ingest_turn falls back
        # to a generic extraction prompt when none were found.
        if prompt_path.exists():
            self._prompts[name] = prompt_path.read_text(encoding="utf-8").strip()
def _get_embedding(self, text: str) -> np.ndarray:
resp = self._client.embeddings.create(
model="text-embedding-3-small",
input=text,
)
return np.array(resp.data[0].embedding)
def _cosine_sim(self, a: np.ndarray, b: np.ndarray) -> float:
norm = np.linalg.norm(a) * np.linalg.norm(b)
if norm == 0:
return 0.0
return float(np.dot(a, b) / norm)
def reset(self) -> None:
self._memories = []
self._conversation = []
self._prev_snapshot_ids = set()
self._turn_counter = 0
if self._work_dir.exists():
shutil.rmtree(self._work_dir, ignore_errors=True)
self._work_dir = Path(tempfile.mkdtemp(prefix="memverse_eval_"))
def ingest_turn(self, turn: NormalizedTurn) -> None:
self._session_id = turn.session_id
text = f"{turn.role}: {turn.text}"
for att in turn.attachments:
text += f"\n[{att.type}] {att.caption}"
entry_id = f"turn_{self._turn_counter}"
self._turn_counter += 1
entry = {
"id": entry_id,
"query": text,
"videocaption": None,
"audiocaption": None,
"imagecaption": None,
}
self._conversation.append(entry)
# Build memory: get embedding + LLM extraction for each memory type
embedding = self._get_embedding(text)
# Use the first available prompt (core memory agent) for extraction
prompt = next(iter(self._prompts.values()), "Extract key facts from this text.")
try:
resp = self._client.chat.completions.create(
model=self._model,
messages=[
{"role": "system", "content": prompt},
{"role": "user", "content": text},
],
temperature=0,
max_tokens=512,
)
output = resp.choices[0].message.content or ""
except Exception:
output = text
self._memories.append({
"id": entry_id,
"text": text,
"output": output,
"embedding": embedding,
"session_id": turn.session_id,
})
def end_session(self, session_id: str) -> None:
    """Record *session_id* as the current session.

    No teardown is performed here — memories persist across sessions; the
    stored id is only used as a fallback by snapshot_memories().
    """
    self._session_id = session_id
def snapshot_memories(self) -> list[MemorySnapshotRecord]:
    """Export every stored memory as an active MemVerse snapshot record."""
    records: list[MemorySnapshotRecord] = []
    for memory in self._memories:
        record = MemorySnapshotRecord(
            memory_id=memory["id"],
            text=memory["output"],
            session_id=memory.get("session_id", self._session_id),
            status="active",
            source="MemVerse",
            raw_backend_id=memory["id"],
            raw_backend_type="memverse",
            metadata={},
        )
        records.append(record)
    return records
def export_memory_delta(self, session_id: str) -> list[MemoryDeltaRecord]:
    """Return "add" deltas for memories created since the previous export.

    Memories here are append-only, so any snapshot id not seen in the last
    export becomes an "add" delta; the seen-id set is then refreshed.
    """
    snapshots = self.snapshot_memories()
    deltas: list[MemoryDeltaRecord] = []
    for snap in snapshots:
        if snap.memory_id in self._prev_snapshot_ids:
            continue
        deltas.append(
            MemoryDeltaRecord(
                session_id=session_id,
                op="add",
                text=snap.text,
                linked_previous=(),
                raw_backend_id=snap.raw_backend_id,
                metadata={"baseline": "MemVerse"},
            )
        )
    self._prev_snapshot_ids = {snap.memory_id for snap in snapshots}
    return deltas
def retrieve(self, query: str, top_k: int) -> RetrievalRecord:
    """Return the *top_k* stored memories ranked by cosine similarity to *query*."""
    if not self._memories:
        # Nothing ingested yet — empty result, no embedding call needed.
        return RetrievalRecord(query=query, top_k=top_k, items=[], raw_trace={})
    query_vec = self._get_embedding(query)
    ranked = sorted(
        ((self._cosine_sim(query_vec, mem["embedding"]), mem) for mem in self._memories),
        key=lambda pair: pair[0],
        reverse=True,
    )
    items = []
    for position, (score, mem) in enumerate(ranked[:top_k]):
        items.append(
            RetrievalItem(
                rank=position,
                memory_id=mem["id"],
                text=mem["output"],
                score=float(score),
                raw_backend_id=mem["id"],
            )
        )
    return RetrievalRecord(
        query=query,
        top_k=top_k,
        items=items,
        raw_trace={"baseline": "MemVerse"},
    )
def get_capabilities(self) -> dict[str, Any]:
return {
"backend": "MemVerse",
"baseline": "MemVerse",
"available": True,
"delta_granularity": "per_turn",
"snapshot_mode": "full",
}