Spaces:
Sleeping
Sleeping
File size: 3,610 Bytes
f87e795 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 | """
clean.py
--------
Cleans and standardizes the unified MNREGA dataset.
Works for Stage 1 (Maharashtra) through Stage 3 (All-India + scheme data).
"""
import pandas as pd
import numpy as np
CRITICAL_COLS = ["person_days_lakhs", "expenditure_lakhs", "avg_wage_rate"]
NON_CRITICAL_COLS = [
"households_demanded", "households_offered", "households_availed",
"works_completed", "rainfall_mm", "crop_season_index",
"rural_population_lakhs", "poverty_rate_pct",
"pmkisan_beneficiaries", "pmkisan_amount_lakhs",
"pmay_houses_sanctioned", "pmay_houses_completed",
"pmay_expenditure_lakhs", "budget_allocated_lakhs"
]
def clean(df: pd.DataFrame) -> pd.DataFrame:
print("[clean] Starting cleaning pipeline...")
df = _strip_strings(df)
df = _parse_financial_year(df)
df = _cast_numerics(df)
df = _handle_missing(df)
df = _enforce_logical_constraints(df)
print(f"[clean] Done. Shape: {df.shape}")
return df
def _strip_strings(df: pd.DataFrame) -> pd.DataFrame:
for col in df.select_dtypes(include="object").columns:
df[col] = df[col].str.strip()
return df
def _parse_financial_year(df: pd.DataFrame) -> pd.DataFrame:
"""Convert '2018-19' → integer 2018."""
def _parse(val):
val = str(val).strip()
return int(val.split("-")[0]) if "-" in val else int(val)
df["financial_year"] = df["financial_year"].apply(_parse)
print(f"[clean] financial_year range: {df['financial_year'].min()} – {df['financial_year'].max()}")
return df
def _cast_numerics(df: pd.DataFrame) -> pd.DataFrame:
all_numeric = CRITICAL_COLS + NON_CRITICAL_COLS
for col in all_numeric:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors="coerce")
return df
def _handle_missing(df: pd.DataFrame) -> pd.DataFrame:
"""
Critical cols → forward-fill within district, drop if still null.
Non-critical → forward-fill within district, leave remaining NaN.
"""
df = df.sort_values(["state", "district", "financial_year"])
for col in CRITICAL_COLS + NON_CRITICAL_COLS:
if col not in df.columns:
continue
before = df[col].isna().sum()
if before > 0:
df[col] = df.groupby(["state", "district"])[col].transform(lambda s: s.ffill())
filled = before - df[col].isna().sum()
if filled > 0:
print(f"[clean] '{col}': forward-filled {filled} value(s)")
before = len(df)
df = df.dropna(subset=CRITICAL_COLS).reset_index(drop=True)
if len(df) < before:
print(f"[clean] Dropped {before - len(df)} rows with unresolvable critical nulls")
return df
def _enforce_logical_constraints(df: pd.DataFrame) -> pd.DataFrame:
"""Clip any constraint violations that slipped through generation."""
if all(c in df.columns for c in ["households_offered", "households_demanded"]):
violations = (df["households_offered"] > df["households_demanded"]).sum()
if violations:
df["households_offered"] = df[["households_offered", "households_demanded"]].min(axis=1)
print(f"[clean] Fixed {violations} households_offered > households_demanded")
if all(c in df.columns for c in ["households_availed", "households_offered"]):
violations = (df["households_availed"] > df["households_offered"]).sum()
if violations:
df["households_availed"] = df[["households_availed", "households_offered"]].min(axis=1)
print(f"[clean] Fixed {violations} households_availed > households_offered")
return df
|