# DataCleanser/env/reward.py
# (commit 3432ccd: "Add /web dashboard UI and update Docker setup")
"""
Multi-component reward system for data cleaning environment
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Tuple
from .models import Reward, QualityMetrics, Action, ActionType
from .utils import (
calculate_completeness,
calculate_uniqueness,
detect_missing_values,
detect_duplicates
)
class RewardCalculator:
    """Compute multi-component rewards for data cleaning actions.

    Per-step rewards blend three weighted positive components —
    data-quality improvement, issue-resolution progress, and schema
    validity — with penalties for destructive changes, redundant
    actions, and each step taken.  A separate terminal reward is
    computed on submission against an expected dataframe.
    """

    def __init__(
        self,
        step_penalty: float = 0.01,
        destructive_penalty: float = 0.5,
        redundant_penalty: float = 0.05,
        quality_weight: float = 0.4,
        issue_weight: float = 0.4,
        schema_weight: float = 0.2
    ):
        # Penalty magnitudes.
        self.step_penalty = step_penalty
        self.destructive_penalty = destructive_penalty
        self.redundant_penalty = redundant_penalty
        # Relative weights of the positive reward components.
        self.quality_weight = quality_weight
        self.issue_weight = issue_weight
        self.schema_weight = schema_weight
        # Per-episode state; cleared by reset().
        self.previous_quality: Optional[float] = None
        self.previous_issues: int = 0
        self.action_history: List[Dict] = []

    def calculate_quality_metrics(self, df: pd.DataFrame) -> QualityMetrics:
        """Calculate comprehensive quality metrics for *df*.

        The overall score is a fixed-weight blend: completeness 0.3,
        validity 0.3, consistency 0.2, uniqueness 0.2.
        """
        completeness = float(calculate_completeness(df))
        uniqueness = float(calculate_uniqueness(df))
        # Validity: a numeric value is "valid" when it falls in
        # [0, 1e9]; non-numeric columns are assumed valid.  Columns
        # that are entirely null contribute no score.
        validity_scores = []
        for col in df.columns:
            non_null = df[col].dropna()
            if len(non_null) > 0:
                if pd.api.types.is_numeric_dtype(df[col]):
                    valid = ((non_null >= 0) & (non_null <= 1e9)).mean()
                    validity_scores.append(valid)
                else:
                    validity_scores.append(1.0)
        validity = float(np.mean(validity_scores)) if validity_scores else 1.0
        consistency = float(self._calculate_consistency(df))
        overall = float(
            completeness * 0.3 +
            validity * 0.3 +
            consistency * 0.2 +
            uniqueness * 0.2
        )
        return QualityMetrics(
            completeness=completeness,
            validity=validity,
            consistency=consistency,
            uniqueness=uniqueness,
            overall=overall
        )

    def _calculate_consistency(self, df: pd.DataFrame) -> float:
        """Score format consistency of string columns in [0, 1].

        Each string column scores `max(0, 1 - cv)` where cv is the
        coefficient of variation of its value lengths; non-string
        columns score 1.0 and all-null columns contribute nothing.
        """
        if len(df) == 0:
            return 1.0
        scores = []
        for col in df.columns:
            non_null = df[col].dropna()
            if len(non_null) == 0:
                continue
            if pd.api.types.is_string_dtype(df[col]):
                lengths = non_null.apply(lambda x: len(str(x)))
                spread = lengths.std()  # hoisted: was computed twice
                # std() is NaN for a single value; NaN > 0 is False,
                # so single-value columns fall through to 1.0.
                if spread > 0:
                    cv = spread / lengths.mean()
                    scores.append(max(0, 1 - cv))
                else:
                    scores.append(1.0)
            else:
                scores.append(1.0)
        # float() cast: np.mean returns np.float64, signature says float.
        return float(np.mean(scores)) if scores else 1.0

    def calculate_reward(
        self,
        current_df: pd.DataFrame,
        previous_df: pd.DataFrame,
        action: Action,
        action_result: Dict,
        issues_before: int,
        issues_after: int
    ) -> Reward:
        """Calculate the per-step reward for *action*.

        Compares quality metrics before/after the action, rewards
        issue resolution and schema validity, and applies destructive,
        redundancy, and step penalties.  Also records the action in
        ``action_history`` (after the redundancy check, so history[-1]
        is always the *previous* action during that check).
        """
        reward = Reward()
        current_metrics = self.calculate_quality_metrics(current_df)
        previous_metrics = self.calculate_quality_metrics(previous_df)
        quality_improvement = current_metrics.overall - previous_metrics.overall
        reward.data_quality_improvement = quality_improvement * self.quality_weight * 10
        if issues_before > 0:
            # Fraction of outstanding issues resolved by this action.
            issue_resolution = (issues_before - issues_after) / issues_before
            reward.issue_resolution_progress = issue_resolution * self.issue_weight * 5
        else:
            reward.issue_resolution_progress = 0.0
        schema_score = self._calculate_schema_validity(current_df)
        reward.schema_validity_score = schema_score * self.schema_weight
        if self._is_destructive_change(previous_df, current_df, action):
            reward.destructive_changes_penalty = self.destructive_penalty
        if self._is_redundant_action(action):
            reward.redundant_penalty = self.redundant_penalty
        reward.step_penalty = self.step_penalty
        # Bookkeeping for the next step (not read by the visible
        # reward math itself — presumably consumed by callers).
        self.previous_quality = current_metrics.overall
        self.previous_issues = issues_after
        self.action_history.append({
            'action': action.action_type.value,
            'params': action.params,
            'quality_change': quality_improvement,
            'issues_change': issues_before - issues_after
        })
        reward.calculate_total()
        return reward

    def _calculate_schema_validity(self, df: pd.DataFrame) -> float:
        """Fraction of columns whose non-null values share one Python type."""
        if len(df) == 0:
            return 0.0
        checks = []
        for col in df.columns:
            non_null = df[col].dropna()
            if len(non_null) > 0:
                checks.append(non_null.apply(type).nunique() == 1)
        # float() cast: np.mean returns np.float64, signature says float.
        return float(np.mean(checks)) if checks else 1.0

    def _is_destructive_change(
        self,
        previous_df: pd.DataFrame,
        current_df: pd.DataFrame,
        action: Action
    ) -> bool:
        """Check if the step destroyed data: >10% of rows lost, or any
        column dropped.  (*action* is accepted for interface symmetry
        but not currently inspected.)
        """
        rows_lost = len(previous_df) - len(current_df)
        if rows_lost > len(previous_df) * 0.1:
            return True
        cols_lost = len(previous_df.columns) - len(current_df.columns)
        if cols_lost > 0:
            return True
        return False

    def _is_redundant_action(self, action: Action) -> bool:
        """Check if *action* repeats the previous action, either with
        identical params or after the same action type last had no
        measurable effect.

        Fix: the original required two history entries before checking,
        but only ``history[-1]`` is ever inspected — so an immediate
        repeat of the very first action was never flagged.  One prior
        entry is enough.
        """
        if not self.action_history:
            return False
        last = self.action_history[-1]
        if last['action'] == action.action_type.value:
            # Exact repeat of the previous action.
            if last['params'] == action.params:
                return True
            # Same action type, and last time it changed nothing.
            if last['quality_change'] == 0 and last['issues_change'] == 0:
                return True
        return False

    def calculate_final_reward(
        self,
        final_df: pd.DataFrame,
        expected_df: pd.DataFrame,
        total_steps: int,
        max_steps: int
    ) -> Reward:
        """Calculate the terminal reward on submission.

        Combines the absolute quality of *final_df*, its cell-wise
        similarity to *expected_df*, its schema validity, and a
        penalty growing linearly with the number of steps used.
        """
        reward = Reward()
        final_metrics = self.calculate_quality_metrics(final_df)
        # (Removed dead computation: expected_df metrics were
        # calculated but never used.)
        reward.data_quality_improvement = final_metrics.overall * self.quality_weight * 20
        similarity = self._calculate_similarity(final_df, expected_df)
        reward.issue_resolution_progress = similarity * self.issue_weight * 15
        schema_score = self._calculate_schema_validity(final_df)
        reward.schema_validity_score = schema_score * self.schema_weight * 5
        # (1 - step_efficiency) * 2 simplifies to 2 * steps / max;
        # max(max_steps, 1) guards against ZeroDivisionError.
        reward.step_penalty = 2 * total_steps / max(max_steps, 1)
        reward.calculate_total()
        return reward

    def _calculate_similarity(
        self,
        df1: pd.DataFrame,
        df2: pd.DataFrame
    ) -> float:
        """Fraction of positionally aligned cells that match.

        NaN vs NaN counts as a match.  Returns 0.0 when shapes differ;
        only columns present in both frames are compared.
        """
        if df1.shape != df2.shape:
            return 0.0
        matches = 0
        total = 0
        for col in df1.columns:
            if col not in df2.columns:
                continue
            # Vectorized per-column comparison over object arrays
            # (element-wise ==, plus a both-NaN fast path).  Replaces
            # the original per-cell chained `df.iloc[i][col]` lookups,
            # which built a full row Series for every cell.
            a = df1[col].to_numpy(dtype=object)
            b = df2[col].to_numpy(dtype=object)
            both_na = pd.isna(a) & pd.isna(b)
            equal = np.asarray(a == b, dtype=bool)
            matches += int((both_na | equal).sum())
            total += len(a)
        return matches / total if total > 0 else 0.0

    def reset(self):
        """Reset per-episode reward calculator state."""
        self.previous_quality = None
        self.previous_issues = 0
        self.action_history = []