| """ |
| OpenEnv Data Cleaning Environment - Grading System |
| Evaluates data cleaning solutions based on task-specific criteria. |
| Deterministic scoring aligned with OpenEnv expectations. |
| """ |
|
|
| import logging |
| from typing import Dict, Any, List, Optional |
|
|
| import pandas as pd |
| import numpy as np |
|
|
| try: |
| from .evaluator import evaluate_cleanliness |
| |
| _ = evaluate_cleanliness |
| except ImportError: |
| from evaluator import evaluate_cleanliness |
| |
| _ = evaluate_cleanliness |
|
|
| try: |
| from .models import GradeResult, TaskConfig |
| from .tasks import get_task_config |
| except ImportError: |
| from models import GradeResult, TaskConfig |
| from tasks import get_task_config |
|
|
| logger = logging.getLogger("openenv-datacleaner.grader") |
|
|
|
|
| class Grader: |
| """ |
| Evaluates data cleaning solutions against task criteria. |
| Produces deterministic, reproducible scores. |
| """ |
|
|
| def __init__(self): |
| self._original_dataset: Optional[pd.DataFrame] = None |
| self._current_dataset: Optional[pd.DataFrame] = None |
| self._action_history: List[Dict[str, Any]] = [] |
| self._task_config: Optional[TaskConfig] = None |
|
|
| def setup( |
| self, |
| task_id: str, |
| original_dataset: pd.DataFrame, |
| current_dataset: pd.DataFrame, |
| action_history: List[Dict[str, Any]] |
| ): |
| """Set up grader with task context and datasets.""" |
| self._task_config = get_task_config(task_id) |
| self._original_dataset = original_dataset.copy() |
| self._current_dataset = current_dataset.copy() |
| self._action_history = list(action_history) |
|
|
| def grade(self) -> GradeResult: |
| """ |
| Grade the submitted solution. |
| Returns GradeResult with final_score, breakdown, and feedback. |
| """ |
| if self._task_config is None: |
| raise RuntimeError("Grader not set up. Call setup() first.") |
|
|
| criteria = self._task_config.grading_criteria |
| breakdown = {} |
| total_score = 0.0 |
| total_weight = 0.0 |
|
|
| |
| for criterion, weight in criteria.items(): |
| score = self._evaluate_criterion(criterion) |
| breakdown[criterion] = round(score, 4) |
| total_score += score * weight |
| total_weight += weight |
|
|
| |
| final_score = total_score / total_weight if total_weight > 0 else 0.0 |
| |
| |
| |
| if final_score <= 0.0: |
| final_score = 0.0001 |
| elif final_score >= 1.0: |
| final_score = 0.9999 |
| |
| final_score = round(final_score, 4) |
|
|
| feedback = self._generate_feedback(breakdown, final_score) |
|
|
| return GradeResult( |
| final_score=final_score, |
| breakdown=breakdown, |
| feedback=feedback |
| ) |
|
|
| def _evaluate_criterion(self, criterion: str) -> float: |
| """Evaluate a single criterion and return score (0.0 to 1.0).""" |
| evaluators = { |
| "null_handling": self._evaluate_null_handling, |
| "duplicate_handling": self._evaluate_duplicate_handling, |
| "email_validation": self._evaluate_email_validation, |
| "outlier_handling": self._evaluate_outlier_handling, |
| "type_conversion": self._evaluate_type_conversion, |
| "normalization": self._evaluate_normalization, |
| "efficiency": self._evaluate_efficiency, |
| "format_standardization": self._evaluate_format_standardization, |
| } |
|
|
| if criterion not in evaluators: |
| logger.warning(f"Unknown criterion: {criterion}") |
| return 0.0 |
|
|
| return evaluators[criterion]() |
|
|
| def _evaluate_null_handling(self) -> float: |
| """Score based on how well nulls were handled.""" |
| original_nulls = int(self._original_dataset.isnull().sum().sum()) |
| current_nulls = int(self._current_dataset.isnull().sum().sum()) |
|
|
| if original_nulls == 0: |
| return 1.0 |
|
|
| reduction = (original_nulls - current_nulls) / original_nulls |
| return round(min(max(reduction, 0.0), 1.0), 4) |
|
|
| def _evaluate_duplicate_handling(self) -> float: |
| """Score based on duplicate removal.""" |
| original_duplicates = int(self._original_dataset.duplicated().sum()) |
| current_duplicates = int(self._current_dataset.duplicated().sum()) |
|
|
| if original_duplicates == 0: |
| return 1.0 |
|
|
| reduction = (original_duplicates - current_duplicates) / original_duplicates |
| return round(min(max(reduction, 0.0), 1.0), 4) |
|
|
| def _evaluate_email_validation(self) -> float: |
| """Score based on email validation quality.""" |
| import re |
| email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' |
|
|
| |
| email_col = "email" |
| if email_col not in self._current_dataset.columns: |
| return 0.5 |
|
|
| valid_mask = self._current_dataset[email_col].astype(str).str.match( |
| email_pattern, na=False |
| ) |
| valid_ratio = float(valid_mask.mean()) if len(self._current_dataset) > 0 else 0.0 |
|
|
| return round(min(max(valid_ratio, 0.0), 1.0), 4) |
|
|
| def _evaluate_outlier_handling(self) -> float: |
| """Score based on outlier handling using IQR method.""" |
| numeric_cols = self._current_dataset.select_dtypes( |
| include=[np.number] |
| ).columns.tolist() |
|
|
| if not numeric_cols: |
| return 0.5 |
|
|
| |
| original_outliers = self._count_outliers(self._original_dataset, numeric_cols) |
| current_outliers = self._count_outliers(self._current_dataset, numeric_cols) |
|
|
| if original_outliers == 0: |
| return 1.0 |
|
|
| reduction = (original_outliers - current_outliers) / original_outliers |
| return round(min(max(reduction, 0.0), 1.0), 4) |
|
|
| def _count_outliers( |
| self, df: pd.DataFrame, numeric_cols: List[str], multiplier: float = 1.5 |
| ) -> int: |
| """Count total outliers across numeric columns using IQR.""" |
| total_outliers = 0 |
| for col in numeric_cols: |
| if col not in df.columns: |
| continue |
| Q1 = df[col].quantile(0.25) |
| Q3 = df[col].quantile(0.75) |
| IQR = Q3 - Q1 |
| if IQR == 0: |
| continue |
| lower = Q1 - multiplier * IQR |
| upper = Q3 + multiplier * IQR |
| total_outliers += int(((df[col] < lower) | (df[col] > upper)).sum()) |
| return total_outliers |
|
|
| def _evaluate_type_conversion(self) -> float: |
| """Score based on proper type conversions.""" |
| actions_taken = [a["action_type"] for a in self._action_history] |
|
|
| if "convert_types" not in actions_taken: |
| return 0.0 |
|
|
| |
| score = 0.0 |
| expected_types = { |
| "id": ["int", "Int64"], |
| "age": ["int", "Int64"], |
| "salary": ["float"], |
| "join_date": ["datetime"], |
| } |
|
|
| for col, expected in expected_types.items(): |
| if col in self._current_dataset.columns: |
| actual_dtype = str(self._current_dataset[col].dtype) |
| if any(exp in actual_dtype for exp in expected): |
| score += 1.0 |
|
|
| return round(score / len(expected_types), 4) if expected_types else 0.0 |
|
|
| def _evaluate_normalization(self) -> float: |
| """Score based on normalization of numeric columns.""" |
| actions_taken = [a["action_type"] for a in self._action_history] |
|
|
| if "normalize" not in actions_taken: |
| return 0.0 |
|
|
| |
| numeric_cols = self._current_dataset.select_dtypes( |
| include=[np.number] |
| ).columns.tolist() |
|
|
| if not numeric_cols: |
| return 0.0 |
|
|
| normalized_count = 0 |
| for col in numeric_cols: |
| min_val = self._current_dataset[col].min() |
| max_val = self._current_dataset[col].max() |
| if max_val - min_val > 0: |
| |
| if min_val >= 0 and max_val <= 1: |
| normalized_count += 1 |
|
|
| return round(normalized_count / len(numeric_cols), 4) |
|
|
| def _evaluate_efficiency(self) -> float: |
| """Score based on action efficiency (fewer actions = better).""" |
| action_count = len(self._action_history) |
| expected_count = len(self._task_config.expected_actions) |
|
|
| if action_count == 0: |
| return 0.0 |
|
|
| |
| if action_count <= expected_count: |
| return 1.0 |
| elif action_count <= expected_count * 2: |
| return round(expected_count / action_count, 4) |
| else: |
| return round(max(0.0, 1.0 - (action_count - expected_count) / expected_count), 4) |
|
|
|
|
| def _evaluate_format_standardization(self) -> float: |
| """Score based on format standardization quality.""" |
| |
| score = 0.0 |
| actions_taken = [a["action_type"] for a in self._action_history] |
|
|
| |
| if "standardize_format" in actions_taken: |
| |
| columns_to_check = ["Education", "Gender", "City"] |
| for col in columns_to_check: |
| if col in self._current_dataset.columns: |
| |
| if col in ["Education", "Gender"]: |
| |
| value_counts = self._current_dataset[col].value_counts() |
| if len(value_counts) > 0: |
| |
| properly_formatted = self._current_dataset[col].astype(str).apply( |
| lambda x: x.istitle() if col == "Education" else x.isupper() or x.islower() |
| ).mean() |
| score += properly_formatted * 0.3 |
| elif col == "City": |
| |
| properly_formatted = self._current_dataset[col].astype(str).apply( |
| lambda x: x.istitle() |
| ).mean() |
| score += properly_formatted * 0.4 |
|
|
| |
| date_cols = ["JoiningYear"] |
| for col in date_cols: |
| if col in self._current_dataset.columns: |
| |
| if pd.api.types.is_datetime64_any_dtype(self._current_dataset[col]): |
| score += 0.3 |
|
|
| return round(min(max(score, 0.0), 1.0), 4) |
|
|
| def _generate_feedback( |
| self, breakdown: Dict[str, float], final_score: float |
| ) -> str: |
| """Generate human-readable feedback.""" |
| feedback_parts = [] |
|
|
| if final_score >= 0.9: |
| feedback_parts.append("Excellent work!") |
| elif final_score >= 0.7: |
| feedback_parts.append("Good job, room for improvement.") |
| elif final_score >= 0.5: |
| feedback_parts.append("Acceptable, but several areas need attention.") |
| else: |
| feedback_parts.append("Significant improvements needed.") |
|
|
| for criterion, score in breakdown.items(): |
| if score < 0.5: |
| feedback_parts.append(f" - {criterion}: needs improvement ({score:.2f})") |
| elif score >= 0.9: |
| feedback_parts.append(f" - {criterion}: excellent ({score:.2f})") |
|
|
| return "\n".join(feedback_parts) |
|
|
|
|
| |
| def EasyDataCleaningGrader(): return Grader() |
| def MediumDataCleaningGrader(): return Grader() |
| def HardDataCleaningGrader(): return Grader() |
|
|
|
|
|
|
|
|
|
|
|
|