ymlin105's picture
refactor: flatten metrics paths and polish project presentation
275e9d5
# ruff: noqa: E402
import numpy as np
import xgboost as xgb
import logging
import json
from pathlib import Path
import sys
import subprocess
from datetime import datetime, timezone
PROJECT_ROOT = Path(__file__).resolve().parents[2]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from src.shared.config import DEFAULT_MODEL_METADATA_PATH, DEFAULT_MODEL_PATH, settings
from src.shared.mlflow_utils import start_run
from src.training.data_loader import load_raw_data, clean_data
from src.training.features import (
apply_feature_pipeline,
build_feature_matrix
)
from src.training.splits import holdout_masks
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def build_model_metadata(model_path: Path) -> dict[str, str]:
"""Builds lightweight metadata for the generated model artifact."""
git_hash = "unknown"
try:
git_hash = subprocess.check_output(
["git", "rev-parse", "--short", "HEAD"],
cwd=PROJECT_ROOT,
text=True,
).strip()
except Exception:
logger.info("Unable to read git hash; using 'unknown' in model metadata.")
timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
version = f"{timestamp}-{git_hash}"
return {
"model_version": version,
"created_at_utc": timestamp,
"git_short_hash": git_hash,
"model_path": str(model_path),
}
def rmspe(y_true: np.ndarray, y_pred: np.ndarray) -> float:
"""Computes RMSPE in percentage points."""
safe_true = np.clip(y_true, a_min=1.0, a_max=None)
return float(np.sqrt(np.mean(np.square((y_true - y_pred) / safe_true))) * 100)
def run_training(
model_path: str = str(DEFAULT_MODEL_PATH),
metrics_path: str = "metrics/training_summary.json",
metadata_path: str = str(DEFAULT_MODEL_METADATA_PATH),
) -> dict:
"""Runs training, records validation metadata, and saves the final model."""
logger.info("Starting Rossmann training pipeline")
# 1. Load and Clean Data
df = load_raw_data(settings.data.train_path, settings.data.store_path)
df = clean_data(df)
# 2. Feature Engineering
logger.info("Applying feature engineering...")
df = apply_feature_pipeline(
df,
fourier_period=settings.pipeline.fourier_period,
fourier_order=settings.pipeline.fourier_order,
)
# 3. Final Feature Matrix Construction
feature_cols = settings.data.features
X = build_feature_matrix(df, feature_cols)
# Target transformation (Log)
y = np.log1p(df[settings.data.target])
# 4. Simple time-based validation
params = settings.model_params.get("xgboost", {})
metrics = {
"num_rows": int(len(df)),
"num_features": int(len(feature_cols)),
"validation_days": 42,
"model_params": params,
}
validation_mask, validation_start, validation_end = holdout_masks(df["Date"], validation_days=42)
metrics["validation_start_date"] = validation_start.strftime("%Y-%m-%d")
metrics["validation_end_date"] = validation_end.strftime("%Y-%m-%d")
if validation_mask.any() and (~validation_mask).any():
train_model = xgb.XGBRegressor(**params)
train_model.fit(X.loc[~validation_mask], y.loc[~validation_mask])
y_train_actual = np.expm1(y.loc[~validation_mask].to_numpy())
y_train_pred = np.expm1(train_model.predict(X.loc[~validation_mask]))
y_valid = np.expm1(y.loc[validation_mask].to_numpy())
y_pred = np.expm1(train_model.predict(X.loc[validation_mask]))
metrics["train_rmspe"] = round(rmspe(y_train_actual, y_train_pred), 4)
metrics["validation_rmspe"] = round(rmspe(y_valid, y_pred), 4)
metrics["validation_rows"] = int(validation_mask.sum())
logger.info("Validation RMSPE: %.4f%%", metrics["validation_rmspe"])
# 5. Train final model on all available data and save it
final_model = xgb.XGBRegressor(**params)
logger.info("Fitting final XGBoost model...")
final_model.fit(X, y)
output_path = Path(model_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
final_model.save_model(str(output_path))
summary_path = Path(metrics_path)
summary_path.parent.mkdir(parents=True, exist_ok=True)
with summary_path.open("w", encoding="utf-8") as f:
json.dump(metrics, f, indent=2)
metadata_output_path = Path(metadata_path)
metadata_output_path.parent.mkdir(parents=True, exist_ok=True)
model_metadata = build_model_metadata(output_path)
with metadata_output_path.open("w", encoding="utf-8") as f:
json.dump(model_metadata, f, indent=2)
run_name = f"xgb_holdout_{metrics['validation_start_date']}_{metrics['validation_end_date']}"
with start_run(run_name, experiment_name="rossmann-training") as run:
if run is not None:
import mlflow
mlflow.log_params(params)
mlflow.log_param("num_rows", metrics["num_rows"])
mlflow.log_param("num_features", metrics["num_features"])
mlflow.log_param("validation_days", metrics["validation_days"])
mlflow.log_param("validation_start_date", metrics["validation_start_date"])
mlflow.log_param("validation_end_date", metrics["validation_end_date"])
if "train_rmspe" in metrics:
mlflow.log_metric("train_rmspe", metrics["train_rmspe"])
if "validation_rmspe" in metrics:
mlflow.log_metric("validation_rmspe", metrics["validation_rmspe"])
mlflow.log_artifact(str(output_path))
mlflow.log_artifact(str(summary_path))
mlflow.log_artifact(str(metadata_output_path))
logger.info("Model saved to %s", output_path)
logger.info("Training summary written to %s", summary_path)
logger.info("Model metadata written to %s", metadata_output_path)
logger.info("Training pipeline completed successfully.")
return metrics
if __name__ == "__main__":
run_training()