File size: 9,169 Bytes
dfdddb1
 
 
 
 
 
 
 
 
 
 
 
 
655a82b
 
dfdddb1
 
 
 
 
 
 
 
 
 
 
 
 
655a82b
dfdddb1
 
 
 
 
 
 
655a82b
 
 
 
 
 
 
 
 
 
dfdddb1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1d03e6e
dfdddb1
 
 
 
 
1d03e6e
 
 
dfdddb1
655a82b
1d03e6e
dfdddb1
1d03e6e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dfdddb1
 
 
 
 
1d03e6e
dfdddb1
1d03e6e
dfdddb1
1d03e6e
 
dfdddb1
 
 
 
 
 
1d03e6e
b009395
1d03e6e
 
 
b009395
 
 
 
 
 
 
 
 
 
dfdddb1
 
1d03e6e
b009395
dfdddb1
 
 
1d03e6e
 
 
dfdddb1
 
1d03e6e
 
 
 
 
 
dfdddb1
1d03e6e
dfdddb1
1d03e6e
dfdddb1
 
 
 
 
 
655a82b
 
dfdddb1
 
 
 
 
 
 
 
 
 
 
 
655a82b
 
 
 
 
dfdddb1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
import asyncio
import httpx
import json
import logging
from typing import Dict, List, Optional
from datetime import datetime
from lxml import html
from newspaper import Article

from utils.config import STORAGE_PATH, SCRAPER_DELAY, SCRAPER_CONCURRENT_REQUESTS
from utils.logger import setup_logger
from db.postgres_connector import SessionLocal
from db.models import Document, DocumentVersion
from db.mongo_connector import save_to_mongo, db

from parser.cleaner import clean_html
from indexer.typesense_indexer import index_document as index_typesense
from utils.uuid_gen import generate_uuid

logger = setup_logger(__name__)

class ScrapDjiScraper:
    def __init__(self, sources_file: str = "sources.json"):
        self.sources_file = sources_file
        self.sources = self.load_sources()
        self.sem = asyncio.Semaphore(10)  # Augmenté pour scraping massif
        self.buffer = []
        self.buffer_size = 50  # Buffer augmenté pour scraping massif
        self.discovered_urls = set()  # Pour éviter les doublons (mémoire session)

    def load_sources(self) -> Dict:
        if not os.path.exists(self.sources_file):
            return {"sources": []}
        with open(self.sources_file, 'r', encoding='utf-8') as f:
            return json.load(f)

    async def is_url_scraped(self, url: str) -> bool:
        """Vérifie si l'URL existe déjà dans MongoDB"""
        try:
            # Vérification rapide sur l'ID (source_url est unique ?)
            # Ou count_documents
            doc = await db["documents"].find_one({"source_url": url}, {"_id": 1})
            return doc is not None
        except Exception:
            return False

    async def scrape_article(self, client: httpx.AsyncClient, source: Dict, url: str) -> Optional[Dict]:
        """Scrape ultra-rapide avec lxml (C-level parsing)"""
        try:
            resp = await client.get(url, timeout=7.0)
            if resp.status_code != 200: return None
            
            # Utilisation de lxml directement pour la vitesse pure
            tree = html.fromstring(resp.content)
            title = tree.xpath('//h1/text()')
            title = title[0].strip() if title else "Sans titre"
            
            # Extraction brute via newspaper (très optimisée)
            article = Article(url)
            article.set_html(resp.text)
            article.parse()
            content = article.text
            
            if len(content) < 100: return None
            
            return {
                'id': generate_uuid(),
                'titre': title,
                'texte': content,
                'source_url': url,
                'pays': source.get('pays', 'Afrique'),
                'langue': source.get('langue', 'fr'),
                'type_document': 'article',
                'date': datetime.now().isoformat()
            }
        except Exception as e:
            logger.debug(f"Erreur scraping {url}: {e}")
            return None
    
    async def discover_links(self, client: httpx.AsyncClient, base_url: str) -> List[str]:
        """Découvre les liens d'articles sur une page (Optimisé)"""
        try:
            resp = await client.get(base_url, timeout=10.0)
            if resp.status_code != 200:
                return []
            
            # Traitement CPU-bound dans un thread
            loop = asyncio.get_event_loop()
            links = await loop.run_in_executor(None, self._extract_links_sync, resp.content, base_url)
            
            # Filtrage rapide
            new_links = []
            for link in links:
                if link not in self.discovered_urls:
                    new_links.append(link)
                    self.discovered_urls.add(link)
            
            return new_links[:100]
        except Exception as e:
            logger.debug(f"Erreur découverte liens {base_url}: {e}")
            return []

    def _extract_links_sync(self, content: bytes, base_url: str) -> List[str]:
        """Extraction synchrone des liens via lxml"""
        try:
            tree = html.fromstring(content)
            raw_links = tree.xpath('//a/@href')
            
            valid_links = []
            from urllib.parse import urljoin
            
            for link in raw_links:
                if not link: continue
                
                # Normalisation
                if link.startswith('/'):
                    link = urljoin(base_url, link)
                elif not link.startswith('http'):
                    continue
                
                # Filtrage simple
                if any(x in link.lower() for x in ['article', 'actualite', 'news', '/20', '-20']):
                    valid_links.append(link)
            
            return valid_links
        except:
            return []

    async def flush_buffer(self):
        """Sauvegarde groupée pour réduire les accès disque/réseau"""
        if not self.buffer: return
        logger.info(f"💾 Flush buffer: sauvegarde de {len(self.buffer)} documents...")
        
        # 1. Sauvegarde JSON (Bloquant -> Thread)
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, self._save_buffer_sync, self.buffer.copy())
        
        # 2. Sauvegarde MongoDB (Async -> Direct)
        # On lance les sauvegardes en parallèle sans bloquer
        try:
            mongo_tasks = [save_to_mongo("documents", doc) for doc in self.buffer]
            # On utilise gather avec return_exceptions=True pour que si un échoue, les autres continuent
            await asyncio.gather(*mongo_tasks, return_exceptions=True)
            logger.info("✅ Sauvegarde MongoDB terminées")
        except Exception as e:
            logger.error(f"Erreur sauvegarde MongoDB: {e}")

        self.buffer = []

    def _save_buffer_sync(self, documents: List[Dict]):
        """Sauvegarde synchrone (disque uniquement)"""
        try:
            os.makedirs("data", exist_ok=True)
            local_file = "data/search_index.json"
            
            # Lecture
            existing_data = []
            if os.path.exists(local_file):
                with open(local_file, "r", encoding="utf-8") as f:
                    existing_data = json.load(f)
            
            # Ajout
            existing_data.extend(documents)
            
            # Écriture
            with open(local_file, "w", encoding="utf-8") as f:
                json.dump(existing_data, f, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Erreur sauvegarde JSON: {e}")

    async def process_source(self, client: httpx.AsyncClient, source: Dict):
        """Traite une source avec découverte de liens"""
        count = 0
        async with self.sem:
            try:
                # 1. Scraper la page principale (si non scrapée récemment?)
                # On ne vérifie pas is_url_scraped pour la home car elle change
                doc = await self.scrape_article(client, source, source['url'])
                if doc:
                    self.buffer.append(doc)
                    count += 1
                
                # 2. Découvrir et scraper les articles liés
                logger.info(f"🔍 Découverte de liens sur {source['name']}...")
                article_urls = await self.discover_links(client, source['url'])
                logger.info(f"📰 {len(article_urls)} articles découverts sur {source['name']}")
                
                # 3. Scraper les articles découverts (avec limite)
                for url in article_urls[:50]:  # Limiter à 50 articles par source pour commencer
                    # Vérification anti-doublon MongoDB
                    if await self.is_url_scraped(url):
                        # logger.debug(f"⏭️ Déjà scrapé: {url}")
                        continue
                    
                    doc = await self.scrape_article(client, source, url)
                    if doc:
                        self.buffer.append(doc)
                        count += 1
                        if len(self.buffer) >= self.buffer_size:
                            await self.flush_buffer()
                    await asyncio.sleep(0.5)  # Petit délai pour ne pas surcharger
                
                return count
            except Exception as e:
                logger.error(f"Erreur traitement source {source.get('name')}: {e}")
                return count

    async def run(self):
        active_sources = [s for s in self.sources.get('sources', []) if s.get('active', True)]
        # Utilisation de HTTP/2 pour plus de vitesse si supporté par le site
        async with httpx.AsyncClient(timeout=10.0, follow_redirects=True, http2=True) as client:
            tasks = [self.process_source(client, s) for s in active_sources]
            counts = await asyncio.gather(*tasks)
            await self.flush_buffer() # Dernier flush
            logger.info(f"⚡ Turbo Scraping terminé. {sum(counts)} documents traités.")

import os
from datetime import datetime

if __name__ == "__main__":
    scraper = ScrapDjiScraper()
    asyncio.run(scraper.run())