# ===================================================================================
# WAHIS SCRAPER - VERSION SANS PLAYWRIGHT (REQUESTS + HTTPX)
# ===================================================================================
import streamlit as st
import pandas as pd
import json
from datetime import datetime
from pathlib import Path
import requests
import httpx
import asyncio
from streamlit_folium import st_folium
import folium
import traceback
import logging
import time
import random
from typing import Dict, List, Optional, Tuple
# Configuration des logs
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- Configuration et Initialisation ---
st.set_page_config(layout="wide", page_title="WAHIS Animal Disease Dashboard")
class WAHISScraperHTTP:
"""Scraper WAHIS utilisant uniquement HTTP requests (sans navigateur)"""
def __init__(self):
self.logs = []
self.session = None
self.base_url = "https://wahis.woah.org"
# Headers pour simuler un navigateur réel
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'fr-FR,fr;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"'
}
def log(self, message: str, level: str = "INFO"):
"""Logging avec niveaux"""
timestamp = datetime.now().strftime("%H:%M:%S")
formatted_message = f"[{timestamp}] {level}: {message}"
self.logs.append(formatted_message)
if level == "ERROR":
logger.error(message)
elif level == "WARNING":
logger.warning(message)
else:
logger.info(message)
def _create_session(self) -> requests.Session:
"""Crée une session HTTP avec les bons headers"""
session = requests.Session()
session.headers.update(self.headers)
# Configuration SSL et timeout
session.verify = True
session.timeout = 30
return session
def _get_api_headers(self) -> Dict[str, str]:
"""Headers spécifiques pour les API WAHIS"""
api_headers = self.headers.copy()
api_headers.update({
'Content-Type': 'application/json',
'clientid': 'OIEwebsite',
'env': 'PRD',
'security-token': 'token',
'type': 'REQUEST',
'Referer': 'https://wahis.woah.org/',
'Origin': 'https://wahis.woah.org'
})
return api_headers
def _make_api_request(self, url: str, payload: dict, retries: int = 3) -> Optional[dict]:
"""Fait une requête API avec retry logic"""
for attempt in range(retries):
try:
# Délai aléatoire pour éviter la détection
if attempt > 0:
delay = random.uniform(2, 5)
self.log(f"Tentative {attempt + 1}/{retries} après {delay:.1f}s de délai")
time.sleep(delay)
response = self.session.post(
url,
json=payload,
headers=self._get_api_headers(),
timeout=30
)
self.log(f"Statut HTTP: {response.status_code} pour {url}")
if response.status_code == 200:
return response.json()
elif response.status_code == 403:
self.log("Accès refusé (403) - possible blocage Cloudflare", "WARNING")
continue
elif response.status_code == 429:
self.log("Trop de requêtes (429) - attente prolongée", "WARNING")
time.sleep(10)
continue
else:
self.log(f"Erreur HTTP {response.status_code}: {response.text[:200]}", "WARNING")
continue
except requests.exceptions.Timeout:
self.log(f"Timeout sur tentative {attempt + 1}", "WARNING")
except requests.exceptions.ConnectionError as e:
self.log(f"Erreur de connexion: {str(e)}", "WARNING")
except Exception as e:
self.log(f"Erreur inattendue: {str(e)}", "WARNING")
self.log(f"Échec après {retries} tentatives pour {url}", "ERROR")
return None
def _initialize_session(self) -> bool:
"""Initialise la session en visitant d'abord la page principale"""
try:
self.log("🌐 Initialisation de la session HTTP")
self.session = self._create_session()
# Première visite pour établir la session
main_url = f"{self.base_url}/#/event-management"
response = self.session.get(main_url, timeout=15)
if response.status_code == 200:
self.log("✅ Session initialisée avec succès")
return True
else:
self.log(f"Erreur d'initialisation: {response.status_code}", "WARNING")
return False
except Exception as e:
self.log(f"Échec d'initialisation: {str(e)}", "ERROR")
return False
def run_extraction_sync(self) -> Tuple[List, List, List, str]:
"""Extraction principale en mode synchrone"""
self.log("🚀 Démarrage de l'extraction WAHIS (mode HTTP)")
try:
# Initialisation de la session
if not self._initialize_session():
raise Exception("Impossible d'initialiser la session HTTP")
# Attendre un peu pour simuler un utilisateur réel
time.sleep(2)
# ===== PHASE 1: Récupération des rapports =====
self.log("📋 PHASE 1: Récupération de la liste des rapports")
list_api_url = "https://wahis.woah.org/api/v1/pi/event/filtered-list?language=fr"
payload_list = {
"pageNumber": 1,
"pageSize": 100,
"sortColName": "REP_LAST_UPDATE",
"sortColOrder": "DESC",
"reportFilters": {},
"languageChanged": False
}
list_response = self._make_api_request(list_api_url, payload_list)
if not list_response:
raise Exception("Phase 1 échouée: Impossible de récupérer les rapports")
report_list = list_response.get('list', [])
if not report_list:
raise Exception("Phase 1 échouée: Liste de rapports vide")
self.log(f"✅ Phase 1 réussie: {len(report_list)} rapports récupérés")
# ===== PHASE 2: Données GPS des foyers =====
unique_event_ids = sorted(list(set(
item['eventId'] for item in report_list
if 'eventId' in item and item['eventId']
)))
self.log(f"🗺️ PHASE 2: Récupération GPS pour {len(unique_event_ids)} événements")
if not unique_event_ids:
self.log("Aucun ID d'événement trouvé", "WARNING")
return report_list, [], [], "\n".join(self.logs)
outbreaks_api_url = "https://wahis.woah.org/api/v1/pi/map-data/outbreaks-from-event-ids?language=fr"
all_outbreaks_data = self._make_api_request(outbreaks_api_url, unique_event_ids)
if not all_outbreaks_data:
self.log("Phase 2 échouée, mais continuation possible", "WARNING")
all_outbreaks_data = []
elif not isinstance(all_outbreaks_data, list):
all_outbreaks_data = []
self.log(f"✅ Phase 2: {len(all_outbreaks_data)} foyers récupérés")
# ===== PHASE 3: Détails épidémiologiques =====
unique_outbreak_ids = sorted(list(set(
item['outbreakId'] for item in all_outbreaks_data
if 'outbreakId' in item and item['outbreakId']
)))
additional_info_data = []
if unique_outbreak_ids:
self.log(f"📊 PHASE 3: Détails pour {len(unique_outbreak_ids)} foyers")
additional_info_api_url = "https://wahis.woah.org/api/v1/pi/outbreak/additional-information"
additional_info_data = self._make_api_request(additional_info_api_url, unique_outbreak_ids)
if not additional_info_data:
self.log("Phase 3 échouée (non critique)", "WARNING")
additional_info_data = []
elif not isinstance(additional_info_data, list):
additional_info_data = []
self.log(f"✅ Phase 3: {len(additional_info_data)} détails récupérés")
self.log("🎉 Extraction HTTP terminée avec succès!")
return report_list, all_outbreaks_data, additional_info_data, "\n".join(self.logs)
except Exception as e:
error_msg = f"Erreur critique pendant l'extraction HTTP: {str(e)}"
self.log(error_msg, "ERROR")
raise Exception(error_msg)
finally:
if self.session:
self.session.close()
self.log("🔒 Session HTTP fermée")
def process_data(reports: list, outbreaks: list, additional_infos: list) -> pd.DataFrame:
"""Traitement et fusion des données avec validation"""
if not outbreaks:
return pd.DataFrame()
# Validation et nettoyage des données additionnelles
valid_additional_infos = [
info for info in additional_infos
if isinstance(info, dict) and 'outbreakId' in info
]
additional_info_map = {
info['outbreakId']: info for info in valid_additional_infos
}
# Mapping des maladies depuis les rapports
report_map = {
report['eventId']: {
'disease': report.get('disease', 'N/A'),
'reportDate': report.get('reportDate'),
'country': report.get('country')
}
for report in reports if 'eventId' in report
}
# Enrichissement des données de foyers
enriched_outbreaks = []
for outbreak in outbreaks:
if not isinstance(outbreak, dict):
continue
# Copie de l'outbreak original
enriched_outbreak = outbreak.copy()
# Ajout des informations du rapport
event_id = outbreak.get('eventId')
if event_id in report_map:
event_info = report_map[event_id]
enriched_outbreak['diseaseName'] = event_info['disease']
enriched_outbreak['reportDate'] = event_info.get('reportDate')
if not enriched_outbreak.get('country'):
enriched_outbreak['country'] = event_info.get('country')
# Ajout des informations additionnelles
outbreak_id = outbreak.get('outbreakId')
if outbreak_id in additional_info_map:
additional_data = additional_info_map[outbreak_id]
enriched_outbreak.update(additional_data)
# Validation des coordonnées GPS
lat = enriched_outbreak.get('latitude')
lon = enriched_outbreak.get('longitude')
if lat is not None and lon is not None:
try:
lat_float = float(lat)
lon_float = float(lon)
if -90 <= lat_float <= 90 and -180 <= lon_float <= 180:
enriched_outbreak['latitude'] = lat_float
enriched_outbreak['longitude'] = lon_float
enriched_outbreaks.append(enriched_outbreak)
except (ValueError, TypeError):
# Ignorer les entrées avec des coordonnées invalides
continue
return pd.DataFrame(enriched_outbreaks)
# --- Interface Streamlit ---
st.title("🦠 Tableau de Bord WAHIS - Maladies Animales")
st.markdown("*Surveillance mondiale des maladies animales (OIE/WOAH) - Version HTTP*")
# Initialisation des données de session
if 'df_outbreaks' not in st.session_state:
st.session_state.df_outbreaks = pd.DataFrame()
st.session_state.logs = ""
st.session_state.last_update = None
# Bouton d'extraction
col1, col2 = st.columns([1, 2])
with col1:
extract_button = st.button("🚀 Extraire les données WAHIS", type="primary")
with col2:
if st.session_state.last_update:
st.success(f"Dernière mise à jour: {st.session_state.last_update}")
if extract_button:
with st.spinner("🔄 Extraction en cours via HTTP... (1-2 minutes)"):
progress_bar = st.progress(0)
status_text = st.empty()
try:
scraper = WAHISScraperHTTP()
status_text.text("Initialisation de la connexion...")
progress_bar.progress(20)
# Lancement de l'extraction
reports, outbreaks, additional, logs = scraper.run_extraction_sync()
progress_bar.progress(80)
status_text.text("Traitement des données...")
if reports:
st.session_state.df_outbreaks = process_data(reports, outbreaks, additional)
st.session_state.logs = logs
st.session_state.last_update = datetime.now().strftime("%d/%m/%Y %H:%M")
progress_bar.progress(100)
status_text.empty()
st.success(f"✅ Extraction HTTP réussie! {len(st.session_state.df_outbreaks)} foyers récupérés.")
st.rerun()
else:
progress_bar.empty()
status_text.empty()
st.error("❌ Échec de l'extraction. Consultez les logs pour plus de détails.")
st.session_state.logs = logs
except Exception as e:
progress_bar.empty()
status_text.empty()
st.error("❌ Erreur critique pendant l'extraction HTTP")
st.code(str(e))
# Afficher des solutions possibles
st.info("💡 Solutions possibles:")
st.markdown("""
- Le site WAHIS peut être temporairement inaccessible
- Cloudflare bloque peut-être les requêtes automatisées
- Essayez de relancer dans quelques minutes
- Vérifiez votre connexion internet
""")
# Affichage des données si disponibles
if not st.session_state.df_outbreaks.empty:
df = st.session_state.df_outbreaks
# Sidebar avec filtres
st.sidebar.header("🔍 Filtres de recherche")
# Statistiques rapides
st.sidebar.metric("Total foyers", len(df))
st.sidebar.metric("Pays affectés", df['country'].nunique() if 'country' in df.columns else 0)
st.sidebar.metric("Maladies détectées", df['diseaseName'].nunique() if 'diseaseName' in df.columns else 0)
# Filtres
all_diseases = ["Toutes"] + sorted(df['diseaseName'].dropna().unique().tolist())
all_countries = ["Tous"] + sorted(df['country'].dropna().unique().tolist()) if 'country' in df.columns else ["Tous"]
all_species = ["Toutes"] + sorted(df['species'].dropna().unique().tolist()) if 'species' in df.columns else ["Toutes"]
selected_disease = st.sidebar.selectbox("🦠 Maladie", all_diseases)
selected_country = st.sidebar.selectbox("🌍 Pays", all_countries)
selected_species = st.sidebar.selectbox("🐄 Espèce", all_species)
# Application des filtres
filtered_df = df.copy()
if selected_disease != "Toutes":
filtered_df = filtered_df[filtered_df['diseaseName'] == selected_disease]
if selected_country != "Tous" and 'country' in df.columns:
filtered_df = filtered_df[filtered_df['country'] == selected_country]
if selected_species != "Toutes" and 'species' in df.columns:
filtered_df = filtered_df[filtered_df['species'] == selected_species]
# Carte interactive
st.header(f"🗺️ Localisation de {len(filtered_df)} foyer(s)")
if filtered_df.empty:
st.warning("⚠️ Aucun foyer ne correspond aux filtres sélectionnés.")
elif not all(col in filtered_df.columns for col in ['latitude', 'longitude']):
st.warning("⚠️ Données GPS manquantes pour l'affichage de la carte.")
else:
# Calcul du centre de la carte
center_lat = filtered_df['latitude'].mean()
center_lon = filtered_df['longitude'].mean()
m = folium.Map(
location=[center_lat, center_lon],
zoom_start=4,
tiles='OpenStreetMap'
)
# Ajout des marqueurs avec clustering
from folium.plugins import MarkerCluster
marker_cluster = MarkerCluster().add_to(m)
for _, row in filtered_df.iterrows():
popup_content = f"""
🏥 Foyer: {row.get('locationName', 'Non spécifié')}
🦠 Maladie: {row.get('diseaseName', 'N/A')}
🐄 Espèce: {row.get('species', 'N/A')}
🌍 Pays: {row.get('country', 'N/A')}
📅 Date: {row.get('reportDate', 'N/A')}
"""
# Couleur selon le type de maladie
color = 'red' if 'influenza' in str(row.get('diseaseName', '')).lower() else 'blue'
folium.Marker(
location=[row['latitude'], row['longitude']],
popup=folium.Popup(popup_content, max_width=300),
tooltip=f"{row.get('diseaseName', 'N/A')} - {row.get('country', 'N/A')}",
icon=folium.Icon(color=color)
).add_to(marker_cluster)
st_folium(m, width='100%', height=500)
# Tableau des données
with st.expander("📊 Tableau détaillé des foyers", expanded=False):
if not filtered_df.empty:
# Sélection des colonnes importantes
display_columns = []
for col in ['diseaseName', 'country', 'locationName', 'species', 'reportDate', 'latitude', 'longitude']:
if col in filtered_df.columns:
display_columns.append(col)
if display_columns:
st.dataframe(
filtered_df[display_columns],
use_container_width=True,
height=400
)
# Bouton de téléchargement
csv = filtered_df.to_csv(index=False)
st.download_button(
label="📥 Télécharger les données (CSV)",
data=csv,
file_name=f"wahis_foyers_{datetime.now().strftime('%Y%m%d_%H%M')}.csv",
mime="text/csv"
)
else:
st.info("Aucune donnée à afficher avec les filtres actuels.")
# Logs et informations
with st.expander("🔧 Journal d'exécution", expanded=False):
if st.session_state.get('logs'):
st.text_area("Logs détaillés:", st.session_state.logs, height=400)
else:
st.info("Aucun log disponible. Lancez une extraction pour voir les détails.")
# Section d'aide
with st.expander("ℹ️ À propos de cette version", expanded=False):
st.markdown("""
### Version HTTP (sans Playwright)
Cette version utilise des requêtes HTTP directes au lieu d'un navigateur web:
**✅ Avantages:**
- Plus léger et rapide
- Fonctionne dans tous les environnements
- Pas de dépendances lourdes
**⚠️ Limitations:**
- Peut être bloqué par Cloudflare
- Moins robuste face aux changements du site
- Nécessite parfois plusieurs tentatives
**🔧 En cas de problème:**
- Relancez l'extraction après quelques minutes
- Le site WAHIS peut être temporairement indisponible
- Cloudflare peut bloquer les requêtes automatisées
""")
# Footer
st.markdown("---")
st.markdown(
"🔬 *Outil développé pour la surveillance des maladies animales - "
"Données issues de WAHIS (OIE/WOAH) - Version HTTP*"
)