# NOTE(review): the following lines are Hugging Face file-viewer residue that
# leaked into the source paste; commented out so the module parses as Python.
# Humanlearning's picture
# multi agent architecture
# fe36046
# raw
# history blame
# 6.6 kB
"""Memory Layer Implementation for LangGraph Agent System"""
import os
import time
import hashlib
import sqlite3
from typing import Optional, List, Dict, Any, Tuple
from langchain_community.vectorstores import SupabaseVectorStore
from langchain_huggingface import HuggingFaceEmbeddings
from supabase.client import Client, create_client
from langgraph.checkpoint.sqlite import SqliteSaver
from langchain_core.messages import BaseMessage, HumanMessage
# Constants for memory management
TTL = 300 # seconds – how long a cached similarity-search result stays valid before re-querying
SIMILARITY_THRESHOLD = 0.85 # cosine score above which we assume we already know the answer (skip ingestion)
class MemoryManager:
    """Manages short-term, long-term memory and checkpointing for the agent system.

    Long-term memory is a Supabase vector store of Q/A payloads; short-term
    memory is a LangGraph ``SqliteSaver`` checkpointer backed by an in-memory
    SQLite database.  Three session-scoped caches avoid redundant work:

    * ``query_cache``     -- sha256(query) -> (timestamp, similarity results),
      entries valid for ``TTL`` seconds
    * ``processed_tasks`` -- task ids whose attachments were already fetched
    * ``seen_hashes``     -- payload hashes already ingested this session

    All initialization is best-effort: missing credentials or backend errors
    disable the corresponding feature instead of raising.
    """

    def __init__(self):
        self.embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")
        self.vector_store = None        # set by _initialize_vector_store() when credentials exist
        self.checkpointer = None        # set by _initialize_checkpointer()
        self._sqlite_connection = None  # SQLite handle owned by this manager; backs the checkpointer
        # In-memory caches
        self.query_cache: Dict[str, Tuple[float, List]] = {}
        self.processed_tasks: set[str] = set()
        self.seen_hashes: set[str] = set()
        self._initialize_vector_store()
        self._initialize_checkpointer()

    def _initialize_vector_store(self) -> None:
        """Initialize Supabase vector store for long-term memory.

        Reads ``SUPABASE_URL`` / ``SUPABASE_SERVICE_KEY`` from the environment;
        if either is missing (or client creation fails) the vector store stays
        disabled (``self.vector_store`` remains ``None``).
        """
        try:
            supabase_url = os.environ.get("SUPABASE_URL")
            supabase_key = os.environ.get("SUPABASE_SERVICE_KEY")
            if not supabase_url or not supabase_key:
                print("Warning: Supabase credentials not found, vector store will be disabled")
                return
            supabase: Client = create_client(supabase_url, supabase_key)
            self.vector_store = SupabaseVectorStore(
                client=supabase,
                embedding=self.embeddings,
                table_name="documents",
                query_name="match_documents_langchain",
            )
            print("Vector store initialized successfully")
        except Exception as e:
            print(f"Warning: Could not initialize Supabase vector store: {e}")

    def _initialize_checkpointer(self) -> None:
        """Initialize SQLite checkpointer for short-term memory."""
        try:
            # Create a direct SQLite connection; check_same_thread=False because
            # the checkpointer may be used from threads other than the creator.
            self._sqlite_connection = sqlite3.connect(":memory:", check_same_thread=False)
            self.checkpointer = SqliteSaver(self._sqlite_connection)
            print("Checkpointer initialized successfully")
        except Exception as e:
            print(f"Warning: Could not initialize checkpointer: {e}")

    def get_checkpointer(self) -> Optional[SqliteSaver]:
        """Get the checkpointer instance (``None`` if initialization failed)."""
        return self.checkpointer

    def close_checkpointer(self) -> None:
        """Close the checkpointer's SQLite connection.

        Idempotent: the connection reference is cleared after closing so a
        second call is a no-op instead of closing an already-closed handle.
        """
        if self._sqlite_connection:
            try:
                self._sqlite_connection.close()
                print("SQLite connection closed")
            except Exception as e:
                print(f"Warning: Error closing SQLite connection: {e}")
            finally:
                # Drop the reference so repeated close calls are harmless.
                self._sqlite_connection = None

    def similarity_search(self, query: str, k: int = 2) -> List[Any]:
        """Search long-term memory for entries similar to *query*, with caching.

        Results are cached per query hash for ``TTL`` seconds; expired entries
        are evicted on access so the cache does not accumulate stale data.
        Returns ``[]`` when the vector store is unavailable or the search fails.
        """
        if not self.vector_store:
            return []
        # Check cache first
        q_hash = hashlib.sha256(query.encode()).hexdigest()
        now = time.time()
        cached = self.query_cache.get(q_hash)
        if cached is not None:
            if now - cached[0] < TTL:
                print("Memory: Cache hit for similarity search")
                return cached[1]
            # Entry expired — evict it so dead entries don't linger forever.
            del self.query_cache[q_hash]
        try:
            print("Memory: Searching vector store for similar questions...")
            similar_questions = self.vector_store.similarity_search_with_relevance_scores(query, k=k)
            self.query_cache[q_hash] = (now, similar_questions)
            return similar_questions
        except Exception as e:
            print(f"Memory: Vector store search error – {e}")
            return []

    def should_ingest(self, query: str) -> bool:
        """Determine if this query/answer should be ingested to long-term memory.

        Returns ``True`` when the best existing match scores below
        ``SIMILARITY_THRESHOLD`` (i.e. we don't already know the answer);
        ``False`` when the vector store is unavailable.
        """
        if not self.vector_store:
            return False
        similar_questions = self.similarity_search(query, k=1)
        # similarity_search_with_relevance_scores yields (doc, score) tuples.
        top_score = similar_questions[0][1] if similar_questions else 0.0
        return top_score < SIMILARITY_THRESHOLD

    def ingest_qa_pair(self, question: str, answer: str, attachments: str = "") -> None:
        """Store a Q/A pair (plus optional attachment text) in long-term memory.

        Deduplicates by sha256 of the full payload within the current session.
        Best-effort: backend errors are logged, not raised.
        """
        if not self.vector_store:
            print("Memory: Vector store not available for ingestion")
            return
        try:
            payload = f"Question:\n{question}\n\nAnswer:\n{answer}"
            if attachments:
                payload += f"\n\n{attachments}"
            hash_id = hashlib.sha256(payload.encode()).hexdigest()
            if hash_id in self.seen_hashes:
                print("Memory: Duplicate payload within session – skip")
                return
            self.seen_hashes.add(hash_id)
            self.vector_store.add_texts(
                [payload],
                metadatas=[{"hash_id": hash_id, "timestamp": time.time()}]
            )
            print("Memory: Stored new Q/A pair in vector store")
        except Exception as e:
            print(f"Memory: Error while upserting – {e}")

    def get_similar_qa(self, query: str) -> Optional[str]:
        """Return the text of the closest stored Q/A pair, or ``None`` if absent."""
        similar_questions = self.similarity_search(query, k=1)
        if not similar_questions:
            return None
        # Results may be bare documents or (doc, score) tuples depending on the
        # search API used — unwrap accordingly.
        example_doc = similar_questions[0][0] if isinstance(similar_questions[0], tuple) else similar_questions[0]
        return example_doc.page_content

    def add_processed_task(self, task_id: str) -> None:
        """Mark a task as processed to avoid re-downloading attachments."""
        self.processed_tasks.add(task_id)

    def is_task_processed(self, task_id: str) -> bool:
        """Check if a task has already been processed this session."""
        return task_id in self.processed_tasks

    def clear_session_cache(self) -> None:
        """Clear session-specific caches (query cache, task ids, payload hashes)."""
        self.query_cache.clear()
        self.processed_tasks.clear()
        self.seen_hashes.clear()
        print("Memory: Session cache cleared")
# Global memory manager instance, created at import time.
# NOTE: instantiation has side effects — it attempts to connect to Supabase
# (if credentials are set) and opens an in-memory SQLite connection.
memory_manager = MemoryManager()