"""Shared utilities for the Streamlit data-analysis app.

Covers session/state management, global filtering, KPI rendering, robust CSV
ingestion, automatic date parsing, preprocessing-pipeline construction, model
selection, and data-health scoring.
"""

import io
import os
import time
import json
import tempfile
import base64
import logging
import warnings
from datetime import datetime
from typing import List, Dict, Optional, Tuple, Any, Union

import numpy as np
import pandas as pd
import streamlit as st

# Heavy ML and preprocessing imports are loaded lazily inside functions to
# reduce cold-start time (especially important for deployments like HF Spaces).

warnings.filterwarnings("ignore")

# Import configuration
from config import (
    BRAND_NAME,
    DEFAULT_CV_FOLDS,
    DEFAULT_N_ESTIMATORS,
    DEFAULT_RANDOM_STATE,
    DATE_PARSE_THRESHOLD,
)

# Optional library availability flags: each third-party dependency is probed
# once at import time; features degrade gracefully when a library is missing.
try:
    import shap
    SHAP_AVAILABLE = True
except Exception:
    SHAP_AVAILABLE = False

try:
    from xgboost import XGBClassifier, XGBRegressor
    XGB_AVAILABLE = True
except Exception:
    XGB_AVAILABLE = False

try:
    from lightgbm import LGBMClassifier, LGBMRegressor
    LGBM_AVAILABLE = True
except Exception:
    LGBM_AVAILABLE = False

try:
    from catboost import CatBoostClassifier, CatBoostRegressor
    CATBOOST_AVAILABLE = True
except Exception:
    CATBOOST_AVAILABLE = False

try:
    from imblearn.over_sampling import SMOTE
    IMBLEARN_AVAILABLE = True
except Exception:
    IMBLEARN_AVAILABLE = False

try:
    import optuna
    OPTUNA_AVAILABLE = True
except Exception:
    OPTUNA_AVAILABLE = False

try:
    import chardet
    CHARDET_AVAILABLE = True
except Exception:
    CHARDET_AVAILABLE = False

try:
    from reportlab.lib.pagesizes import letter
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Table, TableStyle, Spacer, Image
    from reportlab.lib import colors
    from reportlab.lib.styles import getSampleStyleSheet
    REPORTLAB_AVAILABLE = True
except Exception:
    REPORTLAB_AVAILABLE = False

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def reset_app() -> None:
    """
    Reset all application state to initial empty state.

    Clears base_df, work_df, resets flags, and triggers rerun.
    """
    try:
        # Explicitly delete DataFrames first to free memory before rebinding.
        if 'base_df' in st.session_state and st.session_state.base_df is not None:
            del st.session_state.base_df
        if 'work_df' in st.session_state and st.session_state.work_df is not None:
            del st.session_state.work_df
        if 'filtered_data' in st.session_state and st.session_state.filtered_data is not None:
            del st.session_state.filtered_data

        st.session_state.base_df = None
        st.session_state.work_df = None
        st.session_state.filtered_data = None
        st.session_state.data_loaded = False
        st.session_state.reset_requested = True
        st.session_state.selected_date_col = None
        st.session_state.date_range = None

        # Clear all dynamic filter/slider widget state.
        keys_to_clear = [k for k in st.session_state.keys()
                         if k.startswith(("filter_", "slider_"))]
        for key in keys_to_clear:
            if key in st.session_state:
                del st.session_state[key]

        logger.info("App reset: All session state cleared.")
        st.rerun()
    except Exception as e:
        logger.error(f"Error during reset_app(): {e}")
        st.error("❌ Failed to reset app. Please refresh the page.")


def get_filtered_data() -> pd.DataFrame:
    """
    Apply global user-defined filters to the working dataset (`work_df`).

    Filters include:
    - Date range (auto-detected date columns)
    - Categorical selection (dropdowns for low-cardinality columns)
    - Numeric range sliders (for top 10 numeric columns)

    Returns:
        pd.DataFrame: Filtered view of `st.session_state.work_df`
    """
    if ("work_df" not in st.session_state
            or st.session_state.work_df is None
            or st.session_state.work_df.empty):
        return pd.DataFrame()

    # Lightweight caching: avoid recomputing filtered view on every render.
    # fkey is a cheap fingerprint of the working frame; None means "unknown".
    fkey = None
    try:
        work = st.session_state.work_df
        fkey = (
            len(work),
            work.shape[1],
            int(work.isnull().sum().sum()),
            int(work.duplicated().sum()),
            int(work.memory_usage(deep=True).sum()),
        )
        cached = st.session_state.get('filtered_data')
        cached_key = st.session_state.get('filtered_data_key')
        if cached is not None and cached_key == fkey:
            return cached.copy()
    except Exception:
        # If anything goes wrong, fall through and recompute.
        pass

    # Work with a copy to avoid modifying the original.
    filtered = st.session_state.work_df.copy()

    # Limit the size of data we work with to prevent memory issues.
    max_rows = 10000  # Limit to 10k rows for performance
    if len(filtered) > max_rows:
        filtered = filtered.sample(n=max_rows, random_state=42)
        st.warning(f"⚠️ Working with a sample of {max_rows:,} rows for performance. "
                   f"Load fewer rows for full data.")

    # --- DATE RANGE FILTER ---
    date_candidates = [
        c for c in filtered.columns
        if "date" in c.lower() or "time" in c.lower()
        or pd.api.types.is_datetime64_any_dtype(filtered[c])
    ]
    if date_candidates:
        # Initialize selected date column if not set.
        if "selected_date_col" not in st.session_state:
            st.session_state.selected_date_col = date_candidates[0]
        dcol = st.session_state.selected_date_col
        try:
            filtered[dcol] = pd.to_datetime(filtered[dcol], errors="coerce")
            valid_dates = filtered[dcol].dropna()
            if len(valid_dates) > 0:
                mind, maxd = valid_dates.min(), valid_dates.max()
                if pd.notna(mind) and pd.notna(maxd):
                    # Initialize date range if not set.
                    if "date_range" not in st.session_state:
                        st.session_state.date_range = (mind.date(), maxd.date())
                    sel = st.session_state.date_range
                    if isinstance(sel, tuple) and len(sel) == 2:
                        try:
                            s = pd.to_datetime(sel[0])
                            # Make the end bound inclusive of the whole day.
                            e = (pd.to_datetime(sel[1])
                                 + pd.Timedelta(days=1) - pd.Timedelta(seconds=1))
                            filtered = filtered[(filtered[dcol] >= s) & (filtered[dcol] <= e)]
                        except Exception as e:
                            logger.warning(f"Invalid date range: {e}")
        except Exception as e:
            logger.warning(f"Failed to parse date column '{dcol}': {e}")

    # --- CATEGORICAL FILTERS ---
    cat_cols = filtered.select_dtypes(include=["object", "category"]).columns.tolist()
    for c in cat_cols:
        n_unique = filtered[c].nunique(dropna=False)
        if 1 < n_unique <= 30:  # Reasonable cardinality for dropdown
            key = f"filter_{c}"
            unique_vals = filtered[c].astype(str).unique().tolist()
            default_selection = unique_vals  # Default: all selected
            # Initialize session state if needed.
            if key not in st.session_state:
                st.session_state[key] = default_selection
            picked = st.session_state[key]
            if isinstance(picked, list) and len(picked) > 0:
                filtered = filtered[filtered[c].astype(str).isin(picked)]

    # --- NUMERIC SLIDERS ---
    num_cols = filtered.select_dtypes(include=[np.number]).columns.tolist()
    num_cols = num_cols[:10]  # Cap at 10 to avoid clutter
    for c in num_cols:
        min_val = float(filtered[c].min())
        max_val = float(filtered[c].max())
        if min_val == max_val:
            continue
        key = f"slider_{c}"
        if key not in st.session_state:
            st.session_state[key] = (min_val, max_val)
        lo, hi = st.session_state[key]
        # Clamp stale slider state to the current data range.
        lo = max(lo, min_val)
        hi = min(hi, max_val)
        filtered = filtered[(filtered[c] >= lo) & (filtered[c] <= hi)]

    # Store filtered copy in session state for quick reuse (only when the
    # fingerprint was successfully computed above).
    if fkey is not None:
        try:
            st.session_state['filtered_data'] = filtered.copy()
            st.session_state['filtered_data_key'] = fkey
        except Exception:
            pass

    return filtered


def create_kpis(k1, k2, k3, k4, k5, df: pd.DataFrame) -> None:
    """
    Render five KPI cards in provided Streamlit columns.

    Dynamically adapts to light/dark theme via CSS classes.

    Args:
        k1-k5: Streamlit columns where KPIs will be rendered
        df: DataFrame to compute metrics from
    """
    # NOTE(review): the original HTML markup was stripped during file
    # extraction; the card markup below is a minimal reconstruction using the
    # placeholders the code formats ({theme}, {label}, {value}) — confirm
    # against the app's CSS class names.
    if df.empty:
        empty_card = """
<div class="kpi-card">
  <div class="kpi-label">N/A</div>
  <div class="kpi-value">-</div>
</div>
"""
        for col in [k1, k2, k3, k4, k5]:
            col.markdown(empty_card, unsafe_allow_html=True)
        return

    rows = len(df)
    cols = df.shape[1]
    nulls = int(df.isnull().sum().sum())
    duplicates = int(df.duplicated().sum())
    memory_kb = df.memory_usage(deep=True).sum() / 1024

    theme = st.session_state.get("theme", "light")
    theme_class = "dark" if theme == "dark" else "light"

    kpi_template = """
<div class="kpi-card {theme}">
  <div class="kpi-label">{label}</div>
  <div class="kpi-value">{value}</div>
</div>
"""
    with k1:
        st.markdown(
            kpi_template.format(theme=theme_class, label="Rows", value=f"{rows:,}"),
            unsafe_allow_html=True
        )
    with k2:
        st.markdown(
            kpi_template.format(theme=theme_class, label="Columns", value=f"{cols:,}"),
            unsafe_allow_html=True
        )
    with k3:
        st.markdown(
            kpi_template.format(theme=theme_class, label="Nulls", value=f"{nulls:,}"),
            unsafe_allow_html=True
        )
    with k4:
        st.markdown(
            kpi_template.format(theme=theme_class, label="Duplicates", value=f"{duplicates:,}"),
            unsafe_allow_html=True
        )
    with k5:
        st.markdown(
            kpi_template.format(theme=theme_class, label="Memory (KB)", value=f"{memory_kb:,.1f}"),
            unsafe_allow_html=True
        )


def detect_encoding_bytes(raw_bytes: bytes) -> str:
    """
    Detect encoding of raw bytes using chardet if available,
    otherwise default to utf-8.
    """
    if CHARDET_AVAILABLE:
        try:
            # chardet may report {'encoding': None} for undecidable input;
            # dict.get's default only covers a *missing* key, so coalesce
            # None explicitly.
            return chardet.detect(raw_bytes).get('encoding') or 'utf-8'
        except Exception:
            return 'utf-8'
    return 'utf-8'


def read_csv_robust(uploaded_file) -> pd.DataFrame:
    """
    Robust CSV reader that tries multiple encodings.

    Raises:
        Exception: if the file cannot be read with any candidate encoding.
    """
    try:
        raw = uploaded_file.read()
        enc = detect_encoding_bytes(raw)
        uploaded_file.seek(0)
        # dict.fromkeys dedupes while preserving order (detected encoding
        # is often already 'utf-8').
        for e in dict.fromkeys([enc, 'utf-8', 'latin-1', 'cp1252', 'iso-8859-1']):
            try:
                uploaded_file.seek(0)
                return pd.read_csv(uploaded_file, encoding=e)
            except Exception:
                continue
        # Last resort: let pandas pick defaults.
        uploaded_file.seek(0)
        return pd.read_csv(uploaded_file)
    except Exception as e:
        raise Exception(f"Failed to read file: {e}")


def df_memory_kb(df: pd.DataFrame) -> float:
    """Calculate memory usage of dataframe in KB."""
    return df.memory_usage(deep=True).sum() / 1024.0


def detect_duplicates(df: pd.DataFrame) -> int:
    """Count duplicate rows in dataframe."""
    return int(df.duplicated().sum())


def detect_high_cardinality(df: pd.DataFrame, threshold: int = 50) -> List[Tuple[str, int]]:
    """
    Detect columns with high cardinality (many unique values).

    Returns:
        List of (column_name, n_unique) tuples for object/category columns
        whose unique count exceeds `threshold`.
    """
    out = []
    for c in df.select_dtypes(include=['object', 'category']).columns:
        if df[c].nunique(dropna=True) > threshold:
            out.append((c, df[c].nunique()))
    return out


def try_parse_dates(df: pd.DataFrame, sample_rows: int = 1000) -> Tuple[pd.DataFrame, List[str]]:
    """
    Try to parse date columns automatically.

    A column is converted when a sample of its values parses as datetimes at
    a rate above DATE_PARSE_THRESHOLD. Mutates `df` in place.

    Returns:
        Tuple of (df, list of converted column names)
    """
    converted = []
    for col in df.columns:
        if np.issubdtype(df[col].dtype, np.datetime64):
            continue
        if df[col].dtype == object or np.issubdtype(df[col].dtype, np.number):
            sample = df[col].dropna().astype(str).head(sample_rows)
            if sample.empty:
                continue
            try:
                # infer_datetime_format is deprecated (pandas >= 2.0) and is
                # now the default behavior, so it is intentionally omitted.
                parsed = pd.to_datetime(sample, errors='coerce')
                if parsed.notnull().mean() > DATE_PARSE_THRESHOLD:
                    df[col] = pd.to_datetime(df[col], errors='coerce')
                    converted.append(col)
            except Exception:
                continue
    return df, converted


def date_feature_engineer(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """Expand datetime column into components (year, month, day, etc.)."""
    s = pd.to_datetime(df[col], errors='coerce')
    df[f"{col}__year"] = s.dt.year
    df[f"{col}__month"] = s.dt.month
    df[f"{col}__day"] = s.dt.day
    df[f"{col}__weekday"] = s.dt.weekday
    df[f"{col}__is_weekend"] = s.dt.weekday.isin([5, 6]).astype(int)
    df[f"{col}__dayofyear"] = s.dt.dayofyear
    return df


def target_encode_column(train_series: pd.Series, target_series: pd.Series,
                         smoothing: float = 0.2) -> pd.Series:
    """
    Simple target encoding with smoothing to avoid overfitting.

    Unseen categories map to the global target mean.
    """
    temp = pd.concat([train_series, target_series], axis=1)
    stats = temp.groupby(train_series.name)[target_series.name].agg(['mean', 'count'])
    global_mean = target_series.mean()
    # Blend each category's mean toward the global mean; small categories are
    # pulled harder toward the global mean.
    stats['smoothed'] = ((stats['mean'] * stats['count'] + global_mean * smoothing)
                         / (stats['count'] + smoothing))
    return train_series.map(stats['smoothed']).fillna(global_mean)


def build_preprocessor(
    df: pd.DataFrame,
    target: str,
    encoding_strategy: str = 'onehot',
    scale_numeric: bool = True,
    poly_degree: int = 1,
    include_interactions: bool = False,
    target_encode_cols: Optional[List[str]] = None
) -> Tuple[Any, List[str], List[str], callable]:
    """
    Build ColumnTransformer preprocessor with options.

    Returns:
        (preprocessor, numeric_cols, cat_cols, get_feature_names_func)
    """
    # Lazy-import sklearn preprocessing tools to reduce cold-start time.
    from sklearn.pipeline import Pipeline
    from sklearn.compose import ColumnTransformer
    from sklearn.impute import SimpleImputer
    from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, StandardScaler, PolynomialFeatures

    X = df.drop(columns=[target])
    numeric_cols = X.select_dtypes(include=[np.number]).columns.tolist()
    cat_cols = X.select_dtypes(include=['object', 'category', 'bool']).columns.tolist()
    numeric_cols = [c for c in numeric_cols if not np.issubdtype(df[c].dtype, np.datetime64)]

    num_steps = [('imputer', SimpleImputer(strategy='mean'))]
    if scale_numeric:
        num_steps.append(('scaler', StandardScaler()))
    if poly_degree > 1:
        pf = PolynomialFeatures(degree=poly_degree, include_bias=False,
                                interaction_only=not include_interactions)
        num_steps.append(('poly', pf))

    cat_steps = [('imputer', SimpleImputer(strategy='most_frequent'))]
    if encoding_strategy == 'onehot':
        cat_steps.append(('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False)))
    else:
        cat_steps.append(('encoder', OrdinalEncoder()))

    transformers = []
    if numeric_cols:
        transformers.append(('num', Pipeline(num_steps), numeric_cols))
    if cat_cols:
        transformers.append(('cat', Pipeline(cat_steps), cat_cols))

    preproc = ColumnTransformer(transformers=transformers, remainder='drop', sparse_threshold=0)

    def get_feature_names(preproc_obj):
        # Recover output feature names from a *fitted* ColumnTransformer.
        fn = []
        if not hasattr(preproc_obj, 'transformers_'):
            return fn
        for name, trans, cols in preproc_obj.transformers_:
            if name == 'num':
                fn.extend(cols)
            elif name == 'cat':
                enc = trans.named_steps.get('encoder')
                if enc is not None and hasattr(enc, 'get_feature_names_out'):
                    try:
                        fn.extend(list(enc.get_feature_names_out(cols)))
                    except Exception:
                        fn.extend(cols)
                else:
                    fn.extend(cols)
        return fn

    return preproc, numeric_cols, cat_cols, get_feature_names


def model_key_to_estimator(key: str, problem_type: str):
    """
    Convert model key string to sklearn estimator instance.

    Falls back to a RandomForest when an optional boosting library is not
    installed, or when the key matches nothing.
    """
    # Lazy-import estimators to avoid heavy sklearn imports during app startup.
    try:
        from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
        from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor
        from sklearn.naive_bayes import GaussianNB
        from sklearn.svm import SVC
        from sklearn.linear_model import LogisticRegression, LinearRegression
    except Exception:
        # If sklearn isn't available, raise an informative error when used.
        RandomForestClassifier = RandomForestRegressor = None
        KNeighborsClassifier = KNeighborsRegressor = None
        GaussianNB = SVC = None
        LogisticRegression = LinearRegression = None

    k = key.lower()
    if problem_type == 'Classification':
        if 'randomforest' in k:
            return RandomForestClassifier(n_estimators=DEFAULT_N_ESTIMATORS,
                                          random_state=DEFAULT_RANDOM_STATE, n_jobs=-1)
        if 'xgb' in k or 'xgboost' in k:
            if XGB_AVAILABLE:
                return XGBClassifier(use_label_encoder=False, eval_metric='logloss',
                                     random_state=DEFAULT_RANDOM_STATE,
                                     n_estimators=DEFAULT_N_ESTIMATORS)
            return RandomForestClassifier(n_estimators=DEFAULT_N_ESTIMATORS,
                                          random_state=DEFAULT_RANDOM_STATE, n_jobs=-1)
        if 'lgbm' in k or 'lightgbm' in k:
            if LGBM_AVAILABLE:
                return LGBMClassifier(n_estimators=DEFAULT_N_ESTIMATORS,
                                      random_state=DEFAULT_RANDOM_STATE)
            return RandomForestClassifier(n_estimators=DEFAULT_N_ESTIMATORS,
                                          random_state=DEFAULT_RANDOM_STATE, n_jobs=-1)
        if 'catboost' in k or 'cat' in k:
            if CATBOOST_AVAILABLE:
                return CatBoostClassifier(verbose=0, random_seed=DEFAULT_RANDOM_STATE)
            return RandomForestClassifier(n_estimators=DEFAULT_N_ESTIMATORS,
                                          random_state=DEFAULT_RANDOM_STATE, n_jobs=-1)
        if 'knn' in k:
            return KNeighborsClassifier()
        if 'naive' in k or 'bayes' in k:
            return GaussianNB()
        if 'logistic' in k or 'linear' in k:
            return LogisticRegression(max_iter=4000)
        if 'svc' in k or 'svm' in k:
            return SVC(probability=True)
        return RandomForestClassifier(n_estimators=DEFAULT_N_ESTIMATORS,
                                      random_state=DEFAULT_RANDOM_STATE, n_jobs=-1)
    else:
        # Regression
        if 'randomforest' in k:
            return RandomForestRegressor(n_estimators=DEFAULT_N_ESTIMATORS,
                                         random_state=DEFAULT_RANDOM_STATE, n_jobs=-1)
        if 'xgb' in k or 'xgboost' in k:
            if XGB_AVAILABLE:
                return XGBRegressor(random_state=DEFAULT_RANDOM_STATE,
                                    n_estimators=DEFAULT_N_ESTIMATORS)
            return RandomForestRegressor(n_estimators=DEFAULT_N_ESTIMATORS,
                                         random_state=DEFAULT_RANDOM_STATE, n_jobs=-1)
        if 'lgbm' in k or 'lightgbm' in k:
            if LGBM_AVAILABLE:
                return LGBMRegressor(n_estimators=DEFAULT_N_ESTIMATORS,
                                     random_state=DEFAULT_RANDOM_STATE)
            return RandomForestRegressor(n_estimators=DEFAULT_N_ESTIMATORS,
                                         random_state=DEFAULT_RANDOM_STATE, n_jobs=-1)
        if 'catboost' in k or 'cat' in k:
            if CATBOOST_AVAILABLE:
                return CatBoostRegressor(verbose=0, random_seed=DEFAULT_RANDOM_STATE)
            return RandomForestRegressor(n_estimators=DEFAULT_N_ESTIMATORS,
                                         random_state=DEFAULT_RANDOM_STATE, n_jobs=-1)
        if 'knn' in k:
            return KNeighborsRegressor()
        if 'linear' in k:
            return LinearRegression()
        return RandomForestRegressor(n_estimators=DEFAULT_N_ESTIMATORS,
                                     random_state=DEFAULT_RANDOM_STATE, n_jobs=-1)


def generate_code_snippet(preproc, model, problem_type: str, target: str) -> str:
    """Generate a reproducible Python snippet for training & inference."""
    snippet = f"""# Auto-generated training snippet
import pandas as pd
import joblib
from sklearn.model_selection import train_test_split

# Load your dataset
df = pd.read_csv("your_dataset.csv")
X = df.drop(columns=["{target}"])
y = df["{target}"]

# Replace this with the preprocessor and model used in the app
# pipeline = joblib.load("pipeline_model.joblib")

# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train the pipeline
# pipeline.fit(X_train, y_train)

# Make predictions
# preds = pipeline.predict(X_test)

# Save the trained pipeline
# joblib.dump(pipeline, "pipeline_model.joblib")
"""
    return snippet


def log_change(action: str, details: str = "") -> None:
    """Append a change to the change log with timestamp."""
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    if "change_log" not in st.session_state:
        st.session_state["change_log"] = []
    st.session_state["change_log"].append(f"[{ts}] {action} {details}".strip())


def get_numeric_columns(df: pd.DataFrame) -> List[str]:
    """Get list of numeric columns from a DataFrame."""
    return df.select_dtypes(include=[np.number]).columns.tolist()


def get_categorical_columns(df: pd.DataFrame) -> List[str]:
    """Get list of categorical columns from a DataFrame."""
    return df.select_dtypes(include=['object', 'category', 'bool']).columns.tolist()


def get_datetime_columns(df: pd.DataFrame) -> List[str]:
    """Get list of datetime columns from a DataFrame (by dtype or name hint)."""
    return [c for c in df.columns
            if "date" in c.lower() or "time" in c.lower()
            or pd.api.types.is_datetime64_any_dtype(df[c])]


def safe_convert_type(series: pd.Series, target_type: str) -> pd.Series:
    """
    Safely convert a pandas series to a target type.

    On failure the original series is returned unchanged (and a warning is
    logged) rather than raising.
    """
    try:
        if target_type == "datetime64[ns]":
            return pd.to_datetime(series, errors="coerce")
        elif target_type in ["bool", "boolean"]:
            mapping = {"true": True, "false": False, "yes": True, "no": False,
                       "1": True, "0": False}
            return series.astype(str).str.lower().map(mapping).astype(target_type)
        elif target_type in ["Int64", "Float64"]:
            return pd.to_numeric(series, errors="coerce").astype(target_type)
        elif target_type == "category":
            return series.astype("category")
        elif target_type == "string":
            return series.astype("string")
        else:
            return series.astype(target_type, errors="raise")
    except Exception as e:
        logger.warning(f"Failed to convert series to {target_type}: {e}")
        return series


def calculate_data_health_score(df: pd.DataFrame) -> Dict:
    """
    Calculate a comprehensive data health score for a DataFrame.

    The score is a weighted blend of completeness, consistency, accuracy,
    uniqueness, and timeliness heuristics.
    """
    # Lightweight caching: avoid recomputing across tab switches when data
    # is unchanged.
    key = None
    try:
        key = (
            len(df),
            df.shape[1],
            int(df.isnull().sum().sum()),
            int(df.duplicated().sum()),
            int(df.memory_usage(deep=True).sum()),
        )
        cached = st.session_state.get('cached_data_health')
        if cached and isinstance(cached, dict) and cached.get('key') == key:
            return cached.get('value')
    except Exception:
        # Fallback to normal computation if any quick metric fails.
        pass

    total_cols = len(df.columns)
    total_cells = len(df) * total_cols
    missing_cells = int(df.isnull().sum().sum())
    # Guard against division by zero on an empty frame.
    completeness = (max(0, 100 - (missing_cells / total_cells * 100))
                    if total_cells else 100.0)

    consistency = 100
    obj_cols = get_categorical_columns(df)
    for col in obj_cols:
        series = df[col].astype(str)
        if series.str.contains(r'^\s+|\s+$', regex=True).any():
            consistency -= 3  # leading/trailing whitespace
        if series.str.contains(r'[A-Z]{2,}').any() and series.str.contains(r'[a-z]').any():
            consistency -= 2  # mixed casing conventions
        if series.str.contains(r'[$€£¥,]', regex=True).any():
            consistency -= 4  # currency symbols embedded in text

    accuracy = 100
    num_cols = get_numeric_columns(df)
    for col in num_cols:
        col_lower = col.lower()
        if any(k in col_lower for k in ['price', 'cost', 'amount', 'fee', 'age', 'year']):
            neg_count = (df[col] < 0).sum()
            if neg_count > 0:
                accuracy -= min(15, (neg_count / len(df)) * 100)
        if 'age' in col_lower:
            impossible = (df[col] > 150).sum()
            if impossible > 0:
                accuracy -= min(10, (impossible / len(df)) * 100)

    uniqueness = 100
    if len(df) > 0:
        for col in df.columns:
            ratio = df[col].nunique() / len(df)
            # Near-unique text columns are likely IDs, not categories.
            if ratio > 0.95 and df[col].dtype == 'object':
                uniqueness -= 5

    timeliness = 100
    date_cols = get_datetime_columns(df)
    latest_date = None
    if date_cols:
        try:
            d = pd.to_datetime(df[date_cols[0]], errors='coerce').dropna()
            if len(d) > 0:
                latest_date = d.max()
                days_old = (pd.Timestamp.now() - latest_date).days
                if days_old > 365:
                    timeliness -= 20
                elif days_old > 180:
                    timeliness -= 10
        except Exception as e:
            logger.debug(f"Could not parse date column {date_cols[0]}: {e}")

    weights = {"completeness": 0.3, "consistency": 0.25, "accuracy": 0.25,
               "uniqueness": 0.1, "timeliness": 0.1}
    final_score = (
        completeness * weights["completeness"]
        + consistency * weights["consistency"]
        + accuracy * weights["accuracy"]
        + uniqueness * weights["uniqueness"]
        + timeliness * weights["timeliness"]
    )

    result = {
        "final_score": round(final_score, 1),
        "details": {
            "completeness": round(completeness, 1),
            "consistency": round(consistency, 1),
            "accuracy": round(accuracy, 1),
            "uniqueness": round(uniqueness, 1),
            "timeliness": round(timeliness, 1),
        },
        "weights": weights,
        "metrics": {
            "total_rows": len(df),
            "total_cols": total_cols,
            "missing_cells": missing_cells,
            "missing_pct": (round(missing_cells / total_cells * 100, 2)
                            if total_cells else 0.0),
            "duplicate_rows": int(df.duplicated().sum()),
            "date_col": date_cols[0] if date_cols else None,
            "latest_date": latest_date.isoformat() if latest_date else None,
        },
    }

    # Store in session cache (key computed earlier) when possible.
    if key is not None:
        try:
            st.session_state['cached_data_health'] = {'key': key, 'value': result}
        except Exception:
            pass
    return result


def generate_recommendation_list(df: pd.DataFrame, scorecard: Dict) -> List[str]:
    """Generate a list of recommendations based on data health score."""
    recs = []
    if scorecard["details"]["completeness"] < 85:
        recs.append("🔴 High missing values detected — Use **Clean → Fill Mode** to impute or drop columns.")
    if scorecard["details"]["consistency"] < 90:
        recs.append("🟡 Inconsistent text formats — Use **Clean → Convert** to standardize currency, casing, or spacing.")
    if scorecard["details"]["accuracy"] < 80:
        recs.append("🔴 Invalid values found — Check for negative prices, impossible ages, or illogical entries.")
    if scorecard["details"]["uniqueness"] < 90:
        recs.append("💡 High-cardinality categories — Consider binning or encoding for ML pipelines.")
    if scorecard["details"]["timeliness"] < 80:
        date_col = scorecard["metrics"].get("date_col")
        if date_col:
            recs.append(f"⏳ Dataset appears outdated — Last update: {scorecard['metrics']['latest_date']}. Refresh source?")

    # Flag columns with a high fraction of IQR outliers (>5% of rows).
    num_cols = get_numeric_columns(df)
    for col in num_cols:
        Q1 = df[col].quantile(0.25)
        Q3 = df[col].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        outliers = df[(df[col] < lower_bound) | (df[col] > upper_bound)]
        if len(outliers) > len(df) * 0.05:
            recs.append(f"⚠️ High outliers in `{col}` — Use **Review → Outlier Analysis** to investigate.")
    return recs