scrubdata / eval /wild_bench.py
OpenAI Codex
deploy: add sponsor:openai tag (Best Use of Codex) + Codex-hardened build
16dc556
Raw
History Blame Contribute Delete
7.66 kB
"""Stage-3 WILD BENCH — "can this system clean real-world tables?" scoreboard.
Every registered wild dataset (no gold required) gets a row built from the SHIPPED
deterministic pipeline (the same `mock_plan` the Space runs):
* BEHAVIORAL audit — what the product does on the raw table: changes applied,
columns touched, review flags raised (abstentions), PII columns flagged,
plan schema validity, silent-edit check (every diff cell attributable to a
logged op), runtime per 1k rows.
* INJECT-RECOVERY — seeded errors (typo/ocr/case/whitespace, eval/inject.py)
injected into the table's OWN content, then cleaned and scored churn-neutral:
in-domain robustness. (Caveat, disclosed: the raw table plays "clean", so
pre-existing errors slightly deflate scores — uniform across systems.)
Sources: training/unpaired_sources.json cache + data/wild/ extras (stage-3 hunts).
uv run python -m eval.wild_bench # full scoreboard
uv run python -m eval.wild_bench --only spotify # one dataset
Writes eval/results/wild_bench.json and docs/WILD_BENCH.md.
"""
from __future__ import annotations
import argparse
import json
import time
from pathlib import Path
import pandas as pd
from scrubdata.executor import apply_plan
from scrubdata.planner import mock_plan
from .inject import inject
from .metrics import is_valid
from .run_real_multi import _cell_only, score
# Parent of the directory that contains this file (resolved to an absolute path).
ROOT = Path(__file__).resolve().parent.parent
# Directory searched by registry() for "<name>.csv" copies of the sources listed
# in training/unpaired_sources.json.
CACHE = ROOT / "data" / "real" / "cache"
# Directory searched by registry() for extra tables and their optional manifest.json.
WILD = ROOT / "data" / "wild"
# Error kinds handed to inject() one at a time by inject_recovery().
INJECT_TYPES = ("typo", "ocr", "case", "whitespace")
# Row cap passed as `nrows` to pandas.read_csv in _load().
N_ROWS = 800
def registry() -> list[dict]:
    """Collect every benchmark target whose CSV file is present on disk.

    Two listings are consulted, in this order:

    * ``training/unpaired_sources.json`` -- an entry becomes a target when
      ``CACHE/<name>.csv`` exists.
    * ``WILD/manifest.json`` (optional, only looked at when ``WILD`` exists) --
      an entry becomes a target when ``WILD/<name>.csv`` exists; a missing
      ``domain`` defaults to ``"wild"``.

    Returns:
        A list of dicts with the keys ``name``, ``domain`` and ``path``.

    Raises:
        OSError: ``training/unpaired_sources.json`` cannot be opened.
        json.JSONDecodeError: one of the JSON files is malformed.
        KeyError: an entry lacks ``name`` (or ``domain`` in the sources file).
    """
    targets = []

    # Context managers close the handles deterministically; JSON is read as
    # UTF-8 rather than whatever the locale default happens to be.
    with open(ROOT / "training" / "unpaired_sources.json", encoding="utf-8") as fh:
        sources = json.load(fh)
    for spec in sources:
        csv_path = CACHE / f"{spec['name']}.csv"
        if csv_path.exists():
            targets.append({"name": spec["name"], "domain": spec["domain"],
                            "path": csv_path})

    if WILD.exists():
        manifest = WILD / "manifest.json"
        extras = []
        if manifest.exists():
            with open(manifest, encoding="utf-8") as fh:
                extras = json.load(fh)
        for spec in extras:
            csv_path = WILD / f"{spec['name']}.csv"
            if csv_path.exists():
                targets.append({"name": spec["name"],
                                "domain": spec.get("domain", "wild"),
                                "path": csv_path})
    return targets
def _load(path: Path) -> pd.DataFrame:
    """Read up to ``N_ROWS`` rows of a CSV file as strings, tolerating messy input.

    Every column is read as ``str`` with pandas' default NA detection turned
    off, malformed lines are skipped, and undecodable bytes are replaced
    instead of raising. If the default parser raises ``ParserError`` the read
    is retried with the slower ``engine="python"`` parser using the same
    options.

    Args:
        path: Location of the CSV file.

    Returns:
        The loaded frame without any column whose name starts with
        ``"Unnamed"``.

    Raises:
        pandas.errors.ParserError: the fallback parser also fails.
        OSError: the file cannot be opened.
    """
    # encoding_errors lives in the shared options so the fallback read is just
    # as tolerant of bad bytes as the first attempt.
    kw = dict(dtype=str, keep_default_na=False, nrows=N_ROWS,
              on_bad_lines="skip", encoding_errors="replace")
    try:
        df = pd.read_csv(path, **kw)
    except pd.errors.ParserError:  # ragged quoting etc. — slow tolerant path
        df = pd.read_csv(path, engine="python", **kw)
    keep = [c for c in df.columns if not c.startswith("Unnamed")]
    return df.loc[:, keep]
def behavioral(df: pd.DataFrame) -> dict:
    """Run the pipeline on ``df`` and summarise what it did.

    A plan is built with ``mock_plan`` and applied with ``apply_plan``; both
    calls together are timed. The ``_cell_only`` variant of the plan is then
    applied a second time to find columns whose values changed without any
    log entry (or ``resolve_by_majority`` table operation) naming them.

    Returns:
        A dict with the keys ``plan_valid``, ``ops``, ``cells_changed``,
        ``review_flags``, ``pii_protected_or_flagged``,
        ``silent_edit_columns`` and ``sec_per_1k_rows``.
    """
    started = time.perf_counter()
    plan = mock_plan(df)
    _cleaned, log = apply_plan(df, plan)
    elapsed = time.perf_counter() - started

    flags = plan.get("flags", [])

    # One pass over the log: total changed cells (integer counts only), the set
    # of distinct op names, and the number of flag_pii entries.
    cells_changed = 0
    ops_logged = set()
    pii_flagged = 0
    for entry in log:
        count = entry.get("cells_changed")
        if isinstance(count, int):
            cells_changed += count
        ops_logged.add(entry.get("op"))
        if entry.get("op") == "flag_pii":
            pii_flagged += 1

    # Planned column operations named "*_pii" other than flag_pii also count.
    for column in plan.get("columns", []):
        for operation in column.get("operations", []):
            if operation.get("op", "").endswith("_pii") and operation.get("op") != "flag_pii":
                pii_flagged += 1

    # silent-edit check: every changed CELL must be attributable to a logged op.
    # apply_plan resets the index after row drops, so attribute COLUMN ops on a
    # drop-free application (row drops are table-scope-logged separately).
    plan_cols_only = _cell_only(plan)
    recleaned, cell_log = apply_plan(df, plan_cols_only)

    changed_cols = set()
    for col in df.columns:
        if col not in recleaned.columns:
            continue
        if not df[col].equals(recleaned[col]):
            changed_cols.add(col)

    logged_cols = set()
    for entry in cell_log:
        if entry.get("column"):
            logged_cols.add(entry.get("column"))
    for table_op in plan_cols_only.get("table_operations", []):
        # resolve_by_majority logs table-scope, so take its columns from the plan
        if table_op.get("op") == "resolve_by_majority":
            logged_cols.update(table_op.get("columns", []))

    silent = sorted(changed_cols - logged_cols)
    return {
        "plan_valid": is_valid(plan),
        "ops": len(ops_logged),
        "cells_changed": cells_changed,
        "review_flags": len(flags),
        "pii_protected_or_flagged": pii_flagged,
        "silent_edit_columns": silent,
        "sec_per_1k_rows": round(elapsed / max(len(df), 1) * 1000, 2),
    }
def inject_recovery(df: pd.DataFrame, seed: int = 7) -> dict:
    """Score how well the pipeline undoes seeded errors injected into ``df``.

    For each kind in ``INJECT_TYPES`` the table is corrupted with ``inject``,
    cleaned with the ``_cell_only`` form of a fresh ``mock_plan``, and scored
    against the original ``df`` via ``score``.

    Args:
        df: Table used both as injection input and as the scoring reference.
        seed: Seed forwarded to ``inject``.

    Returns:
        A dict mapping each error kind to its F1 rounded to 3 decimals (or
        ``None`` when ``inject`` returned ``None``), plus ``"mean"``: the
        rounded mean of the non-``None`` scores, or ``None`` if there are none.
    """
    scores = {}
    for kind in INJECT_TYPES:
        corrupted = inject(df, kind, seed)
        if corrupted is not None:
            cell_plan = _cell_only(mock_plan(corrupted))
            repaired, _log = apply_plan(corrupted, cell_plan)
            metrics = score(corrupted, df, repaired)
            scores[kind] = round(metrics["f1"], 3)
        else:
            scores[kind] = None

    usable = [s for s in scores.values() if s is not None]
    if usable:
        scores["mean"] = round(sum(usable) / len(usable), 3)
    else:
        scores["mean"] = None
    return scores
def main() -> None:
    """Command-line entry point: benchmark every registered dataset.

    Options:
        --only NAME  restrict the run to the dataset with that exact name.
        --out PATH   where the JSON results are written
                     (default ``eval/results/wild_bench.json``).

    For each target from ``registry()`` the table is loaded, audited with
    ``behavioral`` and scored with ``inject_recovery``; one progress line is
    printed per dataset. Tables that fail to load, are empty, or have fewer
    than two columns are skipped. The collected rows are written to ``--out``
    as JSON and rendered to Markdown by ``_write_md``, then a summary is
    printed.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--only", default=None)
    ap.add_argument("--out", default="eval/results/wild_bench.json")
    args = ap.parse_args()

    rows = []
    for spec in registry():
        if args.only and spec["name"] != args.only:
            continue
        try:
            df = _load(spec["path"])
        except Exception as e:  # noqa: BLE001 - best effort: one bad file must not stop the run
            print(f" {spec['name']}: LOAD FAILED {type(e).__name__}")
            continue
        if df.empty or df.shape[1] < 2:
            continue
        b = behavioral(df)
        r = inject_recovery(df)
        row = {"name": spec["name"], "domain": spec["domain"],
               "rows": len(df), "cols": df.shape[1], **b, "inject": r}
        rows.append(row)
        print(f" {spec['name']:<18} {spec['domain']:<14} valid={b['plan_valid']} "
              f"chg={b['cells_changed']:<6} flags={b['review_flags']} "
              f"pii={b['pii_protected_or_flagged']} silent={len(b['silent_edit_columns'])} "
              f"| recover: {r['mean']}", flush=True)

    # Create the results directory up front so a fresh checkout does not fail
    # after all the work is done, and close the handle once the dump finishes.
    out_path = Path(args.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", encoding="utf-8") as fh:
        json.dump(rows, fh, indent=1)
    _write_md(rows)

    n_silent = sum(1 for row in rows if row["silent_edit_columns"])
    n_valid = sum(row["plan_valid"] for row in rows)
    means = [row["inject"]["mean"] for row in rows if row["inject"]["mean"] is not None]
    # Only the recovery figure is conditional. Adjacent f-strings concatenate
    # before a trailing `if ... else ...` is applied, so the fallback is
    # computed separately to keep the dataset counts in the summary.
    recovery = f"{sum(means) / len(means):.3f}" if means else "no recovery rows"
    print(f"\n{len(rows)} datasets | plan_valid {n_valid}/{len(rows)} "
          f"| silent-edit datasets: {n_silent} | mean inject-recovery: {recovery}")
    print(f"written to {args.out} and docs/WILD_BENCH.md")
def _write_md(rows: list[dict]) -> None:
    """Render the scoreboard as a Markdown table in ``docs/WILD_BENCH.md``.

    Rows are ordered with datasets lacking a mean inject-recovery score first,
    followed by the rest in ascending order of that mean. Missing scores are
    shown as an em dash; present scores use two decimals.

    Args:
        rows: Result dicts as assembled by ``main`` (the ``behavioral`` keys
            plus ``name``, ``domain``, ``rows``, ``cols`` and ``inject``).
    """
    def fmt(v):
        return "—" if v is None else f"{v:.2f}"

    lines = ["# Wild Bench — can the shipped system clean real-world tables?", "",
             "Behavioral audit + seeded inject-recovery per dataset (eval/wild_bench.py).",
             "", "| dataset | domain | rows×cols | valid | changes | flags | PII | silent | typo | ocr | case | ws | mean |",
             "|---|---|---|---|---|---|---|---|---|---|---|---|---|"]
    ordered = sorted(rows, key=lambda x: (x["inject"]["mean"] is not None,
                                          x["inject"]["mean"] or 0))
    for r in ordered:
        i = r["inject"]
        lines.append(f"| {r['name']} | {r['domain']} | {r['rows']}×{r['cols']} | "
                     f"{'✓' if r['plan_valid'] else '✗'} | {r['cells_changed']} | "
                     f"{r['review_flags']} | {r['pii_protected_or_flagged']} | "
                     f"{len(r['silent_edit_columns'])} | {fmt(i['typo'])} | {fmt(i['ocr'])} | "
                     f"{fmt(i['case'])} | {fmt(i['whitespace'])} | {fmt(i['mean'])} |")

    target = ROOT / "docs" / "WILD_BENCH.md"
    target.parent.mkdir(parents=True, exist_ok=True)
    # The table contains non-ASCII symbols (—, ×, ✓, ✗); write UTF-8 explicitly
    # so the result does not depend on the platform's default encoding.
    target.write_text("\n".join(lines) + "\n", encoding="utf-8")
# Script entry point: run the benchmark when the module is executed directly
# (e.g. `python -m eval.wild_bench`), not when it is imported.
if __name__ == "__main__":
    main()