Spaces:
Running
Running
| """Stage-3 WILD BENCH — "can this system clean real-world tables?" scoreboard. | |
| Every registered wild dataset (no gold required) gets a row built from the SHIPPED | |
| deterministic pipeline (the same `mock_plan` the Space runs): | |
| * BEHAVIORAL audit — what the product does on the raw table: changes applied, | |
| columns touched, review flags raised (abstentions), PII columns flagged, | |
| plan schema validity, silent-edit check (every diff cell attributable to a | |
| logged op), runtime per 1k rows. | |
| * INJECT-RECOVERY — seeded errors (typo/ocr/case/whitespace, eval/inject.py) | |
| injected into the table's OWN content, then cleaned and scored churn-neutral: | |
| in-domain robustness. (Caveat, disclosed: the raw table plays "clean", so | |
| pre-existing errors slightly deflate scores — uniform across systems.) | |
| Sources: training/unpaired_sources.json cache + data/wild/ extras (stage-3 hunts). | |
| uv run python -m eval.wild_bench # full scoreboard | |
| uv run python -m eval.wild_bench --only spotify # one dataset | |
| Writes eval/results/wild_bench.json and docs/WILD_BENCH.md. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import time | |
| from pathlib import Path | |
| import pandas as pd | |
| from scrubdata.executor import apply_plan | |
| from scrubdata.planner import mock_plan | |
| from .inject import inject | |
| from .metrics import is_valid | |
| from .run_real_multi import _cell_only, score | |
| ROOT = Path(__file__).resolve().parent.parent | |
| CACHE = ROOT / "data" / "real" / "cache" | |
| WILD = ROOT / "data" / "wild" | |
| INJECT_TYPES = ("typo", "ocr", "case", "whitespace") | |
| N_ROWS = 800 | |
| def registry() -> list[dict]: | |
| """All benchmark targets: cached portal tables + stage-3 wild extras.""" | |
| out = [] | |
| src = json.load(open(ROOT / "training" / "unpaired_sources.json")) | |
| for s in src: | |
| p = CACHE / f"{s['name']}.csv" | |
| if p.exists(): | |
| out.append({"name": s["name"], "domain": s["domain"], "path": p}) | |
| if WILD.exists(): | |
| manifest = WILD / "manifest.json" | |
| extras = json.load(open(manifest)) if manifest.exists() else [] | |
| for s in extras: | |
| p = WILD / f"{s['name']}.csv" | |
| if p.exists(): | |
| out.append({"name": s["name"], "domain": s.get("domain", "wild"), | |
| "path": p}) | |
| return out | |
| def _load(path: Path) -> pd.DataFrame: | |
| kw = dict(dtype=str, keep_default_na=False, nrows=N_ROWS, on_bad_lines="skip") | |
| try: | |
| df = pd.read_csv(path, encoding_errors="replace", **kw) | |
| except pd.errors.ParserError: # ragged quoting etc. — slow tolerant path | |
| df = pd.read_csv(path, engine="python", **kw) | |
| return df.loc[:, [c for c in df.columns if not c.startswith("Unnamed")]] | |
| def behavioral(df: pd.DataFrame) -> dict: | |
| t0 = time.perf_counter() | |
| plan = mock_plan(df) | |
| cleaned, log = apply_plan(df, plan) | |
| dt = time.perf_counter() - t0 | |
| flags = plan.get("flags", []) | |
| cells_changed = sum(e.get("cells_changed", 0) for e in log | |
| if isinstance(e.get("cells_changed"), int)) | |
| ops_logged = {e.get("op") for e in log} | |
| pii_flagged = sum(1 for e in log if e.get("op") == "flag_pii") + \ | |
| sum(1 for c in plan.get("columns", []) for o in c.get("operations", []) | |
| if o.get("op", "").endswith("_pii") and o.get("op") != "flag_pii") | |
| # silent-edit check: every changed CELL must be attributable to a logged op. | |
| # apply_plan resets the index after row drops, so attribute COLUMN ops on a | |
| # drop-free application (row drops are table-scope-logged separately). | |
| plan_cols_only = _cell_only(plan) | |
| cleaned2, log2 = apply_plan(df, plan_cols_only) | |
| changed_cols = {c for c in df.columns if c in cleaned2.columns | |
| and not df[c].equals(cleaned2[c])} | |
| logged_cols = {e.get("column") for e in log2 if e.get("column")} | |
| for op in plan_cols_only.get("table_operations", []): | |
| if op.get("op") == "resolve_by_majority": # logs table-scope w/ columns | |
| logged_cols.update(op.get("columns", [])) | |
| silent = sorted(changed_cols - logged_cols) | |
| return {"plan_valid": is_valid(plan), "ops": len(ops_logged), | |
| "cells_changed": cells_changed, "review_flags": len(flags), | |
| "pii_protected_or_flagged": pii_flagged, | |
| "silent_edit_columns": silent, "sec_per_1k_rows": round(dt / max(len(df), 1) * 1000, 2)} | |
| def inject_recovery(df: pd.DataFrame, seed: int = 7) -> dict: | |
| out = {} | |
| for et in INJECT_TYPES: | |
| dirty = inject(df, et, seed) | |
| if dirty is None: | |
| out[et] = None | |
| continue | |
| cleaned, _ = apply_plan(dirty, _cell_only(mock_plan(dirty))) | |
| m = score(dirty, df, cleaned) | |
| out[et] = round(m["f1"], 3) | |
| vals = [v for v in out.values() if v is not None] | |
| out["mean"] = round(sum(vals) / len(vals), 3) if vals else None | |
| return out | |
| def main() -> None: | |
| ap = argparse.ArgumentParser() | |
| ap.add_argument("--only", default=None) | |
| ap.add_argument("--out", default="eval/results/wild_bench.json") | |
| args = ap.parse_args() | |
| rows = [] | |
| for spec in registry(): | |
| if args.only and spec["name"] != args.only: | |
| continue | |
| try: | |
| df = _load(spec["path"]) | |
| except Exception as e: # noqa: BLE001 | |
| print(f" {spec['name']}: LOAD FAILED {type(e).__name__}") | |
| continue | |
| if df.empty or df.shape[1] < 2: | |
| continue | |
| b = behavioral(df) | |
| r = inject_recovery(df) | |
| row = {"name": spec["name"], "domain": spec["domain"], | |
| "rows": len(df), "cols": df.shape[1], **b, "inject": r} | |
| rows.append(row) | |
| print(f" {spec['name']:<18} {spec['domain']:<14} valid={b['plan_valid']} " | |
| f"chg={b['cells_changed']:<6} flags={b['review_flags']} " | |
| f"pii={b['pii_protected_or_flagged']} silent={len(b['silent_edit_columns'])} " | |
| f"| recover: {r['mean']}", flush=True) | |
| json.dump(rows, open(args.out, "w"), indent=1) | |
| _write_md(rows) | |
| n_silent = sum(1 for r in rows if r["silent_edit_columns"]) | |
| means = [r["inject"]["mean"] for r in rows if r["inject"]["mean"] is not None] | |
| print(f"\n{len(rows)} datasets | plan_valid {sum(r['plan_valid'] for r in rows)}/{len(rows)} " | |
| f"| silent-edit datasets: {n_silent} | mean inject-recovery: " | |
| f"{sum(means)/len(means):.3f}" if means else "no recovery rows") | |
| print(f"written to {args.out} and docs/WILD_BENCH.md") | |
| def _write_md(rows: list[dict]) -> None: | |
| L = ["# Wild Bench — can the shipped system clean real-world tables?", "", | |
| "Behavioral audit + seeded inject-recovery per dataset (eval/wild_bench.py).", | |
| "", "| dataset | domain | rows×cols | valid | changes | flags | PII | silent | typo | ocr | case | ws | mean |", | |
| "|---|---|---|---|---|---|---|---|---|---|---|---|---|"] | |
| for r in sorted(rows, key=lambda x: (x["inject"]["mean"] is not None, | |
| x["inject"]["mean"] or 0)): | |
| i = r["inject"] | |
| fmt = lambda v: "—" if v is None else f"{v:.2f}" | |
| L.append(f"| {r['name']} | {r['domain']} | {r['rows']}×{r['cols']} | " | |
| f"{'✓' if r['plan_valid'] else '✗'} | {r['cells_changed']} | " | |
| f"{r['review_flags']} | {r['pii_protected_or_flagged']} | " | |
| f"{len(r['silent_edit_columns'])} | {fmt(i['typo'])} | {fmt(i['ocr'])} | " | |
| f"{fmt(i['case'])} | {fmt(i['whitespace'])} | {fmt(i['mean'])} |") | |
| (ROOT / "docs" / "WILD_BENCH.md").write_text("\n".join(L) + "\n") | |
| if __name__ == "__main__": | |
| main() | |