""" Système de Mémoire et Stockage Vectoriel pour l'Assistant de Recherche Gère : embeddings, recherche sémantique, historique et déduplication """ import chromadb from chromadb.config import Settings from langchain_community.embeddings import HuggingFaceEmbeddings from langchain_community.vectorstores import Chroma from langchain_core.documents import Document from typing import List, Dict, Optional, Tuple from datetime import datetime import hashlib import json import pickle from pathlib import Path from collections import deque # ============================================================================ # GESTIONNAIRE DE MÉMOIRE VECTORIELLE # ============================================================================ class VectorMemoryManager: """Gère le stockage vectoriel des documents et résumés""" def __init__(self, persist_directory: str = "./chroma_db", collection_name: str = "research_documents", embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2"): """ Initialise le gestionnaire de mémoire vectorielle Args: persist_directory: Dossier de persistance de ChromaDB collection_name: Nom de la collection ChromaDB embedding_model: Modèle d'embeddings HuggingFace """ self.persist_directory = Path(persist_directory) self.persist_directory.mkdir(parents=True, exist_ok=True) print(f"🔧 Initialisation du système de mémoire vectorielle...") # Configuration des embeddings self.embeddings = HuggingFaceEmbeddings( model_name=embedding_model, model_kwargs={'device': 'cpu'}, encode_kwargs={'normalize_embeddings': True} ) # Configuration ChromaDB self.client = chromadb.PersistentClient( path=str(self.persist_directory), settings=Settings( anonymized_telemetry=False, allow_reset=True ) ) # Créer ou récupérer la collection try: self.collection = self.client.get_collection(collection_name) print(f"✅ Collection '{collection_name}' récupérée ({self.collection.count()} documents)") except: self.collection = self.client.create_collection( name=collection_name, metadata={"hnsw:space": "cosine"} ) print(f"✅ Nouvelle collection '{collection_name}' créée") # Initialiser le vectorstore LangChain self.vectorstore = Chroma( client=self.client, collection_name=collection_name, embedding_function=self.embeddings ) # Cache pour déduplication rapide self.content_hashes = set() self._load_existing_hashes() def _load_existing_hashes(self): """Charge les hashes des documents existants pour déduplication""" try: results = self.collection.get(include=['metadatas']) for metadata in results['metadatas']: if 'content_hash' in metadata: self.content_hashes.add(metadata['content_hash']) print(f"📋 {len(self.content_hashes)} hashes chargés pour déduplication") except Exception as e: print(f"⚠️ Erreur lors du chargement des hashes: {e}") def _compute_hash(self, content: str) -> str: """Calcule le hash MD5 d'un contenu""" return hashlib.md5(content.encode('utf-8')).hexdigest() def is_duplicate(self, content: str) -> bool: """Vérifie si un document est un doublon""" content_hash = self._compute_hash(content) return content_hash in self.content_hashes def add_documents(self, documents: List[Dict[str, any]], source: str = "research", check_duplicates: bool = True) -> Dict[str, int]: """ Ajoute des documents au vectorstore Args: documents: Liste de dicts avec 'content', 'title', 'url', etc. source: Source des documents (research, summary, synthesis) check_duplicates: Vérifier les doublons avant ajout Returns: Dict avec statistiques d'ajout """ print(f"\n📥 Ajout de {len(documents)} documents (source: {source})...") added = 0 skipped = 0 docs_to_add = [] metadatas_to_add = [] ids_to_add = [] for doc in documents: content = doc.get('content', '') # Vérification des doublons if check_duplicates and self.is_duplicate(content): skipped += 1 continue # Création du document LangChain content_hash = self._compute_hash(content) doc_id = f"{source}_{content_hash[:8]}_{datetime.now().timestamp()}" metadata = { 'title': doc.get('title', 'Sans titre'), 'url': doc.get('url', ''), 'source': source, 'timestamp': datetime.now().isoformat(), 'content_hash': content_hash, 'word_count': len(content.split()) } docs_to_add.append(content) metadatas_to_add.append(metadata) ids_to_add.append(doc_id) self.content_hashes.add(content_hash) added += 1 # Ajout batch à ChromaDB if docs_to_add: self.collection.add( documents=docs_to_add, metadatas=metadatas_to_add, ids=ids_to_add ) stats = { 'added': added, 'skipped': skipped, 'total_in_db': self.collection.count() } print(f"✅ Ajoutés: {added} | Doublons ignorés: {skipped} | Total DB: {stats['total_in_db']}") return stats def semantic_search(self, query: str, k: int = 5, filter_dict: Optional[Dict] = None) -> List[Tuple[Document, float]]: """ Recherche sémantique dans le vectorstore Args: query: Requête de recherche k: Nombre de résultats à retourner filter_dict: Filtres sur les métadonnées (ex: {'source': 'research'}) Returns: Liste de tuples (Document, score) """ print(f"\n🔍 Recherche sémantique: '{query}' (top-{k})") results = self.vectorstore.similarity_search_with_score( query=query, k=k, filter=filter_dict ) print(f"✅ {len(results)} résultats trouvés") return results def get_relevant_context(self, query: str, k: int = 3, source_filter: Optional[str] = None) -> str: """ Récupère le contexte pertinent pour une requête Args: query: Requête k: Nombre de documents à récupérer source_filter: Filtrer par source (research, summary, etc.) Returns: Contexte formaté en string """ filter_dict = {"source": source_filter} if source_filter else None results = self.semantic_search(query, k=k, filter_dict=filter_dict) if not results: return "" context_parts = [] for i, (doc, score) in enumerate(results, 1): context_parts.append( f"[Source {i} - Pertinence: {score:.2f}]\n" f"Titre: {doc.metadata.get('title', 'N/A')}\n" f"{doc.page_content[:500]}...\n" ) return "\n---\n".join(context_parts) def clear_old_documents(self, days: int = 30) -> int: """ Supprime les documents plus anciens que X jours Args: days: Nombre de jours de rétention Returns: Nombre de documents supprimés """ print(f"\n🧹 Nettoyage des documents > {days} jours...") from datetime import timedelta cutoff_date = datetime.now() - timedelta(days=days) results = self.collection.get(include=['metadatas']) ids_to_delete = [] for doc_id, metadata in zip(results['ids'], results['metadatas']): timestamp_str = metadata.get('timestamp', '') try: doc_date = datetime.fromisoformat(timestamp_str) if doc_date < cutoff_date: ids_to_delete.append(doc_id) hash_to_remove = metadata.get('content_hash') if hash_to_remove: self.content_hashes.discard(hash_to_remove) except: continue if ids_to_delete: self.collection.delete(ids=ids_to_delete) print(f"✅ {len(ids_to_delete)} documents supprimés") return len(ids_to_delete) # ============================================================================ # GESTIONNAIRE DE MÉMOIRE D'AGENT # ============================================================================ class AgentMemoryManager: """Gère l'historique des conversations et résumés""" def __init__(self, memory_file: str = "./agent_memory.pkl", max_history: int = 100, compression_threshold: int = 50): """ Initialise le gestionnaire de mémoire d'agent Args: memory_file: Fichier de sauvegarde de la mémoire max_history: Nombre maximum d'entrées dans l'historique compression_threshold: Seuil pour compression de mémoire """ self.memory_file = Path(memory_file) self.max_history = max_history self.compression_threshold = compression_threshold # Structures de données self.conversation_history = deque(maxlen=max_history) self.research_cache = {} # topic -> result self.summary_cache = {} # topic -> summary self.topic_keywords = {} # topic -> keywords print(f"🧠 Initialisation du gestionnaire de mémoire d'agent...") self._load_memory() def _load_memory(self): """Charge la mémoire depuis le fichier""" if self.memory_file.exists(): try: with open(self.memory_file, 'rb') as f: data = pickle.load(f) self.conversation_history = data.get('conversation_history', deque(maxlen=self.max_history)) self.research_cache = data.get('research_cache', {}) self.summary_cache = data.get('summary_cache', {}) self.topic_keywords = data.get('topic_keywords', {}) print(f"✅ Mémoire chargée: {len(self.conversation_history)} conversations, " f"{len(self.research_cache)} recherches en cache") except Exception as e: print(f"⚠️ Erreur lors du chargement de la mémoire: {e}") else: print("ℹ️ Nouvelle mémoire initialisée") def _save_memory(self): """Sauvegarde la mémoire dans le fichier""" try: data = { 'conversation_history': self.conversation_history, 'research_cache': self.research_cache, 'summary_cache': self.summary_cache, 'topic_keywords': self.topic_keywords } with open(self.memory_file, 'wb') as f: pickle.dump(data, f) except Exception as e: print(f"⚠️ Erreur lors de la sauvegarde de la mémoire: {e}") def add_conversation(self, user_message: str, assistant_response: str, metadata: Optional[Dict] = None): """Ajoute une conversation à l'historique""" entry = { 'timestamp': datetime.now().isoformat(), 'user': user_message, 'assistant': assistant_response, 'metadata': metadata or {} } self.conversation_history.append(entry) # Compression si nécessaire if len(self.conversation_history) >= self.compression_threshold: self._compress_memory() self._save_memory() def add_research_result(self, topic: str, result: any, keywords: List[str]): """Cache un résultat de recherche""" self.research_cache[topic] = { 'result': result, 'timestamp': datetime.now().isoformat() } self.topic_keywords[topic] = keywords self._save_memory() def get_research_result(self, topic: str, max_age_hours: int = 24) -> Optional[any]: """Récupère un résultat de recherche en cache""" if topic not in self.research_cache: return None cached = self.research_cache[topic] cached_time = datetime.fromisoformat(cached['timestamp']) from datetime import timedelta if datetime.now() - cached_time > timedelta(hours=max_age_hours): print(f"ℹ️ Cache expiré pour '{topic}'") return None print(f"✅ Résultat récupéré du cache pour '{topic}'") return cached['result'] def add_summary(self, topic: str, summary: str): """Ajoute un résumé au cache""" self.summary_cache[topic] = { 'summary': summary, 'timestamp': datetime.now().isoformat() } self._save_memory() def get_conversation_context(self, n_last: int = 5) -> str: """Récupère le contexte des N dernières conversations""" recent = list(self.conversation_history)[-n_last:] if not recent: return "" context = "Contexte des conversations récentes:\n" for i, conv in enumerate(recent, 1): context += f"\n[Conversation {i}]\n" context += f"User: {conv['user'][:100]}...\n" context += f"Assistant: {conv['assistant'][:100]}...\n" return context def _compress_memory(self): """Compresse la mémoire en gardant seulement les éléments importants""" print("🗜️ Compression de la mémoire...") # Supprimer les anciennes recherches en cache (> 7 jours) from datetime import timedelta cutoff = datetime.now() - timedelta(days=7) topics_to_remove = [] for topic, data in self.research_cache.items(): if datetime.fromisoformat(data['timestamp']) < cutoff: topics_to_remove.append(topic) for topic in topics_to_remove: del self.research_cache[topic] if topic in self.topic_keywords: del self.topic_keywords[topic] print(f"✅ {len(topics_to_remove)} anciennes recherches supprimées") self._save_memory() def get_related_topics(self, topic: str, threshold: float = 0.5) -> List[str]: """Trouve les topics similaires dans l'historique""" from difflib import SequenceMatcher related = [] for cached_topic in self.research_cache.keys(): similarity = SequenceMatcher(None, topic.lower(), cached_topic.lower()).ratio() if similarity > threshold: related.append((cached_topic, similarity)) return [t for t, _ in sorted(related, key=lambda x: x[1], reverse=True)] def clear_all(self): """Réinitialise complètement la mémoire""" print("🗑️ Réinitialisation complète de la mémoire...") self.conversation_history.clear() self.research_cache.clear() self.summary_cache.clear() self.topic_keywords.clear() self._save_memory() print("✅ Mémoire réinitialisée") # ============================================================================ # GESTIONNAIRE INTÉGRÉ # ============================================================================ class IntegratedMemorySystem: """Système de mémoire intégré combinant vectoriel et agent""" def __init__(self): self.vector_memory = VectorMemoryManager() self.agent_memory = AgentMemoryManager() print("✨ Système de mémoire intégré initialisé\n") def process_research_result(self, topic: str, extraction_result: any, summarization_result: any, global_synthesis: any): """ Traite et stocke tous les résultats d'une recherche Args: topic: Sujet de la recherche extraction_result: Résultat de l'extraction summarization_result: Résultat des résumés global_synthesis: Synthèse globale """ print(f"\n💾 Stockage des résultats pour '{topic}'...") # 1. Stocker les documents extraits dans le vectorstore if extraction_result and hasattr(extraction_result, 'documents'): docs_to_store = [] for doc in extraction_result.documents: docs_to_store.append({ 'content': doc.content, 'title': doc.title, 'url': str(doc.url) }) self.vector_memory.add_documents(docs_to_store, source='research') # 2. Stocker les résumés if summarization_result and hasattr(summarization_result, 'summaries'): summaries_to_store = [] for summary in summarization_result.summaries: summaries_to_store.append({ 'content': summary.detailed_summary, 'title': summary.title, 'url': str(summary.url) }) self.vector_memory.add_documents(summaries_to_store, source='summary') # 3. Stocker la synthèse globale if global_synthesis and hasattr(global_synthesis, 'final_report'): synthesis_text = global_synthesis.final_report.formatted_outputs.get('text', '') self.vector_memory.add_documents([{ 'content': synthesis_text, 'title': f"Synthèse: {topic}", 'url': '' }], source='synthesis') # 4. Mettre en cache dans la mémoire agent keywords = [] if hasattr(extraction_result, 'documents'): # Extraire quelques mots-clés simples all_text = ' '.join([doc.content[:100] for doc in extraction_result.documents[:3]]) keywords = list(set(all_text.split()[:10])) self.agent_memory.add_research_result(topic, global_synthesis, keywords) print("✅ Tous les résultats stockés avec succès") def retrieve_context_for_query(self, query: str, use_cache: bool = True) -> Dict: """ Récupère le contexte pertinent pour une requête Args: query: Requête de l'utilisateur use_cache: Utiliser le cache si disponible Returns: Dict avec le contexte vectoriel et conversationnel """ context = { 'semantic_context': '', 'conversation_context': '', 'cached_result': None, 'related_topics': [] } # 1. Vérifier le cache if use_cache: context['cached_result'] = self.agent_memory.get_research_result(query) # 2. Recherche sémantique context['semantic_context'] = self.vector_memory.get_relevant_context(query, k=3) # 3. Contexte conversationnel context['conversation_context'] = self.agent_memory.get_conversation_context(n_last=3) # 4. Topics similaires context['related_topics'] = self.agent_memory.get_related_topics(query) return context # ============================================================================ # INITIALISATION GLOBALE # ============================================================================ # Instance globale du système de mémoire memory_system = IntegratedMemorySystem() print("="*60) print("✅ SYSTÈME DE MÉMOIRE PRÊT") print("="*60)