# Drift-report script: compares training feature distributions against
# recently logged inference requests and writes a JSON summary.
| # ruff: noqa: E402 | |
| import json | |
| from pathlib import Path | |
| import sys | |
| import pandas as pd | |
| PROJECT_ROOT = Path(__file__).resolve().parents[1] | |
| if str(PROJECT_ROOT) not in sys.path: | |
| sys.path.insert(0, str(PROJECT_ROOT)) | |
| from src.serving.monitoring import DEFAULT_INFERENCE_LOG_PATH | |
| from src.shared.config import settings | |
| from src.training.data_loader import clean_data, load_raw_data, load_store_data | |
| from src.training.features import apply_feature_pipeline, build_feature_matrix | |
def load_recent_inference_rows(log_path: Path) -> pd.DataFrame:
    """Parse logged inference requests and join them with store metadata.

    Each non-blank line of *log_path* is expected to be one JSON object
    describing a prediction request. Returns an empty DataFrame when the
    log file is missing or contains no usable lines.
    """
    if not log_path.exists():
        return pd.DataFrame()

    raw_lines = log_path.read_text(encoding="utf-8").splitlines()
    events = [json.loads(raw) for raw in raw_lines if raw.strip()]
    if not events:
        return pd.DataFrame()

    # Align request field names with the training-data column names so the
    # shared feature pipeline can run on both datasets.
    column_map = {
        "store": "Store",
        "start_date": "Date",
        "promo": "Promo",
        "state_holiday": "StateHoliday",
        "school_holiday": "SchoolHoliday",
    }
    requests = pd.DataFrame(events).rename(columns=column_map)

    stores = load_store_data(settings.data.store_path)
    joined = requests.merge(stores, on="Store", how="left")

    # Open is forced to 1 for every logged request.
    # NOTE(review): presumably the serving path only predicts for open
    # stores — confirm against the request handler.
    joined["Open"] = 1

    # Stores with no Promo2 metadata after the left join default to 0.
    for promo_col in ("Promo2", "Promo2SinceWeek", "Promo2SinceYear"):
        joined[promo_col] = joined[promo_col].fillna(0).astype(int)
    return joined
def build_drift_report() -> dict[str, object]:
    """Compare training feature distributions with recent inference traffic.

    Returns a JSON-serializable dict: a ``no_inference_logs`` status when no
    events have been logged yet, otherwise an ``ok`` status carrying the ten
    features whose means shifted the most relative to the training-set
    standard deviation.
    """
    # Baseline: the full training set pushed through the same feature pipeline.
    train_df = clean_data(load_raw_data(settings.data.train_path, settings.data.store_path))
    train_df = apply_feature_pipeline(
        train_df,
        fourier_period=settings.pipeline.fourier_period,
        fourier_order=settings.pipeline.fourier_order,
    )
    train_features = build_feature_matrix(train_df, settings.data.features)

    inference_df = load_recent_inference_rows(DEFAULT_INFERENCE_LOG_PATH)
    if inference_df.empty:
        # Nothing to compare yet; report where the logs are expected.
        return {
            "status": "no_inference_logs",
            "log_path": str(DEFAULT_INFERENCE_LOG_PATH),
        }

    inference_df = apply_feature_pipeline(
        inference_df,
        fourier_period=settings.pipeline.fourier_period,
        fourier_order=settings.pipeline.fourier_order,
    )
    inference_features = build_feature_matrix(inference_df, settings.data.features)

    drift_rows = []
    for feature in inference_features.columns:
        baseline_mean = float(train_features[feature].mean())
        live_mean = float(inference_features[feature].mean())
        # Population std (ddof=0), floored at 1e-6 so constant features
        # do not divide by zero.
        baseline_std = float(train_features[feature].std(ddof=0))
        shift = abs(live_mean - baseline_mean) / max(baseline_std, 1e-6)
        drift_rows.append(
            {
                "feature": feature,
                "train_mean": round(baseline_mean, 4),
                "inference_mean": round(live_mean, 4),
                "train_std": round(baseline_std, 4),
                "normalized_mean_shift": round(float(shift), 4),
            }
        )

    ranked = sorted(drift_rows, key=lambda row: row["normalized_mean_shift"], reverse=True)
    return {
        "status": "ok",
        "log_path": str(DEFAULT_INFERENCE_LOG_PATH),
        "num_inference_events": int(len(inference_df)),
        "top_drift_features": ranked[:10],
    }
def main(output_path: Path = Path("metrics/drift_report.json")) -> Path:
    """Build the drift report and write it as pretty-printed JSON.

    Args:
        output_path: Destination file; parent directories are created as
            needed. Defaults to the original hard-coded location, so existing
            callers are unaffected.

    Returns:
        The path the report was written to.
    """
    report = build_drift_report()
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w", encoding="utf-8") as f:
        # ensure_ascii=False keeps any non-ASCII feature names human-readable;
        # the trailing newline keeps the file POSIX-friendly for tooling.
        json.dump(report, f, indent=2, ensure_ascii=False)
        f.write("\n")
    print(f"Drift report written to {output_path}")
    return output_path
# Script entry point: generate and write the drift report when run directly.
if __name__ == "__main__":
    main()