Spaces:
Sleeping
Sleeping
| """ | |
| EDA Engine Module - InsightGenAI | |
| ================================ | |
| Performs comprehensive Exploratory Data Analysis including | |
| summary statistics, correlation analysis, distribution analysis, | |
| and outlier detection. | |
| Author: InsightGenAI Team | |
| Version: 1.0.0 | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| from typing import Dict, List, Tuple, Optional, Any | |
| import streamlit as st | |
| from scipy import stats | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
| class EDAEngine: | |
| """ | |
| A class to perform comprehensive Exploratory Data Analysis. | |
| Attributes: | |
| df (pd.DataFrame): The dataset to analyze | |
| numeric_cols (List): List of numeric column names | |
| categorical_cols (List): List of categorical column names | |
| text_cols (List): List of text column names | |
| """ | |
| def __init__(self, df: pd.DataFrame, column_types: Dict[str, str]): | |
| """ | |
| Initialize the EDA Engine. | |
| Args: | |
| df: The dataset to analyze | |
| column_types: Dictionary mapping columns to their types | |
| """ | |
| self.df = df.copy() | |
| self.column_types = column_types | |
| self.numeric_cols = [col for col, t in column_types.items() if t == 'numeric'] | |
| self.categorical_cols = [col for col, t in column_types.items() if t == 'categorical'] | |
| self.text_cols = [col for col, t in column_types.items() if t == 'text'] | |
| self.datetime_cols = [col for col, t in column_types.items() if t == 'datetime'] | |
| # Set style for matplotlib | |
| plt.style.use('seaborn-v0_8-darkgrid') | |
| sns.set_palette("husl") | |
| def get_summary_statistics(self) -> Dict[str, pd.DataFrame]: | |
| """ | |
| Generate summary statistics for all columns. | |
| Returns: | |
| Dict containing statistics for different column types | |
| """ | |
| stats_dict = {} | |
| # Numeric columns statistics | |
| if self.numeric_cols: | |
| numeric_stats = self.df[self.numeric_cols].describe() | |
| # Add additional statistics | |
| numeric_stats.loc['skewness'] = self.df[self.numeric_cols].skew() | |
| numeric_stats.loc['kurtosis'] = self.df[self.numeric_cols].kurtosis() | |
| numeric_stats.loc['variance'] = self.df[self.numeric_cols].var() | |
| numeric_stats.loc['range'] = self.df[self.numeric_cols].max() - self.df[self.numeric_cols].min() | |
| stats_dict['numeric'] = numeric_stats | |
| # Categorical columns statistics | |
| if self.categorical_cols: | |
| cat_stats = pd.DataFrame({ | |
| col: { | |
| 'unique_count': self.df[col].nunique(), | |
| 'most_frequent': self.df[col].mode()[0] if not self.df[col].mode().empty else 'N/A', | |
| 'most_frequent_count': self.df[col].value_counts().iloc[0] if not self.df[col].value_counts().empty else 0, | |
| 'missing': self.df[col].isnull().sum() | |
| } | |
| for col in self.categorical_cols | |
| }).T | |
| stats_dict['categorical'] = cat_stats | |
| # Text columns statistics | |
| if self.text_cols: | |
| text_stats = pd.DataFrame({ | |
| col: { | |
| 'unique_count': self.df[col].nunique(), | |
| 'avg_length': self.df[col].dropna().astype(str).str.len().mean(), | |
| 'max_length': self.df[col].dropna().astype(str).str.len().max(), | |
| 'min_length': self.df[col].dropna().astype(str).str.len().min() | |
| } | |
| for col in self.text_cols | |
| }).T | |
| stats_dict['text'] = text_stats | |
| return stats_dict | |
| def get_correlation_matrix(self) -> Optional[pd.DataFrame]: | |
| """ | |
| Calculate correlation matrix for numeric columns. | |
| Returns: | |
| pd.DataFrame: Correlation matrix or None if no numeric columns | |
| """ | |
| if len(self.numeric_cols) < 2: | |
| return None | |
| return self.df[self.numeric_cols].corr() | |
| def plot_correlation_matrix(self, figsize: Tuple[int, int] = (10, 8)) -> plt.Figure: | |
| """ | |
| Create a correlation matrix heatmap. | |
| Args: | |
| figsize: Figure size tuple | |
| Returns: | |
| matplotlib Figure object | |
| """ | |
| corr_matrix = self.get_correlation_matrix() | |
| if corr_matrix is None: | |
| return None | |
| fig, ax = plt.subplots(figsize=figsize) | |
| mask = np.triu(np.ones_like(corr_matrix, dtype=bool)) | |
| sns.heatmap(corr_matrix, mask=mask, annot=True, fmt='.2f', | |
| cmap='RdBu_r', center=0, ax=ax, | |
| square=True, linewidths=0.5) | |
| ax.set_title('Correlation Matrix', fontsize=14, fontweight='bold') | |
| plt.tight_layout() | |
| return fig | |
| def plot_distribution(self, column: str, figsize: Tuple[int, int] = (10, 6)) -> plt.Figure: | |
| """ | |
| Plot distribution of a numeric column. | |
| Args: | |
| column: Column name to plot | |
| figsize: Figure size tuple | |
| Returns: | |
| matplotlib Figure object | |
| """ | |
| fig, axes = plt.subplots(1, 2, figsize=figsize) | |
| # Histogram with KDE | |
| sns.histplot(self.df[column].dropna(), kde=True, ax=axes[0], color='steelblue') | |
| axes[0].set_title(f'Distribution of {column}', fontweight='bold') | |
| axes[0].set_xlabel(column) | |
| axes[0].set_ylabel('Frequency') | |
| # Box plot | |
| sns.boxplot(y=self.df[column].dropna(), ax=axes[1], color='lightblue') | |
| axes[1].set_title(f'Box Plot of {column}', fontweight='bold') | |
| axes[1].set_ylabel(column) | |
| plt.tight_layout() | |
| return fig | |
| def plot_target_distribution(self, target_col: str, figsize: Tuple[int, int] = (10, 6)) -> plt.Figure: | |
| """ | |
| Plot distribution of the target variable. | |
| Args: | |
| target_col: Target column name | |
| figsize: Figure size tuple | |
| Returns: | |
| matplotlib Figure object | |
| """ | |
| fig, ax = plt.subplots(figsize=figsize) | |
| if self.column_types.get(target_col) == 'numeric': | |
| # For regression target | |
| sns.histplot(self.df[target_col].dropna(), kde=True, ax=ax, color='steelblue') | |
| ax.set_title(f'Target Distribution: {target_col}', fontsize=14, fontweight='bold') | |
| ax.set_xlabel(target_col) | |
| ax.set_ylabel('Frequency') | |
| else: | |
| # For classification target | |
| value_counts = self.df[target_col].value_counts() | |
| colors = sns.color_palette("husl", len(value_counts)) | |
| value_counts.plot(kind='bar', ax=ax, color=colors) | |
| ax.set_title(f'Target Distribution: {target_col}', fontsize=14, fontweight='bold') | |
| ax.set_xlabel(target_col) | |
| ax.set_ylabel('Count') | |
| ax.tick_params(axis='x', rotation=45) | |
| plt.tight_layout() | |
| return fig | |
| def detect_outliers(self, method: str = 'iqr') -> Dict[str, Dict]: | |
| """ | |
| Detect outliers in numeric columns. | |
| Args: | |
| method: Outlier detection method ('iqr' or 'zscore') | |
| Returns: | |
| Dict with outlier information for each column | |
| """ | |
| outliers_dict = {} | |
| for col in self.numeric_cols: | |
| col_data = self.df[col].dropna() | |
| outliers_info = { | |
| 'total_values': len(col_data), | |
| 'outlier_count': 0, | |
| 'outlier_percentage': 0, | |
| 'outlier_indices': [] | |
| } | |
| if method == 'iqr': | |
| Q1 = col_data.quantile(0.25) | |
| Q3 = col_data.quantile(0.75) | |
| IQR = Q3 - Q1 | |
| lower_bound = Q1 - 1.5 * IQR | |
| upper_bound = Q3 + 1.5 * IQR | |
| outlier_mask = (col_data < lower_bound) | (col_data > upper_bound) | |
| outliers_info['outlier_indices'] = col_data[outlier_mask].index.tolist() | |
| outliers_info['outlier_count'] = outlier_mask.sum() | |
| elif method == 'zscore': | |
| z_scores = np.abs(stats.zscore(col_data)) | |
| outlier_mask = z_scores > 3 | |
| outliers_info['outlier_indices'] = col_data[outlier_mask].index.tolist() | |
| outliers_info['outlier_count'] = outlier_mask.sum() | |
| outliers_info['outlier_percentage'] = (outliers_info['outlier_count'] / outliers_info['total_values']) * 100 | |
| outliers_dict[col] = outliers_info | |
| return outliers_dict | |
| def plot_outliers(self, columns: Optional[List[str]] = None, | |
| figsize: Tuple[int, int] = (12, 8)) -> plt.Figure: | |
| """ | |
| Create box plots to visualize outliers. | |
| Args: | |
| columns: List of columns to plot (default: all numeric) | |
| figsize: Figure size tuple | |
| Returns: | |
| matplotlib Figure object | |
| """ | |
| cols_to_plot = columns if columns else self.numeric_cols[:6] # Limit to 6 columns | |
| if not cols_to_plot: | |
| return None | |
| n_cols = min(3, len(cols_to_plot)) | |
| n_rows = (len(cols_to_plot) + n_cols - 1) // n_cols | |
| fig, axes = plt.subplots(n_rows, n_cols, figsize=figsize) | |
| if n_rows == 1 and n_cols == 1: | |
| axes = np.array([axes]) | |
| axes = axes.flatten() | |
| for i, col in enumerate(cols_to_plot): | |
| sns.boxplot(y=self.df[col].dropna(), ax=axes[i], color='lightcoral') | |
| axes[i].set_title(f'{col}', fontweight='bold') | |
| axes[i].set_ylabel('') | |
| # Hide unused subplots | |
| for i in range(len(cols_to_plot), len(axes)): | |
| axes[i].set_visible(False) | |
| plt.suptitle('Outlier Detection - Box Plots', fontsize=16, fontweight='bold') | |
| plt.tight_layout() | |
| return fig | |
| def get_feature_importance_preliminary(self, target_col: str) -> Optional[pd.DataFrame]: | |
| """ | |
| Calculate preliminary feature importance using correlation (for numeric features) | |
| and mutual information (for categorical features). | |
| Args: | |
| target_col: Target column name | |
| Returns: | |
| pd.DataFrame with feature importance scores | |
| """ | |
| importance_scores = [] | |
| target_type = self.column_types.get(target_col) | |
| # Correlation with target for numeric features | |
| if target_type == 'numeric': | |
| for col in self.numeric_cols: | |
| if col != target_col: | |
| corr = self.df[col].corr(self.df[target_col]) | |
| importance_scores.append({ | |
| 'feature': col, | |
| 'importance': abs(corr) if not pd.isna(corr) else 0, | |
| 'method': 'correlation' | |
| }) | |
| # For categorical target, use ANOVA F-value | |
| else: | |
| from sklearn.feature_selection import f_classif | |
| numeric_features = [col for col in self.numeric_cols if col != target_col] | |
| if numeric_features: | |
| X = self.df[numeric_features].fillna(self.df[numeric_features].mean()) | |
| y = self.df[target_col] | |
| # Remove rows where target is missing | |
| mask = y.notna() | |
| X = X[mask] | |
| y = y[mask] | |
| if len(X) > 0: | |
| f_scores, p_values = f_classif(X, y) | |
| for i, col in enumerate(numeric_features): | |
| importance_scores.append({ | |
| 'feature': col, | |
| 'importance': f_scores[i] if not pd.isna(f_scores[i]) else 0, | |
| 'method': 'f_classif' | |
| }) | |
| if importance_scores: | |
| importance_df = pd.DataFrame(importance_scores) | |
| importance_df = importance_df.sort_values('importance', ascending=False) | |
| return importance_df | |
| return None | |
| def generate_insights(self, target_col: Optional[str] = None) -> Dict[str, Any]: | |
| """ | |
| Generate automated insights about the dataset. | |
| Args: | |
| target_col: Optional target column for targeted insights | |
| Returns: | |
| Dict containing various insights | |
| """ | |
| insights = { | |
| 'dataset_shape': self.df.shape, | |
| 'total_missing': self.df.isnull().sum().sum(), | |
| 'missing_percentage': (self.df.isnull().sum().sum() / (self.df.shape[0] * self.df.shape[1])) * 100, | |
| 'duplicate_rows': self.df.duplicated().sum(), | |
| 'numeric_columns': len(self.numeric_cols), | |
| 'categorical_columns': len(self.categorical_cols), | |
| 'text_columns': len(self.text_cols) | |
| } | |
| # High correlation pairs | |
| if len(self.numeric_cols) >= 2: | |
| corr_matrix = self.get_correlation_matrix() | |
| high_corr_pairs = [] | |
| for i in range(len(corr_matrix.columns)): | |
| for j in range(i+1, len(corr_matrix.columns)): | |
| corr_val = corr_matrix.iloc[i, j] | |
| if abs(corr_val) > 0.8: | |
| high_corr_pairs.append({ | |
| 'feature1': corr_matrix.columns[i], | |
| 'feature2': corr_matrix.columns[j], | |
| 'correlation': corr_val | |
| }) | |
| insights['high_correlation_pairs'] = high_corr_pairs | |
| # Skewed features | |
| if self.numeric_cols: | |
| skewed_features = [] | |
| for col in self.numeric_cols: | |
| skewness = self.df[col].skew() | |
| if abs(skewness) > 2: | |
| skewed_features.append({'feature': col, 'skewness': skewness}) | |
| insights['highly_skewed_features'] = skewed_features | |
| # Target-specific insights | |
| if target_col and target_col in self.df.columns: | |
| target_type = self.column_types.get(target_col) | |
| if target_type == 'numeric': | |
| insights['target_stats'] = { | |
| 'mean': self.df[target_col].mean(), | |
| 'std': self.df[target_col].std(), | |
| 'min': self.df[target_col].min(), | |
| 'max': self.df[target_col].max() | |
| } | |
| else: | |
| class_balance = self.df[target_col].value_counts(normalize=True) | |
| insights['class_balance'] = class_balance.to_dict() | |
| insights['is_imbalanced'] = (class_balance.max() > 0.7) | |
| return insights | |
| # Streamlit display functions | |
def display_eda_summary(eda: EDAEngine):
    """Render the EDA summary-statistics tables in Streamlit, one expander per column type."""
    st.subheader("π Summary Statistics")
    summary = eda.get_summary_statistics()
    # (stats key, expander label) for each column-type section.
    sections = (
        ('numeric', "Numeric Columns"),
        ('categorical', "Categorical Columns"),
        ('text', "Text Columns"),
    )
    for key, label in sections:
        if key in summary:
            with st.expander(label):
                st.dataframe(summary[key], use_container_width=True)
def display_correlation_analysis(eda: EDAEngine):
    """Render the correlation heatmap plus the raw matrix table in Streamlit."""
    st.subheader("π Correlation Analysis")
    corr_matrix = eda.get_correlation_matrix()
    # Guard clause: correlation is undefined with fewer than two numeric columns.
    if corr_matrix is None:
        st.info("Need at least 2 numeric columns for correlation analysis.")
        return
    st.pyplot(eda.plot_correlation_matrix())
    # Raw numbers behind the heatmap, tucked into an expander.
    with st.expander("View Correlation Matrix"):
        st.dataframe(corr_matrix, use_container_width=True)
def display_outlier_analysis(eda: EDAEngine):
    """Render the per-column outlier summary table and box plots in Streamlit."""
    st.subheader("π Outlier Analysis")
    outliers = eda.detect_outliers()
    # Guard clause: nothing to show without numeric columns.
    if not outliers:
        st.info("No numeric columns available for outlier analysis.")
        return
    # One summary row per column: count and percentage of outliers.
    summary_rows = [
        {
            'Column': col,
            'Outlier Count': info['outlier_count'],
            'Outlier %': f"{info['outlier_percentage']:.2f}%",
        }
        for col, info in outliers.items()
    ]
    st.dataframe(pd.DataFrame(summary_rows), use_container_width=True)
    # Box plots visualizing the same outliers.
    fig = eda.plot_outliers()
    if fig:
        st.pyplot(fig)