"""Cross-universe memory layer: embeddings, vector DB, knowledge graph, object store stubs.""" from typing import Dict, Any, List from .adapters.vector_adapter import InMemoryVectorIndex from .adapters.graph_adapter import NetworkXGraph, Neo4jAdapter try: from .adapters.vector_adapter_full import FaissIndex except Exception: FaissIndex = None import json import os import os as _os class MemoryManager: def __init__(self, storage_dir: str = "./memory_store"): # optionally use FAISS if requested if _os.environ.get("USE_FAISS", "0") == "1" and FaissIndex is not None: try: dim = int(_os.environ.get("FAISS_DIM", "32")) self.vindex = FaissIndex(dim) except Exception: self.vindex = InMemoryVectorIndex() else: self.vindex = InMemoryVectorIndex() self.kg = NetworkXGraph() self.storage_dir = storage_dir os.makedirs(self.storage_dir, exist_ok=True) # optionally try to connect to Neo4j if requested via env self.neo4j = None try: if _os.environ.get("USE_NEO4J", "0") == "1": self.neo4j = Neo4jAdapter(uri=_os.environ.get("NEO4J_URI"), user=_os.environ.get("NEO4J_USER"), password=_os.environ.get("NEO4J_PASSWORD")) if not self.neo4j.is_available(): self.neo4j = None except Exception: self.neo4j = None def index_universe(self, universe_id: str, embedding: List[float], metadata: Dict[str, Any] = None): self.vindex.upsert(universe_id, embedding) self.kg.add_node(universe_id, **(metadata or {})) if self.neo4j: try: props = {"id": universe_id} if metadata: props.update(metadata) self.neo4j.create_node(labels=["Universe"], props=props) except Exception: pass def snapshot_universe(self, universe_id: str, universe_obj: Any): path = os.path.join(self.storage_dir, f"{universe_id}.json") with open(path, "w", encoding="utf-8") as f: json.dump(universe_obj, f, default=str) return path def query_similar(self, embedding: List[float], top_k: int = 10): return self.vindex.search(embedding, top_k=top_k)