""" Service d'extraction de contenu web. Supporte HTML, PDF et autres formats de documents. """ import aiohttp import asyncio from typing import Optional, List, Dict, Any, TYPE_CHECKING from urllib.parse import urljoin, urlparse from datetime import datetime import re import mimetypes from asyncssh import logger from src.core.logging import setup_logger from src.models.document_models import Document, DocumentType # Import conditionnel des dépendances try: from bs4 import BeautifulSoup BEAUTIFULSOUP_AVAILABLE = True except ImportError: BEAUTIFULSOUP_AVAILABLE = False if TYPE_CHECKING: from bs4 import BeautifulSoup try: import PyPDF2 PDF_AVAILABLE = True except ImportError: PDF_AVAILABLE = False PyPDF2 = None try: import requests REQUESTS_AVAILABLE = True except ImportError: REQUESTS_AVAILABLE = False requests = None class ContentExtractionError(Exception): """Exception pour les erreurs d'extraction de contenu.""" pass class WebContentExtractor: """ Extracteur de contenu web avec support multi-format. """ def __init__(self, timeout: int = 30, max_content_length: int = 10_000_000): self.logger = setup_logger("content_extractor") self.timeout = timeout self.max_content_length = max_content_length # Headers pour simuler un navigateur réel self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': 'fr-FR,fr;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', } # Vérification des dépendances self._check_dependencies() def _check_dependencies(self): """Vérifie que les dépendances nécessaires sont installées.""" if not BEAUTIFULSOUP_AVAILABLE: self.logger.warning("BeautifulSoup4 non installé - extraction HTML limitée") if not PDF_AVAILABLE: self.logger.warning("PyPDF2 non installé - extraction PDF non disponible") if not REQUESTS_AVAILABLE: self.logger.warning("requests non installé - extraction synchrone non disponible") async def extract_content(self, url: str) -> Document: """ Extrait le contenu d'une URL. Args: url: URL à extraire Returns: Document avec le contenu extrait Raises: ContentExtractionError: Si l'extraction échoue """ self.logger.info(f"Extraction de contenu: {url}") try: # Détecter le type de contenu content_type = await self._detect_content_type(url) if content_type.startswith('application/pdf'): return await self._extract_pdf_content(url) elif content_type.startswith('text/html') or 'html' in content_type: return await self._extract_html_content(url) else: # Tentative d'extraction générique #################### faire aussi l'extraction en fonction de l'extension du fichier et le js #################### return await self._extract_generic_content(url) except Exception as e: self.logger.error(f"Erreur lors de l'extraction de {url}: {str(e)}") raise ContentExtractionError(f"Impossible d'extraire le contenu de {url}: {str(e)}") async def _detect_content_type(self, url: str) -> str: """Détecte le type de contenu d'une URL.""" try: async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session: async with session.head(url, headers=self.headers) as response: content_type = response.headers.get('content-type', '').lower() if content_type: return content_type.split(';')[0] # Enlever le charset # Fallback: détecter par extension parsed_url = urlparse(url) content_type, _ = mimetypes.guess_type(parsed_url.path) return content_type or 'text/html' except Exception as e: self.logger.warning(f"Impossible de détecter le type de contenu pour {url}: {e}") return 'text/html' # Default fallback async def _extract_html_content(self, url: str) -> Document: """Extrait le contenu d'une page HTML.""" if not BEAUTIFULSOUP_AVAILABLE: raise ContentExtractionError("BeautifulSoup4 non installé pour l'extraction HTML") async with aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=self.timeout) ) as session: async with session.get(url, headers=self.headers) as response: if response.status != 200: raise ContentExtractionError(f"Erreur HTTP {response.status} pour {url}") # Vérifier la taille du contenu content_length = response.headers.get('content-length') if content_length and int(content_length) > self.max_content_length: raise ContentExtractionError(f"Contenu trop volumineux: {content_length} bytes") html_content = await response.text() # Parser avec BeautifulSoup from bs4 import BeautifulSoup soup = BeautifulSoup(html_content, 'html.parser') # Extraire le titre title = self._extract_title(soup) # Extraire le contenu principal content = self._extract_main_content(soup) # Vérifier la longueur du contenu if len(content) > self.max_content_length: raise ContentExtractionError(f"Contenu extrait trop volumineux: {len(content)} caractères") # Afficher le contenu # self.logger.info(f"Contenu extrait ({len(content)} caractères)") # Extraire les métadonnées author = self._extract_author(soup) publish_date = self._extract_publish_date(soup) return Document( title=title, url=url, content=content, doc_type=DocumentType.ARTICLE, author=author, published_date=publish_date, word_count=len(content.split()), language='fr' ############################################# Détection automatique à implémenter ################### ) def _extract_title(self, soup: "BeautifulSoup") -> str: """Extrait le titre de la page.""" # Priorité: title tag, h1, og:title, première heading # Title tag title_tag = soup.find('title') if title_tag and title_tag.get_text().strip(): return title_tag.get_text().strip() # Meta og:title og_title = soup.find('meta', {'property': 'og:title'}) if og_title and og_title.get('content'): return og_title.get('content').strip() # Premier h1 h1 = soup.find('h1') if h1 and h1.get_text().strip(): return h1.get_text().strip() # Fallback return "Titre non trouvé" def _extract_main_content(self, soup: "BeautifulSoup") -> str: """Extrait le contenu principal de la page.""" # Supprimer les éléments indésirables for element in soup.find_all(['script', 'style', 'nav', 'header', 'footer', 'aside', 'form']): element.decompose() # Supprimer les commentaires for comment in soup.find_all(string=lambda text: isinstance(text, str) and text.strip().startswith('