visionq / agents /memory_agent.py
NanG01's picture
architectural change: restructure project and update documentation
bc3cab1
"""
MemoryAgent - Persistent storage with FAISS vector search
UPDATED MODULE - Added FAISS indexing while keeping JSON metadata
"""
import json
import os
import numpy as np
from datetime import datetime
from sentence_transformers import SentenceTransformer
class MemoryAgent:
    """Persistent memory store: JSON metadata plus an optional FAISS image index.

    Every memory is a dict kept in ``self.memories`` and mirrored to
    ``memory_file`` as JSON. Text embeddings are stored inline in that dict and
    searched by brute-force cosine similarity. Image embeddings, when supplied,
    are also stored inline and additionally inserted (L2-normalised) into a
    FAISS inner-product index.

    The FAISS index is treated as a cache derived from the JSON file: not every
    memory has an image vector, so FAISS row numbers do not equal positions in
    ``self.memories``. ``self._faiss_rows[i]`` is the memory whose vector sits
    in FAISS row ``i``; the index is rebuilt from the stored embeddings
    whenever it is missing or out of sync with the JSON data.
    """

    def __init__(self, memory_file="data/memory.json", faiss_index_file="data/memory.faiss"):
        """
        Initialize memory with JSON + FAISS hybrid storage

        Args:
            memory_file: Path of the JSON file holding all memory records
            faiss_index_file: Path of the serialized FAISS index
        """
        self.memory_file = memory_file
        self.faiss_index_file = faiss_index_file

        # Text embedding model
        self.text_model = SentenceTransformer("all-MiniLM-L6-v2")

        # FAISS setup
        self.faiss_available = False
        self.index = None
        self.embedding_dim = 512  # CLIP embedding dimension

        # FAISS is optional: without it image vectors are still stored in
        # JSON, only the similarity search is disabled.
        try:
            import faiss
        except ImportError:
            print("[MemoryAgent] WARNING: FAISS not installed. Using fallback.")
            print("Install with: pip install faiss-cpu")
        else:
            self.faiss = faiss
            self.faiss_available = True
            print("[MemoryAgent] FAISS available")

        # Load existing data
        self.memories = []
        self._faiss_rows = []  # FAISS row number -> memory dict
        self._load()

        # Initialize or load FAISS index
        if self.faiss_available:
            self._init_faiss_index()

    def _load(self):
        """Load memories from JSON, starting empty if the file is missing or unreadable"""
        try:
            with open(self.memory_file, "r", encoding="utf-8") as f:
                self.memories = json.load(f)
            print(f"[MemoryAgent] Loaded {len(self.memories)} memories")
        except FileNotFoundError:
            self.memories = []
            print("[MemoryAgent] No existing memory file, starting fresh")
        except Exception as e:
            # Best effort: a corrupt file must not prevent the agent from starting.
            print(f"[MemoryAgent] Error loading memories: {e}")
            self.memories = []

    @staticmethod
    def _ensure_parent_dir(path):
        """Create the parent directory of *path* if the path has one"""
        parent = os.path.dirname(path)
        # dirname() is "" for a bare filename and os.makedirs("") raises.
        if parent:
            os.makedirs(parent, exist_ok=True)

    def _save(self):
        """Save memories to JSON"""
        self._ensure_parent_dir(self.memory_file)
        with open(self.memory_file, "w", encoding="utf-8") as f:
            json.dump(self.memories, f, indent=2)

    @staticmethod
    def _unit_vector(embedding, dim):
        """
        Convert an embedding into a FAISS-ready unit row

        Args:
            embedding: Array-like image embedding
            dim: Dimension required by the index

        Returns:
            numpy.ndarray | None: float32 array of shape (1, dim) with unit L2
            norm, or None when the embedding cannot be indexed (not numeric,
            wrong size, zero or non-finite norm)
        """
        try:
            vector = np.asarray(embedding, dtype="float32").reshape(1, -1)
        except (TypeError, ValueError):
            return None
        if vector.shape[1] != dim:
            return None
        norm = np.linalg.norm(vector)
        if not (norm > 0 and np.isfinite(norm)):
            return None
        return np.ascontiguousarray(vector / norm, dtype="float32")

    def _indexable_memories(self, dim):
        """Return (memory, unit_row) pairs, in storage order, for memories with a usable image embedding"""
        pairs = []
        for memory in self.memories:
            if not isinstance(memory, dict) or memory.get("image_embedding") is None:
                continue
            unit = self._unit_vector(memory["image_embedding"], dim)
            if unit is not None:
                pairs.append((memory, unit))
        return pairs

    def _init_faiss_index(self):
        """
        Initialize or load FAISS index

        An index file on disk is used only when its vector count matches the
        number of indexable image embeddings in the loaded memories. Otherwise
        a fresh inner-product index is created and repopulated from the
        embeddings stored in JSON, so row numbers and memories stay aligned.
        """
        if not self.faiss_available:
            return

        # Try loading existing index
        if os.path.exists(self.faiss_index_file):
            try:
                loaded = self.faiss.read_index(self.faiss_index_file)
            except Exception as e:
                print(f"[MemoryAgent] Error loading FAISS index: {e}")
            else:
                pairs = self._indexable_memories(loaded.d)
                if loaded.ntotal == len(pairs):
                    self.index = loaded
                    self._faiss_rows = [memory for memory, _ in pairs]
                    print(f"[MemoryAgent] Loaded FAISS index with {self.index.ntotal} vectors")
                    return
                print("[MemoryAgent] FAISS index out of sync with memory file, rebuilding")

        # Create new index
        self.index = self.faiss.IndexFlatIP(self.embedding_dim)  # Inner product (cosine similarity)
        self._faiss_rows = []
        print("[MemoryAgent] Created new FAISS index")

        # Repopulate from the embeddings already stored in JSON.
        pairs = self._indexable_memories(self.embedding_dim)
        if pairs:
            self.index.add(np.ascontiguousarray(np.vstack([unit for _, unit in pairs])))
            self._faiss_rows = [memory for memory, _ in pairs]
            self._save_faiss_index()
            print(f"[MemoryAgent] Re-indexed {len(pairs)} stored image embeddings")

    def _save_faiss_index(self):
        """Save FAISS index to disk (no-op without FAISS or without an index)"""
        if self.faiss_available and self.index is not None:
            self._ensure_parent_dir(self.faiss_index_file)
            self.faiss.write_index(self.index, self.faiss_index_file)

    @staticmethod
    def compute_importance(description):
        """
        Compute a keyword-based importance score

        Args:
            description: Free-text description of the memory

        Returns:
            int: 1, plus 2 if a person is mentioned, plus 1 for a known object,
            plus 2 for a known action, plus 1 if visible text is mentioned
        """
        desc = description.lower()
        score = 1
        if "person" in desc:
            score += 2
        if any(obj in desc for obj in ["phone", "bag", "book", "device"]):
            score += 1
        if any(act in desc for act in ["entered", "left", "holding", "walking"]):
            score += 2
        # Boost importance if text detected
        if "text visible" in desc:
            score += 1
        return score

    def add(self, description, image_embedding=None):
        """
        Add memory with both text and image embeddings

        The record is appended to the in-memory list and both the JSON file
        and (when FAISS is available) the index file are rewritten.

        Args:
            description: Free-text description of the memory
            image_embedding: Optional array-like image embedding. It is always
                stored in the JSON record; it is added to the FAISS index only
                if it matches the index dimension and has non-zero norm.
        """
        text_embedding = self.text_model.encode(description).tolist()
        memory = {
            "id": len(self.memories),
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "description": description,
            "text_embedding": text_embedding,
            "importance": MemoryAgent.compute_importance(description),
        }

        if image_embedding is not None:
            image_array = np.asarray(image_embedding)
            memory["image_embedding"] = image_array.tolist()

            if self.faiss_available and self.index is not None:
                # Normalize so that inner product equals cosine similarity.
                unit = self._unit_vector(image_array, self.index.d)
                if unit is not None:
                    self.index.add(unit)
                    # Remember which memory owns the FAISS row just written.
                    self._faiss_rows.append(memory)
                else:
                    print("[MemoryAgent] WARNING: image embedding not indexed "
                          "(zero norm or wrong dimension)")

        self.memories.append(memory)
        self._save()
        if self.faiss_available:
            self._save_faiss_index()
        print(f"[MemoryAgent] Added memory #{memory['id']}")

    def recall_last(self):
        """Recall most recent memory, or None when nothing is stored"""
        if not self.memories:
            return None
        return self.memories[-1]

    def recall_all(self):
        """Return all memories (the internal list itself, not a copy)"""
        return self.memories

    def search_by_image(self, query_embedding, k=5):
        """
        Search memories by image similarity using FAISS

        Args:
            query_embedding: Array-like image embedding vector
            k: Number of results to return

        Returns:
            list: Up to k dicts {"memory": ..., "similarity": float}, in the
            order returned by FAISS; empty when FAISS is unavailable, the
            index is empty, or k is not positive
        """
        if not self.faiss_available or self.index is None or self.index.ntotal == 0:
            return []
        if k <= 0:
            return []

        # Normalize query
        query_embedding = np.asarray(query_embedding)
        norm = np.linalg.norm(query_embedding)
        if norm > 0:
            query_embedding = query_embedding / norm

        # Search FAISS
        query_vector = query_embedding.reshape(1, -1).astype('float32')
        scores, indices = self.index.search(query_vector, min(k, self.index.ntotal))

        # Translate FAISS rows back to memories; FAISS reports missing
        # neighbours as -1, which must not be read as "last element".
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if 0 <= idx < len(self._faiss_rows):
                results.append({
                    "memory": self._faiss_rows[int(idx)],
                    "similarity": float(score)
                })
        return results

    def search_by_text(self, query_text, threshold=0.45):
        """
        Search memories by text similarity (cosine over stored text embeddings)

        Args:
            query_text: Search query
            threshold: Minimum similarity threshold

        Returns:
            list: Dicts {"memory": ..., "similarity": float} for every memory
            at or above the threshold, sorted by descending similarity
        """
        if not self.memories:
            return []

        query_embedding = np.asarray(self.text_model.encode(query_text))
        query_norm = np.linalg.norm(query_embedding)
        if query_norm == 0:
            # Cosine similarity is undefined for a zero vector.
            return []

        results = []
        for memory in self.memories:
            if "text_embedding" not in memory:
                continue
            mem_embedding = np.array(memory["text_embedding"])
            denominator = query_norm * np.linalg.norm(mem_embedding)
            if denominator == 0:
                continue
            similarity = np.dot(query_embedding, mem_embedding) / denominator
            if similarity >= threshold:
                results.append({
                    "memory": memory,
                    "similarity": float(similarity)
                })

        # Sort by similarity
        results.sort(key=lambda x: x["similarity"], reverse=True)
        return results