data-cleaning-env / environment.py
Yashwanth34567's picture
Upload folder using huggingface_hub
f1b06d6 verified
# environment.py
# ─────────────────────────────────────────────
# Core OpenEnv class β€” step / reset / state
# Full OpenEnv spec compliant
# ─────────────────────────────────────────────
import pandas as pd
import numpy as np
from typing import Any, Dict, Optional, Tuple
from config import (
MAX_STEPS,
REWARD_ISSUE_FIXED,
REWARD_SUBMIT_BONUS,
PENALTY_WRONG_ACTION,
PENALTY_DESTRUCTIVE,
TASK_EASY, TASK_MEDIUM, TASK_HARD,
)
from models import Observation, Action, Reward, StepResult
from tasks import Task, get_task
from utils import (
df_to_records,
get_null_counts,
get_duplicate_count,
detect_outliers_iqr,
clean_column_name,
standardize_column,
clamp,
)
from datasets import detect_issues
class DataCleaningEnv:
"""
OpenEnv-compliant Data Cleaning Environment.
The agent receives a messy CSV dataset and must
issue cleaning actions to fix it step by step.
Actions:
fill_missing β€” fill null values
drop_duplicates β€” remove duplicate rows
fix_dtype β€” convert column dtype
rename_column β€” rename a column
remove_outliers β€” remove outlier rows
standardize_values β€” replace values via mapping
submit β€” finalize and trigger grader
"""
def __init__(self, difficulty: str = TASK_EASY):
self.difficulty = difficulty
self.task: Task = get_task(difficulty)
self.step_count = 0
self.total_reward = 0.0
self.done = False
self._prev_score = 0.0
self._initialized = False
# ── OpenEnv API ───────────────────────────
def reset(self) -> Observation:
"""Start a fresh episode"""
self.task.reset()
self.step_count = 0
self.total_reward = 0.0
self.done = False
self._prev_score = 0.0
self._initialized = True
return self._make_observation()
def step(self, action: Action) -> StepResult:
"""Execute one action and return result"""
if not self._initialized:
self.reset()
if self.done:
obs = self._make_observation()
reward = Reward(value=0.0, total=self.total_reward,
reason="Episode already done")
return StepResult(observation=obs, reward=reward,
done=True, info={})
self.step_count += 1
reward_value, reason, info = self._execute_action(action)
# Clamp and accumulate
reward_value = clamp(reward_value, -1.0, 1.0)
self.total_reward = round(self.total_reward + reward_value, 4)
# Check done conditions
if self.step_count >= MAX_STEPS:
self.done = True
reason += " | Max steps reached"
reward = Reward(
value = round(reward_value, 4),
total = self.total_reward,
reason = reason,
)
obs = self._make_observation()
obs.done = self.done
return StepResult(
observation = obs,
reward = reward,
done = self.done,
info = info,
)
def state(self) -> Dict[str, Any]:
"""Return current environment state as dict"""
if not self._initialized:
return {"status": "not initialized, call reset() first"}
score, details = self.task.grade()
return {
"task_id": self.task.task_id,
"difficulty": self.difficulty,
"step": self.step_count,
"max_steps": MAX_STEPS,
"total_reward": self.total_reward,
"current_score": score,
"done": self.done,
"issues": self.task.issues,
"grade_details": details,
"dataframe": df_to_records(self.task.current_df),
"columns": list(self.task.current_df.columns),
"null_counts": get_null_counts(self.task.current_df),
"duplicate_rows": get_duplicate_count(self.task.current_df),
}
# ── Action Executor ───────────────────────
def _execute_action(self, action: Action
) -> Tuple[float, str, Dict]:
"""Route action to handler, return (reward, reason, info)"""
df = self.task.current_df
p = action.parameters
info = {}
try:
if action.action_type == "fill_missing":
df, reward, reason = self._fill_missing(df, p)
elif action.action_type == "drop_duplicates":
df, reward, reason = self._drop_duplicates(df)
elif action.action_type == "fix_dtype":
df, reward, reason = self._fix_dtype(df, p)
elif action.action_type == "rename_column":
df, reward, reason = self._rename_column(df, p)
elif action.action_type == "remove_outliers":
df, reward, reason = self._remove_outliers(df, p)
elif action.action_type == "standardize_values":
df, reward, reason = self._standardize_values(df, p)
elif action.action_type == "submit":
df, reward, reason, info = self._submit(df)
else:
reward = PENALTY_WRONG_ACTION
reason = f"Unknown action: {action.action_type}"
except Exception as e:
reward = PENALTY_WRONG_ACTION
reason = f"Action failed: {str(e)}"
info = {"error": str(e)}
self.task.current_df = df
self.task.update_issues()
return reward, reason, info
# ── Action Handlers ───────────────────────
def _fill_missing(self, df: pd.DataFrame,
params: Dict) -> Tuple[pd.DataFrame, float, str]:
column = params.get("column")
strategy = params.get("strategy", "mean")
if column not in df.columns:
return df, PENALTY_WRONG_ACTION, f"Column '{column}' not found"
null_before = df[column].isnull().sum()
if null_before == 0:
return df, PENALTY_WRONG_ACTION, \
f"No nulls in '{column}' β€” redundant action"
df = df.copy()
if strategy == "mean":
numeric = pd.to_numeric(df[column], errors="coerce")
df[column] = df[column].fillna(numeric.mean())
elif strategy == "median":
numeric = pd.to_numeric(df[column], errors="coerce")
df[column] = df[column].fillna(numeric.median())
elif strategy == "mode":
df[column].fillna(df[column].mode()[0], inplace=True)
elif strategy == "drop":
before = len(df)
df.dropna(subset=[column], inplace=True)
dropped = before - len(df)
if dropped > len(df) * 0.3:
return df, PENALTY_DESTRUCTIVE, \
f"Dropped {dropped} rows β€” too destructive"
else:
df[column].fillna(strategy, inplace=True)
null_after = df[column].isnull().sum()
fixed = null_before - null_after
reward = REWARD_ISSUE_FIXED * (fixed / max(null_before, 1))
return df, reward, f"Filled {fixed} nulls in '{column}' via {strategy}"
def _drop_duplicates(self, df: pd.DataFrame
) -> Tuple[pd.DataFrame, float, str]:
dup_before = df.duplicated().sum()
if dup_before == 0:
return df, PENALTY_WRONG_ACTION, "No duplicates β€” redundant action"
df = df.drop_duplicates().reset_index(drop=True)
reward = REWARD_ISSUE_FIXED
return df, reward, f"Removed {dup_before} duplicate rows"
def _fix_dtype(self, df: pd.DataFrame,
params: Dict) -> Tuple[pd.DataFrame, float, str]:
column = params.get("column")
target_type = params.get("target_type", "float")
if column not in df.columns:
return df, PENALTY_WRONG_ACTION, f"Column '{column}' not found"
df = df.copy()
try:
if target_type == "int":
df[column] = pd.to_numeric(
df[column], errors="coerce").fillna(0).astype(int)
elif target_type == "float":
df[column] = pd.to_numeric(df[column], errors="coerce")
elif target_type == "str":
df[column] = df[column].astype(str)
elif target_type == "datetime":
df[column] = pd.to_datetime(df[column], errors="coerce")
else:
return df, PENALTY_WRONG_ACTION, \
f"Unknown target type: {target_type}"
except Exception as e:
return df, PENALTY_WRONG_ACTION, f"Dtype fix failed: {e}"
return df, REWARD_ISSUE_FIXED, \
f"Converted '{column}' to {target_type}"
def _rename_column(self, df: pd.DataFrame,
params: Dict) -> Tuple[pd.DataFrame, float, str]:
old_name = params.get("old_name")
new_name = params.get("new_name")
if old_name not in df.columns:
return df, PENALTY_WRONG_ACTION, \
f"Column '{old_name}' not found"
expected = clean_column_name(old_name)
if new_name != expected:
return df, PENALTY_WRONG_ACTION, \
f"Suggested name '{new_name}' β€” expected '{expected}'"
df = df.rename(columns={old_name: new_name})
return df, REWARD_ISSUE_FIXED, \
f"Renamed '{old_name}' to '{new_name}'"
def _remove_outliers(self, df: pd.DataFrame,
params: Dict) -> Tuple[pd.DataFrame, float, str]:
column = params.get("column")
method = params.get("method", "iqr")
if column not in df.columns:
return df, PENALTY_WRONG_ACTION, f"Column '{column}' not found"
df = df.copy()
try:
numeric = pd.to_numeric(df[column], errors="coerce")
df[column] = numeric
if method == "iqr":
mask = detect_outliers_iqr(df, column)
else:
from utils import detect_outliers_zscore
mask = detect_outliers_zscore(df, column)
count = mask.sum()
if count == 0:
return df, PENALTY_WRONG_ACTION, \
f"No outliers in '{column}' β€” redundant"
if count > len(df) * 0.4:
return df, PENALTY_DESTRUCTIVE, \
f"Would remove {count} rows β€” too destructive"
df = df[~mask].reset_index(drop=True)
return df, REWARD_ISSUE_FIXED, \
f"Removed {count} outliers from '{column}'"
except Exception as e:
return df, PENALTY_WRONG_ACTION, f"Outlier removal failed: {e}"
def _standardize_values(self, df: pd.DataFrame,
params: Dict
) -> Tuple[pd.DataFrame, float, str]:
column = params.get("column")
mapping = params.get("mapping", {})
if column not in df.columns:
return df, PENALTY_WRONG_ACTION, f"Column '{column}' not found"
if not mapping:
return df, PENALTY_WRONG_ACTION, "Empty mapping provided"
before = df[column].value_counts().to_dict()
df = standardize_column(df, column, mapping)
after = df[column].value_counts().to_dict()
changed = sum(1 for k, v in before.items()
if after.get(mapping.get(k, k), 0) != v)
if changed == 0:
return df, PENALTY_WRONG_ACTION, "No values changed"
return df, REWARD_ISSUE_FIXED, \
f"Standardized {len(mapping)} values in '{column}'"
def _submit(self, df: pd.DataFrame
) -> Tuple[pd.DataFrame, float, str, Dict]:
score, details = self.task.grade()
self.done = True
if score >= 0.9:
reward = REWARD_SUBMIT_BONUS + score
reason = f"Excellent! Score: {score:.3f} β€” dataset is clean!"
elif score >= 0.6:
reward = score
reason = f"Good effort. Score: {score:.3f} β€” some issues remain"
else:
reward = score * 0.5
reason = f"Submitted early. Score: {score:.3f} β€” many issues remain"
return df, reward, reason, {"final_score": score, "details": details}
# ── Observation Builder ───────────────────
def _make_observation(self) -> Observation:
df = self.task.current_df
return Observation(
task_id = self.task.task_id,
difficulty = self.difficulty,
step = self.step_count,
max_steps = MAX_STEPS,
dataframe = df_to_records(df),
columns = list(df.columns),
dtypes = {c: str(t) for c, t in df.dtypes.items()},
null_counts = get_null_counts(df),
duplicate_rows = get_duplicate_count(df),
issues = self.task.issues,
done = self.done,
)