"""Audit the cleaned Food Delivery dataset and emit a data-quality report.

Read-only profiling that quantifies the things that matter for a cleanly
running model:

- missingness per column
- duplicate rows (full row + by ID)
- target distribution and IQR-based outlier bounds
- coordinate plausibility (missing and out-of-range counts against the
  expected geographic bounds)
- delivery distance distribution
- categorical cardinality and value counts
- a leakage note for time-of-pickup columns

The result is written to ``models/data_quality_report.json``. This script does
not modify the data; it only reads and reports.

Usage:
    python -m src.ml.audit_data
"""

from __future__ import annotations

import json
import sys
from pathlib import Path

import numpy as np
import pandas as pd

# Put the directory two levels above this file's own directory on sys.path
# (presumably the repository root -- confirm against the project layout) so
# that the ``src.*`` imports below resolve. This must run before those
# imports, which is why they carry ``noqa: E402``.
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))

from src.config import (  # noqa: E402
    GEO_LAT_BOUNDS,
    GEO_LON_BOUNDS,
    MODELS_DIR,
    PROCESSED_DIR,
)
from src.ml.feature_engineering import haversine_km  # noqa: E402

# Input CSV that is profiled, and the JSON file the report is written to.
CLEAN_CSV = PROCESSED_DIR / "food_delivery_clean.csv"
REPORT_PATH = MODELS_DIR / "data_quality_report.json"

# Name of the target column whose distribution and IQR outliers are reported.
TARGET = "Time_taken(min)"

# The four coordinate columns; all must be present for the coordinate and
# distance sections of the report to be produced.
COORD_COLS = [
    "Restaurant_latitude",
    "Restaurant_longitude",
    "Delivery_location_latitude",
    "Delivery_location_longitude",
]

# Columns profiled for cardinality and their most frequent values.
CATEGORICAL_COLS = [
    "Weatherconditions",
    "Road_traffic_density",
    "Type_of_order",
    "Type_of_vehicle",
    "Festival",
    "City",
]

# Columns that describe the courier picking up the order; using them as features
# would leak post-order information into a preparation-time prediction.
LEAKAGE_CANDIDATES = ["Time_Order_picked"]


def _nan_to_none(value: float) -> "float | None":
    """Map NaN to ``None`` so the JSON report never contains a ``NaN`` token.

    ``json.dumps`` serialises float NaN as the non-standard literal ``NaN``,
    which strict JSON parsers reject; ``None`` becomes ``null`` instead.
    """
    return None if pd.isna(value) else value


def _missingness(df: pd.DataFrame) -> dict:
    """Return ``{column: percent_missing}`` for every column with missing data.

    Percentages are rounded to two decimals. The "has missing values" test is
    applied to the *unrounded* percentage, so a column with very few missing
    cells is still listed (its rounded value may then read ``0.0``) instead of
    being silently dropped from the report.
    """
    pct_missing = df.isna().mean() * 100
    # Filter first, round second: rounding first would hide anything < 0.005 %.
    affected = pct_missing[pct_missing > 0].round(2)
    return {col: float(pct) for col, pct in affected.items()}


def _target_stats(df: pd.DataFrame) -> dict:
    """Summarise the ``TARGET`` column and count IQR (Tukey 1.5x) outliers.

    Returns an empty dict when the target column is absent. Values that cannot
    be parsed as numbers are coerced to NaN and excluded. Any statistic that is
    undefined (no numeric rows, or the std of a single row) is reported as
    ``None`` rather than NaN, mirroring how the distance statistics behave.
    """
    if TARGET not in df.columns:
        return {}

    t = pd.to_numeric(df[TARGET], errors="coerce").dropna()
    q1, q3 = t.quantile(0.25), t.quantile(0.75)
    iqr = q3 - q1
    lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
    # Comparisons against NaN bounds are False, so an empty target yields 0.
    n_outliers = int(((t < lower) | (t > upper)).sum())

    return {
        "count": int(t.count()),
        "min": _nan_to_none(float(t.min())),
        "max": _nan_to_none(float(t.max())),
        "mean": _nan_to_none(round(float(t.mean()), 2)),
        "std": _nan_to_none(round(float(t.std()), 2)),
        "iqr_bounds": [
            _nan_to_none(round(float(lower), 2)),
            _nan_to_none(round(float(upper), 2)),
        ],
        "n_outliers_iqr": n_outliers,
        # max(..., 1) guards the division when there are no numeric rows.
        "pct_outliers_iqr": round(100 * n_outliers / max(len(t), 1), 2),
    }


def _coordinate_validity(df: pd.DataFrame) -> dict:
    """Count missing and out-of-range values for each coordinate column.

    Returns an empty dict unless all ``COORD_COLS`` are present. Latitude
    columns are checked against ``GEO_LAT_BOUNDS`` and longitude columns
    against ``GEO_LON_BOUNDS``; each bounds object is unpacked as the
    ``(left, right)`` arguments of ``Series.between``. Non-numeric values are
    coerced to NaN and therefore counted as missing, not as out of range.
    """
    if not set(COORD_COLS).issubset(df.columns):
        return {}

    # Latitudes first, then longitudes, to keep the report's key order stable.
    bounds_by_column = (
        ("Restaurant_latitude", GEO_LAT_BOUNDS),
        ("Delivery_location_latitude", GEO_LAT_BOUNDS),
        ("Restaurant_longitude", GEO_LON_BOUNDS),
        ("Delivery_location_longitude", GEO_LON_BOUNDS),
    )

    out: dict = {}
    for column, bounds in bounds_by_column:
        values = pd.to_numeric(df[column], errors="coerce")
        in_range = values.between(*bounds)
        out[column] = {
            "missing": int(values.isna().sum()),
            # between() is False for NaN, so exclude NaN explicitly here.
            "out_of_range": int((~in_range & values.notna()).sum()),
        }
    return out


def _distance_stats(df: pd.DataFrame) -> dict:
    """Summarise the restaurant-to-delivery distance computed by haversine_km.

    Returns an empty dict unless all ``COORD_COLS`` are present. ``min``,
    ``median`` and ``max`` are ``None`` when no distance could be computed.
    """
    if not set(COORD_COLS).issubset(df.columns):
        return {}

    # NOTE(review): haversine_km is defined elsewhere; this code relies on it
    # returning a pandas Series (presumably distances in kilometres).
    d = haversine_km(
        df["Restaurant_latitude"],
        df["Restaurant_longitude"],
        df["Delivery_location_latitude"],
        df["Delivery_location_longitude"],
    )
    valid = d.dropna()
    has_valid = len(valid) > 0

    return {
        "count": int(valid.count()),
        "missing": int(d.isna().sum()),
        "min": round(float(valid.min()), 2) if has_valid else None,
        "median": round(float(valid.median()), 2) if has_valid else None,
        "max": round(float(valid.max()), 2) if has_valid else None,
        "n_gt_50km": int((valid > 50).sum()),
        "n_near_zero": int((valid < 0.1).sum()),
    }


def _categoricals(df: pd.DataFrame) -> dict:
    """Report cardinality and the ten most frequent values per categorical column.

    Columns from ``CATEGORICAL_COLS`` that are absent are skipped. ``n_unique``
    ignores missing values, while ``value_counts`` includes them (keyed by
    their string form) so that missing categories stay visible.
    """
    out: dict = {}
    for column in CATEGORICAL_COLS:
        if column not in df.columns:
            continue
        counts = df[column].astype("object").value_counts(dropna=False)
        out[column] = {
            "n_unique": int(df[column].nunique(dropna=True)),
            "value_counts": {str(k): int(v) for k, v in counts.head(10).items()},
        }
    return out


def audit(df: pd.DataFrame) -> dict:
    """Build the complete data-quality report for ``df`` without modifying it.

    Returns a JSON-serialisable dict with row/column counts, missingness,
    duplicate counts, target statistics, coordinate validity, distance
    statistics, categorical profiles and a leakage note. ``duplicates.by_id``
    is ``None`` when the frame has no ``ID`` column.
    """
    id_dupes = int(df["ID"].duplicated().sum()) if "ID" in df.columns else None

    present_leakage_cols = [c for c in LEAKAGE_CANDIDATES if c in df.columns]
    if present_leakage_cols:
        leakage_note = (
            "Columns "
            + ", ".join(present_leakage_cols)
            + " describe post-order pickup timing and are intentionally NOT used "
            "as model features to avoid target leakage."
        )
    else:
        # Avoid the malformed "Columns  describe ..." sentence when none exist.
        leakage_note = (
            "None of the known leakage-candidate columns ("
            + ", ".join(LEAKAGE_CANDIDATES)
            + ") are present in this dataset."
        )

    return {
        "n_rows": int(len(df)),
        "n_cols": int(df.shape[1]),
        "missingness_pct": _missingness(df),
        "duplicates": {"full_row": int(df.duplicated().sum()), "by_id": id_dupes},
        "target": _target_stats(df),
        "coordinates": _coordinate_validity(df),
        "distance_km": _distance_stats(df),
        "categoricals": _categoricals(df),
        "leakage_note": leakage_note,
    }


def main() -> None:
    """Audit ``CLEAN_CSV``, write the report to ``REPORT_PATH`` and print a summary.

    Raises:
        FileNotFoundError: if the cleaned CSV does not exist yet.
    """
    if not CLEAN_CSV.exists():
        raise FileNotFoundError(
            f"{CLEAN_CSV} missing. Run 'python -m src.ml.prepare_data' first."
        )

    df = pd.read_csv(CLEAN_CSV)
    report = audit(df)

    REPORT_PATH.parent.mkdir(parents=True, exist_ok=True)
    REPORT_PATH.write_text(
        json.dumps(report, indent=2, default=str), encoding="utf-8"
    )

    print(f"[audit] rows={report['n_rows']:,} cols={report['n_cols']}")
    print(f"[audit] full-row duplicates: {report['duplicates']['full_row']}")
    if report["missingness_pct"]:
        print("[audit] columns with missing values (%):")
        # Worst-affected columns first.
        for c, v in sorted(report["missingness_pct"].items(), key=lambda x: -x[1]):
            print(f" {c:<32} {v:.2f}")
    if report["target"]:
        t = report["target"]
        print(
            f"[audit] target {TARGET}: min={t['min']} max={t['max']} "
            f"mean={t['mean']} | IQR-outliers={t['n_outliers_iqr']} "
            f"({t['pct_outliers_iqr']}%)"
        )
    dist = report["distance_km"]
    if dist:
        print(
            f"[audit] distance_km: median={dist['median']} max={dist['max']} "
            f"missing={dist['missing']} >50km={dist['n_gt_50km']}"
        )
    print(f"[audit] wrote {REPORT_PATH}")


if __name__ == "__main__":
    main()