# File: app.py
# ===================================================================================
# WAHIS SCRAPER - FINAL, SIMPLE AND ROBUST VERSION (FILTERS AND TABLE)
# ===================================================================================
import gradio as gr
import pandas as pd
import json
from datetime import datetime
from io import BytesIO
from pathlib import Path
import warnings
import traceback
import asyncio
import subprocess
import zipfile

from playwright.async_api import async_playwright
from playwright_stealth import stealth_async


def install_playwright_browsers():
    """Install the Chromium browser used by Playwright (idempotent).

    Raises whatever `subprocess.run(check=True)` raises so the app fails
    fast instead of crashing later when the browser is launched.
    """
    print("Vérification et installation des navigateurs Playwright...")
    try:
        # List form (shell=False) avoids shell-injection and quoting issues.
        subprocess.run(
            ["playwright", "install", "chromium"],
            capture_output=True, text=True, check=True,
        )
        print("Installation de Chromium terminée avec succès.")
    except Exception as e:
        print(f"Erreur lors de l'installation de Chromium : {e}")
        raise


install_playwright_browsers()
warnings.filterwarnings('ignore')

OUTPUT_DIR = Path("outputs")
OUTPUT_DIR.mkdir(exist_ok=True)


class WAHISScraper:
    """Three-phase scraper for the WAHIS (WOAH) event-management API.

    All HTTP calls are executed with `fetch()` *inside* a stealth Chromium
    page so they reuse the session cookies issued after the Cloudflare
    challenge — a plain `requests` call would be blocked.
    """

    def __init__(self):
        # Accumulated timestamped log lines, joined and returned to the UI.
        self.logs = []

    def log(self, message):
        """Record *message* with an HH:MM:SS timestamp and echo it to stdout."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        self.logs.append(f"[{timestamp}] {message}")
        print(message)

    async def run_extraction_async(self):
        """Run the three extraction phases (report list, GPS outbreaks,
        epidemiological details).

        Returns
        -------
        tuple
            ``(report_list, outbreaks, additional_infos, logs)`` — the first
            three are ``None`` when any phase fails; *logs* is always the
            joined log text.
        """
        self.log("🚀 Lancement de l'extraction en trois phases...")
        async with async_playwright() as p:
            browser = None
            try:
                self.log("🔧 Lancement d'un navigateur Chromium...")
                browser = await p.chromium.launch(headless=True)
                page = await browser.new_page()
                self.log("🕵️ Application du camouflage 'stealth'...")
                await stealth_async(page)
                # Visiting the SPA first lets Cloudflare set the cookies the
                # in-page fetch() calls below depend on.
                self.log("🌍 Visite de la page principale pour passer le challenge Cloudflare...")
                await page.goto(
                    "https://wahis.woah.org/#/event-management",
                    wait_until="networkidle", timeout=90000,
                )
                self.log("🍪 Challenge Cloudflare réussi.")
                headers = {
                    'Content-Type': 'application/json',
                    'Accept': 'application/json',
                    'clientid': 'OIEwebsite',
                    'env': 'PRD',
                    'security-token': 'token',
                    'type': 'REQUEST',
                }
                # Shared JS snippet: POST args.payload to args.url and parse JSON.
                fetch_js = (
                    "async (args) => (await fetch(args.url, { method: 'POST', "
                    "headers: args.headers, body: JSON.stringify(args.payload) })).json()"
                )

                self.log("--- PHASE 1 : Récupération de la liste des rapports ---")
                list_api_url = "https://wahis.woah.org/api/v1/pi/event/filtered-list?language=fr"
                payload_list = {
                    "pageNumber": 1,
                    "pageSize": 100,
                    "sortColName": "REP_LAST_UPDATE",
                    "sortColOrder": "DESC",
                    "reportFilters": {},
                    "languageChanged": False,
                }
                list_response_json = await page.evaluate(
                    fetch_js,
                    {'url': list_api_url, 'headers': headers, 'payload': payload_list},
                )
                report_list = list_response_json.get('list', [])
                if not report_list:
                    raise Exception("Échec de la phase 1.")
                self.log(f"✅ Phase 1 réussie : {len(report_list)} rapports de base récupérés.")

                unique_event_ids = sorted({item['eventId'] for item in report_list if 'eventId' in item})
                self.log(f"--- PHASE 2 : Récupération des données GPS pour {len(unique_event_ids)} événements...")
                outbreaks_api_url = "https://wahis.woah.org/api/v1/pi/map-data/outbreaks-from-event-ids?language=fr"
                all_outbreaks_data = await page.evaluate(
                    fetch_js,
                    {'url': outbreaks_api_url, 'headers': headers, 'payload': unique_event_ids},
                )
                self.log(f"✅ Phase 2 réussie : {len(all_outbreaks_data)} foyers récupérés.")

                unique_outbreak_ids = sorted({item['outbreakId'] for item in all_outbreaks_data if 'outbreakId' in item})
                self.log(f"--- PHASE 3 : Récupération des détails épidémiologiques pour {len(unique_outbreak_ids)} foyers...")
                additional_info_data = []
                if unique_outbreak_ids:
                    additional_info_api_url = "https://wahis.woah.org/api/v1/pi/outbreak/additional-information"
                    additional_info_data = await page.evaluate(
                        fetch_js,
                        {'url': additional_info_api_url, 'headers': headers, 'payload': unique_outbreak_ids},
                    )
                self.log(f"✅ Phase 3 réussie : {len(additional_info_data)} fiches de détails récupérées.")
                return report_list, all_outbreaks_data, additional_info_data, "\n".join(self.logs)
            except Exception:
                self.log("❌ Une erreur critique est survenue.")
                self.log(traceback.format_exc())
                return None, None, None, "\n".join(self.logs)
            finally:
                if browser and browser.is_connected():
                    await browser.close()


def process_data_and_create_zip(reports, outbreaks, additional_infos):
    """Merge the three API payloads, derive filter choices and build a ZIP.

    Parameters
    ----------
    reports : list[dict]
        Phase-1 event summaries (keys presumably include ``eventId``,
        ``disease``, ``country`` — missing keys are tolerated).
    outbreaks : list[dict]
        Phase-2 outbreak records, enriched in place.
    additional_infos : list
        Phase-3 details; non-dict entries are discarded.

    Returns
    -------
    tuple
        ``(df_outbreaks, countries, diseases, species, zip_path)`` where the
        three lists are prefixed with the UI's "all" sentinel, or empty
        placeholders (and ``None`` path) when *reports* is falsy.
    """
    if not reports:
        return pd.DataFrame(), [], [], [], None

    # Enrich each outbreak with the disease/country of its parent event.
    # .get() guards against reports missing optional keys (the original
    # direct indexing raised KeyError on incomplete records).
    report_map = {
        report['eventId']: {
            'disease': report.get('disease'),
            'country': report.get('country'),
        }
        for report in reports
        if 'eventId' in report
    }
    for outbreak in outbreaks:
        event_info = report_map.get(outbreak.get('eventId'), {})
        outbreak['disease'] = event_info.get('disease')
        outbreak['country'] = event_info.get('country')

    # Phase 3 can return non-dict entries (e.g. error strings); keep only dicts.
    valid_additional_infos = [info for info in additional_infos if isinstance(info, dict)]
    additional_info_map = {info.get('outbreakId'): info for info in valid_additional_infos}
    for outbreak in outbreaks:
        outbreak_id = outbreak.get('outbreakId')
        if outbreak_id in additional_info_map:
            outbreak.update(additional_info_map[outbreak_id])

    df_outbreaks = pd.DataFrame(outbreaks)

    # Dropdown choices built from the merged data; a column may be absent
    # when the corresponding field never appeared in any record.
    def _choices(column):
        """Sorted unique non-null values of *column*, or [] if absent."""
        if column in df_outbreaks:
            return sorted(df_outbreaks[column].dropna().unique())
        return []

    all_countries = _choices('country')
    all_diseases = _choices('disease')
    all_species = _choices('species')

    # Package both Excel exports into a timestamped ZIP. Workbooks are built
    # in memory so no stray .xlsx files are left in OUTPUT_DIR and
    # concurrent runs cannot clobber each other's intermediates.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    zip_path = OUTPUT_DIR / f"wahis_package_{timestamp}.zip"
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for arcname, frame in (
            ("1_summary_events.xlsx", pd.DataFrame(reports)),
            ("2_outbreaks_full_details.xlsx", df_outbreaks),
        ):
            buffer = BytesIO()
            frame.to_excel(buffer, index=False)
            zipf.writestr(arcname, buffer.getvalue())

    return (
        df_outbreaks,
        ["Tous"] + all_countries,
        ["Toutes"] + all_diseases,
        ["Toutes"] + all_species,
        str(zip_path),
    )


# --- Gradio interface functions ---
async def run_and_update_ui():
    """Button handler: run the scraper, then populate filters and the table.

    Returns a dict of component updates; on failure only the log box is
    updated and the results group stays hidden (partial dict update).
    """
    scraper = WAHISScraper()
    reports, outbreaks, additional_infos, logs = await scraper.run_extraction_async()
    if not reports:
        return {status_textbox: logs, ui_visibility_group: gr.Group(visible=False)}
    df_outbreaks, countries, diseases, species, zip_path = process_data_and_create_zip(
        reports, outbreaks, additional_infos
    )
    return {
        status_textbox: logs,
        outbreak_data_state: df_outbreaks,
        filter_country: gr.Dropdown(choices=countries, value="Tous"),
        filter_disease: gr.Dropdown(choices=diseases, value="Toutes"),
        filter_species: gr.Dropdown(choices=species, value="Toutes"),
        outbreaks_table: df_outbreaks,
        download_section: gr.File(value=zip_path, visible=True),
        ui_visibility_group: gr.Group(visible=True),
    }


def update_table(country, disease, species, df_outbreaks):
    """Filter the cached outbreak DataFrame by the three dropdown values.

    The sentinel values "Tous"/"Toutes" mean "no filter" for that column.
    """
    if df_outbreaks is None or df_outbreaks.empty:
        return pd.DataFrame()
    filtered_df = df_outbreaks.copy()
    if country != "Tous":
        filtered_df = filtered_df[filtered_df['country'] == country]
    if disease != "Toutes":
        filtered_df = filtered_df[filtered_df['disease'] == disease]
    if species != "Toutes":
        filtered_df = filtered_df[filtered_df['species'] == species]
    return filtered_df


with gr.Blocks(theme=gr.themes.Soft(), title="WAHIS Scraper") as demo:
    # Holds the full (unfiltered) outbreak DataFrame between callbacks.
    outbreak_data_state = gr.State()
    gr.Markdown("# 🤖 Scraper pour WAHIS (WOAH) - Version Robuste")
    run_button = gr.Button("🚀 Lancer l'extraction des données", variant="primary")
    # Hidden until an extraction succeeds.
    with gr.Group(visible=False) as ui_visibility_group:
        gr.Markdown("### 🔍 Filtrez les données")
        with gr.Row():
            filter_country = gr.Dropdown(label="Pays")
            filter_disease = gr.Dropdown(label="Maladie")
            filter_species = gr.Dropdown(label="Espèce")
        gr.Markdown("### 📋 Tableau Détaillé des Foyers (avec Coordonnées GPS)")
        outbreaks_table = gr.DataFrame(wrap=True)
    with gr.Accordion("Journal d'exécution et Téléchargement", open=False):
        status_textbox = gr.Textbox(lines=15, label="📜 Logs", interactive=False)
        download_section = gr.File(label="💾 Télécharger le Package Complet (.zip)")

    run_button.click(
        fn=run_and_update_ui,
        inputs=[],
        outputs=[
            status_textbox, ui_visibility_group, outbreak_data_state,
            filter_country, filter_disease, filter_species,
            outbreaks_table, download_section,
        ],
    )
    # Re-filter the table whenever any dropdown changes.
    filters = [filter_country, filter_disease, filter_species]
    for f in filters:
        f.change(
            fn=update_table,
            inputs=[filter_country, filter_disease, filter_species, outbreak_data_state],
            outputs=[outbreaks_table],
        )

if __name__ == "__main__":
    demo.launch()