kuechenpassagent / src /ml /audit_data.py
lederyou's picture
Upload folder using huggingface_hub
db662ea verified
Raw
History Blame Contribute Delete
6.53 kB
"""Audit the cleaned Food Delivery dataset and emit a data-quality report.
Read-only profiling that quantifies the things that matter for a cleanly
running model:
- missingness per column
- duplicate rows (full row + by ID)
- target distribution and IQR-based outlier bounds
- coordinate plausibility (share inside the expected geographic bounds)
- delivery distance distribution
- categorical cardinality and value counts
- a leakage note for time-of-pickup columns
The result is written to ``models/data_quality_report.json``. This script does
not modify the data; it only reads and reports.
Usage:
python -m src.ml.audit_data
"""
from __future__ import annotations
import json
import sys
from pathlib import Path
import numpy as np
import pandas as pd
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
from src.config import ( # noqa: E402
GEO_LAT_BOUNDS,
GEO_LON_BOUNDS,
MODELS_DIR,
PROCESSED_DIR,
)
from src.ml.feature_engineering import haversine_km # noqa: E402
# Input: the cleaned dataset that main() reads (it must already exist).
CLEAN_CSV = PROCESSED_DIR / "food_delivery_clean.csv"
# Output: where main() writes the JSON data-quality report.
REPORT_PATH = MODELS_DIR / "data_quality_report.json"
# Name of the target column summarised by _target_stats().
TARGET = "Time_taken(min)"
# All four coordinate columns must be present for the coordinate and distance
# checks to run; otherwise those report sections are left empty.
COORD_COLS = [
"Restaurant_latitude",
"Restaurant_longitude",
"Delivery_location_latitude",
"Delivery_location_longitude",
]
# Columns profiled by _categoricals(); any that are absent from the data are
# skipped.
CATEGORICAL_COLS = [
"Weatherconditions",
"Road_traffic_density",
"Type_of_order",
"Type_of_vehicle",
"Festival",
"City",
]
# Columns that describe the courier picking up the order; using them as features
# would leak post-order information into a preparation-time prediction.
# audit() only names these columns in the report's leakage note; nothing in this
# module drops them from the data.
LEAKAGE_CANDIDATES = ["Time_Order_picked"]
def _missingness(df: pd.DataFrame) -> dict:
miss = (df.isna().mean() * 100).round(2)
return {c: float(v) for c, v in miss.items() if v > 0}
def _target_stats(df: pd.DataFrame) -> dict:
    """Summarise the target column and count IQR-based outliers.

    The target is coerced to numeric; values that cannot be parsed are dropped
    together with missing ones before any statistic is computed.

    Args:
        df: Frame to profile.

    Returns:
        An empty dict when the target column is absent. Otherwise a dict with
        ``count``, ``min``, ``max``, ``mean``, ``std``, ``iqr_bounds``
        (``[q1 - 1.5 * iqr, q3 + 1.5 * iqr]``), ``n_outliers_iqr`` and
        ``pct_outliers_iqr``. Statistics that are undefined (no usable rows,
        or ``std`` of a single row) are reported as ``None``.
    """
    if TARGET not in df.columns:
        return {}

    def _json_number(value, ndigits=None):
        # NaN/inf are not valid JSON numbers; report them as None, matching
        # how the distance statistics represent "no value".
        number = float(value)
        if not np.isfinite(number):
            return None
        return number if ndigits is None else round(number, ndigits)

    t = pd.to_numeric(df[TARGET], errors="coerce").dropna()
    q1, q3 = t.quantile(0.25), t.quantile(0.75)
    iqr = q3 - q1
    lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
    # Comparisons against NaN bounds are False, so an empty target yields 0.
    n_outliers = int(((t < lower) | (t > upper)).sum())
    return {
        "count": int(t.count()),
        "min": _json_number(t.min()),
        "max": _json_number(t.max()),
        "mean": _json_number(t.mean(), 2),
        "std": _json_number(t.std(), 2),
        "iqr_bounds": [_json_number(lower, 2), _json_number(upper, 2)],
        "n_outliers_iqr": n_outliers,
        # max(..., 1) guards the division when no usable rows remain.
        "pct_outliers_iqr": round(100 * n_outliers / max(len(t), 1), 2),
    }
def _coordinate_validity(df: pd.DataFrame) -> dict:
    """Count missing and out-of-bounds values for each coordinate column.

    Latitude columns are checked against ``GEO_LAT_BOUNDS`` and longitude
    columns against ``GEO_LON_BOUNDS``. Values are coerced to numeric first,
    so unparseable entries count as missing rather than out of range.

    Args:
        df: Frame to profile.

    Returns:
        An empty dict unless all ``COORD_COLS`` are present. Otherwise a
        mapping of column name to ``{"missing": int, "out_of_range": int}``,
        latitudes first, then longitudes.
    """
    if not set(COORD_COLS) <= set(df.columns):
        return {}

    # Insertion order fixes the key order of the report.
    bounds_by_column = {
        "Restaurant_latitude": GEO_LAT_BOUNDS,
        "Delivery_location_latitude": GEO_LAT_BOUNDS,
        "Restaurant_longitude": GEO_LON_BOUNDS,
        "Delivery_location_longitude": GEO_LON_BOUNDS,
    }

    report: dict = {}
    for column, bounds in bounds_by_column.items():
        values = pd.to_numeric(df[column], errors="coerce")
        present = values.notna()
        outside = ~values.between(*bounds)
        report[column] = {
            "missing": int(values.isna().sum()),
            # NaN fails between(), so restrict to rows that have a value.
            "out_of_range": int((outside & present).sum()),
        }
    return report
def _distance_stats(df: pd.DataFrame) -> dict:
    """Describe the restaurant-to-delivery distance returned by ``haversine_km``.

    Args:
        df: Frame to profile.

    Returns:
        An empty dict unless all ``COORD_COLS`` are present. Otherwise a dict
        with the number of non-missing and missing distances, ``min``,
        ``median`` and ``max`` rounded to two decimals (``None`` when no
        distance is available), and counts of distances above 50 and below
        0.1.
    """
    if not set(COORD_COLS) <= set(df.columns):
        return {}

    distances = haversine_km(
        df["Restaurant_latitude"],
        df["Restaurant_longitude"],
        df["Delivery_location_latitude"],
        df["Delivery_location_longitude"],
    )
    observed = distances.dropna()

    # Summary statistics are only defined when at least one distance exists.
    if len(observed):
        shortest = round(float(observed.min()), 2)
        middle = round(float(observed.median()), 2)
        longest = round(float(observed.max()), 2)
    else:
        shortest = middle = longest = None

    return {
        "count": int(observed.count()),
        "missing": int(distances.isna().sum()),
        "min": shortest,
        "median": middle,
        "max": longest,
        "n_gt_50km": int((observed > 50).sum()),
        "n_near_zero": int((observed < 0.1).sum()),
    }
def _categoricals(df: pd.DataFrame) -> dict:
    """Report cardinality and the most frequent values of categorical columns.

    Only columns listed in ``CATEGORICAL_COLS`` that exist in ``df`` are
    profiled.

    Args:
        df: Frame to profile.

    Returns:
        Mapping of column name to ``{"n_unique": int, "value_counts": dict}``.
        ``n_unique`` ignores missing values; ``value_counts`` holds the ten
        most frequent values (missing included), keyed by their string form.
    """
    summary: dict = {}
    available = [name for name in CATEGORICAL_COLS if name in df.columns]
    for name in available:
        column = df[name]
        top_values = column.astype("object").value_counts(dropna=False).head(10)
        summary[name] = {
            "n_unique": int(column.nunique(dropna=True)),
            "value_counts": {
                str(label): int(freq) for label, freq in top_values.items()
            },
        }
    return summary
def audit(df: pd.DataFrame) -> dict:
    """Build the complete data-quality report for ``df`` without modifying it.

    Args:
        df: Frame to profile.

    Returns:
        A dict with row/column counts, per-column missingness, duplicate
        counts (``by_id`` is ``None`` when there is no ``ID`` column), target
        statistics, coordinate validity, distance statistics, categorical
        summaries and a leakage note.
    """
    id_dupes = int(df["ID"].duplicated().sum()) if "ID" in df.columns else None

    leak_cols = [c for c in LEAKAGE_CANDIDATES if c in df.columns]
    if leak_cols:
        leakage_note = (
            "Columns "
            + ", ".join(leak_cols)
            + " describe post-order pickup timing and are intentionally NOT used "
            "as model features to avoid target leakage."
        )
    else:
        # Without this branch the note would read "Columns  describe ...",
        # which names no column and is misleading.
        leakage_note = (
            "None of the leakage-candidate columns ("
            + ", ".join(LEAKAGE_CANDIDATES)
            + ") are present in the data."
        )

    return {
        "n_rows": int(len(df)),
        "n_cols": int(df.shape[1]),
        "missingness_pct": _missingness(df),
        "duplicates": {"full_row": int(df.duplicated().sum()), "by_id": id_dupes},
        "target": _target_stats(df),
        "coordinates": _coordinate_validity(df),
        "distance_km": _distance_stats(df),
        "categoricals": _categoricals(df),
        "leakage_note": leakage_note,
    }
def main() -> None:
    """Audit the cleaned CSV, write the JSON report and print a summary.

    Raises:
        FileNotFoundError: If the cleaned CSV does not exist yet.
    """
    if not CLEAN_CSV.exists():
        raise FileNotFoundError(
            f"{CLEAN_CSV} missing. Run 'python -m src.ml.prepare_data' first."
        )

    report = audit(pd.read_csv(CLEAN_CSV))

    # Persist the full report before printing the console summary.
    serialized = json.dumps(report, indent=2, default=str)
    REPORT_PATH.parent.mkdir(parents=True, exist_ok=True)
    REPORT_PATH.write_text(serialized)

    print(f"[audit] rows={report['n_rows']:,} cols={report['n_cols']}")
    print(f"[audit] full-row duplicates: {report['duplicates']['full_row']}")

    missing = report["missingness_pct"]
    if missing:
        print("[audit] columns with missing values (%):")
        # Columns with the largest share of missing values come first.
        worst_first = sorted(missing.items(), key=lambda item: -item[1])
        for c, v in worst_first:
            print(f" {c:<32} {v:.2f}")

    t = report["target"]
    if t:
        print(
            f"[audit] target {TARGET}: min={t['min']} max={t['max']} "
            f"mean={t['mean']} | IQR-outliers={t['n_outliers_iqr']} "
            f"({t['pct_outliers_iqr']}%)"
        )

    dist = report["distance_km"]
    if dist:
        print(
            f"[audit] distance_km: median={dist['median']} max={dist['max']} "
            f"missing={dist['missing']} >50km={dist['n_gt_50km']}"
        )

    print(f"[audit] wrote {REPORT_PATH}")


if __name__ == "__main__":
    main()