# ScrapDji turbo scraper: discovers and scrapes news articles from configured sources.
import asyncio
import json
import logging
import os
from datetime import datetime
from typing import Dict, List, Optional

import httpx
from lxml import html
from newspaper import Article

from db.models import Document, DocumentVersion
from db.mongo_connector import save_to_mongo, db
from db.postgres_connector import SessionLocal
from indexer.typesense_indexer import index_document as index_typesense
from parser.cleaner import clean_html
from utils.config import STORAGE_PATH, SCRAPER_DELAY, SCRAPER_CONCURRENT_REQUESTS
from utils.logger import setup_logger
from utils.uuid_gen import generate_uuid
# Module-wide logger obtained from the project's logger factory.
logger = setup_logger(__name__)
class ScrapDjiScraper:
    """Async news scraper: discovers article links on configured source sites,
    scrapes them, and persists results to a local JSON index and MongoDB."""

    def __init__(self, sources_file: str = "sources.json", *,
                 concurrency: int = 10, buffer_size: int = 50):
        """Initialise the scraper.

        Args:
            sources_file: path to the JSON catalogue of sources to scrape.
            concurrency: max number of sources processed at once (previously a
                hard-coded 10, "raised for mass scraping"; now tunable).
            buffer_size: documents accumulated before a batched flush
                (previously a hard-coded 50).
        """
        self.sources_file = sources_file
        self.sources = self.load_sources()
        # Bounds how many sources are processed concurrently.
        self.sem = asyncio.Semaphore(concurrency)
        # Documents are buffered and persisted in batches to reduce
        # disk/network round-trips.
        self.buffer: List[Dict] = []
        self.buffer_size = buffer_size
        # Session-level dedup of URLs discovered across all sources.
        self.discovered_urls: set = set()
| def load_sources(self) -> Dict: | |
| if not os.path.exists(self.sources_file): | |
| return {"sources": []} | |
| with open(self.sources_file, 'r', encoding='utf-8') as f: | |
| return json.load(f) | |
| async def is_url_scraped(self, url: str) -> bool: | |
| """Vérifie si l'URL existe déjà dans MongoDB""" | |
| try: | |
| # Vérification rapide sur l'ID (source_url est unique ?) | |
| # Ou count_documents | |
| doc = await db["documents"].find_one({"source_url": url}, {"_id": 1}) | |
| return doc is not None | |
| except Exception: | |
| return False | |
| async def scrape_article(self, client: httpx.AsyncClient, source: Dict, url: str) -> Optional[Dict]: | |
| """Scrape ultra-rapide avec lxml (C-level parsing)""" | |
| try: | |
| resp = await client.get(url, timeout=7.0) | |
| if resp.status_code != 200: return None | |
| # Utilisation de lxml directement pour la vitesse pure | |
| tree = html.fromstring(resp.content) | |
| title = tree.xpath('//h1/text()') | |
| title = title[0].strip() if title else "Sans titre" | |
| # Extraction brute via newspaper (très optimisée) | |
| article = Article(url) | |
| article.set_html(resp.text) | |
| article.parse() | |
| content = article.text | |
| if len(content) < 100: return None | |
| return { | |
| 'id': generate_uuid(), | |
| 'titre': title, | |
| 'texte': content, | |
| 'source_url': url, | |
| 'pays': source.get('pays', 'Afrique'), | |
| 'langue': source.get('langue', 'fr'), | |
| 'type_document': 'article', | |
| 'date': datetime.now().isoformat() | |
| } | |
| except Exception as e: | |
| logger.debug(f"Erreur scraping {url}: {e}") | |
| return None | |
| async def discover_links(self, client: httpx.AsyncClient, base_url: str) -> List[str]: | |
| """Découvre les liens d'articles sur une page (Optimisé)""" | |
| try: | |
| resp = await client.get(base_url, timeout=10.0) | |
| if resp.status_code != 200: | |
| return [] | |
| # Traitement CPU-bound dans un thread | |
| loop = asyncio.get_event_loop() | |
| links = await loop.run_in_executor(None, self._extract_links_sync, resp.content, base_url) | |
| # Filtrage rapide | |
| new_links = [] | |
| for link in links: | |
| if link not in self.discovered_urls: | |
| new_links.append(link) | |
| self.discovered_urls.add(link) | |
| return new_links[:100] | |
| except Exception as e: | |
| logger.debug(f"Erreur découverte liens {base_url}: {e}") | |
| return [] | |
| def _extract_links_sync(self, content: bytes, base_url: str) -> List[str]: | |
| """Extraction synchrone des liens via lxml""" | |
| try: | |
| tree = html.fromstring(content) | |
| raw_links = tree.xpath('//a/@href') | |
| valid_links = [] | |
| from urllib.parse import urljoin | |
| for link in raw_links: | |
| if not link: continue | |
| # Normalisation | |
| if link.startswith('/'): | |
| link = urljoin(base_url, link) | |
| elif not link.startswith('http'): | |
| continue | |
| # Filtrage simple | |
| if any(x in link.lower() for x in ['article', 'actualite', 'news', '/20', '-20']): | |
| valid_links.append(link) | |
| return valid_links | |
| except: | |
| return [] | |
| async def flush_buffer(self): | |
| """Sauvegarde groupée pour réduire les accès disque/réseau""" | |
| if not self.buffer: return | |
| logger.info(f"💾 Flush buffer: sauvegarde de {len(self.buffer)} documents...") | |
| # 1. Sauvegarde JSON (Bloquant -> Thread) | |
| loop = asyncio.get_event_loop() | |
| await loop.run_in_executor(None, self._save_buffer_sync, self.buffer.copy()) | |
| # 2. Sauvegarde MongoDB (Async -> Direct) | |
| # On lance les sauvegardes en parallèle sans bloquer | |
| try: | |
| mongo_tasks = [save_to_mongo("documents", doc) for doc in self.buffer] | |
| # On utilise gather avec return_exceptions=True pour que si un échoue, les autres continuent | |
| await asyncio.gather(*mongo_tasks, return_exceptions=True) | |
| logger.info("✅ Sauvegarde MongoDB terminées") | |
| except Exception as e: | |
| logger.error(f"Erreur sauvegarde MongoDB: {e}") | |
| self.buffer = [] | |
| def _save_buffer_sync(self, documents: List[Dict]): | |
| """Sauvegarde synchrone (disque uniquement)""" | |
| try: | |
| os.makedirs("data", exist_ok=True) | |
| local_file = "data/search_index.json" | |
| # Lecture | |
| existing_data = [] | |
| if os.path.exists(local_file): | |
| with open(local_file, "r", encoding="utf-8") as f: | |
| existing_data = json.load(f) | |
| # Ajout | |
| existing_data.extend(documents) | |
| # Écriture | |
| with open(local_file, "w", encoding="utf-8") as f: | |
| json.dump(existing_data, f, indent=2, ensure_ascii=False) | |
| except Exception as e: | |
| logger.error(f"Erreur sauvegarde JSON: {e}") | |
| async def process_source(self, client: httpx.AsyncClient, source: Dict): | |
| """Traite une source avec découverte de liens""" | |
| count = 0 | |
| async with self.sem: | |
| try: | |
| # 1. Scraper la page principale (si non scrapée récemment?) | |
| # On ne vérifie pas is_url_scraped pour la home car elle change | |
| doc = await self.scrape_article(client, source, source['url']) | |
| if doc: | |
| self.buffer.append(doc) | |
| count += 1 | |
| # 2. Découvrir et scraper les articles liés | |
| logger.info(f"🔍 Découverte de liens sur {source['name']}...") | |
| article_urls = await self.discover_links(client, source['url']) | |
| logger.info(f"📰 {len(article_urls)} articles découverts sur {source['name']}") | |
| # 3. Scraper les articles découverts (avec limite) | |
| for url in article_urls[:50]: # Limiter à 50 articles par source pour commencer | |
| # Vérification anti-doublon MongoDB | |
| if await self.is_url_scraped(url): | |
| # logger.debug(f"⏭️ Déjà scrapé: {url}") | |
| continue | |
| doc = await self.scrape_article(client, source, url) | |
| if doc: | |
| self.buffer.append(doc) | |
| count += 1 | |
| if len(self.buffer) >= self.buffer_size: | |
| await self.flush_buffer() | |
| await asyncio.sleep(0.5) # Petit délai pour ne pas surcharger | |
| return count | |
| except Exception as e: | |
| logger.error(f"Erreur traitement source {source.get('name')}: {e}") | |
| return count | |
| async def run(self): | |
| active_sources = [s for s in self.sources.get('sources', []) if s.get('active', True)] | |
| # Utilisation de HTTP/2 pour plus de vitesse si supporté par le site | |
| async with httpx.AsyncClient(timeout=10.0, follow_redirects=True, http2=True) as client: | |
| tasks = [self.process_source(client, s) for s in active_sources] | |
| counts = await asyncio.gather(*tasks) | |
| await self.flush_buffer() # Dernier flush | |
| logger.info(f"⚡ Turbo Scraping terminé. {sum(counts)} documents traités.") | |
# NOTE: the late duplicate imports of `os` and `datetime` that used to live
# here were removed; both are imported at the top of the module.
if __name__ == "__main__":
    scraper = ScrapDjiScraper()
    asyncio.run(scraper.run())