data-cleaning-openenv / data_cleaning_env.py
Manas281's picture
fixed
fa68c00
# data_cleaning_env.py — Complete environment module for submission
import uuid, random, re, json
from copy import deepcopy
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
from enum import Enum
import pandas as pd
import numpy as np
from pydantic import BaseModel, Field
# ─── Models ──────────────────────────────────────────────────────
class TaskDifficulty(str, Enum):
EASY = "easy"
MEDIUM = "medium"
HARD = "hard"
class CleaningAction(BaseModel):
task_id: int = Field(..., description="Which task: 1=easy, 2=medium, 3=hard")
action_type: str = Field(..., description="One of: remove_nulls, fix_dates, remove_outliers")
column: Optional[str] = Field(None, description="Which column to clean (optional)")
params: Optional[Dict[str, Any]] = Field(default_factory=dict)
class DatasetObservation(BaseModel):
task_id: int
task_description: str
difficulty: TaskDifficulty
dataset_preview: str
num_rows: int
null_count: int
date_format_errors: int
outlier_count: int
current_score: float
done: bool = False
reward: float = 0.0
hint: str = ""
class EnvironmentState(BaseModel):
episode_id: str
step_count: int
current_task: int
tasks_completed: List[int] = []
total_reward: float = 0.0
# ─── Dataset generators ──────────────────────────────────────────
def make_task1_dataset(seed=42):
"""EASY: Dataset with null values that need removing."""
random.seed(seed); np.random.seed(seed)
n = 20
df = pd.DataFrame({
'name': [f'Person_{i}' if random.random() > 0.3 else None for i in range(n)],
'age': [random.randint(18, 65) if random.random() > 0.25 else None for _ in range(n)],
'salary': [round(random.uniform(30000, 120000), 2) if random.random() > 0.2 else None for _ in range(n)],
'city': [random.choice(['Mumbai', 'Delhi', 'Pune', None]) for _ in range(n)]
})
return df
def make_task2_dataset(seed=42):
"""MEDIUM: Dataset with inconsistent date formats."""
random.seed(seed); np.random.seed(seed)
n = 20
formats = ["%d/%m/%Y", "%m-%d-%Y", "%Y.%m.%d", "%d %b %Y", "%Y/%m/%d"]
target_format = "%Y-%m-%d"
base = datetime(2023, 1, 1)
dates_raw = []
for i in range(n):
d = base + timedelta(days=random.randint(0, 365))
fmt = random.choice(formats)
dates_raw.append(d.strftime(fmt))
df = pd.DataFrame({
'employee_id': range(1, n+1),
'hire_date': dates_raw,
'department': [random.choice(['Eng', 'HR', 'Finance', 'Sales']) for _ in range(n)],
'salary': [random.randint(40000, 150000) for _ in range(n)]
})
return df, target_format
def make_task3_dataset(seed=42):
"""HARD: Dataset with outliers in numeric columns."""
random.seed(seed); np.random.seed(seed)
n = 30
outlier_salaries = [500000, -5000, 999999, 1200000]
normal_salaries = np.random.normal(60000, 15000, n - len(outlier_salaries)).tolist()
all_salaries = normal_salaries + outlier_salaries
random.shuffle(all_salaries)
outlier_ages = [150, -5, 200]
normal_ages = np.random.randint(22, 60, n - len(outlier_ages)).tolist()
all_ages = normal_ages + outlier_ages
random.shuffle(all_ages)
df = pd.DataFrame({
'employee_id': range(1, n+1),
'age': all_ages,
'salary': [round(s, 2) for s in all_salaries],
'years_exp': [random.randint(0, 35) for _ in range(n)]
})
return df
# ─── Graders ─────────────────────────────────────────────────────
def grade_task1(original_df, cleaned_df) -> float:
original_nulls = original_df.isnull().sum().sum()
if original_nulls == 0:
return 0.9999
cleaned_nulls = cleaned_df.isnull().sum().sum()
null_fix_score = max(0.0, (original_nulls - cleaned_nulls) / original_nulls)
row_retention = len(cleaned_df) / len(original_df)
retention_penalty = row_retention if row_retention < 0.3 else 1.0
return round(min(0.9999, max(0.0001, 0.7 * null_fix_score + 0.3 * retention_penalty)), 4)
def grade_task2(original_df, cleaned_df, target_format="%Y-%m-%d") -> float:
if 'hire_date' not in cleaned_df.columns:
return 0.0001
correctly_formatted = 0
total = len(cleaned_df)
pattern = re.compile(r'^\d{4}-\d{2}-\d{2}$')
for val in cleaned_df['hire_date']:
if isinstance(val, str) and pattern.match(val):
try:
datetime.strptime(val, target_format)
correctly_formatted += 1
except ValueError:
pass
base_score = correctly_formatted / total if total > 0 else 0.0
integrity = 1.0
for col in ['employee_id', 'department', 'salary']:
if col in original_df.columns and col in cleaned_df.columns:
if not original_df[col].equals(cleaned_df[col]):
integrity *= 0.8
return round(min(0.9999, max(0.0001, base_score * integrity)), 4)
def grade_task3(original_df, cleaned_df) -> float:
score_parts = []
for col in ['salary', 'age']:
if col not in original_df.columns or col not in cleaned_df.columns:
continue
orig_series = pd.to_numeric(original_df[col], errors='coerce').dropna()
clean_series = pd.to_numeric(cleaned_df[col], errors='coerce').dropna()
Q1, Q3 = orig_series.quantile(0.25), orig_series.quantile(0.75)
IQR = Q3 - Q1
lower, upper = Q1 - 3 * IQR, Q3 + 3 * IQR
original_outliers = ((orig_series < lower) | (orig_series > upper)).sum()
remaining_outliers = ((clean_series < lower) | (clean_series > upper)).sum()
if original_outliers == 0:
col_score = 0.9999
else:
removal_score = (original_outliers - remaining_outliers) / original_outliers
valid_retention = min(1.0, len(clean_series) / max(len(orig_series) - original_outliers, 1))
col_score = 0.6 * max(0.0, removal_score) + 0.4 * valid_retention
score_parts.append(col_score)
return round(min(0.9999, max(0.0001, sum(score_parts) / len(score_parts))), 4) if score_parts else 0.0001
# ─── Environment ─────────────────────────────────────────────────
TASK_DESCRIPTIONS = {
1: "EASY: The dataset has missing values (NaN/null). Call action_type='remove_nulls' to drop all rows with any null values.",
2: "MEDIUM: The 'hire_date' column has inconsistent formats. Call action_type='fix_dates' with column='hire_date' to standardise to YYYY-MM-DD.",
3: "HARD: The 'salary' and 'age' columns contain extreme outliers. Call action_type='remove_outliers' with column='all' to remove them via IQR method."
}
class DataCleaningEnvironment:
def __init__(self, task_id: int = 1, seed: int = 42):
assert task_id in [1, 2, 3], "task_id must be 1, 2, or 3"
self.task_id = task_id
self.seed = seed
self._original_df = None
self._current_df = None
self._target_format = None
self._step_count = 0
self._episode_id = None
self._done = False
self._total_reward = 0.0
def reset(self) -> DatasetObservation:
self._episode_id = str(uuid.uuid4())[:8]
self._step_count = 0
self._done = False
self._total_reward = 0.0
if self.task_id == 1:
self._original_df = make_task1_dataset(self.seed)
elif self.task_id == 2:
self._original_df, self._target_format = make_task2_dataset(self.seed)
else:
self._original_df = make_task3_dataset(self.seed)
self._current_df = deepcopy(self._original_df)
return self._make_observation(reward=0.0, done=False)
def step(self, action: CleaningAction):
if self._done:
return self._make_observation(reward=0.0, done=True), 0.0, True, {"error": "Episode already done"}
self._step_count += 1
try:
self._current_df = self._apply_action(action)
except Exception as e:
obs = self._make_observation(reward=-0.1, done=False)
obs.hint = f"Action failed: {str(e)[:100]}"
return obs, -0.1, False, {"error": str(e)}
reward = self._compute_reward()
self._total_reward += reward
self._done = True
return self._make_observation(reward=reward, done=True), reward, True, {"step": self._step_count}
def state(self) -> EnvironmentState:
return EnvironmentState(
episode_id=self._episode_id or "not_started",
step_count=self._step_count,
current_task=self.task_id,
tasks_completed=[],
total_reward=self._total_reward
)
@staticmethod
def _parse_date(val):
for fmt in ["%Y-%m-%d", "%Y/%m/%d", "%Y.%m.%d",
"%m/%d/%Y", "%m-%d-%Y", "%d/%m/%Y", "%d-%m-%Y", "%d.%m.%Y"]:
try:
return datetime.strptime(str(val).strip(), fmt).strftime("%Y-%m-%d")
except:
continue
return None
def _apply_action(self, action: CleaningAction) -> pd.DataFrame:
df = deepcopy(self._current_df)
if action.action_type == "remove_nulls":
df = df.dropna(subset=[action.column]) if action.column and action.column in df.columns else df.dropna()
elif action.action_type == "fix_dates":
col = action.column or "hire_date"
if col not in df.columns:
raise ValueError(f"Column '{col}' not found.")
df[col] = df[col].apply(self._parse_date)
elif action.action_type == "remove_outliers":
target_cols = [action.column] if action.column and action.column != "all" else ['salary', 'age']
for col in target_cols:
if col not in df.columns:
continue
series = pd.to_numeric(df[col], errors='coerce')
Q1, Q3 = series.quantile(0.25), series.quantile(0.75)
IQR = Q3 - Q1
df = df[(series >= Q1 - 3*IQR) & (series <= Q3 + 3*IQR)]
else:
raise ValueError(f"Unknown action_type: '{action.action_type}'")
return df
def _compute_reward(self) -> float:
if self.task_id == 1:
return grade_task1(self._original_df, self._current_df)
elif self.task_id == 2:
return grade_task2(self._original_df, self._current_df, self._target_format)
else:
return grade_task3(self._original_df, self._current_df)
def _make_observation(self, reward: float, done: bool) -> DatasetObservation:
df = self._current_df if self._current_df is not None else pd.DataFrame()
difficulties = {1: TaskDifficulty.EASY, 2: TaskDifficulty.MEDIUM, 3: TaskDifficulty.HARD}
null_count = int(df.isnull().sum().sum()) if len(df) > 0 else 0
date_errors = 0
if self.task_id == 2 and 'hire_date' in df.columns:
pat = re.compile(r'^\d{4}-\d{2}-\d{2}$')
date_errors = sum(1 for v in df['hire_date'] if not (isinstance(v, str) and pat.match(v)))
outlier_count = 0
if self.task_id == 3 and 'salary' in df.columns:
s = pd.to_numeric(df['salary'], errors='coerce').dropna()
if len(s) > 0:
Q1, Q3 = s.quantile(0.25), s.quantile(0.75)
IQR = Q3 - Q1
outlier_count = int(((s < Q1 - 3*IQR) | (s > Q3 + 3*IQR)).sum())
hints = {
1: "Call action_type='remove_nulls' to drop rows with missing values.",
2: "Call action_type='fix_dates' with column='hire_date'.",
3: "Call action_type='remove_outliers' with column='all'."
}
return DatasetObservation(
task_id=self.task_id,
task_description=TASK_DESCRIPTIONS[self.task_id],
difficulty=difficulties[self.task_id],
dataset_preview=df.head(5).to_string() if len(df) > 0 else "(empty)",
num_rows=len(df),
null_count=null_count,
date_format_errors=date_errors,
outlier_count=outlier_count,
current_score=self._compute_reward() if self._current_df is not None and self._step_count > 0 else 0.0,
done=done,
reward=reward,
hint=hints[self.task_id] if not done else ""
)