File size: 1,878 Bytes
f87e795
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
"""
extract.py
----------
Loads and validates the unified MNREGA CSV.
Supports both the synthetic unified dataset and any real CSV
that matches the schema.
"""

import pandas as pd

# Columns that _validate_columns() insists on finding in every loaded CSV.
REQUIRED_COLUMNS = {
    "state",
    "district",
    "financial_year",
    "person_days_lakhs",
    "expenditure_lakhs",
    "avg_wage_rate",
}

# Stage 1 uses exactly the required columns. This is an alias of the same
# set object, not a copy.
STAGE1_COLUMNS = REQUIRED_COLUMNS

# Stage 2 extends the Stage 1 set with four additional columns.
STAGE2_COLUMNS = STAGE1_COLUMNS.union(
    {
        "rainfall_mm",
        "crop_season_index",
        "rural_population_lakhs",
        "poverty_rate_pct",
    }
)

# Stage 3 extends the Stage 2 set with three additional columns.
STAGE3_COLUMNS = STAGE2_COLUMNS.union(
    {
        "pmkisan_beneficiaries",
        "pmay_houses_sanctioned",
        "budget_allocated_lakhs",
    }
)


def load_csv(filepath: str, state_filter: str = None) -> pd.DataFrame:
    """
    Load unified MNREGA CSV and check that the required columns are present.

    Column names are normalized before validation: surrounding whitespace is
    stripped, names are lower-cased, and spaces become underscores.

    Args:
        filepath     : Path to CSV file.
        state_filter : If provided, keep only rows whose "state" value equals
                       this string exactly, e.g. "Maharashtra".
                       Pass None (or an empty string) for no filtering,
                       i.e. all-India (Stage 2+).

    Returns:
        Raw DataFrame with normalized column names. When state_filter is
        given, only the matching rows are returned, re-indexed from 0.

    Raises:
        FileNotFoundError : If filepath does not exist.
        ValueError        : If a required column is missing (raised by
                            _validate_columns).
    """
    print(f"[extract] Loading: {filepath}")
    try:
        df = pd.read_csv(filepath)
    except FileNotFoundError as err:
        # Re-raise with the module prefix while keeping the original error
        # attached as the explicit cause.
        raise FileNotFoundError(f"[extract] File not found: {filepath}") from err

    # Normalize column names
    df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]

    _validate_columns(df)

    if state_filter:
        before = len(df)
        df = df[df["state"] == state_filter].reset_index(drop=True)
        # Separate the two counts; printed back-to-back they read as one number.
        print(f"[extract] Filtered to '{state_filter}': {before} → {len(df)} rows")

    n_states = df["state"].nunique()
    n_districts = df["district"].nunique()
    n_years = df["financial_year"].nunique()
    print(
        f"[extract] Loaded {len(df)} rows | {n_states} state(s) | "
        f"{n_districts} districts | {n_years} years"
    )
    print("[extract] Validation passed ✓")
    return df


def _validate_columns(df: pd.DataFrame) -> None:
    """
    Ensure every name in REQUIRED_COLUMNS is a column of df.

    Column names are compared as-is; no normalization happens here.

    Args:
        df : DataFrame whose columns are checked.

    Raises:
        ValueError : If one or more required columns are absent. The message
                     lists the missing names in sorted order.
    """
    missing = REQUIRED_COLUMNS - set(df.columns)
    if missing:
        # Sort before formatting: iteration order of a set of strings can
        # differ between interpreter runs, which made the message unstable.
        raise ValueError(f"[extract] Missing required columns: {sorted(missing)}")