""" Agent Content Extractor - Extraction et nettoyage de contenu web. Extrait le contenu de pages web, PDFs et autres documents. """ import asyncio from typing import List, Optional from datetime import datetime from src.agents.base_agent import BaseAgent from src.models.document_models import Document, ExtractionInput, ExtractionResult from src.models.research_models import ResearchOutput from src.models.state_models import AgentState, AgentType from src.services.content_extraction import ContentExtractionManager, ContentExtractionError from src.core.logging import setup_logger class ContentExtractorAgent(BaseAgent[ExtractionInput, ExtractionResult]): """ Agent responsable de l'extraction de contenu depuis des URLs. Fonctionnalités: - Extraction de contenu HTML avec nettoyage intelligent - Support des PDFs et autres formats - Traitement parallèle de plusieurs URLs - Gestion des erreurs et retry automatique - Structuration et nettoyage du contenu """ def __init__(self, max_concurrent_extractions: int = 5, max_retries: int = 2): super().__init__( agent_type=AgentType.CONTENT_EXTRACTOR, name="content_extractor", max_retries=max_retries, timeout=300.0 # 5 minutes ) self.extraction_manager = ContentExtractionManager( max_concurrent=max_concurrent_extractions, max_retries=max_retries ) def validate_input(self, input_data: ExtractionInput) -> bool: """ Valide les données d'entrée pour l'extraction. Args: input_data: Input contenant les URLs à extraire Returns: True si les données sont valides """ if not input_data.urls: self.logger.error("Aucune URL fournie pour l'extraction") return False if len(input_data.urls) > 50: # Limite raisonnable self.logger.error(f"Trop d'URLs ({len(input_data.urls)}), maximum 50") return False # Filtrer les URLs valides valid_urls = self._filter_valid_urls(input_data.urls) if not valid_urls: self.logger.error("Aucune URL valide trouvée") return False return True async def process_from_research_output(self, research_output: ResearchOutput) -> ExtractionResult: """ Traite directement un ResearchOutput pour extraire le contenu des URLs. Args: research_output: Résultats de recherche avec URLs à extraire Returns: ExtractionResult avec les documents extraits """ # Extraire les URLs des résultats de recherche (conversion en string) urls = [str(result.url) for result in research_output.results] self.logger.info(f"Extraction de contenu depuis ResearchOutput: {len(urls)} URLs") self.logger.info(f"Sujet de recherche: {research_output.query.topic}") # Créer l'input d'extraction extraction_input = ExtractionInput( urls=urls, content_filters={ 'min_content_length': 200, # Minimum de contenu 'max_content_length': 50000, # Maximum pour éviter les textes trop longs 'required_keywords': research_output.query.keywords # Filtrer par mots-clés de recherche }, extraction_options={ 'source_query': research_output.query.topic, 'search_keywords': research_output.query.keywords } ) # Traiter avec la méthode normale return await self.process(extraction_input) async def process(self, input_data: ExtractionInput) -> ExtractionResult: """ Exécute l'extraction de contenu pour les URLs fournies. Args: input_data: Input contenant les URLs à extraire et les options Returns: ExtractionResult avec les documents extraits Raises: ValueError: Si les URLs sont invalides ContentExtractionError: Si l'extraction échoue """ start_time = datetime.now() self.logger.info(f"Début extraction de contenu pour {len(input_data.urls)} URLs") # Filtrer les URLs valides (validation déjà faite dans validate_input) valid_urls = self._filter_valid_urls(input_data.urls) self.logger.info(f"URLs valides à traiter: {len(valid_urls)}/{len(input_data.urls)}") try: # Extraction du contenu documents = await self._extract_all_content(valid_urls, input_data) # Post-traitement des documents processed_documents = self._post_process_documents(documents, input_data) # Calcul des statistiques execution_time = (datetime.now() - start_time).total_seconds() # Identifier les URLs qui ont échoué successful_urls = {str(doc.url) for doc in processed_documents} failed_urls = [url for url in valid_urls if url not in successful_urls] # Création du résultat result = ExtractionResult( documents=processed_documents, total_urls=len(input_data.urls), successful_extractions=len(processed_documents), failed_extractions=len(input_data.urls) - len(processed_documents), failed_urls=failed_urls, execution_time=execution_time, extraction_stats=self._calculate_stats(processed_documents) ) self.logger.info( f"Extraction terminée: {result.successful_extractions}/{result.total_urls} " f"succès en {execution_time:.2f}s" ) return result except Exception as e: self.logger.error(f"Erreur lors de l'extraction: {str(e)}") raise ContentExtractionError(f"Échec de l'extraction de contenu: {str(e)}") def _filter_valid_urls(self, urls: List[str]) -> List[str]: """Filtre et valide les URLs.""" import re from urllib.parse import urlparse valid_urls = [] url_pattern = re.compile( r'^https?://' # http:// ou https:// r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain r'localhost|' # localhost r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # IP r'(?::\d+)?' # port r'(?:/?|[/?]\S+)$', re.IGNORECASE) for url in urls: if not url or not isinstance(url, str): self.logger.warning(f"URL invalide ignorée: {url}") continue url = url.strip() if not url: continue # Validation du format if not url_pattern.match(url): self.logger.warning(f"Format URL invalide: {url}") continue # Validation avec urlparse try: parsed = urlparse(url) if not parsed.netloc: self.logger.warning(f"URL sans domaine: {url}") continue valid_urls.append(url) except Exception as e: self.logger.warning(f"Erreur de parsing URL {url}: {e}") continue return valid_urls async def _extract_all_content(self, urls: List[str], input_data: ExtractionInput) -> List[Document]: """Extrait le contenu de toutes les URLs.""" try: # Utiliser le gestionnaire d'extraction documents = await self.extraction_manager.extract_multiple(urls) # Appliquer les filtres si spécifiés if input_data.content_filters: documents = self._apply_content_filters(documents, input_data.content_filters) return documents except Exception as e: self.logger.error(f"Erreur lors de l'extraction multiple: {str(e)}") raise def _apply_content_filters(self, documents: List[Document], filters: dict) -> List[Document]: """Applique les filtres de contenu aux documents.""" filtered_documents = [] for doc in documents: # Filtrer par longueur minimale min_length = filters.get('min_content_length', 100) if len(doc.content) < min_length: self.logger.debug(f"Document {doc.title} trop court: {len(doc.content)} caractères") continue # Filtrer par longueur maximale max_length = filters.get('max_content_length', 100000) if len(doc.content) > max_length: self.logger.debug(f"Document {doc.title} trop long, troncature") doc.content = doc.content[:max_length] + "... [Contenu tronqué]" # Filtrer par langue si spécifiée required_language = filters.get('language') if required_language and doc.language != required_language: self.logger.debug(f"Document {doc.title} ignoré: langue {doc.language}") continue # Filtrer par mots-clés si spécifiés required_keywords = filters.get('required_keywords', []) if required_keywords: content_lower = doc.content.lower() if not any(keyword.lower() in content_lower for keyword in required_keywords): self.logger.debug(f"Document {doc.title} ignoré: mots-clés manquants") continue filtered_documents.append(doc) self.logger.info(f"Filtres appliqués: {len(filtered_documents)}/{len(documents)} documents retenus") return filtered_documents def _post_process_documents(self, documents: List[Document], input_data: ExtractionInput) -> List[Document]: """Post-traitement des documents extraits.""" processed_docs = [] for doc in documents: # Nettoyage supplémentaire du contenu doc.content = self._clean_content(doc.content) # Recalcul du nombre de mots après nettoyage doc.word_count = len(doc.content.split()) # Validation finale if self._is_valid_document(doc, input_data): processed_docs.append(doc) else: self.logger.debug(f"Document {doc.title} rejeté lors de la validation finale") return processed_docs def _clean_content(self, content: str) -> str: """Nettoyage avancé du contenu.""" import re if not content: return "" # Supprimer les caractères de contrôle content = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', content) # Normaliser les espaces content = re.sub(r'[ \t]+', ' ', content) # Normaliser les sauts de ligne content = re.sub(r'\n\s*\n\s*\n+', '\n\n', content) # Supprimer les espaces en début et fin de lignes lines = content.split('\n') lines = [line.strip() for line in lines] content = '\n'.join(lines) # Supprimer les lignes vides multiples content = re.sub(r'\n{3,}', '\n\n', content) return content.strip() def _is_valid_document(self, doc: Document, input_data: ExtractionInput) -> bool: """Valide un document extrait.""" # Vérifications de base if not doc.content or not doc.content.strip(): return False if len(doc.content) < 50: # Contenu trop court return False # Vérification du ratio texte/contenu (détecter les pages avec peu de contenu) if doc.word_count < 20: return False # Vérifications spécifiques aux options d'entrée if hasattr(input_data, 'min_quality_score'): quality_score = self._calculate_content_quality(doc) if quality_score < input_data.min_quality_score: return False return True def _calculate_content_quality(self, doc: Document) -> float: """Calcule un score de qualité pour le contenu (0-1).""" score = 0.0 # Points pour la longueur if doc.word_count > 100: score += 0.3 elif doc.word_count > 50: score += 0.1 # Points pour la structure if doc.title and len(doc.title) > 10: score += 0.2 if doc.author: score += 0.1 if doc.published_date: score += 0.1 # Points pour la richesse du contenu content = doc.content.lower() if any(marker in content for marker in ['conclusion', 'introduction', 'sommaire']): score += 0.2 # Pénalité pour contenu répétitif lines = doc.content.split('\n') unique_lines = set(line.strip() for line in lines if line.strip()) if len(lines) > 0: uniqueness_ratio = len(unique_lines) / len(lines) if uniqueness_ratio < 0.5: score -= 0.2 return max(0.0, min(1.0, score)) def _calculate_stats(self, documents: List[Document]) -> dict: """Calcule les statistiques d'extraction.""" if not documents: return { 'total_words': 0, 'average_words_per_doc': 0, 'doc_types': {}, 'languages': {}, 'has_authors': 0, 'has_dates': 0 } total_words = sum(doc.word_count for doc in documents) # Compter les types de documents doc_types = {} for doc in documents: doc_type = doc.doc_type.value if doc.doc_type else 'unknown' doc_types[doc_type] = doc_types.get(doc_type, 0) + 1 # Compter les langues languages = {} for doc in documents: lang = doc.language or 'unknown' languages[lang] = languages.get(lang, 0) + 1 # Compter les métadonnées has_authors = sum(1 for doc in documents if doc.author) has_dates = sum(1 for doc in documents if doc.published_date) return { 'total_words': total_words, 'average_words_per_doc': total_words // len(documents), 'doc_types': doc_types, 'languages': languages, 'has_authors': has_authors, 'has_dates': has_dates } # Fonction utilitaire pour les tests async def extract_content_from_urls(urls: List[str], **options) -> List[Document]: """ Fonction utilitaire pour extraire du contenu depuis une liste d'URLs. Args: urls: Liste des URLs à extraire **options: Options d'extraction (filters, etc.) Returns: Liste des documents extraits """ agent = ContentExtractorAgent() input_data = ExtractionInput( urls=urls, content_filters=options.get('content_filters', {}), extraction_options=options.get('extraction_options', {}) ) result = await agent.execute(input_data) return result.documents # Fonction utilitaire pour l'intégration avec le Researcher async def extract_from_search_results(search_results: List[dict]) -> List[Document]: """ Extrait le contenu depuis des résultats de recherche. Args: search_results: Résultats de recherche avec URLs Returns: Liste des documents extraits """ urls = [] for result in search_results: if isinstance(result, dict) and 'url' in result: urls.append(result['url']) elif hasattr(result, 'url'): urls.append(result.url) if not urls: return [] return await extract_content_from_urls(urls) # Fonctions utilitaires pour la sauvegarde def save_extraction_result(result: ExtractionResult, filename: str = None) -> str: """ Sauvegarde un ExtractionResult dans un fichier JSON. Args: result: Résultat d'extraction à sauvegarder filename: Nom du fichier (optionnel) Returns: Nom du fichier sauvegardé """ import json from datetime import datetime if not filename: # Générer un nom de fichier basé sur le nombre de documents et timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"extraction_result_{result.successful_extractions}docs_{timestamp}.json" try: # Conversion en dictionnaire avec sérialisation des dates result_dict = result.model_dump(mode='json') # Sauvegarde dans le fichier with open(filename, 'w', encoding='utf-8') as f: json.dump(result_dict, f, indent=2, ensure_ascii=False) return filename except Exception as e: raise Exception(f"Erreur lors de la sauvegarde: {e}") def load_extraction_result(filename: str) -> ExtractionResult: """ Charge un ExtractionResult depuis un fichier JSON. Args: filename: Nom du fichier à charger Returns: ExtractionResult chargé """ import json try: with open(filename, 'r', encoding='utf-8') as f: data = json.load(f) # Reconstruction de l'ExtractionResult return ExtractionResult(**data) except Exception as e: raise Exception(f"Erreur lors du chargement: {e}") # Configuration du logger pour l'agent logger = setup_logger("ContentExtractorAgent") # Exemple d'utilisation if __name__ == "__main__": import asyncio import json from src.models.research_models import ResearchOutput async def test_with_research_output(): """Test avec un fichier ResearchOutput sauvegardé.""" # Charger le ResearchOutput depuis le fichier JSON le plus récent research_file = "research_output_impact_de_lintelligence_artifi_20251116_141136.json" try: # Charger le ResearchOutput with open(research_file, 'r', encoding='utf-8') as f: research_data = json.load(f) research_output = ResearchOutput(**research_data) logger.info(f"=== CHARGEMENT DU RESEARCH OUTPUT ===") logger.info(f"Sujet: {research_output.query.topic}") logger.info(f"URLs à extraire: {len(research_output.results)}") # Créer l'agent et traiter agent = ContentExtractorAgent() logger.info(f"=== DÉBUT DE L'EXTRACTION DE CONTENU ===") extraction_result = await agent.process_from_research_output(research_output) logger.info(f"=== RÉSULTATS D'EXTRACTION ===") logger.info(f"URLs traitées: {extraction_result.total_urls}") logger.info(f"Extractions réussies: {extraction_result.successful_extractions}") logger.info(f"Extractions échouées: {extraction_result.failed_extractions}") logger.info(f"Temps d'exécution: {extraction_result.execution_time:.2f}s") # Afficher les détails des documents extraits for i, doc in enumerate(extraction_result.documents, 1): logger.info(f"\n{i}. {doc.title}") logger.info(f" URL: {doc.url}") logger.info(f" Mots: {doc.word_count}") logger.info(f" Langue: {doc.language}") logger.info(f" Type: {doc.doc_type}") logger.info(f" Contenu (aperçu): {doc.content[:200]}...") # URLs qui ont échoué if extraction_result.failed_urls: logger.info(f"\n❌ URLs en échec:") for url in extraction_result.failed_urls: logger.info(f" • {url}") # === SAUVEGARDE DE L'EXTRACTION RESULT === logger.info(f"\n=== SAUVEGARDE DE L'EXTRACTION RESULT ===") try: filename = save_extraction_result(extraction_result) logger.info(f"✅ ExtractionResult sauvegardé dans: {filename}") # Affichage du contenu sauvegardé logger.info("📄 Contenu sauvegardé:") logger.info(f" • Documents extraits: {len(extraction_result.documents)}") logger.info(f" • Temps d'extraction: {extraction_result.execution_time:.2f}s") logger.info(f" • Statistiques: {extraction_result.extraction_stats}") # Test de chargement pour vérifier l'intégrité logger.info("=== Test de chargement ===") loaded_result = load_extraction_result(filename) logger.info(f"✅ ExtractionResult rechargé avec succès") logger.info(f" • Vérification: {len(loaded_result.documents)} documents chargés") # Comparaison des données if loaded_result.successful_extractions == extraction_result.successful_extractions: logger.info("✅ Intégrité des données vérifiée") else: logger.error("❌ Erreur d'intégrité des données") # Affichage du format JSON pour référence logger.info("\n📋 EXEMPLE DE FORMAT JSON SAUVEGARDÉ:") logger.info("-" * 50) # Créer un exemple compact pour l'affichage example_result = { "documents": [ { "title": doc.title, "url": str(doc.url), "content": doc.content[:200] + "...", "word_count": doc.word_count, "language": doc.language, "doc_type": doc.doc_type.value if doc.doc_type else None } for doc in extraction_result.documents[:2] # Limiter à 2 documents ], "total_urls": extraction_result.total_urls, "successful_extractions": extraction_result.successful_extractions, "failed_extractions": extraction_result.failed_extractions, "failed_urls": extraction_result.failed_urls, "execution_time": extraction_result.execution_time, "extraction_stats": extraction_result.extraction_stats } print(json.dumps(example_result, indent=2, ensure_ascii=False)) except Exception as save_error: logger.error(f"❌ Erreur lors de la sauvegarde: {save_error}") except FileNotFoundError: logger.error(f"❌ Fichier ResearchOutput non trouvé: {research_file}") logger.info("Utilisation de l'exemple avec URLs directes...") await test_with_direct_urls() except Exception as e: logger.error(f"❌ Erreur lors du traitement: {e}") async def test_with_direct_urls(): """Test avec des URLs directes.""" urls = [ 'https://www.iana.org/help/example-domains', ] logger.info(f"=== TEST AVEC URLS DIRECTES ===") documents = await extract_content_from_urls(urls) for doc in documents: logger.info(f"Title: {doc.title}, URL: {doc.url}, Word Count: {doc.word_count}, Language: {doc.language}, Content Length: {len(doc.content)}") # Choisir le test à exécuter import sys if len(sys.argv) > 1 and sys.argv[1] == "--direct": asyncio.run(test_with_direct_urls()) else: asyncio.run(test_with_research_output())