Spaces:
Sleeping
Sleeping
"""Professional-grade AutoClean-Ai Data Cleaning Environment.

This module implements a sophisticated, production-ready RL environment with:
- 10 standard data cleaning operations
- 3 progressive difficulty tasks
- Shaped rewards with partial progress signals
- Comprehensive episode management
- Model-agnostic design (works with any LLM)
- Real-time metrics and logging
- Session management for concurrent users
"""
import uuid
import time
import logging
import pandas as pd
import numpy as np
from typing import Optional, Dict, Any, List, Tuple
from enum import Enum

# Add directories to path for imports to work in both local and HF Spaces
# deployments: the parent dir (package-style imports) and this dir (flat
# imports such as `from models import ...` below).
import sys
import os

_dir = os.path.dirname(os.path.abspath(__file__))
_parent = os.path.dirname(_dir)
if _parent not in sys.path:
    sys.path.insert(0, _parent)
if _dir not in sys.path:
    sys.path.insert(0, _dir)

from openenv.core.env_server import Environment
from models import (
    DataCleaningAction,
    DataCleaningObservation,
    DataCleaningState,
    EpisodeStatistics,
    RewardBreakdown,
    DatasetInfo,
    DifficultyLevel,
    EnvironmentConfig,
    CleaningActionType,
)

# Import from same directory for HF Spaces deployment compatibility
from grader import (
    calculate_reward,
    calculate_dataset_quality_score,
    grade_task_result,
)
from dataset_loader import DatasetGenerator

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class EpisodePhase(Enum):
    """Phases of an episode. Exposed via observation/state metadata."""

    INITIALIZATION = "initialization"  # environment constructed, no reset() yet
    ACTIVE = "active"                  # accepting cleaning actions
    CLEANING = "cleaning"              # NOTE(review): never set in this module — confirm external use
    GRADING = "grading"                # final grade being computed after SUBMIT
    COMPLETION = "completion"          # episode finished (submit or early stop)
class DataCleaningEnvironment(Environment[DataCleaningAction, DataCleaningObservation, DataCleaningState]):
    """
    Professional-grade OpenEnv environment for training AI to perform data cleaning.

    Features:
    - 10 standard data cleaning operations
    - 3 progressive difficulty tasks
    - Shaped rewards with partial progress signals
    - Deterministic grading 0.0-1.0
    - Comprehensive metrics tracking
    - Session management
    """

    SUPPORTS_CONCURRENT_SESSIONS = True
    VERSION = "1.0.0"

    def __init__(
        self,
        transform=None,
        config: Optional[EnvironmentConfig] = None,
        session_id: Optional[str] = None,
        dataset_generator: Optional["DatasetGenerator"] = None,
        dataset_loader: Optional["DatasetGenerator"] = None
    ):
        """Create a new environment instance.

        Args:
            transform: Optional observation transform forwarded to the base class.
            config: Environment configuration; defaults to ``EnvironmentConfig()``.
            session_id: Stable session identifier; an 8-char UUID prefix is
                generated when omitted.
            dataset_generator: Source of per-task datasets.
            dataset_loader: Backwards-compatible alias for ``dataset_generator``.
        """
        super().__init__(transform=transform)

        # Configuration
        self.config = config or EnvironmentConfig()
        self.session_id = session_id or str(uuid.uuid4())[:8]

        # Dataset management (support both parameter names for backwards compatibility)
        self.dataset_generator = dataset_generator or dataset_loader or DatasetGenerator()
        self.dataset_loader = self.dataset_generator

        # Episode state
        self.episode_id: Optional[str] = None
        self.episode_phase: EpisodePhase = EpisodePhase.INITIALIZATION
        self.step_count: int = 0

        # Current dataset (None until reset() is called)
        self.df: Optional[pd.DataFrame] = None
        self.initial_df: Optional[pd.DataFrame] = None
        # Snapshots taken after every successful action; used by REVERT.
        self.dataset_history: List[pd.DataFrame] = []

        # Performance tracking
        self.reward_history: List[float] = []
        self.action_history: List[Dict[str, Any]] = []
        self.quality_history: List[float] = []

        # Early stopping tracking
        self.consecutive_noop_actions: int = 0
        self.consecutive_repeated_actions: int = 0
        self.early_stop_reason: Optional[str] = None

        # Performance metrics (wall-clock timestamps)
        self.last_step_time: Optional[float] = None
        self.episode_start_time: Optional[float] = None

        # Task state
        self.current_task_id: str = ""
        self.current_difficulty: str = "intermediate"

        logger.info(f"Initialized DataCleaningEnvironment (session={self.session_id})")

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        difficulty: Optional[str] = None,
        task_id: Optional[str] = None,
        **kwargs
    ) -> DataCleaningObservation:
        """
        Reset the environment for a new episode.

        Args:
            seed: Random seed for reproducibility
            episode_id: Custom episode ID
            difficulty: Starting difficulty level ("beginner"/"intermediate"/"advanced")
            task_id: Specific task to run (takes precedence over difficulty mapping)

        Returns:
            Initial observation
        """
        if seed is not None:
            np.random.seed(seed)

        # Generate episode ID and start the episode clock
        self.episode_id = episode_id or f"ep_{uuid.uuid4().hex[:8]}"
        self.episode_start_time = time.time()
        self.last_step_time = time.time()

        # Reset per-episode counters and histories
        self.step_count = 0
        self.reward_history = []
        self.action_history = []
        self.quality_history = []
        self.dataset_history = []
        self.consecutive_noop_actions = 0
        self.consecutive_repeated_actions = 0
        self.early_stop_reason = None

        # BUGFIX: record the requested difficulty so state() and observations
        # report it (previously the parameter was accepted but never stored,
        # leaving current_difficulty stuck at "intermediate").
        if difficulty:
            self.current_difficulty = difficulty

        # Determine task: explicit task_id wins, otherwise map difficulty to a default
        if task_id:
            self.current_task_id = task_id
        else:
            diff_task_map = {
                "beginner": "task_1_basic_cleaning",
                "intermediate": "task_2_intermediate_cleaning",
                "advanced": "task_3_full_pipeline"
            }
            self.current_task_id = diff_task_map.get(difficulty or "intermediate", "task_1_basic_cleaning")

        # Generate dataset based on task
        self.df = self.dataset_generator.generate_dataset(self.current_task_id, seed=seed)
        self.initial_df = self.df.copy()
        self.dataset_history.append(self.df.copy())

        # Calculate initial quality score
        initial_quality = calculate_dataset_quality_score(self.df, self.current_task_id)
        self.quality_history.append(initial_quality)

        self.episode_phase = EpisodePhase.ACTIVE
        logger.info(f"Reset episode {self.episode_id} task={self.current_task_id} rows={len(self.df)}")

        return self._create_observation(
            message="Episode started. Perform data cleaning operations on the dataset.",
            metadata={"phase": self.episode_phase.value}
        )

    def step(
        self,
        action: DataCleaningAction,
        **kwargs
    ) -> DataCleaningObservation:
        """
        Process the AI's action and return the next observation.

        Executes the requested data cleaning operation, calculates reward,
        and returns updated state.
        """
        self.last_step_time = time.time()

        if self.df is None:
            return self._create_error_observation("No active dataset. Call /reset first.")

        # Handle submit action: terminal, triggers final grading
        if action.action_type == CleaningActionType.SUBMIT:
            return self._end_episode()

        # Handle revert action: drop the latest snapshot (the initial snapshot
        # is always preserved, so revert below it is rejected with a small penalty)
        if action.action_type == CleaningActionType.REVERT:
            if len(self.dataset_history) > 1:
                self.dataset_history.pop()
                self.df = self.dataset_history[-1].copy()
                return self._create_observation(
                    message="Reverted to previous state",
                    reward=0.0
                )
            else:
                return self._create_observation(
                    message="Cannot revert: no previous state available",
                    reward=-0.05
                )

        # Validate and execute action
        try:
            # Detect an exact repeat of the previous action BEFORE recording
            # the current one, so the comparison target is unambiguous.
            if self.action_history:
                last_record = self.action_history[-1]
                is_repeat = (last_record["action_type"] == action.action_type
                             and last_record["params"] == action.params)
            else:
                is_repeat = False

            self.df = self._execute_action(self.df, action)
            self.dataset_history.append(self.df.copy())

            # Calculate reward and quality
            previous_quality = self.quality_history[-1] if self.quality_history else 0.0
            current_quality = calculate_dataset_quality_score(self.df, self.current_task_id)
            reward, reward_info = calculate_reward(
                df=self.df,
                initial_df=self.initial_df,
                previous_quality=previous_quality,
                current_quality=current_quality,
                action=action,
                task_id=self.current_task_id,
                step_count=self.step_count
            )
            self.quality_history.append(current_quality)

            # Escalating penalty for repeating the exact same action.
            # BUGFIX: apply the penalty before writing the histories so that
            # reward_history/action_history match the reward actually returned
            # (previously the penalty was subtracted after the append, leaving
            # the recorded reward too high).
            if is_repeat:
                self.consecutive_repeated_actions += 1
                reward -= 0.05 * self.consecutive_repeated_actions
            else:
                self.consecutive_repeated_actions = 0

            # Update tracking
            self.reward_history.append(reward)
            self.action_history.append({
                "action_type": action.action_type,
                "params": action.params,
                "reward": reward,
                "quality_improvement": current_quality - previous_quality
            })

            self.step_count += 1

            # Check step budget and early-stopping conditions
            early_stop = self._check_early_stopping()
            done = bool(self.step_count >= self.config.max_steps_per_episode or early_stop)
            if early_stop:
                done = True
                self.early_stop_reason = early_stop
                self.episode_phase = EpisodePhase.COMPLETION

            return self._create_observation(
                message=f"Executed {action.action_type} successfully",
                reward=reward,
                done=done,
                metadata={
                    "step": self.step_count,
                    "previous_quality": previous_quality,
                    "current_quality": current_quality,
                    "quality_improvement": current_quality - previous_quality,
                    "reward_breakdown": reward_info,
                }
            )
        except Exception as e:
            logger.warning(f"Action execution failed: {e}")
            return self._create_error_observation(f"Invalid action: {str(e)}")

    def state(self) -> DataCleaningState:
        """Return a comprehensive snapshot of the environment state.

        Safe to call at any time, including before the first reset(): the
        histories are simply empty and dataset info falls back to a blank
        ``DatasetInfo``.
        """
        # NOTE: dead locals removed — the previous version also built an
        # unused EpisodeStatistics object and an unused avg_reward here.
        current_quality = self.quality_history[-1] if self.quality_history else 0.0
        best_quality = max(self.quality_history) if self.quality_history else 0.0

        dataset_info = self._get_dataset_info(self.df) if self.df is not None else DatasetInfo()
        initial_dataset_info = self._get_dataset_info(self.initial_df) if self.initial_df is not None else DatasetInfo()

        return DataCleaningState(
            episode_id=self.episode_id,
            session_id=self.session_id,
            step_count=self.step_count,
            max_steps=self.config.max_steps_per_episode,
            dataset_info=dataset_info,
            initial_dataset_info=initial_dataset_info,
            total_reward=sum(self.reward_history),
            reward_history=self.reward_history.copy(),
            action_history=self.action_history.copy(),
            current_quality_score=current_quality,
            best_quality_score=best_quality,
            current_task_id=self.current_task_id,
            difficulty_level=self.current_difficulty,
            # Always set in __init__, so the old hasattr() guard was redundant.
            episode_start_time=self.episode_start_time,
            last_step_time=self.last_step_time,
            metadata={
                "phase": self.episode_phase.value,
                "version": self.VERSION,
            }
        )

    def close(self) -> None:
        """Clean up resources."""
        logger.info(f"Closed environment (session={self.session_id})")

    def _execute_action(self, df: pd.DataFrame, action: DataCleaningAction) -> pd.DataFrame:
        """Execute the requested cleaning action on the dataframe.

        Raises:
            ValueError: when a referenced column is missing, a FILTER_ROWS
                operator is unknown, or the action type itself is unknown.
                step() converts these into error observations.
        """
        params = action.params

        if action.action_type == CleaningActionType.DROP_NULLS:
            # Drop rows with nulls in one column, or in any column if no
            # column is given (or the given column does not exist).
            column = params.get("column")
            if column and column in df.columns:
                return df.dropna(subset=[column]).reset_index(drop=True)
            else:
                return df.dropna().reset_index(drop=True)

        elif action.action_type == CleaningActionType.FILL_NULLS:
            column = params.get("column")
            strategy = params.get("strategy", "mean")
            if column not in df.columns:
                raise ValueError(f"Column {column} not found")
            if strategy == "mean":
                df[column] = df[column].fillna(df[column].mean())
            elif strategy == "median":
                df[column] = df[column].fillna(df[column].median())
            elif strategy == "mode":
                # mode() can be empty (all-null column); fall back to 0.
                df[column] = df[column].fillna(df[column].mode()[0] if not df[column].mode().empty else 0)
            elif strategy == "forward_fill":
                df[column] = df[column].ffill()
            elif strategy == "backward_fill":
                df[column] = df[column].bfill()
            # NOTE(review): an unknown strategy is a silent no-op — confirm
            # this is intended rather than raising ValueError.
            return df

        elif action.action_type == CleaningActionType.REMOVE_DUPLICATES:
            columns = params.get("columns")
            if columns:
                return df.drop_duplicates(subset=columns).reset_index(drop=True)
            else:
                return df.drop_duplicates().reset_index(drop=True)

        elif action.action_type == CleaningActionType.VALIDATE_EMAIL:
            column = params.get("column")
            drop_invalid = params.get("drop_invalid", False)
            if column not in df.columns:
                raise ValueError(f"Column {column} not found")
            # Simple email validation regex
            email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
            valid_mask = df[column].astype(str).str.match(email_pattern, na=False)
            if drop_invalid:
                return df[valid_mask].reset_index(drop=True)
            else:
                # Validation-only mode: dataset is returned unchanged.
                return df

        elif action.action_type == CleaningActionType.OUTLIER_REMOVAL:
            # IQR-based outlier removal with a configurable fence multiplier.
            column = params.get("column")
            multiplier = params.get("multiplier", 1.5)
            if column not in df.columns:
                raise ValueError(f"Column {column} not found")
            Q1 = df[column].quantile(0.25)
            Q3 = df[column].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - multiplier * IQR
            upper_bound = Q3 + multiplier * IQR
            return df[(df[column] >= lower_bound) & (df[column] <= upper_bound)].reset_index(drop=True)

        elif action.action_type == CleaningActionType.CONVERT_TYPES:
            column = params.get("column")
            dtype = params.get("dtype")
            if column not in df.columns:
                raise ValueError(f"Column {column} not found")
            if dtype == "int":
                # Nullable Int64 keeps NaNs representable after coercion.
                df[column] = pd.to_numeric(df[column], errors='coerce').astype('Int64')
            elif dtype == "float":
                df[column] = pd.to_numeric(df[column], errors='coerce')
            elif dtype == "str":
                df[column] = df[column].astype(str)
            elif dtype == "datetime":
                df[column] = pd.to_datetime(df[column], errors='coerce')
            # NOTE(review): an unknown dtype is a silent no-op — confirm intended.
            return df

        elif action.action_type == CleaningActionType.NORMALIZE:
            column = params.get("column")
            method = params.get("method", "minmax")
            if column not in df.columns:
                raise ValueError(f"Column {column} not found")
            if method == "minmax":
                min_val = df[column].min()
                max_val = df[column].max()
                # Constant columns are left unchanged to avoid division by zero.
                if max_val != min_val:
                    df[column] = (df[column] - min_val) / (max_val - min_val)
            elif method == "zscore":
                mean_val = df[column].mean()
                std_val = df[column].std()
                if std_val != 0:
                    df[column] = (df[column] - mean_val) / std_val
            return df

        elif action.action_type == CleaningActionType.DROP_COLUMNS:
            # Missing columns are silently ignored rather than raising.
            columns = params.get("columns", [])
            existing_columns = [col for col in columns if col in df.columns]
            return df.drop(columns=existing_columns).reset_index(drop=True)

        elif action.action_type == CleaningActionType.FILTER_ROWS:
            column = params.get("column")
            operator = params.get("operator")
            value = params.get("value")
            if column not in df.columns:
                raise ValueError(f"Column {column} not found")
            if operator == ">":
                return df[df[column] > value].reset_index(drop=True)
            elif operator == "<":
                return df[df[column] < value].reset_index(drop=True)
            elif operator == "==":
                return df[df[column] == value].reset_index(drop=True)
            elif operator == ">=":
                return df[df[column] >= value].reset_index(drop=True)
            elif operator == "<=":
                return df[df[column] <= value].reset_index(drop=True)
            else:
                raise ValueError(f"Unknown operator: {operator}")

        else:
            raise ValueError(f"Unknown action type: {action.action_type}")

    def _get_dataset_info(self, df: pd.DataFrame) -> DatasetInfo:
        """Generate comprehensive dataset metadata and quality metrics."""
        # BUGFIX: guard against division by zero on an empty dataframe, which
        # previously produced NaN percentages.
        row_count = max(len(df), 1)
        return DatasetInfo(
            shape=[df.shape[0], df.shape[1]],
            columns=list(df.columns),
            null_counts=df.isnull().sum().to_dict(),
            null_percentages=(df.isnull().sum() / row_count * 100).to_dict(),
            # Cast np.int64 -> int so the model serializes cleanly.
            duplicate_count=int(df.duplicated().sum()),
            dtypes=df.dtypes.astype(str).to_dict(),
            numeric_columns=list(df.select_dtypes(include=[np.number]).columns),
            categorical_columns=list(df.select_dtypes(exclude=[np.number]).columns),
            quality_score=calculate_dataset_quality_score(df, self.current_task_id)
        )

    def _create_observation(
        self,
        message: str = "",
        reward: Optional[float] = None,
        done: bool = False,
        metadata: Optional[Dict[str, Any]] = None
    ) -> DataCleaningObservation:
        """Create a comprehensive observation for the current step."""
        dataset_info = self._get_dataset_info(self.df) if self.df is not None else DatasetInfo()

        # SUBMIT is excluded from the advertised action list (it is still
        # accepted by step() as the terminal action).
        all_actions = list(CleaningActionType)
        available_actions = [a for a in all_actions if a != CleaningActionType.SUBMIT]

        reward_breakdown = metadata.get("reward_breakdown") if metadata else None
        previous_quality = self.quality_history[-2] if len(self.quality_history) >= 2 else 0.0
        current_quality = self.quality_history[-1] if self.quality_history else 0.0

        # BUGFIX: construct the enum by value and catch ValueError. The previous
        # membership test checked enum *names* (DifficultyLevel.__members__)
        # against a lowercase value string, which could reject valid values.
        try:
            difficulty_level = DifficultyLevel(self.current_difficulty)
        except ValueError:
            difficulty_level = DifficultyLevel.INTERMEDIATE

        return DataCleaningObservation(
            dataset_info=dataset_info,
            done=done,
            reward=reward,
            message=message,
            available_actions=available_actions,
            step_count=self.step_count,
            task_id=self.current_task_id,
            quality_score=current_quality,
            previous_quality=previous_quality,
            quality_improvement=current_quality - previous_quality,
            reward_breakdown=reward_breakdown,
            action_history=self.action_history.copy(),
            difficulty_level=difficulty_level,
            task_progress=self.step_count / self.config.max_steps_per_episode,
            metadata=metadata or {}
        )

    def _create_error_observation(self, error_message: str) -> DataCleaningObservation:
        """Create a non-terminal observation carrying a fixed -0.1 error penalty."""
        return DataCleaningObservation(
            done=False,
            reward=-0.1,
            message=f"Error: {error_message}",
            step_count=self.step_count,
            task_id=self.current_task_id,
            metadata={"error": error_message}
        )

    def _end_episode(self) -> DataCleaningObservation:
        """End the current episode and perform final grading."""
        self.episode_phase = EpisodePhase.GRADING

        # Calculate final grade (deterministic, 0.0-1.0)
        final_score = grade_task_result(
            initial_df=self.initial_df,
            final_df=self.df,
            task_id=self.current_task_id,
            step_count=self.step_count
        )

        self.episode_phase = EpisodePhase.COMPLETION
        return DataCleaningObservation(
            done=True,
            reward=final_score,
            message=f"Episode completed. Final score: {final_score:.4f}",
            step_count=self.step_count,
            task_id=self.current_task_id,
            quality_score=calculate_dataset_quality_score(self.df, self.current_task_id),
            metadata={
                "episode_complete": True,
                "final_score": final_score,
            }
        )

    def _check_early_stopping(self) -> Optional[str]:
        """
        Check if episode should stop early based on performance conditions.

        Returns:
            str describing early stop reason, or None if should continue.
        """
        if not self.config.early_stopping_enabled:
            return None

        # Require minimum steps before early stopping
        if self.step_count < 3:
            return None

        # 1. No improvement across the last five quality measurements
        if len(self.quality_history) >= 5:
            recent_quality = self.quality_history[-5:]
            if max(recent_quality) == min(recent_quality):
                return "no_improvement"

        # 2. Too many repeated actions
        if self.consecutive_repeated_actions >= 3:
            return "repeated_actions"

        # 3. Near-perfect quality achieved early
        current_quality = self.quality_history[-1] if self.quality_history else 0.0
        if current_quality >= 0.95:
            return "perfect_quality"

        return None