Spaces:
Running
Running
| """Metrics for scoring a predicted cleaning plan against gold + clean reference.""" | |
| from __future__ import annotations | |
| import math | |
| import pandas as pd | |
| from jsonschema import Draft202012Validator | |
| from scrubdata.executor import apply_plan | |
# Plan schema (validity gate).
# Extra keys are tolerated everywhere (no "additionalProperties": false);
# the overall shape and the operation names are what get enforced.
OP_NAMES = [
    # text clean-up
    "strip_whitespace",
    "collapse_internal_whitespace",
    "normalize_punctuation",
    "fix_encoding",
    "resolve_by_majority",
    "normalize_disguised_nulls",
    # parsing / standardization
    "standardize_case",
    "parse_currency",
    "parse_number",
    "parse_percent",
    "parse_date",
    "standardize_boolean",
    "standardize_phone",
    "normalize_email",
    "canonicalize_categories",
    # dropping rows / columns / duplicates
    "drop_empty_rows",
    "drop_empty_columns",
    "drop_exact_duplicates",
    # PII handling
    "flag_pii",
    "mask_pii",
    "hash_pii",
    "pseudonymize_pii",
]

PLAN_SCHEMA = {
    "type": "object",
    "required": ["table_operations", "columns"],
    "properties": {
        "dataset_summary": {"type": "string"},
        # Each table operation is an object whose "op" is a known name.
        "table_operations": {
            "type": "array",
            "items": {
                "type": "object",
                "required": ["op"],
                "properties": {"op": {"enum": OP_NAMES}},
            },
        },
        # Each column entry names a column and lists its operations.
        "columns": {
            "type": "array",
            "items": {
                "type": "object",
                "required": ["name", "operations"],
                "properties": {
                    "name": {"type": "string"},
                    "operations": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "required": ["op"],
                            "properties": {"op": {"enum": OP_NAMES}},
                        },
                    },
                },
            },
        },
        "flags": {"type": "array"},
    },
}
# Built once at import time so the schema is not re-wrapped on every call.
_VALIDATOR = Draft202012Validator(PLAN_SCHEMA)


def is_valid(plan: dict) -> bool:
    """Return True when `plan` conforms to PLAN_SCHEMA, False otherwise."""
    conforms = _VALIDATOR.is_valid(plan)
    return conforms
| # --- feature extraction for set-based F1 ------------------------------------- | |
def op_pairs(plan: dict) -> set:
    """Collect the (target, op) identity pairs used for plan F1.

    Table operations are recorded under the pseudo-column ``"<table>"``.
    Column operations whose name contains ``"pii"`` are left out: they are
    orthogonal to the cleaning gold (which predates PII support) and would
    unfairly penalize planners that flag sensitive columns. The filter is
    applied to column operations only.
    """
    pairs = set()
    for table_op in plan.get("table_operations", []):
        pairs.add(("<table>", table_op["op"]))
    for column in plan.get("columns", []):
        for operation in column.get("operations", []):
            if "pii" in operation.get("op", ""):
                continue
            pairs.add((column["name"], operation["op"]))
    return pairs
def canon_pairs(plan: dict) -> set:
    """Collect (column, raw value, canonical value) triples from a plan.

    Only ``canonicalize_categories`` operations contribute; each entry of
    such an operation's ``mapping`` becomes one triple. An operation with no
    ``mapping`` contributes nothing.
    """
    return {
        (column["name"], raw, canon)
        for column in plan.get("columns", [])
        for operation in column.get("operations", [])
        if operation["op"] == "canonicalize_categories"
        for raw, canon in operation.get("mapping", {}).items()
    }
| def _prf(tp: int, fp: int, fn: int) -> dict: | |
| p = tp / (tp + fp) if tp + fp else 0.0 | |
| r = tp / (tp + fn) if tp + fn else 0.0 | |
| f = 2 * p * r / (p + r) if p + r else 0.0 | |
| return {"p": p, "r": r, "f1": f} | |
| # --- end-to-end recovery ----------------------------------------------------- | |
| def _cell_equal(a, b) -> bool: | |
| am = a is None or (isinstance(a, float) and math.isnan(a)) or pd.isna(a) | |
| bm = b is None or (isinstance(b, float) and math.isnan(b)) or pd.isna(b) | |
| if am or bm: | |
| return am and bm | |
| try: | |
| fa, fb = float(a), float(b) | |
| # strings like "Nan" (a person's name) parse to float NaN, which is | |
| # unequal to itself under isclose — fall through to string equality | |
| if not (math.isnan(fa) or math.isnan(fb)): | |
| return math.isclose(fa, fb, rel_tol=1e-6, abs_tol=1e-6) | |
| except (TypeError, ValueError): | |
| pass | |
| return str(a) == str(b) | |
def recovery(clean_df: pd.DataFrame, dirty_df: pd.DataFrame, plan: dict) -> float:
    """Fraction of clean-reference cells recovered by executing `plan` on `dirty_df`.

    The plan is run through `apply_plan`; if that raises, the score is 0.0.
    Cells are compared position by position (same column label, same row
    offset) using `_cell_equal`. The denominator is the number of cells in
    `clean_df` (or 1 when it has none), so reference columns missing from
    the result and reference rows beyond its length count as unrecovered.
    """
    try:
        cleaned, _ = apply_plan(dirty_df, plan)
    except Exception:
        # A plan that cannot be executed recovers nothing.
        return 0.0

    denominator = clean_df.size or 1
    overlap = min(len(cleaned), len(clean_df))

    # Columns absent from the cleaned frame are skipped, so all of their
    # cells stay unrecovered.
    shared_columns = [name for name in clean_df.columns if name in cleaned.columns]

    hits = 0
    for name in shared_columns:
        expected_cells = clean_df[name].tolist()[:overlap]
        produced_cells = cleaned[name].tolist()[:overlap]
        hits += sum(
            1
            for expected, produced in zip(expected_cells, produced_cells)
            if _cell_equal(expected, produced)
        )
    return hits / denominator