""" |
|
|
Agent Content Extractor - Extraction et nettoyage de contenu web. |
|
|
Extrait le contenu de pages web, PDFs et autres documents. |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
from typing import List, Optional |
|
|
from datetime import datetime |
|
|
|
|
|
from src.agents.base_agent import BaseAgent |
|
|
from src.models.document_models import Document, ExtractionInput, ExtractionResult |
|
|
from src.models.research_models import ResearchOutput |
|
|
from src.models.state_models import AgentState, AgentType |
|
|
from src.services.content_extraction import ContentExtractionManager, ContentExtractionError |
|
|
from src.core.logging import setup_logger |
|
|
|
|
|
|
|
|
class ContentExtractorAgent(BaseAgent[ExtractionInput, ExtractionResult]):
    """
    Agent responsible for extracting content from URLs.

    Features:
    - HTML content extraction with intelligent cleanup
    - Support for PDFs and other formats
    - Parallel processing of multiple URLs
    - Error handling and automatic retries
    - Content structuring and cleaning
    """

    def __init__(self, max_concurrent_extractions: int = 5, max_retries: int = 2):
        super().__init__(
            agent_type=AgentType.CONTENT_EXTRACTOR,
            name="content_extractor",
            max_retries=max_retries,
            timeout=300.0
        )
        self.extraction_manager = ContentExtractionManager(
            max_concurrent=max_concurrent_extractions,
            max_retries=max_retries
        )

    def validate_input(self, input_data: ExtractionInput) -> bool:
        """
        Validate the input data for extraction.

        Args:
            input_data: Input containing the URLs to extract

        Returns:
            True if the data is valid
        """
        if not input_data.urls:
            self.logger.error("No URL provided for extraction")
            return False

        if len(input_data.urls) > 50:
            self.logger.error(f"Too many URLs ({len(input_data.urls)}), maximum is 50")
            return False

        valid_urls = self._filter_valid_urls(input_data.urls)
        if not valid_urls:
            self.logger.error("No valid URL found")
            return False

        return True

    async def process_from_research_output(self, research_output: ResearchOutput) -> ExtractionResult:
        """
        Process a ResearchOutput directly and extract content from its URLs.

        Args:
            research_output: Research results containing the URLs to extract

        Returns:
            ExtractionResult with the extracted documents
        """
        urls = [str(result.url) for result in research_output.results]

        self.logger.info(f"Extracting content from ResearchOutput: {len(urls)} URLs")
        self.logger.info(f"Research topic: {research_output.query.topic}")

        extraction_input = ExtractionInput(
            urls=urls,
            content_filters={
                'min_content_length': 200,
                'max_content_length': 50000,
                'required_keywords': research_output.query.keywords
            },
            extraction_options={
                'source_query': research_output.query.topic,
                'search_keywords': research_output.query.keywords
            }
        )

        return await self.process(extraction_input)

    async def process(self, input_data: ExtractionInput) -> ExtractionResult:
        """
        Run content extraction for the provided URLs.

        Args:
            input_data: Input containing the URLs to extract and the extraction options

        Returns:
            ExtractionResult with the extracted documents

        Raises:
            ValueError: If the URLs are invalid
            ContentExtractionError: If the extraction fails
        """
        start_time = datetime.now()
        self.logger.info(f"Starting content extraction for {len(input_data.urls)} URLs")

        valid_urls = self._filter_valid_urls(input_data.urls)
        self.logger.info(f"Valid URLs to process: {len(valid_urls)}/{len(input_data.urls)}")

        try:
            # Extract the content of all valid URLs in parallel.
            documents = await self._extract_all_content(valid_urls, input_data)

            # Clean, recount and validate the extracted documents.
            processed_documents = self._post_process_documents(documents, input_data)

            execution_time = (datetime.now() - start_time).total_seconds()

            successful_urls = {str(doc.url) for doc in processed_documents}
            failed_urls = [url for url in valid_urls if url not in successful_urls]

            result = ExtractionResult(
                documents=processed_documents,
                total_urls=len(input_data.urls),
                successful_extractions=len(processed_documents),
                failed_extractions=len(input_data.urls) - len(processed_documents),
                failed_urls=failed_urls,
                execution_time=execution_time,
                extraction_stats=self._calculate_stats(processed_documents)
            )

            self.logger.info(
                f"Extraction finished: {result.successful_extractions}/{result.total_urls} "
                f"succeeded in {execution_time:.2f}s"
            )

            return result

        except Exception as e:
            self.logger.error(f"Error during extraction: {str(e)}")
            raise ContentExtractionError(f"Content extraction failed: {str(e)}") from e

    def _filter_valid_urls(self, urls: List[str]) -> List[str]:
        """Filter and validate the URLs."""
        import re
        from urllib.parse import urlparse

        valid_urls = []
        url_pattern = re.compile(
            r'^https?://'
            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'
            r'localhost|'
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
            r'(?::\d+)?'
            r'(?:/?|[/?]\S+)$', re.IGNORECASE)

        for url in urls:
            if not url or not isinstance(url, str):
                self.logger.warning(f"Invalid URL ignored: {url}")
                continue

            url = url.strip()
            if not url:
                continue

            if not url_pattern.match(url):
                self.logger.warning(f"Invalid URL format: {url}")
                continue

            try:
                parsed = urlparse(url)
                if not parsed.netloc:
                    self.logger.warning(f"URL without a domain: {url}")
                    continue

                valid_urls.append(url)

            except Exception as e:
                self.logger.warning(f"Error while parsing URL {url}: {e}")
                continue

        return valid_urls

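    # Illustration (hedged sketch): how the validation above treats a few sample
    # inputs. The URLs are hypothetical and only show the regex behaviour:
    #
    #   _filter_valid_urls([
    #       "https://example.com/article?id=1",   # kept: scheme + domain + path
    #       "http://localhost:8000/docs",         # kept: localhost with a port
    #       "example.com/article",                # dropped: missing http(s) scheme
    #       "ftp://example.com/file.pdf",         # dropped: unsupported scheme
    #   ])
    #   # -> ["https://example.com/article?id=1", "http://localhost:8000/docs"]
    #
    # Note: the TLD group only accepts 2-6 letters, so longer TLDs (e.g. ".software")
    # are rejected by this pattern.
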
    async def _extract_all_content(self, urls: List[str], input_data: ExtractionInput) -> List[Document]:
        """Extract the content of all URLs."""
        try:
            # Delegate the parallel extraction to the extraction manager.
            documents = await self.extraction_manager.extract_multiple(urls)

            # Apply the optional content filters (length, language, keywords).
            if input_data.content_filters:
                documents = self._apply_content_filters(documents, input_data.content_filters)

            return documents

        except Exception as e:
            self.logger.error(f"Error during batch extraction: {str(e)}")
            raise

    def _apply_content_filters(self, documents: List[Document], filters: dict) -> List[Document]:
        """Apply the content filters to the documents."""
        filtered_documents = []

        for doc in documents:
            # Minimum length filter.
            min_length = filters.get('min_content_length', 100)
            if len(doc.content) < min_length:
                self.logger.debug(f"Document {doc.title} too short: {len(doc.content)} characters")
                continue

            # Maximum length filter (truncate instead of dropping).
            max_length = filters.get('max_content_length', 100000)
            if len(doc.content) > max_length:
                self.logger.debug(f"Document {doc.title} too long, truncating")
                doc.content = doc.content[:max_length] + "... [Content truncated]"

            # Language filter.
            required_language = filters.get('language')
            if required_language and doc.language != required_language:
                self.logger.debug(f"Document {doc.title} ignored: language {doc.language}")
                continue

            # Keyword filter: at least one required keyword must be present.
            required_keywords = filters.get('required_keywords', [])
            if required_keywords:
                content_lower = doc.content.lower()
                if not any(keyword.lower() in content_lower for keyword in required_keywords):
                    self.logger.debug(f"Document {doc.title} ignored: missing keywords")
                    continue

            filtered_documents.append(doc)

        self.logger.info(f"Filters applied: {len(filtered_documents)}/{len(documents)} documents kept")
        return filtered_documents

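    # Illustration (hedged sketch): shape of the `filters` dict consumed above.
    # The values are example figures, not project defaults (the only fallbacks in
    # the code are 100 / 100000 characters and an empty keyword list):
    #
    #   filters = {
    #       'min_content_length': 200,               # drop documents shorter than 200 chars
    #       'max_content_length': 50000,             # truncate documents longer than 50000 chars
    #       'language': 'fr',                        # keep only documents detected as French
    #       'required_keywords': ['IA', 'emploi'],   # at least one keyword must appear
    #   }
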
    def _post_process_documents(self, documents: List[Document], input_data: ExtractionInput) -> List[Document]:
        """Post-process the extracted documents."""
        processed_docs = []

        for doc in documents:
            # Clean the raw content (control characters, whitespace, blank lines).
            doc.content = self._clean_content(doc.content)

            # Recompute the word count after cleaning.
            doc.word_count = len(doc.content.split())

            # Keep only documents that pass the final validation.
            if self._is_valid_document(doc, input_data):
                processed_docs.append(doc)
            else:
                self.logger.debug(f"Document {doc.title} rejected during final validation")

        return processed_docs

    def _clean_content(self, content: str) -> str:
        """Advanced content cleanup."""
        import re

        if not content:
            return ""

        # Remove control characters (tab, newline and carriage return are kept).
        content = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', content)

        # Collapse runs of spaces and tabs.
        content = re.sub(r'[ \t]+', ' ', content)

        # Collapse sequences of blank lines into a single blank line.
        content = re.sub(r'\n\s*\n\s*\n+', '\n\n', content)

        # Strip leading/trailing whitespace on every line.
        lines = content.split('\n')
        lines = [line.strip() for line in lines]
        content = '\n'.join(lines)

        # Collapse any blank-line runs reintroduced by the stripping above.
        content = re.sub(r'\n{3,}', '\n\n', content)

        return content.strip()

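    # Illustration (hedged sketch): effect of _clean_content on a small messy string.
    #
    #   raw = "Title\x00  with   noise\n\n\n\n   Body  text \t here  \n"
    #   _clean_content(raw)
    #   # -> "Title with noise\n\nBody text here"
    #
    # Control characters are removed, runs of spaces/tabs collapse to one space,
    # and multiple blank lines collapse to a single blank line.
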
    def _is_valid_document(self, doc: Document, input_data: ExtractionInput) -> bool:
        """Validate an extracted document."""
        # Reject empty content.
        if not doc.content or not doc.content.strip():
            return False

        # Reject documents that are too short.
        if len(doc.content) < 50:
            return False

        if doc.word_count < 20:
            return False

        # Optional quality threshold.
        if hasattr(input_data, 'min_quality_score'):
            quality_score = self._calculate_content_quality(doc)
            if quality_score < input_data.min_quality_score:
                return False

        return True

    def _calculate_content_quality(self, doc: Document) -> float:
        """Compute a quality score for the content (0-1)."""
        score = 0.0

        # Content length.
        if doc.word_count > 100:
            score += 0.3
        elif doc.word_count > 50:
            score += 0.1

        # Presence of a meaningful title.
        if doc.title and len(doc.title) > 10:
            score += 0.2

        # Available metadata.
        if doc.author:
            score += 0.1

        if doc.published_date:
            score += 0.1

        # Structural markers suggesting an organised article.
        content = doc.content.lower()
        if any(marker in content for marker in ['conclusion', 'introduction', 'sommaire']):
            score += 0.2

        # Penalise heavily repetitive content.
        lines = doc.content.split('\n')
        unique_lines = set(line.strip() for line in lines if line.strip())
        if len(lines) > 0:
            uniqueness_ratio = len(unique_lines) / len(lines)
            if uniqueness_ratio < 0.5:
                score -= 0.2

        return max(0.0, min(1.0, score))

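    # Illustration (hedged sketch): worked example of the scoring above for a
    # hypothetical document with 150 words (+0.3), a 40-character title (+0.2),
    # an author (+0.1), a publication date (+0.1) and an "introduction" marker
    # (+0.2), with no repeated lines: score = 0.9. The additive weights cap out
    # at 0.9, so the clamp to 1.0 is only a safety net.
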
    def _calculate_stats(self, documents: List[Document]) -> dict:
        """Compute the extraction statistics."""
        if not documents:
            return {
                'total_words': 0,
                'average_words_per_doc': 0,
                'doc_types': {},
                'languages': {},
                'has_authors': 0,
                'has_dates': 0
            }

        total_words = sum(doc.word_count for doc in documents)

        # Distribution of document types.
        doc_types = {}
        for doc in documents:
            doc_type = doc.doc_type.value if doc.doc_type else 'unknown'
            doc_types[doc_type] = doc_types.get(doc_type, 0) + 1

        # Distribution of detected languages.
        languages = {}
        for doc in documents:
            lang = doc.language or 'unknown'
            languages[lang] = languages.get(lang, 0) + 1

        # Metadata coverage.
        has_authors = sum(1 for doc in documents if doc.author)
        has_dates = sum(1 for doc in documents if doc.published_date)

        return {
            'total_words': total_words,
            'average_words_per_doc': total_words // len(documents),
            'doc_types': doc_types,
            'languages': languages,
            'has_authors': has_authors,
            'has_dates': has_dates
        }


async def extract_content_from_urls(urls: List[str], **options) -> List[Document]:
    """
    Utility function to extract content from a list of URLs.

    Args:
        urls: List of URLs to extract
        **options: Extraction options (content_filters, extraction_options, ...)

    Returns:
        List of extracted documents
    """
    agent = ContentExtractorAgent()

    input_data = ExtractionInput(
        urls=urls,
        content_filters=options.get('content_filters', {}),
        extraction_options=options.get('extraction_options', {})
    )

    result = await agent.execute(input_data)
    return result.documents


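# Illustration (hedged sketch): typical call from synchronous code. The URL and
# filter values below are placeholders, not project defaults.
#
#   import asyncio
#   docs = asyncio.run(extract_content_from_urls(
#       ["https://example.com/article"],
#       content_filters={'min_content_length': 200},
#   ))
#   for doc in docs:
#       print(doc.title, doc.word_count)

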
async def extract_from_search_results(search_results: List[dict]) -> List[Document]:
    """
    Extract content from search results.

    Args:
        search_results: Search results containing URLs

    Returns:
        List of extracted documents
    """
    urls = []
    for result in search_results:
        if isinstance(result, dict) and 'url' in result:
            urls.append(result['url'])
        elif hasattr(result, 'url'):
            urls.append(result.url)

    if not urls:
        return []

    return await extract_content_from_urls(urls)


def save_extraction_result(result: ExtractionResult, filename: Optional[str] = None) -> str:
    """
    Save an ExtractionResult to a JSON file.

    Args:
        result: Extraction result to save
        filename: File name (optional)

    Returns:
        Name of the saved file
    """
    import json
    from datetime import datetime

    if not filename:
        # Build a default file name from the document count and a timestamp.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"extraction_result_{result.successful_extractions}docs_{timestamp}.json"

    try:
        # Serialize the Pydantic model to JSON-compatible data.
        result_dict = result.model_dump(mode='json')

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(result_dict, f, indent=2, ensure_ascii=False)

        return filename

    except Exception as e:
        raise Exception(f"Error while saving: {e}") from e


def load_extraction_result(filename: str) -> ExtractionResult:
    """
    Load an ExtractionResult from a JSON file.

    Args:
        filename: Name of the file to load

    Returns:
        Loaded ExtractionResult
    """
    import json

    try:
        with open(filename, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # Rebuild the Pydantic model from the raw dictionary.
        return ExtractionResult(**data)

    except Exception as e:
        raise Exception(f"Error while loading: {e}") from e


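# Illustration (hedged sketch): save/load round-trip using the two helpers above.
# The path is whatever save_extraction_result returned; nothing here is a fixed file.
#
#   path = save_extraction_result(result)      # writes a JSON file in the working directory
#   restored = load_extraction_result(path)    # rebuilds the ExtractionResult
#   assert restored.successful_extractions == result.successful_extractions

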
logger = setup_logger("ContentExtractorAgent") |
|
|
|
|
|
if __name__ == "__main__": |
|
|
import asyncio |
|
|
import json |
|
|
from src.models.research_models import ResearchOutput |
|
|
|
|
|
async def test_with_research_output(): |
|
|
"""Test avec un fichier ResearchOutput sauvegardé.""" |
|
|
|
|
|
research_file = "research_output_impact_de_lintelligence_artifi_20251116_141136.json" |
|
|
|
|
|
try: |
|
|
|
|
|
with open(research_file, 'r', encoding='utf-8') as f: |
|
|
research_data = json.load(f) |
|
|
|
|
|
research_output = ResearchOutput(**research_data) |
|
|
logger.info(f"=== CHARGEMENT DU RESEARCH OUTPUT ===") |
|
|
logger.info(f"Sujet: {research_output.query.topic}") |
|
|
logger.info(f"URLs à extraire: {len(research_output.results)}") |
|
|
|
|
|
|
|
|
agent = ContentExtractorAgent() |
|
|
|
|
|
logger.info(f"=== DÉBUT DE L'EXTRACTION DE CONTENU ===") |
|
|
extraction_result = await agent.process_from_research_output(research_output) |
|
|
|
|
|
logger.info(f"=== RÉSULTATS D'EXTRACTION ===") |
|
|
logger.info(f"URLs traitées: {extraction_result.total_urls}") |
|
|
logger.info(f"Extractions réussies: {extraction_result.successful_extractions}") |
|
|
logger.info(f"Extractions échouées: {extraction_result.failed_extractions}") |
|
|
logger.info(f"Temps d'exécution: {extraction_result.execution_time:.2f}s") |
|
|
|
|
|
|
|
|
for i, doc in enumerate(extraction_result.documents, 1): |
|
|
logger.info(f"\n{i}. {doc.title}") |
|
|
logger.info(f" URL: {doc.url}") |
|
|
logger.info(f" Mots: {doc.word_count}") |
|
|
logger.info(f" Langue: {doc.language}") |
|
|
logger.info(f" Type: {doc.doc_type}") |
|
|
logger.info(f" Contenu (aperçu): {doc.content[:200]}...") |
|
|
|
|
|
|
|
|
if extraction_result.failed_urls: |
|
|
logger.info(f"\n❌ URLs en échec:") |
|
|
for url in extraction_result.failed_urls: |
|
|
logger.info(f" • {url}") |
|
|
|
|
|
|
|
|
logger.info(f"\n=== SAUVEGARDE DE L'EXTRACTION RESULT ===") |
|
|
|
|
|
try: |
|
|
filename = save_extraction_result(extraction_result) |
|
|
logger.info(f"✅ ExtractionResult sauvegardé dans: {filename}") |
|
|
|
|
|
|
|
|
logger.info("📄 Contenu sauvegardé:") |
|
|
logger.info(f" • Documents extraits: {len(extraction_result.documents)}") |
|
|
logger.info(f" • Temps d'extraction: {extraction_result.execution_time:.2f}s") |
|
|
logger.info(f" • Statistiques: {extraction_result.extraction_stats}") |
|
|
|
|
|
|
|
|
logger.info("=== Test de chargement ===") |
|
|
loaded_result = load_extraction_result(filename) |
|
|
logger.info(f"✅ ExtractionResult rechargé avec succès") |
|
|
logger.info(f" • Vérification: {len(loaded_result.documents)} documents chargés") |
|
|
|
|
|
|
|
|
if loaded_result.successful_extractions == extraction_result.successful_extractions: |
|
|
logger.info("✅ Intégrité des données vérifiée") |
|
|
else: |
|
|
logger.error("❌ Erreur d'intégrité des données") |
|
|
|
|
|
|
|
|
logger.info("\n📋 EXEMPLE DE FORMAT JSON SAUVEGARDÉ:") |
|
|
logger.info("-" * 50) |
|
|
|
|
|
|
|
|
example_result = { |
|
|
"documents": [ |
|
|
{ |
|
|
"title": doc.title, |
|
|
"url": str(doc.url), |
|
|
"content": doc.content[:200] + "...", |
|
|
"word_count": doc.word_count, |
|
|
"language": doc.language, |
|
|
"doc_type": doc.doc_type.value if doc.doc_type else None |
|
|
} for doc in extraction_result.documents[:2] |
|
|
], |
|
|
"total_urls": extraction_result.total_urls, |
|
|
"successful_extractions": extraction_result.successful_extractions, |
|
|
"failed_extractions": extraction_result.failed_extractions, |
|
|
"failed_urls": extraction_result.failed_urls, |
|
|
"execution_time": extraction_result.execution_time, |
|
|
"extraction_stats": extraction_result.extraction_stats |
|
|
} |
|
|
|
|
|
print(json.dumps(example_result, indent=2, ensure_ascii=False)) |
|
|
|
|
|
except Exception as save_error: |
|
|
logger.error(f"❌ Erreur lors de la sauvegarde: {save_error}") |
|
|
|
|
|
except FileNotFoundError: |
|
|
logger.error(f"❌ Fichier ResearchOutput non trouvé: {research_file}") |
|
|
logger.info("Utilisation de l'exemple avec URLs directes...") |
|
|
await test_with_direct_urls() |
|
|
except Exception as e: |
|
|
logger.error(f"❌ Erreur lors du traitement: {e}") |
|
|
|
|
|
async def test_with_direct_urls(): |
|
|
"""Test avec des URLs directes.""" |
|
|
urls = [ |
|
|
'https://www.iana.org/help/example-domains', |
|
|
] |
|
|
|
|
|
logger.info(f"=== TEST AVEC URLS DIRECTES ===") |
|
|
documents = await extract_content_from_urls(urls) |
|
|
for doc in documents: |
|
|
logger.info(f"Title: {doc.title}, URL: {doc.url}, Word Count: {doc.word_count}, Language: {doc.language}, Content Length: {len(doc.content)}") |
|
|
|
|
|
|
|
|
import sys |
|
|
if len(sys.argv) > 1 and sys.argv[1] == "--direct": |
|
|
asyncio.run(test_with_direct_urls()) |
|
|
else: |
|
|
asyncio.run(test_with_research_output()) |