"""
Memory System for LangGraph Multi-Agent System

Simplified memory management system that provides:
1. Vector store integration for long-term memory
2. Session-based caching for recent queries
3. Q&A pair storage for learning from interactions
4. Similar question retrieval for context
"""
| |
|
| | import os |
| | import time |
| | import hashlib |
| | from typing import Optional, List, Dict, Any, Tuple |
| | from dotenv import load_dotenv |
| |
|
# Load variables from the local env file into the process environment before
# MemoryManager looks up its Supabase settings in os.environ.
load_dotenv("env.local")
| |
|
| |
|
class MemoryManager:
    """Manages memory for the multi-agent system.

    Combines three in-process, session-scoped structures with an optional
    Supabase-backed vector store:

    * ``query_cache`` -- similarity-search results keyed by a hash of the
      query text and ``k``, each stored with the time it was cached and
      honoured for ``ttl`` seconds.
    * ``processed_tasks`` -- ids of tasks already handled in this session.
    * ``seen_hashes`` -- SHA-256 digests of Q&A payloads that were
      successfully written to the vector store in this session.

    When the Supabase credentials are missing or initialization fails,
    ``vector_store`` stays ``None`` and every vector-backed method degrades
    to a harmless result (empty list, ``False`` or ``None``).
    """

    def __init__(self):
        """Create empty session caches and try to connect the vector store."""
        self.vector_store = None
        self.embeddings = None

        # Session-scoped state; all of it is dropped by clear_session_cache().
        self.query_cache: Dict[str, Tuple[float, List]] = {}
        self.processed_tasks: set[str] = set()
        self.seen_hashes: set[str] = set()

        self.ttl = 300  # seconds a cached search result stays valid
        self.similarity_threshold = 0.85  # score at/above which a question counts as known

        self._initialize_vector_store()

    def _initialize_vector_store(self) -> None:
        """Initialize vector store if credentials are available.

        Reads ``SUPABASE_URL`` and ``SUPABASE_SERVICE_KEY`` from the
        environment. Any failure (missing credentials, missing packages,
        connection errors) is reported on stdout and leaves
        ``vector_store`` as ``None`` instead of raising.
        """
        try:
            supabase_url = os.environ.get("SUPABASE_URL")
            supabase_key = os.environ.get("SUPABASE_SERVICE_KEY")

            if not supabase_url or not supabase_key:
                print("⚠️ Vector store disabled: Supabase credentials not found")
                return

            # Imported lazily so the module still loads when the optional
            # vector-store dependencies are not installed.
            from langchain_community.vectorstores import SupabaseVectorStore
            from langchain_huggingface import HuggingFaceEmbeddings
            from supabase.client import Client, create_client

            self.embeddings = HuggingFaceEmbeddings(
                model_name="sentence-transformers/all-mpnet-base-v2"
            )

            supabase: Client = create_client(supabase_url, supabase_key)
            self.vector_store = SupabaseVectorStore(
                client=supabase,
                embedding=self.embeddings,
                table_name="documents",
                query_name="match_documents_langchain",
            )
            print("✅ Vector store initialized successfully")

        except Exception as e:
            # Best effort: the system keeps running without long-term memory.
            print(f"⚠️ Vector store initialization failed: {e}")

    def _evict_expired(self, now: float) -> None:
        """Drop cached searches older than ``ttl`` so dead entries do not pile up."""
        stale_keys = [
            key
            for key, (stamp, _) in self.query_cache.items()
            if now - stamp >= self.ttl
        ]
        for key in stale_keys:
            del self.query_cache[key]

    def similarity_search(self, query: str, k: int = 2) -> List[Any]:
        """Search for similar questions with caching.

        Args:
            query: Text to look up in the vector store.
            k: Maximum number of matches to request.

        Returns:
            A list of ``(document, relevance_score)`` tuples as produced by
            the vector store, or an empty list when the store is unavailable
            or the search fails. The returned list is a copy, so callers may
            mutate it without affecting the cache.
        """
        if not self.vector_store:
            return []

        # k is part of the key: a result cached for k=1 must not be served
        # to a caller that asked for more (or fewer) matches.
        cache_key = hashlib.sha256(f"{k}:{query}".encode()).hexdigest()
        now = time.time()

        cached = self.query_cache.get(cache_key)
        if cached is not None and now - cached[0] < self.ttl:
            print("💾 Memory cache hit")
            return list(cached[1])

        # A miss is about to hit the network anyway, so this is a cheap
        # moment to discard entries that have outlived the TTL.
        self._evict_expired(now)

        try:
            print("🔍 Searching vector store for similar questions...")
            results = list(
                self.vector_store.similarity_search_with_relevance_scores(query, k=k)
            )
        except Exception as e:
            # Failures are reported but never cached, so the next call retries.
            print(f"⚠️ Vector store search error: {e}")
            return []

        self.query_cache[cache_key] = (now, results)
        return list(results)

    def should_ingest(self, query: str) -> bool:
        """Determine if this Q&A should be stored in long-term memory.

        Returns:
            ``False`` when no vector store is available. Otherwise ``True``
            when the best match scores below ``similarity_threshold``; a
            search with no results (including a failed search) counts as
            score ``0.0``.
        """
        if not self.vector_store:
            return False

        matches = self.similarity_search(query, k=1)
        best_score = matches[0][1] if matches else 0.0
        return best_score < self.similarity_threshold

    def ingest_qa_pair(self, question: str, answer: str, attachments: str = "") -> None:
        """Store Q&A pair in long-term memory.

        Args:
            question: The question text.
            answer: The answer text.
            attachments: Optional extra context appended to the stored text.

        The payload's SHA-256 digest is used both as the document id and as
        the session-level duplicate guard. Errors are reported on stdout and
        swallowed; a failed write leaves no trace, so it can be retried.
        """
        if not self.vector_store:
            print("⚠️ Vector store not available for storage")
            return

        try:
            payload = f"Question:\n{question}\n\nAnswer:\n{answer}"
            if attachments:
                payload += f"\n\nContext:\n{attachments}"

            hash_id = hashlib.sha256(payload.encode()).hexdigest()
            if hash_id in self.seen_hashes:
                print("⚠️ Duplicate Q&A pair - skipping storage")
                return

            self.vector_store.add_texts(
                [payload],
                metadatas=[{"hash_id": hash_id, "timestamp": str(time.time())}],
                ids=[hash_id],
            )

            # Record the hash only after the write succeeded; marking it
            # earlier would make a retry look like a duplicate.
            self.seen_hashes.add(hash_id)
            # Cached searches predate this document and would hide it until
            # the TTL ran out.
            self.query_cache.clear()
            print("✅ Stored Q&A pair in long-term memory")
        except Exception as e:
            print(f"⚠️ Error storing Q&A pair: {e}")

    def get_similar_qa(self, query: str) -> Optional[str]:
        """Get similar Q&A for context.

        Returns:
            The text of the best match when its relevance score is strictly
            above ``similarity_threshold``; otherwise ``None``.
        """
        matches = self.similarity_search(query, k=1)
        if not matches:
            return None

        best = matches[0]
        if isinstance(best, tuple):
            doc, score = best
            if score > self.similarity_threshold:
                return doc.page_content if hasattr(doc, "page_content") else str(doc)

        return None

    def add_processed_task(self, task_id: str) -> None:
        """Mark a task as processed."""
        self.processed_tasks.add(task_id)

    def is_task_processed(self, task_id: str) -> bool:
        """Check if a task has been processed."""
        return task_id in self.processed_tasks

    def clear_session_cache(self) -> None:
        """Clear session-specific caches (search cache, task ids, payload hashes)."""
        self.query_cache.clear()
        self.processed_tasks.clear()
        self.seen_hashes.clear()
        print("🗑️ Session cache cleared")
| |
|
| |
|
| | |
# Shared module-level instance; constructing it attempts the vector-store
# connection as soon as this module is imported.
memory_manager = MemoryManager()