# Scrap-Dji / scraper / main.py
# Author: joel
# Commit 655a82b: "Fix: JSON syntax error and deploy all changes (async auto search)"
import asyncio
import json
import logging
import os
from datetime import datetime
from typing import Dict, List, Optional
from urllib.parse import urljoin

import httpx
from lxml import html
from newspaper import Article

from db.models import Document, DocumentVersion
from db.mongo_connector import save_to_mongo, db
from db.postgres_connector import SessionLocal
from indexer.typesense_indexer import index_document as index_typesense
from parser.cleaner import clean_html
from utils.config import STORAGE_PATH, SCRAPER_DELAY, SCRAPER_CONCURRENT_REQUESTS
from utils.logger import setup_logger
from utils.uuid_gen import generate_uuid
logger = setup_logger(__name__)
class ScrapDjiScraper:
def __init__(self, sources_file: str = "sources.json"):
self.sources_file = sources_file
self.sources = self.load_sources()
self.sem = asyncio.Semaphore(10) # Augmenté pour scraping massif
self.buffer = []
self.buffer_size = 50 # Buffer augmenté pour scraping massif
self.discovered_urls = set() # Pour éviter les doublons (mémoire session)
def load_sources(self) -> Dict:
if not os.path.exists(self.sources_file):
return {"sources": []}
with open(self.sources_file, 'r', encoding='utf-8') as f:
return json.load(f)
async def is_url_scraped(self, url: str) -> bool:
"""Vérifie si l'URL existe déjà dans MongoDB"""
try:
# Vérification rapide sur l'ID (source_url est unique ?)
# Ou count_documents
doc = await db["documents"].find_one({"source_url": url}, {"_id": 1})
return doc is not None
except Exception:
return False
async def scrape_article(self, client: httpx.AsyncClient, source: Dict, url: str) -> Optional[Dict]:
"""Scrape ultra-rapide avec lxml (C-level parsing)"""
try:
resp = await client.get(url, timeout=7.0)
if resp.status_code != 200: return None
# Utilisation de lxml directement pour la vitesse pure
tree = html.fromstring(resp.content)
title = tree.xpath('//h1/text()')
title = title[0].strip() if title else "Sans titre"
# Extraction brute via newspaper (très optimisée)
article = Article(url)
article.set_html(resp.text)
article.parse()
content = article.text
if len(content) < 100: return None
return {
'id': generate_uuid(),
'titre': title,
'texte': content,
'source_url': url,
'pays': source.get('pays', 'Afrique'),
'langue': source.get('langue', 'fr'),
'type_document': 'article',
'date': datetime.now().isoformat()
}
except Exception as e:
logger.debug(f"Erreur scraping {url}: {e}")
return None
async def discover_links(self, client: httpx.AsyncClient, base_url: str) -> List[str]:
"""Découvre les liens d'articles sur une page (Optimisé)"""
try:
resp = await client.get(base_url, timeout=10.0)
if resp.status_code != 200:
return []
# Traitement CPU-bound dans un thread
loop = asyncio.get_event_loop()
links = await loop.run_in_executor(None, self._extract_links_sync, resp.content, base_url)
# Filtrage rapide
new_links = []
for link in links:
if link not in self.discovered_urls:
new_links.append(link)
self.discovered_urls.add(link)
return new_links[:100]
except Exception as e:
logger.debug(f"Erreur découverte liens {base_url}: {e}")
return []
def _extract_links_sync(self, content: bytes, base_url: str) -> List[str]:
"""Extraction synchrone des liens via lxml"""
try:
tree = html.fromstring(content)
raw_links = tree.xpath('//a/@href')
valid_links = []
from urllib.parse import urljoin
for link in raw_links:
if not link: continue
# Normalisation
if link.startswith('/'):
link = urljoin(base_url, link)
elif not link.startswith('http'):
continue
# Filtrage simple
if any(x in link.lower() for x in ['article', 'actualite', 'news', '/20', '-20']):
valid_links.append(link)
return valid_links
except:
return []
async def flush_buffer(self):
"""Sauvegarde groupée pour réduire les accès disque/réseau"""
if not self.buffer: return
logger.info(f"💾 Flush buffer: sauvegarde de {len(self.buffer)} documents...")
# 1. Sauvegarde JSON (Bloquant -> Thread)
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self._save_buffer_sync, self.buffer.copy())
# 2. Sauvegarde MongoDB (Async -> Direct)
# On lance les sauvegardes en parallèle sans bloquer
try:
mongo_tasks = [save_to_mongo("documents", doc) for doc in self.buffer]
# On utilise gather avec return_exceptions=True pour que si un échoue, les autres continuent
await asyncio.gather(*mongo_tasks, return_exceptions=True)
logger.info("✅ Sauvegarde MongoDB terminées")
except Exception as e:
logger.error(f"Erreur sauvegarde MongoDB: {e}")
self.buffer = []
def _save_buffer_sync(self, documents: List[Dict]):
"""Sauvegarde synchrone (disque uniquement)"""
try:
os.makedirs("data", exist_ok=True)
local_file = "data/search_index.json"
# Lecture
existing_data = []
if os.path.exists(local_file):
with open(local_file, "r", encoding="utf-8") as f:
existing_data = json.load(f)
# Ajout
existing_data.extend(documents)
# Écriture
with open(local_file, "w", encoding="utf-8") as f:
json.dump(existing_data, f, indent=2, ensure_ascii=False)
except Exception as e:
logger.error(f"Erreur sauvegarde JSON: {e}")
async def process_source(self, client: httpx.AsyncClient, source: Dict):
"""Traite une source avec découverte de liens"""
count = 0
async with self.sem:
try:
# 1. Scraper la page principale (si non scrapée récemment?)
# On ne vérifie pas is_url_scraped pour la home car elle change
doc = await self.scrape_article(client, source, source['url'])
if doc:
self.buffer.append(doc)
count += 1
# 2. Découvrir et scraper les articles liés
logger.info(f"🔍 Découverte de liens sur {source['name']}...")
article_urls = await self.discover_links(client, source['url'])
logger.info(f"📰 {len(article_urls)} articles découverts sur {source['name']}")
# 3. Scraper les articles découverts (avec limite)
for url in article_urls[:50]: # Limiter à 50 articles par source pour commencer
# Vérification anti-doublon MongoDB
if await self.is_url_scraped(url):
# logger.debug(f"⏭️ Déjà scrapé: {url}")
continue
doc = await self.scrape_article(client, source, url)
if doc:
self.buffer.append(doc)
count += 1
if len(self.buffer) >= self.buffer_size:
await self.flush_buffer()
await asyncio.sleep(0.5) # Petit délai pour ne pas surcharger
return count
except Exception as e:
logger.error(f"Erreur traitement source {source.get('name')}: {e}")
return count
async def run(self):
active_sources = [s for s in self.sources.get('sources', []) if s.get('active', True)]
# Utilisation de HTTP/2 pour plus de vitesse si supporté par le site
async with httpx.AsyncClient(timeout=10.0, follow_redirects=True, http2=True) as client:
tasks = [self.process_source(client, s) for s in active_sources]
counts = await asyncio.gather(*tasks)
await self.flush_buffer() # Dernier flush
logger.info(f"⚡ Turbo Scraping terminé. {sum(counts)} documents traités.")
import os
from datetime import datetime
if __name__ == "__main__":
scraper = ScrapDjiScraper()
asyncio.run(scraper.run())