"""Optuna hyperparameter search + refit for binary classifiers on precomputed embeddings.

Supported models: XGBoost ("xgb"), AdaBoost ("adaboost"), LinearBoost
("linearboost").  Input is a Hugging Face dataset saved to disk: either a
DatasetDict with 'train'/'val' splits, or a single dataset carrying a
'split' column.  Each row must have an 'embedding' vector and an integer
'label'; an optional 'sequence' string is carried through to the
prediction CSVs.
"""

import os
import json
import joblib
import optuna
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import Dict, Any, Tuple, Optional
from datasets import load_from_disk, DatasetDict
from sklearn.metrics import (
    f1_score,
    roc_auc_score,
    average_precision_score,
    precision_recall_curve,
    roc_curve,
)
from sklearn.linear_model import LogisticRegression  # noqa: F401 (unused; kept — other files may rely on this module's namespace)
from sklearn.ensemble import AdaBoostClassifier
from sklearn.tree import DecisionTreeClassifier
from linearboost import LinearBoostClassifier
import xgboost as xgb
from lightning.pytorch import seed_everything

seed_everything(1986)


# -----------------------------
# Data loading
# -----------------------------
@dataclass
class SplitData:
    """Train/val feature matrices, labels, and optional sequence strings."""
    X_train: np.ndarray
    y_train: np.ndarray
    seq_train: Optional[np.ndarray]
    X_val: np.ndarray
    y_val: np.ndarray
    seq_val: Optional[np.ndarray]


def _stack_embeddings(col) -> np.ndarray:
    """Convert an HF column of per-row float lists into a 2-D float32 array.

    HF datasets often store embeddings as list-of-floats per row; np.asarray
    handles the regular case, and np.stack covers ragged/object fallbacks.
    """
    arr = np.asarray(col, dtype=np.float32)
    if arr.ndim != 2:
        arr = np.stack(col).astype(np.float32)
    return arr


def load_split_data(dataset_path: str) -> SplitData:
    """Load train/val arrays from a dataset saved with ``save_to_disk``.

    Raises:
        ValueError: if the dataset is neither a DatasetDict(train/val) nor a
            single dataset with a 'split' column, or if 'embedding'/'label'
            columns are missing from either split.
    """
    ds = load_from_disk(dataset_path)

    # Case A: DatasetDict with explicit train/val splits.
    if isinstance(ds, DatasetDict) and "train" in ds and "val" in ds:
        train_ds, val_ds = ds["train"], ds["val"]
    else:
        # Case B: single dataset with a "split" column.
        if "split" not in ds.column_names:
            raise ValueError(
                "Dataset must be a DatasetDict(train/val) or have a 'split' column."
            )
        train_ds = ds.filter(lambda x: x["split"] == "train")
        val_ds = ds.filter(lambda x: x["split"] == "val")

    for required in ["embedding", "label"]:
        if required not in train_ds.column_names:
            raise ValueError(f"Missing column '{required}' in train split.")
        if required not in val_ds.column_names:
            raise ValueError(f"Missing column '{required}' in val split.")

    X_train = _stack_embeddings(train_ds["embedding"])
    y_train = np.asarray(train_ds["label"], dtype=np.int64)
    X_val = _stack_embeddings(val_ds["embedding"])
    y_val = np.asarray(val_ds["label"], dtype=np.int64)

    seq_train = np.asarray(train_ds["sequence"]) if "sequence" in train_ds.column_names else None
    seq_val = np.asarray(val_ds["sequence"]) if "sequence" in val_ds.column_names else None

    return SplitData(X_train, y_train, seq_train, X_val, y_val, seq_val)


# -----------------------------
# Metrics + thresholding
# -----------------------------
def best_f1_threshold(y_true: np.ndarray, y_prob: np.ndarray) -> Tuple[float, float]:
    """
    Find threshold maximizing F1 on the given set.
    Returns (best_threshold, best_f1).
    """
    precision, recall, thresholds = precision_recall_curve(y_true, y_prob)
    # precision_recall_curve returns len(thresholds) == len(precision) - 1;
    # the final (precision=1, recall=0) point has no threshold, so drop it.
    # The 1e-12 guards against 0/0 when precision == recall == 0.
    f1s = (2 * precision[:-1] * recall[:-1]) / (precision[:-1] + recall[:-1] + 1e-12)
    best_idx = int(np.nanargmax(f1s))
    return float(thresholds[best_idx]), float(f1s[best_idx])


def eval_binary(y_true: np.ndarray, y_prob: np.ndarray, threshold: float) -> Dict[str, float]:
    """Compute F1 (at `threshold`), ROC-AUC, and average precision."""
    y_pred = (y_prob >= threshold).astype(int)
    return {
        "f1": float(f1_score(y_true, y_pred)),
        "auc": float(roc_auc_score(y_true, y_prob)),
        "ap": float(average_precision_score(y_true, y_prob)),
        "threshold": float(threshold),
    }


# -----------------------------
# Model factories
# -----------------------------
def train_xgb(
    X_train, y_train, X_val, y_val, params: Dict[str, Any]
) -> Tuple[xgb.Booster, np.ndarray, np.ndarray]:
    """Train an XGBoost booster with early stopping on the val set.

    Returns (booster, train_probabilities, val_probabilities).
    """
    # Copy before popping so the caller's dict is never mutated.
    params = dict(params)
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dval = xgb.DMatrix(X_val, label=y_val)

    num_boost_round = int(params.pop("num_boost_round"))
    early_stopping_rounds = int(params.pop("early_stopping_rounds"))

    booster = xgb.train(
        params=params,
        dtrain=dtrain,
        num_boost_round=num_boost_round,
        evals=[(dval, "val")],
        early_stopping_rounds=early_stopping_rounds,
        verbose_eval=False,
    )
    p_train = booster.predict(dtrain)
    p_val = booster.predict(dval)
    return booster, p_train, p_val


def train_adaboost(
    X_train, y_train, X_val, y_val, params: Dict[str, Any]
) -> Tuple[AdaBoostClassifier, np.ndarray, np.ndarray]:
    """Train AdaBoost over shallow decision trees.

    Returns (classifier, train_probabilities, val_probabilities).
    """
    # Copy before popping: previously base_depth was popped from the caller's
    # dict, which silently dropped it from the best-params summary at refit.
    params = dict(params)
    base_depth = int(params.pop("base_depth"))
    clf = AdaBoostClassifier(
        estimator=DecisionTreeClassifier(max_depth=base_depth),
        n_estimators=int(params["n_estimators"]),
        learning_rate=float(params["learning_rate"]),
        algorithm="SAMME",  # SAMME.R is deprecated in recent scikit-learn
    )
    clf.fit(X_train, y_train)
    p_train = clf.predict_proba(X_train)[:, 1]
    p_val = clf.predict_proba(X_val)[:, 1]
    return clf, p_train, p_val


def train_linearboost(X_train, y_train, X_val, y_val, params):
    """Train a LinearBoost classifier; returns (clf, p_train, p_val)."""
    clf = LinearBoostClassifier(**params)
    clf.fit(X_train, y_train)
    p_train = clf.predict_proba(X_train)[:, 1]
    p_val = clf.predict_proba(X_val)[:, 1]
    return clf, p_train, p_val


def suggest_linearboost_params(trial):
    """Build a LinearBoost param dict from an Optuna trial.

    Kernel-specific params (gamma/degree/coef0) are only *suggested* when the
    chosen kernel uses them; otherwise defaults are filled in directly so
    the returned dict is always complete.
    """
    params = {
        "n_estimators": trial.suggest_int("n_estimators", 50, 800),
        "learning_rate": trial.suggest_float("learning_rate", 0.01, 1.0, log=True),
        "algorithm": trial.suggest_categorical("algorithm", ["SAMME.R", "SAMME"]),
        # Scaling choices from the LinearBoost docs (expand if needed).
        "scaler": trial.suggest_categorical(
            "scaler",
            ["minmax", "standard", "robust", "quantile-uniform", "quantile-normal", "power"]
        ),
        # Useful for imbalanced splits.
        "class_weight": trial.suggest_categorical("class_weight", [None, "balanced"]),
        # Kernel trick.
        "kernel": trial.suggest_categorical("kernel", ["linear", "rbf", "poly", "sigmoid"]),
    }

    if params["kernel"] in ["rbf", "poly"]:
        params["gamma"] = trial.suggest_float("gamma", 1e-6, 10.0, log=True)
    else:
        # Docs: None is treated as 1/n_features for rbf/poly — TODO confirm
        # against the installed linearboost version.
        params["gamma"] = None

    if params["kernel"] == "poly":
        params["degree"] = trial.suggest_int("degree", 2, 6)    # docs default = 3
        params["coef0"] = trial.suggest_float("coef0", 0.0, 5.0)  # docs default = 1
    else:
        # Safe defaults for non-poly kernels.
        params["degree"] = 3
        params["coef0"] = 1.0

    return params


# -----------------------------
# Saving artifacts
# -----------------------------
def save_predictions_csv(
    out_dir: str,
    split_name: str,
    y_true: np.ndarray,
    y_prob: np.ndarray,
    threshold: float,
    sequences: Optional[np.ndarray] = None,
):
    """Write {split_name}_predictions.csv with y_true/y_prob/y_pred columns.

    If `sequences` is given it becomes the first column.
    """
    os.makedirs(out_dir, exist_ok=True)
    df = pd.DataFrame({
        "y_true": y_true.astype(int),
        "y_prob": y_prob.astype(float),
        "y_pred": (y_prob >= threshold).astype(int),
    })
    if sequences is not None:
        df.insert(0, "sequence", sequences)
    df.to_csv(os.path.join(out_dir, f"{split_name}_predictions.csv"), index=False)


def plot_curves(out_dir: str, y_true: np.ndarray, y_prob: np.ndarray):
    """Save PR and ROC curve PNGs for the given labels/probabilities."""
    os.makedirs(out_dir, exist_ok=True)

    # Precision-Recall curve.
    precision, recall, _ = precision_recall_curve(y_true, y_prob)
    plt.figure()
    plt.plot(recall, precision)
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.title("Precision-Recall Curve")
    plt.tight_layout()
    plt.savefig(os.path.join(out_dir, "pr_curve.png"))
    plt.close()

    # ROC curve.
    fpr, tpr, _ = roc_curve(y_true, y_prob)
    plt.figure()
    plt.plot(fpr, tpr)
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title("ROC Curve")
    plt.tight_layout()
    plt.savefig(os.path.join(out_dir, "roc_curve.png"))
    plt.close()


# -----------------------------
# Optuna objectives
# -----------------------------
def make_objective(model_name: str, data: SplitData, out_dir: str):
    """Return an Optuna objective maximizing best-threshold val F1.

    `out_dir` is kept for interface compatibility (currently unused here;
    artifacts are written by the caller after the study finishes).
    """
    Xtr, ytr, Xva, yva = data.X_train, data.y_train, data.X_val, data.y_val

    def objective(trial: optuna.Trial) -> float:
        if model_name == "xgb":
            params = {
                "objective": "binary:logistic",
                "eval_metric": "logloss",
                "lambda": trial.suggest_float("lambda", 1e-8, 50.0, log=True),
                "alpha": trial.suggest_float("alpha", 1e-8, 50.0, log=True),
                "colsample_bytree": trial.suggest_float("colsample_bytree", 0.3, 1.0),
                "subsample": trial.suggest_float("subsample", 0.5, 1.0),
                "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True),
                "max_depth": trial.suggest_int("max_depth", 2, 15),
                "min_child_weight": trial.suggest_int("min_child_weight", 1, 500),
                "gamma": trial.suggest_float("gamma", 0.0, 10.0),
                "tree_method": "hist",
                # GPU training; select a device via CUDA_VISIBLE_DEVICES.
                "device": "cuda",
            }
            params["num_boost_round"] = trial.suggest_int("num_boost_round", 50, 1500)
            params["early_stopping_rounds"] = trial.suggest_int("early_stopping_rounds", 20, 200)
            model, p_tr, p_va = train_xgb(Xtr, ytr, Xva, yva, params)
        elif model_name == "adaboost":
            params = {
                "n_estimators": trial.suggest_int("n_estimators", 50, 800),
                "learning_rate": trial.suggest_float("learning_rate", 1e-3, 2.0, log=True),
                "base_depth": trial.suggest_int("base_depth", 1, 4),
            }
            model, p_tr, p_va = train_adaboost(Xtr, ytr, Xva, yva, params)
        elif model_name == "linearboost":
            params = suggest_linearboost_params(trial)
            model, p_tr, p_va = train_linearboost(Xtr, ytr, Xva, yva, params)
        else:
            raise ValueError(f"Unknown model_name={model_name}")

        # Threshold picked on val for fair comparison across models.
        thr, f1_at_thr = best_f1_threshold(yva, p_va)
        metrics = eval_binary(yva, p_va, thr)

        # Stash val metrics on the trial so the runner can report them later.
        trial.set_user_attr("threshold", thr)
        trial.set_user_attr("auc", metrics["auc"])
        trial.set_user_attr("ap", metrics["ap"])
        return f1_at_thr

    return objective


# -----------------------------
# Main runner
# -----------------------------
def run_optuna_and_refit(
    dataset_path: str,
    out_dir: str,
    model_name: str,
    n_trials: int = 200,
):
    """Run an Optuna study, refit the best model, and save all artifacts.

    Writes to `out_dir`: study_trials.csv, best_model.{json,joblib},
    train/val prediction CSVs, PR/ROC plots, and optimization_summary.txt.
    """
    os.makedirs(out_dir, exist_ok=True)
    data = load_split_data(dataset_path)
    print(f"[Data] Train: {data.X_train.shape}, Val: {data.X_val.shape}")

    # NOTE(review): the MedianPruner is effectively inert — objectives never
    # call trial.report()/should_prune(). Kept for interface parity.
    study = optuna.create_study(direction="maximize", pruner=optuna.pruners.MedianPruner())
    study.optimize(make_objective(model_name, data, out_dir), n_trials=n_trials)

    # Save the full trials table.
    trials_df = study.trials_dataframe()
    trials_df.to_csv(os.path.join(out_dir, "study_trials.csv"), index=False)

    best = study.best_trial
    best_params = dict(best.params)
    best_thr = float(best.user_attrs["threshold"])
    best_auc = float(best.user_attrs["auc"])
    best_ap = float(best.user_attrs["ap"])
    best_f1 = float(best.value)

    # Refit best model on train (same protocol as objective).
    if model_name == "xgb":
        # Reconstruct the full param dict, including the fixed settings the
        # objective used (tree_method/device) so refit matches tuning.
        params = {
            "objective": "binary:logistic",
            "eval_metric": "logloss",
            "lambda": best_params["lambda"],
            "alpha": best_params["alpha"],
            "colsample_bytree": best_params["colsample_bytree"],
            "subsample": best_params["subsample"],
            "learning_rate": best_params["learning_rate"],
            "max_depth": best_params["max_depth"],
            "min_child_weight": best_params["min_child_weight"],
            "gamma": best_params["gamma"],
            "tree_method": "hist",
            "device": "cuda",
            "num_boost_round": best_params["num_boost_round"],
            "early_stopping_rounds": best_params["early_stopping_rounds"],
        }
        model, p_tr, p_va = train_xgb(
            data.X_train, data.y_train, data.X_val, data.y_val, params
        )
        model_path = os.path.join(out_dir, "best_model.json")
        model.save_model(model_path)
    elif model_name == "adaboost":
        # Copy so best_params survives intact for the summary below.
        params = dict(best_params)
        model, p_tr, p_va = train_adaboost(
            data.X_train, data.y_train, data.X_val, data.y_val, params
        )
        model_path = os.path.join(out_dir, "best_model.joblib")
        joblib.dump(model, model_path)
    elif model_name == "linearboost":
        # trial.params records only *suggested* values; restore the
        # conditional defaults suggest_linearboost_params fills in for
        # kernels that don't use gamma/degree/coef0.
        params = dict(best_params)
        params.setdefault("gamma", None)
        params.setdefault("degree", 3)
        params.setdefault("coef0", 1.0)
        model, p_tr, p_va = train_linearboost(
            data.X_train, data.y_train, data.X_val, data.y_val, params
        )
        model_path = os.path.join(out_dir, "best_model.joblib")
        joblib.dump(model, model_path)
    else:
        raise ValueError(model_name)

    # Save predictions CSVs.
    save_predictions_csv(out_dir, "train", data.y_train, p_tr, best_thr, data.seq_train)
    save_predictions_csv(out_dir, "val", data.y_val, p_va, best_thr, data.seq_val)

    # Plots on val.
    plot_curves(out_dir, data.y_val, p_va)

    # Summary.
    summary = [
        "=" * 72,
        f"MODEL: {model_name}",
        f"Best trial: {best.number}",
        f"Best F1 (val @ best-threshold): {best_f1:.4f}",
        f"Val AUC: {best_auc:.4f}",
        f"Val AP: {best_ap:.4f}",
        f"Best threshold (picked on val): {best_thr:.4f}",
        f"Model saved to: {model_path}",
        "Best params:",
        json.dumps(best_params, indent=2),
        "=" * 72,
    ]
    with open(os.path.join(out_dir, "optimization_summary.txt"), "w") as f:
        f.write("\n".join(summary))
    print("\n".join(summary))


if __name__ == "__main__":
    # Example usage:
    #   run_optuna_and_refit(dataset_path, out_dir, model_name="xgb", n_trials=200)
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--dataset_path", type=str, required=True)
    parser.add_argument("--out_dir", type=str, required=True)
    parser.add_argument("--model", type=str, choices=["xgb", "adaboost", "linearboost"], required=True)
    parser.add_argument("--n_trials", type=int, default=200)
    args = parser.parse_args()

    run_optuna_and_refit(
        dataset_path=args.dataset_path,
        out_dir=args.out_dir,
        model_name=args.model,
        n_trials=args.n_trials,
    )