File size: 8,521 Bytes
16dc556
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""D1 β€” the GENERALIZATION metric: held-out-source real-error evaluation.

The wide-suite REAL slice mixes sources whose pairs are IN the champion's training mix
(hospital/beers/movies_1 -> mixA), so it part-measures memorization. This metric fixes
that and one more honesty problem:

  * HELD-OUT SOURCES ONLY: a model is scored only on real-error benchmarks whose pairs
    were never used to train it. The split is explicit and committed (TRAIN_SOURCES);
    new harvested sources must be assigned to exactly one side.
  * ERROR-CLASS BREAKDOWN: benchmark errors split by the SAME variant gate the training
    derivation uses (training.real_data._is_variant — one source of truth). A
    canonicalization system claims competence on the VARIANT class (typos / casing /
    aliases); imputation-class errors (missing or non-variant rewrites) are reported,
    never hidden, but a system that abstains on them is behaving correctly.

Headline numbers per system:
    GEN-F1          churn-neutral F1 over ALL errors, macro over held-out sources
    VARIANT-RECALL  share of variant-class errors repaired (claimed competence)
    VARIANT-PREC    of committed changes on variant cells, share correct
    damage          clean cells corrupted (churn-neutral)

DISCLOSED class imperfection: the string-variant gate over-counts on flights —
single-digit time differences ('7:59 p.m.' vs '7:58 p.m.') pass the similarity
threshold but are cross-source VALUE disagreements (need per-entity cross-row
voting, a different capability), not surface canonicalization. ~950 of flights'
1049 "variant" errors are of this kind; treat flights' variant-recall as a
lower-bound stress number, not addressable headroom.

    uv run python -m eval.generalization                 # grounded heuristic baseline
"""

from __future__ import annotations

import argparse
import json

from scrubdata.executor import apply_plan
from scrubdata.planner import mock_plan
from training.real_data import _is_variant

from .metrics import _cell_equal
from .run_real_multi import _cell_only, _fetch, _sem_equal, score

# Pairs used to train the current champion (v6 = mixA) — anything here is OFF-LIMITS
# for generalization scoring of that model. Update per training run.
# Maps a model version label to the set of source names its training mix drew from.
# NOTE(review): nothing in the visible part of this module reads this mapping, so the
# train/eval disjointness appears to be enforced by convention — confirm.
TRAIN_SOURCES = {"v6": {"hospital", "beers", "movies_1"}}

# Held-out real-error sources; used as the default source list by
# evaluate_generalization() and by the --sources CLI option. Harvested D1 sources get
# appended here OR to the training side — never both.
#   * ed2_restaurants (stage-2 harvest): real NYC-restaurant typos, in-regime,
#     EVAL-ONLY. Its sibling domain source (fodors_zagats) trains, so this measures
#     cross-source same-domain transfer.
#   * dblp_scholar was REJECTED as an eval source: its gold systematically prefers the
#     opposite case convention from the dirty side (Scholar lowercase vs DBLP Title
#     Case), which measures convention preference, not cleaning — the artifact this
#     metric is designed against.
EVAL_SOURCES = ["flights", "rayyan", "ed2_restaurants"]


def variant_breakdown(dirty, clean, out) -> dict:
    """Split benchmark errors by class and count repairs per class (churn-neutral).

    A benchmark error is any cell where ``dirty`` and ``clean`` differ under
    ``_cell_equal``. Each error is classed as *variant* (both sides non-blank as
    strings and ``_is_variant`` accepts the pair) or *other*.

    Args:
        dirty: the input table (positional cell access via ``.iat``).
        clean: the gold table; read at the SAME row/column positions as ``dirty``
            (assumes its columns are positionally aligned with ``dirty`` — TODO confirm).
        out: the system output; looked up by column label. A column missing from
            ``out`` is treated as left unchanged.

    Returns:
        dict of counters: ``variant_errors``, ``variant_fixed``, ``variant_changed``,
        ``variant_good``, ``other_errors``, ``other_fixed``.
    """
    # Only the rows present in all three frames are scored.
    n = min(len(dirty), len(out), len(clean))
    c = {"variant_errors": 0, "variant_fixed": 0, "variant_changed": 0,
         "variant_good": 0, "other_errors": 0, "other_fixed": 0}
    for j, col in enumerate(dirty.columns):
        present = col in out.columns
        # Resolve the output column position once per column and read cells with
        # .iat (the accessor already used for dirty/clean). The previous
        # out.iloc[i][col] materialised a whole row Series for every error cell,
        # which is needless work on full tables and can upcast numeric values.
        # Assumes column labels in `out` are unique.
        out_pos = out.columns.get_loc(col) if present else None
        for i in range(n):
            dv, cv = dirty.iat[i, j], clean.iat[i, j]
            if _cell_equal(dv, cv):
                continue                                   # not a benchmark error
            ov = out.iat[i, out_pos] if present else dv
            chg = present and not _cell_equal(ov, dv)
            if chg and _sem_equal(ov, dv) and not _cell_equal(ov, cv):
                chg = False                                # churn: ignore
            # Fixed = exact match with gold, or a counted change that is
            # semantically equal to gold.
            fixed = _cell_equal(ov, cv) or (_sem_equal(ov, cv) and chg)
            # NOTE(review): cells are stringified before the blank check, so a
            # missing value would arrive here as a non-blank string such as "nan"
            # — confirm the loaders never hand NaN to this function.
            is_variant = (str(dv).strip() and str(cv).strip()
                          and _is_variant(str(dv), str(cv)))
            if is_variant:
                c["variant_errors"] += 1
                c["variant_fixed"] += int(fixed)
                if chg:
                    c["variant_changed"] += 1
                    c["variant_good"] += int(_sem_equal(ov, cv))
            else:
                c["other_errors"] += 1
                c["other_fixed"] += int(fixed)
    return c


def evaluate_generalization(planner, sources=None, label: str = "system") -> dict:
    """Run ``planner`` over each source, score it, and return the aggregated metrics.

    Args:
        planner: callable taking the dirty table and returning a plan.
        sources: source names to evaluate; falls back to ``EVAL_SOURCES`` when falsy.
        label: system name used in the printed summary and the result dict.

    Returns:
        The dict produced by ``_aggregate`` for the per-source rows.
    """
    if not sources:
        sources = EVAL_SOURCES
    headline_keys = ("f1", "precision", "recall", "damage")
    per_source = []
    for src in sources:
        # FULL tables, no truncation — ed2_restaurants' real errors are concentrated
        # outside the first 2k rows (_raha_pair's head(2000) hid 473 of 477).
        dirty, clean = _fetch(src)
        plan = _cell_only(planner(dirty))
        cleaned, _ = apply_plan(dirty, plan)
        metrics = score(dirty, clean, cleaned)
        classes = variant_breakdown(dirty, clean, cleaned)

        # Row layout: source name, headline metrics, then the per-class counters.
        row = {"source": src}
        row.update((key, metrics[key]) for key in headline_keys)
        row.update(classes)
        per_source.append(row)

        print(f"  {src:<10} F1={metrics['f1']:.3f} dmg={metrics['damage']:.3f} | variant: "
              f"{classes['variant_fixed']}/{classes['variant_errors']} fixed, "
              f"{classes['variant_good']}/{classes['variant_changed']} changes good | "
              f"other: {classes['other_fixed']}/{classes['other_errors']}", flush=True)

    return _aggregate(per_source, sources, label)


def evaluate_captured_union(plans: dict, sources, label: str, tau: float = 0.5) -> dict:
    """Score the SHIPPED pipeline from captured raw model plans (Modal --capture).

    For each source the captured plan is passed through ``verify_plan`` at threshold
    ``tau`` and unioned with the grounded heuristic (``mock_plan``) — per the original
    author's note, the same composition as scrubdata/active.py.

    Args:
        plans: mapping of source name to its captured raw model plan.
        sources: source names to evaluate; each must be a key of ``plans``.
        label: system name used in the printed summary and the result dict.
        tau: threshold forwarded to ``verify_plan``.

    Returns:
        The dict produced by ``_aggregate`` for the per-source rows.
    """
    from scrubdata.verifier import union_plans, verify_plan

    headline_keys = ("f1", "precision", "recall", "damage")
    per_source = []
    for src in sources:
        dirty, clean = _fetch(src)
        # Verified model plan first, heuristic plan second, then union.
        verified = verify_plan(dirty, plans[src], tau=tau)
        combined = union_plans(verified, mock_plan(dirty))
        cleaned, _ = apply_plan(dirty, _cell_only(combined))
        metrics = score(dirty, clean, cleaned)
        classes = variant_breakdown(dirty, clean, cleaned)

        row = {"source": src}
        row.update((key, metrics[key]) for key in headline_keys)
        row.update(classes)
        per_source.append(row)

        print(f"  {src:<16} F1={metrics['f1']:.3f} dmg={metrics['damage']:.3f} | variant: "
              f"{classes['variant_fixed']}/{classes['variant_errors']} fixed", flush=True)
    return _aggregate(per_source, sources, label)


def _aggregate(rows, sources, label) -> dict:
    """Macro-average the per-source rows into headline metrics; print and return them.

    Every headline number is an unweighted mean over ``rows`` (0.0 when ``rows`` is
    empty). Per-row recall is 0.0 when the class has no errors; per-row variant
    precision is 1.0 when the system changed no variant cells.
    """
    def macro(values):
        # Unweighted mean; an empty input averages to 0.0.
        if not values:
            return 0.0
        return sum(values) / len(values)

    def ratio(hits, total):
        # Safe division: a zero denominator yields 0.0.
        if not total:
            return 0.0
        return hits / total

    f1_scores = [row["f1"] for row in rows]
    variant_recalls = [ratio(row["variant_fixed"], row["variant_errors"]) for row in rows]
    variant_precisions = []
    for row in rows:
        if row["variant_changed"]:
            variant_precisions.append(ratio(row["variant_good"], row["variant_changed"]))
        else:
            variant_precisions.append(1.0)
    other_recalls = [ratio(row["other_fixed"], row["other_errors"]) for row in rows]
    damages = [row["damage"] for row in rows]

    out = {
        "system": label,
        "sources": list(sources),
        "gen_f1": macro(f1_scores),
        "variant_recall": macro(variant_recalls),
        "variant_precision": macro(variant_precisions),
        "other_recall": macro(other_recalls),
        "damage": macro(damages),
        "per_source": rows,
    }
    summary = (
        f"{label}: GEN-F1={out['gen_f1']:.3f} VARIANT-RECALL={out['variant_recall']:.3f} "
        f"VARIANT-PREC={out['variant_precision']:.3f} dmg={out['damage']:.3f}"
    )
    print(summary)
    return out


def main() -> None:
    """CLI entry point: evaluate and write the results as JSON.

    With ``--plans`` the captured model plans are scored through the shipped union
    pipeline; otherwise the grounded heuristic and a no-op planner are scored as
    local baselines. Results are written to ``--out`` as a JSON list.
    """
    from pathlib import Path

    ap = argparse.ArgumentParser()
    ap.add_argument("--sources", default=",".join(EVAL_SOURCES))
    ap.add_argument("--plans", default=None,
                    help="JSON file {source: captured raw model plan} -> score the "
                         "shipped union pipeline instead of the local baselines")
    ap.add_argument("--label", default="captured union")
    ap.add_argument("--out", default="eval/results/generalization_baseline.json")
    args = ap.parse_args()

    # Tolerate "a, b" and trailing commas; an empty name would only fail later,
    # deep inside the fetch step.
    sources = [s.strip() for s in args.sources.split(",") if s.strip()]
    if not sources:
        ap.error("--sources must name at least one source")

    if args.plans:
        # Context manager so the handle is closed deterministically.
        with open(args.plans, encoding="utf-8") as fh:
            plans = json.load(fh)
        results = [evaluate_captured_union(plans, sources, args.label)]
    else:
        results = [
            evaluate_generalization(mock_plan, sources, "grounded heuristic"),
            evaluate_generalization(
                lambda df: {"table_operations": [], "columns": [], "flags": []},
                sources, "no-op"),
        ]

    # Create the target directory so a finished evaluation is not lost to a
    # missing results folder, and close the file so the JSON is fully flushed.
    out_path = Path(args.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", encoding="utf-8") as fh:
        json.dump(results, fh, indent=1)
    print(f"written to {args.out}")


if __name__ == "__main__":
    main()