#!/usr/bin/env python3
"""Analog-retrieval baseline for wildfire burned-area estimation.

Reads an event-level table, builds a chronological 60/20/20 split, encodes
events with a scikit-learn preprocessing pipeline, and evaluates several
nearest-neighbour backends (exact cosine, FAISS inner-product on normalised
vectors, HNSW cosine) as analog retrievers. Retrieved neighbours induce a
log-burned-acres prediction (mean or similarity-weighted), scored with both
regression metrics and ranking metrics (nDCG / hit-rate against a graded
relevance derived from outcome similarity).
"""
from __future__ import annotations

import argparse
import json
import os
import sys
from pathlib import Path
from typing import Dict, List, Tuple

# Allow deployment environments to expose extra site-packages (e.g. where
# faiss / hnswlib are installed) before the third-party imports below run.
for _p in os.environ.get("WILDFIRE_FM_EXTRA_PYTHONPATH", "").split(os.pathsep):
    if _p and _p not in sys.path:
        sys.path.insert(0, _p)

import faiss
import hnswlib
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Identifier / leakage columns excluded from the feature matrix. The target
# ("target_log_burn_acres") and its raw form ("BurnBndAc") are dropped too.
DROP_COLUMNS = {
    "Event_ID",
    "Incid_Name",
    "incident_name_norm",
    "wfigs_name",
    "Ig_Date",
    "weather_date",
    "BurnBndAc",
    "target_log_burn_acres",
}

# Columns one-hot encoded when present in the input table.
CATEGORICAL_COLUMNS = ["Incid_Type", "state_abbr", "county_name", "wfigs_match_type"]


def rmse(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Root mean squared error between two 1-D arrays."""
    return float(np.sqrt(np.mean((np.asarray(y_true) - np.asarray(y_pred)) ** 2)))


def mape(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Mean absolute percentage error (as a fraction, not a percentage).

    The denominator is clipped at 1e-6 so near-zero true values do not
    explode the metric.
    """
    denom = np.clip(np.asarray(y_true, dtype=np.float64), 1e-6, None)
    frac = np.abs(np.asarray(y_true, dtype=np.float64) - np.asarray(y_pred, dtype=np.float64)) / denom
    return float(np.mean(frac))


def r2_score_manual(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Coefficient of determination; returns 0.0 when y_true is constant."""
    y_true = np.asarray(y_true, dtype=np.float64)
    y_pred = np.asarray(y_pred, dtype=np.float64)
    ss_res = float(np.sum((y_true - y_pred) ** 2))
    ss_tot = float(np.sum((y_true - y_true.mean()) ** 2))
    return float(1.0 - ss_res / ss_tot) if ss_tot > 0 else 0.0


def spearman_corr(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Spearman rank correlation; returns 0.0 when undefined (e.g. constant input)."""
    a = pd.Series(np.asarray(y_true, dtype=np.float64))
    b = pd.Series(np.asarray(y_pred, dtype=np.float64))
    value = a.corr(b, method="spearman")
    return float(value) if pd.notna(value) else 0.0


def build_splits(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Chronological 60/20/20 train/val/test split on "Ig_Date".

    Guards ensure that for tiny inputs the train split is non-empty and, when
    possible, validation receives at least one row (borrowed from test).
    """
    ordered = df.sort_values("Ig_Date").reset_index(drop=True)
    n = len(ordered)
    train_end = max(int(round(n * 0.6)), 1)
    val_end = max(int(round(n * 0.8)), train_end + 1)
    # Keep at least one row for test when there are 3+ rows overall.
    val_end = min(val_end, n - 1) if n >= 3 else n
    train = ordered.iloc[:train_end].copy()
    val = ordered.iloc[train_end:val_end].copy()
    test = ordered.iloc[val_end:].copy()
    if len(val) == 0 and len(test) > 1:
        # Borrow the earliest test row so validation-based selection can run.
        val = test.iloc[:1].copy()
        test = test.iloc[1:].copy()
    return train, val, test


def feature_columns(df: pd.DataFrame, feature_profile: str = "all") -> Tuple[List[str], List[str]]:
    """Return (numeric, categorical) feature column names for the profile.

    Profile "weather_fm" restricts to numeric columns prefixed "weather_"
    and drops all categoricals; "all" keeps every usable column.
    """
    categorical = [c for c in CATEGORICAL_COLUMNS if c in df.columns]
    numeric = []
    for col in df.columns:
        if col in DROP_COLUMNS or col in categorical:
            continue
        if pd.api.types.is_numeric_dtype(df[col]):
            numeric.append(col)
    if feature_profile == "weather_fm":
        numeric = [c for c in numeric if c.startswith("weather_")]
        categorical = []
    return numeric, categorical


def make_preprocessor(numeric_cols: List[str], categorical_cols: List[str]) -> ColumnTransformer:
    """Median-impute + standardize numerics; mode-impute + one-hot categoricals."""
    return ColumnTransformer(
        transformers=[
            (
                "num",
                Pipeline(
                    steps=[
                        ("impute", SimpleImputer(strategy="median")),
                        ("scale", StandardScaler()),
                    ]
                ),
                numeric_cols,
            ),
            (
                "cat",
                Pipeline(
                    steps=[
                        ("impute", SimpleImputer(strategy="most_frequent")),
                        ("onehot", OneHotEncoder(handle_unknown="ignore")),
                    ]
                ),
                categorical_cols,
            ),
        ],
        remainder="drop",
    )


def to_dense_float32(x) -> np.ndarray:
    """Convert a possibly-sparse matrix to a dense float32 ndarray."""
    if hasattr(x, "toarray"):
        x = x.toarray()
    return np.asarray(x, dtype=np.float32)


def weighted_prediction(sim: np.ndarray, targets: np.ndarray) -> float:
    """Similarity-weighted mean of neighbour targets.

    Cosine similarity in [-1, 1] is mapped to [0, 1] and floored at 1e-6 so
    every neighbour keeps a positive weight.
    """
    weights = np.maximum((np.asarray(sim, dtype=np.float64) + 1.0) / 2.0, 1e-6)
    return float(np.sum(weights * targets) / np.sum(weights))


def graded_relevance(query_target: float, retrieved_targets: np.ndarray) -> np.ndarray:
    """Grade neighbours by |log-acres delta|: <=0.5 -> 3, <=1.0 -> 2, <=1.5 -> 1, else 0."""
    delta = np.abs(np.asarray(retrieved_targets, dtype=np.float64) - float(query_target))
    return np.select([delta <= 0.5, delta <= 1.0, delta <= 1.5], [3.0, 2.0, 1.0], default=0.0)


def dcg(relevance: np.ndarray) -> float:
    """Discounted cumulative gain with log2(rank + 1) discounts."""
    rel = np.asarray(relevance, dtype=np.float64)
    if rel.size == 0:
        return 0.0
    discounts = 1.0 / np.log2(np.arange(rel.size, dtype=np.float64) + 2.0)
    return float(np.sum(rel * discounts))


def ndcg_at_k(relevance: np.ndarray, ideal_relevance: np.ndarray, k: int) -> float:
    """Normalised DCG at cutoff k; 0.0 when the ideal DCG is zero."""
    rel = np.asarray(relevance, dtype=np.float64)[:k]
    ideal = np.asarray(ideal_relevance, dtype=np.float64)[:k]
    denom = dcg(ideal)
    return float(dcg(rel) / denom) if denom > 0 else 0.0


def score_backend(
    name: str,
    query_vec: np.ndarray,
    library_vec: np.ndarray,
    query_df: pd.DataFrame,
    library_df: pd.DataFrame,
    k: int,
    mode: str,
) -> Tuple[Dict[str, float], pd.DataFrame]:
    """Run one retrieval backend and score its induced predictions.

    Parameters
    ----------
    name : one of "cosine_exact", "faiss_flat_ip", "hnsw_cosine".
    query_vec / library_vec : dense feature matrices (queries x dim, library x dim).
    query_df / library_df : rows aligned with the vectors; must contain
        "target_log_burn_acres", and query_df also "Event_ID" and "BurnBndAc".
    k : requested neighbour count (capped at the library size).
    mode : "mean" for an unweighted neighbour mean, otherwise similarity-weighted.

    Returns
    -------
    (metrics dict, per-query example DataFrame).

    Raises
    ------
    ValueError if `name` is not a known backend.
    """
    target_lib = library_df["target_log_burn_acres"].to_numpy(dtype=np.float64)
    rows = []
    preds = []
    ndcg5 = []
    ndcg10 = []
    hit1 = []
    hit5 = []
    hit10 = []
    best_abs_delta = []
    k_eff = min(int(k), int(library_vec.shape[0]))

    if name == "cosine_exact":
        # Brute-force cosine similarity; exact ranking.
        sim_all = cosine_similarity(query_vec, library_vec)
        knn_idx = np.argsort(-sim_all, axis=1)[:, :k_eff]
        knn_sim = np.take_along_axis(sim_all, knn_idx, axis=1)
    else:
        # L2-normalise so inner product / cosine-distance backends match
        # cosine similarity semantics.
        library_norm = library_vec / np.clip(np.linalg.norm(library_vec, axis=1, keepdims=True), 1e-12, None)
        query_norm = query_vec / np.clip(np.linalg.norm(query_vec, axis=1, keepdims=True), 1e-12, None)
        if name == "faiss_flat_ip":
            index = faiss.IndexFlatIP(library_norm.shape[1])
            index.add(library_norm.astype(np.float32))
            # faiss returns (scores, indices).
            knn_sim, knn_idx = index.search(query_norm.astype(np.float32), k_eff)
        elif name == "hnsw_cosine":
            index = hnswlib.Index(space="cosine", dim=library_norm.shape[1])
            index.init_index(max_elements=library_norm.shape[0], ef_construction=100, M=16)
            index.add_items(library_norm.astype(np.float32), np.arange(library_norm.shape[0]))
            # ef must be at least k for correct recall.
            index.set_ef(max(50, k_eff))
            # hnswlib returns (labels, distances); cosine distance -> similarity.
            knn_idx, dist = index.knn_query(query_norm.astype(np.float32), k=k_eff)
            knn_sim = 1.0 - dist
        else:
            raise ValueError(name)

    for i in range(query_df.shape[0]):
        row = query_df.iloc[i]  # hoisted: avoid repeated positional lookups
        order = knn_idx[i]
        top_sim = knn_sim[i]
        top_targets = target_lib[order]
        query_target = float(row["target_log_burn_acres"])
        relevance = graded_relevance(query_target, top_targets)
        # Ideal ranking uses the full library, so nDCG is comparable across k.
        ideal_relevance = np.sort(graded_relevance(query_target, target_lib))[::-1]
        abs_delta = np.abs(top_targets - query_target)
        ndcg5.append(ndcg_at_k(relevance, ideal_relevance, 5))
        ndcg10.append(ndcg_at_k(relevance, ideal_relevance, 10))
        # "Hit" means a neighbour within 1.0 log-acres (relevance grade >= 2).
        hit1.append(float(relevance[:1].max() >= 2.0))
        hit5.append(float(relevance[: min(5, k_eff)].max() >= 2.0))
        hit10.append(float(relevance[: min(10, k_eff)].max() >= 2.0))
        best_abs_delta.append(float(abs_delta.min()))
        pred = float(np.mean(top_targets)) if mode == "mean" else weighted_prediction(top_sim, top_targets)
        preds.append(pred)
        rows.append(
            {
                "query_event_id": row["Event_ID"],
                "true_log_burn_acres": query_target,
                "pred_log_burn_acres": pred,
                "backend": name,
                "k": k,
                "effective_k": k_eff,
                "mode": mode,
                "top_relevance": relevance.tolist(),
                "best_abs_log_delta": float(abs_delta.min()),
            }
        )

    pred_arr = np.asarray(preds, dtype=np.float64)
    true_log = query_df["target_log_burn_acres"].to_numpy(dtype=np.float64)
    true_acres = query_df["BurnBndAc"].to_numpy(dtype=np.float64)
    # Predictions live in log space; exponentiate for acre-scale metrics.
    pred_acres = np.exp(pred_arr)
    metrics = {
        "count": int(len(query_df)),
        "log_mae": float(np.mean(np.abs(true_log - pred_arr))),
        "log_rmse": rmse(true_log, pred_arr),
        "log_r2": r2_score_manual(true_log, pred_arr),
        "log_spearman": spearman_corr(true_log, pred_arr),
        "log_median_ae": float(np.median(np.abs(true_log - pred_arr))),
        "acres_mae": float(np.mean(np.abs(true_acres - pred_acres))),
        "acres_rmse": rmse(true_acres, pred_acres),
        "acres_median_ae": float(np.median(np.abs(true_acres - pred_acres))),
        "acres_mape": mape(true_acres, pred_acres),
        "ndcg_at_5": float(np.mean(ndcg5)) if ndcg5 else 0.0,
        "ndcg_at_10": float(np.mean(ndcg10)) if ndcg10 else 0.0,
        "hit_at_1_tol1": float(np.mean(hit1)) if hit1 else 0.0,
        "hit_at_5_tol1": float(np.mean(hit5)) if hit5 else 0.0,
        "hit_at_10_tol1": float(np.mean(hit10)) if hit10 else 0.0,
        "mean_best_abs_log_delta_at_k": float(np.mean(best_abs_delta)) if best_abs_delta else 0.0,
    }
    return metrics, pd.DataFrame(rows)


def target_weight_vectors(
    train_vec: np.ndarray,
    val_vec: np.ndarray,
    test_vec: np.ndarray,
    target: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Reweight feature dimensions by |Pearson correlation| with the target.

    Correlations are computed on the TRAIN split only (no leakage), rescaled
    to [0, 1], and offset by 0.25 so weakly-correlated dimensions are damped
    rather than zeroed. The same weights are applied to all three splits.
    """
    x = np.asarray(train_vec, dtype=np.float64)
    y = np.asarray(target, dtype=np.float64)
    y = y - y.mean()
    x_centered = x - x.mean(axis=0, keepdims=True)
    denom = np.clip(np.sqrt(np.sum(x_centered**2, axis=0)) * np.sqrt(np.sum(y**2)), 1e-12, None)
    corr = np.abs(np.sum(x_centered * y[:, None], axis=0) / denom)
    corr = np.nan_to_num(corr, nan=0.0, posinf=0.0, neginf=0.0)
    if float(corr.max()) > 0:
        corr = corr / float(corr.max())
    weights = (0.25 + corr).astype(np.float32)
    return train_vec * weights, val_vec * weights, test_vec * weights


def _is_better(score: float, best_score, selection_metric: str) -> bool:
    """True when `score` improves on `best_score` for the given metric.

    "ndcg_at_10" is maximised; "log_mae" (the only other choice) is
    minimised. A `None` best_score always loses (first candidate wins).
    """
    if best_score is None:
        return True
    if selection_metric == "ndcg_at_10":
        return score > best_score
    return score < best_score


def main() -> None:
    """CLI entry point: select a retrieval config on validation, report on test."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--event-table", type=Path, required=True)
    parser.add_argument("--output-dir", type=Path, required=True)
    parser.add_argument("--selection-metric", choices=("log_mae", "ndcg_at_10"), default="ndcg_at_10")
    parser.add_argument("--feature-profile", choices=("all", "weather_fm"), default="all")
    parser.add_argument("--fm-family", type=str, default="")
    parser.add_argument("--seed", type=int, default=7)
    args = parser.parse_args()

    df = pd.read_csv(args.event_table)
    df["Ig_Date"] = pd.to_datetime(df["Ig_Date"])
    train_df, val_df, test_df = build_splits(df)

    numeric_cols, categorical_cols = feature_columns(df, feature_profile=args.feature_profile)
    if not numeric_cols and not categorical_cols:
        raise SystemExit(f"No usable features found for profile={args.feature_profile}")
    x_cols = numeric_cols + categorical_cols

    # Fit the preprocessor on train only; transform val/test with it.
    pre = make_preprocessor(numeric_cols, categorical_cols)
    train_vec = to_dense_float32(pre.fit_transform(train_df[x_cols]))
    val_vec = to_dense_float32(pre.transform(val_df[x_cols]))
    test_vec = to_dense_float32(pre.transform(test_df[x_cols]))

    weighted_train_vec, weighted_val_vec, weighted_test_vec = target_weight_vectors(
        train_vec,
        val_vec,
        test_vec,
        train_df["target_log_burn_acres"].to_numpy(dtype=np.float64),
    )
    vector_variants = {
        "standard": (train_vec, val_vec, test_vec),
        "target_weighted": (weighted_train_vec, weighted_val_vec, weighted_test_vec),
    }

    # Grid search over (vector variant, backend, k, aggregation mode) using
    # validation-set performance on the chosen selection metric.
    candidate_validation: List[Dict[str, object]] = []
    best = None
    best_score = None
    best_val_rows = None
    best_test_rows = None
    for variant, (lib_vec, v_vec, _) in vector_variants.items():
        for backend in ["cosine_exact", "faiss_flat_ip", "hnsw_cosine"]:
            for k in [1, 3, 5, 10]:
                for mode in ["mean", "weighted"]:
                    val_metrics, val_rows = score_backend(backend, v_vec, lib_vec, val_df, train_df, k, mode)
                    candidate_validation.append(
                        {"variant": variant, "backend": backend, "k": k, "mode": mode, "val_metrics": val_metrics}
                    )
                    score = float(val_metrics[args.selection_metric])
                    if _is_better(score, best_score, args.selection_metric):
                        best_score = score
                        best = {"variant": variant, "backend": backend, "k": k, "mode": mode}
                        best_val_rows = val_rows
    assert best is not None

    # Re-score the selected configuration on the held-out test split.
    best_train_vec, _, best_test_vec = vector_variants[str(best["variant"])]
    test_metrics, test_rows = score_backend(
        best["backend"], best_test_vec, best_train_vec, test_df, train_df, int(best["k"]), str(best["mode"])
    )
    best_test_rows = test_rows

    args.output_dir.mkdir(parents=True, exist_ok=True)
    if best_val_rows is not None:
        best_val_rows.to_csv(args.output_dir / "val_retrieval_examples.csv", index=False)
    if best_test_rows is not None:
        best_test_rows.to_csv(args.output_dir / "test_retrieval_examples.csv", index=False)

    summary = {
        "task_id": "wildfire_analog_retrieval_taskmodels",
        "task_form": "event_level_retrieval_with_induced_outcome_error",
        "event_table": str(args.event_table),
        "output_dir": str(args.output_dir),
        "feature_profile": args.feature_profile,
        "seed": int(args.seed),
        "split_sizes": {
            "train": int(len(train_df)),
            "val": int(len(val_df)),
            "test": int(len(test_df)),
        },
        "feature_columns": {"numeric": numeric_cols, "categorical": categorical_cols},
        "candidate_validation": candidate_validation,
        "selected_retrieval": best,
        "selection_metric": args.selection_metric,
        "test_metrics": test_metrics,
        "model_family": "popular_open_source_retrieval_backends_with_train_only_target_weighting",
        "fm_family": (args.fm_family or "weather_fm_derived_features") if args.feature_profile == "weather_fm" else None,
    }
    (args.output_dir / "summary.json").write_text(json.dumps(summary, indent=2), encoding="utf-8")
    print(json.dumps(summary, indent=2))


if __name__ == "__main__":
    main()