Spaces:
Sleeping
Sleeping
| """Long-term memory using vector store for persistent context.""" | |
| import logging | |
| from typing import List, Dict, Optional, Any | |
| from datetime import datetime | |
| import uuid | |
| from src.core.config import get_settings | |
| from src.retrieval.vector_store import get_vector_store | |
| logger = logging.getLogger(__name__) | |
class LongTermMemory:
    """Manages long-term memory using vector store for semantic search.

    Conversations and standalone facts are stored as documents in the shared
    vector store, tagged with metadata (``type``, ``session_id``,
    ``timestamp``) so they can be retrieved semantically or filtered per
    session.  Every public method degrades gracefully: when the feature is
    disabled or the underlying store fails, it logs the problem and returns a
    neutral value ("" / [] / False / 0) instead of raising, because memory
    persistence is a best-effort feature that must never crash the caller.
    """

    def __init__(self, collection_name: Optional[str] = None):
        """Initialize long-term memory.

        Args:
            collection_name: Optional collection-name override; defaults to
                ``"<chroma_collection_name>_memory"``.
        """
        self.settings = get_settings()
        self.enabled = self.settings.long_term_memory_enabled
        # Define attributes up-front so probing a disabled instance never
        # raises AttributeError.
        self.vector_store = None
        self.memory_collection_name = ""
        if not self.enabled:
            logger.info("Long-term memory is disabled")
            return
        # Use a separate collection name for long-term memory.
        # NOTE(review): the shared vector store is reused and memory entries
        # are distinguished via metadata only; the dedicated collection name
        # is currently not passed to the store itself.
        self.memory_collection_name = (
            collection_name or f"{self.settings.chroma_collection_name}_memory"
        )
        self.vector_store = get_vector_store()

    def store_conversation(
        self,
        session_id: str,
        messages: List[Dict[str, Any]],
        summary: Optional[str] = None,
    ) -> str:
        """
        Store a conversation in long-term memory.

        Args:
            session_id: Session identifier
            messages: List of messages (dicts with "role" and "content")
            summary: Optional conversation summary

        Returns:
            Memory entry ID, or "" when disabled or on failure
        """
        if not self.enabled:
            return ""
        try:
            # Flatten the message list into one searchable text document.
            conversation_text = self._format_conversation(messages, summary)
            memory_id = str(uuid.uuid4())
            metadata = {
                "session_id": session_id,
                # NOTE(review): naive local-time timestamp; session memories
                # are sorted by this ISO string, so mixing timezones would
                # break ordering — consider UTC, but that changes stored data.
                "timestamp": datetime.now().isoformat(),
                "message_count": len(messages),
                "type": "conversation",
            }
            if summary:
                metadata["summary"] = summary
            self.vector_store.add_documents(
                documents=[conversation_text],
                metadatas=[metadata],
                ids=[memory_id],
            )
            logger.info("Stored conversation in long-term memory: %s", memory_id)
            return memory_id
        except Exception as e:
            # Deliberately broad: persistence failure must not crash the
            # caller.  logger.exception records the traceback.
            logger.exception("Error storing conversation: %s", e)
            return ""

    def search_memories(
        self,
        query: str,
        session_id: Optional[str] = None,
        n_results: int = 5,
    ) -> List[Dict[str, Any]]:
        """
        Search for relevant memories.

        Args:
            query: Search query
            session_id: Optional session ID to filter by
            n_results: Number of results to return

        Returns:
            List of memory entries with "id", "content", "metadata" and
            "distance" keys; empty when disabled or on failure
        """
        if not self.enabled:
            return []
        try:
            # Restrict the search to one session when requested.
            filter_dict = {"session_id": session_id} if session_id else None
            results = self.vector_store.search(
                query=query,
                n_results=n_results,
                filter=filter_dict,
            )
            return self._format_results(results, include_distance=True)
        except Exception as e:
            logger.exception("Error searching memories: %s", e)
            return []

    def get_session_memories(
        self,
        session_id: str,
        limit: int = 10,
    ) -> List[Dict[str, Any]]:
        """
        Get all memories for a specific session.

        Args:
            session_id: Session identifier
            limit: Maximum number of memories to return

        Returns:
            List of memory entries, newest first; empty when disabled or
            on failure
        """
        if not self.enabled:
            return []
        try:
            # NOTE(review): an empty query relies on the store returning
            # arbitrary matches for the metadata filter — confirm this is
            # supported by the vector_store wrapper.
            results = self.vector_store.search(
                query="",  # Empty query to get all
                n_results=limit,
                filter={"session_id": session_id},
            )
            memories = self._format_results(results)
            # Newest first, ordered by the ISO timestamp written at store time.
            memories.sort(
                key=lambda m: m["metadata"].get("timestamp", ""),
                reverse=True,
            )
            return memories
        except Exception as e:
            logger.exception("Error getting session memories: %s", e)
            return []

    def delete_memory(self, memory_id: str) -> bool:
        """
        Delete a specific memory entry.

        Args:
            memory_id: Memory entry ID

        Returns:
            True if successful
        """
        if not self.enabled:
            return False
        try:
            self.vector_store.delete(ids=[memory_id])
            logger.info("Deleted memory: %s", memory_id)
            return True
        except Exception as e:
            logger.exception("Error deleting memory: %s", e)
            return False

    def delete_session_memories(self, session_id: str) -> int:
        """
        Delete all memories for a session.

        Args:
            session_id: Session identifier

        Returns:
            Number of memories deleted

        Note:
            Capped at 1000 entries per call; sessions with more memories
            would need repeated calls.
        """
        if not self.enabled:
            return 0
        try:
            memories = self.get_session_memories(session_id, limit=1000)
            if not memories:
                return 0
            memory_ids = [m["id"] for m in memories]
            self.vector_store.delete(ids=memory_ids)
            logger.info(
                "Deleted %d memories for session: %s", len(memory_ids), session_id
            )
            return len(memory_ids)
        except Exception as e:
            logger.exception("Error deleting session memories: %s", e)
            return 0

    def _format_conversation(
        self,
        messages: List[Dict[str, Any]],
        summary: Optional[str] = None,
    ) -> str:
        """Format a conversation as a single text document for embedding."""
        parts = []
        if summary:
            parts.append(f"Summary: {summary}\n")
        parts.append("Conversation:")
        for msg in messages:
            role = msg.get("role", "unknown")
            content = msg.get("content", "")
            parts.append(f"{role}: {content}")
        return "\n".join(parts)

    def _format_results(
        self,
        results: Dict[str, Any],
        include_distance: bool = False,
    ) -> List[Dict[str, Any]]:
        """Convert a raw vector-store result dict into memory-entry dicts.

        Assumes the wrapper returns flat, parallel lists under "ids",
        "documents", "metadatas" (and "distances") — TODO confirm against
        the vector_store API, since raw Chroma nests these per-query.
        """
        entries = []
        for i, doc_id in enumerate(results["ids"]):
            entry = {
                "id": doc_id,
                "content": results["documents"][i],
                "metadata": results["metadatas"][i],
            }
            if include_distance:
                entry["distance"] = results["distances"][i]
            entries.append(entry)
        return entries

    def store_fact(
        self,
        fact: str,
        session_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> str:
        """
        Store a fact or piece of information.

        Args:
            fact: Fact to store
            session_id: Optional session ID
            metadata: Optional additional metadata (may override the
                default "timestamp"/"type" keys)

        Returns:
            Memory entry ID, or "" when disabled or on failure
        """
        if not self.enabled:
            return ""
        try:
            memory_id = str(uuid.uuid4())
            fact_metadata = {
                "timestamp": datetime.now().isoformat(),
                "type": "fact",
            }
            if session_id:
                fact_metadata["session_id"] = session_id
            if metadata:
                # Caller-supplied metadata wins on key collision.
                fact_metadata.update(metadata)
            self.vector_store.add_documents(
                documents=[fact],
                metadatas=[fact_metadata],
                ids=[memory_id],
            )
            logger.info("Stored fact in long-term memory: %s", memory_id)
            return memory_id
        except Exception as e:
            logger.exception("Error storing fact: %s", e)
            return ""