Spaces:
Running
Running
| """Assemble one synthetic training example: clean table -> dirty table + plan. | |
| make_example() returns a dict with the clean reference, the dirtied dataframe, | |
| the ground-truth plan, and the dirty-data profile. build_dataset.py verifies each | |
| by running the executor and keeps only perfectly-recovered examples. | |
| """ | |
| from __future__ import annotations | |
| import random | |
| import pandas as pd | |
| from scrubdata.profiler import profile_dataframe | |
| from .fields import ARCHETYPES | |
| def _pick_columns(rng: random.Random, k: int): | |
| """Pick k archetypes (with replacement across types) and unique column names.""" | |
| chosen, used_names = [], set() | |
| pool = list(ARCHETYPES) | |
| rng.shuffle(pool) | |
| for arch in pool: | |
| if len(chosen) >= k: | |
| break | |
| name = next((n for n in arch.names if n not in used_names), None) | |
| if name is None: | |
| continue | |
| used_names.add(name) | |
| chosen.append((name, arch)) | |
| return chosen | |
| def make_example(rng: random.Random) -> dict: | |
| # With aggregation profiling the prompt size is bounded by DISTINCT values, not rows, | |
| # so scale-invariance is free — we vary row counts only to teach varied frequency | |
| # magnitudes (dominant vs rare), not to enlarge prompts. | |
| if rng.random() < 0.4: | |
| n_rows = rng.randint(25, 60) | |
| n_cols = rng.randint(5, 8) | |
| else: | |
| n_rows = rng.randint(10, 25) | |
| n_cols = rng.randint(3, 6) | |
| cols = _pick_columns(rng, n_cols) | |
| clean_data: dict[str, list] = {} | |
| dirty_data: dict[str, list] = {} | |
| plan_columns: list[dict] = [] | |
| for name, arch in cols: | |
| clean_vals = arch.gen_clean(rng, n_rows) | |
| dirty_vals, clean_vals, ops, issues = arch.corrupt(rng, clean_vals) | |
| clean_data[name] = clean_vals | |
| dirty_data[name] = dirty_vals | |
| if ops: | |
| plan_columns.append({ | |
| "name": name, | |
| "detected_semantic_type": arch.semantic_type, | |
| "issues": issues, | |
| "operations": ops, | |
| }) | |
| clean_df = pd.DataFrame(clean_data) | |
| # Drop any accidental duplicate clean rows so dedup verification is exact. | |
| clean_df = clean_df.drop_duplicates().reset_index(drop=True) | |
| n_rows = len(clean_df) | |
| dirty_df = pd.DataFrame({c: v[:n_rows] for c, v in dirty_data.items()}) | |
| # --- anomaly flags (flag-only: value is KEPT, not changed) --- | |
| # Teaches the model to surface implausible values without silently editing them. | |
| flags: list[dict] = [] | |
| numeric_cols = [c["name"] for c in plan_columns | |
| if any(o["op"] in ("parse_currency", "parse_number") | |
| for o in c["operations"])] | |
| if numeric_cols and n_rows >= 3 and rng.random() < 0.4: | |
| col = rng.choice(numeric_cols) | |
| i = rng.randrange(n_rows) | |
| anomaly = rng.choice([9_999_999, -100, 0]) | |
| dirty_df.at[i, col] = str(anomaly) | |
| clean_df.at[i, col] = float(anomaly) # unchanged by flag-only execution | |
| flags.append({"column": col, "issue": "out_of_range", "action": "flag_only", | |
| "rationale": f"Value {anomaly} is implausible for '{col}'; " | |
| f"flagged for human review, not auto-changed."}) | |
| table_ops: list[dict] = [] | |
| # --- table-level corruptions --- | |
| if rng.random() < 0.5: # empty column | |
| empty_name = rng.choice(["notes2", "col_x", "extra", "unnamed"]) | |
| dirty_df[empty_name] = "" | |
| table_ops.append({"op": "drop_empty_columns", "columns": [empty_name], | |
| "rationale": "Dropped column(s) with no data."}) | |
| extra_rows = [] | |
| if rng.random() < 0.6 and n_rows >= 2: # exact duplicate rows | |
| k = rng.randint(1, 2) | |
| dup_idx = rng.sample(range(n_rows), k) | |
| extra_rows.extend(dirty_df.iloc[dup_idx].to_dict("records")) | |
| table_ops.append({"op": "drop_exact_duplicates", | |
| "rationale": f"Removed {k} exact duplicate row(s)."}) | |
| if rng.random() < 0.4: # an empty row | |
| empty = {c: "" for c in dirty_df.columns} | |
| extra_rows.append(empty) | |
| table_ops.append({"op": "drop_empty_rows", "rationale": "Removed 1 fully-empty row."}) | |
| if extra_rows: | |
| dirty_df = pd.concat([dirty_df, pd.DataFrame(extra_rows)], ignore_index=True) | |
| # Order table ops the way the executor expects (cols, rows, dedup). | |
| order = {"drop_empty_columns": 0, "drop_empty_rows": 1, "drop_exact_duplicates": 2} | |
| table_ops.sort(key=lambda o: order[o["op"]]) | |
| profile = profile_dataframe(dirty_df) | |
| plan = { | |
| "dataset_summary": f"{len(dirty_df)} rows × {dirty_df.shape[1]} columns. " | |
| f"{len(plan_columns)} column(s) need cleanup, " | |
| f"{len(table_ops)} table-level fix(es).", | |
| "table_operations": table_ops, | |
| "columns": plan_columns, | |
| "flags": flags, | |
| } | |
| return {"clean_df": clean_df, "dirty_df": dirty_df, "plan": plan, "profile": profile} | |