# autofarm / apps / intervention_simulation.py
# isabelku — "AutoFarm Space deploy" (commit 826dd96)
"""
Intervention Simulation
=======================
Standalone script that simulates stress challenges and robot interventions
on a field, predicts before/after yields using the combined SHAP + rule-based
classification approach (same as intervention_predictions_pipeline.py), and
outputs a results CSV + 3-panel PNG map.
Usage:
python apps/intervention_simulation.py
"""
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import numpy as np
import pandas as pd
import pickle
import warnings
import gc
import hashlib
import json
from pathlib import Path
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from tabpfn import TabPFNRegressor
import netCDF4 as nclib
warnings.filterwarnings("ignore")
# ── Paths ─────────────────────────────────────────────────────────────────────
# All paths are relative to the working directory, so the script is meant to be
# run from the repository root (see Usage in the module docstring).
DATA_ROOT = Path("./yieldsat_data")
RESULTS_DIR = Path("./results")
# Merged Sentinel-2 / soil / DEM / weather / coordinates NetCDF. Its mtime is
# part of the model-cache key computed by _sim_cache_path().
NC_PATH = DATA_ROOT / "Preprocessed/Germany/merge_s2-soil-dem-weather-coords.nc"
# Simulation inputs: challenge definitions and robot interventions
# (INTERVENTIONS_CSV is the kind of file load_interventions() parses).
CHALLENGES_CSV = RESULTS_DIR / "sim_input_challenges.csv"
INTERVENTIONS_CSV = RESULTS_DIR / "sim_input_interventions.csv"
SIM_DATA_DIR = Path("data/raw/simulator/real_field")
# Precomputed per-pixel table; _load_baseline_from_csv() reads its
# row / col / yield_pred columns as the baseline yield prediction.
INTERVENTION_ZONES_CSV = SIM_DATA_DIR / "de_0100_full_field_source.csv"
# Optional pickled SHAP stress records, read by load_shap_cache().
SHAP_CACHE_PATH = RESULTS_DIR / "shap_pca_cache_DE_0100.pkl"
# Directory holding pickled TabPFN models (file names from _sim_cache_path()).
SIM_CACHE_DIR = RESULTS_DIR / "_sim_cache"
# Outputs: results CSV and 3-panel PNG map.
OUTPUT_CSV = RESULTS_DIR / "farm_intervention_simulation.csv"
OUTPUT_PNG = RESULTS_DIR / "farm_intervention_simulation_map.png"
# ── Model Parameters ──────────────────────────────────────────────────────────
# Maximum number of training rows passed to TabPFN; larger training sets are
# randomly subsampled to this size.
TABPFN_MAX_TRAIN = 2000
# Pixels at or below this yield percentile of their field count as low-yield.
LOW_YIELD_PERCENTILE = 25
# |z| threshold a robust z-score must pass for a signal to fire in
# classify_stress().
ZSCORE_THRESHOLD = 1.2
# Not used in the code visible here; presumably the share of lost yield that a
# treatment recovers in the simulation — TODO confirm.
RECOVERY_FRACTION = 0.6
# Small constant added to vegetation-index denominators to avoid division by 0.
EPS = 1e-8
# ── Combination Parameters (from pipeline) ───────────────────────────────────
# Tunables for _combine_pixel(), which fuses rule-based and SHAP labels.
AGREE_BOOST = 0.15  # added to the larger confidence when both agree (cap 1.0)
DISAGREE_PENALTY = 0.20  # subtracted from the larger confidence on disagreement (floor 0.1)
SHAP_RESCUE_DISCOUNT = 0.8  # multiplier when only SHAP produced a label
RULE_ONLY_DISCOUNT = 0.7  # multiplier when only the rules produced a label
SHAP_MIN_MAGNITUDE = 0.05  # minimum normalised SHAP confidence to count as classified
SHAP_MARGIN_THRESHOLD = 0.05  # relative best-vs-runner-up score gap below which SHAP is "uncertain"
# ── Mappings ──────────────────────────────────────────────────────────────────
# Depth-interval suffixes of the soil bands (presumably centimetres — TODO
# confirm against the NetCDF metadata).
SOIL_DEPTHS = ["0-5", "5-15", "15-30", "30-60", "60-100", "100-200"]
# Soil properties; extract_features() looks each one up as f"{prop}_{depth}".
SOIL_PROPS = ["clay", "sand", "silt", "soc", "phh2o", "cec", "nitrogen", "cfvo"]
# Groups of correlated raw columns that extract_features() standardises and
# replaces by "n_components" principal components named f"{group}_pc{i}".
PCA_GROUPS = {
    "soil_texture": {
        "cols": [f"{p}_{d}" for p in ["clay", "sand", "silt"] for d in SOIL_DEPTHS],
        "n_components": 2,
    },
    "soil_chemistry": {
        "cols": [
            f"{p}_{d}" for p in ["soc", "cec", "nitrogen", "phh2o"] for d in SOIL_DEPTHS
        ],
        "n_components": 3,
    },
    "coarse_fragments": {
        "cols": [f"cfvo_{d}" for d in SOIL_DEPTHS],
        "n_components": 1,
    },
    "temperature": {
        "cols": ["temp_mean_mean", "temp_max_mean", "temp_min_mean"],
        "n_components": 1,
    },
}
# Stress type -> recommended action. The mapping is many-to-one and
# load_interventions() inverts it with a dict comprehension, so for a shared
# action the LAST stress listed here wins (install_drainage -> poor_drainage,
# inspect_manually -> unclassified). Keep that in mind when reordering.
ACTION_MAP = {
    "drought": "irrigate",
    "waterlogging": "install_drainage",
    "nutrient_deficiency": "apply_fertilizer_N",
    "compaction": "subsoil",
    "poor_drainage": "install_drainage",
    "healthy_low_yield_anomaly": "inspect_manually",
    "unclassified": "inspect_manually",
}
# Stress type -> hex colour (presumably for the PNG map legend).
STRESS_COLORS = {
    "drought": "#2196F3",
    "waterlogging": "#795548",
    "nutrient_deficiency": "#4CAF50",
    "compaction": "#FF9800",
    "poor_drainage": "#00BCD4",
    "healthy_low_yield_anomaly": "#E91E63",
    "unclassified": "#9E9E9E",
}
# Action -> raw vegetation-index columns it is expected to influence. Not used
# in the code visible here; presumably consumed by the intervention step.
ACTION_RESPONSIVE_FEATURES = {
    "apply_fertilizer_N": ["ndvi_last", "ndre_last", "cire_last", "s2rep_last"],
    "irrigate": ["ndvi_last", "ndmi_last"],
    "install_drainage": ["ndvi_last", "ndmi_last"],
    "subsoil": ["ndvi_last", "ndre_last", "cire_last"],
}
# Pairs of stress labels treated as agreement by _stress_types_agree();
# both orderings of every pair are listed.
COMPATIBLE_STRESSES = {
    ("poor_drainage", "waterlogging"),
    ("waterlogging", "poor_drainage"),
    ("drought", "compaction"),
    ("compaction", "drought"),
    ("nutrient_deficiency", "compaction"),
    ("compaction", "nutrient_deficiency"),
}
# Disagreement tie-breaks in _combine_pixel(): the rule label wins for these...
RULE_PRIORITY_STRESSES = {"drought", "nutrient_deficiency"}
# ...and, failing that, the SHAP label wins for these.
SHAP_PRIORITY_STRESSES = {"waterlogging", "compaction", "poor_drainage"}
# z-score feature mapping: raw column name -> classifier key
# Pipeline uses z_NDMI (not z_NDWI)
ZSCORE_FEATURES = {
    "ndvi_last": "z_NDVI",
    "ndre_last": "z_NDRE",
    "ndmi_last": "z_NDMI",
    "psri_last": "z_PSRI",
    "cire_last": "z_CIre",
    "s2rep_last": "z_S2REP",
    "clay_0-5": "z_clay_0-5",
    "sand_0-5": "z_sand_0-5",
    "silt_0-5": "z_silt_0-5",
    "soc_0-5": "z_soc_0-5",
    "phh2o_0-5": "z_phh2o_0-5",
    "cec_0-5": "z_cec_0-5",
    "nitrogen_0-5": "z_nitrogen_0-5",
    "cfvo_0-5": "z_cfvo_0-5",
    "dem": "z_dem",
    "slope": "z_slope",
    "twi": "z_twi",
}
# ── Data Loading ──────────────────────────────────────────────────────────────
def load_field_data(nc_path, field_id, nc_id):
    """Load every pixel of one field from the merged NetCDF.

    Args:
        nc_path: Path to the merged NetCDF file.
        field_id: Human-readable field id, stored as ``chunk["field_str"]``.
        nc_id: Value of the ``field_shared_name`` variable that identifies
            the field inside the NetCDF.

    Returns:
        ``(chunk, band_names)``. ``chunk`` holds ``field_str``, ``field_nc``,
        ``sample`` (float32, masked entries -> NaN), ``times``, ``target``
        (float32, masked entries -> NaN), ``row`` and ``col`` for the field's
        pixels. ``band_names`` is the ``band`` variable as a list of str.

    Raises:
        ValueError: if no pixel has ``field_shared_name == nc_id``.
    """
    # The context manager closes the file on every path; previously the
    # handle leaked if any variable lookup or read raised.
    with nclib.Dataset(str(nc_path), "r") as ds:
        band_names = [str(b) for b in ds["band"][:]]
        field_indices = np.where(ds["field_shared_name"][:] == nc_id)[0]
        if len(field_indices) == 0:
            raise ValueError(f"Field {field_id} (NC id={nc_id}) not found in dataset")
        i0, i1 = field_indices[0], field_indices[-1] + 1
        # Read the bounding slice in one contiguous request, then keep only
        # this field's rows in case other fields are interleaved inside it
        # (a no-op when the field's rows are contiguous).
        contiguous = len(field_indices) == i1 - i0
        local = field_indices - i0

        def _own_rows(arr):
            return arr if contiguous else arr[local]

        chunk = {
            "field_str": field_id,
            "field_nc": nc_id,
            "sample": _own_rows(
                np.ma.filled(ds["sample"][i0:i1, :, :], np.nan).astype(np.float32)
            ),
            "times": _own_rows(ds["times"][i0:i1, :]),
            "target": _own_rows(
                np.ma.filled(ds["target"][i0:i1], np.nan).astype(np.float32)
            ),
            "row": _own_rows(ds["row"][i0:i1]),
            "col": _own_rows(ds["col"][i0:i1]),
        }
    return chunk, band_names
# ── Feature Engineering ───────────────────────────────────────────────────────
def find_valid_timesteps(sample, band_idx, min_coverage=0.5):
    """Return indices of timesteps where band ``band_idx`` is non-NaN for
    strictly more than ``min_coverage`` (a fraction) of the pixels.

    ``sample`` is indexed as ``sample[pixel, timestep, band]``.
    """
    band_values = sample[:, :, band_idx]
    n_pixels = len(band_values)
    valid_counts = (~np.isnan(band_values)).sum(axis=0)
    fraction_valid = valid_counts / n_pixels
    return np.where(fraction_valid > min_coverage)[0]
def extract_features(chunk, band_names):
    """Extract raw per-pixel features, then PCA-compress correlated groups.

    Raw features (63 with the full band set: soil properties x 6 depths,
    topography, weather means over valid timesteps, five vegetation indices
    from the last valid timestep, plus PSRI kept separately) are reduced to
    the model features (18 with the full band set) by replacing each
    PCA_GROUPS column group with its leading principal components.

    Adds to ``chunk`` and returns it:
        valid_ts: timesteps with enough valid B04 coverage.
        X_raw: raw features, NaNs filled with the column median.
        psri_last: PSRI for the stress classifier (not a model feature).
        X / feature_cols: model features and their column order.
        pca_info: fitted scaler/PCA per group (reused to transform
            modified pixels in inject_challenge_pixels).
        band_names: the band list passed in.

    Raises:
        ValueError: if a required Sentinel-2 band is missing from
            ``band_names`` (``list.index``) or no timestep is valid.
    """
    s = chunk["sample"]
    band_idx = {
        b: band_names.index(b)
        for b in ("B04", "B03", "B05", "B06", "B07", "B08", "B11")
    }
    valid_ts = find_valid_timesteps(s, band_idx["B04"])
    chunk["valid_ts"] = valid_ts
    if len(valid_ts) == 0:
        raise ValueError(f"No valid timesteps for {chunk['field_str']}")
    # Vegetation indices use only the most recent valid acquisition.
    last_t = valid_ts[-1]
    nir = s[:, last_t, band_idx["B08"]]
    red = s[:, last_t, band_idx["B04"]]
    grn = s[:, last_t, band_idx["B03"]]
    re1 = s[:, last_t, band_idx["B05"]]
    re2 = s[:, last_t, band_idx["B06"]]
    re3 = s[:, last_t, band_idx["B07"]]
    swir = s[:, last_t, band_idx["B11"]]

    static_names = [f"{p}_{d}" for p in SOIL_PROPS for d in SOIL_DEPTHS] + [
        "dem",
        "slope",
        "aspect",
        "curvature",
        "twi",
    ]
    static_bands = {b: band_names.index(b) for b in static_names if b in band_names}
    weather_bands = {
        b: band_names.index(b)
        for b in ("temp_mean", "temp_max", "temp_min", "total_prec")
        if b in band_names
    }

    feat_dict = {}
    # Static layers are read from timestep 0 only (presumably constant in time).
    for bname, bidx in static_bands.items():
        feat_dict[bname] = s[:, 0, bidx]
    for bname, bidx in weather_bands.items():
        feat_dict[f"{bname}_mean"] = np.nanmean(s[:, valid_ts, bidx], axis=1)
    feat_dict["ndvi_last"] = (nir - red) / (nir + red + EPS)
    feat_dict["ndre_last"] = (nir - re1) / (nir + re1 + EPS)
    feat_dict["ndmi_last"] = (nir - swir) / (nir + swir + EPS)
    feat_dict["cire_last"] = nir / (re1 + EPS) - 1
    feat_dict["s2rep_last"] = 705 + 35 * ((red + re3) / 2 - re1) / (re2 - re1 + EPS)
    raw_df = pd.DataFrame(feat_dict)
    X_raw = raw_df.fillna(raw_df.median())
    chunk["X_raw"] = X_raw
    # PSRI for classifier (not a model feature)
    chunk["psri_last"] = (red - grn) / (re2 + EPS)

    # PCA compression of each configured column group.
    X_pca = X_raw.copy()
    pca_info = {}
    for group_name, group_cfg in PCA_GROUPS.items():
        available = [c for c in group_cfg["cols"] if c in X_pca.columns]
        if len(available) < 2:
            continue
        # PCA requires n_components <= min(n_samples, n_features); cap the
        # configured count so fields with few bands/pixels don't crash.
        n_comp = min(group_cfg["n_components"], len(available), len(X_pca))
        scaler = StandardScaler()
        scaled = scaler.fit_transform(X_pca[available].values)
        pca = PCA(n_components=n_comp)
        pcs = pca.fit_transform(scaled)
        pca_info[group_name] = {
            "pca": pca,
            "scaler": scaler,
            "original_cols": available,
            "explained_var": pca.explained_variance_ratio_,
        }
        X_pca = X_pca.drop(columns=available)
        for i in range(n_comp):
            X_pca[f"{group_name}_pc{i + 1}"] = pcs[:, i]
    X = X_pca.fillna(X_pca.median())
    chunk["X"] = X
    chunk["feature_cols"] = list(X.columns)
    chunk["pca_info"] = pca_info
    chunk["band_names"] = band_names
    return chunk
# ── Model Training ────────────────────────────────────────────────────────────
def _sim_cache_path(field_id="DE_0100"):
    """Return the pickle path for this field's cached TabPFN model.

    The file name embeds the first 12 hex chars of an MD5 digest over the
    NetCDF's mtime (0 when the file is absent) and the training/PCA
    configuration, so changing any of them selects a fresh cache file.
    Creates SIM_CACHE_DIR if it does not exist.
    """
    nc_mtime = NC_PATH.stat().st_mtime if NC_PATH.exists() else 0
    pca_signature = {
        name: (spec["cols"], spec["n_components"])
        for name, spec in PCA_GROUPS.items()
    }
    fingerprint = json.dumps(
        {
            "field_id": field_id,
            "nc_mtime": nc_mtime,
            "tabpfn_max_train": TABPFN_MAX_TRAIN,
            "pca_groups": pca_signature,
            "soil_depths": SOIL_DEPTHS,
            "soil_props": SOIL_PROPS,
        },
        sort_keys=True,
        default=str,
    )
    digest = hashlib.md5(fingerprint.encode()).hexdigest()[:12]
    SIM_CACHE_DIR.mkdir(parents=True, exist_ok=True)
    return SIM_CACHE_DIR / f"sim_cache_{field_id}_{digest}.pkl"
def _load_baseline_from_csv(chunk):
    """Load baseline ``yield_pred`` values aligned to the chunk's pixel order.

    Reads INTERVENTION_ZONES_CSV and matches its rows to ``chunk["row"]`` /
    ``chunk["col"]`` by integer (row, col). Rows with a missing coordinate
    are ignored; if several CSV rows share a coordinate, the last one wins.

    Returns:
        float64 array with one prediction per chunk pixel, or ``None`` if
        the CSV is missing, lacks any of the row/col/yield_pred columns, or
        does not cover every chunk pixel.
    """
    if not INTERVENTION_ZONES_CSV.exists():
        return None
    src = pd.read_csv(INTERVENTION_ZONES_CSV)
    if not {"row", "col", "yield_pred"}.issubset(src.columns):
        return None
    # int(NaN) raises ValueError, so drop rows without coordinates first.
    src = src.dropna(subset=["row", "col"])
    lookup = {
        (int(r), int(c)): float(p)
        for r, c, p in zip(src["row"], src["col"], src["yield_pred"])
    }
    try:
        return np.array(
            [lookup[(int(r), int(c))] for r, c in zip(chunk["row"], chunk["col"])],
            dtype=np.float64,
        )
    except KeyError:
        return None
def _impute_with_means(X, col_means):
    """Fill NaNs of the 2-D array ``X`` in place with ``col_means[j]`` for
    column j, and return ``X``."""
    nan_rows, nan_cols = np.nonzero(np.isnan(X))
    X[nan_rows, nan_cols] = col_means[nan_cols]
    return X


def _load_cached_tabpfn(cache_path):
    """Return ``(model, col_means)`` from the pickle at ``cache_path``.

    Returns ``(None, None)`` on a cache miss or when the file cannot be
    unpickled or lacks the expected keys (the caller then retrains).
    """
    if not cache_path.exists():
        print(f" Cache MISS: {cache_path.name}")
        return None, None
    try:
        # NOTE: pickle can execute code on load. This file is written by
        # _fit_and_cache_tabpfn; never point it at untrusted files.
        with open(cache_path, "rb") as f:
            cache = pickle.load(f)
        model = cache["model"]
        col_means = cache["col_means"]
    except Exception as e:  # corrupt/stale cache: rebuild instead of aborting
        print(f" Cache load failed ({e}); rebuilding")
        return None, None
    print(f" Cache HIT: {cache_path.name}")
    print(
        f" TabPFN (cached): RMSE={cache.get('rmse', float('nan')):.3f}, "
        f"R2={cache.get('r2', float('nan')):.3f}"
    )
    return model, col_means


def _fit_and_cache_tabpfn(chunk, cache_path):
    """Fit TabPFN on a 70/30 split of the labelled pixels, print test
    RMSE/R2, pickle the model to ``cache_path`` and return
    ``(model, col_means)``.

    Raises:
        ValueError: if no pixel has a non-NaN target.
    """
    X = chunk["X"].values.copy()
    y = chunk["target"]
    # Masked NetCDF targets were filled with NaN on load; they cannot be
    # trained on or scored (sklearn metrics reject NaN), so drop them.
    labelled = ~np.isnan(y)
    if not labelled.any():
        raise ValueError(f"No labelled pixels to train on for {chunk['field_str']}")
    X, y = X[labelled], y[labelled]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=42
    )
    col_means = np.nanmean(X_train, axis=0)
    col_means = np.where(np.isnan(col_means), 0, col_means)
    _impute_with_means(X_train, col_means)
    _impute_with_means(X_test, col_means)
    model = TabPFNRegressor(
        n_estimators=4, device="cpu", ignore_pretraining_limits=True
    )
    X_tr, y_tr = X_train, y_train
    if len(X_tr) > TABPFN_MAX_TRAIN:
        # Seeded like the split above so a rebuilt cache is reproducible.
        idx = np.random.default_rng(42).choice(
            len(X_tr), TABPFN_MAX_TRAIN, replace=False
        )
        X_tr, y_tr = X_train[idx], y_train[idx]
    model.fit(X_tr, y_tr)
    y_pred_test = model.predict(X_test)
    rmse = float(np.sqrt(mean_squared_error(y_test, y_pred_test)))
    r2 = float(r2_score(y_test, y_pred_test))
    print(f" TabPFN: RMSE={rmse:.3f}, R2={r2:.3f}")
    with open(cache_path, "wb") as f:
        pickle.dump(
            {"model": model, "col_means": col_means, "rmse": rmse, "r2": r2}, f
        )
    print(f" Saved cache: {cache_path.name}")
    return model, col_means


def train_tabpfn(chunk, field_id="DE_0100"):
    """Train TabPFN (or load it from the disk cache) and get baseline yields.

    The model is TabPFNRegressor(n_estimators=4), fit on a 70/30 split of
    the labelled pixels of ``chunk["X"]``. The baseline ``y_pred_all`` comes
    from the precomputed source CSV; only if that is unavailable is the
    model run over the whole field.

    Returns:
        ``(model, y_pred_all, col_means)``. ``col_means`` are the training
        column means used to impute NaN features.
    """
    cache_path = _sim_cache_path(field_id)
    model, col_means = _load_cached_tabpfn(cache_path)
    if model is None:
        model, col_means = _fit_and_cache_tabpfn(chunk, cache_path)
        gc.collect()  # release the train/test copies before predicting
    # Baseline: load from precomputed source CSV (no full-field predict).
    y_pred_all = _load_baseline_from_csv(chunk)
    if y_pred_all is None:
        print(" Source CSV unavailable — falling back to full-field model.predict")
        X_full = _impute_with_means(chunk["X"].values.copy(), col_means)
        y_pred_all = model.predict(X_full)
    return model, y_pred_all, col_means
# ── Challenge Pixel Injection ─────────────────────────────────────────────────
def inject_challenge_pixels(chunk, challenge_info, X_raw_orig, y_pred_all):
    """Turn selected pixels into synthetic stress "challenges".

    Each challenge pixel first receives ALL raw features of one of the
    field's worst real low-yield pixels (lowest yield first). Stress-specific
    adjustments are then applied:

    * drought: NDMI pushed drier by half its magnitude, sand -> field p95.
    * compaction: cfvo and clay -> field p95, slope -> field p05.
    * nutrient_deficiency: nitrogen, soc and cec -> field p05.

    PCA features are recomputed with the scalers/PCAs fitted in
    extract_features, so the model sees the modified pixels consistently.

    Args:
        chunk: Field dict from extract_features (target, row, col, pca_info,
            feature_cols are read).
        challenge_info: List of dicts with pixel_idx, stress, row, col.
        X_raw_orig: Unmodified raw features. Indexed with ``.loc`` by pixel
            index, so this assumes a positional RangeIndex (as built by
            extract_features) — confirm for other callers.
        y_pred_all: Baseline predictions, used only for logging.

    Returns:
        ``(X_raw_challenge, X_challenge, X_challenge_arr)``: modified raw
        features, modified model features in ``feature_cols`` order, and the
        latter as a NaN-free ndarray.
    """
    y_all = chunk["target"]
    rows = chunk["row"]
    cols = chunk["col"]
    pca_info = chunk["pca_info"]
    feature_cols = chunk["feature_cols"]
    low_mask = y_all <= np.nanpercentile(y_all, LOW_YIELD_PERCENTILE)
    low_indices = np.where(low_mask)[0]
    worst_order = np.argsort(y_all[low_mask])
    template_indices = low_indices[worst_order[: len(challenge_info)]]
    print(" Template pixels (real low-yield):")
    for i, ti in enumerate(template_indices):
        print(
            f" Template {i + 1}: [{rows[ti]},{cols[ti]}], yield={y_all[ti]:.1f} t/ha, "
            f"pred={y_pred_all[ti]:.1f} t/ha"
        )
    if len(template_indices) < len(challenge_info):
        # zip() below stops at the shorter list; make the shortfall visible.
        print(
            f" WARNING: only {len(template_indices)} low-yield templates for "
            f"{len(challenge_info)} challenge pixels; the rest stay unmodified"
        )

    X_raw_challenge = X_raw_orig.astype(np.float64).copy()
    present_cols = set(X_raw_challenge.columns)

    def set_to_quantile(px_idx, col_names, q):
        """Set each existing column of col_names at px_idx to the field q-quantile."""
        for col in col_names:
            if col in present_cols:
                X_raw_challenge.loc[px_idx, col] = X_raw_orig[col].quantile(q)

    for info, template_idx in zip(challenge_info, template_indices):
        px_idx = info["pixel_idx"]
        stress = info["stress"]
        # Copy from the UNMODIFIED features so a template that was itself an
        # earlier challenge pixel does not propagate that earlier edit.
        X_raw_challenge.loc[px_idx] = X_raw_orig.loc[template_idx].to_numpy(
            dtype=np.float64
        )
        if stress == "drought":
            ndmi = X_raw_challenge.loc[px_idx, "ndmi_last"]
            # Subtracting half the magnitude equals *0.5 for positive NDMI,
            # but also makes a negative (already dry) NDMI drier instead of
            # pulling it toward 0 (wetter), which *0.5 did.
            X_raw_challenge.loc[px_idx, "ndmi_last"] = ndmi - 0.5 * abs(ndmi)
            set_to_quantile(px_idx, [f"sand_{d}" for d in SOIL_DEPTHS], 0.95)
        elif stress == "compaction":
            set_to_quantile(
                px_idx,
                [f"{p}_{d}" for d in SOIL_DEPTHS for p in ("cfvo", "clay")],
                0.95,
            )
            set_to_quantile(px_idx, ["slope"], 0.05)
        elif stress == "nutrient_deficiency":
            set_to_quantile(
                px_idx,
                [f"{p}_{d}" for d in SOIL_DEPTHS for p in ("nitrogen", "soc", "cec")],
                0.05,
            )
        print(
            f" [{info['row']},{info['col']}] {stress}: "
            f"features from template [{rows[template_idx]},{cols[template_idx]}]"
        )

    # Recompute PCA features from modified raw features (transform only).
    X_pca_challenge = X_raw_challenge.copy()
    for group_name in PCA_GROUPS:
        if group_name not in pca_info:
            continue
        gi = pca_info[group_name]
        available = gi["original_cols"]
        n_comp = len(gi["explained_var"])
        scaled = gi["scaler"].transform(X_pca_challenge[available].values)
        pcs = gi["pca"].transform(scaled)
        X_pca_challenge = X_pca_challenge.drop(columns=available)
        for j in range(n_comp):
            X_pca_challenge[f"{group_name}_pc{j + 1}"] = pcs[:, j]
    X_challenge = X_pca_challenge.fillna(X_pca_challenge.median())
    X_challenge = X_challenge[feature_cols]
    X_challenge_arr = X_challenge.values.copy()
    # Columns that are entirely NaN survive the median fill; use 0 for them.
    col_means_ch = np.nanmean(X_challenge_arr, axis=0)
    col_means_ch = np.where(np.isnan(col_means_ch), 0, col_means_ch)
    nan_rows, nan_cols = np.nonzero(np.isnan(X_challenge_arr))
    X_challenge_arr[nan_rows, nan_cols] = col_means_ch[nan_cols]
    return X_raw_challenge, X_challenge, X_challenge_arr
# ── Stress Classification ────────────────────────────────────────────────────
def classify_stress(z_row, t=ZSCORE_THRESHOLD):
    """Rule-based stress classifier over robust z-scores (z_NDMI convention).

    Each of the five stress types (drought, waterlogging,
    nutrient_deficiency, compaction, poor_drainage) has "dynamic"
    (vegetation-index) and "static" (soil/terrain) signals. A signal fires
    when its z-score passes ``t`` in its direction ("+": z > t, "-": z < -t,
    "abs": |z| > t). A missing or NaN z-score is "unavailable".

    A stress is detected when at least one dynamic signal is available and
    at least half of the available dynamic signals fire. Its confidence is
    the summed excess ``|z| - t`` over firing signals (static ones weighted
    0.5), divided by ``2 * (n_dyn_available + 0.5 * n_static_available)``,
    and capped at 1.

    Args:
        z_row: Mapping from keys such as "z_NDVI" to z-scores.
        t: Firing threshold on |z|.

    Returns:
        ``(top2, status)``.
        If any stress is detected, top2 holds the two most confident
        ``(name, confidence)`` pairs, padded with ("none", 0.0), and status
        is "classified".
        Otherwise status is "insufficient_data" when fewer than two dynamic
        z-scores are present. It is "healthy_anomaly" when at least two of
        z_NDVI > 0.5, z_NDRE > 0.5, z_PSRI < -0.5 and z_CIre > 0.5 hold,
        with ("healthy_low_yield_anomaly", 0.5) in top2. Any other case
        gives "no_signal".
    """

    def probe(key, direction):
        # -> (fired, z_value, available)
        value = z_row.get(key, None)
        if value is None or pd.isna(value):
            return (False, 0.0, False)
        if direction == "+":
            hit = value > t
        elif direction == "-":
            hit = value < -t
        else:
            hit = abs(value) > t
        return (hit, value, True)

    def excess(probes):
        return sum(max(0, abs(p[1]) - t) for p in probes)

    def rate(
        dynamic, static, min_dyn_frac=0.5, min_dyn_available=1, static_weight=0.5
    ):
        dyn_seen = [p for p in dynamic if p[2]]
        if len(dyn_seen) < min_dyn_available:
            return 0.0, False
        dyn_hits = [p for p in dyn_seen if p[0]]
        if len(dyn_hits) / len(dyn_seen) < min_dyn_frac:
            return 0.0, False
        stat_seen = [p for p in static if p[2]]
        stat_hits = [p for p in stat_seen if p[0]]
        total = excess(dyn_hits) + excess(stat_hits) * static_weight
        max_possible = (len(dyn_seen) + len(stat_seen) * static_weight) * 2.0
        if max_possible <= 0:
            return 0.0, False
        return min(1.0, total / max_possible), True

    # (stress, dynamic signals, static signals); order matters for ties.
    rules = (
        (
            "drought",
            (("z_NDMI", "-"), ("z_NDVI", "-")),
            (("z_sand_0-5", "+"),),
        ),
        (
            "waterlogging",
            (("z_PSRI", "+"), ("z_NDMI", "-")),
            (("z_twi", "+"), ("z_slope", "-"), ("z_clay_0-5", "+"), ("z_dem", "-")),
        ),
        (
            "nutrient_deficiency",
            (("z_NDRE", "-"), ("z_CIre", "-"), ("z_S2REP", "-"), ("z_NDVI", "-")),
            (
                ("z_nitrogen_0-5", "-"),
                ("z_soc_0-5", "-"),
                ("z_cec_0-5", "-"),
                ("z_phh2o_0-5", "abs"),
            ),
        ),
        (
            "compaction",
            (("z_PSRI", "+"), ("z_NDRE", "-")),
            (("z_cfvo_0-5", "+"), ("z_clay_0-5", "+"), ("z_slope", "-")),
        ),
        (
            "poor_drainage",
            (("z_PSRI", "+"), ("z_NDVI", "-")),
            (("z_sand_0-5", "-"), ("z_silt_0-5", "+"), ("z_clay_0-5", "+")),
        ),
    )

    detected = []
    for name, dyn_spec, stat_spec in rules:
        conf, fired = rate(
            [probe(key, d) for key, d in dyn_spec],
            [probe(key, d) for key, d in stat_spec],
        )
        if fired:
            detected.append((name, conf))

    if detected:
        detected.sort(key=lambda pair: -pair[1])
        detected += [("none", 0.0)] * max(0, 2 - len(detected))
        return detected[:2], "classified"

    dynamic_keys = ("z_NDVI", "z_NDRE", "z_NDMI", "z_PSRI", "z_CIre", "z_S2REP")
    n_dynamic = sum(
        1
        for key in dynamic_keys
        if z_row.get(key) is not None and not pd.isna(z_row.get(key))
    )
    if n_dynamic < 2:
        return [("unclassified", 0.0), ("none", 0.0)], "insufficient_data"

    healthy_tests = (
        ("z_NDVI", lambda v: v > 0.5),
        ("z_NDRE", lambda v: v > 0.5),
        ("z_PSRI", lambda v: v < -0.5),
        ("z_CIre", lambda v: v > 0.5),
    )
    healthy_markers = 0
    for key, looks_healthy in healthy_tests:
        value = z_row.get(key, np.nan)
        if not pd.isna(value) and looks_healthy(value):
            healthy_markers += 1
    if healthy_markers >= 2:
        return [("healthy_low_yield_anomaly", 0.5), ("none", 0.0)], "healthy_anomaly"
    return [("unclassified", 0.0), ("none", 0.0)], "no_signal"
def run_stress_classification(chunk, X_raw_to_use):
    """Rule-classify the field's low-yield pixels from robust z-scores.

    Builds one diagnostic row per pixel with these fields:
    yield, last-date vegetation indices, PSRI from ``chunk["psri_last"]``,
    and 0-5 cm soil and terrain features (NaN where a column is absent).

    Pixels at or below the field's LOW_YIELD_PERCENTILE yield are flagged as
    low-yield. Each diagnostic feature of those pixels is converted to a
    field-level median/MAD z-score. The z-score is NaN where the value is
    missing or the MAD does not exceed a small IQR-based floor, and is
    clipped to [-10, 10]. ``classify_stress`` is then run on every
    low-yield pixel.

    Args:
        chunk: Field dict with field_str, target, row, col, psri_last.
        X_raw_to_use: Raw-feature DataFrame aligned positionally with the
            chunk's pixels (original or challenge-modified).

    Returns:
        DataFrame with one row per low-yield pixel and these columns:
        idx, field_id, row, col, yield_tha, rule_stress, rule_confidence,
        rule_secondary, rule_secondary_confidence, co_occurring and
        rule_status.
    """
    X_raw = X_raw_to_use
    fid = chunk["field_str"]
    n_pix = len(chunk["target"])
    static_feats = [
        "clay_0-5",
        "sand_0-5",
        "silt_0-5",
        "soc_0-5",
        "phh2o_0-5",
        "cec_0-5",
        "nitrogen_0-5",
        "cfvo_0-5",
        "dem",
        "slope",
        "aspect",
        "curvature",
        "twi",
    ]
    vi_cols = {
        "NDVI": "ndvi_last",
        "NDRE": "ndre_last",
        "NDMI": "ndmi_last",
        "CIre": "cire_last",
        "S2REP": "s2rep_last",
    }
    # Build the diagnostic frame column-wise (previously one .iloc row lookup
    # per pixel per feature, which dominated runtime on large fields).
    columns = {
        "field_id": [fid] * n_pix,
        "row": np.asarray(chunk["row"])[:n_pix].astype(np.int64),
        "col": np.asarray(chunk["col"])[:n_pix].astype(np.int64),
        "yield_tha": np.asarray(chunk["target"], dtype=np.float64),
    }
    for name, raw_col in vi_cols.items():
        columns[name] = X_raw[raw_col].to_numpy(dtype=np.float64)[:n_pix]
    columns["PSRI"] = np.asarray(chunk["psri_last"], dtype=np.float64)[:n_pix]
    for feat in static_feats:
        columns[feat] = (
            X_raw[feat].to_numpy(dtype=np.float64)[:n_pix]
            if feat in X_raw.columns
            else np.full(n_pix, np.nan)
        )
    df = pd.DataFrame(columns)

    # Low yield identification
    field_p25 = (
        df.groupby("field_id")["yield_tha"]
        .quantile(LOW_YIELD_PERCENTILE / 100)
        .rename("yield_p25")
    )
    df = df.merge(field_p25, on="field_id", how="left")
    df["is_low_yield"] = df["yield_tha"] <= df["yield_p25"]

    # Z-score computation (every diagnostic column exists in df by construction).
    diag_features = static_feats + ["NDVI", "NDRE", "NDMI", "PSRI", "CIre", "S2REP"]
    field_medians = df.groupby("field_id")[diag_features].median()
    field_mads = df.groupby("field_id")[diag_features].apply(
        lambda g: (g - g.median()).abs().median()
    )
    mad_floors = {}
    for col in diag_features:
        q75, q25 = df[col].quantile([0.75, 0.25])
        mad_floors[col] = max((q75 - q25) * 0.01, 1e-6)

    low_df = df[df["is_low_yield"]].copy()
    for col in diag_features:
        med = low_df["field_id"].map(field_medians[col])
        mad = low_df["field_id"].map(field_mads[col])
        floor = mad_floors[col]
        valid_raw = low_df[col].notna() & med.notna()
        # A feature with (near-)zero spread carries no signal: mark it NaN
        # rather than dividing by the floor and producing huge z-scores.
        valid_variance = mad > floor
        available = valid_raw & valid_variance
        mad_safe = mad.clip(lower=floor)
        z = (low_df[col] - med) / (1.4826 * mad_safe)
        z = z.clip(-10, 10).where(available, other=np.nan)
        low_df[f"z_{col}"] = z
    z_cols = [f"z_{c}" for c in diag_features]

    # Classify
    stress_records = []
    for idx, row in low_df.iterrows():
        z_dict = {c: row[c] for c in z_cols}
        top2, status = classify_stress(z_dict)
        primary, secondary = top2[0], top2[1]
        co_occurring = (
            status == "classified"
            and secondary[1] > 0
            and secondary[1] >= 0.7 * primary[1]
        )
        stress_records.append(
            {
                "idx": idx,
                "field_id": row["field_id"],
                "row": int(row["row"]),
                "col": int(row["col"]),
                "yield_tha": row["yield_tha"],
                "rule_stress": primary[0],
                "rule_confidence": round(primary[1], 3),
                "rule_secondary": secondary[0],
                "rule_secondary_confidence": round(secondary[1], 3),
                "co_occurring": co_occurring,
                "rule_status": status,
            }
        )
    return pd.DataFrame(stress_records)
# ── SHAP Cache + Combination (from pipeline) ─────────────────────────────────
def load_shap_cache(cache_path):
    """Load cached SHAP stress records and add a normalised confidence.

    ``shap_confidence`` is ``shap_magnitude`` divided by its 95th percentile
    (or by its maximum when that percentile is not positive) and clipped to
    [0, 1]. When neither normaliser is positive, every non-NaN confidence is
    0 instead of the NaN that 0/0 would produce. The ``primary_stress``
    column is renamed to ``shap_stress``.

    Args:
        cache_path: Path (or str) to a pickle holding a dict with a
            ``"stress_records"`` entry convertible to a DataFrame that has a
            ``shap_magnitude`` column.

    Returns:
        ``(shap_df, True)`` on success, or ``(None, False)`` if the file does
        not exist.
    """
    cache_path = Path(cache_path)
    if not cache_path.exists():
        return None, False
    # NOTE: pickle executes arbitrary code on load; only use this with cache
    # files produced locally by the pipeline, never with untrusted input.
    with open(cache_path, "rb") as f:
        shap_cache = pickle.load(f)
    shap_df = pd.DataFrame(shap_cache["stress_records"])
    magnitude = shap_df["shap_magnitude"]
    p95 = magnitude.quantile(0.95)
    norm_denom = p95 if p95 > 0 else magnitude.max()
    if pd.isna(norm_denom) or norm_denom <= 0:
        # No positive scale (e.g. all magnitudes 0): confidence is 0, but
        # keep NaN where the magnitude itself is missing.
        shap_df["shap_confidence"] = magnitude.where(magnitude.isna(), 0.0)
    else:
        shap_df["shap_confidence"] = (magnitude / norm_denom).clip(0, 1)
    shap_df = shap_df.rename(columns={"primary_stress": "shap_stress"})
    return shap_df, True
def _stress_types_agree(rule_s, shap_s):
"""Check agreement via COMPATIBLE_STRESSES."""
return rule_s == shap_s or (rule_s, shap_s) in COMPATIBLE_STRESSES
def _combine_pixel(row):
    """Fuse one pixel's rule-based and SHAP labels into a single verdict.

    Missing or NaN labels become "unclassified". Category scores are read
    as "lower is stronger" (sorted ascending). SHAP counts as classified
    only if it has a label, its confidence is at least SHAP_MIN_MAGNITUDE,
    and it is not "uncertain". "Uncertain" means the best score is negative
    and beats the runner-up by a relative margin below
    SHAP_MARGIN_THRESHOLD.

    Decision order:
      1. Both classified and compatible gives "high" (confidence +
         AGREE_BOOST, capped at 1).
      2. Both classified and SHAP's runner-up category compatible with the
         rule label gives "medium" at 0.85 x rule confidence.
      3. Both classified but disagreeing gives "low". The rule label wins
         for RULE_PRIORITY_STRESSES, SHAP wins for SHAP_PRIORITY_STRESSES,
         otherwise the higher confidence wins. Confidence is reduced by
         DISAGREE_PENALTY with a floor of 0.1.
      4. Only SHAP gives "shap_rescue". Only rules gives "rule_only".
         Neither gives "unresolved" with confidence 0.

    Returns:
        dict with combined_stress, combined_confidence, combination_tier,
        combination_reason and shap_margin (rounded to 3 decimals).
    """

    def label(key):
        value = row.get(key, "unclassified") or "unclassified"
        return "unclassified" if pd.isna(value) else value

    rule_s = label("rule_stress")
    rule_c = row.get("rule_confidence", 0.0) or 0.0
    rule_status = row.get("rule_status", "no_signal") or "no_signal"
    shap_s = label("shap_stress")
    shap_c = row.get("shap_confidence", 0.0) or 0.0
    scores = row.get("category_scores", {})
    has_scores = isinstance(scores, dict) and len(scores) >= 2

    # SHAP margin uncertainty
    shap_margin = 0.0
    shap_uncertain = False
    if has_scores:
        best, runner_up = sorted(scores.values())[:2]
        if best < 0:
            shap_margin = abs(best - runner_up) / (abs(best) + 1e-9)
            shap_uncertain = shap_margin < SHAP_MARGIN_THRESHOLD

    rule_classified = rule_status == "classified"
    shap_classified = (
        shap_s != "unclassified"
        and shap_c >= SHAP_MIN_MAGNITUDE
        and not shap_uncertain
    )
    rounded_margin = round(shap_margin, 3)

    def verdict(stress, confidence, tier, reason):
        return {
            "combined_stress": stress,
            "combined_confidence": confidence,
            "combination_tier": tier,
            "combination_reason": reason,
            "shap_margin": rounded_margin,
        }

    if rule_classified and shap_classified:
        if _stress_types_agree(rule_s, shap_s):
            return verdict(
                rule_s,
                min(1.0, max(rule_c, shap_c) + AGREE_BOOST),
                "high",
                "both_agree",
            )
        if has_scores:
            ranked = sorted(scores.items(), key=lambda item: item[1])
            shap_second = ranked[1][0] if len(ranked) >= 2 else None
            if shap_second and _stress_types_agree(rule_s, shap_second):
                return verdict(
                    rule_s, rule_c * 0.85, "medium", "shap_secondary_supports_rule"
                )
        if rule_s in RULE_PRIORITY_STRESSES:
            winner = rule_s
        elif shap_s in SHAP_PRIORITY_STRESSES:
            winner = shap_s
        else:
            winner = rule_s if rule_c >= shap_c else shap_s
        return verdict(
            winner,
            max(0.1, max(rule_c, shap_c) - DISAGREE_PENALTY),
            "low",
            f"disagree_rule={rule_s}_shap={shap_s}",
        )
    if shap_classified:
        return verdict(shap_s, shap_c * SHAP_RESCUE_DISCOUNT, "medium", "shap_rescue")
    if rule_classified:
        return verdict(rule_s, rule_c * RULE_ONLY_DISCOUNT, "medium", "rule_only")
    return verdict(rule_s, 0.0, "unresolved", "neither_classified")
def combine_classifications(rule_df, shap_df):
    """Outer-merge rule-based and SHAP classifications per pixel and fuse them.

    Pixels are matched on (field_id, row, col). The outer join keeps pixels
    known to only one source, which is what lets SHAP "rescue" pixels the
    rules did not flag. Each merged row is passed to ``_combine_pixel``, and
    ``recommended_action`` is derived from ``combined_stress`` via ACTION_MAP
    (unknown labels fall back to "inspect_manually").

    ``shap_df`` must contain field_id, row and col. The optional columns
    shap_stress, shap_magnitude, shap_confidence and category_scores are used
    when present. ``_combine_pixel`` falls back to defaults when a column is
    absent.
    """
    key_cols = ["field_id", "row", "col"]
    optional_cols = [
        "shap_stress",
        "shap_magnitude",
        "shap_confidence",
        "category_scores",
    ]
    shap_cols = key_cols + [c for c in optional_cols if c in shap_df.columns]
    merged = rule_df.merge(shap_df[shap_cols], on=key_cols, how="outer")
    if merged.empty:
        # apply(..., result_type="expand") on zero rows yields none of the
        # combined columns; create them explicitly so downstream code works.
        combined_cols = pd.DataFrame(
            index=merged.index,
            columns=[
                "combined_stress",
                "combined_confidence",
                "combination_tier",
                "combination_reason",
                "shap_margin",
            ],
        )
    else:
        combined_cols = merged.apply(_combine_pixel, axis=1, result_type="expand")
    result = pd.concat([merged, combined_cols], axis=1)
    result["recommended_action"] = (
        result["combined_stress"].map(ACTION_MAP).fillna("inspect_manually")
    )
    return result
def _rule_only_fallback(rule_df):
    """Give ``rule_df`` the combined_* / shap_* / recommended_action columns
    of ``combine_classifications`` when no SHAP cache exists.

    The rule label and confidence are used unchanged. The tier is "medium",
    the reason is "rule_only", and the SHAP columns are NaN. Returns a new
    DataFrame and leaves ``rule_df`` untouched.
    """
    fallback = rule_df.assign(
        combined_stress=rule_df["rule_stress"],
        combined_confidence=rule_df["rule_confidence"],
        combination_tier="medium",
        combination_reason="rule_only",
        shap_stress=np.nan,
        shap_magnitude=np.nan,
        shap_confidence=np.nan,
    )
    actions = fallback["combined_stress"].map(ACTION_MAP)
    fallback["recommended_action"] = actions.fillna("inspect_manually")
    return fallback
# ── Challenge Classification ─────────────────────────────────────────────────
def _classify_with_optional_shap(rule_df, summarize=True):
    """Combine ``rule_df`` with the SHAP cache if present, else go rule-only.

    Prints which source was used. With ``summarize``, it also prints the
    counts of high-tier and SHAP-rescued pixels.
    """
    shap_df, shap_available = load_shap_cache(SHAP_CACHE_PATH)
    if not shap_available:
        print(f" No SHAP cache at {SHAP_CACHE_PATH} — rule-based only.")
        return _rule_only_fallback(rule_df)
    print(f" SHAP cache loaded: {SHAP_CACHE_PATH.name} ({len(shap_df)} pixels)")
    stress_df = combine_classifications(rule_df, shap_df)
    if summarize:
        n_high = (stress_df["combination_tier"] == "high").sum()
        n_rescued = (stress_df["combination_reason"] == "shap_rescue").sum()
        print(f" Combined: high={n_high} | rescued={n_rescued}")
    return stress_df


def _challenge_rule_df(challenge_info, X_raw_challenge, chunk):
    """Rule-classify each challenge pixel from field-level median/MAD z-scores.

    Uses the same z-score convention as run_stress_classification:
    1.4826 x MAD scaling, clipping to [-10, 10], and NaN when the value is
    missing or the MAD does not exceed max(1% of the IQR, 1e-6).
    """
    fid = chunk["field_str"]
    # PSRI is stored separately in chunk, not in X_raw.
    psri_series = pd.Series(chunk["psri_last"])

    def feature_values(raw_col):
        if raw_col == "psri_last":
            return psri_series
        if raw_col in X_raw_challenge.columns:
            return X_raw_challenge[raw_col]
        return None

    # raw_col -> (values, median, MAD, MAD floor), from the modified features.
    stats = {}
    for raw_col in ZSCORE_FEATURES:
        vals = feature_values(raw_col)
        if vals is None:
            continue
        med = vals.median()
        mad = (vals - med).abs().median()
        q75, q25 = vals.quantile([0.75, 0.25])
        stats[raw_col] = (vals, med, mad, max((q75 - q25) * 0.01, 1e-6))

    records = []
    for info in challenge_info:
        px_idx = info["pixel_idx"]
        z_dict = {}
        for raw_col, z_key in ZSCORE_FEATURES.items():
            if raw_col not in stats:
                z_dict[z_key] = np.nan
                continue
            vals, med, mad, floor = stats[raw_col]
            val = float(vals.iloc[px_idx])
            # Like run_stress_classification, a feature without real spread
            # is unavailable; dividing by the floor instead used to create
            # spurious +/-10 z-scores (and false stress signals).
            if pd.isna(val) or pd.isna(med) or not mad > floor:
                z_dict[z_key] = np.nan
                continue
            z = (val - med) / (1.4826 * mad)
            z_dict[z_key] = float(np.clip(z, -10, 10))
        top2, status = classify_stress(z_dict)
        primary, secondary = top2[0], top2[1]
        co_occurring = (
            status == "classified"
            and secondary[1] > 0
            and secondary[1] >= 0.7 * primary[1]
        )
        records.append(
            {
                "field_id": fid,
                "row": info["row"],
                "col": info["col"],
                "yield_tha": info["original_yield"],
                "rule_stress": primary[0],
                "rule_confidence": round(primary[1], 3),
                "rule_secondary": secondary[0],
                "rule_secondary_confidence": round(secondary[1], 3),
                "co_occurring": co_occurring,
                "rule_status": status,
            }
        )
    return pd.DataFrame(records)


def _report_challenge_pixels(challenge_info, stress_df):
    """Print expected vs. classified stress per challenge pixel.

    Also stores classified_stress, classified_conf and classified_action on
    each ``challenge_info`` dict.
    """
    print(
        f"\n {'Pixel':<12} {'Expected':<22} {'Classified':<22} "
        f"{'Tier':<10} {'Conf':>6} {'Reason'}"
    )
    print(f" {'-' * 90}")
    for info in challenge_info:
        row_match = stress_df[
            (stress_df["row"] == info["row"]) & (stress_df["col"] == info["col"])
        ]
        if len(row_match) > 0:
            r = row_match.iloc[0]
            classified = r.get("combined_stress", "unclassified")
            tier = r.get("combination_tier", "n/a")
            reason = r.get("combination_reason", "n/a")
            info["classified_stress"] = classified
            info["classified_conf"] = r.get("combined_confidence", 0.0)
            info["classified_action"] = ACTION_MAP.get(classified, "inspect_manually")
        else:
            info["classified_stress"] = "unclassified"
            info["classified_conf"] = 0.0
            info["classified_action"] = "inspect_manually"
            tier, reason = "n/a", "no_match"
        print(
            f" [{info['row']:>2},{info['col']:>2}] {info['stress']:<22} "
            f"{info['classified_stress']:<22} {tier:<10} {info['classified_conf']:>5.3f} {reason}"
        )


def classify_challenge_pixels(challenge_info, X_raw_challenge, chunk):
    """Run the combined SHAP + rule classification, including challenge pixels.

    Challenge pixels keep their (high) original yields but carry transplanted
    stress features, so they would not pass the low-yield filter. They are
    therefore z-scored and rule-classified directly, then merged with the
    full-field low-yield classification from run_stress_classification.
    A challenge pixel's own record replaces any duplicate at the same
    (row, col). The result is combined with the SHAP cache when available.

    Side effect: each ``challenge_info`` dict gets classified_stress,
    classified_conf and classified_action.

    Returns:
        The combined per-pixel stress DataFrame.
    """
    if not challenge_info:
        print(" No challenge pixels to classify.")
        # Still run full-field classification for intervention pixels
        rule_df = run_stress_classification(chunk, X_raw_challenge)
        return _classify_with_optional_shap(rule_df, summarize=False)

    print("Running stress classification...")
    # 1. Full-field rule-based classification (covers low-yield intervention pixels)
    rule_df = run_stress_classification(chunk, X_raw_challenge)
    # 2. Direct z-score classification for challenge pixels
    challenge_rule_df = _challenge_rule_df(challenge_info, X_raw_challenge, chunk)
    # 3. Challenge-pixel records take precedence over full-field duplicates.
    if len(rule_df) > 0:
        challenge_coords = {(int(i["row"]), int(i["col"])) for i in challenge_info}
        keep = [
            (int(r), int(c)) not in challenge_coords
            for r, c in zip(rule_df["row"], rule_df["col"])
        ]
        rule_df = rule_df[keep]
    rule_df = pd.concat([challenge_rule_df, rule_df], ignore_index=True)
    # 4. Load SHAP cache and combine
    stress_df = _classify_with_optional_shap(rule_df)
    # 5. Print classification results for challenge pixels
    _report_challenge_pixels(challenge_info, stress_df)
    return stress_df
# ── Intervention Loading ──────────────────────────────────────────────────────
def load_interventions(interventions_csv, rows, cols):
    """Map robot-intervention CSV entries onto field pixel indices.

    Each CSV entry is located by an exact (row, col) match against ``rows`` /
    ``cols``; entries without a matching pixel are reported and dropped. The
    stress label attached to each entry is obtained by inverting
    ``ACTION_MAP`` (stress -> action). Actions absent from that map get
    ``"unclassified"``.

    Args:
        interventions_csv: Path to a CSV with ``row``, ``col`` and ``action``
            columns.
        rows: Per-pixel row coordinates of the field.
        cols: Per-pixel column coordinates of the field.

    Returns:
        (selected_indices, selected_actions, selected_stresses): a numpy array
        of matched pixel indices (an empty int array when nothing matched) and
        two lists aligned with it.
    """
    intervention_table = pd.read_csv(interventions_csv)
    print(f" Loaded {len(intervention_table)} intervention pixels from {Path(interventions_csv).name}")

    # Invert stress -> action. If several stresses share one action, the
    # stress seen last in ACTION_MAP wins.
    stress_by_action = dict(
        (action_name, stress_name) for stress_name, action_name in ACTION_MAP.items()
    )

    pixel_hits = []
    actions = []
    stresses = []
    for _, entry in intervention_table.iterrows():
        matches = np.flatnonzero((rows == entry["row"]) & (cols == entry["col"]))
        if matches.size == 0:
            print(
                f" WARNING: pixel [{entry['row']},{entry['col']}] not found, skipping"
            )
            continue
        chosen_action = entry["action"]
        # Only the first matching pixel is used.
        pixel_hits.append(matches[0])
        actions.append(chosen_action)
        stresses.append(stress_by_action.get(chosen_action, "unclassified"))

    if pixel_hits:
        selected_indices = np.array(pixel_hits)
    else:
        selected_indices = np.array([], dtype=int)
    print(f" Matched {len(selected_indices)} intervention pixels")
    return selected_indices, actions, stresses
# ── Combine Interventions ────────────────────────────────────────────────────
def combine_interventions(
    challenge_info, selected_indices, selected_actions, selected_stresses
):
    """Merge treated challenge pixels with robot-intervention pixels.

    Treated challenges come first, followed by robot interventions, in their
    original order. Untreated challenges are listed on stdout and excluded.
    For a treated challenge, the stress label prefers ``classified_stress``
    when the dict has one, and otherwise uses the injected ``stress``.

    Args:
        challenge_info: List of challenge dicts. Each dict has ``pixel_idx``,
            ``row``, ``col``, ``stress``, ``action`` and ``treated`` keys, and
            an optional ``classified_stress`` key.
        selected_indices: Pixel indices of robot interventions.
        selected_actions: Actions aligned with ``selected_indices``.
        selected_stresses: Stress labels aligned with ``selected_indices``.

    Returns:
        (all_indices, all_actions, all_stresses, all_is_challenge), where
        ``all_indices`` is an int numpy array and the rest are lists aligned
        with it.
    """
    treated = []
    untreated = []
    for info in challenge_info:
        (treated if info["treated"] else untreated).append(info)

    if untreated:
        print(" Untreated challenge pixels (no recovery):")
        for info in untreated:
            print(f" [{info['row']},{info['col']}] {info['stress']}")

    if treated:
        challenge_part = np.array([info["pixel_idx"] for info in treated])
    else:
        challenge_part = np.array([], dtype=int)
    if len(selected_indices) > 0:
        robot_part = selected_indices
    else:
        robot_part = np.array([], dtype=int)
    all_indices = np.concatenate([challenge_part, robot_part]).astype(int)

    all_actions = [info["action"] for info in treated]
    all_actions.extend(selected_actions)

    all_stresses = [info.get("classified_stress", info["stress"]) for info in treated]
    all_stresses.extend(selected_stresses)

    all_is_challenge = [True] * len(treated) + [False] * len(selected_indices)

    print(
        f" Combined: {len(all_indices)} pixels "
        f"({len(treated)} treated challenges + {len(selected_indices)} robot interventions)"
    )
    return all_indices, all_actions, all_stresses, all_is_challenge
# ── Recovery Simulation ──────────────────────────────────────────────────────
def simulate_recovery(
    X_challenge_arr,
    feature_cols,
    all_indices,
    all_actions,
    y_all,
    X_all_baseline,
    *,
    low_yield_percentile=None,
    recovery_fraction=None,
    responsive_features=None,
):
    """Shift action-responsive features of treated pixels toward healthy medians.

    A pixel is "healthy" when its yield is finite and strictly above the
    ``low_yield_percentile``-th percentile of ``y_all`` (NaN yields ignored).
    For every (pixel, action) pair, each feature listed for that action is
    moved a fraction ``recovery_fraction`` of the way from its current value
    toward the healthy-pixel median of that feature (computed on
    ``X_all_baseline``). Features not present in ``feature_cols`` are skipped.

    Args:
        X_challenge_arr: Feature matrix (pixels x features) after challenge
            injection. It is not modified.
        feature_cols: Feature names, one per column of the matrices.
        all_indices: Pixel indices to treat.
        all_actions: Actions aligned with ``all_indices``.
        y_all: Per-pixel yield, used to pick the healthy reference set.
        X_all_baseline: Baseline feature matrix used for the healthy medians.
        low_yield_percentile: Percentile defining "low yield". Defaults to
            ``LOW_YIELD_PERCENTILE``.
        recovery_fraction: Fraction of the gap to close. Defaults to
            ``RECOVERY_FRACTION``.
        responsive_features: Mapping action -> list of feature names. Defaults
            to ``ACTION_RESPONSIVE_FEATURES``.

    Returns:
        X_post, a modified copy of ``X_challenge_arr``.
    """
    if low_yield_percentile is None:
        low_yield_percentile = LOW_YIELD_PERCENTILE
    if recovery_fraction is None:
        recovery_fraction = RECOVERY_FRACTION
    if responsive_features is None:
        responsive_features = ACTION_RESPONSIVE_FEATURES

    y = np.asarray(y_all, dtype=np.float64)
    baseline = np.asarray(X_all_baseline)

    threshold = np.nanpercentile(y, low_yield_percentile)
    # NaN yields are unknown, not healthy. The explicit isfinite keeps them
    # out of the reference set (a plain "<= threshold" test counts them in).
    healthy_mask = np.isfinite(y) & (y > threshold)
    if not healthy_mask.any():
        # No pixel above the threshold (e.g. constant yield). Use the whole
        # field rather than taking the median of an empty set (NaN).
        healthy_mask = np.ones(len(y), dtype=bool)
    healthy_medians = np.median(baseline[healthy_mask], axis=0)

    # Name -> first column index (same as list.index), built once.
    col_index = {}
    for j, name in enumerate(feature_cols):
        col_index.setdefault(name, j)

    X_post = X_challenge_arr.copy()
    for px_idx, action in zip(all_indices, all_actions):
        for feat_name in responsive_features.get(action, []):
            feat_idx = col_index.get(feat_name)
            if feat_idx is None:
                continue
            current_val = X_post[px_idx, feat_idx]
            target_val = healthy_medians[feat_idx]
            X_post[px_idx, feat_idx] = current_val + recovery_fraction * (
                target_val - current_val
            )
    return X_post
# ── Prediction & Comparison ──────────────────────────────────────────────────
def predict_and_compare(
    model,
    X_challenge_arr,
    X_post,
    all_indices,
    all_actions,
    all_stresses,
    all_is_challenge,
    y_all,
    rows,
    cols,
    y_pred_all,
    challenge_indices=None,
):
    """Predict post-intervention yields and tabulate per-pixel gains.

    Only pixels whose features changed are passed to ``model.predict``:
    - challenge pixels are re-predicted on ``X_challenge_arr`` to build the
      "before" map;
    - pixels in ``all_indices`` are re-predicted on ``X_post`` to build the
      "after" map.
    Every other pixel keeps its value from ``y_pred_all``.

    Args:
        model: Object exposing ``predict(X) -> 1-D array``.
        X_challenge_arr: Feature matrix after challenge injection.
        X_post: Feature matrix after simulated recovery.
        all_indices: Pixel indices of treated challenges and interventions.
        all_actions: Actions aligned with ``all_indices``.
        all_stresses: Stress labels aligned with ``all_indices``.
        all_is_challenge: Bool flags aligned with ``all_indices``.
        y_all: Per-pixel actual yield.
        rows: Per-pixel row coordinates.
        cols: Per-pixel column coordinates.
        y_pred_all: Baseline per-pixel predictions.
        challenge_indices: Optional pixel indices whose features were stressed.

    Returns:
        (comparison_df, y_pred_before_full, y_pred_after_full). The
        ``comparison_df`` has one row per entry of ``all_indices`` and the
        columns row, col, is_challenge, action, stress, yield_actual,
        yield_pred_before, yield_pred_after, yield_improvement and
        pct_improvement. These columns are present even when ``all_indices``
        is empty, so callers can always index them.
    """
    # "Before" = baseline with challenge stress applied.
    y_pred_before = np.asarray(y_pred_all).astype(np.float64)
    if challenge_indices is not None and len(challenge_indices) > 0:
        ch_idx = np.asarray(challenge_indices, dtype=int)
        y_pred_before[ch_idx] = model.predict(X_challenge_arr[ch_idx])

    # "After" = before, plus intervention recovery on the treated pixels.
    y_pred_after = y_pred_before.copy()
    ai = np.asarray(all_indices, dtype=int)
    if ai.size > 0:
        y_pred_after[ai] = model.predict(X_post[ai])

    before = y_pred_before[ai]
    after = y_pred_after[ai]
    gain = after - before
    comparison_df = pd.DataFrame(
        {
            "row": np.asarray(rows)[ai].astype(np.int64),
            "col": np.asarray(cols)[ai].astype(np.int64),
            "is_challenge": np.asarray(all_is_challenge, dtype=bool),
            "action": list(all_actions),
            "stress": list(all_stresses),
            "yield_actual": np.asarray(y_all, dtype=np.float64)[ai],
            "yield_pred_before": before,
            "yield_pred_after": after,
            "yield_improvement": gain,
            # Floor the denominator at 0.1 t/ha so near-zero baselines don't explode.
            "pct_improvement": gain / np.maximum(before, 0.1) * 100,
        }
    )
    return comparison_df, y_pred_before, y_pred_after
# ── Plotting ──────────────────────────────────────────────────────────────────
def pixels_to_image(rows, cols, values):
    """Scatter per-pixel values onto a dense 2-D grid.

    The grid covers the bounding box of (rows, cols), and cells with no pixel
    are NaN.

    Returns:
        (img, r_min, c_min): the grid plus the row/column offsets that map
        original coordinates into it.
    """
    row_offsets = rows - rows.min()
    col_offsets = cols - cols.min()
    grid = np.full((row_offsets.max() + 1, col_offsets.max() + 1), np.nan)
    grid[row_offsets, col_offsets] = values
    return grid, rows.min(), cols.min()
def _draw_pixel_markers(ax, challenge_info, selected_indices, r_arr, c_arr, r_min, c_min):
    """Overlay challenge pixels (open circles) and robot-intervention pixels (crosses) on ``ax``."""
    for info in challenge_info:
        ax.plot(
            info["col"] - c_min,
            info["row"] - r_min,
            "ko",
            markersize=12,
            markerfacecolor="none",
            markeredgewidth=2.5,
        )
    for px_idx in selected_indices:
        ax.plot(
            c_arr[px_idx] - c_min,
            r_arr[px_idx] - r_min,
            "kx",
            markersize=8,
            markeredgewidth=2,
        )


def plot_simulation_map(
    chunk,
    y_pred_before,
    y_pred_after,
    challenge_info,
    selected_indices,
    all_indices,
    output_path,
):
    """Save a 3-panel map of predicted yield before, after and the change.

    Panels:
    - predicted yield before intervention (RdYlGn);
    - predicted yield after intervention (RdYlGn, same colour scale);
    - after - before (RdBu, symmetric scale of at least +/-0.5).
    Challenge pixels are circled on every panel, and robot-intervention
    pixels (``selected_indices``) are crossed.

    Args:
        chunk: Field dict providing ``row``, ``col`` and ``target`` arrays.
        y_pred_before: Per-pixel predictions before intervention.
        y_pred_after: Per-pixel predictions after intervention.
        challenge_info: Challenge dicts with ``row`` / ``col`` keys.
        selected_indices: Pixel indices of robot interventions.
        all_indices: Accepted for interface compatibility. Not used for drawing.
        output_path: Destination image path.
    """
    r_arr = chunk["row"]
    c_arr = chunk["col"]
    y_all = chunk["target"]

    # Shared scale for the two yield panels. nanmax keeps a single missing
    # target from turning the limit into NaN. Including the "before"
    # predictions means neither panel gets clipped.
    vmin = 0
    vmax = float(
        max(np.nanmax(y_all), np.nanmax(y_pred_before), np.nanmax(y_pred_after))
    )
    _, r_min, c_min = pixels_to_image(r_arr, c_arr, y_pred_before)

    fig, axes = plt.subplots(1, 3, figsize=(18, 6))
    try:
        for ax, img_data, title in (
            (axes[0], y_pred_before, "Before Intervention"),
            (axes[1], y_pred_after, "After Intervention"),
        ):
            img, _, _ = pixels_to_image(r_arr, c_arr, img_data)
            im = ax.imshow(img, cmap="RdYlGn", vmin=vmin, vmax=vmax)
            ax.set_title(title, fontsize=11)
            ax.axis("off")
            _draw_pixel_markers(
                ax, challenge_info, selected_indices, r_arr, c_arr, r_min, c_min
            )
            plt.colorbar(im, ax=ax, shrink=0.7, label="t/ha")

        # Panel 3: Yield improvement (delta), symmetric around zero.
        diff = y_pred_after - y_pred_before
        img_diff, _, _ = pixels_to_image(r_arr, c_arr, diff)
        max_abs = max(
            abs(float(np.nanmin(img_diff))), abs(float(np.nanmax(img_diff))), 0.5
        )
        im3 = axes[2].imshow(img_diff, cmap="RdBu", vmin=-max_abs, vmax=max_abs)
        axes[2].set_title("Yield Change (After - Before)", fontsize=11)
        axes[2].axis("off")
        _draw_pixel_markers(
            axes[2], challenge_info, selected_indices, r_arr, c_arr, r_min, c_min
        )
        plt.colorbar(im3, ax=axes[2], shrink=0.7, label="\u0394 t/ha")

        fig.tight_layout()
        fig.savefig(output_path, dpi=150, bbox_inches="tight")
    finally:
        # Release the figure even when plotting or saving fails.
        plt.close(fig)
# ── Orchestrator ──────────────────────────────────────────────────────────────
# Column order of the per-pixel results CSV (same keys predict_and_compare emits).
_RESULT_COLUMNS = [
    "row",
    "col",
    "is_challenge",
    "action",
    "stress",
    "yield_actual",
    "yield_pred_before",
    "yield_pred_after",
    "yield_improvement",
    "pct_improvement",
]

# Case-insensitive spellings of a "treated" cell that mean True.
_TREATED_TRUE_TOKENS = frozenset({"true", "t", "yes", "y", "1", "1.0"})


def _parse_treated_flag(value):
    """Interpret a ``treated`` cell from the challenges CSV as a bool.

    Missing cells (None, NaN or blank) default to treated. This matches the
    default used when the column is absent altogether. Booleans, 1/0 and
    yes/no spellings are accepted case-insensitively.
    """
    if value is None or (isinstance(value, float) and np.isnan(value)):
        return True
    text = str(value).strip().lower()
    if not text:
        return True
    return text in _TREATED_TRUE_TOKENS


def _load_challenge_info(rows, cols, y_all):
    """Read CHALLENGES_CSV and match each challenge to a field pixel.

    Returns a list of dicts with pixel_idx, row, col, stress, action, treated
    and original_yield. The list is empty when the CSV does not exist.
    Challenges whose (row, col) is not in the field are reported and skipped.
    """
    if not CHALLENGES_CSV.exists():
        print(
            f" No challenges file at {CHALLENGES_CSV} — skipping challenge injection."
        )
        return []

    challenges_df = pd.read_csv(CHALLENGES_CSV)
    print(f" Loaded {len(challenges_df)} challenge pixels")
    challenge_info = []
    for _, crow in challenges_df.iterrows():
        idx = np.where((rows == crow["row"]) & (cols == crow["col"]))[0]
        if len(idx) == 0:
            print(
                f" WARNING: pixel [{crow['row']},{crow['col']}] not found, skipping"
            )
            continue
        px_idx = idx[0]
        treated = _parse_treated_flag(crow.get("treated", "true"))
        challenge_info.append(
            {
                "pixel_idx": px_idx,
                "row": int(crow["row"]),
                "col": int(crow["col"]),
                "stress": crow["challenge"],
                "action": ACTION_MAP.get(crow["challenge"], "inspect_manually"),
                "treated": treated,
                "original_yield": float(y_all[px_idx]),
            }
        )
        print(
            f" [{crow['row']},{crow['col']}] {crow['challenge']}, treated={treated}"
        )
    return challenge_info


def _print_results(comp_df, y_all):
    """Print the per-pixel comparison table, the group averages and the field mean yield."""
    # astype(bool) keeps the masks boolean even for an empty frame.
    is_chal = comp_df["is_challenge"].astype(bool)
    n_chal = int(is_chal.sum())
    n_orig = len(comp_df) - n_chal

    print(f"\n{'=' * 90}")
    print(f"INTERVENTION SIMULATION RESULTS ({n_chal} challenge + {n_orig} original)")
    print(f"{'=' * 90}")
    print(
        f"\n{'Type':<6} {'Row':<5} {'Col':<5} {'Stress':<22} {'Action':<22} "
        f"{'Before':>7} {'After':>7} {'Gain':>7} {'%':>6}"
    )
    print(f"{'-' * 90}")
    for _, r in comp_df.iterrows():
        label = "CHAL" if r["is_challenge"] else "orig"
        print(
            f"{label:<6} {int(r['row']):<5} {int(r['col']):<5} {r['stress']:<22} "
            f"{r['action']:<22} {r['yield_pred_before']:>7.2f} {r['yield_pred_after']:>7.2f} "
            f"{r['yield_improvement']:>+7.2f} {r['pct_improvement']:>+5.1f}%"
        )

    chal_df = comp_df[is_chal]
    orig_df = comp_df[~is_chal]
    if len(chal_df) > 0:
        print(
            f"\n{'CHALLENGE (' + str(n_chal) + ')':<54} "
            f"{chal_df['yield_pred_before'].mean():>7.2f} "
            f"{chal_df['yield_pred_after'].mean():>7.2f} "
            f"{chal_df['yield_improvement'].mean():>+7.2f} "
            f"{chal_df['pct_improvement'].mean():>+5.1f}%"
        )
    if len(orig_df) > 0:
        print(
            f"{'ORIGINAL (' + str(n_orig) + ')':<54} "
            f"{orig_df['yield_pred_before'].mean():>7.2f} "
            f"{orig_df['yield_pred_after'].mean():>7.2f} "
            f"{orig_df['yield_improvement'].mean():>+7.2f} "
            f"{orig_df['pct_improvement'].mean():>+5.1f}%"
        )
    print(f"\nField mean yield: {y_all.mean():.2f} t/ha")


def run_simulation(
    field_id="DE_0100",
    nc_id=100,
    seed=42,
):
    """Run the challenge-injection / intervention simulation for one field.

    Each step prints a "[k/10]" progress line:
      1. load the field from NC_PATH;
      2. extract raw and PCA features;
      3. train TabPFN;
      4. read CHALLENGES_CSV;
      5. inject challenge pixels;
      6. classify stress on the challenge pixels;
      7. read INTERVENTIONS_CSV;
      8. merge treated challenges with robot interventions;
      9. simulate recovery;
      10. predict, print a summary, and write OUTPUT_CSV and OUTPUT_PNG.

    Args:
        field_id: Field identifier passed to ``load_field_data``.
        nc_id: NetCDF identifier passed to ``load_field_data``.
        seed: Seed for NumPy's global RNG.

    Returns:
        None. Results are written to OUTPUT_CSV and OUTPUT_PNG.
    """
    np.random.seed(seed)
    RESULTS_DIR.mkdir(exist_ok=True)

    # 1. Load field data from NetCDF
    print(f"[1/10] Loading field {field_id} (nc_id={nc_id})...")
    chunk, band_names = load_field_data(NC_PATH, field_id, nc_id)
    print(f" {len(chunk['target'])} pixels, yield={chunk['target'].mean():.1f} t/ha")

    # 2. Extract features (raw + PCA)
    print("[2/10] Extracting features...")
    chunk = extract_features(chunk, band_names)
    print(f" {len(chunk['X_raw'].columns)} raw -> {chunk['X'].shape[1]} after PCA")

    # 3. Train TabPFN model
    print("[3/10] Training TabPFN model...")
    model, y_pred_all, col_means = train_tabpfn(chunk)
    print(
        f" All-pixel predictions: mean={y_pred_all.mean():.2f}, std={y_pred_all.std():.2f}"
    )

    y_all = chunk["target"]
    rows = chunk["row"]
    cols = chunk["col"]
    feature_cols = chunk["feature_cols"]
    X_raw_orig = chunk["X_raw"].copy()

    # Baseline feature matrix with NaNs replaced by the col_means returned from
    # train_tabpfn. simulate_recovery uses it for the healthy-pixel stats.
    X_all_baseline = chunk["X"].values.copy()
    for j in range(X_all_baseline.shape[1]):
        X_all_baseline[np.isnan(X_all_baseline[:, j]), j] = col_means[j]

    # 4. Load challenges CSV
    print("[4/10] Loading challenges...")
    challenge_info = _load_challenge_info(rows, cols, y_all)

    # 5. Inject challenge pixels
    if challenge_info:
        print("[5/10] Injecting challenge pixels...")
        X_raw_challenge, _X_challenge, X_challenge_arr = inject_challenge_pixels(
            chunk, challenge_info, X_raw_orig, y_pred_all
        )
    else:
        print("[5/10] No challenges to inject — using original features.")
        X_raw_challenge = X_raw_orig.copy()
        X_challenge_arr = X_all_baseline.copy()

    # 6. SHAP + rule-based stress classification. The call annotates
    # challenge_info in place (classified_stress / conf / action), and
    # combine_interventions reads classified_stress.
    print("[6/10] Running stress classification...")
    classify_challenge_pixels(challenge_info, X_raw_challenge, chunk)

    # 7. Load interventions CSV
    print("[7/10] Loading interventions...")
    if not INTERVENTIONS_CSV.exists():
        print(f" No interventions file at {INTERVENTIONS_CSV} — skipping.")
        selected_indices = np.array([], dtype=int)
        selected_actions = []
        selected_stresses = []
    else:
        selected_indices, selected_actions, selected_stresses = load_interventions(
            INTERVENTIONS_CSV, rows, cols
        )

    # 8. Combine treated challenges + robot interventions
    print("[8/10] Combining interventions...")
    if len(challenge_info) == 0 and len(selected_indices) == 0:
        print(" WARNING: No challenges and no interventions — nothing to simulate.")
        pd.DataFrame(columns=_RESULT_COLUMNS).to_csv(OUTPUT_CSV, index=False)
        print(f" Saved empty: {OUTPUT_CSV}")
        # No features changed, so before == after == y_pred_all (zero delta).
        plot_simulation_map(
            chunk,
            y_pred_all,
            y_pred_all,
            challenge_info,
            selected_indices,
            np.array([], dtype=int),
            OUTPUT_PNG,
        )
        print(f"Saved baseline map: {OUTPUT_PNG}")
        return

    all_indices, all_actions, all_stresses, all_is_challenge = combine_interventions(
        challenge_info, selected_indices, selected_actions, selected_stresses
    )

    # 9. Simulate post-intervention recovery
    print("[9/10] Simulating recovery...")
    X_post = simulate_recovery(
        X_challenge_arr, feature_cols, all_indices, all_actions, y_all, X_all_baseline
    )
    print(f" Recovery fraction: {RECOVERY_FRACTION:.0%}")

    # 10. Predict post-intervention yields & save
    print("[10/10] Predicting and saving...")
    challenge_indices = np.array(
        [info["pixel_idx"] for info in challenge_info], dtype=int
    )
    comp_df, y_pred_before, y_pred_after = predict_and_compare(
        model,
        X_challenge_arr,
        X_post,
        all_indices,
        all_actions,
        all_stresses,
        all_is_challenge,
        y_all,
        rows,
        cols,
        y_pred_all,
        challenge_indices=challenge_indices,
    )
    # If every challenge is untreated and there are no robot interventions,
    # nothing is compared and the frame may come back without columns.
    # Normalising to the full schema keeps the summary and CSV from failing.
    comp_df = comp_df.reindex(columns=_RESULT_COLUMNS)

    _print_results(comp_df, y_all)

    comp_df.to_csv(OUTPUT_CSV, index=False)
    print(f"\nSaved: {OUTPUT_CSV}")

    # Reuse the arrays from predict_and_compare — no extra predict needed.
    plot_simulation_map(
        chunk,
        y_pred_before,
        y_pred_after,
        challenge_info,
        selected_indices,
        all_indices,
        OUTPUT_PNG,
    )
    print(f"Saved: {OUTPUT_PNG}")
    print("\nDone.")
# ── Entry Point ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Script entry point: run the simulation with run_simulation's defaults
    # (field_id="DE_0100", nc_id=100, seed=42). Results are written to
    # OUTPUT_CSV and OUTPUT_PNG.
    run_simulation()