""" Automated ML Pipeline Zero-configuration automatic data processing: Clean โ†’ Encode โ†’ Engineer โ†’ Select """ import polars as pl import numpy as np import pandas as pd from typing import Dict, Any, List, Optional, Tuple from pathlib import Path import sys import os from sklearn.feature_selection import SelectKBest, f_classif, f_regression, mutual_info_classif from sklearn.preprocessing import StandardScaler # Add parent directory to path sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from ..utils.polars_helpers import load_dataframe, get_numeric_columns from ..utils.validation import validate_file_exists from .data_cleaning import clean_missing_values, handle_outliers from .data_type_conversion import force_numeric_conversion, smart_type_inference from .feature_engineering import encode_categorical, create_time_features from .advanced_feature_engineering import create_interaction_features def auto_ml_pipeline(file_path: str, target_col: str, task_type: str = "auto", output_path: Optional[str] = None, feature_engineering_level: str = "basic") -> Dict[str, Any]: """ Fully automated ML pipeline with zero manual intervention. Pipeline stages: 1. Auto-detect column types 2. Clean missing values intelligently 3. Handle outliers 4. Encode categorical variables 5. Engineer time features (if datetime detected) 6. Create interaction features (if requested) 7. Select best features Args: file_path: Path to input dataset target_col: Target column name task_type: 'classification', 'regression', or 'auto' output_path: Where to save processed data feature_engineering_level: 'basic', 'intermediate', 'advanced' Returns: Dictionary with pipeline results and explanations """ validate_file_exists(file_path) if output_path is None: output_path = "./outputs/data/auto_pipeline_output.csv" Path(output_path).parent.mkdir(parents=True, exist_ok=True) results = { "stages_completed": [], "transformations_applied": [], "warnings": [], "final_features": [], "output_path": output_path } # Load data df = load_dataframe(file_path) original_shape = df.shape results["original_shape"] = {"rows": original_shape[0], "columns": original_shape[1]} print(f"๐Ÿš€ Starting Auto ML Pipeline") print(f"๐Ÿ“Š Original shape: {original_shape[0]:,} rows ร— {original_shape[1]} columns") # STAGE 1: Auto-detect column types print("\n๐Ÿ” Stage 1: Auto-detecting column types...") type_detection = smart_type_inference(file_path, output_path="./outputs/data/stage1_types.csv") results["stages_completed"].append("type_detection") results["transformations_applied"].append({ "stage": "Type Detection", "description": f"Detected {len(type_detection.get('conversions_made', []))} type conversions" }) current_file = "./outputs/data/stage1_types.csv" # STAGE 2: Clean missing values print("\n๐Ÿงน Stage 2: Cleaning missing values...") cleaning_result = clean_missing_values( current_file, strategy="auto", output_path="./outputs/data/stage2_cleaned.csv" ) results["stages_completed"].append("missing_value_cleaning") results["transformations_applied"].append({ "stage": "Missing Value Cleaning", "description": f"Cleaned {cleaning_result.get('total_nulls_before', 0)} missing values using auto-detected strategies" }) current_file = "./outputs/data/stage2_cleaned.csv" # STAGE 3: Handle outliers print("\n๐Ÿ“Š Stage 3: Handling outliers...") outlier_result = handle_outliers( current_file, columns=["all"], method="clip", output_path="./outputs/data/stage3_no_outliers.csv" ) results["stages_completed"].append("outlier_handling") results["transformations_applied"].append({ "stage": "Outlier Handling", "description": f"Clipped outliers in {outlier_result.get('columns_processed', 0)} columns" }) current_file = "./outputs/data/stage3_no_outliers.csv" # STAGE 4: Force numeric conversion (for any remaining string numbers) print("\n๐Ÿ”ข Stage 4: Converting to numeric...") numeric_result = force_numeric_conversion( current_file, columns=["all"], errors="coerce", output_path="./outputs/data/stage4_numeric.csv" ) results["stages_completed"].append("numeric_conversion") current_file = "./outputs/data/stage4_numeric.csv" # STAGE 5: Encode categorical variables print("\n๐Ÿท๏ธ Stage 5: Encoding categorical variables...") encoding_result = encode_categorical( current_file, method="auto", output_path="./outputs/data/stage5_encoded.csv" ) results["stages_completed"].append("categorical_encoding") results["transformations_applied"].append({ "stage": "Categorical Encoding", "description": f"Encoded {len(encoding_result.get('encoded_columns', []))} categorical columns" }) current_file = "./outputs/data/stage5_encoded.csv" # STAGE 6: Feature engineering (if requested) if feature_engineering_level in ["intermediate", "advanced"]: print("\nโš™๏ธ Stage 6: Engineering features...") # Check for datetime columns and create time features df_current = load_dataframe(current_file).to_pandas() datetime_cols = df_current.select_dtypes(include=['datetime64']).columns.tolist() if datetime_cols: print(f" Creating time features from {len(datetime_cols)} datetime columns...") for dt_col in datetime_cols: try: time_result = create_time_features( current_file, date_column=dt_col, output_path=current_file # Overwrite ) results["transformations_applied"].append({ "stage": "Time Feature Engineering", "description": f"Created time features from {dt_col}" }) except Exception as e: results["warnings"].append(f"Could not create time features from {dt_col}: {str(e)}") # Create interaction features for advanced mode if feature_engineering_level == "advanced": print(" Creating interaction features...") try: interaction_result = create_interaction_features( current_file, method="polynomial", degree=2, max_features=10, output_path="./outputs/data/stage6_engineered.csv" ) results["stages_completed"].append("interaction_features") results["transformations_applied"].append({ "stage": "Interaction Features", "description": f"Created {len(interaction_result.get('new_features', []))} interaction features" }) current_file = "./outputs/data/stage6_engineered.csv" except Exception as e: results["warnings"].append(f"Could not create interaction features: {str(e)}") # STAGE 7: Feature selection print("\n๐ŸŽฏ Stage 7: Selecting best features...") try: selection_result = auto_feature_selection( current_file, target_col=target_col, task_type=task_type, max_features=50, output_path=output_path ) results["stages_completed"].append("feature_selection") results["transformations_applied"].append({ "stage": "Feature Selection", "description": f"Selected {selection_result['n_features_selected']} best features from {selection_result['n_features_original']}" }) results["selected_features"] = selection_result["selected_features"] results["feature_importance"] = selection_result.get("feature_scores", {}) except Exception as e: results["warnings"].append(f"Feature selection failed: {str(e)}") # Just copy the file import shutil shutil.copy(current_file, output_path) # Final shape df_final = load_dataframe(output_path) final_shape = df_final.shape results["final_shape"] = {"rows": final_shape[0], "columns": final_shape[1]} results["final_features"] = df_final.columns print(f"\nโœ… Pipeline completed!") print(f"๐Ÿ“Š Final shape: {final_shape[0]:,} rows ร— {final_shape[1]} columns") print(f"๐Ÿ’พ Saved to: {output_path}") # Generate summary results["summary"] = _generate_pipeline_summary(results) return results def auto_feature_selection(file_path: str, target_col: str, task_type: str = "auto", max_features: int = 50, method: str = "auto", output_path: Optional[str] = None) -> Dict[str, Any]: """ Automatically select the best features for modeling. Args: file_path: Path to dataset target_col: Target column task_type: 'classification', 'regression', or 'auto' max_features: Maximum number of features to keep method: 'mutual_info', 'f_test', or 'auto' output_path: Where to save selected features Returns: Dictionary with selection results """ validate_file_exists(file_path) df = load_dataframe(file_path).to_pandas() if target_col not in df.columns: return {"status": "error", "message": f"Target column '{target_col}' not found"} # Separate features and target X = df.drop(columns=[target_col]) y = df[target_col] # Get only numeric features numeric_features = X.select_dtypes(include=[np.number]).columns.tolist() X_numeric = X[numeric_features] if len(numeric_features) == 0: return {"status": "error", "message": "No numeric features found"} # Auto-detect task type if task_type == "auto": if y.dtype == 'object' or y.nunique() < 20: task_type = "classification" else: task_type = "regression" # Select method if method == "auto": method = "mutual_info" if task_type == "classification" else "f_test" # Perform selection n_features_to_select = min(max_features, len(numeric_features)) if method == "mutual_info": if task_type == "classification": selector = SelectKBest(mutual_info_classif, k=n_features_to_select) else: from sklearn.feature_selection import mutual_info_regression selector = SelectKBest(mutual_info_regression, k=n_features_to_select) else: # f_test if task_type == "classification": selector = SelectKBest(f_classif, k=n_features_to_select) else: selector = SelectKBest(f_regression, k=n_features_to_select) # Fit selector X_selected = selector.fit_transform(X_numeric.fillna(0), y) # Get selected feature names selected_mask = selector.get_support() selected_features = np.array(numeric_features)[selected_mask].tolist() # Get feature scores feature_scores = dict(zip(numeric_features, selector.scores_)) sorted_features = sorted(feature_scores.items(), key=lambda x: x[1], reverse=True) results = { "n_features_original": len(numeric_features), "n_features_selected": len(selected_features), "selected_features": selected_features, "feature_scores": dict(sorted_features[:n_features_to_select]), "selection_method": method, "task_type": task_type } # Save selected features + target if output_path: df_selected = df[selected_features + [target_col]] df_selected.to_csv(output_path, index=False) results["output_path"] = output_path return results def _generate_pipeline_summary(results: Dict[str, Any]) -> str: """Generate human-readable summary of pipeline execution.""" summary = [] summary.append("๐Ÿ”„ **Auto ML Pipeline Summary**\n") summary.append(f"Original shape: {results['original_shape']['rows']:,} rows ร— {results['original_shape']['columns']} columns") summary.append(f"Final shape: {results['final_shape']['rows']:,} rows ร— {results['final_shape']['columns']} columns\n") summary.append("**Stages Completed:**") for i, stage in enumerate(results['stages_completed'], 1): summary.append(f"{i}. {stage.replace('_', ' ').title()}") summary.append("\n**Transformations Applied:**") for transform in results['transformations_applied']: summary.append(f"โ€ข {transform['stage']}: {transform['description']}") if results.get('warnings'): summary.append("\nโš ๏ธ **Warnings:**") for warning in results['warnings']: summary.append(f"โ€ข {warning}") if results.get('selected_features'): summary.append(f"\n๐ŸŽฏ **Selected {len(results['selected_features'])} best features**") summary.append(f"\n๐Ÿ’พ Output saved to: {results['output_path']}") return "\n".join(summary) def explain_pipeline_decision(stage: str, decision: str, reason: str) -> Dict[str, str]: """ Explain a pipeline decision in human-readable format. Args: stage: Pipeline stage name decision: What decision was made reason: Why this decision was made Returns: Dictionary with explanation """ return { "stage": stage, "decision": decision, "reason": reason, "explanation": f"In the {stage} stage, I decided to {decision} because {reason}" }