""" Data Cleaning Tools Tools for handling missing values, outliers, and data type issues. """ import polars as pl import numpy as np from typing import Dict, Any, List, Optional from pathlib import Path import sys import os # Add parent directory to path for imports sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from ..utils.polars_helpers import ( load_dataframe, save_dataframe, get_numeric_columns, get_categorical_columns, get_datetime_columns, detect_id_columns, ) from ..utils.validation import ( validate_file_exists, validate_file_format, validate_dataframe, validate_columns_exist, ) def clean_missing_values(file_path: str, strategy, output_path: str, threshold: float = 0.4) -> Dict[str, Any]: """ Handle missing values using appropriate strategies with smart threshold-based column dropping. Args: file_path: Path to CSV or Parquet file strategy: Either "auto" (string) to automatically decide strategies for all columns, or a dictionary mapping column names to strategies ('median', 'mean', 'mode', 'forward_fill', 'drop') output_path: Path to save cleaned dataset threshold: For "auto" strategy, drop columns with missing % > threshold (default: 0.4 = 40%) Returns: Dictionary with cleaning report Auto Strategy Behavior: 1. Drop columns with >threshold missing (default 40%) 2. Impute numeric columns with median 3. Impute categorical columns with mode 4. Forward-fill for time series columns """ # Validation validate_file_exists(file_path) validate_file_format(file_path) # Load data df = load_dataframe(file_path) validate_dataframe(df) # Get column type information numeric_cols = get_numeric_columns(df) categorical_cols = get_categorical_columns(df) datetime_cols = get_datetime_columns(df) id_cols = detect_id_columns(df) report = { "original_rows": len(df), "original_columns": len(df.columns), "columns_dropped": [], "columns_processed": {}, "rows_dropped": 0, "threshold_used": threshold } # Handle "auto" mode - Smart threshold-based cleaning if isinstance(strategy, str) and strategy == "auto": # Step 1: Identify and drop high-missing columns (>threshold) cols_to_drop = [] for col in df.columns: null_count = df[col].null_count() null_pct = null_count / len(df) if len(df) > 0 else 0 if null_pct > threshold: cols_to_drop.append(col) report["columns_dropped"].append({ "column": col, "missing_percentage": round(null_pct * 100, 2), "reason": f"Missing >{threshold*100}% of values" }) # Drop high-missing columns if cols_to_drop: df = df.drop(cols_to_drop) print(f"🗑️ Dropped {len(cols_to_drop)} columns with >{threshold*100}% missing:") for col_info in report["columns_dropped"]: print(f" - {col_info['column']} ({col_info['missing_percentage']}% missing)") # Step 2: Build strategy for remaining columns strategy = {} for col in df.columns: if df[col].null_count() > 0: if col in id_cols: strategy[col] = "drop" # Drop rows with missing IDs elif col in datetime_cols: strategy[col] = "forward_fill" # Forward fill for time series elif col in numeric_cols: strategy[col] = "median" # Median for numeric (robust to outliers) elif col in categorical_cols: strategy[col] = "mode" # Mode for categorical else: strategy[col] = "mode" # Default to mode print(f"🔧 Auto-detected strategies for {len(strategy)} remaining columns with missing values") # Process each column based on strategy for col, strat in strategy.items(): if col not in df.columns: report["columns_processed"][col] = { "status": "error", "message": f"Column not found (may have been dropped)" } continue null_count_before = 
        null_count_before = df[col].null_count()
        if null_count_before == 0:
            report["columns_processed"][col] = {
                "status": "skipped",
                "message": "No missing values",
            }
            continue

        # Don't impute ID columns - drop rows instead
        if col in id_cols and strat != "drop":
            report["columns_processed"][col] = {
                "status": "skipped",
                "message": "ID column - not imputed (use 'drop' to remove rows)",
            }
            continue

        # Apply strategy
        try:
            rows_before = len(df)

            if strat == "median":
                if col in numeric_cols:
                    median_val = df[col].median()
                    df = df.with_columns(pl.col(col).fill_null(median_val).alias(col))
                    report["columns_processed"][col] = {
                        "status": "success",
                        "strategy": "median",
                        "nulls_before": int(null_count_before),
                        "nulls_after": int(df[col].null_count()),
                        "fill_value": float(median_val),
                    }
                else:
                    report["columns_processed"][col] = {
                        "status": "error",
                        "message": "Cannot use median on non-numeric column",
                    }
                    continue

            elif strat == "mean":
                if col in numeric_cols:
                    mean_val = df[col].mean()
                    df = df.with_columns(pl.col(col).fill_null(mean_val).alias(col))
                    report["columns_processed"][col] = {
                        "status": "success",
                        "strategy": "mean",
                        "nulls_before": int(null_count_before),
                        "nulls_after": int(df[col].null_count()),
                        "fill_value": float(mean_val),
                    }
                else:
                    report["columns_processed"][col] = {
                        "status": "error",
                        "message": "Cannot use mean on non-numeric column",
                    }
                    continue

            elif strat == "mode":
                # mode() may return several values; take the first, if any
                mode_vals = df[col].drop_nulls().mode()
                mode_val = mode_vals[0] if len(mode_vals) > 0 else None
                if mode_val is not None:
                    df = df.with_columns(pl.col(col).fill_null(mode_val).alias(col))
                    report["columns_processed"][col] = {
                        "status": "success",
                        "strategy": "mode",
                        "nulls_before": int(null_count_before),
                        "nulls_after": int(df[col].null_count()),
                        "fill_value": str(mode_val),
                    }
                else:
                    report["columns_processed"][col] = {
                        "status": "skipped",
                        "message": "No non-null values to compute a mode from",
                    }

            elif strat == "forward_fill":
                df = df.with_columns(pl.col(col).forward_fill().alias(col))
                report["columns_processed"][col] = {
                    "status": "success",
                    "strategy": "forward_fill",
                    "nulls_before": int(null_count_before),
                    "nulls_after": int(df[col].null_count()),
                }

            elif strat == "drop":
                df = df.filter(pl.col(col).is_not_null())
                rows_after = len(df)
                report["columns_processed"][col] = {
                    "status": "success",
                    "strategy": "drop",
                    "nulls_before": int(null_count_before),
                    "rows_dropped": rows_before - rows_after,
                }

            else:
                report["columns_processed"][col] = {
                    "status": "error",
                    "message": f"Unknown strategy: {strat}",
                }
                continue

        except Exception as e:
            report["columns_processed"][col] = {
                "status": "error",
                "message": str(e),
            }

    report["final_rows"] = len(df)
    report["final_columns"] = len(df.columns)
    report["rows_dropped"] = report["original_rows"] - report["final_rows"]
    report["columns_dropped_count"] = len(report["columns_dropped"])

    # Save cleaned dataset
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    save_dataframe(df, output_path)
    report["output_path"] = output_path

    # Summary message
    n_success = len([c for c in report["columns_processed"].values() if c["status"] == "success"])
    report["message"] = (
        f"Cleaned {report['original_rows']} rows → {report['final_rows']} rows. "
        f"Dropped {report['columns_dropped_count']} columns. "
        f"Processed {n_success} columns."
    )

    return report
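
# Usage sketch (file paths and column names here are hypothetical). With
# strategy="auto", columns that are more than `threshold` null are dropped
# before anything is imputed:
#
#     report = clean_missing_values(
#         file_path="data/raw/customers.csv",
#         strategy="auto",
#         output_path="data/clean/customers.parquet",
#         threshold=0.4,
#     )
#     print(report["message"])
#
# An explicit per-column mapping bypasses the auto heuristics entirely:
#
#     report = clean_missing_values(
#         file_path="data/raw/customers.csv",
#         strategy={"age": "median", "segment": "mode", "customer_id": "drop"},
#         output_path="data/clean/customers.parquet",
#     )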

def handle_outliers(
    file_path: str,
    method: str,
    columns: List[str],
    output_path: str,
) -> Dict[str, Any]:
    """
    Detect and handle outliers in numeric columns.

    Args:
        file_path: Path to CSV or Parquet file
        method: Method to handle outliers ('clip', 'winsorize', 'remove')
        columns: List of columns to check, or ['all'] for all numeric columns
        output_path: Path to save cleaned dataset

    Returns:
        Dictionary with outlier handling report
    """
    # Validation
    validate_file_exists(file_path)
    validate_file_format(file_path)

    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)

    # Determine which columns to process
    numeric_cols = get_numeric_columns(df)
    if columns == ["all"]:
        target_cols = numeric_cols
    else:
        # Keep only existing numeric columns (auto-skip dropped columns)
        target_cols = []
        for col in columns:
            if col not in df.columns:
                print(f"⚠️ Skipping '{col}' - column was dropped in a previous step")
                continue
            if col not in numeric_cols:
                print(f"⚠️ Skipping '{col}' - not numeric")
                continue
            target_cols.append(col)

    # If no valid columns remain, return early
    if not target_cols:
        return {
            "success": False,
            "error": (
                "None of the requested columns exist as numeric columns in the dataset. "
                f"Available numeric columns: {', '.join(numeric_cols[:20])}"
            ),
            "error_type": "ValueError",
        }

    report = {
        "original_rows": len(df),
        "method": method,
        "columns_processed": {},
    }

    # Process each column
    for col in target_cols:
        col_data = df[col].drop_nulls()
        if len(col_data) == 0:
            report["columns_processed"][col] = {
                "status": "skipped",
                "message": "All values are null",
            }
            continue

        # Calculate IQR bounds
        q1 = col_data.quantile(0.25)
        q3 = col_data.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        # Count outliers; treat nulls as non-outliers so 'remove' keeps null rows
        outliers_mask = ((df[col] < lower_bound) | (df[col] > upper_bound)).fill_null(False)
        outlier_count = outliers_mask.sum()

        if outlier_count == 0:
            report["columns_processed"][col] = {
                "status": "skipped",
                "message": "No outliers detected",
            }
            continue

        # Apply method
        if method == "clip":
            # Clip values to the IQR bounds
            df = df.with_columns(pl.col(col).clip(lower_bound, upper_bound).alias(col))
        elif method == "winsorize":
            # Winsorize: cap at 1st and 99th percentiles
            p1 = col_data.quantile(0.01)
            p99 = col_data.quantile(0.99)
            df = df.with_columns(pl.col(col).clip(p1, p99).alias(col))
        elif method == "remove":
            # Remove rows with outliers
            df = df.filter(~outliers_mask)
        else:
            report["columns_processed"][col] = {
                "status": "error",
                "message": f"Unknown method: {method}",
            }
            continue

        report["columns_processed"][col] = {
            "status": "success",
            "outliers_detected": int(outlier_count),
            "bounds": {
                "lower": float(lower_bound),
                "upper": float(upper_bound),
            },
        }

    report["final_rows"] = len(df)
    report["rows_dropped"] = report["original_rows"] - report["final_rows"]

    # Save cleaned dataset
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    save_dataframe(df, output_path)
    report["output_path"] = output_path

    return report
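
# Usage sketch (hypothetical paths and columns). The bounds reported per column
# are the Tukey fences q1 - 1.5*IQR and q3 + 1.5*IQR; 'clip' caps at those
# fences, 'winsorize' caps at the 1st/99th percentiles, 'remove' drops rows:
#
#     report = handle_outliers(
#         file_path="data/clean/customers.parquet",
#         method="clip",
#         columns=["age", "income"],  # or ["all"] for every numeric column
#         output_path="data/clean/customers_no_outliers.parquet",
#     )
#     print(report["columns_processed"])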

def fix_data_types(
    file_path: str,
    type_mapping: Optional[Dict[str, str]] = None,
    output_path: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Auto-detect and fix incorrect data types.

    Args:
        file_path: Path to CSV or Parquet file
        type_mapping: Optional dictionary mapping columns to target types
            ('int', 'float', 'string', 'date', 'bool', 'category').
            Use 'auto' or None for automatic detection
        output_path: Path to save dataset with fixed types (required)

    Returns:
        Dictionary with type fixing report
    """
    # Validation
    validate_file_exists(file_path)
    validate_file_format(file_path)
    if output_path is None:
        raise ValueError("output_path is required")

    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)

    if type_mapping is None or type_mapping == {"auto": "auto"}:
        type_mapping = {}

    report = {
        "columns_processed": {}
    }

    for col in df.columns:
        original_dtype = str(df[col].dtype)

        # Get target type from mapping, or auto-detect
        if col in type_mapping and type_mapping[col] != "auto":
            target_type = type_mapping[col]
        else:
            target_type = _auto_detect_type(df[col])

        if target_type is None:
            report["columns_processed"][col] = {
                "status": "skipped",
                "original_dtype": original_dtype,
                "message": "Could not auto-detect type",
            }
            continue

        # Try to convert; strict=False turns unparseable values into nulls
        try:
            if target_type == "int":
                df = df.with_columns(pl.col(col).cast(pl.Int64, strict=False).alias(col))
            elif target_type == "float":
                df = df.with_columns(pl.col(col).cast(pl.Float64, strict=False).alias(col))
            elif target_type == "string":
                df = df.with_columns(pl.col(col).cast(pl.Utf8).alias(col))
            elif target_type == "date":
                df = df.with_columns(
                    pl.col(col).str.strptime(pl.Date, "%Y-%m-%d", strict=False).alias(col)
                )
            elif target_type == "bool":
                df = df.with_columns(pl.col(col).cast(pl.Boolean, strict=False).alias(col))
            elif target_type == "category":
                df = df.with_columns(pl.col(col).cast(pl.Categorical).alias(col))

            new_dtype = str(df[col].dtype)
            report["columns_processed"][col] = {
                "status": "success",
                "original_dtype": original_dtype,
                "new_dtype": new_dtype,
                "target_type": target_type,
            }
        except Exception as e:
            report["columns_processed"][col] = {
                "status": "error",
                "original_dtype": original_dtype,
                "target_type": target_type,
                "message": str(e),
            }

    # Save dataset
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    save_dataframe(df, output_path)
    report["output_path"] = output_path

    return report


def _auto_detect_type(series: pl.Series) -> Optional[str]:
    """
    Auto-detect an appropriate type for a series.

    Args:
        series: Polars series

    Returns:
        Detected type string, or None if the current type should be kept
    """
    # Already a suitable type
    if series.dtype in pl.NUMERIC_DTYPES:
        return None
    if series.dtype in [pl.Date, pl.Datetime]:
        return None

    # Try to detect from string values
    if series.dtype == pl.Utf8:
        sample = series.drop_nulls().head(100)
        if len(sample) == 0:
            return None

        # Check for boolean-like values
        unique_vals = set(str(v).lower() for v in sample.to_list())
        if unique_vals.issubset({"true", "false", "1", "0", "yes", "no", "t", "f"}):
            return "bool"

        # Check for numeric; treat as integer if no value has a decimal point
        try:
            sample.cast(pl.Float64)
            if all("." not in str(v) for v in sample.to_list() if v is not None):
                return "int"
            return "float"
        except Exception:
            pass

        # Check for ISO dates; strict=False yields nulls instead of raising,
        # so require that every sampled value actually parsed
        try:
            parsed = sample.str.strptime(pl.Date, "%Y-%m-%d", strict=False)
            if parsed.null_count() == 0:
                return "date"
        except Exception:
            pass

        # Low cardinality suggests a categorical column
        n_unique = series.n_unique()
        if n_unique < len(series) * 0.5 and n_unique < 100:
            return "category"

    return None
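
# Usage sketch (hypothetical paths and columns). With type_mapping=None every
# column goes through _auto_detect_type; pinning a column overrides detection
# for that column only, while the rest still auto-detect:
#
#     report = fix_data_types(
#         file_path="data/clean/customers.parquet",
#         type_mapping={"signup_date": "date", "plan": "category"},
#         output_path="data/typed/customers.parquet",
#     )
#     for col, info in report["columns_processed"].items():
#         print(col, info["original_dtype"], "->", info.get("new_dtype", "-"))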