# (Hugging Face Space page header removed — was residue: "WAHIS / app.py / MMOON's picture / Update app.py / 7b1c181 verified")
# Fichier: app.py
# ===================================================================================
# WAHIS SCRAPER - VERSION FINALE, SIMPLE ET ROBUSTE (FILTRES ET TABLEAU)
# ===================================================================================
import gradio as gr
import pandas as pd
import json
from datetime import datetime
from pathlib import Path
import warnings
import traceback
import asyncio
import subprocess
import zipfile
from playwright.async_api import async_playwright
from playwright_stealth import stealth_async
# Installation du navigateur (ne change pas)
def install_playwright_browsers():
    """Install the Chromium browser binary used by Playwright.

    Runs ``playwright install chromium`` as a subprocess. Safe to call
    repeatedly (the CLI is idempotent).

    Raises:
        subprocess.CalledProcessError: if the install command exits non-zero.
        Exception: any other failure launching the subprocess.
    """
    print("Vérification et installation des navigateurs Playwright...")
    try:
        subprocess.run(
            ["playwright", "install", "chromium"],
            capture_output=True,
            text=True,
            check=True,
        )
        print("Installation de Chromium terminée avec succès.")
    except subprocess.CalledProcessError as e:
        # capture_output=True swallows the CLI's output, and
        # CalledProcessError's message alone does not include it —
        # print stderr so the real failure reason is visible in the logs.
        print(f"Erreur lors de l'installation de Chromium : {e}\n{e.stderr}")
        raise
    except Exception as e:
        print(f"Erreur lors de l'installation de Chromium : {e}")
        raise
# Install the browser at import time so the Space is ready before Gradio starts.
install_playwright_browsers()
warnings.filterwarnings('ignore')
# Directory where the Excel exports and the ZIP package are written.
OUTPUT_DIR = Path("outputs")
OUTPUT_DIR.mkdir(exist_ok=True)
class WAHISScraper:
    """Scrapes the WAHIS (WOAH) event-management API in three phases.

    Phase 1 fetches the latest event reports, phase 2 the outbreak (GPS)
    records for those events, and phase 3 the per-outbreak epidemiological
    details. Every HTTP call is issued *inside* the browser page via
    ``fetch`` so it carries the Cloudflare clearance cookies obtained by
    the initial page visit.
    """

    # JS snippet shared by all three API calls: POST a JSON payload and
    # return the parsed JSON response. Previously duplicated inline three times.
    _FETCH_JS = "async (args) => (await fetch(args.url, { method: 'POST', headers: args.headers, body: JSON.stringify(args.payload) })).json()"

    def __init__(self):
        # Accumulated timestamped log lines, joined with "\n" for the UI textbox.
        self.logs = []

    def log(self, message):
        """Record *message* with a HH:MM:SS timestamp and echo it to stdout."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        self.logs.append(f"[{timestamp}] {message}")
        print(message)

    async def _fetch_json(self, page, url, headers, payload):
        """POST *payload* as JSON to *url* from within *page* and return the parsed reply."""
        return await page.evaluate(
            self._FETCH_JS,
            {'url': url, 'headers': headers, 'payload': payload},
        )

    async def run_extraction_async(self):
        """Run the full three-phase extraction.

        Returns:
            ``(reports, outbreaks, additional_infos, logs)`` on success, or
            ``(None, None, None, logs)`` on any error (the traceback is
            appended to the logs).
        """
        self.log("🚀 Lancement de l'extraction en trois phases...")
        async with async_playwright() as p:
            browser = None
            try:
                self.log("🔧 Lancement d'un navigateur Chromium...")
                browser = await p.chromium.launch(headless=True)
                page = await browser.new_page()
                self.log("🕵️ Application du camouflage 'stealth'...")
                await stealth_async(page)
                self.log("🌍 Visite de la page principale pour passer le challenge Cloudflare...")
                # The initial visit solves the Cloudflare challenge; subsequent
                # in-page fetches reuse the resulting cookies.
                await page.goto("https://wahis.woah.org/#/event-management", wait_until="networkidle", timeout=90000)
                self.log("🍪 Challenge Cloudflare réussi.")
                headers = {
                    'Content-Type': 'application/json',
                    'Accept': 'application/json',
                    'clientid': 'OIEwebsite',
                    'env': 'PRD',
                    'security-token': 'token',
                    'type': 'REQUEST',
                }
                self.log("--- PHASE 1 : Récupération de la liste des rapports ---")
                list_api_url = "https://wahis.woah.org/api/v1/pi/event/filtered-list?language=fr"
                payload_list = {
                    "pageNumber": 1,
                    "pageSize": 100,
                    "sortColName": "REP_LAST_UPDATE",
                    "sortColOrder": "DESC",
                    "reportFilters": {},
                    "languageChanged": False,
                }
                list_response_json = await self._fetch_json(page, list_api_url, headers, payload_list)
                report_list = list_response_json.get('list', [])
                if not report_list:
                    raise Exception("Échec de la phase 1.")
                self.log(f"✅ Phase 1 réussie : {len(report_list)} rapports de base récupérés.")
                unique_event_ids = sorted(set(item['eventId'] for item in report_list if 'eventId' in item))
                self.log(f"--- PHASE 2 : Récupération des données GPS pour {len(unique_event_ids)} événements...")
                outbreaks_api_url = "https://wahis.woah.org/api/v1/pi/map-data/outbreaks-from-event-ids?language=fr"
                all_outbreaks_data = await self._fetch_json(page, outbreaks_api_url, headers, unique_event_ids)
                self.log(f"✅ Phase 2 réussie : {len(all_outbreaks_data)} foyers récupérés.")
                unique_outbreak_ids = sorted(set(item['outbreakId'] for item in all_outbreaks_data if 'outbreakId' in item))
                self.log(f"--- PHASE 3 : Récupération des détails épidémiologiques pour {len(unique_outbreak_ids)} foyers...")
                additional_info_data = []
                if unique_outbreak_ids:
                    additional_info_api_url = "https://wahis.woah.org/api/v1/pi/outbreak/additional-information"
                    additional_info_data = await self._fetch_json(page, additional_info_api_url, headers, unique_outbreak_ids)
                self.log(f"✅ Phase 3 réussie : {len(additional_info_data)} fiches de détails récupérées.")
                return report_list, all_outbreaks_data, additional_info_data, "\n".join(self.logs)
            except Exception:
                # Top-level boundary: record the full traceback in the logs and
                # signal failure to the caller with Nones instead of raising.
                self.log("❌ Une erreur critique est survenue.")
                self.log(traceback.format_exc())
                return None, None, None, "\n".join(self.logs)
            finally:
                if browser and browser.is_connected():
                    await browser.close()
def process_data_and_create_zip(reports, outbreaks, additional_infos):
    """Merge the three extraction phases and package Excel exports into a ZIP.

    Args:
        reports: phase-1 event summaries (list of dicts).
        outbreaks: phase-2 outbreak records (list of dicts; enriched in place).
        additional_infos: phase-3 per-outbreak details (list of dicts).

    Returns:
        ``(df_outbreaks, country_choices, disease_choices, species_choices, zip_path)``
        where the choice lists are prefixed with the "all" sentinel
        ("Tous"/"Toutes") and ``zip_path`` is a str — or
        ``(empty DataFrame, [], [], [], None)`` when *reports* is empty/None.
    """
    if not reports:
        return pd.DataFrame(), [], [], [], None
    # Enrich each outbreak with its event's disease and country.
    # .get() instead of direct indexing: a partially-filled report must not
    # abort the whole merge with a KeyError.
    report_map = {
        report['eventId']: {'disease': report.get('disease'), 'country': report.get('country')}
        for report in reports
        if 'eventId' in report
    }
    for outbreak in outbreaks:
        event_info = report_map.get(outbreak.get('eventId'), {})
        outbreak['disease'] = event_info.get('disease')
        outbreak['country'] = event_info.get('country')
    # Fold phase-3 details into the matching outbreak records.
    valid_additional_infos = [info for info in additional_infos if isinstance(info, dict)]
    additional_info_map = {info.get('outbreakId'): info for info in valid_additional_infos}
    for outbreak in outbreaks:
        outbreak_id = outbreak.get('outbreakId')
        if outbreak_id in additional_info_map:
            outbreak.update(additional_info_map[outbreak_id])
    df_outbreaks = pd.DataFrame(outbreaks)
    # Filter dropdown choices, built from the merged data (column may be
    # absent when the API returned no such field).
    all_countries = sorted(df_outbreaks['country'].dropna().unique()) if 'country' in df_outbreaks else []
    all_diseases = sorted(df_outbreaks['disease'].dropna().unique()) if 'disease' in df_outbreaks else []
    all_species = sorted(df_outbreaks['species'].dropna().unique()) if 'species' in df_outbreaks else []
    # Write both Excel exports and bundle them into a timestamped ZIP.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    zip_path = OUTPUT_DIR / f"wahis_package_{timestamp}.zip"
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        df_summary = pd.DataFrame(reports)
        df_summary.to_excel(OUTPUT_DIR / "1_summary_events.xlsx", index=False)
        df_outbreaks.to_excel(OUTPUT_DIR / "2_outbreaks_full_details.xlsx", index=False)
        zipf.write(OUTPUT_DIR / "1_summary_events.xlsx", arcname="1_summary_events.xlsx")
        zipf.write(OUTPUT_DIR / "2_outbreaks_full_details.xlsx", arcname="2_outbreaks_full_details.xlsx")
    return df_outbreaks, ["Tous"] + all_countries, ["Toutes"] + all_diseases, ["Toutes"] + all_species, str(zip_path)
# --- Fonctions de l'interface Gradio ---
async def run_and_update_ui():
    """Run the full scrape, then map the results onto the UI components.

    Returns a component→value dict: on failure only the logs are shown and
    the results area stays hidden; on success the filters, table, download
    link and state are all populated.
    """
    scraper = WAHISScraper()
    reports, outbreaks, additional_infos, logs = await scraper.run_extraction_async()
    if not reports:
        # Extraction failed: surface the logs, keep the results area hidden.
        return {status_textbox: logs, ui_visibility_group: gr.Group(visible=False)}
    df_outbreaks, countries, diseases, species, zip_path = process_data_and_create_zip(
        reports, outbreaks, additional_infos
    )
    updates = {
        status_textbox: logs,
        outbreak_data_state: df_outbreaks,
        outbreaks_table: df_outbreaks,
        download_section: gr.File(value=zip_path, visible=True),
        ui_visibility_group: gr.Group(visible=True),
    }
    updates[filter_country] = gr.Dropdown(choices=countries, value="Tous")
    updates[filter_disease] = gr.Dropdown(choices=diseases, value="Toutes")
    updates[filter_species] = gr.Dropdown(choices=species, value="Toutes")
    return updates
def update_table(country, disease, species, df_outbreaks):
    """Return *df_outbreaks* narrowed to the selected dropdown values.

    The sentinels "Tous"/"Toutes" mean "no filter" for that column. Returns
    an empty DataFrame when no data has been loaded yet.
    """
    if df_outbreaks is None or df_outbreaks.empty:
        return pd.DataFrame()
    view = df_outbreaks.copy()
    selections = (
        ('country', country, "Tous"),
        ('disease', disease, "Toutes"),
        ('species', species, "Toutes"),
    )
    for column, chosen, sentinel in selections:
        if chosen != sentinel:
            view = view[view[column] == chosen]
    return view
# --- Gradio UI layout and event wiring ---
with gr.Blocks(theme=gr.themes.Soft(), title="WAHIS Scraper") as demo:
    # Holds the full (unfiltered) outbreaks DataFrame between callbacks.
    outbreak_data_state = gr.State()
    gr.Markdown("# 🤖 Scraper pour WAHIS (WOAH) - Version Robuste")
    run_button = gr.Button("🚀 Lancer l'extraction des données", variant="primary")

    # Results area: hidden until an extraction succeeds.
    with gr.Group(visible=False) as ui_visibility_group:
        gr.Markdown("### 🔍 Filtrez les données")
        with gr.Row():
            filter_country = gr.Dropdown(label="Pays")
            filter_disease = gr.Dropdown(label="Maladie")
            filter_species = gr.Dropdown(label="Espèce")
        gr.Markdown("### 📋 Tableau Détaillé des Foyers (avec Coordonnées GPS)")
        outbreaks_table = gr.DataFrame(wrap=True)

    # Kept outside the hidden group so failure logs remain visible.
    with gr.Accordion("Journal d'exécution et Téléchargement", open=False):
        status_textbox = gr.Textbox(lines=15, label="📜 Logs", interactive=False)
        download_section = gr.File(label="💾 Télécharger le Package Complet (.zip)")

    run_button.click(
        fn=run_and_update_ui,
        inputs=[],
        outputs=[
            status_textbox,
            ui_visibility_group,
            outbreak_data_state,
            filter_country,
            filter_disease,
            filter_species,
            outbreaks_table,
            download_section,
        ],
    )

    # Re-filter the table whenever any of the three dropdowns changes.
    for dropdown in (filter_country, filter_disease, filter_species):
        dropdown.change(
            fn=update_table,
            inputs=[filter_country, filter_disease, filter_species, outbreak_data_state],
            outputs=[outbreaks_table],
        )

if __name__ == "__main__":
    demo.launch()