testpush / src /agents /content_extractor_agent.py
Bachir00's picture
source code
8a848a5
"""
Agent Content Extractor - Extraction et nettoyage de contenu web.
Extrait le contenu de pages web, PDFs et autres documents.
"""
import asyncio
from typing import List, Optional
from datetime import datetime
from src.agents.base_agent import BaseAgent
from src.models.document_models import Document, ExtractionInput, ExtractionResult
from src.models.research_models import ResearchOutput
from src.models.state_models import AgentState, AgentType
from src.services.content_extraction import ContentExtractionManager, ContentExtractionError
from src.core.logging import setup_logger
class ContentExtractorAgent(BaseAgent[ExtractionInput, ExtractionResult]):
"""
Agent responsable de l'extraction de contenu depuis des URLs.
Fonctionnalités:
- Extraction de contenu HTML avec nettoyage intelligent
- Support des PDFs et autres formats
- Traitement parallèle de plusieurs URLs
- Gestion des erreurs et retry automatique
- Structuration et nettoyage du contenu
"""
def __init__(self, max_concurrent_extractions: int = 5, max_retries: int = 2):
super().__init__(
agent_type=AgentType.CONTENT_EXTRACTOR,
name="content_extractor",
max_retries=max_retries,
timeout=300.0 # 5 minutes
)
self.extraction_manager = ContentExtractionManager(
max_concurrent=max_concurrent_extractions,
max_retries=max_retries
)
def validate_input(self, input_data: ExtractionInput) -> bool:
"""
Valide les données d'entrée pour l'extraction.
Args:
input_data: Input contenant les URLs à extraire
Returns:
True si les données sont valides
"""
if not input_data.urls:
self.logger.error("Aucune URL fournie pour l'extraction")
return False
if len(input_data.urls) > 50: # Limite raisonnable
self.logger.error(f"Trop d'URLs ({len(input_data.urls)}), maximum 50")
return False
# Filtrer les URLs valides
valid_urls = self._filter_valid_urls(input_data.urls)
if not valid_urls:
self.logger.error("Aucune URL valide trouvée")
return False
return True
async def process_from_research_output(self, research_output: ResearchOutput) -> ExtractionResult:
"""
Traite directement un ResearchOutput pour extraire le contenu des URLs.
Args:
research_output: Résultats de recherche avec URLs à extraire
Returns:
ExtractionResult avec les documents extraits
"""
# Extraire les URLs des résultats de recherche (conversion en string)
urls = [str(result.url) for result in research_output.results]
self.logger.info(f"Extraction de contenu depuis ResearchOutput: {len(urls)} URLs")
self.logger.info(f"Sujet de recherche: {research_output.query.topic}")
# Créer l'input d'extraction
extraction_input = ExtractionInput(
urls=urls,
content_filters={
'min_content_length': 200, # Minimum de contenu
'max_content_length': 50000, # Maximum pour éviter les textes trop longs
'required_keywords': research_output.query.keywords # Filtrer par mots-clés de recherche
},
extraction_options={
'source_query': research_output.query.topic,
'search_keywords': research_output.query.keywords
}
)
# Traiter avec la méthode normale
return await self.process(extraction_input)
async def process(self, input_data: ExtractionInput) -> ExtractionResult:
"""
Exécute l'extraction de contenu pour les URLs fournies.
Args:
input_data: Input contenant les URLs à extraire et les options
Returns:
ExtractionResult avec les documents extraits
Raises:
ValueError: Si les URLs sont invalides
ContentExtractionError: Si l'extraction échoue
"""
start_time = datetime.now()
self.logger.info(f"Début extraction de contenu pour {len(input_data.urls)} URLs")
# Filtrer les URLs valides (validation déjà faite dans validate_input)
valid_urls = self._filter_valid_urls(input_data.urls)
self.logger.info(f"URLs valides à traiter: {len(valid_urls)}/{len(input_data.urls)}")
try:
# Extraction du contenu
documents = await self._extract_all_content(valid_urls, input_data)
# Post-traitement des documents
processed_documents = self._post_process_documents(documents, input_data)
# Calcul des statistiques
execution_time = (datetime.now() - start_time).total_seconds()
# Identifier les URLs qui ont échoué
successful_urls = {str(doc.url) for doc in processed_documents}
failed_urls = [url for url in valid_urls if url not in successful_urls]
# Création du résultat
result = ExtractionResult(
documents=processed_documents,
total_urls=len(input_data.urls),
successful_extractions=len(processed_documents),
failed_extractions=len(input_data.urls) - len(processed_documents),
failed_urls=failed_urls,
execution_time=execution_time,
extraction_stats=self._calculate_stats(processed_documents)
)
self.logger.info(
f"Extraction terminée: {result.successful_extractions}/{result.total_urls} "
f"succès en {execution_time:.2f}s"
)
return result
except Exception as e:
self.logger.error(f"Erreur lors de l'extraction: {str(e)}")
raise ContentExtractionError(f"Échec de l'extraction de contenu: {str(e)}")
def _filter_valid_urls(self, urls: List[str]) -> List[str]:
"""Filtre et valide les URLs."""
import re
from urllib.parse import urlparse
valid_urls = []
url_pattern = re.compile(
r'^https?://' # http:// ou https://
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain
r'localhost|' # localhost
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # IP
r'(?::\d+)?' # port
r'(?:/?|[/?]\S+)$', re.IGNORECASE)
for url in urls:
if not url or not isinstance(url, str):
self.logger.warning(f"URL invalide ignorée: {url}")
continue
url = url.strip()
if not url:
continue
# Validation du format
if not url_pattern.match(url):
self.logger.warning(f"Format URL invalide: {url}")
continue
# Validation avec urlparse
try:
parsed = urlparse(url)
if not parsed.netloc:
self.logger.warning(f"URL sans domaine: {url}")
continue
valid_urls.append(url)
except Exception as e:
self.logger.warning(f"Erreur de parsing URL {url}: {e}")
continue
return valid_urls
async def _extract_all_content(self, urls: List[str], input_data: ExtractionInput) -> List[Document]:
"""Extrait le contenu de toutes les URLs."""
try:
# Utiliser le gestionnaire d'extraction
documents = await self.extraction_manager.extract_multiple(urls)
# Appliquer les filtres si spécifiés
if input_data.content_filters:
documents = self._apply_content_filters(documents, input_data.content_filters)
return documents
except Exception as e:
self.logger.error(f"Erreur lors de l'extraction multiple: {str(e)}")
raise
def _apply_content_filters(self, documents: List[Document], filters: dict) -> List[Document]:
"""Applique les filtres de contenu aux documents."""
filtered_documents = []
for doc in documents:
# Filtrer par longueur minimale
min_length = filters.get('min_content_length', 100)
if len(doc.content) < min_length:
self.logger.debug(f"Document {doc.title} trop court: {len(doc.content)} caractères")
continue
# Filtrer par longueur maximale
max_length = filters.get('max_content_length', 100000)
if len(doc.content) > max_length:
self.logger.debug(f"Document {doc.title} trop long, troncature")
doc.content = doc.content[:max_length] + "... [Contenu tronqué]"
# Filtrer par langue si spécifiée
required_language = filters.get('language')
if required_language and doc.language != required_language:
self.logger.debug(f"Document {doc.title} ignoré: langue {doc.language}")
continue
# Filtrer par mots-clés si spécifiés
required_keywords = filters.get('required_keywords', [])
if required_keywords:
content_lower = doc.content.lower()
if not any(keyword.lower() in content_lower for keyword in required_keywords):
self.logger.debug(f"Document {doc.title} ignoré: mots-clés manquants")
continue
filtered_documents.append(doc)
self.logger.info(f"Filtres appliqués: {len(filtered_documents)}/{len(documents)} documents retenus")
return filtered_documents
def _post_process_documents(self, documents: List[Document], input_data: ExtractionInput) -> List[Document]:
"""Post-traitement des documents extraits."""
processed_docs = []
for doc in documents:
# Nettoyage supplémentaire du contenu
doc.content = self._clean_content(doc.content)
# Recalcul du nombre de mots après nettoyage
doc.word_count = len(doc.content.split())
# Validation finale
if self._is_valid_document(doc, input_data):
processed_docs.append(doc)
else:
self.logger.debug(f"Document {doc.title} rejeté lors de la validation finale")
return processed_docs
def _clean_content(self, content: str) -> str:
"""Nettoyage avancé du contenu."""
import re
if not content:
return ""
# Supprimer les caractères de contrôle
content = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', content)
# Normaliser les espaces
content = re.sub(r'[ \t]+', ' ', content)
# Normaliser les sauts de ligne
content = re.sub(r'\n\s*\n\s*\n+', '\n\n', content)
# Supprimer les espaces en début et fin de lignes
lines = content.split('\n')
lines = [line.strip() for line in lines]
content = '\n'.join(lines)
# Supprimer les lignes vides multiples
content = re.sub(r'\n{3,}', '\n\n', content)
return content.strip()
def _is_valid_document(self, doc: Document, input_data: ExtractionInput) -> bool:
"""Valide un document extrait."""
# Vérifications de base
if not doc.content or not doc.content.strip():
return False
if len(doc.content) < 50: # Contenu trop court
return False
# Vérification du ratio texte/contenu (détecter les pages avec peu de contenu)
if doc.word_count < 20:
return False
# Vérifications spécifiques aux options d'entrée
if hasattr(input_data, 'min_quality_score'):
quality_score = self._calculate_content_quality(doc)
if quality_score < input_data.min_quality_score:
return False
return True
def _calculate_content_quality(self, doc: Document) -> float:
"""Calcule un score de qualité pour le contenu (0-1)."""
score = 0.0
# Points pour la longueur
if doc.word_count > 100:
score += 0.3
elif doc.word_count > 50:
score += 0.1
# Points pour la structure
if doc.title and len(doc.title) > 10:
score += 0.2
if doc.author:
score += 0.1
if doc.published_date:
score += 0.1
# Points pour la richesse du contenu
content = doc.content.lower()
if any(marker in content for marker in ['conclusion', 'introduction', 'sommaire']):
score += 0.2
# Pénalité pour contenu répétitif
lines = doc.content.split('\n')
unique_lines = set(line.strip() for line in lines if line.strip())
if len(lines) > 0:
uniqueness_ratio = len(unique_lines) / len(lines)
if uniqueness_ratio < 0.5:
score -= 0.2
return max(0.0, min(1.0, score))
def _calculate_stats(self, documents: List[Document]) -> dict:
"""Calcule les statistiques d'extraction."""
if not documents:
return {
'total_words': 0,
'average_words_per_doc': 0,
'doc_types': {},
'languages': {},
'has_authors': 0,
'has_dates': 0
}
total_words = sum(doc.word_count for doc in documents)
# Compter les types de documents
doc_types = {}
for doc in documents:
doc_type = doc.doc_type.value if doc.doc_type else 'unknown'
doc_types[doc_type] = doc_types.get(doc_type, 0) + 1
# Compter les langues
languages = {}
for doc in documents:
lang = doc.language or 'unknown'
languages[lang] = languages.get(lang, 0) + 1
# Compter les métadonnées
has_authors = sum(1 for doc in documents if doc.author)
has_dates = sum(1 for doc in documents if doc.published_date)
return {
'total_words': total_words,
'average_words_per_doc': total_words // len(documents),
'doc_types': doc_types,
'languages': languages,
'has_authors': has_authors,
'has_dates': has_dates
}
# Fonction utilitaire pour les tests
async def extract_content_from_urls(urls: List[str], **options) -> List[Document]:
"""
Fonction utilitaire pour extraire du contenu depuis une liste d'URLs.
Args:
urls: Liste des URLs à extraire
**options: Options d'extraction (filters, etc.)
Returns:
Liste des documents extraits
"""
agent = ContentExtractorAgent()
input_data = ExtractionInput(
urls=urls,
content_filters=options.get('content_filters', {}),
extraction_options=options.get('extraction_options', {})
)
result = await agent.execute(input_data)
return result.documents
# Fonction utilitaire pour l'intégration avec le Researcher
async def extract_from_search_results(search_results: List[dict]) -> List[Document]:
"""
Extrait le contenu depuis des résultats de recherche.
Args:
search_results: Résultats de recherche avec URLs
Returns:
Liste des documents extraits
"""
urls = []
for result in search_results:
if isinstance(result, dict) and 'url' in result:
urls.append(result['url'])
elif hasattr(result, 'url'):
urls.append(result.url)
if not urls:
return []
return await extract_content_from_urls(urls)
# Fonctions utilitaires pour la sauvegarde
def save_extraction_result(result: ExtractionResult, filename: str = None) -> str:
"""
Sauvegarde un ExtractionResult dans un fichier JSON.
Args:
result: Résultat d'extraction à sauvegarder
filename: Nom du fichier (optionnel)
Returns:
Nom du fichier sauvegardé
"""
import json
from datetime import datetime
if not filename:
# Générer un nom de fichier basé sur le nombre de documents et timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"extraction_result_{result.successful_extractions}docs_{timestamp}.json"
try:
# Conversion en dictionnaire avec sérialisation des dates
result_dict = result.model_dump(mode='json')
# Sauvegarde dans le fichier
with open(filename, 'w', encoding='utf-8') as f:
json.dump(result_dict, f, indent=2, ensure_ascii=False)
return filename
except Exception as e:
raise Exception(f"Erreur lors de la sauvegarde: {e}")
def load_extraction_result(filename: str) -> ExtractionResult:
"""
Charge un ExtractionResult depuis un fichier JSON.
Args:
filename: Nom du fichier à charger
Returns:
ExtractionResult chargé
"""
import json
try:
with open(filename, 'r', encoding='utf-8') as f:
data = json.load(f)
# Reconstruction de l'ExtractionResult
return ExtractionResult(**data)
except Exception as e:
raise Exception(f"Erreur lors du chargement: {e}")
# Configuration du logger pour l'agent
logger = setup_logger("ContentExtractorAgent")
# Exemple d'utilisation
if __name__ == "__main__":
import asyncio
import json
from src.models.research_models import ResearchOutput
async def test_with_research_output():
"""Test avec un fichier ResearchOutput sauvegardé."""
# Charger le ResearchOutput depuis le fichier JSON le plus récent
research_file = "research_output_impact_de_lintelligence_artifi_20251116_141136.json"
try:
# Charger le ResearchOutput
with open(research_file, 'r', encoding='utf-8') as f:
research_data = json.load(f)
research_output = ResearchOutput(**research_data)
logger.info(f"=== CHARGEMENT DU RESEARCH OUTPUT ===")
logger.info(f"Sujet: {research_output.query.topic}")
logger.info(f"URLs à extraire: {len(research_output.results)}")
# Créer l'agent et traiter
agent = ContentExtractorAgent()
logger.info(f"=== DÉBUT DE L'EXTRACTION DE CONTENU ===")
extraction_result = await agent.process_from_research_output(research_output)
logger.info(f"=== RÉSULTATS D'EXTRACTION ===")
logger.info(f"URLs traitées: {extraction_result.total_urls}")
logger.info(f"Extractions réussies: {extraction_result.successful_extractions}")
logger.info(f"Extractions échouées: {extraction_result.failed_extractions}")
logger.info(f"Temps d'exécution: {extraction_result.execution_time:.2f}s")
# Afficher les détails des documents extraits
for i, doc in enumerate(extraction_result.documents, 1):
logger.info(f"\n{i}. {doc.title}")
logger.info(f" URL: {doc.url}")
logger.info(f" Mots: {doc.word_count}")
logger.info(f" Langue: {doc.language}")
logger.info(f" Type: {doc.doc_type}")
logger.info(f" Contenu (aperçu): {doc.content[:200]}...")
# URLs qui ont échoué
if extraction_result.failed_urls:
logger.info(f"\n❌ URLs en échec:")
for url in extraction_result.failed_urls:
logger.info(f" • {url}")
# === SAUVEGARDE DE L'EXTRACTION RESULT ===
logger.info(f"\n=== SAUVEGARDE DE L'EXTRACTION RESULT ===")
try:
filename = save_extraction_result(extraction_result)
logger.info(f"✅ ExtractionResult sauvegardé dans: {filename}")
# Affichage du contenu sauvegardé
logger.info("📄 Contenu sauvegardé:")
logger.info(f" • Documents extraits: {len(extraction_result.documents)}")
logger.info(f" • Temps d'extraction: {extraction_result.execution_time:.2f}s")
logger.info(f" • Statistiques: {extraction_result.extraction_stats}")
# Test de chargement pour vérifier l'intégrité
logger.info("=== Test de chargement ===")
loaded_result = load_extraction_result(filename)
logger.info(f"✅ ExtractionResult rechargé avec succès")
logger.info(f" • Vérification: {len(loaded_result.documents)} documents chargés")
# Comparaison des données
if loaded_result.successful_extractions == extraction_result.successful_extractions:
logger.info("✅ Intégrité des données vérifiée")
else:
logger.error("❌ Erreur d'intégrité des données")
# Affichage du format JSON pour référence
logger.info("\n📋 EXEMPLE DE FORMAT JSON SAUVEGARDÉ:")
logger.info("-" * 50)
# Créer un exemple compact pour l'affichage
example_result = {
"documents": [
{
"title": doc.title,
"url": str(doc.url),
"content": doc.content[:200] + "...",
"word_count": doc.word_count,
"language": doc.language,
"doc_type": doc.doc_type.value if doc.doc_type else None
} for doc in extraction_result.documents[:2] # Limiter à 2 documents
],
"total_urls": extraction_result.total_urls,
"successful_extractions": extraction_result.successful_extractions,
"failed_extractions": extraction_result.failed_extractions,
"failed_urls": extraction_result.failed_urls,
"execution_time": extraction_result.execution_time,
"extraction_stats": extraction_result.extraction_stats
}
print(json.dumps(example_result, indent=2, ensure_ascii=False))
except Exception as save_error:
logger.error(f"❌ Erreur lors de la sauvegarde: {save_error}")
except FileNotFoundError:
logger.error(f"❌ Fichier ResearchOutput non trouvé: {research_file}")
logger.info("Utilisation de l'exemple avec URLs directes...")
await test_with_direct_urls()
except Exception as e:
logger.error(f"❌ Erreur lors du traitement: {e}")
async def test_with_direct_urls():
"""Test avec des URLs directes."""
urls = [
'https://www.iana.org/help/example-domains',
]
logger.info(f"=== TEST AVEC URLS DIRECTES ===")
documents = await extract_content_from_urls(urls)
for doc in documents:
logger.info(f"Title: {doc.title}, URL: {doc.url}, Word Count: {doc.word_count}, Language: {doc.language}, Content Length: {len(doc.content)}")
# Choisir le test à exécuter
import sys
if len(sys.argv) > 1 and sys.argv[1] == "--direct":
asyncio.run(test_with_direct_urls())
else:
asyncio.run(test_with_research_output())