|
|
"""Cross-universe memory layer: embeddings, vector DB, knowledge graph, object store stubs."""
|
|
|
from typing import Dict, Any, List
|
|
|
from .adapters.vector_adapter import InMemoryVectorIndex
|
|
|
from .adapters.graph_adapter import NetworkXGraph, Neo4jAdapter
|
|
|
try:
|
|
|
from .adapters.vector_adapter_full import FaissIndex
|
|
|
except Exception:
|
|
|
FaissIndex = None
|
|
|
import json
|
|
|
import os
|
|
|
import os as _os
|
|
|
|
|
|
|
|
|
class MemoryManager:
|
|
|
def __init__(self, storage_dir: str = "./memory_store"):
|
|
|
|
|
|
if _os.environ.get("USE_FAISS", "0") == "1" and FaissIndex is not None:
|
|
|
try:
|
|
|
dim = int(_os.environ.get("FAISS_DIM", "32"))
|
|
|
self.vindex = FaissIndex(dim)
|
|
|
except Exception:
|
|
|
self.vindex = InMemoryVectorIndex()
|
|
|
else:
|
|
|
self.vindex = InMemoryVectorIndex()
|
|
|
self.kg = NetworkXGraph()
|
|
|
self.storage_dir = storage_dir
|
|
|
os.makedirs(self.storage_dir, exist_ok=True)
|
|
|
|
|
|
self.neo4j = None
|
|
|
try:
|
|
|
if _os.environ.get("USE_NEO4J", "0") == "1":
|
|
|
self.neo4j = Neo4jAdapter(uri=_os.environ.get("NEO4J_URI"), user=_os.environ.get("NEO4J_USER"), password=_os.environ.get("NEO4J_PASSWORD"))
|
|
|
if not self.neo4j.is_available():
|
|
|
self.neo4j = None
|
|
|
except Exception:
|
|
|
self.neo4j = None
|
|
|
|
|
|
def index_universe(self, universe_id: str, embedding: List[float], metadata: Dict[str, Any] = None):
|
|
|
self.vindex.upsert(universe_id, embedding)
|
|
|
self.kg.add_node(universe_id, **(metadata or {}))
|
|
|
if self.neo4j:
|
|
|
try:
|
|
|
props = {"id": universe_id}
|
|
|
if metadata:
|
|
|
props.update(metadata)
|
|
|
self.neo4j.create_node(labels=["Universe"], props=props)
|
|
|
except Exception:
|
|
|
pass
|
|
|
|
|
|
def snapshot_universe(self, universe_id: str, universe_obj: Any):
|
|
|
path = os.path.join(self.storage_dir, f"{universe_id}.json")
|
|
|
with open(path, "w", encoding="utf-8") as f:
|
|
|
json.dump(universe_obj, f, default=str)
|
|
|
return path
|
|
|
|
|
|
def query_similar(self, embedding: List[float], top_k: int = 10):
|
|
|
return self.vindex.search(embedding, top_k=top_k)
|
|
|
|