"""Real paired dirty/clean datasets -> self-verified SFT training examples. The v4 model aced synthetic data (canon_f1 0.90) but scored 0 on the real Raha hospital table because it had never trained on real high-cardinality messy data. KEY INSIGHT: real *paired* (dirty, clean) datasets let us DERIVE a self-verified ground-truth plan by aligning cells. Wherever dirty[i,j] != clean[i,j], the pair dirty-value -> clean-value is a canonicalize mapping (or a deterministic format fix). Executing the derived plan recovers clean -> the example is self-verified with the SAME executor-recovery gate used for synthetic data. This module: 1) fetches the shortlisted PAIRED Raha datasets (disk-aware: small ones cached under data/real/ which is gitignored; bulky `tax` is sampled then deleted); 2) derive_plan(dirty_df, clean_df) -> plan dict (cell-align -> canonicalize + obvious format/dup fixes) such that apply_plan(dirty, plan) recovers clean; 3) emits chat-format ('messages') examples via build_chat_example using the AGGREGATED profile of the DIRTY table, keeping ONLY examples whose derived plan recovers clean above a threshold (self-verified). Run: uv run training/real_data.py uv run training/real_data.py --datasets hospital beers rayyan flights uv run training/real_data.py --include-tax # fetch+sample+delete tax Does NOT push to HF and does NOT train. """ from __future__ import annotations import argparse import difflib import json import math import re import urllib.request from pathlib import Path import pandas as pd from scrubdata.executor import apply_plan from scrubdata.profiler import profile_dataframe from scrubdata.prompt import build_chat_example ROOT = Path(__file__).resolve().parent.parent REAL_DIR = ROOT / "data" / "real" RAW_BASE = "https://raw.githubusercontent.com/BigDaMa/raha/master/datasets" # Paired datasets. `keep` controls disk policy (HARD constraint: ~5GB free). # Small tables are cached; `tax` is fetched, sampled, then the raw CSV is deleted. DATASETS = { "hospital": {"keep": True, "sample": None}, # already cached (600K) "beers": {"keep": True, "sample": None}, # ~250K "rayyan": {"keep": True, "sample": None}, # ~150K "flights": {"keep": True, "sample": None}, # ~250K "tax": {"keep": False, "sample": 4000}, # ~30MB raw -> sample then DELETE "movies_1": {"keep": True, "sample": 2500}, # real errors: titles/years/cast # stage-2 harvest (training/harvest_stage2.py pre-materializes data/real//; # _download is a no-op when the files exist). EVAL-ONLY sources (generalization # contract, eval/generalization.py) must NEVER appear here: ed2_restaurants. # dblp_acm/dblp_scholar: rejected — unique-value title columns are out-of-regime # (canonicalizable_columns distinct-ratio gate; per-cell fixes = memorization). "fodors_zagats": {"keep": True, "sample": None}, # EM gold pairs -> aligned table "gidcl_imdb": {"keep": True, "sample": 20000}, # 1M-row imdb pair subset (stage-3) "cleanml_company": {"keep": True, "sample": 8000}, # org names/cities "cleanml_movie": {"keep": True, "sample": 4000}, # movie metadata (8 typo cells) } # --------------------------------------------------------------------------- # # fetch (disk-aware) # --------------------------------------------------------------------------- # def _download(url: str, dest: Path) -> None: dest.parent.mkdir(parents=True, exist_ok=True) if not dest.exists(): urllib.request.urlretrieve(url, dest) def fetch_pair(name: str, keep_raw: bool = True) -> tuple[pd.DataFrame, pd.DataFrame]: """Fetch (dirty, clean) for a Raha dataset. Bulky raw files are deleted after load when keep_raw is False (the small derived JSONL is the only persisted output).""" cfg = DATASETS[name] base = REAL_DIR / name dirty_p = base / "dirty.csv" clean_p = base / "clean.csv" _download(f"{RAW_BASE}/{name}/dirty.csv", dirty_p) _download(f"{RAW_BASE}/{name}/clean.csv", clean_p) dirty = pd.read_csv(dirty_p, dtype=str, keep_default_na=False) clean = pd.read_csv(clean_p, dtype=str, keep_default_na=False) sample_n = cfg.get("sample") if sample_n and len(dirty) > sample_n: # Row-aligned sampling: take the first N rows of BOTH (positional align). dirty = dirty.head(sample_n).reset_index(drop=True) clean = clean.head(sample_n).reset_index(drop=True) if not (keep_raw and cfg["keep"]): # Delete the (possibly bulky) raw CSVs; we already loaded them in memory. for p in (dirty_p, clean_p): try: p.unlink() except FileNotFoundError: pass try: base.rmdir() except OSError: pass return dirty, clean # --------------------------------------------------------------------------- # # cell equality (reused contract with build_dataset._cell_equal) # --------------------------------------------------------------------------- # def _cell_equal(a, b) -> bool: a_missing = a is None or (isinstance(a, float) and math.isnan(a)) or pd.isna(a) b_missing = b is None or (isinstance(b, float) and math.isnan(b)) or pd.isna(b) if a_missing or b_missing: return a_missing and b_missing try: return math.isclose(float(a), float(b), rel_tol=1e-6, abs_tol=1e-6) except (TypeError, ValueError): return str(a) == str(b) # --------------------------------------------------------------------------- # # derive a plan from a (dirty, clean) pair # --------------------------------------------------------------------------- # def _norm(s: str) -> str: return "".join(ch.lower() for ch in str(s) if ch.isalnum()) def _is_variant(dirty: str, clean: str) -> bool: """True if `dirty` is a SURFACE VARIANT (typo / casing / punctuation / minor abbreviation) of `clean` — i.e. a learnable canonicalization, not a different valid value. '9:45'->'9:55' (distinct valid times) is rejected; 'birminghxm'-> 'birmingham' and 'WON'->'Won' are accepted.""" nd, nc = _norm(dirty), _norm(clean) if not nd or not nc: return False if nd == nc: # casing / punctuation only return True return difflib.SequenceMatcher(None, nd, nc).ratio() >= 0.72 def _column_diff_pairs(dirty_col, clean_col) -> tuple[dict, bool]: """Collect {dirty_raw_stripped -> clean_value} for rows that differ, keeping ONLY genuine canonicalizations. A pair is kept iff the dirty surface (a) is never a CORRECT value elsewhere in the column (else mapping it would corrupt legit rows), and (b) is a string VARIANT of its clean target. Returns (mapping, ambiguous); rejected/ambiguous pairs set ambiguous=True so they surface as flags.""" correct = {str(c).strip() for d, c in zip(dirty_col, clean_col) if _cell_equal(d, c) and not _is_missing(c)} mapping: dict[str, str] = {} ambiguous = False for dv, cv in zip(dirty_col, clean_col): if _cell_equal(dv, cv): continue if _is_missing(dv) or _is_missing(cv): ambiguous = True # missing source/target: not a canonicalization continue key = str(dv).strip() clean_val = str(cv) if key in correct or not _is_variant(key, clean_val): ambiguous = True # legit-elsewhere or arbitrary correction -> skip continue if key in mapping and mapping[key] != clean_val: ambiguous = True else: mapping[key] = clean_val return mapping, ambiguous def derive_plan(dirty_df: pd.DataFrame, clean_df: pd.DataFrame) -> dict: """Derive a self-verifying cleaning plan that maps dirty -> clean. Columns are aligned POSITIONALLY (Raha hospital/beers rename headers between dirty and clean, e.g. provider_number -> ProviderNumber), so we diff by column index and emit the plan under the DIRTY column name (what the executor sees). Method per column: collect the set of differing (dirty_raw -> clean) pairs and emit a canonicalize_categories op with that mapping. The executor does mapping.get(str(v).strip(), v), so every changed cell is recovered by construction and unchanged cells pass through -> recovery is exact whenever the mapping is unambiguous. Ambiguous columns (same dirty raw -> two cleans, or a missing dirty source) are emitted as flags so they don't break recovery. Table ops: drop_exact_duplicates when clean has fewer rows that are exact dups. (Raha tables are row-aligned 1:1, so this is usually a no-op.) """ n = min(len(dirty_df), len(clean_df)) d = dirty_df.head(n).reset_index(drop=True) c = clean_df.head(n).reset_index(drop=True) profile = profile_dataframe(d) sem_by_idx = {i: profile["columns"][i]["detected_semantic_type"] for i in range(len(profile["columns"]))} issues_by_idx = {i: profile["columns"][i]["issues"] for i in range(len(profile["columns"]))} columns_plan = [] flags = [] n_cols = min(d.shape[1], c.shape[1]) for j in range(n_cols): dirty_name = str(d.columns[j]) dcol = d.iloc[:, j].tolist() ccol = c.iloc[:, j].tolist() mapping, ambiguous = _column_diff_pairs(dcol, ccol) operations = [] if mapping: operations.append({ "op": "canonicalize_categories", "mapping": mapping, "rationale": ( f"{len(mapping)} real variant/typo value(s) mapped to their " "canonical form observed in the clean reference." ), }) col_record = { "name": dirty_name, "detected_semantic_type": sem_by_idx.get(j, "unknown"), "issues": issues_by_idx.get(j, []), "operations": operations, } columns_plan.append(col_record) if ambiguous: flags.append({ "column": dirty_name, "issue": "ambiguous_or_missing_source_values", "action": "flag_only", "rationale": "Some dirty values map to multiple cleans or are " "missing in the source; left for manual review.", }) table_operations = [] if len(clean_df) < len(dirty_df): # Did the missing rows correspond to exact duplicates in dirty? if int(dirty_df.duplicated().sum()) >= (len(dirty_df) - len(clean_df)): table_operations.append({ "op": "drop_exact_duplicates", "rationale": "Clean reference has the exact-duplicate rows removed.", }) n_map_cols = sum(1 for col in columns_plan if col["operations"]) return { "dataset_summary": ( f"Real paired dirty/clean table: {n} rows x {n_cols} columns. Derived " f"{n_map_cols} canonicalization mapping(s) from cell-level dirty->clean " "alignment (real high-cardinality typos/variants)." ), "table_operations": table_operations, "columns": columns_plan, "flags": flags, } # --------------------------------------------------------------------------- # # self-verification: cell recovery of derived plan # --------------------------------------------------------------------------- # def recovery_score(dirty_df: pd.DataFrame, clean_df: pd.DataFrame, plan: dict) -> float: """Fraction of cells (positional) where apply_plan(dirty, plan) matches clean.""" cleaned, _ = apply_plan(dirty_df, plan) n = min(len(cleaned), len(clean_df)) n_cols = min(cleaned.shape[1], clean_df.shape[1]) if n == 0 or n_cols == 0: return 0.0 total = ok = 0 for j in range(n_cols): out_col = cleaned.iloc[:, j].tolist() ref_col = clean_df.iloc[:, j].tolist() for i in range(n): total += 1 if _cell_equal(out_col[i], ref_col[i]): ok += 1 return ok / total if total else 0.0 def max_categorical_cardinality(plan: dict) -> int: """Largest canonicalize mapping (distinct variant count) in the plan.""" best = 0 for col in plan.get("columns", []): for op in col.get("operations", []): if op["op"] == "canonicalize_categories": best = max(best, len(op.get("mapping", {}))) return best def _sample_mapping(plan: dict, k: int = 6) -> tuple[str, dict]: """Pick the column with the largest mapping and return a small sample of it.""" best_col, best_map = None, {} for col in plan.get("columns", []): for op in col.get("operations", []): if op["op"] == "canonicalize_categories": m = op.get("mapping", {}) if len(m) > len(best_map): best_col, best_map = col["name"], m sample = dict(list(best_map.items())[:k]) if best_map else {} return best_col or "", sample # --------------------------------------------------------------------------- # # UNPAIRED real data: derive canonical targets by frequency clustering (no clean # reference needed) -> lets us use ANY messy CSV (Kaggle, gov, gists). # --------------------------------------------------------------------------- # def derive_canon_from_column(values, min_nonmissing: int = 20) -> dict | None: """From a single REAL messy categorical column, derive {variant -> canonical} with NO clean reference: (1) group surfaces by normalized form (casing/punct/ whitespace) -> canonical = most frequent surface in the group; (2) conservatively merge rare single-edit typos onto a much-more-frequent canonical. High precision: only merges when the canonical clearly dominates.""" from collections import Counter surf = [str(v).strip() for v in values if not _is_missing(v)] if len(surf) < min_nonmissing: return None freq = Counter(surf) distinct = list(freq) # must be categorical (values repeat) but with real variety if len(distinct) < 4 or len(distinct) > 0.7 * len(surf): return None groups: dict[str, list[str]] = {} for s in distinct: groups.setdefault(_norm(s), []).append(s) mapping: dict[str, str] = {} canon = set() for members in groups.values(): c = max(members, key=lambda m: freq[m]) # most frequent surface = canonical canon.add(c) for m in members: if m != c: mapping[m] = c # casing/punct/whitespace variant canon_by_freq = sorted(canon, key=lambda c: -freq[c]) for s in distinct: # rare single-edit typos if s in mapping or freq[s] >= 3: continue for c in canon_by_freq: if c != s and _norm(s) != _norm(c) and freq[c] >= 3 * freq[s] and _is_variant(s, c): mapping[s] = c break return mapping if len(mapping) >= 2 else None _ENTITY_TYPES = {"categorical", "city", "state", "country", "text"} _BAD_NAME = re.compile( r"date|time|_at\b|zip|postal|phone|fax|lat|lon|longitude|latitude|number|num\b|" r"\bid\b|_id|amount|salary|wage|hours|price|cost|year|count|total|rate|pct|percent|" r"score|\bage\b|size|qty|quantity", re.I) def _digit_heavy(v: str) -> bool: v = v.strip() return bool(v) and sum(c.isdigit() for c in v) > 0.4 * len(v) def candidate_categorical_columns(df, max_scan: int = 35) -> list[int]: """Auto-detect messy TEXT-ENTITY columns good for canonicalization. Rejects number/date/id/coordinate columns by NAME and by digit-density (those produce arbitrary value-correction noise, not learnable canonicalization).""" from scrubdata.detect import detect_semantic_type, is_missing out = [] for j in range(min(df.shape[1], max_scan)): nm = str(df.columns[j]) if _BAD_NAME.search(nm): continue col = df.iloc[:, j].tolist() vals = [str(v).strip() for v in col if not is_missing(v)][:600] if not vals or sum(_digit_heavy(v) for v in vals) > 0.25 * len(vals): continue if detect_semantic_type(nm, col) not in _ENTITY_TYPES: continue if derive_canon_from_column(col): out.append(j) return out def process_csv_url(name: str, url: str, rng, n_examples: int = 40, sample_rows: int = 4000, threshold: float = 0.97): """Fetch a real (unpaired) CSV, auto-find messy categorical columns, frequency- canonicalize them into an asserted clean_df, and yield self-verified examples. Disk-aware: samples rows, deletes the raw file after.""" # HARD-bounded fetch: read at most ~6MB with a connection timeout, so a slow/ # trickling gov server can't stall the run and huge files never fully download. import io import urllib.request try: req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) with urllib.request.urlopen(req, timeout=20) as resp: data = resp.read(4_000_000) df = pd.read_csv(io.BytesIO(data), dtype=str, keep_default_na=False, on_bad_lines="skip", nrows=sample_rows, encoding_errors="replace", low_memory=False) except Exception as e: # noqa: BLE001 print(f" {name}: FETCH FAILED ({type(e).__name__}: {str(e)[:60]})", flush=True) return [] cats = candidate_categorical_columns(df) if not cats: return [] clean, used = build_clean_from_unpaired(df, cats) if not used: return [] d_sub = df.iloc[:, used].reset_index(drop=True) c_sub = clean.iloc[:, used].reset_index(drop=True) return list(iter_examples(d_sub, c_sub, rng, n_examples, threshold=threshold)) def build_clean_from_unpaired(dirty_df, columns: list[int]): """Build an asserted clean_df by frequency-canonicalizing the given columns of a real (unpaired) table. Returns (clean_df, used_col_indices).""" clean = dirty_df.copy() used = [] for j in columns: col = dirty_df.iloc[:, j].tolist() m = derive_canon_from_column(col) if not m: continue clean.iloc[:, j] = [m.get(str(v).strip(), v) if not _is_missing(v) else v for v in col] used.append(j) return clean, used # --------------------------------------------------------------------------- # # learnable-column selection + subsampling into many small real tables # --------------------------------------------------------------------------- # def _is_missing(v) -> bool: return v is None or (isinstance(v, float) and math.isnan(v)) or pd.isna(v) \ or str(v).strip() == "" def canonicalizable_columns(dirty_df: pd.DataFrame, clean_df: pd.DataFrame, min_nonmissing: int = 12) -> list[int]: """Column indices where canonicalization is a LEARNABLE skill: the clean values repeat (a small canonical set) AND the dirty->clean corrections CLUSTER onto those canonicals (typos/variants), not arbitrary per-cell fixes (flight times, IDs, ZIPs). Those arbitrary columns are memorization noise the model can't generalize, so we drop them.""" n = min(len(dirty_df), len(clean_df)) out = [] for j in range(min(dirty_df.shape[1], clean_df.shape[1])): dcol = dirty_df.iloc[:n, j].tolist() ccol = clean_df.iloc[:n, j].tolist() clean_vals = [str(c) for c in ccol if not _is_missing(c)] if len(clean_vals) < min_nonmissing: continue # (1) clean column is categorical: values repeat (low distinct ratio). if len(set(clean_vals)) / len(clean_vals) > 0.5: continue # (2) it yields >=2 GENUINE canonicalizations (variant typos of a canonical # that isn't a legit value elsewhere) -- this is the learnable signal and it # rejects arbitrary value-correction columns (flight times, IDs). mapping, _ = _column_diff_pairs(dcol, ccol) if len(mapping) >= 2: out.append(j) return out def iter_examples(dirty_df, clean_df, rng, n_examples: int, *, threshold: float = 0.97, min_rows: int = 20, max_rows: int = 90, min_cols: int = 2, max_cols: int = 5): """Yield (record, plan, recovery) for many small REAL sub-tables drawn from a paired dataset, using only learnable canonicalizable columns. Each sub-table is profiled (aggregated value_counts) and gets a derived self-verified plan.""" cols = canonicalizable_columns(dirty_df, clean_df) if not cols: return n = min(len(dirty_df), len(clean_df)) # error-centered window starts: sparse real tables (e.g. 477 diff cells in 28k # rows) yield nothing under uniform sampling — most windows contain no error. diff_rows = sorted({i for j in cols for i in range(n) if not _cell_equal(dirty_df.iat[i, j], clean_df.iat[i, j])}) tries = 0 made = 0 while made < n_examples and tries < n_examples * 6: tries += 1 k = rng.randint(min_rows, min(max_rows, n)) if diff_rows and rng.random() < 0.8: # center a window on an error anchor = rng.choice(diff_rows) start = max(0, min(anchor - rng.randint(0, k - 1), n - k)) else: start = rng.randint(0, max(0, n - k)) hi = min(max_cols, len(cols)) kc = rng.randint(min(min_cols, hi), hi) chosen = sorted(rng.sample(cols, kc)) d_sub = dirty_df.iloc[start:start + k, chosen].reset_index(drop=True) c_sub = clean_df.iloc[start:start + k, chosen].reset_index(drop=True) plan = derive_plan(d_sub, c_sub) if max_categorical_cardinality(plan) < 1: # no errors in this window continue score = recovery_score(d_sub, c_sub, plan) if score < threshold: continue profile = profile_dataframe(d_sub) yield build_chat_example(profile, d_sub, plan), plan, score made += 1 # --------------------------------------------------------------------------- # # main # --------------------------------------------------------------------------- # def process_dataset(name: str, keep_raw: bool, threshold: float) -> dict | None: dirty, clean = fetch_pair(name, keep_raw=keep_raw) plan = derive_plan(dirty, clean) n = min(len(dirty), len(clean)) d = dirty.head(n).reset_index(drop=True) c = clean.head(n).reset_index(drop=True) score = recovery_score(d, c, plan) n_err = sum( 1 for j in range(min(d.shape[1], c.shape[1])) for a, b in zip(d.iloc[:, j].tolist(), c.iloc[:, j].tolist()) if not _cell_equal(a, b) ) profile = profile_dataframe(d) record = build_chat_example(profile, d, plan) sample_col, sample_map = _sample_mapping(plan) return { "name": name, "rows": n, "cols": min(d.shape[1], c.shape[1]), "error_cells": n_err, "recovery": score, "kept": score >= threshold, "max_cardinality": max_categorical_cardinality(plan), "sample_col": sample_col, "sample_map": sample_map, "record": record, } def main() -> None: import random ap = argparse.ArgumentParser() ap.add_argument( "--datasets", nargs="+", default=["hospital", "beers", "rayyan", "flights"], help="paired datasets to process", ) ap.add_argument("--per-dataset", type=int, default=60, help="how many small sub-table examples to draw per dataset") ap.add_argument("--include-tax", action="store_true", help="also fetch+sample+DELETE the bulky tax table") ap.add_argument("--keep-raw", action="store_true", help="keep raw CSVs on disk even for bulky datasets") ap.add_argument("--threshold", type=float, default=0.97, help="min cell recovery to accept a sub-table example (self-verified)") ap.add_argument("--seed", type=int, default=13) ap.add_argument("--unpaired-json", type=str, default=None, help="JSON file: [{'name','url'}] of real messy CSVs (Kaggle/gov/gists)") ap.add_argument("--out", type=str, default="data/real_train.jsonl") args = ap.parse_args() datasets = list(args.datasets) if args.include_tax and "tax" not in datasets: datasets.append("tax") out_path = ROOT / args.out out_path.parent.mkdir(parents=True, exist_ok=True) rng = random.Random(args.seed) rows = [] total = 0 best_overall = (0, "", "", {}) # (card, dataset, col, mapping) with out_path.open("w", encoding="utf-8") as f: for name in datasets: if name not in DATASETS: print(f" skip unknown dataset: {name}") continue try: dirty, clean = fetch_pair(name, keep_raw=args.keep_raw) except Exception as e: # noqa: BLE001 print(f" {name}: FETCH FAILED ({type(e).__name__}: {e})") continue cols = canonicalizable_columns(dirty, clean) col_names = [str(dirty.columns[j]) for j in cols] made = 0 maxcard = 0 for record, plan, _score in iter_examples( dirty, clean, rng, args.per_dataset, threshold=args.threshold): f.write(json.dumps(record, ensure_ascii=False) + "\n") made += 1 card = max_categorical_cardinality(plan) maxcard = max(maxcard, card) if card > best_overall[0]: col, mp = _sample_mapping(plan) best_overall = (card, name, col, mp) total += made rows.append((name, len(cols), made, maxcard, col_names[:6])) # ---- unpaired real CSVs (Kaggle / government / GitHub gists) ---- unpaired_domains = 0 if args.unpaired_json: sources = json.loads(Path(args.unpaired_json).read_text()) with out_path.open("a", encoding="utf-8") as f: for src in sources: nm = src["name"] try: ex = process_csv_url(nm, src["url"], rng, n_examples=args.per_dataset) except Exception as e: # noqa: BLE001 print(f" {nm}: ERROR {type(e).__name__}: {str(e)[:60]}") continue for record, _plan, _s in ex: f.write(json.dumps(record, ensure_ascii=False) + "\n") if ex: unpaired_domains += 1 total += len(ex) print(f" [unpaired] {src.get('domain', nm):<20} {len(ex):>3} examples") rows.append((nm, "-", len(ex), "-", [src.get("domain", "")])) print("\n=== Real-data enrichment (many small self-verified tables) ===") hdr = f"{'dataset':<22}{'examples':>9} domain/columns" print(hdr) print("-" * len(hdr)) for row in rows: name, ncols, made, maxcard, names = row print(f"{str(name):<22}{made:>9} {', '.join(str(x) for x in names)[:48]}") paired_domains = sum(1 for r in rows if r[2] and r[1] != "-") print(f"\nDOMAINS with examples: {sum(1 for r in rows if r[2])} " f"(paired: {paired_domains}, unpaired: {unpaired_domains})") print(f"Wrote {total} self-verified REAL training examples to {out_path}") if best_overall[0]: card, ds, col, mp = best_overall print(f"Richest real mapping: {ds}.{col} ({card} distinct variants). Sample:") for raw, canon in list(mp.items())[:6]: print(f" {raw!r:>34} -> {canon!r}") if __name__ == "__main__": main()