Spaces:
Sleeping
Sleeping
File size: 9,169 Bytes
import asyncio
import json
import logging
import os
from datetime import datetime
from typing import Dict, List, Optional
from urllib.parse import urljoin

import httpx
from lxml import html
from newspaper import Article

from db.models import Document, DocumentVersion
from db.mongo_connector import save_to_mongo, db
from db.postgres_connector import SessionLocal
from indexer.typesense_indexer import index_document as index_typesense
from parser.cleaner import clean_html
from utils.config import STORAGE_PATH, SCRAPER_DELAY, SCRAPER_CONCURRENT_REQUESTS
from utils.logger import setup_logger
from utils.uuid_gen import generate_uuid
logger = setup_logger(__name__)
class ScrapDjiScraper:
    """Asynchronous news scraper.

    For each configured source, scrapes the landing page, discovers
    article links on it, scrapes the new articles, and persists the
    resulting documents in batches to a local JSON index and to MongoDB.
    """

    def __init__(self, sources_file: str = "sources.json"):
        self.sources_file = sources_file
        self.sources = self.load_sources()
        # Caps the number of sources processed concurrently.
        # TODO(review): consider wiring SCRAPER_CONCURRENT_REQUESTS from
        # utils.config here instead of the hard-coded value.
        self.sem = asyncio.Semaphore(10)  # increased for mass scraping
        self.buffer: List[Dict] = []  # documents pending persistence
        self.buffer_size = 50  # flush threshold (increased for mass scraping)
        self.discovered_urls: set = set()  # session-level dedup of discovered links

    def load_sources(self) -> Dict:
        """Read the sources JSON file; return an empty source list if missing."""
        if not os.path.exists(self.sources_file):
            return {"sources": []}
        with open(self.sources_file, 'r', encoding='utf-8') as f:
            return json.load(f)

    async def is_url_scraped(self, url: str) -> bool:
        """Return True if a document with this source_url already exists in MongoDB.

        Best-effort: any DB error is treated as "not yet scraped" so a flaky
        connection never blocks the pipeline (a duplicate is cheaper than a miss).
        """
        try:
            doc = await db["documents"].find_one({"source_url": url}, {"_id": 1})
            return doc is not None
        except Exception:
            return False

    async def scrape_article(self, client: httpx.AsyncClient, source: Dict, url: str) -> Optional[Dict]:
        """Fetch one page and extract an article document.

        Uses lxml directly for the title (fast C-level parse) and newspaper
        for the body text. Returns None on non-200 responses, bodies shorter
        than 100 characters, or any fetch/parse failure.
        """
        try:
            resp = await client.get(url, timeout=7.0)
            if resp.status_code != 200:
                return None
            # lxml directly, for raw parsing speed.
            tree = html.fromstring(resp.content)
            title = tree.xpath('//h1/text()')
            title = title[0].strip() if title else "Sans titre"
            # Body extraction via newspaper (well optimised for articles).
            article = Article(url)
            article.set_html(resp.text)
            article.parse()
            content = article.text
            if len(content) < 100:
                return None
            return {
                'id': generate_uuid(),
                'titre': title,
                'texte': content,
                'source_url': url,
                'pays': source.get('pays', 'Afrique'),
                'langue': source.get('langue', 'fr'),
                'type_document': 'article',
                'date': datetime.now().isoformat()
            }
        except Exception as e:
            logger.debug(f"Erreur scraping {url}: {e}")
            return None

    async def discover_links(self, client: httpx.AsyncClient, base_url: str) -> List[str]:
        """Discover candidate article links on a page, capped at 100 new ones."""
        try:
            resp = await client.get(base_url, timeout=10.0)
            if resp.status_code != 200:
                return []
            # CPU-bound HTML parsing runs in a worker thread.
            loop = asyncio.get_running_loop()
            links = await loop.run_in_executor(None, self._extract_links_sync, resp.content, base_url)
            # Keep only links not yet seen in this session.
            new_links = []
            for link in links:
                if link not in self.discovered_urls:
                    new_links.append(link)
                    self.discovered_urls.add(link)
            return new_links[:100]
        except Exception as e:
            logger.debug(f"Erreur découverte liens {base_url}: {e}")
            return []

    def _extract_links_sync(self, content: bytes, base_url: str) -> List[str]:
        """Synchronously extract and filter article-looking links via lxml."""
        try:
            tree = html.fromstring(content)
            raw_links = tree.xpath('//a/@href')
            valid_links = []
            for link in raw_links:
                if not link:
                    continue
                # Normalise root-relative links; drop other non-http schemes.
                if link.startswith('/'):
                    link = urljoin(base_url, link)
                elif not link.startswith('http'):
                    continue
                # Cheap heuristic filter for article-like URLs.
                if any(x in link.lower() for x in ['article', 'actualite', 'news', '/20', '-20']):
                    valid_links.append(link)
            return valid_links
        except Exception:  # narrowed from a bare except
            return []

    async def flush_buffer(self):
        """Persist buffered documents (JSON file + MongoDB) and clear the buffer.

        The buffer is snapshotted and reset *before* awaiting anything, so
        documents appended by other coroutines while the flush is in flight
        land in the next batch instead of being silently dropped.
        """
        if not self.buffer:
            return
        # Atomic snapshot w.r.t. the event loop: later appends go to the new list.
        batch, self.buffer = self.buffer, []
        logger.info(f"💾 Flush buffer: sauvegarde de {len(batch)} documents...")
        # 1. JSON save (blocking disk I/O -> worker thread).
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._save_buffer_sync, batch)
        # 2. MongoDB saves in parallel; return_exceptions=True lets the other
        # writes continue when one fails.
        try:
            mongo_tasks = [save_to_mongo("documents", doc) for doc in batch]
            await asyncio.gather(*mongo_tasks, return_exceptions=True)
            logger.info("✅ Sauvegarde MongoDB terminées")
        except Exception as e:
            logger.error(f"Erreur sauvegarde MongoDB: {e}")

    def _save_buffer_sync(self, documents: List[Dict]):
        """Append documents to the local JSON search index (blocking, disk only)."""
        try:
            os.makedirs("data", exist_ok=True)
            local_file = "data/search_index.json"
            # Read existing index.
            existing_data = []
            if os.path.exists(local_file):
                with open(local_file, "r", encoding="utf-8") as f:
                    existing_data = json.load(f)
            # Append the new batch.
            existing_data.extend(documents)
            # NOTE(review): rewrites the whole file each flush — O(total docs)
            # per flush; acceptable for modest index sizes.
            with open(local_file, "w", encoding="utf-8") as f:
                json.dump(existing_data, f, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Erreur sauvegarde JSON: {e}")

    async def process_source(self, client: httpx.AsyncClient, source: Dict) -> int:
        """Scrape one source: its landing page plus up to 50 discovered articles.

        Returns the number of documents buffered for this source.
        """
        count = 0
        async with self.sem:
            try:
                # 1. Scrape the landing page itself (no dedup check: it changes often).
                doc = await self.scrape_article(client, source, source['url'])
                if doc:
                    self.buffer.append(doc)
                    count += 1
                # 2. Discover linked articles.
                logger.info(f"🔍 Découverte de liens sur {source['name']}...")
                article_urls = await self.discover_links(client, source['url'])
                logger.info(f"📰 {len(article_urls)} articles découverts sur {source['name']}")
                # 3. Scrape the discovered articles (capped at 50 per source).
                for url in article_urls[:50]:
                    # Cross-session dedup against MongoDB.
                    if await self.is_url_scraped(url):
                        continue
                    doc = await self.scrape_article(client, source, url)
                    if doc:
                        self.buffer.append(doc)
                        count += 1
                        if len(self.buffer) >= self.buffer_size:
                            await self.flush_buffer()
                    # Small delay to avoid hammering the target site.
                    # TODO(review): consider SCRAPER_DELAY from utils.config here.
                    await asyncio.sleep(0.5)
                return count
            except Exception as e:
                logger.error(f"Erreur traitement source {source.get('name')}: {e}")
                return count

    async def run(self):
        """Process all active sources concurrently, then flush any remainder."""
        active_sources = [s for s in self.sources.get('sources', []) if s.get('active', True)]
        # HTTP/2 for extra speed where the server supports it.
        async with httpx.AsyncClient(timeout=10.0, follow_redirects=True, http2=True) as client:
            tasks = [self.process_source(client, s) for s in active_sources]
            counts = await asyncio.gather(*tasks)
            await self.flush_buffer()  # final flush
            logger.info(f"⚡ Turbo Scraping terminé. {sum(counts)} documents traités.")
import os
from datetime import datetime
if __name__ == "__main__":
    # CLI entry point: build the scraper and run one full scraping pass.
    asyncio.run(ScrapDjiScraper().run())