# =================================================================================== # WAHIS SCRAPER - VERSION SANS PLAYWRIGHT (REQUESTS + HTTPX) # =================================================================================== import streamlit as st import pandas as pd import json from datetime import datetime from pathlib import Path import requests import httpx import asyncio from streamlit_folium import st_folium import folium import traceback import logging import time import random from typing import Dict, List, Optional, Tuple # Configuration des logs logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # --- Configuration et Initialisation --- st.set_page_config(layout="wide", page_title="WAHIS Animal Disease Dashboard") class WAHISScraperHTTP: """Scraper WAHIS utilisant uniquement HTTP requests (sans navigateur)""" def __init__(self): self.logs = [] self.session = None self.base_url = "https://wahis.woah.org" # Headers pour simuler un navigateur réel self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'fr-FR,fr;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"Windows"' } def log(self, message: str, level: str = "INFO"): """Logging avec niveaux""" timestamp = datetime.now().strftime("%H:%M:%S") formatted_message = f"[{timestamp}] {level}: {message}" self.logs.append(formatted_message) if level == "ERROR": logger.error(message) elif level == "WARNING": logger.warning(message) else: logger.info(message) def _create_session(self) -> requests.Session: """Crée une session HTTP avec les bons headers""" session = requests.Session() session.headers.update(self.headers) # Configuration SSL et timeout session.verify = True session.timeout = 30 return session def _get_api_headers(self) -> Dict[str, str]: """Headers spécifiques pour les API WAHIS""" api_headers = self.headers.copy() api_headers.update({ 'Content-Type': 'application/json', 'clientid': 'OIEwebsite', 'env': 'PRD', 'security-token': 'token', 'type': 'REQUEST', 'Referer': 'https://wahis.woah.org/', 'Origin': 'https://wahis.woah.org' }) return api_headers def _make_api_request(self, url: str, payload: dict, retries: int = 3) -> Optional[dict]: """Fait une requête API avec retry logic""" for attempt in range(retries): try: # Délai aléatoire pour éviter la détection if attempt > 0: delay = random.uniform(2, 5) self.log(f"Tentative {attempt + 1}/{retries} après {delay:.1f}s de délai") time.sleep(delay) response = self.session.post( url, json=payload, headers=self._get_api_headers(), timeout=30 ) self.log(f"Statut HTTP: {response.status_code} pour {url}") if response.status_code == 200: return response.json() elif response.status_code == 403: self.log("Accès refusé (403) - possible blocage Cloudflare", "WARNING") continue elif response.status_code == 429: self.log("Trop de requêtes (429) - attente prolongée", "WARNING") time.sleep(10) continue else: self.log(f"Erreur HTTP {response.status_code}: {response.text[:200]}", "WARNING") continue except requests.exceptions.Timeout: self.log(f"Timeout sur tentative {attempt + 1}", "WARNING") except requests.exceptions.ConnectionError as e: self.log(f"Erreur de connexion: {str(e)}", "WARNING") except Exception as e: self.log(f"Erreur inattendue: {str(e)}", "WARNING") self.log(f"Échec après {retries} tentatives pour {url}", "ERROR") return None def _initialize_session(self) -> bool: """Initialise la session en visitant d'abord la page principale""" try: self.log("🌐 Initialisation de la session HTTP") self.session = self._create_session() # Première visite pour établir la session main_url = f"{self.base_url}/#/event-management" response = self.session.get(main_url, timeout=15) if response.status_code == 200: self.log("✅ Session initialisée avec succès") return True else: self.log(f"Erreur d'initialisation: {response.status_code}", "WARNING") return False except Exception as e: self.log(f"Échec d'initialisation: {str(e)}", "ERROR") return False def run_extraction_sync(self) -> Tuple[List, List, List, str]: """Extraction principale en mode synchrone""" self.log("🚀 Démarrage de l'extraction WAHIS (mode HTTP)") try: # Initialisation de la session if not self._initialize_session(): raise Exception("Impossible d'initialiser la session HTTP") # Attendre un peu pour simuler un utilisateur réel time.sleep(2) # ===== PHASE 1: Récupération des rapports ===== self.log("📋 PHASE 1: Récupération de la liste des rapports") list_api_url = "https://wahis.woah.org/api/v1/pi/event/filtered-list?language=fr" payload_list = { "pageNumber": 1, "pageSize": 100, "sortColName": "REP_LAST_UPDATE", "sortColOrder": "DESC", "reportFilters": {}, "languageChanged": False } list_response = self._make_api_request(list_api_url, payload_list) if not list_response: raise Exception("Phase 1 échouée: Impossible de récupérer les rapports") report_list = list_response.get('list', []) if not report_list: raise Exception("Phase 1 échouée: Liste de rapports vide") self.log(f"✅ Phase 1 réussie: {len(report_list)} rapports récupérés") # ===== PHASE 2: Données GPS des foyers ===== unique_event_ids = sorted(list(set( item['eventId'] for item in report_list if 'eventId' in item and item['eventId'] ))) self.log(f"🗺️ PHASE 2: Récupération GPS pour {len(unique_event_ids)} événements") if not unique_event_ids: self.log("Aucun ID d'événement trouvé", "WARNING") return report_list, [], [], "\n".join(self.logs) outbreaks_api_url = "https://wahis.woah.org/api/v1/pi/map-data/outbreaks-from-event-ids?language=fr" all_outbreaks_data = self._make_api_request(outbreaks_api_url, unique_event_ids) if not all_outbreaks_data: self.log("Phase 2 échouée, mais continuation possible", "WARNING") all_outbreaks_data = [] elif not isinstance(all_outbreaks_data, list): all_outbreaks_data = [] self.log(f"✅ Phase 2: {len(all_outbreaks_data)} foyers récupérés") # ===== PHASE 3: Détails épidémiologiques ===== unique_outbreak_ids = sorted(list(set( item['outbreakId'] for item in all_outbreaks_data if 'outbreakId' in item and item['outbreakId'] ))) additional_info_data = [] if unique_outbreak_ids: self.log(f"📊 PHASE 3: Détails pour {len(unique_outbreak_ids)} foyers") additional_info_api_url = "https://wahis.woah.org/api/v1/pi/outbreak/additional-information" additional_info_data = self._make_api_request(additional_info_api_url, unique_outbreak_ids) if not additional_info_data: self.log("Phase 3 échouée (non critique)", "WARNING") additional_info_data = [] elif not isinstance(additional_info_data, list): additional_info_data = [] self.log(f"✅ Phase 3: {len(additional_info_data)} détails récupérés") self.log("🎉 Extraction HTTP terminée avec succès!") return report_list, all_outbreaks_data, additional_info_data, "\n".join(self.logs) except Exception as e: error_msg = f"Erreur critique pendant l'extraction HTTP: {str(e)}" self.log(error_msg, "ERROR") raise Exception(error_msg) finally: if self.session: self.session.close() self.log("🔒 Session HTTP fermée") def process_data(reports: list, outbreaks: list, additional_infos: list) -> pd.DataFrame: """Traitement et fusion des données avec validation""" if not outbreaks: return pd.DataFrame() # Validation et nettoyage des données additionnelles valid_additional_infos = [ info for info in additional_infos if isinstance(info, dict) and 'outbreakId' in info ] additional_info_map = { info['outbreakId']: info for info in valid_additional_infos } # Mapping des maladies depuis les rapports report_map = { report['eventId']: { 'disease': report.get('disease', 'N/A'), 'reportDate': report.get('reportDate'), 'country': report.get('country') } for report in reports if 'eventId' in report } # Enrichissement des données de foyers enriched_outbreaks = [] for outbreak in outbreaks: if not isinstance(outbreak, dict): continue # Copie de l'outbreak original enriched_outbreak = outbreak.copy() # Ajout des informations du rapport event_id = outbreak.get('eventId') if event_id in report_map: event_info = report_map[event_id] enriched_outbreak['diseaseName'] = event_info['disease'] enriched_outbreak['reportDate'] = event_info.get('reportDate') if not enriched_outbreak.get('country'): enriched_outbreak['country'] = event_info.get('country') # Ajout des informations additionnelles outbreak_id = outbreak.get('outbreakId') if outbreak_id in additional_info_map: additional_data = additional_info_map[outbreak_id] enriched_outbreak.update(additional_data) # Validation des coordonnées GPS lat = enriched_outbreak.get('latitude') lon = enriched_outbreak.get('longitude') if lat is not None and lon is not None: try: lat_float = float(lat) lon_float = float(lon) if -90 <= lat_float <= 90 and -180 <= lon_float <= 180: enriched_outbreak['latitude'] = lat_float enriched_outbreak['longitude'] = lon_float enriched_outbreaks.append(enriched_outbreak) except (ValueError, TypeError): # Ignorer les entrées avec des coordonnées invalides continue return pd.DataFrame(enriched_outbreaks) # --- Interface Streamlit --- st.title("🦠 Tableau de Bord WAHIS - Maladies Animales") st.markdown("*Surveillance mondiale des maladies animales (OIE/WOAH) - Version HTTP*") # Initialisation des données de session if 'df_outbreaks' not in st.session_state: st.session_state.df_outbreaks = pd.DataFrame() st.session_state.logs = "" st.session_state.last_update = None # Bouton d'extraction col1, col2 = st.columns([1, 2]) with col1: extract_button = st.button("🚀 Extraire les données WAHIS", type="primary") with col2: if st.session_state.last_update: st.success(f"Dernière mise à jour: {st.session_state.last_update}") if extract_button: with st.spinner("🔄 Extraction en cours via HTTP... (1-2 minutes)"): progress_bar = st.progress(0) status_text = st.empty() try: scraper = WAHISScraperHTTP() status_text.text("Initialisation de la connexion...") progress_bar.progress(20) # Lancement de l'extraction reports, outbreaks, additional, logs = scraper.run_extraction_sync() progress_bar.progress(80) status_text.text("Traitement des données...") if reports: st.session_state.df_outbreaks = process_data(reports, outbreaks, additional) st.session_state.logs = logs st.session_state.last_update = datetime.now().strftime("%d/%m/%Y %H:%M") progress_bar.progress(100) status_text.empty() st.success(f"✅ Extraction HTTP réussie! {len(st.session_state.df_outbreaks)} foyers récupérés.") st.rerun() else: progress_bar.empty() status_text.empty() st.error("❌ Échec de l'extraction. Consultez les logs pour plus de détails.") st.session_state.logs = logs except Exception as e: progress_bar.empty() status_text.empty() st.error("❌ Erreur critique pendant l'extraction HTTP") st.code(str(e)) # Afficher des solutions possibles st.info("💡 Solutions possibles:") st.markdown(""" - Le site WAHIS peut être temporairement inaccessible - Cloudflare bloque peut-être les requêtes automatisées - Essayez de relancer dans quelques minutes - Vérifiez votre connexion internet """) # Affichage des données si disponibles if not st.session_state.df_outbreaks.empty: df = st.session_state.df_outbreaks # Sidebar avec filtres st.sidebar.header("🔍 Filtres de recherche") # Statistiques rapides st.sidebar.metric("Total foyers", len(df)) st.sidebar.metric("Pays affectés", df['country'].nunique() if 'country' in df.columns else 0) st.sidebar.metric("Maladies détectées", df['diseaseName'].nunique() if 'diseaseName' in df.columns else 0) # Filtres all_diseases = ["Toutes"] + sorted(df['diseaseName'].dropna().unique().tolist()) all_countries = ["Tous"] + sorted(df['country'].dropna().unique().tolist()) if 'country' in df.columns else ["Tous"] all_species = ["Toutes"] + sorted(df['species'].dropna().unique().tolist()) if 'species' in df.columns else ["Toutes"] selected_disease = st.sidebar.selectbox("🦠 Maladie", all_diseases) selected_country = st.sidebar.selectbox("🌍 Pays", all_countries) selected_species = st.sidebar.selectbox("🐄 Espèce", all_species) # Application des filtres filtered_df = df.copy() if selected_disease != "Toutes": filtered_df = filtered_df[filtered_df['diseaseName'] == selected_disease] if selected_country != "Tous" and 'country' in df.columns: filtered_df = filtered_df[filtered_df['country'] == selected_country] if selected_species != "Toutes" and 'species' in df.columns: filtered_df = filtered_df[filtered_df['species'] == selected_species] # Carte interactive st.header(f"🗺️ Localisation de {len(filtered_df)} foyer(s)") if filtered_df.empty: st.warning("⚠️ Aucun foyer ne correspond aux filtres sélectionnés.") elif not all(col in filtered_df.columns for col in ['latitude', 'longitude']): st.warning("⚠️ Données GPS manquantes pour l'affichage de la carte.") else: # Calcul du centre de la carte center_lat = filtered_df['latitude'].mean() center_lon = filtered_df['longitude'].mean() m = folium.Map( location=[center_lat, center_lon], zoom_start=4, tiles='OpenStreetMap' ) # Ajout des marqueurs avec clustering from folium.plugins import MarkerCluster marker_cluster = MarkerCluster().add_to(m) for _, row in filtered_df.iterrows(): popup_content = f""" 🏥 Foyer: {row.get('locationName', 'Non spécifié')}
🦠 Maladie: {row.get('diseaseName', 'N/A')}
🐄 Espèce: {row.get('species', 'N/A')}
🌍 Pays: {row.get('country', 'N/A')}
📅 Date: {row.get('reportDate', 'N/A')} """ # Couleur selon le type de maladie color = 'red' if 'influenza' in str(row.get('diseaseName', '')).lower() else 'blue' folium.Marker( location=[row['latitude'], row['longitude']], popup=folium.Popup(popup_content, max_width=300), tooltip=f"{row.get('diseaseName', 'N/A')} - {row.get('country', 'N/A')}", icon=folium.Icon(color=color) ).add_to(marker_cluster) st_folium(m, width='100%', height=500) # Tableau des données with st.expander("📊 Tableau détaillé des foyers", expanded=False): if not filtered_df.empty: # Sélection des colonnes importantes display_columns = [] for col in ['diseaseName', 'country', 'locationName', 'species', 'reportDate', 'latitude', 'longitude']: if col in filtered_df.columns: display_columns.append(col) if display_columns: st.dataframe( filtered_df[display_columns], use_container_width=True, height=400 ) # Bouton de téléchargement csv = filtered_df.to_csv(index=False) st.download_button( label="📥 Télécharger les données (CSV)", data=csv, file_name=f"wahis_foyers_{datetime.now().strftime('%Y%m%d_%H%M')}.csv", mime="text/csv" ) else: st.info("Aucune donnée à afficher avec les filtres actuels.") # Logs et informations with st.expander("🔧 Journal d'exécution", expanded=False): if st.session_state.get('logs'): st.text_area("Logs détaillés:", st.session_state.logs, height=400) else: st.info("Aucun log disponible. Lancez une extraction pour voir les détails.") # Section d'aide with st.expander("ℹ️ À propos de cette version", expanded=False): st.markdown(""" ### Version HTTP (sans Playwright) Cette version utilise des requêtes HTTP directes au lieu d'un navigateur web: **✅ Avantages:** - Plus léger et rapide - Fonctionne dans tous les environnements - Pas de dépendances lourdes **⚠️ Limitations:** - Peut être bloqué par Cloudflare - Moins robuste face aux changements du site - Nécessite parfois plusieurs tentatives **🔧 En cas de problème:** - Relancez l'extraction après quelques minutes - Le site WAHIS peut être temporairement indisponible - Cloudflare peut bloquer les requêtes automatisées """) # Footer st.markdown("---") st.markdown( "🔬 *Outil développé pour la surveillance des maladies animales - " "Données issues de WAHIS (OIE/WOAH) - Version HTTP*" )