"""data_cleaning_env.py — Complete environment module for submission.

Defines a small single-step data-cleaning environment:

* pydantic models for actions, observations and environment state,
* seeded generators for three synthetic datasets (nulls, mixed date
  formats, numeric outliers),
* one grader per task returning a score clamped to [0.0001, 0.9999],
* ``DataCleaningEnvironment`` exposing ``reset()`` / ``step()`` / ``state()``.
"""
import json
import random
import re
import uuid
from copy import deepcopy
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from pydantic import BaseModel, Field


# ─── Shared constants & helpers ──────────────────────────────────

_MIN_SCORE = 0.0001
_MAX_SCORE = 0.9999
_DEFAULT_DATE_FORMAT = "%Y-%m-%d"
# Shape gate for YYYY-MM-DD: strptime alone also accepts non-padded
# values such as "2023-1-5", which must not count as standardised.
_ISO_DATE_RE = re.compile(r'^\d{4}-\d{2}-\d{2}$')
# Columns inspected for outliers by both the grader and the action.
_OUTLIER_COLUMNS = ('salary', 'age')
# Fences are placed at Q1 - 3*IQR and Q3 + 3*IQR.
_IQR_MULTIPLIER = 3
# Reward returned by step() when an action raises.
_FAILED_ACTION_PENALTY = -0.1
# Input formats tried by DataCleaningEnvironment._parse_date, in order.
# Every format emitted by make_task2_dataset is listed BEFORE its
# ambiguous sibling (e.g. "%d/%m/%Y" before "%m/%d/%Y") so generated
# dates are never parsed with day and month swapped.
_INPUT_DATE_FORMATS = (
    "%Y-%m-%d",
    "%Y/%m/%d",
    "%Y.%m.%d",
    "%d/%m/%Y",
    "%m-%d-%Y",
    "%d %b %Y",
    "%m/%d/%Y",
    "%d-%m-%Y",
    "%d.%m.%Y",
)


def _clamp_score(raw: float) -> float:
    """Clamp *raw* into [0.0001, 0.9999] and round to 4 decimals."""
    return float(round(min(_MAX_SCORE, max(_MIN_SCORE, raw)), 4))


def _iqr_bounds(series: pd.Series) -> Tuple[float, float]:
    """Return the (lower, upper) 3*IQR fences of *series*.

    NaN values are skipped by ``Series.quantile``; an empty or all-NaN
    series yields NaN fences, against which every comparison is False.
    """
    q1, q3 = series.quantile(0.25), series.quantile(0.75)
    iqr = q3 - q1
    return q1 - _IQR_MULTIPLIER * iqr, q3 + _IQR_MULTIPLIER * iqr


def _is_valid_date(val: Any, target_format: str = _DEFAULT_DATE_FORMAT) -> bool:
    """Return True if *val* is a YYYY-MM-DD shaped string parseable with *target_format*."""
    if not (isinstance(val, str) and _ISO_DATE_RE.match(val)):
        return False
    try:
        datetime.strptime(val, target_format)
    except ValueError:
        return False
    return True


# ─── Models ──────────────────────────────────────────────────────

class TaskDifficulty(str, Enum):
    """Difficulty label attached to each observation."""

    EASY = "easy"
    MEDIUM = "medium"
    HARD = "hard"


class CleaningAction(BaseModel):
    """A cleaning request submitted to ``DataCleaningEnvironment.step``."""

    task_id: int = Field(..., description="Which task: 1=easy, 2=medium, 3=hard")
    action_type: str = Field(..., description="One of: remove_nulls, fix_dates, remove_outliers")
    column: Optional[str] = Field(None, description="Which column to clean (optional)")
    params: Optional[Dict[str, Any]] = Field(default_factory=dict)


class DatasetObservation(BaseModel):
    """Snapshot of the dataset and scoring info returned to the agent."""

    task_id: int
    task_description: str
    difficulty: TaskDifficulty
    dataset_preview: str
    num_rows: int
    null_count: int
    date_format_errors: int
    outlier_count: int
    current_score: float
    done: bool = False
    reward: float = 0.0
    hint: str = ""


class EnvironmentState(BaseModel):
    """Episode bookkeeping returned by ``DataCleaningEnvironment.state``."""

    episode_id: str
    step_count: int
    current_task: int
    tasks_completed: List[int] = Field(default_factory=list)
    total_reward: float = 0.0


# ─── Dataset generators ──────────────────────────────────────────
# NOTE: the order of RNG calls below determines the generated data for
# a given seed; do not reorder the column expressions.

def make_task1_dataset(seed=42):
    """EASY: Dataset with null values that need removing."""
    random.seed(seed)
    np.random.seed(seed)
    n = 20
    df = pd.DataFrame({
        'name': [f'Person_{i}' if random.random() > 0.3 else None for i in range(n)],
        'age': [random.randint(18, 65) if random.random() > 0.25 else None for _ in range(n)],
        'salary': [round(random.uniform(30000, 120000), 2) if random.random() > 0.2 else None for _ in range(n)],
        'city': [random.choice(['Mumbai', 'Delhi', 'Pune', None]) for _ in range(n)],
    })
    return df


def make_task2_dataset(seed=42):
    """MEDIUM: Dataset with inconsistent date formats.

    Returns:
        A ``(dataframe, target_format)`` tuple; ``target_format`` is the
        strftime pattern the ``hire_date`` column should be converted to.
    """
    random.seed(seed)
    np.random.seed(seed)
    n = 20
    formats = ["%d/%m/%Y", "%m-%d-%Y", "%Y.%m.%d", "%d %b %Y", "%Y/%m/%d"]
    target_format = "%Y-%m-%d"
    base = datetime(2023, 1, 1)
    dates_raw = []
    for i in range(n):
        d = base + timedelta(days=random.randint(0, 365))
        fmt = random.choice(formats)
        dates_raw.append(d.strftime(fmt))
    df = pd.DataFrame({
        'employee_id': range(1, n + 1),
        'hire_date': dates_raw,
        'department': [random.choice(['Eng', 'HR', 'Finance', 'Sales']) for _ in range(n)],
        'salary': [random.randint(40000, 150000) for _ in range(n)],
    })
    return df, target_format


def make_task3_dataset(seed=42):
    """HARD: Dataset with outliers in numeric columns."""
    random.seed(seed)
    np.random.seed(seed)
    n = 30
    outlier_salaries = [500000, -5000, 999999, 1200000]
    normal_salaries = np.random.normal(60000, 15000, n - len(outlier_salaries)).tolist()
    all_salaries = normal_salaries + outlier_salaries
    random.shuffle(all_salaries)
    outlier_ages = [150, -5, 200]
    normal_ages = np.random.randint(22, 60, n - len(outlier_ages)).tolist()
    all_ages = normal_ages + outlier_ages
    random.shuffle(all_ages)
    df = pd.DataFrame({
        'employee_id': range(1, n + 1),
        'age': all_ages,
        'salary': [round(s, 2) for s in all_salaries],
        'years_exp': [random.randint(0, 35) for _ in range(n)],
    })
    return df


# ─── Graders ─────────────────────────────────────────────────────

def grade_task1(original_df: pd.DataFrame, cleaned_df: pd.DataFrame) -> float:
    """Score null removal: 70% fraction of nulls fixed, 30% row retention.

    The retention term is 1.0 unless fewer than 30% of the original rows
    survive, in which case it equals the surviving fraction. Returns
    0.9999 when the original frame had no nulls at all.
    """
    original_nulls = original_df.isnull().sum().sum()
    if original_nulls == 0:
        return _MAX_SCORE
    cleaned_nulls = cleaned_df.isnull().sum().sum()
    null_fix_score = max(0.0, (original_nulls - cleaned_nulls) / original_nulls)
    row_retention = len(cleaned_df) / len(original_df)
    retention_factor = row_retention if row_retention < 0.3 else 1.0
    return _clamp_score(0.7 * null_fix_score + 0.3 * retention_factor)


def grade_task2(original_df: pd.DataFrame, cleaned_df: pd.DataFrame,
                target_format: str = "%Y-%m-%d") -> float:
    """Score date standardisation of the ``hire_date`` column.

    The base score is the fraction of ``hire_date`` values that are
    YYYY-MM-DD shaped strings parseable with *target_format*. It is
    multiplied by 0.8 for each of ``employee_id``, ``department`` and
    ``salary`` that is present in both frames but no longer equal to the
    original. Returns 0.0001 if ``hire_date`` is missing.
    """
    if 'hire_date' not in cleaned_df.columns:
        return _MIN_SCORE
    total = len(cleaned_df)
    correctly_formatted = sum(
        1 for val in cleaned_df['hire_date'] if _is_valid_date(val, target_format)
    )
    base_score = correctly_formatted / total if total > 0 else 0.0
    integrity = 1.0
    for col in ['employee_id', 'department', 'salary']:
        if col in original_df.columns and col in cleaned_df.columns:
            if not original_df[col].equals(cleaned_df[col]):
                integrity *= 0.8
    return _clamp_score(base_score * integrity)


def grade_task3(original_df: pd.DataFrame, cleaned_df: pd.DataFrame) -> float:
    """Score outlier removal on ``salary`` and ``age``.

    For each column present in both frames, fences are computed from the
    ORIGINAL data. The column score is 60% fraction of original outliers
    removed plus 40% retention relative to the original non-outlier
    count; a column with no original outliers scores 0.9999. The result
    is the mean over scored columns, or 0.0001 if none could be scored.
    """
    score_parts = []
    for col in _OUTLIER_COLUMNS:
        if col not in original_df.columns or col not in cleaned_df.columns:
            continue
        orig_series = pd.to_numeric(original_df[col], errors='coerce').dropna()
        clean_series = pd.to_numeric(cleaned_df[col], errors='coerce').dropna()
        lower, upper = _iqr_bounds(orig_series)
        original_outliers = ((orig_series < lower) | (orig_series > upper)).sum()
        remaining_outliers = ((clean_series < lower) | (clean_series > upper)).sum()
        if original_outliers == 0:
            col_score = _MAX_SCORE
        else:
            removal_score = (original_outliers - remaining_outliers) / original_outliers
            valid_retention = min(
                1.0,
                len(clean_series) / max(len(orig_series) - original_outliers, 1),
            )
            col_score = 0.6 * max(0.0, removal_score) + 0.4 * valid_retention
        score_parts.append(col_score)
    if not score_parts:
        return _MIN_SCORE
    return _clamp_score(sum(score_parts) / len(score_parts))


# ─── Environment ─────────────────────────────────────────────────

TASK_DESCRIPTIONS = {
    1: "EASY: The dataset has missing values (NaN/null). Call action_type='remove_nulls' to drop all rows with any null values.",
    2: "MEDIUM: The 'hire_date' column has inconsistent formats. Call action_type='fix_dates' with column='hire_date' to standardise to YYYY-MM-DD.",
    3: "HARD: The 'salary' and 'age' columns contain extreme outliers. Call action_type='remove_outliers' with column='all' to remove them via IQR method."
}

_TASK_DIFFICULTIES = {
    1: TaskDifficulty.EASY,
    2: TaskDifficulty.MEDIUM,
    3: TaskDifficulty.HARD,
}

_TASK_HINTS = {
    1: "Call action_type='remove_nulls' to drop rows with missing values.",
    2: "Call action_type='fix_dates' with column='hire_date'.",
    3: "Call action_type='remove_outliers' with column='all'."
}


class DataCleaningEnvironment:
    """Single-task data-cleaning environment.

    An episode starts with ``reset()`` and ends after the first action
    that applies without raising; a failing action returns a -0.1
    penalty and leaves the episode open.
    """

    def __init__(self, task_id: int = 1, seed: int = 42):
        """Create an environment for *task_id* (1, 2 or 3) with RNG *seed*.

        Raises:
            AssertionError: if *task_id* is not 1, 2 or 3.
        """
        if task_id not in (1, 2, 3):
            # Explicit raise instead of `assert` so the check survives
            # `python -O`; the exception type is kept for existing callers.
            raise AssertionError("task_id must be 1, 2, or 3")
        self.task_id = task_id
        self.seed = seed
        self._original_df = None
        self._current_df = None
        self._target_format = None
        self._step_count = 0
        self._episode_id = None
        self._done = False
        self._total_reward = 0.0

    def reset(self) -> DatasetObservation:
        """Start a new episode: regenerate the dataset and clear counters."""
        self._episode_id = str(uuid.uuid4())[:8]
        self._step_count = 0
        self._done = False
        self._total_reward = 0.0
        if self.task_id == 1:
            self._original_df = make_task1_dataset(self.seed)
        elif self.task_id == 2:
            self._original_df, self._target_format = make_task2_dataset(self.seed)
        else:
            self._original_df = make_task3_dataset(self.seed)
        self._current_df = deepcopy(self._original_df)
        return self._make_observation(reward=0.0, done=False)

    def step(self, action: CleaningAction) -> Tuple[DatasetObservation, float, bool, Dict[str, Any]]:
        """Apply *action* and grade the result.

        ``action.task_id`` is not checked here; grading always follows
        the environment's own ``task_id``.

        Returns:
            ``(observation, reward, done, info)``. If the episode is
            already done, reward is 0.0 and ``info["error"]`` says so.
            If the action raises, reward is -0.1, ``done`` is False and
            ``info["error"]`` carries the message. Otherwise the graded
            reward is returned and the episode ends.
        """
        if self._done:
            return (
                self._make_observation(reward=0.0, done=True),
                0.0,
                True,
                {"error": "Episode already done"},
            )
        self._step_count += 1
        try:
            self._current_df = self._apply_action(action)
        except Exception as e:  # boundary: any action failure becomes a penalty
            # Accumulate the penalty so state().total_reward equals the
            # sum of the rewards actually returned by step().
            self._total_reward += _FAILED_ACTION_PENALTY
            obs = self._make_observation(reward=_FAILED_ACTION_PENALTY, done=False)
            obs.hint = f"Action failed: {str(e)[:100]}"
            return obs, _FAILED_ACTION_PENALTY, False, {"error": str(e)}
        reward = self._compute_reward()
        self._total_reward += reward
        self._done = True
        return (
            self._make_observation(reward=reward, done=True),
            reward,
            True,
            {"step": self._step_count},
        )

    def state(self) -> EnvironmentState:
        """Return episode bookkeeping; ``tasks_completed`` lists the task once the episode is done."""
        return EnvironmentState(
            episode_id=self._episode_id or "not_started",
            step_count=self._step_count,
            current_task=self.task_id,
            tasks_completed=[self.task_id] if self._done else [],
            total_reward=self._total_reward,
        )

    @staticmethod
    def _parse_date(val):
        """Return *val* reformatted as YYYY-MM-DD, or None if no known format matches."""
        text = str(val).strip()
        for fmt in _INPUT_DATE_FORMATS:
            try:
                return datetime.strptime(text, fmt).strftime("%Y-%m-%d")
            except ValueError:
                continue
        return None

    def _apply_action(self, action: CleaningAction) -> pd.DataFrame:
        """Return a cleaned copy of the current frame; the stored frame is not mutated.

        Raises:
            RuntimeError: if ``reset()`` has not been called yet.
            ValueError: for an unknown ``action_type`` or, for
                ``fix_dates``, a column that does not exist.
        """
        if self._current_df is None:
            raise RuntimeError("Environment not initialised; call reset() before step().")
        df = deepcopy(self._current_df)
        if action.action_type == "remove_nulls":
            # A missing or unknown column falls back to dropping any row
            # containing a null.
            if action.column and action.column in df.columns:
                df = df.dropna(subset=[action.column])
            else:
                df = df.dropna()
        elif action.action_type == "fix_dates":
            col = action.column or "hire_date"
            if col not in df.columns:
                raise ValueError(f"Column '{col}' not found.")
            df[col] = df[col].apply(self._parse_date)
        elif action.action_type == "remove_outliers":
            if action.column and action.column != "all":
                target_cols = [action.column]
            else:
                target_cols = list(_OUTLIER_COLUMNS)
            # Compute every column's fences from the same pre-filter
            # snapshot so the result does not depend on column order and
            # matches the fences grade_task3 derives from the original data.
            keep = pd.Series(True, index=df.index)
            for col in target_cols:
                if col not in df.columns:
                    continue
                values = pd.to_numeric(df[col], errors='coerce')
                lower, upper = _iqr_bounds(values)
                # NaN compares False, so non-numeric rows are dropped too.
                keep &= (values >= lower) & (values <= upper)
            df = df[keep]
        else:
            raise ValueError(f"Unknown action_type: '{action.action_type}'")
        return df

    def _compute_reward(self) -> float:
        """Grade the current frame against the original with this task's grader."""
        if self.task_id == 1:
            return grade_task1(self._original_df, self._current_df)
        elif self.task_id == 2:
            return grade_task2(self._original_df, self._current_df, self._target_format)
        else:
            return grade_task3(self._original_df, self._current_df)

    def _count_outliers(self, df: pd.DataFrame) -> int:
        """Count values in *df*'s salary/age columns outside the ORIGINAL data's fences."""
        if self._original_df is None:
            return 0
        total = 0
        for col in _OUTLIER_COLUMNS:
            if col not in df.columns or col not in self._original_df.columns:
                continue
            reference = pd.to_numeric(self._original_df[col], errors='coerce').dropna()
            lower, upper = _iqr_bounds(reference)
            values = pd.to_numeric(df[col], errors='coerce').dropna()
            total += int(((values < lower) | (values > upper)).sum())
        return total

    def _make_observation(self, reward: float, done: bool) -> DatasetObservation:
        """Build the observation for the current frame.

        ``date_format_errors`` is only computed for task 2 and
        ``outlier_count`` only for task 3; both use the same validity
        rules as the corresponding grader. ``current_score`` is 0.0
        until at least one step has been taken.
        """
        df = self._current_df if self._current_df is not None else pd.DataFrame()
        null_count = int(df.isnull().sum().sum()) if len(df) > 0 else 0

        date_errors = 0
        if self.task_id == 2 and 'hire_date' in df.columns:
            target_format = self._target_format or _DEFAULT_DATE_FORMAT
            date_errors = sum(
                1 for v in df['hire_date'] if not _is_valid_date(v, target_format)
            )

        outlier_count = self._count_outliers(df) if self.task_id == 3 else 0

        has_score = self._current_df is not None and self._step_count > 0
        return DatasetObservation(
            task_id=self.task_id,
            task_description=TASK_DESCRIPTIONS[self.task_id],
            difficulty=_TASK_DIFFICULTIES[self.task_id],
            dataset_preview=df.head(5).to_string() if len(df) > 0 else "(empty)",
            num_rows=len(df),
            null_count=null_count,
            date_format_errors=date_errors,
            outlier_count=outlier_count,
            current_score=self._compute_reward() if has_score else 0.0,
            done=done,
            reward=reward,
            hint=_TASK_HINTS[self.task_id] if not done else "",
        )