| """ | |
| Agent Summarizer - Analyse et résumé de documents. | |
| Crée des résumés structurés et des analyses approfondies des documents extraits. | |
| """ | |
| import asyncio | |
| from typing import List, Dict, Any, Optional | |
| from datetime import datetime | |
| import hashlib | |
| from src.agents.base_agent import BaseAgent | |
| from src.models.document_models import Document, DocumentSummary, SummarizationOutput, KeyPoint, Citation | |
| from src.models.state_models import AgentType | |
| from src.services.llm_service import LLMManager, LLMError | |
| from src.services.text_chunking import ChunkingManager, TextChunk | |
| from src.core.logging import setup_logger | |
| from config.prompts import SUMMARIZER_PROMPTS, SYSTEM_PROMPTS | |
| import hashlib | |
| import re | |


class SummarizationInput:
    """Input for the Summarizer agent."""

    def __init__(
        self,
        documents: List[Document],
        summary_options: Optional[Dict[str, Any]] = None
    ):
        self.documents = documents
        self.summary_options = summary_options or {}

        # Default options
        self.include_sentiment = self.summary_options.get('include_sentiment', True)
        self.include_citations = self.summary_options.get('include_citations', True)
        self.max_key_points = self.summary_options.get('max_key_points', 5)
        self.detailed_analysis = self.summary_options.get('detailed_analysis', True)
        self.chunk_large_docs = self.summary_options.get('chunk_large_docs', True)
        self.max_doc_size = self.summary_options.get('max_doc_size', 8000)  # characters; not referenced elsewhere in this module
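
# Illustrative usage (Document field values are placeholders):
#
#     docs = [Document(title="...", url="https://example.org", content="...")]
#     input_data = SummarizationInput(docs, summary_options={'max_key_points': 3})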


class SummarizerAgent(BaseAgent):
    """
    Agent responsible for analyzing and summarizing documents.

    Features:
    - Executive and detailed summaries
    - Key point and argument extraction
    - Sentiment and bias analysis
    - Long-document handling via chunking
    - Important citations and statistics
    - Credibility assessment
    """

    def __init__(
        self,
        max_retries: int = 2,
        timeout: float = 600.0  # 10 minutes, to allow for several documents
    ):
        super().__init__(
            agent_type=AgentType.READER,
            name="summarizer",
            max_retries=max_retries,
            timeout=timeout
        )

        # Services
        self.llm_manager = LLMManager()
        self.chunking_manager = ChunkingManager()

        # Configuration
        self.max_concurrent_summaries = 3  # maximum number of parallel summaries
        self.chunk_overlap_threshold = 6000  # chunking threshold, in characters
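        # Note: the threshold is a heuristic; 6000 characters is roughly 1500
        # tokens of typical English text. Tune it to your model's context window.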

    def validate_input(self, input_data: SummarizationInput) -> bool:
        """
        Validate the input data for summarization.

        Args:
            input_data: Input containing the documents to summarize

        Returns:
            True if the data is valid
        """
        if not input_data.documents:
            self.logger.error("No documents provided for summarization")
            return False

        if len(input_data.documents) > 20:  # Reasonable limit
            self.logger.error(f"Too many documents ({len(input_data.documents)}), maximum is 20")
            return False

        # Check that the documents actually have content
        valid_docs = [doc for doc in input_data.documents if doc.content and doc.content.strip()]
        if not valid_docs:
            self.logger.error("No document with valid content")
            return False

        return True

    async def process(self, input_data: SummarizationInput) -> SummarizationOutput:
        """
        Run summarization over the documents.

        Args:
            input_data: Input containing the documents to summarize

        Returns:
            SummarizationOutput with all the summaries
        """
        start_time = datetime.now()
        self.logger.info(f"Starting summarization of {len(input_data.documents)} documents")

        # Filter out documents without content
        valid_documents = [doc for doc in input_data.documents if doc.content and doc.content.strip()]
        self.logger.info(f"Valid documents to process: {len(valid_documents)}")

        try:
            # Summarize the documents in parallel
            summaries = await self._summarize_all_documents(valid_documents, input_data)

            # Cross-document analysis
            global_analysis = await self._perform_global_analysis(summaries)

            # Compute metrics
            total_processing_time = (datetime.now() - start_time).total_seconds()
            average_credibility = self._calculate_average_credibility(summaries)

            # Build the result
            result = SummarizationOutput(
                summaries=summaries,
                total_documents=len(input_data.documents),
                total_processing_time=total_processing_time,
                average_credibility=average_credibility,
                common_themes=global_analysis.get('common_themes', []),
                consensus_points=global_analysis.get('consensus_points', []),
                conflicting_views=global_analysis.get('conflicting_views', [])
            )

            self.logger.info(
                f"Summarization finished: {len(summaries)} summaries created in {total_processing_time:.2f}s"
            )
            return result

        except Exception as e:
            self.logger.error(f"Error during summarization: {str(e)}")
            raise

    async def _summarize_all_documents(
        self,
        documents: List[Document],
        input_data: SummarizationInput
    ) -> List[DocumentSummary]:
        """Summarize all documents in parallel."""
        semaphore = asyncio.Semaphore(self.max_concurrent_summaries)

        async def summarize_single(doc: Document) -> DocumentSummary:
            async with semaphore:
                try:
                    return await self._summarize_document(doc, input_data)
                except Exception as e:
                    self.logger.error(f"Error summarizing document {doc.title}: {e}")
                    # Create a minimal error summary
                    return self._create_error_summary(doc, str(e))

        # Launch all summaries in parallel, bounded by the semaphore
        tasks = [summarize_single(doc) for doc in documents]
        summaries = await asyncio.gather(*tasks, return_exceptions=True)

        # Keep only valid results
        valid_summaries = []
        for summary in summaries:
            if isinstance(summary, DocumentSummary):
                valid_summaries.append(summary)
            else:
                self.logger.error(f"Invalid summary: {summary}")

        return valid_summaries

    async def _summarize_document(self, document: Document, input_data: SummarizationInput) -> DocumentSummary:
        """Summarize a single document."""
        start_time = datetime.now()
        doc_id = self._generate_document_id(document)

        self.logger.info(f"Summarizing document: {document.title} ({len(document.content)} characters)")

        # Decide whether chunking is needed
        if (input_data.chunk_large_docs and
                len(document.content) > self.chunk_overlap_threshold):
            summary = await self._summarize_large_document(document, input_data)
        else:
            summary = await self._summarize_standard_document(document, input_data)

        # Finalize the summary
        processing_time = (datetime.now() - start_time).total_seconds()
        summary.document_id = doc_id
        summary.processing_time = processing_time
        summary.processed_at = datetime.now()

        return summary

    async def _summarize_standard_document(
        self,
        document: Document,
        input_data: SummarizationInput
    ) -> DocumentSummary:
        """Summarize a standard-sized document."""
        # Prepare the prompt context
        context = {
            'title': document.title,
            'author': document.author or "Not specified",
            'url': str(document.url),
            'content': document.content
        }

        # Parallel tasks
        tasks = []

        # 1. Executive summary
        exec_prompt = SUMMARIZER_PROMPTS['executive_summary'].format(**context)
        tasks.append(self._get_llm_response(exec_prompt, "executive_summary"))

        # 2. Detailed analysis
        if input_data.detailed_analysis:
            detailed_prompt = SUMMARIZER_PROMPTS['detailed_analysis'].format(**context)
            tasks.append(self._get_llm_response(detailed_prompt, "detailed_analysis"))

        # 3. Sentiment analysis (optional)
        if input_data.include_sentiment:
            sentiment_prompt = SUMMARIZER_PROMPTS['sentiment_analysis'].format(**context)
            tasks.append(self._get_llm_response(sentiment_prompt, "sentiment_analysis"))

        # Run the tasks
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Parse the results
        executive_summary = ""
        detailed_summary = ""
        key_points = []
        sentiment = None
        credibility_score = None

        for i, result in enumerate(results):
            if isinstance(result, Exception):
                self.logger.error(f"Error in task {i}: {result}")
                continue

            task_type, content = result

            if task_type == "executive_summary":
                executive_summary = content
            elif task_type == "detailed_analysis":
                # Parse the detailed analysis
                parsed = self._parse_detailed_analysis(content)
                detailed_summary = parsed.get('summary', content)
                key_points = parsed.get('key_points', [])
            elif task_type == "sentiment_analysis":
                # Parse the sentiment analysis
                parsed = self._parse_sentiment_analysis(content)
                sentiment = parsed.get('sentiment')
                credibility_score = parsed.get('credibility_score')

        # Build the summary
        summary = DocumentSummary(
            document_id="",  # Filled in later
            title=document.title,
            url=document.url,
            executive_summary=executive_summary,
            detailed_summary=detailed_summary,
            key_points=key_points[:input_data.max_key_points],
            sentiment=sentiment,
            credibility_score=credibility_score
        )

        return summary

    async def _summarize_large_document(
        self,
        document: Document,
        input_data: SummarizationInput
    ) -> DocumentSummary:
        """Summarize a long document via chunking."""
        self.logger.info(f"Chunking long document: {document.title}")

        # Split the document
        chunks = self.chunking_manager.chunk_document(
            document.content,
            strategy="default",
            preserve_structure=True
        )
        self.logger.info(f"Document split into {len(chunks)} chunks")

        # Summarize each chunk
        chunk_summaries = await self._summarize_chunks(chunks, document)

        # Synthesize the partial summaries
        synthesis = await self._synthesize_chunk_summaries(chunk_summaries, document)

        return synthesis

    async def _summarize_chunks(self, chunks: List[TextChunk], document: Document) -> List[str]:
        """Summarize each chunk individually, in parallel."""
        async def summarize_chunk(chunk: TextChunk) -> str:
            context = {
                'title': document.title,
                'chunk_index': chunk.chunk_id,
                'total_chunks': chunk.total_chunks,
                'chunk_content': chunk.content
            }
            prompt = SUMMARIZER_PROMPTS['chunked_summary'].format(**context)
            try:
                return await self.llm_manager.get_completion(
                    prompt,
                    system_prompt=SYSTEM_PROMPTS['summarizer']
                )
            except Exception as e:
                self.logger.error(f"Error summarizing chunk {chunk.chunk_id}: {e}")
                return f"Error summarizing chunk {chunk.chunk_id}"

        # Fan out over all chunks
        tasks = [summarize_chunk(chunk) for chunk in chunks]
        summaries = await asyncio.gather(*tasks)

        return summaries

    async def _synthesize_chunk_summaries(
        self,
        chunk_summaries: List[str],
        document: Document
    ) -> DocumentSummary:
        """Synthesize the chunk summaries into one unified summary."""
        # Combine all the partial summaries
        combined_summaries = "\n\n".join([
            f"Part {i+1}: {summary}"
            for i, summary in enumerate(chunk_summaries)
        ])

        context = {
            'partial_summaries': combined_summaries,
            'title': document.title,
            'url': str(document.url)
        }

        # Final synthesis
        synthesis_prompt = SUMMARIZER_PROMPTS['synthesis'].format(**context)

        try:
            synthesis_result = await self.llm_manager.get_completion(
                synthesis_prompt,
                system_prompt=SYSTEM_PROMPTS['summarizer']
            )

            # Parse the synthesis result
            parsed = self._parse_synthesis_result(synthesis_result)

            summary = DocumentSummary(
                document_id="",
                title=document.title,
                url=document.url,
                executive_summary=parsed.get('executive_summary', ''),
                detailed_summary=parsed.get('detailed_summary', ''),
                key_points=parsed.get('key_points', []),
                sentiment=parsed.get('sentiment'),
                credibility_score=parsed.get('credibility_score')
            )
            return summary

        except Exception as e:
            self.logger.error(f"Error in final synthesis: {e}")
            # Fallback: create a basic summary
            return self._create_basic_summary_from_chunks(chunk_summaries, document)

    async def _get_llm_response(self, prompt: str, task_type: str) -> tuple:
        """Get an LLM response for a specific task."""
        try:
            response = await self.llm_manager.get_completion(
                prompt,
                system_prompt=SYSTEM_PROMPTS['summarizer'],
                temperature=0.3,
                max_tokens=2000
            )
            return task_type, response
        except Exception as e:
            self.logger.error(f"LLM error for {task_type}: {e}")
            return task_type, f"Error: {str(e)}"

    def _parse_detailed_analysis(self, content: str) -> Dict[str, Any]:
        """Parse the detailed analysis to extract its components."""
        # Simplified implementation; refine to match the actual response format
        result = {'summary': content, 'key_points': []}

        # Look for key points (format: "- key point" or "• key point")
        key_point_pattern = r'^[-•]\s*(.+)$'
        lines = content.split('\n')

        current_key_points = []
        for line in lines:
            match = re.match(key_point_pattern, line.strip())
            if match:
                point_text = match.group(1).strip()
                if len(point_text) > 10:  # Filter out points that are too short
                    key_point = KeyPoint(
                        title=(point_text[:50] + "...") if len(point_text) > 50 else point_text,
                        content=point_text,
                        importance=0.8,  # Default score
                        category="general"
                    )
                    current_key_points.append(key_point)

        result['key_points'] = current_key_points
        return result
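
    # Illustrative behavior of _parse_detailed_analysis: the input
    #
    #     "Overview...\n- Renewable capacity grew 10% year over year\n- ok"
    #
    # yields one KeyPoint (the "- ok" bullet is dropped by the >10-character
    # filter), titled with the bullet text, truncated at 50 characters.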

    def _parse_sentiment_analysis(self, content: str) -> Dict[str, Any]:
        """Parse the sentiment analysis."""
        result = {}

        # Simplified extraction. The French literals below are kept on purpose:
        # they match the French prompt templates in config.prompts.
        content_lower = content.lower()
        if 'positif' in content_lower:
            result['sentiment'] = 'positif'
        elif 'négatif' in content_lower:
            result['sentiment'] = 'négatif'
        else:
            result['sentiment'] = 'neutre'

        # Look for a credibility score such as "Crédibilité: 0.8", "8/10" or "80%"
        credibility_pattern = (
            r'crédibilité\s*:?\s*(\d+(?:\.\d+)?)'
            r'|(\d+(?:\.\d+)?)\s*/\s*(10|5)'
            r'|(\d+(?:\.\d+)?)\s*%'
        )
        match = re.search(credibility_pattern, content_lower)
        if match:
            if match.group(2):  # "X/10" or "X/5": divide by the stated scale
                score = float(match.group(2)) / float(match.group(3))
            elif match.group(4):  # "X%": convert from a percentage
                score = float(match.group(4)) / 100
            else:  # "crédibilité: X", possibly on a 0-100 scale
                score = float(match.group(1))
                if score > 1:
                    score = score / 100
            result['credibility_score'] = min(max(score, 0.0), 1.0)
        else:
            result['credibility_score'] = 0.5  # Default value

        return result
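
    # Illustrative inputs (against the French prompt output format):
    #   "Ton globalement positif. Crédibilité: 0.8" -> 'positif', score 0.8
    #   "Analyse neutre, fiabilité estimée à 8/10"  -> 'neutre', score 0.8
    #   "Fiabilité: 70%"                            -> 'neutre', score 0.7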

    def _parse_synthesis_result(self, content: str) -> Dict[str, Any]:
        """Parse the synthesis result."""
        # Simplified version; refine to match the actual response format
        return {
            'executive_summary': (content[:200] + "...") if len(content) > 200 else content,
            'detailed_summary': content,
            'key_points': [],
            'sentiment': 'neutre',
            'credibility_score': 0.7
        }

    def _create_basic_summary_from_chunks(
        self,
        chunk_summaries: List[str],
        document: Document
    ) -> DocumentSummary:
        """Create a basic summary from the chunk summaries."""
        combined = " ".join(chunk_summaries)

        return DocumentSummary(
            document_id="",
            title=document.title,
            url=document.url,
            executive_summary=(combined[:200] + "...") if len(combined) > 200 else combined,
            detailed_summary=combined,
            key_points=[],
            sentiment="neutre",
            credibility_score=0.5
        )

    def _create_error_summary(self, document: Document, error: str) -> DocumentSummary:
        """Create a minimal error summary."""
        return DocumentSummary(
            document_id=self._generate_document_id(document),
            title=document.title,
            url=document.url,
            executive_summary=f"Error while summarizing: {error}",
            detailed_summary=f"The summary for this document could not be generated: {error}",
            key_points=[],
            sentiment=None,
            credibility_score=None
        )

    def _generate_document_id(self, document: Document) -> str:
        """Generate a unique ID for a document."""
        content_hash = hashlib.md5(f"{document.url}{document.title}".encode()).hexdigest()
        return f"doc_{content_hash[:8]}"

    async def _perform_global_analysis(self, summaries: List[DocumentSummary]) -> Dict[str, List[str]]:
        """Run a global analysis across all the summaries."""
        if len(summaries) < 2:
            return {'common_themes': [], 'consensus_points': [], 'conflicting_views': []}

        # Combine all the summaries for the analysis
        all_summaries = "\n\n".join([
            f"Document: {s.title}\nSummary: {s.detailed_summary}"
            for s in summaries
        ])

        # Global analysis prompt
        global_prompt = f"""
        Analyze the following document summaries and identify:

        1. **Common themes**: topics that recur across several documents
        2. **Consensus points**: ideas the sources agree on
        3. **Conflicting points**: ideas that contradict each other across sources

        SUMMARIES:
        {all_summaries}

        Format your answer with clear sections and bulleted lists.
        """

        try:
            response = await self.llm_manager.get_completion(
                global_prompt,
                system_prompt="You are an expert in comparative document analysis."
            )
            # Parse the response (simplified implementation)
            return self._parse_global_analysis(response)
        except Exception as e:
            self.logger.error(f"Error in global analysis: {e}")
            return {'common_themes': [], 'consensus_points': [], 'conflicting_views': []}

    def _parse_global_analysis(self, content: str) -> Dict[str, List[str]]:
        """Parse the global analysis."""
        # Simplified implementation
        lines = content.split('\n')
        result = {
            'common_themes': [],
            'consensus_points': [],
            'conflicting_views': []
        }

        current_section = None
        for line in lines:
            line = line.strip()
            if not line:
                continue

            # Detect section headers
            if 'thème' in line.lower() or 'theme' in line.lower():
                current_section = 'common_themes'
            elif 'consensus' in line.lower():
                current_section = 'consensus_points'
            elif 'conflict' in line.lower() or 'contradictoire' in line.lower():
                current_section = 'conflicting_views'
            elif line.startswith('-') or line.startswith('•'):
                # Bullet item
                if current_section:
                    point = line[1:].strip()
                    if len(point) > 5:  # Filter out items that are too short
                        result[current_section].append(point)

        return result
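
    # Illustrative parse: given
    #
    #     "1. **Common themes**\n- Energy transition\n2. **Consensus points**\n- Costs are falling"
    #
    # the parser returns {'common_themes': ['Energy transition'],
    # 'consensus_points': ['Costs are falling'], 'conflicting_views': []}.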

    def _calculate_average_credibility(self, summaries: List[DocumentSummary]) -> Optional[float]:
        """Compute the average credibility score."""
        scores = [s.credibility_score for s in summaries if s.credibility_score is not None]
        if not scores:
            return None
        return sum(scores) / len(scores)

    async def process_from_extraction_result(
        self,
        extraction_result: 'ExtractionResult'
    ) -> SummarizationOutput:
        """
        Run summarization from an ExtractionResult.

        Args:
            extraction_result: Result of the content extraction step

        Returns:
            SummarizationOutput with all the summaries
        """
        # Prepare the summarization input
        input_data = SummarizationInput(
            documents=extraction_result.documents,
            summary_options={
                'include_sentiment': True,
                'include_citations': True,
                'max_key_points': 5,
                'detailed_analysis': True,
                'chunk_large_docs': True
            }
        )

        if not self.validate_input(input_data):
            self.logger.error("Invalid ExtractionResult input for summarization")
            raise ValueError("Invalid ExtractionResult input for summarization")

        # Run the main summarization process
        return await self.process(input_data)


# Usage example
if __name__ == "__main__":
    import json
    from src.models.document_models import ExtractionResult

    def save_summarization_output(output, filename=None):
        """Save a SummarizationOutput as JSON."""
        if not filename:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"summarization_output_{len(output.summaries)}docs_{timestamp}.json"
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(output.model_dump(mode='json'), f, indent=2, ensure_ascii=False)
            return filename
        except Exception as e:
            print(f"Error while saving: {e}")
            return None

    async def summarize_from_extraction_file():
        # Load the ExtractionResult
        extraction_file = "extraction_result_2docs_20251116_141527.json"
        try:
            with open(extraction_file, 'r', encoding='utf-8') as f:
                extraction_data = json.load(f)
            extraction_result = ExtractionResult(**extraction_data)
        except Exception as e:
            print(f"Error loading ExtractionResult: {e}")
            return

        summarizer = SummarizerAgent()
        output = await summarizer.process_from_extraction_result(extraction_result)

        # Save the output
        filename = save_summarization_output(output)
        if filename:
            print(f"✅ Summaries saved to: {filename}")
        else:
            print("❌ Error while saving the summaries.")

        # Quick display
        for summary in output.summaries:
            print(f"\nSummary for {summary.title}:")
            print(f"Executive summary: {summary.executive_summary[:200]}...")
            print(f"Key points: {[kp.title for kp in summary.key_points]}")
            print(f"Sentiment: {summary.sentiment}")
            print(f"Credibility score: {summary.credibility_score}")

        print(f"Total processing time: {output.total_processing_time:.2f}s")
        print(f"Average credibility score: {output.average_credibility}")

    asyncio.run(summarize_from_extraction_file())
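
# To try this example (module path assumed from the imports above):
#     python -m src.agents.summarizer_agent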