joel commited on
Commit
1d03e6e
·
1 Parent(s): 98da555
Files changed (1) hide show
  1. scraper/main.py +55 -40
scraper/main.py CHANGED
@@ -67,76 +67,91 @@ class ScrapDjiScraper:
67
  return None
68
 
69
  async def discover_links(self, client: httpx.AsyncClient, base_url: str) -> List[str]:
70
- """Découvre les liens d'articles sur une page"""
71
  try:
72
  resp = await client.get(base_url, timeout=10.0)
73
  if resp.status_code != 200:
74
  return []
75
 
76
- tree = html.fromstring(resp.content)
77
- # Extraction de tous les liens d'articles
78
- links = tree.xpath('//a/@href')
79
 
80
- # Filtrage et normalisation des URLs
81
- article_urls = []
82
  for link in links:
83
- if not link:
84
- continue
85
- # Construire l'URL complète
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
86
  if link.startswith('/'):
87
- from urllib.parse import urljoin
88
  link = urljoin(base_url, link)
89
  elif not link.startswith('http'):
90
  continue
91
 
92
- # Filtrer les URLs qui ressemblent à des articles
93
  if any(x in link.lower() for x in ['article', 'actualite', 'news', '/20', '-20']):
94
- if link not in self.discovered_urls:
95
- article_urls.append(link)
96
- self.discovered_urls.add(link)
97
 
98
- return article_urls[:100] # Limiter à 100 liens par page
99
- except Exception as e:
100
- logger.debug(f"Erreur découverte liens {base_url}: {e}")
101
  return []
102
 
103
  async def flush_buffer(self):
104
  """Sauvegarde groupée pour réduire les accès disque/réseau"""
105
  if not self.buffer: return
106
  logger.info(f"💾 Flush buffer: sauvegarde de {len(self.buffer)} documents...")
107
- tasks = [self.save_everywhere(doc) for doc in self.buffer]
108
- await asyncio.gather(*tasks)
 
 
 
109
  self.buffer = []
110
 
111
- async def save_everywhere(self, doc: Dict):
112
- # Fallback de secours : Toujours sauver en local JSON pour le test
 
113
  try:
114
  os.makedirs("data", exist_ok=True)
115
  local_file = "data/search_index.json"
116
- data = []
 
 
117
  if os.path.exists(local_file):
118
  with open(local_file, "r", encoding="utf-8") as f:
119
- data = json.load(f)
120
- data.append(doc)
121
- # Garder tous les documents pour scraping massif
 
 
 
122
  with open(local_file, "w", encoding="utf-8") as f:
123
- json.dump(data, f, indent=2, ensure_ascii=False)
124
  except Exception as e:
125
- logger.error(f"Erreur sauvegarde JSON locale: {e}")
126
 
127
- # Les autres bases (échoueront silencieusement si non installées)
128
- try:
129
- session = SessionLocal()
130
- new_doc = Document(**{k: v for k, v in doc.items() if k in Document.__table__.columns})
131
- session.add(new_doc)
132
- session.commit()
133
- session.close()
134
- except: pass
135
-
136
- try: await save_to_mongo("documents", doc)
137
- except: pass
138
- try: index_typesense("documents", doc)
139
- except: pass
140
 
141
  async def process_source(self, client: httpx.AsyncClient, source: Dict):
142
  """Traite une source avec découverte de liens"""
 
67
  return None
68
 
69
  async def discover_links(self, client: httpx.AsyncClient, base_url: str) -> List[str]:
70
+ """Découvre les liens d'articles sur une page (Optimisé)"""
71
  try:
72
  resp = await client.get(base_url, timeout=10.0)
73
  if resp.status_code != 200:
74
  return []
75
 
76
+ # Traitement CPU-bound dans un thread
77
+ loop = asyncio.get_event_loop()
78
+ links = await loop.run_in_executor(None, self._extract_links_sync, resp.content, base_url)
79
 
80
+ # Filtrage rapide (peut rester dans le thread principal ou migrer si très lourd)
81
+ new_links = []
82
  for link in links:
83
+ if link not in self.discovered_urls:
84
+ new_links.append(link)
85
+ self.discovered_urls.add(link)
86
+
87
+ return new_links[:100]
88
+ except Exception as e:
89
+ logger.debug(f"Erreur découverte liens {base_url}: {e}")
90
+ return []
91
+
92
+ def _extract_links_sync(self, content: bytes, base_url: str) -> List[str]:
93
+ """Extraction synchrone des liens via lxml"""
94
+ try:
95
+ tree = html.fromstring(content)
96
+ raw_links = tree.xpath('//a/@href')
97
+
98
+ valid_links = []
99
+ from urllib.parse import urljoin
100
+
101
+ for link in raw_links:
102
+ if not link: continue
103
+
104
+ # Normalisation
105
  if link.startswith('/'):
 
106
  link = urljoin(base_url, link)
107
  elif not link.startswith('http'):
108
  continue
109
 
110
+ # Filtrage simple
111
  if any(x in link.lower() for x in ['article', 'actualite', 'news', '/20', '-20']):
112
+ valid_links.append(link)
 
 
113
 
114
+ return valid_links
115
+ except:
 
116
  return []
117
 
118
  async def flush_buffer(self):
119
  """Sauvegarde groupée pour réduire les accès disque/réseau"""
120
  if not self.buffer: return
121
  logger.info(f"💾 Flush buffer: sauvegarde de {len(self.buffer)} documents...")
122
+
123
+ # Exécuter la sauvegarde lourde dans un thread pour ne pas bloquer
124
+ loop = asyncio.get_event_loop()
125
+ await loop.run_in_executor(None, self._save_buffer_sync, self.buffer.copy())
126
+
127
  self.buffer = []
128
 
129
+ def _save_buffer_sync(self, documents: List[Dict]):
130
+ """Sauvegarde synchrone (disque/DB)"""
131
+ # 1. Sauvegarde JSON Local (Critique)
132
  try:
133
  os.makedirs("data", exist_ok=True)
134
  local_file = "data/search_index.json"
135
+
136
+ # Lecture
137
+ existing_data = []
138
  if os.path.exists(local_file):
139
  with open(local_file, "r", encoding="utf-8") as f:
140
+ existing_data = json.load(f)
141
+
142
+ # Ajout
143
+ existing_data.extend(documents)
144
+
145
+ # Écriture
146
  with open(local_file, "w", encoding="utf-8") as f:
147
+ json.dump(existing_data, f, indent=2, ensure_ascii=False)
148
  except Exception as e:
149
+ logger.error(f"Erreur sauvegarde JSON: {e}")
150
 
151
+ # 2. Sauvegarde DB (Best effort)
152
+ # Note: Pour MongoDB et autres clients async, il faudrait rester dans l'async
153
+ # Mais ici on simplifie pour le file system qui est le bottleneck principal
154
+ pass
 
 
 
 
 
 
 
 
 
155
 
156
  async def process_source(self, client: httpx.AsyncClient, source: Dict):
157
  """Traite une source avec découverte de liens"""