# (file-viewer header captured with this source; kept as comments)
# scrubdata / eval / run_real_multi.py
# OpenAI Codex
# deploy: add sponsor:openai tag (Best Use of Codex) + Codex-hardened build
# 16dc556
# Raw
# History Blame Contribute Delete
# 13.1 kB
"""Improved real-data north-star — fixes the biases of single-dataset `repair_recall`.
repair_recall on hospital alone is biased: (1) ONE dataset (overfit to its quirks),
(2) RECALL-ONLY so it rewards over-correction (v5: recall .42 / precision .16),
(3) CONVENTION-sensitive (penalizes case/format differences that are semantically right),
(4) ABSTAIN-blind (punishes correctly declining on uncertain values).
This harness fixes all four:
* MULTI-DATASET (Raha suite) -> macro-average, not one table.
* F1 (recall AND precision) + DAMAGE rate (clean cells we corrupted) -> over-correction
is penalized, not rewarded.
* CONVENTION-NORMALIZED correctness (case/whitespace-insensitive) -> measures the right
VALUE, not surface convention.
* an ADVERSARIAL ABSTAIN slice -> the system must fix real typos but LEAVE trap values
(ambiguous / not-a-typo) untouched. Abstaining correctly scores; wrong-merging doesn't.
uv run python -m eval.run_real_multi
"""
from __future__ import annotations
import random
import pandas as pd
from scrubdata.executor import apply_plan
from scrubdata.planner import mock_plan
from .metrics import _cell_equal
# Raha benchmark table names (NOTE(review): not referenced elsewhere in this view;
# the suite below uses RAHA instead).
DATASETS = [
    "hospital",
    "beers",
    "flights",
    "rayyan",
]
# Base URL of the Raha repository's datasets folder; _fetch appends "<name>/<file>.csv".
RAW = ("https://raw.githubusercontent.com"
       "/BigDaMa/raha/master/datasets")
def _cell_only(plan: dict) -> dict:
"""Strip row-count-changing table ops (dedup / drop-empty-rows) so the cleaned output
stays row-aligned with the reference — we are measuring CELL-level cleaning here."""
p = dict(plan)
p["table_operations"] = [o for o in plan.get("table_operations", [])
if o.get("op") not in ("drop_exact_duplicates", "drop_empty_rows")]
return p
def _sem_equal(a, b) -> bool:
    """Semantic equality used by the scorer.

    True when ``_cell_equal`` (the numeric-tolerant comparison from ``.metrics``)
    accepts the pair; otherwise compare the ``str()`` forms after stripping
    surrounding whitespace and lower-casing. This separates the right VALUE from
    surface convention (Birmingham == birmingham).
    """
    if not _cell_equal(a, b):
        folded_a = str(a).strip().lower()
        folded_b = str(b).strip().lower()
        return folded_a == folded_b
    return True
def _fetch(name: str):
    """Return ``[dirty, clean]`` DataFrames for Raha dataset *name*, downloading on first use.

    Files are cached under ``<repo>/data/real/<name>/`` (two directories above this
    module). They are read with ``dtype=str, keep_default_na=False``, so every cell stays
    a raw string.

    Downloads go to a ``.part`` sibling file. That file is renamed onto the cache path
    only after ``urlretrieve`` returns. An interrupted or failed download therefore never
    leaves a truncated CSV that later runs would mistake for a valid cache hit.
    """
    import urllib.request
    from pathlib import Path
    base = Path(__file__).resolve().parent.parent / "data" / "real" / name
    base.mkdir(parents=True, exist_ok=True)
    out = []
    for fn in ("dirty.csv", "clean.csv"):
        p = base / fn
        if not p.exists():
            tmp = p.with_name(p.name + ".part")
            try:
                urllib.request.urlretrieve(f"{RAW}/{name}/{fn}", tmp)
                tmp.replace(p)  # os.replace: atomic within one filesystem
            finally:
                # Only reached with a leftover .part when the download/rename failed.
                if tmp.exists():
                    tmp.unlink()
        out.append(pd.read_csv(p, dtype=str, keep_default_na=False))
    return out
def score(dirty: pd.DataFrame, clean: pd.DataFrame, out: pd.DataFrame) -> dict:
    """Precision/recall/F1 (convention-normalized) + damage rate.

    Churn-neutral: a rewrite that is sem-equal to the INPUT but does not restore the
    gold (pure case/whitespace churn) counts as NOTHING. It is not a change, not a fix,
    and not damage. Otherwise bulk convention-rewrites of clean columns would inflate
    precision (the '-case match' ablation artifact). Fixing an error also requires
    actually ACTING: a case-injected error left untouched is sem-equal to gold but is
    NOT a fix.

    Cells are compared positionally, over the first
    ``min(len(dirty), len(out), len(clean))`` rows:
    - *dirty* and *clean* are read at row ``i`` / column ``j``. They are assumed to share
      column order (TODO confirm for every benchmark pair).
    - *out* is read at row ``i`` of the same-named column. A column missing from *out*
      is treated as untouched.

    Returns a dict with these keys:
    - ``f1``, ``recall``, ``precision``.
    - ``damage``: the fraction of clean cells corrupted.
    - Raw counts ``_errors``, ``_changed``, ``_fixed`` and ``_abstained`` (benchmark
      errors left untouched).
    """
    n = min(len(dirty), len(out), len(clean))
    errors = fixed = changed = good_changes = clean_cells = damage = errors_abstained = 0
    for j, col in enumerate(dirty.columns):
        present = col in out.columns
        # Fetch the output column once. The previous per-cell `out.iloc[i][col]` built a
        # whole-row Series for every cell (O(rows * cols^2)). On mixed-dtype frames it
        # also returned values coerced to the row's common dtype (e.g. ints as floats).
        out_col = out[col] if present else None
        for i in range(n):
            dv, cv = dirty.iat[i, j], clean.iat[i, j]
            ov = out_col.iat[i] if present else dv
            err = not _cell_equal(dv, cv)               # benchmark error (raw)
            chg = present and not _cell_equal(ov, dv)   # we changed it
            raw_ok = present and _cell_equal(ov, cv)    # exactly restored gold
            sem_ok = _sem_equal(ov, cv)                 # right value (convention-tolerant)
            if chg and _sem_equal(ov, dv) and not raw_ok:
                chg = False                             # churn: ignore entirely
            if err:
                errors += 1
                if raw_ok or (sem_ok and chg):
                    fixed += 1                          # real restoration or semantic fix
                elif not chg:
                    errors_abstained += 1               # left an error untouched
            else:
                clean_cells += 1
                if chg and not sem_ok:
                    damage += 1                         # corrupted a clean cell
            if chg:
                changed += 1
                if sem_ok:
                    good_changes += 1
    recall = fixed / errors if errors else 0.0
    precision = good_changes / changed if changed else 1.0
    f1 = 2 * recall * precision / (recall + precision) if (recall + precision) else 0.0
    return {"f1": f1, "recall": recall, "precision": precision,
            "damage": damage / clean_cells if clean_cells else 0.0,
            "_errors": errors, "_changed": changed, "_fixed": fixed,
            "_abstained": errors_abstained}
def abstain_slice(planner, seed: int = 3) -> dict:
    """Adversarial: a city column with real typos (must fix) + TRAP values (must leave).

    Tests that the system abstains rather than wrong-merges.

    *planner* is called on a one-column ``city`` DataFrame and must return a plan for
    ``apply_plan``. Row-count-changing table ops are stripped via ``_cell_only``, so the
    output stays row-aligned. If the cleaned frame has no ``city`` column, the input
    values are scored as-is. *seed* only controls the row shuffle order.

    Returns a dict with these keys:
    - ``typo_recall``: the fraction of typo rows restored to their canonical city.
    - ``abstain_accuracy``: the fraction of trap rows left semantically unchanged.
    - ``_typos`` / ``_traps``: the slice sizes.
    """
    rng = random.Random(seed)
    canon = ["Chicago", "Boston", "Houston", "Phoenix", "Dallas"]
    col, gold, kind = [], [], []
    for c in canon:  # clean anchors: 8 copies each, so canonical values dominate
        for _ in range(8):
            col.append(c)
            gold.append(c)
            kind.append("clean")
    # Every key must genuinely differ from its canonical value. An identity entry
    # (previously "Houston": "Houston") is "fixed" by doing nothing, which handed a
    # no-op planner 1/3 typo recall.
    typos = {"Chcago": "Chicago", "Bostton": "Boston", "Houstn": "Houston"}
    for t, g in typos.items():  # real typos -> must FIX
        col.append(t)
        gold.append(g)
        kind.append("typo")
    # Traps must NOT be single-edit variants of any reference entity. Otherwise mapping
    # them is arguably CORRECT and the trap mis-scores grounding. So the traps are
    # garbage strings plus one real rare city, which must stay; it catches
    # frequency-cluster over-merging into the dominant values.
    traps = ["Xqzzyville", "Qwortelby", "Zzanthor Flats", "Carmel"]
    for t in traps:
        col.append(t)
        gold.append(t)
        kind.append("trap")
    idx = list(range(len(col)))
    rng.shuffle(idx)
    col = [col[i] for i in idx]
    gold = [gold[i] for i in idx]
    kind = [kind[i] for i in idx]
    df = pd.DataFrame({"city": col})
    cleaned, _ = apply_plan(df, _cell_only(planner(df)))
    out = cleaned["city"].tolist() if "city" in cleaned.columns else col
    typo_fixed = sum(1 for o, g, k in zip(out, gold, kind) if k == "typo" and _sem_equal(o, g))
    trap_left = sum(1 for o, g, k in zip(out, gold, kind) if k == "trap" and _sem_equal(o, g))
    n_typo = kind.count("typo")
    n_trap = kind.count("trap")
    return {"typo_recall": typo_fixed / n_typo, "abstain_accuracy": trap_left / n_trap,
            "_typos": n_typo, "_traps": n_trap}
def _mean(xs):
xs = list(xs)
return sum(xs) / len(xs) if xs else 0.0
# ---- the validation SUITE: Raha real-error pairs + injected harvested domains ----
# Curated real-error benchmarks, as (Raha dataset name, domain label).
RAHA = [
    ("hospital", "health"),
    ("beers", "beverages"),
    ("flights", "travel"),
    ("rayyan", "citations"),
    ("movies_1", "entertainment"),
]
# diverse subset of the 20+ harvested clean domains (cached locally)
SUITE_DOMAINS = [
    "restaurants", "business", "jobs", "complaints", "film",
    "transport", "education", "music", "contractors", "alcohol-bars",
    "vehicles", "sf-business", "la-business", "real-estate", "food-inspections",
]
# Injected error type -> error-type group (the spec's "error_type" used for the macro).
INJECT = {
    "typo": "canonicalization",
    "ocr": "canonicalization",
    "case": "format",
    "whitespace": "format",
}
def _raha_pair(name):
    """Return ``(dirty, clean)`` for Raha dataset *name*.

    When the dirty table has more than 2200 rows (the original note cites movies_1),
    both tables are cut to their first 2000 rows with a fresh 0..n-1 index. This is
    for speed.
    """
    dirty, clean = _fetch(name)
    if len(dirty) <= 2200:
        return dirty, clean

    def _head(frame):
        return frame.head(2000).reset_index(drop=True)

    return _head(dirty), _head(clean)
def _injected_pair(path, error_type, seed):
    """Build a synthetic ``(dirty, clean)`` pair from a harvested clean CSV.

    Reads at most 600 rows, with every cell as a raw string and malformed lines
    skipped. It then asks ``inject`` for *error_type* errors at *seed*. Returns
    ``None`` whenever ``inject`` returns ``None``.
    """
    from .inject import inject
    clean = pd.read_csv(path, dtype=str, keep_default_na=False, nrows=600,
                        on_bad_lines="skip")
    dirty = inject(clean, error_type, seed)
    if dirty is None:
        return None
    return dirty, clean
def build_suite(seed: int = 7):
    """List of specs: {name, domain, error_type, source, load() -> (dirty, clean) | None}.

    source 'real' = curated real-error benchmarks; 'injected' = seeded synthetic errors
    on harvested clean domains. Report both slices. Injected typos are in-distribution
    for frequency clustering (the canonical value is always present and dominant by
    construction), so the grounding claim lives in the REAL slice.

    Paths are resolved against the repository root, two directories above this module.
    This is the same anchor ``_fetch`` uses, so the suite no longer depends on the
    current working directory.
    - ``training/unpaired_sources.json`` is read as a list of records with ``name`` and
      ``domain`` keys.
    - A source is used only if its domain is in ``SUITE_DOMAINS`` and a cached
      ``data/real/cache/<name>.csv`` exists.
    - Each source yields one spec per ``INJECT`` error type. Every spec's ``load``
      captures *seed*.
    """
    import json
    from pathlib import Path
    root = Path(__file__).resolve().parent.parent
    specs = [{"name": n, "domain": d, "error_type": "mixed", "source": "real",
              "load": (lambda n=n: _raha_pair(n))} for n, d in RAHA]
    # `with` closes the handle (the old bare open() leaked it).
    with open(root / "training" / "unpaired_sources.json", encoding="utf-8") as fh:
        src = {s["name"]: s["domain"] for s in json.load(fh)}
    cache = root / "data" / "real" / "cache"
    for fname, domain in src.items():
        if domain not in SUITE_DOMAINS:
            continue
        p = cache / f"{fname}.csv"
        if not p.exists():
            continue
        for et, group in INJECT.items():
            specs.append({"name": f"{domain}:{et}", "domain": domain, "error_type": group,
                          "source": "injected",
                          # default args bind p/et now, avoiding late-binding closures
                          "load": (lambda p=p, et=et: _injected_pair(p, et, seed))})
    return specs
def evaluate_suite(planner, seed: int = 7) -> dict:
    """Run a planner over the whole suite at one injection seed; return the double-macro.

    For every spec from ``build_suite(seed)``:
    - Load the pair, clean it with ``apply_plan(dirty, _cell_only(planner(dirty)))`` and
      ``score`` the result.
    - Skip specs whose ``load`` returns ``None``.
    - Skip specs whose ``load`` raises, but report each one on stderr. A silently
      dropped dataset would otherwise change the macros unnoticed.

    Returns a dict with these keys:
    - ``north``: the harmonic mean of ``et_macro`` (F1 macro over error types) and
      ``dom_macro`` (F1 macro over domains).
    - ``real`` / ``injected``: mean F1 per source.
    - ``damage``: mean damage.
    - ``abstain``: ``abstain_slice`` accuracy.
    - ``n``: the number of scored datasets.
    - ``by_et``: per-error-type mean F1.
    - ``skipped``: names of specs whose load raised.
    """
    import collections
    import sys

    rows = []
    skipped = []
    for spec in build_suite(seed=seed):
        try:
            loaded = spec["load"]()
        except Exception as exc:  # noqa: BLE001
            # Best-effort: one unavailable dataset must not abort the whole run, but it
            # shrinks the suite and moves the macros, so make the skip visible.
            skipped.append(spec["name"])
            print(f"[evaluate_suite] skipped {spec['name']!r} (seed={seed}): "
                  f"{type(exc).__name__}: {exc}", file=sys.stderr)
            continue
        if loaded is None:
            continue
        dirty, clean = loaded
        cleaned, _ = apply_plan(dirty, _cell_only(planner(dirty)))
        m = score(dirty, clean, cleaned)
        rows.append({**spec, "f1": m["f1"], "damage": m["damage"]})
    by_et = collections.defaultdict(list)
    by_dom = collections.defaultdict(list)
    by_src = collections.defaultdict(list)
    for r in rows:
        by_et[r["error_type"]].append(r["f1"])
        by_dom[r["domain"]].append(r["f1"])
        by_src[r.get("source", "injected")].append(r["f1"])
    et_macro = _mean(_mean(v) for v in by_et.values())
    dom_macro = _mean(_mean(v) for v in by_dom.values())
    north = (2 * et_macro * dom_macro / (et_macro + dom_macro)) if (et_macro + dom_macro) else 0.0
    ab = abstain_slice(planner)
    return {"north": north, "et_macro": et_macro, "dom_macro": dom_macro,
            "real": _mean(by_src.get("real", [])), "injected": _mean(by_src.get("injected", [])),
            "damage": _mean(r["damage"] for r in rows), "abstain": ab["abstain_accuracy"],
            "n": len(rows), "by_et": {k: _mean(v) for k, v in by_et.items()},
            "skipped": skipped}
def _ci95_halfwidth(xs) -> float:
    """Half-width of a two-sided 95% confidence interval for the mean of *xs*.

    Uses the sample standard deviation (n - 1 denominator) and the Student-t critical
    value for n - 1 degrees of freedom. Beyond 30 degrees of freedom it falls back to
    the normal 1.96, which is slightly optimistic. With only a handful of seeds, z=1.96
    and the population variance would understate the interval several-fold. Returns 0.0
    for fewer than two values.
    """
    xs = list(xs)
    k = len(xs)
    if k < 2:
        return 0.0
    mean = sum(xs) / k
    sample_std = (sum((x - mean) ** 2 for x in xs) / (k - 1)) ** 0.5
    t_crit = {
        1: 12.706, 2: 4.303, 3: 3.182, 4: 2.776, 5: 2.571, 6: 2.447, 7: 2.365,
        8: 2.306, 9: 2.262, 10: 2.228, 11: 2.201, 12: 2.179, 13: 2.160, 14: 2.145,
        15: 2.131, 16: 2.120, 17: 2.110, 18: 2.101, 19: 2.093, 20: 2.086,
        21: 2.080, 22: 2.074, 23: 2.069, 24: 2.064, 25: 2.060, 26: 2.056,
        27: 2.052, 28: 2.048, 29: 2.045, 30: 2.042,
    }.get(k - 1, 1.96)
    return t_crit * sample_std / (k ** 0.5)


def main(seeds=(7, 17, 27), out: str | None = None) -> None:
    """Print the north-star table for each system, averaged over *seeds*.

    Systems compared: the grounded planner (``mock_plan``), the OpenRefine fingerprint
    and kNN baselines, and a no-op plan. Each system is run through ``evaluate_suite``
    once per seed.

    The printed ``±95%CI`` is the Student-t half-width of the NORTH* mean across seeds.
    When *out* is given, the table rows are also written to that path as JSON.
    """
    from scrubdata.baselines import openrefine_fingerprint_plan, openrefine_knn_plan
    systems = [("grounded (ours)", mock_plan),
               ("OpenRefine fingerprint", openrefine_fingerprint_plan),
               ("OpenRefine kNN", openrefine_knn_plan),
               ("no-op", lambda df: {"table_operations": [], "columns": [], "flags": []})]
    print("\n=== Cleaning north-star — WIDE validation suite (Raha + injected harvested), "
          f"{len(seeds)} seeds ===\n")
    print(f"{'system':<24}{'NORTH*':>9}{'±95%CI':>9}{'REAL-F1':>9}{'INJ-F1':>8}"
          f"{'damage':>8}{'abstain':>9}")
    print("-" * 76)
    table = []
    for name, planner in systems:
        norths, results = [], []
        for s in seeds:
            r = evaluate_suite(planner, seed=s)
            norths.append(r["north"])
            results.append(r)
        mean_n = _mean(norths)
        ci = _ci95_halfwidth(norths)
        last = results[-1]
        row = {"system": name, "north": mean_n, "north_ci": ci,
               "real_f1": _mean(r["real"] for r in results),
               "real_f1_per_seed": [r["real"] for r in results],
               "inj_f1": _mean(r["injected"] for r in results),
               "damage": _mean(r["damage"] for r in results),
               "abstain": _mean(r["abstain"] for r in results),
               "n_datasets": last["n"], "seeds": list(seeds)}
        table.append(row)
        print(f"{name:<24}{mean_n:>9.3f}{ci:>9.3f}{row['real_f1']:>9.3f}"
              f"{row['inj_f1']:>8.3f}{row['damage']:>8.3f}{row['abstain']:>9.3f}", flush=True)
    if out:
        import json
        # `with` guarantees the JSON is flushed and the handle closed.
        with open(out, "w", encoding="utf-8") as fh:
            json.dump(table, fh, indent=1)
        print(f"rows written to {out}")
    print(f"\nNORTH* = harmonic mean(error-type macro, domain macro) over {last['n']} datasets. "
          "Double-macro + damage + abstain = un-gameable; hospital is 1 dataset of many. "
          "The money result: grounded vs the tool people actually use (OpenRefine).")
if __name__ == "__main__":
    # CLI entry point: `--out PATH` additionally dumps the results table as JSON.
    import argparse

    cli = argparse.ArgumentParser()
    cli.add_argument("--out", type=str, default=None,
                     help="write table rows to JSON")
    cli_args = cli.parse_args()
    main(out=cli_args.out)