File size: 8,760 Bytes
64e892b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 |
"""
Script 03: Data Preprocessing
This script preprocesses the raw wildfire data:
- Creates ordinal target variable (3 classes: Small, Medium, Large)
- Drops irrelevant columns (IDs, text fields, redundant info)
- Handles missing values
- Encodes categorical variables
- Splits data into train/test sets (stratified)
Usage:
python scripts/03_preprocess.py
"""
import sys
from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from config.config import (
RAW_PARQUET,
PROCESSED_PARQUET,
TRAIN_PARQUET,
TEST_PARQUET,
PROCESSED_DATA_DIR,
FIRE_SIZE_CLASS_MAPPING,
TARGET_CLASS_NAMES,
TARGET_COLUMN,
COLUMNS_TO_DROP,
CATEGORICAL_FEATURES,
RANDOM_STATE,
TEST_SIZE
)
def load_data() -> pd.DataFrame:
"""Load the raw parquet data."""
print("Loading raw data...")
df = pd.read_parquet(RAW_PARQUET)
print(f" Loaded {len(df):,} records with {len(df.columns)} columns")
return df
def create_target_variable(df: pd.DataFrame) -> pd.DataFrame:
"""Create ordinal target variable from FIRE_SIZE_CLASS."""
print("\nCreating ordinal target variable...")
# Map original classes to ordinal (0, 1, 2)
df[TARGET_COLUMN] = df['FIRE_SIZE_CLASS'].map(FIRE_SIZE_CLASS_MAPPING)
# Check for unmapped values
unmapped = df[TARGET_COLUMN].isna().sum()
if unmapped > 0:
print(f" Warning: {unmapped} records could not be mapped. Dropping...")
df = df.dropna(subset=[TARGET_COLUMN])
df[TARGET_COLUMN] = df[TARGET_COLUMN].astype(int)
# Print distribution
print("\n Target Variable Distribution:")
for val in sorted(df[TARGET_COLUMN].unique()):
count = (df[TARGET_COLUMN] == val).sum()
pct = count / len(df) * 100
print(f" {val} ({TARGET_CLASS_NAMES[val]}): {count:,} ({pct:.2f}%)")
return df
def drop_irrelevant_columns(df: pd.DataFrame) -> pd.DataFrame:
"""Drop columns not useful for prediction."""
print("\nDropping irrelevant columns...")
# Get columns that exist in the dataframe
cols_to_drop = [col for col in COLUMNS_TO_DROP if col in df.columns]
print(f" Dropping {len(cols_to_drop)} columns:")
for col in cols_to_drop[:10]:
print(f" - {col}")
if len(cols_to_drop) > 10:
print(f" ... and {len(cols_to_drop) - 10} more")
df = df.drop(columns=cols_to_drop, errors='ignore')
print(f" Remaining columns: {len(df.columns)}")
return df
def handle_missing_values(df: pd.DataFrame) -> pd.DataFrame:
"""Handle missing values in the dataset."""
print("\nHandling missing values...")
initial_rows = len(df)
# Check missing in essential columns
essential_cols = ['LATITUDE', 'LONGITUDE', 'FIRE_YEAR', 'DISCOVERY_DOY', TARGET_COLUMN]
for col in essential_cols:
if col in df.columns:
missing = df[col].isna().sum()
if missing > 0:
print(f" {col}: {missing} missing values")
# Drop rows with missing essential values
df = df.dropna(subset=[c for c in essential_cols if c in df.columns])
# For categorical features, fill with 'Unknown'
for col in CATEGORICAL_FEATURES:
if col in df.columns:
missing = df[col].isna().sum()
if missing > 0:
df[col] = df[col].fillna('Unknown')
print(f" {col}: Filled {missing} missing with 'Unknown'")
rows_dropped = initial_rows - len(df)
print(f"\n Rows dropped due to missing essential values: {rows_dropped:,}")
print(f" Remaining rows: {len(df):,}")
return df
def encode_categorical_features(df: pd.DataFrame) -> tuple[pd.DataFrame, dict]:
"""Encode categorical features using Label Encoding."""
print("\nEncoding categorical features...")
encoders = {}
for col in CATEGORICAL_FEATURES:
if col in df.columns:
le = LabelEncoder()
df[f'{col}_encoded'] = le.fit_transform(df[col].astype(str))
encoders[col] = le
n_categories = len(le.classes_)
print(f" {col}: {n_categories} categories")
return df, encoders
def select_features(df: pd.DataFrame) -> pd.DataFrame:
"""Select features for modeling."""
print("\nSelecting features for modeling...")
# Features to keep
feature_cols = [
# Numerical
'LATITUDE', 'LONGITUDE', 'FIRE_YEAR', 'DISCOVERY_DOY',
# Encoded categorical
'NWCG_REPORTING_AGENCY_encoded',
'STAT_CAUSE_DESCR_encoded',
'STATE_encoded',
'OWNER_DESCR_encoded',
# Target
TARGET_COLUMN
]
# Keep only columns that exist
available_cols = [col for col in feature_cols if col in df.columns]
# Also keep original categorical columns for reference
original_cats = [col for col in CATEGORICAL_FEATURES if col in df.columns]
all_cols = available_cols + original_cats
all_cols = list(dict.fromkeys(all_cols)) # Remove duplicates, preserve order
df = df[all_cols]
print(f" Selected {len(available_cols)} feature columns + target")
print(f" Final columns: {list(df.columns)}")
return df
def split_data(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
"""Split data into train and test sets."""
print("\nSplitting data into train/test sets...")
train_df, test_df = train_test_split(
df,
test_size=TEST_SIZE,
random_state=RANDOM_STATE,
stratify=df[TARGET_COLUMN]
)
print(f" Train set: {len(train_df):,} rows ({100*(1-TEST_SIZE):.0f}%)")
print(f" Test set: {len(test_df):,} rows ({100*TEST_SIZE:.0f}%)")
# Verify stratification
print("\n Target distribution in splits:")
for name, data in [('Train', train_df), ('Test', test_df)]:
dist = data[TARGET_COLUMN].value_counts(normalize=True).sort_index() * 100
dist_str = ", ".join([f"{TARGET_CLASS_NAMES[i]}: {v:.1f}%" for i, v in dist.items()])
print(f" {name}: {dist_str}")
return train_df, test_df
def save_data(df: pd.DataFrame, train_df: pd.DataFrame, test_df: pd.DataFrame) -> None:
"""Save processed data to parquet files."""
print("\nSaving processed data...")
# Create directory if needed
PROCESSED_DATA_DIR.mkdir(parents=True, exist_ok=True)
# Save full processed data
df.to_parquet(PROCESSED_PARQUET, index=False)
print(f" Full processed data: {PROCESSED_PARQUET}")
# Save train/test splits
train_df.to_parquet(TRAIN_PARQUET, index=False)
print(f" Train data: {TRAIN_PARQUET}")
test_df.to_parquet(TEST_PARQUET, index=False)
print(f" Test data: {TEST_PARQUET}")
def print_summary(df: pd.DataFrame) -> None:
"""Print preprocessing summary."""
print("\n" + "="*60)
print("PREPROCESSING SUMMARY")
print("="*60)
print(f"\nDataset shape: {df.shape}")
print(f"\nColumn types:")
print(df.dtypes.value_counts().to_string())
print(f"\nFeature statistics:")
numerical_cols = df.select_dtypes(include=[np.number]).columns
for col in numerical_cols:
if col != TARGET_COLUMN:
print(f" {col}:")
print(f" Range: [{df[col].min():.2f}, {df[col].max():.2f}]")
print(f" Mean: {df[col].mean():.2f}, Std: {df[col].std():.2f}")
def main():
"""Main preprocessing pipeline."""
print("\n" + "="*60)
print("DATA PREPROCESSING")
print("="*60)
# Load data
df = load_data()
# Create target variable
df = create_target_variable(df)
# Drop irrelevant columns
df = drop_irrelevant_columns(df)
# Handle missing values
df = handle_missing_values(df)
# Encode categorical features
df, encoders = encode_categorical_features(df)
# Select features
df = select_features(df)
# Split data
train_df, test_df = split_data(df)
# Save data
save_data(df, train_df, test_df)
# Print summary
print_summary(df)
print("\n" + "="*60)
print("✓ Preprocessing Complete!")
print("="*60 + "\n")
if __name__ == "__main__":
main()
|