SchemeImpactNet / src /pipeline.py
sammeeer's picture
Initial SchemeImpactNet deployment
f87e795
"""
pipeline.py
-----------
V3 pipeline orchestrator for SchemeImpactNet.
Changes from original:
- RAW_PATH now points to the real Dataful government CSV
(confirmed 99% match with mnrega_india_unified.csv, <0.005L diff)
- Feature engineering uses V3 leak-free features (src/features.py)
- Model uses GBR V3 with walk-forward CV (src/model.py)
- Model saved to models/mnrega_gbr_v3.pkl
- Removed generate_synthetic dependency from Stage 1
- Stage 3 model comparison retained but flags honest metrics
Data sources:
Real: data/raw/20063- Dataful/mnrega-...-persondays-...csv
→ person_days_lakhs, households_availed (real gov data)
→ avg_wage_rate (official wage schedule, exogenous; merged in from mnrega_india_unified.csv)
Synthetic: all other columns (rainfall, poverty, pmkisan, pmay)
→ EXCLUDED from V3 model features
"""
import os
import pandas as pd
import numpy as np
from src.clean import clean
from src.features import build_features
from src.eda import run_eda
from src.model import run_model
# ── Data paths ────────────────────────────────────────────────────────────────
# All paths are relative, so they resolve against the current working directory.
_RAW_DIR = os.path.join("data", "raw")

# Dataful government export (long format: one row per district-month-category).
DATAFUL_PATH = os.path.join(
    _RAW_DIR,
    "20063- Dataful",
    "mnrega-year-month-state-and-district-wise-total-persondays-"
    "and-households-engaged-in-work.csv",
)

# Unified CSV: source of avg_wage_rate and fallback when the Dataful file is absent.
UNIFIED_PATH = os.path.join(_RAW_DIR, "mnrega_india_unified.csv")

# Cleaned + feature-engineered table written by run_pipeline().
PROCESSED_PATH = os.path.join("data", "processed", "mnrega_cleaned.csv")

# Only echoed in the run summary here. NOTE(review): the module docstring says the
# model is saved to models/mnrega_gbr_v3.pkl; confirm which path run_model() writes.
MODEL_PATH = os.path.join("models", "mnrega_best_model.pkl")

# Human-readable scope for each pipeline stage (stage number -> label).
SCOPE_LABEL = dict(
    enumerate(("Maharashtra", "All-India", "All-India (V3 leak-free)"), start=1)
)
def run_pipeline(stage: int = 3) -> pd.DataFrame:
    """
    Run the end-to-end SchemeImpactNet pipeline for one stage.

    Steps: extract the real government data, clean it, build V3 features,
    save the processed table to PROCESSED_PATH (creating its directory),
    run EDA, then run the V3 model.

    Args:
        stage: 1 = Maharashtra only, 2 = All-India, 3 = All-India (V3 leak-free).
            Only stage 1 changes the data (it filters the extract to
            Maharashtra); stages 2 and 3 differ only in the scope label.

    Returns:
        The value returned by run_model(df) (annotated as a DataFrame).

    Raises:
        ValueError: if ``stage`` is not 1, 2 or 3.
    """
    # Explicit check rather than `assert`: asserts are stripped under `python -O`,
    # which would let a bad stage fall through to a KeyError further down.
    if stage not in (1, 2, 3):
        raise ValueError(f"Stage must be 1, 2, or 3 (got {stage!r})")

    print("\n" + "=" * 60)
    print(f" SchemeImpactNet V3 — Stage {stage} Pipeline")
    print(f" Scope : {SCOPE_LABEL[stage]}")
    print("=" * 60)

    # ── Extract ───────────────────────────────────────────────────
    print("\n[pipeline] Step 1: Extract (real government data)")
    df = _load_real_data(state_filter="Maharashtra" if stage == 1 else None)

    # ── Clean ─────────────────────────────────────────────────────
    print("\n[pipeline] Step 2: Clean")
    df = _clean_real(df)

    # ── Features ──────────────────────────────────────────────────
    print("\n[pipeline] Step 3: V3 Feature Engineering (leak-free)")
    df = build_features(df)

    # ── Save processed ────────────────────────────────────────────
    os.makedirs(os.path.dirname(PROCESSED_PATH), exist_ok=True)
    df.to_csv(PROCESSED_PATH, index=False)
    print(f"\n[pipeline] Processed data saved → {PROCESSED_PATH}")

    # ── EDA ───────────────────────────────────────────────────────
    print("\n[pipeline] Step 4: EDA")
    run_eda(df, scope=SCOPE_LABEL[stage])

    # ── Model ─────────────────────────────────────────────────────
    print("\n[pipeline] Step 5: V3 Model (walk-forward CV + pkl save)")
    predictions = run_model(df)

    print("\n" + "=" * 60)
    print(f" Stage {stage} Complete!")
    print(f" Processed : {PROCESSED_PATH}")
    print(f" Model : {MODEL_PATH}")
    print(" Figures : reports/figures/")
    print(" Predictions : data/processed/mnrega_predictions.csv")
    print(" Report : reports/model_report.txt")
    print("=" * 60 + "\n")
    return predictions
# ── Real data loader ──────────────────────────────────────────────────────────
def _parse_fiscal_year(value) -> int:
    """Return the starting year of a fiscal-year label ('2014-15' -> 2014).

    Values without a '-' are passed to int() unchanged (e.g. 2014 or '2014').
    """
    text = str(value)
    return int(text.split("-")[0]) if "-" in text else int(value)


def _normalise_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Strip, lower-case and snake_case the column names of df (in place); return df."""
    df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]
    return df


def _state_year_wage_rates(path: str) -> pd.DataFrame:
    """Read the unified CSV at path and return one avg_wage_rate per (state, financial_year).

    Several rows per key (e.g. one per district with slightly different values)
    are collapsed to their median. This guarantees the later left merge cannot
    multiply district rows.
    """
    df_uni = _normalise_columns(pd.read_csv(path))
    df_uni["financial_year"] = df_uni["financial_year"].apply(_parse_fiscal_year)
    return df_uni.groupby(["state", "financial_year"], as_index=False)[
        "avg_wage_rate"
    ].median()


def _load_real_data(state_filter: str = None, *, max_fiscal_year: int = 2024) -> pd.DataFrame:
    """
    Load the real MGNREGA data as one row per district-year.

    Preferred source is the Dataful government CSV (DATAFUL_PATH), which is in
    long format (one row per district-month-category):

    - ``value`` is summed per (fiscal year, state, district) for each category.
    - The result is pivoted so the ``Persondays`` and ``Household`` categories
      become ``person_days`` and ``households_availed``.
    - ``person_days_lakhs = person_days / 1e5``, rounded to 3 decimals.

    ``avg_wage_rate`` is merged in per (state, financial_year) from the
    unified CSV (UNIFIED_PATH) when that file exists.

    If the Dataful CSV is missing, the unified CSV is loaded instead, with all
    of its columns kept.

    Args:
        state_filter: if given, keep only rows whose ``state`` equals it exactly.
        max_fiscal_year: Dataful rows whose starting fiscal year is later than
            this are dropped. The default of 2024 excludes the incomplete
            current fiscal year. Not applied to the unified-CSV fallback.

    Returns:
        DataFrame with an integer ``financial_year`` (start year, e.g. 2014 for
        '2014-15'). From the Dataful path, the columns are state, district,
        financial_year, person_days_lakhs, households_availed and
        avg_wage_rate (those present).

    Raises:
        FileNotFoundError: if neither the Dataful nor the unified CSV exists.
    """
    if os.path.exists(DATAFUL_PATH):
        print(f"[pipeline] Loading Dataful government CSV: {DATAFUL_PATH}")
        df_raw = _normalise_columns(pd.read_csv(DATAFUL_PATH))
        df_raw["fy"] = df_raw["fiscal_year"].apply(_parse_fiscal_year)
        # Exclude incomplete current fiscal year
        df_raw = df_raw[df_raw["fy"] <= max_fiscal_year]

        # Pivot: sum monthly values to annual per district
        pivot = df_raw.pivot_table(
            index=["fiscal_year", "fy", "state", "district"],
            columns="category",
            values="value",
            aggfunc="sum",
        ).reset_index()
        pivot.columns.name = None

        # Rename to match model schema
        pivot = pivot.rename(columns={
            "Persondays": "person_days",
            "Household": "households_availed",
            "fy": "financial_year",
        })
        pivot["person_days_lakhs"] = (pivot["person_days"] / 1e5).round(3)

        # Bring in avg_wage_rate from unified CSV (official schedule, exogenous).
        # The wage table has one row per key, so each district-year is kept once;
        # validate enforces that.
        if os.path.exists(UNIFIED_PATH):
            pivot = pivot.merge(
                _state_year_wage_rates(UNIFIED_PATH),
                on=["state", "financial_year"],
                how="left",
                validate="many_to_one",
            )

        keep = ["state", "district", "financial_year",
                "person_days_lakhs", "households_availed", "avg_wage_rate"]
        df = pivot[[c for c in keep if c in pivot.columns]].copy()
    elif os.path.exists(UNIFIED_PATH):
        print("[pipeline] Dataful CSV not found, falling back to unified CSV")
        print("[pipeline] NOTE: unified CSV contains synthetic columns — "
              "V3 features ignore them")
        df = _normalise_columns(pd.read_csv(UNIFIED_PATH))
        df["financial_year"] = df["financial_year"].apply(_parse_fiscal_year)
    else:
        raise FileNotFoundError(
            f"Neither the Dataful CSV ({DATAFUL_PATH}) nor the unified CSV "
            f"({UNIFIED_PATH}) was found"
        )

    if state_filter:
        before = len(df)
        df = df[df["state"] == state_filter].reset_index(drop=True)
        print(f"[pipeline] Filtered to {state_filter}: {before} → {len(df)} rows")

    print(f"[pipeline] Loaded {len(df):,} rows | "
          f"{df['state'].nunique()} states | "
          f"{df['district'].nunique()} districts | "
          f"{df['financial_year'].nunique()} years "
          f"({df['financial_year'].min()}–{df['financial_year'].max()})")
    return df
def _clean_real(df: pd.DataFrame) -> pd.DataFrame:
"""
Lightweight clean for the real Dataful data.
The full clean() from src/clean.py expects synthetic columns β€”
we do a minimal version here.
"""
df = df.sort_values(["state", "district", "financial_year"]).reset_index(drop=True)
# Strip strings
for col in df.select_dtypes(include="object").columns:
df[col] = df[col].str.strip()
# Numeric cast
for col in ["person_days_lakhs", "households_availed", "avg_wage_rate"]:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors="coerce")
# Forward-fill wage within state (official schedule rarely changes mid-year)
if "avg_wage_rate" in df.columns:
df["avg_wage_rate"] = df.groupby("state")["avg_wage_rate"].transform(
lambda s: s.ffill().bfill()
)
# Drop rows with no person_days_lakhs
before = len(df)
df = df.dropna(subset=["person_days_lakhs"]).reset_index(drop=True)
if len(df) < before:
print(f"[pipeline] Dropped {before - len(df)} rows with null person_days_lakhs")
print(f"[pipeline] Cleaned. Shape: {df.shape}")
return df
def run_optimizer_step(scope_state: str = None, target_year: int = 2024) -> None:
    """
    Run the budget optimizer after predictions are generated.

    Delegates to src.optimize.run_optimizer with these inputs:

    - the predictions CSV reported by run_pipeline()
      (data/processed/mnrega_predictions.csv);
    - the unified raw CSV (UNIFIED_PATH).

    Args:
        scope_state: forwarded unchanged as run_optimizer's ``scope_state``.
            Presumably None means no state restriction; confirm in
            src/optimize.py.
        target_year: forwarded as run_optimizer's ``target_year``. Defaults
            to 2024, the value that was previously hard-coded here.
    """
    # Imported lazily so src.optimize is only needed when this step runs.
    from src.optimize import run_optimizer

    run_optimizer(
        predictions_path=os.path.join("data", "processed", "mnrega_predictions.csv"),
        raw_path=UNIFIED_PATH,
        scope_state=scope_state,
        target_year=target_year,
    )