"""Professional-grade data contracts for AutoClean-Ai. This module defines the core data structures for a complex RL environment that trains AI models to perform data cleaning operations on tabular datasets. """ from typing import Optional, Dict, Any, List, Literal from enum import Enum import uuid from pydantic import BaseModel, Field from openenv.core.env_server import Action, Observation, State class DifficultyLevel(Enum): """Difficulty levels for cleaning tasks.""" BEGINNER = "beginner" INTERMEDIATE = "intermediate" ADVANCED = "advanced" EXPERT = "expert" class CleaningActionType(str, Enum): """Available data cleaning actions.""" DROP_NULLS = "drop_nulls" FILL_NULLS = "fill_nulls" REMOVE_DUPLICATES = "remove_duplicates" FILTER_ROWS = "filter_rows" DROP_COLUMNS = "drop_columns" CONVERT_TYPES = "convert_types" VALIDATE_EMAIL = "validate_email" OUTLIER_REMOVAL = "outlier_removal" NORMALIZE = "normalize" SUBMIT = "submit" REVERT = "revert" class DatasetInfo(BaseModel): """Dataset metadata and quality metrics.""" shape: List[int] = Field(default_factory=lambda: [0, 0]) columns: List[str] = Field(default_factory=list) null_counts: Dict[str, int] = Field(default_factory=dict) null_percentages: Dict[str, float] = Field(default_factory=dict) duplicate_count: int = 0 dtypes: Dict[str, str] = Field(default_factory=dict) numeric_columns: List[str] = Field(default_factory=list) categorical_columns: List[str] = Field(default_factory=list) outlier_counts: Dict[str, int] = Field(default_factory=dict) quality_score: float = 0.0 class RewardBreakdown(BaseModel): """Detailed breakdown of reward components.""" null_improvement: float = 0.0 duplicate_improvement: float = 0.0 outlier_improvement: float = 0.0 valid_email_count: int = 0 type_correctness: float = 0.0 normalization_score: float = 0.0 efficiency_bonus: float = 0.0 action_validity: float = 0.0 progress_bonus: float = 0.0 penalty: float = 0.0 total: float = 0.0 class DataCleaningAction(Action): """ Action space for the AI agent. The AI must provide: - Action type from allowed operations - Parameters specific to the action """ action_type: CleaningActionType params: Dict[str, Any] = Field(default_factory=dict) reasoning: str = "" class DataCleaningObservation(Observation): """ Observation space with rich feedback signals. Provides the AI with detailed information about: - Current dataset state and quality metrics - Previous action results - Detailed reward breakdown - Available valid actions - Task progress """ # Core dataset info dataset_info: DatasetInfo = Field(default_factory=DatasetInfo) # Episode state done: bool = False reward: Optional[float] = None # Feedback message: str = "" available_actions: List[CleaningActionType] = Field(default_factory=list) step_count: int = 0 task_id: str = "" # Performance metrics quality_score: float = 0.0 previous_quality: float = 0.0 quality_improvement: float = 0.0 # Detailed reward breakdown reward_breakdown: Optional[RewardBreakdown] = None # History action_history: List[Dict[str, Any]] = Field(default_factory=list) # Difficulty and progress difficulty_level: DifficultyLevel = DifficultyLevel.INTERMEDIATE task_progress: float = 0.0 # Extended metadata metadata: Dict[str, Any] = Field(default_factory=dict) class EpisodeStatistics(BaseModel): """Comprehensive statistics for an episode.""" episode_id: str = "" total_steps: int = 0 initial_quality: float = 0.0 final_quality: float = 0.0 quality_improvement: float = 0.0 nulls_removed: int = 0 duplicates_removed: int = 0 outliers_removed: int = 0 emails_validated: int = 0 actions_taken: Dict[str, int] = Field(default_factory=dict) reward_history: List[float] = Field(default_factory=list) efficiency_score: float = 0.0 total_reward: float = 0.0 class DataCleaningState(State): """ Comprehensive state tracking for the RL environment. Tracks episode-level and agent-level state. """ # Episode identification episode_id: Optional[str] = None session_id: str = Field(default_factory=lambda: str(uuid.uuid4())[:8]) # Step tracking step_count: int = 0 max_steps: int = 15 # Dataset state dataset_info: DatasetInfo = Field(default_factory=DatasetInfo) initial_dataset_info: DatasetInfo = Field(default_factory=DatasetInfo) # Performance tracking total_reward: float = 0.0 reward_history: List[float] = Field(default_factory=list) action_history: List[Dict[str, Any]] = Field(default_factory=list) # Quality metrics current_quality_score: float = 0.0 best_quality_score: float = 0.0 # Task state current_task_id: str = "" difficulty_level: str = "intermediate" # Timestamps episode_start_time: Optional[float] = None last_step_time: Optional[float] = None # Metadata for extensibility metadata: Dict[str, Any] = Field(default_factory=dict) def to_dict(self) -> Dict[str, Any]: """Convert state to dictionary for serialization.""" return { "episode_id": self.episode_id, "session_id": self.session_id, "step_count": self.step_count, "max_steps": self.max_steps, "current_quality_score": self.current_quality_score, "best_quality_score": self.best_quality_score, "total_reward": self.total_reward, "current_task_id": self.current_task_id, "difficulty_level": self.difficulty_level, **self.metadata } class EnvironmentConfig(BaseModel): """Configuration for the data cleaning environment.""" # Episode configuration max_steps_per_episode: int = 15 min_steps_for_completion: int = 3 # Early stopping configuration early_stopping_enabled: bool = True early_stopping_patience: int = 3 early_stopping_min_reward: float = 0.01 # Reward configuration reward_weights: Dict[str, float] = Field(default_factory=lambda: { "null_improvement": 0.25, "duplicate_improvement": 0.20, "outlier_improvement": 0.20, "email_validation": 0.15, "type_correctness": 0.10, "efficiency": 0.10, }) # Difficulty configuration initial_difficulty: str = "intermediate" adaptive_difficulty: bool = True # Task configuration tasks: List[Dict[str, Any]] = Field(default_factory=list)