"""
Script 03: Data Preprocessing
This script preprocesses the raw wildfire data:
- Creates ordinal target variable (3 classes: Small, Medium, Large)
- Drops irrelevant columns (IDs, text fields, redundant info)
- Handles missing values
- Encodes categorical variables
- Splits data into train/test sets (stratified)
Usage:
python scripts/03_preprocess.py
"""
import sys
from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from config.config import (
RAW_PARQUET,
PROCESSED_PARQUET,
TRAIN_PARQUET,
TEST_PARQUET,
PROCESSED_DATA_DIR,
FIRE_SIZE_CLASS_MAPPING,
TARGET_CLASS_NAMES,
TARGET_COLUMN,
COLUMNS_TO_DROP,
CATEGORICAL_FEATURES,
RANDOM_STATE,
TEST_SIZE
)
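# For orientation, a hypothetical shape for the two target-related config
# objects used below (the authoritative definitions live in config/config.py
# and may differ). Size classes A-G could map to three ordinal buckets:
#   FIRE_SIZE_CLASS_MAPPING = {'A': 0, 'B': 0, 'C': 1, 'D': 1, 'E': 2, 'F': 2, 'G': 2}
#   TARGET_CLASS_NAMES = {0: 'Small', 1: 'Medium', 2: 'Large'}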
def load_data() -> pd.DataFrame:
"""Load the raw parquet data."""
print("Loading raw data...")
df = pd.read_parquet(RAW_PARQUET)
print(f" Loaded {len(df):,} records with {len(df.columns)} columns")
return df
def create_target_variable(df: pd.DataFrame) -> pd.DataFrame:
"""Create ordinal target variable from FIRE_SIZE_CLASS."""
print("\nCreating ordinal target variable...")
# Map original classes to ordinal (0, 1, 2)
df[TARGET_COLUMN] = df['FIRE_SIZE_CLASS'].map(FIRE_SIZE_CLASS_MAPPING)
# Check for unmapped values
unmapped = df[TARGET_COLUMN].isna().sum()
if unmapped > 0:
print(f" Warning: {unmapped} records could not be mapped. Dropping...")
df = df.dropna(subset=[TARGET_COLUMN])
df[TARGET_COLUMN] = df[TARGET_COLUMN].astype(int)
# Print distribution
print("\n Target Variable Distribution:")
for val in sorted(df[TARGET_COLUMN].unique()):
count = (df[TARGET_COLUMN] == val).sum()
pct = count / len(df) * 100
print(f" {val} ({TARGET_CLASS_NAMES[val]}): {count:,} ({pct:.2f}%)")
return df
def drop_irrelevant_columns(df: pd.DataFrame) -> pd.DataFrame:
"""Drop columns not useful for prediction."""
print("\nDropping irrelevant columns...")
# Get columns that exist in the dataframe
cols_to_drop = [col for col in COLUMNS_TO_DROP if col in df.columns]
print(f" Dropping {len(cols_to_drop)} columns:")
for col in cols_to_drop[:10]:
print(f" - {col}")
if len(cols_to_drop) > 10:
print(f" ... and {len(cols_to_drop) - 10} more")
df = df.drop(columns=cols_to_drop, errors='ignore')
print(f" Remaining columns: {len(df.columns)}")
return df
def handle_missing_values(df: pd.DataFrame) -> pd.DataFrame:
"""Handle missing values in the dataset."""
print("\nHandling missing values...")
initial_rows = len(df)
# Check missing in essential columns
essential_cols = ['LATITUDE', 'LONGITUDE', 'FIRE_YEAR', 'DISCOVERY_DOY', TARGET_COLUMN]
for col in essential_cols:
if col in df.columns:
missing = df[col].isna().sum()
if missing > 0:
print(f" {col}: {missing} missing values")
# Drop rows with missing essential values
df = df.dropna(subset=[c for c in essential_cols if c in df.columns])
# For categorical features, fill with 'Unknown'
for col in CATEGORICAL_FEATURES:
if col in df.columns:
missing = df[col].isna().sum()
if missing > 0:
                # astype('object') guards against a pandas categorical dtype,
                # where fillna with an unseen category would raise an error
                df[col] = df[col].astype('object').fillna('Unknown')
                print(f" {col}: Filled {missing} missing with 'Unknown'")
rows_dropped = initial_rows - len(df)
print(f"\n Rows dropped due to missing essential values: {rows_dropped:,}")
print(f" Remaining rows: {len(df):,}")
return df
def encode_categorical_features(df: pd.DataFrame) -> tuple[pd.DataFrame, dict]:
"""Encode categorical features using Label Encoding."""
print("\nEncoding categorical features...")
encoders = {}
for col in CATEGORICAL_FEATURES:
if col in df.columns:
le = LabelEncoder()
df[f'{col}_encoded'] = le.fit_transform(df[col].astype(str))
encoders[col] = le
n_categories = len(le.classes_)
print(f" {col}: {n_categories} categories")
return df, encoders
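# The fitted encoders above are returned to main() but never written to disk.
# A minimal sketch of persisting them for reuse at inference time, assuming
# joblib is available (it is installed alongside scikit-learn); the destination
# path is hypothetical and not defined in config/config.py:
def save_encoders(encoders: dict, path: Path) -> None:
    """Persist fitted LabelEncoders so later scripts can reproduce the same
    category -> integer mapping (illustrative sketch; not called by main())."""
    import joblib  # assumed dependency, ships with scikit-learn installs
    path.parent.mkdir(parents=True, exist_ok=True)
    joblib.dump(encoders, path)
    print(f" Saved {len(encoders)} encoders to {path}")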
def select_features(df: pd.DataFrame) -> pd.DataFrame:
"""Select features for modeling."""
print("\nSelecting features for modeling...")
# Features to keep
feature_cols = [
# Numerical
'LATITUDE', 'LONGITUDE', 'FIRE_YEAR', 'DISCOVERY_DOY',
# Encoded categorical
'NWCG_REPORTING_AGENCY_encoded',
'STAT_CAUSE_DESCR_encoded',
'STATE_encoded',
'OWNER_DESCR_encoded',
# Target
TARGET_COLUMN
]
# Keep only columns that exist
available_cols = [col for col in feature_cols if col in df.columns]
# Also keep original categorical columns for reference
original_cats = [col for col in CATEGORICAL_FEATURES if col in df.columns]
all_cols = available_cols + original_cats
all_cols = list(dict.fromkeys(all_cols)) # Remove duplicates, preserve order
df = df[all_cols]
print(f" Selected {len(available_cols)} feature columns + target")
print(f" Final columns: {list(df.columns)}")
return df
def split_data(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
"""Split data into train and test sets."""
print("\nSplitting data into train/test sets...")
train_df, test_df = train_test_split(
df,
test_size=TEST_SIZE,
random_state=RANDOM_STATE,
stratify=df[TARGET_COLUMN]
)
print(f" Train set: {len(train_df):,} rows ({100*(1-TEST_SIZE):.0f}%)")
print(f" Test set: {len(test_df):,} rows ({100*TEST_SIZE:.0f}%)")
# Verify stratification
print("\n Target distribution in splits:")
for name, data in [('Train', train_df), ('Test', test_df)]:
dist = data[TARGET_COLUMN].value_counts(normalize=True).sort_index() * 100
dist_str = ", ".join([f"{TARGET_CLASS_NAMES[i]}: {v:.1f}%" for i, v in dist.items()])
print(f" {name}: {dist_str}")
return train_df, test_df
def save_data(df: pd.DataFrame, train_df: pd.DataFrame, test_df: pd.DataFrame) -> None:
"""Save processed data to parquet files."""
print("\nSaving processed data...")
# Create directory if needed
PROCESSED_DATA_DIR.mkdir(parents=True, exist_ok=True)
# Save full processed data
df.to_parquet(PROCESSED_PARQUET, index=False)
print(f" Full processed data: {PROCESSED_PARQUET}")
# Save train/test splits
train_df.to_parquet(TRAIN_PARQUET, index=False)
print(f" Train data: {TRAIN_PARQUET}")
test_df.to_parquet(TEST_PARQUET, index=False)
print(f" Test data: {TEST_PARQUET}")
def print_summary(df: pd.DataFrame) -> None:
"""Print preprocessing summary."""
print("\n" + "="*60)
print("PREPROCESSING SUMMARY")
print("="*60)
print(f"\nDataset shape: {df.shape}")
print(f"\nColumn types:")
print(df.dtypes.value_counts().to_string())
print(f"\nFeature statistics:")
numerical_cols = df.select_dtypes(include=[np.number]).columns
for col in numerical_cols:
if col != TARGET_COLUMN:
print(f" {col}:")
print(f" Range: [{df[col].min():.2f}, {df[col].max():.2f}]")
print(f" Mean: {df[col].mean():.2f}, Std: {df[col].std():.2f}")
def main():
"""Main preprocessing pipeline."""
print("\n" + "="*60)
print("DATA PREPROCESSING")
print("="*60)
# Load data
df = load_data()
# Create target variable
df = create_target_variable(df)
# Drop irrelevant columns
df = drop_irrelevant_columns(df)
# Handle missing values
df = handle_missing_values(df)
    # Encode categorical features (the fitted encoders are returned in case a
    # caller wants to persist them; see the save_encoders sketch above)
    df, encoders = encode_categorical_features(df)
# Select features
df = select_features(df)
# Split data
train_df, test_df = split_data(df)
# Save data
save_data(df, train_df, test_df)
# Print summary
print_summary(df)
print("\n" + "="*60)
print("✓ Preprocessing Complete!")
print("="*60 + "\n")
if __name__ == "__main__":
main()