File size: 22,159 Bytes
bb479f9
 
 
 
 
cf67c72
 
 
 
 
 
 
 
bb479f9
 
cf67c72
bb479f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cf67c72
bb479f9
 
 
cf67c72
 
 
bb479f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cf67c72
bb479f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cf67c72
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bb479f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cf67c72
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bb479f9
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
"""
Cleaning Engine for DataWranglerEnv.

Parses text commands from the agent and executes data cleaning operations
on the working DataFrame. Returns text results for the observation.

Commands:
    Diagnostic: help, view, profile, profile_column, find_missing,
                find_duplicates, find_outliers
    Cleaning:   fill_missing, remove_duplicates, fix_dtype, replace,
                regex_replace, standardize, remove_rows, clip,
                rename_column, drop_column, sort
    Special:    validate, submit
"""

import re
from typing import Any, Dict, Optional, Tuple

import numpy as np
import pandas as pd

from .dataset_generator import COMMANDS_HELP


class CleaningEngine:
    """Parses agent text commands and applies cleaning operations to a DataFrame."""

    def __init__(self, df: pd.DataFrame):
        self.df = df

    def execute(self, command_str: str) -> Tuple[str, bool]:
        """Parse and execute a text command.

        Args:
            command_str: Raw text command from the agent

        Returns:
            Tuple of (response_text, data_was_modified)
        """
        command_str = command_str.strip()
        if not command_str:
            return "Error: Empty command. Type 'help' for available commands.", False

        parts = self._parse_command(command_str)
        cmd = parts[0].lower()
        args = parts[1:]

        dispatch = {
            "help": self._cmd_help,
            "view": self._cmd_view,
            "profile": self._cmd_profile,
            "profile_column": self._cmd_profile_column,
            "find_missing": self._cmd_find_missing,
            "find_duplicates": self._cmd_find_duplicates,
            "find_outliers": self._cmd_find_outliers,
            "fill_missing": self._cmd_fill_missing,
            "remove_duplicates": self._cmd_remove_duplicates,
            "fix_dtype": self._cmd_fix_dtype,
            "replace": self._cmd_replace,
            "regex_replace": self._cmd_regex_replace,
            "standardize": self._cmd_standardize,
            "remove_rows": self._cmd_remove_rows,
            "clip": self._cmd_clip,
            "rename_column": self._cmd_rename_column,
            "drop_column": self._cmd_drop_column,
            "sort": self._cmd_sort,
            "validate": self._cmd_validate,
            "submit": self._cmd_submit,
        }

        handler = dispatch.get(cmd)
        if handler is None:
            suggestions = [c for c in dispatch.keys() if c.startswith(cmd[:3])] if len(cmd) >= 3 else []
            msg = f"Error: Unknown command '{cmd}'."
            if suggestions:
                msg += f" Did you mean: {', '.join(suggestions)}?"
            msg += " Type 'help' for available commands."
            return msg, False

        try:
            return handler(args)
        except Exception as e:
            return f"Error executing '{cmd}': {str(e)}", False

    def _parse_command(self, command_str: str) -> list:
        """Parse command string, handling quoted arguments."""
        parts = []
        current = ""
        in_quotes = False
        quote_char = None

        for char in command_str:
            if char in ('"', "'") and not in_quotes:
                in_quotes = True
                quote_char = char
            elif char == quote_char and in_quotes:
                in_quotes = False
                quote_char = None
            elif char == " " and not in_quotes:
                if current:
                    parts.append(current)
                    current = ""
            else:
                current += char

        if current:
            parts.append(current)
        return parts

    # ── Diagnostic commands (read-only) ──────────────────────────────────

    def _cmd_help(self, args: list) -> Tuple[str, bool]:
        """Return the module-level help text; arguments are ignored and no data changes."""
        help_text = COMMANDS_HELP
        return help_text, False

    def _cmd_view(self, args: list) -> Tuple[str, bool]:
        n = 10
        if args:
            try:
                n = int(args[0])
            except ValueError:
                return "Error: 'view' expects an integer argument. Usage: view [N]", False
        n = min(n, 50)
        result = self.df.head(n).to_string(max_colwidth=30)
        return f"Showing first {n} rows ({len(self.df)} total):\n\n{result}", False

    def _cmd_profile(self, args: list) -> Tuple[str, bool]:
        lines = []
        lines.append(f"Dataset Shape: {self.df.shape[0]} rows Γ— {self.df.shape[1]} columns")
        lines.append(f"\nColumns:")
        lines.append(f"{'Column':<20} {'Type':<12} {'Non-Null':<10} {'Missing':<10} {'Missing%':<10} {'Unique':<8}")
        lines.append("-" * 70)
        for col in self.df.columns:
            dtype = str(self.df[col].dtype)
            non_null = self.df[col].notna().sum()
            missing = self.df[col].isna().sum()
            missing_pct = f"{(missing / len(self.df) * 100):.1f}%"
            unique = self.df[col].nunique()
            lines.append(f"{col:<20} {dtype:<12} {non_null:<10} {missing:<10} {missing_pct:<10} {unique:<8}")

        n_dupes = self.df.duplicated().sum()
        lines.append(f"\nDuplicate rows: {n_dupes} ({n_dupes / len(self.df) * 100:.1f}%)")
        return "\n".join(lines), False

    def _cmd_profile_column(self, args: list) -> Tuple[str, bool]:
        if not args:
            return "Error: Usage: profile_column COLUMN_NAME", False
        col = args[0]
        if col not in self.df.columns:
            return f"Error: Column '{col}' not found. Available: {', '.join(self.df.columns)}", False

        lines = [f"Profile for column '{col}':"]
        series = self.df[col]
        lines.append(f"  Type: {series.dtype}")
        lines.append(f"  Non-null: {series.notna().sum()} / {len(series)}")
        lines.append(f"  Missing: {series.isna().sum()} ({series.isna().mean() * 100:.1f}%)")
        lines.append(f"  Unique values: {series.nunique()}")

        if pd.api.types.is_numeric_dtype(series):
            desc = series.describe()
            lines.append(f"  Min: {desc.get('min', 'N/A')}")
            lines.append(f"  Max: {desc.get('max', 'N/A')}")
            lines.append(f"  Mean: {desc.get('mean', 'N/A'):.2f}" if pd.notna(desc.get('mean')) else "  Mean: N/A")
            lines.append(f"  Std: {desc.get('std', 'N/A'):.2f}" if pd.notna(desc.get('std')) else "  Std: N/A")
            lines.append(f"  Median: {desc.get('50%', 'N/A')}")
        else:
            top_values = series.dropna().value_counts().head(10)
            lines.append(f"  Top values:")
            for val, count in top_values.items():
                lines.append(f"    '{val}': {count}")

        return "\n".join(lines), False

    def _cmd_find_missing(self, args: list) -> Tuple[str, bool]:
        missing = self.df.isnull().sum()
        missing = missing[missing > 0]
        if missing.empty:
            return "No missing values found! The dataset is complete.", False

        lines = ["Missing values by column:"]
        lines.append(f"{'Column':<25} {'Count':<8} {'Percentage':<10}")
        lines.append("-" * 43)
        for col, count in missing.sort_values(ascending=False).items():
            pct = f"{count / len(self.df) * 100:.1f}%"
            lines.append(f"{col:<25} {count:<8} {pct:<10}")
        lines.append(f"\nTotal missing cells: {missing.sum()}")
        return "\n".join(lines), False

    def _cmd_find_duplicates(self, args: list) -> Tuple[str, bool]:
        subset = None
        if args:
            subset = [c.strip() for c in args[0].split(",")]
            invalid = [c for c in subset if c not in self.df.columns]
            if invalid:
                return f"Error: Unknown columns: {invalid}. Available: {list(self.df.columns)}", False

        dupes = self.df[self.df.duplicated(subset=subset, keep=False)]
        n_dupes = self.df.duplicated(subset=subset, keep="first").sum()

        if n_dupes == 0:
            cols_desc = f" (on columns: {subset})" if subset else ""
            return f"No duplicate rows found{cols_desc}.", False

        lines = [f"Found {n_dupes} duplicate rows (keeping first occurrence):"]
        if len(dupes) <= 20:
            lines.append(dupes.to_string(max_colwidth=25))
        else:
            lines.append(f"Showing first 10 of {len(dupes)} duplicate entries:")
            lines.append(dupes.head(10).to_string(max_colwidth=25))
        return "\n".join(lines), False

    def _cmd_find_outliers(self, args: list) -> Tuple[str, bool]:
        if not args:
            return "Error: Usage: find_outliers COLUMN_NAME", False
        col = args[0]
        if col not in self.df.columns:
            return f"Error: Column '{col}' not found. Available: {', '.join(self.df.columns)}", False

        try:
            numeric_col = pd.to_numeric(self.df[col], errors="coerce")
        except Exception:
            return f"Error: Column '{col}' cannot be converted to numeric for outlier detection.", False

        q1 = numeric_col.quantile(0.25)
        q3 = numeric_col.quantile(0.75)
        iqr = q3 - q1
        lower = q1 - 1.5 * iqr
        upper = q3 + 1.5 * iqr

        outliers_mask = (numeric_col < lower) | (numeric_col > upper)
        n_outliers = outliers_mask.sum()

        if n_outliers == 0:
            return f"No outliers found in '{col}' (IQR method, bounds: [{lower:.2f}, {upper:.2f}]).", False

        lines = [f"Found {n_outliers} outliers in '{col}' (IQR method):"]
        lines.append(f"  Q1: {q1:.2f}, Q3: {q3:.2f}, IQR: {iqr:.2f}")
        lines.append(f"  Lower bound: {lower:.2f}")
        lines.append(f"  Upper bound: {upper:.2f}")
        outlier_values = numeric_col[outliers_mask].dropna()
        lines.append(f"  Outlier values: {list(outlier_values.head(15).values)}")
        return "\n".join(lines), False

    # ── Cleaning commands (modify data) ──────────────────────────────────

    def _cmd_fill_missing(self, args: list) -> Tuple[str, bool]:
        if len(args) < 2:
            return "Error: Usage: fill_missing COLUMN STRATEGY [VALUE]\n  Strategies: mean, median, mode, constant, forward_fill", False
        col = args[0]
        strategy = args[1].lower()
        if col not in self.df.columns:
            return f"Error: Column '{col}' not found. Available: {', '.join(self.df.columns)}", False

        n_before = self.df[col].isna().sum()
        if n_before == 0:
            return f"No missing values in '{col}'. Nothing to fill.", False

        if strategy == "mean":
            try:
                fill_val = pd.to_numeric(self.df[col], errors="coerce").mean()
                self.df[col] = pd.to_numeric(self.df[col], errors="coerce").fillna(fill_val)
            except Exception:
                return f"Error: Cannot compute mean for non-numeric column '{col}'.", False
        elif strategy == "median":
            try:
                fill_val = pd.to_numeric(self.df[col], errors="coerce").median()
                self.df[col] = pd.to_numeric(self.df[col], errors="coerce").fillna(fill_val)
            except Exception:
                return f"Error: Cannot compute median for non-numeric column '{col}'.", False
        elif strategy == "mode":
            mode_val = self.df[col].mode()
            if mode_val.empty:
                return f"Error: No mode found for '{col}'.", False
            self.df[col] = self.df[col].fillna(mode_val.iloc[0])
        elif strategy == "constant":
            if len(args) < 3:
                return "Error: 'constant' strategy requires a VALUE. Usage: fill_missing COL constant VALUE", False
            fill_val = args[2]
            self.df[col] = self.df[col].fillna(fill_val)
        elif strategy == "forward_fill":
            self.df[col] = self.df[col].ffill()
        else:
            return f"Error: Unknown strategy '{strategy}'. Use: mean, median, mode, constant, forward_fill", False

        n_after = self.df[col].isna().sum()
        filled = n_before - n_after
        return f"Filled {filled} missing values in '{col}' using strategy '{strategy}'. Remaining: {n_after}", True

    def _cmd_remove_duplicates(self, args: list) -> Tuple[str, bool]:
        subset = None
        keep = "first"
        if args:
            subset = [c.strip() for c in args[0].split(",")]
            invalid = [c for c in subset if c not in self.df.columns]
            if invalid:
                return f"Error: Unknown columns: {invalid}. Available: {list(self.df.columns)}", False
            if len(args) > 1:
                keep = args[1].lower()
                if keep not in ("first", "last", "none", "false"):
                    return f"Error: keep must be 'first', 'last', or 'none'. Got: '{keep}'", False
                if keep == "none":
                    keep = False

        n_before = len(self.df)
        self.df = self.df.drop_duplicates(subset=subset, keep=keep).reset_index(drop=True)
        n_after = len(self.df)
        removed = n_before - n_after

        if removed == 0:
            return "No duplicate rows found to remove.", False

        return f"Removed {removed} duplicate rows. Dataset: {n_before} β†’ {n_after} rows.", True

    def _cmd_fix_dtype(self, args: list) -> Tuple[str, bool]:
        if len(args) < 2:
            return "Error: Usage: fix_dtype COLUMN TYPE (int/float/str/datetime)", False
        col = args[0]
        target = args[1].lower()
        if col not in self.df.columns:
            return f"Error: Column '{col}' not found.", False

        before_type = str(self.df[col].dtype)
        errors = 0
        if target in ("int", "int64"):
            self.df[col] = self.df[col].astype(str).str.replace(r'[^\d.\-]', '', regex=True)
            numeric = pd.to_numeric(self.df[col], errors="coerce")
            errors = numeric.isna().sum() - self.df[col].isna().sum()
            self.df[col] = numeric.astype("Int64")
        elif target in ("float", "float64"):
            self.df[col] = self.df[col].astype(str).str.replace(r'[^\d.\-]', '', regex=True)
            self.df[col] = pd.to_numeric(self.df[col], errors="coerce")
            errors = self.df[col].isna().sum()
        elif target in ("str", "string", "object"):
            self.df[col] = self.df[col].astype(str)
        elif target in ("datetime", "date"):
            self.df[col] = pd.to_datetime(self.df[col], errors="coerce", infer_datetime_format=True)
            errors = self.df[col].isna().sum()
        else:
            return f"Error: Unknown type '{target}'. Use: int, float, str, datetime", False

        return f"Converted '{col}' from {before_type} β†’ {target}. Coercion errors: {errors}", True

    def _cmd_replace(self, args: list) -> Tuple[str, bool]:
        if len(args) < 3:
            return "Error: Usage: replace COLUMN OLD_VALUE NEW_VALUE", False
        col = args[0]
        old_val = args[1]
        new_val = args[2]
        if col not in self.df.columns:
            return f"Error: Column '{col}' not found.", False

        mask = self.df[col].astype(str) == old_val
        n_matches = mask.sum()

        if n_matches == 0:
            return f"No matches found for '{old_val}' in column '{col}'.", False

        self.df.loc[mask, col] = new_val
        return f"Replaced {n_matches} occurrences of '{old_val}' with '{new_val}' in '{col}'.", True

    def _cmd_regex_replace(self, args: list) -> Tuple[str, bool]:
        """Regex-based replacement within a column."""
        if len(args) < 3:
            return "Error: Usage: regex_replace COLUMN PATTERN REPLACEMENT", False
        col = args[0]
        pattern = args[1]
        replacement = args[2]
        if col not in self.df.columns:
            return f"Error: Column '{col}' not found.", False

        try:
            before_vals = self.df[col].astype(str).copy()
            self.df[col] = self.df[col].astype(str).str.replace(pattern, replacement, regex=True)
            n_changed = (before_vals != self.df[col].astype(str)).sum()
        except re.error as e:
            return f"Error: Invalid regex pattern '{pattern}': {e}", False

        if n_changed == 0:
            return f"No matches for pattern '{pattern}' in column '{col}'.", False
        return f"Regex replaced {n_changed} values in '{col}' (pattern: '{pattern}' β†’ '{replacement}').", True

    def _cmd_standardize(self, args: list) -> Tuple[str, bool]:
        if len(args) < 2:
            return "Error: Usage: standardize COLUMN METHOD (lowercase/uppercase/titlecase/strip)", False
        col = args[0]
        method = args[1].lower()
        if col not in self.df.columns:
            return f"Error: Column '{col}' not found.", False

        before_uniq = self.df[col].nunique()

        if method == "lowercase":
            self.df[col] = self.df[col].astype(str).str.lower()
        elif method == "uppercase":
            self.df[col] = self.df[col].astype(str).str.upper()
        elif method == "titlecase":
            self.df[col] = self.df[col].astype(str).str.title()
        elif method == "strip":
            self.df[col] = self.df[col].astype(str).str.strip()
        else:
            return f"Error: Unknown method '{method}'. Use: lowercase, uppercase, titlecase, strip", False

        after_uniq = self.df[col].nunique()
        consolidated = before_uniq - after_uniq
        return f"Standardized '{col}' using {method}. Unique values: {before_uniq} β†’ {after_uniq} (consolidated {consolidated}).", True

    def _cmd_remove_rows(self, args: list) -> Tuple[str, bool]:
        if len(args) < 3:
            return "Error: Usage: remove_rows COLUMN CONDITION VALUE\n  Conditions: equals, not_equals, less_than, greater_than, contains", False
        col = args[0]
        condition = args[1].lower()
        value = args[2]
        if col not in self.df.columns:
            return f"Error: Column '{col}' not found.", False

        n_before = len(self.df)

        if condition == "equals":
            mask = self.df[col].astype(str) == value
        elif condition == "not_equals":
            mask = self.df[col].astype(str) != value
        elif condition == "less_than":
            try:
                val = float(value)
                mask = pd.to_numeric(self.df[col], errors="coerce") < val
            except ValueError:
                return f"Error: '{value}' is not a valid number for less_than.", False
        elif condition == "greater_than":
            try:
                val = float(value)
                mask = pd.to_numeric(self.df[col], errors="coerce") > val
            except ValueError:
                return f"Error: '{value}' is not a valid number for greater_than.", False
        elif condition == "contains":
            mask = self.df[col].astype(str).str.contains(value, case=False, na=False)
        else:
            return f"Error: Unknown condition '{condition}'. Use: equals, not_equals, less_than, greater_than, contains", False

        n_removed = mask.sum()
        if n_removed == 0:
            return f"No rows match condition '{col} {condition} {value}'.", False

        self.df = self.df[~mask].reset_index(drop=True)
        return f"Removed {n_removed} rows where {col} {condition} {value}. Dataset: {n_before} β†’ {len(self.df)} rows.", True

    def _cmd_clip(self, args: list) -> Tuple[str, bool]:
        if len(args) < 3:
            return "Error: Usage: clip COLUMN LOWER UPPER", False
        col = args[0]
        if col not in self.df.columns:
            return f"Error: Column '{col}' not found.", False
        try:
            lower = float(args[1])
            upper = float(args[2])
        except ValueError:
            return "Error: LOWER and UPPER must be numbers.", False

        numeric_col = pd.to_numeric(self.df[col], errors="coerce")
        n_clipped = ((numeric_col < lower) | (numeric_col > upper)).sum()
        self.df[col] = numeric_col.clip(lower=lower, upper=upper)
        return f"Clipped {n_clipped} values in '{col}' to [{lower}, {upper}].", True

    def _cmd_rename_column(self, args: list) -> Tuple[str, bool]:
        """Rename a column."""
        if len(args) < 2:
            return "Error: Usage: rename_column OLD_NAME NEW_NAME", False
        old_name = args[0]
        new_name = args[1]
        if old_name not in self.df.columns:
            return f"Error: Column '{old_name}' not found.", False
        if new_name in self.df.columns:
            return f"Error: Column '{new_name}' already exists.", False
        self.df = self.df.rename(columns={old_name: new_name})
        return f"Renamed column '{old_name}' β†’ '{new_name}'.", True

    def _cmd_drop_column(self, args: list) -> Tuple[str, bool]:
        """Drop a column from the dataset."""
        if not args:
            return "Error: Usage: drop_column COLUMN_NAME", False
        col = args[0]
        if col not in self.df.columns:
            return f"Error: Column '{col}' not found.", False
        self.df = self.df.drop(columns=[col])
        return f"Dropped column '{col}'. Remaining columns: {len(self.df.columns)}", True

    def _cmd_sort(self, args: list) -> Tuple[str, bool]:
        """Sort dataset by a column."""
        if not args:
            return "Error: Usage: sort COLUMN [asc|desc]", False
        col = args[0]
        if col not in self.df.columns:
            return f"Error: Column '{col}' not found.", False
        ascending = True
        if len(args) > 1 and args[1].lower() == "desc":
            ascending = False
        self.df = self.df.sort_values(by=col, ascending=ascending, na_position="last").reset_index(drop=True)
        direction = "ascending" if ascending else "descending"
        return f"Sorted dataset by '{col}' ({direction}).", True

    # ── Special commands ─────────────────────────────────────────────────

    def _cmd_validate(self, args: list) -> Tuple[str, bool]:
        return "__VALIDATE__", False

    def _cmd_submit(self, args: list) -> Tuple[str, bool]:
        return "__SUBMIT__", False