Spaces:
Sleeping
Sleeping
| """ | |
| pipeline.py | |
| ----------- | |
| V3 pipeline orchestrator for SchemeImpactNet. | |
| Changes from original: | |
| - RAW_PATH now points to the real Dataful government CSV | |
| (confirmed 99% match with mnrega_india_unified.csv, <0.005L diff) | |
| - Feature engineering uses V3 leak-free features (src/features.py) | |
| - Model uses GBR V3 with walk-forward CV (src/model.py) | |
| - Model saved to models/mnrega_gbr_v3.pkl | |
| - Removed generate_synthetic dependency from Stage 1 | |
| - Stage 3 model comparison retained but flags honest metrics | |
| Data sources: | |
| Real: data/raw/20063- Dataful/mnrega-...-persondays-...csv | |
| β person_days_lakhs, households_availed (real gov data) | |
| β avg_wage_rate (official wage schedule, exogenous) | |
| Synthetic: all other columns (rainfall, poverty, pmkisan, pmay) | |
| β EXCLUDED from V3 model features | |
| """ | |
| import os | |
| import pandas as pd | |
| import numpy as np | |
| from src.clean import clean | |
| from src.features import build_features | |
| from src.eda import run_eda | |
| from src.model import run_model | |
| # ββ Data paths ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| DATAFUL_PATH = os.path.join( | |
| "data", "raw", "20063- Dataful", | |
| "mnrega-year-month-state-and-district-wise-total-persondays-" | |
| "and-households-engaged-in-work.csv" | |
| ) | |
| UNIFIED_PATH = os.path.join("data", "raw", "mnrega_india_unified.csv") | |
| PROCESSED_PATH = os.path.join("data", "processed", "mnrega_cleaned.csv") | |
| MODEL_PATH = os.path.join("models", "mnrega_best_model.pkl") | |
| SCOPE_LABEL = { | |
| 1: "Maharashtra", | |
| 2: "All-India", | |
| 3: "All-India (V3 leak-free)", | |
| } | |
| def run_pipeline(stage: int = 3) -> pd.DataFrame: | |
| assert stage in [1, 2, 3], "Stage must be 1, 2, or 3" | |
| print("\n" + "=" * 60) | |
| print(f" SchemeImpactNet V3 β Stage {stage} Pipeline") | |
| print(f" Scope : {SCOPE_LABEL[stage]}") | |
| print("=" * 60) | |
| # ββ Extract βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print(f"\n[pipeline] Step 1: Extract (real government data)") | |
| df = _load_real_data(state_filter="Maharashtra" if stage == 1 else None) | |
| # ββ Clean βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print(f"\n[pipeline] Step 2: Clean") | |
| df = _clean_real(df) | |
| # ββ Features ββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print(f"\n[pipeline] Step 3: V3 Feature Engineering (leak-free)") | |
| df = build_features(df) | |
| # ββ Save processed ββββββββββββββββββββββββββββββββββββββββββββ | |
| os.makedirs(os.path.dirname(PROCESSED_PATH), exist_ok=True) | |
| df.to_csv(PROCESSED_PATH, index=False) | |
| print(f"\n[pipeline] Processed data saved β {PROCESSED_PATH}") | |
| # ββ EDA βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print(f"\n[pipeline] Step 4: EDA") | |
| run_eda(df, scope=SCOPE_LABEL[stage]) | |
| # ββ Model βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print(f"\n[pipeline] Step 5: V3 Model (walk-forward CV + pkl save)") | |
| predictions = run_model(df) | |
| print("\n" + "=" * 60) | |
| print(f" Stage {stage} Complete!") | |
| print(f" Processed : {PROCESSED_PATH}") | |
| print(f" Model : {MODEL_PATH}") | |
| print(f" Figures : reports/figures/") | |
| print(f" Predictions : data/processed/mnrega_predictions.csv") | |
| print(f" Report : reports/model_report.txt") | |
| print("=" * 60 + "\n") | |
| return predictions | |
| # ββ Real data loader ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _load_real_data(state_filter: str = None) -> pd.DataFrame: | |
| """ | |
| Load and pivot the Dataful government CSV from long format | |
| (one row per district-month-category) to annual wide format | |
| (one row per district-year with person_days_lakhs + households_availed). | |
| Falls back to unified CSV if Dataful not found. | |
| """ | |
| if os.path.exists(DATAFUL_PATH): | |
| print(f"[pipeline] Loading Dataful government CSV: {DATAFUL_PATH}") | |
| df_raw = pd.read_csv(DATAFUL_PATH) | |
| df_raw.columns = [c.strip().lower().replace(" ", "_") for c in df_raw.columns] | |
| # Parse fiscal year start integer from '2014-15' β 2014 | |
| df_raw["fy"] = df_raw["fiscal_year"].apply( | |
| lambda v: int(str(v).split("-")[0]) if "-" in str(v) else int(v) | |
| ) | |
| # Exclude incomplete current fiscal year | |
| df_raw = df_raw[df_raw["fy"] <= 2024] | |
| # Pivot: sum monthly values to annual per district | |
| pivot = df_raw.pivot_table( | |
| index=["fiscal_year", "fy", "state", "district"], | |
| columns="category", | |
| values="value", | |
| aggfunc="sum" | |
| ).reset_index() | |
| pivot.columns.name = None | |
| # Rename to match model schema | |
| pivot = pivot.rename(columns={ | |
| "Persondays": "person_days", | |
| "Household": "households_availed", | |
| "fy": "financial_year", | |
| }) | |
| pivot["person_days_lakhs"] = (pivot["person_days"] / 1e5).round(3) | |
| # Bring in avg_wage_rate from unified CSV (official schedule, exogenous) | |
| if os.path.exists(UNIFIED_PATH): | |
| df_uni = pd.read_csv(UNIFIED_PATH) | |
| df_uni.columns = [c.strip().lower().replace(" ", "_") for c in df_uni.columns] | |
| df_uni["financial_year"] = df_uni["financial_year"].apply( | |
| lambda v: int(str(v).split("-")[0]) if "-" in str(v) else int(v) | |
| ) | |
| wage_map = df_uni[["state", "financial_year", "avg_wage_rate"]].drop_duplicates() | |
| pivot = pivot.merge(wage_map, on=["state", "financial_year"], how="left") | |
| # Keep only needed columns | |
| keep = ["state", "district", "financial_year", | |
| "person_days_lakhs", "households_availed", "avg_wage_rate"] | |
| df = pivot[[c for c in keep if c in pivot.columns]].copy() | |
| else: | |
| print(f"[pipeline] Dataful CSV not found, falling back to unified CSV") | |
| print(f"[pipeline] NOTE: unified CSV contains synthetic columns β " | |
| f"V3 features ignore them") | |
| df = pd.read_csv(UNIFIED_PATH) | |
| df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns] | |
| df["financial_year"] = df["financial_year"].apply( | |
| lambda v: int(str(v).split("-")[0]) if "-" in str(v) else int(v) | |
| ) | |
| if state_filter: | |
| before = len(df) | |
| df = df[df["state"] == state_filter].reset_index(drop=True) | |
| print(f"[pipeline] Filtered to {state_filter}: {before} β {len(df)} rows") | |
| print(f"[pipeline] Loaded {len(df):,} rows | " | |
| f"{df['state'].nunique()} states | " | |
| f"{df['district'].nunique()} districts | " | |
| f"{df['financial_year'].nunique()} years " | |
| f"({df['financial_year'].min()}β{df['financial_year'].max()})") | |
| return df | |
| def _clean_real(df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| Lightweight clean for the real Dataful data. | |
| The full clean() from src/clean.py expects synthetic columns β | |
| we do a minimal version here. | |
| """ | |
| df = df.sort_values(["state", "district", "financial_year"]).reset_index(drop=True) | |
| # Strip strings | |
| for col in df.select_dtypes(include="object").columns: | |
| df[col] = df[col].str.strip() | |
| # Numeric cast | |
| for col in ["person_days_lakhs", "households_availed", "avg_wage_rate"]: | |
| if col in df.columns: | |
| df[col] = pd.to_numeric(df[col], errors="coerce") | |
| # Forward-fill wage within state (official schedule rarely changes mid-year) | |
| if "avg_wage_rate" in df.columns: | |
| df["avg_wage_rate"] = df.groupby("state")["avg_wage_rate"].transform( | |
| lambda s: s.ffill().bfill() | |
| ) | |
| # Drop rows with no person_days_lakhs | |
| before = len(df) | |
| df = df.dropna(subset=["person_days_lakhs"]).reset_index(drop=True) | |
| if len(df) < before: | |
| print(f"[pipeline] Dropped {before - len(df)} rows with null person_days_lakhs") | |
| print(f"[pipeline] Cleaned. Shape: {df.shape}") | |
| return df | |
| def run_optimizer_step(scope_state: str = None) -> None: | |
| """Run the budget optimizer after predictions are generated.""" | |
| from src.optimize import run_optimizer | |
| run_optimizer( | |
| predictions_path=os.path.join("data", "processed", "mnrega_predictions.csv"), | |
| raw_path=UNIFIED_PATH, | |
| scope_state=scope_state, | |
| target_year=2024, | |
| ) | |