| """ | |
| Advanced Analysis Tools | |
| Tools for EDA, model diagnostics, anomaly detection, multicollinearity, and statistical tests. | |
| """ | |
| import polars as pl | |
| import numpy as np | |
| from typing import Dict, Any, List, Optional, Tuple | |
| from pathlib import Path | |
| import sys | |
| import os | |
| import warnings | |
| import json | |
| warnings.filterwarnings('ignore') | |
| # Add parent directory to path for imports | |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| from sklearn.ensemble import IsolationForest | |
| from sklearn.neighbors import LocalOutlierFactor | |
| from sklearn.model_selection import learning_curve | |
| from scipy import stats | |
| from scipy.stats import chi2_contingency, f_oneway, ttest_ind, pearsonr | |
| from statsmodels.stats.outliers_influence import variance_inflation_factor | |
| import plotly.graph_objects as go | |
| import plotly.express as px | |
| from plotly.subplots import make_subplots | |
| import pandas as pd | |
| from ..utils.polars_helpers import ( | |
| load_dataframe, get_numeric_columns, get_categorical_columns | |
| ) | |
| from ..utils.validation import ( | |
| validate_file_exists, validate_file_format, validate_dataframe, | |
| validate_column_exists | |
| ) | |


def perform_eda_analysis(
    file_path: str,
    target_col: Optional[str] = None,
    output_html: Optional[str] = None
) -> Dict[str, Any]:
    """
    Perform comprehensive automated Exploratory Data Analysis with interactive visualizations.

    Args:
        file_path: Path to dataset
        target_col: Target column for supervised analysis
        output_html: Path to save HTML report

    Returns:
        Dictionary with EDA insights and statistics
    """
    # Validation
    validate_file_exists(file_path)
    validate_file_format(file_path)

    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)

    if target_col:
        validate_column_exists(df, target_col)

    print("📊 Performing comprehensive EDA...")

    # Basic statistics
    n_rows, n_cols = df.shape
    numeric_cols = get_numeric_columns(df)
    categorical_cols = get_categorical_columns(df)

    # Missing values analysis
    missing_stats = {}
    for col in df.columns:
        null_count = df[col].null_count()
        if null_count > 0:
            missing_stats[col] = {
                'count': null_count,
                'percentage': float(null_count / n_rows * 100)
            }

    # Univariate analysis for numeric columns
    numeric_stats = {}
    for col in numeric_cols[:20]:  # Limit to 20 columns
        col_data = df[col].drop_nulls().to_numpy()
        numeric_stats[col] = {
            'mean': float(np.mean(col_data)),
            'median': float(np.median(col_data)),
            'std': float(np.std(col_data)),
            'min': float(np.min(col_data)),
            'max': float(np.max(col_data)),
            'q25': float(np.percentile(col_data, 25)),
            'q75': float(np.percentile(col_data, 75)),
            'skewness': float(stats.skew(col_data)),
            'kurtosis': float(stats.kurtosis(col_data))
        }

    # Categorical analysis
    categorical_stats = {}
    for col in categorical_cols[:10]:  # Limit to 10 columns
        value_counts = df[col].value_counts().head(10)
        categorical_stats[col] = {
            'unique_values': df[col].n_unique(),
            'mode': df[col].mode()[0] if len(df[col].mode()) > 0 else None,
            'top_10_values': {str(row[col]): row['count'] for row in value_counts.to_dicts()}
        }

    # Correlation analysis (numeric only)
    correlations = {}
    if len(numeric_cols) > 1:
        corr_matrix = df[numeric_cols[:20]].to_pandas().corr()
        # Find highly correlated pairs
        high_corr_pairs = []
        for i in range(len(corr_matrix.columns)):
            for j in range(i + 1, len(corr_matrix.columns)):
                corr_val = corr_matrix.iloc[i, j]
                if abs(corr_val) > 0.7:
                    high_corr_pairs.append({
                        'feature_1': corr_matrix.columns[i],
                        'feature_2': corr_matrix.columns[j],
                        'correlation': float(corr_val)
                    })
        correlations['high_correlations'] = high_corr_pairs
        correlations['correlation_matrix_shape'] = corr_matrix.shape

    # Target analysis
    target_insights = {}
    if target_col:
        if target_col in numeric_cols:
            # Numeric target - regression
            target_data = df[target_col].drop_nulls().to_numpy()
            target_insights = {
                'type': 'regression',
                'mean': float(np.mean(target_data)),
                'std': float(np.std(target_data)),
                'min': float(np.min(target_data)),
                'max': float(np.max(target_data))
            }
            # Feature-target correlations
            target_corr = {}
            for col in numeric_cols:
                if col != target_col:
                    try:
                        # Drop nulls jointly so both arrays stay row-aligned
                        pair = df.select([col, target_col]).drop_nulls()
                        corr, pval = pearsonr(
                            pair[col].to_numpy(),
                            pair[target_col].to_numpy()
                        )
                        if abs(corr) > 0.3:
                            target_corr[col] = {
                                'correlation': float(corr),
                                'p_value': float(pval)
                            }
                    except Exception:
                        pass
            target_insights['correlated_features'] = target_corr
        else:
            # Categorical target - classification
            value_counts = df[target_col].value_counts()
            target_insights = {
                'type': 'classification',
                'classes': len(value_counts),
                'distribution': {str(row[target_col]): row['count'] for row in value_counts.to_dicts()},
                'imbalance_ratio': float(value_counts['count'].max() / value_counts['count'].min())
            }

    # Create visualizations if output_html requested
    if output_html:
        print("📈 Generating interactive visualizations...")
        fig = make_subplots(
            rows=3, cols=2,
            subplot_titles=('Distribution of Numeric Features', 'Missing Values',
                            'Correlation Heatmap', 'Target Distribution',
                            'Outliers Detection', 'Feature Importance')
        )

        # Distribution plot (first numeric column)
        if numeric_cols:
            col = numeric_cols[0]
            fig.add_trace(
                go.Histogram(x=df[col].to_list(), name=col),
                row=1, col=1
            )

        # Missing values plot
        if missing_stats:
            missing_cols = list(missing_stats.keys())[:10]
            missing_pcts = [missing_stats[col]['percentage'] for col in missing_cols]
            fig.add_trace(
                go.Bar(x=missing_cols, y=missing_pcts, name='Missing %'),
                row=1, col=2
            )

        # Correlation heatmap
        if len(numeric_cols) > 1:
            corr_matrix_np = corr_matrix.values
            fig.add_trace(
                go.Heatmap(
                    z=corr_matrix_np,
                    x=corr_matrix.columns.tolist(),
                    y=corr_matrix.columns.tolist(),
                    colorscale='RdBu'
                ),
                row=2, col=1
            )

        # Target distribution
        if target_col and target_col in categorical_cols:
            target_counts = df[target_col].value_counts()
            fig.add_trace(
                go.Bar(
                    x=[str(row[target_col]) for row in target_counts.to_dicts()],
                    y=[row['count'] for row in target_counts.to_dicts()],
                    name='Target'
                ),
                row=2, col=2
            )

        fig.update_layout(height=1200, showlegend=False, title_text="Automated EDA Report")

        # Save HTML
        os.makedirs(os.path.dirname(output_html) if os.path.dirname(output_html) else '.', exist_ok=True)
        fig.write_html(output_html)
        print(f"💾 EDA report saved to: {output_html}")

    return {
        'status': 'success',
        'dataset_shape': {'rows': n_rows, 'columns': n_cols},
        'column_types': {
            'numeric': len(numeric_cols),
            'categorical': len(categorical_cols)
        },
        'missing_values': missing_stats,
        'numeric_statistics': numeric_stats,
        'categorical_statistics': categorical_stats,
        'correlations': correlations,
        'target_insights': target_insights,
        'output_html': output_html
    }
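
# Example usage of perform_eda_analysis (a minimal sketch; the dataset path,
# target column, and output location below are hypothetical placeholders):
#
#   report = perform_eda_analysis(
#       "data/housing.csv",
#       target_col="price",
#       output_html="reports/eda_report.html",
#   )
#   print(report["dataset_shape"], report["column_types"])
#   print(report["correlations"].get("high_correlations", []))
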
def detect_model_issues(
    model_path: str,
    train_data_path: str,
    test_data_path: str,
    target_col: str
) -> Dict[str, Any]:
    """
    Detect overfitting, underfitting, and other model issues using learning curves and diagnostics.

    Args:
        model_path: Path to trained model (.pkl)
        train_data_path: Path to training dataset
        test_data_path: Path to test dataset
        target_col: Target column name

    Returns:
        Dictionary with model diagnostics
    """
    import joblib
    from sklearn.metrics import accuracy_score, mean_squared_error, r2_score

    # Validation
    validate_file_exists(model_path)
    validate_file_exists(train_data_path)
    validate_file_exists(test_data_path)

    # Load model
    model = joblib.load(model_path)

    # Load data
    train_df = load_dataframe(train_data_path)
    test_df = load_dataframe(test_data_path)
    validate_column_exists(train_df, target_col)
    validate_column_exists(test_df, target_col)
    # Prepare data (same relative import style as the module-level helpers)
    from ..utils.polars_helpers import split_features_target
    X_train, y_train = split_features_target(train_df, target_col)
    X_test, y_test = split_features_target(test_df, target_col)
| print("🔍 Analyzing model performance...") | |
| # Predictions | |
| y_train_pred = model.predict(X_train) | |
| y_test_pred = model.predict(X_test) | |
| # Detect task type | |
| unique_values = len(np.unique(y_train)) | |
| task_type = "classification" if unique_values < 20 else "regression" | |
| # Calculate metrics | |
| if task_type == "classification": | |
| train_score = accuracy_score(y_train, y_train_pred) | |
| test_score = accuracy_score(y_test, y_test_pred) | |
| metric_name = "accuracy" | |
| else: | |
| train_score = r2_score(y_train, y_train_pred) | |
| test_score = r2_score(y_test, y_test_pred) | |
| metric_name = "r2" | |
| # Diagnose issues | |
| score_gap = train_score - test_score | |
| diagnosis = [] | |
| if score_gap > 0.15: | |
| diagnosis.append({ | |
| 'issue': 'overfitting', | |
| 'severity': 'high' if score_gap > 0.25 else 'medium', | |
| 'description': f'Training {metric_name} ({train_score:.3f}) is much higher than test {metric_name} ({test_score:.3f})', | |
| 'recommendations': [ | |
| 'Add regularization (L1/L2)', | |
| 'Reduce model complexity', | |
| 'Increase training data', | |
| 'Use cross-validation', | |
| 'Add dropout (for neural networks)' | |
| ] | |
| }) | |
| if test_score < 0.6 and task_type == "classification": | |
| diagnosis.append({ | |
| 'issue': 'underfitting', | |
| 'severity': 'high', | |
| 'description': f'Test accuracy ({test_score:.3f}) is too low', | |
| 'recommendations': [ | |
| 'Increase model complexity', | |
| 'Engineer better features', | |
| 'Try ensemble methods', | |
| 'Tune hyperparameters', | |
| 'Check for data quality issues' | |
| ] | |
| }) | |
| if test_score < 0.3 and task_type == "regression": | |
| diagnosis.append({ | |
| 'issue': 'underfitting', | |
| 'severity': 'high', | |
| 'description': f'Test R² ({test_score:.3f}) is too low', | |
| 'recommendations': [ | |
| 'Increase model complexity', | |
| 'Engineer better features', | |
| 'Try non-linear models', | |
| 'Check for data scaling issues' | |
| ] | |
| }) | |
| # Bias-variance analysis | |
| if abs(score_gap) < 0.05: | |
| bias_variance = 'balanced' | |
| elif score_gap > 0.15: | |
| bias_variance = 'high_variance' # Overfitting | |
| else: | |
| bias_variance = 'high_bias' # Underfitting | |
| # Generate learning curve data | |
| print("📊 Generating learning curve...") | |
| try: | |
| train_sizes = np.linspace(0.1, 1.0, 10) | |
| train_sizes_abs, train_scores, val_scores = learning_curve( | |
| model, X_train, y_train, | |
| train_sizes=train_sizes, | |
| cv=5, | |
| scoring='accuracy' if task_type == "classification" else 'r2', | |
| n_jobs=-1 | |
| ) | |
| learning_curve_data = { | |
| 'train_sizes': train_sizes_abs.tolist(), | |
| 'train_scores_mean': train_scores.mean(axis=1).tolist(), | |
| 'val_scores_mean': val_scores.mean(axis=1).tolist() | |
| } | |
| except Exception as e: | |
| learning_curve_data = {'error': str(e)} | |
| return { | |
| 'status': 'success', | |
| 'task_type': task_type, | |
| 'train_score': float(train_score), | |
| 'test_score': float(test_score), | |
| 'score_gap': float(score_gap), | |
| 'bias_variance_assessment': bias_variance, | |
| 'diagnosis': diagnosis, | |
| 'learning_curve': learning_curve_data, | |
| 'summary': f"Model shows {bias_variance} with {len(diagnosis)} issues detected" | |
| } | |
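
# Example usage of detect_model_issues (a minimal sketch; the model and data
# paths below are hypothetical placeholders for artifacts produced elsewhere):
#
#   diagnostics = detect_model_issues(
#       model_path="models/rf_model.pkl",
#       train_data_path="data/train.csv",
#       test_data_path="data/test.csv",
#       target_col="churn",
#   )
#   print(diagnostics["bias_variance_assessment"], diagnostics["score_gap"])
#   for issue in diagnostics["diagnosis"]:
#       print(issue["issue"], issue["severity"])
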
def detect_anomalies(
    file_path: str,
    method: str = "isolation_forest",
    contamination: float = 0.1,
    columns: Optional[List[str]] = None,
    output_path: Optional[str] = None
) -> Dict[str, Any]:
    """
    Detect anomalies/outliers using various methods.

    Args:
        file_path: Path to dataset
        method: Anomaly detection method:
            - 'isolation_forest': Isolation Forest (good for high-dim data)
            - 'lof': Local Outlier Factor
            - 'zscore': Z-score method (univariate)
            - 'iqr': Interquartile Range method (univariate)
        contamination: Expected proportion of outliers (0.01 to 0.5)
        columns: Columns to analyze (None = all numeric)
        output_path: Path to save dataset with anomaly labels

    Returns:
        Dictionary with anomaly detection results
    """
    # Validation
    validate_file_exists(file_path)
    validate_file_format(file_path)

    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)

    # Get numeric columns if not specified
    if columns is None:
        columns = get_numeric_columns(df)
        print(f"🔢 Auto-detected {len(columns)} numeric columns")
    else:
        for col in columns:
            validate_column_exists(df, col)

    if not columns:
        return {
            'status': 'skipped',
            'message': 'No numeric columns found for anomaly detection'
        }

    X = df[columns].fill_null(0).to_numpy()

    print(f"🔍 Detecting anomalies using {method}...")

    # Detect anomalies based on method
    if method == "isolation_forest":
        detector = IsolationForest(contamination=contamination, random_state=42, n_jobs=-1)
        predictions = detector.fit_predict(X)
        anomaly_scores = detector.score_samples(X)
        anomalies = predictions == -1
    elif method == "lof":
        detector = LocalOutlierFactor(contamination=contamination, n_jobs=-1)
        predictions = detector.fit_predict(X)
        anomaly_scores = detector.negative_outlier_factor_
        anomalies = predictions == -1
    elif method == "zscore":
        # Z-score for each column
        z_scores = np.abs(stats.zscore(X, axis=0))
        anomalies = (z_scores > 3).any(axis=1)
        anomaly_scores = z_scores.max(axis=1)
| elif method == "iqr": | |
| # IQR for each column | |
| Q1 = np.percentile(X, 25, axis=0) | |
| Q3 = np.percentile(X, 75, axis=0) | |
| IQR = Q3 - Q1 | |
| lower_bound = Q1 - 1.5 * IQR | |
| upper_bound = Q3 + 1.5 * IQR | |
| anomalies = ((X < lower_bound) | (X > upper_bound)).any(axis=1) | |
| # Calculate how many IQRs away from bounds | |
| dist_from_bounds = np.maximum( | |
| (lower_bound - X) / IQR, | |
| (X - upper_bound) / IQR | |
| ).max(axis=1) | |
| anomaly_scores = dist_from_bounds | |
    else:
        raise ValueError(f"Unsupported method: {method}")

    # Count anomalies
    n_anomalies = int(anomalies.sum())
    anomaly_percentage = float(n_anomalies / len(df) * 100)

    print(f"🚨 Found {n_anomalies} anomalies ({anomaly_percentage:.2f}%)")

    # Add anomaly labels to dataframe
    df_with_anomalies = df.with_columns([
        pl.Series('is_anomaly', anomalies.astype(int)),
        pl.Series('anomaly_score', anomaly_scores)
    ])

    # Get indices of anomalies
    anomaly_indices = np.where(anomalies)[0].tolist()

    # Analyze anomalies by column
    column_anomaly_stats = {}
    for col in columns:
        col_data = df[col].to_numpy()
        anomaly_values = col_data[anomalies]
        if len(anomaly_values) > 0:
            column_anomaly_stats[col] = {
                'mean_normal': float(np.mean(col_data[~anomalies])),
                'mean_anomaly': float(np.mean(anomaly_values)),
                'std_normal': float(np.std(col_data[~anomalies])),
                'std_anomaly': float(np.std(anomaly_values))
            }

    # Save if output path provided
    if output_path:
        from ..utils.polars_helpers import save_dataframe
        save_dataframe(df_with_anomalies, output_path)
        print(f"💾 Dataset with anomaly labels saved to: {output_path}")
    return {
        'status': 'success',
        'method': method,
        'n_anomalies': n_anomalies,
        'anomaly_percentage': anomaly_percentage,
        'anomaly_indices': anomaly_indices[:100],  # First 100
        'column_statistics': column_anomaly_stats,
        'contamination': contamination,
        'output_path': output_path
    }
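
# Example usage of detect_anomalies (a minimal sketch; the file path and
# output location below are hypothetical placeholders):
#
#   result = detect_anomalies(
#       "data/sensor_readings.csv",
#       method="isolation_forest",
#       contamination=0.05,
#       output_path="data/sensor_readings_labeled.csv",
#   )
#   print(result["n_anomalies"], f'{result["anomaly_percentage"]:.2f}%')
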
def detect_and_handle_multicollinearity(
    file_path: str,
    threshold: float = 10.0,
    action: str = "report",
    output_path: Optional[str] = None
) -> Dict[str, Any]:
    """
    Detect and optionally handle multicollinearity using VIF (Variance Inflation Factor).

    Args:
        file_path: Path to dataset
        threshold: VIF threshold (10 = high multicollinearity, 5 = moderate)
        action: Action to take:
            - 'report': Only report VIF values
            - 'remove': Remove features with VIF > threshold
            - 'recommend': Provide regularization recommendations
        output_path: Path to save dataset with reduced features

    Returns:
        Dictionary with VIF values and recommendations
    """
    # Validation
    validate_file_exists(file_path)
    validate_file_format(file_path)

    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)

    # Get numeric columns
    numeric_cols = get_numeric_columns(df)
    if len(numeric_cols) < 2:
        return {
            'status': 'skipped',
            'message': 'Need at least 2 numeric columns for multicollinearity analysis'
        }

    print(f"🔍 Calculating VIF for {len(numeric_cols)} features...")

    # Prepare data
    X = df[numeric_cols].fill_null(0).to_numpy()

    # Calculate VIF for each feature
    vif_data = {}
    problematic_features = []
    for i, col in enumerate(numeric_cols):
        try:
            vif = variance_inflation_factor(X, i)
            vif_data[col] = float(vif)
            if vif > threshold:
                problematic_features.append({
                    'feature': col,
                    'vif': float(vif),
                    'severity': 'high' if vif > 20 else 'moderate'
                })
        except Exception as e:
            vif_data[col] = None
            print(f"⚠️ Could not calculate VIF for {col}: {str(e)}")

    # Sort by VIF
    sorted_vif = dict(sorted(vif_data.items(), key=lambda x: x[1] if x[1] is not None else 0, reverse=True))

    # Generate recommendations
    recommendations = []
    if len(problematic_features) > 0:
        recommendations.append({
            'type': 'regularization',
            'description': 'Use Ridge (L2) or Elastic Net regularization to handle multicollinearity',
            'reason': f'{len(problematic_features)} features have VIF > {threshold}'
        })
        recommendations.append({
            'type': 'pca',
            'description': 'Apply PCA to reduce dimensionality and eliminate correlations',
            'reason': 'PCA creates orthogonal features'
        })
        if action == "remove":
            # Remove features with high VIF
            features_to_remove = [f['feature'] for f in problematic_features]
            recommendations.append({
                'type': 'feature_removal',
                'description': f'Remove {len(features_to_remove)} features with high VIF',
                'features': features_to_remove
            })

    # Handle action
    if action == "remove" and len(problematic_features) > 0:
        # Remove features with VIF > threshold
        features_to_keep = [col for col in numeric_cols if col not in [f['feature'] for f in problematic_features]]
        categorical_cols = get_categorical_columns(df)
        df_reduced = df.select(features_to_keep + categorical_cols)
        if output_path:
            from ..utils.polars_helpers import save_dataframe
            save_dataframe(df_reduced, output_path)
            print(f"💾 Dataset with reduced features saved to: {output_path}")
        return {
            'status': 'success',
            'action': 'removed',
            'vif_values': sorted_vif,
            'problematic_features': problematic_features,
            'features_removed': len(problematic_features),
            'features_remaining': len(features_to_keep),
            'recommendations': recommendations,
            'output_path': output_path
        }

    return {
        'status': 'success',
        'action': action,
        'vif_values': sorted_vif,
        'problematic_features': problematic_features,
        'threshold': threshold,
        'recommendations': recommendations
    }
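
# Example usage of detect_and_handle_multicollinearity (a minimal sketch; the
# file path below is a hypothetical placeholder):
#
#   vif_report = detect_and_handle_multicollinearity(
#       "data/features.csv",
#       threshold=10.0,
#       action="report",
#   )
#   for feat in vif_report["problematic_features"]:
#       print(feat["feature"], feat["vif"], feat["severity"])
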
def perform_statistical_tests(
    file_path: str,
    target_col: str,
    test_type: str = "auto",
    features: Optional[List[str]] = None,
    alpha: float = 0.05
) -> Dict[str, Any]:
    """
    Perform statistical hypothesis tests to validate feature relationships.

    Args:
        file_path: Path to dataset
        target_col: Target column name
        test_type: Type of test:
            - 'auto': Automatically select based on data types
            - 'chi2': Chi-square test (categorical vs categorical)
            - 'ttest': T-test (binary categorical vs numeric)
            - 'anova': ANOVA (multi-class categorical vs numeric)
            - 'pearson': Pearson correlation test (numeric vs numeric)
        features: Features to test (None = all)
        alpha: Significance level (default 0.05)

    Returns:
        Dictionary with test results and p-values
    """
    # Validation
    validate_file_exists(file_path)
    validate_file_format(file_path)

    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)
    validate_column_exists(df, target_col)

    # Get column types
    numeric_cols = get_numeric_columns(df)
    categorical_cols = get_categorical_columns(df)

    # Determine target type
    target_is_numeric = target_col in numeric_cols
    target_is_categorical = target_col in categorical_cols

    # Get features to test
    if features is None:
        features = [col for col in df.columns if col != target_col]

    print(f"📊 Performing statistical tests for {len(features)} features...")

    test_results = []
    for feature in features:
        feature_is_numeric = feature in numeric_cols
        feature_is_categorical = feature in categorical_cols

        # Skip if feature is target
        if feature == target_col:
            continue

        # Select appropriate test
        if test_type == "auto":
            if target_is_numeric and feature_is_numeric:
                selected_test = "pearson"
            elif target_is_categorical and feature_is_numeric:
                target_unique = df[target_col].n_unique()
                selected_test = "ttest" if target_unique == 2 else "anova"
            elif target_is_categorical and feature_is_categorical:
                selected_test = "chi2"
            elif target_is_numeric and feature_is_categorical:
                selected_test = "anova"
            else:
                continue
        else:
            selected_test = test_type

        # Perform test
        try:
| if selected_test == "pearson": | |
| # Pearson correlation | |
| feature_data = df[feature].drop_nulls().to_numpy() | |
| target_data = df[target_col].drop_nulls().to_numpy() | |
| # Align lengths | |
| min_len = min(len(feature_data), len(target_data)) | |
| corr, pval = pearsonr(feature_data[:min_len], target_data[:min_len]) | |
| test_results.append({ | |
| 'feature': feature, | |
| 'test': 'pearson', | |
| 'statistic': float(corr), | |
| 'p_value': float(pval), | |
| 'significant': pval < alpha, | |
| 'interpretation': f"Correlation: {corr:.3f}" | |
| }) | |
| elif selected_test == "chi2": | |
| # Chi-square test | |
| contingency_table = pd.crosstab( | |
| df[feature].to_pandas(), | |
| df[target_col].to_pandas() | |
| ) | |
| chi2, pval, dof, expected = chi2_contingency(contingency_table) | |
| test_results.append({ | |
| 'feature': feature, | |
| 'test': 'chi2', | |
| 'statistic': float(chi2), | |
| 'p_value': float(pval), | |
| 'dof': int(dof), | |
| 'significant': pval < alpha | |
| }) | |
| elif selected_test == "ttest": | |
| # T-test | |
| target_values = df[target_col].unique().to_list() | |
| if len(target_values) != 2: | |
| continue | |
| group1 = df.filter(pl.col(target_col) == target_values[0])[feature].drop_nulls().to_numpy() | |
| group2 = df.filter(pl.col(target_col) == target_values[1])[feature].drop_nulls().to_numpy() | |
| t_stat, pval = ttest_ind(group1, group2) | |
| test_results.append({ | |
| 'feature': feature, | |
| 'test': 'ttest', | |
| 'statistic': float(t_stat), | |
| 'p_value': float(pval), | |
| 'significant': pval < alpha, | |
| 'mean_diff': float(np.mean(group1) - np.mean(group2)) | |
| }) | |
| elif selected_test == "anova": | |
| # ANOVA | |
| groups = [] | |
| target_values = df[target_col].unique().to_list() | |
| for val in target_values: | |
| group_data = df.filter(pl.col(target_col) == val)[feature].drop_nulls().to_numpy() | |
| if len(group_data) > 0: | |
| groups.append(group_data) | |
| if len(groups) > 1: | |
| f_stat, pval = f_oneway(*groups) | |
| test_results.append({ | |
| 'feature': feature, | |
| 'test': 'anova', | |
| 'statistic': float(f_stat), | |
| 'p_value': float(pval), | |
| 'significant': pval < alpha, | |
| 'n_groups': len(groups) | |
| }) | |
| except Exception as e: | |
| print(f"⚠️ Test failed for {feature}: {str(e)}") | |
| # Summary | |
| significant_features = [r for r in test_results if r['significant']] | |
| print(f"✅ {len(significant_features)}/{len(test_results)} features are statistically significant (α={alpha})") | |
| return { | |
| 'status': 'success', | |
| 'target_column': target_col, | |
| 'alpha': alpha, | |
| 'total_tests': len(test_results), | |
| 'significant_features': len(significant_features), | |
| 'test_results': test_results, | |
| 'significant_features_list': [r['feature'] for r in significant_features] | |
| } | |
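
# Example usage of perform_statistical_tests (a minimal sketch; the file path
# and target column below are hypothetical placeholders):
#
#   tests = perform_statistical_tests(
#       "data/train.csv",
#       target_col="churn",
#       test_type="auto",
#       alpha=0.05,
#   )
#   print(tests["significant_features_list"])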