"""Hyperparameter optimization using Optuna. Tunes: - Learning rate - Dropout - Hidden dimensions - Batch size - Class weights - Focal gamma - Ensemble strategy Objective: maximize weighted healthcare score (0.4 * severe_recall + 0.3 * macro_f1 + 0.2 * auroc + 0.1 * calibration_quality) Output: - optuna_trials.json - optuna_best_params.json - hyperparameter_optimization_report.md """ from __future__ import annotations import argparse import json import logging from pathlib import Path from typing import Any, Dict import joblib import numpy as np import optuna import pandas as pd from preprocessing.artifact_manager import manager import torch import torch.nn as nn import torch.nn.functional as F from optuna.pruners import MedianPruner from optuna.samplers import TPESampler from sklearn.metrics import f1_score, recall_score, roc_auc_score from sklearn.model_selection import cross_val_score, train_test_split from torch.utils.data import DataLoader, TensorDataset logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', ) logger = logging.getLogger('medcare_ddi.optuna_tune') BASE_DIR = Path(__file__).resolve().parents[2] DATA_DIR = BASE_DIR / 'data' PROCESSED_DIR = DATA_DIR / 'processed' MODEL_DIR = BASE_DIR / 'models' REPORTS_DIR = MODEL_DIR / 'reports' REPORTS_DIR.mkdir(parents=True, exist_ok=True) LABEL_NAMES = ['unknown', 'minor', 'moderate', 'major'] LABEL_TO_INDEX = {label: idx for idx, label in enumerate(LABEL_NAMES)} def load_training_data() -> tuple[np.ndarray, np.ndarray]: """Load preprocessed features and labels.""" feature_pipeline_path = MODEL_DIR / 'feature_pipeline_multisource.pkl' if not feature_pipeline_path.exists(): raise FileNotFoundError(f'Feature pipeline not found: {feature_pipeline_path}') feature_pipeline = joblib.load(feature_pipeline_path) ddinter_path = PROCESSED_DIR / 'ddinter_combined.parquet' if not ddinter_path.exists(): raise FileNotFoundError(f'DDInter not found: {ddinter_path}') df = manager.load_artifact('ddinter_combined') logger.info(f'Loaded {len(df)} DDInter records') y = np.array([LABEL_TO_INDEX.get(str(lbl).lower(), 0) for lbl in df['Level']], dtype=np.int64) # Create features from drug names using frozen pipeline from training.feature_pipeline_multisource import transform_pair_features features = [] for _, row in df.iterrows(): try: vec = transform_pair_features(row['Drug_A'], row['Drug_B'], feature_pipeline) features.append(vec) except Exception as e: logger.warning(f'Feature extraction failed for {row["Drug_A"]}, {row["Drug_B"]}: {e}') continue X = np.vstack(features).astype(np.float32) logger.info(f'Extracted {X.shape[0]} feature vectors of dimension {X.shape[1]}') return X[:len(features)], y[:len(features)] def healthcare_objective(y_true: np.ndarray, y_pred: np.ndarray, y_proba: np.ndarray) -> float: """Compute weighted healthcare-aware objective.""" severe_idx = LABEL_TO_INDEX['major'] macro_f1 = f1_score(y_true, y_pred, average='macro', zero_division=0) severe_recall = recall_score(y_true, y_pred, labels=[severe_idx], average='macro', zero_division=0) try: y_true_ovr = np.eye(len(LABEL_NAMES))[y_true] auroc = roc_auc_score(y_true_ovr, y_proba, average='macro', multi_class='ovr') except Exception: auroc = 0.0 # Calibration quality (simplified): expected calibration error confidences = np.max(y_proba, axis=1) correct = (np.argmax(y_proba, axis=1) == y_true).astype(float) calibration = np.abs(correct.mean() - confidences.mean()) calibration_quality = 1.0 - calibration score = 0.4 * severe_recall + 0.3 * macro_f1 + 0.2 * auroc + 0.1 * calibration_quality return float(score) def train_and_evaluate_mlp( X_train: np.ndarray, X_val: np.ndarray, y_train: np.ndarray, y_val: np.ndarray, params: Dict[str, Any], device: str = 'cpu', ) -> float: """Train MLP and return healthcare objective score.""" lr = params['learning_rate'] dropout = params['dropout'] hidden_dim = params['hidden_dim'] batch_size = params['batch_size'] epochs = params.get('epochs', 60) gamma = params.get('focal_gamma', 2.0) model = nn.Sequential( nn.Linear(X_train.shape[1], hidden_dim), nn.ReLU(), nn.Dropout(dropout), nn.Linear(hidden_dim, hidden_dim // 2), nn.ReLU(), nn.Dropout(dropout), nn.Linear(hidden_dim // 2, len(LABEL_NAMES)), ).to(device) optimizer = torch.optim.Adam(model.parameters(), lr=lr) scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=3) # Compute class weights class_counts = np.bincount(y_train, minlength=len(LABEL_NAMES)) class_weights = np.array([1.0 / max(c, 1) for c in class_counts]) class_weights = torch.tensor(class_weights / class_weights.sum(), dtype=torch.float32, device=device) train_ds = TensorDataset( torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.long), ) train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True) val_ds = TensorDataset( torch.tensor(X_val, dtype=torch.float32), torch.tensor(y_val, dtype=torch.long), ) val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False) best_score = -np.inf patience = 10 patience_counter = 0 for epoch in range(epochs): model.train() for X_batch, y_batch in train_loader: X_batch, y_batch = X_batch.to(device), y_batch.to(device) optimizer.zero_grad() logits = model(X_batch) ce_loss = F.cross_entropy(logits, y_batch, weight=class_weights, reduction='none') p = torch.exp(-ce_loss) focal_loss = ((1 - p) ** gamma) * ce_loss loss = focal_loss.mean() loss.backward() torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) optimizer.step() # Validate model.eval() all_preds = [] all_probs = [] with torch.no_grad(): for X_batch, _ in val_loader: X_batch = X_batch.to(device) logits = model(X_batch) probs = F.softmax(logits, dim=1) preds = torch.argmax(probs, dim=1) all_preds.append(preds.cpu().numpy()) all_probs.append(probs.cpu().numpy()) preds = np.concatenate(all_preds) probs = np.vstack(all_probs) score = healthcare_objective(y_val, preds, probs) scheduler.step(score) if score > best_score: best_score = score patience_counter = 0 else: patience_counter += 1 if patience_counter >= patience: break return best_score def objective(trial: optuna.Trial, X: np.ndarray, y: np.ndarray) -> float: """Optuna objective function.""" # Split data X_train, X_val, y_train, y_val = train_test_split( X, y, test_size=0.2, random_state=2026, stratify=y ) # Suggest hyperparameters params = { 'learning_rate': trial.suggest_float('learning_rate', 1e-4, 1e-2, log=True), 'dropout': trial.suggest_float('dropout', 0.1, 0.5), 'hidden_dim': trial.suggest_int('hidden_dim', 128, 512, step=64), 'batch_size': trial.suggest_int('batch_size', 32, 256, step=32), 'focal_gamma': trial.suggest_float('focal_gamma', 0.5, 3.0), } device = 'cuda' if torch.cuda.is_available() else 'cpu' try: score = train_and_evaluate_mlp(X_train, X_val, y_train, y_val, params, device=device) return score except Exception as e: logger.warning(f'Trial failed: {e}') return -np.inf def main() -> None: parser = argparse.ArgumentParser(description='Hyperparameter optimization') parser.add_argument('--n-trials', type=int, default=50) parser.add_argument('--seed', type=int, default=2026) parser.add_argument('--output-trials', type=str, default=str(REPORTS_DIR / 'optuna_trials.json')) parser.add_argument('--output-best', type=str, default=str(REPORTS_DIR / 'optuna_best_params.json')) parser.add_argument('--output-report', type=str, default=str(REPORTS_DIR / 'hyperparameter_optimization_report.md')) args = parser.parse_args() logger.info('Loading training data...') X, y = load_training_data() logger.info(f'Data shape: {X.shape}, labels: {y.shape}') logger.info(f'Starting Optuna optimization with {args.n_trials} trials...') sampler = TPESampler(seed=args.seed) pruner = MedianPruner(n_startup_trials=10) study = optuna.create_study(sampler=sampler, pruner=pruner, direction='maximize') study.optimize( lambda trial: objective(trial, X, y), n_trials=args.n_trials, show_progress_bar=True, ) # Save results trials_list = [] for trial in study.trials: trials_list.append({ 'number': trial.number, 'value': trial.value, 'params': trial.params, 'state': trial.state.name, }) trials_path = Path(args.output_trials) trials_path.parent.mkdir(parents=True, exist_ok=True) trials_path.write_text(json.dumps(trials_list, indent=2), encoding='utf-8') logger.info(f'Saved trials to {trials_path}') # Save best params best_params = study.best_trial.params best_path = Path(args.output_best) best_path.write_text(json.dumps(best_params, indent=2), encoding='utf-8') logger.info(f'Best params: {best_params}') logger.info(f'Best score: {study.best_value:.4f}') # Generate report report_path = Path(args.output_report) with report_path.open('w') as f: f.write('# Hyperparameter Optimization Report\n\n') f.write(f'**Best Score:** {study.best_value:.4f}\n\n') f.write('**Best Parameters:**\n\n') f.write('```json\n') f.write(json.dumps(best_params, indent=2)) f.write('\n```\n\n') f.write(f'**Trials Completed:** {len(study.trials)}\n\n') f.write('**Top 5 Trials:**\n\n') f.write('| Trial | Score | LR | Dropout | Hidden | Batch |\n') f.write('|-------|-------|----|---------|---------|---------|\n') for trial in sorted(study.trials, key=lambda t: t.value or -np.inf, reverse=True)[:5]: if trial.value is not None: f.write( f"| {trial.number} | {trial.value:.4f} | " f"{trial.params.get('learning_rate', 0):.4e} | " f"{trial.params.get('dropout', 0):.3f} | " f"{trial.params.get('hidden_dim', 0)} | " f"{trial.params.get('batch_size', 0)} |\n" ) logger.info(f'Saved report to {report_path}') logger.info('✓ Hyperparameter optimization complete') if __name__ == '__main__': main()