ddi / src /validation /optuna_hyperparameter_tune.py
github-actions[bot]
Deploy from GitHub Actions (fb28c05c54cf19184fc3f14f1bf3297ba5749ea2)
d29b763
"""Hyperparameter optimization using Optuna.
Tunes:
- Learning rate
- Dropout
- Hidden dimensions
- Batch size
- Class weights
- Focal gamma
- Ensemble strategy
Objective: maximize weighted healthcare score
(0.4 * severe_recall + 0.3 * macro_f1 + 0.2 * auroc + 0.1 * calibration_quality)
Output:
- optuna_trials.json
- optuna_best_params.json
- hyperparameter_optimization_report.md
"""
from __future__ import annotations
import argparse
import json
import logging
from pathlib import Path
from typing import Any, Dict
import joblib
import numpy as np
import optuna
import pandas as pd
from preprocessing.artifact_manager import manager
import torch
import torch.nn as nn
import torch.nn.functional as F
from optuna.pruners import MedianPruner
from optuna.samplers import TPESampler
from sklearn.metrics import f1_score, recall_score, roc_auc_score
from sklearn.model_selection import cross_val_score, train_test_split
from torch.utils.data import DataLoader, TensorDataset
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
)
logger = logging.getLogger('medcare_ddi.optuna_tune')
BASE_DIR = Path(__file__).resolve().parents[2]
DATA_DIR = BASE_DIR / 'data'
PROCESSED_DIR = DATA_DIR / 'processed'
MODEL_DIR = BASE_DIR / 'models'
REPORTS_DIR = MODEL_DIR / 'reports'
REPORTS_DIR.mkdir(parents=True, exist_ok=True)
LABEL_NAMES = ['unknown', 'minor', 'moderate', 'major']
LABEL_TO_INDEX = {label: idx for idx, label in enumerate(LABEL_NAMES)}
def load_training_data() -> tuple[np.ndarray, np.ndarray]:
"""Load preprocessed features and labels."""
feature_pipeline_path = MODEL_DIR / 'feature_pipeline_multisource.pkl'
if not feature_pipeline_path.exists():
raise FileNotFoundError(f'Feature pipeline not found: {feature_pipeline_path}')
feature_pipeline = joblib.load(feature_pipeline_path)
ddinter_path = PROCESSED_DIR / 'ddinter_combined.parquet'
if not ddinter_path.exists():
raise FileNotFoundError(f'DDInter not found: {ddinter_path}')
df = manager.load_artifact('ddinter_combined')
logger.info(f'Loaded {len(df)} DDInter records')
y = np.array([LABEL_TO_INDEX.get(str(lbl).lower(), 0) for lbl in df['Level']], dtype=np.int64)
# Create features from drug names using frozen pipeline
from training.feature_pipeline_multisource import transform_pair_features
features = []
for _, row in df.iterrows():
try:
vec = transform_pair_features(row['Drug_A'], row['Drug_B'], feature_pipeline)
features.append(vec)
except Exception as e:
logger.warning(f'Feature extraction failed for {row["Drug_A"]}, {row["Drug_B"]}: {e}')
continue
X = np.vstack(features).astype(np.float32)
logger.info(f'Extracted {X.shape[0]} feature vectors of dimension {X.shape[1]}')
return X[:len(features)], y[:len(features)]
def healthcare_objective(y_true: np.ndarray, y_pred: np.ndarray, y_proba: np.ndarray) -> float:
"""Compute weighted healthcare-aware objective."""
severe_idx = LABEL_TO_INDEX['major']
macro_f1 = f1_score(y_true, y_pred, average='macro', zero_division=0)
severe_recall = recall_score(y_true, y_pred, labels=[severe_idx], average='macro', zero_division=0)
try:
y_true_ovr = np.eye(len(LABEL_NAMES))[y_true]
auroc = roc_auc_score(y_true_ovr, y_proba, average='macro', multi_class='ovr')
except Exception:
auroc = 0.0
# Calibration quality (simplified): expected calibration error
confidences = np.max(y_proba, axis=1)
correct = (np.argmax(y_proba, axis=1) == y_true).astype(float)
calibration = np.abs(correct.mean() - confidences.mean())
calibration_quality = 1.0 - calibration
score = 0.4 * severe_recall + 0.3 * macro_f1 + 0.2 * auroc + 0.1 * calibration_quality
return float(score)
def train_and_evaluate_mlp(
X_train: np.ndarray,
X_val: np.ndarray,
y_train: np.ndarray,
y_val: np.ndarray,
params: Dict[str, Any],
device: str = 'cpu',
) -> float:
"""Train MLP and return healthcare objective score."""
lr = params['learning_rate']
dropout = params['dropout']
hidden_dim = params['hidden_dim']
batch_size = params['batch_size']
epochs = params.get('epochs', 60)
gamma = params.get('focal_gamma', 2.0)
model = nn.Sequential(
nn.Linear(X_train.shape[1], hidden_dim),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_dim, hidden_dim // 2),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_dim // 2, len(LABEL_NAMES)),
).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=3)
# Compute class weights
class_counts = np.bincount(y_train, minlength=len(LABEL_NAMES))
class_weights = np.array([1.0 / max(c, 1) for c in class_counts])
class_weights = torch.tensor(class_weights / class_weights.sum(), dtype=torch.float32, device=device)
train_ds = TensorDataset(
torch.tensor(X_train, dtype=torch.float32),
torch.tensor(y_train, dtype=torch.long),
)
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
val_ds = TensorDataset(
torch.tensor(X_val, dtype=torch.float32),
torch.tensor(y_val, dtype=torch.long),
)
val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False)
best_score = -np.inf
patience = 10
patience_counter = 0
for epoch in range(epochs):
model.train()
for X_batch, y_batch in train_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
optimizer.zero_grad()
logits = model(X_batch)
ce_loss = F.cross_entropy(logits, y_batch, weight=class_weights, reduction='none')
p = torch.exp(-ce_loss)
focal_loss = ((1 - p) ** gamma) * ce_loss
loss = focal_loss.mean()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
# Validate
model.eval()
all_preds = []
all_probs = []
with torch.no_grad():
for X_batch, _ in val_loader:
X_batch = X_batch.to(device)
logits = model(X_batch)
probs = F.softmax(logits, dim=1)
preds = torch.argmax(probs, dim=1)
all_preds.append(preds.cpu().numpy())
all_probs.append(probs.cpu().numpy())
preds = np.concatenate(all_preds)
probs = np.vstack(all_probs)
score = healthcare_objective(y_val, preds, probs)
scheduler.step(score)
if score > best_score:
best_score = score
patience_counter = 0
else:
patience_counter += 1
if patience_counter >= patience:
break
return best_score
def objective(trial: optuna.Trial, X: np.ndarray, y: np.ndarray) -> float:
"""Optuna objective function."""
# Split data
X_train, X_val, y_train, y_val = train_test_split(
X, y, test_size=0.2, random_state=2026, stratify=y
)
# Suggest hyperparameters
params = {
'learning_rate': trial.suggest_float('learning_rate', 1e-4, 1e-2, log=True),
'dropout': trial.suggest_float('dropout', 0.1, 0.5),
'hidden_dim': trial.suggest_int('hidden_dim', 128, 512, step=64),
'batch_size': trial.suggest_int('batch_size', 32, 256, step=32),
'focal_gamma': trial.suggest_float('focal_gamma', 0.5, 3.0),
}
device = 'cuda' if torch.cuda.is_available() else 'cpu'
try:
score = train_and_evaluate_mlp(X_train, X_val, y_train, y_val, params, device=device)
return score
except Exception as e:
logger.warning(f'Trial failed: {e}')
return -np.inf
def main() -> None:
parser = argparse.ArgumentParser(description='Hyperparameter optimization')
parser.add_argument('--n-trials', type=int, default=50)
parser.add_argument('--seed', type=int, default=2026)
parser.add_argument('--output-trials', type=str, default=str(REPORTS_DIR / 'optuna_trials.json'))
parser.add_argument('--output-best', type=str, default=str(REPORTS_DIR / 'optuna_best_params.json'))
parser.add_argument('--output-report', type=str, default=str(REPORTS_DIR / 'hyperparameter_optimization_report.md'))
args = parser.parse_args()
logger.info('Loading training data...')
X, y = load_training_data()
logger.info(f'Data shape: {X.shape}, labels: {y.shape}')
logger.info(f'Starting Optuna optimization with {args.n_trials} trials...')
sampler = TPESampler(seed=args.seed)
pruner = MedianPruner(n_startup_trials=10)
study = optuna.create_study(sampler=sampler, pruner=pruner, direction='maximize')
study.optimize(
lambda trial: objective(trial, X, y),
n_trials=args.n_trials,
show_progress_bar=True,
)
# Save results
trials_list = []
for trial in study.trials:
trials_list.append({
'number': trial.number,
'value': trial.value,
'params': trial.params,
'state': trial.state.name,
})
trials_path = Path(args.output_trials)
trials_path.parent.mkdir(parents=True, exist_ok=True)
trials_path.write_text(json.dumps(trials_list, indent=2), encoding='utf-8')
logger.info(f'Saved trials to {trials_path}')
# Save best params
best_params = study.best_trial.params
best_path = Path(args.output_best)
best_path.write_text(json.dumps(best_params, indent=2), encoding='utf-8')
logger.info(f'Best params: {best_params}')
logger.info(f'Best score: {study.best_value:.4f}')
# Generate report
report_path = Path(args.output_report)
with report_path.open('w') as f:
f.write('# Hyperparameter Optimization Report\n\n')
f.write(f'**Best Score:** {study.best_value:.4f}\n\n')
f.write('**Best Parameters:**\n\n')
f.write('```json\n')
f.write(json.dumps(best_params, indent=2))
f.write('\n```\n\n')
f.write(f'**Trials Completed:** {len(study.trials)}\n\n')
f.write('**Top 5 Trials:**\n\n')
f.write('| Trial | Score | LR | Dropout | Hidden | Batch |\n')
f.write('|-------|-------|----|---------|---------|---------|\n')
for trial in sorted(study.trials, key=lambda t: t.value or -np.inf, reverse=True)[:5]:
if trial.value is not None:
f.write(
f"| {trial.number} | {trial.value:.4f} | "
f"{trial.params.get('learning_rate', 0):.4e} | "
f"{trial.params.get('dropout', 0):.3f} | "
f"{trial.params.get('hidden_dim', 0)} | "
f"{trial.params.get('batch_size', 0)} |\n"
)
logger.info(f'Saved report to {report_path}')
logger.info('✓ Hyperparameter optimization complete')
if __name__ == '__main__':
main()