Data-Science-Agent / src /tools /auto_pipeline.py
Pulastya B
fix: Fix module import paths for Render deployment
227cb22
"""
Automated ML Pipeline
Zero-configuration automatic data processing: Clean → Encode → Engineer → Select
"""
import polars as pl
import numpy as np
import pandas as pd
from typing import Dict, Any, List, Optional, Tuple
from pathlib import Path
import sys
import os
from sklearn.feature_selection import SelectKBest, f_classif, f_regression, mutual_info_classif
from sklearn.preprocessing import StandardScaler
# Add parent directory to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from ..utils.polars_helpers import load_dataframe, get_numeric_columns
from ..utils.validation import validate_file_exists
from .data_cleaning import clean_missing_values, handle_outliers
from .data_type_conversion import force_numeric_conversion, smart_type_inference
from .feature_engineering import encode_categorical, create_time_features
from .advanced_feature_engineering import create_interaction_features
def auto_ml_pipeline(file_path: str,
target_col: str,
task_type: str = "auto",
output_path: Optional[str] = None,
feature_engineering_level: str = "basic") -> Dict[str, Any]:
"""
Fully automated ML pipeline with zero manual intervention.
Pipeline stages:
1. Auto-detect column types
2. Clean missing values intelligently
3. Handle outliers
4. Encode categorical variables
5. Engineer time features (if datetime detected)
6. Create interaction features (if requested)
7. Select best features
Args:
file_path: Path to input dataset
target_col: Target column name
task_type: 'classification', 'regression', or 'auto'
output_path: Where to save processed data
feature_engineering_level: 'basic', 'intermediate', 'advanced'
Returns:
Dictionary with pipeline results and explanations
"""
validate_file_exists(file_path)
if output_path is None:
output_path = "./outputs/data/auto_pipeline_output.csv"
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
results = {
"stages_completed": [],
"transformations_applied": [],
"warnings": [],
"final_features": [],
"output_path": output_path
}
# Load data
df = load_dataframe(file_path)
original_shape = df.shape
results["original_shape"] = {"rows": original_shape[0], "columns": original_shape[1]}
print(f"🚀 Starting Auto ML Pipeline")
print(f"📊 Original shape: {original_shape[0]:,} rows × {original_shape[1]} columns")
# STAGE 1: Auto-detect column types
print("\n🔍 Stage 1: Auto-detecting column types...")
type_detection = smart_type_inference(file_path, output_path="./outputs/data/stage1_types.csv")
results["stages_completed"].append("type_detection")
results["transformations_applied"].append({
"stage": "Type Detection",
"description": f"Detected {len(type_detection.get('conversions_made', []))} type conversions"
})
current_file = "./outputs/data/stage1_types.csv"
# STAGE 2: Clean missing values
print("\n🧹 Stage 2: Cleaning missing values...")
cleaning_result = clean_missing_values(
current_file,
strategy="auto",
output_path="./outputs/data/stage2_cleaned.csv"
)
results["stages_completed"].append("missing_value_cleaning")
results["transformations_applied"].append({
"stage": "Missing Value Cleaning",
"description": f"Cleaned {cleaning_result.get('total_nulls_before', 0)} missing values using auto-detected strategies"
})
current_file = "./outputs/data/stage2_cleaned.csv"
# STAGE 3: Handle outliers
print("\n📊 Stage 3: Handling outliers...")
outlier_result = handle_outliers(
current_file,
columns=["all"],
method="clip",
output_path="./outputs/data/stage3_no_outliers.csv"
)
results["stages_completed"].append("outlier_handling")
results["transformations_applied"].append({
"stage": "Outlier Handling",
"description": f"Clipped outliers in {outlier_result.get('columns_processed', 0)} columns"
})
current_file = "./outputs/data/stage3_no_outliers.csv"
# STAGE 4: Force numeric conversion (for any remaining string numbers)
print("\n🔢 Stage 4: Converting to numeric...")
numeric_result = force_numeric_conversion(
current_file,
columns=["all"],
errors="coerce",
output_path="./outputs/data/stage4_numeric.csv"
)
results["stages_completed"].append("numeric_conversion")
current_file = "./outputs/data/stage4_numeric.csv"
# STAGE 5: Encode categorical variables
print("\n🏷️ Stage 5: Encoding categorical variables...")
encoding_result = encode_categorical(
current_file,
method="auto",
output_path="./outputs/data/stage5_encoded.csv"
)
results["stages_completed"].append("categorical_encoding")
results["transformations_applied"].append({
"stage": "Categorical Encoding",
"description": f"Encoded {len(encoding_result.get('encoded_columns', []))} categorical columns"
})
current_file = "./outputs/data/stage5_encoded.csv"
# STAGE 6: Feature engineering (if requested)
if feature_engineering_level in ["intermediate", "advanced"]:
print("\n⚙️ Stage 6: Engineering features...")
# Check for datetime columns and create time features
df_current = load_dataframe(current_file).to_pandas()
datetime_cols = df_current.select_dtypes(include=['datetime64']).columns.tolist()
if datetime_cols:
print(f" Creating time features from {len(datetime_cols)} datetime columns...")
for dt_col in datetime_cols:
try:
time_result = create_time_features(
current_file,
date_column=dt_col,
output_path=current_file # Overwrite
)
results["transformations_applied"].append({
"stage": "Time Feature Engineering",
"description": f"Created time features from {dt_col}"
})
except Exception as e:
results["warnings"].append(f"Could not create time features from {dt_col}: {str(e)}")
# Create interaction features for advanced mode
if feature_engineering_level == "advanced":
print(" Creating interaction features...")
try:
interaction_result = create_interaction_features(
current_file,
method="polynomial",
degree=2,
max_features=10,
output_path="./outputs/data/stage6_engineered.csv"
)
results["stages_completed"].append("interaction_features")
results["transformations_applied"].append({
"stage": "Interaction Features",
"description": f"Created {len(interaction_result.get('new_features', []))} interaction features"
})
current_file = "./outputs/data/stage6_engineered.csv"
except Exception as e:
results["warnings"].append(f"Could not create interaction features: {str(e)}")
# STAGE 7: Feature selection
print("\n🎯 Stage 7: Selecting best features...")
try:
selection_result = auto_feature_selection(
current_file,
target_col=target_col,
task_type=task_type,
max_features=50,
output_path=output_path
)
results["stages_completed"].append("feature_selection")
results["transformations_applied"].append({
"stage": "Feature Selection",
"description": f"Selected {selection_result['n_features_selected']} best features from {selection_result['n_features_original']}"
})
results["selected_features"] = selection_result["selected_features"]
results["feature_importance"] = selection_result.get("feature_scores", {})
except Exception as e:
results["warnings"].append(f"Feature selection failed: {str(e)}")
# Just copy the file
import shutil
shutil.copy(current_file, output_path)
# Final shape
df_final = load_dataframe(output_path)
final_shape = df_final.shape
results["final_shape"] = {"rows": final_shape[0], "columns": final_shape[1]}
results["final_features"] = df_final.columns
print(f"\n✅ Pipeline completed!")
print(f"📊 Final shape: {final_shape[0]:,} rows × {final_shape[1]} columns")
print(f"💾 Saved to: {output_path}")
# Generate summary
results["summary"] = _generate_pipeline_summary(results)
return results
def auto_feature_selection(file_path: str,
target_col: str,
task_type: str = "auto",
max_features: int = 50,
method: str = "auto",
output_path: Optional[str] = None) -> Dict[str, Any]:
"""
Automatically select the best features for modeling.
Args:
file_path: Path to dataset
target_col: Target column
task_type: 'classification', 'regression', or 'auto'
max_features: Maximum number of features to keep
method: 'mutual_info', 'f_test', or 'auto'
output_path: Where to save selected features
Returns:
Dictionary with selection results
"""
validate_file_exists(file_path)
df = load_dataframe(file_path).to_pandas()
if target_col not in df.columns:
return {"status": "error", "message": f"Target column '{target_col}' not found"}
# Separate features and target
X = df.drop(columns=[target_col])
y = df[target_col]
# Get only numeric features
numeric_features = X.select_dtypes(include=[np.number]).columns.tolist()
X_numeric = X[numeric_features]
if len(numeric_features) == 0:
return {"status": "error", "message": "No numeric features found"}
# Auto-detect task type
if task_type == "auto":
if y.dtype == 'object' or y.nunique() < 20:
task_type = "classification"
else:
task_type = "regression"
# Select method
if method == "auto":
method = "mutual_info" if task_type == "classification" else "f_test"
# Perform selection
n_features_to_select = min(max_features, len(numeric_features))
if method == "mutual_info":
if task_type == "classification":
selector = SelectKBest(mutual_info_classif, k=n_features_to_select)
else:
from sklearn.feature_selection import mutual_info_regression
selector = SelectKBest(mutual_info_regression, k=n_features_to_select)
else: # f_test
if task_type == "classification":
selector = SelectKBest(f_classif, k=n_features_to_select)
else:
selector = SelectKBest(f_regression, k=n_features_to_select)
# Fit selector
X_selected = selector.fit_transform(X_numeric.fillna(0), y)
# Get selected feature names
selected_mask = selector.get_support()
selected_features = np.array(numeric_features)[selected_mask].tolist()
# Get feature scores
feature_scores = dict(zip(numeric_features, selector.scores_))
sorted_features = sorted(feature_scores.items(), key=lambda x: x[1], reverse=True)
results = {
"n_features_original": len(numeric_features),
"n_features_selected": len(selected_features),
"selected_features": selected_features,
"feature_scores": dict(sorted_features[:n_features_to_select]),
"selection_method": method,
"task_type": task_type
}
# Save selected features + target
if output_path:
df_selected = df[selected_features + [target_col]]
df_selected.to_csv(output_path, index=False)
results["output_path"] = output_path
return results
def _generate_pipeline_summary(results: Dict[str, Any]) -> str:
"""Generate human-readable summary of pipeline execution."""
summary = []
summary.append("🔄 **Auto ML Pipeline Summary**\n")
summary.append(f"Original shape: {results['original_shape']['rows']:,} rows × {results['original_shape']['columns']} columns")
summary.append(f"Final shape: {results['final_shape']['rows']:,} rows × {results['final_shape']['columns']} columns\n")
summary.append("**Stages Completed:**")
for i, stage in enumerate(results['stages_completed'], 1):
summary.append(f"{i}. {stage.replace('_', ' ').title()}")
summary.append("\n**Transformations Applied:**")
for transform in results['transformations_applied']:
summary.append(f"• {transform['stage']}: {transform['description']}")
if results.get('warnings'):
summary.append("\n⚠️ **Warnings:**")
for warning in results['warnings']:
summary.append(f"• {warning}")
if results.get('selected_features'):
summary.append(f"\n🎯 **Selected {len(results['selected_features'])} best features**")
summary.append(f"\n💾 Output saved to: {results['output_path']}")
return "\n".join(summary)
def explain_pipeline_decision(stage: str, decision: str, reason: str) -> Dict[str, str]:
"""
Explain a pipeline decision in human-readable format.
Args:
stage: Pipeline stage name
decision: What decision was made
reason: Why this decision was made
Returns:
Dictionary with explanation
"""
return {
"stage": stage,
"decision": decision,
"reason": reason,
"explanation": f"In the {stage} stage, I decided to {decision} because {reason}"
}