Spaces:
Sleeping
Sleeping
| # ruff: noqa: E402 | |
| import json | |
| import logging | |
| from pathlib import Path | |
| import sys | |
| from typing import Any | |
| import numpy as np | |
| import pandas as pd | |
| import xgboost as xgb | |
| PROJECT_ROOT = Path(__file__).resolve().parents[1] | |
| if str(PROJECT_ROOT) not in sys.path: | |
| sys.path.insert(0, str(PROJECT_ROOT)) | |
| from src.shared.config import settings | |
| from src.shared.mlflow_utils import start_run | |
| from src.training.data_loader import clean_data, load_raw_data | |
| from src.training.features import apply_feature_pipeline, build_feature_matrix | |
| from src.training.splits import holdout_masks, rolling_date_windows | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | |
| logger = logging.getLogger(__name__) | |
# Pre-tuning XGBoost hyperparameters, kept frozen so the "tuned vs. original"
# comparison in holdout_evaluation() stays reproducible. The tuned counterpart
# lives in settings.model_params["xgboost"].
ORIGINAL_CONFIG = {
    "n_estimators": 400,
    "learning_rate": 0.05,
    "max_depth": 8,
    "subsample": 0.8,
    "colsample_bytree": 0.8,
    "random_state": 42,  # fixed seed for reproducible fits
    "n_jobs": -1,  # use all available cores
    "verbosity": 0,  # silence xgboost's own logging
    "objective": "reg:squarederror",
}
def rmspe(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Root Mean Squared Percentage Error, expressed as a percentage.

    The denominator is clamped to at least 1.0 so zero-sales rows cannot
    divide by zero; the numerator keeps the raw actuals so the error itself
    is not distorted.
    """
    denominator = np.clip(y_true, a_min=1.0, a_max=None)
    relative_errors = (y_true - y_pred) / denominator
    return float(100.0 * np.sqrt(np.mean(relative_errors**2)))
def prepare_dataset() -> tuple[pd.DataFrame, pd.DataFrame, np.ndarray, pd.Series]:
    """Load, clean, and featurize the Rossmann data for evaluation.

    Returns the full frame, the feature matrix, the target array, and a
    normalized (midnight-truncated) datetime Series used for time-based splits.
    """
    logger.info("Loading Rossmann data for evaluation")
    frame = clean_data(load_raw_data(settings.data.train_path, settings.data.store_path))
    frame = apply_feature_pipeline(
        frame,
        fourier_period=settings.pipeline.fourier_period,
        fourier_order=settings.pipeline.fourier_order,
    )
    feature_matrix = build_feature_matrix(frame, settings.data.features)
    target = frame[settings.data.target].to_numpy()
    # Normalize timestamps to dates so day-granularity masks compare cleanly.
    day_index = pd.to_datetime(frame["Date"]).dt.normalize()
    return frame, feature_matrix, target, day_index
def baseline_predict(train_df: pd.DataFrame, valid_df: pd.DataFrame) -> np.ndarray:
    """Historical-mean baseline prediction for each validation row.

    Lookup order: mean sales for the (Store, DayOfWeek) pair, then the
    store-level mean, then the global mean — so unseen stores/weekdays
    always get a finite prediction.
    """
    fallback_global = float(train_df["Sales"].mean())
    by_store_and_dow = train_df.groupby(["Store", "DayOfWeek"])["Sales"].mean()
    by_store = train_df.groupby("Store")["Sales"].mean()

    predictions = [
        by_store_and_dow.get((store, dow), by_store.get(store, fallback_global))
        for store, dow in zip(valid_df["Store"], valid_df["DayOfWeek"])
    ]
    return np.array(predictions, dtype=float)
def fit_and_score(
    X_train: pd.DataFrame,
    X_valid: pd.DataFrame,
    y_train: np.ndarray,
    y_valid: np.ndarray,
    params: dict[str, Any],
) -> dict[str, float]:
    """Fit an XGBoost regressor on log1p(sales) and report RMSPE on both splits.

    The model is trained in log space to stabilize the heavy-tailed sales
    target; expm1 inverts the transform before scoring.
    """
    regressor = xgb.XGBRegressor(**params)
    regressor.fit(X_train, np.log1p(y_train))

    scores: dict[str, float] = {}
    for split_name, features, actuals in (
        ("train", X_train, y_train),
        ("valid", X_valid, y_valid),
    ):
        predicted = np.expm1(regressor.predict(features))
        scores[f"{split_name}_rmspe"] = round(rmspe(actuals, predicted), 4)
    return scores
def holdout_evaluation(
    df: pd.DataFrame,
    X: pd.DataFrame,
    y: np.ndarray,
    dates: pd.Series,
    validation_days: int,
) -> dict[str, Any]:
    """Score baseline, pre-tuning, and tuned models on the final holdout window.

    The last `validation_days` of data form the validation set; everything
    before it is training data. Returns a JSON-serializable report.
    """
    valid_mask, window_start, window_end = holdout_masks(dates, validation_days)
    train_mask = ~valid_mask
    y_valid = y[valid_mask]

    baseline_score = rmspe(
        y_valid, baseline_predict(df.loc[train_mask], df.loc[valid_mask])
    )

    # Both model fits share the same split; only the hyperparameters differ.
    split = (X.loc[train_mask], X.loc[valid_mask], y[train_mask], y_valid)
    pre_tuning = fit_and_score(*split, ORIGINAL_CONFIG)
    tuned = fit_and_score(*split, settings.model_params["xgboost"])

    return {
        "validation_days": validation_days,
        "validation_start_date": window_start.strftime("%Y-%m-%d"),
        "validation_end_date": window_end.strftime("%Y-%m-%d"),
        "rows_train": int(train_mask.sum()),
        "rows_valid": int(valid_mask.sum()),
        "baseline_rmspe": round(baseline_score, 4),
        "pre_tuning_model": pre_tuning,
        "tuned_model": tuned,
        "tuned_improvement_vs_pre_tuning": round(
            pre_tuning["valid_rmspe"] - tuned["valid_rmspe"], 4
        ),
        "tuned_improvement_vs_baseline": round(
            baseline_score - tuned["valid_rmspe"], 4
        ),
    }
def rolling_backtest(
    df: pd.DataFrame,
    X: pd.DataFrame,
    y: np.ndarray,
    dates: pd.Series,
    validation_days: int,
    windows: int,
) -> list[dict[str, Any]]:
    """Evaluate the tuned model over several rolling validation windows.

    For each window, training data is everything strictly before the window's
    first date, mimicking forecasting with only past information available.
    """
    reports: list[dict[str, Any]] = []
    for window_number, window_dates in enumerate(
        rolling_date_windows(dates, validation_days, windows), start=1
    ):
        train_mask = dates < window_dates.min()
        valid_mask = dates.isin(window_dates)
        y_valid = y[valid_mask]

        baseline_score = rmspe(
            y_valid, baseline_predict(df.loc[train_mask], df.loc[valid_mask])
        )
        tuned = fit_and_score(
            X.loc[train_mask],
            X.loc[valid_mask],
            y[train_mask],
            y_valid,
            settings.model_params["xgboost"],
        )

        report = {
            "window": window_number,
            "validation_start_date": window_dates.min().strftime("%Y-%m-%d"),
            "validation_end_date": window_dates.max().strftime("%Y-%m-%d"),
            "rows_train": int(train_mask.sum()),
            "rows_valid": int(valid_mask.sum()),
            "baseline_rmspe": round(baseline_score, 4),
            "tuned_valid_rmspe": tuned["valid_rmspe"],
            "improvement_vs_baseline": round(
                baseline_score - tuned["valid_rmspe"], 4
            ),
        }
        reports.append(report)
    return reports
def build_summary(holdout: dict[str, Any], backtest: list[dict[str, Any]]) -> dict[str, Any]:
    """Assemble the final evaluation report from holdout and backtest results."""
    tuned_scores = [window["tuned_valid_rmspe"] for window in backtest]
    baseline_scores = [window["baseline_rmspe"] for window in backtest]
    per_window_gain = np.array(baseline_scores) - np.array(tuned_scores)

    return {
        "dataset_rows_after_cleaning": holdout["rows_train"] + holdout["rows_valid"],
        "holdout": holdout,
        "rolling_backtest": backtest,
        "rolling_backtest_summary": {
            "windows": len(backtest),
            "average_tuned_rmspe": round(float(np.mean(tuned_scores)), 4),
            "average_baseline_rmspe": round(float(np.mean(baseline_scores)), 4),
            "average_improvement_vs_baseline": round(float(np.mean(per_window_gain)), 4),
        },
        "selected_model_params": settings.model_params["xgboost"],
    }
def main() -> Path:
    """Run the full evaluation, write metrics JSON, and log to MLflow.

    Returns the path of the written metrics file.
    """
    frame, features, target, day_index = prepare_dataset()
    holdout = holdout_evaluation(frame, features, target, day_index, validation_days=42)
    backtest = rolling_backtest(
        frame, features, target, day_index, validation_days=42, windows=3
    )
    summary = build_summary(holdout, backtest)

    # Persist metrics locally first so the MLflow artifact upload has a file.
    report_path = Path("metrics/model_evaluation.json")
    report_path.parent.mkdir(parents=True, exist_ok=True)
    with report_path.open("w", encoding="utf-8") as handle:
        json.dump(summary, handle, indent=2)

    run_name = f"xgb_backtest_{holdout['validation_start_date']}_{holdout['validation_end_date']}"
    with start_run(run_name, experiment_name="rossmann-evaluation") as run:
        # start_run may yield None when MLflow tracking is unavailable;
        # the local JSON report above is still written either way.
        if run is not None:
            import mlflow

            backtest_summary = summary["rolling_backtest_summary"]
            mlflow.log_param("validation_days", holdout["validation_days"])
            mlflow.log_param("backtest_windows", backtest_summary["windows"])
            mlflow.log_metric("holdout_baseline_rmspe", holdout["baseline_rmspe"])
            mlflow.log_metric("holdout_tuned_rmspe", holdout["tuned_model"]["valid_rmspe"])
            mlflow.log_metric(
                "average_backtest_rmspe",
                backtest_summary["average_tuned_rmspe"],
            )
            mlflow.log_metric(
                "average_backtest_improvement_vs_baseline",
                backtest_summary["average_improvement_vs_baseline"],
            )
            mlflow.log_artifact(str(report_path))

    logger.info("Evaluation summary written to %s", report_path)
    return report_path


if __name__ == "__main__":
    main()