| """ | |
| Multi-component reward system for data cleaning environment | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from typing import Dict, List, Optional, Tuple | |
| from .models import Reward, QualityMetrics, Action, ActionType | |
| from .utils import ( | |
| calculate_completeness, | |
| calculate_uniqueness, | |
| detect_missing_values, | |
| detect_duplicates | |
| ) | |

class RewardCalculator:
    """Calculate rewards for data cleaning actions"""

    def __init__(
        self,
        step_penalty: float = 0.01,
        destructive_penalty: float = 0.5,
        redundant_penalty: float = 0.05,
        quality_weight: float = 0.4,
        issue_weight: float = 0.4,
        schema_weight: float = 0.2,
    ):
        self.step_penalty = step_penalty
        self.destructive_penalty = destructive_penalty
        self.redundant_penalty = redundant_penalty
        self.quality_weight = quality_weight
        self.issue_weight = issue_weight
        self.schema_weight = schema_weight

        # Per-episode state; cleared by reset().
        self.previous_quality: Optional[float] = None
        self.previous_issues: int = 0
        self.action_history: List[Dict] = []

    def calculate_quality_metrics(self, df: pd.DataFrame) -> QualityMetrics:
        """Calculate comprehensive quality metrics"""
        completeness = float(calculate_completeness(df))
        uniqueness = float(calculate_uniqueness(df))

        # Validity: for numeric columns, the fraction of non-null values in a
        # plausible range (hard-coded here to [0, 1e9]); non-numeric columns
        # count as fully valid.
        validity_scores = []
        for col in df.columns:
            non_null = df[col].dropna()
            if len(non_null) > 0:
                if pd.api.types.is_numeric_dtype(df[col]):
                    valid = ((non_null >= 0) & (non_null <= 1e9)).mean()
                    validity_scores.append(valid)
                else:
                    validity_scores.append(1.0)
        validity = float(np.mean(validity_scores)) if validity_scores else 1.0

        consistency = float(self._calculate_consistency(df))

        # Fixed blend: completeness and validity dominate (0.3 each);
        # consistency and uniqueness contribute 0.2 each.
        overall = float(
            completeness * 0.3 +
            validity * 0.3 +
            consistency * 0.2 +
            uniqueness * 0.2
        )

        return QualityMetrics(
            completeness=completeness,
            validity=validity,
            consistency=consistency,
            uniqueness=uniqueness,
            overall=overall,
        )

    def _calculate_consistency(self, df: pd.DataFrame) -> float:
        """Calculate data consistency score"""
        if len(df) == 0:
            return 1.0
        consistency_scores = []
        for col in df.columns:
            non_null = df[col].dropna()
            if len(non_null) > 0:
                if pd.api.types.is_string_dtype(df[col]):
                    # Use the coefficient of variation of string lengths as a
                    # cheap proxy for format consistency within the column.
                    lengths = non_null.apply(lambda x: len(str(x)))
                    if lengths.std() > 0:
                        cv = lengths.std() / lengths.mean()
                        consistency_scores.append(max(0, 1 - cv))
                    else:
                        consistency_scores.append(1.0)
                else:
                    consistency_scores.append(1.0)
        return float(np.mean(consistency_scores)) if consistency_scores else 1.0
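
    # Worked example of the CV heuristic above (illustrative): string lengths
    # [5, 5, 5] have zero spread, so the column scores 1.0; lengths [1, 5, 9]
    # have sample std 4 and mean 5, so cv = 0.8 and the column scores 0.2.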

    def calculate_reward(
        self,
        current_df: pd.DataFrame,
        previous_df: pd.DataFrame,
        action: Action,
        action_result: Dict,
        issues_before: int,
        issues_after: int
    ) -> Reward:
        """Calculate reward for an action"""
        reward = Reward()

        # Dense shaping signal: scaled change in overall quality.
        current_metrics = self.calculate_quality_metrics(current_df)
        previous_metrics = self.calculate_quality_metrics(previous_df)
        quality_improvement = current_metrics.overall - previous_metrics.overall
        reward.data_quality_improvement = quality_improvement * self.quality_weight * 10

        # Fraction of the open issues that this action resolved.
        if issues_before > 0:
            issue_resolution = (issues_before - issues_after) / issues_before
            reward.issue_resolution_progress = issue_resolution * self.issue_weight * 5
        else:
            reward.issue_resolution_progress = 0.0

        schema_score = self._calculate_schema_validity(current_df)
        reward.schema_validity_score = schema_score * self.schema_weight

        # Penalties: large deletions, repeated no-op actions, and a small
        # per-step cost to encourage short episodes.
        if self._is_destructive_change(previous_df, current_df, action):
            reward.destructive_changes_penalty = self.destructive_penalty
        if self._is_redundant_action(action):
            reward.redundant_penalty = self.redundant_penalty
        reward.step_penalty = self.step_penalty

        # Track state for redundancy detection on later steps.
        self.previous_quality = current_metrics.overall
        self.previous_issues = issues_after
        self.action_history.append({
            'action': action.action_type.value,
            'params': action.params,
            'quality_change': quality_improvement,
            'issues_change': issues_before - issues_after
        })

        reward.calculate_total()
        return reward
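
    # Example of the shaping above (illustrative): with default weights, a
    # step that lifts overall quality from 0.70 to 0.80 contributes
    # 0.10 * 0.4 * 10 = 0.4, and resolving 2 of 4 open issues adds
    # (2/4) * 0.4 * 5 = 1.0, before schema credit and penalties.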

    def _calculate_schema_validity(self, df: pd.DataFrame) -> float:
        """Calculate schema validity score"""
        if len(df) == 0:
            return 0.0
        validity_checks = []
        for col in df.columns:
            non_null = df[col].dropna()
            if len(non_null) > 0:
                # A column is schema-valid if all of its non-null values
                # share a single Python type.
                type_consistency = non_null.apply(type).nunique() == 1
                validity_checks.append(type_consistency)
        return float(np.mean(validity_checks)) if validity_checks else 1.0

    def _is_destructive_change(
        self,
        previous_df: pd.DataFrame,
        current_df: pd.DataFrame,
        action: Action
    ) -> bool:
        """Check if action caused destructive changes"""
        # Losing more than 10% of rows, or any column, counts as destructive.
        rows_lost = len(previous_df) - len(current_df)
        if rows_lost > len(previous_df) * 0.1:
            return True
        cols_lost = len(previous_df.columns) - len(current_df.columns)
        if cols_lost > 0:
            return True
        return False

    def _is_redundant_action(self, action: Action) -> bool:
        """Check if action is redundant"""
        # Only the most recent action is consulted, so a single prior entry
        # is enough to make the comparison (the original `< 2` guard skipped
        # the very first follow-up action).
        if not self.action_history:
            return False
        last_action = self.action_history[-1]
        # Exact repeat of the previous action.
        if (last_action['action'] == action.action_type.value and
                last_action['params'] == action.params):
            return True
        # Same action type as a previous step that changed nothing.
        if (last_action['action'] == action.action_type.value and
                last_action['quality_change'] == 0 and
                last_action['issues_change'] == 0):
            return True
        return False

    def calculate_final_reward(
        self,
        final_df: pd.DataFrame,
        expected_df: pd.DataFrame,
        total_steps: int,
        max_steps: int
    ) -> Reward:
        """Calculate final reward on submission"""
        reward = Reward()

        # Terminal bonus: absolute quality of the submitted frame, scaled
        # more heavily than the per-step signal.
        final_metrics = self.calculate_quality_metrics(final_df)
        quality_score = final_metrics.overall
        reward.data_quality_improvement = quality_score * self.quality_weight * 20

        # Cell-level agreement with the expected (ground-truth) frame.
        similarity = self._calculate_similarity(final_df, expected_df)
        reward.issue_resolution_progress = similarity * self.issue_weight * 15

        schema_score = self._calculate_schema_validity(final_df)
        reward.schema_validity_score = schema_score * self.schema_weight * 5

        # Penalize long episodes: the penalty grows linearly with the
        # fraction of the step budget consumed.
        step_efficiency = 1 - (total_steps / max_steps)
        reward.step_penalty = (1 - step_efficiency) * 2

        reward.calculate_total()
        return reward
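
    # Example (illustrative, assuming Reward.calculate_total sums the bonus
    # components and subtracts the penalties): a perfect submission
    # (quality 1.0, similarity 1.0, schema 1.0) reached in 10 of 20 steps
    # scores 1.0*0.4*20 + 1.0*0.4*15 + 1.0*0.2*5 - (10/20)*2
    # = 8.0 + 6.0 + 1.0 - 1.0 = 14.0.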

    def _calculate_similarity(
        self,
        df1: pd.DataFrame,
        df2: pd.DataFrame
    ) -> float:
        """Calculate similarity between two dataframes"""
        if df1.shape != df2.shape:
            return 0.0
        matches = 0
        total = 0
        for col in df1.columns:
            if col in df2.columns:
                for i in range(len(df1)):
                    total += 1
                    val1 = df1[col].iloc[i]
                    val2 = df2[col].iloc[i]
                    # Treat a pair of missing values as a match.
                    if pd.isna(val1) and pd.isna(val2):
                        matches += 1
                    elif val1 == val2:
                        matches += 1
        return matches / total if total > 0 else 0.0
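
    # Note: when the two frames share identical column labels and order, the
    # loop above is equivalent to the vectorized form
    #     ((df1 == df2) | (df1.isna() & df2.isna())).to_numpy().mean()
    # which avoids per-cell Python overhead on large frames.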

    def reset(self):
        """Reset reward calculator state"""
        self.previous_quality = None
        self.previous_issues = 0
        self.action_history = []
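

if __name__ == "__main__":
    # Minimal smoke test (illustrative; the DataFrame below is made up, and
    # this assumes QualityMetrics exposes the fields set above). Because the
    # imports at the top are relative, run this module inside its package,
    # e.g. `python -m <package>.rewards`.
    demo = pd.DataFrame({
        "age": [25.0, None, 25.0, 40.0],
        "name": ["alice", "bob", "alice", "carol"],
    })
    calc = RewardCalculator()
    metrics = calc.calculate_quality_metrics(demo)
    print(f"completeness={metrics.completeness:.2f} overall={metrics.overall:.2f}")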