Spaces:
Sleeping
Sleeping
| """ | |
| features.py | |
| ----------- | |
| V3 leak-free feature engineering for MNREGA district-level forecasting. | |
| LEAKAGE AUDIT (what was removed vs original): | |
| REMOVED β works_completed : r=1.00 with target (formula of person_days) | |
| REMOVED β expenditure_lakhs : r=0.976 (person_days Γ wage_rate) | |
| REMOVED β budget_allocated_lakhs : r=0.976 (derived from expenditure) | |
| REMOVED β households_demanded/offered/availed : r=0.94 (copies of target structure) | |
| REMOVED β lag_expenditure : r=0.866 (derived from target) | |
| REMOVED β district_avg_persondays : replaced with blended_capacity (safer) | |
| REMOVED β yoy_growth : computed from current-year target β leaky | |
| REMOVED β demand_fulfillment_rate : uses current-year availed (target-correlated) | |
| REMOVED β all synthetic columns : rainfall, poverty, pmkisan, pmay (fabricated) | |
| V3 FEATURES (all computed from lagged/historical values only): | |
| lag1_pd : person_days_lakhs shifted 1 year per district | |
| lag2_pd : shifted 2 years | |
| lag3_pd : shifted 3 years | |
| roll2_mean : 2-year rolling mean of lag1 | |
| roll3_mean : 3-year rolling mean of lag1 | |
| roll3_std : 3-year rolling std of lag1 (volatility) | |
| lag1_adj : lag1 deflated by COVID multiplier when lag year = 2020 | |
| lag_yoy : YoY growth of lag1 vs lag2 (historical, not current) | |
| lag2_yoy : YoY growth of lag2 vs lag3 | |
| momentum : lag_yoy - lag2_yoy (acceleration) | |
| district_capacity : expanding mean of lag1 (long-run structural level) | |
| blended_capacity : district_capacity blended with state mean when history < 3yr | |
| relative_to_state : lag1 / state-year lag1 mean (district's share) | |
| state_lag1_norm : state total lag1 / state historical mean | |
| lag1_vs_capacity : lag1 / district_capacity (how anomalous last year was) | |
| lag1_zscore : z-score of lag1 vs district expanding history | |
| state_lag1_zscore : z-score of state-level lag1 | |
| lag1_extreme : flag when |lag1_zscore| > 2.5 | |
| lag1_is_covid : flag when lag year = 2020 | |
| history_length : cumulative count of observations per district | |
| avg_wage_rate : official wage schedule (genuinely exogenous) | |
| wage_yoy : year-on-year % change in wage rate | |
| is_covid : flag for FY 2020 (COVID demand shock year) | |
| is_post_covid : flag for FY >= 2021 | |
| is_2022_anomaly : flag for FY 2022 (West Bengal + others reporting anomaly) | |
| year_trend : years since dataset start (linear time trend) | |
| state_enc : label-encoded state | |
| district_enc : label-encoded district (state|district composite) | |
| Walk-forward CV results (GBR, max_depth=4, lr=0.03, n_est=200, subsample=0.7): | |
| Mean RΒ² : 0.7722 (excl. 2022: 0.8618) | |
| Mean MAE : 10.68L | |
| Old RΒ² : 0.9963 β was leakage from works_completed (r=1.0) | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from sklearn.preprocessing import LabelEncoder | |
| # COVID multiplier: how much 2020 inflated vs 2019 nationally | |
| # Computed from real data: 55.01L / 38.04L = 1.447 | |
| COVID_MULTIPLIER = 1.447 | |
| TARGET = "person_days_lakhs" | |
| def build_features(df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| Main entry point. Takes a cleaned DataFrame and returns it with | |
| all V3 features added. Drops rows with no lag1/lag2 (first 1-2 years | |
| per district cannot be used for training). | |
| Args: | |
| df : Cleaned DataFrame with at minimum: | |
| state, district, financial_year, person_days_lakhs, | |
| households_availed, avg_wage_rate | |
| Returns: | |
| Feature-engineered DataFrame ready for model training/inference. | |
| """ | |
| print("[features] Building V3 leak-free features...") | |
| df = df.sort_values(["state", "district", "financial_year"]).reset_index(drop=True) | |
| df = _lag_features(df) | |
| df = _rolling_features(df) | |
| df = _covid_features(df) | |
| df = _trend_features(df) | |
| df = _capacity_features(df) | |
| df = _anomaly_features(df) | |
| df = _state_features(df) | |
| df = _temporal_flags(df) | |
| df = _wage_features(df) | |
| df = _encode_categoricals(df) | |
| # Drop rows with no lag1/lag2 β cannot train or predict without history | |
| before = len(df) | |
| df = df.dropna(subset=["lag1_pd", "lag2_pd"]).reset_index(drop=True) | |
| print(f"[features] Dropped {before - len(df)} rows (insufficient history)") | |
| print(f"[features] Done. Final shape: {df.shape}") | |
| return df | |
| # ββ Lag features ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _lag_features(df: pd.DataFrame) -> pd.DataFrame: | |
| grp = df.groupby(["state", "district"]) | |
| df["lag1_pd"] = grp[TARGET].shift(1) | |
| df["lag2_pd"] = grp[TARGET].shift(2) | |
| df["lag3_pd"] = grp[TARGET].shift(3) | |
| df["lag1_hh"] = grp["households_availed"].shift(1) | |
| return df | |
| # ββ Rolling statistics (computed on lag1, so no leakage) βββββββββββββββββββββ | |
| def _rolling_features(df: pd.DataFrame) -> pd.DataFrame: | |
| lag1 = df.groupby(["state", "district"])["lag1_pd"] | |
| df["roll2_mean"] = lag1.transform(lambda s: s.rolling(2, min_periods=1).mean()) | |
| df["roll3_mean"] = lag1.transform(lambda s: s.rolling(3, min_periods=1).mean()) | |
| df["roll3_std"] = lag1.transform( | |
| lambda s: s.rolling(3, min_periods=1).std().fillna(0) | |
| ) | |
| return df | |
| # ββ COVID-aware lag adjustment ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _covid_features(df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| When predicting year T and lag1 comes from 2020 (COVID spike), | |
| the model would otherwise extrapolate the spike forward. We: | |
| 1. Flag that lag1 is a COVID year value. | |
| 2. Provide a deflated version (lag1_adj) so the model has a | |
| COVID-corrected signal alongside the raw lag1. | |
| """ | |
| df["lag1_is_covid"] = (df["financial_year"] - 1 == 2020).astype(int) | |
| df["lag1_adj"] = np.where( | |
| df["lag1_is_covid"] == 1, | |
| df["lag1_pd"] / COVID_MULTIPLIER, | |
| df["lag1_pd"] | |
| ) | |
| return df | |
| # ββ YoY trend / momentum (all historical β no current-year leakage) βββββββββββ | |
| def _trend_features(df: pd.DataFrame) -> pd.DataFrame: | |
| df["lag_yoy"] = ( | |
| (df["lag1_pd"] - df["lag2_pd"]) / df["lag2_pd"].replace(0, np.nan) | |
| ).clip(-1, 3) | |
| df["lag2_yoy"] = ( | |
| (df["lag2_pd"] - df["lag3_pd"]) / df["lag3_pd"].replace(0, np.nan) | |
| ).clip(-1, 3) | |
| df["momentum"] = df["lag_yoy"] - df["lag2_yoy"] | |
| return df | |
| # ββ District structural capacity ββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _capacity_features(df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| district_capacity: expanding mean of lag1 β the district's long-run level. | |
| blended_capacity : when history is short (<3 years), blend district mean | |
| with state mean to reduce cold-start noise. | |
| """ | |
| df["district_capacity"] = df.groupby(["state", "district"])["lag1_pd"].transform( | |
| lambda s: s.expanding().mean() | |
| ) | |
| df["history_length"] = df.groupby(["state", "district"]).cumcount() | |
| state_mean = df.groupby(["state", "financial_year"])["lag1_pd"].transform("mean") | |
| df["blended_capacity"] = np.where( | |
| df["history_length"] < 3, | |
| 0.5 * df["district_capacity"].fillna(state_mean) + 0.5 * state_mean, | |
| df["district_capacity"] | |
| ) | |
| # How anomalous was last year vs the district's own history? | |
| df["lag1_vs_capacity"] = ( | |
| df["lag1_pd"] / df["blended_capacity"].replace(0, np.nan) | |
| ).clip(0, 5).fillna(1.0) | |
| # Lagged household ratio (demand signal β uses only lagged values) | |
| df["lag1_hh_ratio"] = ( | |
| df["lag1_hh"] / df["blended_capacity"].replace(0, np.nan) | |
| ).clip(0, 5).fillna(1.0) | |
| return df | |
| # ββ Anomaly detection βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _rolling_zscore(s: pd.Series) -> pd.Series: | |
| """Z-score of each value vs its own expanding historical mean/std.""" | |
| exp_mean = s.shift(1).expanding().mean() | |
| exp_std = s.shift(1).expanding().std().fillna(1).replace(0, 1) | |
| return ((s - exp_mean) / exp_std).clip(-4, 4) | |
| def _anomaly_features(df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| Detect when lag1_pd is anomalous for this district or state. | |
| The model uses these to discount or adjust its reliance on lag1 | |
| when it was an outlier year (e.g. West Bengal in 2022). | |
| """ | |
| # District-level z-score of lag1 | |
| df["lag1_zscore"] = df.groupby(["state", "district"])[TARGET].transform( | |
| lambda s: _rolling_zscore(s).shift(1) | |
| ).fillna(0) | |
| df["lag1_extreme"] = (df["lag1_zscore"].abs() > 2.5).astype(int) | |
| return df | |
| # ββ State-level features ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _state_features(df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| State-level lag and z-score. Captures state budget decisions and | |
| policy changes that affect all districts simultaneously. | |
| """ | |
| # State total person_days per year | |
| state_yr = ( | |
| df.groupby(["state", "financial_year"])[TARGET] | |
| .sum().reset_index() | |
| .rename(columns={TARGET: "state_total"}) | |
| ) | |
| state_yr["state_total_lag1"] = state_yr.groupby("state")["state_total"].shift(1) | |
| # State z-score of lag1 | |
| state_yr["state_lag1_zscore"] = state_yr.groupby("state")["state_total"].transform( | |
| lambda s: _rolling_zscore(s) | |
| ) | |
| # Normalised state lag (state lag relative to its own history) | |
| state_hist_mean = state_yr.groupby("state")["state_total_lag1"].transform("mean") | |
| state_yr["state_lag1_norm"] = ( | |
| state_yr["state_total_lag1"] / state_hist_mean.replace(0, np.nan) | |
| ).clip(0, 5).fillna(1.0) | |
| df = df.merge( | |
| state_yr[["state", "financial_year", | |
| "state_lag1_zscore", "state_lag1_norm"]], | |
| on=["state", "financial_year"], | |
| how="left" | |
| ) | |
| # District's position relative to state mean (its structural share) | |
| state_yr_lag = df.groupby(["state", "financial_year"])["lag1_pd"].transform("mean") | |
| df["relative_to_state"] = ( | |
| df["lag1_pd"] / state_yr_lag.replace(0, np.nan) | |
| ).clip(0, 10).fillna(1.0) | |
| return df | |
| # ββ Temporal flags ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _temporal_flags(df: pd.DataFrame) -> pd.DataFrame: | |
| fy_min = df["financial_year"].min() | |
| df["year_trend"] = df["financial_year"] - fy_min | |
| df["is_covid"] = (df["financial_year"] == 2020).astype(int) | |
| df["is_post_covid"] = (df["financial_year"] >= 2021).astype(int) | |
| df["is_2022_anomaly"] = (df["financial_year"] == 2022).astype(int) | |
| return df | |
| # ββ Wage features βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _wage_features(df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| avg_wage_rate is the official state-notified wage schedule β genuinely | |
| exogenous (set by government, not derived from person_days). | |
| wage_yoy captures the policy signal of wage revision speed. | |
| """ | |
| if "avg_wage_rate" not in df.columns: | |
| return df | |
| df["wage_yoy"] = ( | |
| df.groupby(["state", "district"])["avg_wage_rate"] | |
| .pct_change(fill_method=None) | |
| .fillna(0) | |
| .clip(-0.2, 0.5) | |
| ) | |
| return df | |
| # ββ Categorical encoding ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _encode_categoricals(df: pd.DataFrame) -> pd.DataFrame: | |
| le_state = LabelEncoder() | |
| le_dist = LabelEncoder() | |
| df["state_enc"] = le_state.fit_transform(df["state"].astype(str)) | |
| df["district_enc"] = le_dist.fit_transform( | |
| (df["district"] + "|" + df["state"]).astype(str) | |
| ) | |
| return df | |
| # ββ Feature list for model ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Canonical lean feature set β chosen by permutation importance analysis. | |
| # All features are computed from lagged/historical values only. | |
| FEATURE_COLS = [ | |
| "lag1_pd", | |
| "roll2_mean", | |
| "roll3_mean", | |
| "lag1_adj", | |
| "lag2_pd", | |
| "lag3_pd", | |
| "roll3_std", | |
| "state_lag1_norm", | |
| "relative_to_state", | |
| "blended_capacity", | |
| "lag1_vs_capacity", | |
| "state_lag1_zscore", | |
| "state_enc", | |
| "is_covid", | |
| "lag1_is_covid", | |
| "wage_yoy", | |
| "avg_wage_rate", | |
| ] |