File size: 20,710 Bytes
16dc556
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
"""Field archetypes: clean-value generators + matched corruptors.

Each archetype produces clean values in the SAME canonical/typed representation
that scrubdata.executor outputs, and a `corrupt()` that dirties a clean column
while returning the exact ground-truth column-operations. Designed so that
executor(dirty, ops) == clean (verified downstream).
"""

from __future__ import annotations

import random

from . import vocab as V

# ---- shared corruption helpers ----------------------------------------------

# Placeholder strings that stand in for a missing value. Order is significant:
# _inject_disguised_nulls picks from this list with rng.choice, so reordering
# changes the output of a seeded run.
DISGUISED = [
    "N/A", "na", "-", "--", "null",
    "None", "?", "#N/A", "TBD", "empty",
    "(empty)", "n/a", "NULL", "none", "unknown",
]


def _add_whitespace(rng: random.Random, s: str) -> str:
    """Return ``s`` with one kind of whitespace noise added.

    One draw picks the noise: below 0.4 pads the left with 1-3 spaces, below
    0.7 pads the right with 1-3 spaces, otherwise one internal single-space
    gap is doubled. A string with no space to double is wrapped in one space
    on each side instead.
    """
    roll = rng.random()
    if roll < 0.7:
        padding = " " * rng.randint(1, 3)
        return padding + s if roll < 0.4 else s + padding

    # Doubled internal space: widen one randomly chosen gap between words.
    words = s.split(" ")
    gap_count = len(words) - 1
    if gap_count < 1:
        return f" {s} "
    gap = rng.randrange(gap_count)
    words[gap] += " "
    return " ".join(words)


def _inject_disguised_nulls(rng: random.Random, values, clean, p=0.12):
    """Replace random cells with disguised-null tokens.

    Walks ``values`` and ``clean`` in lockstep. With probability ``p`` a cell's
    dirty value becomes a token drawn from ``DISGUISED`` and its clean value
    becomes ``None``; otherwise both pass through unchanged.

    Returns ``(dirty_values, clean_values, used)``, where ``used`` says whether
    at least one cell was replaced.
    """
    dirty_out, clean_out = [], []
    any_nulled = False
    for dirty_cell, clean_cell in zip(values, clean):
        nullify = rng.random() < p
        if nullify:
            dirty_cell, clean_cell = rng.choice(DISGUISED), None
            any_nulled = True
        dirty_out.append(dirty_cell)
        clean_out.append(clean_cell)
    return dirty_out, clean_out, any_nulled


# ---- archetypes --------------------------------------------------------------

class Field:
    """Base class for field archetypes.

    Subclasses override the two class attributes and implement both methods;
    the base implementations only raise ``NotImplementedError``.
    """

    # Semantic type tag of the column (subclasses override).
    semantic_type = "text"
    # Names associated with this archetype -- presumably candidate column
    # headers; confirm against the caller. Subclasses override.
    names: list[str] = []

    def gen_clean(self, rng: random.Random, n: int):
        """Generate ``n`` clean values using ``rng``. Must be overridden."""
        raise NotImplementedError()

    def corrupt(self, rng: random.Random, clean):
        """Return (dirty_values, clean_values, ops, issues). Must be overridden."""
        raise NotImplementedError()


class NameField(Field):
    """Person-name column built as ``"<first> <last>"`` from fixed pools.

    ``corrupt`` adds whitespace noise to about half of the cells and, for some
    columns, Unicode punctuation artifacts (curly apostrophe, long dash,
    no-break space).
    """

    semantic_type = "text"
    names = ["name", "full_name", "customer", "contact", "rep"]
    FIRST = ["Alice", "Bob", "Carol", "David", "Eve", "Frank", "Grace", "Heidi",
             "Ivan", "Judy", "Karl", "Lena", "Mona", "Omar", "Priya", "Sara"]
    LAST = ["Johnson", "Smith", "Diaz", "Lee", "Adams", "Moore", "Park", "Cruz",
            "Petrov", "Wong", "Brandt", "Fischer", "Ali", "Khan", "Novak", "Reyes",
            "O'Brien", "D'Angelo", "Saint-Clair", "Smith-Jones", "N'Diaye"]
    # U+00A0 NO-BREAK SPACE. Spelled as an escape on purpose: the character is
    # invisible, and a raw literal here had been damaged by a bad re-encoding
    # into a Greek capital Beta followed by a space-like character.
    _NBSP = "\u00a0"

    def gen_clean(self, rng, n):
        """Return ``n`` names, each a random FIRST plus a random LAST."""
        return [f"{rng.choice(self.FIRST)} {rng.choice(self.LAST)}" for _ in range(n)]

    def corrupt(self, rng, clean):
        """Return (dirty_values, clean_values, ops, issues) for a name column.

        ``clean`` is returned unchanged. ``ops`` always starts with
        ``strip_whitespace``; ``normalize_punctuation`` is appended only when
        at least one cell actually received a punctuation artifact.
        """
        dirty = [_add_whitespace(rng, c) if rng.random() < 0.5 else c for c in clean]
        ops = [{"op": "strip_whitespace",
                "rationale": "Trimmed leading/trailing and doubled spaces."}]
        issues = ["whitespace"]
        # high-cardinality regime: unicode punctuation artifacts (curly quotes, long
        # dashes, NBSP). Meant to be the inverse of executor._PUNCT_MAP, which is
        # not visible from this module.
        if rng.random() < 0.45:
            punct = False
            for i, v in enumerate(dirty):
                if rng.random() < 0.35:
                    w = v.replace("'", "’").replace("-", "–")
                    if " " in w and rng.random() < 0.3:
                        # Swap the LAST plain space for a no-break space.
                        k = w.rindex(" ")
                        w = w[:k] + self._NBSP + w[k + 1:]
                    if w != v:
                        dirty[i] = w
                        punct = True
            if punct:
                ops.append({"op": "normalize_punctuation",
                            "rationale": "Normalized curly quotes / long dashes / "
                                         "NBSP artifacts to plain ASCII."})
                issues.append("unicode_punctuation")
        return dirty, clean, ops, issues


class CompanyField(NameField):
    """Company-name column; only clean-value generation differs from NameField.

    ``corrupt`` is inherited unchanged.
    """

    names = ["company", "organization", "account", "employer"]
    POOL = ["Acme Inc", "Globex", "Initech", "Umbrella", "Soylent Corp", "Hooli",
            "Vehement", "Stark Industries", "Wonka Co", "Cyberdyne",
            "O'Reilly & Sons", "Day-Lewis Group", "L'Atelier Co"]

    def gen_clean(self, rng, n):
        """Return ``n`` company names drawn (with replacement) from POOL."""
        pick = rng.choice
        pool = self.POOL
        return [pick(pool) for _row in range(n)]


class EmailField(Field):
    """Email column: random lowercase local part at one of four fixed domains."""

    semantic_type = "email"
    names = ["email", "email_address", "contact_email"]

    def gen_clean(self, rng, n):
        """Return ``n`` addresses of the form ``<4-7 letters a..p>@<domain>``."""
        alphabet = "abcdefghijklmnop"
        domains = ["example.com", "mail.com", "corp.io", "test.org"]
        emails = []
        for _ in range(n):
            # Draw order matters for seeded runs: length, letters, then domain.
            length = rng.randint(4, 7)
            local = "".join([rng.choice(alphabet) for _ in range(length)])
            domain = rng.choice(domains)
            emails.append(local + "@" + domain)
        return emails

    def corrupt(self, rng, clean):
        """Return (dirty_values, clean_values, ops, issues).

        Each address is upper-cased with probability 0.5, then given
        whitespace noise with probability 0.4. ``clean`` is returned as is.
        """
        dirty = []
        for address in clean:
            shout = rng.random() < 0.5
            cell = address.upper() if shout else address
            pad = rng.random() < 0.4
            if pad:
                cell = _add_whitespace(rng, cell)
            dirty.append(cell)
        normalize = {"op": "normalize_email",
                     "rationale": "Lowercased and trimmed email addresses."}
        return dirty, clean, [normalize], ["casing", "whitespace"]


class VocabField(Field):
    """Categorical column backed by a real vocabulary (canonical -> aliases).

    LOW-card mode (default): draws a FEW canonicals (every surface shows in the
    sample). HIGH-card mode (``high_card=True``): draws MANY canonicals
    (``min_card..max_card``, e.g. 30..80) with a DOMINANT-canonical long-tailed
    row distribution and single-char-substitution typos in the tail —
    replicating the hospital birmingham(75) + birminghxm(1) regime.

    ``corrupt()`` records every non-canonical surface it emits as
    surface -> canonical, and ships that mapping in the
    ``canonicalize_categories`` op so the clean value can be recovered.
    """

    def __init__(self, names, semantic_type, entries, max_card=5, min_card=2,
                 high_card=False, typo_p=0.13):
        """Configure the archetype.

        Args:
            names: names associated with this column archetype.
            semantic_type: semantic type tag for the column.
            entries: mapping of canonical value -> list of alias surfaces.
            max_card: upper bound on distinct canonicals drawn per column.
            min_card: lower bound on distinct canonicals (floored at 2, capped
                by the vocabulary size and by ``max_card``).
            high_card: enable the long-tailed rows and forced tail typos.
            typo_p: typo probability forwarded to ``V.make_surface``.
        """
        self.names = names
        self.semantic_type = semantic_type
        self.entries = entries
        self._canonicals = list(entries)
        self.max_card = max_card
        self.min_card = min_card
        self.high_card = high_card
        self.typo_p = typo_p

    def _choose(self, rng):
        """Sample the distinct canonicals that will appear in one column."""
        lo = max(2, min(self.min_card, len(self._canonicals)))
        hi = min(self.max_card, len(self._canonicals))
        # min(lo, hi) keeps randint valid when the vocabulary has fewer
        # entries than the floor of 2.
        k = rng.randint(min(lo, hi), hi)
        return rng.sample(self._canonicals, k)

    def _gen_rows(self, rng, n):
        """Long-tailed row draw: a few dominant canonicals carry most of the mass,
        the rest form a sparse tail (where typo surfaces land as rare singletons).
        Falls back to uniform for low-card columns."""
        chosen = self._chosen
        if not self.high_card or len(chosen) < 6:
            return [rng.choice(chosen) for _ in range(n)]
        # Zipf-like weights: a couple of dominant values, steeply decaying tail.
        order = list(chosen)
        rng.shuffle(order)
        weights = [1.0 / ((i + 1) ** 1.6) for i in range(len(order))]
        # Boost the single top canonical so a clear dominant emerges (birmingham 75).
        weights[0] *= 3.0
        return rng.choices(order, weights=weights, k=n)

    def gen_clean(self, rng, n):
        """Pick this column's canonicals, remember them, and draw ``n`` rows."""
        self._chosen = self._choose(rng)
        return self._gen_rows(rng, n)

    def _surface_for(self, rng, c, force_typo):
        """One dirty surface for canonical c. force_typo requests a single-char
        substitution typo (rare-tail birminghxm regime) from the vocab helper."""
        aliases = self.entries.get(c, [])
        if force_typo:
            return V.make_substitution_typo(rng, c)
        return V.make_surface(rng, c, aliases, typo_p=self.typo_p)

    def corrupt(self, rng, clean):
        """Dirty a clean column and return (dirty_values, clean_values, ops, issues).

        ``clean`` is returned unchanged. ``ops`` holds ``strip_whitespace``
        (only if whitespace noise was added), then ``canonicalize_categories``
        with the surface -> canonical mapping (only if it is non-empty).
        """
        # Distinct canonicals in first-appearance order.
        present = list(dict.fromkeys(clean))

        # High-card only: a controlled fraction of the present canonicals gets a
        # forced single-char typo on ONE occurrence, so it lands as a rare tail
        # singleton. This stays a LIST in rng.sample() order: iterating a set of
        # strings would run the rng.choice() calls below in hash order, which
        # changes between processes, so a seeded rng would not be reproducible.
        forced_typo_canon = []
        if self.high_card:
            frac = rng.uniform(0.3, 0.6)
            k = max(1, int(len(present) * frac))
            forced_typo_canon = rng.sample(present, min(k, len(present)))

        # Reserve, per forced canonical, exactly one row index to carry the typo.
        forced_slot = {}
        if forced_typo_canon:
            rows_of = {}
            for i, c in enumerate(clean):
                rows_of.setdefault(c, []).append(i)
            for canon in forced_typo_canon:
                idxs = rows_of.get(canon)
                if idxs:
                    forced_slot[rng.choice(idxs)] = canon

        # Build the mapping collision-safely: a surface may map to only ONE
        # canonical, and a surface equal to some present canonical's clean form
        # must not be remapped.
        reserved = {str(c).strip() for c in present}
        mapping = {}
        dirty, ws = [], False
        # whitespace noise (less often on high-card to keep the tail clean)
        ws_p = 0.12 if self.high_card else 0.25
        for i, c in enumerate(clean):
            canon_key = str(c).strip()
            # Exactly one surface is drawn per row. The earlier
            # ``for _attempt in range(4)`` wrapper broke out on every path and
            # never retried, so this is the same behavior.
            s = self._surface_for(rng, c, force_typo=i in forced_slot)
            key = str(s).strip()
            if key != canon_key:
                if key in reserved or mapping.get(key, c) != c:
                    # Collides with another canonical, or with a surface some
                    # other canonical already claims: fall back to clean.
                    s = c
                else:
                    mapping[key] = c
            cell = s
            if rng.random() < ws_p:
                cell = _add_whitespace(rng, s)
                ws = True
            dirty.append(cell)

        ops, issues = [], ["inconsistent_categories", "casing"]
        if ws:  # strip first so canonicalize sees the bare surface (executor order)
            ops.append({"op": "strip_whitespace",
                        "rationale": "Trimmed surrounding/doubled spaces."})
            issues.append("whitespace")
        if mapping:
            ops.append({"op": "canonicalize_categories", "mapping": mapping,
                        "rationale": f"Unified {len(mapping)} variant spelling(s) "
                                     f"into canonical labels."})
        return dirty, clean, ops, issues


class StatusField(VocabField):
    """VocabField variant that switches to a fresh status/category value-set
    (one of ``V._STATUS_SETS``) every time a column is generated."""

    def __init__(self):
        column_names = ["status", "stage", "tier", "segment", "state", "payment_status"]
        # Starts with an empty vocabulary; gen_clean installs the real one.
        super().__init__(column_names, "categorical", {}, max_card=4)

    def gen_clean(self, rng, n):
        """Pick a value-set, install it as the vocabulary, then draw ``n`` rows."""
        value_set = rng.choice(V._STATUS_SETS)
        self.entries = value_set
        self._canonicals = list(value_set)
        # The parent chooses the canonicals and draws the rows.
        return super().gen_clean(rng, n)


class CurrencyField(Field):
    """Monetary amounts: clean values are floats, dirty values formatted text."""

    semantic_type = "currency"
    names = ["amount", "revenue", "price", "deal_size", "cost"]

    def gen_clean(self, rng, n):
        """Return ``n`` amounts drawn uniformly from 50..9000, rounded to cents."""
        amounts = []
        for _ in range(n):
            amounts.append(round(rng.uniform(50, 9000), 2))
        return amounts

    def _fmt(self, rng, x: float) -> str:
        """Render ``x`` in one of three text styles; negatives are parenthesised."""
        magnitude = abs(x)
        roll = rng.random()
        if roll < 0.4:
            # US style with a dollar sign, e.g. $1,234.50
            text = f"${magnitude:,.2f}"
        elif roll < 0.7 and magnitude == int(magnitude):
            # grouped integer -- only when there are no cents to lose
            text = f"{int(magnitude):,d}"
        else:
            # EU style (dot grouping, comma decimal) -- always keeps 2 decimals
            swap_separators = str.maketrans(",.", ".,")
            text = f"{magnitude:,.2f}".translate(swap_separators)
        if x < 0:
            return f"({text})"
        return text

    def corrupt(self, rng, clean):
        """Return (dirty_values, clean_values, ops, issues).

        Every amount is formatted as text, then some cells are replaced by
        disguised nulls (their clean value becomes ``None``).
        """
        formatted = [self._fmt(rng, amount) for amount in clean]
        dirty, clean_with_nulls, has_nulls = _inject_disguised_nulls(rng, formatted, clean)
        issues = ["numeric_stored_as_text", "currency_symbols"]
        ops = []
        if has_nulls:
            # Null normalisation is listed before parsing.
            ops.append({"op": "normalize_disguised_nulls",
                        "rationale": "Converted N/A, '-', 'null' etc. to true missing."})
            issues.append("disguised_nulls")
        ops.append({"op": "parse_currency",
                    "rationale": "Stripped currency symbols/grouping; parsed to number."})
        return dirty, clean_with_nulls, ops, issues


class DateField(Field):
    """Date column: clean values are ISO ``YYYY-MM-DD`` strings in 2023."""

    semantic_type = "date"
    names = ["signup_date", "created_at", "close_date", "date", "order_date"]

    def gen_clean(self, rng, n):
        """Return ``n`` ISO dates with month 1-12 and day 1-28."""
        dates = []
        for _ in range(n):
            # Month is drawn before day; the order matters for seeded runs.
            month = rng.randint(1, 12)
            day = rng.randint(1, 28)
            dates.append("%04d-%02d-%02d" % (2023, month, day))
        return dates

    def _fmt(self, rng, iso: str) -> str:
        """Re-render one ISO date as ISO, US slash, day-month-name or Excel serial."""
        year, month, day = iso.split("-")
        roll = rng.random()
        if roll < 0.3:
            return iso
        m, d = int(month), int(day)
        if roll < 0.55:
            # US slash (m<=12, d<=28 -> unambiguous-ish)
            return f"{m}/{d}/{year}"
        if roll < 0.8:
            abbreviations = ("Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug",
                             "Sep", "Oct", "Nov", "Dec")
            return f"{d} {abbreviations[m - 1]} {year}"  # 5 Jan 2023
        # Excel serial: days elapsed since 1899-12-30.
        import datetime
        epoch = datetime.date(1899, 12, 30)
        elapsed = datetime.date(int(year), m, d) - epoch
        return str(elapsed.days)

    def corrupt(self, rng, clean):
        """Return (dirty_values, clean_values, ops, issues); ``clean`` is unchanged."""
        dirty = []
        for iso in clean:
            dirty.append(self._fmt(rng, iso))
        parse_op = {"op": "parse_date",
                    "rationale": "Unified mixed date formats to ISO YYYY-MM-DD."}
        return dirty, clean, [parse_op], ["mixed_date_formats"]


class BooleanField(Field):
    """Boolean column: clean values are ``bool``, dirty values are text tokens."""

    semantic_type = "boolean"
    names = ["is_active", "subscribed", "verified", "opted_in"]
    # Text surfaces used for True / False cells respectively.
    TRUE = ["Yes", "Y", "TRUE", "true", "1", "T"]
    FALSE = ["No", "N", "FALSE", "false", "0", "F"]

    def gen_clean(self, rng, n):
        """Return ``n`` booleans, each True with probability 0.5."""
        return [rng.random() < 0.5 for _ in range(n)]

    def corrupt(self, rng, clean):
        """Return (dirty_values, clean_values, ops, issues).

        Each clean boolean becomes a random token from TRUE or FALSE;
        ``clean`` is returned unchanged.
        """
        dirty = [rng.choice(self.TRUE if c else self.FALSE) for c in clean]
        # \u2192 is RIGHTWARDS ARROW. It is spelled as an escape because the raw
        # character had been corrupted into mojibake in this emitted string.
        ops = [{"op": "standardize_boolean",
                "rationale": "Mapped Yes/Y/1/TRUE \u2192 true, No/N/0/FALSE \u2192 false."}]
        return dirty, clean, ops, ["inconsistent_booleans"]


class PhoneField(Field):
    """Phone column: clean values look like ``(5xx) xxx-xxxx``."""

    semantic_type = "phone"
    names = ["phone", "phone_number", "mobile", "contact_number"]

    def gen_clean(self, rng, n):
        """Return ``n`` formatted 10-digit numbers whose first digit is 5.

        The raw digit strings are also kept on ``self._digits``; ``corrupt``
        no longer reads that attribute.
        """
        # Canonical = executor's output for a plain 10-digit US number
        # (per the original author; the executor is not visible here).
        out, self._digits = [], []
        for _ in range(n):
            d = "".join(str(rng.randint(0, 9)) for _ in range(10))
            d = "5" + d[1:]  # keep it phone-ish
            self._digits.append(d)
            out.append(f"({d[0:3]}) {d[3:6]}-{d[6:]}")
        return out

    def corrupt(self, rng, clean):
        """Return (dirty_values, clean_values, ops, issues).

        The digits are recovered from each value in ``clean`` rather than from
        state left by the last ``gen_clean`` call. The result therefore always
        lines up with the column passed in, even when one shared instance
        generates several columns before any is corrupted, or when ``corrupt``
        is called on its own.
        """
        dirty = []
        for canonical in clean:
            d = "".join(ch for ch in str(canonical) if ch.isdigit())
            style = rng.random()
            if style < 0.25:
                dirty.append(f"{d[0:3]}.{d[3:6]}.{d[6:]}")
            elif style < 0.5:
                dirty.append(f"{d[0:3]}-{d[3:6]}-{d[6:]}")
            elif style < 0.75:
                dirty.append(d)
            else:
                dirty.append(f"({d[0:3]}){d[3:6]}-{d[6:]}")
        ops = [{"op": "standardize_phone",
                "rationale": "Standardized phone formatting."}]
        return dirty, clean, ops, ["inconsistent_formats"]


class PercentField(Field):
    """Percent column: clean values are fractions, dirty values ``"<pct>%"`` text."""

    semantic_type = "percent"
    names = ["rate", "discount", "completion", "margin", "growth", "conversion"]

    def gen_clean(self, rng, n):
        """Return ``n`` fractions, each a one-decimal percent in 0..100 over 100.

        The percents are also kept on ``self._pct``; ``corrupt`` no longer
        reads that attribute.
        """
        self._pct = [round(rng.uniform(0, 100), 1) for _ in range(n)]
        return [p / 100 for p in self._pct]

    def corrupt(self, rng, clean):
        """Return (dirty_values, clean_values, ops, issues).

        The percent text is rebuilt from ``clean`` itself, not from state left
        by the last ``gen_clean`` call, so the dirty column always matches the
        column passed in. Rounding ``c * 100`` to one decimal removes the
        float error of the divide/multiply round trip, so a column that came
        from ``gen_clean`` gets the same text as before.
        """
        dirty = [f"{round(c * 100, 1)}%" for c in clean]
        ops = [{"op": "parse_percent", "rationale": "Parsed percent text to a fraction."}]
        return dirty, clean, ops, ["numeric_stored_as_text"]


# Registry of archetype instances, built once at import time. Entries written
# as ``*([...] if <probe> else [])`` are added only when the probe call returns
# a truthy value (i.e. the backing vocabulary / alias data is available); each
# such group contributes a low-card instance and a high-card (25..60) instance.
# NOTE(review): list order may matter to seeded callers that index into it --
# confirm before reordering.
ARCHETYPES: list[Field] = [
    NameField(), CompanyField(), EmailField(), PercentField(),
    VocabField(["country", "nation", "country_name"], "country", V.country_vocab(), max_card=5),
    VocabField(["state", "province", "region"], "state", V.state_vocab(), max_card=5),
    VocabField(["currency", "currency_code", "ccy"], "categorical", V.currency_vocab(), max_card=4),
    VocabField(["city", "location", "hq_city"], "city", V.city_vocab(), max_card=5),
    VocabField(["department", "dept", "team"], "categorical", V.department_vocab(), max_card=4),
    VocabField(["job_title", "title", "role", "position"], "categorical", V.job_title_vocab(), max_card=4),
    # real O*NET occupations (alternate title -> canonical, CC BY 4.0): 1,016 canonicals
    *([VocabField(["job_title", "occupation", "role"], "categorical",
                  V._cached("onet", lambda: V._alias_file("onet_jobtitle_aliases.jsonl", limit=1016)),
                  max_card=5),
       VocabField(["job_title", "occupation"], "categorical",
                  V._cached("onet", lambda: V._alias_file("onet_jobtitle_aliases.jsonl", limit=1016)),
                  min_card=25, max_card=60, high_card=True)]
      if V._alias_file("onet_jobtitle_aliases.jsonl", limit=2) else []),
    # real nickname->formal first names (Bill -> William; Apache-2.0)
    *([VocabField(["first_name", "given_name", "contact_first"], "categorical",
                  V.nickname_vocab(), max_card=5),
       VocabField(["first_name", "given_name"], "categorical",
                  V.nickname_vocab(), min_card=25, max_card=60, high_card=True)]
      if V.nickname_vocab() else []),
    # ToughTables gold-anchored entity misspellings (SemTab 2T, CC-BY-4.0): 49.6k real
    # variant aliases across people/films/places — the grouped-entity regime
    # (this instance loads at most 3000 entries: limit=3000)
    *([VocabField(["name", "entity", "person", "artist"], "categorical",
                  V._cached("tt", lambda: V._alias_file("toughtables_aliases.jsonl", limit=3000)),
                  max_card=5),
       VocabField(["name", "entity", "player"], "categorical",
                  V._cached("tt", lambda: V._alias_file("toughtables_aliases.jsonl", limit=3000)),
                  min_card=25, max_card=60, high_card=True)]
      if V._alias_file("toughtables_aliases.jsonl", limit=2) else []),
    # RxNorm prescribable drugs (public domain): synonym/TTY variants -> ingredient
    *([VocabField(["drug", "medication", "drug_name", "prescription"], "categorical",
                  V._cached("rxnorm", lambda: V._alias_file("rxnorm_aliases.jsonl", limit=1500)),
                  max_card=5),
       VocabField(["drug", "medication"], "categorical",
                  V._cached("rxnorm", lambda: V._alias_file("rxnorm_aliases.jsonl", limit=1500)),
                  min_card=25, max_card=60, high_card=True)]
      if V._alias_file("rxnorm_aliases.jsonl", limit=2) else []),
    # MusicBrainz search-hint aliases (CC0): community-recorded artist misspellings
    *([VocabField(["artist", "performer", "band", "composer"], "categorical",
                  V._cached("mbhint", lambda: V._alias_file("musicbrainz_hint_aliases.jsonl", limit=2000)),
                  max_card=5),
       VocabField(["artist", "performer"], "categorical",
                  V._cached("mbhint", lambda: V._alias_file("musicbrainz_hint_aliases.jsonl", limit=2000)),
                  min_card=25, max_card=60, high_card=True)]
      if V._alias_file("musicbrainz_hint_aliases.jsonl", limit=2) else []),
    VocabField(["industry", "sector", "vertical"], "categorical", V.industry_vocab(), max_card=4),
    # real Wikidata companies (alias -> canonical: 'AB InBev' -> 'Anheuser-Busch InBev')
    *([VocabField(["company", "vendor", "account", "supplier"], "categorical",
                  V.company_vocab(), max_card=5),
       VocabField(["company", "vendor", "account"], "categorical",
                  V.company_vocab(), min_card=25, max_card=60, high_card=True)]
      if V.company_vocab() else []),
    # real ROR organizations (alias/acronym -> canonical): both low-card and the
    # hospital-style high-cardinality long-tail regime. Skipped if harvest absent.
    *([VocabField(["organization", "institution", "affiliation", "employer"], "categorical",
                  V.org_vocab(), max_card=5),
       VocabField(["organization", "institution", "affiliation"], "categorical",
                  V.org_vocab(), min_card=25, max_card=60, high_card=True)]
      if V.org_vocab() else []),
    VocabField(["unit", "uom", "measure_unit"], "categorical", V.unit_vocab(), max_card=4),
    StatusField(),
    CurrencyField(), DateField(), BooleanField(), PhoneField(),
]