# Wildfire-FM / experiments / raw_reference / task_scripts / run_final_area_taskmodel_seeded.py
# Uploaded by yx21e — "Initial FireWx-FM artifact release" (commit 80ef3b2, verified)
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import Dict, List, Tuple
import os
# Allow extra module search paths to be injected via the
# WILDFIRE_FM_EXTRA_PYTHONPATH environment variable (os.pathsep-separated),
# e.g. to pick up project-local helper packages without installing them.
for _p in os.environ.get("WILDFIRE_FM_EXTRA_PYTHONPATH", "").split(os.pathsep):
    if _p and _p not in sys.path:
        sys.path.insert(0, _p)
import numpy as np
import pandas as pd
from catboost import CatBoostRegressor
from lightgbm import LGBMRegressor
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import ElasticNet
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from xgboost import XGBRegressor
# Columns excluded from the feature matrix: identifiers, free-text names,
# raw date columns, and the targets (raw burned acres and their log transform).
DROP_COLUMNS = {
    "Event_ID",
    "Incid_Name",
    "incident_name_norm",
    "wfigs_name",
    "Ig_Date",
    "weather_date",
    "BurnBndAc",
    "target_log_burn_acres",
}

# Columns treated as categorical features when present in the event table.
CATEGORICAL_COLUMNS = [
    "Incid_Type",
    "state_abbr",
    "county_name",
    "wfigs_match_type",
]
def rmse(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Return the root-mean-squared error between two arrays as a plain float."""
    diff = np.asarray(y_true, dtype=np.float64) - np.asarray(y_pred, dtype=np.float64)
    return float(np.sqrt(np.mean(diff * diff)))
def mape(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Mean absolute percentage error, expressed as a fraction (not percent).

    Denominators are floored at 1e-6 so near-zero targets cannot divide by zero.
    """
    actual = np.asarray(y_true, dtype=np.float64)
    predicted = np.asarray(y_pred, dtype=np.float64)
    safe_denom = np.clip(actual, 1e-6, None)
    return float((np.abs(actual - predicted) / safe_denom).mean())
def spearman_corr(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Spearman rank correlation between two arrays.

    Returns 0.0 when the correlation is undefined (e.g. a constant series
    yields NaN from pandas).
    """
    left = pd.Series(np.asarray(y_true, dtype=np.float64))
    right = pd.Series(np.asarray(y_pred, dtype=np.float64))
    rho = left.corr(right, method="spearman")
    if pd.isna(rho):
        return 0.0
    return float(rho)
def build_splits(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Split a frame chronologically by ``Ig_Date`` into ~60/20/20 train/val/test.

    Cut points are rounded and clamped so the train split is never empty, and
    when the test split has rows to spare, one is borrowed to guarantee a
    non-empty validation split.
    """
    chronological = df.sort_values("Ig_Date").reset_index(drop=True)
    total = len(chronological)
    cut_train = max(int(round(total * 0.6)), 1)
    cut_val = max(int(round(total * 0.8)), cut_train + 1)
    if total >= 3:
        # Keep at least one row after the validation cut for the test split.
        cut_val = min(cut_val, total - 1)
    else:
        cut_val = total
    train = chronological.iloc[:cut_train].copy()
    val = chronological.iloc[cut_train:cut_val].copy()
    test = chronological.iloc[cut_val:].copy()
    # Borrow the earliest test row if validation ended up empty.
    if val.empty and len(test) > 1:
        val = test.iloc[:1].copy()
        test = test.iloc[1:].copy()
    return train, val, test
def feature_columns(df: pd.DataFrame, feature_profile: str = "all") -> Tuple[List[str], List[str]]:
    """Select ``(numeric, categorical)`` feature column names from ``df``.

    Numeric columns retain the frame's column order; anything listed in
    DROP_COLUMNS is excluded.  Under the ``"weather_fm"`` profile only
    ``weather_``-prefixed numerics are kept and categoricals are dropped.
    """
    categorical = [name for name in CATEGORICAL_COLUMNS if name in df.columns]
    excluded = DROP_COLUMNS.union(categorical)
    numeric = [
        name
        for name in df.columns
        if name not in excluded and pd.api.types.is_numeric_dtype(df[name])
    ]
    if feature_profile == "weather_fm":
        return [name for name in numeric if name.startswith("weather_")], []
    return numeric, categorical
def make_sparse_preprocessor(numeric_cols: List[str], categorical_cols: List[str]) -> ColumnTransformer:
    """Build the preprocessing transformer for the sklearn-API candidates.

    Numerics are median-imputed then standardized; categoricals are
    mode-imputed then one-hot encoded.  Unknown categories at transform time
    are ignored rather than raising, and unlisted columns are dropped.
    """
    numeric_pipeline = Pipeline(
        steps=[
            ("impute", SimpleImputer(strategy="median")),
            ("scale", StandardScaler()),
        ]
    )
    categorical_pipeline = Pipeline(
        steps=[
            ("impute", SimpleImputer(strategy="most_frequent")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]
    )
    return ColumnTransformer(
        transformers=[
            ("num", numeric_pipeline, numeric_cols),
            ("cat", categorical_pipeline, categorical_cols),
        ],
        remainder="drop",
    )
def prepare_catboost_frames(
    train_df: pd.DataFrame,
    val_df: pd.DataFrame,
    test_df: pd.DataFrame,
    numeric_cols: List[str],
    categorical_cols: List[str],
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Impute all three frames using statistics computed on ``train_df`` only.

    Numerics are coerced to numbers and filled with the train median;
    categoricals are cast to str and filled with the train mode (or
    ``"missing"`` when no mode exists).  CatBoost consumes raw frames, so no
    scaling or encoding happens here.
    """
    fill_median = {col: float(train_df[col].median()) for col in numeric_cols}
    fill_mode: Dict[str, str] = {}
    for col in categorical_cols:
        modes = train_df[col].mode(dropna=True)
        fill_mode[col] = "missing" if modes.empty else str(modes.iloc[0])

    def _impute(frame: pd.DataFrame) -> pd.DataFrame:
        # Select the model columns (numerics first, matching cat_feature_idx).
        prepped = frame[numeric_cols + categorical_cols].copy()
        for col in numeric_cols:
            prepped[col] = pd.to_numeric(prepped[col], errors="coerce").fillna(fill_median[col])
        for col in categorical_cols:
            prepped[col] = prepped[col].astype("string").fillna(fill_mode[col]).astype(str)
        return prepped

    return _impute(train_df), _impute(val_df), _impute(test_df)
def evaluate_split(frame: pd.DataFrame, pred_log: np.ndarray) -> Dict[str, float]:
    """Score log-acre predictions against one split, in log and acre space.

    ``frame`` must carry ``target_log_burn_acres`` and ``BurnBndAc`` columns;
    acre-space predictions are recovered by exponentiating ``pred_log``.
    """
    actual_log = frame["target_log_burn_acres"].to_numpy(dtype=np.float64)
    actual_acres = frame["BurnBndAc"].to_numpy(dtype=np.float64)
    predicted_log = np.asarray(pred_log, dtype=np.float64)
    predicted_acres = np.exp(predicted_log)
    metrics: Dict[str, float] = {"count": int(len(frame))}
    metrics["log_mae"] = float(mean_absolute_error(actual_log, predicted_log))
    metrics["log_rmse"] = rmse(actual_log, predicted_log)
    # R^2 is undefined for a single observation; report 0.0 in that case.
    metrics["log_r2"] = float(r2_score(actual_log, predicted_log)) if len(frame) > 1 else 0.0
    metrics["log_spearman"] = spearman_corr(actual_log, predicted_log)
    metrics["log_median_ae"] = float(np.median(np.abs(actual_log - predicted_log)))
    metrics["acres_mae"] = float(mean_absolute_error(actual_acres, predicted_acres))
    metrics["acres_rmse"] = rmse(actual_acres, predicted_acres)
    metrics["acres_median_ae"] = float(np.median(np.abs(actual_acres - predicted_acres)))
    metrics["acres_mape"] = mape(actual_acres, predicted_acres)
    return metrics
def main() -> None:
    """CLI entry point: fit, select, and report burned-area task models.

    Reads the event table, makes a chronological train/val/test split, fits a
    set of candidate regressors on the train split, picks the one with the
    lowest validation log-MAE, refits it on train+val, and writes
    ``predictions.csv`` plus ``summary.json`` to ``--output-dir``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--event-table", type=Path, required=True)
    parser.add_argument("--output-dir", type=Path, required=True)
    parser.add_argument("--feature-profile", choices=("all", "weather_fm"), default="all")
    parser.add_argument("--model-family", choices=("full", "lite"), default="full")
    parser.add_argument("--fm-family", type=str, default="")
    parser.add_argument("--seed", type=int, default=7)
    args = parser.parse_args()

    df = pd.read_csv(args.event_table)
    df["Ig_Date"] = pd.to_datetime(df["Ig_Date"])
    train_df, val_df, test_df = build_splits(df)

    numeric_cols, categorical_cols = feature_columns(df, feature_profile=args.feature_profile)
    if not numeric_cols and not categorical_cols:
        raise SystemExit(f"No usable features found for profile={args.feature_profile}")
    x_cols = numeric_cols + categorical_cols

    # Preprocessed design matrices for the sklearn-API candidates (fit on train only).
    pre = make_sparse_preprocessor(numeric_cols, categorical_cols)
    x_train = pre.fit_transform(train_df[x_cols])
    x_val = pre.transform(val_df[x_cols])
    y_train = train_df["target_log_burn_acres"].to_numpy(dtype=np.float64)

    # Raw train-imputed frames for CatBoost, which handles categoricals natively.
    cat_train, cat_val, _ = prepare_catboost_frames(train_df, val_df, test_df, numeric_cols, categorical_cols)
    # CatBoost takes positional indices of the categorical columns; numerics come first.
    cat_feature_idx = list(range(len(numeric_cols), len(numeric_cols) + len(categorical_cols)))

    # Each candidate is (name, estimator, input kind): "sparse" models consume the
    # preprocessed matrices, "cat" models consume the raw imputed frames.
    candidates: List[Tuple[str, object, str]] = [
        (
            "enet",
            ElasticNet(alpha=0.01, l1_ratio=0.2, random_state=args.seed, max_iter=10000),
            "sparse",
        ),
    ]
    if args.model_family == "full":
        candidates.extend(
            [
                (
                    "xgboost",
                    XGBRegressor(
                        n_estimators=400,
                        max_depth=6,
                        learning_rate=0.05,
                        subsample=0.8,
                        colsample_bytree=0.8,
                        reg_lambda=1.0,
                        objective="reg:squarederror",
                        tree_method="hist",
                        random_state=args.seed,
                        n_jobs=8,
                    ),
                    "sparse",
                ),
                (
                    "lightgbm",
                    LGBMRegressor(
                        n_estimators=400,
                        learning_rate=0.05,
                        num_leaves=63,
                        subsample=0.8,
                        colsample_bytree=0.8,
                        reg_lambda=1.0,
                        random_state=args.seed,
                        n_jobs=8,
                        verbose=-1,
                    ),
                    "sparse",
                ),
                (
                    "catboost",
                    CatBoostRegressor(
                        iterations=500,
                        depth=8,
                        learning_rate=0.05,
                        loss_function="RMSE",
                        eval_metric="RMSE",
                        random_seed=args.seed,
                        verbose=False,
                    ),
                    "cat",
                ),
            ]
        )

    # Model selection: fit each candidate on train, score on val, keep the
    # lowest validation log-MAE.
    candidate_validation: List[Dict[str, object]] = []
    best_name = None
    best_kind = None
    best_model = None
    best_score = None
    for name, model, kind in candidates:
        if kind == "sparse":
            model.fit(x_train, y_train)
            val_pred = model.predict(x_val)
        else:
            model.fit(
                cat_train,
                y_train,
                cat_features=cat_feature_idx,
                eval_set=(cat_val, val_df["target_log_burn_acres"]),
                use_best_model=False,
            )
            val_pred = model.predict(cat_val)
        val_metrics = evaluate_split(val_df, val_pred)
        candidate_validation.append({"model_name": name, "val_metrics": val_metrics})
        score = float(val_metrics["log_mae"])
        if best_score is None or score < best_score:
            best_score = score
            best_name = name
            best_kind = kind
            best_model = model
    assert best_model is not None and best_name is not None and best_kind is not None

    # Refit the winner on train+val; train/val metrics below are therefore in-sample.
    combined_train = pd.concat([train_df, val_df], ignore_index=True)
    if best_kind == "sparse":
        x_combined = pre.fit_transform(combined_train[x_cols])
        x_train_final = pre.transform(train_df[x_cols])
        x_val_final = pre.transform(val_df[x_cols])
        x_test_final = pre.transform(test_df[x_cols])
        best_model.fit(x_combined, combined_train["target_log_burn_acres"].to_numpy(dtype=np.float64))
        train_pred = best_model.predict(x_train_final)
        val_pred = best_model.predict(x_val_final)
        test_pred = best_model.predict(x_test_final)
    else:
        cat_combined, cat_train_final, cat_test_final = prepare_catboost_frames(
            combined_train, train_df, test_df, numeric_cols, categorical_cols
        )
        # Fix: impute the val frame with the same train+val statistics as the
        # other frames.  The previous revision imputed val with its own
        # statistics, inconsistent with how train/test were prepared.
        cat_val_final = prepare_catboost_frames(combined_train, val_df, val_df, numeric_cols, categorical_cols)[1]
        best_model.fit(
            cat_combined,
            combined_train["target_log_burn_acres"].to_numpy(dtype=np.float64),
            cat_features=cat_feature_idx,
            use_best_model=False,
        )
        train_pred = best_model.predict(cat_train_final)
        val_pred = best_model.predict(cat_val_final)
        test_pred = best_model.predict(cat_test_final)

    args.output_dir.mkdir(parents=True, exist_ok=True)
    pred_df = pd.concat(
        [
            train_df.assign(split="train", pred_log_burn_acres=train_pred, pred_burn_acres=np.exp(train_pred)),
            val_df.assign(split="val", pred_log_burn_acres=val_pred, pred_burn_acres=np.exp(val_pred)),
            test_df.assign(split="test", pred_log_burn_acres=test_pred, pred_burn_acres=np.exp(test_pred)),
        ],
        axis=0,
        ignore_index=True,
    )
    pred_path = args.output_dir / "predictions.csv"
    pred_df.to_csv(pred_path, index=False)

    # Compute each split's metrics exactly once (the previous revision
    # re-evaluated the test split three extra times for the headline block).
    train_metrics = evaluate_split(train_df, train_pred)
    val_metrics = evaluate_split(val_df, val_pred)
    test_metrics = evaluate_split(test_df, test_pred)

    summary = {
        "task_id": "wildfire_final_area_scalar_taskmodels",
        "task_form": "event_level_regression",
        "event_table": str(args.event_table),
        "output_dir": str(args.output_dir),
        "feature_profile": args.feature_profile,
        "seed": int(args.seed),
        "benchmark_protocol": "fm_lite_protocol" if args.feature_profile == "weather_fm" and args.model_family == "lite" else "standard_protocol",
        "split_sizes": {
            "train": int(len(train_df)),
            "val": int(len(val_df)),
            "test": int(len(test_df)),
        },
        "feature_columns": {
            "numeric": numeric_cols,
            "categorical": categorical_cols,
        },
        "candidate_validation": candidate_validation,
        "selected_model": best_name,
        "train_metrics": train_metrics,
        "val_metrics": val_metrics,
        "test_metrics": test_metrics,
        "headline_metrics": {
            "log_mae": float(test_metrics["log_mae"]),
            "log_rmse": float(test_metrics["log_rmse"]),
            "log_spearman": float(test_metrics["log_spearman"]),
        },
        "predictions_path": str(pred_path),
        "model_family": "lightweight_linear_task_heads" if args.model_family == "lite" else "popular_open_source_task_models",
        "fm_family": (args.fm_family or "weather_fm_derived_features") if args.feature_profile == "weather_fm" else None,
        "tmt_policy": {
            "task": "final_burned_area",
            "metric": "log-area regression error with rank agreement",
            "tolerance": "secondary magnitude-band interpretation only",
        },
    }
    (args.output_dir / "summary.json").write_text(json.dumps(summary, indent=2), encoding="utf-8")
    print(json.dumps(summary, indent=2))


if __name__ == "__main__":
    main()