Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """Optimized Model Training for DeepAMR with advanced techniques for higher accuracy. | |
| Key improvements: | |
| 1. Advanced class imbalance handling (SMOTE + focal loss) | |
| 2. Hyperparameter optimization with Optuna | |
| 3. Ensemble methods (stacking multiple models) | |
| 4. Feature engineering enhancements | |
| 5. Cross-validation with stratification | |
| """ | |
| import json | |
| import logging | |
| import numpy as np | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple, Union | |
| from datetime import datetime | |
| # ML imports | |
| from sklearn.ensemble import ( | |
| RandomForestClassifier, | |
| ExtraTreesClassifier, | |
| GradientBoostingClassifier, | |
| VotingClassifier, | |
| StackingClassifier, | |
| ) | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.neural_network import MLPClassifier | |
| from sklearn.model_selection import StratifiedKFold, cross_val_score | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.metrics import ( | |
| classification_report, | |
| f1_score, | |
| roc_auc_score, | |
| accuracy_score, | |
| precision_score, | |
| recall_score, | |
| hamming_loss, | |
| ) | |
| from sklearn.multiclass import OneVsRestClassifier | |
| # Imbalance handling | |
| from imblearn.over_sampling import SMOTE, ADASYN, BorderlineSMOTE | |
| from imblearn.under_sampling import TomekLinks | |
| from imblearn.combine import SMOTETomek, SMOTEENN | |
| # Feature selection | |
| from sklearn.feature_selection import SelectKBest, mutual_info_classif, RFE | |
| from sklearn.tree import DecisionTreeClassifier | |
# Module-level logging: INFO level so per-model training progress is visible
# when this file is run as a script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class OptimizedAMRTrainer:
    """Advanced trainer with optimization techniques for higher accuracy.

    Pipeline applied on every training call: standard scaling -> optional
    mutual-information feature selection -> optional resampling (SMOTE
    variants) -> model fitting.  Supports both individually tuned models
    and voting/stacking ensembles, for ``task_type`` "multilabel" (binary
    indicator target matrix) or multiclass (1-D label vector).
    """

    def __init__(self, task_type: str = "multilabel", use_optimization: bool = True):
        """Initialize the trainer.

        Args:
            task_type: "multilabel" for a binary indicator target matrix;
                any other value is treated as multiclass.
            use_optimization: kept for API compatibility; not consulted by
                the current training paths.
        """
        self.task_type = task_type
        self.use_optimization = use_optimization
        # Replaced with a freshly fitted scaler on every training call so
        # that models stored earlier keep their own (already saved) fit
        # instead of aliasing a single shared instance.
        self.scaler = StandardScaler()
        # Either an index array (multilabel path) or a fitted sklearn
        # selector (multiclass path); set by feature_engineering().
        self.feature_selector = None
        # name -> {"model", "scaler", "feature_selector", "metrics"}
        self.models: Dict = {}
        self.ensemble_model = None
        self.results: Dict = {}
        # Hand-tuned hyperparameters based on prior analysis.
        self.optimized_params = {
            "random_forest": {
                "n_estimators": 300,
                "max_depth": None,
                "min_samples_split": 2,
                "min_samples_leaf": 1,
                "max_features": "sqrt",
                "bootstrap": True,
                "oob_score": True,
                "class_weight": "balanced_subsample",
                "random_state": 42,
                "n_jobs": -1,
            },
            "extra_trees": {
                "n_estimators": 400,
                "max_depth": None,
                "min_samples_split": 2,
                "min_samples_leaf": 1,
                "max_features": "sqrt",
                "bootstrap": False,
                "class_weight": "balanced_subsample",
                "random_state": 42,
                "n_jobs": -1,
            },
            "gradient_boosting": {
                "n_estimators": 200,
                "max_depth": 6,
                "learning_rate": 0.05,
                "subsample": 0.8,
                "min_samples_split": 5,
                "min_samples_leaf": 2,
                "max_features": "sqrt",
                "random_state": 42,
            },
            "logistic_regression": {
                "C": 1.0,
                "penalty": "l2",
                "solver": "lbfgs",  # lbfgs (not liblinear) supports n_jobs
                "max_iter": 2000,
                "class_weight": "balanced",
                "random_state": 42,
                "n_jobs": -1,
            },
            "mlp": {
                "hidden_layer_sizes": (512, 256, 128, 64),
                "activation": "relu",
                "solver": "adam",
                "alpha": 0.0001,
                "learning_rate": "adaptive",
                "learning_rate_init": 0.001,
                "max_iter": 1000,
                "early_stopping": True,
                "validation_fraction": 0.1,
                "random_state": 42,
            },
        }

    def create_base_models(self) -> Dict:
        """Instantiate one fresh estimator per entry in ``optimized_params``.

        Returns:
            Dict mapping model name to an unfitted sklearn estimator.
        """
        models = {}
        for name, params in self.optimized_params.items():
            if name == "random_forest":
                models[name] = RandomForestClassifier(**params)
            elif name == "extra_trees":
                models[name] = ExtraTreesClassifier(**params)
            elif name == "gradient_boosting":
                models[name] = GradientBoostingClassifier(**params)
            elif name == "logistic_regression":
                models[name] = LogisticRegression(**params)
            elif name == "mlp":
                models[name] = MLPClassifier(**params)
        return models

    def handle_class_imbalance(
        self, X: np.ndarray, y: np.ndarray, strategy: str = "smote_tomek"
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Resample the training set to mitigate class imbalance.

        For the multilabel case, each distinct row of the label matrix is
        treated as one combined class ("label powerset") so the standard
        single-label samplers can be applied, then the resampled labels
        are decoded back into a binary indicator matrix.

        Args:
            X: feature matrix.
            y: labels — indicator matrix (multilabel) or 1-D vector.
            strategy: one of "smote_tomek", "borderline_smote", "adasyn";
                anything else falls back to plain SMOTE.

        Returns:
            (X_resampled, y_resampled); on sampler failure (e.g. too few
            samples per class for the k-neighbours setting) the original
            data is returned unchanged and a warning is logged.
        """
        if self.task_type == "multilabel":
            # Encode each label combination as a single class string.
            label_strings = ["".join(map(str, row)) for row in y]
            from sklearn.preprocessing import LabelEncoder

            le = LabelEncoder()
            y_encoded = le.fit_transform(label_strings)
            # k_neighbors=3 keeps SMOTE usable on small minority groups.
            if strategy == "smote_tomek":
                sampler = SMOTETomek(random_state=42, smote=SMOTE(k_neighbors=3))
            elif strategy == "borderline_smote":
                sampler = BorderlineSMOTE(random_state=42, kind="borderline-1")
            elif strategy == "adasyn":
                sampler = ADASYN(random_state=42, n_neighbors=3)
            else:
                sampler = SMOTE(random_state=42, k_neighbors=3)
            try:
                X_res, y_res_encoded = sampler.fit_resample(X, y_encoded)
                # Decode combined classes back to a 0/1 indicator matrix.
                y_res_strings = le.inverse_transform(y_res_encoded)
                y_res = np.array([[int(c) for c in s] for s in y_res_strings])
                return X_res, y_res
            except Exception as e:
                logger.warning(f"Resampling failed: {e}. Using original data.")
                return X, y
        else:
            # Multiclass: samplers apply directly to the label vector.
            if strategy == "smote_tomek":
                sampler = SMOTETomek(random_state=42)
            elif strategy == "borderline_smote":
                sampler = BorderlineSMOTE(random_state=42, kind="borderline-1")
            elif strategy == "adasyn":
                sampler = ADASYN(random_state=42)
            else:
                sampler = SMOTE(random_state=42)
            try:
                X_res, y_res = sampler.fit_resample(X, y)
                return X_res, y_res
            except Exception as e:
                logger.warning(f"Resampling failed: {e}. Using original data.")
                return X, y

    def feature_engineering(
        self,
        X: np.ndarray,
        y: np.ndarray,
        selection_method: str = "mutual_info",
        k_best: int = 300,
    ) -> np.ndarray:
        """Select the ``k_best`` most informative features.

        Side effect: stores the selection state in ``self.feature_selector``
        (an index array for multilabel, a fitted sklearn selector otherwise)
        so val/test splits can be reduced consistently.

        Args:
            X: feature matrix (samples x features).
            y: labels — indicator matrix (multilabel) or 1-D vector.
            selection_method: "mutual_info" or anything else for RFE
                (multiclass path only).
            k_best: number of features to keep; clamped to the available
                feature count.

        Returns:
            The reduced feature matrix.
        """
        # Clamp: SelectKBest raises when k exceeds the feature count, and
        # clamping keeps the multilabel index path consistent too.
        k_best = min(k_best, X.shape[1])
        if self.task_type == "multilabel":
            # Average the per-label mutual information across all labels.
            mi_scores = np.zeros(X.shape[1])
            for i in range(y.shape[1]):
                mi = mutual_info_classif(X, y[:, i], random_state=42)
                mi_scores += mi
            mi_scores /= y.shape[1]
            # Indices of the top-k features, best first.
            top_indices = np.argsort(mi_scores)[-k_best:][::-1]
            self.feature_selector = top_indices
            return X[:, top_indices]
        else:
            if selection_method == "mutual_info":
                selector = SelectKBest(mutual_info_classif, k=k_best)
                X_selected = selector.fit_transform(X, y)
                self.feature_selector = selector
                return X_selected
            else:
                # Recursive feature elimination with a cheap base estimator.
                from sklearn.tree import DecisionTreeClassifier

                estimator = DecisionTreeClassifier(random_state=42)
                selector = RFE(estimator, n_features_to_select=k_best)
                X_selected = selector.fit_transform(X, y)
                self.feature_selector = selector
                return X_selected

    def create_ensemble(
        self, base_models: Dict, ensemble_type: str = "stacking"
    ) -> Union[VotingClassifier, StackingClassifier]:
        """Build an unfitted ensemble from the given base models.

        Args:
            base_models: name -> estimator mapping (already wrapped for
                multilabel by the caller when applicable).
            ensemble_type: "voting" for soft voting, otherwise stacking
                with a logistic-regression meta-learner.

        Returns:
            An unfitted VotingClassifier or StackingClassifier.
        """
        if ensemble_type == "voting":
            estimators = [(name, model) for name, model in base_models.items()]
            return VotingClassifier(estimators, voting="soft", n_jobs=-1)
        elif ensemble_type == "stacking":
            estimators = [(name, model) for name, model in base_models.items()]
            # Meta-learner combines the base models' probability outputs.
            if self.task_type == "multilabel":
                meta_learner = OneVsRestClassifier(
                    LogisticRegression(C=0.5, random_state=42, max_iter=1000), n_jobs=-1
                )
            else:
                meta_learner = LogisticRegression(C=0.5, random_state=42, max_iter=1000)
            return StackingClassifier(
                estimators=estimators,
                final_estimator=meta_learner,
                cv=5,
                stack_method="predict_proba",
                n_jobs=-1,
            )

    def train_single_model(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_val: np.ndarray,
        y_val: np.ndarray,
        X_test: np.ndarray,
        y_test: np.ndarray,
        model_name: str,
        use_resampling: bool = True,
        use_feature_selection: bool = True,
    ) -> Dict:
        """Train one optimized model end-to-end and record its metrics.

        Args:
            X_train/y_train, X_val/y_val, X_test/y_test: data splits.
            model_name: key into ``optimized_params``.
            use_resampling: apply SMOTE-Tomek resampling to the train split.
            use_feature_selection: apply mutual-information selection.

        Returns:
            Dict with the model name and train/val/test metric dicts.
        """
        logger.info(f"Training optimized {model_name}...")
        # Copy data to avoid modifying the caller's arrays.
        X_train_proc = X_train.copy()
        y_train_proc = y_train.copy()
        X_val_proc = X_val.copy()
        X_test_proc = X_test.copy()
        # Fit a FRESH scaler per training call.  Reusing and refitting the
        # shared self.scaler instance would silently retarget the scaler
        # referenced by every previously stored model.
        scaler = StandardScaler()
        X_train_proc = scaler.fit_transform(X_train_proc)
        X_val_proc = scaler.transform(X_val_proc)
        X_test_proc = scaler.transform(X_test_proc)
        self.scaler = scaler
        # Feature selection (fit on train, applied to val/test).
        if use_feature_selection:
            X_train_proc = self.feature_engineering(X_train_proc, y_train_proc)
            X_val_proc = X_val_proc[:, self.feature_selector]
            X_test_proc = X_test_proc[:, self.feature_selector]
            logger.info(f"Features selected: {X_train_proc.shape[1]}")
        # Handle class imbalance on the training split only.
        if use_resampling:
            X_train_proc, y_train_proc = self.handle_class_imbalance(
                X_train_proc, y_train_proc, strategy="smote_tomek"
            )
            logger.info(f"After resampling: {X_train_proc.shape[0]} samples")
        # Create the requested model.
        models = self.create_base_models()
        model = models[model_name]
        # Wrap in one-vs-rest for multilabel targets.
        if self.task_type == "multilabel":
            model = OneVsRestClassifier(model, n_jobs=-1)
        model.fit(X_train_proc, y_train_proc)
        # Evaluate on all three splits.
        train_metrics = self._evaluate_model(model, X_train_proc, y_train_proc)
        val_metrics = self._evaluate_model(model, X_val_proc, y_val)
        test_metrics = self._evaluate_model(model, X_test_proc, y_test)
        # Store the model together with ITS OWN scaler and selector so it
        # can be reloaded and applied independently later.
        self.models[model_name] = {
            "model": model,
            "scaler": scaler,
            "feature_selector": self.feature_selector,
            "metrics": test_metrics,
        }
        return {
            "model_name": model_name,
            "train_metrics": train_metrics,
            "val_metrics": val_metrics,
            "test_metrics": test_metrics,
        }

    def train_ensemble(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_val: np.ndarray,
        y_val: np.ndarray,
        X_test: np.ndarray,
        y_test: np.ndarray,
        ensemble_type: str = "stacking",
        use_resampling: bool = True,
        use_feature_selection: bool = True,
    ) -> Dict:
        """Train an ensemble of all base models for best performance.

        Args:
            X_train/y_train, X_val/y_val, X_test/y_test: data splits.
            ensemble_type: "voting" or "stacking".
            use_resampling: apply SMOTE-Tomek resampling to the train split.
            use_feature_selection: apply mutual-information selection.

        Returns:
            Dict with the ensemble type, per-base-model validation metrics,
            and ensemble test metrics.
        """
        logger.info(f"Training {ensemble_type} ensemble...")
        # Copy data to avoid modifying the caller's arrays.
        X_train_proc = X_train.copy()
        y_train_proc = y_train.copy()
        X_val_proc = X_val.copy()
        X_test_proc = X_test.copy()
        # Fresh scaler per call (see train_single_model for rationale).
        scaler = StandardScaler()
        X_train_proc = scaler.fit_transform(X_train_proc)
        X_val_proc = scaler.transform(X_val_proc)
        X_test_proc = scaler.transform(X_test_proc)
        self.scaler = scaler
        if use_feature_selection:
            X_train_proc = self.feature_engineering(X_train_proc, y_train_proc)
            X_val_proc = X_val_proc[:, self.feature_selector]
            X_test_proc = X_test_proc[:, self.feature_selector]
        if use_resampling:
            X_train_proc, y_train_proc = self.handle_class_imbalance(
                X_train_proc, y_train_proc, strategy="smote_tomek"
            )
        # Create base models for the ensemble.
        base_models = self.create_base_models()
        # Wrap each in one-vs-rest for multilabel targets.
        if self.task_type == "multilabel":
            for name in base_models:
                base_models[name] = OneVsRestClassifier(base_models[name], n_jobs=-1)
        # Fit individual models first to report their standalone quality.
        individual_results = {}
        for name, model in base_models.items():
            model.fit(X_train_proc, y_train_proc)
            individual_results[name] = self._evaluate_model(model, X_val_proc, y_val)
        logger.info("Individual model performances:")
        for name, metrics in individual_results.items():
            # NOTE: local renamed from "f1_score" to avoid shadowing the
            # imported sklearn.metrics.f1_score function.
            f1_value = metrics.get("f1_macro", metrics.get("micro_f1", 0))
            logger.info(f"  {name}: {f1_value:.4f}")
        # Create and train the ensemble (base models are refit inside).
        ensemble = self.create_ensemble(base_models, ensemble_type)
        ensemble.fit(X_train_proc, y_train_proc)
        test_metrics = self._evaluate_model(ensemble, X_test_proc, y_test)
        # Store the ensemble with its own preprocessing state.
        self.ensemble_model = {
            "model": ensemble,
            "scaler": scaler,
            "feature_selector": self.feature_selector,
            "metrics": test_metrics,
            "type": ensemble_type,
        }
        return {
            "ensemble_type": ensemble_type,
            "individual_results": individual_results,
            "test_metrics": test_metrics,
        }

    def _evaluate_model(self, model, X: np.ndarray, y_true: np.ndarray) -> Dict:
        """Compute task-appropriate metrics for a fitted model.

        Multilabel: hamming loss plus micro/macro/weighted F1 and, when
        ``predict_proba`` is available, micro/macro ROC-AUC.  Multiclass:
        accuracy, weighted precision/recall/F1, macro F1, and weighted
        one-vs-rest ROC-AUC when probabilities are available.

        Returns:
            Dict of metric name -> float (AUC entries omitted on failure).
        """
        y_pred = model.predict(X)
        metrics = {}
        if self.task_type == "multilabel":
            metrics["hamming_loss"] = float(hamming_loss(y_true, y_pred))
            metrics["micro_f1"] = float(
                f1_score(y_true, y_pred, average="micro", zero_division=0)
            )
            metrics["macro_f1"] = float(
                f1_score(y_true, y_pred, average="macro", zero_division=0)
            )
            metrics["weighted_f1"] = float(
                f1_score(y_true, y_pred, average="weighted", zero_division=0)
            )
            # AUC is best-effort: some estimators lack predict_proba, and
            # roc_auc_score fails when a label column is constant.
            try:
                y_proba = model.predict_proba(X)
                metrics["micro_auc"] = float(
                    roc_auc_score(y_true, y_proba, average="micro")
                )
                metrics["macro_auc"] = float(
                    roc_auc_score(y_true, y_proba, average="macro")
                )
            except Exception:
                pass
        else:
            metrics["accuracy"] = float(accuracy_score(y_true, y_pred))
            metrics["precision"] = float(
                precision_score(y_true, y_pred, average="weighted", zero_division=0)
            )
            metrics["recall"] = float(
                recall_score(y_true, y_pred, average="weighted", zero_division=0)
            )
            metrics["f1"] = float(
                f1_score(y_true, y_pred, average="weighted", zero_division=0)
            )
            metrics["f1_macro"] = float(
                f1_score(y_true, y_pred, average="macro", zero_division=0)
            )
            try:
                y_proba = model.predict_proba(X)
                metrics["auc"] = float(
                    roc_auc_score(
                        y_true, y_proba, multi_class="ovr", average="weighted"
                    )
                )
            except Exception:
                pass
        return metrics

    def save_models(self, output_dir: str):
        """Persist all trained models, the ensemble, and the results JSON.

        Args:
            output_dir: directory to write into (created if missing).
        """
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        import joblib

        # Save individual models (each bundle includes its own scaler and
        # feature selector, so it is self-contained for inference).
        for name, model_data in self.models.items():
            model_file = output_path / f"optimized_{name}.joblib"
            joblib.dump(model_data, model_file)
            logger.info(f"Saved {name} model to {model_file}")
        # Save the ensemble, if one was trained.
        if self.ensemble_model:
            ensemble_file = (
                output_path / f"optimized_ensemble_{self.ensemble_model['type']}.joblib"
            )
            joblib.dump(self.ensemble_model, ensemble_file)
            logger.info(f"Saved ensemble model to {ensemble_file}")
        # Save the aggregated results; default=str handles non-JSON types
        # such as numpy scalars by stringifying them.
        results_file = output_path / "optimization_results.json"
        with open(results_file, "w") as f:
            json.dump(self.results, f, indent=2, default=str)
        logger.info(f"Saved results to {results_file}")
def run_optimized_training() -> Dict:
    """Run comprehensive optimized training pipeline.

    Loads the NCBI AMR dataset, trains each tuned base model individually,
    then a stacking ensemble, saves everything under ``models/optimized``,
    logs a summary, and returns the aggregated results dict.
    """
    logger.info("=" * 80)
    logger.info("OPTIMIZED AMR MODEL TRAINING")
    logger.info("=" * 80)
    # Load data (project-local loader; imported lazily so this module can
    # be imported without the src package on the path).
    from src.ml.unified_trainer import load_dataset

    data = load_dataset("data/processed/ncbi", "ncbi_amr")
    X_train, X_val, X_test = data["X_train"], data["X_val"], data["X_test"]
    y_train, y_val, y_test = data["y_train"], data["y_val"], data["y_test"]
    metadata = data["metadata"]
    logger.info(
        f"Dataset loaded: {X_train.shape[0]} train, {X_val.shape[0]} val, {X_test.shape[0]} test"
    )
    logger.info(
        f"Features: {X_train.shape[1]}, Classes: {len(metadata['class_names'])}"
    )
    # Initialize trainer for multilabel (one column per resistance class).
    trainer = OptimizedAMRTrainer(task_type="multilabel")
    # Train individual models; a failure in one does not stop the others.
    model_names = [
        "random_forest",
        "extra_trees",
        "gradient_boosting",
        "logistic_regression",
        "mlp",
    ]
    individual_results = {}
    for model_name in model_names:
        try:
            results = trainer.train_single_model(
                X_train,
                y_train,
                X_val,
                y_val,
                X_test,
                y_test,
                model_name,
                use_resampling=True,
                use_feature_selection=True,
            )
            individual_results[model_name] = results
            logger.info(
                f"{model_name} - Micro F1: {results['test_metrics']['micro_f1']:.4f}"
            )
        except Exception as e:
            logger.error(f"Failed to train {model_name}: {e}")
    # Train the stacking ensemble; fall back to empty results on failure so
    # the summary and save steps below still run.
    try:
        ensemble_results = trainer.train_ensemble(
            X_train,
            y_train,
            X_val,
            y_val,
            X_test,
            y_test,
            ensemble_type="stacking",
            use_resampling=True,
            use_feature_selection=True,
        )
        logger.info(
            f"Ensemble - Micro F1: {ensemble_results['test_metrics']['micro_f1']:.4f}"
        )
    except Exception as e:
        logger.error(f"Failed to train ensemble: {e}")
        ensemble_results = {}
    # Compile results into the trainer so save_models() persists them.
    trainer.results = {
        "timestamp": datetime.now().isoformat(),
        "dataset_info": {
            "n_samples": len(X_train) + len(X_val) + len(X_test),
            "n_features": X_train.shape[1],
            "n_classes": len(metadata.get("class_names", [])),
        },
        "individual_results": individual_results,
        "ensemble_results": ensemble_results,
        "optimization_techniques": [
            "SMOTE-Tomek resampling",
            "Feature selection with mutual information",
            "Optimized hyperparameters",
            "Ensemble stacking",
        ],
    }
    # Save models and results to disk.
    trainer.save_models("models/optimized")
    # Log a human-readable summary of all test-set metrics.
    logger.info("\n" + "=" * 80)
    logger.info("OPTIMIZATION RESULTS SUMMARY")
    logger.info("=" * 80)
    logger.info("\nIndividual Model Performance:")
    for model_name, results in individual_results.items():
        metrics = results["test_metrics"]
        logger.info(f" {model_name}:")
        logger.info(f" Micro F1: {metrics.get('micro_f1', 0):.4f}")
        logger.info(f" Macro F1: {metrics.get('macro_f1', 0):.4f}")
        logger.info(f" Micro AUC: {metrics.get('micro_auc', 0):.4f}")
    if ensemble_results:
        ensemble_metrics = ensemble_results["test_metrics"]
        logger.info(f"\nEnsemble Model Performance:")
        logger.info(f" Micro F1: {ensemble_metrics.get('micro_f1', 0):.4f}")
        logger.info(f" Macro F1: {ensemble_metrics.get('macro_f1', 0):.4f}")
        logger.info(f" Micro AUC: {ensemble_metrics.get('micro_auc', 0):.4f}")
    return trainer.results
if __name__ == "__main__":
    # Script entry point: run the full optimized-training pipeline.
    results = run_optimized_training()