Spaces:
Sleeping
Sleeping
| """Seed-based deterministic data corruption pipeline for data cleaning OpenEnv.""" | |
| from __future__ import annotations | |
| import copy | |
| import random | |
| import uuid | |
| from typing import Any, Optional | |
# Common nicknames keyed by formal first name, used when cloning rows into
# noisy duplicates. Every key is a valid identifier, so keyword syntax works.
NAME_VARIANTS: dict[str, list[str]] = dict(
    Robert=["Rob", "Bob"],
    William=["Will", "Bill", "Wm"],
    Elizabeth=["Liz", "Beth"],
    Jennifer=["Jen", "Jenny"],
    Michael=["Mike"],
    James=["Jim"],
    Katherine=["Kate", "Kathy"],
    Richard=["Rick", "Rich"],
)

# Look-alike substitutions (letter -> digit). The two strings are aligned
# position by position: o->0, O->0, l->1, I->1, s->5, S->5, z->2, Z->2,
# g->9, B->8.
VISUAL_SIMILAR: dict[str, str] = dict(zip("oOlIsSzZgB", "0011552298"))

# strftime layouts a date may be rewritten into. Order is significant:
# seeded random choices pick from this list by position.
DATE_FORMATS: list[str] = [
    "%m/%d/%Y", "%d/%m/%Y", "%Y-%m-%d",
    "%B %d, %Y", "%b %d, %Y", "%d %B %Y",
    "%m-%d-%Y", "%Y/%m/%d", "%d.%m.%Y",
    "%B %d %Y",
]

# Two-letter US postal code -> full state name (50 states plus DC).
STATE_ABBREVIATIONS: dict[str, str] = {
    "AL": "Alabama",
    "AK": "Alaska",
    "AZ": "Arizona",
    "AR": "Arkansas",
    "CA": "California",
    "CO": "Colorado",
    "CT": "Connecticut",
    "DE": "Delaware",
    "FL": "Florida",
    "GA": "Georgia",
    "HI": "Hawaii",
    "ID": "Idaho",
    "IL": "Illinois",
    "IN": "Indiana",
    "IA": "Iowa",
    "KS": "Kansas",
    "KY": "Kentucky",
    "LA": "Louisiana",
    "ME": "Maine",
    "MD": "Maryland",
    "MA": "Massachusetts",
    "MI": "Michigan",
    "MN": "Minnesota",
    "MS": "Mississippi",
    "MO": "Missouri",
    "MT": "Montana",
    "NE": "Nebraska",
    "NV": "Nevada",
    "NH": "New Hampshire",
    "NJ": "New Jersey",
    "NM": "New Mexico",
    "NY": "New York",
    "NC": "North Carolina",
    "ND": "North Dakota",
    "OH": "Ohio",
    "OK": "Oklahoma",
    "OR": "Oregon",
    "PA": "Pennsylvania",
    "RI": "Rhode Island",
    "SC": "South Carolina",
    "SD": "South Dakota",
    "TN": "Tennessee",
    "TX": "Texas",
    "UT": "Utah",
    "VT": "Vermont",
    "VA": "Virginia",
    "WA": "Washington",
    "WV": "West Virginia",
    "WI": "Wisconsin",
    "WY": "Wyoming",
    "DC": "District of Columbia",
}

# Canonical categorical value -> alternative spellings it may be replaced by.
VALUE_VARIATIONS: dict[str, list[str]] = {
    "Engineering": ["Eng", "Eng.", "ENGINEERING", "engineering"],
    "Marketing": ["Mktg", "Mktg.", "MARKETING", "marketing"],
    "Human Resources": ["HR", "H.R.", "HumanResources"],
    "Information Technology": ["IT", "I.T.", "InfoTech"],
    "Finance": ["Fin", "Fin.", "FINANCE", "finance"],
    "Sales": ["Sls", "SALES", "sales"],
    "Operations": ["Ops", "OPERATIONS", "operations"],
    "Research": ["R&D", "Res", "RESEARCH"],
    "Accounting": ["Acct", "Acct.", "ACCOUNTING"],
    "Management": ["Mgmt", "Mgmt.", "MANAGEMENT"],
}

# Full street-address term -> accepted abbreviations. Entries are walked in
# this insertion order by the address-variation logic, so keep it stable.
ADDRESS_ABBREVIATIONS: dict[str, list[str]] = {
    # street types
    "Street": ["St.", "St", "Str."],
    "Avenue": ["Ave.", "Ave", "Av."],
    "Boulevard": ["Blvd.", "Blvd"],
    "Drive": ["Dr.", "Dr"],
    "Lane": ["Ln.", "Ln"],
    "Road": ["Rd.", "Rd"],
    "Court": ["Ct.", "Ct"],
    "Place": ["Pl.", "Pl"],
    # unit designators
    "Apartment": ["Apt.", "Apt", "Apt #"],
    "Suite": ["Ste.", "Ste", "Suite #"],
    "Building": ["Bldg.", "Bldg"],
    "Floor": ["Fl.", "Fl"],
    # compass directions
    "North": ["N.", "N"],
    "South": ["S.", "S"],
    "East": ["E.", "E"],
    "West": ["W.", "W"],
}
def _make_row_id(rng: random.Random) -> str:
    """Draw 128 bits from *rng* and format them as a version-4 UUID string.

    Because the bits come from the caller's seeded generator, the same
    generator state always yields the same id.
    """
    raw_bits = rng.getrandbits(128)
    row_uuid = uuid.UUID(int=raw_bits, version=4)
    return str(row_uuid)
class DataCorruptor:
    """Deterministic data corruption pipeline.

    Uses a seeded ``random.Random`` instance so the same seed always
    produces the same corrupted output. The one exception is
    ``impossible_date`` "future" corruptions. Unless the spec provides
    ``reference_date``, those are anchored on the current date.

    Each handler supports TWO spec formats:

    1. **Explicit targeting** (primary): specific row indices, fields,
       and values are named in the spec.
    2. **Generic/probabilistic** (fallback): columns + probability,
       matching the legacy format.

    When explicit keys (``targets``, ``row_indices``, ``source_indices``,
    ``pairs``, ``mapping``) are present, the handler uses them. Otherwise
    it falls back to the generic probabilistic path.

    Handlers mutate the row list they receive and return it. :meth:`corrupt`
    deep-copies the caller's data first, so the caller's input is never
    mutated.
    """

    def __init__(self, seed: int) -> None:
        self._rng: random.Random = random.Random(seed)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def corrupt(
        self,
        clean_data: list[dict[str, Any]],
        corruptions: list[dict[str, Any]],
    ) -> list[dict[str, Any]]:
        """Apply a list of corruption specs to *clean_data*.

        Each corruption spec is a dict with at least a ``"type"`` key that
        maps to one of the ``_corrupt_*`` handler methods. The whole spec
        dict is passed to that handler. Specs are applied in order, and
        each one sees the output of the previous one.

        Returns a new list of rows (the original is never mutated).

        Raises:
            KeyError: if a spec has no ``"type"`` key.
            ValueError: if ``"type"`` names no known handler.
        """
        data = copy.deepcopy(clean_data)
        for spec in corruptions:
            handler = getattr(self, f"_corrupt_{spec['type']}", None)
            if handler is None:
                raise ValueError(f"Unknown corruption type: {spec['type']}")
            data = handler(data, spec)
        return data

    # ------------------------------------------------------------------
    # No-op handlers
    # ------------------------------------------------------------------

    def _corrupt_valid_unusual(
        self,
        data: list[dict[str, Any]],
        spec: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """No-op: valid_unusual entries are documentary annotations, not corruptions."""
        return data

    # ------------------------------------------------------------------
    # Corruption handlers
    # ------------------------------------------------------------------

    def _corrupt_char_swap(
        self,
        data: list[dict[str, Any]],
        spec: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Swap adjacent chars or replace with visually similar characters.

        Explicit spec:
            targets: list[dict] — each dict has "row_idx" and "field"
            mode: str — "swap" | "visual" | "both" (default "both")
        Generic (fallback) spec:
            columns: list[str] — columns to target
            probability: float — per-cell probability (default 0.3)
            mode: str — "swap" | "visual" | "both" (default "both")

        Values shorter than two characters are left alone.
        """
        mode: str = spec.get("mode", "both")
        targets: Optional[list[dict[str, Any]]] = spec.get("targets")
        if targets is not None:
            cells = self._target_cells(data, targets)
        else:
            cells = self._sampled_cells(
                data, spec.get("columns", []), spec.get("probability", 0.3)
            )
        for row, col in cells:
            value = str(row[col])
            if len(value) >= 2:
                row[col] = self._apply_char_swap(value, mode)
        return data

    def _corrupt_format_randomize(
        self,
        data: list[dict[str, Any]],
        spec: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Convert date strings to random formats.

        Explicit spec:
            column: str — the date column to target
            row_indices: list[int] — specific rows to corrupt
            source_format: str (default "%Y-%m-%d")
        Generic (fallback) spec:
            columns: list[str] — date columns to target
            probability: float — per-cell probability (default 0.5)
            source_format: str (default "%Y-%m-%d")

        The new layout is drawn from DATE_FORMATS excluding ``source_format``
        itself, so a selected cell is actually rewritten. Cells that do not
        parse with ``source_format`` are left untouched.
        """
        from datetime import datetime

        source_format: str = spec.get("source_format", "%Y-%m-%d")
        # Picking the source layout would leave the cell unchanged while it
        # is still counted as corrupted, so exclude it when possible.
        formats = [f for f in DATE_FORMATS if f != source_format] or list(DATE_FORMATS)
        row_indices: Optional[list[int]] = spec.get("row_indices")
        if row_indices is not None:
            cells = self._indexed_cells(data, row_indices, spec.get("column", ""))
        else:
            cells = self._sampled_cells(
                data, spec.get("columns", []), spec.get("probability", 0.5)
            )
        for row, col in cells:
            try:
                parsed = datetime.strptime(str(row[col]), source_format)
            except ValueError:
                continue  # not a date in source_format: leave it as-is
            row[col] = parsed.strftime(self._rng.choice(formats))
        return data

    def _corrupt_null_inject(
        self,
        data: list[dict[str, Any]],
        spec: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Set specific cells to None.

        Explicit spec:
            targets: list[dict] — each dict has "row_idx" and "field"
        Generic (fallback) spec:
            columns: list[str] — columns to target
            probability: float — per-cell probability (default 0.1)
        """
        targets: Optional[list[dict[str, Any]]] = spec.get("targets")
        if targets is not None:
            for target in targets:
                row_idx: int = target["row_idx"]
                field: str = target["field"]
                if row_idx < len(data) and field in data[row_idx]:
                    data[row_idx][field] = None
        else:
            columns: list[str] = spec.get("columns", [])
            probability: float = spec.get("probability", 0.1)
            for row in data:
                for col in columns:
                    # Unlike the other generic paths, cells that are already
                    # None are still sampled (one RNG draw per present column).
                    if col in row and self._rng.random() < probability:
                        row[col] = None
        return data

    def _corrupt_case_corrupt(
        self,
        data: list[dict[str, Any]],
        spec: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Randomize string values to lower/upper/mixed case.

        Explicit spec:
            targets: list[dict] — each dict has "row_idx" and "field"
        Generic (fallback) spec:
            columns: list[str]
            probability: float (default 0.3)
        """
        targets: Optional[list[dict[str, Any]]] = spec.get("targets")
        if targets is not None:
            cells = self._target_cells(data, targets)
        else:
            cells = self._sampled_cells(
                data, spec.get("columns", []), spec.get("probability", 0.3)
            )
        for row, col in cells:
            row[col] = self._apply_case_corrupt(str(row[col]))
        return data

    def _corrupt_format_strip(
        self,
        data: list[dict[str, Any]],
        spec: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Strip phone formatting, keeping digits only.

        Explicit spec:
            column: str — the column to strip
            row_indices: list[int] — specific rows to strip
        Generic (fallback) spec:
            columns: list[str]
            probability: float (default 0.4)
        """
        row_indices: Optional[list[int]] = spec.get("row_indices")
        if row_indices is not None:
            cells = self._indexed_cells(data, row_indices, spec.get("column", ""))
        else:
            cells = self._sampled_cells(
                data, spec.get("columns", []), spec.get("probability", 0.4)
            )
        for row, col in cells:
            row[col] = self._digits(str(row[col]))
        return data

    def _corrupt_duplicate_with_noise(
        self,
        data: list[dict[str, Any]],
        spec: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Clone rows with name variants and format changes.

        Explicit spec:
            source_indices: list[int] — indices of rows to duplicate
            noise_fields: list[str] — fields on which to apply noise
        Generic (fallback) spec:
            probability: float — per-row probability of duplication (default 0.2)
            name_columns: list[str] — columns containing names to vary
            format_columns: list[str] — columns to reformat (phones, dates)

        Explicit path: each noise field first gets a first-name variant. If
        that changes nothing and the value contains no letters (phones,
        numeric dates, zips), its formatting is stripped to digits.
        Values containing letters are not reduced to digits, because
        reducing "123 Main Street" to "123" destroys the field.
        New rows are appended after all existing rows.
        """
        source_indices: Optional[list[int]] = spec.get("source_indices")
        if source_indices is not None:
            noise_fields: list[str] = spec.get("noise_fields", [])
            new_rows: list[dict[str, Any]] = []
            for src_idx in source_indices:
                if src_idx >= len(data):
                    continue
                dup = copy.deepcopy(data[src_idx])
                # Copy _entity_id from source (already in deepcopy)
                # New _row_id will be assigned by the environment in reset()
                for field in noise_fields:
                    if field not in dup or dup[field] is None:
                        continue
                    value = str(dup[field])
                    variant = self._apply_name_variant(value)
                    if variant != value:
                        dup[field] = variant
                    elif not self._has_letters(value):
                        digits = self._digits(value)
                        if digits:
                            dup[field] = digits
                new_rows.append(dup)
            data.extend(new_rows)
        else:
            probability: float = spec.get("probability", 0.2)
            name_columns: list[str] = spec.get("name_columns", [])
            format_columns: list[str] = spec.get("format_columns", [])
            new_rows = []
            for row in data:
                if self._rng.random() > probability:
                    continue
                dup = copy.deepcopy(row)
                dup["_row_id"] = _make_row_id(self._rng)
                for col in name_columns:
                    if col not in dup or dup[col] is None:
                        continue
                    dup[col] = self._apply_name_variant(str(dup[col]))
                for col in format_columns:
                    if col not in dup or dup[col] is None:
                        continue
                    digits = self._digits(str(dup[col]))
                    if digits:
                        dup[col] = digits
                new_rows.append(dup)
            data.extend(new_rows)
        return data

    def _corrupt_value_variation(
        self,
        data: list[dict[str, Any]],
        spec: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Map values to known variants (e.g. Engineering -> Eng).

        Explicit spec:
            column: str — the column to target
            mapping: dict[str, list[str]] — value -> list of variants
                (applied to EVERY row whose value is a mapping key)
        Generic (fallback) spec:
            columns: list[str]
            probability: float (default 0.3)
            custom_mappings: dict[str, list[str]] (merged over VALUE_VARIATIONS)
        """
        mapping: Optional[dict[str, list[str]]] = spec.get("mapping")
        if mapping is not None:
            column: str = spec.get("column", "")
            for row in data:
                if column not in row or row[column] is None:
                    continue
                value = str(row[column])
                if value in mapping:
                    row[column] = self._rng.choice(mapping[value])
        else:
            custom: dict[str, list[str]] = spec.get("custom_mappings", {})
            mappings = {**VALUE_VARIATIONS, **custom}
            cells = self._sampled_cells(
                data, spec.get("columns", []), spec.get("probability", 0.3)
            )
            for row, col in cells:
                value = str(row[col])
                if value in mappings:
                    row[col] = self._rng.choice(mappings[value])
        return data

    def _corrupt_state_expand(
        self,
        data: list[dict[str, Any]],
        spec: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Expand state abbreviations to full names (CA -> California).

        Explicit spec:
            row_indices: list[int] — specific rows to expand
            column: str — state column (default "state")
        Generic (fallback) spec:
            columns: list[str]
            probability: float (default 0.4)

        Matching is case-insensitive and ignores surrounding whitespace;
        values that are not known abbreviations are left unchanged.
        """
        row_indices: Optional[list[int]] = spec.get("row_indices")
        if row_indices is not None:
            cells = self._indexed_cells(data, row_indices, spec.get("column", "state"))
        else:
            cells = self._sampled_cells(
                data, spec.get("columns", []), spec.get("probability", 0.4)
            )
        for row, col in cells:
            full_name = STATE_ABBREVIATIONS.get(str(row[col]).strip().upper())
            if full_name is not None:
                row[col] = full_name
        return data

    def _corrupt_duplicate_cluster(
        self,
        data: list[dict[str, Any]],
        spec: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Create clusters of duplicates from source rows.

        Explicit spec:
            source_indices: list[int] — indices of source rows
            cluster_sizes: list[int] — number of duplicates per source
                (default 2 for sources without a matching size)
            noise_fields: list[str] — fields on which to apply noise
        Generic (fallback) spec:
            probability: float — per-row probability (default 0.15)
            name_columns: list[str]
            address_columns: list[str]
            phone_columns: list[str]

        New rows are appended after all existing rows.
        """
        source_indices: Optional[list[int]] = spec.get("source_indices")
        if source_indices is not None:
            cluster_sizes: list[int] = spec.get("cluster_sizes", [])
            noise_fields: list[str] = spec.get("noise_fields", [])
            new_rows: list[dict[str, Any]] = []
            for i, src_idx in enumerate(source_indices):
                if src_idx >= len(data):
                    continue
                size = cluster_sizes[i] if i < len(cluster_sizes) else 2
                source_row = data[src_idx]
                for _ in range(size):
                    dup = copy.deepcopy(source_row)
                    # Copy _entity_id from source (already in deepcopy)
                    # New _row_id assigned by environment in reset()
                    for field in noise_fields:
                        if field not in dup or dup[field] is None:
                            continue
                        dup[field] = self._cluster_noise(str(dup[field]))
                    new_rows.append(dup)
            data.extend(new_rows)
        else:
            probability: float = spec.get("probability", 0.15)
            name_columns: list[str] = spec.get("name_columns", [])
            address_columns: list[str] = spec.get("address_columns", [])
            phone_columns: list[str] = spec.get("phone_columns", [])
            new_rows = []
            for row in data:
                if self._rng.random() > probability:
                    continue
                cluster_size = self._rng.randint(2, 3)
                for _ in range(cluster_size):
                    dup = copy.deepcopy(row)
                    dup["_row_id"] = _make_row_id(self._rng)
                    for col in name_columns:
                        if col not in dup or dup[col] is None:
                            continue
                        dup[col] = self._apply_name_variant(str(dup[col]))
                    for col in address_columns:
                        if col not in dup or dup[col] is None:
                            continue
                        dup[col] = self._apply_address_variation(str(dup[col]))
                    for col in phone_columns:
                        if col not in dup or dup[col] is None:
                            continue
                        digits = self._digits(str(dup[col]))
                        if len(digits) == 10:
                            dup[col] = self._random_phone_format(digits)
                    new_rows.append(dup)
            data.extend(new_rows)
        return data

    def _corrupt_cross_field_corrupt(
        self,
        data: list[dict[str, Any]],
        spec: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Mismatch zip codes with cities by swapping zip values.

        Explicit spec:
            row_indices: list[int] — specific rows whose zips to swap
            zip_column: str (default "zip")
        Generic (fallback) spec:
            zip_column: str (default "zip")
            city_column: str (default "city") — accepted but not read
            probability: float (default 0.15)

        Explicit path: the zips of the distinct, in-range targeted rows that
        have a zip are rotated by a random non-zero offset. Every targeted
        row therefore ends up holding another targeted row's zip. At least
        two such rows are needed; otherwise nothing changes.
        """
        zip_column: str = spec.get("zip_column", "zip")
        row_indices: Optional[list[int]] = spec.get("row_indices")
        if row_indices is not None:
            # dict.fromkeys drops repeated indices while keeping first-seen order.
            valid = list(dict.fromkeys(
                idx for idx in row_indices
                if idx < len(data)
                and zip_column in data[idx]
                and data[idx][zip_column] is not None
            ))
            if len(valid) >= 2:
                # A rotation (rather than sequential pairwise swaps) guarantees
                # no row keeps its own zip: sequential swaps can undo each
                # other, e.g. with two rows the second swap restored both.
                zips = [data[idx][zip_column] for idx in valid]
                shift = self._rng.randint(1, len(valid) - 1)
                for pos, idx in enumerate(valid):
                    data[idx][zip_column] = zips[(pos + shift) % len(valid)]
        else:
            probability: float = spec.get("probability", 0.15)
            eligible_indices = [
                i for i, row in enumerate(data)
                if zip_column in row and row[zip_column] is not None
            ]
            if len(eligible_indices) < 2:
                return data
            for idx in eligible_indices:
                if self._rng.random() > probability:
                    continue
                swap_idx = self._rng.choice(
                    [i for i in eligible_indices if i != idx]
                )
                data[idx][zip_column], data[swap_idx][zip_column] = (
                    data[swap_idx][zip_column],
                    data[idx][zip_column],
                )
        return data

    def _corrupt_impossible_date(
        self,
        data: list[dict[str, Any]],
        spec: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Create impossible dates: future DOB, visit before birth.

        Explicit spec:
            targets: list[dict] — each dict has:
                "row_idx": int
                "field": str — date field to corrupt
                "corrupt_type": str — "future" | "visit_before_birth"
                    (default "future")
            dob_column: str (default "dob") — birth date used as the anchor
                for "visit_before_birth"
            source_format: str (default "%Y-%m-%d")
            reference_date: str in source_format, or a datetime (optional)
        Generic (fallback) spec:
            dob_column: str (default "dob")
            visit_column: str (optional)
            source_format: str (default "%Y-%m-%d")
            probability: float (default 0.1)
            reference_date: str in source_format, or a datetime (optional)

        "future" dates are 1-3650 days after ``reference_date``. When it is
        omitted, the current date is used, so those values depend on the day
        the pipeline runs. "visit_before_birth" dates are 30-3650 days
        before the row's DOB. The explicit path falls back to the field's
        own date when the target is the DOB column itself or the row has no
        DOB parseable with ``source_format``. Target cells that do not parse
        with ``source_format`` are skipped.

        Raises:
            ValueError: if ``reference_date`` is a string that does not match
                ``source_format``.
        """
        from datetime import datetime, timedelta

        source_format: str = spec.get("source_format", "%Y-%m-%d")
        dob_column: str = spec.get("dob_column", "dob")
        raw_reference = spec.get("reference_date")
        if raw_reference is None:
            reference = datetime.now()
        elif isinstance(raw_reference, str):
            reference = datetime.strptime(raw_reference, source_format)
        else:
            reference = raw_reference

        targets: Optional[list[dict[str, Any]]] = spec.get("targets")
        if targets is not None:
            for target in targets:
                row_idx: int = target["row_idx"]
                field: str = target["field"]
                corrupt_type: str = target.get("corrupt_type", "future")
                if row_idx >= len(data):
                    continue
                row = data[row_idx]
                if field not in row or row[field] is None:
                    continue
                try:
                    own_date = datetime.strptime(str(row[field]), source_format)
                except ValueError:
                    continue
                if corrupt_type == "future":
                    future_offset = self._rng.randint(1, 3650)
                    future_date = reference + timedelta(days=future_offset)
                    row[field] = future_date.strftime(source_format)
                elif corrupt_type == "visit_before_birth":
                    # Anchor on the birth date so the result really precedes
                    # birth; shifting the visit relative to itself usually
                    # left it after the DOB.
                    anchor = own_date
                    dob_raw = row.get(dob_column)
                    if field != dob_column and dob_raw is not None:
                        try:
                            anchor = datetime.strptime(str(dob_raw), source_format)
                        except ValueError:
                            pass  # unparseable DOB: keep the field's own date
                    before_birth = anchor - timedelta(
                        days=self._rng.randint(30, 3650)
                    )
                    row[field] = before_birth.strftime(source_format)
        else:
            visit_column: Optional[str] = spec.get("visit_column")
            probability: float = spec.get("probability", 0.1)
            for row in data:
                if self._rng.random() > probability:
                    continue
                if dob_column not in row or row[dob_column] is None:
                    continue
                try:
                    dob = datetime.strptime(str(row[dob_column]), source_format)
                except ValueError:
                    continue
                corruption_type = self._rng.choice(
                    ["future_dob", "visit_before_birth"]
                )
                if corruption_type == "future_dob":
                    future_offset = self._rng.randint(1, 3650)
                    future_date = reference + timedelta(days=future_offset)
                    row[dob_column] = future_date.strftime(source_format)
                elif visit_column and visit_column in row:
                    before_birth = dob - timedelta(
                        days=self._rng.randint(30, 3650)
                    )
                    row[visit_column] = before_birth.strftime(source_format)
        return data

    def _corrupt_insurance_id_mismatch(
        self,
        data: list[dict[str, Any]],
        spec: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Assign wrong prefix for insurance provider.

        Explicit spec:
            row_indices: list[int] — specific rows to corrupt
            id_column: str (default "insurance_id")
            provider_column: str (default "insurance_provider")
            prefix_map: dict[str, str] (default standard map)
        Generic (fallback) spec:
            id_column: str (default "insurance_id")
            provider_column: str (default "insurance_provider")
            prefix_map: dict[str, str]
            probability: float (default 0.15)

        Does nothing when ``prefix_map`` has fewer than two entries.
        """
        id_column: str = spec.get("id_column", "insurance_id")
        provider_column: str = spec.get("provider_column", "insurance_provider")
        prefix_map: dict[str, str] = spec.get("prefix_map", {
            "BlueCross": "BC",
            "Aetna": "AE",
            "UnitedHealth": "UH",
            "Cigna": "CG",
            "Humana": "HM",
            "Kaiser": "KP",
        })
        all_prefixes = list(prefix_map.values())
        if len(all_prefixes) < 2:
            return data
        row_indices: Optional[list[int]] = spec.get("row_indices")
        if row_indices is not None:
            for idx in row_indices:
                if idx >= len(data):
                    continue
                self._swap_insurance_prefix(
                    data[idx], id_column, provider_column, prefix_map, all_prefixes
                )
        else:
            probability: float = spec.get("probability", 0.15)
            for row in data:
                if self._rng.random() > probability:
                    continue
                self._swap_insurance_prefix(
                    row, id_column, provider_column, prefix_map, all_prefixes
                )
        return data

    def _corrupt_null_inject_contextual(
        self,
        data: list[dict[str, Any]],
        spec: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Null out fields that can be inferred from context.

        Explicit spec:
            targets: list[dict] — each dict has "row_idx" and "field"
        Generic (fallback) spec:
            inferable_pairs: list[dict] — each dict has:
                "null_column": str
                "context_column": str
            probability: float (default 0.2)

        Generic path: a column is only nulled when its context column is
        present and non-None in the same row.
        """
        targets: Optional[list[dict[str, Any]]] = spec.get("targets")
        if targets is not None:
            for target in targets:
                row_idx: int = target["row_idx"]
                field: str = target["field"]
                if row_idx < len(data) and field in data[row_idx]:
                    data[row_idx][field] = None
        else:
            pairs: list[dict[str, str]] = spec.get("inferable_pairs", [])
            probability: float = spec.get("probability", 0.2)
            for row in data:
                for pair in pairs:
                    null_col = pair["null_column"]
                    context_col = pair["context_column"]
                    if null_col not in row or context_col not in row:
                        continue
                    if row[context_col] is None:
                        continue
                    if self._rng.random() > probability:
                        continue
                    row[null_col] = None
        return data

    def _corrupt_false_positive_duplicate(
        self,
        data: list[dict[str, Any]],
        spec: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Mark row pairs as false positives (NOT duplicates).

        No new rows are created. The rows already exist in the dataset.
        This handler annotates them with a ``_false_positive_pair`` marker
        so the grader knows they should NOT be merged.

        Explicit & generic spec (same format):
            pairs: list[list[int]] — pairs of row indices
            marker_column: str (default "_false_positive_pair")

        Marker values are "fp_0", "fp_1", ... numbered over the valid pairs
        only. Entries that are not exactly two indices, or that reference
        rows past the end, are skipped.
        """
        pairs: list[list[int]] = spec.get("pairs", [])
        marker_column: str = spec.get("marker_column", "_false_positive_pair")
        pair_id = 0
        for pair in pairs:
            if len(pair) != 2:
                continue
            idx_a, idx_b = pair
            if idx_a < len(data) and idx_b < len(data):
                data[idx_a][marker_column] = f"fp_{pair_id}"
                data[idx_b][marker_column] = f"fp_{pair_id}"
                pair_id += 1
        return data

    def _corrupt_address_variation(
        self,
        data: list[dict[str, Any]],
        spec: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Apply address abbreviation changes (Street -> St., etc.).

        Explicit spec:
            row_indices: list[int] — specific rows to corrupt
            column: str — address column (default "address")
        Generic (fallback) spec:
            columns: list[str]
            probability: float (default 0.4)
        """
        row_indices: Optional[list[int]] = spec.get("row_indices")
        if row_indices is not None:
            cells = self._indexed_cells(data, row_indices, spec.get("column", "address"))
        else:
            cells = self._sampled_cells(
                data, spec.get("columns", []), spec.get("probability", 0.4)
            )
        for row, col in cells:
            row[col] = self._apply_address_variation(str(row[col]))
        return data

    # ------------------------------------------------------------------
    # Cell selection helpers
    # ------------------------------------------------------------------

    def _sampled_cells(
        self,
        data: list[dict[str, Any]],
        columns: list[str],
        probability: float,
    ):
        """Yield ``(row, column)`` for present, non-None cells passing a coin flip.

        One ``random()`` draw is made per present, non-None cell, rows in
        order and columns in the given order. Because this is a generator,
        the caller processes each yielded cell (possibly drawing more random
        numbers) before the next draw. RNG consumption therefore interleaves
        exactly like a hand-written nested loop.
        """
        for row in data:
            for col in columns:
                if col not in row or row[col] is None:
                    continue
                if self._rng.random() > probability:
                    continue
                yield row, col

    @staticmethod
    def _target_cells(
        data: list[dict[str, Any]],
        targets: list[dict[str, Any]],
    ):
        """Yield ``(row, field)`` for each target dict naming an existing, non-None cell.

        Each target must have "row_idx" and "field" keys (KeyError otherwise).
        Targets whose row index is past the end are skipped.
        """
        for target in targets:
            row_idx: int = target["row_idx"]
            field: str = target["field"]
            if row_idx >= len(data):
                continue
            row = data[row_idx]
            if field not in row or row[field] is None:
                continue
            yield row, field

    @staticmethod
    def _indexed_cells(
        data: list[dict[str, Any]],
        row_indices: list[int],
        column: str,
    ):
        """Yield ``(row, column)`` for each listed row that has a non-None *column*.

        Indices past the end of *data* are skipped.
        """
        for idx in row_indices:
            if idx >= len(data):
                continue
            row = data[idx]
            if column not in row or row[column] is None:
                continue
            yield row, column

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _digits(value: str) -> str:
        """Return only the decimal-digit characters of *value*."""
        return "".join(ch for ch in value if ch.isdigit())

    @staticmethod
    def _has_letters(value: str) -> bool:
        """Return True if *value* contains at least one alphabetic character."""
        return any(ch.isalpha() for ch in value)

    def _random_phone_format(self, digits: str) -> str:
        """Render a 10-digit string in one randomly chosen phone layout."""
        area, exchange, line = digits[:3], digits[3:6], digits[6:]
        return self._rng.choice([
            f"({area}) {exchange}-{line}",
            f"{area}-{exchange}-{line}",
            f"{area}.{exchange}.{line}",
            digits,
            f"+1{digits}",
            f"1-{area}-{exchange}-{line}",
        ])

    def _cluster_noise(self, value: str) -> str:
        """Perturb one field of an explicit-path cluster duplicate.

        Order of attempts:
        1. A first-name variant, if it changes the value.
        2. Otherwise, values containing letters get address-abbreviation
           changes.
        3. Otherwise, 10-digit values are re-rendered in a random phone
           layout.
        4. Otherwise, other numeric values are stripped to their digits.
        Letters are checked before digits because addresses almost always
        carry a house number. Checking digits first collapsed
        "123 Main Street" to "123".
        """
        variant = self._apply_name_variant(value)
        if variant != value:
            return variant
        if self._has_letters(value):
            return self._apply_address_variation(value)
        digits = self._digits(value)
        if len(digits) == 10:
            return self._random_phone_format(digits)
        if digits:
            return digits
        return value

    def _apply_char_swap(self, value: str, mode: str) -> str:
        """Apply a single char-swap or visual-similar corruption.

        ``mode == "swap"`` swaps two adjacent, *different* characters. Any
        other mode except "both" substitutes one look-alike character from
        VISUAL_SIMILAR. ``"both"`` picks one strategy at random and falls
        back to the other when the first cannot change the value. The value
        is returned unchanged when no applicable position exists.
        """
        chosen_mode = mode
        if mode == "both":
            chosen_mode = self._rng.choice(["swap", "visual"])
        if chosen_mode == "swap":
            result = self._swap_adjacent(value)
            if result == value and mode == "both":
                result = self._visual_substitute(value)
        else:
            result = self._visual_substitute(value)
            if result == value and mode == "both":
                result = self._swap_adjacent(value)
        return result

    def _swap_adjacent(self, value: str) -> str:
        """Swap one random pair of adjacent, differing characters."""
        # Swapping equal neighbours (e.g. the "ll" in "William") is a no-op,
        # so only positions where the pair differs are candidates.
        positions = [
            i for i in range(len(value) - 1) if value[i] != value[i + 1]
        ]
        if not positions:
            return value
        i = self._rng.choice(positions)
        return value[:i] + value[i + 1] + value[i] + value[i + 2:]

    def _visual_substitute(self, value: str) -> str:
        """Replace one random character that has a VISUAL_SIMILAR look-alike."""
        positions = [i for i, ch in enumerate(value) if ch in VISUAL_SIMILAR]
        if not positions:
            return value
        i = self._rng.choice(positions)
        return value[:i] + VISUAL_SIMILAR[value[i]] + value[i + 1:]

    def _apply_case_corrupt(self, value: str) -> str:
        """Lower-case, upper-case, or randomly mix the case of *value*."""
        case_fn = self._rng.choice(["lower", "upper", "mixed"])
        if case_fn == "lower":
            return value.lower()
        elif case_fn == "upper":
            return value.upper()
        else:
            return "".join(
                c.upper() if self._rng.random() > 0.5 else c.lower()
                for c in value
            )

    def _apply_name_variant(self, name: str) -> str:
        """Replace a first name with a known variant if one exists.

        The first whitespace-separated token is looked up in NAME_VARIANTS,
        first exactly and then case-insensitively. If no variant exists and
        the token is longer than one character, its first character is
        randomly lower- or upper-cased (which may leave it unchanged).
        Tokens are re-joined with single spaces.
        """
        parts = name.split()
        if not parts:
            return name
        first = parts[0]
        # Check if the first name has known variants
        if first in NAME_VARIANTS:
            parts[0] = self._rng.choice(NAME_VARIANTS[first])
            return " ".join(parts)
        # Also try case-insensitive match
        for canonical, variants in NAME_VARIANTS.items():
            if first.lower() == canonical.lower():
                parts[0] = self._rng.choice(variants)
                return " ".join(parts)
        # No variant: perturb the case of the first character instead
        if len(first) > 1:
            if self._rng.random() < 0.5:
                parts[0] = first[0].lower() + first[1:]
            else:
                parts[0] = first[0].upper() + first[1:]
            return " ".join(parts)
        return name

    def _apply_address_variation(self, address: str) -> str:
        """Abbreviate or expand street-address terms in *address*.

        Walks ADDRESS_ABBREVIATIONS in order. If an entry's full form (e.g.
        "Street") appears as a whole word, its first occurrence is replaced
        by a randomly chosen abbreviation. Otherwise the first of its
        abbreviations found as a whole word is expanded back to the full
        form with probability 0.5.

        Matching is whole-word and case-sensitive. Plain substring matching
        rewrote parts of other words ("Stone" -> "Streetone"). It also
        re-expanded abbreviations produced earlier in the same pass
        ("Main St." -> "Main Southt.").
        """
        import re

        def whole_word(term: str) -> str:
            # Lookarounds instead of \b because terms may end in "." or "#".
            return rf"(?<!\w){re.escape(term)}(?!\w)"

        result = address
        for full_form, abbreviations in ADDRESS_ABBREVIATIONS.items():
            full_pattern = whole_word(full_form)
            if re.search(full_pattern, result):
                replacement = self._rng.choice(abbreviations)
                # A callable replacement is inserted literally (no escapes).
                result = re.sub(
                    full_pattern, lambda _m, r=replacement: r, result, count=1
                )
            else:
                # Check if any abbreviation is present and expand it
                for abbr in abbreviations:
                    abbr_pattern = whole_word(abbr)
                    if re.search(abbr_pattern, result):
                        if self._rng.random() < 0.5:
                            result = re.sub(
                                abbr_pattern,
                                lambda _m, f=full_form: f,
                                result,
                                count=1,
                            )
                        break
        return result

    def _swap_insurance_prefix(
        self,
        row: dict[str, Any],
        id_column: str,
        provider_column: str,
        prefix_map: dict[str, str],
        all_prefixes: list[str],
    ) -> None:
        """Swap the insurance ID prefix to a wrong one (in-place).

        Rows missing either column, holding None, or whose provider is not
        in *prefix_map* are left untouched. If the ID does not start with
        the provider's correct prefix, the wrong prefix is prepended.
        """
        if id_column not in row or provider_column not in row:
            return
        if row[id_column] is None or row[provider_column] is None:
            return
        provider = str(row[provider_column])
        correct_prefix = prefix_map.get(provider)
        if correct_prefix is None:
            return
        wrong_prefixes = [p for p in all_prefixes if p != correct_prefix]
        if not wrong_prefixes:
            return
        wrong_prefix = self._rng.choice(wrong_prefixes)
        current_id = str(row[id_column])
        if current_id.startswith(correct_prefix):
            row[id_column] = wrong_prefix + current_id[len(correct_prefix):]
        else:
            row[id_column] = wrong_prefix + current_id
def generate_dirty_data(
    clean_data: list[dict[str, Any]],
    corruptions: list[dict[str, Any]],
    seed: int,
) -> list[dict[str, Any]]:
    """Run the seeded corruption pipeline over *clean_data*.

    Convenience wrapper: builds a ``DataCorruptor`` for *seed* and delegates
    to its ``corrupt`` method.

    Args:
        clean_data: Rows of the clean dataset, one dict per row.
        corruptions: Corruption specs. Each needs a ``"type"`` key naming a
            DataCorruptor handler, without the ``_corrupt_`` prefix.
        seed: Seed for the pipeline's random generator.

    Returns:
        A new list of row dicts with the corruptions applied. Duplicating
        handlers may add rows. *clean_data* itself is left untouched.
    """
    pipeline = DataCorruptor(seed=seed)
    dirty_rows = pipeline.corrupt(clean_data, corruptions)
    return dirty_rows