# Rossmann-Store-Sales / scripts / check_drift.py
# Author: ymlin105
# Last commit: "refactor: flatten metrics paths and polish project presentation" (275e9d5)
# ruff: noqa: E402
import json
from pathlib import Path
import sys
import pandas as pd
# Prepend the repository root to sys.path so the `src.*` imports below
# resolve when this file is executed directly as a script (rather than
# via `python -m` from the project root).
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
from src.serving.monitoring import DEFAULT_INFERENCE_LOG_PATH
from src.shared.config import settings
from src.training.data_loader import clean_data, load_raw_data, load_store_data
from src.training.features import apply_feature_pipeline, build_feature_matrix
def load_recent_inference_rows(log_path: Path) -> pd.DataFrame:
    """Parse the JSONL inference log and join each request with store metadata.

    Args:
        log_path: Location of the newline-delimited JSON inference log.

    Returns:
        One row per logged request, renamed to the training-data column
        names and left-joined with the store table. Empty DataFrame when
        the log file is missing or contains no records.
    """
    if not log_path.exists():
        return pd.DataFrame()
    raw_lines = log_path.read_text(encoding="utf-8").splitlines()
    parsed = [json.loads(entry) for entry in raw_lines if entry.strip()]
    if not parsed:
        return pd.DataFrame()
    # Map the snake_case request fields onto the training column names.
    column_map = {
        "store": "Store",
        "start_date": "Date",
        "promo": "Promo",
        "state_holiday": "StateHoliday",
        "school_holiday": "SchoolHoliday",
    }
    requests = pd.DataFrame(parsed).rename(columns=column_map)
    stores = load_store_data(settings.data.store_path)
    joined = requests.merge(stores, on="Store", how="left")
    # Logged requests are treated as open-store rows.
    joined["Open"] = 1
    # Stores lacking Promo2 metadata surface as NaN after the left join;
    # normalize them to integer zeros ("no promo").
    for promo_col in ("Promo2", "Promo2SinceWeek", "Promo2SinceYear"):
        joined[promo_col] = joined[promo_col].fillna(0).astype(int)
    return joined
def build_drift_report() -> dict[str, object]:
    """Compare training-set feature statistics against recent inference traffic.

    Returns:
        A JSON-serializable report. When no inference logs are available the
        report carries only a status marker and the expected log path;
        otherwise it lists the ten features with the largest normalized
        mean shift between training data and logged requests.
    """
    fourier_kwargs = {
        "fourier_period": settings.pipeline.fourier_period,
        "fourier_order": settings.pipeline.fourier_order,
    }
    baseline = clean_data(load_raw_data(settings.data.train_path, settings.data.store_path))
    baseline = apply_feature_pipeline(baseline, **fourier_kwargs)
    baseline_features = build_feature_matrix(baseline, settings.data.features)

    recent = load_recent_inference_rows(DEFAULT_INFERENCE_LOG_PATH)
    if recent.empty:
        return {
            "status": "no_inference_logs",
            "log_path": str(DEFAULT_INFERENCE_LOG_PATH),
        }

    recent = apply_feature_pipeline(recent, **fourier_kwargs)
    recent_features = build_feature_matrix(recent, settings.data.features)

    def summarize(feature: str) -> dict[str, object]:
        # Normalized mean shift: |mean drift| measured in units of the
        # training std, floored at 1e-6 to avoid dividing by a
        # zero-variance feature.
        base_mean = float(baseline_features[feature].mean())
        live_mean = float(recent_features[feature].mean())
        base_std = float(baseline_features[feature].std(ddof=0))
        shift = abs(live_mean - base_mean) / max(base_std, 1e-6)
        return {
            "feature": feature,
            "train_mean": round(base_mean, 4),
            "inference_mean": round(live_mean, 4),
            "train_std": round(base_std, 4),
            "normalized_mean_shift": round(float(shift), 4),
        }

    ranked = sorted(
        (summarize(feature) for feature in recent_features.columns),
        key=lambda row: row["normalized_mean_shift"],
        reverse=True,
    )
    return {
        "status": "ok",
        "log_path": str(DEFAULT_INFERENCE_LOG_PATH),
        "num_inference_events": int(len(recent)),
        "top_drift_features": ranked[:10],
    }
def main() -> Path:
    """Build the drift report and persist it as metrics/drift_report.json.

    Returns:
        Path of the JSON report that was written.
    """
    report = build_drift_report()
    destination = Path("metrics/drift_report.json")
    # Ensure metrics/ exists on a fresh checkout before writing.
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(json.dumps(report, indent=2), encoding="utf-8")
    print(f"Drift report written to {destination}")
    return destination
# Script entry point: `python scripts/check_drift.py`.
if __name__ == "__main__":
    main()