Spaces:
Sleeping
Sleeping
| import re | |
| import statistics | |
| from datetime import datetime | |
| from typing import Any, Dict, List, Optional | |
| DATE_FORMATS = [ | |
| "%Y-%m-%d", | |
| "%m/%d/%Y", | |
| "%d/%m/%Y", | |
| "%b %d, %Y", | |
| "%d-%b-%Y", | |
| "%B %d %Y", | |
| "%B %d, %Y", | |
| "%Y/%m/%d", | |
| "%m-%d-%Y", | |
| "%d.%m.%Y", | |
| ] | |
| def is_null(value: Any) -> bool: | |
| if value is None: | |
| return True | |
| if isinstance(value, str) and value.strip().lower() in ( | |
| "", "n/a", "na", "null", "none", "nan", "-", "missing", | |
| ): | |
| return True | |
| return False | |
| def clean_numeric(value: Any) -> Optional[float]: | |
| if is_null(value): | |
| return None | |
| s = str(value).strip().replace("$", "").replace(",", "").replace(" ", "") | |
| try: | |
| return float(s) | |
| except (ValueError, TypeError): | |
| return None | |
| def parse_date(value: str) -> Optional[datetime]: | |
| if not value or not isinstance(value, str): | |
| return None | |
| value = value.strip() | |
| for fmt in DATE_FORMATS: | |
| try: | |
| return datetime.strptime(value, fmt) | |
| except ValueError: | |
| continue | |
| return None | |
| def normalize_phone(value: Any) -> str: | |
| if is_null(value): | |
| return "" | |
| digits = re.sub(r"\D", "", str(value)) | |
| if len(digits) == 11 and digits[0] == "1": | |
| digits = digits[1:] | |
| if len(digits) == 10: | |
| return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}" | |
| return str(value) | |
| class DataEngine: | |
| COMMANDS = [ | |
| "inspect", | |
| "drop_duplicates", | |
| "fill_missing", | |
| "drop_nulls", | |
| "convert_type", | |
| "normalize_text", | |
| "standardize_date", | |
| "standardize_phone", | |
| "rename_column", | |
| "map_values", | |
| "filter_rows", | |
| "split_column", | |
| "merge_columns", | |
| "join", | |
| "add_column", | |
| "submit", | |
| ] | |
| def __init__( | |
| self, | |
| data: List[Dict[str, Any]], | |
| secondary_data: Optional[List[Dict[str, Any]]] = None, | |
| ): | |
| self.data = [dict(row) for row in data] | |
| self.secondary_data = ( | |
| [dict(row) for row in secondary_data] if secondary_data else None | |
| ) | |
| def columns(self) -> List[str]: | |
| return list(self.data[0].keys()) if self.data else [] | |
| def execute(self, command: str, column: Optional[str], params: Dict[str, Any]) -> str: | |
| if command not in self.COMMANDS: | |
| return f"Unknown command '{command}'. Available: {', '.join(self.COMMANDS)}" | |
| if command == "submit": | |
| return "submitted" | |
| handler = getattr(self, f"_cmd_{command}", None) | |
| if not handler: | |
| return f"Command '{command}' is not implemented." | |
| try: | |
| return handler(column, params) | |
| except Exception as e: | |
| return f"Error executing '{command}': {e}" | |
| def _validate_column(self, column: Optional[str]) -> Optional[str]: | |
| if not column: | |
| return "Column name is required for this command." | |
| if column not in self.columns: | |
| return f"Column '{column}' not found. Available: {self.columns}" | |
| return None | |
| # ββ inspect ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _cmd_inspect(self, column: Optional[str], params: Dict) -> str: | |
| if column: | |
| err = self._validate_column(column) | |
| if err: | |
| return err | |
| values = [row.get(column) for row in self.data] | |
| non_null = [v for v in values if not is_null(v)] | |
| null_count = len(values) - len(non_null) | |
| unique = set(str(v) for v in non_null) | |
| types = set(type(v).__name__ for v in non_null) | |
| sample = [str(v) for v in non_null[:8]] | |
| return ( | |
| f"Column '{column}': {len(values)} total, {null_count} nulls, " | |
| f"{len(unique)} unique, types: {types}. Sample: {sample}" | |
| ) | |
| return f"Dataset: {len(self.data)} rows, columns: {self.columns}" | |
| # ββ drop_duplicates ββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _cmd_drop_duplicates(self, column: Optional[str], params: Dict) -> str: | |
| subset = params.get("subset", self.columns) | |
| if isinstance(subset, str): | |
| subset = [subset] | |
| seen: set = set() | |
| unique: List[Dict] = [] | |
| for row in self.data: | |
| key = tuple(str(row.get(col, "")) for col in subset) | |
| if key not in seen: | |
| seen.add(key) | |
| unique.append(row) | |
| removed = len(self.data) - len(unique) | |
| self.data = unique | |
| return f"Removed {removed} duplicate rows. {len(self.data)} rows remaining." | |
| # ββ fill_missing βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _cmd_fill_missing(self, column: Optional[str], params: Dict) -> str: | |
| err = self._validate_column(column) | |
| if err: | |
| return err | |
| strategy = params.get("strategy", "constant") | |
| fill_value = params.get("value") | |
| if strategy == "constant" and fill_value is None: | |
| return "Strategy 'constant' requires a 'value' parameter." | |
| non_null_values = [row[column] for row in self.data if not is_null(row.get(column))] | |
| if strategy == "mean": | |
| nums = [n for n in (clean_numeric(v) for v in non_null_values) if n is not None] | |
| fill_value = round(statistics.mean(nums), 2) if nums else 0 | |
| elif strategy == "median": | |
| nums = [n for n in (clean_numeric(v) for v in non_null_values) if n is not None] | |
| fill_value = round(statistics.median(nums), 2) if nums else 0 | |
| elif strategy == "mode": | |
| fill_value = ( | |
| max(set(non_null_values), key=non_null_values.count) | |
| if non_null_values | |
| else "" | |
| ) | |
| filled = 0 | |
| for row in self.data: | |
| if is_null(row.get(column)): | |
| row[column] = fill_value | |
| filled += 1 | |
| return f"Filled {filled} missing values in '{column}' with {strategy} ({fill_value})." | |
| # ββ drop_nulls βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _cmd_drop_nulls(self, column: Optional[str], params: Dict) -> str: | |
| err = self._validate_column(column) | |
| if err: | |
| return err | |
| before = len(self.data) | |
| self.data = [row for row in self.data if not is_null(row.get(column))] | |
| removed = before - len(self.data) | |
| return f"Dropped {removed} rows with null '{column}'. {len(self.data)} remaining." | |
| # ββ convert_type βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _cmd_convert_type(self, column: Optional[str], params: Dict) -> str: | |
| err = self._validate_column(column) | |
| if err: | |
| return err | |
| target = params.get("target_type", "str") | |
| converted = 0 | |
| errors = 0 | |
| for row in self.data: | |
| val = row[column] | |
| if is_null(val): | |
| row[column] = None | |
| continue | |
| try: | |
| if target == "int": | |
| cleaned = clean_numeric(val) | |
| row[column] = int(cleaned) if cleaned is not None else None | |
| elif target == "float": | |
| row[column] = clean_numeric(val) | |
| elif target == "str": | |
| row[column] = str(val) | |
| else: | |
| return f"Unsupported target type '{target}'. Use: int, float, str." | |
| converted += 1 | |
| except (ValueError, TypeError): | |
| row[column] = None | |
| errors += 1 | |
| return f"Converted {converted} values in '{column}' to {target}. {errors} errors." | |
| # ββ normalize_text βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _cmd_normalize_text(self, column: Optional[str], params: Dict) -> str: | |
| err = self._validate_column(column) | |
| if err: | |
| return err | |
| operation = params.get("operation", "trim") | |
| pattern = params.get("pattern", "") | |
| replacement = params.get("replacement", "") | |
| modified = 0 | |
| for row in self.data: | |
| val = row[column] | |
| if is_null(val): | |
| continue | |
| original = str(val) | |
| if operation == "trim": | |
| row[column] = original.strip() | |
| elif operation == "lower": | |
| row[column] = original.strip().lower() | |
| elif operation == "upper": | |
| row[column] = original.strip().upper() | |
| elif operation == "title": | |
| row[column] = original.strip().title() | |
| elif operation == "regex_replace": | |
| if not pattern: | |
| return "regex_replace requires a 'pattern' parameter." | |
| row[column] = re.sub(pattern, replacement, original) | |
| else: | |
| return ( | |
| f"Unknown operation '{operation}'. " | |
| "Use: trim, lower, upper, title, regex_replace." | |
| ) | |
| if row[column] != original: | |
| modified += 1 | |
| return f"Normalized {modified} values in '{column}' with '{operation}'." | |
| # ββ standardize_date βββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _cmd_standardize_date(self, column: Optional[str], params: Dict) -> str: | |
| err = self._validate_column(column) | |
| if err: | |
| return err | |
| target_format = params.get("format", "%Y-%m-%d") | |
| converted = 0 | |
| failed = 0 | |
| for row in self.data: | |
| val = row[column] | |
| if is_null(val): | |
| continue | |
| parsed = parse_date(str(val)) | |
| if parsed: | |
| row[column] = parsed.strftime(target_format) | |
| converted += 1 | |
| else: | |
| failed += 1 | |
| return ( | |
| f"Standardized {converted} dates in '{column}'. " | |
| f"{failed} could not be parsed." | |
| ) | |
| # ββ standardize_phone ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _cmd_standardize_phone(self, column: Optional[str], params: Dict) -> str: | |
| err = self._validate_column(column) | |
| if err: | |
| return err | |
| modified = 0 | |
| for row in self.data: | |
| val = row[column] | |
| if is_null(val): | |
| continue | |
| normalized = normalize_phone(val) | |
| if normalized != str(val): | |
| modified += 1 | |
| row[column] = normalized | |
| return f"Standardized {modified} phone numbers in '{column}'." | |
| # ββ rename_column ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _cmd_rename_column(self, column: Optional[str], params: Dict) -> str: | |
| err = self._validate_column(column) | |
| if err: | |
| return err | |
| new_name = params.get("new_name") | |
| if not new_name: | |
| return "Parameter 'new_name' is required." | |
| for row in self.data: | |
| row[new_name] = row.pop(column, None) | |
| return f"Renamed '{column}' to '{new_name}'." | |
| # ββ map_values βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _cmd_map_values(self, column: Optional[str], params: Dict) -> str: | |
| err = self._validate_column(column) | |
| if err: | |
| return err | |
| mapping = params.get("mapping", {}) | |
| if not mapping: | |
| return "Parameter 'mapping' (dict) is required." | |
| modified = 0 | |
| for row in self.data: | |
| key = str(row[column]) if row[column] is not None else None | |
| if key in mapping: | |
| row[column] = mapping[key] | |
| modified += 1 | |
| return f"Mapped {modified} values in '{column}'." | |
| # ββ filter_rows ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _cmd_filter_rows(self, column: Optional[str], params: Dict) -> str: | |
| err = self._validate_column(column) | |
| if err: | |
| return err | |
| operator = params.get("operator", "==") | |
| value = params.get("value") | |
| if value is None: | |
| return "Parameter 'value' is required." | |
| before = len(self.data) | |
| kept: List[Dict] = [] | |
| for row in self.data: | |
| cell = row.get(column) | |
| remove = False | |
| try: | |
| if operator == "==": | |
| remove = str(cell).strip() == str(value).strip() | |
| elif operator == "!=": | |
| remove = str(cell).strip() != str(value).strip() | |
| elif operator in (">", "<", ">=", "<="): | |
| num = clean_numeric(cell) | |
| threshold = float(value) | |
| if num is not None: | |
| if operator == ">": | |
| remove = num > threshold | |
| elif operator == "<": | |
| remove = num < threshold | |
| elif operator == ">=": | |
| remove = num >= threshold | |
| elif operator == "<=": | |
| remove = num <= threshold | |
| elif operator == "is_null": | |
| remove = is_null(cell) | |
| else: | |
| return f"Unknown operator '{operator}'. Use: ==, !=, >, <, >=, <=, is_null." | |
| except (ValueError, TypeError): | |
| pass | |
| if not remove: | |
| kept.append(row) | |
| self.data = kept | |
| removed = before - len(self.data) | |
| return f"Removed {removed} rows where '{column}' {operator} {value}. {len(self.data)} remaining." | |
| # ββ split_column βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _cmd_split_column(self, column: Optional[str], params: Dict) -> str: | |
| err = self._validate_column(column) | |
| if err: | |
| return err | |
| delimiter = params.get("delimiter", ",") | |
| new_columns = params.get("new_columns", []) | |
| if not new_columns: | |
| return "Parameter 'new_columns' (list of names) is required." | |
| for row in self.data: | |
| val = str(row.get(column, "")) | |
| parts = val.split(delimiter) | |
| for i, new_col in enumerate(new_columns): | |
| row[new_col] = parts[i].strip() if i < len(parts) else None | |
| if params.get("drop_original", False): | |
| for row in self.data: | |
| row.pop(column, None) | |
| return f"Split '{column}' into {new_columns}." | |
| # ββ merge_columns ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _cmd_merge_columns(self, column: Optional[str], params: Dict) -> str: | |
| columns_list = params.get("columns", []) | |
| separator = params.get("separator", " ") | |
| new_column = params.get("new_column", column) | |
| if not columns_list: | |
| return "Parameter 'columns' (list) is required." | |
| if not new_column: | |
| return "Parameter 'new_column' or column is required." | |
| for row in self.data: | |
| parts = [str(row.get(col, "")) for col in columns_list] | |
| row[new_column] = separator.join(parts) | |
| if params.get("drop_originals", False): | |
| for row in self.data: | |
| for col in columns_list: | |
| if col != new_column: | |
| row.pop(col, None) | |
| return f"Merged {columns_list} into '{new_column}'." | |
| # ββ join βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _cmd_join(self, column: Optional[str], params: Dict) -> str: | |
| if self.secondary_data is None: | |
| return ( | |
| "Join already completed β secondary dataset was merged earlier this episode. " | |
| f"Current table has {len(self.data)} rows and columns: {self.columns}. " | |
| "Do NOT call join again. Clean remaining issues (casing, types, totals) and submit." | |
| ) | |
| on = column or params.get("on") | |
| if not on: | |
| return "Join column required via 'column' or params 'on'." | |
| how = params.get("how", "inner") | |
| if how not in ("inner", "left"): | |
| return "Supported join types: 'inner', 'left'." | |
| lookup: Dict[str, Dict] = {} | |
| for row in self.secondary_data: | |
| key = str(row.get(on, "")).strip() | |
| lookup[key] = row | |
| joined: List[Dict] = [] | |
| matched = 0 | |
| for row in self.data: | |
| key = str(row.get(on, "")).strip() | |
| merged_row = dict(row) | |
| if key in lookup: | |
| for k, v in lookup[key].items(): | |
| if k != on: | |
| merged_row[k] = v | |
| matched += 1 | |
| joined.append(merged_row) | |
| elif how == "left": | |
| joined.append(merged_row) | |
| self.data = joined | |
| self.secondary_data = None | |
| return ( | |
| f"Joined {matched} rows on '{on}' ({how}). " | |
| f"{len(self.data)} rows in result." | |
| ) | |
| # ββ add_column βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _cmd_add_column(self, column: Optional[str], params: Dict) -> str: | |
| if not column: | |
| return "Column name for the new column is required." | |
| expression = params.get("expression", "") | |
| if not expression: | |
| return "Parameter 'expression' is required (e.g., 'quantity * unit_price')." | |
| match = re.match(r"^(\w+)\s*([+\-*/])\s*(\w+)$", expression.strip()) | |
| if not match: | |
| constant = params.get("value") | |
| if constant is not None: | |
| for row in self.data: | |
| row[column] = constant | |
| return f"Added column '{column}' with constant value {constant}." | |
| return ( | |
| f"Expression '{expression}' not supported. " | |
| "Use: 'column_a operator column_b' (operators: +, -, *, /)." | |
| ) | |
| col_a, op, col_b = match.groups() | |
| computed = 0 | |
| for row in self.data: | |
| a = clean_numeric(row.get(col_a)) | |
| b = clean_numeric(row.get(col_b)) | |
| if a is not None and b is not None: | |
| if op == "+": | |
| row[column] = round(a + b, 2) | |
| elif op == "-": | |
| row[column] = round(a - b, 2) | |
| elif op == "*": | |
| row[column] = round(a * b, 2) | |
| elif op == "/": | |
| row[column] = round(a / b, 2) if b != 0 else None | |
| computed += 1 | |
| else: | |
| row[column] = None | |
| return f"Computed '{column}' = {expression} for {computed} rows." | |