|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import gradio as gr |
|
|
import pandas as pd |
|
|
import json |
|
|
from datetime import datetime |
|
|
from pathlib import Path |
|
|
import warnings |
|
|
import traceback |
|
|
import asyncio |
|
|
import subprocess |
|
|
import zipfile |
|
|
from playwright.async_api import async_playwright |
|
|
from playwright_stealth import stealth_async |
|
|
|
|
|
|
|
|
def install_playwright_browsers():
    """Install the Chromium browser bundle required by Playwright.

    Runs ``playwright install chromium`` through the current interpreter
    (``python -m playwright``) so it also works when the ``playwright``
    CLI entry point is not on PATH. Re-raises on failure so the app does
    not start without a usable browser.
    """
    import sys  # local import: only needed to locate the running interpreter

    print("Vérification et installation des navigateurs Playwright...")
    try:
        # check=True turns a non-zero exit code into CalledProcessError.
        subprocess.run(
            [sys.executable, "-m", "playwright", "install", "chromium"],
            capture_output=True,
            text=True,
            check=True,
        )
        print("Installation de Chromium terminée avec succès.")
    except Exception as e:
        print(f"Erreur lors de l'installation de Chromium : {e}")
        raise


# Ensure the browser is available before the UI is built.
install_playwright_browsers()
|
|
|
|
|
# Silence library warnings (e.g. pandas/openpyxl deprecation notices) so they
# do not clutter the console output surfaced in the UI logs.
warnings.filterwarnings('ignore')

# Directory where the generated Excel workbooks and the downloadable ZIP
# package are written; created at startup if missing.
OUTPUT_DIR = Path("outputs")
OUTPUT_DIR.mkdir(exist_ok=True)
|
|
|
|
|
class WAHISScraper:
    """Extracts outbreak data from the WOAH WAHIS portal in three phases.

    The WAHIS API sits behind Cloudflare, so a headless Chromium page is
    used to pass the browser challenge first; the JSON endpoints are then
    queried from *inside* the page via ``fetch`` so that the Cloudflare
    session cookies are reused automatically.
    """

    def __init__(self):
        # Timestamped log lines, joined and returned to the UI.
        self.logs = []

    def log(self, message):
        """Record *message* with a HH:MM:SS timestamp and echo it to stdout."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        self.logs.append(f"[{timestamp}] {message}")
        print(message)

    async def run_extraction_async(self):
        """Run the three-phase extraction against the WAHIS API.

        Phase 1 fetches the most recent event reports, phase 2 the GPS
        outbreak data for those events, phase 3 the per-outbreak
        epidemiological details.

        Returns:
            A 4-tuple ``(reports, outbreaks, additional_infos, logs)``.
            The first three elements are ``None`` when any phase fails;
            ``logs`` is always the joined log text.
        """
        self.log("🚀 Lancement de l'extraction en trois phases...")
        async with async_playwright() as p:
            browser = None
            try:
                self.log("🔧 Lancement d'un navigateur Chromium...")
                browser = await p.chromium.launch(headless=True)
                page = await browser.new_page()
                self.log("🕵️ Application du camouflage 'stealth'...")
                await stealth_async(page)

                # Visiting the SPA once lets Cloudflare set its cookies,
                # which the in-page fetch() calls below then reuse.
                self.log("🌍 Visite de la page principale pour passer le challenge Cloudflare...")
                await page.goto("https://wahis.woah.org/#/event-management", wait_until="networkidle", timeout=90000)
                self.log("🍪 Challenge Cloudflare réussi.")

                # Headers expected by the WAHIS internal API.
                headers = { 'Content-Type': 'application/json', 'Accept': 'application/json', 'clientid': 'OIEwebsite', 'env': 'PRD', 'security-token': 'token', 'type': 'REQUEST' }

                # Shared in-page JS: POST a JSON payload and parse the response.
                fetch_js = "async (args) => (await fetch(args.url, { method: 'POST', headers: args.headers, body: JSON.stringify(args.payload) })).json()"

                self.log("--- PHASE 1 : Récupération de la liste des rapports ---")
                list_api_url = "https://wahis.woah.org/api/v1/pi/event/filtered-list?language=fr"
                payload_list = { "pageNumber": 1, "pageSize": 100, "sortColName": "REP_LAST_UPDATE", "sortColOrder": "DESC", "reportFilters": {}, "languageChanged": False }
                list_response_json = await page.evaluate(fetch_js, {'url': list_api_url, 'headers': headers, 'payload': payload_list})
                report_list = list_response_json.get('list', [])
                if not report_list:
                    raise Exception("Échec de la phase 1.")
                self.log(f"✅ Phase 1 réussie : {len(report_list)} rapports de base récupérés.")

                unique_event_ids = sorted(set(item['eventId'] for item in report_list if 'eventId' in item))
                self.log(f"--- PHASE 2 : Récupération des données GPS pour {len(unique_event_ids)} événements...")
                outbreaks_api_url = "https://wahis.woah.org/api/v1/pi/map-data/outbreaks-from-event-ids?language=fr"
                all_outbreaks_data = await page.evaluate(fetch_js, {'url': outbreaks_api_url, 'headers': headers, 'payload': unique_event_ids})
                self.log(f"✅ Phase 2 réussie : {len(all_outbreaks_data)} foyers récupérés.")

                unique_outbreak_ids = sorted(set(item['outbreakId'] for item in all_outbreaks_data if 'outbreakId' in item))
                self.log(f"--- PHASE 3 : Récupération des détails épidémiologiques pour {len(unique_outbreak_ids)} foyers...")
                additional_info_data = []
                if unique_outbreak_ids:
                    additional_info_api_url = "https://wahis.woah.org/api/v1/pi/outbreak/additional-information"
                    additional_info_data = await page.evaluate(fetch_js, {'url': additional_info_api_url, 'headers': headers, 'payload': unique_outbreak_ids})
                self.log(f"✅ Phase 3 réussie : {len(additional_info_data)} fiches de détails récupérées.")

                return report_list, all_outbreaks_data, additional_info_data, "\n".join(self.logs)
            except Exception:
                # Top-level boundary: log the full traceback, return a failure tuple.
                self.log("❌ Une erreur critique est survenue.")
                self.log(traceback.format_exc())
                return None, None, None, "\n".join(self.logs)
            finally:
                if browser and browser.is_connected():
                    await browser.close()
|
|
|
|
|
def process_data_and_create_zip(reports, outbreaks, additional_infos):
    """Merge the three API payloads and package the results as a ZIP.

    Enriches every outbreak with the disease/country of its parent event,
    folds in the per-outbreak epidemiological details, and writes two
    Excel workbooks (event summary + full outbreak details) into a
    timestamped ZIP under OUTPUT_DIR.

    Args:
        reports: event summaries from phase 1 (list of dicts).
        outbreaks: outbreak/GPS records from phase 2 (mutated in place).
        additional_infos: per-outbreak detail records from phase 3.

    Returns:
        ``(df_outbreaks, country_choices, disease_choices, species_choices,
        zip_path)`` — the choice lists are prefixed with their "no filter"
        sentinel. When *reports* is empty/None, returns
        ``(empty DataFrame, [], [], [], None)`` without touching the disk.
    """
    if not reports:
        return pd.DataFrame(), [], [], [], None

    # Map eventId -> (disease, country); .get() tolerates partial records
    # instead of raising KeyError on an incomplete report.
    report_map = {
        report['eventId']: {'disease': report.get('disease'), 'country': report.get('country')}
        for report in reports if 'eventId' in report
    }
    for outbreak in outbreaks:
        event_info = report_map.get(outbreak.get('eventId'), {})
        outbreak['disease'] = event_info.get('disease')
        outbreak['country'] = event_info.get('country')

    # Fold the phase-3 detail records into their outbreak records.
    valid_additional_infos = [info for info in additional_infos if isinstance(info, dict)]
    additional_info_map = {info.get('outbreakId'): info for info in valid_additional_infos}
    for outbreak in outbreaks:
        outbreak_id = outbreak.get('outbreakId')
        if outbreak_id in additional_info_map:
            outbreak.update(additional_info_map[outbreak_id])

    df_outbreaks = pd.DataFrame(outbreaks)

    # Distinct values for the filter dropdowns (columns may be absent when
    # the API returned no usable records for them).
    all_countries = sorted(df_outbreaks['country'].dropna().unique()) if 'country' in df_outbreaks else []
    all_diseases = sorted(df_outbreaks['disease'].dropna().unique()) if 'disease' in df_outbreaks else []
    all_species = sorted(df_outbreaks['species'].dropna().unique()) if 'species' in df_outbreaks else []

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    zip_path = OUTPUT_DIR / f"wahis_package_{timestamp}.zip"
    summary_path = OUTPUT_DIR / "1_summary_events.xlsx"
    details_path = OUTPUT_DIR / "2_outbreaks_full_details.xlsx"
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        pd.DataFrame(reports).to_excel(summary_path, index=False)
        df_outbreaks.to_excel(details_path, index=False)
        zipf.write(summary_path, arcname="1_summary_events.xlsx")
        zipf.write(details_path, arcname="2_outbreaks_full_details.xlsx")
    # Remove the intermediate workbooks; only the ZIP is offered for download.
    summary_path.unlink(missing_ok=True)
    details_path.unlink(missing_ok=True)

    return df_outbreaks, ["Tous"] + all_countries, ["Toutes"] + all_diseases, ["Toutes"] + all_species, str(zip_path)
|
|
|
|
|
|
|
|
|
|
|
async def run_and_update_ui():
    """Launch the scraper and push the results into the Gradio components.

    Returns a component -> update mapping. On failure only the log box is
    refreshed and the results section is kept hidden; on success every
    results component (filters, table, download link) is populated.
    """
    scraper = WAHISScraper()
    reports, outbreaks, additional_infos, logs = await scraper.run_extraction_async()

    if not reports:
        # Extraction failed: surface the logs, hide the results UI.
        return {
            status_textbox: logs,
            ui_visibility_group: gr.Group(visible=False),
        }

    table, country_choices, disease_choices, species_choices, archive = \
        process_data_and_create_zip(reports, outbreaks, additional_infos)

    updates = {status_textbox: logs, outbreak_data_state: table}
    updates[filter_country] = gr.Dropdown(choices=country_choices, value="Tous")
    updates[filter_disease] = gr.Dropdown(choices=disease_choices, value="Toutes")
    updates[filter_species] = gr.Dropdown(choices=species_choices, value="Toutes")
    updates[outbreaks_table] = table
    updates[download_section] = gr.File(value=archive, visible=True)
    updates[ui_visibility_group] = gr.Group(visible=True)
    return updates
|
|
|
|
|
def update_table(country, disease, species, df_outbreaks):
    """Filter the outbreak DataFrame by the three dropdown selections.

    The sentinel values "Tous"/"Toutes" mean "no filter" for their
    respective column. Columns absent from the data are skipped instead of
    raising KeyError (the data-prep step only adds columns it received).

    Args:
        country, disease, species: current dropdown selections.
        df_outbreaks: full DataFrame held in the Gradio state (may be None).

    Returns:
        A filtered copy of *df_outbreaks*, or an empty DataFrame when no
        data is available yet.
    """
    if df_outbreaks is None or df_outbreaks.empty:
        return pd.DataFrame()
    filtered_df = df_outbreaks.copy()
    # (selected value, "no filter" sentinel, column name)
    criteria = [
        (country, "Tous", "country"),
        (disease, "Toutes", "disease"),
        (species, "Toutes", "species"),
    ]
    for value, sentinel, column in criteria:
        if value != sentinel and column in filtered_df.columns:
            filtered_df = filtered_df[filtered_df[column] == value]
    return filtered_df
|
|
|
|
|
# --- Gradio UI --------------------------------------------------------------
# Component variables created here (status_textbox, filter_country, ...) are
# module-level names referenced directly by run_and_update_ui() above.
with gr.Blocks(theme=gr.themes.Soft(), title="WAHIS Scraper") as demo:

    # Holds the full outbreak DataFrame between callbacks so the filter
    # handlers can re-filter without re-scraping.
    outbreak_data_state = gr.State()

    gr.Markdown("# 🤖 Scraper pour WAHIS (WOAH) - Version Robuste")

    run_button = gr.Button("🚀 Lancer l'extraction des données", variant="primary")

    # Results section: hidden until an extraction succeeds.
    with gr.Group(visible=False) as ui_visibility_group:

        gr.Markdown("### 🔍 Filtrez les données")

        with gr.Row():

            filter_country = gr.Dropdown(label="Pays")

            filter_disease = gr.Dropdown(label="Maladie")

            filter_species = gr.Dropdown(label="Espèce")

        gr.Markdown("### 📋 Tableau Détaillé des Foyers (avec Coordonnées GPS)")

        outbreaks_table = gr.DataFrame(wrap=True)

    with gr.Accordion("Journal d'exécution et Téléchargement", open=False):

        status_textbox = gr.Textbox(lines=15, label="📜 Logs", interactive=False)

        download_section = gr.File(label="💾 Télécharger le Package Complet (.zip)")

    # Main action: run the scraper, then update every results component.
    run_button.click(
        fn=run_and_update_ui,
        inputs=[],
        outputs=[status_textbox, ui_visibility_group, outbreak_data_state, filter_country, filter_disease, filter_species, outbreaks_table, download_section]
    )

    # Re-filter the table whenever any of the three dropdowns changes.
    filters = [filter_country, filter_disease, filter_species]
    for f in filters:
        f.change(
            fn=update_table,
            inputs=[filter_country, filter_disease, filter_species, outbreak_data_state],
            outputs=[outbreaks_table]
        )


if __name__ == "__main__":
    demo.launch()