Spaces:
Sleeping
Sleeping
File size: 12,808 Bytes
d7cc083 f173cee d7cc083 f173cee d7cc083 f173cee d7cc083 f173cee d7cc083 f173cee d7cc083 f173cee d7cc083 fa68c00 d7cc083 fa68c00 d7cc083 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 | # data_cleaning_env.py β Complete environment module for submission
import uuid, random, re, json
from copy import deepcopy
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
from enum import Enum
import pandas as pd
import numpy as np
from pydantic import BaseModel, Field
# βββ Models ββββββββββββββββββββββββββββββββββββββββββββββββββββββ
class TaskDifficulty(str, Enum):
EASY = "easy"
MEDIUM = "medium"
HARD = "hard"
class CleaningAction(BaseModel):
task_id: int = Field(..., description="Which task: 1=easy, 2=medium, 3=hard")
action_type: str = Field(..., description="One of: remove_nulls, fix_dates, remove_outliers")
column: Optional[str] = Field(None, description="Which column to clean (optional)")
params: Optional[Dict[str, Any]] = Field(default_factory=dict)
class DatasetObservation(BaseModel):
task_id: int
task_description: str
difficulty: TaskDifficulty
dataset_preview: str
num_rows: int
null_count: int
date_format_errors: int
outlier_count: int
current_score: float
done: bool = False
reward: float = 0.0
hint: str = ""
class EnvironmentState(BaseModel):
episode_id: str
step_count: int
current_task: int
tasks_completed: List[int] = []
total_reward: float = 0.0
# βββ Dataset generators ββββββββββββββββββββββββββββββββββββββββββ
def make_task1_dataset(seed=42):
"""EASY: Dataset with null values that need removing."""
random.seed(seed); np.random.seed(seed)
n = 20
df = pd.DataFrame({
'name': [f'Person_{i}' if random.random() > 0.3 else None for i in range(n)],
'age': [random.randint(18, 65) if random.random() > 0.25 else None for _ in range(n)],
'salary': [round(random.uniform(30000, 120000), 2) if random.random() > 0.2 else None for _ in range(n)],
'city': [random.choice(['Mumbai', 'Delhi', 'Pune', None]) for _ in range(n)]
})
return df
def make_task2_dataset(seed=42):
"""MEDIUM: Dataset with inconsistent date formats."""
random.seed(seed); np.random.seed(seed)
n = 20
formats = ["%d/%m/%Y", "%m-%d-%Y", "%Y.%m.%d", "%d %b %Y", "%Y/%m/%d"]
target_format = "%Y-%m-%d"
base = datetime(2023, 1, 1)
dates_raw = []
for i in range(n):
d = base + timedelta(days=random.randint(0, 365))
fmt = random.choice(formats)
dates_raw.append(d.strftime(fmt))
df = pd.DataFrame({
'employee_id': range(1, n+1),
'hire_date': dates_raw,
'department': [random.choice(['Eng', 'HR', 'Finance', 'Sales']) for _ in range(n)],
'salary': [random.randint(40000, 150000) for _ in range(n)]
})
return df, target_format
def make_task3_dataset(seed=42):
"""HARD: Dataset with outliers in numeric columns."""
random.seed(seed); np.random.seed(seed)
n = 30
outlier_salaries = [500000, -5000, 999999, 1200000]
normal_salaries = np.random.normal(60000, 15000, n - len(outlier_salaries)).tolist()
all_salaries = normal_salaries + outlier_salaries
random.shuffle(all_salaries)
outlier_ages = [150, -5, 200]
normal_ages = np.random.randint(22, 60, n - len(outlier_ages)).tolist()
all_ages = normal_ages + outlier_ages
random.shuffle(all_ages)
df = pd.DataFrame({
'employee_id': range(1, n+1),
'age': all_ages,
'salary': [round(s, 2) for s in all_salaries],
'years_exp': [random.randint(0, 35) for _ in range(n)]
})
return df
# βββ Graders βββββββββββββββββββββββββββββββββββββββββββββββββββββ
def grade_task1(original_df, cleaned_df) -> float:
original_nulls = original_df.isnull().sum().sum()
if original_nulls == 0:
return 0.9999
cleaned_nulls = cleaned_df.isnull().sum().sum()
null_fix_score = max(0.0, (original_nulls - cleaned_nulls) / original_nulls)
row_retention = len(cleaned_df) / len(original_df)
retention_penalty = row_retention if row_retention < 0.3 else 1.0
return round(min(0.9999, max(0.0001, 0.7 * null_fix_score + 0.3 * retention_penalty)), 4)
def grade_task2(original_df, cleaned_df, target_format="%Y-%m-%d") -> float:
if 'hire_date' not in cleaned_df.columns:
return 0.0001
correctly_formatted = 0
total = len(cleaned_df)
pattern = re.compile(r'^\d{4}-\d{2}-\d{2}$')
for val in cleaned_df['hire_date']:
if isinstance(val, str) and pattern.match(val):
try:
datetime.strptime(val, target_format)
correctly_formatted += 1
except ValueError:
pass
base_score = correctly_formatted / total if total > 0 else 0.0
integrity = 1.0
for col in ['employee_id', 'department', 'salary']:
if col in original_df.columns and col in cleaned_df.columns:
if not original_df[col].equals(cleaned_df[col]):
integrity *= 0.8
return round(min(0.9999, max(0.0001, base_score * integrity)), 4)
def grade_task3(original_df, cleaned_df) -> float:
score_parts = []
for col in ['salary', 'age']:
if col not in original_df.columns or col not in cleaned_df.columns:
continue
orig_series = pd.to_numeric(original_df[col], errors='coerce').dropna()
clean_series = pd.to_numeric(cleaned_df[col], errors='coerce').dropna()
Q1, Q3 = orig_series.quantile(0.25), orig_series.quantile(0.75)
IQR = Q3 - Q1
lower, upper = Q1 - 3 * IQR, Q3 + 3 * IQR
original_outliers = ((orig_series < lower) | (orig_series > upper)).sum()
remaining_outliers = ((clean_series < lower) | (clean_series > upper)).sum()
if original_outliers == 0:
col_score = 0.9999
else:
removal_score = (original_outliers - remaining_outliers) / original_outliers
valid_retention = min(1.0, len(clean_series) / max(len(orig_series) - original_outliers, 1))
col_score = 0.6 * max(0.0, removal_score) + 0.4 * valid_retention
score_parts.append(col_score)
return round(min(0.9999, max(0.0001, sum(score_parts) / len(score_parts))), 4) if score_parts else 0.0001
# βββ Environment βββββββββββββββββββββββββββββββββββββββββββββββββ
TASK_DESCRIPTIONS = {
1: "EASY: The dataset has missing values (NaN/null). Call action_type='remove_nulls' to drop all rows with any null values.",
2: "MEDIUM: The 'hire_date' column has inconsistent formats. Call action_type='fix_dates' with column='hire_date' to standardise to YYYY-MM-DD.",
3: "HARD: The 'salary' and 'age' columns contain extreme outliers. Call action_type='remove_outliers' with column='all' to remove them via IQR method."
}
class DataCleaningEnvironment:
def __init__(self, task_id: int = 1, seed: int = 42):
assert task_id in [1, 2, 3], "task_id must be 1, 2, or 3"
self.task_id = task_id
self.seed = seed
self._original_df = None
self._current_df = None
self._target_format = None
self._step_count = 0
self._episode_id = None
self._done = False
self._total_reward = 0.0
def reset(self) -> DatasetObservation:
self._episode_id = str(uuid.uuid4())[:8]
self._step_count = 0
self._done = False
self._total_reward = 0.0
if self.task_id == 1:
self._original_df = make_task1_dataset(self.seed)
elif self.task_id == 2:
self._original_df, self._target_format = make_task2_dataset(self.seed)
else:
self._original_df = make_task3_dataset(self.seed)
self._current_df = deepcopy(self._original_df)
return self._make_observation(reward=0.0, done=False)
def step(self, action: CleaningAction):
if self._done:
return self._make_observation(reward=0.0, done=True), 0.0, True, {"error": "Episode already done"}
self._step_count += 1
try:
self._current_df = self._apply_action(action)
except Exception as e:
obs = self._make_observation(reward=-0.1, done=False)
obs.hint = f"Action failed: {str(e)[:100]}"
return obs, -0.1, False, {"error": str(e)}
reward = self._compute_reward()
self._total_reward += reward
self._done = True
return self._make_observation(reward=reward, done=True), reward, True, {"step": self._step_count}
def state(self) -> EnvironmentState:
return EnvironmentState(
episode_id=self._episode_id or "not_started",
step_count=self._step_count,
current_task=self.task_id,
tasks_completed=[],
total_reward=self._total_reward
)
@staticmethod
def _parse_date(val):
for fmt in ["%Y-%m-%d", "%Y/%m/%d", "%Y.%m.%d",
"%m/%d/%Y", "%m-%d-%Y", "%d/%m/%Y", "%d-%m-%Y", "%d.%m.%Y"]:
try:
return datetime.strptime(str(val).strip(), fmt).strftime("%Y-%m-%d")
except:
continue
return None
def _apply_action(self, action: CleaningAction) -> pd.DataFrame:
df = deepcopy(self._current_df)
if action.action_type == "remove_nulls":
df = df.dropna(subset=[action.column]) if action.column and action.column in df.columns else df.dropna()
elif action.action_type == "fix_dates":
col = action.column or "hire_date"
if col not in df.columns:
raise ValueError(f"Column '{col}' not found.")
df[col] = df[col].apply(self._parse_date)
elif action.action_type == "remove_outliers":
target_cols = [action.column] if action.column and action.column != "all" else ['salary', 'age']
for col in target_cols:
if col not in df.columns:
continue
series = pd.to_numeric(df[col], errors='coerce')
Q1, Q3 = series.quantile(0.25), series.quantile(0.75)
IQR = Q3 - Q1
df = df[(series >= Q1 - 3*IQR) & (series <= Q3 + 3*IQR)]
else:
raise ValueError(f"Unknown action_type: '{action.action_type}'")
return df
def _compute_reward(self) -> float:
if self.task_id == 1:
return grade_task1(self._original_df, self._current_df)
elif self.task_id == 2:
return grade_task2(self._original_df, self._current_df, self._target_format)
else:
return grade_task3(self._original_df, self._current_df)
def _make_observation(self, reward: float, done: bool) -> DatasetObservation:
df = self._current_df if self._current_df is not None else pd.DataFrame()
difficulties = {1: TaskDifficulty.EASY, 2: TaskDifficulty.MEDIUM, 3: TaskDifficulty.HARD}
null_count = int(df.isnull().sum().sum()) if len(df) > 0 else 0
date_errors = 0
if self.task_id == 2 and 'hire_date' in df.columns:
pat = re.compile(r'^\d{4}-\d{2}-\d{2}$')
date_errors = sum(1 for v in df['hire_date'] if not (isinstance(v, str) and pat.match(v)))
outlier_count = 0
if self.task_id == 3 and 'salary' in df.columns:
s = pd.to_numeric(df['salary'], errors='coerce').dropna()
if len(s) > 0:
Q1, Q3 = s.quantile(0.25), s.quantile(0.75)
IQR = Q3 - Q1
outlier_count = int(((s < Q1 - 3*IQR) | (s > Q3 + 3*IQR)).sum())
hints = {
1: "Call action_type='remove_nulls' to drop rows with missing values.",
2: "Call action_type='fix_dates' with column='hire_date'.",
3: "Call action_type='remove_outliers' with column='all'."
}
return DatasetObservation(
task_id=self.task_id,
task_description=TASK_DESCRIPTIONS[self.task_id],
difficulty=difficulties[self.task_id],
dataset_preview=df.head(5).to_string() if len(df) > 0 else "(empty)",
num_rows=len(df),
null_count=null_count,
date_format_errors=date_errors,
outlier_count=outlier_count,
current_score=self._compute_reward() if self._current_df is not None and self._step_count > 0 else 0.0,
done=done,
reward=reward,
hint=hints[self.task_id] if not done else ""
)
|