# Scrap-Dji / scraper/discovery.py
# Author: joel — initial deployment: Scrap-Dji with API (commit dfdddb1)
#!/usr/bin/env python3
"""
Module de découverte unifié et optimisé pour Scrap-Dji
Conçu pour être rapide et économe en ressources (Async/Httpx)
"""
import asyncio
import httpx
import json
import re
import os
from urllib.parse import urljoin, urlparse
from typing import List, Dict, Set
from bs4 import BeautifulSoup
from datetime import datetime
from utils.logger import setup_logger
from utils.config import SCRAPER_DELAY, SCRAPER_USER_AGENT
logger = setup_logger(__name__)
class UnifiedDiscovery:
    """Unified, resource-efficient async discovery of African news sources.

    Explores a small set of seed pages concurrently (httpx + asyncio),
    collects outbound links whose domain carries an African ccTLD, and
    persists the deduplicated results to a JSON file.
    """

    def __init__(self):
        # Default headers applied to every outbound request.
        self.headers = {'User-Agent': SCRAPER_USER_AGENT}
        # ccTLD suffixes considered African for the fast domain check.
        self.african_domains = {'.sn', '.ml', '.ci', '.ng', '.gh', '.ke', '.ma', '.tn', '.dz', '.cm', '.cd', '.ga', '.bj', '.tg'}
        # Starting points for link exploration.
        self.seed_sources = [
            "https://www.allafrica.com",
            "https://www.africanews.com",
            "https://www.bbc.com/africa",
            "https://www.rfi.fr/fr/afrique"
        ]
        # Keywords used by the content-based fallback check.
        self.african_keywords = ['afrique', 'africa', 'actualités', 'news', 'journal', 'presse']

    async def is_african_site(self, client: httpx.AsyncClient, url: str) -> bool:
        """Fast ccTLD check, falling back to keyword analysis of the page body.

        Returns True when the domain ends with a known African ccTLD, or when
        a successful fetch of the page mentions African keywords more than
        3 times in total. Network/HTTP errors are treated as "not African"
        (best-effort probe).
        """
        domain = urlparse(url).netloc.lower()
        if any(domain.endswith(ext) for ext in self.african_domains):
            return True
        try:
            resp = await client.get(url, timeout=5.0)
            if resp.status_code == 200:
                text = resp.text.lower()
                return sum(text.count(kw) for kw in self.african_keywords) > 3
        except httpx.HTTPError:
            # Unreachable or misbehaving sites are simply skipped.
            pass
        return False

    async def explore_source(self, client: httpx.AsyncClient, seed_url: str) -> List[Dict]:
        """Asynchronously explore one seed page and collect candidate sources.

        Returns a list of source dicts (name/url/type/active/discovered_at),
        one per outbound link whose domain ends with an African ccTLD.
        Errors on a single seed are logged and yield an empty list so one
        bad seed cannot abort the whole discovery run.
        """
        discovered = []
        try:
            logger.info(f"🔍 Exploration de {seed_url}...")
            resp = await client.get(seed_url, timeout=10.0)
            soup = BeautifulSoup(resp.content, 'lxml')
            for link in soup.find_all('a', href=True):
                href = link.get('href')
                # Only absolute links; relative links stay on the seed site.
                if not href or not href.startswith('http'):
                    continue
                full_url = urljoin(seed_url, href)
                domain = urlparse(full_url).netloc.lower()
                # Fix: match the ccTLD as a *suffix*, consistent with
                # is_african_site — the previous substring test ('.ci' in
                # domain) produced false positives such as 'bbci.co.uk'.
                if any(domain.endswith(ext) for ext in self.african_domains):
                    discovered.append({
                        'name': domain.replace('.', '_'),
                        'url': f"{urlparse(full_url).scheme}://{domain}",
                        'type': 'news',
                        'active': True,
                        'discovered_at': datetime.now().isoformat()
                    })
        except Exception as e:
            logger.error(f"Erreur seed {seed_url}: {e}")
        return discovered

    async def run_discovery(self):
        """Explore all seeds concurrently, dedupe by URL, persist and return."""
        async with httpx.AsyncClient(headers=self.headers, follow_redirects=True) as client:
            tasks = [self.explore_source(client, seed) for seed in self.seed_sources]
            results = await asyncio.gather(*tasks)
        # Deduplicate across seed batches by canonical URL.
        all_sources = []
        seen = set()
        for batch in results:
            for s in batch:
                if s['url'] not in seen:
                    all_sources.append(s)
                    seen.add(s['url'])
        logger.info(f"✅ {len(all_sources)} sources uniques découvertes.")
        self.save_sources(all_sources)
        return all_sources

    def save_sources(self, sources: List[Dict], filename: str = "sources.json"):
        """Merge *sources* into *filename*, keeping existing entries (dedupe by URL)."""
        existing = {"sources": []}
        if os.path.exists(filename):
            try:
                with open(filename, 'r', encoding='utf-8') as f:
                    existing = json.load(f)
            except (OSError, json.JSONDecodeError):
                # Corrupt or unreadable file: start over from an empty list
                # (narrowed from a bare `except` that hid real errors).
                existing = {"sources": []}
        existing_urls = {s['url'] for s in existing.get('sources', [])}
        new_sources = [s for s in sources if s['url'] not in existing_urls]
        # Fix: a valid JSON file without a 'sources' key used to raise KeyError.
        existing.setdefault('sources', []).extend(new_sources)
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(existing, f, indent=2, ensure_ascii=False)
        # Fix: the message previously printed the literal "(unknown)"
        # instead of the destination filename.
        logger.info(f"💾 {len(new_sources)} nouvelles sources ajoutées à {filename}")
if __name__ == "__main__":
    # Script entry point: run one full discovery pass.
    asyncio.run(UnifiedDiscovery().run_discovery())