File size: 6,879 Bytes
1aa7fae |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
"""
Model training and experimentation tracking script.
Responsibilities:
- Load prepared train and test datasets (preferably from Hugging Face dataset repo).
- Define a model pipeline (Random Forest by default) and hyperparameter search space.
- Run hyperparameter tuning with cross-validation.
- Log all tuned parameters and evaluation metrics with MLflow.
- Save the best model locally.
- Register/upload the best model to the Hugging Face model hub.
"""
from __future__ import annotations
from typing import Dict, Tuple
import joblib
import mlflow
import mlflow.sklearn # noqa: F401
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.model_selection import RandomizedSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
import config
from hf_data_utils import download_dataset_file
from hf_model_utils import upload_model
def _load_train_test_from_hf_or_local() -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Load the train and test splits as DataFrames.

    The Hugging Face dataset repo is tried first when both
    ``config.HF_TOKEN`` and ``config.HF_DATASET_REPO`` are set. If that
    attempt fails for any reason, a warning is printed and the local CSVs
    at ``config.TRAIN_FILE`` / ``config.TEST_FILE`` are used instead.

    Returns:
        A ``(train_df, test_df)`` tuple.

    Raises:
        FileNotFoundError: If the HF repo was not used (or failed) and
            either local split file does not exist.
    """
    if config.HF_TOKEN and config.HF_DATASET_REPO:
        try:
            train_path = download_dataset_file(
                filename="data/train.csv",
                repo_id=config.HF_DATASET_REPO,
                token=config.HF_TOKEN,
                local_dir=config.DATA_DIR,
            )
            test_path = download_dataset_file(
                filename="data/test.csv",
                repo_id=config.HF_DATASET_REPO,
                token=config.HF_TOKEN,
                local_dir=config.DATA_DIR,
            )
            train_df = pd.read_csv(train_path)
            test_df = pd.read_csv(test_path)
            return train_df, test_df
        except Exception as exc:
            # Best-effort remote load: keep the local fallback, but report
            # the cause instead of silently discarding it.
            print(
                "Warning: Failed to load train/test data from Hugging Face "
                f"dataset repo: {exc}. Falling back to local files."
            )

    if not config.TRAIN_FILE.exists() or not config.TEST_FILE.exists():
        raise FileNotFoundError(
            "Train/test files not found locally or in the HF dataset repo. "
            "Run data_prep.py first to generate the splits."
        )

    train_df = pd.read_csv(config.TRAIN_FILE)
    test_df = pd.read_csv(config.TEST_FILE)
    return train_df, test_df
def _build_model_and_search_space() -> Tuple[Pipeline, Dict]:
    """
    Create the estimator pipeline together with its tuning search space.

    The pipeline chains a StandardScaler (step "scaler") into a
    RandomForestClassifier (step "clf") seeded with config.RANDOM_STATE.
    Every search-space key carries the ``clf__`` prefix so that it targets
    the classifier step of the pipeline.

    Returns:
        A ``(pipeline, param_distributions)`` tuple.
    """
    # Candidate values per RandomForestClassifier argument (unprefixed).
    forest_options = {
        "n_estimators": [100, 200, 300, 400],
        "max_depth": [None, 5, 10, 20],
        "min_samples_split": [2, 5, 10],
        "min_samples_leaf": [1, 2, 4],
        # 'auto' is left out: it is deprecated in recent sklearn versions.
        "max_features": ["sqrt", "log2", None],
        "bootstrap": [True, False],
    }
    search_space = {
        f"clf__{arg_name}": choices for arg_name, choices in forest_options.items()
    }

    model = Pipeline(
        steps=[
            ("scaler", StandardScaler()),
            ("clf", RandomForestClassifier(random_state=config.RANDOM_STATE)),
        ]
    )
    return model, search_space
def _evaluate_model(
    model: Pipeline, X_test: pd.DataFrame, y_test: pd.Series
) -> Dict[str, float]:
    """
    Score a fitted model on held-out data.

    Predictions are made once with ``model.predict`` and compared against
    ``y_test``. Precision, recall and F1 are computed with
    ``zero_division=0``.

    Returns:
        A dict with the keys "accuracy", "precision", "recall" and "f1".
    """
    predictions = model.predict(X_test)

    # Scorers that share the same zero_division handling.
    guarded_scorers = (
        ("precision", precision_score),
        ("recall", recall_score),
        ("f1", f1_score),
    )

    scores = {"accuracy": accuracy_score(y_test, predictions)}
    for label, scorer in guarded_scorers:
        scores[label] = scorer(y_test, predictions, zero_division=0)
    return scores
def main() -> None:
    """
    Execute the training, tuning, logging, and model registration pipeline.

    Steps:
        1. Load the train/test splits and select ``config.FEATURE_COLUMNS``
           and ``config.TARGET_COLUMN``.
        2. Run a RandomizedSearchCV (n_iter=20, cv=5, scoring="f1") inside a
           parent MLflow run named "RandomForest_random_search".
        3. Log every evaluated parameter set and its mean CV score as a
           nested MLflow run.
        4. Evaluate the best estimator on the test split and log its
           parameters and metrics on the parent run.
        5. Dump the best model to ``config.BEST_MODEL_LOCAL_PATH`` with
           joblib, and log it both as an artifact and as an MLflow sklearn
           model.
        6. Print the results and, when ``config.HF_TOKEN`` and
           ``config.HF_MODEL_REPO`` are set, upload the model file. An
           upload failure is reported as a warning and does not raise.
    """
    print("Loading train and test data...")
    train_df, test_df = _load_train_test_from_hf_or_local()

    X_train = train_df[config.FEATURE_COLUMNS]
    y_train = train_df[config.TARGET_COLUMN]
    X_test = test_df[config.FEATURE_COLUMNS]
    y_test = test_df[config.TARGET_COLUMN]

    print("Building model and hyperparameter search space...")
    pipeline, param_distributions = _build_model_and_search_space()
    search = RandomizedSearchCV(
        estimator=pipeline,
        param_distributions=param_distributions,
        n_iter=20,
        cv=5,
        scoring="f1",
        n_jobs=-1,
        verbose=1,
        random_state=config.RANDOM_STATE,
    )

    # Configure MLflow
    mlflow.set_tracking_uri(config.MLFLOW_TRACKING_URI)
    mlflow.set_experiment(config.MLFLOW_EXPERIMENT_NAME)

    print("Starting hyperparameter tuning with MLflow tracking...")
    with mlflow.start_run(run_name="RandomForest_random_search"):
        search.fit(X_train, y_train)
        best_model: Pipeline = search.best_estimator_
        best_params = search.best_params_

        # Log all evaluated parameter combinations as nested runs,
        # similar to the reference notebook pattern. "params" and
        # "mean_test_score" are parallel sequences in cv_results_.
        results = search.cv_results_
        for param_set, mean_score in zip(
            results["params"], results["mean_test_score"]
        ):
            with mlflow.start_run(nested=True):
                mlflow.log_params(param_set)
                mlflow.log_metric("mean_cv_f1", float(mean_score))

        # Evaluation on the held-out test split
        metrics = _evaluate_model(best_model, X_test, y_test)

        # Log the winning parameters and test metrics on the parent run
        mlflow.log_params(best_params)
        for name, value in metrics.items():
            mlflow.log_metric(name, float(value))

        # Save model locally
        config.MODELS_DIR.mkdir(parents=True, exist_ok=True)
        joblib.dump(best_model, config.BEST_MODEL_LOCAL_PATH)
        mlflow.log_artifact(
            str(config.BEST_MODEL_LOCAL_PATH), artifact_path="artifacts"
        )

        # Also log the model in MLflow's model registry format
        mlflow.sklearn.log_model(best_model, artifact_path="engine_model")

    print("Best parameters found:")
    for k, v in best_params.items():
        print(f" {k}: {v}")

    print("Evaluation metrics on test set:")
    for k, v in metrics.items():
        print(f" {k}: {v:.4f}")

    # Upload best model to Hugging Face model hub, if configured
    if config.HF_TOKEN and config.HF_MODEL_REPO:
        try:
            print("Uploading best model to Hugging Face model hub...")
            upload_model(
                local_model_path=config.BEST_MODEL_LOCAL_PATH,
                repo_id=config.HF_MODEL_REPO,
                repo_path="model.joblib",
                token=config.HF_TOKEN,
            )
            print("Model upload to Hugging Face completed.")
        except Exception as e:
            # Upload is best-effort: the model is already saved locally and
            # logged to MLflow, so report the failure without aborting.
            print(f"Warning: Failed to upload model to Hugging Face: {e}")
# Run the pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()
|