import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold, KFold
from sklearn.ensemble import (
    RandomForestClassifier, RandomForestRegressor,
    GradientBoostingClassifier, GradientBoostingRegressor,
    VotingClassifier, VotingRegressor,
)
from sklearn.linear_model import LogisticRegression, Ridge, Lasso
from sklearn.svm import SVC, SVR
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import (
    accuracy_score, classification_report, mean_squared_error,
    r2_score, f1_score, roc_auc_score, confusion_matrix,
    mean_absolute_error,
)
from sklearn.impute import SimpleImputer
from typing import Dict, Any, Tuple, Optional, List
import warnings

warnings.filterwarnings('ignore')

# Optional gradient-boosting backends: degrade gracefully when missing.
try:
    import xgboost as xgb
    XGB_AVAILABLE = True
except ImportError:
    XGB_AVAILABLE = False

try:
    import lightgbm as lgb
    LGB_AVAILABLE = True
except ImportError:
    LGB_AVAILABLE = False


class MLPipeline:
    """
    An end-to-end machine-learning pipeline for classification and regression,
    with ensemble models, cross-validation, feature importance, and detailed
    metrics.
    """

    def __init__(self, task_type: str = "classification", model_name: str = "Random Forest"):
        self.task_type = task_type
        self.model_name = model_name
        self.model = None
        self.scaler = StandardScaler()
        self.imputer = SimpleImputer(strategy='median')
        self.label_encoder = LabelEncoder()
        # One fitted LabelEncoder per categorical column, so transform-time
        # encoding reuses the training-time mapping.
        self._column_encoders: Dict[str, LabelEncoder] = {}
        self.is_fitted = False
        self.feature_names: List[str] = []
        self.metrics: Dict[str, Any] = {}
        self.X_test = None
        self.y_test = None
        self.y_pred = None
        self.classes_: Optional[np.ndarray] = None

    def _build_model(self):
        name = self.model_name
        if self.task_type == "classification":
            models = {
                "Random Forest": RandomForestClassifier(
                    n_estimators=200, max_depth=None, min_samples_split=2,
                    random_state=42, n_jobs=-1, class_weight='balanced'
                ),
                "Gradient Boosting": GradientBoostingClassifier(
                    n_estimators=150, learning_rate=0.1, max_depth=5,
                    random_state=42
                ),
                "Logistic Regression": LogisticRegression(
                    max_iter=1000, random_state=42, class_weight='balanced'
                ),
                "SVM": SVC(probability=True, kernel='rbf', random_state=42, class_weight='balanced'),
            }
            return models.get(name, models["Random Forest"])
        else:
            models = {
                "Random Forest": RandomForestRegressor(
                    n_estimators=200, max_depth=None, random_state=42, n_jobs=-1
                ),
                "Gradient Boosting": GradientBoostingRegressor(
                    n_estimators=150, learning_rate=0.1, max_depth=5, random_state=42
                ),
                "Ridge Regression": Ridge(alpha=1.0),
                "Lasso Regression": Lasso(alpha=1.0, max_iter=5000),
                "SVM": SVR(kernel='rbf'),
            }
            return models.get(name, models["Random Forest"])

    def _preprocess_X(self, df: pd.DataFrame, fit: bool = True) -> np.ndarray:
        df = df.copy()

        if fit:
            self._column_encoders = {}
        elif self.feature_names:
            # Align to the training-time columns: extras are dropped and
            # missing ones come back as NaN for the imputer to fill.
            df = df.reindex(columns=self.feature_names)

        # Label-encode object/category columns, keeping one fitted encoder
        # per column so transform-time encoding matches training.
        for col in df.select_dtypes(include=['object', 'category']).columns:
            if fit:
                le = LabelEncoder()
                df[col] = le.fit_transform(df[col].astype(str))
                self._column_encoders[col] = le
            else:
                le = self._column_encoders.get(col)
                if le is None:
                    df[col] = -1
                else:
                    mapping = {c: i for i, c in enumerate(le.classes_)}
                    # Categories unseen at training time map to -1.
                    df[col] = df[col].astype(str).map(mapping).fillna(-1)

        # Booleans become 0/1 so the float cast below succeeds cleanly.
        for col in df.select_dtypes(include=['bool']).columns:
            df[col] = df[col].astype(int)

        arr = df.values.astype(float)

        if fit:
            self.feature_names = df.columns.tolist()
            arr = self.imputer.fit_transform(arr)
            arr = self.scaler.fit_transform(arr)
        else:
            arr = self.imputer.transform(arr)
            arr = self.scaler.transform(arr)

        return arr

    def preprocess(
        self, df: pd.DataFrame, target_col: Optional[str] = None
    ) -> Tuple[np.ndarray, Optional[np.ndarray]]:
        df = df.copy()

        if target_col and target_col in df.columns:
            y_raw = df[target_col]
            if self.task_type == "classification":
                self.label_encoder = LabelEncoder()
                y = self.label_encoder.fit_transform(y_raw.astype(str))
                self.classes_ = self.label_encoder.classes_
            else:
                y = y_raw.values.astype(float)
            df = df.drop(columns=[target_col])
        else:
            y = None

        # One-hot encode categoricals; drop_first avoids a redundant,
        # perfectly collinear dummy column per category.
        df = pd.get_dummies(df, drop_first=True)
        self.feature_names = df.columns.tolist()

        X = self._preprocess_X(df, fit=True)
        return X, y

    def train(
        self,
        X: np.ndarray,
        y: np.ndarray,
        test_size: float = 0.2,
    ) -> Dict[str, Any]:
        """Train the model and return comprehensive metrics."""
        if isinstance(X, pd.DataFrame):
            X = self._preprocess_X(X, fit=True)

        # Stratify only when every class has at least two samples;
        # otherwise train_test_split would raise.
        stratify = None
        if self.task_type == "classification":
            unique, counts = np.unique(y, return_counts=True)
            if len(unique) >= 2 and all(c >= 2 for c in counts):
                stratify = y

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42, stratify=stratify
        )
        self.X_test = X_test
        self.y_test = y_test

        self.model = self._build_model()
        self.model.fit(X_train, y_train)
        self.is_fitted = True

        y_pred = self.model.predict(X_test)
        self.y_pred = y_pred

        self.metrics = self._compute_metrics(y_test, y_pred, X, y)
        return self.metrics

    def _compute_metrics(
        self,
        y_test: np.ndarray,
        y_pred: np.ndarray,
        X_full: np.ndarray,
        y_full: np.ndarray,
    ) -> Dict[str, Any]:
        metrics: Dict[str, Any] = {}

        if self.task_type == "classification":
            metrics["accuracy"] = round(float(accuracy_score(y_test, y_pred)), 4)
            metrics["f1_score"] = round(float(f1_score(y_test, y_pred, average='weighted')), 4)

            # ROC AUC is reported for binary problems when the model can
            # produce probability estimates.
            if len(np.unique(y_full)) == 2 and hasattr(self.model, 'predict_proba'):
                try:
                    proba = self.model.predict_proba(self.X_test)[:, 1]
                    metrics["roc_auc"] = round(float(roc_auc_score(y_test, proba)), 4)
                except Exception:
                    pass

            # 5-fold stratified cross-validation over the full dataset.
            try:
                cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
                cv_scores = cross_val_score(self.model, X_full, y_full, cv=cv, scoring='accuracy', n_jobs=-1)
                metrics["cv_mean_accuracy"] = round(float(cv_scores.mean()), 4)
                metrics["cv_std"] = round(float(cv_scores.std()), 4)
            except Exception:
                pass

            # Per-class precision, recall, and F1.
            try:
                class_names = [str(c) for c in self.classes_] if self.classes_ is not None else None
                metrics["classification_report"] = classification_report(
                    y_test, y_pred, target_names=class_names
                )
            except Exception:
                pass

            # Confusion matrix: rows are true classes, columns predicted.
            try:
                cm = confusion_matrix(y_test, y_pred)
                metrics["confusion_matrix"] = cm.tolist()
            except Exception:
                pass

        else:
            metrics["mse"] = round(float(mean_squared_error(y_test, y_pred)), 4)
            metrics["rmse"] = round(float(np.sqrt(mean_squared_error(y_test, y_pred))), 4)
            metrics["mae"] = round(float(mean_absolute_error(y_test, y_pred)), 4)
            metrics["r2_score"] = round(float(r2_score(y_test, y_pred)), 4)

            # 5-fold cross-validated R^2.
            try:
                cv = KFold(n_splits=5, shuffle=True, random_state=42)
                cv_scores = cross_val_score(self.model, X_full, y_full, cv=cv, scoring='r2', n_jobs=-1)
                metrics["cv_mean_r2"] = round(float(cv_scores.mean()), 4)
                metrics["cv_std"] = round(float(cv_scores.std()), 4)
            except Exception:
                pass

        return metrics
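
    # Shape of the dict returned above for a binary classification run
    # (illustrative values only; exact keys depend on task_type and on
    # which of the optional computations succeed):
    #
    #     {
    #         "accuracy": 0.91, "f1_score": 0.90, "roc_auc": 0.95,
    #         "cv_mean_accuracy": 0.89, "cv_std": 0.02,
    #         "classification_report": "...",
    #         "confusion_matrix": [[40, 3], [5, 32]],
    #     }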

    def predict(self, X: np.ndarray) -> np.ndarray:
        if not self.is_fitted:
            raise ValueError("Model must be trained before prediction")
        if isinstance(X, pd.DataFrame):
            X = self._preprocess_X(X, fit=False)
        return self.model.predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        if not self.is_fitted:
            raise ValueError("Model must be trained before prediction")
        if self.task_type != "classification":
            raise ValueError("predict_proba only available for classification")
        if not hasattr(self.model, 'predict_proba'):
            raise ValueError(f"{self.model_name} does not support probability estimates")
        if isinstance(X, pd.DataFrame):
            X = self._preprocess_X(X, fit=False)
        return self.model.predict_proba(X)

    def get_feature_importance(self) -> pd.DataFrame:
        if not self.is_fitted:
            raise ValueError("Model must be trained first")

        if hasattr(self.model, 'feature_importances_'):
            importance = self.model.feature_importances_
        elif hasattr(self.model, 'coef_'):
            coef = self.model.coef_
            # Multiclass linear models have one coefficient row per class;
            # average their magnitudes into a single score per feature.
            importance = np.abs(coef).mean(axis=0) if coef.ndim > 1 else np.abs(coef)
        else:
            # Model exposes neither importances nor coefficients.
            importance = np.zeros(len(self.feature_names))

        return pd.DataFrame({
            "feature": self.feature_names[:len(importance)],
            "importance": importance,
        }).sort_values("importance", ascending=False).reset_index(drop=True)

    def get_predictions_df(self, df_original: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of the original DataFrame with predictions appended."""
        if not self.is_fitted:
            raise ValueError("Model not trained yet")
        result = df_original.copy()
        # Keep only the columns the model was trained on; any that are
        # missing are restored as NaN inside _preprocess_X and imputed.
        feature_df = df_original[[f for f in self.feature_names if f in df_original.columns]]
        preds = self.predict(feature_df)
        result["prediction"] = preds
        return result
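

# Example (sketch) of the MLPipeline API defined above, assuming `df` is a
# pandas DataFrame you have already loaded with a "target" label column
# (both names are placeholders, not part of this module):
#
#     pipe = MLPipeline(task_type="classification", model_name="Random Forest")
#     X, y = pipe.preprocess(df, target_col="target")
#     metrics = pipe.train(X, y, test_size=0.2)
#     print(metrics["accuracy"], metrics.get("cv_mean_accuracy"))
#     print(pipe.get_feature_importance().head(10))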


class XGBoostPipeline(MLPipeline):
    """XGBoost pipeline that monitors a held-out eval set and reports full metrics."""

    def __init__(self, task_type: str = "classification"):
        super().__init__(task_type=task_type, model_name="XGBoost")

    def _build_xgb_model(self, n_classes: int = 2):
        if self.task_type == "classification":
            objective = "multi:softprob" if n_classes > 2 else "binary:logistic"
            # 'logloss' is binary-only; multiclass evaluation needs 'mlogloss'.
            eval_metric = "mlogloss" if n_classes > 2 else "logloss"
            return xgb.XGBClassifier(
                n_estimators=200,
                max_depth=6,
                learning_rate=0.05,
                subsample=0.8,
                colsample_bytree=0.8,
                eval_metric=eval_metric,
                random_state=42,
                n_jobs=-1,
                objective=objective,
            )
        else:
            return xgb.XGBRegressor(
                n_estimators=200,
                max_depth=6,
                learning_rate=0.05,
                subsample=0.8,
                colsample_bytree=0.8,
                random_state=42,
                n_jobs=-1,
            )

    def train(self, X: np.ndarray, y: np.ndarray, test_size: float = 0.2) -> Dict[str, Any]:
        if not XGB_AVAILABLE:
            raise ImportError("xgboost is not installed. Run: pip install xgboost")

        if isinstance(X, pd.DataFrame):
            X = self._preprocess_X(X, fit=True)

        stratify = None
        if self.task_type == "classification":
            unique, counts = np.unique(y, return_counts=True)
            if len(unique) >= 2 and all(c >= 2 for c in counts):
                stratify = y

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42, stratify=stratify
        )
        self.X_test = X_test
        self.y_test = y_test

        n_classes = len(np.unique(y)) if self.task_type == "classification" else 2
        self.model = self._build_xgb_model(n_classes=n_classes)

        # The held-out split is passed as an eval_set for monitoring; early
        # stopping could be enabled via early_stopping_rounds (xgboost >= 1.6).
        self.model.fit(
            X_train, y_train,
            eval_set=[(X_test, y_test)],
            verbose=False,
        )
        self.is_fitted = True

        y_pred = self.model.predict(X_test)
        self.y_pred = y_pred
        self.metrics = self._compute_metrics(y_test, y_pred, X, y)
        return self.metrics
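

# Usage sketch for XGBoostPipeline (same API as MLPipeline; `df` and
# "target" are placeholders for your own data):
#
#     xgb_pipe = XGBoostPipeline(task_type="classification")
#     X, y = xgb_pipe.preprocess(df, target_col="target")
#     print(xgb_pipe.train(X, y)["accuracy"])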


class LightGBMPipeline(MLPipeline):
    """LightGBM pipeline: a fast gradient-boosting option for large datasets."""

    def __init__(self, task_type: str = "classification"):
        super().__init__(task_type=task_type, model_name="LightGBM")

    def train(self, X: np.ndarray, y: np.ndarray, test_size: float = 0.2) -> Dict[str, Any]:
        if not LGB_AVAILABLE:
            raise ImportError("lightgbm is not installed. Run: pip install lightgbm")

        if isinstance(X, pd.DataFrame):
            X = self._preprocess_X(X, fit=True)

        stratify = None
        if self.task_type == "classification":
            unique, counts = np.unique(y, return_counts=True)
            if len(unique) >= 2 and all(c >= 2 for c in counts):
                stratify = y

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42, stratify=stratify
        )
        self.X_test = X_test
        self.y_test = y_test

        if self.task_type == "classification":
            n_classes = len(np.unique(y))
            objective = "multiclass" if n_classes > 2 else "binary"
            self.model = lgb.LGBMClassifier(
                n_estimators=200, learning_rate=0.05,
                num_leaves=31, random_state=42,
                objective=objective, n_jobs=-1,
                class_weight='balanced',
                verbose=-1,
            )
        else:
            self.model = lgb.LGBMRegressor(
                n_estimators=200, learning_rate=0.05,
                num_leaves=31, random_state=42,
                n_jobs=-1, verbose=-1,
            )

        self.model.fit(X_train, y_train, eval_set=[(X_test, y_test)])
        self.is_fitted = True

        y_pred = self.model.predict(X_test)
        self.y_pred = y_pred
        self.metrics = self._compute_metrics(y_test, y_pred, X, y)
        return self.metrics
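

# Usage sketch for LightGBMPipeline (placeholders as above):
#
#     lgb_pipe = LightGBMPipeline(task_type="regression")
#     X, y = lgb_pipe.preprocess(df, target_col="price")
#     print(lgb_pipe.train(X, y)["r2_score"])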


class EnsemblePipeline(MLPipeline):
    """
    Voting ensemble of Random Forest + Gradient Boosting (+ XGBoost when
    available). Often a strong default on tabular datasets.
    """

    def __init__(self, task_type: str = "classification"):
        super().__init__(task_type=task_type, model_name="Ensemble")

    def train(self, X: np.ndarray, y: np.ndarray, test_size: float = 0.2) -> Dict[str, Any]:
        if isinstance(X, pd.DataFrame):
            X = self._preprocess_X(X, fit=True)

        stratify = None
        if self.task_type == "classification":
            unique, counts = np.unique(y, return_counts=True)
            if len(unique) >= 2 and all(c >= 2 for c in counts):
                stratify = y

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42, stratify=stratify
        )
        self.X_test = X_test
        self.y_test = y_test

        if self.task_type == "classification":
            estimators = [
                ("rf", RandomForestClassifier(n_estimators=150, random_state=42, n_jobs=-1, class_weight='balanced')),
                ("gb", GradientBoostingClassifier(n_estimators=100, random_state=42)),
            ]
            if XGB_AVAILABLE:
                eval_metric = 'mlogloss' if len(np.unique(y)) > 2 else 'logloss'
                estimators.append(("xgb", xgb.XGBClassifier(
                    n_estimators=100,
                    eval_metric=eval_metric, random_state=42, n_jobs=-1,
                )))
            # Soft voting averages predicted probabilities across estimators.
            self.model = VotingClassifier(estimators=estimators, voting='soft', n_jobs=-1)
        else:
            estimators = [
                ("rf", RandomForestRegressor(n_estimators=150, random_state=42, n_jobs=-1)),
                ("gb", GradientBoostingRegressor(n_estimators=100, random_state=42)),
            ]
            if XGB_AVAILABLE:
                estimators.append(("xgb", xgb.XGBRegressor(n_estimators=100, random_state=42, n_jobs=-1)))
            self.model = VotingRegressor(estimators=estimators, n_jobs=-1)

        self.model.fit(X_train, y_train)
        self.is_fitted = True

        y_pred = self.model.predict(X_test)
        self.y_pred = y_pred
        self.metrics = self._compute_metrics(y_test, y_pred, X, y)
        return self.metrics

    def get_feature_importance(self) -> pd.DataFrame:
        """Average feature importances over sub-estimators that expose them."""
        if not self.is_fitted:
            raise ValueError("Model must be trained first")

        importances = []
        for est in self.model.estimators_:
            if hasattr(est, 'feature_importances_'):
                importances.append(est.feature_importances_)

        if not importances:
            return pd.DataFrame({"feature": self.feature_names, "importance": 0.0})

        avg_importance = np.mean(importances, axis=0)
        return pd.DataFrame({
            "feature": self.feature_names[:len(avg_importance)],
            "importance": avg_importance,
        }).sort_values("importance", ascending=False).reset_index(drop=True)
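

if __name__ == "__main__":
    # Minimal smoke-test sketch on synthetic data. This demo block is not
    # part of the pipelines above; adjust or remove it for real use.
    from sklearn.datasets import make_classification

    X_demo, y_demo = make_classification(
        n_samples=500, n_features=10, n_informative=5, random_state=42
    )
    df_demo = pd.DataFrame(X_demo, columns=[f"f{i}" for i in range(10)])
    df_demo["target"] = y_demo

    # Single-model pipeline.
    pipe = MLPipeline(task_type="classification", model_name="Random Forest")
    X, y = pipe.preprocess(df_demo, target_col="target")
    print("RF accuracy:", pipe.train(X, y)["accuracy"])

    # Voting ensemble (falls back to RF + GB when xgboost is absent).
    ens = EnsemblePipeline(task_type="classification")
    Xe, ye = ens.preprocess(df_demo, target_col="target")
    print("Ensemble accuracy:", ens.train(Xe, ye)["accuracy"])
    print(ens.get_feature_importance().head())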