""" Model Training Tools Tools for training machine learning models and generating reports. """ import polars as pl import numpy as np from typing import Dict, Any, List, Optional from pathlib import Path import sys import os import joblib import json import tempfile # Add parent directory to path for imports sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # Import artifact store try: from storage.helpers import save_model_with_store ARTIFACT_STORE_AVAILABLE = True except ImportError: ARTIFACT_STORE_AVAILABLE = False print("⚠️ Artifact store not available, using local paths") from sklearn.model_selection import train_test_split from sklearn.linear_model import LogisticRegression, Ridge, Lasso from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor from xgboost import XGBClassifier, XGBRegressor from lightgbm import LGBMClassifier, LGBMRegressor from catboost import CatBoostClassifier, CatBoostRegressor from sklearn.metrics import ( accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report, mean_squared_error, mean_absolute_error, r2_score ) import shap try: from .visualization_engine import ( generate_model_performance_plots, generate_feature_importance_plot ) VISUALIZATION_AVAILABLE = True except ImportError as e: VISUALIZATION_AVAILABLE = False print(f"⚠️ Visualization engine not available: {e}") from ..utils.polars_helpers import ( load_dataframe, get_numeric_columns, split_features_target, ) from ..utils.validation import ( validate_file_exists, validate_file_format, validate_dataframe, validate_column_exists, validate_target_column, ) def train_baseline_models(file_path: str, target_col: str, task_type: str = "auto", test_size: float = 0.2, random_state: int = 42) -> Dict[str, Any]: """ Train multiple baseline models and compare performance. Args: file_path: Path to prepared dataset target_col: Name of target column task_type: 'classification', 'regression', or 'auto' test_size: Proportion for test split random_state: Random seed Returns: Dictionary with training results and best model """ # Validation validate_file_exists(file_path) validate_file_format(file_path) # Load data df = load_dataframe(file_path) validate_dataframe(df) validate_column_exists(df, target_col) # Infer task type if auto if task_type == "auto": task_type = validate_target_column(df, target_col) # Split features and target X, y = split_features_target(df, target_col) # Convert to numpy for sklearn # Only keep numeric columns for X numeric_cols = get_numeric_columns(X) if len(numeric_cols) == 0: return { "status": "error", "message": "No numeric features found. Please encode categorical variables first." } X_numeric = X.select(numeric_cols) X_np = X_numeric.to_numpy() y_np = y.to_numpy() # Handle missing values (simple imputation with mean) from sklearn.impute import SimpleImputer imputer = SimpleImputer(strategy='mean') X_np = imputer.fit_transform(X_np) # Train-test split X_train, X_test, y_train, y_test = train_test_split( X_np, y_np, test_size=test_size, random_state=random_state ) results = { "task_type": task_type, "n_features": X_np.shape[1], "n_samples": len(X_np), "train_size": len(X_train), "test_size": len(X_test), "feature_names": numeric_cols, "models": {} } # Train models based on task type if task_type == "classification": models = { "logistic_regression": LogisticRegression(max_iter=1000, random_state=random_state), "random_forest": RandomForestClassifier(n_estimators=100, random_state=random_state, n_jobs=-1), "xgboost": XGBClassifier(n_estimators=100, random_state=random_state, n_jobs=-1), "lightgbm": LGBMClassifier(n_estimators=100, random_state=random_state, n_jobs=-1, verbose=-1), "catboost": CatBoostClassifier(iterations=100, random_state=random_state, verbose=0, allow_writing_files=False) } for model_name, model in models.items(): try: # Train model.fit(X_train, y_train) # Predict y_pred_train = model.predict(X_train) y_pred_test = model.predict(X_test) # Metrics results["models"][model_name] = { "train_metrics": { "accuracy": float(accuracy_score(y_train, y_pred_train)), "precision": float(precision_score(y_train, y_pred_train, average='weighted', zero_division=0)), "recall": float(recall_score(y_train, y_pred_train, average='weighted', zero_division=0)), "f1": float(f1_score(y_train, y_pred_train, average='weighted', zero_division=0)) }, "test_metrics": { "accuracy": float(accuracy_score(y_test, y_pred_test)), "precision": float(precision_score(y_test, y_pred_test, average='weighted', zero_division=0)), "recall": float(recall_score(y_test, y_pred_test, average='weighted', zero_division=0)), "f1": float(f1_score(y_test, y_pred_test, average='weighted', zero_division=0)) } } # Save model using artifact store if ARTIFACT_STORE_AVAILABLE: model_path = save_model_with_store( model_data={ "model": model, "imputer": imputer, "feature_names": numeric_cols }, filename=f"{model_name}.pkl", metadata={ "model_name": model_name, "task_type": "classification", "train_accuracy": float(accuracy_score(y_train, y_pred_train)), "test_accuracy": float(accuracy_score(y_test, y_pred_test)), "features": numeric_cols } ) else: model_path = f"./outputs/models/{model_name}.pkl" Path(model_path).parent.mkdir(parents=True, exist_ok=True) joblib.dump({ "model": model, "imputer": imputer, "feature_names": numeric_cols }, model_path) results["models"][model_name]["model_path"] = model_path except Exception as e: results["models"][model_name] = { "status": "error", "message": str(e) } else: # regression models = { "ridge": Ridge(random_state=random_state), "lasso": Lasso(random_state=random_state), "random_forest": RandomForestRegressor(n_estimators=100, random_state=random_state, n_jobs=-1), "xgboost": XGBRegressor(n_estimators=100, random_state=random_state, n_jobs=-1), "lightgbm": LGBMRegressor(n_estimators=100, random_state=random_state, n_jobs=-1, verbose=-1), "catboost": CatBoostRegressor(iterations=100, random_state=random_state, verbose=0, allow_writing_files=False) } for model_name, model in models.items(): try: # Train model.fit(X_train, y_train) # Predict y_pred_train = model.predict(X_train) y_pred_test = model.predict(X_test) # Metrics results["models"][model_name] = { "train_metrics": { "mse": float(mean_squared_error(y_train, y_pred_train)), "rmse": float(np.sqrt(mean_squared_error(y_train, y_pred_train))), "mae": float(mean_absolute_error(y_train, y_pred_train)), "r2": float(r2_score(y_train, y_pred_train)) }, "test_metrics": { "mse": float(mean_squared_error(y_test, y_pred_test)), "rmse": float(np.sqrt(mean_squared_error(y_test, y_pred_test))), "mae": float(mean_absolute_error(y_test, y_pred_test)), "r2": float(r2_score(y_test, y_pred_test)) } } # Save model using artifact store if ARTIFACT_STORE_AVAILABLE: model_path = save_model_with_store( model_data={ "model": model, "imputer": imputer, "feature_names": numeric_cols }, filename=f"{model_name}.pkl", metadata={ "model_name": model_name, "task_type": "regression", "train_r2": float(r2_score(y_train, y_pred_train)), "test_r2": float(r2_score(y_test, y_pred_test)), "features": numeric_cols } ) else: model_path = f"./outputs/models/{model_name}.pkl" Path(model_path).parent.mkdir(parents=True, exist_ok=True) joblib.dump({ "model": model, "imputer": imputer, "feature_names": numeric_cols }, model_path) results["models"][model_name]["model_path"] = model_path except Exception as e: results["models"][model_name] = { "status": "error", "message": str(e) } # Determine best model best_model_name = None best_score = -float('inf') for model_name, model_results in results["models"].items(): if "test_metrics" in model_results: if task_type == "classification": score = model_results["test_metrics"]["f1"] else: score = model_results["test_metrics"]["r2"] if score > best_score: best_score = score best_model_name = model_name results["best_model"] = { "name": best_model_name, "score": best_score, "model_path": results["models"][best_model_name]["model_path"] if best_model_name else None } # Generate visualizations for best model if VISUALIZATION_AVAILABLE and best_model_name: try: print(f"\n🎨 Generating visualizations for {best_model_name}...") # Load best model model_data = joblib.dump({ "model": models[best_model_name], "imputer": imputer, "feature_names": numeric_cols }, f"./outputs/models/{best_model_name}_temp.pkl") # Get predictions for visualization best_model = models[best_model_name] y_pred_test = best_model.predict(X_test) y_pred_proba = None if hasattr(best_model, "predict_proba") and task_type == "classification": y_pred_proba = best_model.predict_proba(X_test) # Generate model performance plots plot_dir = "./outputs/plots/model_performance" perf_plots = generate_model_performance_plots( y_true=y_test, y_pred=y_pred_test, y_pred_proba=y_pred_proba, task_type=task_type, model_name=best_model_name, output_dir=plot_dir ) results["performance_plots"] = perf_plots["plot_paths"] # Generate feature importance plot if available if hasattr(best_model, "feature_importances_"): feature_importance = dict(zip(numeric_cols, best_model.feature_importances_)) importance_plot = generate_feature_importance_plot( feature_importances=feature_importance, output_path=f"{plot_dir}/feature_importance_{best_model_name}.png" ) results["feature_importance_plot"] = importance_plot print(f" ✓ Generated {len(perf_plots.get('plot_paths', []))} performance plots") results["visualization_generated"] = True except Exception as e: print(f" ⚠️ Could not generate visualizations: {str(e)}") results["visualization_generated"] = False else: results["visualization_generated"] = False return results def generate_model_report(model_path: str, test_data_path: str, target_col: str, output_path: str) -> Dict[str, Any]: """ Generate comprehensive model evaluation report. Args: model_path: Path to saved model file test_data_path: Path to test dataset target_col: Name of target column output_path: Path to save report JSON Returns: Dictionary with model report """ # Validation validate_file_exists(model_path) validate_file_exists(test_data_path) # Load model model_data = joblib.load(model_path) model = model_data["model"] imputer = model_data["imputer"] feature_names = model_data["feature_names"] # Load test data df = load_dataframe(test_data_path) validate_dataframe(df) validate_column_exists(df, target_col) # Prepare features X = df.select(feature_names) y = df[target_col].to_numpy() X_np = imputer.transform(X.to_numpy()) # Predict y_pred = model.predict(X_np) # Determine task type if hasattr(model, "predict_proba"): task_type = "classification" else: task_type = "regression" report = { "model_path": model_path, "task_type": task_type, "n_features": len(feature_names), "n_samples": len(X_np) } # Calculate metrics if task_type == "classification": report["metrics"] = { "accuracy": float(accuracy_score(y, y_pred)), "precision": float(precision_score(y, y_pred, average='weighted', zero_division=0)), "recall": float(recall_score(y, y_pred, average='weighted', zero_division=0)), "f1": float(f1_score(y, y_pred, average='weighted', zero_division=0)) } # Confusion matrix cm = confusion_matrix(y, y_pred) report["confusion_matrix"] = cm.tolist() # Classification report class_report = classification_report(y, y_pred, output_dict=True, zero_division=0) report["classification_report"] = class_report else: # regression report["metrics"] = { "mse": float(mean_squared_error(y, y_pred)), "rmse": float(np.sqrt(mean_squared_error(y, y_pred))), "mae": float(mean_absolute_error(y, y_pred)), "r2": float(r2_score(y, y_pred)) } # Feature importance if hasattr(model, "feature_importances_"): importances = model.feature_importances_ feature_importance = [ {"feature": name, "importance": float(imp)} for name, imp in zip(feature_names, importances) ] feature_importance.sort(key=lambda x: x["importance"], reverse=True) report["feature_importance"] = feature_importance[:20] # Top 20 # SHAP values (for top 10 features) try: # Use TreeExplainer for tree-based models if hasattr(model, "feature_importances_"): explainer = shap.TreeExplainer(model) else: # Use KernelExplainer for other models (sample for speed) sample_size = min(100, len(X_np)) explainer = shap.KernelExplainer( model.predict, X_np[:sample_size] ) shap_values = explainer.shap_values(X_np[:100]) # First 100 samples # Calculate mean absolute SHAP values if isinstance(shap_values, list): # Multi-class shap_values = shap_values[0] mean_shap = np.abs(shap_values).mean(axis=0) shap_importance = [ {"feature": name, "shap_value": float(val)} for name, val in zip(feature_names, mean_shap) ] shap_importance.sort(key=lambda x: x["shap_value"], reverse=True) report["shap_feature_importance"] = shap_importance[:10] # Top 10 except Exception as e: report["shap_error"] = f"Could not compute SHAP values: {str(e)}" # Save report Path(output_path).parent.mkdir(parents=True, exist_ok=True) with open(output_path, 'w') as f: json.dump(report, f, indent=2) report["output_path"] = output_path return report