scrubdata / training /fields.py
OpenAI Codex
deploy: add sponsor:openai tag (Best Use of Codex) + Codex-hardened build
16dc556
Raw
History Blame Contribute Delete
20.7 kB
"""Field archetypes: clean-value generators + matched corruptors.
Each archetype produces clean values in the SAME canonical/typed representation
that scrubdata.executor outputs, and a `corrupt()` that dirties a clean column
while returning the exact ground-truth column-operations. Designed so that
executor(dirty, ops) == clean (verified downstream).
"""
from __future__ import annotations
import random
from . import vocab as V
# ---- shared corruption helpers ----------------------------------------------
DISGUISED = ["N/A", "na", "-", "--", "null", "None", "?", "#N/A", "TBD",
"empty", "(empty)", "n/a", "NULL", "none", "unknown"]
def _add_whitespace(rng: random.Random, s: str) -> str:
choice = rng.random()
if choice < 0.4:
return " " * rng.randint(1, 3) + s
if choice < 0.7:
return s + " " * rng.randint(1, 3)
# doubled internal space
parts = s.split(" ")
if len(parts) > 1:
i = rng.randrange(len(parts) - 1)
parts[i] = parts[i] + " "
return " ".join(parts)
return " " + s + " "
def _inject_disguised_nulls(rng: random.Random, values, clean, p=0.12):
"""Randomly turn some cells into disguised-null tokens; clean value = None."""
used = False
out_dirty, out_clean = [], []
for d, c in zip(values, clean):
if rng.random() < p:
out_dirty.append(rng.choice(DISGUISED))
out_clean.append(None)
used = True
else:
out_dirty.append(d)
out_clean.append(c)
return out_dirty, out_clean, used
# ---- archetypes --------------------------------------------------------------
class Field:
semantic_type = "text"
names: list[str] = []
def gen_clean(self, rng: random.Random, n: int):
raise NotImplementedError
def corrupt(self, rng: random.Random, clean):
"""Return (dirty_values, clean_values, ops, issues)."""
raise NotImplementedError
class NameField(Field):
semantic_type = "text"
names = ["name", "full_name", "customer", "contact", "rep"]
FIRST = ["Alice", "Bob", "Carol", "David", "Eve", "Frank", "Grace", "Heidi",
"Ivan", "Judy", "Karl", "Lena", "Mona", "Omar", "Priya", "Sara"]
LAST = ["Johnson", "Smith", "Diaz", "Lee", "Adams", "Moore", "Park", "Cruz",
"Petrov", "Wong", "Brandt", "Fischer", "Ali", "Khan", "Novak", "Reyes",
"O'Brien", "D'Angelo", "Saint-Clair", "Smith-Jones", "N'Diaye"]
def gen_clean(self, rng, n):
return [f"{rng.choice(self.FIRST)} {rng.choice(self.LAST)}" for _ in range(n)]
def corrupt(self, rng, clean):
dirty = [_add_whitespace(rng, c) if rng.random() < 0.5 else c for c in clean]
ops = [{"op": "strip_whitespace",
"rationale": "Trimmed leading/trailing and doubled spaces."}]
issues = ["whitespace"]
# high-cardinality regime: unicode punctuation artifacts (curly quotes, long
# dashes, NBSP). Inverse of executor._PUNCT_MAP -> execution-verified.
if rng.random() < 0.45:
punct = False
for i, v in enumerate(dirty):
if rng.random() < 0.35:
w = v.replace("'", "’").replace("-", "–")
if " " in w and rng.random() < 0.3:
k = w.rindex(" ")
w = w[:k] + " " + w[k + 1:]
if w != v:
dirty[i] = w
punct = True
if punct:
ops.append({"op": "normalize_punctuation",
"rationale": "Normalized curly quotes / long dashes / "
"NBSP artifacts to plain ASCII."})
issues.append("unicode_punctuation")
return dirty, clean, ops, issues
class CompanyField(NameField):
names = ["company", "organization", "account", "employer"]
POOL = ["Acme Inc", "Globex", "Initech", "Umbrella", "Soylent Corp", "Hooli",
"Vehement", "Stark Industries", "Wonka Co", "Cyberdyne",
"O'Reilly & Sons", "Day-Lewis Group", "L'Atelier Co"]
def gen_clean(self, rng, n):
return [rng.choice(self.POOL) for _ in range(n)]
class EmailField(Field):
semantic_type = "email"
names = ["email", "email_address", "contact_email"]
def gen_clean(self, rng, n):
out = []
for _ in range(n):
user = "".join(rng.choice("abcdefghijklmnop") for _ in range(rng.randint(4, 7)))
dom = rng.choice(["example.com", "mail.com", "corp.io", "test.org"])
out.append(f"{user}@{dom}")
return out
def corrupt(self, rng, clean):
dirty = []
for c in clean:
v = c.upper() if rng.random() < 0.5 else c
if rng.random() < 0.4:
v = _add_whitespace(rng, v)
dirty.append(v)
ops = [{"op": "normalize_email",
"rationale": "Lowercased and trimmed email addresses."}]
return dirty, clean, ops, ["casing", "whitespace"]
class VocabField(Field):
"""Categorical column backed by a real vocabulary (canonical -> aliases).
LOW-card mode (default): draws a FEW canonicals (every surface shows in the
sample). HIGH-card mode (high_card=True): draws MANY (min_card..max_card, e.g.
30..80) real canonicals with a DOMINANT-canonical long-tailed row distribution
and single-char-substitution typos in the tail — replicating the hospital
birmingham(75) + birminghxm(1) regime. Both corrupt() and record surface->
canonical so canonicalize_categories recovers the clean value (self-verified)."""
def __init__(self, names, semantic_type, entries, max_card=5, min_card=2,
high_card=False, typo_p=0.13):
self.names = names
self.semantic_type = semantic_type
self.entries = entries
self._canonicals = list(entries)
self.max_card = max_card
self.min_card = min_card
self.high_card = high_card
self.typo_p = typo_p
def _choose(self, rng):
lo = max(2, min(self.min_card, len(self._canonicals)))
hi = min(self.max_card, len(self._canonicals))
k = rng.randint(min(lo, hi), hi)
return rng.sample(self._canonicals, k)
def _gen_rows(self, rng, n):
"""Long-tailed row draw: a few dominant canonicals carry most of the mass,
the rest form a sparse tail (where typo surfaces land as rare singletons).
Falls back to uniform for low-card columns."""
chosen = self._chosen
if not self.high_card or len(chosen) < 6:
return [rng.choice(chosen) for _ in range(n)]
# Zipf-like weights: a couple of dominant values, steeply decaying tail.
order = list(chosen)
rng.shuffle(order)
weights = [1.0 / ((i + 1) ** 1.6) for i in range(len(order))]
# Boost the single top canonical so a clear dominant emerges (birmingham 75).
weights[0] *= 3.0
return rng.choices(order, weights=weights, k=n)
def gen_clean(self, rng, n):
self._chosen = self._choose(rng)
return self._gen_rows(rng, n)
def _surface_for(self, rng, c, force_typo):
"""One dirty surface for canonical c. force_typo guarantees a single-char
substitution typo (rare-tail birminghxm regime)."""
aliases = self.entries.get(c, [])
if force_typo:
s = V.make_substitution_typo(rng, c)
return s
return V.make_surface(rng, c, aliases, typo_p=self.typo_p)
def corrupt(self, rng, clean):
# Decide which canonicals get a guaranteed single-char typo surface (high-card
# only): a controlled fraction of the present canonicals, applied to ONE of
# their occurrences so it lands as a rare tail singleton.
present = list(dict.fromkeys(clean))
forced_typo_canon = set()
if self.high_card:
frac = rng.uniform(0.3, 0.6)
k = max(1, int(len(present) * frac))
forced_typo_canon = set(rng.sample(present, min(k, len(present))))
# Reserve, per forced canonical, exactly one row index to carry the typo.
forced_slot = {}
if forced_typo_canon:
for canon in forced_typo_canon:
idxs = [i for i, c in enumerate(clean) if c == canon]
if idxs:
forced_slot[rng.choice(idxs)] = canon
# Build mapping collision-safely: a surface may only map to ONE canonical, and
# a surface that equals some canonical's clean form must not be remapped.
# Reserve all clean canonical strings as "do not remap" keys.
reserved = {str(c).strip() for c in present}
mapping = {}
dirty, ws = [], False
for i, c in enumerate(clean):
force = i in forced_slot
for _attempt in range(4):
s = self._surface_for(rng, c, force_typo=force)
key = str(s).strip()
if key == str(c).strip():
break # already canonical surface, no mapping needed
# Skip surfaces that collide with another canonical, or that some
# other canonical already claims (would make the mapping ambiguous).
if key in reserved:
s = c # ambiguous -> fall back to clean (still verifies)
break
if key in mapping and mapping[key] != c:
s = c # collision with a different canonical's surface
break
break
key = str(s).strip()
if key != str(c).strip() and key not in reserved:
mapping[key] = c
cell = s
# whitespace noise (less often on high-card to keep the tail clean)
if rng.random() < (0.12 if self.high_card else 0.25):
cell = _add_whitespace(rng, s)
ws = True
dirty.append(cell)
ops, issues = [], ["inconsistent_categories", "casing"]
if ws: # strip first so canonicalize sees the bare surface (executor order)
ops.append({"op": "strip_whitespace",
"rationale": "Trimmed surrounding/doubled spaces."})
issues.append("whitespace")
if mapping:
ops.append({"op": "canonicalize_categories", "mapping": mapping,
"rationale": f"Unified {len(mapping)} variant spelling(s) "
f"into canonical labels."})
return dirty, clean, ops, issues
class StatusField(VocabField):
"""Like VocabField but picks a fresh status/category value-set each example."""
def __init__(self):
super().__init__(
names=["status", "stage", "tier", "segment", "state", "payment_status"],
semantic_type="categorical", entries={}, max_card=4)
def gen_clean(self, rng, n):
self.entries = rng.choice(V._STATUS_SETS)
self._canonicals = list(self.entries)
self._chosen = self._choose(rng)
return self._gen_rows(rng, n)
class CurrencyField(Field):
semantic_type = "currency"
names = ["amount", "revenue", "price", "deal_size", "cost"]
def gen_clean(self, rng, n):
return [round(rng.uniform(50, 9000), 2) for _ in range(n)]
def _fmt(self, rng, x: float) -> str:
neg = x < 0
a = abs(x)
style = rng.random()
if style < 0.4:
s = f"${a:,.2f}"
elif style < 0.7 and a == int(a):
s = f"{int(a):,d}" # grouped integer — only when no cents to lose
else: # EU style (comma decimal) — always preserves 2 decimals
s = f"{a:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".")
return f"({s})" if neg else s
def corrupt(self, rng, clean):
dirty = [self._fmt(rng, c) for c in clean]
dirty, clean2, used_null = _inject_disguised_nulls(rng, dirty, clean)
ops, issues = [], ["numeric_stored_as_text", "currency_symbols"]
if used_null:
ops.append({"op": "normalize_disguised_nulls",
"rationale": "Converted N/A, '-', 'null' etc. to true missing."})
issues.append("disguised_nulls")
ops.append({"op": "parse_currency",
"rationale": "Stripped currency symbols/grouping; parsed to number."})
return dirty, clean2, ops, issues
class DateField(Field):
semantic_type = "date"
names = ["signup_date", "created_at", "close_date", "date", "order_date"]
def gen_clean(self, rng, n):
out = []
for _ in range(n):
y, m, d = 2023, rng.randint(1, 12), rng.randint(1, 28)
out.append(f"{y:04d}-{m:02d}-{d:02d}")
return out
def _fmt(self, rng, iso: str) -> str:
y, m, d = iso.split("-")
months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug",
"Sep", "Oct", "Nov", "Dec"]
style = rng.random()
if style < 0.3:
return iso
if style < 0.55:
return f"{int(m)}/{int(d)}/{y}" # US slash (m<=12, d<=28 -> unambiguous-ish)
if style < 0.8:
return f"{int(d)} {months[int(m)-1]} {y}" # 5 Jan 2023
# Excel serial
import datetime
base = datetime.date(1899, 12, 30)
serial = (datetime.date(int(y), int(m), int(d)) - base).days
return str(serial)
def corrupt(self, rng, clean):
dirty = [self._fmt(rng, c) for c in clean]
ops = [{"op": "parse_date",
"rationale": "Unified mixed date formats to ISO YYYY-MM-DD."}]
return dirty, clean, ops, ["mixed_date_formats"]
class BooleanField(Field):
semantic_type = "boolean"
names = ["is_active", "subscribed", "verified", "opted_in"]
TRUE = ["Yes", "Y", "TRUE", "true", "1", "T"]
FALSE = ["No", "N", "FALSE", "false", "0", "F"]
def gen_clean(self, rng, n):
return [rng.random() < 0.5 for _ in range(n)]
def corrupt(self, rng, clean):
dirty = [rng.choice(self.TRUE if c else self.FALSE) for c in clean]
ops = [{"op": "standardize_boolean",
"rationale": "Mapped Yes/Y/1/TRUE → true, No/N/0/FALSE → false."}]
return dirty, clean, ops, ["inconsistent_booleans"]
class PhoneField(Field):
semantic_type = "phone"
names = ["phone", "phone_number", "mobile", "contact_number"]
def gen_clean(self, rng, n):
# Canonical = executor's output for a plain 10-digit US number.
out, self._digits = [], []
for _ in range(n):
d = "".join(str(rng.randint(0, 9)) for _ in range(10))
d = "5" + d[1:] # keep it phone-ish
self._digits.append(d)
out.append(f"({d[0:3]}) {d[3:6]}-{d[6:]}")
return out
def corrupt(self, rng, clean):
dirty = []
for d in self._digits:
style = rng.random()
if style < 0.25:
dirty.append(f"{d[0:3]}.{d[3:6]}.{d[6:]}")
elif style < 0.5:
dirty.append(f"{d[0:3]}-{d[3:6]}-{d[6:]}")
elif style < 0.75:
dirty.append(d)
else:
dirty.append(f"({d[0:3]}){d[3:6]}-{d[6:]}")
ops = [{"op": "standardize_phone",
"rationale": "Standardized phone formatting."}]
return dirty, clean, ops, ["inconsistent_formats"]
class PercentField(Field):
semantic_type = "percent"
names = ["rate", "discount", "completion", "margin", "growth", "conversion"]
def gen_clean(self, rng, n):
self._pct = [round(rng.uniform(0, 100), 1) for _ in range(n)]
return [p / 100 for p in self._pct]
def corrupt(self, rng, clean):
dirty = [f"{p}%" for p in self._pct]
ops = [{"op": "parse_percent", "rationale": "Parsed percent text to a fraction."}]
return dirty, clean, ops, ["numeric_stored_as_text"]
ARCHETYPES: list[Field] = [
NameField(), CompanyField(), EmailField(), PercentField(),
VocabField(["country", "nation", "country_name"], "country", V.country_vocab(), max_card=5),
VocabField(["state", "province", "region"], "state", V.state_vocab(), max_card=5),
VocabField(["currency", "currency_code", "ccy"], "categorical", V.currency_vocab(), max_card=4),
VocabField(["city", "location", "hq_city"], "city", V.city_vocab(), max_card=5),
VocabField(["department", "dept", "team"], "categorical", V.department_vocab(), max_card=4),
VocabField(["job_title", "title", "role", "position"], "categorical", V.job_title_vocab(), max_card=4),
# real O*NET occupations (alternate title -> canonical, CC BY 4.0): 1,016 canonicals
*([VocabField(["job_title", "occupation", "role"], "categorical",
V._cached("onet", lambda: V._alias_file("onet_jobtitle_aliases.jsonl", limit=1016)),
max_card=5),
VocabField(["job_title", "occupation"], "categorical",
V._cached("onet", lambda: V._alias_file("onet_jobtitle_aliases.jsonl", limit=1016)),
min_card=25, max_card=60, high_card=True)]
if V._alias_file("onet_jobtitle_aliases.jsonl", limit=2) else []),
# real nickname->formal first names (Bill -> William; Apache-2.0)
*([VocabField(["first_name", "given_name", "contact_first"], "categorical",
V.nickname_vocab(), max_card=5),
VocabField(["first_name", "given_name"], "categorical",
V.nickname_vocab(), min_card=25, max_card=60, high_card=True)]
if V.nickname_vocab() else []),
# ToughTables gold-anchored entity misspellings (SemTab 2T, CC-BY-4.0): 49.6k real
# variant aliases across people/films/places — the grouped-entity regime
*([VocabField(["name", "entity", "person", "artist"], "categorical",
V._cached("tt", lambda: V._alias_file("toughtables_aliases.jsonl", limit=3000)),
max_card=5),
VocabField(["name", "entity", "player"], "categorical",
V._cached("tt", lambda: V._alias_file("toughtables_aliases.jsonl", limit=3000)),
min_card=25, max_card=60, high_card=True)]
if V._alias_file("toughtables_aliases.jsonl", limit=2) else []),
# RxNorm prescribable drugs (public domain): synonym/TTY variants -> ingredient
*([VocabField(["drug", "medication", "drug_name", "prescription"], "categorical",
V._cached("rxnorm", lambda: V._alias_file("rxnorm_aliases.jsonl", limit=1500)),
max_card=5),
VocabField(["drug", "medication"], "categorical",
V._cached("rxnorm", lambda: V._alias_file("rxnorm_aliases.jsonl", limit=1500)),
min_card=25, max_card=60, high_card=True)]
if V._alias_file("rxnorm_aliases.jsonl", limit=2) else []),
# MusicBrainz search-hint aliases (CC0): community-recorded artist misspellings
*([VocabField(["artist", "performer", "band", "composer"], "categorical",
V._cached("mbhint", lambda: V._alias_file("musicbrainz_hint_aliases.jsonl", limit=2000)),
max_card=5),
VocabField(["artist", "performer"], "categorical",
V._cached("mbhint", lambda: V._alias_file("musicbrainz_hint_aliases.jsonl", limit=2000)),
min_card=25, max_card=60, high_card=True)]
if V._alias_file("musicbrainz_hint_aliases.jsonl", limit=2) else []),
VocabField(["industry", "sector", "vertical"], "categorical", V.industry_vocab(), max_card=4),
# real Wikidata companies (alias -> canonical: 'AB InBev' -> 'Anheuser-Busch InBev')
*([VocabField(["company", "vendor", "account", "supplier"], "categorical",
V.company_vocab(), max_card=5),
VocabField(["company", "vendor", "account"], "categorical",
V.company_vocab(), min_card=25, max_card=60, high_card=True)]
if V.company_vocab() else []),
# real ROR organizations (alias/acronym -> canonical): both low-card and the
# hospital-style high-cardinality long-tail regime. Skipped if harvest absent.
*([VocabField(["organization", "institution", "affiliation", "employer"], "categorical",
V.org_vocab(), max_card=5),
VocabField(["organization", "institution", "affiliation"], "categorical",
V.org_vocab(), min_card=25, max_card=60, high_card=True)]
if V.org_vocab() else []),
VocabField(["unit", "uom", "measure_unit"], "categorical", V.unit_vocab(), max_card=4),
StatusField(),
CurrencyField(), DateField(), BooleanField(), PhoneField(),
]