"""Optuna hyperparameter search and refit for binary classifiers
(XGBoost, AdaBoost, LinearBoost) trained on precomputed embeddings.

Provenance: ynuozhang, commit baf3373 ("update code").
"""
import os
import json
import joblib
import optuna
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import Dict, Any, Tuple, Optional
from datasets import load_from_disk, DatasetDict
from sklearn.metrics import (
f1_score, roc_auc_score, average_precision_score,
precision_recall_curve, roc_curve
)
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import AdaBoostClassifier
from sklearn.tree import DecisionTreeClassifier
from linearboost import LinearBoostClassifier
import xgboost as xgb
from lightning.pytorch import seed_everything
seed_everything(1986)
# -----------------------------
# Data loading
# -----------------------------
@dataclass
class SplitData:
    """Container for a train/val split: embeddings, labels, optional sequences."""
    # 2-D float32 embedding matrix for the training split
    X_train: np.ndarray
    # int64 binary labels for the training split
    y_train: np.ndarray
    # raw sequences for the training split, when the dataset provides them
    seq_train: Optional[np.ndarray]
    # 2-D float32 embedding matrix for the validation split
    X_val: np.ndarray
    # int64 binary labels for the validation split
    y_val: np.ndarray
    # raw sequences for the validation split, when the dataset provides them
    seq_val: Optional[np.ndarray]
def _stack_embeddings(col) -> np.ndarray:
# HF datasets often store embeddings as list-of-floats per row
arr = np.asarray(col, dtype=np.float32)
if arr.ndim != 2:
arr = np.stack(col).astype(np.float32)
return arr
def load_split_data(dataset_path: str) -> SplitData:
    """Load a train/val split from an HF dataset saved on disk.

    Accepts either a DatasetDict with "train" and "val" splits, or a single
    dataset carrying a "split" column. Both splits must provide "embedding"
    and "label" columns; an optional "sequence" column is carried through.

    Raises:
        ValueError: If neither layout is found or a required column is missing.
    """
    ds = load_from_disk(dataset_path)
    # Case A: DatasetDict already partitioned into train/val.
    if isinstance(ds, DatasetDict) and "train" in ds and "val" in ds:
        train_ds, val_ds = ds["train"], ds["val"]
    # Case B: single dataset with a "split" column.
    elif "split" in ds.column_names:
        train_ds = ds.filter(lambda row: row["split"] == "train")
        val_ds = ds.filter(lambda row: row["split"] == "val")
    else:
        raise ValueError(
            "Dataset must be a DatasetDict(train/val) or have a 'split' column."
        )
    for column in ("embedding", "label"):
        if column not in train_ds.column_names:
            raise ValueError(f"Missing column '{column}' in train split.")
        if column not in val_ds.column_names:
            raise ValueError(f"Missing column '{column}' in val split.")
    seq_train = (
        np.asarray(train_ds["sequence"])
        if "sequence" in train_ds.column_names
        else None
    )
    seq_val = (
        np.asarray(val_ds["sequence"])
        if "sequence" in val_ds.column_names
        else None
    )
    return SplitData(
        _stack_embeddings(train_ds["embedding"]),
        np.asarray(train_ds["label"], dtype=np.int64),
        seq_train,
        _stack_embeddings(val_ds["embedding"]),
        np.asarray(val_ds["label"], dtype=np.int64),
        seq_val,
    )
# -----------------------------
# Metrics + thresholding
# -----------------------------
def best_f1_threshold(y_true: np.ndarray, y_prob: np.ndarray) -> Tuple[float, float]:
    """Pick the probability cutoff that maximizes F1 on this set.

    Returns:
        (best_threshold, best_f1).
    """
    prec, rec, thresholds = precision_recall_curve(y_true, y_prob)
    # precision_recall_curve yields one more (precision, recall) point than
    # thresholds, so drop the trailing point before computing F1.
    prec, rec = prec[:-1], rec[:-1]
    # Small epsilon guards against 0/0 when both precision and recall vanish.
    f1_values = (2 * prec * rec) / (prec + rec + 1e-12)
    best = int(np.nanargmax(f1_values))
    return float(thresholds[best]), float(f1_values[best])
def eval_binary(y_true: np.ndarray, y_prob: np.ndarray, threshold: float) -> Dict[str, float]:
    """Score predictions: F1 at the given cutoff, plus ROC-AUC and AP."""
    labels = np.where(y_prob >= threshold, 1, 0)
    metrics = {"f1": float(f1_score(y_true, labels))}
    metrics["auc"] = float(roc_auc_score(y_true, y_prob))
    metrics["ap"] = float(average_precision_score(y_true, y_prob))
    metrics["threshold"] = float(threshold)
    return metrics
# -----------------------------
# Model factories
# -----------------------------
def train_xgb(
    X_train, y_train, X_val, y_val, params: Dict[str, Any]
) -> Tuple[xgb.Booster, np.ndarray, np.ndarray]:
    """Train an XGBoost booster with early stopping on the validation set.

    NOTE: pops "num_boost_round" and "early_stopping_rounds" out of *params*
    (mutating the caller's dict) before forwarding the rest to xgb.train.

    Returns:
        (booster, train-split probabilities, val-split probabilities).
    """
    rounds = int(params.pop("num_boost_round"))
    patience = int(params.pop("early_stopping_rounds"))
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dval = xgb.DMatrix(X_val, label=y_val)
    booster = xgb.train(
        params=params,
        dtrain=dtrain,
        num_boost_round=rounds,
        evals=[(dval, "val")],
        early_stopping_rounds=patience,
        verbose_eval=False,
    )
    return booster, booster.predict(dtrain), booster.predict(dval)
def train_adaboost(
    X_train, y_train, X_val, y_val, params: Dict[str, Any]
) -> Tuple[AdaBoostClassifier, np.ndarray, np.ndarray]:
    """Fit an AdaBoost ensemble over depth-limited decision trees.

    NOTE: pops "base_depth" out of *params* (mutating the caller's dict);
    the remaining entries configure the boosting ensemble itself.

    Returns:
        (fitted classifier, train positive-class probs, val positive-class probs).
    """
    depth = int(params.pop("base_depth"))
    model = AdaBoostClassifier(
        estimator=DecisionTreeClassifier(max_depth=depth),
        n_estimators=int(params["n_estimators"]),
        learning_rate=float(params["learning_rate"]),
        algorithm="SAMME",
    )
    model.fit(X_train, y_train)
    proba_train = model.predict_proba(X_train)[:, 1]
    proba_val = model.predict_proba(X_val)[:, 1]
    return model, proba_train, proba_val
def train_linearboost(X_train, y_train, X_val, y_val, params):
    """Fit a LinearBoostClassifier from keyword params.

    Returns:
        (fitted classifier, train positive-class probs, val positive-class probs).
    """
    model = LinearBoostClassifier(**params)
    model.fit(X_train, y_train)
    return (
        model,
        model.predict_proba(X_train)[:, 1],
        model.predict_proba(X_val)[:, 1],
    )
def suggest_linearboost_params(trial):
    """Sample a LinearBoostClassifier configuration from an Optuna trial.

    Kernel-specific parameters (gamma/degree/coef0) are only searched when
    the sampled kernel actually uses them; otherwise fixed fallbacks are set
    so the returned dict always has the same keys.
    """
    # Core boosting params
    params = {
        "n_estimators": trial.suggest_int("n_estimators", 50, 800),
        "learning_rate": trial.suggest_float("learning_rate", 0.01, 1.0, log=True),
        "algorithm": trial.suggest_categorical("algorithm", ["SAMME.R", "SAMME"]),
        # Feature-scaling choices supported by LinearBoost (list can be expanded)
        "scaler": trial.suggest_categorical(
            "scaler",
            ["minmax", "standard", "robust", "quantile-uniform", "quantile-normal", "power"]
        ),
        # useful for imbalanced splits
        "class_weight": trial.suggest_categorical("class_weight", [None, "balanced"]),
        # kernel trick
        "kernel": trial.suggest_categorical("kernel", ["linear", "rbf", "poly", "sigmoid"]),
    }
    # Kernel-specific params (only when relevant)
    if params["kernel"] in ["rbf", "poly"]:
        params["gamma"] = trial.suggest_float("gamma", 1e-6, 10.0, log=True)
    else:
        # Presumably None lets the library apply its own gamma default
        # (e.g. 1/n_features for rbf/poly) — verify against LinearBoost docs.
        params["gamma"] = None
    if params["kernel"] == "poly":
        params["degree"] = trial.suggest_int("degree", 2, 6)
        params["coef0"] = trial.suggest_float("coef0", 0.0, 5.0)
    else:
        # fixed fallbacks for non-poly kernels (unused by the model there)
        params["degree"] = 3
        params["coef0"] = 1.0
    return params
# -----------------------------
# Saving artifacts
# -----------------------------
def save_predictions_csv(
    out_dir: str,
    split_name: str,
    y_true: np.ndarray,
    y_prob: np.ndarray,
    threshold: float,
    sequences: Optional[np.ndarray] = None,
):
    """Write <split_name>_predictions.csv under *out_dir*.

    Columns: y_true, y_prob, y_pred (y_prob >= threshold), and a leading
    "sequence" column when sequences are provided.
    """
    os.makedirs(out_dir, exist_ok=True)
    table = {
        "y_true": y_true.astype(int),
        "y_prob": y_prob.astype(float),
        "y_pred": (y_prob >= threshold).astype(int),
    }
    frame = pd.DataFrame(table)
    if sequences is not None:
        frame.insert(0, "sequence", sequences)
    out_path = os.path.join(out_dir, f"{split_name}_predictions.csv")
    frame.to_csv(out_path, index=False)
def plot_curves(out_dir: str, y_true: np.ndarray, y_prob: np.ndarray):
    """Save PR and ROC curve PNGs for the given labels/scores into *out_dir*."""
    os.makedirs(out_dir, exist_ok=True)
    prec, rec, _ = precision_recall_curve(y_true, y_prob)
    fpr, tpr, _ = roc_curve(y_true, y_prob)
    specs = [
        ("pr_curve.png", rec, prec, "Recall", "Precision", "Precision-Recall Curve"),
        ("roc_curve.png", fpr, tpr, "False Positive Rate", "True Positive Rate", "ROC Curve"),
    ]
    for fname, xs, ys, xlabel, ylabel, title in specs:
        plt.figure()
        plt.plot(xs, ys)
        plt.xlabel(xlabel)
        plt.ylabel(ylabel)
        plt.title(title)
        plt.tight_layout()
        plt.savefig(os.path.join(out_dir, fname))
        plt.close()
# -----------------------------
# Optuna objectives
# -----------------------------
def make_objective(model_name: str, data: SplitData, out_dir: str):
    """Build an Optuna objective closure for the given model family.

    Each trial trains on the train split, picks the F1-maximizing threshold
    on the val split, records threshold/AUC/AP as trial user attrs, and
    returns val F1 at that threshold (the study maximizes it).

    NOTE(review): `out_dir` is unused here; artifacts are written by the
    caller after the study completes.
    """
    Xtr, ytr, Xva, yva = data.X_train, data.y_train, data.X_val, data.y_val
    def objective(trial: optuna.Trial) -> float:
        if model_name == "xgb":
            params = {
                "objective": "binary:logistic",
                "eval_metric": "logloss",
                "lambda": trial.suggest_float("lambda", 1e-8, 50.0, log=True),
                "alpha": trial.suggest_float("alpha", 1e-8, 50.0, log=True),
                "colsample_bytree": trial.suggest_float("colsample_bytree", 0.3, 1.0),
                "subsample": trial.suggest_float("subsample", 0.5, 1.0),
                "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True),
                "max_depth": trial.suggest_int("max_depth", 2, 15),
                "min_child_weight": trial.suggest_int("min_child_weight", 1, 500),
                "gamma": trial.suggest_float("gamma", 0.0, 10.0),
                "tree_method": "hist",
                # Requires a CUDA-capable xgboost build; pick the GPU via
                # CUDA_VISIBLE_DEVICES externally.
                "device": "cuda",
            }
            params["num_boost_round"] = trial.suggest_int("num_boost_round", 50, 1500)
            params["early_stopping_rounds"] = trial.suggest_int("early_stopping_rounds", 20, 200)
            # .copy() because train_xgb pops the two boosting-control keys.
            model, p_tr, p_va = train_xgb(Xtr, ytr, Xva, yva, params.copy())
        elif model_name == "adaboost":
            params = {
                "n_estimators": trial.suggest_int("n_estimators", 50, 800),
                "learning_rate": trial.suggest_float("learning_rate", 1e-3, 2.0, log=True),
                "base_depth": trial.suggest_int("base_depth", 1, 4),
            }
            model, p_tr, p_va = train_adaboost(Xtr, ytr, Xva, yva, params)
        elif model_name == "linearboost":
            params = suggest_linearboost_params(trial)
            model, p_tr, p_va = train_linearboost(Xtr, ytr, Xva, yva, params)
        else:
            raise ValueError(f"Unknown model_name={model_name}")
        # Threshold picked on val for fair comparison across models
        thr, f1_at_thr = best_f1_threshold(yva, p_va)
        metrics = eval_binary(yva, p_va, thr)
        # Record side metrics so the best trial can be summarized later.
        trial.set_user_attr("threshold", thr)
        trial.set_user_attr("auc", metrics["auc"])
        trial.set_user_attr("ap", metrics["ap"])
        return f1_at_thr
    return objective
# -----------------------------
# Main runner
# -----------------------------
def run_optuna_and_refit(
    dataset_path: str,
    out_dir: str,
    model_name: str,
    n_trials: int = 200,
):
    """Search hyperparameters with Optuna, refit the best trial, save artifacts.

    Writes under *out_dir*: study_trials.csv, the refit model
    (best_model.json / best_model.joblib), train/val prediction CSVs,
    PR/ROC plots, and optimization_summary.txt.

    Args:
        dataset_path: Path to an HF dataset accepted by load_split_data.
        out_dir: Output directory; created if missing.
        model_name: "xgb", "adaboost", or "linearboost".
        n_trials: Number of Optuna trials to run.

    Raises:
        ValueError: If model_name is not recognized.
    """
    os.makedirs(out_dir, exist_ok=True)
    data = load_split_data(dataset_path)
    print(f"[Data] Train: {data.X_train.shape}, Val: {data.X_val.shape}")
    study = optuna.create_study(direction="maximize", pruner=optuna.pruners.MedianPruner())
    study.optimize(make_objective(model_name, data, out_dir), n_trials=n_trials)
    # Persist the full trials table for later inspection.
    trials_df = study.trials_dataframe()
    trials_df.to_csv(os.path.join(out_dir, "study_trials.csv"), index=False)
    best = study.best_trial
    best_params = dict(best.params)
    best_thr = float(best.user_attrs["threshold"])
    best_auc = float(best.user_attrs["auc"])
    best_ap = float(best.user_attrs["ap"])
    best_f1 = float(best.value)
    # Refit best model on train (same protocol as the objective).
    if model_name == "xgb":
        # Reconstruct the full param dict. Fixed keys (tree_method, device)
        # must match the objective so the refit reproduces the best trial.
        params = {
            "objective": "binary:logistic",
            "eval_metric": "logloss",
            "lambda": best_params["lambda"],
            "alpha": best_params["alpha"],
            "colsample_bytree": best_params["colsample_bytree"],
            "subsample": best_params["subsample"],
            "learning_rate": best_params["learning_rate"],
            "max_depth": best_params["max_depth"],
            "min_child_weight": best_params["min_child_weight"],
            "gamma": best_params["gamma"],
            "tree_method": "hist",
            "device": "cuda",  # the search trained on GPU; keep parity here
            "num_boost_round": best_params["num_boost_round"],
            "early_stopping_rounds": best_params["early_stopping_rounds"],
        }
        model, p_tr, p_va = train_xgb(
            data.X_train, data.y_train, data.X_val, data.y_val, params
        )
        model_path = os.path.join(out_dir, "best_model.json")
        model.save_model(model_path)
    elif model_name == "adaboost":
        # train_adaboost pops "base_depth" from its params argument; pass a
        # copy so best_params stays intact for the summary below.
        model, p_tr, p_va = train_adaboost(
            data.X_train, data.y_train, data.X_val, data.y_val, dict(best_params)
        )
        model_path = os.path.join(out_dir, "best_model.joblib")
        joblib.dump(model, model_path)
    elif model_name == "linearboost":
        # Copy for symmetry/safety even though train_linearboost does not
        # currently mutate its params.
        model, p_tr, p_va = train_linearboost(
            data.X_train, data.y_train, data.X_val, data.y_val, dict(best_params)
        )
        model_path = os.path.join(out_dir, "best_model.joblib")
        joblib.dump(model, model_path)
    else:
        raise ValueError(model_name)
    # Predictions at the threshold picked on val during the search.
    save_predictions_csv(out_dir, "train", data.y_train, p_tr, best_thr, data.seq_train)
    save_predictions_csv(out_dir, "val", data.y_val, p_va, best_thr, data.seq_val)
    # Diagnostic curves on val.
    plot_curves(out_dir, data.y_val, p_va)
    # Human-readable summary, both printed and saved.
    summary = [
        "=" * 72,
        f"MODEL: {model_name}",
        f"Best trial: {best.number}",
        f"Best F1 (val @ best-threshold): {best_f1:.4f}",
        f"Val AUC: {best_auc:.4f}",
        f"Val AP: {best_ap:.4f}",
        f"Best threshold (picked on val): {best_thr:.4f}",
        f"Model saved to: {model_path}",
        "Best params:",
        json.dumps(best_params, indent=2),
        "=" * 72,
    ]
    with open(os.path.join(out_dir, "optimization_summary.txt"), "w") as f:
        f.write("\n".join(summary))
    print("\n".join(summary))
if __name__ == "__main__":
    import argparse

    # Example:
    #   python <script>.py --dataset_path /path/to/dataset \
    #       --out_dir /path/to/out --model xgb --n_trials 200
    cli = argparse.ArgumentParser()
    cli.add_argument("--dataset_path", type=str, required=True)
    cli.add_argument("--out_dir", type=str, required=True)
    cli.add_argument("--model", type=str, choices=["xgb", "adaboost", "linearboost"], required=True)
    cli.add_argument("--n_trials", type=int, default=200)
    opts = cli.parse_args()
    run_optuna_and_refit(
        dataset_path=opts.dataset_path,
        out_dir=opts.out_dir,
        model_name=opts.model,
        n_trials=opts.n_trials,
    )