data-janitor-env / server /engine.py
yaswanth169's picture
Upload folder using huggingface_hub
2b62b0f verified
import re
import statistics
from datetime import datetime
from typing import Any, Dict, List, Optional
DATE_FORMATS = [
"%Y-%m-%d",
"%m/%d/%Y",
"%d/%m/%Y",
"%b %d, %Y",
"%d-%b-%Y",
"%B %d %Y",
"%B %d, %Y",
"%Y/%m/%d",
"%m-%d-%Y",
"%d.%m.%Y",
]
def is_null(value: Any) -> bool:
if value is None:
return True
if isinstance(value, str) and value.strip().lower() in (
"", "n/a", "na", "null", "none", "nan", "-", "missing",
):
return True
return False
def clean_numeric(value: Any) -> Optional[float]:
if is_null(value):
return None
s = str(value).strip().replace("$", "").replace(",", "").replace(" ", "")
try:
return float(s)
except (ValueError, TypeError):
return None
def parse_date(value: str) -> Optional[datetime]:
if not value or not isinstance(value, str):
return None
value = value.strip()
for fmt in DATE_FORMATS:
try:
return datetime.strptime(value, fmt)
except ValueError:
continue
return None
def normalize_phone(value: Any) -> str:
if is_null(value):
return ""
digits = re.sub(r"\D", "", str(value))
if len(digits) == 11 and digits[0] == "1":
digits = digits[1:]
if len(digits) == 10:
return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
return str(value)
class DataEngine:
COMMANDS = [
"inspect",
"drop_duplicates",
"fill_missing",
"drop_nulls",
"convert_type",
"normalize_text",
"standardize_date",
"standardize_phone",
"rename_column",
"map_values",
"filter_rows",
"split_column",
"merge_columns",
"join",
"add_column",
"submit",
]
def __init__(
self,
data: List[Dict[str, Any]],
secondary_data: Optional[List[Dict[str, Any]]] = None,
):
self.data = [dict(row) for row in data]
self.secondary_data = (
[dict(row) for row in secondary_data] if secondary_data else None
)
@property
def columns(self) -> List[str]:
return list(self.data[0].keys()) if self.data else []
def execute(self, command: str, column: Optional[str], params: Dict[str, Any]) -> str:
if command not in self.COMMANDS:
return f"Unknown command '{command}'. Available: {', '.join(self.COMMANDS)}"
if command == "submit":
return "submitted"
handler = getattr(self, f"_cmd_{command}", None)
if not handler:
return f"Command '{command}' is not implemented."
try:
return handler(column, params)
except Exception as e:
return f"Error executing '{command}': {e}"
def _validate_column(self, column: Optional[str]) -> Optional[str]:
if not column:
return "Column name is required for this command."
if column not in self.columns:
return f"Column '{column}' not found. Available: {self.columns}"
return None
# ── inspect ──────────────────────────────────────────────────────────
def _cmd_inspect(self, column: Optional[str], params: Dict) -> str:
if column:
err = self._validate_column(column)
if err:
return err
values = [row.get(column) for row in self.data]
non_null = [v for v in values if not is_null(v)]
null_count = len(values) - len(non_null)
unique = set(str(v) for v in non_null)
types = set(type(v).__name__ for v in non_null)
sample = [str(v) for v in non_null[:8]]
return (
f"Column '{column}': {len(values)} total, {null_count} nulls, "
f"{len(unique)} unique, types: {types}. Sample: {sample}"
)
return f"Dataset: {len(self.data)} rows, columns: {self.columns}"
# ── drop_duplicates ──────────────────────────────────────────────────
def _cmd_drop_duplicates(self, column: Optional[str], params: Dict) -> str:
subset = params.get("subset", self.columns)
if isinstance(subset, str):
subset = [subset]
seen: set = set()
unique: List[Dict] = []
for row in self.data:
key = tuple(str(row.get(col, "")) for col in subset)
if key not in seen:
seen.add(key)
unique.append(row)
removed = len(self.data) - len(unique)
self.data = unique
return f"Removed {removed} duplicate rows. {len(self.data)} rows remaining."
# ── fill_missing ─────────────────────────────────────────────────────
def _cmd_fill_missing(self, column: Optional[str], params: Dict) -> str:
err = self._validate_column(column)
if err:
return err
strategy = params.get("strategy", "constant")
fill_value = params.get("value")
if strategy == "constant" and fill_value is None:
return "Strategy 'constant' requires a 'value' parameter."
non_null_values = [row[column] for row in self.data if not is_null(row.get(column))]
if strategy == "mean":
nums = [n for n in (clean_numeric(v) for v in non_null_values) if n is not None]
fill_value = round(statistics.mean(nums), 2) if nums else 0
elif strategy == "median":
nums = [n for n in (clean_numeric(v) for v in non_null_values) if n is not None]
fill_value = round(statistics.median(nums), 2) if nums else 0
elif strategy == "mode":
fill_value = (
max(set(non_null_values), key=non_null_values.count)
if non_null_values
else ""
)
filled = 0
for row in self.data:
if is_null(row.get(column)):
row[column] = fill_value
filled += 1
return f"Filled {filled} missing values in '{column}' with {strategy} ({fill_value})."
# ── drop_nulls ───────────────────────────────────────────────────────
def _cmd_drop_nulls(self, column: Optional[str], params: Dict) -> str:
err = self._validate_column(column)
if err:
return err
before = len(self.data)
self.data = [row for row in self.data if not is_null(row.get(column))]
removed = before - len(self.data)
return f"Dropped {removed} rows with null '{column}'. {len(self.data)} remaining."
# ── convert_type ─────────────────────────────────────────────────────
def _cmd_convert_type(self, column: Optional[str], params: Dict) -> str:
err = self._validate_column(column)
if err:
return err
target = params.get("target_type", "str")
converted = 0
errors = 0
for row in self.data:
val = row[column]
if is_null(val):
row[column] = None
continue
try:
if target == "int":
cleaned = clean_numeric(val)
row[column] = int(cleaned) if cleaned is not None else None
elif target == "float":
row[column] = clean_numeric(val)
elif target == "str":
row[column] = str(val)
else:
return f"Unsupported target type '{target}'. Use: int, float, str."
converted += 1
except (ValueError, TypeError):
row[column] = None
errors += 1
return f"Converted {converted} values in '{column}' to {target}. {errors} errors."
# ── normalize_text ───────────────────────────────────────────────────
def _cmd_normalize_text(self, column: Optional[str], params: Dict) -> str:
err = self._validate_column(column)
if err:
return err
operation = params.get("operation", "trim")
pattern = params.get("pattern", "")
replacement = params.get("replacement", "")
modified = 0
for row in self.data:
val = row[column]
if is_null(val):
continue
original = str(val)
if operation == "trim":
row[column] = original.strip()
elif operation == "lower":
row[column] = original.strip().lower()
elif operation == "upper":
row[column] = original.strip().upper()
elif operation == "title":
row[column] = original.strip().title()
elif operation == "regex_replace":
if not pattern:
return "regex_replace requires a 'pattern' parameter."
row[column] = re.sub(pattern, replacement, original)
else:
return (
f"Unknown operation '{operation}'. "
"Use: trim, lower, upper, title, regex_replace."
)
if row[column] != original:
modified += 1
return f"Normalized {modified} values in '{column}' with '{operation}'."
# ── standardize_date ─────────────────────────────────────────────────
def _cmd_standardize_date(self, column: Optional[str], params: Dict) -> str:
err = self._validate_column(column)
if err:
return err
target_format = params.get("format", "%Y-%m-%d")
converted = 0
failed = 0
for row in self.data:
val = row[column]
if is_null(val):
continue
parsed = parse_date(str(val))
if parsed:
row[column] = parsed.strftime(target_format)
converted += 1
else:
failed += 1
return (
f"Standardized {converted} dates in '{column}'. "
f"{failed} could not be parsed."
)
# ── standardize_phone ────────────────────────────────────────────────
def _cmd_standardize_phone(self, column: Optional[str], params: Dict) -> str:
err = self._validate_column(column)
if err:
return err
modified = 0
for row in self.data:
val = row[column]
if is_null(val):
continue
normalized = normalize_phone(val)
if normalized != str(val):
modified += 1
row[column] = normalized
return f"Standardized {modified} phone numbers in '{column}'."
# ── rename_column ────────────────────────────────────────────────────
def _cmd_rename_column(self, column: Optional[str], params: Dict) -> str:
err = self._validate_column(column)
if err:
return err
new_name = params.get("new_name")
if not new_name:
return "Parameter 'new_name' is required."
for row in self.data:
row[new_name] = row.pop(column, None)
return f"Renamed '{column}' to '{new_name}'."
# ── map_values ───────────────────────────────────────────────────────
def _cmd_map_values(self, column: Optional[str], params: Dict) -> str:
err = self._validate_column(column)
if err:
return err
mapping = params.get("mapping", {})
if not mapping:
return "Parameter 'mapping' (dict) is required."
modified = 0
for row in self.data:
key = str(row[column]) if row[column] is not None else None
if key in mapping:
row[column] = mapping[key]
modified += 1
return f"Mapped {modified} values in '{column}'."
# ── filter_rows ──────────────────────────────────────────────────────
def _cmd_filter_rows(self, column: Optional[str], params: Dict) -> str:
err = self._validate_column(column)
if err:
return err
operator = params.get("operator", "==")
value = params.get("value")
if value is None:
return "Parameter 'value' is required."
before = len(self.data)
kept: List[Dict] = []
for row in self.data:
cell = row.get(column)
remove = False
try:
if operator == "==":
remove = str(cell).strip() == str(value).strip()
elif operator == "!=":
remove = str(cell).strip() != str(value).strip()
elif operator in (">", "<", ">=", "<="):
num = clean_numeric(cell)
threshold = float(value)
if num is not None:
if operator == ">":
remove = num > threshold
elif operator == "<":
remove = num < threshold
elif operator == ">=":
remove = num >= threshold
elif operator == "<=":
remove = num <= threshold
elif operator == "is_null":
remove = is_null(cell)
else:
return f"Unknown operator '{operator}'. Use: ==, !=, >, <, >=, <=, is_null."
except (ValueError, TypeError):
pass
if not remove:
kept.append(row)
self.data = kept
removed = before - len(self.data)
return f"Removed {removed} rows where '{column}' {operator} {value}. {len(self.data)} remaining."
# ── split_column ─────────────────────────────────────────────────────
def _cmd_split_column(self, column: Optional[str], params: Dict) -> str:
err = self._validate_column(column)
if err:
return err
delimiter = params.get("delimiter", ",")
new_columns = params.get("new_columns", [])
if not new_columns:
return "Parameter 'new_columns' (list of names) is required."
for row in self.data:
val = str(row.get(column, ""))
parts = val.split(delimiter)
for i, new_col in enumerate(new_columns):
row[new_col] = parts[i].strip() if i < len(parts) else None
if params.get("drop_original", False):
for row in self.data:
row.pop(column, None)
return f"Split '{column}' into {new_columns}."
# ── merge_columns ────────────────────────────────────────────────────
def _cmd_merge_columns(self, column: Optional[str], params: Dict) -> str:
columns_list = params.get("columns", [])
separator = params.get("separator", " ")
new_column = params.get("new_column", column)
if not columns_list:
return "Parameter 'columns' (list) is required."
if not new_column:
return "Parameter 'new_column' or column is required."
for row in self.data:
parts = [str(row.get(col, "")) for col in columns_list]
row[new_column] = separator.join(parts)
if params.get("drop_originals", False):
for row in self.data:
for col in columns_list:
if col != new_column:
row.pop(col, None)
return f"Merged {columns_list} into '{new_column}'."
# ── join ─────────────────────────────────────────────────────────────
def _cmd_join(self, column: Optional[str], params: Dict) -> str:
if self.secondary_data is None:
return (
"Join already completed β€” secondary dataset was merged earlier this episode. "
f"Current table has {len(self.data)} rows and columns: {self.columns}. "
"Do NOT call join again. Clean remaining issues (casing, types, totals) and submit."
)
on = column or params.get("on")
if not on:
return "Join column required via 'column' or params 'on'."
how = params.get("how", "inner")
if how not in ("inner", "left"):
return "Supported join types: 'inner', 'left'."
lookup: Dict[str, Dict] = {}
for row in self.secondary_data:
key = str(row.get(on, "")).strip()
lookup[key] = row
joined: List[Dict] = []
matched = 0
for row in self.data:
key = str(row.get(on, "")).strip()
merged_row = dict(row)
if key in lookup:
for k, v in lookup[key].items():
if k != on:
merged_row[k] = v
matched += 1
joined.append(merged_row)
elif how == "left":
joined.append(merged_row)
self.data = joined
self.secondary_data = None
return (
f"Joined {matched} rows on '{on}' ({how}). "
f"{len(self.data)} rows in result."
)
# ── add_column ───────────────────────────────────────────────────────
def _cmd_add_column(self, column: Optional[str], params: Dict) -> str:
if not column:
return "Column name for the new column is required."
expression = params.get("expression", "")
if not expression:
return "Parameter 'expression' is required (e.g., 'quantity * unit_price')."
match = re.match(r"^(\w+)\s*([+\-*/])\s*(\w+)$", expression.strip())
if not match:
constant = params.get("value")
if constant is not None:
for row in self.data:
row[column] = constant
return f"Added column '{column}' with constant value {constant}."
return (
f"Expression '{expression}' not supported. "
"Use: 'column_a operator column_b' (operators: +, -, *, /)."
)
col_a, op, col_b = match.groups()
computed = 0
for row in self.data:
a = clean_numeric(row.get(col_a))
b = clean_numeric(row.get(col_b))
if a is not None and b is not None:
if op == "+":
row[column] = round(a + b, 2)
elif op == "-":
row[column] = round(a - b, 2)
elif op == "*":
row[column] = round(a * b, 2)
elif op == "/":
row[column] = round(a / b, 2) if b != 0 else None
computed += 1
else:
row[column] = None
return f"Computed '{column}' = {expression} for {computed} rows."