File size: 3,610 Bytes
f87e795
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""
clean.py
--------
Cleans and standardizes the unified MNREGA dataset.
Works for Stage 1 (Maharashtra) through Stage 3 (All-India + scheme data).
"""

import pandas as pd
import numpy as np

CRITICAL_COLS = ["person_days_lakhs", "expenditure_lakhs", "avg_wage_rate"]

NON_CRITICAL_COLS = [
    "households_demanded", "households_offered", "households_availed",
    "works_completed", "rainfall_mm", "crop_season_index",
    "rural_population_lakhs", "poverty_rate_pct",
    "pmkisan_beneficiaries", "pmkisan_amount_lakhs",
    "pmay_houses_sanctioned", "pmay_houses_completed",
    "pmay_expenditure_lakhs", "budget_allocated_lakhs"
]


def clean(df: pd.DataFrame) -> pd.DataFrame:
    print("[clean] Starting cleaning pipeline...")
    df = _strip_strings(df)
    df = _parse_financial_year(df)
    df = _cast_numerics(df)
    df = _handle_missing(df)
    df = _enforce_logical_constraints(df)
    print(f"[clean] Done. Shape: {df.shape}")
    return df


def _strip_strings(df: pd.DataFrame) -> pd.DataFrame:
    for col in df.select_dtypes(include="object").columns:
        df[col] = df[col].str.strip()
    return df


def _parse_financial_year(df: pd.DataFrame) -> pd.DataFrame:
    """Convert '2018-19' → integer 2018."""
    def _parse(val):
        val = str(val).strip()
        return int(val.split("-")[0]) if "-" in val else int(val)

    df["financial_year"] = df["financial_year"].apply(_parse)
    print(f"[clean] financial_year range: {df['financial_year'].min()}{df['financial_year'].max()}")
    return df


def _cast_numerics(df: pd.DataFrame) -> pd.DataFrame:
    all_numeric = CRITICAL_COLS + NON_CRITICAL_COLS
    for col in all_numeric:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")
    return df


def _handle_missing(df: pd.DataFrame) -> pd.DataFrame:
    """
    Critical cols   → forward-fill within district, drop if still null.
    Non-critical    → forward-fill within district, leave remaining NaN.
    """
    df = df.sort_values(["state", "district", "financial_year"])

    for col in CRITICAL_COLS + NON_CRITICAL_COLS:
        if col not in df.columns:
            continue
        before = df[col].isna().sum()
        if before > 0:
            df[col] = df.groupby(["state", "district"])[col].transform(lambda s: s.ffill())
            filled = before - df[col].isna().sum()
            if filled > 0:
                print(f"[clean] '{col}': forward-filled {filled} value(s)")

    before = len(df)
    df = df.dropna(subset=CRITICAL_COLS).reset_index(drop=True)
    if len(df) < before:
        print(f"[clean] Dropped {before - len(df)} rows with unresolvable critical nulls")

    return df


def _enforce_logical_constraints(df: pd.DataFrame) -> pd.DataFrame:
    """Clip any constraint violations that slipped through generation."""
    if all(c in df.columns for c in ["households_offered", "households_demanded"]):
        violations = (df["households_offered"] > df["households_demanded"]).sum()
        if violations:
            df["households_offered"] = df[["households_offered", "households_demanded"]].min(axis=1)
            print(f"[clean] Fixed {violations} households_offered > households_demanded")

    if all(c in df.columns for c in ["households_availed", "households_offered"]):
        violations = (df["households_availed"] > df["households_offered"]).sum()
        if violations:
            df["households_availed"] = df[["households_availed", "households_offered"]].min(axis=1)
            print(f"[clean] Fixed {violations} households_availed > households_offered")

    return df