Spaces:
Sleeping
Sleeping
# ruff: noqa: E402
import json
import logging
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path

import numpy as np
import xgboost as xgb

# Make the repository root importable BEFORE pulling in the `src.*` modules
# below — the path insertion must stay ahead of those imports.
PROJECT_ROOT = Path(__file__).resolve().parents[2]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from src.shared.config import DEFAULT_MODEL_METADATA_PATH, DEFAULT_MODEL_PATH, settings
from src.shared.mlflow_utils import start_run
from src.training.data_loader import clean_data, load_raw_data
from src.training.features import apply_feature_pipeline, build_feature_matrix
from src.training.splits import holdout_masks

# Module-wide logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def build_model_metadata(model_path: Path) -> dict[str, str]:
    """Builds lightweight metadata for the generated model artifact.

    Args:
        model_path: Location where the trained model artifact is saved.

    Returns:
        Mapping with the model version, UTC creation timestamp, git short
        hash, and model path — all strings, ready for JSON serialization.
    """
    git_hash = "unknown"
    try:
        # Best-effort lookup: a missing git binary, a non-repo checkout, or a
        # stuck git process must not fail training, so fall back to "unknown".
        git_hash = subprocess.check_output(
            ["git", "rev-parse", "--short", "HEAD"],
            cwd=PROJECT_ROOT,
            text=True,
            timeout=10,  # don't let a hung git call stall the pipeline
        ).strip()
    except Exception:
        logger.info("Unable to read git hash; using 'unknown' in model metadata.")
    # Compact, sortable UTC timestamp doubles as the version prefix.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    version = f"{timestamp}-{git_hash}"
    return {
        "model_version": version,
        "created_at_utc": timestamp,
        "git_short_hash": git_hash,
        "model_path": str(model_path),
    }
def rmspe(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Computes RMSPE in percentage points."""
    # Guard against division by zero: raise denominators below 1.0 up to 1.0.
    denom = np.clip(y_true, a_min=1.0, a_max=None)
    rel_err = (y_true - y_pred) / denom
    return float(np.sqrt(np.mean(rel_err ** 2)) * 100)
def run_training(
    model_path: str = str(DEFAULT_MODEL_PATH),
    metrics_path: str = "metrics/training_summary.json",
    metadata_path: str = str(DEFAULT_MODEL_METADATA_PATH),
) -> dict:
    """Runs training, records validation metadata, and saves the final model.

    Pipeline: load + clean raw data, apply feature engineering, evaluate on a
    42-day time-based holdout, refit on all data, then persist the model,
    a JSON metrics summary, and JSON model metadata; metrics and artifacts
    are also logged to MLflow when a run is available.

    Args:
        model_path: Destination file for the saved XGBoost model.
        metrics_path: Destination JSON file for the training summary.
        metadata_path: Destination JSON file for the model metadata.

    Returns:
        The training-summary metrics dict (row/feature counts, validation
        window, and — when the holdout split is non-degenerate — RMSPE scores).
    """
    logger.info("Starting Rossmann training pipeline")
    # 1. Load and Clean Data
    df = load_raw_data(settings.data.train_path, settings.data.store_path)
    df = clean_data(df)
    # 2. Feature Engineering
    logger.info("Applying feature engineering...")
    df = apply_feature_pipeline(
        df,
        fourier_period=settings.pipeline.fourier_period,
        fourier_order=settings.pipeline.fourier_order,
    )
    # 3. Final Feature Matrix Construction
    feature_cols = settings.data.features
    X = build_feature_matrix(df, feature_cols)
    # Target transformation (Log): train on log1p(Sales); predictions are
    # inverted with expm1 before scoring.
    y = np.log1p(df[settings.data.target])
    # 4. Simple time-based validation (last 42 days held out)
    params = settings.model_params.get("xgboost", {})
    metrics = {
        "num_rows": int(len(df)),
        "num_features": int(len(feature_cols)),
        "validation_days": 42,
        "model_params": params,
    }
    validation_mask, validation_start, validation_end = holdout_masks(df["Date"], validation_days=42)
    metrics["validation_start_date"] = validation_start.strftime("%Y-%m-%d")
    metrics["validation_end_date"] = validation_end.strftime("%Y-%m-%d")
    # Only score when both the train and validation sides are non-empty;
    # otherwise skip straight to the final fit.
    if validation_mask.any() and (~validation_mask).any():
        train_model = xgb.XGBRegressor(**params)
        train_model.fit(X.loc[~validation_mask], y.loc[~validation_mask])
        # Invert the log1p target transform before computing RMSPE.
        y_train_actual = np.expm1(y.loc[~validation_mask].to_numpy())
        y_train_pred = np.expm1(train_model.predict(X.loc[~validation_mask]))
        y_valid = np.expm1(y.loc[validation_mask].to_numpy())
        y_pred = np.expm1(train_model.predict(X.loc[validation_mask]))
        metrics["train_rmspe"] = round(rmspe(y_train_actual, y_train_pred), 4)
        metrics["validation_rmspe"] = round(rmspe(y_valid, y_pred), 4)
        metrics["validation_rows"] = int(validation_mask.sum())
        logger.info("Validation RMSPE: %.4f%%", metrics["validation_rmspe"])
    # 5. Train final model on all available data and save it
    final_model = xgb.XGBRegressor(**params)
    logger.info("Fitting final XGBoost model...")
    final_model.fit(X, y)
    output_path = Path(model_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    final_model.save_model(str(output_path))
    # Persist the training summary next to the model artifact.
    summary_path = Path(metrics_path)
    summary_path.parent.mkdir(parents=True, exist_ok=True)
    with summary_path.open("w", encoding="utf-8") as f:
        json.dump(metrics, f, indent=2)
    # Persist version/provenance metadata for the saved model.
    metadata_output_path = Path(metadata_path)
    metadata_output_path.parent.mkdir(parents=True, exist_ok=True)
    model_metadata = build_model_metadata(output_path)
    with metadata_output_path.open("w", encoding="utf-8") as f:
        json.dump(model_metadata, f, indent=2)
    # 6. MLflow tracking — best-effort: start_run may yield None (no tracking
    # backend), in which case logging is skipped entirely.
    run_name = f"xgb_holdout_{metrics['validation_start_date']}_{metrics['validation_end_date']}"
    with start_run(run_name, experiment_name="rossmann-training") as run:
        if run is not None:
            # Imported lazily so the pipeline works without mlflow installed.
            import mlflow
            mlflow.log_params(params)
            mlflow.log_param("num_rows", metrics["num_rows"])
            mlflow.log_param("num_features", metrics["num_features"])
            mlflow.log_param("validation_days", metrics["validation_days"])
            mlflow.log_param("validation_start_date", metrics["validation_start_date"])
            mlflow.log_param("validation_end_date", metrics["validation_end_date"])
            # RMSPE metrics only exist when the holdout split was usable.
            if "train_rmspe" in metrics:
                mlflow.log_metric("train_rmspe", metrics["train_rmspe"])
            if "validation_rmspe" in metrics:
                mlflow.log_metric("validation_rmspe", metrics["validation_rmspe"])
            mlflow.log_artifact(str(output_path))
            mlflow.log_artifact(str(summary_path))
            mlflow.log_artifact(str(metadata_output_path))
    logger.info("Model saved to %s", output_path)
    logger.info("Training summary written to %s", summary_path)
    logger.info("Model metadata written to %s", metadata_output_path)
    logger.info("Training pipeline completed successfully.")
    return metrics
if __name__ == "__main__":
    # CLI entry point: run the full training pipeline with default paths.
    run_training()