|
|
"""
|
|
|
Script 03: Data Preprocessing
|
|
|
|
|
|
This script preprocesses the raw wildfire data:
|
|
|
- Creates ordinal target variable (3 classes: Small, Medium, Large)
|
|
|
- Drops irrelevant columns (IDs, text fields, redundant info)
|
|
|
- Handles missing values
|
|
|
- Encodes categorical variables
|
|
|
- Splits data into train/test sets (stratified)
|
|
|
|
|
|
Usage:
|
|
|
python scripts/03_preprocess.py
|
|
|
"""
|
|
|
|
|
|
import sys
|
|
|
from pathlib import Path
|
|
|
|
|
|
import numpy as np
|
|
|
import pandas as pd
|
|
|
from sklearn.model_selection import train_test_split
|
|
|
from sklearn.preprocessing import LabelEncoder
|
|
|
|
|
|
|
|
|
project_root = Path(__file__).parent.parent
|
|
|
sys.path.insert(0, str(project_root))
|
|
|
|
|
|
from config.config import (
|
|
|
RAW_PARQUET,
|
|
|
PROCESSED_PARQUET,
|
|
|
TRAIN_PARQUET,
|
|
|
TEST_PARQUET,
|
|
|
PROCESSED_DATA_DIR,
|
|
|
FIRE_SIZE_CLASS_MAPPING,
|
|
|
TARGET_CLASS_NAMES,
|
|
|
TARGET_COLUMN,
|
|
|
COLUMNS_TO_DROP,
|
|
|
CATEGORICAL_FEATURES,
|
|
|
RANDOM_STATE,
|
|
|
TEST_SIZE
|
|
|
)
|
|
|
|
|
|
|
|
|
def load_data() -> pd.DataFrame:
    """Read the raw wildfire dataset from its parquet file.

    Returns:
        The raw dataset exactly as stored at RAW_PARQUET.
    """
    print("Loading raw data...")
    raw = pd.read_parquet(RAW_PARQUET)
    print(f" Loaded {len(raw):,} records with {len(raw.columns)} columns")
    return raw
|
|
|
|
|
|
|
|
|
def create_target_variable(df: pd.DataFrame) -> pd.DataFrame:
    """Derive the ordinal target from FIRE_SIZE_CLASS and report its distribution.

    Rows whose fire-size class has no entry in FIRE_SIZE_CLASS_MAPPING are
    dropped before the target is cast to int.

    Args:
        df: Dataframe containing a 'FIRE_SIZE_CLASS' column.

    Returns:
        The dataframe with an integer TARGET_COLUMN added.
    """
    print("\nCreating ordinal target variable...")

    df[TARGET_COLUMN] = df['FIRE_SIZE_CLASS'].map(FIRE_SIZE_CLASS_MAPPING)

    # Class codes absent from the mapping become NaN; those rows are unusable.
    unmapped = df[TARGET_COLUMN].isna().sum()
    if unmapped > 0:
        print(f" Warning: {unmapped} records could not be mapped. Dropping...")
        df = df.dropna(subset=[TARGET_COLUMN])

    df[TARGET_COLUMN] = df[TARGET_COLUMN].astype(int)

    # Report the class balance so skew is visible before modeling.
    print("\n Target Variable Distribution:")
    total = len(df)
    for val in sorted(df[TARGET_COLUMN].unique()):
        count = (df[TARGET_COLUMN] == val).sum()
        pct = count / total * 100
        print(f" {val} ({TARGET_CLASS_NAMES[val]}): {count:,} ({pct:.2f}%)")

    return df
|
|
|
|
|
|
|
|
|
def drop_irrelevant_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Remove the configured non-predictive columns that exist in *df*.

    Args:
        df: Dataframe to prune.

    Returns:
        The dataframe without the columns listed in COLUMNS_TO_DROP.
    """
    print("\nDropping irrelevant columns...")

    present = [c for c in COLUMNS_TO_DROP if c in df.columns]

    print(f" Dropping {len(present)} columns:")
    # Show at most ten names so the log stays readable.
    for name in present[:10]:
        print(f" - {name}")
    if len(present) > 10:
        print(f" ... and {len(present) - 10} more")

    df = df.drop(columns=present, errors='ignore')
    print(f" Remaining columns: {len(df.columns)}")

    return df
|
|
|
|
|
|
|
|
|
def handle_missing_values(df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows missing essential fields and fill categorical gaps.

    Essential fields (coordinates, year, discovery day, target) must be present,
    so rows lacking them are dropped. Categorical features get a literal
    'Unknown' category instead, preserving the row.

    Args:
        df: Dataframe after target creation and column pruning.

    Returns:
        The cleaned dataframe.
    """
    print("\nHandling missing values...")

    initial_rows = len(df)

    essential_cols = ['LATITUDE', 'LONGITUDE', 'FIRE_YEAR', 'DISCOVERY_DOY', TARGET_COLUMN]
    present_essentials = [c for c in essential_cols if c in df.columns]

    # Report per-column gaps before dropping anything.
    for col in present_essentials:
        missing = df[col].isna().sum()
        if missing > 0:
            print(f" {col}: {missing} missing values")

    df = df.dropna(subset=present_essentials)

    # Keep rows with missing categoricals by introducing an explicit category.
    for col in CATEGORICAL_FEATURES:
        if col not in df.columns:
            continue
        missing = df[col].isna().sum()
        if missing > 0:
            df[col] = df[col].fillna('Unknown')
            print(f" {col}: Filled {missing} missing with 'Unknown'")

    rows_dropped = initial_rows - len(df)
    print(f"\n Rows dropped due to missing essential values: {rows_dropped:,}")
    print(f" Remaining rows: {len(df):,}")

    return df
|
|
|
|
|
|
|
|
|
def encode_categorical_features(df: pd.DataFrame) -> tuple[pd.DataFrame, dict]:
    """Label-encode each configured categorical column into '<col>_encoded'.

    Args:
        df: Dataframe containing the categorical columns.

    Returns:
        A (dataframe, encoders) pair; *encoders* maps each column name to its
        fitted LabelEncoder so the same mapping can be reused later.
    """
    print("\nEncoding categorical features...")

    encoders: dict = {}

    for col in CATEGORICAL_FEATURES:
        if col not in df.columns:
            continue
        encoder = LabelEncoder()
        # Cast to str so any residual non-string values encode consistently.
        df[f'{col}_encoded'] = encoder.fit_transform(df[col].astype(str))
        encoders[col] = encoder
        print(f" {col}: {len(encoder.classes_)} categories")

    return df, encoders
|
|
|
|
|
|
|
|
|
def select_features(df: pd.DataFrame) -> pd.DataFrame:
    """Keep only the modeling features, raw categoricals, and the target.

    Args:
        df: Dataframe after encoding.

    Returns:
        A dataframe restricted to the selected columns (order preserved,
        duplicates removed).
    """
    print("\nSelecting features for modeling...")

    feature_cols = [
        # Spatial / temporal numeric features.
        'LATITUDE', 'LONGITUDE', 'FIRE_YEAR', 'DISCOVERY_DOY',
        # Label-encoded categoricals produced by encode_categorical_features().
        'NWCG_REPORTING_AGENCY_encoded',
        'STAT_CAUSE_DESCR_encoded',
        'STATE_encoded',
        'OWNER_DESCR_encoded',
        TARGET_COLUMN
    ]

    # Only keep what actually exists in this dataframe.
    available_cols = [col for col in feature_cols if col in df.columns]

    # Also retain the original (unencoded) categorical columns.
    original_cats = [col for col in CATEGORICAL_FEATURES if col in df.columns]

    # Concatenate and de-duplicate while preserving order.
    all_cols = available_cols + original_cats
    all_cols = list(dict.fromkeys(all_cols))

    df = df[all_cols]

    # BUG FIX: available_cols includes the target, so the original message
    # over-counted the feature columns by one. Exclude it from the count.
    n_features = sum(1 for col in available_cols if col != TARGET_COLUMN)
    print(f" Selected {n_features} feature columns + target")
    print(f" Final columns: {list(df.columns)}")

    return df
|
|
|
|
|
|
|
|
|
def split_data(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Perform a stratified train/test split on the target column.

    Args:
        df: Fully preprocessed dataframe.

    Returns:
        A (train_df, test_df) pair with matching class proportions.
    """
    print("\nSplitting data into train/test sets...")

    # Stratify so the ordinal class balance is identical in both splits.
    train_df, test_df = train_test_split(
        df,
        test_size=TEST_SIZE,
        random_state=RANDOM_STATE,
        stratify=df[TARGET_COLUMN],
    )

    print(f" Train set: {len(train_df):,} rows ({100*(1-TEST_SIZE):.0f}%)")
    print(f" Test set: {len(test_df):,} rows ({100*TEST_SIZE:.0f}%)")

    # Show per-split class shares as a sanity check on the stratification.
    print("\n Target distribution in splits:")
    for label, part in (('Train', train_df), ('Test', test_df)):
        shares = part[TARGET_COLUMN].value_counts(normalize=True).sort_index() * 100
        summary = ", ".join(f"{TARGET_CLASS_NAMES[i]}: {v:.1f}%" for i, v in shares.items())
        print(f" {label}: {summary}")

    return train_df, test_df
|
|
|
|
|
|
|
|
|
def save_data(df: pd.DataFrame, train_df: pd.DataFrame, test_df: pd.DataFrame) -> None:
    """Persist the full processed dataset and both splits as parquet files.

    Args:
        df: Full processed dataframe.
        train_df: Training split.
        test_df: Test split.
    """
    print("\nSaving processed data...")

    # Make sure the destination directory exists before any write.
    PROCESSED_DATA_DIR.mkdir(parents=True, exist_ok=True)

    targets = (
        (df, PROCESSED_PARQUET, " Full processed data"),
        (train_df, TRAIN_PARQUET, " Train data"),
        (test_df, TEST_PARQUET, " Test data"),
    )
    for frame, path, label in targets:
        frame.to_parquet(path, index=False)
        print(f"{label}: {path}")
|
|
|
|
|
|
|
|
|
def print_summary(df: pd.DataFrame) -> None:
    """Print a post-processing overview: shape, dtype counts, feature stats.

    Args:
        df: The fully preprocessed dataframe.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("PREPROCESSING SUMMARY")
    print(rule)

    print(f"\nDataset shape: {df.shape}")
    print(f"\nColumn types:")
    print(df.dtypes.value_counts().to_string())

    # Per-feature range and moments; the target itself is skipped.
    print(f"\nFeature statistics:")
    for col in df.select_dtypes(include=[np.number]).columns:
        if col == TARGET_COLUMN:
            continue
        print(f" {col}:")
        print(f" Range: [{df[col].min():.2f}, {df[col].max():.2f}]")
        print(f" Mean: {df[col].mean():.2f}, Std: {df[col].std():.2f}")
|
|
|
|
|
|
|
|
|
def main():
    """Run the full preprocessing pipeline end to end.

    Steps: load raw data, build the ordinal target, prune columns, clean
    missing values, encode categoricals, select features, split, save, and
    print a summary.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("DATA PREPROCESSING")
    print(banner)

    df = load_data()
    df = create_target_variable(df)
    df = drop_irrelevant_columns(df)
    df = handle_missing_values(df)
    df, encoders = encode_categorical_features(df)
    df = select_features(df)

    train_df, test_df = split_data(df)
    save_data(df, train_df, test_df)

    print_summary(df)

    print("\n" + banner)
    print("✓ Preprocessing Complete!")
    print(banner + "\n")
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
main()
|
|
|
|