import pandas as pd import numpy as np from imblearn.over_sampling import SMOTE, RandomOverSampler from imblearn.under_sampling import RandomUnderSampler from imblearn.combine import SMOTEENN from sklearn.feature_selection import VarianceThreshold, SelectKBest, f_classif, f_regression, mutual_info_classif, mutual_info_regression from sklearn.model_selection import train_test_split from scipy import stats import sys def analyze_dataset(df: pd.DataFrame): """Returns comprehensive analysis of the dataset for EDA dashboard.""" # Column type classification numeric_cols = list(df.select_dtypes(include=[np.number]).columns) categorical_cols = list(df.select_dtypes(include=['object', 'category']).columns) datetime_cols = list(df.select_dtypes(include=['datetime', 'datetimetz']).columns) boolean_cols = list(df.select_dtypes(include=['bool']).columns) # Preview data (first 100 rows) preview_df = df.head(100).copy() # Convert any non-serializable types for col in preview_df.columns: if preview_df[col].dtype == 'object': preview_df[col] = preview_df[col].astype(str) elif pd.api.types.is_datetime64_any_dtype(preview_df[col]): preview_df[col] = preview_df[col].astype(str) # Replace all np.nan with None for JSON serialization preview_df = preview_df.replace({np.nan: None}) preview_data = preview_df.to_dict(orient='records') # Missing values missing_per_col = df.isnull().sum().to_dict() missing_pct_per_col = (df.isnull().sum() / len(df) * 100).round(2).to_dict() total_missing = int(df.isnull().sum().sum()) total_missing_pct = round(total_missing / (df.shape[0] * df.shape[1]) * 100, 2) if df.shape[0] * df.shape[1] > 0 else 0 # Duplicate rows duplicate_count = int(df.duplicated().sum()) # Memory usage memory_bytes = int(df.memory_usage(deep=True).sum()) if memory_bytes > 1024 * 1024: memory_str = f"{memory_bytes / (1024*1024):.2f} MB" else: memory_str = f"{memory_bytes / 1024:.2f} KB" # Distributions for categorical columns (bar charts) distributions = {} for col in df.columns: nunique = df[col].nunique(dropna=True) if nunique <= 30 and nunique > 0: counts = df[col].value_counts(dropna=False).head(30) items = [] for k, v in counts.items(): label = 'Missing/NaN' if pd.isna(k) else str(k) items.append({"name": label, "count": int(v)}) distributions[col] = items # Histogram data for numeric columns histograms = {} for col in numeric_cols: col_data = df[col].dropna() if len(col_data) > 0: try: counts_arr, bin_edges = np.histogram(col_data, bins=min(30, max(10, len(col_data)//10))) histograms[col] = { "counts": counts_arr.tolist(), "bin_edges": bin_edges.tolist() } except Exception: pass # Box plot data for numeric columns boxplots = {} for col in numeric_cols: col_data = df[col].dropna() if len(col_data) > 0: try: q1 = float(col_data.quantile(0.25)) q2 = float(col_data.quantile(0.5)) q3 = float(col_data.quantile(0.75)) iqr = q3 - q1 whisker_low = float(col_data[col_data >= q1 - 1.5 * iqr].min()) whisker_high = float(col_data[col_data <= q3 + 1.5 * iqr].max()) outliers = col_data[(col_data < q1 - 1.5 * iqr) | (col_data > q3 + 1.5 * iqr)].tolist() boxplots[col] = { "q1": q1, "median": q2, "q3": q3, "whisker_low": whisker_low, "whisker_high": whisker_high, "outliers": outliers[:100], # limit outliers for payload "outlier_count": len(outliers), "mean": float(col_data.mean()), "std": float(col_data.std()) } except Exception: pass # Correlation matrix for numeric columns correlation = {} if len(numeric_cols) >= 2: try: corr_matrix = df[numeric_cols].corr() correlation = { "columns": list(corr_matrix.columns), "values": corr_matrix.fillna(0).values.tolist() } except Exception: pass # Data quality insights insights = [] # Missing value insights cols_with_missing = {col: cnt for col, cnt in missing_per_col.items() if cnt > 0} if cols_with_missing: high_missing = [col for col, cnt in cols_with_missing.items() if cnt / len(df) > 0.3] insights.append({ "type": "warning", "title": f"Missing Values Detected ({len(cols_with_missing)} columns)", "detail": f"Total {total_missing} missing values across the dataset ({total_missing_pct}%)." }) if high_missing: insights.append({ "type": "danger", "title": f"High Missing Rate (>30%)", "detail": f"Columns with >30% missing: {', '.join(high_missing)}. Consider dropping or imputing." }) else: insights.append({ "type": "success", "title": "No Missing Values", "detail": "The dataset has no missing values." }) # Duplicate insights if duplicate_count > 0: insights.append({ "type": "warning", "title": f"{duplicate_count} Duplicate Rows Found", "detail": f"{duplicate_count} duplicate rows detected ({duplicate_count/len(df)*100:.1f}% of dataset)." }) # Skewness insights for numeric columns for col in numeric_cols: col_data = df[col].dropna() if len(col_data) > 10: try: skew_val = float(col_data.skew()) if abs(skew_val) > 2: insights.append({ "type": "info", "title": f"Highly Skewed: {col}", "detail": f"Skewness = {skew_val:.2f}. Consider log/sqrt transform." }) except Exception: pass # Near-zero variance for col in numeric_cols: col_data = df[col].dropna() if len(col_data) > 1: try: if col_data.std() < 1e-10: insights.append({ "type": "info", "title": f"Near-Zero Variance: {col}", "detail": f"Column '{col}' has near-zero variance. May not be useful for modeling." }) except Exception: pass # High cardinality categorical for col in categorical_cols: nunique = df[col].nunique() if nunique > 50: insights.append({ "type": "info", "title": f"High Cardinality: {col}", "detail": f"Column '{col}' has {nunique} unique values. One-hot encoding may be expensive." }) # Class imbalance check (for potential target columns with small unique count) for col in df.columns: nunique = df[col].nunique(dropna=True) if 2 <= nunique <= 10: counts = df[col].value_counts() ratio = counts.min() / counts.max() if ratio < 0.3: insights.append({ "type": "warning", "title": f"Potential Class Imbalance: {col}", "detail": f"Minority/majority ratio = {ratio:.2f}. Consider resampling if this is your target." }) # Summary statistics summary_stats = {} desc = df.describe(include='all') for col in df.columns: col_stats = {} if col in desc.columns: for stat_name in desc.index: val = desc.at[stat_name, col] if pd.isna(val): col_stats[stat_name] = None elif isinstance(val, (np.integer, np.floating)): col_stats[stat_name] = float(val) else: col_stats[stat_name] = str(val) summary_stats[col] = col_stats # V2 EDA: Missing Pattern Matrix (downsampled for heatmap) max_missing_heat_rows = 500 missing_sample_df = df.sample(n=min(len(df), max_missing_heat_rows), random_state=42) if len(df) > max_missing_heat_rows else df missing_matrix = missing_sample_df.isnull().values.astype(int).tolist() # V2 EDA: Downsampled numeric data for Pair / Violin plots max_numeric_rows = 500 numeric_sample_df = df[numeric_cols].copy() if len(numeric_sample_df) > max_numeric_rows: numeric_sample_df = numeric_sample_df.sample(n=max_numeric_rows, random_state=42) numeric_sample = {} for col in numeric_sample_df.columns: numeric_sample[col] = [None if pd.isna(x) else float(x) for x in numeric_sample_df[col].tolist()] analysis = { "shape": list(df.shape), "columns": list(df.columns), "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()}, "column_types": { "numeric": numeric_cols, "categorical": categorical_cols, "datetime": datetime_cols, "boolean": boolean_cols, }, "missing_values": missing_per_col, "missing_pct": missing_pct_per_col, "total_missing": total_missing, "total_missing_pct": total_missing_pct, "duplicate_count": duplicate_count, "memory_usage": memory_str, "preview_data": preview_data, "distributions": distributions, "histograms": histograms, "boxplots": boxplots, "correlation": correlation, "insights": insights, "summary_statistics": summary_stats, "missing_matrix": missing_matrix, "missing_matrix_columns": list(df.columns), "numeric_sample": numeric_sample, } return analysis def preprocess_missing_values(df: pd.DataFrame, strategy: str = "mean", columns=None): """Handle missing values. Supports: drop, drop_cols, mean, median, mode, constant.""" processed_df = df.copy() target_cols = columns if (columns and len(columns) > 0) else list(processed_df.columns) if strategy == "drop": processed_df = processed_df.dropna(subset=target_cols) elif strategy == "drop_cols": cols_to_drop = [c for c in target_cols if c in processed_df.columns and processed_df[c].isnull().any()] processed_df = processed_df.drop(columns=cols_to_drop) else: for col in target_cols: if col not in processed_df.columns: continue if processed_df[col].isnull().any(): is_numeric = pd.api.types.is_numeric_dtype(processed_df[col]) fill_val = None if not is_numeric: # Non-numeric always uses mode m = processed_df[col].mode() fill_val = m.iat[0] if not m.empty else "Missing" else: if strategy == "mean": fill_val = processed_df[col].mean() elif strategy == "median": fill_val = processed_df[col].median() elif strategy == "mode": m = processed_df[col].mode() fill_val = m.iat[0] if not m.empty else 0 elif strategy == "constant": fill_val = 0 if fill_val is not None: processed_df[col] = processed_df[col].fillna(fill_val) else: # Final fallback processed_df[col] = processed_df[col].fillna(0 if is_numeric else "Missing") return processed_df def handle_duplicates(df: pd.DataFrame, keep: str = "first"): """Handle duplicate rows. keep: first, last, or false (remove all).""" if keep == "false": return df.drop_duplicates(keep=False) return df.drop_duplicates(keep=keep) def remove_outliers(df: pd.DataFrame, method: str = "iqr", threshold: float = 1.5, treatment: str = "remove", columns=None): """Handle outliers: remove rows, or cap/floor values.""" processed = df.copy() numeric_cols = columns if columns else list(df.select_dtypes(include=[np.number]).columns) numeric_cols = [c for c in numeric_cols if c in processed.columns and np.issubdtype(processed[c].dtype, np.number)] if len(numeric_cols) == 0: return processed if method == "zscore": for col in numeric_cols: col_data = processed[col].dropna() if len(col_data) < 3: continue z = np.abs(stats.zscore(col_data)) outlier_idx = col_data.index[z >= threshold] if treatment == "remove": processed = processed.drop(index=outlier_idx) elif treatment == "cap": mean, std = col_data.mean(), col_data.std() lower, upper = mean - threshold * std, mean + threshold * std processed[col] = processed[col].clip(lower, upper) elif treatment == "null": processed.loc[outlier_idx, col] = np.nan elif method == "iqr": for col in numeric_cols: col_data = processed[col].dropna() if len(col_data) < 4: continue Q1, Q3 = col_data.quantile(0.25), col_data.quantile(0.75) IQR = Q3 - Q1 lower, upper = Q1 - threshold * IQR, Q3 + threshold * IQR outlier_mask = (processed[col] < lower) | (processed[col] > upper) if treatment == "remove": processed = processed[~outlier_mask | processed[col].isna()] elif treatment == "cap": processed[col] = processed[col].clip(lower, upper) elif treatment == "null": processed.loc[outlier_mask, col] = np.nan return processed.reset_index(drop=True) def clean_data_types(df: pd.DataFrame, column: str, sub_action: str = "trim"): """Clean data types for a specific column.""" if column not in df.columns: raise ValueError(f"Column '{column}' not found.") processed = df.copy() if sub_action == "trim": if processed[column].dtype == 'object': processed[column] = processed[column].str.strip() elif sub_action == "lowercase": if processed[column].dtype == 'object': processed[column] = processed[column].str.lower() elif sub_action == "uppercase": if processed[column].dtype == 'object': processed[column] = processed[column].str.upper() elif sub_action == "to_numeric": processed[column] = pd.to_numeric(processed[column], errors='coerce') elif sub_action == "to_datetime": processed[column] = pd.to_datetime(processed[column], errors='coerce') elif sub_action == "remove_special": if processed[column].dtype == 'object': processed[column] = processed[column].str.replace(r'[^a-zA-Z0-9\s]', '', regex=True) return processed def encode_categorical(df: pd.DataFrame, column: str, method: str = "label", target_column: str = None): """Encode a categorical column. Methods: label, onehot, ordinal, frequency, target.""" if column not in df.columns: raise ValueError(f"Column '{column}' not found.") processed = df.copy() if method == "label": processed[column] = processed[column].astype('category').cat.codes elif method == "onehot": dummies = pd.get_dummies(processed[column], prefix=column, drop_first=False) processed = pd.concat([processed.drop(columns=[column]), dummies], axis=1) elif method == "ordinal": # Same as label for now; user-defined order would need a UI parameter processed[column] = processed[column].astype('category').cat.codes elif method == "frequency": freq = processed[column].value_counts(normalize=True).to_dict() processed[column] = processed[column].map(freq) elif method == "target": if not target_column or target_column not in processed.columns: raise ValueError("Target encoding requires a selected target column.") target_series = processed[target_column] if not pd.api.types.is_numeric_dtype(target_series): target_series = target_series.astype('category').cat.codes target_mean = target_series.groupby(processed[column]).mean() processed[column] = processed[column].map(target_mean).fillna(target_series.mean()) return processed def scale_features(df: pd.DataFrame, columns, method: str = "standard"): """Scale numeric columns. Methods: standard, minmax, robust, log, sqrt.""" processed = df.copy() if columns is None: columns = list(df.select_dtypes(include=[np.number]).columns) columns = [c for c in columns if c in processed.columns and np.issubdtype(processed[c].dtype, np.number)] if method == "standard": for col in columns: mean = processed[col].mean() std = processed[col].std() if std > 0: processed[col] = (processed[col] - mean) / std elif method == "minmax": for col in columns: mn = processed[col].min() mx = processed[col].max() if mx - mn > 0: processed[col] = (processed[col] - mn) / (mx - mn) elif method == "robust": for col in columns: med = processed[col].median() Q1 = processed[col].quantile(0.25) Q3 = processed[col].quantile(0.75) iqr = Q3 - Q1 if iqr > 0: processed[col] = (processed[col] - med) / iqr elif method == "log": for col in columns: if (processed[col].dropna() > 0).all(): processed[col] = np.log(processed[col]) else: # log1p for columns with zeros if (processed[col].dropna() >= 0).all(): processed[col] = np.log1p(processed[col]) else: raise ValueError(f"Cannot apply log transform on column '{col}' with negative values.") elif method == "sqrt": for col in columns: if (processed[col].dropna() >= 0).all(): processed[col] = np.sqrt(processed[col]) else: raise ValueError(f"Cannot apply sqrt transform on column '{col}' with negative values.") elif method == "boxcox": for col in columns: if (processed[col].dropna() > 0).all(): processed[col] = processed[col].copy() processed.loc[processed[col].notnull(), col], _ = stats.boxcox(processed[col].dropna()) else: raise ValueError(f"Box-Cox transform requires strictly positive values (column '{col}' fails).") elif method == "yeojohnson": for col in columns: processed[col] = processed[col].copy() processed.loc[processed[col].notnull(), col], _ = stats.yeojohnson(processed[col].dropna()) return processed def solve_imbalance(df: pd.DataFrame, target_col: str, method: str = "smote"): """Handle imbalanced dataset using specified method.""" if target_col not in df.columns: raise ValueError(f"Target column {target_col} not found in dataset.") current_counts = df[target_col].value_counts() if len(current_counts) < 2: return df X = df.drop(columns=[target_col]) y = df[target_col] X_numeric = X.copy() for col in X_numeric.select_dtypes(include=['object', 'category']).columns: X_numeric[col] = X_numeric[col].astype('category').cat.codes if X_numeric.isnull().any().any(): raise ValueError(f"Cannot apply {method}. Please handle missing values first.") if method == "smote": sampler = SMOTE(random_state=42) elif method == "random_over": sampler = RandomOverSampler(random_state=42) elif method == "random_under": sampler = RandomUnderSampler(random_state=42) elif method == "smote_enn": sampler = SMOTEENN(random_state=42) else: raise ValueError(f"Unknown imbalance method: {method}") try: X_resampled, y_resampled = sampler.fit_resample(X_numeric, y) except ValueError as e: raise ValueError(f"Imbalance sampling failed. Minority class may be too small. Details: {str(e)}") resampled_df = pd.DataFrame(X_resampled, columns=X.columns) resampled_df[target_col] = y_resampled return resampled_df def feature_selection(df: pd.DataFrame, method: str = "variance", threshold: float = 0.0, target_col: str = None, k: int = 10): """Select features using variance threshold, correlation, select k-best, or mutual info.""" processed = df.copy() numeric_cols = list(processed.select_dtypes(include=[np.number]).columns) if len(numeric_cols) == 0: return processed if method == "variance": var_threshold = VarianceThreshold(threshold=threshold) X = processed[numeric_cols] X_filled = X.fillna(X.mean()).fillna(0) var_threshold.fit(X_filled) cols_to_keep = [numeric_cols[i] for i in range(len(numeric_cols)) if var_threshold.variances_[i] > threshold] cols_to_drop = [c for c in numeric_cols if c not in cols_to_keep] if target_col and target_col in cols_to_drop: cols_to_drop.remove(target_col) if cols_to_drop: processed = processed.drop(columns=cols_to_drop) elif method == "correlation": corr_matrix = processed[numeric_cols].corr().abs() upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)) to_drop = [column for column in upper.columns if any(upper[column] > threshold)] if target_col and target_col in to_drop: to_drop.remove(target_col) if to_drop: processed = processed.drop(columns=to_drop) elif method in ["kbest", "mutual_info"]: if not target_col or target_col not in processed.columns: raise ValueError(f"{method} requires a target column.") X = processed[numeric_cols] if target_col in X.columns: X = X.drop(columns=[target_col]) if X.shape[1] == 0: return processed X_filled = X.fillna(X.mean()).fillna(0) y = processed[target_col] is_classification = not pd.api.types.is_numeric_dtype(y) or y.nunique() <= 20 y_encoded = y.astype('category').cat.codes if is_classification else y actual_k = min(k, X.shape[1]) if method == "kbest": score_func = f_classif if is_classification else f_regression else: score_func = mutual_info_classif if is_classification else mutual_info_regression selector = SelectKBest(score_func=score_func, k=actual_k) selector.fit(X_filled, y_encoded) cols_to_keep = X.columns[selector.get_support()].tolist() cols_to_drop = [c for c in X.columns if c not in cols_to_keep] if cols_to_drop: processed = processed.drop(columns=cols_to_drop) return processed def split_dataset(df: pd.DataFrame, test_size: float = 0.2, stratify_col: str = None, random_state: int = 42, split_name: str = "_split_"): """Split dataset into Train and Test, appending a new _split_ column.""" processed = df.copy() stratify_data = None if stratify_col and stratify_col in processed.columns: stratify_data = processed[stratify_col] try: train_idx, test_idx = train_test_split( processed.index, test_size=test_size, random_state=random_state, stratify=stratify_data ) except ValueError as e: raise ValueError(f"Failed to split dataset. Adjust ratio or check class counts if stratifying. Error: {str(e)}") processed[split_name] = "Train" processed.loc[test_idx, split_name] = "Test" return processed def drop_columns(df: pd.DataFrame, columns: list): """Drop specified columns from the dataset.""" if not columns: return df # Filter to avoid KeyError if column doesn't exist cols_to_drop = [c for c in columns if c in df.columns] return df.drop(columns=cols_to_drop)