| """ | |
| Advanced Preprocessing Tools | |
| Tools for handling imbalanced data, feature scaling, and strategic data splitting. | |
| """ | |
| import polars as pl | |
| import numpy as np | |
| from typing import Dict, Any, List, Optional, Tuple | |
| from pathlib import Path | |
| import sys | |
| import os | |
| import joblib | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
| # Add parent directory to path for imports | |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, LabelEncoder | |
| from imblearn.over_sampling import SMOTE, ADASYN, BorderlineSMOTE | |
| from imblearn.under_sampling import RandomUnderSampler, TomekLinks, EditedNearestNeighbours | |
| from imblearn.combine import SMOTETomek, SMOTEENN | |
| from collections import Counter | |
| from ..utils.polars_helpers import ( | |
| load_dataframe, save_dataframe, get_numeric_columns, | |
| get_categorical_columns, split_features_target | |
| ) | |
| from ..utils.validation import ( | |
| validate_file_exists, validate_file_format, validate_dataframe, | |
| validate_column_exists | |
| ) | |
def handle_imbalanced_data(
    file_path: str,
    target_col: str,
    strategy: str = "smote",
    sampling_ratio: float = 1.0,
    output_path: Optional[str] = None,
    random_state: int = 42
) -> Dict[str, Any]:
    """
    Handle imbalanced datasets using various resampling techniques.

    Args:
        file_path: Path to dataset
        target_col: Target column name
        strategy: Resampling strategy:
            - 'smote': Synthetic Minority Over-sampling (SMOTE)
            - 'adasyn': Adaptive Synthetic Sampling
            - 'borderline_smote': Borderline SMOTE variant
            - 'random_undersample': Random undersampling
            - 'tomek': Tomek Links undersampling
            - 'smote_tomek': Combined SMOTE + Tomek Links
            - 'smote_enn': Combined SMOTE + Edited Nearest Neighbours
            - 'class_weights': Return class weights (no resampling)
        sampling_ratio: Desired minority-to-majority ratio after resampling
            (0.5 = minority ends up at half the majority size, 1.0 = fully balanced)
        output_path: Path to save balanced dataset
        random_state: Random seed

    Returns:
        Dictionary with balancing results and class distributions
    """
    # Validation
    validate_file_exists(file_path)
    validate_file_format(file_path)

    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)
    validate_column_exists(df, target_col)

    # Get original class distribution (class label -> count)
    original_dist = dict(Counter(df[target_col].to_list()))
    original_counts = dict(sorted(original_dist.items()))
    print(f"📊 Original class distribution: {original_counts}")

    # Calculate imbalance ratio (majority count / minority count)
    class_counts = list(original_counts.values())
    imbalance_ratio = max(class_counts) / min(class_counts)

    if imbalance_ratio < 1.5:
        return {
            'status': 'skipped',
            'message': 'Dataset is already balanced (ratio < 1.5)',
            'original_distribution': original_counts,
            'imbalance_ratio': float(imbalance_ratio)
        }
    # Prepare data
    X, y = split_features_target(df, target_col)

    # Handle class weights strategy (no resampling)
    if strategy == "class_weights":
        from sklearn.utils.class_weight import compute_class_weight
        classes = np.unique(y)
        weights = compute_class_weight('balanced', classes=classes, y=y)
        class_weights = dict(zip(classes, weights))
        return {
            'status': 'success',
            'strategy': 'class_weights',
            'class_weights': {str(k): float(v) for k, v in class_weights.items()},
            'original_distribution': original_counts,
            'imbalance_ratio': float(imbalance_ratio),
            'recommendation': 'Use class_weight parameter in your model training'
        }
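
    # Note: imbalanced-learn accepts a float sampling_strategy only for binary
    # targets; with multiclass data keep sampling_ratio at 1.0 so 'auto' is used below.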
    # Create resampler based on strategy
    sampling_strategy = sampling_ratio if sampling_ratio < 1.0 else 'auto'

    if strategy == "smote":
        resampler = SMOTE(sampling_strategy=sampling_strategy, random_state=random_state)
    elif strategy == "adasyn":
        resampler = ADASYN(sampling_strategy=sampling_strategy, random_state=random_state)
    elif strategy == "borderline_smote":
        resampler = BorderlineSMOTE(sampling_strategy=sampling_strategy, random_state=random_state)
    elif strategy == "random_undersample":
        resampler = RandomUnderSampler(sampling_strategy=sampling_strategy, random_state=random_state)
    elif strategy == "tomek":
        resampler = TomekLinks(sampling_strategy='auto')
    elif strategy == "smote_tomek":
        resampler = SMOTETomek(sampling_strategy=sampling_strategy, random_state=random_state)
    elif strategy == "smote_enn":
        resampler = SMOTEENN(sampling_strategy=sampling_strategy, random_state=random_state)
    else:
        raise ValueError(f"Unsupported strategy: {strategy}")
    # Perform resampling
    print(f"⚖️ Applying {strategy} resampling...")
    X_resampled, y_resampled = resampler.fit_resample(X, y)

    # Get new class distribution
    new_counts = dict(Counter(y_resampled))
    new_counts = dict(sorted(new_counts.items()))
    print(f"✅ New class distribution: {new_counts}")

    # Calculate changes
    total_original = sum(original_counts.values())
    total_new = sum(new_counts.values())

    changes = {
        str(cls): {
            'original': original_counts.get(cls, 0),
            'new': new_counts.get(cls, 0),
            'change': new_counts.get(cls, 0) - original_counts.get(cls, 0)
        }
        for cls in set(list(original_counts.keys()) + list(new_counts.keys()))
    }

    # Create balanced dataframe
    feature_cols = [col for col in df.columns if col != target_col]
    balanced_data = {col: X_resampled[:, i] for i, col in enumerate(feature_cols)}
    balanced_data[target_col] = y_resampled
    balanced_df = pl.DataFrame(balanced_data)

    # Save if output path provided
    if output_path:
        save_dataframe(balanced_df, output_path)
        print(f"💾 Balanced dataset saved to: {output_path}")

    return {
        'status': 'success',
        'strategy': strategy,
        'original_distribution': original_counts,
        'new_distribution': new_counts,
        'changes_by_class': changes,
        'total_samples_before': total_original,
        'total_samples_after': total_new,
        'sample_change': f"{'+' if total_new > total_original else ''}{total_new - total_original}",
        'new_imbalance_ratio': float(max(new_counts.values()) / min(new_counts.values())),
        'output_path': output_path
    }
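
# Illustrative usage sketch for handle_imbalanced_data (kept as a comment so it is
# not executed on import). The dataset path, target column, and output path below
# are hypothetical placeholders, not files shipped with this module:
#
#   result = handle_imbalanced_data(
#       file_path="data/train.csv",
#       target_col="label",
#       strategy="smote",
#       sampling_ratio=1.0,
#       output_path="data/train_balanced.csv",
#   )
#   print(result["new_distribution"], result["new_imbalance_ratio"])
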
def perform_feature_scaling(
    file_path: str,
    scaler_type: str = "standard",
    columns: Optional[List[str]] = None,
    output_path: Optional[str] = None,
    scaler_save_path: Optional[str] = None
) -> Dict[str, Any]:
    """
    Scale features using various normalization techniques.

    Args:
        file_path: Path to dataset
        scaler_type: Scaling method:
            - 'standard': StandardScaler (mean=0, std=1)
            - 'minmax': MinMaxScaler (range 0-1)
            - 'robust': RobustScaler (median/IQR - robust to outliers)
        columns: List of columns to scale (None = all numeric columns)
        output_path: Path to save scaled dataset
        scaler_save_path: Path to save fitted scaler for future use

    Returns:
        Dictionary with scaling statistics
    """
    # Validation
    validate_file_exists(file_path)
    validate_file_format(file_path)

    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)

    # Get numeric columns if not specified
    if columns is None:
        columns = get_numeric_columns(df)
        print(f"🔢 Auto-detected {len(columns)} numeric columns for scaling")
    else:
        for col in columns:
            validate_column_exists(df, col)

    if not columns:
        return {
            'status': 'skipped',
            'message': 'No numeric columns found to scale'
        }

    # Create scaler
    if scaler_type == "standard":
        scaler = StandardScaler()
    elif scaler_type == "minmax":
        scaler = MinMaxScaler()
    elif scaler_type == "robust":
        scaler = RobustScaler()
    else:
        raise ValueError(f"Unsupported scaler_type: {scaler_type}")

    # Get original statistics
    original_stats = {}
    for col in columns:
        col_data = df[col].to_numpy()
        original_stats[col] = {
            'mean': float(np.mean(col_data)),
            'std': float(np.std(col_data)),
            'min': float(np.min(col_data)),
            'max': float(np.max(col_data)),
            'median': float(np.median(col_data))
        }
    # Fit and transform
    print(f"📏 Applying {scaler_type} scaling to {len(columns)} columns...")
    scaled_data = scaler.fit_transform(df[columns].to_numpy())

    # Create scaled dataframe
    df_scaled = df.clone()
    for i, col in enumerate(columns):
        df_scaled = df_scaled.with_columns(
            pl.Series(col, scaled_data[:, i])
        )

    # Get new statistics
    new_stats = {}
    for i, col in enumerate(columns):
        new_stats[col] = {
            'mean': float(np.mean(scaled_data[:, i])),
            'std': float(np.std(scaled_data[:, i])),
            'min': float(np.min(scaled_data[:, i])),
            'max': float(np.max(scaled_data[:, i])),
            'median': float(np.median(scaled_data[:, i]))
        }

    # Save scaled data
    if output_path:
        save_dataframe(df_scaled, output_path)
        print(f"💾 Scaled dataset saved to: {output_path}")
    # Save scaler (guard against a bare filename with no directory component)
    if scaler_save_path:
        scaler_dir = os.path.dirname(scaler_save_path)
        if scaler_dir:
            os.makedirs(scaler_dir, exist_ok=True)
        joblib.dump(scaler, scaler_save_path)
        print(f"💾 Scaler saved to: {scaler_save_path}")
    return {
        'status': 'success',
        'scaler_type': scaler_type,
        'columns_scaled': columns,
        'n_columns': len(columns),
        'original_stats': original_stats,
        'scaled_stats': new_stats,
        'output_path': output_path,
        'scaler_path': scaler_save_path
    }
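
# Illustrative usage sketch for perform_feature_scaling (kept as a comment so it is
# not executed on import). Paths below are hypothetical placeholders. The saved
# scaler can be reloaded with joblib and applied to new data so inference-time
# inputs get the same fitted parameters as training data:
#
#   perform_feature_scaling(
#       file_path="data/train.csv",
#       scaler_type="robust",
#       output_path="data/train_scaled.csv",
#       scaler_save_path="artifacts/robust_scaler.joblib",
#   )
#   scaler = joblib.load("artifacts/robust_scaler.joblib")
#   X_new_scaled = scaler.transform(X_new)  # X_new: 2-D array with the same columns
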
def split_data_strategically(
    file_path: str,
    target_col: Optional[str] = None,
    split_type: str = "train_test",
    test_size: float = 0.2,
    val_size: float = 0.1,
    stratify: bool = True,
    time_col: Optional[str] = None,
    group_col: Optional[str] = None,
    random_state: int = 42,
    output_dir: Optional[str] = None
) -> Dict[str, Any]:
    """
    Perform strategic data splitting with multiple options.

    Args:
        file_path: Path to dataset
        target_col: Target column (for stratification)
        split_type: Split strategy:
            - 'train_test': Train/test split
            - 'train_val_test': Train/validation/test split
            - 'time_based': Time-based split (requires time_col)
            - 'group_based': Group-based split (requires group_col, prevents leakage)
        test_size: Test set proportion
        val_size: Validation set proportion (for train_val_test)
        stratify: Whether to stratify by target
        time_col: Column to use for time-based splitting
        group_col: Column to use for group-based splitting
        random_state: Random seed
        output_dir: Directory to save split datasets

    Returns:
        Dictionary with split information and file paths
    """
    # Validation
    validate_file_exists(file_path)
    validate_file_format(file_path)

    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)

    if target_col:
        validate_column_exists(df, target_col)

    n_samples = len(df)
    # Time-based split
    if split_type == "time_based":
        if not time_col:
            raise ValueError("time_col is required for time_based split")
        validate_column_exists(df, time_col)

        # Sort by time so earlier rows go to train and later rows to test
        df = df.sort(time_col)

        # Calculate split point
        test_idx = int(n_samples * (1 - test_size))
        train_df = df[:test_idx]
        test_df = df[test_idx:]

        train_path = None
        test_path = None
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
            train_path = os.path.join(output_dir, "train.csv")
            test_path = os.path.join(output_dir, "test.csv")
            save_dataframe(train_df, train_path)
            save_dataframe(test_df, test_path)

        print(f"✅ Time-based split: train={len(train_df)}, test={len(test_df)}")

        return {
            'status': 'success',
            'split_type': 'time_based',
            'train_size': len(train_df),
            'test_size': len(test_df),
            'train_path': train_path,
            'test_path': test_path,
            'time_column': time_col
        }
    # Group-based split
    elif split_type == "group_based":
        if not group_col:
            raise ValueError("group_col is required for group_based split")
        validate_column_exists(df, group_col)

        # Get unique groups
        unique_groups = df[group_col].unique().to_list()
        n_groups = len(unique_groups)

        # Split groups (all rows of a group stay on the same side of the split)
        np.random.seed(random_state)
        np.random.shuffle(unique_groups)

        test_n_groups = max(1, int(n_groups * test_size))
        test_groups = unique_groups[:test_n_groups]
        train_groups = unique_groups[test_n_groups:]

        train_df = df.filter(pl.col(group_col).is_in(train_groups))
        test_df = df.filter(pl.col(group_col).is_in(test_groups))

        train_path = None
        test_path = None
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
            train_path = os.path.join(output_dir, "train.csv")
            test_path = os.path.join(output_dir, "test.csv")
            save_dataframe(train_df, train_path)
            save_dataframe(test_df, test_path)

        print(f"✅ Group-based split: train={len(train_df)}, test={len(test_df)}")

        return {
            'status': 'success',
            'split_type': 'group_based',
            'train_size': len(train_df),
            'test_size': len(test_df),
            'train_groups': len(train_groups),
            'test_groups': len(test_groups),
            'train_path': train_path,
            'test_path': test_path,
            'group_column': group_col
        }
    # Standard train/test split
    elif split_type == "train_test":
        X, y = split_features_target(df, target_col) if target_col else (df.to_numpy(), None)
        stratify_y = y if (stratify and target_col and len(np.unique(y)) < 20) else None

        if target_col:
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=test_size, random_state=random_state, stratify=stratify_y
            )

            # Reconstruct dataframes
            feature_cols = [col for col in df.columns if col != target_col]
            train_data = {col: X_train[:, i] for i, col in enumerate(feature_cols)}
            train_data[target_col] = y_train
            train_df = pl.DataFrame(train_data)

            test_data = {col: X_test[:, i] for i, col in enumerate(feature_cols)}
            test_data[target_col] = y_test
            test_df = pl.DataFrame(test_data)
        else:
            indices = np.arange(len(df))
            train_idx, test_idx = train_test_split(
                indices, test_size=test_size, random_state=random_state
            )
            train_df = df[train_idx]
            test_df = df[test_idx]

        train_path = None
        test_path = None
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
            train_path = os.path.join(output_dir, "train.csv")
            test_path = os.path.join(output_dir, "test.csv")
            save_dataframe(train_df, train_path)
            save_dataframe(test_df, test_path)

        print(f"✅ Train/test split: train={len(train_df)}, test={len(test_df)}")

        return {
            'status': 'success',
            'split_type': 'train_test',
            'train_size': len(train_df),
            'test_size': len(test_df),
            'stratified': bool(stratify_y is not None),
            'train_path': train_path,
            'test_path': test_path
        }
    # Train/val/test split
    elif split_type == "train_val_test":
        X, y = split_features_target(df, target_col) if target_col else (df.to_numpy(), None)
        stratify_y = y if (stratify and target_col and len(np.unique(y)) < 20) else None

        if target_col:
            # First split: train+val vs test
            X_temp, X_test, y_temp, y_test = train_test_split(
                X, y, test_size=test_size, random_state=random_state, stratify=stratify_y
            )

            # Second split: train vs val (val_size rescaled to the remaining share)
            val_ratio = val_size / (1 - test_size)
            stratify_temp = y_temp if stratify_y is not None else None
            X_train, X_val, y_train, y_val = train_test_split(
                X_temp, y_temp, test_size=val_ratio, random_state=random_state, stratify=stratify_temp
            )

            # Reconstruct dataframes
            feature_cols = [col for col in df.columns if col != target_col]
            train_data = {col: X_train[:, i] for i, col in enumerate(feature_cols)}
            train_data[target_col] = y_train
            train_df = pl.DataFrame(train_data)

            val_data = {col: X_val[:, i] for i, col in enumerate(feature_cols)}
            val_data[target_col] = y_val
            val_df = pl.DataFrame(val_data)

            test_data = {col: X_test[:, i] for i, col in enumerate(feature_cols)}
            test_data[target_col] = y_test
            test_df = pl.DataFrame(test_data)
        else:
            indices = np.arange(len(df))
            temp_idx, test_idx = train_test_split(
                indices, test_size=test_size, random_state=random_state
            )
            val_ratio = val_size / (1 - test_size)
            train_idx, val_idx = train_test_split(
                temp_idx, test_size=val_ratio, random_state=random_state
            )
            train_df = df[train_idx]
            val_df = df[val_idx]
            test_df = df[test_idx]

        train_path = None
        val_path = None
        test_path = None
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
            train_path = os.path.join(output_dir, "train.csv")
            val_path = os.path.join(output_dir, "val.csv")
            test_path = os.path.join(output_dir, "test.csv")
            save_dataframe(train_df, train_path)
            save_dataframe(val_df, val_path)
            save_dataframe(test_df, test_path)

        print(f"✅ Train/val/test split: train={len(train_df)}, val={len(val_df)}, test={len(test_df)}")

        return {
            'status': 'success',
            'split_type': 'train_val_test',
            'train_size': len(train_df),
            'val_size': len(val_df),
            'test_size': len(test_df),
            'stratified': bool(stratify_y is not None),
            'train_path': train_path,
            'val_path': val_path,
            'test_path': test_path
        }

    else:
        raise ValueError(f"Unsupported split_type: {split_type}")