Spaces:
Running
Running
| """Hyperparameter optimization using Optuna. | |
| Tunes: | |
| - Learning rate | |
| - Dropout | |
| - Hidden dimensions | |
| - Batch size | |
| - Class weights (currently fixed to normalized inverse class frequency; not part of the search space) | |
| - Focal gamma | |
| - Ensemble strategy (not part of this script's search space) | |
| Objective: maximize weighted healthcare score | |
| (0.4 * severe_recall + 0.3 * macro_f1 + 0.2 * auroc + 0.1 * calibration_quality) | |
| Output: | |
| - optuna_trials.json | |
| - optuna_best_params.json | |
| - hyperparameter_optimization_report.md | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import logging | |
| from pathlib import Path | |
| from typing import Any, Dict | |
| import joblib | |
| import numpy as np | |
| import optuna | |
| import pandas as pd | |
| from preprocessing.artifact_manager import manager | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| from optuna.pruners import MedianPruner | |
| from optuna.samplers import TPESampler | |
| from sklearn.metrics import f1_score, recall_score, roc_auc_score | |
| from sklearn.model_selection import cross_val_score, train_test_split | |
| from torch.utils.data import DataLoader, TensorDataset | |
# Configure root logging once at import time so every record carries a
# timestamp, its level, and the name of the emitting logger.
logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger('medcare_ddi.optuna_tune')

# Project layout, anchored two directories above the one holding this file.
BASE_DIR = Path(__file__).resolve().parents[2]
DATA_DIR = BASE_DIR.joinpath('data')
PROCESSED_DIR = DATA_DIR.joinpath('processed')
MODEL_DIR = BASE_DIR.joinpath('models')
REPORTS_DIR = MODEL_DIR.joinpath('reports')
# Created eagerly at import time; the default output paths live in here.
REPORTS_DIR.mkdir(parents=True, exist_ok=True)

# Severity classes in index order: the position of a name is its class id.
LABEL_NAMES = ['unknown', 'minor', 'moderate', 'major']
LABEL_TO_INDEX = dict(zip(LABEL_NAMES, range(len(LABEL_NAMES))))
def load_training_data() -> tuple[np.ndarray, np.ndarray]:
    """Load DDInter drug pairs and build the feature matrix and label vector.

    The frozen feature pipeline is loaded from ``MODEL_DIR`` and applied to
    every ``(Drug_A, Drug_B)`` pair of the ``ddinter_combined`` artifact.
    Rows whose feature extraction raises are logged and dropped *together
    with their label*, so row ``i`` of ``X`` always corresponds to ``y[i]``.

    Returns:
        ``(X, y)`` where ``X`` is a float32 matrix with one row per
        successfully transformed pair and ``y`` is an int64 vector of class
        indices taken from ``LABEL_TO_INDEX``. Values of ``Level`` that are
        not a known label name map to index 0 (``'unknown'``).

    Raises:
        FileNotFoundError: if the pickled feature pipeline or the DDInter
            parquet file is missing.
        ValueError: if feature extraction fails for every record.
    """
    feature_pipeline_path = MODEL_DIR / 'feature_pipeline_multisource.pkl'
    if not feature_pipeline_path.exists():
        raise FileNotFoundError(f'Feature pipeline not found: {feature_pipeline_path}')
    # NOTE(review): joblib.load unpickles arbitrary objects; only load
    # pipeline files that come from a trusted source.
    feature_pipeline = joblib.load(feature_pipeline_path)

    ddinter_path = PROCESSED_DIR / 'ddinter_combined.parquet'
    if not ddinter_path.exists():
        raise FileNotFoundError(f'DDInter not found: {ddinter_path}')
    # Presumably the artifact manager resolves 'ddinter_combined' to the
    # parquet file checked above -- TODO confirm against artifact_manager.
    df = manager.load_artifact('ddinter_combined')
    logger.info(f'Loaded {len(df)} DDInter records')

    # Create features from drug names using frozen pipeline
    from training.feature_pipeline_multisource import transform_pair_features

    features = []
    labels = []
    for drug_a, drug_b, level in zip(df['Drug_A'], df['Drug_B'], df['Level']):
        try:
            vec = transform_pair_features(drug_a, drug_b, feature_pipeline)
        except Exception as e:
            logger.warning(f'Feature extraction failed for {drug_a}, {drug_b}: {e}')
            continue
        # Record the label only for rows that produced a vector; slicing a
        # full-length label array afterwards would shift every label that
        # follows a skipped row onto the wrong feature vector.
        features.append(vec)
        labels.append(LABEL_TO_INDEX.get(str(level).lower(), 0))

    if not features:
        raise ValueError('Feature extraction failed for every DDInter record')

    X = np.vstack(features).astype(np.float32)
    y = np.asarray(labels, dtype=np.int64)
    logger.info(f'Extracted {X.shape[0]} feature vectors of dimension {X.shape[1]}')
    return X, y
def healthcare_objective(y_true: np.ndarray, y_pred: np.ndarray, y_proba: np.ndarray) -> float:
    """Compute the weighted healthcare-aware objective.

    The score is
    ``0.4 * severe_recall + 0.3 * macro_f1 + 0.2 * auroc + 0.1 * calibration_quality``.

    Args:
        y_true: True class indices (values index into ``LABEL_NAMES``).
        y_pred: Predicted class indices.
        y_proba: Per-class probabilities, one row per sample; assumed to have
            one column per entry of ``LABEL_NAMES``.

    Returns:
        The combined score as a Python float.
    """
    y_true = np.asarray(y_true)
    severe_idx = LABEL_TO_INDEX['major']

    macro_f1 = f1_score(y_true, y_pred, average='macro', zero_division=0)
    # Restricting ``labels`` to the severe class makes the macro average equal
    # to the recall of that single class.
    severe_recall = recall_score(
        y_true, y_pred, labels=[severe_idx], average='macro', zero_division=0
    )

    try:
        y_true_ovr = np.eye(len(LABEL_NAMES))[y_true]
        auroc = float(roc_auc_score(y_true_ovr, y_proba, average='macro', multi_class='ovr'))
    except (ValueError, IndexError):
        # AUROC is undefined when a class is missing from y_true (or the
        # inputs are malformed); count it as zero contribution.
        auroc = 0.0
    # Depending on the scikit-learn version, the undefined case may surface as
    # NaN (with a warning) rather than an exception. NaN must not leak into
    # the objective, so it gets the same fallback.
    if not np.isfinite(auroc):
        auroc = 0.0

    # Calibration quality (simplified): absolute gap between overall accuracy
    # and mean top-class confidence. This is a single global gap, not a
    # binned expected calibration error.
    confidences = np.max(y_proba, axis=1)
    correct = (np.argmax(y_proba, axis=1) == y_true).astype(float)
    calibration = np.abs(correct.mean() - confidences.mean())
    calibration_quality = 1.0 - calibration

    score = 0.4 * severe_recall + 0.3 * macro_f1 + 0.2 * auroc + 0.1 * calibration_quality
    return float(score)
def _weighted_focal_loss(
    logits: torch.Tensor,
    targets: torch.Tensor,
    class_weights: torch.Tensor,
    gamma: float,
) -> torch.Tensor:
    """Return the mean class-weighted focal loss for one batch."""
    log_probs = F.log_softmax(logits, dim=1)
    # Log-probability of the true class, taken from the *unweighted*
    # distribution. Deriving it from a class-weighted cross-entropy would make
    # exp(-ce) differ from the real probability and distort the focusing term.
    log_pt = log_probs.gather(1, targets.unsqueeze(1)).squeeze(1)
    pt = log_pt.exp()
    # Clamp the base so the gradient of x ** gamma stays finite when pt
    # reaches exactly 1 and gamma < 1 (otherwise 0 * inf yields NaN gradients).
    modulator = (1.0 - pt).clamp(min=1e-6) ** gamma
    per_sample = class_weights[targets] * modulator * (-log_pt)
    return per_sample.mean()


def train_and_evaluate_mlp(
    X_train: np.ndarray,
    X_val: np.ndarray,
    y_train: np.ndarray,
    y_val: np.ndarray,
    params: Dict[str, Any],
    device: str = 'cpu',
) -> float:
    """Train a two-hidden-layer MLP and return its best validation score.

    Args:
        X_train: Training features, one row per sample.
        X_val: Validation features.
        y_train: Training class indices (non-negative integers).
        y_val: Validation class indices.
        params: Hyperparameters. Required keys: ``learning_rate``,
            ``dropout``, ``hidden_dim``, ``batch_size``. Optional keys:
            ``epochs`` (default 60) and ``focal_gamma`` (default 2.0).
        device: Torch device string used for the model and batches.

    Returns:
        The highest ``healthcare_objective`` value observed on the validation
        set over all epochs (``-inf`` if no epoch ran).
    """
    lr = params['learning_rate']
    dropout = params['dropout']
    hidden_dim = params['hidden_dim']
    batch_size = params['batch_size']
    epochs = params.get('epochs', 60)
    gamma = params.get('focal_gamma', 2.0)

    num_classes = len(LABEL_NAMES)
    model = nn.Sequential(
        nn.Linear(X_train.shape[1], hidden_dim),
        nn.ReLU(),
        nn.Dropout(dropout),
        nn.Linear(hidden_dim, hidden_dim // 2),
        nn.ReLU(),
        nn.Dropout(dropout),
        nn.Linear(hidden_dim // 2, num_classes),
    ).to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    # mode='max' because the monitored quantity is a score, not a loss.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='max', factor=0.5, patience=3
    )

    # Inverse-frequency class weights, normalized to sum to 1. max(c, 1)
    # avoids division by zero for classes absent from the training split.
    class_counts = np.bincount(y_train, minlength=num_classes)
    inv_freq = np.array([1.0 / max(c, 1) for c in class_counts])
    class_weights = torch.tensor(inv_freq / inv_freq.sum(), dtype=torch.float32, device=device)

    train_ds = TensorDataset(
        torch.tensor(X_train, dtype=torch.float32),
        torch.tensor(y_train, dtype=torch.long),
    )
    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    val_ds = TensorDataset(
        torch.tensor(X_val, dtype=torch.float32),
        torch.tensor(y_val, dtype=torch.long),
    )
    val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False)

    best_score = -np.inf
    patience = 10  # epochs without improvement before stopping early
    patience_counter = 0

    for _ in range(epochs):
        model.train()
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            optimizer.zero_grad()
            loss = _weighted_focal_loss(model(X_batch), y_batch, class_weights, gamma)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

        # Validate
        model.eval()
        all_preds = []
        all_probs = []
        with torch.no_grad():
            for X_batch, _ in val_loader:
                probs = F.softmax(model(X_batch.to(device)), dim=1)
                all_preds.append(torch.argmax(probs, dim=1).cpu().numpy())
                all_probs.append(probs.cpu().numpy())

        score = healthcare_objective(y_val, np.concatenate(all_preds), np.vstack(all_probs))
        scheduler.step(score)

        if score > best_score:
            best_score = score
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                break

    return float(best_score)
def objective(trial: optuna.Trial, X: np.ndarray, y: np.ndarray) -> float:
    """Optuna objective: sample hyperparameters, train the MLP, return its score.

    Args:
        trial: Optuna trial used to sample the hyperparameters.
        X: Full feature matrix.
        y: Class indices aligned with the rows of ``X``.

    Returns:
        The best validation score reported by ``train_and_evaluate_mlp``, or
        ``-inf`` when training raises, so a failed trial ranks last in a
        maximizing study.
    """
    # Fixed random_state: every trial is scored on the same stratified
    # 80/20 split, which keeps trial scores comparable.
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=2026, stratify=y
    )

    # Suggest hyperparameters
    params = {
        'learning_rate': trial.suggest_float('learning_rate', 1e-4, 1e-2, log=True),
        'dropout': trial.suggest_float('dropout', 0.1, 0.5),
        'hidden_dim': trial.suggest_int('hidden_dim', 128, 512, step=64),
        'batch_size': trial.suggest_int('batch_size', 32, 256, step=32),
        'focal_gamma': trial.suggest_float('focal_gamma', 0.5, 3.0),
    }

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    try:
        return float(train_and_evaluate_mlp(X_train, X_val, y_train, y_val, params, device=device))
    except Exception as e:
        # Deliberate best-effort: one bad configuration must not abort the
        # study. exc_info keeps the traceback so the failure can be diagnosed.
        logger.warning('Trial failed: %s', e, exc_info=True)
        return float('-inf')
def _finite_or_none(value):
    """Return *value* as a float if it is a finite number, otherwise ``None``."""
    if value is None or not np.isfinite(value):
        return None
    return float(value)


def main() -> None:
    """Run the Optuna study from the command line and write its outputs.

    Writes three files (paths configurable through CLI flags): a JSON list of
    all trials, a JSON object with the best parameters, and a Markdown report.

    Raises:
        ValueError: if no trial completed, so no best parameters exist.
    """
    parser = argparse.ArgumentParser(description='Hyperparameter optimization')
    parser.add_argument('--n-trials', type=int, default=50)
    parser.add_argument('--seed', type=int, default=2026)
    parser.add_argument('--output-trials', type=str, default=str(REPORTS_DIR / 'optuna_trials.json'))
    parser.add_argument('--output-best', type=str, default=str(REPORTS_DIR / 'optuna_best_params.json'))
    parser.add_argument('--output-report', type=str, default=str(REPORTS_DIR / 'hyperparameter_optimization_report.md'))
    args = parser.parse_args()

    logger.info('Loading training data...')
    X, y = load_training_data()
    logger.info(f'Data shape: {X.shape}, labels: {y.shape}')

    logger.info(f'Starting Optuna optimization with {args.n_trials} trials...')
    sampler = TPESampler(seed=args.seed)
    # NOTE(review): the objective in this file never calls trial.report() or
    # trial.should_prune(), so this pruner currently has no effect.
    pruner = MedianPruner(n_startup_trials=10)
    study = optuna.create_study(sampler=sampler, pruner=pruner, direction='maximize')
    study.optimize(
        lambda trial: objective(trial, X, y),
        n_trials=args.n_trials,
        show_progress_bar=True,
    )

    # Save results. Non-finite scores (failed trials return -inf) are stored
    # as null: json.dumps would otherwise emit "-Infinity", which is not
    # valid JSON for strict parsers.
    trials_list = [
        {
            'number': trial.number,
            'value': _finite_or_none(trial.value),
            'params': trial.params,
            'state': trial.state.name,
        }
        for trial in study.trials
    ]
    trials_path = Path(args.output_trials)
    trials_path.parent.mkdir(parents=True, exist_ok=True)
    trials_path.write_text(json.dumps(trials_list, indent=2), encoding='utf-8')
    logger.info(f'Saved trials to {trials_path}')

    # study.best_trial needs at least one completed trial; fail with a clear
    # message (after the trial log has been saved) instead of an opaque one.
    completed = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]
    if not completed:
        raise ValueError('No trial completed; cannot determine best parameters')

    # Save best params
    best_params = study.best_trial.params
    best_path = Path(args.output_best)
    best_path.parent.mkdir(parents=True, exist_ok=True)
    best_path.write_text(json.dumps(best_params, indent=2), encoding='utf-8')
    logger.info(f'Best params: {best_params}')
    logger.info(f'Best score: {study.best_value:.4f}')

    # Rank only trials that carry a value. Comparing against None explicitly
    # keeps a legitimate score of 0.0 from being treated as a failed trial.
    scored = [t for t in study.trials if t.value is not None]
    top_trials = sorted(scored, key=lambda t: t.value, reverse=True)[:5]

    # Generate report
    report_path = Path(args.output_report)
    report_path.parent.mkdir(parents=True, exist_ok=True)
    with report_path.open('w', encoding='utf-8') as f:
        f.write('# Hyperparameter Optimization Report\n\n')
        f.write(f'**Best Score:** {study.best_value:.4f}\n\n')
        f.write('**Best Parameters:**\n\n')
        f.write('```json\n')
        f.write(json.dumps(best_params, indent=2))
        f.write('\n```\n\n')
        f.write(f'**Trials Completed:** {len(study.trials)}\n\n')
        f.write('**Top 5 Trials:**\n\n')
        f.write('| Trial | Score | LR | Dropout | Hidden | Batch |\n')
        f.write('|-------|-------|----|---------|---------|---------|\n')
        for trial in top_trials:
            f.write(
                f"| {trial.number} | {trial.value:.4f} | "
                f"{trial.params.get('learning_rate', 0):.4e} | "
                f"{trial.params.get('dropout', 0):.3f} | "
                f"{trial.params.get('hidden_dim', 0)} | "
                f"{trial.params.get('batch_size', 0)} |\n"
            )
    logger.info(f'Saved report to {report_path}')
    logger.info('✓ Hyperparameter optimization complete')


if __name__ == '__main__':
    main()