Buckets:
| """ | |
| Run this script once after final model tuning is complete to generate all | |
| resources/ artifacts used by the Streamlit app at runtime. | |
| Usage: | |
| python resources/precompute.py | |
| Requirements: | |
| - data/multiple_areas_modeling_v2.parquet must exist and be the final dataset | |
| - models/xgboost_canada_optuna.pkl and models/xgboost_amazon_basin_optuna.pkl must exist | |
| - data/tuning/xgboost_<area>_tuning.parquet must exist for canada, amazon_basin, and every non-focus area present in the dataset (aoi_stats.json reads each area's tuning results) | |
| - data/baseline_evaluations/evaluations.parquet must exist | |
| """ | |
| import json | |
| import pickle | |
| import shutil | |
| import sys | |
| import pandas as pd | |
| from pathlib import Path | |
| from sklearn.metrics import confusion_matrix | |
| sys.path.insert(0, str(Path(__file__).parents[1])) | |
| from src.utils.data_utils import parse_geo_coords | |
# ── Filesystem layout ─────────────────────────────────────────────────────────
# All paths are resolved relative to this script so it can be launched from any
# working directory.
RESOURCES = Path(__file__).parent

# Output locations (inside resources/); created on first run.
CACHE = RESOURCES / "cache"
CACHE.mkdir(exist_ok=True)
MODELS_DST = RESOURCES / "models"
MODELS_DST.mkdir(exist_ok=True)

# Input locations (siblings of resources/ in the project root).
DATA = RESOURCES.parent / "data"
MODELS = RESOURCES.parent / "models"

# ── Column groups ─────────────────────────────────────────────────────────────
# Embedding dimensions are named A00 … A63.
EMBEDDING_COLS = [f"A{idx:02d}" for idx in range(64)]

# Columns selected (when present) as model input before predict/predict_proba.
FEATURE_COLS = [*EMBEDDING_COLS, "drift_magnitude"]

# Columns carried into the per-area data samples; their count is also
# reported as "n_features_engineered" in kpi_summary.json.
DERIVED_COLS = [
    "emb_mean",
    "emb_std",
    "emb_min",
    "emb_max",
    "emb_range",
    "emb_std_z",
    "drift_rolling_2y",
    "drift_cumulative",
    "drift_z_within_area",
    "loss_prototype_sim",
    "noloss_prototype_sim",
    "drift_magnitude",
]

# ── Areas of interest ─────────────────────────────────────────────────────────
# Keys are the values compared against the dataset's "name" column and used in
# artifact file names; values are display labels.
FOCUS_AREAS = {
    "canada": "Canada",
    "amazon_basin": "Amazon Basin",
}
NON_FOCUS_AREAS = {
    "guinea": "Guinea",
    "congo_basin_drc": "Congo Basin DRC",
    "indonesia_malaysia": "Indonesia-Malaysia",
    "mekong_region": "Mekong Region",
    "cerrado_brazil": "Cerrado Brazil",
}
def main():
    """Generate every resources/ artifact used by the Streamlit app.

    Loads the modeling dataset and the baseline evaluations once, writes the
    global cache artifacts, then the per-focus-area artifacts, then the
    cross-area summaries (which read the per-area ``*_best_trial.json`` files
    written earlier in the same run), and finally copies the tuned model
    pickles into ``resources/models``.
    """
    print("Loading data (this may take a minute)...")
    df = pd.read_parquet(DATA / "multiple_areas_modeling_v2.parquet")
    evals = pd.read_parquet(DATA / "baseline_evaluations" / "evaluations.parquet")
    print(f" Loaded {len(df):,} rows")

    print("Generating kpi_summary.json...")
    _make_kpi_summary(df, evals)
    print("Generating mean_embedding_profile.parquet...")
    _make_embedding_profile(df)
    print("Generating drift_by_area_year.parquet...")
    _make_drift_by_area_year(df)
    print("Generating target_distribution.json...")
    _make_target_distribution(df)
    print("Generating timelapse_sample.parquet...")
    _make_timelapse_sample(df)

    for key in FOCUS_AREAS:
        print(f"Generating {key}_best_trial.json...")
        _make_best_trial_json(key)
        print(f"Generating {key}_feature_importance.json...")
        _make_area_feature_importance(key)
        print(f"Generating {key}_confusion_matrix.json (runs model on test split)...")
        _make_area_confusion_matrix(df, key)
        print(f"Generating {key}_data_sample.parquet...")
        _make_area_data_sample(df, key)
        print(f"Generating {key}_test_predictions.parquet...")
        _make_test_predictions(df, key)

    # These two read the {key}_best_trial.json files produced in the loop above.
    print("Generating comparison_kpis.json...")
    _make_comparison_kpis(df)
    print("Generating aoi_stats.json...")
    _make_aoi_stats(df)

    print("Copying model files to resources/ for HuggingFace Space compatibility...")
    for key in FOCUS_AREAS:
        src = MODELS / f"xgboost_{key}_optuna.pkl"
        dst = MODELS_DST / f"xgboost_{key}_optuna.pkl"
        if dst.exists():
            # An existing copy is never overwritten.
            print(f" {dst.name} already exists in resources/, skipping")
        elif src.exists():
            shutil.copy2(src, dst)
            print(f" Copied {src.name} -> resources/")
        else:
            # Previously this case passed silently, leaving resources/models
            # without the model and giving no hint why.
            print(f" WARNING: {src.name} not found, nothing copied to resources/")

    print("Done! All resources/ artifacts generated.")
| # ── Existing functions ───────────────────────────────────────────────────────── | |
def _make_kpi_summary(df: pd.DataFrame, evals: pd.DataFrame):
    """Write ``kpi_summary.json``: headline metrics plus baseline comparison.

    Args:
        df: Full modeling dataset; used for the row count and, when a
            ``name`` column exists, the number of distinct areas.
        evals: Baseline evaluations table with ``model`` and ``pr_auc``
            columns.
    """
    # Baseline = best PR-AUC among rows whose model name contains "logistic";
    # if there are none, fall back to the best row of the whole table.
    is_logistic = evals["model"].str.contains("logistic", case=False, na=False)
    candidates = evals[is_logistic]
    if candidates.empty:
        candidates = evals
    baseline_row = candidates.loc[candidates["pr_auc"].idxmax()]  # type: ignore
    baseline_pr_auc = float(baseline_row["pr_auc"])  # type: ignore

    # Headline model metrics come from the best Canada tuning trial.
    best = _best_trial_row("canada")
    best_pr_auc = float(best["user_attrs_test_aucpr"])  # type: ignore
    best_f1 = float(best["user_attrs_test_f1"])  # type: ignore
    best_precision = float(best["user_attrs_test_precision"])  # type: ignore
    best_recall = float(best["user_attrs_test_recall"])  # type: ignore
    best_brier = float(best["user_attrs_test_brier"])  # type: ignore

    # Relative lift over the baseline in percent; the floor on the
    # denominator avoids dividing by zero.
    denominator = max(baseline_pr_auc, 1e-9)
    improvement = ((best_pr_auc - baseline_pr_auc) / denominator) * 100

    if "name" in df.columns:
        areas = df["name"].nunique()
    else:
        areas = 7

    kpi = {
        "total_rows": int(len(df)),
        "n_areas": int(areas),
        "n_features_engineered": len(DERIVED_COLS),
        "best_model": "XGBoost (Canada)",
        "pr_auc": round(best_pr_auc, 4),
        "f1": round(best_f1, 4),
        "brier": round(best_brier, 4),
        "precision": round(best_precision, 4),
        "recall": round(best_recall, 4),
        "baseline_pr_auc": round(baseline_pr_auc, 4),
        "improvement_pct": round(float(improvement), 1),
    }
    out_path = CACHE / "kpi_summary.json"
    out_path.write_text(json.dumps(kpi, indent=2))
def _make_embedding_profile(df: pd.DataFrame):
    """Write ``mean_embedding_profile.parquet`` in long format.

    Averages every available embedding column per group (whichever of
    ``loss_label``, ``name`` and ``year`` exist in ``df``) and melts the result
    to one row per (group, dimension) with columns ``dim``, ``mean_value`` and
    the integer ``dim_idx``.
    """
    id_cols = [col for col in ("loss_label", "name", "year") if col in df.columns]
    dims = [col for col in EMBEDDING_COLS if col in df.columns]

    group_means = df.groupby(id_cols)[dims].mean()
    long_form = group_means.reset_index().melt(
        id_vars=id_cols,
        value_vars=dims,
        var_name="dim",
        value_name="mean_value",
    )
    # "A07" -> 7: drop the leading letter and keep the numeric index.
    long_form["dim_idx"] = long_form["dim"].str[1:].astype(int)

    long_form.to_parquet(CACHE / "mean_embedding_profile.parquet", index=False)
def _make_drift_by_area_year(df: pd.DataFrame):
    """Write ``drift_by_area_year.parquet``.

    Summarises ``drift_magnitude`` (mean, standard deviation, count) per
    group, grouping by whichever of ``name``, ``year`` and ``loss_label`` are
    present in ``df``.
    """
    keys = [col for col in ("name", "year", "loss_label") if col in df.columns]
    drift_by_group = df.groupby(keys)["drift_magnitude"]
    summary = drift_by_group.agg(
        mean_drift="mean",
        std_drift="std",
        count="count",
    ).reset_index()
    summary.to_parquet(CACHE / "drift_by_area_year.parquet", index=False)
def _make_target_distribution(df: pd.DataFrame):
    """Write ``target_distribution.json``: label counts overall and per area.

    The JSON holds the total row count, the number of rows labelled 1
    (``loss``) and 0 (``no_loss``), the overall loss rate, and one record per
    ``name`` with the same counts and that area's loss rate.

    Args:
        df: Dataset with ``name`` and ``loss_label`` columns.
    """
    total = int(len(df))
    overall = df["loss_label"].value_counts().to_dict()
    loss_total = int(overall.get(1, 0))
    noloss_total = int(overall.get(0, 0))

    counts = df.groupby("name")["loss_label"].value_counts().unstack(fill_value=0)
    # unstack() only creates columns for labels that actually occur. If one
    # class is absent from the whole dataset, the "loss"/"no_loss" lookups
    # below would raise KeyError, so make sure both label columns exist.
    for label in (0, 1):
        if label not in counts.columns:
            counts[label] = 0
    per_area = counts.rename(columns={0: "no_loss", 1: "loss"}).reset_index()
    per_area["loss_rate"] = per_area["loss"] / (per_area["loss"] + per_area["no_loss"])

    dist = {
        "total": total,
        "loss": loss_total,
        "no_loss": noloss_total,
        # Guarded like the loss rate in comparison_kpis: an empty dataset
        # reports 0.0 instead of dividing by zero.
        "loss_rate": round(loss_total / total, 4) if total else 0.0,
        "per_area": per_area.to_dict(orient="records"),
    }
    # default=float converts any leftover non-native numeric scalar.
    (CACHE / "target_distribution.json").write_text(json.dumps(dist, indent=2, default=float))
def _make_timelapse_sample(df: pd.DataFrame):
    """Write ``timelapse_sample.parquet``: a label-balanced sample.

    Takes up to 5,000 rows with ``loss_label == 1`` followed by up to 5,000
    rows with ``loss_label == 0`` (fixed seed), keeping only the available
    columns among latitude, longitude, year, drift_magnitude, loss_label and
    name.
    """
    df = df.copy()

    # Coordinates are derived from the ".geo" column only when not already present.
    needs_coords = ".geo" in df.columns and "latitude" not in df.columns
    if needs_coords:
        df = parse_geo_coords(df)

    wanted = ("latitude", "longitude", "year", "drift_magnitude", "loss_label", "name")
    cols = [col for col in wanted if col in df.columns]

    # Loss rows first, then no-loss rows, each capped at 5,000.
    parts = []
    for label in (1, 0):
        rows = df[df["loss_label"] == label]
        parts.append(rows.sample(min(5_000, len(rows)), random_state=42))

    sample = pd.concat(parts)[cols].reset_index(drop=True)
    sample.to_parquet(CACHE / "timelapse_sample.parquet", index=False)
| # ── Comparison helpers ───────────────────────────────────────────────────────── | |
def _best_trial_row(area_key: str) -> pd.Series:
    """Return the completed tuning trial with the highest ``value`` for an area.

    Args:
        area_key: Area identifier used in the tuning file name
            ``data/tuning/xgboost_<area_key>_tuning.parquet``.

    Returns:
        The row of the trials table whose ``state`` is ``"COMPLETE"`` and
        whose ``value`` is the maximum.
    """
    tuning_file = DATA / "tuning" / f"xgboost_{area_key}_tuning.parquet"
    all_trials = pd.read_parquet(tuning_file)
    finished = all_trials[all_trials["state"] == "COMPLETE"]
    best_index = finished["value"].idxmax()
    return finished.loc[best_index]  # type: ignore
def _json_safe_param(value):
    """Convert one hyperparameter value into a JSON-serialisable scalar.

    Integers (including bools and NumPy integers) become ``int``; any other
    real number becomes a ``float`` rounded to 6 decimals; anything else is
    converted with ``int()`` when possible and otherwise kept as text
    (``None`` stays ``None``).
    """
    import numbers

    if isinstance(value, numbers.Integral):
        return int(value)
    if isinstance(value, numbers.Real):
        # Also covers non-``float`` reals such as np.float32, which the
        # previous ``isinstance(v, float)`` check sent to int() and truncated.
        return round(float(value), 6)
    try:
        return int(value)
    except (TypeError, ValueError):
        # Non-numeric (e.g. categorical string) parameters used to raise here.
        return None if value is None else str(value)


def _make_best_trial_json(area_key: str):
    """Write ``<area_key>_best_trial.json`` for one area.

    The file contains the best trial's test metrics (rounded to 4 decimals),
    the number of completed trials, and the trial's hyperparameters with the
    ``params_`` column prefix removed.

    Args:
        area_key: Area identifier used in the tuning and output file names.
    """
    row = _best_trial_row(area_key)
    trials = pd.read_parquet(DATA / "tuning" / f"xgboost_{area_key}_tuning.parquet")
    n_trials = int((trials["state"] == "COMPLETE").sum())

    params = {
        str(k).removeprefix("params_"): _json_safe_param(v)
        for k, v in row.items()
        if str(k).startswith("params_")
    }
    trial_data = {
        "pr_auc": round(float(row["user_attrs_test_aucpr"]), 4),
        "f1": round(float(row["user_attrs_test_f1"]), 4),
        "precision": round(float(row["user_attrs_test_precision"]), 4),
        "recall": round(float(row["user_attrs_test_recall"]), 4),
        "brier": round(float(row["user_attrs_test_brier"]), 4),
        "n_trials": n_trials,
        "params": params,
    }
    (CACHE / f"{area_key}_best_trial.json").write_text(json.dumps(trial_data, indent=2))
def _make_area_feature_importance(area_key: str):
    """Write ``<area_key>_feature_importance.json``.

    Takes the ``user_attrs_gain_importances`` mapping stored on the best
    tuning trial and saves the 15 highest entries as a list of
    ``{"feature", "importance"}`` records, highest first.
    """
    best = _best_trial_row(area_key)
    gains: dict = best["user_attrs_gain_importances"]  # type: ignore

    ranked = sorted(gains.items(), key=lambda item: item[1], reverse=True)
    top = []
    for feature, gain in ranked[:15]:
        top.append({"feature": feature, "importance": round(float(gain), 4)})

    out_path = CACHE / f"{area_key}_feature_importance.json"
    out_path.write_text(json.dumps(top, indent=2))
def _make_area_confusion_matrix(df: pd.DataFrame, area_key: str):
    """Write ``<area_key>_confusion_matrix.json`` from the held-out test split.

    Rebuilds the test split for the area, runs the saved model on it and
    stores the four confusion-matrix cells together with the model name and
    the test-set size. Prints a warning and returns without writing anything
    when the model pickle is missing.

    Args:
        df: Full modeling dataset with ``name``, ``.geo`` and ``loss_label``
            columns plus the feature columns.
        area_key: Area identifier matched against ``name`` and used in the
            model and output file names.
    """
    from sklearn.model_selection import train_test_split as _tts

    model_path = MODELS / f"xgboost_{area_key}_optuna.pkl"
    if not model_path.exists():
        print(f" WARNING: {model_path.name} not found, skipping confusion matrix for {area_key}")
        return

    subset = df[df["name"] == area_key].copy()
    feat_cols = [c for c in FEATURE_COLS if c in subset.columns]

    # Replicate the exact split from the Group By Training notebook cell:
    # geo-level stratified split on max loss_label per .geo location
    geo_labels = subset.groupby(".geo")["loss_label"].max()
    _, test_geos = _tts(
        geo_labels.index,
        test_size=0.2,
        random_state=10,
        stratify=geo_labels.to_numpy(),
    )
    test = subset[subset[".geo"].isin(test_geos)]

    # NOTE: pickle.load executes arbitrary code from the file; only load
    # model files produced by this project's own training runs.
    with open(model_path, "rb") as f:
        model = pickle.load(f)

    X_test = test[feat_cols].values
    y_test = test["loss_label"].values
    y_pred = model.predict(X_test)

    # Pin the label order so the matrix is always 2x2. Without `labels`, a
    # test split and predictions containing a single class yield a 1x1 matrix
    # and the 4-way unpacking below fails.
    tn, fp, fn, tp = confusion_matrix(y_test, y_pred, labels=[0, 1]).ravel()  # type: ignore

    cm = {
        "tn": int(tn), "fp": int(fp), "fn": int(fn), "tp": int(tp),
        "model": f"xgboost_{area_key}_optuna",
        "n_test": int(len(test)),
    }
    (CACHE / f"{area_key}_confusion_matrix.json").write_text(json.dumps(cm, indent=2))
def _make_area_data_sample(df: pd.DataFrame, area_key: str):
    """Write ``<area_key>_data_sample.parquet``: a label-balanced area sample.

    Restricts ``df`` to rows whose ``name`` equals ``area_key``, then takes up
    to 5,000 loss rows followed by up to 5,000 no-loss rows (fixed seed),
    keeping the location/label columns and the derived feature columns that
    are available.
    """
    subset = df[df["name"] == area_key].copy()

    # Parse lat/lon from .geo GeoJSON
    if ".geo" in subset.columns:
        subset = parse_geo_coords(subset)

    # Ordered, de-duplicated list of the wanted columns that exist.
    wanted = ["latitude", "longitude", "year", "drift_magnitude", "loss_label", "name"] + DERIVED_COLS
    keep_cols = []
    for col in wanted:
        if col in subset.columns and col not in keep_cols:
            keep_cols.append(col)

    # Loss rows first, then no-loss rows, each capped at 5,000.
    parts = []
    for label in (1, 0):
        rows = subset[subset["loss_label"] == label]
        parts.append(rows.sample(min(5_000, len(rows)), random_state=42))

    sample = pd.concat(parts)[keep_cols].reset_index(drop=True)
    sample.to_parquet(CACHE / f"{area_key}_data_sample.parquet", index=False)
def _make_test_predictions(df: pd.DataFrame, area_key: str):
    """Write ``<area_key>_test_predictions.parquet``.

    Rebuilds the area's held-out test split (same geo-level stratified split
    and seed as the confusion-matrix step), scores it with the saved model and
    stores the positive-class probability, the true label, the year and, when
    available, the coordinates. Prints a warning and returns without writing
    anything when the model pickle is missing.
    """
    from sklearn.model_selection import train_test_split as _tts

    model_path = MODELS / f"xgboost_{area_key}_optuna.pkl"
    if not model_path.exists():
        print(f" WARNING: {model_path.name} not found, skipping test predictions for {area_key}")
        return

    area_rows = df[df["name"] == area_key].copy()
    feat_cols = [col for col in FEATURE_COLS if col in area_rows.columns]

    # One label per location: the maximum loss_label seen at that .geo.
    label_per_geo = area_rows.groupby(".geo")["loss_label"].max()
    _, held_out_geos = _tts(
        label_per_geo.index,
        test_size=0.2,
        random_state=10,
        stratify=label_per_geo.to_numpy(),
    )
    test = area_rows[area_rows[".geo"].isin(held_out_geos)].copy()

    with open(model_path, "rb") as handle:
        model = pickle.load(handle)
    features = test[feat_cols].values
    y_prob = model.predict_proba(features)[:, 1]

    # Coordinates are derived only after scoring, and only if not present yet.
    if ".geo" in test.columns and "latitude" not in test.columns:
        test = parse_geo_coords(test)

    if "year" in test.columns:
        years = test["year"].values
    else:
        years = 0
    result = pd.DataFrame({
        "probability": y_prob,
        "true_label": test["loss_label"].values,
        "year": years,
    })
    if "latitude" in test.columns:
        for coord in ("latitude", "longitude"):
            result[coord] = test[coord].values

    result.to_parquet(CACHE / f"{area_key}_test_predictions.parquet", index=False)
def _make_comparison_kpis(df: pd.DataFrame):
    """Write ``comparison_kpis.json``: side-by-side metrics for focus areas.

    For every focus area whose ``<key>_best_trial.json`` exists in the cache,
    combines that file's metrics with the area's row count and loss rate
    computed from ``df``. Areas without a best-trial file are skipped with a
    warning.
    """
    metric_names = ("pr_auc", "f1", "precision", "recall", "brier")
    result = {}

    for key in FOCUS_AREAS:
        trial_path = CACHE / f"{key}_best_trial.json"
        if trial_path.exists():
            trial = json.loads(trial_path.read_text())
            area_rows = df[df["name"] == key]
            total = int(len(area_rows))
            loss_count = int((area_rows["loss_label"] == 1).sum())
            if total:
                loss_rate = round(loss_count / total, 4)
            else:
                loss_rate = 0.0

            entry = {metric: trial[metric] for metric in metric_names}
            entry["loss_rate"] = loss_rate
            entry["n_rows"] = total
            result[key] = entry
        else:
            print(f" WARNING: {trial_path.name} not found, skipping {key} in comparison_kpis")

    out_path = CACHE / "comparison_kpis.json"
    out_path.write_text(json.dumps(result, indent=2))
def _make_aoi_stats(df: pd.DataFrame):
    """Write ``aoi_stats.json``: descriptive and model stats per non-focus area.

    For every non-focus area with rows in ``df``, records row and location
    counts, loss rate, drift statistics, the best tuning trial's PR-AUC and
    F1, and the PR-AUC difference to Canada's cached best trial (0.0 is used
    for Canada when ``canada_best_trial.json`` is missing). Areas with no rows
    are skipped with a warning.

    Note: the best trial is read through ``_best_trial_row``, so a tuning
    parquet must exist for each non-focus area that has rows in ``df``.
    """
    canada_trial_path = CACHE / "canada_best_trial.json"
    canada_pr_auc = 0.0
    if canada_trial_path.exists():
        canada_pr_auc = json.loads(canada_trial_path.read_text()).get("pr_auc", 0.0)

    result = {}
    for key in NON_FOCUS_AREAS:
        area_rows = df[df["name"] == key]
        if area_rows.empty:
            print(f" WARNING: no rows found for '{key}' in dataset, skipping")
            continue

        best = _best_trial_row(key)
        pr_auc = round(float(best.get("user_attrs_test_aucpr", 0.0)), 4)  # type: ignore
        f1 = round(float(best.get("user_attrs_test_f1", 0.0)), 4)  # type: ignore

        if ".geo" in area_rows.columns:
            n_pixels = int(area_rows[".geo"].nunique())
        else:
            n_pixels = None
        drift = area_rows["drift_magnitude"]

        result[key] = {
            "n_pixel_years": int(len(area_rows)),
            "n_pixels": n_pixels,
            "loss_rate": round(float(area_rows["loss_label"].mean()), 4),
            "drift_mean": round(float(drift.mean()), 3),
            "drift_max": round(float(drift.max()), 3),
            "drift_std": round(float(drift.std()), 3),
            "pr_auc": pr_auc,
            "f1": f1,
            "auc_delta": round(pr_auc - canada_pr_auc, 4),
        }

    out_path = CACHE / "aoi_stats.json"
    out_path.write_text(json.dumps(result, indent=2))
# Run the precompute pipeline only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()
Xet Storage Details
- Size:
- 15.3 kB
- Xet hash:
- 48e7c29a3f78c848fa6e700a3926f5f8068f847d1854ebae52416c0dca5c843a
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.