Spaces:
Running
Running
| """Improved real-data north-star — fixes the biases of single-dataset `repair_recall`. | |
| repair_recall on hospital alone is biased: (1) ONE dataset (overfit to its quirks), | |
| (2) RECALL-ONLY so it rewards over-correction (v5: recall .42 / precision .16), | |
| (3) CONVENTION-sensitive (penalizes case/format differences that are semantically right), | |
| (4) ABSTAIN-blind (punishes correctly declining on uncertain values). | |
| This harness fixes all four: | |
| * MULTI-DATASET (Raha suite) -> macro-average, not one table. | |
| * F1 (recall AND precision) + DAMAGE rate (clean cells we corrupted) -> over-correction | |
| is penalized, not rewarded. | |
| * CONVENTION-NORMALIZED correctness (case/whitespace-insensitive) -> measures the right | |
| VALUE, not surface convention. | |
| * an ADVERSARIAL ABSTAIN slice -> the system must fix real typos but LEAVE trap values | |
| (ambiguous / not-a-typo) untouched. Abstaining correctly scores; wrong-merging doesn't. | |
| uv run python -m eval.run_real_multi | |
| """ | |
| from __future__ import annotations | |
| import random | |
| import pandas as pd | |
| from scrubdata.executor import apply_plan | |
| from scrubdata.planner import mock_plan | |
| from .metrics import _cell_equal | |
| DATASETS = ["hospital", "beers", "flights", "rayyan"] | |
| RAW = "https://raw.githubusercontent.com/BigDaMa/raha/master/datasets" | |
| def _cell_only(plan: dict) -> dict: | |
| """Strip row-count-changing table ops (dedup / drop-empty-rows) so the cleaned output | |
| stays row-aligned with the reference — we are measuring CELL-level cleaning here.""" | |
| p = dict(plan) | |
| p["table_operations"] = [o for o in plan.get("table_operations", []) | |
| if o.get("op") not in ("drop_exact_duplicates", "drop_empty_rows")] | |
| return p | |
| def _sem_equal(a, b) -> bool: | |
| """Semantic equality: numeric-tolerant (via _cell_equal) OR case/whitespace-folded. | |
| Separates the right VALUE from surface convention (Birmingham == birmingham).""" | |
| if _cell_equal(a, b): | |
| return True | |
| return str(a).strip().lower() == str(b).strip().lower() | |
| def _fetch(name: str): | |
| import urllib.request | |
| from pathlib import Path | |
| base = Path(__file__).resolve().parent.parent / "data" / "real" / name | |
| base.mkdir(parents=True, exist_ok=True) | |
| out = [] | |
| for fn in ("dirty.csv", "clean.csv"): | |
| p = base / fn | |
| if not p.exists(): | |
| urllib.request.urlretrieve(f"{RAW}/{name}/{fn}", p) | |
| out.append(pd.read_csv(p, dtype=str, keep_default_na=False)) | |
| return out | |
| def score(dirty: pd.DataFrame, clean: pd.DataFrame, out: pd.DataFrame) -> dict: | |
| """Precision/recall/F1 (convention-normalized) + damage rate. | |
| Churn-neutral: a rewrite that is sem-equal to the INPUT but does not restore the | |
| gold (pure case/whitespace churn) counts as NOTHING — not a change, not a fix, not | |
| damage. Otherwise bulk convention-rewrites of clean columns inflate precision (the | |
| '-case match' ablation artifact). Fixing an error also requires actually ACTING: | |
| a case-injected error left untouched is sem-equal to gold but is NOT a fix.""" | |
| n = min(len(dirty), len(out), len(clean)) | |
| errors = fixed = changed = good_changes = clean_cells = damage = errors_abstained = 0 | |
| for j, col in enumerate(dirty.columns): | |
| present = col in out.columns | |
| for i in range(n): | |
| dv, cv = dirty.iat[i, j], clean.iat[i, j] | |
| ov = out.iloc[i][col] if present else dv | |
| err = not _cell_equal(dv, cv) # benchmark error (raw) | |
| chg = present and not _cell_equal(ov, dv) # we changed it | |
| raw_ok = present and _cell_equal(ov, cv) # exactly restored gold | |
| sem_ok = _sem_equal(ov, cv) # right value (convention-tolerant) | |
| if chg and _sem_equal(ov, dv) and not raw_ok: | |
| chg = False # churn: ignore entirely | |
| if err: | |
| errors += 1 | |
| if raw_ok or (sem_ok and chg): | |
| fixed += 1 # real restoration or semantic fix | |
| elif not chg: | |
| errors_abstained += 1 # left an error untouched | |
| else: | |
| clean_cells += 1 | |
| if chg and not sem_ok: | |
| damage += 1 # corrupted a clean cell | |
| if chg: | |
| changed += 1 | |
| if sem_ok: | |
| good_changes += 1 | |
| recall = fixed / errors if errors else 0.0 | |
| precision = good_changes / changed if changed else 1.0 | |
| f1 = 2 * recall * precision / (recall + precision) if (recall + precision) else 0.0 | |
| return {"f1": f1, "recall": recall, "precision": precision, | |
| "damage": damage / clean_cells if clean_cells else 0.0, | |
| "_errors": errors, "_changed": changed, "_fixed": fixed} | |
| def abstain_slice(planner, seed: int = 3) -> dict: | |
| """Adversarial: a city column with real typos (must fix) + TRAP values (must leave). | |
| Tests that the system abstains rather than wrong-merges.""" | |
| rng = random.Random(seed) | |
| canon = ["Chicago", "Boston", "Houston", "Phoenix", "Dallas"] | |
| col, gold, kind = [], [], [] | |
| for c in canon: # clean anchors | |
| for _ in range(8): | |
| col.append(c); gold.append(c); kind.append("clean") | |
| typos = {"Chcago": "Chicago", "Bostton": "Boston", "Houston": "Houston"} | |
| for t, g in typos.items(): # real typos -> must FIX | |
| col.append(t); gold.append(g); kind.append("typo") | |
| # traps must NOT be single-edit variants of any reference entity (else mapping them | |
| # is arguably CORRECT and the trap mis-scores grounding): garbage strings + one real | |
| # rare city (must stay; catches freq-cluster over-merge into the dominant values). | |
| traps = ["Xqzzyville", "Qwortelby", "Zzanthor Flats", "Carmel"] | |
| for t in traps: | |
| col.append(t); gold.append(t); kind.append("trap") | |
| idx = list(range(len(col))); rng.shuffle(idx) | |
| col = [col[i] for i in idx]; gold = [gold[i] for i in idx]; kind = [kind[i] for i in idx] | |
| df = pd.DataFrame({"city": col}) | |
| cleaned, _ = apply_plan(df, _cell_only(planner(df))) | |
| out = cleaned["city"].tolist() if "city" in cleaned.columns else col | |
| typo_fixed = sum(1 for o, g, k in zip(out, gold, kind) if k == "typo" and _sem_equal(o, g)) | |
| trap_left = sum(1 for o, g, k in zip(out, gold, kind) if k == "trap" and _sem_equal(o, g)) | |
| n_typo = kind.count("typo"); n_trap = kind.count("trap") | |
| return {"typo_recall": typo_fixed / n_typo, "abstain_accuracy": trap_left / n_trap, | |
| "_typos": n_typo, "_traps": n_trap} | |
| def _mean(xs): | |
| xs = list(xs) | |
| return sum(xs) / len(xs) if xs else 0.0 | |
| # ---- the validation SUITE: Raha real-error pairs + injected harvested domains ---- | |
| RAHA = [("hospital", "health"), ("beers", "beverages"), ("flights", "travel"), | |
| ("rayyan", "citations"), ("movies_1", "entertainment")] | |
| # diverse subset of the 20+ harvested clean domains (cached locally) | |
| SUITE_DOMAINS = ["restaurants", "business", "jobs", "complaints", "film", "transport", | |
| "education", "music", "contractors", "alcohol-bars", "vehicles", | |
| "sf-business", "la-business", "real-estate", "food-inspections"] | |
| INJECT = {"typo": "canonicalization", "ocr": "canonicalization", | |
| "case": "format", "whitespace": "format"} | |
| def _raha_pair(name): | |
| dirty, clean = _fetch(name) | |
| if len(dirty) > 2200: # movies_1: subsample for speed | |
| dirty, clean = dirty.head(2000).reset_index(drop=True), clean.head(2000).reset_index(drop=True) | |
| return dirty, clean | |
| def _injected_pair(path, error_type, seed): | |
| import pandas as _pd | |
| from .inject import inject | |
| clean = _pd.read_csv(path, dtype=str, keep_default_na=False, nrows=600, on_bad_lines="skip") | |
| dirty = inject(clean, error_type, seed) | |
| return (dirty, clean) if dirty is not None else None | |
| def build_suite(seed: int = 7): | |
| """List of specs: {name, domain, error_type, source, load() -> (dirty, clean) | None}. | |
| source 'real' = curated real-error benchmarks; 'injected' = seeded synthetic errors on | |
| harvested clean domains. Report both slices — injected typos are in-distribution for | |
| frequency clustering (canonical always present+dominant by construction), so the | |
| grounding claim lives in the REAL slice.""" | |
| from pathlib import Path | |
| specs = [{"name": n, "domain": d, "error_type": "mixed", "source": "real", | |
| "load": (lambda n=n: _raha_pair(n))} for n, d in RAHA] | |
| import json | |
| src = {s["name"]: s["domain"] for s in | |
| json.load(open("training/unpaired_sources.json"))} | |
| cache = Path("data/real/cache") | |
| for fname, domain in src.items(): | |
| if domain not in SUITE_DOMAINS: | |
| continue | |
| p = cache / f"{fname}.csv" | |
| if not p.exists(): | |
| continue | |
| for et, group in INJECT.items(): | |
| specs.append({"name": f"{domain}:{et}", "domain": domain, "error_type": group, | |
| "source": "injected", | |
| "load": (lambda p=p, et=et: _injected_pair(p, et, seed))}) | |
| return specs | |
| def evaluate_suite(planner, seed: int = 7) -> dict: | |
| """Run a planner over the whole suite at one injection seed; return the double-macro.""" | |
| import collections | |
| rows = [] | |
| for spec in build_suite(seed=seed): | |
| try: | |
| loaded = spec["load"]() | |
| except Exception: # noqa: BLE001 | |
| continue | |
| if loaded is None: | |
| continue | |
| dirty, clean = loaded | |
| cleaned, _ = apply_plan(dirty, _cell_only(planner(dirty))) | |
| m = score(dirty, clean, cleaned) | |
| rows.append({**spec, "f1": m["f1"], "damage": m["damage"]}) | |
| by_et = collections.defaultdict(list) | |
| by_dom = collections.defaultdict(list) | |
| by_src = collections.defaultdict(list) | |
| for r in rows: | |
| by_et[r["error_type"]].append(r["f1"]) | |
| by_dom[r["domain"]].append(r["f1"]) | |
| by_src[r.get("source", "injected")].append(r["f1"]) | |
| et_macro = _mean(_mean(v) for v in by_et.values()) | |
| dom_macro = _mean(_mean(v) for v in by_dom.values()) | |
| north = (2 * et_macro * dom_macro / (et_macro + dom_macro)) if (et_macro + dom_macro) else 0.0 | |
| ab = abstain_slice(planner) | |
| return {"north": north, "et_macro": et_macro, "dom_macro": dom_macro, | |
| "real": _mean(by_src.get("real", [])), "injected": _mean(by_src.get("injected", [])), | |
| "damage": _mean(r["damage"] for r in rows), "abstain": ab["abstain_accuracy"], | |
| "n": len(rows), "by_et": {k: _mean(v) for k, v in by_et.items()}} | |
| def main(seeds=(7, 17, 27), out: str | None = None) -> None: | |
| from scrubdata.baselines import openrefine_fingerprint_plan, openrefine_knn_plan | |
| systems = [("grounded (ours)", mock_plan), | |
| ("OpenRefine fingerprint", openrefine_fingerprint_plan), | |
| ("OpenRefine kNN", openrefine_knn_plan), | |
| ("no-op", lambda df: {"table_operations": [], "columns": [], "flags": []})] | |
| print("\n=== Cleaning north-star — WIDE validation suite (Raha + injected harvested), " | |
| f"{len(seeds)} seeds ===\n") | |
| print(f"{'system':<24}{'NORTH*':>9}{'±95%CI':>9}{'REAL-F1':>9}{'INJ-F1':>8}" | |
| f"{'damage':>8}{'abstain':>9}") | |
| print("-" * 76) | |
| table = [] | |
| for name, planner in systems: | |
| norths, results = [], [] | |
| for s in seeds: | |
| r = evaluate_suite(planner, seed=s) | |
| norths.append(r["north"]); results.append(r) | |
| mean_n = _mean(norths) | |
| # 95% CI ~ 1.96 * std / sqrt(n) | |
| var = _mean([(x - mean_n) ** 2 for x in norths]) | |
| ci = 1.96 * (var ** 0.5) / (len(norths) ** 0.5) | |
| last = results[-1] | |
| row = {"system": name, "north": mean_n, "north_ci": ci, | |
| "real_f1": _mean(r["real"] for r in results), | |
| "real_f1_per_seed": [r["real"] for r in results], | |
| "inj_f1": _mean(r["injected"] for r in results), | |
| "damage": _mean(r["damage"] for r in results), | |
| "abstain": _mean(r["abstain"] for r in results), | |
| "n_datasets": last["n"], "seeds": list(seeds)} | |
| table.append(row) | |
| print(f"{name:<24}{mean_n:>9.3f}{ci:>9.3f}{row['real_f1']:>9.3f}" | |
| f"{row['inj_f1']:>8.3f}{row['damage']:>8.3f}{row['abstain']:>9.3f}", flush=True) | |
| if out: | |
| import json | |
| json.dump(table, open(out, "w"), indent=1) | |
| print(f"rows written to {out}") | |
| print(f"\nNORTH* = harmonic mean(error-type macro, domain macro) over {last['n']} datasets. " | |
| "Double-macro + damage + abstain = un-gameable; hospital is 1 dataset of many. " | |
| "The money result: grounded vs the tool people actually use (OpenRefine).") | |
| if __name__ == "__main__": | |
| import argparse | |
| ap = argparse.ArgumentParser() | |
| ap.add_argument("--out", type=str, default=None, help="write table rows to JSON") | |
| main(out=ap.parse_args().out) | |