SchemeImpactNet / src /features.py
sammeeer's picture
Inital schemeimpactnet deployment
f87e795
"""
features.py
-----------
V3 leak-free feature engineering for MNREGA district-level forecasting.
LEAKAGE AUDIT (what was removed vs original):
REMOVED β€” works_completed : r=1.00 with target (formula of person_days)
REMOVED β€” expenditure_lakhs : r=0.976 (person_days Γ— wage_rate)
REMOVED β€” budget_allocated_lakhs : r=0.976 (derived from expenditure)
REMOVED β€” households_demanded/offered/availed : r=0.94 (copies of target structure)
REMOVED β€” lag_expenditure : r=0.866 (derived from target)
REMOVED β€” district_avg_persondays : replaced with blended_capacity (safer)
REMOVED β€” yoy_growth : computed from current-year target β†’ leaky
REMOVED β€” demand_fulfillment_rate : uses current-year availed (target-correlated)
REMOVED β€” all synthetic columns : rainfall, poverty, pmkisan, pmay (fabricated)
V3 FEATURES (all computed from lagged/historical values only):
lag1_pd : person_days_lakhs shifted 1 year per district
lag2_pd : shifted 2 years
lag3_pd : shifted 3 years
roll2_mean : 2-year rolling mean of lag1
roll3_mean : 3-year rolling mean of lag1
roll3_std : 3-year rolling std of lag1 (volatility)
lag1_adj : lag1 deflated by COVID multiplier when lag year = 2020
lag_yoy : YoY growth of lag1 vs lag2 (historical, not current)
lag2_yoy : YoY growth of lag2 vs lag3
momentum : lag_yoy - lag2_yoy (acceleration)
district_capacity : expanding mean of lag1 (long-run structural level)
blended_capacity : district_capacity blended with state mean when history < 3yr
relative_to_state : lag1 / state-year lag1 mean (district's share)
state_lag1_norm : state total lag1 / state historical mean
lag1_vs_capacity : lag1 / district_capacity (how anomalous last year was)
lag1_zscore : z-score of lag1 vs district expanding history
state_lag1_zscore : z-score of state-level lag1
lag1_extreme : flag when |lag1_zscore| > 2.5
lag1_is_covid : flag when lag year = 2020
history_length : cumulative count of observations per district
avg_wage_rate : official wage schedule (genuinely exogenous)
wage_yoy : year-on-year % change in wage rate
is_covid : flag for FY 2020 (COVID demand shock year)
is_post_covid : flag for FY >= 2021
is_2022_anomaly : flag for FY 2022 (West Bengal + others reporting anomaly)
year_trend : years since dataset start (linear time trend)
state_enc : label-encoded state
district_enc : label-encoded district (state|district composite)
Walk-forward CV results (GBR, max_depth=4, lr=0.03, n_est=200, subsample=0.7):
Mean RΒ² : 0.7722 (excl. 2022: 0.8618)
Mean MAE : 10.68L
Old RΒ² : 0.9963 ← was leakage from works_completed (r=1.0)
"""
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
# COVID multiplier: how much 2020 inflated vs 2019 nationally
# Computed from real data: 55.01L / 38.04L = 1.447
COVID_MULTIPLIER = 1.447
TARGET = "person_days_lakhs"
def build_features(df: pd.DataFrame) -> pd.DataFrame:
"""
Main entry point. Takes a cleaned DataFrame and returns it with
all V3 features added. Drops rows with no lag1/lag2 (first 1-2 years
per district cannot be used for training).
Args:
df : Cleaned DataFrame with at minimum:
state, district, financial_year, person_days_lakhs,
households_availed, avg_wage_rate
Returns:
Feature-engineered DataFrame ready for model training/inference.
"""
print("[features] Building V3 leak-free features...")
df = df.sort_values(["state", "district", "financial_year"]).reset_index(drop=True)
df = _lag_features(df)
df = _rolling_features(df)
df = _covid_features(df)
df = _trend_features(df)
df = _capacity_features(df)
df = _anomaly_features(df)
df = _state_features(df)
df = _temporal_flags(df)
df = _wage_features(df)
df = _encode_categoricals(df)
# Drop rows with no lag1/lag2 β€” cannot train or predict without history
before = len(df)
df = df.dropna(subset=["lag1_pd", "lag2_pd"]).reset_index(drop=True)
print(f"[features] Dropped {before - len(df)} rows (insufficient history)")
print(f"[features] Done. Final shape: {df.shape}")
return df
# ── Lag features ──────────────────────────────────────────────────────────────
def _lag_features(df: pd.DataFrame) -> pd.DataFrame:
grp = df.groupby(["state", "district"])
df["lag1_pd"] = grp[TARGET].shift(1)
df["lag2_pd"] = grp[TARGET].shift(2)
df["lag3_pd"] = grp[TARGET].shift(3)
df["lag1_hh"] = grp["households_availed"].shift(1)
return df
# ── Rolling statistics (computed on lag1, so no leakage) ─────────────────────
def _rolling_features(df: pd.DataFrame) -> pd.DataFrame:
lag1 = df.groupby(["state", "district"])["lag1_pd"]
df["roll2_mean"] = lag1.transform(lambda s: s.rolling(2, min_periods=1).mean())
df["roll3_mean"] = lag1.transform(lambda s: s.rolling(3, min_periods=1).mean())
df["roll3_std"] = lag1.transform(
lambda s: s.rolling(3, min_periods=1).std().fillna(0)
)
return df
# ── COVID-aware lag adjustment ────────────────────────────────────────────────
def _covid_features(df: pd.DataFrame) -> pd.DataFrame:
"""
When predicting year T and lag1 comes from 2020 (COVID spike),
the model would otherwise extrapolate the spike forward. We:
1. Flag that lag1 is a COVID year value.
2. Provide a deflated version (lag1_adj) so the model has a
COVID-corrected signal alongside the raw lag1.
"""
df["lag1_is_covid"] = (df["financial_year"] - 1 == 2020).astype(int)
df["lag1_adj"] = np.where(
df["lag1_is_covid"] == 1,
df["lag1_pd"] / COVID_MULTIPLIER,
df["lag1_pd"]
)
return df
# ── YoY trend / momentum (all historical β€” no current-year leakage) ───────────
def _trend_features(df: pd.DataFrame) -> pd.DataFrame:
df["lag_yoy"] = (
(df["lag1_pd"] - df["lag2_pd"]) / df["lag2_pd"].replace(0, np.nan)
).clip(-1, 3)
df["lag2_yoy"] = (
(df["lag2_pd"] - df["lag3_pd"]) / df["lag3_pd"].replace(0, np.nan)
).clip(-1, 3)
df["momentum"] = df["lag_yoy"] - df["lag2_yoy"]
return df
# ── District structural capacity ──────────────────────────────────────────────
def _capacity_features(df: pd.DataFrame) -> pd.DataFrame:
"""
district_capacity: expanding mean of lag1 β€” the district's long-run level.
blended_capacity : when history is short (<3 years), blend district mean
with state mean to reduce cold-start noise.
"""
df["district_capacity"] = df.groupby(["state", "district"])["lag1_pd"].transform(
lambda s: s.expanding().mean()
)
df["history_length"] = df.groupby(["state", "district"]).cumcount()
state_mean = df.groupby(["state", "financial_year"])["lag1_pd"].transform("mean")
df["blended_capacity"] = np.where(
df["history_length"] < 3,
0.5 * df["district_capacity"].fillna(state_mean) + 0.5 * state_mean,
df["district_capacity"]
)
# How anomalous was last year vs the district's own history?
df["lag1_vs_capacity"] = (
df["lag1_pd"] / df["blended_capacity"].replace(0, np.nan)
).clip(0, 5).fillna(1.0)
# Lagged household ratio (demand signal β€” uses only lagged values)
df["lag1_hh_ratio"] = (
df["lag1_hh"] / df["blended_capacity"].replace(0, np.nan)
).clip(0, 5).fillna(1.0)
return df
# ── Anomaly detection ─────────────────────────────────────────────────────────
def _rolling_zscore(s: pd.Series) -> pd.Series:
"""Z-score of each value vs its own expanding historical mean/std."""
exp_mean = s.shift(1).expanding().mean()
exp_std = s.shift(1).expanding().std().fillna(1).replace(0, 1)
return ((s - exp_mean) / exp_std).clip(-4, 4)
def _anomaly_features(df: pd.DataFrame) -> pd.DataFrame:
"""
Detect when lag1_pd is anomalous for this district or state.
The model uses these to discount or adjust its reliance on lag1
when it was an outlier year (e.g. West Bengal in 2022).
"""
# District-level z-score of lag1
df["lag1_zscore"] = df.groupby(["state", "district"])[TARGET].transform(
lambda s: _rolling_zscore(s).shift(1)
).fillna(0)
df["lag1_extreme"] = (df["lag1_zscore"].abs() > 2.5).astype(int)
return df
# ── State-level features ──────────────────────────────────────────────────────
def _state_features(df: pd.DataFrame) -> pd.DataFrame:
"""
State-level lag and z-score. Captures state budget decisions and
policy changes that affect all districts simultaneously.
"""
# State total person_days per year
state_yr = (
df.groupby(["state", "financial_year"])[TARGET]
.sum().reset_index()
.rename(columns={TARGET: "state_total"})
)
state_yr["state_total_lag1"] = state_yr.groupby("state")["state_total"].shift(1)
# State z-score of lag1
state_yr["state_lag1_zscore"] = state_yr.groupby("state")["state_total"].transform(
lambda s: _rolling_zscore(s)
)
# Normalised state lag (state lag relative to its own history)
state_hist_mean = state_yr.groupby("state")["state_total_lag1"].transform("mean")
state_yr["state_lag1_norm"] = (
state_yr["state_total_lag1"] / state_hist_mean.replace(0, np.nan)
).clip(0, 5).fillna(1.0)
df = df.merge(
state_yr[["state", "financial_year",
"state_lag1_zscore", "state_lag1_norm"]],
on=["state", "financial_year"],
how="left"
)
# District's position relative to state mean (its structural share)
state_yr_lag = df.groupby(["state", "financial_year"])["lag1_pd"].transform("mean")
df["relative_to_state"] = (
df["lag1_pd"] / state_yr_lag.replace(0, np.nan)
).clip(0, 10).fillna(1.0)
return df
# ── Temporal flags ────────────────────────────────────────────────────────────
def _temporal_flags(df: pd.DataFrame) -> pd.DataFrame:
fy_min = df["financial_year"].min()
df["year_trend"] = df["financial_year"] - fy_min
df["is_covid"] = (df["financial_year"] == 2020).astype(int)
df["is_post_covid"] = (df["financial_year"] >= 2021).astype(int)
df["is_2022_anomaly"] = (df["financial_year"] == 2022).astype(int)
return df
# ── Wage features ─────────────────────────────────────────────────────────────
def _wage_features(df: pd.DataFrame) -> pd.DataFrame:
"""
avg_wage_rate is the official state-notified wage schedule β€” genuinely
exogenous (set by government, not derived from person_days).
wage_yoy captures the policy signal of wage revision speed.
"""
if "avg_wage_rate" not in df.columns:
return df
df["wage_yoy"] = (
df.groupby(["state", "district"])["avg_wage_rate"]
.pct_change(fill_method=None)
.fillna(0)
.clip(-0.2, 0.5)
)
return df
# ── Categorical encoding ──────────────────────────────────────────────────────
def _encode_categoricals(df: pd.DataFrame) -> pd.DataFrame:
le_state = LabelEncoder()
le_dist = LabelEncoder()
df["state_enc"] = le_state.fit_transform(df["state"].astype(str))
df["district_enc"] = le_dist.fit_transform(
(df["district"] + "|" + df["state"]).astype(str)
)
return df
# ── Feature list for model ────────────────────────────────────────────────────
# Canonical lean feature set β€” chosen by permutation importance analysis.
# All features are computed from lagged/historical values only.
FEATURE_COLS = [
"lag1_pd",
"roll2_mean",
"roll3_mean",
"lag1_adj",
"lag2_pd",
"lag3_pd",
"roll3_std",
"state_lag1_norm",
"relative_to_state",
"blended_capacity",
"lag1_vs_capacity",
"state_lag1_zscore",
"state_enc",
"is_covid",
"lag1_is_covid",
"wage_yoy",
"avg_wage_rate",
]