Spaces:
Paused
Paused
"""
Long-Term Memory System - Virtual Me Agent
Uses ChromaDB for vector-based memory retrieval of past conversations.
"""
import os
import json
import hashlib
from collections import Counter
from datetime import datetime
from typing import List, Dict, Optional, Any

import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
class MemoryManager:
    """Manage long-term memory using vector embeddings for semantic retrieval.

    Each user gets a dedicated ChromaDB collection named
    ``virtual_me_<user_id>`` that is created with cosine distance
    (``hnsw:space = cosine``). Texts are embedded with a SentenceTransformer
    model before they are stored or used as a query.
    """

    def __init__(self,
                 user_id: str = "default",
                 db_path: str = "./memory_db",
                 embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2"):
        """Open (or create) the persistent store and load the embedding model.

        Args:
            user_id: Suffix of the collection name; separates users' memories.
            db_path: Directory of the persistent ChromaDB store (created if
                it does not exist).
            embedding_model: Name passed to ``SentenceTransformer``.
        """
        self.user_id = user_id
        self.db_path = db_path
        self.embedding_model_name = embedding_model
        self.embedding_model = None

        # Initialize ChromaDB
        os.makedirs(db_path, exist_ok=True)
        self.client = chromadb.PersistentClient(
            path=db_path,
            settings=Settings(anonymized_telemetry=False),
        )

        # Get or create collection for this user
        self.collection = self._open_collection()

        # The model is loaded eagerly here; the guard inside
        # _load_embedding_model makes later calls no-ops.
        self._load_embedding_model()

    def _open_collection(self):
        """Return this user's collection, creating it if necessary."""
        return self.client.get_or_create_collection(
            name=f"virtual_me_{self.user_id}",
            metadata={"hnsw:space": "cosine"},
        )

    def _load_embedding_model(self):
        """Load the embedding model if it has not been loaded yet."""
        if self.embedding_model is None:
            print(f"Loading embedding model: {self.embedding_model_name}")
            self.embedding_model = SentenceTransformer(self.embedding_model_name)
            print("Embedding model loaded.")

    def _embed(self, texts: List[str]) -> List[List[float]]:
        """Embed texts into vectors (one list of floats per input text)."""
        self._load_embedding_model()
        return self.embedding_model.encode(texts, convert_to_numpy=True).tolist()

    def _generate_id(self, text: str, timestamp: Optional[str] = None) -> str:
        """Generate an ID for a memory from its text and timestamp.

        The ID is the MD5 hex digest of ``"<text>_<timestamp>"``; MD5 is used
        only as a fingerprint here, not for security. The same text with the
        same timestamp always yields the same ID. When no timestamp is given
        the current time is used.
        """
        content = f"{text}_{timestamp or datetime.now().isoformat()}"
        return hashlib.md5(content.encode()).hexdigest()

    @staticmethod
    def _rows_to_memories(ids, documents, metadatas) -> List[Dict]:
        """Zip parallel id/document/metadata lists into memory dicts.

        Missing lists or ``None`` entries are normalised to ``""`` (content)
        and ``{}`` (metadata) so callers can use the values without checks.
        """
        return [
            {
                "id": memory_id,
                "content": (documents[i] if documents else "") or "",
                "metadata": (metadatas[i] if metadatas else {}) or {},
            }
            for i, memory_id in enumerate(ids or [])
        ]

    def add_memory(self,
                   content: str,
                   metadata: Optional[Dict[str, Any]] = None,
                   memory_type: str = "conversation") -> str:
        """
        Add a new memory to long-term storage.

        Args:
            content: The text to remember
            metadata: Additional info (timestamp, type, etc.). Only values of
                type str, int, float or bool are stored; other values are
                dropped. Keys given here override the defaults, so a
                ``"type"`` entry takes precedence over ``memory_type``.
            memory_type: Category of memory (conversation, fact, preference, etc.)

        Returns:
            The memory ID
        """
        metadata = metadata or {}
        # Fall back to "now" when the timestamp is absent *or* empty/None so
        # the stored metadata never carries a None timestamp.
        timestamp = metadata.get("timestamp") or datetime.now().isoformat()
        memory_id = self._generate_id(content, timestamp)

        # Prepare metadata
        full_metadata = {
            "timestamp": timestamp,
            "type": memory_type,
            "content_preview": content[:100],
        }
        # Filter to only allow serializable simple types
        full_metadata.update(
            (key, value)
            for key, value in metadata.items()
            if isinstance(value, (str, int, float, bool))
        )

        # Generate embedding and store
        embeddings = self._embed([content])
        self.collection.add(
            ids=[memory_id],
            embeddings=embeddings,
            documents=[content],
            metadatas=[full_metadata],
        )
        return memory_id

    def get_relevant_memories(self,
                              query: str,
                              n_results: int = 5,
                              memory_type: Optional[str] = None) -> List[Dict]:
        """
        Retrieve memories relevant to a query.

        Args:
            query: Text to find relevant memories for
            n_results: Maximum number of memories to retrieve
            memory_type: Filter by memory type

        Returns:
            List of memory dicts with id, content, metadata, and distance.
            Empty when the collection is empty or ``n_results`` is not
            positive.
        """
        available = self.collection.count()
        # Asking the store for zero (or fewer) results is an error rather
        # than an empty answer, so short-circuit before embedding the query.
        if available == 0 or n_results <= 0:
            return []

        where_filter = {"type": memory_type} if memory_type else None
        results = self.collection.query(
            query_embeddings=self._embed([query]),
            n_results=min(n_results, available),
            where=where_filter,
            include=["documents", "metadatas", "distances"],
        )

        # Query results are nested one level deeper (one list per query);
        # only a single query is issued, so take index 0 everywhere.
        if not results["ids"] or not results["ids"][0]:
            return []
        documents = results["documents"][0] if results["documents"] else None
        metadatas = results["metadatas"][0] if results["metadatas"] else None
        distances = results["distances"][0] if results["distances"] else None

        memories = self._rows_to_memories(results["ids"][0], documents, metadatas)
        for i, memory in enumerate(memories):
            memory["distance"] = distances[i] if distances else 1.0
        return memories

    def get_recent_memories(self, n_results: int = 10) -> List[Dict]:
        """Get most recent memories by timestamp (newest first)."""
        # The whole collection is fetched and sorted here because the store
        # is not asked to order results.
        all_results = self.collection.get(include=["documents", "metadatas"])
        if not all_results["ids"]:
            return []

        memories = self._rows_to_memories(
            all_results["ids"],
            all_results["documents"],
            all_results["metadatas"],
        )
        # Sort by timestamp descending
        memories.sort(
            key=lambda memory: memory["metadata"].get("timestamp", ""),
            reverse=True,
        )
        return memories[:n_results]

    def get_memory_summary(self) -> Dict:
        """Get a summary of stored memories.

        Returns:
            Dict with ``total_memories``, ``type_distribution`` (count per
            ``type`` metadata value, ``"unknown"`` when absent) and
            ``user_id``.
        """
        count = self.collection.count()
        all_data = self.collection.get(include=["metadatas"])
        type_counts = Counter(
            (meta or {}).get("type", "unknown")
            for meta in (all_data["metadatas"] or [])
        )
        return {
            "total_memories": count,
            "type_distribution": dict(type_counts),
            "user_id": self.user_id,
        }

    def search_by_type(self, memory_type: str, n_results: int = 50) -> List[Dict]:
        """Return up to ``n_results`` memories whose ``type`` metadata matches."""
        results = self.collection.get(
            where={"type": memory_type},
            include=["documents", "metadatas"],
        )
        memories = self._rows_to_memories(
            results["ids"],
            results["documents"],
            results["metadatas"],
        )
        return memories[:n_results]

    def delete_memory(self, memory_id: str) -> bool:
        """Delete a specific memory.

        Returns:
            True when the delete call completed, False when it raised (the
            error is printed, not propagated).
        """
        try:
            self.collection.delete(ids=[memory_id])
            return True
        except Exception as e:
            # Deliberate best-effort: callers get a boolean instead of an
            # exception from the storage layer.
            print(f"Error deleting memory: {e}")
            return False

    def clear_all_memories(self):
        """Clear all memories for this user by recreating the collection."""
        self.client.delete_collection(name=f"virtual_me_{self.user_id}")
        self.collection = self._open_collection()
class InteractionTracker:
    """Track interaction patterns for learning and improvement.

    Interactions and feedback are persisted through the given memory manager;
    lightweight counters are kept in ``interaction_stats`` for the lifetime
    of this object only.
    """

    # Keywords looked for (as substrings) when tagging a message with topics.
    _COMMON_TOPICS = (
        "work", "family", "friends", "hobbies", "music", "movies", "books",
        "travel", "food", "sports", "technology", "politics", "health",
        "money", "education", "love", "stress", "happiness", "goals",
        "dreams", "fears", "memories", "plans", "advice", "learning",
    )
    # Substrings that mark a feedback entry as a correction / a preference.
    _CORRECTION_WORDS = ("wrong", "incorrect")
    _PREFERENCE_WORDS = ("prefer", "like", "want")

    def __init__(self, memory_manager: "MemoryManager"):
        """Bind the tracker to a memory manager and reset in-memory stats."""
        self.memory = memory_manager
        # NOTE(review): "total_conversations", "avg_response_time" and
        # "emotional_patterns" are initialised here but nothing in this class
        # updates them - confirm whether another component is meant to.
        self.interaction_stats = {
            "total_conversations": 0,
            "total_messages": 0,
            "avg_response_time": 0.0,
            "user_satisfaction_scores": [],
            "topics_discussed": set(),
            "emotional_patterns": [],
        }

    def log_interaction(self,
                        user_message: str,
                        assistant_response: str,
                        duration_ms: Optional[float] = None,
                        user_feedback: Optional[str] = None):
        """Log a single interaction for learning.

        Args:
            user_message: What the user said; also scanned for topics.
            assistant_response: The reply that was given.
            duration_ms: Optional duration; stored as 0 when not provided.
            user_feedback: Optional feedback text; stored as "none" when
                not provided.
        """
        self.interaction_stats["total_messages"] += 1

        # Extract topics (simple keyword extraction)
        topics = self._extract_topics(user_message)
        self.interaction_stats["topics_discussed"].update(topics)

        # Store in memory. The "type" entry in the metadata is what tags the
        # record as an interaction for later search_by_type lookups.
        self.memory.add_memory(
            content=f"Interaction: User: '{user_message}' | Me: '{assistant_response}'",
            metadata={
                "type": "interaction",
                "topics": ",".join(topics),
                "feedback": user_feedback or "none",
                "duration_ms": duration_ms or 0,
            },
        )

    def log_feedback(self, original_message: str, feedback: str,
                     rating: Optional[int] = None):
        """Log explicit feedback for learning.

        Args:
            original_message: The message the feedback refers to.
            feedback: Free-text feedback.
            rating: Optional numeric rating. Any provided value - including
                0 - is recorded in the satisfaction scores; ``None`` means
                "no rating" and is stored as 0 in the memory metadata.
        """
        self.memory.add_memory(
            content=f"Feedback on my response to '{original_message}': {feedback}",
            metadata={
                "type": "feedback",
                "rating": rating if rating is not None else 0,
                "original_message": original_message,
            },
        )
        # Compare against None rather than truthiness so that an explicit
        # rating of 0 still counts toward the average.
        if rating is not None:
            self.interaction_stats["user_satisfaction_scores"].append(rating)

    def _extract_topics(self, text: str) -> List[str]:
        """Return the known topic keywords that occur in ``text``.

        Matching is a case-insensitive substring test, so a keyword also
        matches inside longer words (e.g. "work" in "homework"). Results
        follow the order of the keyword list.
        """
        text_lower = text.lower()
        return [topic for topic in self._COMMON_TOPICS if topic in text_lower]

    def get_learning_insights(self) -> Dict:
        """Get insights from all interactions for improving the clone.

        Returns:
            Dict with total interaction/feedback counts (taken from the
            memory summary so they are not limited by a retrieval cap),
            the sorted topics seen by this tracker, the average recorded
            satisfaction score (0 when none), and up to five feedback
            entries each that look like corrections or preferences.
        """
        # Only a sample of feedback is needed for the pattern analysis below.
        feedback_memories = self.memory.search_by_type("feedback", n_results=100)
        type_counts = self.memory.get_memory_summary().get("type_distribution", {})
        scores = self.interaction_stats["user_satisfaction_scores"]

        insights = {
            "total_interactions": type_counts.get("interaction", 0),
            "total_feedback": type_counts.get("feedback", 0),
            # Sorted so the output is deterministic (sets have no order).
            "common_topics": sorted(self.interaction_stats["topics_discussed"]),
            "avg_satisfaction": sum(scores) / len(scores) if scores else 0,
        }

        # Analyze feedback patterns
        corrections = []
        preferences = []
        for memory in feedback_memories:
            lowered = memory["content"].lower()
            if any(word in lowered for word in self._CORRECTION_WORDS):
                corrections.append(memory["content"])
            if any(word in lowered for word in self._PREFERENCE_WORDS):
                preferences.append(memory["content"])

        insights["common_corrections"] = corrections[:5]
        insights["expressed_preferences"] = preferences[:5]
        return insights