| """ |
| MemoryAgent - Persistent storage with FAISS vector search |
| UPDATED MODULE - Added FAISS indexing while keeping JSON metadata |
| """ |
|
|
| import json |
| import os |
| import numpy as np |
| from datetime import datetime |
| from sentence_transformers import SentenceTransformer |
|
|
|
|
class MemoryAgent:
    """Persistent memory store: JSON metadata plus an optional FAISS image index.

    Every memory is a dict kept in ``self.memories`` and written to
    ``memory_file`` as JSON.  Text embeddings are stored inside each record and
    compared by cosine similarity in Python.  Image embeddings are also stored
    in the record and, when FAISS is importable, L2-normalised and added to an
    inner-product index so that the inner product equals cosine similarity.

    Not every memory has an indexed image vector, so FAISS row numbers do not
    line up with positions in ``self.memories``.  Each indexed memory therefore
    carries a ``"faiss_id"`` key holding its row in the FAISS index.
    """

    def __init__(self, memory_file="data/memory.json",
                 faiss_index_file="data/memory.faiss", embedding_dim=512):
        """
        Initialize memory with JSON + FAISS hybrid storage

        Args:
            memory_file: Path of the JSON file holding all memory records
            faiss_index_file: Path of the serialized FAISS index
            embedding_dim: Dimension of the image embeddings used when a new
                FAISS index has to be created (an index loaded from disk
                keeps its own dimension)
        """
        self.memory_file = memory_file
        self.faiss_index_file = faiss_index_file

        self.text_model = SentenceTransformer("all-MiniLM-L6-v2")

        self.faiss_available = False
        self.index = None
        self.embedding_dim = embedding_dim

        # FAISS is optional: without it image search simply returns nothing.
        try:
            import faiss
        except ImportError:
            print("[MemoryAgent] WARNING: FAISS not installed. Using fallback.")
            print("Install with: pip install faiss-cpu")
        else:
            self.faiss = faiss
            self.faiss_available = True
            print("[MemoryAgent] FAISS available")

        self.memories = []
        self._load()

        if self.faiss_available:
            self._init_faiss_index()

    @staticmethod
    def _ensure_parent_dir(path):
        """Create the parent directory of ``path`` if the path has one."""
        parent = os.path.dirname(path)
        # dirname is "" for a bare filename and os.makedirs("") raises.
        if parent:
            os.makedirs(parent, exist_ok=True)

    def _load(self):
        """Load memories from JSON; start empty if the file is missing or unreadable."""
        try:
            with open(self.memory_file, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except FileNotFoundError:
            self.memories = []
            print("[MemoryAgent] No existing memory file, starting fresh")
            return
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and decoding errors.
            print(f"[MemoryAgent] Error loading memories: {e}")
            self.memories = []
            return

        if not isinstance(loaded, list):
            print("[MemoryAgent] Error loading memories: expected a JSON list")
            self.memories = []
            return

        self.memories = loaded
        print(f"[MemoryAgent] Loaded {len(self.memories)} memories")

    def _save(self):
        """Write all memories to the JSON file."""
        self._ensure_parent_dir(self.memory_file)
        with open(self.memory_file, "w", encoding="utf-8") as f:
            json.dump(self.memories, f, indent=2)

    def _unit_row(self, vector):
        """Return ``vector`` as a normalised (1, d) float32 row, or None.

        None is returned when the vector cannot be converted to numbers, has
        zero (or non-finite) norm, or does not match the index dimension.
        """
        try:
            flat = np.asarray(vector, dtype="float64").reshape(-1)
        except (TypeError, ValueError):
            return None
        if self.index is not None and flat.size != self.index.d:
            return None
        norm = np.linalg.norm(flat)
        if not norm > 0:  # also rejects NaN
            return None
        return (flat / norm).reshape(1, -1).astype("float32")

    def _index_matches_memories(self):
        """Check that the ``faiss_id`` values in the JSON records describe the index.

        The stores agree when the recorded ids are exactly ``0..ntotal-1`` and
        no record holds an indexable image embedding that lacks an id.
        """
        rows = []
        for memory in self.memories:
            if not isinstance(memory, dict):
                continue
            if "faiss_id" in memory:
                rows.append(memory["faiss_id"])
            elif "image_embedding" in memory:
                if self._unit_row(memory["image_embedding"]) is not None:
                    return False
        try:
            return sorted(rows) == list(range(int(self.index.ntotal)))
        except TypeError:
            return False

    def _rebuild_faiss_index(self):
        """Recreate the FAISS index from the image embeddings stored in JSON.

        The JSON records are the source of truth; every record gets a fresh
        ``faiss_id`` (or loses a stale one).  Nothing is written to disk here;
        the next ``add`` persists both stores.
        """
        self.index = self.faiss.IndexFlatIP(self.embedding_dim)
        for memory in self.memories:
            if not isinstance(memory, dict):
                continue
            memory.pop("faiss_id", None)
            if "image_embedding" not in memory:
                continue
            row = self._unit_row(memory["image_embedding"])
            if row is None:
                continue
            memory["faiss_id"] = int(self.index.ntotal)
            self.index.add(row)
        print(f"[MemoryAgent] Rebuilt FAISS index with {self.index.ntotal} vectors")

    def _init_faiss_index(self):
        """Load the FAISS index from disk, or create one, and reconcile it with JSON."""
        if not self.faiss_available:
            return

        self.index = None
        if os.path.exists(self.faiss_index_file):
            try:
                self.index = self.faiss.read_index(self.faiss_index_file)
                print(f"[MemoryAgent] Loaded FAISS index with {self.index.ntotal} vectors")
            except Exception as e:
                # Best effort: an unreadable index is replaced below.
                print(f"[MemoryAgent] Error loading FAISS index: {e}")
                self.index = None

        if self.index is None:
            self.index = self.faiss.IndexFlatIP(self.embedding_dim)
            print("[MemoryAgent] Created new FAISS index")
        else:
            # A loaded index dictates the vector dimension from now on.
            self.embedding_dim = int(self.index.d)

        # Covers files written before "faiss_id" existed, a lost or stale
        # index file, and a crash between the JSON and FAISS writes.
        if not self._index_matches_memories():
            self._rebuild_faiss_index()

    def _save_faiss_index(self):
        """Save FAISS index to disk (no-op when FAISS is unavailable)."""
        if self.faiss_available and self.index is not None:
            self._ensure_parent_dir(self.faiss_index_file)
            self.faiss.write_index(self.index, self.faiss_index_file)

    @staticmethod
    def compute_importance(description):
        """
        Compute a keyword-based importance score for a description

        Starts at 1 and adds: +2 for "person", +1 for any listed object,
        +2 for any listed action, +1 for "text visible" (case-insensitive
        substring matches).

        Args:
            description: Text describing the memory

        Returns:
            int: Importance score (1 to 7)
        """
        desc = description.lower()
        score = 1

        if "person" in desc:
            score += 2

        if any(obj in desc for obj in ("phone", "bag", "book", "device")):
            score += 1

        if any(act in desc for act in ("entered", "left", "holding", "walking")):
            score += 2

        if "text visible" in desc:
            score += 1

        return score

    def add(self, description, image_embedding=None):
        """
        Add memory with a text embedding and an optional image embedding

        The record is appended to ``self.memories`` and both stores are saved.
        When the image embedding is indexed, its FAISS row is recorded in the
        memory under ``"faiss_id"`` so searches can map rows back to memories.

        Args:
            description: Text describing the memory
            image_embedding: Optional image embedding (array or sequence of
                numbers); stored in JSON and, if possible, indexed in FAISS
        """
        text_embedding = self.text_model.encode(description).tolist()

        memory = {
            "id": len(self.memories),
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "description": description,
            "text_embedding": text_embedding,
            "importance": MemoryAgent.compute_importance(description),
        }

        if image_embedding is not None:
            # np.asarray lets callers pass plain lists as well as arrays.
            memory["image_embedding"] = np.asarray(image_embedding).tolist()

            if self.faiss_available and self.index is not None:
                row = self._unit_row(image_embedding)
                if row is None:
                    print("[MemoryAgent] WARNING: image embedding not indexed "
                          "(zero norm or wrong dimension)")
                else:
                    # The new vector lands at row `ntotal` of the index.
                    memory["faiss_id"] = int(self.index.ntotal)
                    self.index.add(row)

        self.memories.append(memory)
        self._save()

        if self.faiss_available:
            self._save_faiss_index()

        print(f"[MemoryAgent] Added memory #{memory['id']}")

    def recall_last(self):
        """Return the most recent memory, or None when nothing is stored."""
        if not self.memories:
            return None
        return self.memories[-1]

    def recall_all(self):
        """Return the list of all memories (the internal list, not a copy)."""
        return self.memories

    def search_by_image(self, query_embedding, k=5):
        """
        Search memories by image similarity using FAISS

        Args:
            query_embedding: Image embedding vector
            k: Number of results to return

        Returns:
            list: Up to k dicts ``{"memory": ..., "similarity": float}`` in the
            order FAISS returned them; empty when FAISS is unavailable, the
            index is empty, k is not positive, or the query has the wrong
            dimension
        """
        if not self.faiss_available or self.index is None or self.index.ntotal == 0:
            return []

        top_k = min(int(k), int(self.index.ntotal))
        if top_k <= 0:
            return []

        query = np.asarray(query_embedding, dtype="float64").reshape(-1)
        if query.size != self.index.d:
            print("[MemoryAgent] WARNING: query embedding has wrong dimension")
            return []

        # Normalise so the inner product is a cosine similarity.
        norm = np.linalg.norm(query)
        if norm > 0:
            query = query / norm

        query_vector = query.reshape(1, -1).astype("float32")
        scores, indices = self.index.search(query_vector, top_k)

        # Map FAISS rows back to memories through the recorded ids; rows are
        # not list positions because some memories have no indexed vector.
        by_row = {
            memory["faiss_id"]: memory
            for memory in self.memories
            if isinstance(memory, dict) and "faiss_id" in memory
        }

        results = []
        for score, row in zip(scores[0], indices[0]):
            # FAISS pads missing results with -1, which never matches an id.
            memory = by_row.get(int(row))
            if memory is not None:
                results.append({
                    "memory": memory,
                    "similarity": float(score),
                })

        return results

    def search_by_text(self, query_text, threshold=0.45):
        """
        Search memories by cosine similarity of text embeddings

        Args:
            query_text: Search query
            threshold: Minimum similarity threshold

        Returns:
            list: Dicts ``{"memory": ..., "similarity": float}`` for memories at
            or above the threshold, most similar first
        """
        if not self.memories:
            return []

        query_embedding = np.asarray(self.text_model.encode(query_text))
        query_norm = np.linalg.norm(query_embedding)
        if not query_norm > 0:
            # A zero query has no direction, so nothing can match.
            return []

        results = []
        for memory in self.memories:
            if "text_embedding" not in memory:
                continue

            mem_embedding = np.asarray(memory["text_embedding"], dtype="float64")
            if mem_embedding.shape != query_embedding.shape:
                continue
            mem_norm = np.linalg.norm(mem_embedding)
            if not mem_norm > 0:
                continue

            similarity = np.dot(query_embedding, mem_embedding) / (query_norm * mem_norm)
            if similarity >= threshold:
                results.append({
                    "memory": memory,
                    "similarity": float(similarity),
                })

        results.sort(key=lambda x: x["similarity"], reverse=True)
        return results
|
|