Data-Science-Agent / src /tools /advanced_preprocessing.py
Pulastya B
fix: Fix module import paths for Render deployment
227cb22
"""
Advanced Preprocessing Tools
Tools for handling imbalanced data, feature scaling, and strategic data splitting.
"""
import polars as pl
import numpy as np
from typing import Dict, Any, List, Optional, Tuple
from pathlib import Path
import sys
import os
import joblib
import warnings
warnings.filterwarnings('ignore')
# Add parent directory to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, LabelEncoder
from imblearn.over_sampling import SMOTE, ADASYN, BorderlineSMOTE
from imblearn.under_sampling import RandomUnderSampler, TomekLinks, EditedNearestNeighbours
from imblearn.combine import SMOTETomek, SMOTEENN
from collections import Counter
from ..utils.polars_helpers import (
load_dataframe, save_dataframe, get_numeric_columns,
get_categorical_columns, split_features_target
)
from ..utils.validation import (
validate_file_exists, validate_file_format, validate_dataframe,
validate_column_exists
)
def handle_imbalanced_data(
file_path: str,
target_col: str,
strategy: str = "smote",
sampling_ratio: float = 1.0,
output_path: str = None,
random_state: int = 42
) -> Dict[str, Any]:
"""
Handle imbalanced datasets using various resampling techniques.
Args:
file_path: Path to dataset
target_col: Target column name
strategy: Resampling strategy:
- 'smote': Synthetic Minority Over-sampling (SMOTE)
- 'adasyn': Adaptive Synthetic Sampling
- 'borderline_smote': Borderline SMOTE variant
- 'random_undersample': Random undersampling
- 'tomek': Tomek Links undersampling
- 'smote_tomek': Combined SMOTE + Tomek Links
- 'smote_enn': Combined SMOTE + Edited Nearest Neighbours
- 'class_weights': Return class weights (no resampling)
sampling_ratio: Ratio of minority to majority class (0.5 = 50%, 1.0 = 100%)
output_path: Path to save balanced dataset
random_state: Random seed
Returns:
Dictionary with balancing results and class distributions
"""
# Validation
validate_file_exists(file_path)
validate_file_format(file_path)
# Load data
df = load_dataframe(file_path)
validate_dataframe(df)
validate_column_exists(df, target_col)
# Get original class distribution
original_dist = df[target_col].value_counts().to_dict()
original_counts = dict(sorted(original_dist.items()))
print(f"📊 Original class distribution: {original_counts}")
# Calculate imbalance ratio
class_counts = list(original_counts.values())
imbalance_ratio = max(class_counts) / min(class_counts)
if imbalance_ratio < 1.5:
return {
'status': 'skipped',
'message': 'Dataset is already balanced (ratio < 1.5)',
'original_distribution': original_counts,
'imbalance_ratio': float(imbalance_ratio)
}
# Prepare data
X, y = split_features_target(df, target_col)
# Handle class weights strategy (no resampling)
if strategy == "class_weights":
from sklearn.utils.class_weight import compute_class_weight
classes = np.unique(y)
weights = compute_class_weight('balanced', classes=classes, y=y)
class_weights = dict(zip(classes, weights))
return {
'status': 'success',
'strategy': 'class_weights',
'class_weights': {str(k): float(v) for k, v in class_weights.items()},
'original_distribution': original_counts,
'imbalance_ratio': float(imbalance_ratio),
'recommendation': 'Use class_weight parameter in your model training'
}
# Create resampler based on strategy
sampling_strategy = sampling_ratio if sampling_ratio < 1.0 else 'auto'
if strategy == "smote":
resampler = SMOTE(sampling_strategy=sampling_strategy, random_state=random_state)
elif strategy == "adasyn":
resampler = ADASYN(sampling_strategy=sampling_strategy, random_state=random_state)
elif strategy == "borderline_smote":
resampler = BorderlineSMOTE(sampling_strategy=sampling_strategy, random_state=random_state)
elif strategy == "random_undersample":
resampler = RandomUnderSampler(sampling_strategy=sampling_strategy, random_state=random_state)
elif strategy == "tomek":
resampler = TomekLinks(sampling_strategy='auto')
elif strategy == "smote_tomek":
resampler = SMOTETomek(sampling_strategy=sampling_strategy, random_state=random_state)
elif strategy == "smote_enn":
resampler = SMOTEENN(sampling_strategy=sampling_strategy, random_state=random_state)
else:
raise ValueError(f"Unsupported strategy: {strategy}")
# Perform resampling
print(f"⚖️ Applying {strategy} resampling...")
X_resampled, y_resampled = resampler.fit_resample(X, y)
# Get new class distribution
new_counts = dict(Counter(y_resampled))
new_counts = dict(sorted(new_counts.items()))
print(f"✅ New class distribution: {new_counts}")
# Calculate changes
total_original = sum(original_counts.values())
total_new = sum(new_counts.values())
changes = {
str(cls): {
'original': original_counts.get(cls, 0),
'new': new_counts.get(cls, 0),
'change': new_counts.get(cls, 0) - original_counts.get(cls, 0)
}
for cls in set(list(original_counts.keys()) + list(new_counts.keys()))
}
# Create balanced dataframe
feature_cols = [col for col in df.columns if col != target_col]
balanced_data = {col: X_resampled[:, i] for i, col in enumerate(feature_cols)}
balanced_data[target_col] = y_resampled
balanced_df = pl.DataFrame(balanced_data)
# Save if output path provided
if output_path:
save_dataframe(balanced_df, output_path)
print(f"💾 Balanced dataset saved to: {output_path}")
return {
'status': 'success',
'strategy': strategy,
'original_distribution': original_counts,
'new_distribution': new_counts,
'changes_by_class': changes,
'total_samples_before': total_original,
'total_samples_after': total_new,
'sample_change': f"{'+' if total_new > total_original else ''}{total_new - total_original}",
'new_imbalance_ratio': float(max(new_counts.values()) / min(new_counts.values())),
'output_path': output_path
}
def perform_feature_scaling(
file_path: str,
scaler_type: str = "standard",
columns: Optional[List[str]] = None,
output_path: Optional[str] = None,
scaler_save_path: Optional[str] = None
) -> Dict[str, Any]:
"""
Scale features using various normalization techniques.
Args:
file_path: Path to dataset
scaler_type: Scaling method:
- 'standard': StandardScaler (mean=0, std=1)
- 'minmax': MinMaxScaler (range 0-1)
- 'robust': RobustScaler (median, IQR - robust to outliers)
columns: List of columns to scale (None = all numeric columns)
output_path: Path to save scaled dataset
scaler_save_path: Path to save fitted scaler for future use
Returns:
Dictionary with scaling statistics
"""
# Validation
validate_file_exists(file_path)
validate_file_format(file_path)
# Load data
df = load_dataframe(file_path)
validate_dataframe(df)
# Get numeric columns if not specified
if columns is None:
columns = get_numeric_columns(df)
print(f"🔢 Auto-detected {len(columns)} numeric columns for scaling")
else:
for col in columns:
validate_column_exists(df, col)
if not columns:
return {
'status': 'skipped',
'message': 'No numeric columns found to scale'
}
# Create scaler
if scaler_type == "standard":
scaler = StandardScaler()
elif scaler_type == "minmax":
scaler = MinMaxScaler()
elif scaler_type == "robust":
scaler = RobustScaler()
else:
raise ValueError(f"Unsupported scaler_type: {scaler_type}")
# Get original statistics
original_stats = {}
for col in columns:
col_data = df[col].to_numpy()
original_stats[col] = {
'mean': float(np.mean(col_data)),
'std': float(np.std(col_data)),
'min': float(np.min(col_data)),
'max': float(np.max(col_data)),
'median': float(np.median(col_data))
}
# Fit and transform
print(f"📏 Applying {scaler_type} scaling to {len(columns)} columns...")
scaled_data = scaler.fit_transform(df[columns].to_numpy())
# Create scaled dataframe
df_scaled = df.clone()
for i, col in enumerate(columns):
df_scaled = df_scaled.with_columns(
pl.Series(col, scaled_data[:, i])
)
# Get new statistics
new_stats = {}
for i, col in enumerate(columns):
new_stats[col] = {
'mean': float(np.mean(scaled_data[:, i])),
'std': float(np.std(scaled_data[:, i])),
'min': float(np.min(scaled_data[:, i])),
'max': float(np.max(scaled_data[:, i])),
'median': float(np.median(scaled_data[:, i]))
}
# Save scaled data
if output_path:
save_dataframe(df_scaled, output_path)
print(f"💾 Scaled dataset saved to: {output_path}")
# Save scaler
if scaler_save_path:
os.makedirs(os.path.dirname(scaler_save_path), exist_ok=True)
joblib.dump(scaler, scaler_save_path)
print(f"💾 Scaler saved to: {scaler_save_path}")
return {
'status': 'success',
'scaler_type': scaler_type,
'columns_scaled': columns,
'n_columns': len(columns),
'original_stats': original_stats,
'scaled_stats': new_stats,
'output_path': output_path,
'scaler_path': scaler_save_path
}
def split_data_strategically(
file_path: str,
target_col: Optional[str] = None,
split_type: str = "train_test",
test_size: float = 0.2,
val_size: float = 0.1,
stratify: bool = True,
time_col: Optional[str] = None,
group_col: Optional[str] = None,
random_state: int = 42,
output_dir: Optional[str] = None
) -> Dict[str, Any]:
"""
Perform strategic data splitting with multiple options.
Args:
file_path: Path to dataset
target_col: Target column (for stratification)
split_type: Split strategy:
- 'train_test': Train/test split
- 'train_val_test': Train/validation/test split
- 'time_based': Time-based split (requires time_col)
- 'group_based': Group-based split (requires group_col, prevents leakage)
test_size: Test set proportion
val_size: Validation set proportion (for train_val_test)
stratify: Whether to stratify by target
time_col: Column to use for time-based splitting
group_col: Column to use for group-based splitting
random_state: Random seed
output_dir: Directory to save split datasets
Returns:
Dictionary with split information and file paths
"""
# Validation
validate_file_exists(file_path)
validate_file_format(file_path)
# Load data
df = load_dataframe(file_path)
validate_dataframe(df)
if target_col:
validate_column_exists(df, target_col)
n_samples = len(df)
# Time-based split
if split_type == "time_based":
if not time_col:
raise ValueError("time_col is required for time_based split")
validate_column_exists(df, time_col)
# Sort by time
df = df.sort(time_col)
# Calculate split points
test_idx = int(n_samples * (1 - test_size))
if output_dir:
train_df = df[:test_idx]
test_df = df[test_idx:]
os.makedirs(output_dir, exist_ok=True)
train_path = os.path.join(output_dir, "train.csv")
test_path = os.path.join(output_dir, "test.csv")
save_dataframe(train_df, train_path)
save_dataframe(test_df, test_path)
print(f"✅ Time-based split: train={len(train_df)}, test={len(test_df)}")
return {
'status': 'success',
'split_type': 'time_based',
'train_size': len(train_df),
'test_size': len(test_df),
'train_path': train_path,
'test_path': test_path,
'time_column': time_col
}
# Group-based split
elif split_type == "group_based":
if not group_col:
raise ValueError("group_col is required for group_based split")
validate_column_exists(df, group_col)
# Get unique groups
unique_groups = df[group_col].unique().to_list()
n_groups = len(unique_groups)
# Split groups
np.random.seed(random_state)
np.random.shuffle(unique_groups)
test_n_groups = max(1, int(n_groups * test_size))
test_groups = unique_groups[:test_n_groups]
train_groups = unique_groups[test_n_groups:]
train_df = df.filter(pl.col(group_col).is_in(train_groups))
test_df = df.filter(pl.col(group_col).is_in(test_groups))
if output_dir:
os.makedirs(output_dir, exist_ok=True)
train_path = os.path.join(output_dir, "train.csv")
test_path = os.path.join(output_dir, "test.csv")
save_dataframe(train_df, train_path)
save_dataframe(test_df, test_path)
print(f"✅ Group-based split: train={len(train_df)}, test={len(test_df)}")
return {
'status': 'success',
'split_type': 'group_based',
'train_size': len(train_df),
'test_size': len(test_df),
'train_groups': len(train_groups),
'test_groups': len(test_groups),
'train_path': train_path,
'test_path': test_path,
'group_column': group_col
}
# Standard train/test split
elif split_type == "train_test":
X, y = split_features_target(df, target_col) if target_col else (df.to_numpy(), None)
stratify_y = y if (stratify and target_col and len(np.unique(y)) < 20) else None
if target_col:
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=random_state, stratify=stratify_y
)
# Reconstruct dataframes
feature_cols = [col for col in df.columns if col != target_col]
train_data = {col: X_train[:, i] for i, col in enumerate(feature_cols)}
train_data[target_col] = y_train
train_df = pl.DataFrame(train_data)
test_data = {col: X_test[:, i] for i, col in enumerate(feature_cols)}
test_data[target_col] = y_test
test_df = pl.DataFrame(test_data)
else:
indices = np.arange(len(df))
train_idx, test_idx = train_test_split(
indices, test_size=test_size, random_state=random_state
)
train_df = df[train_idx]
test_df = df[test_idx]
if output_dir:
os.makedirs(output_dir, exist_ok=True)
train_path = os.path.join(output_dir, "train.csv")
test_path = os.path.join(output_dir, "test.csv")
save_dataframe(train_df, train_path)
save_dataframe(test_df, test_path)
print(f"✅ Train/test split: train={len(train_df)}, test={len(test_df)}")
return {
'status': 'success',
'split_type': 'train_test',
'train_size': len(train_df),
'test_size': len(test_df),
'stratified': bool(stratify_y is not None),
'train_path': train_path,
'test_path': test_path
}
# Train/val/test split
elif split_type == "train_val_test":
X, y = split_features_target(df, target_col) if target_col else (df.to_numpy(), None)
stratify_y = y if (stratify and target_col and len(np.unique(y)) < 20) else None
# First split: train+val vs test
if target_col:
X_temp, X_test, y_temp, y_test = train_test_split(
X, y, test_size=test_size, random_state=random_state, stratify=stratify_y
)
# Second split: train vs val
val_ratio = val_size / (1 - test_size)
stratify_temp = y_temp if stratify_y is not None else None
X_train, X_val, y_train, y_val = train_test_split(
X_temp, y_temp, test_size=val_ratio, random_state=random_state, stratify=stratify_temp
)
# Reconstruct dataframes
feature_cols = [col for col in df.columns if col != target_col]
train_data = {col: X_train[:, i] for i, col in enumerate(feature_cols)}
train_data[target_col] = y_train
train_df = pl.DataFrame(train_data)
val_data = {col: X_val[:, i] for i, col in enumerate(feature_cols)}
val_data[target_col] = y_val
val_df = pl.DataFrame(val_data)
test_data = {col: X_test[:, i] for i, col in enumerate(feature_cols)}
test_data[target_col] = y_test
test_df = pl.DataFrame(test_data)
else:
indices = np.arange(len(df))
temp_idx, test_idx = train_test_split(
indices, test_size=test_size, random_state=random_state
)
val_ratio = val_size / (1 - test_size)
train_idx, val_idx = train_test_split(
temp_idx, test_size=val_ratio, random_state=random_state
)
train_df = df[train_idx]
val_df = df[val_idx]
test_df = df[test_idx]
if output_dir:
os.makedirs(output_dir, exist_ok=True)
train_path = os.path.join(output_dir, "train.csv")
val_path = os.path.join(output_dir, "val.csv")
test_path = os.path.join(output_dir, "test.csv")
save_dataframe(train_df, train_path)
save_dataframe(val_df, val_path)
save_dataframe(test_df, test_path)
print(f"✅ Train/val/test split: train={len(train_df)}, val={len(val_df)}, test={len(test_df)}")
return {
'status': 'success',
'split_type': 'train_val_test',
'train_size': len(train_df),
'val_size': len(val_df),
'test_size': len(test_df),
'stratified': bool(stratify_y is not None),
'train_path': train_path,
'val_path': val_path,
'test_path': test_path
}
else:
raise ValueError(f"Unsupported split_type: {split_type}")