import asyncio
import os
import json
import logging
from typing import Dict, List, Optional
from datetime import datetime
from urllib.parse import urljoin

import httpx
from lxml import html
from newspaper import Article

from utils.config import STORAGE_PATH, SCRAPER_DELAY, SCRAPER_CONCURRENT_REQUESTS
from utils.logger import setup_logger
from db.postgres_connector import SessionLocal
from db.models import Document, DocumentVersion
from db.mongo_connector import save_to_mongo, db
from parser.cleaner import clean_html
from indexer.typesense_indexer import index_document as index_typesense
from utils.uuid_gen import generate_uuid

logger = setup_logger(__name__)


class ScrapDjiScraper:
    def __init__(self, sources_file: str = "sources.json"):
        self.sources_file = sources_file
        self.sources = self.load_sources()
        self.sem = asyncio.Semaphore(10)  # Raised for large-scale scraping
        self.buffer = []
        self.buffer_size = 50  # Larger buffer for large-scale scraping
        self.discovered_urls = set()  # Avoid duplicates (per-session memory)

    def load_sources(self) -> Dict:
        if not os.path.exists(self.sources_file):
            return {"sources": []}
        with open(self.sources_file, 'r', encoding='utf-8') as f:
            return json.load(f)

    async def is_url_scraped(self, url: str) -> bool:
        """Check whether the URL already exists in MongoDB."""
        try:
            # Quick lookup on source_url (projection on _id only);
            # count_documents would be an alternative.
            doc = await db["documents"].find_one({"source_url": url}, {"_id": 1})
            return doc is not None
        except Exception:
            return False

    async def scrape_article(self, client: httpx.AsyncClient, source: Dict, url: str) -> Optional[Dict]:
        """Fast article scrape using lxml (C-level parsing)."""
        try:
            resp = await client.get(url, timeout=7.0)
            if resp.status_code != 200:
                return None

            # Use lxml directly for raw speed
            tree = html.fromstring(resp.content)
            title = tree.xpath('//h1/text()')
            title = title[0].strip() if title else "Sans titre"

            # Body extraction via newspaper (highly optimised)
            article = Article(url)
            article.set_html(resp.text)
            article.parse()
            content = article.text

            if len(content) < 100:
                return None

            return {
                'id': generate_uuid(),
                'titre': title,
                'texte': content,
                'source_url': url,
                'pays': source.get('pays', 'Afrique'),
                'langue': source.get('langue', 'fr'),
                'type_document': 'article',
                'date': datetime.now().isoformat()
            }
        except Exception as e:
            logger.debug(f"Scraping error {url}: {e}")
            return None

    async def discover_links(self, client: httpx.AsyncClient, base_url: str) -> List[str]:
        """Discover article links on a page (optimised)."""
        try:
            resp = await client.get(base_url, timeout=10.0)
            if resp.status_code != 200:
                return []

            # CPU-bound parsing is pushed to a thread
            loop = asyncio.get_running_loop()
            links = await loop.run_in_executor(None, self._extract_links_sync, resp.content, base_url)

            # Quick filter against URLs already seen this session
            new_links = []
            for link in links:
                if link not in self.discovered_urls:
                    new_links.append(link)
                    self.discovered_urls.add(link)
            return new_links[:100]
        except Exception as e:
            logger.debug(f"Link discovery error {base_url}: {e}")
            return []

    def _extract_links_sync(self, content: bytes, base_url: str) -> List[str]:
        """Synchronous link extraction via lxml."""
        try:
            tree = html.fromstring(content)
            raw_links = tree.xpath('//a/@href')
            valid_links = []
            for link in raw_links:
                if not link:
                    continue
                # Normalisation: resolve relative URLs, skip anything else that is not HTTP
                if link.startswith('/'):
                    link = urljoin(base_url, link)
                elif not link.startswith('http'):
                    continue
                # Simple heuristic filter for article-looking URLs
                if any(x in link.lower() for x in ['article', 'actualite', 'news', '/20', '-20']):
                    valid_links.append(link)
            return valid_links
        except Exception:
            return []

    async def flush_buffer(self):
        """Batched save to reduce disk/network round trips."""
        if not self.buffer:
            return

        # Snapshot and reset the buffer first, so documents appended by other
        # tasks while we are awaiting below are not silently dropped.
        documents, self.buffer = self.buffer, []
        logger.info(f"💾 Flushing buffer: saving {len(documents)} documents...")

        # 1. JSON save (blocking -> thread)
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._save_buffer_sync, documents)

        # 2. MongoDB save (async -> direct)
        # Saves are launched in parallel; return_exceptions=True lets the
        # others continue if one of them fails.
        try:
            mongo_tasks = [save_to_mongo("documents", doc) for doc in documents]
            await asyncio.gather(*mongo_tasks, return_exceptions=True)
            logger.info("✅ MongoDB save completed")
        except Exception as e:
            logger.error(f"MongoDB save error: {e}")

    def _save_buffer_sync(self, documents: List[Dict]):
        """Synchronous save (disk only)."""
        try:
            os.makedirs("data", exist_ok=True)
            local_file = "data/search_index.json"

            # Read the existing index
            existing_data = []
            if os.path.exists(local_file):
                with open(local_file, "r", encoding="utf-8") as f:
                    existing_data = json.load(f)

            # Append and rewrite
            existing_data.extend(documents)
            with open(local_file, "w", encoding="utf-8") as f:
                json.dump(existing_data, f, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"JSON save error: {e}")

    async def process_source(self, client: httpx.AsyncClient, source: Dict) -> int:
        """Process one source, including link discovery."""
        count = 0
        async with self.sem:
            try:
                # 1. Scrape the landing page. is_url_scraped is not checked here
                #    because the home page changes between runs.
                doc = await self.scrape_article(client, source, source['url'])
                if doc:
                    self.buffer.append(doc)
                    count += 1

                # 2. Discover linked articles
                logger.info(f"🔍 Discovering links on {source['name']}...")
                article_urls = await self.discover_links(client, source['url'])
                logger.info(f"📰 {len(article_urls)} articles discovered on {source['name']}")

                # 3. Scrape the discovered articles (capped at 50 per source for now)
                for url in article_urls[:50]:
                    # MongoDB duplicate check
                    if await self.is_url_scraped(url):
                        # logger.debug(f"⏭️ Already scraped: {url}")
                        continue

                    doc = await self.scrape_article(client, source, url)
                    if doc:
                        self.buffer.append(doc)
                        count += 1
                        if len(self.buffer) >= self.buffer_size:
                            await self.flush_buffer()

                    await asyncio.sleep(0.5)  # Small delay to avoid overloading the site

                return count
            except Exception as e:
                logger.error(f"Error processing source {source.get('name')}: {e}")
                return count

    async def run(self):
        active_sources = [s for s in self.sources.get('sources', []) if s.get('active', True)]

        # HTTP/2 is enabled for extra speed when the site supports it
        async with httpx.AsyncClient(timeout=10.0, follow_redirects=True, http2=True) as client:
            tasks = [self.process_source(client, s) for s in active_sources]
            counts = await asyncio.gather(*tasks)
            await self.flush_buffer()  # Final flush

        logger.info(f"⚡ Turbo scraping finished. {sum(counts)} documents processed.")


if __name__ == "__main__":
    scraper = ScrapDjiScraper()
    asyncio.run(scraper.run())
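
# For reference, a minimal sources.json sketch, assuming only the fields this module
# actually reads ('name', 'url', and the optional 'pays', 'langue', 'active' defaults
# seen in process_source/scrape_article); the source name and URL below are
# hypothetical, and the real file may carry additional keys used elsewhere:
#
# {
#   "sources": [
#     {
#       "name": "Exemple Actu",
#       "url": "https://www.example-actu.com",
#       "pays": "Sénégal",
#       "langue": "fr",
#       "active": true
#     }
#   ]
# }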