"""
pipeline.py
-----------
V3 pipeline orchestrator for SchemeImpactNet.
Changes from original:
- RAW_PATH now points to the real Dataful government CSV
(confirmed 99% match with mnrega_india_unified.csv, <0.005L diff)
- Feature engineering uses V3 leak-free features (src/features.py)
- Model uses GBR V3 with walk-forward CV (src/model.py)
- Model saved to models/mnrega_gbr_v3.pkl
- Removed generate_synthetic dependency from Stage 1
- Stage 3 model comparison retained but flags honest metrics
Data sources:
Real: data/raw/20063- Dataful/mnrega-...-persondays-...csv
    → person_days_lakhs, households_availed (real gov data)
    → avg_wage_rate (official wage schedule, exogenous)
Synthetic: all other columns (rainfall, poverty, pmkisan, pmay)
    → EXCLUDED from V3 model features
"""
import os
from typing import Optional

import numpy as np
import pandas as pd

from src.clean import clean
from src.eda import run_eda
from src.features import build_features
from src.model import run_model
# ── Data paths ────────────────────────────────────────────────────────────────
# Long-format Dataful government CSV: one row per district-month-category.
DATAFUL_PATH = os.path.join(
    "data", "raw", "20063- Dataful",
    "mnrega-year-month-state-and-district-wise-total-persondays-"
    "and-households-engaged-in-work.csv"
)
# Unified CSV: annual wide format; also holds the synthetic columns and the
# avg_wage_rate schedule merged in by _load_real_data().
UNIFIED_PATH = os.path.join("data", "raw", "mnrega_india_unified.csv")
# Destination for the cleaned + feature-engineered dataset.
PROCESSED_PATH = os.path.join("data", "processed", "mnrega_cleaned.csv")
# Model artifact path printed in the pipeline summary.
# NOTE(review): the module docstring says models/mnrega_gbr_v3.pkl — confirm
# which path src/model.py actually writes.
MODEL_PATH = os.path.join("models", "mnrega_best_model.pkl")
# Human-readable scope label for each pipeline stage.
SCOPE_LABEL = {
    1: "Maharashtra",
    2: "All-India",
    3: "All-India (V3 leak-free)",
}
def run_pipeline(stage: int = 3) -> pd.DataFrame:
    """Run the end-to-end SchemeImpactNet V3 pipeline.

    Steps: extract real data → clean → V3 feature engineering → save the
    processed CSV → EDA → model (walk-forward CV), then print a summary of
    the output locations.

    Parameters
    ----------
    stage : int, default 3
        1 = Maharashtra only, 2 = All-India, 3 = All-India with the V3
        leak-free feature set (see SCOPE_LABEL).

    Returns
    -------
    pd.DataFrame
        Predictions returned by run_model().

    Raises
    ------
    ValueError
        If stage is not one of 1, 2, 3.
    """
    # `assert` is stripped under `python -O`, so validate input explicitly.
    if stage not in (1, 2, 3):
        raise ValueError("Stage must be 1, 2, or 3")
    print("\n" + "=" * 60)
    print(f" SchemeImpactNet V3 — Stage {stage} Pipeline")
    print(f" Scope : {SCOPE_LABEL[stage]}")
    print("=" * 60)
    # ── Extract ───────────────────────────────────────────────────
    print("\n[pipeline] Step 1: Extract (real government data)")
    # Stage 1 is the Maharashtra-only pilot; stages 2/3 use all states.
    df = _load_real_data(state_filter="Maharashtra" if stage == 1 else None)
    # ── Clean ─────────────────────────────────────────────────────
    print("\n[pipeline] Step 2: Clean")
    df = _clean_real(df)
    # ── Features ──────────────────────────────────────────────────
    print("\n[pipeline] Step 3: V3 Feature Engineering (leak-free)")
    df = build_features(df)
    # ── Save processed ────────────────────────────────────────────
    os.makedirs(os.path.dirname(PROCESSED_PATH), exist_ok=True)
    df.to_csv(PROCESSED_PATH, index=False)
    print(f"\n[pipeline] Processed data saved → {PROCESSED_PATH}")
    # ── EDA ───────────────────────────────────────────────────────
    print("\n[pipeline] Step 4: EDA")
    run_eda(df, scope=SCOPE_LABEL[stage])
    # ── Model ─────────────────────────────────────────────────────
    print("\n[pipeline] Step 5: V3 Model (walk-forward CV + pkl save)")
    predictions = run_model(df)
    print("\n" + "=" * 60)
    print(f" Stage {stage} Complete!")
    print(f" Processed : {PROCESSED_PATH}")
    print(f" Model : {MODEL_PATH}")
    print(" Figures : reports/figures/")
    print(" Predictions : data/processed/mnrega_predictions.csv")
    print(" Report : reports/model_report.txt")
    print("=" * 60 + "\n")
    return predictions
# ── Real data loader ──────────────────────────────────────────────────────────
def _load_real_data(state_filter: Optional[str] = None,
                    max_fy: int = 2024) -> pd.DataFrame:
    """
    Load and pivot the Dataful government CSV from long format
    (one row per district-month-category) to annual wide format
    (one row per district-year with person_days_lakhs + households_availed).
    Falls back to the unified CSV if the Dataful file is not found.

    Parameters
    ----------
    state_filter : Optional[str]
        If given, keep only rows whose ``state`` equals this value.
    max_fy : int, default 2024
        Last complete fiscal year to keep; later (incomplete) years are
        dropped. Previously hard-coded.

    Returns
    -------
    pd.DataFrame
        Columns: state, district, financial_year, person_days_lakhs,
        households_availed and, when the unified CSV is present,
        avg_wage_rate.
    """
    def parse_fy(v) -> int:
        # '2014-15' -> 2014; a plain year string/int passes through.
        s = str(v)
        return int(s.split("-")[0]) if "-" in s else int(s)

    def normalize_cols(frame: pd.DataFrame) -> None:
        # snake_case every column name in place.
        frame.columns = [c.strip().lower().replace(" ", "_") for c in frame.columns]

    if os.path.exists(DATAFUL_PATH):
        print(f"[pipeline] Loading Dataful government CSV: {DATAFUL_PATH}")
        df_raw = pd.read_csv(DATAFUL_PATH)
        normalize_cols(df_raw)
        # Parse fiscal-year start integer from '2014-15' -> 2014
        df_raw["fy"] = df_raw["fiscal_year"].apply(parse_fy)
        # Exclude the incomplete current fiscal year
        df_raw = df_raw[df_raw["fy"] <= max_fy]
        # Pivot: sum monthly values to annual per district
        pivot = df_raw.pivot_table(
            index=["fiscal_year", "fy", "state", "district"],
            columns="category",
            values="value",
            aggfunc="sum",
        ).reset_index()
        pivot.columns.name = None
        # Rename to match the model schema
        pivot = pivot.rename(columns={
            "Persondays": "person_days",
            "Household": "households_availed",
            "fy": "financial_year",
        })
        pivot["person_days_lakhs"] = (pivot["person_days"] / 1e5).round(3)
        # Bring in avg_wage_rate from the unified CSV (official schedule,
        # exogenous to the model).
        if os.path.exists(UNIFIED_PATH):
            df_uni = pd.read_csv(UNIFIED_PATH)
            normalize_cols(df_uni)
            df_uni["financial_year"] = df_uni["financial_year"].apply(parse_fy)
            wage_map = df_uni[["state", "financial_year", "avg_wage_rate"]].drop_duplicates()
            pivot = pivot.merge(wage_map, on=["state", "financial_year"], how="left")
        # Keep only the columns downstream stages need.
        keep = ["state", "district", "financial_year",
                "person_days_lakhs", "households_availed", "avg_wage_rate"]
        df = pivot[[c for c in keep if c in pivot.columns]].copy()
    else:
        print("[pipeline] Dataful CSV not found, falling back to unified CSV")
        print("[pipeline] NOTE: unified CSV contains synthetic columns — "
              "V3 features ignore them")
        df = pd.read_csv(UNIFIED_PATH)
        normalize_cols(df)
        df["financial_year"] = df["financial_year"].apply(parse_fy)

    if state_filter:
        before = len(df)
        df = df[df["state"] == state_filter].reset_index(drop=True)
        print(f"[pipeline] Filtered to {state_filter}: {before} → {len(df)} rows")
    print(f"[pipeline] Loaded {len(df):,} rows | "
          f"{df['state'].nunique()} states | "
          f"{df['district'].nunique()} districts | "
          f"{df['financial_year'].nunique()} years "
          f"({df['financial_year'].min()}–{df['financial_year'].max()})")
    return df
def _clean_real(df: pd.DataFrame) -> pd.DataFrame:
"""
Lightweight clean for the real Dataful data.
The full clean() from src/clean.py expects synthetic columns β
we do a minimal version here.
"""
df = df.sort_values(["state", "district", "financial_year"]).reset_index(drop=True)
# Strip strings
for col in df.select_dtypes(include="object").columns:
df[col] = df[col].str.strip()
# Numeric cast
for col in ["person_days_lakhs", "households_availed", "avg_wage_rate"]:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors="coerce")
# Forward-fill wage within state (official schedule rarely changes mid-year)
if "avg_wage_rate" in df.columns:
df["avg_wage_rate"] = df.groupby("state")["avg_wage_rate"].transform(
lambda s: s.ffill().bfill()
)
# Drop rows with no person_days_lakhs
before = len(df)
df = df.dropna(subset=["person_days_lakhs"]).reset_index(drop=True)
if len(df) < before:
print(f"[pipeline] Dropped {before - len(df)} rows with null person_days_lakhs")
print(f"[pipeline] Cleaned. Shape: {df.shape}")
return df
def run_optimizer_step(scope_state: Optional[str] = None,
                       target_year: int = 2024) -> None:
    """Run the budget optimizer after predictions are generated.

    Parameters
    ----------
    scope_state : Optional[str]
        Restrict the optimizer to one state; None means all states.
    target_year : int, default 2024
        Fiscal year the optimizer allocates for (previously hard-coded).
    """
    # Imported lazily so the pipeline does not require the optimizer's deps.
    from src.optimize import run_optimizer
    run_optimizer(
        predictions_path=os.path.join("data", "processed", "mnrega_predictions.csv"),
        raw_path=UNIFIED_PATH,
        scope_state=scope_state,
        target_year=target_year,
    )