"""Automated data cleaning and preprocessing helpers for tabular datasets."""

import re
from difflib import SequenceMatcher

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.decomposition import PCA
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, PolynomialFeatures, StandardScaler

# category_encoders is optional; the factories below fall back to one-hot
# encoding when it cannot be imported.
try:
    from category_encoders import TargetEncoder
except Exception:
    TargetEncoder = None


# Tokens treated as "missing" (compared after stripping and case-folding).
_NULL_PLACEHOLDERS = [
    "??",
    "nan",
    "none",
    "null",
    "unknown",
    "na",
    "n/a",
    "invalid",
    "?",
    "-",
]

# Lower-cased tokens recognised as boolean values.
_BOOLEAN_TOKENS = {
    "true": True,
    "false": False,
    "yes": True,
    "no": False,
    "1": True,
    "0": False,
}

# Column-name tokens that hint at an identifier / bookkeeping column.
_ID_HINTS = ("id", "uuid", "uid", "idx", "row_id", "timestamp", "created_at")
_CAMEL_BOUNDARY_RE = re.compile(r"(?<=[a-z0-9])(?=[A-Z])")
# A hint only counts when it is not glued to other letters, so "user_id" and
# "id2" match while "width" or "humidity" do not.
_ID_HINT_RE = re.compile(
    r"(?<![a-z])(?:" + "|".join(re.escape(h) for h in _ID_HINTS) + r")(?![a-z])"
)

# "$100", "100%", "10kg", "-3.5": optional currency, optional sign, a number,
# optional unit suffix.
_UNIT_VALUE_PATTERN = r"^\s*[$€£]?\s*[-+]?[\d\.\,]+\s*[\w\%]*\s*$"
# A comma followed by exactly three digits is a thousands separator.
_THOUSANDS_SEP_PATTERN = r",(?=\d{3}(?!\d))"
_SINGLE_NUMBER_PATTERN = r"([-+]?\d*\.?\d+)"
# "120/80", "5-10", optionally followed by a unit. Anchored on both ends so
# that dates such as "2021-03-04" are NOT mistaken for a pair.
_PAIR_DETECT_PATTERN = r"^\s*\d+\s*[/-]\s*\d+\s*[A-Za-z%]*\s*$"
_PAIR_EXTRACT_PATTERN = r"^\s*(\d+)\s*[/-]\s*(\d+)\s*[A-Za-z%]*\s*$"


def _normalize_column_name(name) -> str:
    """Return *name* as a string, trimmed, with newlines turned into spaces."""
    return str(name).strip().replace("\n", " ")


def _is_text_column(series: pd.Series) -> bool:
    """Return True for object columns and pandas string-dtype columns."""
    dtype = series.dtype
    return pd.api.types.is_object_dtype(dtype) or isinstance(dtype, pd.StringDtype)


def _looks_like_identifier_name(name: str) -> bool:
    """Return True when a column name contains an identifier hint as a token."""
    normalized = _CAMEL_BOUNDARY_RE.sub("_", str(name)).lower()
    return _ID_HINT_RE.search(normalized) is not None


class OutlierClipper(BaseEstimator, TransformerMixin):
    """Clip extreme values to ``[Q1 - factor * IQR, Q3 + factor * IQR]``.

    Bounds are learned per column in ``fit`` and applied in ``transform``.
    Columns whose IQR is zero (or undefined) are left unclipped: their bounds
    would otherwise collapse to a single value and turn the column into a
    constant, which destroys sparse 0/1 columns such as missing-value
    indicators.

    Args:
        factor: Multiplier applied to the IQR to obtain the clipping fences.
    """

    def __init__(self, factor=3.0):
        self.factor = factor

    def fit(self, X, y=None):
        """Learn per-column lower/upper fences from *X*; returns ``self``."""
        X = pd.DataFrame(X)
        q1 = X.quantile(0.25)
        q3 = X.quantile(0.75)
        iqr = q3 - q1

        lower = q1 - self.factor * iqr
        upper = q3 + self.factor * iqr

        # ``~(iqr > 0)`` is also True for NaN, i.e. all-missing columns.
        degenerate = ~(iqr > 0)
        lower[degenerate] = -np.inf
        upper[degenerate] = np.inf

        self.lower_ = lower
        self.upper_ = upper
        return self

    def transform(self, X):
        """Return *X* as an ndarray with every column clipped to its fences."""
        X = pd.DataFrame(X)
        return X.clip(lower=self.lower_, upper=self.upper_, axis=1).values


class SkewTransformer(BaseEstimator, TransformerMixin):
    """Apply ``log1p`` to numeric columns whose absolute skewness is high.

    Negative values in a selected column are clamped to zero before the log
    is taken.

    Args:
        threshold: Columns with ``abs(skew) > threshold`` are transformed.
    """

    def __init__(self, threshold=0.75):
        self.threshold = threshold

    def fit(self, X, y=None):
        """Record which columns of *X* exceed the skew threshold."""
        X = pd.DataFrame(X)
        self.skewed_cols_ = X.columns[X.skew().abs() > self.threshold].tolist()
        return self

    def transform(self, X):
        """Return *X* as an ndarray with the recorded columns log-transformed."""
        X = pd.DataFrame(X).copy()
        for col in self.skewed_cols_:
            # log1p is undefined below -1, so negatives are floored at zero.
            X[col] = np.log1p(np.maximum(X[col], 0))
        return X.values


def fuzzy_merge_labels(series: pd.Series, threshold=0.9):
    """Merge near-identical string labels (e.g. 'Male'/'male', 'Heart'/'Heartz').

    Labels are visited from most to least frequent; each rarer label whose
    case-insensitive similarity to a more frequent one exceeds *threshold* is
    replaced by that more frequent label. Non-string labels (numbers,
    booleans) are never merged and are returned unchanged.

    Args:
        series: Categorical column to consolidate.
        threshold: Minimum ``SequenceMatcher`` ratio (exclusive) for a merge.

    Returns:
        A new Series with merged labels, or *series* itself when it has more
        than 50 distinct labels (too many for pairwise comparison).
    """
    counts = series.value_counts()
    if len(counts) > 50:
        return series  # Too many unique labels for fuzzy matching

    unique_labels = counts.index.tolist()
    lowered = {
        label: label.lower() for label in unique_labels if isinstance(label, str)
    }

    mapping = {}
    for i, canonical in enumerate(unique_labels):
        if canonical in mapping:
            continue
        mapping[canonical] = canonical
        if canonical not in lowered:
            continue  # non-string label: keep as-is, never a merge target

        for candidate in unique_labels[i + 1 :]:
            if candidate in mapping or candidate not in lowered:
                continue
            similarity = SequenceMatcher(
                None, lowered[canonical], lowered[candidate]
            ).ratio()
            if similarity > threshold:
                # Merge the less frequent label into the more frequent one
                mapping[candidate] = canonical

    return series.map(mapping)


def smart_extract_numeric(series: pd.Series):
    """Scan a text column for hidden numeric patterns ('120/80', '10kg', '$1,000').

    The decision is made on the first 20 non-null values.

    Args:
        series: Text column to inspect.

    Returns:
        ``{series.name: floats}`` when most sampled values are a single number
        with an optional currency/unit decoration;
        ``{"<name>_part1": floats, "<name>_part2": floats}`` when most sampled
        values are a pair such as '120/80' or '5-10';
        ``None`` when neither pattern applies.
    """
    sample = series.dropna().head(20).astype(str)
    if sample.empty:
        return None

    # Keep missing values missing instead of parsing the strings "nan"/"None".
    text = series.astype(str).where(series.notna())

    # Single value with optional currency / unit ($100, 100%, 10kg)
    unit_match = sample.str.contains(_UNIT_VALUE_PATTERN, regex=True).mean()
    if unit_match > 0.7:
        # Drop thousands separators first so "1,000" is read as 1000, not 1.
        without_separators = text.str.replace(_THOUSANDS_SEP_PATTERN, "", regex=True)
        extracted = without_separators.str.extract(
            _SINGLE_NUMBER_PATTERN, expand=False
        ).astype(float)
        if extracted.notna().any():
            return {series.name: extracted}

    # Pair of values (120/80, 5-10)
    pair_match = sample.str.contains(_PAIR_DETECT_PATTERN, regex=True).mean()
    if pair_match > 0.5:
        parts = text.str.extract(_PAIR_EXTRACT_PATTERN, expand=True)
        if not parts.isnull().all().all():
            return {
                f"{series.name}_part1": parts[0].astype(float),
                f"{series.name}_part2": parts[1].astype(float),
            }

    return None


def _normalize_text_column(column: pd.Series, name: str, logs: list) -> pd.Series:
    """Strip text, mask null placeholders and normalise boolean-like columns.

    Appends a message to *logs* when the column is converted to booleans.
    """
    # Stringify and trim real values only; genuine nulls stay null instead of
    # becoming the strings "nan" / "None".
    stripped = column.astype(str).str.strip().where(column.notna())
    is_placeholder = stripped.str.casefold().isin(_NULL_PLACEHOLDERS)
    cleaned = stripped.mask(is_placeholder)

    lowered = cleaned.str.lower()
    present = lowered.dropna()
    if present.empty:
        return cleaned

    is_boolean_token = present.isin(list(_BOOLEAN_TOKENS))
    if is_boolean_token.mean() <= 0.8:
        return cleaned

    logs.append(f"Normalized boolean-like values in '{name}'.")
    unmapped = int((~is_boolean_token).sum())
    if unmapped:
        # Values outside the boolean vocabulary cannot be mapped and become
        # missing; surface that instead of dropping them silently.
        logs.append(
            f"Set {unmapped} non-boolean value(s) in '{name}' to missing "
            "during boolean normalization."
        )
    return lowered.map(_BOOLEAN_TOKENS)


def auto_clean_data(df: pd.DataFrame, target: str) -> tuple[pd.DataFrame, list[str]]:
    """Automatically scrub the dataset for common quality issues in any domain.

    Steps, in order: normalise column names; mask null placeholders and
    normalise boolean-like text columns; drop duplicate rows; then, for every
    non-target column, drop it when it is mostly missing, constant or a
    probable identifier, otherwise extract hidden numeric values from text or
    consolidate near-duplicate labels.

    Args:
        df: Input data; it is not modified.
        target: Name of the target column, which is never dropped or
            re-encoded by step 3. It is normalised like the column names
            before comparison.

    Returns:
        ``(clean_df, logs)`` where *logs* is a list of human-readable
        descriptions of every change that was applied.
    """
    logs: list[str] = []
    clean_df = df.copy()
    initial_rows = len(clean_df)

    original_columns = list(clean_df.columns)
    clean_df.columns = [_normalize_column_name(col) for col in original_columns]
    if clean_df.columns.tolist() != original_columns:
        logs.append("Normalized column names by trimming whitespace.")
    target_name = _normalize_column_name(target)

    # ── 1. Universal Null Masking ───────────────────────────────────────────
    for col in clean_df.columns:
        if _is_text_column(clean_df[col]):
            clean_df[col] = _normalize_text_column(clean_df[col], col, logs)

    # ── 2. Deduplication ────────────────────────────────────────────────────
    clean_df.drop_duplicates(inplace=True)
    if len(clean_df) < initial_rows:
        logs.append(f"Removed {initial_rows - len(clean_df)} duplicate rows.")

    # ── 3. Smart Extraction & Label Merging ────────────────────────────────
    cols_to_drop = []

    # Snapshot the names: extraction below may add new columns.
    for col in list(clean_df.columns):
        if col == target_name:
            continue

        # A. High Missing Values (>90%)
        if clean_df[col].isnull().mean() > 0.90:
            cols_to_drop.append(col)
            logs.append(f"Dropped '{col}' (>90% missing).")
            continue

        # B. Low Variance / Constant Values
        n_unique = clean_df[col].nunique()
        if n_unique <= 1:
            cols_to_drop.append(col)
            logs.append(f"Dropped '{col}' (zero variance).")
            continue

        # C. Clear Identifiers
        if _looks_like_identifier_name(col) and n_unique > len(clean_df) * 0.8:
            cols_to_drop.append(col)
            logs.append(f"Dropped '{col}' (probable identifier).")
            continue

        if not _is_text_column(clean_df[col]):
            continue

        # D. Smart Numeric Extraction
        extracted_features = smart_extract_numeric(clean_df[col])
        if extracted_features:
            for new_col, data in extracted_features.items():
                clean_df[new_col] = data
            # Single-value extraction reuses the source column's name; only
            # drop the source when it was replaced by differently named parts.
            if col not in extracted_features:
                cols_to_drop.append(col)
            logs.append(f"Extracted numeric features from '{col}'.")
            continue

        # E. Fuzzy Category Consolidation (Merge typos)
        if n_unique < 30:
            merged = fuzzy_merge_labels(clean_df[col])
            merged_n = merged.nunique()
            clean_df[col] = merged
            if merged_n < n_unique:
                logs.append(
                    f"Consolidated similar labels in '{col}' (merged {n_unique - merged_n} variants)."
                )

    if cols_to_drop:
        clean_df.drop(columns=cols_to_drop, inplace=True)

    return clean_df, logs


def extract_datetime_features(df, exclude=None):
    """Replace date-like text columns with year/month/day/is_weekend features.

    A text column is treated as a datetime column when its first 10 non-null
    values all parse with ``pd.to_datetime(..., format="mixed")``. Such a
    column is replaced by ``<col>_year``, ``<col>_month``, ``<col>_day`` and
    ``<col>_is_weekend``; unparseable values in the rest of the column become
    missing.

    Args:
        df: Input data; it is not modified.
        exclude: Optional iterable of column names that must be left
            untouched (for example the target column).

    Returns:
        A new DataFrame with the datetime columns expanded.
    """
    skipped = set() if exclude is None else set(exclude)
    new_df = df.copy()

    for col in list(new_df.columns):
        if col in skipped or not _is_text_column(new_df[col]):
            continue

        sample = new_df[col].dropna().head(10)
        if sample.empty:
            continue

        try:
            # Probe strictly on the sample, then parse the full column with
            # the same per-element format inference.
            pd.to_datetime(sample, format="mixed", errors="raise")
            parsed = pd.to_datetime(new_df[col], format="mixed", errors="coerce")
        except (ValueError, TypeError, OverflowError):
            continue  # not a datetime column

        # Mixed time zones can yield an object result without ``.dt`` support.
        if not pd.api.types.is_datetime64_any_dtype(parsed):
            continue

        # Mutate only after parsing succeeded, so a failure can never leave
        # a half-converted frame behind.
        new_df[f"{col}_year"] = parsed.dt.year
        new_df[f"{col}_month"] = parsed.dt.month
        new_df[f"{col}_day"] = parsed.dt.day
        new_df[f"{col}_is_weekend"] = parsed.dt.weekday >= 5
        new_df.drop(columns=[col], inplace=True)

    return new_df


def _as_column_list(cols) -> list:
    """Coerce a column selection (None, str, list, Index, ndarray) to a list."""
    if cols is None:
        return []
    if isinstance(cols, str):
        return [cols]
    return list(cols)


def _make_categorical_pipeline(**target_encoder_kwargs):
    """Build the impute-then-encode pipeline shared by both preprocessors.

    Uses ``TargetEncoder(**target_encoder_kwargs)`` when category_encoders is
    available and a dense one-hot encoder otherwise.
    """
    steps = [
        ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
    ]
    if TargetEncoder is not None:
        steps.append(("target_encoder", TargetEncoder(**target_encoder_kwargs)))
    else:
        steps.append(
            ("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False))
        )
    return Pipeline(steps=steps)


def _assemble_preprocessor(num_pipeline, num_cols, cat_pipeline, cat_cols):
    """Combine the optional numeric/categorical pipelines into a ColumnTransformer."""
    transformers = []
    if num_pipeline is not None:
        transformers.append(("num", num_pipeline, num_cols))
    if cat_pipeline is not None:
        transformers.append(("cat", cat_pipeline, cat_cols))

    if not transformers:
        raise ValueError(
            "Neither numeric nor categorical columns found. Cannot create preprocessor."
        )
    return ColumnTransformer(transformers=transformers)


def make_preprocessor(num_cols, cat_cols):
    """Factory: returns a fresh, unfitted ColumnTransformer per call.

    Numeric columns are median-imputed (with missing indicators),
    skew-corrected, outlier-clipped and scaled. Pairwise interaction terms are
    added for at most 18 numeric columns, and PCA is appended for 40 or more.
    Categorical columns are imputed with "missing" and target-encoded
    (one-hot encoded when category_encoders is unavailable).

    Args:
        num_cols: Names of the numeric columns (any iterable, or None).
        cat_cols: Names of the categorical columns (any iterable, or None).

    Raises:
        ValueError: If both column selections are empty.
    """
    num_cols = _as_column_list(num_cols)
    cat_cols = _as_column_list(cat_cols)

    num_pipeline = None
    if num_cols:
        num_steps = [
            ("imputer", SimpleImputer(strategy="median", add_indicator=True)),
            ("skew_fix", SkewTransformer()),
            ("outlier_clipper", OutlierClipper(factor=3.0)),
        ]
        if len(num_cols) <= 18:
            # Interaction terms grow quadratically; only affordable when narrow.
            num_steps.append(
                (
                    "interactions",
                    PolynomialFeatures(
                        degree=2, interaction_only=True, include_bias=False
                    ),
                )
            )
        num_steps.append(("scaler", StandardScaler()))
        if len(num_cols) >= 40:
            pca_components = min(24, max(8, len(num_cols) // 3))
            num_steps.append(
                ("pca", PCA(n_components=pca_components, random_state=42))
            )
        num_pipeline = Pipeline(steps=num_steps)

    cat_pipeline = None
    if cat_cols:
        cat_pipeline = _make_categorical_pipeline(smoothing=0.3, min_samples_leaf=10)

    return _assemble_preprocessor(num_pipeline, num_cols, cat_pipeline, cat_cols)


def make_lite_preprocessor(num_cols, cat_cols):
    """Featherweight preprocessor for Stage 1 sweeps.

    Numeric columns are median-imputed and scaled, with PCA appended for 80 or
    more columns. Categorical columns are imputed with "missing" and
    target-encoded with default settings (one-hot encoded when
    category_encoders is unavailable).

    Args:
        num_cols: Names of the numeric columns (any iterable, or None).
        cat_cols: Names of the categorical columns (any iterable, or None).

    Raises:
        ValueError: If both column selections are empty.
    """
    num_cols = _as_column_list(num_cols)
    cat_cols = _as_column_list(cat_cols)

    num_pipeline = None
    if num_cols:
        num_steps = [
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ]
        if len(num_cols) >= 80:
            num_steps.append(
                (
                    "pca",
                    PCA(
                        n_components=min(16, max(6, len(num_cols) // 8)),
                        random_state=42,
                    ),
                )
            )
        num_pipeline = Pipeline(steps=num_steps)

    cat_pipeline = _make_categorical_pipeline() if cat_cols else None

    return _assemble_preprocessor(num_pipeline, num_cols, cat_pipeline, cat_cols)


class DataAgent:
    """Specialized Agent for dataset DNA analysis and cleaning."""

    def __init__(self):
        # Running log of every decision taken; it accumulates across calls.
        self.reasoning = []

    def clean(self, df: pd.DataFrame, target: str):
        """Clean *df* and expand its datetime columns.

        Args:
            df: Raw input data; it is not modified.
            target: Name of the target column, which is protected from being
                dropped or converted.

        Returns:
            ``(cleaned_df, reasoning)`` where *reasoning* is this agent's
            accumulated decision log (the same list object on every call).
        """
        self.reasoning.append("DataAgent: Initiating deep cleaning protocol.")
        df, logs = auto_clean_data(df, target)
        self.reasoning.extend(f"DataAgent Decision: {log}" for log in logs)

        # Column names were normalised by auto_clean_data, so protect the
        # target under its normalised name.
        df = extract_datetime_features(df, exclude=[_normalize_column_name(target)])
        self.reasoning.append(
            "DataAgent: Completed automated feature engineering (Datetime)."
        )
        return df, self.reasoning