"""Field archetypes: clean-value generators + matched corruptors. Each archetype produces clean values in the SAME canonical/typed representation that scrubdata.executor outputs, and a `corrupt()` that dirties a clean column while returning the exact ground-truth column-operations. Designed so that executor(dirty, ops) == clean (verified downstream). """ from __future__ import annotations import random from . import vocab as V # ---- shared corruption helpers ---------------------------------------------- DISGUISED = ["N/A", "na", "-", "--", "null", "None", "?", "#N/A", "TBD", "empty", "(empty)", "n/a", "NULL", "none", "unknown"] def _add_whitespace(rng: random.Random, s: str) -> str: choice = rng.random() if choice < 0.4: return " " * rng.randint(1, 3) + s if choice < 0.7: return s + " " * rng.randint(1, 3) # doubled internal space parts = s.split(" ") if len(parts) > 1: i = rng.randrange(len(parts) - 1) parts[i] = parts[i] + " " return " ".join(parts) return " " + s + " " def _inject_disguised_nulls(rng: random.Random, values, clean, p=0.12): """Randomly turn some cells into disguised-null tokens; clean value = None.""" used = False out_dirty, out_clean = [], [] for d, c in zip(values, clean): if rng.random() < p: out_dirty.append(rng.choice(DISGUISED)) out_clean.append(None) used = True else: out_dirty.append(d) out_clean.append(c) return out_dirty, out_clean, used # ---- archetypes -------------------------------------------------------------- class Field: semantic_type = "text" names: list[str] = [] def gen_clean(self, rng: random.Random, n: int): raise NotImplementedError def corrupt(self, rng: random.Random, clean): """Return (dirty_values, clean_values, ops, issues).""" raise NotImplementedError class NameField(Field): semantic_type = "text" names = ["name", "full_name", "customer", "contact", "rep"] FIRST = ["Alice", "Bob", "Carol", "David", "Eve", "Frank", "Grace", "Heidi", "Ivan", "Judy", "Karl", "Lena", "Mona", "Omar", "Priya", "Sara"] LAST = ["Johnson", "Smith", "Diaz", "Lee", "Adams", "Moore", "Park", "Cruz", "Petrov", "Wong", "Brandt", "Fischer", "Ali", "Khan", "Novak", "Reyes", "O'Brien", "D'Angelo", "Saint-Clair", "Smith-Jones", "N'Diaye"] def gen_clean(self, rng, n): return [f"{rng.choice(self.FIRST)} {rng.choice(self.LAST)}" for _ in range(n)] def corrupt(self, rng, clean): dirty = [_add_whitespace(rng, c) if rng.random() < 0.5 else c for c in clean] ops = [{"op": "strip_whitespace", "rationale": "Trimmed leading/trailing and doubled spaces."}] issues = ["whitespace"] # high-cardinality regime: unicode punctuation artifacts (curly quotes, long # dashes, NBSP). Inverse of executor._PUNCT_MAP -> execution-verified. if rng.random() < 0.45: punct = False for i, v in enumerate(dirty): if rng.random() < 0.35: w = v.replace("'", "’").replace("-", "–") if " " in w and rng.random() < 0.3: k = w.rindex(" ") w = w[:k] + " " + w[k + 1:] if w != v: dirty[i] = w punct = True if punct: ops.append({"op": "normalize_punctuation", "rationale": "Normalized curly quotes / long dashes / " "NBSP artifacts to plain ASCII."}) issues.append("unicode_punctuation") return dirty, clean, ops, issues class CompanyField(NameField): names = ["company", "organization", "account", "employer"] POOL = ["Acme Inc", "Globex", "Initech", "Umbrella", "Soylent Corp", "Hooli", "Vehement", "Stark Industries", "Wonka Co", "Cyberdyne", "O'Reilly & Sons", "Day-Lewis Group", "L'Atelier Co"] def gen_clean(self, rng, n): return [rng.choice(self.POOL) for _ in range(n)] class EmailField(Field): semantic_type = "email" names = ["email", "email_address", "contact_email"] def gen_clean(self, rng, n): out = [] for _ in range(n): user = "".join(rng.choice("abcdefghijklmnop") for _ in range(rng.randint(4, 7))) dom = rng.choice(["example.com", "mail.com", "corp.io", "test.org"]) out.append(f"{user}@{dom}") return out def corrupt(self, rng, clean): dirty = [] for c in clean: v = c.upper() if rng.random() < 0.5 else c if rng.random() < 0.4: v = _add_whitespace(rng, v) dirty.append(v) ops = [{"op": "normalize_email", "rationale": "Lowercased and trimmed email addresses."}] return dirty, clean, ops, ["casing", "whitespace"] class VocabField(Field): """Categorical column backed by a real vocabulary (canonical -> aliases). LOW-card mode (default): draws a FEW canonicals (every surface shows in the sample). HIGH-card mode (high_card=True): draws MANY (min_card..max_card, e.g. 30..80) real canonicals with a DOMINANT-canonical long-tailed row distribution and single-char-substitution typos in the tail — replicating the hospital birmingham(75) + birminghxm(1) regime. Both corrupt() and record surface-> canonical so canonicalize_categories recovers the clean value (self-verified).""" def __init__(self, names, semantic_type, entries, max_card=5, min_card=2, high_card=False, typo_p=0.13): self.names = names self.semantic_type = semantic_type self.entries = entries self._canonicals = list(entries) self.max_card = max_card self.min_card = min_card self.high_card = high_card self.typo_p = typo_p def _choose(self, rng): lo = max(2, min(self.min_card, len(self._canonicals))) hi = min(self.max_card, len(self._canonicals)) k = rng.randint(min(lo, hi), hi) return rng.sample(self._canonicals, k) def _gen_rows(self, rng, n): """Long-tailed row draw: a few dominant canonicals carry most of the mass, the rest form a sparse tail (where typo surfaces land as rare singletons). Falls back to uniform for low-card columns.""" chosen = self._chosen if not self.high_card or len(chosen) < 6: return [rng.choice(chosen) for _ in range(n)] # Zipf-like weights: a couple of dominant values, steeply decaying tail. order = list(chosen) rng.shuffle(order) weights = [1.0 / ((i + 1) ** 1.6) for i in range(len(order))] # Boost the single top canonical so a clear dominant emerges (birmingham 75). weights[0] *= 3.0 return rng.choices(order, weights=weights, k=n) def gen_clean(self, rng, n): self._chosen = self._choose(rng) return self._gen_rows(rng, n) def _surface_for(self, rng, c, force_typo): """One dirty surface for canonical c. force_typo guarantees a single-char substitution typo (rare-tail birminghxm regime).""" aliases = self.entries.get(c, []) if force_typo: s = V.make_substitution_typo(rng, c) return s return V.make_surface(rng, c, aliases, typo_p=self.typo_p) def corrupt(self, rng, clean): # Decide which canonicals get a guaranteed single-char typo surface (high-card # only): a controlled fraction of the present canonicals, applied to ONE of # their occurrences so it lands as a rare tail singleton. present = list(dict.fromkeys(clean)) forced_typo_canon = set() if self.high_card: frac = rng.uniform(0.3, 0.6) k = max(1, int(len(present) * frac)) forced_typo_canon = set(rng.sample(present, min(k, len(present)))) # Reserve, per forced canonical, exactly one row index to carry the typo. forced_slot = {} if forced_typo_canon: for canon in forced_typo_canon: idxs = [i for i, c in enumerate(clean) if c == canon] if idxs: forced_slot[rng.choice(idxs)] = canon # Build mapping collision-safely: a surface may only map to ONE canonical, and # a surface that equals some canonical's clean form must not be remapped. # Reserve all clean canonical strings as "do not remap" keys. reserved = {str(c).strip() for c in present} mapping = {} dirty, ws = [], False for i, c in enumerate(clean): force = i in forced_slot for _attempt in range(4): s = self._surface_for(rng, c, force_typo=force) key = str(s).strip() if key == str(c).strip(): break # already canonical surface, no mapping needed # Skip surfaces that collide with another canonical, or that some # other canonical already claims (would make the mapping ambiguous). if key in reserved: s = c # ambiguous -> fall back to clean (still verifies) break if key in mapping and mapping[key] != c: s = c # collision with a different canonical's surface break break key = str(s).strip() if key != str(c).strip() and key not in reserved: mapping[key] = c cell = s # whitespace noise (less often on high-card to keep the tail clean) if rng.random() < (0.12 if self.high_card else 0.25): cell = _add_whitespace(rng, s) ws = True dirty.append(cell) ops, issues = [], ["inconsistent_categories", "casing"] if ws: # strip first so canonicalize sees the bare surface (executor order) ops.append({"op": "strip_whitespace", "rationale": "Trimmed surrounding/doubled spaces."}) issues.append("whitespace") if mapping: ops.append({"op": "canonicalize_categories", "mapping": mapping, "rationale": f"Unified {len(mapping)} variant spelling(s) " f"into canonical labels."}) return dirty, clean, ops, issues class StatusField(VocabField): """Like VocabField but picks a fresh status/category value-set each example.""" def __init__(self): super().__init__( names=["status", "stage", "tier", "segment", "state", "payment_status"], semantic_type="categorical", entries={}, max_card=4) def gen_clean(self, rng, n): self.entries = rng.choice(V._STATUS_SETS) self._canonicals = list(self.entries) self._chosen = self._choose(rng) return self._gen_rows(rng, n) class CurrencyField(Field): semantic_type = "currency" names = ["amount", "revenue", "price", "deal_size", "cost"] def gen_clean(self, rng, n): return [round(rng.uniform(50, 9000), 2) for _ in range(n)] def _fmt(self, rng, x: float) -> str: neg = x < 0 a = abs(x) style = rng.random() if style < 0.4: s = f"${a:,.2f}" elif style < 0.7 and a == int(a): s = f"{int(a):,d}" # grouped integer — only when no cents to lose else: # EU style (comma decimal) — always preserves 2 decimals s = f"{a:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".") return f"({s})" if neg else s def corrupt(self, rng, clean): dirty = [self._fmt(rng, c) for c in clean] dirty, clean2, used_null = _inject_disguised_nulls(rng, dirty, clean) ops, issues = [], ["numeric_stored_as_text", "currency_symbols"] if used_null: ops.append({"op": "normalize_disguised_nulls", "rationale": "Converted N/A, '-', 'null' etc. to true missing."}) issues.append("disguised_nulls") ops.append({"op": "parse_currency", "rationale": "Stripped currency symbols/grouping; parsed to number."}) return dirty, clean2, ops, issues class DateField(Field): semantic_type = "date" names = ["signup_date", "created_at", "close_date", "date", "order_date"] def gen_clean(self, rng, n): out = [] for _ in range(n): y, m, d = 2023, rng.randint(1, 12), rng.randint(1, 28) out.append(f"{y:04d}-{m:02d}-{d:02d}") return out def _fmt(self, rng, iso: str) -> str: y, m, d = iso.split("-") months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] style = rng.random() if style < 0.3: return iso if style < 0.55: return f"{int(m)}/{int(d)}/{y}" # US slash (m<=12, d<=28 -> unambiguous-ish) if style < 0.8: return f"{int(d)} {months[int(m)-1]} {y}" # 5 Jan 2023 # Excel serial import datetime base = datetime.date(1899, 12, 30) serial = (datetime.date(int(y), int(m), int(d)) - base).days return str(serial) def corrupt(self, rng, clean): dirty = [self._fmt(rng, c) for c in clean] ops = [{"op": "parse_date", "rationale": "Unified mixed date formats to ISO YYYY-MM-DD."}] return dirty, clean, ops, ["mixed_date_formats"] class BooleanField(Field): semantic_type = "boolean" names = ["is_active", "subscribed", "verified", "opted_in"] TRUE = ["Yes", "Y", "TRUE", "true", "1", "T"] FALSE = ["No", "N", "FALSE", "false", "0", "F"] def gen_clean(self, rng, n): return [rng.random() < 0.5 for _ in range(n)] def corrupt(self, rng, clean): dirty = [rng.choice(self.TRUE if c else self.FALSE) for c in clean] ops = [{"op": "standardize_boolean", "rationale": "Mapped Yes/Y/1/TRUE → true, No/N/0/FALSE → false."}] return dirty, clean, ops, ["inconsistent_booleans"] class PhoneField(Field): semantic_type = "phone" names = ["phone", "phone_number", "mobile", "contact_number"] def gen_clean(self, rng, n): # Canonical = executor's output for a plain 10-digit US number. out, self._digits = [], [] for _ in range(n): d = "".join(str(rng.randint(0, 9)) for _ in range(10)) d = "5" + d[1:] # keep it phone-ish self._digits.append(d) out.append(f"({d[0:3]}) {d[3:6]}-{d[6:]}") return out def corrupt(self, rng, clean): dirty = [] for d in self._digits: style = rng.random() if style < 0.25: dirty.append(f"{d[0:3]}.{d[3:6]}.{d[6:]}") elif style < 0.5: dirty.append(f"{d[0:3]}-{d[3:6]}-{d[6:]}") elif style < 0.75: dirty.append(d) else: dirty.append(f"({d[0:3]}){d[3:6]}-{d[6:]}") ops = [{"op": "standardize_phone", "rationale": "Standardized phone formatting."}] return dirty, clean, ops, ["inconsistent_formats"] class PercentField(Field): semantic_type = "percent" names = ["rate", "discount", "completion", "margin", "growth", "conversion"] def gen_clean(self, rng, n): self._pct = [round(rng.uniform(0, 100), 1) for _ in range(n)] return [p / 100 for p in self._pct] def corrupt(self, rng, clean): dirty = [f"{p}%" for p in self._pct] ops = [{"op": "parse_percent", "rationale": "Parsed percent text to a fraction."}] return dirty, clean, ops, ["numeric_stored_as_text"] ARCHETYPES: list[Field] = [ NameField(), CompanyField(), EmailField(), PercentField(), VocabField(["country", "nation", "country_name"], "country", V.country_vocab(), max_card=5), VocabField(["state", "province", "region"], "state", V.state_vocab(), max_card=5), VocabField(["currency", "currency_code", "ccy"], "categorical", V.currency_vocab(), max_card=4), VocabField(["city", "location", "hq_city"], "city", V.city_vocab(), max_card=5), VocabField(["department", "dept", "team"], "categorical", V.department_vocab(), max_card=4), VocabField(["job_title", "title", "role", "position"], "categorical", V.job_title_vocab(), max_card=4), # real O*NET occupations (alternate title -> canonical, CC BY 4.0): 1,016 canonicals *([VocabField(["job_title", "occupation", "role"], "categorical", V._cached("onet", lambda: V._alias_file("onet_jobtitle_aliases.jsonl", limit=1016)), max_card=5), VocabField(["job_title", "occupation"], "categorical", V._cached("onet", lambda: V._alias_file("onet_jobtitle_aliases.jsonl", limit=1016)), min_card=25, max_card=60, high_card=True)] if V._alias_file("onet_jobtitle_aliases.jsonl", limit=2) else []), # real nickname->formal first names (Bill -> William; Apache-2.0) *([VocabField(["first_name", "given_name", "contact_first"], "categorical", V.nickname_vocab(), max_card=5), VocabField(["first_name", "given_name"], "categorical", V.nickname_vocab(), min_card=25, max_card=60, high_card=True)] if V.nickname_vocab() else []), # ToughTables gold-anchored entity misspellings (SemTab 2T, CC-BY-4.0): 49.6k real # variant aliases across people/films/places — the grouped-entity regime *([VocabField(["name", "entity", "person", "artist"], "categorical", V._cached("tt", lambda: V._alias_file("toughtables_aliases.jsonl", limit=3000)), max_card=5), VocabField(["name", "entity", "player"], "categorical", V._cached("tt", lambda: V._alias_file("toughtables_aliases.jsonl", limit=3000)), min_card=25, max_card=60, high_card=True)] if V._alias_file("toughtables_aliases.jsonl", limit=2) else []), # RxNorm prescribable drugs (public domain): synonym/TTY variants -> ingredient *([VocabField(["drug", "medication", "drug_name", "prescription"], "categorical", V._cached("rxnorm", lambda: V._alias_file("rxnorm_aliases.jsonl", limit=1500)), max_card=5), VocabField(["drug", "medication"], "categorical", V._cached("rxnorm", lambda: V._alias_file("rxnorm_aliases.jsonl", limit=1500)), min_card=25, max_card=60, high_card=True)] if V._alias_file("rxnorm_aliases.jsonl", limit=2) else []), # MusicBrainz search-hint aliases (CC0): community-recorded artist misspellings *([VocabField(["artist", "performer", "band", "composer"], "categorical", V._cached("mbhint", lambda: V._alias_file("musicbrainz_hint_aliases.jsonl", limit=2000)), max_card=5), VocabField(["artist", "performer"], "categorical", V._cached("mbhint", lambda: V._alias_file("musicbrainz_hint_aliases.jsonl", limit=2000)), min_card=25, max_card=60, high_card=True)] if V._alias_file("musicbrainz_hint_aliases.jsonl", limit=2) else []), VocabField(["industry", "sector", "vertical"], "categorical", V.industry_vocab(), max_card=4), # real Wikidata companies (alias -> canonical: 'AB InBev' -> 'Anheuser-Busch InBev') *([VocabField(["company", "vendor", "account", "supplier"], "categorical", V.company_vocab(), max_card=5), VocabField(["company", "vendor", "account"], "categorical", V.company_vocab(), min_card=25, max_card=60, high_card=True)] if V.company_vocab() else []), # real ROR organizations (alias/acronym -> canonical): both low-card and the # hospital-style high-cardinality long-tail regime. Skipped if harvest absent. *([VocabField(["organization", "institution", "affiliation", "employer"], "categorical", V.org_vocab(), max_card=5), VocabField(["organization", "institution", "affiliation"], "categorical", V.org_vocab(), min_card=25, max_card=60, high_card=True)] if V.org_vocab() else []), VocabField(["unit", "uom", "measure_unit"], "categorical", V.unit_vocab(), max_card=4), StatusField(), CurrencyField(), DateField(), BooleanField(), PhoneField(), ]