| |
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import sys |
| from pathlib import Path |
| from typing import Dict, List, Tuple |
|
|
|
|
| import os |
|
|
# Allow extra module search paths to be injected through the environment
# (an os.pathsep-separated list), so project-local packages can resolve when
# this script runs outside its usual checkout. Empty entries are skipped and
# duplicates are not re-inserted; new paths take priority (insert at front).
for _p in os.environ.get("WILDFIRE_FM_EXTRA_PYTHONPATH", "").split(os.pathsep):
    if _p and _p not in sys.path:
        sys.path.insert(0, _p)
|
|
| import numpy as np |
| import pandas as pd |
| from catboost import CatBoostRegressor |
| from lightgbm import LGBMRegressor |
| from sklearn.compose import ColumnTransformer |
| from sklearn.impute import SimpleImputer |
| from sklearn.linear_model import ElasticNet |
| from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score |
| from sklearn.pipeline import Pipeline |
| from sklearn.preprocessing import OneHotEncoder, StandardScaler |
| from xgboost import XGBRegressor |
|
|
|
|
# Columns excluded from the feature matrix: record identifiers and names,
# date columns, and target/leakage columns (BurnBndAc is the raw burned-area
# target; target_log_burn_acres is its log transform used as the label).
DROP_COLUMNS = {
    "Event_ID",
    "Incid_Name",
    "incident_name_norm",
    "wfigs_name",
    "Ig_Date",
    "weather_date",
    "BurnBndAc",
    "target_log_burn_acres",
}


# Columns treated as categorical features when present in the event table.
CATEGORICAL_COLUMNS = [
    "Incid_Type",
    "state_abbr",
    "county_name",
    "wfigs_match_type",
]
|
|
|
|
def rmse(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Return the root mean squared error of *y_pred* against *y_true* as a plain float."""
    squared_error = mean_squared_error(y_true, y_pred)
    return float(np.sqrt(squared_error))
|
|
|
|
def mape(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Mean absolute percentage error, with the denominator floored at 1e-6.

    The floor guards against division by zero when a true value is 0.
    """
    actual = np.asarray(y_true, dtype=np.float64)
    predicted = np.asarray(y_pred, dtype=np.float64)
    safe_denominator = np.clip(actual, 1e-6, None)
    return float(np.mean(np.abs(actual - predicted) / safe_denominator))
|
|
|
|
def spearman_corr(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Spearman rank correlation; 0.0 when undefined (e.g. a constant input)."""
    left = pd.Series(np.asarray(y_true, dtype=np.float64))
    right = pd.Series(np.asarray(y_pred, dtype=np.float64))
    rho = left.corr(right, method="spearman")
    if pd.isna(rho):
        return 0.0
    return float(rho)
|
|
|
|
def build_splits(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Split the table chronologically (by Ig_Date) into roughly 60/20/20 train/val/test.

    The train split always has at least one row; for n >= 3 the test split is
    kept non-empty by capping the validation cutoff at n - 1. If validation
    ends up empty while test has more than one row, the earliest test row is
    moved into validation.
    """
    by_date = df.sort_values("Ig_Date").reset_index(drop=True)
    total = len(by_date)
    train_cut = max(int(round(total * 0.6)), 1)
    val_cut = max(int(round(total * 0.8)), train_cut + 1)
    if total >= 3:
        val_cut = min(val_cut, total - 1)
    else:
        val_cut = total
    train = by_date.iloc[:train_cut].copy()
    val = by_date.iloc[train_cut:val_cut].copy()
    test = by_date.iloc[val_cut:].copy()
    if val.empty and len(test) > 1:
        # Borrow the earliest test row so validation is usable.
        val = test.iloc[:1].copy()
        test = test.iloc[1:].copy()
    return train, val, test
|
|
|
|
def feature_columns(df: pd.DataFrame, feature_profile: str = "all") -> Tuple[List[str], List[str]]:
    """Return (numeric, categorical) feature column names for the given profile.

    Numeric columns are every numeric-dtyped column not in DROP_COLUMNS and
    not categorical. The "weather_fm" profile restricts to weather_*-prefixed
    numeric columns and drops all categoricals.
    """
    categorical = [name for name in CATEGORICAL_COLUMNS if name in df.columns]
    excluded = DROP_COLUMNS.union(categorical)
    numeric = [
        name
        for name in df.columns
        if name not in excluded and pd.api.types.is_numeric_dtype(df[name])
    ]
    if feature_profile == "weather_fm":
        return [name for name in numeric if name.startswith("weather_")], []
    return numeric, categorical
|
|
|
|
def make_sparse_preprocessor(numeric_cols: List[str], categorical_cols: List[str]) -> ColumnTransformer:
    """Build the preprocessor for the sparse-matrix model family.

    Numeric columns are median-imputed then standardized; categorical columns
    are mode-imputed then one-hot encoded (unknown categories ignored at
    transform time). All other columns are dropped.
    """
    numeric_pipeline = Pipeline(
        steps=[
            ("impute", SimpleImputer(strategy="median")),
            ("scale", StandardScaler()),
        ]
    )
    categorical_pipeline = Pipeline(
        steps=[
            ("impute", SimpleImputer(strategy="most_frequent")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]
    )
    return ColumnTransformer(
        transformers=[
            ("num", numeric_pipeline, numeric_cols),
            ("cat", categorical_pipeline, categorical_cols),
        ],
        remainder="drop",
    )
|
|
|
|
def prepare_catboost_frames(
    train_df: pd.DataFrame,
    val_df: pd.DataFrame,
    test_df: pd.DataFrame,
    numeric_cols: List[str],
    categorical_cols: List[str],
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Produce imputed feature frames for CatBoost from all three splits.

    Fill values are derived from *train_df* only (median for numerics, mode
    for categoricals — "missing" when a column has no mode) and applied to
    every split, so validation/test never leak their own statistics.
    """
    fill_numeric: Dict[str, float] = {
        col: float(train_df[col].median()) for col in numeric_cols
    }
    fill_categorical: Dict[str, str] = {}
    for col in categorical_cols:
        mode = train_df[col].mode(dropna=True)
        fill_categorical[col] = "missing" if mode.empty else str(mode.iloc[0])

    def _prepared(frame: pd.DataFrame) -> pd.DataFrame:
        subset = frame[numeric_cols + categorical_cols].copy()
        for col in numeric_cols:
            subset[col] = pd.to_numeric(subset[col], errors="coerce").fillna(fill_numeric[col])
        for col in categorical_cols:
            subset[col] = subset[col].astype("string").fillna(fill_categorical[col]).astype(str)
        return subset

    return _prepared(train_df), _prepared(val_df), _prepared(test_df)
|
|
|
|
def evaluate_split(frame: pd.DataFrame, pred_log: np.ndarray) -> Dict[str, float]:
    """Score log-space predictions for one split, in both log and acre units.

    The frame must carry "target_log_burn_acres" (the label) and "BurnBndAc"
    (raw acres); acre-space predictions are recovered via exp().
    """
    actual_log = frame["target_log_burn_acres"].to_numpy(dtype=np.float64)
    actual_acres = frame["BurnBndAc"].to_numpy(dtype=np.float64)
    predicted_log = np.asarray(pred_log, dtype=np.float64)
    predicted_acres = np.exp(predicted_log)
    abs_log_err = np.abs(actual_log - predicted_log)
    abs_acre_err = np.abs(actual_acres - predicted_acres)
    return {
        "count": int(len(frame)),
        "log_mae": float(mean_absolute_error(actual_log, predicted_log)),
        "log_rmse": rmse(actual_log, predicted_log),
        # R^2 is undefined for fewer than two samples; report 0.0 then.
        "log_r2": float(r2_score(actual_log, predicted_log)) if len(frame) > 1 else 0.0,
        "log_spearman": spearman_corr(actual_log, predicted_log),
        "log_median_ae": float(np.median(abs_log_err)),
        "acres_mae": float(mean_absolute_error(actual_acres, predicted_acres)),
        "acres_rmse": rmse(actual_acres, predicted_acres),
        "acres_median_ae": float(np.median(abs_acre_err)),
        "acres_mape": mape(actual_acres, predicted_acres),
    }
|
|
|
|
def main() -> None:
    """Train candidate regressors on an event table, select the best by
    validation log-MAE, refit it on train+val, and write predictions plus a
    JSON summary to the output directory.

    CLI arguments:
        --event-table      CSV with features, Ig_Date, BurnBndAc and
                           target_log_burn_acres columns.
        --output-dir       Destination for predictions.csv and summary.json.
        --feature-profile  "all" (default) or "weather_fm" (weather_* only).
        --model-family     "full" (default: +xgboost/lightgbm/catboost) or
                           "lite" (ElasticNet only).
        --fm-family        Optional label recorded in the summary.
        --seed             Random seed for all models.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--event-table", type=Path, required=True)
    parser.add_argument("--output-dir", type=Path, required=True)
    parser.add_argument("--feature-profile", choices=("all", "weather_fm"), default="all")
    parser.add_argument("--model-family", choices=("full", "lite"), default="full")
    parser.add_argument("--fm-family", type=str, default="")
    parser.add_argument("--seed", type=int, default=7)
    args = parser.parse_args()

    df = pd.read_csv(args.event_table)
    df["Ig_Date"] = pd.to_datetime(df["Ig_Date"])
    train_df, val_df, test_df = build_splits(df)
    numeric_cols, categorical_cols = feature_columns(df, feature_profile=args.feature_profile)
    if not numeric_cols and not categorical_cols:
        raise SystemExit(f"No usable features found for profile={args.feature_profile}")
    x_cols = numeric_cols + categorical_cols

    # Matrix inputs for the "sparse" model family: one shared preprocessor,
    # fit on train only so val/test statistics never leak into imputation.
    pre = make_sparse_preprocessor(numeric_cols, categorical_cols)
    x_train = pre.fit_transform(train_df[x_cols])
    x_val = pre.transform(val_df[x_cols])
    x_test = pre.transform(test_df[x_cols])
    y_train = train_df["target_log_burn_acres"].to_numpy(dtype=np.float64)

    # CatBoost consumes raw frames plus the indices of categorical columns
    # (categoricals sit after the numerics in the prepared frames).
    cat_train, cat_val, cat_test = prepare_catboost_frames(train_df, val_df, test_df, numeric_cols, categorical_cols)
    cat_feature_idx = list(range(len(numeric_cols), len(numeric_cols) + len(categorical_cols)))

    # Each candidate is (name, model, input kind): "sparse" uses the
    # preprocessed matrices, "cat" uses the raw CatBoost frames.
    candidates: List[Tuple[str, object, str]] = [
        (
            "enet",
            ElasticNet(alpha=0.01, l1_ratio=0.2, random_state=args.seed, max_iter=10000),
            "sparse",
        ),
    ]
    if args.model_family == "full":
        candidates.extend(
            [
                (
                    "xgboost",
                    XGBRegressor(
                        n_estimators=400,
                        max_depth=6,
                        learning_rate=0.05,
                        subsample=0.8,
                        colsample_bytree=0.8,
                        reg_lambda=1.0,
                        objective="reg:squarederror",
                        tree_method="hist",
                        random_state=args.seed,
                        n_jobs=8,
                    ),
                    "sparse",
                ),
                (
                    "lightgbm",
                    LGBMRegressor(
                        n_estimators=400,
                        learning_rate=0.05,
                        num_leaves=63,
                        subsample=0.8,
                        colsample_bytree=0.8,
                        reg_lambda=1.0,
                        random_state=args.seed,
                        n_jobs=8,
                        verbose=-1,
                    ),
                    "sparse",
                ),
                (
                    "catboost",
                    CatBoostRegressor(
                        iterations=500,
                        depth=8,
                        learning_rate=0.05,
                        loss_function="RMSE",
                        eval_metric="RMSE",
                        random_seed=args.seed,
                        verbose=False,
                    ),
                    "cat",
                ),
            ]
        )

    # Model selection: fit every candidate on train, score on val, keep the
    # lowest validation log-MAE.
    candidate_validation: List[Dict[str, object]] = []
    best_name = None
    best_kind = None
    best_model = None
    best_score = None

    for name, model, kind in candidates:
        if kind == "sparse":
            model.fit(x_train, y_train)
            val_pred = model.predict(x_val)
        else:
            model.fit(cat_train, y_train, cat_features=cat_feature_idx, eval_set=(cat_val, val_df["target_log_burn_acres"]), use_best_model=False)
            val_pred = model.predict(cat_val)
        val_metrics = evaluate_split(val_df, val_pred)
        candidate_validation.append({"model_name": name, "val_metrics": val_metrics})
        score = float(val_metrics["log_mae"])
        if best_score is None or score < best_score:
            best_score = score
            best_name = name
            best_kind = kind
            best_model = model

    assert best_model is not None and best_name is not None and best_kind is not None

    # Final fit: refit the winner on train+val, then predict all three splits
    # (train/val predictions are in-sample for the final model).
    combined_train = pd.concat([train_df, val_df], ignore_index=True)
    if best_kind == "sparse":
        x_combined = pre.fit_transform(combined_train[x_cols])
        x_train_final = pre.transform(train_df[x_cols])
        x_val_final = pre.transform(val_df[x_cols])
        x_test_final = pre.transform(test_df[x_cols])
        best_model.fit(x_combined, combined_train["target_log_burn_acres"].to_numpy(dtype=np.float64))
        train_pred = best_model.predict(x_train_final)
        val_pred = best_model.predict(x_val_final)
        test_pred = best_model.predict(x_test_final)
    else:
        cat_combined, cat_train_final, cat_test_final = prepare_catboost_frames(
            combined_train, train_df, test_df, numeric_cols, categorical_cols
        )
        # BUGFIX: impute the final val frame with the combined-train medians/
        # modes, consistent with the other splits. The previous code derived
        # the fill values from val_df itself.
        cat_val_final = prepare_catboost_frames(
            combined_train, val_df, val_df, numeric_cols, categorical_cols
        )[1]
        best_model.fit(
            cat_combined,
            combined_train["target_log_burn_acres"].to_numpy(dtype=np.float64),
            cat_features=cat_feature_idx,
            use_best_model=False,
        )
        train_pred = best_model.predict(cat_train_final)
        val_pred = best_model.predict(cat_val_final)
        test_pred = best_model.predict(cat_test_final)

    args.output_dir.mkdir(parents=True, exist_ok=True)
    pred_df = pd.concat(
        [
            train_df.assign(split="train", pred_log_burn_acres=train_pred, pred_burn_acres=np.exp(train_pred)),
            val_df.assign(split="val", pred_log_burn_acres=val_pred, pred_burn_acres=np.exp(val_pred)),
            test_df.assign(split="test", pred_log_burn_acres=test_pred, pred_burn_acres=np.exp(test_pred)),
        ],
        axis=0,
        ignore_index=True,
    )
    pred_path = args.output_dir / "predictions.csv"
    pred_df.to_csv(pred_path, index=False)

    # Compute each split's metrics once and reuse (headline metrics are a
    # subset of the test metrics).
    train_metrics = evaluate_split(train_df, train_pred)
    val_metrics = evaluate_split(val_df, val_pred)
    test_metrics = evaluate_split(test_df, test_pred)

    summary = {
        "task_id": "wildfire_final_area_scalar_taskmodels",
        "task_form": "event_level_regression",
        "event_table": str(args.event_table),
        "output_dir": str(args.output_dir),
        "feature_profile": args.feature_profile,
        "seed": int(args.seed),
        "benchmark_protocol": "fm_lite_protocol" if args.feature_profile == "weather_fm" and args.model_family == "lite" else "standard_protocol",
        "split_sizes": {
            "train": int(len(train_df)),
            "val": int(len(val_df)),
            "test": int(len(test_df)),
        },
        "feature_columns": {
            "numeric": numeric_cols,
            "categorical": categorical_cols,
        },
        "candidate_validation": candidate_validation,
        "selected_model": best_name,
        "train_metrics": train_metrics,
        "val_metrics": val_metrics,
        "test_metrics": test_metrics,
        "headline_metrics": {
            "log_mae": float(test_metrics["log_mae"]),
            "log_rmse": float(test_metrics["log_rmse"]),
            "log_spearman": float(test_metrics["log_spearman"]),
        },
        "predictions_path": str(pred_path),
        "model_family": "lightweight_linear_task_heads" if args.model_family == "lite" else "popular_open_source_task_models",
        "fm_family": (args.fm_family or "weather_fm_derived_features") if args.feature_profile == "weather_fm" else None,
        "tmt_policy": {
            "task": "final_burned_area",
            "metric": "log-area regression error with rank agreement",
            "tolerance": "secondary magnitude-band interpretation only",
        },
    }
    (args.output_dir / "summary.json").write_text(json.dumps(summary, indent=2), encoding="utf-8")
    print(json.dumps(summary, indent=2))


if __name__ == "__main__":
    main()
|
|