Spaces:
Sleeping
Sleeping
| """ | |
| preprocessing.py — Feature engineering and data preprocessing pipeline. | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from sklearn.preprocessing import RobustScaler, OneHotEncoder | |
| from sklearn.impute import SimpleImputer | |
| from sklearn.compose import ColumnTransformer | |
| from sklearn.pipeline import Pipeline | |
| from config import COLUMNS_TO_DROP_BASE, SKEW_THRESHOLD | |
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with derived housing features appended.

    Added columns:
        TotalSF:      TotalBsmtSF + 1stFlrSF + 2ndFlrSF.
        TotalBath:    FullBath + BsmtFullBath + 0.5 * (HalfBath + BsmtHalfBath).
        HasPool:      1 when PoolArea > 0, else 0.
        HasGarage:    1 when GarageArea > 0, else 0.
        HasBsmt:      1 when TotalBsmtSF > 0, else 0.
        IsRemodeled:  1 when YearRemodAdd and YearBuilt are both known and
                      differ, else 0.

    A source column that is absent, or a value that is NaN, counts as zero
    for the sums and as "not present" for the indicator flags. The input
    frame is not modified.
    """
    df = df.copy()

    def _filled(name: str) -> pd.Series:
        # Absent columns and missing values both contribute zero.
        return df.get(name, pd.Series(0, index=df.index)).fillna(0)

    df["TotalSF"] = _filled("TotalBsmtSF") + _filled("1stFlrSF") + _filled("2ndFlrSF")
    df["TotalBath"] = (
        _filled("FullBath")
        + 0.5 * _filled("HalfBath")
        + _filled("BsmtFullBath")
        + 0.5 * _filled("BsmtHalfBath")
    )

    # All three presence flags use the same "area > 0" rule. A plain
    # not-null check would flag an area of 0 as present.
    df["HasPool"] = (_filled("PoolArea") > 0).astype(int)
    df["HasGarage"] = (_filled("GarageArea") > 0).astype(int)
    df["HasBsmt"] = (_filled("TotalBsmtSF") > 0).astype(int)

    # Only call a row remodeled when both years are actually known; comparing
    # against a 0 default (or NaN != NaN) would mark every such row as 1.
    unknown = pd.Series(np.nan, index=df.index)
    remod_year = df.get("YearRemodAdd", unknown)
    built_year = df.get("YearBuilt", unknown)
    df["IsRemodeled"] = (
        remod_year.notna() & built_year.notna() & (remod_year != built_year)
    ).astype(int)
    return df
def preprocess_combined(df: pd.DataFrame) -> pd.DataFrame:
    """Clean a raw feature frame and return a fully populated copy.

    Steps, in order:
        1. Drop the columns listed in ``COLUMNS_TO_DROP_BASE`` plus any column
           with more than 50% missing values.
        2. Add engineered features via ``engineer_features``.
        3. Convert object columns to numeric when more than half of the rows
           parse as numbers (unparseable entries become NaN).
        4. Apply ``log1p`` to numeric columns whose absolute skew exceeds
           ``SKEW_THRESHOLD``; negative values are clipped to 0 first.
        5. Fill missing values: ``"Missing"`` for object columns, the column
           median for numeric columns (which are then cast to float).

    The caller's frame is not modified; steps 1-2 already produce copies.
    """
    # Step 1 — Drop unwanted / high-NaN columns
    to_drop = [c for c in COLUMNS_TO_DROP_BASE if c in df.columns]
    missing_pct = df.isnull().sum() * 100 / len(df)
    to_drop += missing_pct[missing_pct > 50].index.tolist()
    df = df.drop(columns=list(set(to_drop)), errors="ignore")

    # Step 2 — Feature engineering
    # NOTE(review): this runs before the dtype fix in step 3, so source columns
    # stored as numeric strings reach engineer_features unconverted — confirm
    # that inputs are already numeric at this point.
    df = engineer_features(df)

    # Step 3 — Fix dtypes: numeric columns must be float, not object.
    # The object-column list and the majority threshold are computed once;
    # converting one column never changes whether another column is object.
    object_cols = df.select_dtypes(include=["object"]).columns
    majority = 0.5 * len(df)
    for col in object_cols:
        # Try to convert to numeric; keep the original strings unless most
        # rows parse successfully.
        converted = pd.to_numeric(df[col], errors="coerce")
        if converted.notna().sum() > majority:
            df[col] = converted

    # Step 4 — Log-transform skewed numerics
    num_cols = df.select_dtypes(include=[np.number]).columns
    skewed = df[num_cols].apply(lambda x: x.dropna().skew())
    for feat in skewed[skewed.abs() > SKEW_THRESHOLD].index:
        # log1p is undefined below -1, so negatives are floored at 0.
        df[feat] = np.log1p(df[feat].clip(lower=0))

    # Step 5 — Fill missing values
    for col in df.select_dtypes(include=["object"]).columns:
        df[col] = df[col].fillna("Missing").astype(str)
    for col in df.select_dtypes(include=[np.number]).columns:
        df[col] = df[col].fillna(df[col].median()).astype(float)
    return df
def build_preprocessor(numerical_features: list, categorical_features: list) -> ColumnTransformer:
    """Assemble the unfitted column transformer used ahead of the model.

    Args:
        numerical_features: Column names routed to the numeric branch, which
            imputes with the median and then applies ``RobustScaler``.
        categorical_features: Column names routed to the categorical branch,
            which one-hot encodes into a dense array and ignores categories
            not seen during fitting.

    Returns:
        A ``ColumnTransformer`` with a ``"num"`` and a ``"cat"`` branch.
    """
    numeric_steps = [
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", RobustScaler()),
    ]
    categorical_steps = [
        ("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
    ]
    branches = [
        ("num", Pipeline(numeric_steps), numerical_features),
        ("cat", Pipeline(categorical_steps), categorical_features),
    ]
    return ColumnTransformer(branches)