Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """Parse EMDAT Excel data and extract European flood events.""" | |
| from __future__ import annotations | |
| import json | |
| from datetime import date | |
| from pathlib import Path | |
| import pandas as pd | |
| from src.llm.client import load_config | |
| from src.models.schemas import FloodEvent | |
def parse_emdat(config: dict | None = None) -> list[FloodEvent]:
    """Read the EMDAT workbook and convert the matching rows to flood events.

    A row is kept when its "Disaster Type" and "Region" equal the configured
    values and its "Start Year" is one of the configured train or test years.

    Args:
        config: Mapping with a ``data`` section (``train_years``,
            ``test_years``, ``disaster_type``, ``region``) and a ``paths``
            section (``emdat_file``). Obtained from ``load_config()`` when
            ``None``.

    Returns:
        One ``FloodEvent`` per retained row, in iteration order of the
        filtered frame.
    """
    if config is None:
        config = load_config()
    data_cfg = config["data"]
    workbook = config["paths"]["emdat_file"]
    sheet = pd.read_excel(workbook, engine="openpyxl")

    # Row selection: configured disaster type, configured region, target years.
    target_years = data_cfg["train_years"] + data_cfg["test_years"]
    is_type = sheet["Disaster Type"] == data_cfg["disaster_type"]
    is_region = sheet["Region"] == data_cfg["region"]
    in_years = sheet["Start Year"].isin(target_years)
    selected = sheet[is_type & is_region & in_years].copy()

    def to_event(row):
        # Dates are built first so a malformed date fails before any other
        # field of the row is converted.
        began = _build_date(row["Start Year"], row["Start Month"], row["Start Day"])
        ended = _build_date(row["End Year"], row["End Month"], row["End Day"])
        return FloodEvent(
            event_id=str(row["DisNo."]),
            country=str(row["Country"]),
            iso=str(row["ISO"]),
            region=str(row["Region"]),
            # Optional columns go through row.get so an absent column
            # yields None instead of raising KeyError.
            location=_safe_str(row.get("Location")),
            latitude=_safe_float(row.get("Latitude")),
            longitude=_safe_float(row.get("Longitude")),
            start_date=began,
            end_date=ended,
            disaster_subtype=_safe_str(row.get("Disaster Subtype")),
            origin=_safe_str(row.get("Origin")),
            magnitude=_safe_float(row.get("Magnitude")),
            magnitude_scale=_safe_str(row.get("Magnitude Scale")),
            total_deaths=_safe_int(row.get("Total Deaths")),
            total_affected=_safe_int(row.get("Total Affected")),
            total_damage_k_usd=_safe_float(row.get("Total Damage ('000 US$)")),
        )

    return [to_event(row) for _, row in selected.iterrows()]
def split_events(
    events: list[FloodEvent], config: dict | None = None
) -> tuple[list[FloodEvent], list[FloodEvent]]:
    """Partition events into (train, test) lists by their start year.

    Args:
        events: Events to partition; each must expose ``start_date.year``.
        config: Mapping whose ``data`` section holds ``train_years`` and
            ``test_years``. Obtained from ``load_config()`` when ``None``.

    Returns:
        A ``(train, test)`` tuple. Input order is kept within each list, and
        an event whose year is in neither set appears in neither list.
    """
    if config is None:
        config = load_config()
    year_cfg = config["data"]
    train_years = set(year_cfg["train_years"])
    test_years = set(year_cfg["test_years"])

    def started_in(years):
        return [event for event in events if event.start_date.year in years]

    return started_in(train_years), started_in(test_years)
def save_events(events: list[FloodEvent], output_path: str) -> None:
    """Write events to a pretty-printed, UTF-8 encoded JSON file.

    Args:
        events: Events to serialize; each is dumped with
            ``model_dump(mode="json")``.
        output_path: Destination file path. Missing parent directories are
            created.
    """
    path = Path(output_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    data = [e.model_dump(mode="json") for e in events]
    # ensure_ascii=False emits raw non-ASCII characters, so the encoding is
    # pinned to UTF-8 instead of being left to the platform default.
    path.write_text(
        json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8"
    )
| def _build_date(year, month, day) -> date: | |
| y = int(year) if pd.notna(year) else 2000 | |
| m = int(month) if pd.notna(month) else 1 | |
| d = int(day) if pd.notna(day) else 1 | |
| return date(y, m, d) | |
| def _safe_str(val) -> str | None: | |
| if pd.isna(val): | |
| return None | |
| return str(val) | |
| def _safe_float(val) -> float | None: | |
| if pd.isna(val): | |
| return None | |
| try: | |
| return float(val) | |
| except (ValueError, TypeError): | |
| return None | |
| def _safe_int(val) -> int | None: | |
| if pd.isna(val): | |
| return None | |
| try: | |
| return int(float(val)) | |
| except (ValueError, TypeError): | |
| return None | |