| from __future__ import annotations |
|
|
| import re |
| from dataclasses import dataclass |
| from typing import Dict, List, Tuple, Optional |
|
|
| import pandas as pd |
| from dateutil import parser as dtparser |
| from rapidfuzz import process, fuzz |
|
|
| WORD_NUMS = { |
| "zero": 0, "one": 1, "two": 2, "three": 3, "four": 4, "five": 5, |
| "six": 6, "seven": 7, "eight": 8, "nine": 9, "ten": 10, |
| "eleven": 11, "twelve": 12, "thirteen": 13, "fourteen": 14, |
| "fifteen": 15, "sixteen": 16, "seventeen": 17, "eighteen": 18, "nineteen": 19, |
| "twenty": 20, "thirty": 30, "forty": 40, "fifty": 50, "sixty": 60 |
| } |
|
|
| DEPT_CANON = { |
| "ai": "Artificial Intelligence", |
| "artificial intelligence": "Artificial Intelligence", |
| "ai/ml": "AI/ML", |
| "ml": "Machine Learning", |
| "machine learning": "Machine Learning", |
| "data science": "Data Science", |
| "datascience": "Data Science", |
| } |
|
|
| LOCATION_CANON = { |
| "nyc": "New York", |
| "new york": "New York", |
| "san francisco": "San Francisco", |
| "chicago": "Chicago", |
| "seattle": "Seattle", |
| "boston": "Boston", |
| } |
|
|
| EMAIL_RE = re.compile(r"^[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}$") |
|
|
| @dataclass |
| class CleaningReport: |
| rows: int |
| fixes: Dict[str, int] |
| warnings: List[str] |
|
|
| def _clean_name(s: str) -> Optional[str]: |
| if s is None or pd.isna(s): |
| return None |
| s = str(s).strip() |
| s = re.sub(r"\s+", " ", s) |
| s = " ".join([w.capitalize() for w in s.split()]) |
| return s if s else None |
|
|
| def _parse_word_number(s: str) -> Optional[int]: |
| if s is None: |
| return None |
| t = str(s).strip().lower() |
| if t == "": |
| return None |
| if re.fullmatch(r"\d+", t): |
| return int(t) |
| parts = re.split(r"[\s\-]+", t) |
| total = 0 |
| matched = False |
| for p in parts: |
| if p in WORD_NUMS: |
| total += WORD_NUMS[p] |
| matched = True |
| else: |
| return None |
| return total if matched else None |
|
|
| def _clean_email(s: str) -> Tuple[str, bool]: |
| if s is None: |
| return "unknown@unknown.com", True |
| t = str(s).strip().lower() |
| if t == "": |
| return "unknown@unknown.com", True |
| t = t.replace(" at ", "@").replace(" dot ", ".") |
| t = t.replace("..", ".") |
| t = re.sub(r"@{2,}", "@", t) |
| if EMAIL_RE.match(t): |
| return t, (t != str(s).strip().lower()) |
| if "@" in t and "." not in t.split("@", 1)[1]: |
| return t, False |
| return t, False |
|
|
| def _clean_salary(s: str) -> Tuple[Optional[float], bool]: |
| if s is None: |
| return None, False |
| t = str(s).strip() |
| if t == "" or t.lower() == "nan": |
| return None, False |
| t2 = re.sub(r"[,$]", "", t) |
| t2 = re.sub(r"usd", "", t2, flags=re.I).strip() |
| try: |
| return float(t2), (t2 != t) |
| except ValueError: |
| return None, False |
|
|
| def _clean_date(s: str) -> Tuple[Optional[str], bool]: |
| if s is None: |
| return None, False |
| t = str(s).strip() |
| if t == "" or t.lower() == "nan": |
| return None, False |
| try: |
| dt = dtparser.parse(t, dayfirst=False, fuzzy=True) |
| iso = dt.date().isoformat() |
| return iso, (iso != t) |
| except Exception: |
| return None, False |
|
|
| def _canon_from_map(value: str, mapping: Dict[str, str], threshold: int = 90) -> Tuple[str, bool]: |
| raw = (value or "").strip() |
| if raw == "": |
| return raw, False |
| key = raw.lower() |
| if key in mapping: |
| canon = mapping[key] |
| return canon, canon != raw |
| match = process.extractOne(key, mapping.keys(), scorer=fuzz.ratio) |
| if match and match[1] >= threshold: |
| canon = mapping[match[0]] |
| return canon, canon != raw |
| return raw, False |
|
|
| def clean_dataframe(df: pd.DataFrame) -> Tuple[pd.DataFrame, CleaningReport]: |
| out = df.copy() |
| fixes: Dict[str, int] = {} |
| warnings: List[str] = [] |
|
|
| out.columns = [c.strip() for c in out.columns] |
| col_map = {c.lower(): c for c in out.columns} |
|
|
| name_col = col_map.get("name") |
| age_col = col_map.get("age") |
| email_col = col_map.get("email") |
| salary_col = col_map.get("salary") |
| join_col = col_map.get("join_date") or col_map.get("join date") |
| dept_col = col_map.get("department") |
| perf_col = col_map.get("performance_score") or col_map.get("performance score") |
| loc_col = col_map.get("location") |
|
|
| if name_col: |
| before = out[name_col].astype(str).tolist() |
| out[name_col] = out[name_col].apply(_clean_name) |
| fixes["name_normalized"] = sum(b != a for b, a in zip(before, out[name_col].tolist())) |
|
|
| if age_col: |
| def clean_age(x): |
| if x is None: |
| return None |
| t = str(x).strip().lower() |
| if t == "": |
| return None |
| n = _parse_word_number(t) |
| if n is not None: |
| return n |
| try: |
| return int(float(t)) |
| except Exception: |
| return None |
| before = out[age_col].tolist() |
| out[age_col] = out[age_col].apply(clean_age) |
| fixes["age_parsed"] = sum(b != a for b, a in zip(before, out[age_col].tolist())) |
|
|
| if email_col: |
| before = out[email_col].tolist() |
| new_vals, changed = [], 0 |
| for v in before: |
| cleaned, did = _clean_email(v) |
| if did: |
| changed += 1 |
| new_vals.append(cleaned) |
| out[email_col] = new_vals |
| fixes["email_cleaned"] = changed |
| invalid = [e for e in out[email_col].tolist() if not EMAIL_RE.match(e)] |
| if invalid: |
| warnings.append(f"{len(invalid)} email(s) still look invalid (e.g., '{invalid[0]}').") |
|
|
| if salary_col: |
| before = out[salary_col].tolist() |
| vals, changed = [], 0 |
| for v in before: |
| s2, did = _clean_salary(v) |
| if did: |
| changed += 1 |
| vals.append(s2) |
| out[salary_col] = vals |
| fixes["salary_cleaned"] = changed |
|
|
| if join_col: |
| before = out[join_col].tolist() |
| vals, changed = [], 0 |
| for v in before: |
| d2, did = _clean_date(v) |
| if did: |
| changed += 1 |
| vals.append(d2) |
| out[join_col] = vals |
| fixes["join_date_normalized"] = changed |
|
|
| if dept_col: |
| before = out[dept_col].astype(str).tolist() |
| new, changed = [], 0 |
| for v in before: |
| canon, did = _canon_from_map(v, DEPT_CANON, threshold=90) |
| if did: |
| changed += 1 |
| new.append(canon) |
| out[dept_col] = new |
| fixes["department_standardized"] = changed |
|
|
| if loc_col: |
| before = out[loc_col].astype(str).tolist() |
| new, changed = [], 0 |
| for v in before: |
| canon, did = _canon_from_map(v, LOCATION_CANON, threshold=88) |
| if did: |
| changed += 1 |
| new.append(canon) |
| out[loc_col] = new |
| fixes["location_standardized"] = changed |
|
|
| if perf_col: |
| before = out[perf_col].tolist() |
| def clean_perf(x): |
| if x is None: |
| return None |
| t = str(x).strip().lower() |
| if t == "" or t == "nan": |
| return None |
| n = _parse_word_number(t) |
| if n is not None: |
| return float(n) |
| try: |
| return float(t) |
| except Exception: |
| return None |
| out[perf_col] = out[perf_col].apply(clean_perf) |
| fixes["performance_parsed"] = sum(b != a for b, a in zip(before, out[perf_col].tolist())) |
|
|
| return out, CleaningReport(rows=len(out), fixes=fixes, warnings=warnings) |
|
|