"""
MemoryAgent - Persistent storage with FAISS vector search

Metadata (timestamps, descriptions, importance scores and embeddings) is kept
in a JSON file. Image embeddings are additionally indexed with FAISS for fast
similarity search when the optional ``faiss`` package is installed.
"""

import json
import os
from datetime import datetime

import numpy as np

try:
    from sentence_transformers import SentenceTransformer
except ImportError:
    # Deferred: the error is raised when a MemoryAgent is constructed, so the
    # module itself stays importable without the package.
    SentenceTransformer = None


class MemoryAgent:
    """JSON-backed memory store with optional FAISS image-similarity search.

    The JSON file is the source of truth. The FAISS index only holds the
    L2-normalised image embeddings, and ``_faiss_to_memory`` records, for each
    FAISS row, the position in ``self.memories`` of the memory it belongs to.
    """

    def __init__(self, memory_file="data/memory.json",
                 faiss_index_file="data/memory.faiss"):
        """Initialize memory with JSON + FAISS hybrid storage.

        Args:
            memory_file: Path of the JSON file holding the memory records.
            faiss_index_file: Path of the serialized FAISS index.

        Raises:
            ImportError: If ``sentence-transformers`` is not installed.
        """
        self.memory_file = memory_file
        self.faiss_index_file = faiss_index_file

        # Text embedding model
        if SentenceTransformer is None:
            raise ImportError(
                "sentence-transformers is required by MemoryAgent. "
                "Install with: pip install sentence-transformers"
            )
        self.text_model = SentenceTransformer("all-MiniLM-L6-v2")

        # FAISS setup (optional dependency)
        self.faiss = None
        self.faiss_available = False
        self.index = None
        self.embedding_dim = 512  # CLIP embedding dimension
        self._faiss_to_memory = []
        try:
            import faiss
        except ImportError:
            print("[MemoryAgent] WARNING: FAISS not installed. Using fallback.")
            print("Install with: pip install faiss-cpu")
        else:
            self.faiss = faiss
            self.faiss_available = True
            print("[MemoryAgent] FAISS available")

        # Load existing data
        self.memories = []
        self._load()

        # Initialize or load FAISS index
        if self.faiss_available:
            self._init_faiss_index()

    def _load(self):
        """Load memories from the JSON file, starting empty if unusable."""
        try:
            with open(self.memory_file, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except FileNotFoundError:
            self.memories = []
            print("[MemoryAgent] No existing memory file, starting fresh")
            return
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"[MemoryAgent] Error loading memories: {e}")
            self.memories = []
            return

        if not isinstance(loaded, list):
            print(
                "[MemoryAgent] Error loading memories: expected a JSON list, "
                f"got {type(loaded).__name__}"
            )
            self.memories = []
            return

        self.memories = loaded
        print(f"[MemoryAgent] Loaded {len(self.memories)} memories")

    def _save(self):
        """Write all memories to the JSON file.

        The data is written to a sibling ``.tmp`` file and then moved over
        the target, so an interrupted write does not truncate the existing
        memory file.
        """
        directory = os.path.dirname(self.memory_file)
        if directory:  # a bare filename has no directory to create
            os.makedirs(directory, exist_ok=True)
        tmp_path = f"{self.memory_file}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(self.memories, f, indent=2)
        os.replace(tmp_path, self.memory_file)

    @staticmethod
    def _unit_vector(embedding):
        """Return ``embedding`` as a flat, L2-normalised float32 array.

        Returns None when the embedding is missing, empty, non-numeric, or
        has a zero or non-finite norm (such vectors cannot be normalised).
        """
        if embedding is None:
            return None
        try:
            vector = np.asarray(embedding, dtype=np.float64).reshape(-1)
        except (TypeError, ValueError):
            return None
        if vector.size == 0:
            return None
        norm = np.linalg.norm(vector)
        if not np.isfinite(norm) or norm == 0:
            return None
        return (vector / norm).astype("float32")

    def _stored_image_vectors(self):
        """Yield ``(memory_position, unit_vector)`` for indexable memories."""
        for position, memory in enumerate(self.memories):
            if not isinstance(memory, dict):
                continue
            vector = self._unit_vector(memory.get("image_embedding"))
            if vector is not None:
                yield position, vector

    def _init_faiss_index(self):
        """Load the FAISS index from disk, or rebuild it from the JSON data.

        A loaded index is only accepted when its vector count equals the
        number of stored image embeddings of the index's dimension.
        Otherwise the rows could not be mapped back to memories, so the
        index is rebuilt from the embeddings stored in the JSON file.
        """
        if not self.faiss_available:
            return

        self._faiss_to_memory = []
        loaded = None
        if os.path.exists(self.faiss_index_file):
            try:
                loaded = self.faiss.read_index(self.faiss_index_file)
            except Exception as e:  # FAISS raises various error types here
                print(f"[MemoryAgent] Error loading FAISS index: {e}")

        if loaded is not None:
            positions = [
                position
                for position, vector in self._stored_image_vectors()
                if vector.shape[0] == loaded.d
            ]
            if loaded.ntotal == len(positions):
                self.index = loaded
                self.embedding_dim = int(loaded.d)
                self._faiss_to_memory = positions
                print(
                    "[MemoryAgent] Loaded FAISS index with "
                    f"{self.index.ntotal} vectors"
                )
                return
            print(
                "[MemoryAgent] FAISS index out of sync with memory file "
                f"({loaded.ntotal} vectors vs {len(positions)} stored "
                "embeddings), rebuilding"
            )

        self._rebuild_faiss_index()

    def _rebuild_faiss_index(self):
        """Create a fresh inner-product index from the stored embeddings."""
        stored = list(self._stored_image_vectors())
        dims = [vector.shape[0] for _, vector in stored]
        if dims and self.embedding_dim not in dims:
            # Nothing matches the configured size: use the most common one.
            self.embedding_dim = int(max(set(dims), key=dims.count))

        # Inner product on unit vectors equals cosine similarity.
        self.index = self.faiss.IndexFlatIP(self.embedding_dim)
        self._faiss_to_memory = []

        usable = [
            (position, vector)
            for position, vector in stored
            if vector.shape[0] == self.embedding_dim
        ]
        if usable:
            self.index.add(np.stack([vector for _, vector in usable]))
            self._faiss_to_memory = [position for position, _ in usable]
            print(f"[MemoryAgent] Rebuilt FAISS index with {len(usable)} vectors")
        else:
            print("[MemoryAgent] Created new FAISS index")

    def _save_faiss_index(self):
        """Save the FAISS index to disk, if there is one."""
        if self.faiss_available and self.index is not None:
            directory = os.path.dirname(self.faiss_index_file)
            if directory:  # a bare filename has no directory to create
                os.makedirs(directory, exist_ok=True)
            self.faiss.write_index(self.index, self.faiss_index_file)

    def _index_vector(self, embedding, memory_position):
        """Add one image embedding to FAISS and record which memory owns it.

        Returns:
            bool: True if the vector was added to the index.
        """
        if not self.faiss_available or self.index is None:
            return False
        vector = self._unit_vector(embedding)
        if vector is None:
            return False

        if vector.shape[0] != self.index.d:
            if self.index.ntotal != 0:
                print(
                    "[MemoryAgent] WARNING: image embedding has dimension "
                    f"{vector.shape[0]}, index expects {self.index.d}; "
                    "not indexed"
                )
                return False
            # Empty index: adopt the size of the first real embedding.
            self.embedding_dim = int(vector.shape[0])
            self.index = self.faiss.IndexFlatIP(self.embedding_dim)

        self.index.add(vector.reshape(1, -1))
        self._faiss_to_memory.append(memory_position)
        return True

    @staticmethod
    def compute_importance(description):
        """Compute a keyword-based importance score for a description.

        The score starts at 1 and gains +2 for "person", +1 for any object
        keyword, +2 for any action keyword and +1 for "text visible"
        (all matched case-insensitively as substrings).
        """
        desc = description.lower()
        score = 1
        if "person" in desc:
            score += 2
        if any(obj in desc for obj in ("phone", "bag", "book", "device")):
            score += 1
        if any(act in desc for act in ("entered", "left", "holding", "walking")):
            score += 2
        if "text visible" in desc:
            score += 1
        return score

    def add(self, description, image_embedding=None):
        """Add a memory with a text embedding and an optional image embedding.

        The record is appended to ``self.memories`` and written to the JSON
        file. When FAISS is available, the normalised image embedding is
        also indexed and the index is saved.

        Args:
            description: Text describing the memory.
            image_embedding: Optional image embedding, either an object with
                ``tolist()`` (e.g. a NumPy array) or a plain sequence of
                numbers.
        """
        text_embedding = self.text_model.encode(description).tolist()
        position = len(self.memories)

        memory = {
            "id": position,
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "description": description,
            "text_embedding": text_embedding,
            "importance": MemoryAgent.compute_importance(description),
        }

        if image_embedding is not None:
            if hasattr(image_embedding, "tolist"):
                stored = image_embedding.tolist()
            else:
                stored = list(image_embedding)
            memory["image_embedding"] = stored
            self._index_vector(stored, position)

        self.memories.append(memory)
        self._save()
        if self.faiss_available:
            self._save_faiss_index()

        print(f"[MemoryAgent] Added memory #{memory['id']}")

    def recall_last(self):
        """Return the most recent memory, or None if there are none."""
        return self.memories[-1] if self.memories else None

    def recall_all(self):
        """Return the list of all memories."""
        return self.memories

    def search_by_image(self, query_embedding, k=5):
        """Search memories by image similarity using FAISS.

        Args:
            query_embedding: Image embedding vector
            k: Number of results to return

        Returns:
            list: Up to ``k`` dicts with keys "memory" and "similarity",
            in the order returned by the index. Empty if FAISS is
            unavailable, the index is empty, or ``k`` is not positive.
        """
        if not self.faiss_available or self.index is None or self.index.ntotal == 0:
            return []
        if k <= 0:
            return []

        # Normalize query (a zero vector is searched as-is)
        query = np.asarray(query_embedding)
        norm = np.linalg.norm(query)
        if norm > 0:
            query = query / norm
        query_vector = query.reshape(1, -1).astype("float32")

        scores, indices = self.index.search(query_vector, min(k, self.index.ntotal))

        results = []
        for score, row in zip(scores[0], indices[0]):
            row = int(row)
            # Negative rows must not wrap around to the end of the list.
            if not 0 <= row < len(self._faiss_to_memory):
                continue
            position = self._faiss_to_memory[row]
            if position < len(self.memories):
                results.append({
                    "memory": self.memories[position],
                    "similarity": float(score),
                })
        return results

    def search_by_text(self, query_text, threshold=0.45):
        """Search memories by cosine similarity of text embeddings.

        Args:
            query_text: Search query
            threshold: Minimum similarity threshold

        Returns:
            list: Dicts with keys "memory" and "similarity" for every memory
            at or above ``threshold``, sorted by similarity, highest first.
        """
        if not self.memories:
            return []

        query_embedding = np.asarray(self.text_model.encode(query_text))
        query_norm = np.linalg.norm(query_embedding)
        if query_norm == 0:
            # Cosine similarity is undefined for a zero query vector.
            return []

        results = []
        for memory in self.memories:
            if "text_embedding" not in memory:
                continue
            mem_embedding = np.array(memory["text_embedding"])
            denominator = query_norm * np.linalg.norm(mem_embedding)
            if denominator == 0:
                continue
            similarity = np.dot(query_embedding, mem_embedding) / denominator
            if similarity >= threshold:
                results.append({
                    "memory": memory,
                    "similarity": float(similarity),
                })

        # Sort by similarity
        results.sort(key=lambda x: x["similarity"], reverse=True)
        return results