Spaces:
Sleeping
Sleeping
| # data_cleaning_env.py — Complete environment module for submission | |
| import uuid, random, re, json | |
| from copy import deepcopy | |
| from datetime import datetime, timedelta | |
| from typing import Optional, List, Dict, Any | |
| from enum import Enum | |
| import pandas as pd | |
| import numpy as np | |
| from pydantic import BaseModel, Field | |
| # ─── Models ────────────────────────────────────────────────────── | |
# Difficulty tier attached to each task. Members are also plain strings
# (the class mixes in ``str``), so they compare equal to their values.
class TaskDifficulty(str, Enum):
    EASY = "easy"      # task 1
    MEDIUM = "medium"  # task 2
    HARD = "hard"      # task 3
# One cleaning request submitted to the environment.
class CleaningAction(BaseModel):
    # Required: task selector.
    task_id: int = Field(
        ...,
        description="Which task: 1=easy, 2=medium, 3=hard",
    )
    # Required: name of the cleaning operation to run.
    action_type: str = Field(
        ...,
        description="One of: remove_nulls, fix_dates, remove_outliers",
    )
    # Optional: column the operation is restricted to.
    column: Optional[str] = Field(
        None,
        description="Which column to clean (optional)",
    )
    # Optional free-form parameters; each instance gets its own fresh dict.
    params: Optional[Dict[str, Any]] = Field(default_factory=dict)
# Snapshot of the dataset and scoring information handed back to the agent.
class DatasetObservation(BaseModel):
    # --- task identity ---
    task_id: int
    task_description: str
    difficulty: TaskDifficulty

    # --- dataset summary ---
    dataset_preview: str       # textual rendering of the first rows
    num_rows: int
    null_count: int            # missing cells in the current data
    date_format_errors: int    # date values not yet in the target shape
    outlier_count: int         # values flagged by the outlier rule

    # --- scoring / episode status ---
    current_score: float
    done: bool = False
    reward: float = 0.0
    hint: str = ""
# Episode-level bookkeeping exposed by the environment's ``state()`` call.
class EnvironmentState(BaseModel):
    episode_id: str                  # short identifier of the running episode
    step_count: int                  # actions attempted so far
    current_task: int                # task id the environment is bound to
    tasks_completed: List[int] = []  # ids of finished tasks
    total_reward: float = 0.0        # reward accumulated in the episode
| # ─── Dataset generators ────────────────────────────────────────── | |
def make_task1_dataset(seed=42):
    """Build the EASY fixture: a 20-row table with randomly missing cells.

    Both the ``random`` and NumPy generators are seeded with ``seed`` so the
    same seed always yields the same frame. Columns are ``name``, ``age``,
    ``salary`` and ``city``; any cell may be ``None``.
    """
    random.seed(seed)
    np.random.seed(seed)
    row_count = 20

    # Columns are produced one after another (not row by row); the order of
    # the random draws below determines the data for a given seed.
    names = []
    for idx in range(row_count):
        present = random.random() > 0.3
        names.append(f'Person_{idx}' if present else None)

    ages = []
    for _ in range(row_count):
        # The presence draw comes first; the value is drawn only when kept.
        if random.random() > 0.25:
            ages.append(random.randint(18, 65))
        else:
            ages.append(None)

    salaries = []
    for _ in range(row_count):
        if random.random() > 0.2:
            salaries.append(round(random.uniform(30000, 120000), 2))
        else:
            salaries.append(None)

    city_pool = ['Mumbai', 'Delhi', 'Pune', None]
    cities = [random.choice(city_pool) for _ in range(row_count)]

    return pd.DataFrame({
        'name': names,
        'age': ages,
        'salary': salaries,
        'city': cities,
    })
def make_task2_dataset(seed=42):
    """Build the MEDIUM fixture: 20 employees with mixed-format hire dates.

    Returns a ``(DataFrame, target_format)`` pair, where ``target_format`` is
    the ``strftime`` pattern the ``hire_date`` column should be normalised to.
    Output is deterministic for a given ``seed``.
    """
    random.seed(seed)
    np.random.seed(seed)

    row_count = 20
    target_format = "%Y-%m-%d"
    source_formats = ["%d/%m/%Y", "%m-%d-%Y", "%Y.%m.%d", "%d %b %Y", "%Y/%m/%d"]
    first_day = datetime(2023, 1, 1)

    hire_dates = []
    for _ in range(row_count):
        # Draw the day offset first, then the format used to render it.
        offset_days = random.randint(0, 365)
        chosen_format = random.choice(source_formats)
        moment = first_day + timedelta(days=offset_days)
        hire_dates.append(moment.strftime(chosen_format))

    department_pool = ['Eng', 'HR', 'Finance', 'Sales']
    departments = [random.choice(department_pool) for _ in range(row_count)]
    salaries = [random.randint(40000, 150000) for _ in range(row_count)]

    frame = pd.DataFrame({
        'employee_id': range(1, row_count + 1),
        'hire_date': hire_dates,
        'department': departments,
        'salary': salaries,
    })
    return frame, target_format
def make_task3_dataset(seed=42):
    """Build the HARD fixture: 30 employees with planted numeric outliers.

    ``salary`` mixes normally distributed values with four extreme entries;
    ``age`` mixes integers drawn from [22, 60) with three impossible ones.
    Both columns are shuffled independently. Deterministic for a given seed.
    """
    random.seed(seed)
    np.random.seed(seed)
    row_count = 30

    # Salaries: typical values first, planted extremes appended, then shuffled.
    planted_salaries = [500000, -5000, 999999, 1200000]
    typical_salaries = np.random.normal(
        60000, 15000, row_count - len(planted_salaries)
    ).tolist()
    salaries = [*typical_salaries, *planted_salaries]
    random.shuffle(salaries)

    # Ages: same recipe with a different distribution and different extremes.
    planted_ages = [150, -5, 200]
    typical_ages = np.random.randint(
        22, 60, row_count - len(planted_ages)
    ).tolist()
    ages = [*typical_ages, *planted_ages]
    random.shuffle(ages)

    rounded_salaries = [round(value, 2) for value in salaries]
    experience = [random.randint(0, 35) for _ in range(row_count)]

    return pd.DataFrame({
        'employee_id': range(1, row_count + 1),
        'age': ages,
        'salary': rounded_salaries,
        'years_exp': experience,
    })
| # ─── Graders ───────────────────────────────────────────────────── | |
def grade_task1(original_df, cleaned_df) -> float:
    """Score null removal on a 0.0001–0.9999 scale.

    70% of the score is the fraction of the original null cells that are gone;
    30% is a retention term that equals 1.0 unless fewer than 30% of the rows
    survive, in which case it equals the surviving fraction. A dataset that
    had no nulls to begin with scores 0.9999.
    """
    nulls_before = original_df.isnull().sum().sum()
    if nulls_before == 0:
        return 0.9999

    nulls_after = cleaned_df.isnull().sum().sum()
    fixed_fraction = max(0.0, (nulls_before - nulls_after) / nulls_before)

    kept_fraction = len(cleaned_df) / len(original_df)
    # Only an aggressive drop (under 30% of rows kept) reduces this term.
    if kept_fraction < 0.3:
        retention_term = kept_fraction
    else:
        retention_term = 1.0

    combined = 0.7 * fixed_fraction + 0.3 * retention_term
    clamped = min(0.9999, max(0.0001, combined))
    return round(clamped, 4)
def grade_task2(original_df, cleaned_df, target_format="%Y-%m-%d") -> float:
    """Score date normalisation on a 0.0001–0.9999 scale.

    The base score is the share of ``hire_date`` values that are strings
    shaped like ``NNNN-NN-NN`` and also parse with ``target_format``. Each of
    ``employee_id``, ``department`` and ``salary`` that is present in both
    frames but no longer equal to the original multiplies the score by 0.8.
    A missing ``hire_date`` column scores 0.0001.
    """
    if 'hire_date' not in cleaned_df.columns:
        return 0.0001

    iso_shape = re.compile(r'^\d{4}-\d{2}-\d{2}$')

    def _is_well_formed(value) -> bool:
        # Shape check first, then a real parse to reject e.g. month 13.
        if not isinstance(value, str) or not iso_shape.match(value):
            return False
        try:
            datetime.strptime(value, target_format)
        except ValueError:
            return False
        return True

    row_total = len(cleaned_df)
    good_total = sum(1 for value in cleaned_df['hire_date'] if _is_well_formed(value))
    base_score = good_total / row_total if row_total > 0 else 0.0

    integrity = 1.0
    for col in ('employee_id', 'department', 'salary'):
        in_both = col in original_df.columns and col in cleaned_df.columns
        if in_both and not original_df[col].equals(cleaned_df[col]):
            integrity *= 0.8

    clamped = min(0.9999, max(0.0001, base_score * integrity))
    return round(clamped, 4)
def grade_task3(original_df, cleaned_df) -> float:
    """Score outlier removal on a 0.0001–0.9999 scale.

    For each of ``salary`` and ``age`` present in both frames, fences are set
    at Q1 - 3*IQR and Q3 + 3*IQR of the ORIGINAL column. The column score is
    60% share of original outliers removed plus 40% retention of the
    non-outlier rows (capped at 1.0). A column with no original outliers
    scores 0.9999. The result is the mean over scored columns, or 0.0001 when
    neither column could be scored.
    """
    column_scores = []
    for col in ('salary', 'age'):
        if col not in original_df.columns or col not in cleaned_df.columns:
            continue

        before = pd.to_numeric(original_df[col], errors='coerce').dropna()
        after = pd.to_numeric(cleaned_df[col], errors='coerce').dropna()

        q1 = before.quantile(0.25)
        q3 = before.quantile(0.75)
        spread = q3 - q1
        low_fence = q1 - 3 * spread
        high_fence = q3 + 3 * spread

        outliers_before = ((before < low_fence) | (before > high_fence)).sum()
        outliers_after = ((after < low_fence) | (after > high_fence)).sum()

        if outliers_before == 0:
            column_scores.append(0.9999)
            continue

        removed_share = (outliers_before - outliers_after) / outliers_before
        expected_rows = max(len(before) - outliers_before, 1)
        kept_share = min(1.0, len(after) / expected_rows)
        column_scores.append(0.6 * max(0.0, removed_share) + 0.4 * kept_share)

    if not column_scores:
        return 0.0001
    average = sum(column_scores) / len(column_scores)
    return round(min(0.9999, max(0.0001, average)), 4)
| # ─── Environment ───────────────────────────────────────────────── | |
# Agent-facing instructions, keyed by task id (1 = easy, 2 = medium, 3 = hard).
TASK_DESCRIPTIONS = {
    1: (
        "EASY: The dataset has missing values (NaN/null). "
        "Call action_type='remove_nulls' to drop all rows with any null values."
    ),
    2: (
        "MEDIUM: The 'hire_date' column has inconsistent formats. "
        "Call action_type='fix_dates' with column='hire_date' "
        "to standardise to YYYY-MM-DD."
    ),
    3: (
        "HARD: The 'salary' and 'age' columns contain extreme outliers. "
        "Call action_type='remove_outliers' with column='all' "
        "to remove them via IQR method."
    ),
}
class DataCleaningEnvironment:
    """Single-task data-cleaning environment with a reset/step/state API.

    Each instance is bound to one task (1 = nulls, 2 = dates, 3 = outliers).
    ``reset()`` regenerates the task's dataset from ``seed``. ``step()``
    applies one cleaning action; if the action applies without raising, the
    result is graded and the episode ends. An action that raises costs a
    fixed -0.1 reward and leaves the episode open.
    """

    # Input formats tried by ``_parse_date``; the first one that parses wins.
    # Unambiguous year-first forms come first. Slash dates are read day-first
    # and dash dates month-first because that is how ``make_task2_dataset``
    # writes them. The remaining entries are fallbacks for values that do not
    # fit the preferred reading (e.g. "12/25/2023").
    _DATE_INPUT_FORMATS = (
        "%Y-%m-%d", "%Y/%m/%d", "%Y.%m.%d",
        "%d/%m/%Y", "%m-%d-%Y", "%d %b %Y",
        "%m/%d/%Y", "%d-%m-%Y", "%d.%m.%Y",
    )

    # Numeric columns treated as outlier candidates by both the
    # ``remove_outliers`` action and the observation's ``outlier_count``.
    _OUTLIER_COLUMNS = ('salary', 'age')

    def __init__(self, task_id: int = 1, seed: int = 42):
        """Bind the environment to ``task_id`` (1, 2 or 3) and a dataset seed.

        No dataset exists until ``reset()`` is called.
        """
        assert task_id in [1, 2, 3], "task_id must be 1, 2, or 3"
        self.task_id = task_id
        self.seed = seed
        self._original_df = None
        self._current_df = None
        self._target_format = None
        self._step_count = 0
        self._episode_id = None
        self._done = False
        self._total_reward = 0.0

    def reset(self) -> DatasetObservation:
        """Start a new episode: fresh id, zeroed counters, regenerated data.

        Returns the initial observation (reward 0.0, not done).
        """
        self._episode_id = str(uuid.uuid4())[:8]
        self._step_count = 0
        self._done = False
        self._total_reward = 0.0
        if self.task_id == 1:
            self._original_df = make_task1_dataset(self.seed)
        elif self.task_id == 2:
            # Task 2's generator also returns the format dates must end up in.
            self._original_df, self._target_format = make_task2_dataset(self.seed)
        else:
            self._original_df = make_task3_dataset(self.seed)
        # The pristine frame is kept for grading; actions work on a copy.
        self._current_df = deepcopy(self._original_df)
        return self._make_observation(reward=0.0, done=False)

    def step(self, action: CleaningAction):
        """Apply ``action`` and return ``(observation, reward, done, info)``.

        * Episode already finished: reward 0.0, ``done=True``, and an
          ``"error"`` entry in ``info``; nothing else changes.
        * Action raises: reward -0.1, ``done=False``, the message is placed
          in the observation hint (truncated to 100 characters) and in
          ``info["error"]``. The dataset is left as it was.
        * Otherwise: the cleaned frame is graded, the reward is added to the
          episode total, and the episode ends.
        """
        if self._done:
            return self._make_observation(reward=0.0, done=True), 0.0, True, {"error": "Episode already done"}
        self._step_count += 1
        try:
            self._current_df = self._apply_action(action)
        except Exception as e:
            # Deliberately broad: any failure inside an action is reported to
            # the agent as a penalised step instead of crashing the caller.
            obs = self._make_observation(reward=-0.1, done=False)
            obs.hint = f"Action failed: {str(e)[:100]}"
            return obs, -0.1, False, {"error": str(e)}
        reward = self._compute_reward()
        self._total_reward += reward
        self._done = True
        return self._make_observation(reward=reward, done=True), reward, True, {"step": self._step_count}

    def state(self) -> EnvironmentState:
        """Return episode bookkeeping (id, step count, task id, total reward).

        ``episode_id`` is ``"not_started"`` before the first ``reset()``;
        ``tasks_completed`` is always reported as an empty list.
        """
        return EnvironmentState(
            episode_id=self._episode_id or "not_started",
            step_count=self._step_count,
            current_task=self.task_id,
            tasks_completed=[],
            total_reward=self._total_reward
        )

    @staticmethod
    def _parse_date(val):
        """Convert one date value to a ``YYYY-MM-DD`` string.

        ``val`` is stringified and stripped, then tried against each entry of
        ``_DATE_INPUT_FORMATS`` in order. Returns ``None`` when nothing
        matches (for example for missing values).

        Declared static because it is passed to ``Series.apply`` through the
        instance and must receive only the cell value.
        """
        text = str(val).strip()
        for fmt in DataCleaningEnvironment._DATE_INPUT_FORMATS:
            try:
                return datetime.strptime(text, fmt).strftime("%Y-%m-%d")
            except ValueError:
                # Wrong format for this value; try the next candidate.
                continue
        return None

    def _apply_action(self, action: CleaningAction) -> pd.DataFrame:
        """Return a cleaned copy of the current frame for ``action``.

        Supported ``action_type`` values:

        * ``remove_nulls`` - drop rows with a null in ``action.column`` when
          that column exists, otherwise rows with a null anywhere.
        * ``fix_dates`` - rewrite ``action.column`` (default ``hire_date``)
          through ``_parse_date``; unparseable values become ``None``.
        * ``remove_outliers`` - keep rows within Q1 - 3*IQR .. Q3 + 3*IQR for
          ``action.column``, or for salary and age when the column is
          omitted or ``"all"``. Columns not present are skipped.

        Raises:
            RuntimeError: if ``reset()`` has not been called yet.
            ValueError: for an unknown action type, or a ``fix_dates``
                column that does not exist.
        """
        if self._current_df is None:
            raise RuntimeError("Environment has no dataset; call reset() before step().")
        df = deepcopy(self._current_df)
        if action.action_type == "remove_nulls":
            df = df.dropna(subset=[action.column]) if action.column and action.column in df.columns else df.dropna()
        elif action.action_type == "fix_dates":
            col = action.column or "hire_date"
            if col not in df.columns:
                raise ValueError(f"Column '{col}' not found.")
            df[col] = df[col].apply(self._parse_date)
        elif action.action_type == "remove_outliers":
            if action.column and action.column != "all":
                target_cols = [action.column]
            else:
                target_cols = list(self._OUTLIER_COLUMNS)
            for col in target_cols:
                if col not in df.columns:
                    continue
                # Fences are recomputed on the frame as filtered so far.
                series = pd.to_numeric(df[col], errors='coerce')
                Q1, Q3 = series.quantile(0.25), series.quantile(0.75)
                IQR = Q3 - Q1
                df = df[(series >= Q1 - 3*IQR) & (series <= Q3 + 3*IQR)]
        else:
            raise ValueError(f"Unknown action_type: '{action.action_type}'")
        return df

    def _compute_reward(self) -> float:
        """Grade the current frame against the original with the task's grader."""
        if self.task_id == 1:
            return grade_task1(self._original_df, self._current_df)
        elif self.task_id == 2:
            return grade_task2(self._original_df, self._current_df, self._target_format)
        else:
            return grade_task3(self._original_df, self._current_df)

    def _make_observation(self, reward: float, done: bool) -> DatasetObservation:
        """Summarise the current frame as a ``DatasetObservation``.

        ``date_format_errors`` is only computed for task 2 and
        ``outlier_count`` only for task 3; both are 0 otherwise.
        ``current_score`` is 0.0 until at least one step has been taken.
        The hint is blanked once the episode is done.
        """
        df = self._current_df if self._current_df is not None else pd.DataFrame()
        difficulties = {1: TaskDifficulty.EASY, 2: TaskDifficulty.MEDIUM, 3: TaskDifficulty.HARD}
        null_count = int(df.isnull().sum().sum()) if len(df) > 0 else 0

        date_errors = 0
        if self.task_id == 2 and 'hire_date' in df.columns:
            pat = re.compile(r'^\d{4}-\d{2}-\d{2}$')
            date_errors = sum(1 for v in df['hire_date'] if not (isinstance(v, str) and pat.match(v)))

        # Count outliers in every column the task asks the agent to clean,
        # using fences computed from the current data.
        outlier_count = 0
        if self.task_id == 3:
            for col in self._OUTLIER_COLUMNS:
                if col not in df.columns:
                    continue
                s = pd.to_numeric(df[col], errors='coerce').dropna()
                if len(s) == 0:
                    continue
                Q1, Q3 = s.quantile(0.25), s.quantile(0.75)
                IQR = Q3 - Q1
                outlier_count += int(((s < Q1 - 3*IQR) | (s > Q3 + 3*IQR)).sum())

        hints = {
            1: "Call action_type='remove_nulls' to drop rows with missing values.",
            2: "Call action_type='fix_dates' with column='hire_date'.",
            3: "Call action_type='remove_outliers' with column='all'."
        }
        return DatasetObservation(
            task_id=self.task_id,
            task_description=TASK_DESCRIPTIONS[self.task_id],
            difficulty=difficulties[self.task_id],
            dataset_preview=df.head(5).to_string() if len(df) > 0 else "(empty)",
            num_rows=len(df),
            null_count=null_count,
            date_format_errors=date_errors,
            outlier_count=outlier_count,
            current_score=self._compute_reward() if self._current_df is not None and self._step_count > 0 else 0.0,
            done=done,
            reward=reward,
            hint=hints[self.task_id] if not done else ""
        )