# Aswini-Kumar's picture
# Sync server/grader.py
# c77741a verified
"""
Grader for DataWranglerEnv.
Multi-dimensional scoring system that compares the agent's cleaned dataset
against the ground truth. Produces deterministic scores clamped to [0.001, 0.999].
Dimensions:
- Missing values fixed (20%)
- Duplicates removed (15%)
- Type correctness (15%)
- Value accuracy (20%)
- Data preservation (10%)
- Constraint compliance (10%) — NEW: business rule satisfaction
- Step efficiency (5%) — NEW: fewer steps = better
- Golden row integrity (5%) — NEW: anti-exploit mechanism
"""
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
# Relative weight of each scoring dimension in the composite score (sums to 1.0).
_DIMENSION_WEIGHTS = {
    "missing_fixed": 0.20,
    "duplicates_removed": 0.15,
    "type_correctness": 0.15,
    "value_accuracy": 0.20,
    "data_preservation": 0.10,
    "constraint_compliance": 0.10,
    "step_efficiency": 0.05,
    "golden_row_integrity": 0.05,
}


def _fraction_resolved(original_count, current_count, target_count) -> float:
    """Return the share of removable defects (missing cells / duplicate rows) that are gone.

    Returns 1.0 when the original data had nothing to fix relative to the target.
    """
    fixable = original_count - target_count
    if fixable <= 0:
        return 1.0
    resolved = max(0, original_count - current_count)
    return float(min(1.0, resolved / fixable))


def _type_correctness(current_df: pd.DataFrame, clean_df: pd.DataFrame) -> float:
    """Return the fraction of ground-truth columns whose dtype family matches.

    Columns missing from ``current_df`` count as mismatches.
    """
    total = len(clean_df.columns)
    if total == 0:
        return 1.0
    matches = 0
    for col in clean_df.columns:
        if col not in current_df.columns:
            continue
        clean_dtype = clean_df[col].dtype
        current_dtype = current_df[col].dtype
        same_family = (
            clean_dtype == current_dtype
            or (
                pd.api.types.is_numeric_dtype(clean_dtype)
                and pd.api.types.is_numeric_dtype(current_dtype)
            )
            or (
                pd.api.types.is_string_dtype(clean_dtype)
                and pd.api.types.is_string_dtype(current_dtype)
            )
            or (str(clean_dtype) == "object" and str(current_dtype) == "object")
        )
        if same_family:
            matches += 1
    return matches / total


def _cells_equal(expected: Any, actual: Any) -> bool:
    """Return True if both cells are missing, or both are present and equal as trimmed, lowercased text."""
    expected_missing = pd.isna(expected)
    actual_missing = pd.isna(actual)
    if expected_missing and actual_missing:
        return True
    if expected_missing or actual_missing:
        return False
    return str(expected).strip().lower() == str(actual).strip().lower()


def _cell_similarity(expected: Any, actual: Any) -> float:
    """Score one cell: 1.0 for a match, 0.8 for a numeric value within 1%, else 0.0."""
    expected_missing = pd.isna(expected)
    actual_missing = pd.isna(actual)
    if expected_missing and actual_missing:
        return 1.0
    if expected_missing or actual_missing:
        return 0.0
    if str(expected).strip().lower() == str(actual).strip().lower():
        return 1.0
    try:
        expected_num = float(expected)
        actual_num = float(actual)
    except (ValueError, TypeError):
        return 0.0
    # Relative tolerance is undefined for an expected value of zero, so zero
    # only ever matches through the textual comparison above.
    if expected_num != 0 and abs(expected_num - actual_num) / abs(expected_num) < 0.01:
        return 0.8
    return 0.0


def _value_accuracy(current_df: pd.DataFrame, clean_df: pd.DataFrame) -> float:
    """Return the mean per-cell similarity over shared columns, comparing rows by position.

    Only the first ``min(len(current_df), len(clean_df))`` rows are compared.
    """
    try:
        row_count = min(len(current_df), len(clean_df))
        shared_cols = [c for c in clean_df.columns if c in current_df.columns]
        if not shared_cols or row_count == 0:
            return 0.0
        clean_part = clean_df[shared_cols].head(row_count).reset_index(drop=True)
        current_part = current_df[shared_cols].head(row_count).reset_index(drop=True)
        credit = 0.0
        for col in shared_cols:
            for i in range(row_count):
                credit += _cell_similarity(clean_part.at[i, col], current_part.at[i, col])
        return float(credit / (len(shared_cols) * row_count))
    except Exception:
        # Deliberate best-effort: an un-comparable frame (e.g. duplicated
        # column labels or container-valued cells) scores zero for this
        # dimension instead of aborting the whole grading run.
        return 0.0


def _data_preservation(current_df: pd.DataFrame, clean_df: pd.DataFrame) -> float:
    """Return how much of the expected row count is still present (capped at 1.0)."""
    expected_rows = len(clean_df)
    current_rows = len(current_df)
    if current_rows >= expected_rows:
        return 1.0
    if current_rows == 0:
        return 0.0
    return max(0.0, current_rows / expected_rows)


def _rule_compliance(current_df: pd.DataFrame, rule: Dict[str, Any]) -> float:
    """Return the fraction of applicable values in ``current_df`` that satisfy one rule.

    Unknown rule types, rules whose column is missing, and rules with no
    applicable (non-null / numeric) values score 0.0.
    """
    rule_type = rule.get("type", "")

    if rule_type == "cross_column":
        col_a = rule.get("column_a", "")
        col_b = rule.get("column_b", "")
        if col_a not in current_df.columns or col_b not in current_df.columns:
            return 0.0
        a = pd.to_numeric(current_df[col_a], errors="coerce")
        b = pd.to_numeric(current_df[col_b], errors="coerce")
        comparable = a.notna() & b.notna()
        if comparable.sum() > 0:
            return float((a[comparable] > b[comparable]).mean())
        return 0.0

    if rule_type not in ("range", "not_null", "categorical", "pattern"):
        return 0.0
    col = rule.get("column", "")
    if col not in current_df.columns:
        return 0.0
    series = current_df[col]

    if rule_type == "range":
        lo = rule.get("min")
        hi = rule.get("max")
        # A bound that is absent or explicitly None means "unbounded".
        lo = float("-inf") if lo is None else lo
        hi = float("inf") if hi is None else hi
        valid = pd.to_numeric(series, errors="coerce").dropna()
        if len(valid) > 0:
            return float(((valid >= lo) & (valid <= hi)).mean())
        return 0.0

    if rule_type == "not_null":
        return float(1.0 - (series.isna().sum() / max(1, len(current_df))))

    if rule_type == "categorical":
        allowed = set(rule.get("allowed_values", []))
        if allowed:
            present = series.dropna()
            if len(present) > 0:
                return float(present.isin(allowed).mean())
        return 0.0

    # rule_type == "pattern"
    pattern = rule.get("pattern", "")
    if pattern:
        present = series.dropna().astype(str)
        if len(present) > 0:
            return float(present.str.match(pattern).mean())
    return 0.0


def _constraint_compliance(current_df: pd.DataFrame, issue_manifest: Dict[str, Any]) -> float:
    """Return the mean compliance over the manifest's business rules (0.8 if there are none)."""
    business_rules = issue_manifest.get("business_rules", [])
    if not business_rules:
        return 0.8  # Default for tasks without rules
    satisfied = sum(_rule_compliance(current_df, rule) for rule in business_rules)
    return float(satisfied / len(business_rules))


def _step_efficiency(step_count: int, max_steps: int) -> float:
    """Return a score in [0.1, 1.0] that falls as more of the step budget is used.

    Returns a neutral 0.5 when no step has been taken or ``max_steps`` is not positive.
    """
    if step_count > 0 and max_steps > 0:
        efficiency = 1.0 - (step_count / max_steps)
        return max(0.1, min(1.0, efficiency * 1.5))
    return 0.5


def _row_resembles(golden_row: pd.Series, candidate: pd.Series, shared_cols: List[Any]) -> bool:
    """Return True if more than 80% of the shared columns match between the two rows."""
    if not shared_cols:
        return False
    hits = sum(1 for col in shared_cols if _cells_equal(golden_row[col], candidate[col]))
    return hits / len(shared_cols) > 0.8


def _golden_row_integrity(
    current_df: pd.DataFrame,
    clean_df: pd.DataFrame,
    golden_indices: List[int],
) -> float:
    """Return the fraction of golden rows that still (approximately) exist in ``current_df``.

    Golden indices beyond the end of ``clean_df`` count as not found.
    """
    if not golden_indices:
        return 1.0
    if len(current_df) == 0:
        # Every golden row has been destroyed. Awarding full marks here would
        # let an agent bypass this anti-exploit check by deleting all rows.
        return 0.0
    shared_cols = [c for c in clean_df.columns if c in current_df.columns]
    surviving = 0
    for gi in golden_indices:
        if gi >= len(clean_df):
            continue
        golden_row = clean_df.iloc[gi]
        if any(
            _row_resembles(golden_row, row, shared_cols)
            for _, row in current_df.iterrows()
        ):
            surviving += 1
    return surviving / len(golden_indices)


def compute_score(
    dirty_df: pd.DataFrame,
    current_df: pd.DataFrame,
    clean_df: pd.DataFrame,
    original_dirty_df: pd.DataFrame,
    issue_manifest: Dict[str, Any],
    step_count: int = 0,
    max_steps: int = 30,
    golden_indices: Optional[List[int]] = None,
) -> Tuple[float, Dict[str, float]]:
    """Compute the composite data quality score.

    Args:
        dirty_df: The original dirty dataset (snapshot at reset). Accepted for
            interface compatibility; the scoring itself does not read it.
        current_df: The agent's current working dataset.
        clean_df: The ground truth clean dataset.
        original_dirty_df: The very first dirty state; baseline for the
            missing-value and duplicate dimensions.
        issue_manifest: Manifest of injected issues. The keys read here are
            ``"business_rules"`` and, when ``golden_indices`` is None,
            ``"golden_indices"``.
        step_count: Current step number.
        max_steps: Maximum allowed steps.
        golden_indices: Positional indices of golden rows in ``clean_df``.

    Returns:
        Tuple of (composite_score, dimension_scores_dict). The composite is the
        weighted sum of the eight dimension scores, clamped to [0.001, 0.999].
    """
    if golden_indices is None:
        golden_indices = issue_manifest.get("golden_indices", [])

    scores: Dict[str, float] = {}

    # ── Dimension 1: Missing Values Fixed (20%) ──────────────────────────
    scores["missing_fixed"] = _fraction_resolved(
        original_dirty_df.isnull().sum().sum(),
        current_df.isnull().sum().sum(),
        clean_df.isnull().sum().sum(),
    )

    # ── Dimension 2: Duplicates Removed (15%) ────────────────────────────
    scores["duplicates_removed"] = _fraction_resolved(
        original_dirty_df.duplicated().sum(),
        current_df.duplicated().sum(),
        clean_df.duplicated().sum(),
    )

    # ── Dimension 3: Type Correctness (15%) ──────────────────────────────
    scores["type_correctness"] = _type_correctness(current_df, clean_df)

    # ── Dimension 4: Value Accuracy (20%) ────────────────────────────────
    scores["value_accuracy"] = _value_accuracy(current_df, clean_df)

    # ── Dimension 5: Data Preservation (10%) ─────────────────────────────
    scores["data_preservation"] = _data_preservation(current_df, clean_df)

    # ── Dimension 6: Constraint Compliance (10%) ─────────────────────────
    scores["constraint_compliance"] = _constraint_compliance(current_df, issue_manifest)

    # ── Dimension 7: Step Efficiency (5%) ────────────────────────────────
    scores["step_efficiency"] = _step_efficiency(step_count, max_steps)

    # ── Dimension 8: Golden Row Integrity (5%) ───────────────────────────
    scores["golden_row_integrity"] = _golden_row_integrity(
        current_df, clean_df, golden_indices
    )

    # ── Composite Score ──────────────────────────────────────────────────
    composite = sum(scores[dim] * _DIMENSION_WEIGHTS[dim] for dim in _DIMENSION_WEIGHTS)
    # Keep the result strictly inside (0, 1) by clamping to [0.001, 0.999].
    composite = max(0.001, min(0.999, composite))
    return float(composite), scores
def compute_step_reward(
    before_df: pd.DataFrame,
    after_df: pd.DataFrame,
    clean_df: pd.DataFrame,
    original_dirty_df: pd.DataFrame,
    issue_manifest: Dict[str, Any],
    command: str,
    data_modified: bool,
) -> float:
    """Return the shaped reward for one agent step.

    The reward depends on the first word of ``command`` (case-insensitive):

    * diagnostic commands: flat 0.02
    * ``validate`` / ``undo``: flat 0.01
    * ``submit``: 0.001 (the final score is computed separately)
    * any other command that did not modify the data: -0.01
    * a data-modifying command: based on the change in composite score
      between ``before_df`` and ``after_df``. Gains earn
      ``min(0.15, 2 * delta + 0.03)``, losses beyond 0.01 cost
      ``max(-0.20, 2 * delta)``, and anything in between earns 0.001.
    """
    stripped = command.strip()
    verb = stripped.split()[0].lower() if stripped else ""

    # Read-only inspection commands earn a small positive reward.
    inspection_verbs = {
        "help", "view", "profile", "profile_column", "find_missing",
        "find_duplicates", "find_outliers", "check_rules", "history",
    }
    if verb in inspection_verbs:
        return 0.02

    if verb == "validate" or verb == "undo":
        return 0.01

    if verb == "submit":
        return 0.001

    if not data_modified:
        return -0.01

    def _quality(frame: pd.DataFrame) -> float:
        # Composite score of a frame against the same baseline and manifest.
        composite, _ = compute_score(
            original_dirty_df, frame, clean_df, original_dirty_df, issue_manifest
        )
        return composite

    score_before = _quality(before_df)
    score_after = _quality(after_df)
    delta = score_after - score_before

    if delta > 0:
        return min(0.15, delta * 2.0 + 0.03)
    if delta < -0.01:
        return max(-0.20, delta * 2.0)
    return 0.001