Spaces:
Sleeping
Sleeping
# environment.py
# ─────────────────────────────────────────────
# Core OpenEnv class — step / reset / state
# Fully compliant with the OpenEnv spec
# ─────────────────────────────────────────────
| import pandas as pd | |
| import numpy as np | |
| from typing import Any, Dict, Optional, Tuple | |
| from config import ( | |
| MAX_STEPS, | |
| REWARD_ISSUE_FIXED, | |
| REWARD_SUBMIT_BONUS, | |
| PENALTY_WRONG_ACTION, | |
| PENALTY_DESTRUCTIVE, | |
| TASK_EASY, TASK_MEDIUM, TASK_HARD, | |
| ) | |
| from models import Observation, Action, Reward, StepResult | |
| from tasks import Task, get_task | |
| from utils import ( | |
| df_to_records, | |
| get_null_counts, | |
| get_duplicate_count, | |
| detect_outliers_iqr, | |
| clean_column_name, | |
| standardize_column, | |
| clamp, | |
| ) | |
| from datasets import detect_issues | |
class DataCleaningEnv:
    """
    OpenEnv-compliant Data Cleaning Environment.

    The agent receives a messy CSV dataset and must issue cleaning
    actions to fix it step by step.

    Actions:
        fill_missing        — fill null values in one column
        drop_duplicates     — remove duplicate rows
        fix_dtype           — convert a column's dtype
        rename_column       — rename a column to its cleaned name
        remove_outliers     — remove outlier rows of one column
        standardize_values  — replace values via a mapping
        submit              — finalize the episode and trigger the grader

    Each action handler returns ``(df, reward, reason)``. Redundant or
    invalid actions return the DataFrame unchanged together with a
    penalty, so repeating a no-op action can never earn reward.
    """

    def __init__(self, difficulty: str = TASK_EASY):
        self.difficulty = difficulty
        self.task: Task = get_task(difficulty)
        self.step_count = 0
        self.total_reward = 0.0
        self.done = False
        self._prev_score = 0.0     # reset per episode; not read by this class
        self._initialized = False  # step()/state() require a prior reset()

    # ── OpenEnv API ───────────────────────────

    def reset(self) -> Observation:
        """Start a fresh episode and return the initial observation."""
        self.task.reset()
        self.step_count = 0
        self.total_reward = 0.0
        self.done = False
        self._prev_score = 0.0
        self._initialized = True
        return self._make_observation()

    def step(self, action: Action) -> StepResult:
        """
        Execute one action and return the resulting StepResult.

        Calls ``reset()`` implicitly if the environment was never reset.
        After the episode is done, further calls return a zero reward and
        change nothing. The per-step reward is clamped to [-1, 1] before
        it is added to the running total. The episode ends on ``submit``
        or once ``MAX_STEPS`` steps have been taken.
        """
        if not self._initialized:
            self.reset()

        if self.done:
            obs = self._make_observation()
            reward = Reward(value=0.0, total=self.total_reward,
                            reason="Episode already done")
            return StepResult(observation=obs, reward=reward,
                              done=True, info={})

        self.step_count += 1
        reward_value, reason, info = self._execute_action(action)

        # Clamp and accumulate
        reward_value = clamp(reward_value, -1.0, 1.0)
        self.total_reward = round(self.total_reward + reward_value, 4)

        # Check done conditions (``submit`` sets self.done in _submit)
        if self.step_count >= MAX_STEPS:
            self.done = True
            reason += " | Max steps reached"

        reward = Reward(
            value=round(reward_value, 4),
            total=self.total_reward,
            reason=reason,
        )
        obs = self._make_observation()
        obs.done = self.done
        return StepResult(
            observation=obs,
            reward=reward,
            done=self.done,
            info=info,
        )

    def state(self) -> Dict[str, Any]:
        """
        Return the current environment state as a dict.

        Includes the grader's current score and details, the current
        issues and the full DataFrame as records. Returns a status-only
        dict if ``reset()`` has not been called yet.
        """
        if not self._initialized:
            return {"status": "not initialized, call reset() first"}
        score, details = self.task.grade()
        return {
            "task_id": self.task.task_id,
            "difficulty": self.difficulty,
            "step": self.step_count,
            "max_steps": MAX_STEPS,
            "total_reward": self.total_reward,
            "current_score": score,
            "done": self.done,
            "issues": self.task.issues,
            "grade_details": details,
            "dataframe": df_to_records(self.task.current_df),
            "columns": list(self.task.current_df.columns),
            "null_counts": get_null_counts(self.task.current_df),
            "duplicate_rows": get_duplicate_count(self.task.current_df),
        }

    # ── Action Executor ───────────────────────

    def _execute_action(self, action: Action
                        ) -> Tuple[float, str, Dict]:
        """
        Route an action to its handler and return ``(reward, reason, info)``.

        The handler's resulting DataFrame becomes the task's current
        DataFrame and the issue list is refreshed. Any exception raised by
        a handler is converted into a PENALTY_WRONG_ACTION, so a malformed
        agent action cannot crash the environment.
        """
        df = self.task.current_df
        # Tolerate a missing/empty parameters payload.
        p = action.parameters or {}
        info = {}

        try:
            if action.action_type == "fill_missing":
                df, reward, reason = self._fill_missing(df, p)
            elif action.action_type == "drop_duplicates":
                df, reward, reason = self._drop_duplicates(df)
            elif action.action_type == "fix_dtype":
                df, reward, reason = self._fix_dtype(df, p)
            elif action.action_type == "rename_column":
                df, reward, reason = self._rename_column(df, p)
            elif action.action_type == "remove_outliers":
                df, reward, reason = self._remove_outliers(df, p)
            elif action.action_type == "standardize_values":
                df, reward, reason = self._standardize_values(df, p)
            elif action.action_type == "submit":
                df, reward, reason, info = self._submit(df)
            else:
                reward = PENALTY_WRONG_ACTION
                reason = f"Unknown action: {action.action_type}"
        except Exception as e:
            # Agent-facing boundary: report the failure instead of raising.
            reward = PENALTY_WRONG_ACTION
            reason = f"Action failed: {str(e)}"
            info = {"error": str(e)}

        self.task.current_df = df
        self.task.update_issues()
        return reward, reason, info

    # ── Action Handlers ───────────────────────

    def _fill_missing(self, df: pd.DataFrame,
                      params: Dict) -> Tuple[pd.DataFrame, float, str]:
        """
        Fill (or drop) nulls in ``params["column"]``.

        ``params["strategy"]`` may be ``"mean"`` (default), ``"median"``,
        ``"mode"`` or ``"drop"``. Any other value is used literally as the
        fill value. The reward is REWARD_ISSUE_FIXED scaled by the fraction
        of nulls fixed.

        Penalties:
            PENALTY_WRONG_ACTION — the column is unknown, the column has
                no nulls, or the fill fixed nothing (e.g. mean of a
                non-numeric column, or mode of an all-null column).
            PENALTY_DESTRUCTIVE  — "drop" removed more than 30% of the
                original rows. The rows are still dropped in that case.
        """
        column = params.get("column")
        strategy = params.get("strategy", "mean")

        if column not in df.columns:
            return df, PENALTY_WRONG_ACTION, f"Column '{column}' not found"

        null_before = int(df[column].isnull().sum())
        if null_before == 0:
            return df, PENALTY_WRONG_ACTION, \
                f"No nulls in '{column}' — redundant action"

        out = df.copy()
        if strategy in ("mean", "median"):
            numeric = pd.to_numeric(out[column], errors="coerce")
            fill_value = (numeric.mean() if strategy == "mean"
                          else numeric.median())
            # Always assign back: calling fillna(inplace=True) on df[column]
            # is chained assignment, a no-op under pandas Copy-on-Write.
            out[column] = out[column].fillna(fill_value)
        elif strategy == "mode":
            modes = out[column].mode()
            if modes.empty:
                return df, PENALTY_WRONG_ACTION, \
                    f"Cannot compute mode of '{column}' — all values are null"
            out[column] = out[column].fillna(modes.iloc[0])
        elif strategy == "drop":
            before = len(out)
            out = out.dropna(subset=[column]).reset_index(drop=True)
            dropped = before - len(out)
            # Fraction is measured against the ORIGINAL row count,
            # matching the destructive check in _remove_outliers.
            if dropped > before * 0.3:
                return out, PENALTY_DESTRUCTIVE, \
                    f"Dropped {dropped} rows — too destructive"
        else:
            # Any other strategy string is used as the literal fill value.
            out[column] = out[column].fillna(strategy)

        null_after = int(out[column].isnull().sum())
        fixed = null_before - null_after
        if fixed == 0:
            return df, PENALTY_WRONG_ACTION, \
                f"Could not fill nulls in '{column}' via {strategy}"

        reward = REWARD_ISSUE_FIXED * (fixed / null_before)
        return out, reward, f"Filled {fixed} nulls in '{column}' via {strategy}"

    def _drop_duplicates(self, df: pd.DataFrame
                         ) -> Tuple[pd.DataFrame, float, str]:
        """Remove fully-duplicated rows; penalize if there are none."""
        dup_before = df.duplicated().sum()
        if dup_before == 0:
            return df, PENALTY_WRONG_ACTION, "No duplicates — redundant action"

        df = df.drop_duplicates().reset_index(drop=True)
        return df, REWARD_ISSUE_FIXED, f"Removed {dup_before} duplicate rows"

    def _fix_dtype(self, df: pd.DataFrame,
                   params: Dict) -> Tuple[pd.DataFrame, float, str]:
        """
        Convert ``params["column"]`` to ``params["target_type"]``.

        Supported targets are ``"int"`` (unparseable values become 0),
        ``"float"`` (the default; unparseable values become NaN),
        ``"str"`` and ``"datetime"`` (unparseable values become NaT).

        Penalties:
            PENALTY_WRONG_ACTION — the column is unknown, the target type
                is unknown, the conversion fails, or the conversion
                changes neither dtype nor values (redundant).
        """
        column = params.get("column")
        target_type = params.get("target_type", "float")

        if column not in df.columns:
            return df, PENALTY_WRONG_ACTION, f"Column '{column}' not found"

        col = df[column]
        try:
            if target_type == "int":
                converted = pd.to_numeric(
                    col, errors="coerce").fillna(0).astype(int)
            elif target_type == "float":
                converted = pd.to_numeric(col, errors="coerce")
            elif target_type == "str":
                converted = col.astype(str)
            elif target_type == "datetime":
                converted = pd.to_datetime(col, errors="coerce")
            else:
                return df, PENALTY_WRONG_ACTION, \
                    f"Unknown target type: {target_type}"
        except Exception as e:
            return df, PENALTY_WRONG_ACTION, f"Dtype fix failed: {e}"

        # Series.equals requires matching dtype AND values, so this only
        # triggers when the conversion is a true no-op. Rewarding such a
        # no-op would let an agent farm reward by repeating the action.
        if converted.equals(col):
            return df, PENALTY_WRONG_ACTION, \
                f"'{column}' is already {target_type} — redundant action"

        df = df.copy()
        df[column] = converted
        return df, REWARD_ISSUE_FIXED, \
            f"Converted '{column}' to {target_type}"

    def _rename_column(self, df: pd.DataFrame,
                       params: Dict) -> Tuple[pd.DataFrame, float, str]:
        """
        Rename ``params["old_name"]`` to ``params["new_name"]``.

        ``new_name`` must equal ``clean_column_name(old_name)``.

        Penalties:
            PENALTY_WRONG_ACTION — the column is unknown, the name is
                wrong, the column is already clean (redundant), or the
                target name is taken by another column.
        """
        old_name = params.get("old_name")
        new_name = params.get("new_name")

        if old_name not in df.columns:
            return df, PENALTY_WRONG_ACTION, \
                f"Column '{old_name}' not found"

        expected = clean_column_name(old_name)
        if new_name != expected:
            return df, PENALTY_WRONG_ACTION, \
                f"Suggested name '{new_name}' — expected '{expected}'"
        if new_name == old_name:
            return df, PENALTY_WRONG_ACTION, \
                f"Column '{old_name}' is already clean — redundant action"
        if new_name in df.columns:
            # Renaming would create duplicate column labels.
            return df, PENALTY_WRONG_ACTION, \
                f"Column '{new_name}' already exists"

        df = df.rename(columns={old_name: new_name})
        return df, REWARD_ISSUE_FIXED, \
            f"Renamed '{old_name}' to '{new_name}'"

    def _remove_outliers(self, df: pd.DataFrame,
                         params: Dict) -> Tuple[pd.DataFrame, float, str]:
        """
        Drop outlier rows of ``params["column"]``.

        ``params["method"]`` is ``"iqr"`` (default). Any other value falls
        back to the z-score detector. On success, the column is left
        coerced to numeric (unparseable values become NaN).

        Penalties:
            PENALTY_WRONG_ACTION — the column is unknown, no outliers
                were found (redundant), or detection fails.
            PENALTY_DESTRUCTIVE  — more than 40% of the rows would be
                removed.

        Every penalty returns the input DataFrame untouched.
        """
        column = params.get("column")
        method = params.get("method", "iqr")

        if column not in df.columns:
            return df, PENALTY_WRONG_ACTION, f"Column '{column}' not found"

        # Coerce on a working copy so rejected actions cannot leak the
        # numeric coercion (text → NaN) back into the environment state.
        work = df.copy()
        try:
            work[column] = pd.to_numeric(work[column], errors="coerce")
            if method == "iqr":
                mask = detect_outliers_iqr(work, column)
            else:
                from utils import detect_outliers_zscore
                mask = detect_outliers_zscore(work, column)

            count = mask.sum()
            if count == 0:
                return df, PENALTY_WRONG_ACTION, \
                    f"No outliers in '{column}' — redundant"
            if count > len(work) * 0.4:
                return df, PENALTY_DESTRUCTIVE, \
                    f"Would remove {count} rows — too destructive"

            work = work[~mask].reset_index(drop=True)
            return work, REWARD_ISSUE_FIXED, \
                f"Removed {count} outliers from '{column}'"
        except Exception as e:
            return df, PENALTY_WRONG_ACTION, f"Outlier removal failed: {e}"

    def _standardize_values(self, df: pd.DataFrame,
                            params: Dict
                            ) -> Tuple[pd.DataFrame, float, str]:
        """
        Replace values of ``params["column"]`` via ``params["mapping"]``.

        The action is rewarded if at least one cell actually changed.

        Penalties:
            PENALTY_WRONG_ACTION — the column is unknown, the mapping is
                empty, or no cell changed.
        """
        column = params.get("column")
        mapping = params.get("mapping", {})

        if column not in df.columns:
            return df, PENALTY_WRONG_ACTION, f"Column '{column}' not found"
        if not mapping:
            return df, PENALTY_WRONG_ACTION, "Empty mapping provided"

        # Snapshot before the call, in case standardize_column mutates df
        # in place (not visible here). astype(object) gives an independent
        # copy and makes the comparison dtype-agnostic (e.g. categoricals).
        old_vals = df[column].astype(object).reset_index(drop=True)
        df = standardize_column(df, column, mapping)
        new_vals = df[column].astype(object).reset_index(drop=True)

        # Count cells whose value actually changed; two nulls count as
        # equal. Comparing per-label value counts instead misses pure
        # relabels such as {"M": "Male"}, where the new label's count
        # equals the old label's count.
        same = old_vals.eq(new_vals) | (old_vals.isna() & new_vals.isna())
        changed = int((~same).sum())
        if changed == 0:
            return df, PENALTY_WRONG_ACTION, "No values changed"

        return df, REWARD_ISSUE_FIXED, \
            f"Standardized {len(mapping)} values in '{column}'"

    def _submit(self, df: pd.DataFrame
                ) -> Tuple[pd.DataFrame, float, str, Dict]:
        """
        Grade the current dataset and end the episode.

        Rewards, before step() clamps them to [-1, 1]:
            score >= 0.9  — REWARD_SUBMIT_BONUS + score
            score >= 0.6  — score
            otherwise     — score * 0.5
        """
        score, details = self.task.grade()
        self.done = True

        if score >= 0.9:
            reward = REWARD_SUBMIT_BONUS + score
            reason = f"Excellent! Score: {score:.3f} — dataset is clean!"
        elif score >= 0.6:
            reward = score
            reason = f"Good effort. Score: {score:.3f} — some issues remain"
        else:
            reward = score * 0.5
            reason = f"Submitted early. Score: {score:.3f} — many issues remain"

        return df, reward, reason, {"final_score": score, "details": details}

    # ── Observation Builder ───────────────────

    def _make_observation(self) -> Observation:
        """Build an Observation snapshot of the current DataFrame and episode."""
        df = self.task.current_df
        return Observation(
            task_id=self.task.task_id,
            difficulty=self.difficulty,
            step=self.step_count,
            max_steps=MAX_STEPS,
            dataframe=df_to_records(df),
            columns=list(df.columns),
            dtypes={c: str(t) for c, t in df.dtypes.items()},
            null_counts=get_null_counts(df),
            duplicate_rows=get_duplicate_count(df),
            issues=self.task.issues,
            done=self.done,
        )