File size: 4,998 Bytes
16dc556
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"""Assemble one synthetic training example: clean table -> dirty table + plan.

make_example() returns a dict with the clean reference, the dirtied dataframe,
the ground-truth plan, and the dirty-data profile. build_dataset.py verifies each
by running the executor and keeps only perfectly-recovered examples.
"""

from __future__ import annotations

import random

import pandas as pd

from scrubdata.profiler import profile_dataframe

from .fields import ARCHETYPES


def _pick_columns(rng: random.Random, k: int):
    """Select up to ``k`` ``(column_name, archetype)`` pairs with distinct names.

    The archetypes are shuffled with ``rng`` and visited once each, so every
    archetype contributes at most one column. For each visited archetype the
    first of its ``names`` not already taken becomes the column name; an
    archetype whose names are all taken is skipped.

    Args:
        rng: Random source used to shuffle the archetype order.
        k: Maximum number of columns to pick.

    Returns:
        A list of ``(name, archetype)`` tuples. It can be shorter than ``k``
        when the archetypes run out before ``k`` columns are found.
    """
    shuffled = list(ARCHETYPES)
    rng.shuffle(shuffled)

    picked = []
    taken = set()
    for archetype in shuffled:
        if len(picked) >= k:
            break
        # First candidate name of this archetype that is still free, if any.
        free_names = (n for n in archetype.names if n not in taken)
        column_name = next(free_names, None)
        if column_name is not None:
            taken.add(column_name)
            picked.append((column_name, archetype))
    return picked


def make_example(rng: random.Random) -> dict:
    """Build one synthetic example: a clean table, its dirtied copy, and the plan.

    Steps, in order:
      1. Pick a table size and a set of column archetypes.
      2. Generate clean values per column and corrupt them, collecting the
         per-column operations and issues reported by the archetype.
      3. Drop duplicate clean rows, removing the rows at the same positions
         from the dirty table so both tables stay row-aligned.
      4. Optionally inject one flag-only numeric anomaly (kept in both tables).
      5. Optionally add table-level corruptions to the dirty table: an empty
         column, one or two exact duplicate rows, and one fully-empty row.
      6. Profile the dirty table and assemble the ground-truth plan.

    Args:
        rng: Random source driving every random choice made here.

    Returns:
        A dict with keys ``"clean_df"`` (reference DataFrame), ``"dirty_df"``
        (corrupted DataFrame), ``"plan"`` (dict with ``dataset_summary``,
        ``table_operations``, ``columns`` and ``flags``) and ``"profile"``
        (the result of ``profile_dataframe(dirty_df)``).
    """
    # With aggregation profiling the prompt size is bounded by DISTINCT values, not rows,
    # so scale-invariance is free — we vary row counts only to teach varied frequency
    # magnitudes (dominant vs rare), not to enlarge prompts.
    if rng.random() < 0.4:
        n_rows = rng.randint(25, 60)
        n_cols = rng.randint(5, 8)
    else:
        n_rows = rng.randint(10, 25)
        n_cols = rng.randint(3, 6)
    cols = _pick_columns(rng, n_cols)

    clean_data: dict[str, list] = {}
    dirty_data: dict[str, list] = {}
    plan_columns: list[dict] = []

    for name, arch in cols:
        clean_vals = arch.gen_clean(rng, n_rows)
        # corrupt() hands back its own version of the clean values alongside the dirty ones.
        dirty_vals, clean_vals, ops, issues = arch.corrupt(rng, clean_vals)
        clean_data[name] = clean_vals
        dirty_data[name] = dirty_vals
        if ops:
            # Only columns that actually need operations appear in the plan.
            plan_columns.append({
                "name": name,
                "detected_semantic_type": arch.semantic_type,
                "issues": issues,
                "operations": ops,
            })

    clean_df = pd.DataFrame(clean_data)
    # Drop any accidental duplicate clean rows so dedup verification is exact.
    # The dirty table must lose the rows at the SAME positions: trimming it to the
    # first n_rows instead would shift every dirty row after a dropped clean row
    # onto the wrong clean row.
    # NOTE(review): assumes corrupt() returns dirty/clean lists of equal length.
    is_dup = clean_df.duplicated()
    kept = [pos for pos, dup in enumerate(is_dup) if not dup]
    clean_df = clean_df[~is_dup].reset_index(drop=True)
    n_rows = len(clean_df)
    dirty_df = pd.DataFrame({c: [v[pos] for pos in kept] for c, v in dirty_data.items()})

    # --- anomaly flags (flag-only: value is KEPT, not changed) ---
    # Teaches the model to surface implausible values without silently editing them.
    flags: list[dict] = []
    numeric_cols = [c["name"] for c in plan_columns
                    if any(o["op"] in ("parse_currency", "parse_number")
                           for o in c["operations"])]
    if numeric_cols and n_rows >= 3 and rng.random() < 0.4:
        col = rng.choice(numeric_cols)
        i = rng.randrange(n_rows)
        anomaly = rng.choice([9_999_999, -100, 0])
        dirty_df.at[i, col] = str(anomaly)
        clean_df.at[i, col] = float(anomaly)   # unchanged by flag-only execution
        flags.append({"column": col, "issue": "out_of_range", "action": "flag_only",
                      "rationale": f"Value {anomaly} is implausible for '{col}'; "
                                   f"flagged for human review, not auto-changed."})

    table_ops: list[dict] = []

    # --- table-level corruptions ---
    if rng.random() < 0.5:  # empty column
        empty_name = rng.choice(["notes2", "col_x", "extra", "unnamed"])
        dirty_df[empty_name] = ""
        table_ops.append({"op": "drop_empty_columns", "columns": [empty_name],
                          "rationale": "Dropped column(s) with no data."})

    extra_rows = []
    if rng.random() < 0.6 and n_rows >= 2:  # exact duplicate rows
        k = rng.randint(1, 2)
        dup_idx = rng.sample(range(n_rows), k)
        # Copies are taken after the anomaly/empty-column edits, so they match exactly.
        extra_rows.extend(dirty_df.iloc[dup_idx].to_dict("records"))
        table_ops.append({"op": "drop_exact_duplicates",
                          "rationale": f"Removed {k} exact duplicate row(s)."})
    if rng.random() < 0.4:  # an empty row
        empty = {c: "" for c in dirty_df.columns}
        extra_rows.append(empty)
        table_ops.append({"op": "drop_empty_rows", "rationale": "Removed 1 fully-empty row."})

    if extra_rows:
        # Injected rows are appended after the original rows.
        dirty_df = pd.concat([dirty_df, pd.DataFrame(extra_rows)], ignore_index=True)

    # Order table ops the way the executor expects (cols, rows, dedup).
    order = {"drop_empty_columns": 0, "drop_empty_rows": 1, "drop_exact_duplicates": 2}
    table_ops.sort(key=lambda o: order[o["op"]])

    profile = profile_dataframe(dirty_df)
    plan = {
        "dataset_summary": f"{len(dirty_df)} rows × {dirty_df.shape[1]} columns. "
                           f"{len(plan_columns)} column(s) need cleanup, "
                           f"{len(table_ops)} table-level fix(es).",
        "table_operations": table_ops,
        "columns": plan_columns,
        "flags": flags,
    }
    return {"clean_df": clean_df, "dirty_df": dirty_df, "plan": plan, "profile": profile}