JatinAutonomousLabs committed on
Commit
414b83b
·
verified ·
1 Parent(s): cdf55aa

Update memory_manager.py

Browse files
Files changed (1) hide show
  1. memory_manager.py +329 -107
memory_manager.py CHANGED
@@ -1,5 +1,13 @@
 
 
1
  import os
2
  import shutil
 
 
 
 
 
 
3
  from langchain_huggingface import HuggingFaceEmbeddings
4
  from langchain_community.vectorstores import FAISS
5
  from langchain.docstore.document import Document
@@ -8,127 +16,341 @@ from langchain.docstore.document import Document
8
  MEMORY_DIR = "memory"
9
  INDEX_NAME = "faiss"
10
  MODEL_NAME = "all-MiniLM-L6-v2"
 
 
 
 
11
 
12
  class MemoryManager:
13
- def __init__(self):
14
- self.embeddings = HuggingFaceEmbeddings(model_name=MODEL_NAME)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15
  self.vector_store = self._load_or_create_vector_store()
16
-
17
- def reset_memory(self):
18
- """Removes the memory directory and re-initializes a new, empty index."""
19
- if os.path.exists(MEMORY_DIR):
20
- shutil.rmtree(MEMORY_DIR)
21
- os.makedirs(MEMORY_DIR, exist_ok=True)
22
- print("🧠 Memory reset successfully.")
23
- self.vector_store = self._create_new_index()
24
-
25
- def _load_or_create_vector_store(self):
26
- """Loads FAISS index or creates a new one, handling potential corruption."""
27
- index_path = os.path.join(MEMORY_DIR, f"{INDEX_NAME}.faiss")
28
- if os.path.exists(index_path):
29
  try:
30
- print("🧠 Loading existing memory from disk...")
31
- return FAISS.load_local(
32
- folder_path=MEMORY_DIR,
33
- embeddings=self.embeddings,
34
- index_name=INDEX_NAME,
35
- allow_dangerous_deserialization=True
36
- )
37
  except Exception as e:
38
- print(f"⚠️ Error loading memory index: {e}. Rebuilding index.")
39
- shutil.rmtree(MEMORY_DIR)
40
- os.makedirs(MEMORY_DIR, exist_ok=True)
41
- return self._create_new_index()
42
- else:
43
- print("🧠 No existing memory found. Creating a new one.")
44
- return self._create_new_index()
45
-
46
- def _create_new_index(self):
47
- """Creates a fresh, empty FAISS index."""
48
- dummy_doc = [Document(page_content="Initial memory entry.")]
49
- # Note: If memory needs to be truly empty, use a small, persistent dummy doc
50
- # or handle an empty index creation if FAISS allows it. Keeping dummy for robustness.
51
- vs = FAISS.from_documents(dummy_doc, self.embeddings)
52
- vs.save_local(folder_path=MEMORY_DIR, index_name=INDEX_NAME)
53
- return vs
54
-
55
- def add_to_memory(self, text_to_add: str, metadata: dict):
56
- print(f"📝 Adding new memory: {text_to_add[:100]}...")
57
- doc = Document(page_content=text_to_add, metadata=metadata)
58
- self.vector_store.add_documents([doc])
59
- self.vector_store.save_local(folder_path=MEMORY_DIR, index_name=INDEX_NAME)
60
-
61
- def retrieve_relevant_memories(self, query: str, k: int = 5) -> list[Document]:
62
- print(f"🔍 Searching memory for: {query[:50]}...")
63
- return self.vector_store.similarity_search(query, k=k)
64
-
65
- # --- FIX: Instantiate the class globally to satisfy 'from memory_manager import memory_manager' ---
66
- # The name of the instance must match the name being imported by graph.py
67
- memory_manager = MemoryManager()
68
- import os
69
- import shutil
70
- from langchain_huggingface import HuggingFaceEmbeddings
71
- from langchain_community.vectorstores import FAISS
72
- from langchain.docstore.document import Document
73
-
74
- # --- Configuration ---
75
- MEMORY_DIR = "memory"
76
- INDEX_NAME = "faiss"
77
- MODEL_NAME = "all-MiniLM-L6-v2"
78
-
79
- class MemoryManager:
80
- def __init__(self):
81
- self.embeddings = HuggingFaceEmbeddings(model_name=MODEL_NAME)
82
- self.vector_store = self._load_or_create_vector_store()
83
-
84
- def reset_memory(self):
85
- """Removes the memory directory and re-initializes a new, empty index."""
86
- if os.path.exists(MEMORY_DIR):
87
- shutil.rmtree(MEMORY_DIR)
88
- os.makedirs(MEMORY_DIR, exist_ok=True)
89
- print("🧠 Memory reset successfully.")
90
  self.vector_store = self._create_new_index()
91
-
92
- def _load_or_create_vector_store(self):
93
- """Loads FAISS index or creates a new one, handling potential corruption."""
94
- index_path = os.path.join(MEMORY_DIR, f"{INDEX_NAME}.faiss")
 
 
 
 
 
 
 
 
 
 
 
 
 
95
  if os.path.exists(index_path):
96
  try:
97
- print("🧠 Loading existing memory from disk...")
98
- return FAISS.load_local(
99
- folder_path=MEMORY_DIR,
100
  embeddings=self.embeddings,
101
  index_name=INDEX_NAME,
102
- allow_dangerous_deserialization=True
103
  )
 
 
 
104
  except Exception as e:
105
- print(f"⚠️ Error loading memory index: {e}. Rebuilding index.")
106
- shutil.rmtree(MEMORY_DIR)
107
- os.makedirs(MEMORY_DIR, exist_ok=True)
 
 
 
 
 
 
 
 
108
  return self._create_new_index()
109
  else:
110
- print("🧠 No existing memory found. Creating a new one.")
111
  return self._create_new_index()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
112
 
113
- def _create_new_index(self):
114
- """Creates a fresh, empty FAISS index."""
115
- dummy_doc = [Document(page_content="Initial memory entry.")]
116
- # Note: If memory needs to be truly empty, use a small, persistent dummy doc
117
- # or handle an empty index creation if FAISS allows it. Keeping dummy for robustness.
118
- vs = FAISS.from_documents(dummy_doc, self.embeddings)
119
- vs.save_local(folder_path=MEMORY_DIR, index_name=INDEX_NAME)
120
- return vs
121
-
122
- def add_to_memory(self, text_to_add: str, metadata: dict):
123
- print(f"📝 Adding new memory: {text_to_add[:100]}...")
124
- doc = Document(page_content=text_to_add, metadata=metadata)
125
- self.vector_store.add_documents([doc])
126
- self.vector_store.save_local(folder_path=MEMORY_DIR, index_name=INDEX_NAME)
127
-
128
- def retrieve_relevant_memories(self, query: str, k: int = 5) -> list[Document]:
129
- print(f"🔍 Searching memory for: {query[:50]}...")
130
- return self.vector_store.similarity_search(query, k=k)
131
 
132
- # --- FIX: Instantiate the class globally to satisfy 'from memory_manager import memory_manager' ---
133
- # The name of the instance must match the name being imported by graph.py
134
- memory_manager = MemoryManager()
 
1
+ # memory_manager.py - Production-ready memory management with FAISS
2
+
3
  import os
4
  import shutil
5
+ import json
6
+ import hashlib
7
+ from datetime import datetime
8
+ from typing import List, Dict, Optional, Any
9
+ import logging
10
+
11
  from langchain_huggingface import HuggingFaceEmbeddings
12
  from langchain_community.vectorstores import FAISS
13
  from langchain.docstore.document import Document
 
16
# --- Configuration ---
MEMORY_DIR = "memory"  # default on-disk directory for the FAISS index + metadata file
INDEX_NAME = "faiss"  # base name FAISS uses for its saved index files
MODEL_NAME = "all-MiniLM-L6-v2"  # HuggingFace sentence-embedding model
METADATA_FILE = "memory_metadata.json"  # bookkeeping: counts, timestamps, dedup hashes

# Module-level logger; handler/level configuration is left to the application.
logger = logging.getLogger(__name__)
23
 
24
class MemoryManager:
    """
    Long-term memory backed by a FAISS vector store with semantic search.

    Persists the index under ``memory_dir`` and keeps a JSON sidecar file
    (``METADATA_FILE``) with bookkeeping data: creation time, memory count,
    and content hashes used for duplicate detection.
    """

    def __init__(self, memory_dir: str = MEMORY_DIR, model_name: str = MODEL_NAME):
        """
        Initialize embeddings and load (or create) the vector store.

        Args:
            memory_dir: Directory to store memory files.
            model_name: Name of the HuggingFace embedding model.

        Raises:
            Exception: re-raised if the embedding model cannot be loaded,
                since nothing else in this class works without it.
        """
        self.memory_dir = memory_dir
        self.model_name = model_name
        self.metadata_path = os.path.join(memory_dir, METADATA_FILE)

        logger.info("Initializing MemoryManager with model: %s", model_name)

        # Embedding model load is fatal on failure.
        try:
            self.embeddings = HuggingFaceEmbeddings(
                model_name=model_name,
                model_kwargs={'device': 'cpu'},
                encode_kwargs={'normalize_embeddings': True}
            )
            logger.info("Embeddings model loaded successfully")
        except Exception as e:
            logger.error("Failed to load embeddings model: %s", e)
            raise

        # Load (or create) the vector store, then the bookkeeping metadata.
        self.vector_store = self._load_or_create_vector_store()
        self.metadata = self._load_metadata()

    def _load_metadata(self) -> Dict[str, Any]:
        """Load bookkeeping metadata from disk, or return a fresh record."""
        if os.path.exists(self.metadata_path):
            try:
                with open(self.metadata_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                # FIX: JSON stores the hash set as a list; normalize back to a
                # set on load so every consumer sees one consistent type.
                hashes = data.get("memory_hashes", [])
                if not isinstance(hashes, set):
                    data["memory_hashes"] = set(hashes)
                return data
            except Exception as e:
                logger.warning("Could not load metadata: %s", e)

        return {
            "created_at": datetime.now().isoformat(),
            "total_memories": 0,
            "last_updated": None,
            "memory_hashes": set()
        }

    def _save_metadata(self) -> None:
        """Persist bookkeeping metadata to disk (best-effort, never raises)."""
        try:
            # Sets are not JSON-serializable; write the hashes out as a list.
            metadata_copy = self.metadata.copy()
            if isinstance(metadata_copy.get("memory_hashes"), set):
                metadata_copy["memory_hashes"] = list(metadata_copy["memory_hashes"])

            with open(self.metadata_path, 'w', encoding='utf-8') as f:
                json.dump(metadata_copy, f, indent=2)
        except Exception as e:
            logger.warning("Could not save metadata: %s", e)

    def _hash_set(self) -> set:
        """Return ``metadata['memory_hashes']`` as a set, normalizing in place.

        Centralizes the list/set conversion that was previously duplicated in
        two places inside ``add_to_memory``.
        """
        hashes = self.metadata.get("memory_hashes")
        if not isinstance(hashes, set):
            hashes = set(hashes or [])
            self.metadata["memory_hashes"] = hashes
        return hashes

    def reset_memory(self) -> bool:
        """
        Remove all memory data and re-initialize a new, empty index.

        Returns:
            True if successful, False otherwise.
        """
        try:
            if os.path.exists(self.memory_dir):
                shutil.rmtree(self.memory_dir)
                logger.info("Removed existing memory directory")

            os.makedirs(self.memory_dir, exist_ok=True)
            logger.info("Memory reset successfully")

            # Reinitialize index and metadata (the metadata file is gone, so
            # _load_metadata returns a fresh record).
            self.vector_store = self._create_new_index()
            self.metadata = self._load_metadata()
            return True

        except Exception as e:
            logger.error("Failed to reset memory: %s", e)
            return False

    def _load_or_create_vector_store(self) -> "FAISS":
        """
        Load the FAISS index from disk, or create a new one if it is missing
        or corrupted. A corrupted index is backed up before being replaced.

        Returns:
            FAISS vector store instance.
        """
        index_path = os.path.join(self.memory_dir, f"{INDEX_NAME}.faiss")

        if os.path.exists(index_path):
            try:
                logger.info("Loading existing memory from disk...")
                vector_store = FAISS.load_local(
                    folder_path=self.memory_dir,
                    embeddings=self.embeddings,
                    index_name=INDEX_NAME,
                    allow_dangerous_deserialization=True  # required by FAISS.load_local
                )
                logger.info("Successfully loaded existing memory index")
                return vector_store

            except Exception as e:
                logger.warning("Error loading memory index: %s. Creating new index...", e)

                # FIX: the backup target must be a SIBLING of memory_dir.
                # The previous code joined the timestamp under memory_dir and
                # then tried shutil.move(memory_dir, <inside memory_dir>),
                # i.e. moving a directory into itself, which always fails.
                backup_dir = f"{self.memory_dir}_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
                try:
                    shutil.move(self.memory_dir, backup_dir)
                    logger.info("Corrupted index backed up to: %s", backup_dir)
                except Exception as backup_error:
                    logger.warning("Could not backup corrupted index: %s", backup_error)

                os.makedirs(self.memory_dir, exist_ok=True)
                return self._create_new_index()
        else:
            logger.info("No existing memory found. Creating new index...")
            return self._create_new_index()

    def _create_new_index(self) -> "FAISS":
        """
        Create a fresh FAISS index, seeded with a single system document
        (FAISS.from_documents needs at least one document — presumably why
        the seed doc is used; confirm before removing it).

        Returns:
            New FAISS vector store instance.
        """
        os.makedirs(self.memory_dir, exist_ok=True)

        init_doc = Document(
            page_content="System initialized.",
            metadata={
                "type": "system",
                "timestamp": datetime.now().isoformat(),
                "importance": 0.0
            }
        )

        try:
            vector_store = FAISS.from_documents([init_doc], self.embeddings)
            vector_store.save_local(folder_path=self.memory_dir, index_name=INDEX_NAME)
            logger.info("Created new memory index")
            return vector_store

        except Exception as e:
            logger.error("Failed to create new index: %s", e)
            raise

    def _compute_hash(self, text: str) -> str:
        """Return an MD5 digest of *text* (duplicate-detection key only, not security)."""
        return hashlib.md5(text.encode('utf-8')).hexdigest()

    def add_to_memory(
        self,
        text_to_add: str,
        metadata: Optional[Dict[str, Any]] = None,
        importance: float = 0.5,
        check_duplicate: bool = True
    ) -> bool:
        """
        Add new information to memory with metadata.

        Args:
            text_to_add: Text content to store.
            metadata: Additional metadata for the memory (not mutated).
            importance: Importance score, clamped to [0.0, 1.0].
            check_duplicate: Whether to skip content already stored.

        Returns:
            True if the memory was added; False on empty input, duplicate,
            or storage failure.
        """
        if not text_to_add or not text_to_add.strip():
            logger.warning("Attempted to add empty memory")
            return False

        text_hash = self._compute_hash(text_to_add)
        if check_duplicate and text_hash in self._hash_set():
            logger.debug("Duplicate memory detected, skipping: %s...", text_to_add[:50])
            return False

        # FIX: copy the caller's dict instead of mutating it in place.
        doc_metadata = dict(metadata) if metadata else {}
        doc_metadata.update({
            "timestamp": datetime.now().isoformat(),
            "importance": max(0.0, min(1.0, importance)),  # clamp to [0, 1]
            "hash": text_hash,
            "length": len(text_to_add)
        })

        try:
            logger.info("Adding new memory: %s...", text_to_add[:100])
            doc = Document(page_content=text_to_add, metadata=doc_metadata)
            self.vector_store.add_documents([doc])
            self.vector_store.save_local(folder_path=self.memory_dir, index_name=INDEX_NAME)

            # Update bookkeeping only after the store was saved successfully.
            self._hash_set().add(text_hash)
            self.metadata["total_memories"] = self.metadata.get("total_memories", 0) + 1
            self.metadata["last_updated"] = datetime.now().isoformat()
            self._save_metadata()

            logger.info("Memory added successfully")
            return True

        except Exception as e:
            logger.error("Failed to add memory: %s", e)
            return False

    def retrieve_relevant_memories(
        self,
        query: str,
        k: int = 5,
        score_threshold: Optional[float] = None,
        filter_metadata: Optional[Dict[str, Any]] = None
    ) -> List["Document"]:
        """
        Retrieve memories relevant to a query using semantic search.

        Args:
            query: Search query.
            k: Maximum number of results to return.
            score_threshold: Reserved; currently unused (similarity_search
                does not expose scores here).
            filter_metadata: Exact-match metadata filters, applied in Python
                because FAISS has no native metadata filtering.

        Returns:
            List of relevant documents (empty on blank query or failure).
        """
        if not query or not query.strip():
            logger.warning("Empty query for memory retrieval")
            return []

        try:
            logger.info("Searching memory for: %s...", query[:50])

            if filter_metadata:
                # Over-fetch, then filter manually, since FAISS cannot
                # filter by metadata natively.
                candidates = self.vector_store.similarity_search(query, k=k * 2)
                results = []
                for doc in candidates:
                    if all(doc.metadata.get(key) == value
                           for key, value in filter_metadata.items()):
                        results.append(doc)
                        if len(results) >= k:
                            break
            else:
                results = self.vector_store.similarity_search(query, k=k)

            logger.info("Retrieved %d relevant memories", len(results))
            return results

        except Exception as e:
            logger.error("Memory retrieval failed: %s", e)
            return []

    def get_memory_stats(self) -> Dict[str, Any]:
        """
        Get statistics about the memory store.

        Returns:
            Dict with total memory count, creation/update timestamps, and
            on-disk size of the memory directory in MB.
        """
        stats = {
            "total_memories": self.metadata.get("total_memories", 0),
            "created_at": self.metadata.get("created_at"),
            "last_updated": self.metadata.get("last_updated"),
            "memory_dir_size_mb": 0.0
        }

        if os.path.exists(self.memory_dir):
            total_size = 0
            for dirpath, _, filenames in os.walk(self.memory_dir):
                for filename in filenames:
                    try:
                        total_size += os.path.getsize(os.path.join(dirpath, filename))
                    except OSError:
                        # File may vanish between walk and stat; skip it.
                        pass
            stats["memory_dir_size_mb"] = round(total_size / (1024 * 1024), 2)

        return stats

    def cleanup_old_memories(self, days_to_keep: int = 30) -> int:
        """
        Remove memories older than ``days_to_keep`` days.

        Not implemented: FAISS does not support selective deletion without
        rebuilding the entire index.

        Args:
            days_to_keep: Number of days of memories to keep.

        Returns:
            Number of memories removed (always 0).
        """
        logger.warning("Memory cleanup not implemented for FAISS backend")
        return 0
351
 
352
# Module-level singleton so callers can do `from memory_manager import memory_manager`.
# NOTE: constructing this at import time also loads the embedding model.
memory_manager = MemoryManager()

# Declare the module's public API explicitly.
__all__ = ['MemoryManager', 'memory_manager']