tkbarb10's picture
download
raw
15.3 kB
"""
Run this script once after final model tuning is complete to generate all
resources/ artifacts used by the Streamlit app at runtime.
Usage:
python resources/precompute.py
Requirements:
- data/multiple_areas_modeling_v2.parquet must exist and be the final dataset
- models/xgboost_canada_optuna.pkl and models/xgboost_amazon_basin_optuna.pkl must exist
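- data/tuning/xgboost_canada_tuning.parquet and data/tuning/xgboost_amazon_basin_tuning.parquet must exist
- data/tuning/xgboost_<area>_tuning.parquet is also read for each non-focus area when building aoi_stats.json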
- data/baseline_evaluations/evaluations.parquet must exist
"""
import json
import pickle
import shutil
import sys
import pandas as pd
from pathlib import Path
from sklearn.metrics import confusion_matrix
sys.path.insert(0, str(Path(__file__).parents[1]))
from src.utils.data_utils import parse_geo_coords
# Directory layout, all resolved relative to this file: outputs go under the
# directory containing this script, inputs come from the sibling data/ and
# models/ directories one level up.
RESOURCES = Path(__file__).parent
CACHE = RESOURCES / "cache"
MODELS_DST = RESOURCES / "models"
DATA = RESOURCES.parent / "data"
MODELS = RESOURCES.parent / "models"

# Output directories are created as soon as the module is loaded.
CACHE.mkdir(exist_ok=True)
MODELS_DST.mkdir(exist_ok=True)

# Embedding dimension columns "A00" .. "A63".
EMBEDDING_COLS = ["A%02d" % i for i in range(64)]
# Columns fed to the models: the embeddings plus the drift magnitude.
FEATURE_COLS = [*EMBEDDING_COLS, "drift_magnitude"]
# Derived columns; their count is reported as "n_features_engineered".
DERIVED_COLS = [
    "emb_mean",
    "emb_std",
    "emb_min",
    "emb_max",
    "emb_range",
    "emb_std_z",
    "drift_rolling_2y",
    "drift_cumulative",
    "drift_z_within_area",
    "loss_prototype_sim",
    "noloss_prototype_sim",
    "drift_magnitude",
]

# Area key (value of the "name" column) -> display label.
FOCUS_AREAS = dict(canada="Canada", amazon_basin="Amazon Basin")
NON_FOCUS_AREAS = dict(
    guinea="Guinea",
    congo_basin_drc="Congo Basin DRC",
    indonesia_malaysia="Indonesia-Malaysia",
    mekong_region="Mekong Region",
    cerrado_brazil="Cerrado Brazil",
)
def main():
    """Generate every resources/ artifact in one pass.

    Loads the modeling dataset and the baseline evaluations once, writes the
    dataset-wide cache files, then the per-focus-area files, then the
    cross-area summaries, and finally copies the tuned model pickles into
    ``resources/models``.
    """
    print("Loading data (this may take a minute)...")
    df = pd.read_parquet(DATA / "multiple_areas_modeling_v2.parquet")
    evals = pd.read_parquet(DATA / "baseline_evaluations" / "evaluations.parquet")
    print(f" Loaded {len(df):,} rows")

    print("Generating kpi_summary.json...")
    _make_kpi_summary(df, evals)
    print("Generating mean_embedding_profile.parquet...")
    _make_embedding_profile(df)
    print("Generating drift_by_area_year.parquet...")
    _make_drift_by_area_year(df)
    print("Generating target_distribution.json...")
    _make_target_distribution(df)
    print("Generating timelapse_sample.parquet...")
    _make_timelapse_sample(df)

    for key in FOCUS_AREAS:
        print(f"Generating {key}_best_trial.json...")
        _make_best_trial_json(key)
        print(f"Generating {key}_feature_importance.json...")
        _make_area_feature_importance(key)
        print(f"Generating {key}_confusion_matrix.json (runs model on test split)...")
        _make_area_confusion_matrix(df, key)
        print(f"Generating {key}_data_sample.parquet...")
        _make_area_data_sample(df, key)
        print(f"Generating {key}_test_predictions.parquet...")
        _make_test_predictions(df, key)

    # These two read the {key}_best_trial.json files written in the loop
    # above, so they must run after it.
    print("Generating comparison_kpis.json...")
    _make_comparison_kpis(df)
    print("Generating aoi_stats.json...")
    _make_aoi_stats(df)

    print("Copying model files to resources/ for HuggingFace Space compatibility...")
    for key in FOCUS_AREAS:
        src = MODELS / f"xgboost_{key}_optuna.pkl"
        dst = MODELS_DST / f"xgboost_{key}_optuna.pkl"
        if dst.exists():
            # An existing copy is never overwritten.
            print(f" {dst.name} already exists in resources/, skipping")
        elif src.exists():
            shutil.copy2(src, dst)
            print(f" Copied {src.name} -> resources/")
        else:
            # Previously this case was silent; surface it so a missing model
            # is noticed before the final "Done!" message.
            print(f" WARNING: {src.name} not found, nothing copied for {key}")
    print("Done! All resources/ artifacts generated.")
# ── Existing functions ─────────────────────────────────────────────────────────
def _make_kpi_summary(df: pd.DataFrame, evals: pd.DataFrame):
    """Write cache/kpi_summary.json with headline metrics.

    The baseline PR-AUC is the highest ``pr_auc`` among evaluation rows whose
    ``model`` contains "logistic" (case-insensitive); if there are none, the
    highest ``pr_auc`` of the whole evaluations table is used. The headline
    model metrics come from the best Canada tuning trial.
    """
    # Baseline row selection.
    is_logistic = evals["model"].str.contains("logistic", case=False, na=False)
    candidates = evals[is_logistic] if is_logistic.any() else evals
    top_baseline = candidates.loc[candidates["pr_auc"].idxmax()]
    baseline = float(top_baseline["pr_auc"])

    # Test-set metrics stored on the best Canada trial.
    best = _best_trial_row("canada")
    metric = {
        name: float(best[f"user_attrs_test_{name}"])
        for name in ("aucpr", "f1", "precision", "recall", "brier")
    }

    # Relative improvement in percent; the floor avoids dividing by zero.
    gain_pct = (metric["aucpr"] - baseline) / max(baseline, 1e-9) * 100

    if "name" in df.columns:
        area_count = df["name"].nunique()
    else:
        area_count = 7

    summary = {
        "total_rows": int(len(df)),
        "n_areas": int(area_count),
        "n_features_engineered": len(DERIVED_COLS),
        "best_model": "XGBoost (Canada)",
        "pr_auc": round(metric["aucpr"], 4),
        "f1": round(metric["f1"], 4),
        "brier": round(metric["brier"], 4),
        "precision": round(metric["precision"], 4),
        "recall": round(metric["recall"], 4),
        "baseline_pr_auc": round(baseline, 4),
        "improvement_pct": round(float(gain_pct), 1),
    }
    out_path = CACHE / "kpi_summary.json"
    out_path.write_text(json.dumps(summary, indent=2))
def _make_embedding_profile(df: pd.DataFrame):
    """Write cache/mean_embedding_profile.parquet in long format.

    Averages every available embedding column per (loss_label, name, year)
    group, using whichever of those grouping columns exist, then melts the
    result to one row per group and dimension. ``dim_idx`` is the integer
    that follows the leading letter of the column name.
    """
    present = set(df.columns)
    keys = [c for c in ("loss_label", "name", "year") if c in present]
    dims = [c for c in EMBEDDING_COLS if c in present]

    means = df.groupby(keys)[dims].mean().reset_index()
    long_form = pd.melt(
        means,
        id_vars=keys,
        value_vars=dims,
        var_name="dim",
        value_name="mean_value",
    )
    long_form["dim_idx"] = long_form["dim"].str.slice(1).astype(int)
    long_form.to_parquet(CACHE / "mean_embedding_profile.parquet", index=False)
def _make_drift_by_area_year(df: pd.DataFrame):
    """Write cache/drift_by_area_year.parquet.

    Contains the mean, standard deviation and count of ``drift_magnitude``
    per (name, year, loss_label) group, using whichever of those grouping
    columns are present in ``df``.
    """
    keys = []
    for col in ("name", "year", "loss_label"):
        if col in df.columns:
            keys.append(col)

    drift = df.groupby(keys)["drift_magnitude"]
    summary = drift.agg(mean_drift="mean", std_drift="std", count="count")
    summary = summary.reset_index()
    summary.to_parquet(CACHE / "drift_by_area_year.parquet", index=False)
def _make_target_distribution(df: pd.DataFrame):
    """Write cache/target_distribution.json.

    The file holds the overall loss / no-loss counts and loss rate, plus one
    record per area (``name``) with ``no_loss``, ``loss`` and ``loss_rate``.
    Labels 0 and 1 in ``loss_label`` are reported as "no_loss" and "loss".
    """
    total = int(len(df))
    overall = df["loss_label"].value_counts().to_dict()
    n_loss = int(overall.get(1, 0))
    n_no_loss = int(overall.get(0, 0))

    per_area = (
        df.groupby("name")["loss_label"].value_counts()
        .unstack(fill_value=0)
        .rename(columns={0: "no_loss", 1: "loss"})
    )
    # unstack() only creates a column for labels that actually occur; make
    # sure both exist so a single-class dataset does not raise KeyError below.
    for col in ("no_loss", "loss"):
        if col not in per_area.columns:
            per_area[col] = 0
    per_area = per_area.reset_index()
    per_area["loss_rate"] = per_area["loss"] / (per_area["loss"] + per_area["no_loss"])

    dist = {
        "total": total,
        "loss": n_loss,
        "no_loss": n_no_loss,
        # Guard against an empty dataset, as _make_comparison_kpis does.
        "loss_rate": round(n_loss / total, 4) if total else 0.0,
        "per_area": per_area.to_dict(orient="records"),
    }
    # default=float converts any value json cannot encode natively.
    (CACHE / "target_distribution.json").write_text(json.dumps(dist, indent=2, default=float))
def _make_timelapse_sample(df: pd.DataFrame):
    """Write cache/timelapse_sample.parquet.

    Samples up to 5,000 loss rows and up to 5,000 no-loss rows
    (``random_state=42``) and keeps the coordinate, year, drift, label and
    area columns that are present.
    """
    df = df.copy()
    # Derive latitude/longitude from the ".geo" column only if not already there.
    needs_coords = ".geo" in df.columns and "latitude" not in df.columns
    if needs_coords:
        df = parse_geo_coords(df)

    wanted = ("latitude", "longitude", "year", "drift_magnitude", "loss_label", "name")
    cols = [c for c in wanted if c in df.columns]

    # Loss rows first, then no-loss rows.
    parts = []
    for label in (1, 0):
        rows = df[df["loss_label"] == label]
        parts.append(rows.sample(min(5_000, len(rows)), random_state=42))

    sample = pd.concat(parts)[cols].reset_index(drop=True)
    sample.to_parquet(CACHE / "timelapse_sample.parquet", index=False)
# ── Comparison helpers ─────────────────────────────────────────────────────────
def _best_trial_row(area_key: str) -> pd.Series:
    """Return the best completed tuning trial for ``area_key``.

    Reads ``data/tuning/xgboost_{area_key}_tuning.parquet`` and returns the
    row with the largest ``value`` among trials whose ``state`` is
    "COMPLETE".

    Raises:
        ValueError: if the file contains no completed trial.
    """
    path = DATA / "tuning" / f"xgboost_{area_key}_tuning.parquet"
    trials = pd.read_parquet(path)
    completed = trials[trials["state"] == "COMPLETE"]
    if completed.empty:
        # idxmax() on an empty selection also raises ValueError, but with a
        # message that does not say which file is at fault.
        raise ValueError(f"No COMPLETE trials found in {path}")
    return completed.loc[completed["value"].idxmax()]  # type: ignore
def _jsonable_param(value):
    """Convert one tuned hyper-parameter value to a JSON-friendly scalar.

    Non-integral real numbers are rounded to 6 decimals, anything that
    ``int()`` accepts becomes an int, ``None`` stays ``None`` and every other
    value (e.g. a categorical choice) is kept as its string form.
    """
    import numbers

    if isinstance(value, numbers.Real) and not isinstance(value, numbers.Integral):
        return round(float(value), 6)
    try:
        return int(value)
    except (TypeError, ValueError):
        return None if value is None else str(value)


def _make_best_trial_json(area_key: str):
    """Write cache/{area_key}_best_trial.json.

    The file holds the test metrics of the best completed trial, the number
    of completed trials, and the trial's hyper-parameters (columns prefixed
    with ``params_``, stored without the prefix).
    """
    row = _best_trial_row(area_key)
    # The tuning file is read again here only to count completed trials.
    trials = pd.read_parquet(DATA / "tuning" / f"xgboost_{area_key}_tuning.parquet")
    n_trials = int((trials["state"] == "COMPLETE").sum())

    params = {
        str(k).removeprefix("params_"): _jsonable_param(v)
        for k, v in row.items()
        if str(k).startswith("params_")
    }
    trial_data = {
        "pr_auc": round(float(row["user_attrs_test_aucpr"]), 4),
        "f1": round(float(row["user_attrs_test_f1"]), 4),
        "precision": round(float(row["user_attrs_test_precision"]), 4),
        "recall": round(float(row["user_attrs_test_recall"]), 4),
        "brier": round(float(row["user_attrs_test_brier"]), 4),
        "n_trials": n_trials,
        "params": params,
    }
    (CACHE / f"{area_key}_best_trial.json").write_text(json.dumps(trial_data, indent=2))
def _make_area_feature_importance(area_key: str):
    """Write cache/{area_key}_feature_importance.json.

    The file lists the 15 features with the highest gain importance recorded
    on the best trial, in descending order, rounded to 4 decimals.
    """
    row = _best_trial_row(area_key)
    gains: dict = row["user_attrs_gain_importances"]  # type: ignore

    # NOTE(review): when this mapping is read back from parquet, a feature
    # without a recorded gain may appear as None/NaN. Such entries cannot be
    # sorted or converted, so they are dropped. Confirm against the tuning
    # file schema.
    scored = [
        (feature, float(gain))
        for feature, gain in gains.items()
        if pd.notna(gain)
    ]
    top = sorted(scored, key=lambda item: item[1], reverse=True)[:15]
    result = [{"feature": feature, "importance": round(gain, 4)} for feature, gain in top]
    (CACHE / f"{area_key}_feature_importance.json").write_text(json.dumps(result, indent=2))
def _make_area_confusion_matrix(df: pd.DataFrame, area_key: str):
    """Write cache/{area_key}_confusion_matrix.json.

    Rebuilds the test split for the area, runs the tuned model on it and
    stores the tn / fp / fn / tp counts together with the model name and the
    number of test rows. Prints a warning and returns without writing
    anything when the model pickle is missing.
    """
    from sklearn.model_selection import train_test_split as _tts

    model_path = MODELS / f"xgboost_{area_key}_optuna.pkl"
    if not model_path.exists():
        print(f" WARNING: {model_path.name} not found, skipping confusion matrix for {area_key}")
        return

    subset = df[df["name"] == area_key].copy()
    feat_cols = [c for c in FEATURE_COLS if c in subset.columns]

    # Replicate the exact split from the Group By Training notebook cell:
    # geo-level stratified split on max loss_label per .geo location
    geo_labels = subset.groupby(".geo")["loss_label"].max()
    _, test_geos = _tts(
        geo_labels.index,
        test_size=0.2,
        random_state=10,
        stratify=geo_labels.to_numpy(),
    )
    test = subset[subset[".geo"].isin(test_geos)]

    # NOTE: pickle.load can execute arbitrary code; only load model files
    # produced by this project.
    with open(model_path, "rb") as f:
        model = pickle.load(f)

    X_test = test[feat_cols].values
    y_test = test["loss_label"].values
    y_pred = model.predict(X_test)

    # Pin labels so the matrix is always 2x2; without it a test split where
    # only one class appears yields a 1x1 matrix and the unpacking fails.
    tn, fp, fn, tp = confusion_matrix(y_test, y_pred, labels=[0, 1]).ravel()  # type: ignore
    cm = {
        "tn": int(tn), "fp": int(fp), "fn": int(fn), "tp": int(tp),
        "model": f"xgboost_{area_key}_optuna",
        "n_test": int(len(test)),
    }
    (CACHE / f"{area_key}_confusion_matrix.json").write_text(json.dumps(cm, indent=2))
def _make_area_data_sample(df: pd.DataFrame, area_key: str):
    """Write cache/{area_key}_data_sample.parquet.

    Samples up to 5,000 loss rows and up to 5,000 no-loss rows of the area
    (``random_state=42``), keeping the coordinate, year, drift, label, area
    and derived columns that are present.
    """
    subset = df[df["name"] == area_key].copy()
    # Parse lat/lon from the ".geo" column whenever that column exists.
    if ".geo" in subset.columns:
        subset = parse_geo_coords(subset)

    # Order-preserving de-duplication ("drift_magnitude" is listed twice).
    base_cols = ["latitude", "longitude", "year", "drift_magnitude", "loss_label", "name"]
    keep_cols = []
    for col in base_cols + DERIVED_COLS:
        if col in subset.columns and col not in keep_cols:
            keep_cols.append(col)

    # Loss rows first, then no-loss rows.
    parts = []
    for label in (1, 0):
        rows = subset[subset["loss_label"] == label]
        parts.append(rows.sample(min(5_000, len(rows)), random_state=42))

    sample = pd.concat(parts)[keep_cols].reset_index(drop=True)
    sample.to_parquet(CACHE / f"{area_key}_data_sample.parquet", index=False)
def _make_test_predictions(df: pd.DataFrame, area_key: str):
    """Write cache/{area_key}_test_predictions.parquet.

    Rebuilds the test split for the area (geo-level split, test_size=0.2,
    random_state=10, stratified on the max loss_label per ".geo"), scores it
    with the tuned model and stores the positive-class probability, the true
    label, the year and, when available, latitude/longitude. Prints a warning
    and returns without writing anything when the model pickle is missing.
    """
    from sklearn.model_selection import train_test_split

    model_path = MODELS / f"xgboost_{area_key}_optuna.pkl"
    if not model_path.exists():
        print(f" WARNING: {model_path.name} not found, skipping test predictions for {area_key}")
        return

    area_rows = df[df["name"] == area_key].copy()
    features = [c for c in FEATURE_COLS if c in area_rows.columns]

    per_geo_label = area_rows.groupby(".geo")["loss_label"].max()
    split = train_test_split(
        per_geo_label.index,
        test_size=0.2,
        random_state=10,
        stratify=per_geo_label.to_numpy(),
    )
    held_out_geos = split[1]
    test = area_rows[area_rows[".geo"].isin(held_out_geos)].copy()

    with open(model_path, "rb") as fh:
        model = pickle.load(fh)
    # Column 1 of predict_proba is the positive class.
    positive_prob = model.predict_proba(test[features].values)[:, 1]

    if ".geo" in test.columns and "latitude" not in test.columns:
        test = parse_geo_coords(test)

    columns = {
        "probability": positive_prob,
        "true_label": test["loss_label"].values,
        "year": test["year"].values if "year" in test.columns else 0,
    }
    if "latitude" in test.columns:
        columns["latitude"] = test["latitude"].values
        columns["longitude"] = test["longitude"].values

    predictions = pd.DataFrame(columns)
    predictions.to_parquet(CACHE / f"{area_key}_test_predictions.parquet", index=False)
def _make_comparison_kpis(df: pd.DataFrame):
    """Write cache/comparison_kpis.json.

    For every focus area, combines the metrics from the cached
    ``{key}_best_trial.json`` with the area's row count and loss rate.
    Areas whose best-trial file is missing are skipped with a warning.
    """
    metric_names = ("pr_auc", "f1", "precision", "recall", "brier")
    comparison = {}
    for key in FOCUS_AREAS:
        trial_path = CACHE / f"{key}_best_trial.json"
        if not trial_path.exists():
            print(f" WARNING: {trial_path.name} not found, skipping {key} in comparison_kpis")
            continue
        trial = json.loads(trial_path.read_text())

        area_rows = df[df["name"] == key]
        n_rows = int(len(area_rows))
        n_loss = int((area_rows["loss_label"] == 1).sum())
        # An area without rows gets a loss rate of 0.0.
        if n_rows:
            loss_rate = round(n_loss / n_rows, 4)
        else:
            loss_rate = 0.0

        entry = {metric: trial[metric] for metric in metric_names}
        entry["loss_rate"] = loss_rate
        entry["n_rows"] = n_rows
        comparison[key] = entry

    out_path = CACHE / "comparison_kpis.json"
    out_path.write_text(json.dumps(comparison, indent=2))
def _make_aoi_stats(df: pd.DataFrame):
    """Write cache/aoi_stats.json with one entry per non-focus area.

    Each entry holds row and location counts, the loss rate, drift
    statistics, the PR-AUC and F1 of the area's best tuning trial, and the
    PR-AUC difference to the cached Canada best trial. Areas without rows in
    ``df`` are skipped with a warning. Missing metrics, a missing Canada
    best-trial file, or a missing tuning file are all reported as 0.0.
    """
    canada_trial_path = CACHE / "canada_best_trial.json"
    if canada_trial_path.exists():
        canada_pr_auc = json.loads(canada_trial_path.read_text()).get("pr_auc", 0.0)
    else:
        canada_pr_auc = 0.0

    result = {}
    for key in NON_FOCUS_AREAS:
        sub = df[df["name"] == key]
        if sub.empty:
            print(f" WARNING: no rows found for '{key}' in dataset, skipping")
            continue

        try:
            row = _best_trial_row(key)
        except FileNotFoundError:
            # Every other missing input here degrades to a default instead of
            # aborting the script at its last step; do the same for a missing
            # tuning file so the descriptive stats are still written.
            print(f" WARNING: tuning results for '{key}' not found, reporting pr_auc/f1 as 0.0")
            row = pd.Series(dtype=object)

        pr_auc = round(float(row.get("user_attrs_test_aucpr", 0.0)), 4)  # type: ignore
        f1 = round(float(row.get("user_attrs_test_f1", 0.0)), 4)  # type: ignore
        result[key] = {
            "n_pixel_years": int(len(sub)),
            "n_pixels": int(sub[".geo"].nunique()) if ".geo" in sub.columns else None,
            "loss_rate": round(float(sub["loss_label"].mean()), 4),
            "drift_mean": round(float(sub["drift_magnitude"].mean()), 3),
            "drift_max": round(float(sub["drift_magnitude"].max()), 3),
            "drift_std": round(float(sub["drift_magnitude"].std()), 3),
            "pr_auc": pr_auc,
            "f1": f1,
            "auc_delta": round(pr_auc - canada_pr_auc, 4),
        }
    (CACHE / "aoi_stats.json").write_text(json.dumps(result, indent=2))
# Run the full precompute pipeline only when executed as a script.
if __name__ == "__main__":
    main()

Xet Storage Details

Size:
15.3 kB
·
Xet hash:
48e7c29a3f78c848fa6e700a3926f5f8068f847d1854ebae52416c0dca5c843a

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.