File size: 2,403 Bytes
b657fcc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
"""Cross-universe memory layer: embeddings, vector DB, knowledge graph, object store stubs."""
import json
import logging
import os
import os as _os
from typing import Dict, Any, List, Optional

from .adapters.vector_adapter import InMemoryVectorIndex
from .adapters.graph_adapter import NetworkXGraph, Neo4jAdapter

try:
    from .adapters.vector_adapter_full import FaissIndex
except Exception:
    FaissIndex = None


class MemoryManager:
    """Facade over a vector index, a knowledge graph and a JSON snapshot store.

    Backends are chosen from environment variables at construction time:

    * ``USE_FAISS=1`` selects ``FaissIndex`` (dimension read from
      ``FAISS_DIM``, default ``32``) when that adapter was importable.
      In every other case, or if building it raises, ``InMemoryVectorIndex``
      is used.
    * ``USE_NEO4J=1`` additionally creates a ``Neo4jAdapter`` from
      ``NEO4J_URI`` / ``NEO4J_USER`` / ``NEO4J_PASSWORD``. The adapter is
      kept only if ``is_available()`` is truthy; otherwise ``self.neo4j``
      stays ``None``.

    Optional backends are best-effort: a failure to set them up or to write
    to Neo4j never propagates to the caller, but it is logged as a warning.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, storage_dir: str = "./memory_store"):
        """Create the backends and make sure ``storage_dir`` exists.

        Args:
            storage_dir: Directory that receives the JSON snapshots. It is
                created (including parents) when missing.

        Raises:
            OSError: If ``storage_dir`` cannot be created.
        """
        self.vindex = self._build_vector_index()
        self.kg = NetworkXGraph()
        self.storage_dir = storage_dir
        os.makedirs(self.storage_dir, exist_ok=True)
        self.neo4j = self._connect_neo4j()

    def _build_vector_index(self):
        """Return a FAISS index when requested and usable, else the in-memory one."""
        if os.environ.get("USE_FAISS", "0") != "1":
            return InMemoryVectorIndex()
        if FaissIndex is None:
            self._log.warning(
                "USE_FAISS=1 but FaissIndex is unavailable; using in-memory index"
            )
            return InMemoryVectorIndex()
        try:
            # int() is inside the try on purpose: a malformed FAISS_DIM must
            # fall back to the in-memory index rather than abort construction.
            return FaissIndex(int(os.environ.get("FAISS_DIM", "32")))
        except Exception:
            self._log.warning(
                "Could not build FaissIndex; using in-memory index", exc_info=True
            )
            return InMemoryVectorIndex()

    def _connect_neo4j(self):
        """Return an available ``Neo4jAdapter`` when ``USE_NEO4J=1``, else ``None``."""
        try:
            if os.environ.get("USE_NEO4J", "0") != "1":
                return None
            adapter = Neo4jAdapter(
                uri=os.environ.get("NEO4J_URI"),
                user=os.environ.get("NEO4J_USER"),
                password=os.environ.get("NEO4J_PASSWORD"),
            )
            if adapter.is_available():
                return adapter
            self._log.warning("USE_NEO4J=1 but Neo4j is not available; mirroring disabled")
        except Exception:
            self._log.warning("Could not set up Neo4j; mirroring disabled", exc_info=True)
        return None

    def index_universe(
        self,
        universe_id: str,
        embedding: List[float],
        metadata: Optional[Dict[str, Any]] = None,
    ):
        """Register a universe in the vector index and the knowledge graph.

        Args:
            universe_id: Key used for the vector entry and the graph node.
            embedding: Vector passed to ``self.vindex.upsert``.
            metadata: Optional node attributes; ``None`` is treated as empty.

        When a Neo4j adapter is attached, a ``Universe`` node is also created
        there with ``id`` set to ``universe_id`` plus the metadata (a metadata
        key named ``id`` overrides it). That mirror write is best-effort: any
        exception it raises is logged and suppressed.
        """
        attrs = metadata or {}
        self.vindex.upsert(universe_id, embedding)
        self.kg.add_node(universe_id, **attrs)
        if not self.neo4j:
            return
        try:
            self.neo4j.create_node(
                labels=["Universe"], props={"id": universe_id, **attrs}
            )
        except Exception:
            # The local index and graph are already updated; the mirror is
            # optional, so report the failure instead of raising.
            self._log.warning(
                "Neo4j mirror write failed for universe %r", universe_id, exc_info=True
            )

    def snapshot_universe(self, universe_id: str, universe_obj: Any):
        """Write ``universe_obj`` as JSON to ``<storage_dir>/<universe_id>.json``.

        Values the JSON encoder does not support are converted with ``str``.
        The object is serialised completely *before* the file is opened, so a
        serialisation error leaves an existing snapshot untouched.

        NOTE(review): ``universe_id`` is used verbatim in the file name;
        confirm callers never pass path separators.

        Args:
            universe_id: Base name of the snapshot file.
            universe_obj: Object to serialise.

        Returns:
            The path of the written file.

        Raises:
            ValueError: If ``universe_obj`` contains a circular reference.
            OSError: If the file cannot be written.
        """
        path = os.path.join(self.storage_dir, f"{universe_id}.json")
        payload = json.dumps(universe_obj, default=str)
        with open(path, "w", encoding="utf-8") as f:
            f.write(payload)
        return path

    def query_similar(self, embedding: List[float], top_k: int = 10):
        """Return whatever ``self.vindex.search`` yields for ``embedding``.

        Args:
            embedding: Query vector.
            top_k: Forwarded to the index as ``top_k``.
        """
        return self.vindex.search(embedding, top_k=top_k)