Spaces:
Sleeping
Sleeping
| """ | |
| Grader for DataWranglerEnv. | |
| Multi-dimensional scoring system that compares the agent's cleaned dataset | |
| against the ground truth. Produces deterministic scores clamped to [0.001, 0.999]. | |
| Dimensions: | |
| - Missing values fixed (20%) | |
| - Duplicates removed (15%) | |
| - Type correctness (15%) | |
| - Value accuracy (20%) | |
| - Data preservation (10%) | |
| - Constraint compliance (10%) — NEW: business rule satisfaction | |
| - Step efficiency (5%) — NEW: fewer steps = better | |
| - Golden row integrity (5%) — NEW: anti-exploit mechanism | |
| """ | |
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
# Relative weight of each scoring dimension in the composite; the weights sum to 1.0.
_DIMENSION_WEIGHTS = {
    "missing_fixed": 0.20,
    "duplicates_removed": 0.15,
    "type_correctness": 0.15,
    "value_accuracy": 0.20,
    "data_preservation": 0.10,
    "constraint_compliance": 0.10,
    "step_efficiency": 0.05,
    "golden_row_integrity": 0.05,
}

# Score given to constraint compliance when the manifest declares no rules.
_NO_RULES_COMPLIANCE = 0.8

# A current row counts as a golden row when strictly more than this fraction
# of the shared columns match.
_GOLDEN_MATCH_THRESHOLD = 0.8


def _reduction_score(original_count, current_count, target_count):
    """Return the fraction of the removable surplus that has been removed.

    The surplus is ``original_count - target_count``. When there was nothing
    to remove the score is 1.0; otherwise it is capped to the range [0, 1].
    """
    fixable = original_count - target_count
    if fixable <= 0:
        return 1.0
    fixed = max(0, original_count - current_count)
    return min(1.0, fixed / fixable)


def _score_type_correctness(current_df: pd.DataFrame, clean_df: pd.DataFrame) -> float:
    """Return the fraction of clean columns whose dtype family is reproduced.

    A column matches when the dtypes are equal, both numeric, or both
    string-like. Columns missing from ``current_df`` count as mismatches.
    """
    total = len(clean_df.columns)
    if total == 0:
        return 1.0
    matches = 0
    for col in clean_df.columns:
        if col not in current_df.columns:
            continue
        clean_dtype = clean_df[col].dtype
        current_dtype = current_df[col].dtype
        both_numeric = (
            pd.api.types.is_numeric_dtype(clean_dtype)
            and pd.api.types.is_numeric_dtype(current_dtype)
        )
        both_string = (
            pd.api.types.is_string_dtype(clean_dtype)
            and pd.api.types.is_string_dtype(current_dtype)
        )
        if clean_dtype == current_dtype or both_numeric or both_string:
            matches += 1
    return matches / total


def _normalize_cell(value: Any) -> str:
    """Return the case- and whitespace-insensitive text form used for comparisons."""
    return str(value).strip().lower()


def _cell_credit(clean_val: Any, current_val: Any) -> float:
    """Return the credit (0.0, 0.8 or 1.0) for one cell compared to ground truth.

    Full credit when both cells are missing or their normalized text is
    equal; 0.8 when both parse as floats within 1% relative error of a
    non-zero clean value; otherwise nothing.
    """
    clean_na = pd.isna(clean_val)
    current_na = pd.isna(current_val)
    if clean_na and current_na:
        return 1.0
    if clean_na or current_na:
        return 0.0
    if _normalize_cell(clean_val) == _normalize_cell(current_val):
        return 1.0
    try:
        expected = float(clean_val)
        actual = float(current_val)
    except (ValueError, TypeError):
        return 0.0
    if expected != 0 and abs(expected - actual) / abs(expected) < 0.01:
        return 0.8
    return 0.0


def _score_value_accuracy(current_df: pd.DataFrame, clean_df: pd.DataFrame) -> float:
    """Return the mean per-cell credit over shared columns and leading rows.

    Rows are aligned by position: the first ``min(len(current), len(clean))``
    rows of each frame are compared after resetting their indexes.
    """
    try:
        n_rows = min(len(current_df), len(clean_df))
        shared_cols = [c for c in clean_df.columns if c in current_df.columns]
        if not shared_cols or n_rows == 0:
            return 0.0
        clean_part = clean_df[shared_cols].head(n_rows).reset_index(drop=True)
        current_part = current_df[shared_cols].head(n_rows).reset_index(drop=True)
        credit = 0.0
        for col in shared_cols:
            for row in range(n_rows):
                credit += _cell_credit(clean_part.at[row, col], current_part.at[row, col])
        return credit / (n_rows * len(shared_cols))
    except Exception:
        # Deliberate best-effort: a frame that cannot be compared cell-by-cell
        # (e.g. unexpected cell types) scores zero instead of aborting grading.
        return 0.0


def _score_data_preservation(current_df: pd.DataFrame, clean_df: pd.DataFrame) -> float:
    """Return the kept-row ratio relative to the clean row count, capped at 1.0."""
    expected_rows = len(clean_df)
    current_rows = len(current_df)
    if current_rows >= expected_rows:
        return 1.0
    return current_rows / expected_rows


def _rule_compliance(rule: Dict[str, Any], current_df: pd.DataFrame) -> float:
    """Return the fraction of applicable rows that satisfy one business rule.

    Unknown rule types, missing columns, and rules with no applicable rows
    contribute 0.
    """
    rule_type = rule.get("type", "")

    if rule_type == "cross_column":
        col_a = rule.get("column_a", "")
        col_b = rule.get("column_b", "")
        if col_a not in current_df.columns or col_b not in current_df.columns:
            return 0.0
        a = pd.to_numeric(current_df[col_a], errors="coerce")
        b = pd.to_numeric(current_df[col_b], errors="coerce")
        comparable = a.notna() & b.notna()
        if comparable.sum() == 0:
            return 0.0
        # The rule holds where column_a is strictly greater than column_b.
        return (a[comparable] > b[comparable]).mean()

    col = rule.get("column", "")
    if col not in current_df.columns:
        return 0.0

    if rule_type == "range":
        lo = rule.get("min", float("-inf"))
        hi = rule.get("max", float("inf"))
        values = pd.to_numeric(current_df[col], errors="coerce").dropna()
        if len(values) == 0:
            return 0.0
        return ((values >= lo) & (values <= hi)).mean()

    if rule_type == "not_null":
        return 1.0 - (current_df[col].isna().sum() / max(1, len(current_df)))

    if rule_type == "categorical":
        allowed = set(rule.get("allowed_values", []))
        values = current_df[col].dropna()
        if not allowed or len(values) == 0:
            return 0.0
        return values.isin(allowed).mean()

    if rule_type == "pattern":
        pattern = rule.get("pattern", "")
        values = current_df[col].dropna().astype(str)
        if not pattern or len(values) == 0:
            return 0.0
        return values.str.match(pattern).mean()

    return 0.0


def _score_constraint_compliance(
    current_df: pd.DataFrame, business_rules: List[Dict[str, Any]]
) -> float:
    """Return the mean compliance over all business rules, or 0.8 when there are none."""
    if not business_rules:
        return _NO_RULES_COMPLIANCE
    satisfied = sum(_rule_compliance(rule, current_df) for rule in business_rules)
    return satisfied / len(business_rules)


def _score_step_efficiency(step_count: int, max_steps: int) -> float:
    """Return a score in [0.1, 1.0] that shrinks as more of the step budget is used.

    Returns a neutral 0.5 when either count is not positive.
    """
    if step_count <= 0 or max_steps <= 0:
        return 0.5
    efficiency = 1.0 - (step_count / max_steps)
    return max(0.1, min(1.0, efficiency * 1.5))


def _row_matches_golden(row: pd.Series, expected: List[Tuple[Any, Any]]) -> bool:
    """Return True when ``row`` matches the golden values on more than 80% of columns.

    ``expected`` holds ``(column, normalized_text)`` pairs, with ``None``
    standing for a missing golden value.
    """
    if not expected:
        return False
    hits = 0
    for col, wanted in expected:
        actual = row[col]
        actual_missing = pd.isna(actual)
        if wanted is None:
            if actual_missing:
                hits += 1
        elif not actual_missing and _normalize_cell(actual) == wanted:
            hits += 1
    return hits / len(expected) > _GOLDEN_MATCH_THRESHOLD


def _score_golden_rows(
    current_df: pd.DataFrame, clean_df: pd.DataFrame, golden_indices: List[int]
) -> float:
    """Return the fraction of golden rows still (approximately) present.

    With no golden rows configured the score is 1.0. An empty ``current_df``
    scores 0.0, because deleting every row destroys every golden row.
    Indices beyond the end of ``clean_df`` count as not preserved.
    """
    if not golden_indices:
        return 1.0
    if len(current_df) == 0:
        return 0.0

    shared_cols = [c for c in clean_df.columns if c in current_df.columns]
    preserved = 0
    for gi in golden_indices:
        if gi >= len(clean_df):
            continue
        golden_row = clean_df.iloc[gi]
        # Normalize the golden values once instead of once per candidate row.
        expected = [
            (col, None if pd.isna(golden_row[col]) else _normalize_cell(golden_row[col]))
            for col in shared_cols
        ]
        if any(_row_matches_golden(row, expected) for _, row in current_df.iterrows()):
            preserved += 1
    return preserved / len(golden_indices)


def compute_score(
    dirty_df: pd.DataFrame,
    current_df: pd.DataFrame,
    clean_df: pd.DataFrame,
    original_dirty_df: pd.DataFrame,
    issue_manifest: Dict[str, Any],
    step_count: int = 0,
    max_steps: int = 30,
    golden_indices: Optional[List[int]] = None,
) -> Tuple[float, Dict[str, float]]:
    """Compute the composite data quality score.

    Args:
        dirty_df: The original dirty dataset (snapshot at reset). Accepted for
            interface compatibility; the scoring below does not read it.
        current_df: The agent's current working dataset.
        clean_df: The ground truth clean dataset.
        original_dirty_df: The very first dirty state; baseline for the
            missing-value and duplicate dimensions.
        issue_manifest: Manifest of injected issues. The keys read here are
            ``"business_rules"`` and, as a fallback, ``"golden_indices"``.
        step_count: Current step number.
        max_steps: Maximum allowed steps.
        golden_indices: Positional indices of golden rows in ``clean_df``.
            When ``None``, ``issue_manifest["golden_indices"]`` is used.

    Returns:
        Tuple of ``(composite_score, dimension_scores)``. The composite is the
        weighted sum of the eight dimension scores, clamped to
        [0.001, 0.999].
    """
    if golden_indices is None:
        golden_indices = issue_manifest.get("golden_indices", [])

    scores = {
        # Dimension 1: Missing Values Fixed (20%)
        "missing_fixed": _reduction_score(
            original_dirty_df.isnull().sum().sum(),
            current_df.isnull().sum().sum(),
            clean_df.isnull().sum().sum(),
        ),
        # Dimension 2: Duplicates Removed (15%)
        "duplicates_removed": _reduction_score(
            original_dirty_df.duplicated().sum(),
            current_df.duplicated().sum(),
            clean_df.duplicated().sum(),
        ),
        # Dimension 3: Type Correctness (15%)
        "type_correctness": _score_type_correctness(current_df, clean_df),
        # Dimension 4: Value Accuracy (20%)
        "value_accuracy": _score_value_accuracy(current_df, clean_df),
        # Dimension 5: Data Preservation (10%)
        "data_preservation": _score_data_preservation(current_df, clean_df),
        # Dimension 6: Constraint Compliance (10%)
        "constraint_compliance": _score_constraint_compliance(
            current_df, issue_manifest.get("business_rules", [])
        ),
        # Dimension 7: Step Efficiency (5%)
        "step_efficiency": _score_step_efficiency(step_count, max_steps),
        # Dimension 8: Golden Row Integrity (5%)
        "golden_row_integrity": _score_golden_rows(current_df, clean_df, golden_indices),
    }

    composite = sum(scores[dim] * weight for dim, weight in _DIMENSION_WEIGHTS.items())
    # Keep the composite strictly inside (0, 1).
    composite = max(0.001, min(0.999, composite))
    return composite, scores
# Read-only inspection commands; each earns a small fixed reward.
_DIAGNOSTIC_COMMANDS = frozenset({
    "help", "view", "profile", "profile_column", "find_missing",
    "find_duplicates", "find_outliers", "check_rules", "history",
})


def compute_step_reward(
    before_df: pd.DataFrame,
    after_df: pd.DataFrame,
    clean_df: pd.DataFrame,
    original_dirty_df: pd.DataFrame,
    issue_manifest: Dict[str, Any],
    command: str,
    data_modified: bool,
) -> float:
    """Compute the reward for a single step.

    Args:
        before_df: Working dataset before the command ran.
        after_df: Working dataset after the command ran.
        clean_df: The ground truth clean dataset.
        original_dirty_df: The very first dirty state, used as the scoring baseline.
        issue_manifest: Manifest of injected issues, forwarded to ``compute_score``.
        command: The raw command line; only its first whitespace-separated
            token, lower-cased, is inspected. ``None`` or a blank string is
            treated as an empty command.
        data_modified: Whether the command changed the dataset.

    Returns:
        A fixed reward for the non-mutating cases (0.02 for diagnostics,
        0.01 for ``validate`` and ``undo``, 0.001 for ``submit``, -0.01 for
        any other command that changed nothing). Otherwise a reward derived
        from the change in composite score: at most 0.15 for an improvement,
        down to -0.20 for a drop of more than 0.01, and 0.001 for a change
        between -0.01 and 0.
    """
    # str.split() with no separator ignores surrounding whitespace, so a
    # single tokenisation covers both the blank-command check and the lookup.
    tokens = command.split() if isinstance(command, str) else []
    cmd = tokens[0].lower() if tokens else ""

    if cmd in _DIAGNOSTIC_COMMANDS:
        return 0.02
    if cmd in ("validate", "undo"):
        return 0.01
    # The final score for a submission is computed separately.
    if cmd == "submit":
        return 0.001
    if not data_modified:
        return -0.01

    def _composite(df: pd.DataFrame) -> float:
        score, _ = compute_score(
            dirty_df=original_dirty_df,
            current_df=df,
            clean_df=clean_df,
            original_dirty_df=original_dirty_df,
            issue_manifest=issue_manifest,
        )
        return score

    # Data-modifying command: reward the change in composite score.
    improvement = _composite(after_df) - _composite(before_df)
    if improvement > 0:
        return min(0.15, improvement * 2.0 + 0.03)
    if improvement < -0.01:
        return max(-0.20, improvement * 2.0)
    return 0.001