# data-wrangler-env / server / cleaning_engine.py
# Aswini-Kumar: Sync server/cleaning_engine.py (commit cf67c72, verified)
"""
Cleaning Engine for DataWranglerEnv.
Parses text commands from the agent and executes data cleaning operations
on the working DataFrame. Returns text results for the observation.
Commands:
Diagnostic: help, view, profile, profile_column, find_missing,
find_duplicates, find_outliers
Cleaning: fill_missing, remove_duplicates, fix_dtype, replace,
regex_replace, standardize, remove_rows, clip,
rename_column, drop_column, sort
Special: validate, submit
"""
import re
from typing import Any, Dict, Optional, Tuple
import numpy as np
import pandas as pd
from .dataset_generator import COMMANDS_HELP
class CleaningEngine:
    """Parses agent text commands and applies cleaning operations to a DataFrame.

    Every ``_cmd_*`` handler receives the already-tokenised argument list and
    returns ``(response_text, data_was_modified)``. Invalid user input is
    reported through an ``"Error: ..."`` response; unexpected exceptions are
    caught by :meth:`execute` and reported the same way.
    """

    _EMPTY_COMMAND_MSG = "Error: Empty command. Type 'help' for available commands."

    def __init__(self, df: pd.DataFrame):
        # The frame is stored by reference: cleaning commands either assign
        # columns on it or rebind ``self.df`` to a new frame.
        self.df = df

    def execute(self, command_str: str) -> Tuple[str, bool]:
        """Parse and execute a text command.

        Args:
            command_str: Raw text command from the agent

        Returns:
            Tuple of (response_text, data_was_modified)
        """
        command_str = command_str.strip()
        if not command_str:
            return self._EMPTY_COMMAND_MSG, False

        parts = self._parse_command(command_str)
        if not parts:
            # Defensive: the tokenizer yielded nothing to dispatch on.
            return self._EMPTY_COMMAND_MSG, False
        cmd = parts[0].lower()
        args = parts[1:]

        dispatch = {
            "help": self._cmd_help,
            "view": self._cmd_view,
            "profile": self._cmd_profile,
            "profile_column": self._cmd_profile_column,
            "find_missing": self._cmd_find_missing,
            "find_duplicates": self._cmd_find_duplicates,
            "find_outliers": self._cmd_find_outliers,
            "fill_missing": self._cmd_fill_missing,
            "remove_duplicates": self._cmd_remove_duplicates,
            "fix_dtype": self._cmd_fix_dtype,
            "replace": self._cmd_replace,
            "regex_replace": self._cmd_regex_replace,
            "standardize": self._cmd_standardize,
            "remove_rows": self._cmd_remove_rows,
            "clip": self._cmd_clip,
            "rename_column": self._cmd_rename_column,
            "drop_column": self._cmd_drop_column,
            "sort": self._cmd_sort,
            "validate": self._cmd_validate,
            "submit": self._cmd_submit,
        }

        handler = dispatch.get(cmd)
        if handler is None:
            # Suggest commands sharing the first three characters.
            suggestions = [c for c in dispatch if c.startswith(cmd[:3])] if len(cmd) >= 3 else []
            msg = f"Error: Unknown command '{cmd}'."
            if suggestions:
                msg += f" Did you mean: {', '.join(suggestions)}?"
            msg += " Type 'help' for available commands."
            return msg, False

        try:
            return handler(args)
        except Exception as e:  # top-level boundary: report instead of crashing
            return f"Error executing '{cmd}': {e}", False

    def _parse_command(self, command_str: str) -> list:
        """Split a command into tokens, honouring single and double quotes.

        Quote characters are removed from the tokens. An empty quoted string
        (``""``) yields an empty-string token so callers can pass empty values.
        An unterminated quote swallows the rest of the input into one token.
        """
        parts = []
        current = []
        token_started = False
        quote_char = None
        for char in command_str:
            if quote_char is not None:
                if char == quote_char:
                    quote_char = None
                else:
                    current.append(char)
            elif char in ('"', "'"):
                quote_char = char
                token_started = True  # even "" counts as a token
            elif char.isspace():
                if token_started:
                    parts.append("".join(current))
                    current = []
                    token_started = False
            else:
                current.append(char)
                token_started = True
        if token_started:
            parts.append("".join(current))
        return parts

    # ── Shared helpers ───────────────────────────────────────────────────

    def _column_error(self, col: str) -> Tuple[str, bool]:
        """Build the standard 'column not found' response."""
        available = ", ".join(str(c) for c in self.df.columns)
        return f"Error: Column '{col}' not found. Available: {available}", False

    @staticmethod
    def _percent(count, total) -> float:
        """Return ``count / total`` as a percentage, or 0.0 when total is 0."""
        return float(count) / total * 100 if total else 0.0

    def _parse_subset(self, raw: str):
        """Parse a comma-separated column list.

        Returns ``(subset, error)``: ``subset`` is a list of column names, or
        None when the list is empty; ``error`` is a message when any name is
        unknown, else None.
        """
        subset = [c.strip() for c in raw.split(",") if c.strip()]
        invalid = [c for c in subset if c not in self.df.columns]
        if invalid:
            return None, f"Error: Unknown columns: {invalid}. Available: {list(self.df.columns)}"
        return (subset or None), None

    @staticmethod
    def _is_plain_numeric(series: pd.Series) -> bool:
        """True for int/float dtypes (booleans excluded)."""
        dtype = series.dtype
        return pd.api.types.is_numeric_dtype(dtype) and not pd.api.types.is_bool_dtype(dtype)

    def _coerce_value(self, series: pd.Series, raw: str):
        """Parse ``raw`` as a number when ``series`` is numeric, else return it unchanged."""
        if not self._is_plain_numeric(series):
            return raw
        try:
            number = float(raw)
        except ValueError:
            return raw
        if pd.api.types.is_integer_dtype(series.dtype) and number.is_integer():
            return int(number)
        return number

    @staticmethod
    def _widen_for_value(series: pd.Series, value) -> pd.Series:
        """Return ``series`` with a dtype able to hold ``value`` without an implicit upcast."""
        dtype = series.dtype
        if not pd.api.types.is_numeric_dtype(dtype):
            return series
        if isinstance(value, str):
            return series.astype(object)
        if isinstance(value, float) and pd.api.types.is_integer_dtype(dtype):
            target = "Float64" if pd.api.types.is_extension_array_dtype(dtype) else "float64"
            return series.astype(target)
        return series

    def _equals_mask(self, series: pd.Series, value: str) -> pd.Series:
        """Boolean mask of cells equal to ``value``.

        Cells match on their string form; on numeric columns they also match
        numerically, so ``"25"`` matches a float ``25.0``.
        """
        mask = (series.astype(str) == value).fillna(False).astype(bool)
        if self._is_plain_numeric(series):
            try:
                number = float(value)
            except ValueError:
                return mask
            mask = mask | (series == number).fillna(False).astype(bool)
        return mask

    @staticmethod
    def _to_numeric_lenient(series: pd.Series) -> pd.Series:
        """Convert to numeric, stripping currency/thousands clutter only where needed.

        Values that already parse are left untouched; only the remaining
        non-null values have every character other than digits, '.' and '-'
        removed before a second parse.
        """
        numeric = pd.to_numeric(series, errors="coerce")
        retry = (numeric.isna() & series.notna()).to_numpy(dtype=bool)
        if retry.any():
            cleaned = series[retry].astype(str).str.replace(r"[^\d.\-]", "", regex=True)
            recovered = pd.to_numeric(cleaned, errors="coerce")
            numeric = numeric.astype("float64")
            numeric.loc[retry] = recovered.to_numpy(dtype="float64", na_value=np.nan)
        return numeric

    # ── Diagnostic commands (read-only) ──────────────────────────────────

    def _cmd_help(self, args: list) -> Tuple[str, bool]:
        """Return the static help text."""
        return COMMANDS_HELP, False

    def _cmd_view(self, args: list) -> Tuple[str, bool]:
        """Show the first N rows (default 10, capped at 50)."""
        n = 10
        if args:
            try:
                n = int(args[0])
            except ValueError:
                return "Error: 'view' expects an integer argument. Usage: view [N]", False
            if n < 1:
                # head() with a negative N would show "all but the last N" rows.
                return "Error: 'view' expects a positive integer. Usage: view [N]", False
        n = min(n, 50, len(self.df))
        result = self.df.head(n).to_string(max_colwidth=30)
        return f"Showing first {n} rows ({len(self.df)} total):\n\n{result}", False

    def _cmd_profile(self, args: list) -> Tuple[str, bool]:
        """Summarise shape, per-column dtype/missing/unique counts and duplicate rows."""
        n_rows, n_cols = self.df.shape
        lines = [
            f"Dataset Shape: {n_rows} rows × {n_cols} columns",
            "\nColumns:",
            f"{'Column':<20} {'Type':<12} {'Non-Null':<10} {'Missing':<10} {'Missing%':<10} {'Unique':<8}",
            "-" * 70,
        ]
        for col in self.df.columns:
            series = self.df[col]
            dtype = str(series.dtype)
            non_null = series.notna().sum()
            missing = series.isna().sum()
            missing_pct = f"{self._percent(missing, n_rows):.1f}%"
            unique = series.nunique()
            lines.append(
                f"{str(col):<20} {dtype:<12} {non_null:<10} {missing:<10} {missing_pct:<10} {unique:<8}"
            )
        n_dupes = self.df.duplicated().sum()
        lines.append(f"\nDuplicate rows: {n_dupes} ({self._percent(n_dupes, n_rows):.1f}%)")
        return "\n".join(lines), False

    def _cmd_profile_column(self, args: list) -> Tuple[str, bool]:
        """Describe one column: counts, then numeric stats or the top values."""
        if not args:
            return "Error: Usage: profile_column COLUMN_NAME", False
        col = args[0]
        if col not in self.df.columns:
            return self._column_error(col)

        series = self.df[col]
        n_missing = series.isna().sum()
        lines = [
            f"Profile for column '{col}':",
            f"  Type: {series.dtype}",
            f"  Non-null: {series.notna().sum()} / {len(series)}",
            f"  Missing: {n_missing} ({self._percent(n_missing, len(series)):.1f}%)",
            f"  Unique values: {series.nunique()}",
        ]
        if pd.api.types.is_numeric_dtype(series):
            desc = series.describe()
            mean = desc.get("mean")
            std = desc.get("std")
            lines.append(f"  Min: {desc.get('min', 'N/A')}")
            lines.append(f"  Max: {desc.get('max', 'N/A')}")
            lines.append(f"  Mean: {mean:.2f}" if pd.notna(mean) else "  Mean: N/A")
            lines.append(f"  Std: {std:.2f}" if pd.notna(std) else "  Std: N/A")
            lines.append(f"  Median: {desc.get('50%', 'N/A')}")
        else:
            lines.append("  Top values:")
            for val, count in series.dropna().value_counts().head(10).items():
                lines.append(f"    '{val}': {count}")
        return "\n".join(lines), False

    def _cmd_find_missing(self, args: list) -> Tuple[str, bool]:
        """List columns with missing values, most affected first."""
        missing = self.df.isnull().sum()
        missing = missing[missing > 0]
        if missing.empty:
            return "No missing values found! The dataset is complete.", False

        lines = [
            "Missing values by column:",
            f"{'Column':<25} {'Count':<8} {'Percentage':<10}",
            "-" * 43,
        ]
        for col, count in missing.sort_values(ascending=False).items():
            pct = f"{self._percent(count, len(self.df)):.1f}%"
            lines.append(f"{str(col):<25} {count:<8} {pct:<10}")
        lines.append(f"\nTotal missing cells: {missing.sum()}")
        return "\n".join(lines), False

    def _cmd_find_duplicates(self, args: list) -> Tuple[str, bool]:
        """Report duplicate rows, optionally over a comma-separated column subset."""
        subset = None
        if args:
            subset, error = self._parse_subset(args[0])
            if error:
                return error, False

        dupes = self.df[self.df.duplicated(subset=subset, keep=False)]
        n_dupes = self.df.duplicated(subset=subset, keep="first").sum()
        if n_dupes == 0:
            cols_desc = f" (on columns: {subset})" if subset else ""
            return f"No duplicate rows found{cols_desc}.", False

        lines = [f"Found {n_dupes} duplicate rows (keeping first occurrence):"]
        if len(dupes) <= 20:
            lines.append(dupes.to_string(max_colwidth=25))
        else:
            lines.append(f"Showing first 10 of {len(dupes)} duplicate entries:")
            lines.append(dupes.head(10).to_string(max_colwidth=25))
        return "\n".join(lines), False

    def _cmd_find_outliers(self, args: list) -> Tuple[str, bool]:
        """Report values outside 1.5 * IQR of the quartiles for one column."""
        if not args:
            return "Error: Usage: find_outliers COLUMN_NAME", False
        col = args[0]
        if col not in self.df.columns:
            return self._column_error(col)

        try:
            numeric_col = pd.to_numeric(self.df[col], errors="coerce")
        except (TypeError, ValueError):
            return f"Error: Column '{col}' cannot be converted to numeric for outlier detection.", False
        if numeric_col.notna().sum() == 0:
            # Quantiles of an all-NaN column are NaN; say so instead of "no outliers".
            return f"Error: Column '{col}' has no numeric values for outlier detection.", False

        q1 = numeric_col.quantile(0.25)
        q3 = numeric_col.quantile(0.75)
        iqr = q3 - q1
        lower = q1 - 1.5 * iqr
        upper = q3 + 1.5 * iqr
        outliers_mask = ((numeric_col < lower) | (numeric_col > upper)).fillna(False).astype(bool)
        n_outliers = int(outliers_mask.sum())
        if n_outliers == 0:
            return f"No outliers found in '{col}' (IQR method, bounds: [{lower:.2f}, {upper:.2f}]).", False

        # tolist() yields plain Python numbers, avoiding numpy scalar reprs.
        outlier_values = numeric_col[outliers_mask].dropna().head(15).tolist()
        lines = [
            f"Found {n_outliers} outliers in '{col}' (IQR method):",
            f"  Q1: {q1:.2f}, Q3: {q3:.2f}, IQR: {iqr:.2f}",
            f"  Lower bound: {lower:.2f}",
            f"  Upper bound: {upper:.2f}",
            f"  Outlier values: {outlier_values}",
        ]
        return "\n".join(lines), False

    # ── Cleaning commands (modify data) ──────────────────────────────────

    def _cmd_fill_missing(self, args: list) -> Tuple[str, bool]:
        """Fill missing values in a column.

        Usage: ``fill_missing COLUMN STRATEGY [VALUE]`` with strategy one of
        mean, median, mode, constant, forward_fill. mean/median coerce the
        column to numeric first (non-numeric cells are treated as missing).
        The modified flag is True only when at least one value was filled.
        """
        if len(args) < 2:
            return "Error: Usage: fill_missing COLUMN STRATEGY [VALUE]\n  Strategies: mean, median, mode, constant, forward_fill", False
        col = args[0]
        strategy = args[1].lower()
        if col not in self.df.columns:
            return self._column_error(col)

        series = self.df[col]
        n_before = int(series.isna().sum())
        if n_before == 0:
            return f"No missing values in '{col}'. Nothing to fill.", False

        if strategy in ("mean", "median"):
            try:
                numeric = pd.to_numeric(series, errors="coerce")
            except (TypeError, ValueError):
                return f"Error: Cannot compute {strategy} for non-numeric column '{col}'.", False
            fill_val = numeric.mean() if strategy == "mean" else numeric.median()
            if pd.isna(fill_val):
                # No numeric data at all: filling would wipe the whole column.
                return f"Error: Cannot compute {strategy} for non-numeric column '{col}'.", False
            fill_val = float(fill_val)
            if pd.api.types.is_integer_dtype(numeric.dtype) and fill_val.is_integer():
                fill_val = int(fill_val)
            self.df[col] = self._widen_for_value(numeric, fill_val).fillna(fill_val)
        elif strategy == "mode":
            mode_val = series.mode()
            if mode_val.empty:
                return f"Error: No mode found for '{col}'.", False
            self.df[col] = series.fillna(mode_val.iloc[0])
        elif strategy == "constant":
            if len(args) < 3:
                return "Error: 'constant' strategy requires a VALUE. Usage: fill_missing COL constant VALUE", False
            # Keep numeric columns numeric when the value parses as a number.
            fill_val = self._coerce_value(series, args[2])
            self.df[col] = self._widen_for_value(series, fill_val).fillna(fill_val)
        elif strategy == "forward_fill":
            self.df[col] = series.ffill()
        else:
            return f"Error: Unknown strategy '{strategy}'. Use: mean, median, mode, constant, forward_fill", False

        n_after = int(self.df[col].isna().sum())
        filled = n_before - n_after
        return (
            f"Filled {filled} missing values in '{col}' using strategy '{strategy}'. Remaining: {n_after}",
            filled > 0,
        )

    def _cmd_remove_duplicates(self, args: list) -> Tuple[str, bool]:
        """Drop duplicate rows.

        Usage: ``remove_duplicates [COLS] [first|last|none]``. ``none`` (or
        ``false``) drops every member of a duplicate group.
        """
        subset = None
        keep = "first"
        if args:
            subset, error = self._parse_subset(args[0])
            if error:
                return error, False
            if len(args) > 1:
                keep = args[1].lower()
                if keep not in ("first", "last", "none", "false"):
                    return f"Error: keep must be 'first', 'last', or 'none'. Got: '{keep}'", False
                if keep in ("none", "false"):
                    keep = False

        n_before = len(self.df)
        deduped = self.df.drop_duplicates(subset=subset, keep=keep).reset_index(drop=True)
        removed = n_before - len(deduped)
        if removed == 0:
            return "No duplicate rows found to remove.", False
        self.df = deduped
        return f"Removed {removed} duplicate rows. Dataset: {n_before} → {len(self.df)} rows.", True

    def _cmd_fix_dtype(self, args: list) -> Tuple[str, bool]:
        """Convert a column to int, float, str or datetime.

        The column is only reassigned once the conversion has succeeded.
        "Coercion errors" counts values that were present before and became
        missing; values that were already missing are not counted.
        """
        if len(args) < 2:
            return "Error: Usage: fix_dtype COLUMN TYPE (int/float/str/datetime)", False
        col = args[0]
        target = args[1].lower()
        if col not in self.df.columns:
            return self._column_error(col)

        series = self.df[col]
        before_type = str(series.dtype)
        n_missing_before = int(series.isna().sum())
        errors = 0

        if target in ("int", "int64"):
            numeric = self._to_numeric_lenient(series)
            present = numeric.dropna()
            # inf % 1 is NaN, so infinities are rejected here as well.
            n_fractional = int((present % 1 != 0).sum())
            if n_fractional:
                return (
                    f"Error: Column '{col}' has {n_fractional} non-integer values; "
                    f"use 'fix_dtype {col} float' instead.",
                    False,
                )
            errors = int(numeric.isna().sum()) - n_missing_before
            self.df[col] = numeric.astype("Int64")
        elif target in ("float", "float64"):
            numeric = self._to_numeric_lenient(series)
            errors = int(numeric.isna().sum()) - n_missing_before
            self.df[col] = numeric.astype("float64")
        elif target in ("str", "string", "object"):
            # Keep missing values missing rather than turning them into "nan".
            self.df[col] = series.astype(str).where(series.notna())
        elif target in ("datetime", "date"):
            converted = pd.to_datetime(series, errors="coerce")
            errors = int(converted.isna().sum()) - n_missing_before
            self.df[col] = converted
        else:
            return f"Error: Unknown type '{target}'. Use: int, float, str, datetime", False

        return f"Converted '{col}' from {before_type} → {target}. Coercion errors: {errors}", True

    def _cmd_replace(self, args: list) -> Tuple[str, bool]:
        """Replace exact matches of OLD_VALUE with NEW_VALUE in a column.

        On numeric columns both values are parsed as numbers when possible, so
        ``replace col 999 0`` matches a float ``999.0`` and keeps the dtype.
        """
        if len(args) < 3:
            return "Error: Usage: replace COLUMN OLD_VALUE NEW_VALUE", False
        col, old_val, new_val = args[0], args[1], args[2]
        if col not in self.df.columns:
            return self._column_error(col)

        series = self.df[col]
        mask = self._equals_mask(series, old_val)
        n_matches = int(mask.sum())
        if n_matches == 0:
            return f"No matches found for '{old_val}' in column '{col}'.", False

        replacement = self._coerce_value(series, new_val)
        updated = self._widen_for_value(series, replacement).copy()
        updated.loc[mask.to_numpy()] = replacement
        self.df[col] = updated
        return f"Replaced {n_matches} occurrences of '{old_val}' with '{new_val}' in '{col}'.", True

    def _cmd_regex_replace(self, args: list) -> Tuple[str, bool]:
        """Regex-based replacement within a column.

        Only non-missing cells are matched (against their string form). The
        column is left untouched when nothing changes or the pattern is invalid.
        """
        if len(args) < 3:
            return "Error: Usage: regex_replace COLUMN PATTERN REPLACEMENT", False
        col, pattern, replacement = args[0], args[1], args[2]
        if col not in self.df.columns:
            return self._column_error(col)

        series = self.df[col]
        present = series.notna().to_numpy(dtype=bool)
        try:
            re.compile(pattern)
            before_vals = series[present].astype(str)
            after_vals = (
                before_vals.str.replace(pattern, replacement, regex=True)
                if present.any()
                else before_vals
            )
        except re.error as e:
            return f"Error: Invalid regex pattern '{pattern}': {e}", False

        n_changed = int((before_vals.to_numpy(dtype=object) != after_vals.to_numpy(dtype=object)).sum())
        if n_changed == 0:
            return f"No matches for pattern '{pattern}' in column '{col}'.", False

        updated = series.astype(object)
        updated.loc[present] = after_vals.to_numpy(dtype=object)
        self.df[col] = updated
        return f"Regex replaced {n_changed} values in '{col}' (pattern: '{pattern}' → '{replacement}').", True

    def _cmd_standardize(self, args: list) -> Tuple[str, bool]:
        """Normalise text case or surrounding whitespace; missing values stay missing."""
        if len(args) < 2:
            return "Error: Usage: standardize COLUMN METHOD (lowercase/uppercase/titlecase/strip)", False
        col = args[0]
        method = args[1].lower()
        if col not in self.df.columns:
            return self._column_error(col)

        str_methods = {
            "lowercase": "lower",
            "uppercase": "upper",
            "titlecase": "title",
            "strip": "strip",
        }
        if method not in str_methods:
            return f"Error: Unknown method '{method}'. Use: lowercase, uppercase, titlecase, strip", False

        series = self.df[col]
        before_uniq = series.nunique()
        present = series.notna().to_numpy(dtype=bool)
        if present.any():
            as_text = series[present].astype(str)
            transformed = getattr(as_text.str, str_methods[method])()
            updated = series.astype(object)
            updated.loc[present] = transformed.to_numpy(dtype=object)
            self.df[col] = updated
        after_uniq = self.df[col].nunique()
        consolidated = before_uniq - after_uniq
        return (
            f"Standardized '{col}' using {method}. Unique values: {before_uniq} → {after_uniq} "
            f"(consolidated {consolidated}).",
            True,
        )

    def _cmd_remove_rows(self, args: list) -> Tuple[str, bool]:
        """Remove rows matching ``COLUMN CONDITION VALUE``.

        Conditions: equals, not_equals, less_than, greater_than, contains.
        ``contains`` is a case-insensitive literal substring match (not a
        regex) and never matches missing cells.
        """
        if len(args) < 3:
            return "Error: Usage: remove_rows COLUMN CONDITION VALUE\n  Conditions: equals, not_equals, less_than, greater_than, contains", False
        col = args[0]
        condition = args[1].lower()
        value = args[2]
        if col not in self.df.columns:
            return self._column_error(col)

        series = self.df[col]
        n_before = len(self.df)

        if condition in ("equals", "not_equals"):
            mask = self._equals_mask(series, value)
            if condition == "not_equals":
                mask = ~mask
        elif condition in ("less_than", "greater_than"):
            try:
                threshold = float(value)
            except ValueError:
                return f"Error: '{value}' is not a valid number for {condition}.", False
            numeric = pd.to_numeric(series, errors="coerce")
            compared = numeric < threshold if condition == "less_than" else numeric > threshold
            mask = compared.fillna(False).astype(bool)
        elif condition == "contains":
            if value == "":
                # An empty substring is contained in every value.
                return "Error: 'contains' requires a non-empty VALUE.", False
            found = series.astype(str).str.contains(value, case=False, na=False, regex=False)
            mask = found.fillna(False).astype(bool) & series.notna()
        else:
            return f"Error: Unknown condition '{condition}'. Use: equals, not_equals, less_than, greater_than, contains", False

        n_removed = int(mask.sum())
        if n_removed == 0:
            return f"No rows match condition '{col} {condition} {value}'.", False

        self.df = self.df[~mask].reset_index(drop=True)
        return f"Removed {n_removed} rows where {col} {condition} {value}. Dataset: {n_before} → {len(self.df)} rows.", True

    def _cmd_clip(self, args: list) -> Tuple[str, bool]:
        """Clamp a column's numeric values into ``[LOWER, UPPER]``.

        The column is coerced to numeric; cells that cannot be parsed become
        missing. Columns without any numeric value are rejected.
        """
        if len(args) < 3:
            return "Error: Usage: clip COLUMN LOWER UPPER", False
        col = args[0]
        if col not in self.df.columns:
            return self._column_error(col)
        try:
            lower = float(args[1])
            upper = float(args[2])
        except ValueError:
            return "Error: LOWER and UPPER must be numbers.", False
        if lower > upper:
            return f"Error: LOWER ({lower}) must not be greater than UPPER ({upper}).", False

        numeric_col = pd.to_numeric(self.df[col], errors="coerce")
        if numeric_col.notna().sum() == 0:
            return f"Error: Column '{col}' has no numeric values to clip.", False
        n_clipped = int(((numeric_col < lower) | (numeric_col > upper)).sum())
        self.df[col] = numeric_col.clip(lower=lower, upper=upper)
        return f"Clipped {n_clipped} values in '{col}' to [{lower}, {upper}].", True

    def _cmd_rename_column(self, args: list) -> Tuple[str, bool]:
        """Rename a column; the new name must be non-empty and unused."""
        if len(args) < 2:
            return "Error: Usage: rename_column OLD_NAME NEW_NAME", False
        old_name = args[0]
        new_name = args[1]
        if old_name not in self.df.columns:
            return self._column_error(old_name)
        if not new_name:
            return "Error: NEW_NAME must not be empty.", False
        if new_name in self.df.columns:
            return f"Error: Column '{new_name}' already exists.", False
        self.df = self.df.rename(columns={old_name: new_name})
        return f"Renamed column '{old_name}' → '{new_name}'.", True

    def _cmd_drop_column(self, args: list) -> Tuple[str, bool]:
        """Drop a column from the dataset."""
        if not args:
            return "Error: Usage: drop_column COLUMN_NAME", False
        col = args[0]
        if col not in self.df.columns:
            return self._column_error(col)
        self.df = self.df.drop(columns=[col])
        return f"Dropped column '{col}'. Remaining columns: {len(self.df.columns)}", True

    def _cmd_sort(self, args: list) -> Tuple[str, bool]:
        """Sort dataset by a column (ascending by default, missing values last)."""
        if not args:
            return "Error: Usage: sort COLUMN [asc|desc]", False
        col = args[0]
        if col not in self.df.columns:
            return self._column_error(col)

        ascending = True
        if len(args) > 1:
            order = args[1].lower()
            if order in ("desc", "descending"):
                ascending = False
            elif order not in ("asc", "ascending"):
                return f"Error: Unknown sort order '{args[1]}'. Usage: sort COLUMN [asc|desc]", False

        self.df = self.df.sort_values(by=col, ascending=ascending, na_position="last").reset_index(drop=True)
        direction = "ascending" if ascending else "descending"
        return f"Sorted dataset by '{col}' ({direction}).", True

    # ── Special commands ─────────────────────────────────────────────────

    def _cmd_validate(self, args: list) -> Tuple[str, bool]:
        """Return the sentinel string for a validate request; no data is touched here."""
        return "__VALIDATE__", False

    def _cmd_submit(self, args: list) -> Tuple[str, bool]:
        """Return the sentinel string for a submit request; no data is touched here."""
        return "__SUBMIT__", False