#!/usr/bin/env python3 from __future__ import annotations import argparse import json import sys from pathlib import Path from typing import Dict, List, Tuple import os for _p in os.environ.get("WILDFIRE_FM_EXTRA_PYTHONPATH", "").split(os.pathsep): if _p and _p not in sys.path: sys.path.insert(0, _p) import numpy as np import pandas as pd from catboost import CatBoostRegressor from lightgbm import LGBMRegressor from sklearn.compose import ColumnTransformer from sklearn.impute import SimpleImputer from sklearn.linear_model import ElasticNet from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score from sklearn.pipeline import Pipeline from sklearn.preprocessing import OneHotEncoder, StandardScaler from xgboost import XGBRegressor DROP_COLUMNS = { "Event_ID", "Incid_Name", "incident_name_norm", "wfigs_name", "Ig_Date", "weather_date", "BurnBndAc", "target_log_burn_acres", } CATEGORICAL_COLUMNS = [ "Incid_Type", "state_abbr", "county_name", "wfigs_match_type", ] def rmse(y_true: np.ndarray, y_pred: np.ndarray) -> float: return float(np.sqrt(mean_squared_error(y_true, y_pred))) def mape(y_true: np.ndarray, y_pred: np.ndarray) -> float: denom = np.clip(np.asarray(y_true, dtype=np.float64), 1e-6, None) frac = np.abs(np.asarray(y_true, dtype=np.float64) - np.asarray(y_pred, dtype=np.float64)) / denom return float(np.mean(frac)) def spearman_corr(y_true: np.ndarray, y_pred: np.ndarray) -> float: a = pd.Series(np.asarray(y_true, dtype=np.float64)) b = pd.Series(np.asarray(y_pred, dtype=np.float64)) value = a.corr(b, method="spearman") return float(value) if pd.notna(value) else 0.0 def build_splits(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: ordered = df.sort_values("Ig_Date").reset_index(drop=True) n = len(ordered) train_end = max(int(round(n * 0.6)), 1) val_end = max(int(round(n * 0.8)), train_end + 1) val_end = min(val_end, n - 1) if n >= 3 else n train = ordered.iloc[:train_end].copy() val = ordered.iloc[train_end:val_end].copy() test = ordered.iloc[val_end:].copy() if len(val) == 0 and len(test) > 1: val = test.iloc[:1].copy() test = test.iloc[1:].copy() return train, val, test def feature_columns(df: pd.DataFrame, feature_profile: str = "all") -> Tuple[List[str], List[str]]: categorical = [c for c in CATEGORICAL_COLUMNS if c in df.columns] numeric = [] for col in df.columns: if col in DROP_COLUMNS or col in categorical: continue if pd.api.types.is_numeric_dtype(df[col]): numeric.append(col) if feature_profile == "weather_fm": numeric = [c for c in numeric if c.startswith("weather_")] categorical = [] return numeric, categorical def make_sparse_preprocessor(numeric_cols: List[str], categorical_cols: List[str]) -> ColumnTransformer: return ColumnTransformer( transformers=[ ( "num", Pipeline( steps=[ ("impute", SimpleImputer(strategy="median")), ("scale", StandardScaler()), ] ), numeric_cols, ), ( "cat", Pipeline( steps=[ ("impute", SimpleImputer(strategy="most_frequent")), ("onehot", OneHotEncoder(handle_unknown="ignore")), ] ), categorical_cols, ), ], remainder="drop", ) def prepare_catboost_frames( train_df: pd.DataFrame, val_df: pd.DataFrame, test_df: pd.DataFrame, numeric_cols: List[str], categorical_cols: List[str], ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: medians = {c: float(train_df[c].median()) for c in numeric_cols} modes = { c: str(train_df[c].mode(dropna=True).iloc[0]) if not train_df[c].mode(dropna=True).empty else "missing" for c in categorical_cols } def _prep(frame: pd.DataFrame) -> pd.DataFrame: out = frame[numeric_cols + categorical_cols].copy() for col in numeric_cols: out[col] = pd.to_numeric(out[col], errors="coerce").fillna(medians[col]) for col in categorical_cols: out[col] = out[col].astype("string").fillna(modes[col]).astype(str) return out return _prep(train_df), _prep(val_df), _prep(test_df) def evaluate_split(frame: pd.DataFrame, pred_log: np.ndarray) -> Dict[str, float]: true_log = frame["target_log_burn_acres"].to_numpy(dtype=np.float64) true_acres = frame["BurnBndAc"].to_numpy(dtype=np.float64) pred_log = np.asarray(pred_log, dtype=np.float64) pred_acres = np.exp(pred_log) return { "count": int(len(frame)), "log_mae": float(mean_absolute_error(true_log, pred_log)), "log_rmse": rmse(true_log, pred_log), "log_r2": float(r2_score(true_log, pred_log)) if len(frame) > 1 else 0.0, "log_spearman": spearman_corr(true_log, pred_log), "log_median_ae": float(np.median(np.abs(true_log - pred_log))), "acres_mae": float(mean_absolute_error(true_acres, pred_acres)), "acres_rmse": rmse(true_acres, pred_acres), "acres_median_ae": float(np.median(np.abs(true_acres - pred_acres))), "acres_mape": mape(true_acres, pred_acres), } def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("--event-table", type=Path, required=True) parser.add_argument("--output-dir", type=Path, required=True) parser.add_argument("--feature-profile", choices=("all", "weather_fm"), default="all") parser.add_argument("--model-family", choices=("full", "lite"), default="full") parser.add_argument("--fm-family", type=str, default="") parser.add_argument("--seed", type=int, default=7) args = parser.parse_args() df = pd.read_csv(args.event_table) df["Ig_Date"] = pd.to_datetime(df["Ig_Date"]) train_df, val_df, test_df = build_splits(df) numeric_cols, categorical_cols = feature_columns(df, feature_profile=args.feature_profile) if not numeric_cols and not categorical_cols: raise SystemExit(f"No usable features found for profile={args.feature_profile}") x_cols = numeric_cols + categorical_cols pre = make_sparse_preprocessor(numeric_cols, categorical_cols) x_train = pre.fit_transform(train_df[x_cols]) x_val = pre.transform(val_df[x_cols]) x_test = pre.transform(test_df[x_cols]) y_train = train_df["target_log_burn_acres"].to_numpy(dtype=np.float64) cat_train, cat_val, cat_test = prepare_catboost_frames(train_df, val_df, test_df, numeric_cols, categorical_cols) cat_feature_idx = list(range(len(numeric_cols), len(numeric_cols) + len(categorical_cols))) candidates: List[Tuple[str, object, str]] = [ ( "enet", ElasticNet(alpha=0.01, l1_ratio=0.2, random_state=args.seed, max_iter=10000), "sparse", ), ] if args.model_family == "full": candidates.extend( [ ( "xgboost", XGBRegressor( n_estimators=400, max_depth=6, learning_rate=0.05, subsample=0.8, colsample_bytree=0.8, reg_lambda=1.0, objective="reg:squarederror", tree_method="hist", random_state=args.seed, n_jobs=8, ), "sparse", ), ( "lightgbm", LGBMRegressor( n_estimators=400, learning_rate=0.05, num_leaves=63, subsample=0.8, colsample_bytree=0.8, reg_lambda=1.0, random_state=args.seed, n_jobs=8, verbose=-1, ), "sparse", ), ( "catboost", CatBoostRegressor( iterations=500, depth=8, learning_rate=0.05, loss_function="RMSE", eval_metric="RMSE", random_seed=args.seed, verbose=False, ), "cat", ), ] ) candidate_validation: List[Dict[str, object]] = [] best_name = None best_kind = None best_model = None best_score = None for name, model, kind in candidates: if kind == "sparse": model.fit(x_train, y_train) val_pred = model.predict(x_val) else: model.fit(cat_train, y_train, cat_features=cat_feature_idx, eval_set=(cat_val, val_df["target_log_burn_acres"]), use_best_model=False) val_pred = model.predict(cat_val) val_metrics = evaluate_split(val_df, val_pred) candidate_validation.append({"model_name": name, "val_metrics": val_metrics}) score = float(val_metrics["log_mae"]) if best_score is None or score < best_score: best_score = score best_name = name best_kind = kind best_model = model assert best_model is not None and best_name is not None and best_kind is not None combined_train = pd.concat([train_df, val_df], ignore_index=True) if best_kind == "sparse": x_combined = pre.fit_transform(combined_train[x_cols]) x_train_final = pre.transform(train_df[x_cols]) x_val_final = pre.transform(val_df[x_cols]) x_test_final = pre.transform(test_df[x_cols]) best_model.fit(x_combined, combined_train["target_log_burn_acres"].to_numpy(dtype=np.float64)) train_pred = best_model.predict(x_train_final) val_pred = best_model.predict(x_val_final) test_pred = best_model.predict(x_test_final) else: cat_combined, cat_train_final, cat_test_final = prepare_catboost_frames( combined_train, train_df, test_df, numeric_cols, categorical_cols ) cat_val_final = prepare_catboost_frames(val_df, val_df, val_df, numeric_cols, categorical_cols)[0] best_model.fit( cat_combined, combined_train["target_log_burn_acres"].to_numpy(dtype=np.float64), cat_features=cat_feature_idx, use_best_model=False, ) train_pred = best_model.predict(cat_train_final) val_pred = best_model.predict(cat_val_final) test_pred = best_model.predict(cat_test_final) args.output_dir.mkdir(parents=True, exist_ok=True) pred_df = pd.concat( [ train_df.assign(split="train", pred_log_burn_acres=train_pred, pred_burn_acres=np.exp(train_pred)), val_df.assign(split="val", pred_log_burn_acres=val_pred, pred_burn_acres=np.exp(val_pred)), test_df.assign(split="test", pred_log_burn_acres=test_pred, pred_burn_acres=np.exp(test_pred)), ], axis=0, ignore_index=True, ) pred_path = args.output_dir / "predictions.csv" pred_df.to_csv(pred_path, index=False) summary = { "task_id": "wildfire_final_area_scalar_taskmodels", "task_form": "event_level_regression", "event_table": str(args.event_table), "output_dir": str(args.output_dir), "feature_profile": args.feature_profile, "seed": int(args.seed), "benchmark_protocol": "fm_lite_protocol" if args.feature_profile == "weather_fm" and args.model_family == "lite" else "standard_protocol", "split_sizes": { "train": int(len(train_df)), "val": int(len(val_df)), "test": int(len(test_df)), }, "feature_columns": { "numeric": numeric_cols, "categorical": categorical_cols, }, "candidate_validation": candidate_validation, "selected_model": best_name, "train_metrics": evaluate_split(train_df, train_pred), "val_metrics": evaluate_split(val_df, val_pred), "test_metrics": evaluate_split(test_df, test_pred), "headline_metrics": { "log_mae": float(evaluate_split(test_df, test_pred)["log_mae"]), "log_rmse": float(evaluate_split(test_df, test_pred)["log_rmse"]), "log_spearman": float(evaluate_split(test_df, test_pred)["log_spearman"]), }, "predictions_path": str(pred_path), "model_family": "lightweight_linear_task_heads" if args.model_family == "lite" else "popular_open_source_task_models", "fm_family": (args.fm_family or "weather_fm_derived_features") if args.feature_profile == "weather_fm" else None, "tmt_policy": { "task": "final_burned_area", "metric": "log-area regression error with rank agreement", "tolerance": "secondary magnitude-band interpretation only", }, } (args.output_dir / "summary.json").write_text(json.dumps(summary, indent=2), encoding="utf-8") print(json.dumps(summary, indent=2)) if __name__ == "__main__": main()