""" EDA Engine Module - InsightGenAI ================================ Performs comprehensive Exploratory Data Analysis including summary statistics, correlation analysis, distribution analysis, and outlier detection. Author: InsightGenAI Team Version: 1.0.0 """ import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns from typing import Dict, List, Tuple, Optional, Any import streamlit as st from scipy import stats import warnings warnings.filterwarnings('ignore') class EDAEngine: """ A class to perform comprehensive Exploratory Data Analysis. Attributes: df (pd.DataFrame): The dataset to analyze numeric_cols (List): List of numeric column names categorical_cols (List): List of categorical column names text_cols (List): List of text column names """ def __init__(self, df: pd.DataFrame, column_types: Dict[str, str]): """ Initialize the EDA Engine. Args: df: The dataset to analyze column_types: Dictionary mapping columns to their types """ self.df = df.copy() self.column_types = column_types self.numeric_cols = [col for col, t in column_types.items() if t == 'numeric'] self.categorical_cols = [col for col, t in column_types.items() if t == 'categorical'] self.text_cols = [col for col, t in column_types.items() if t == 'text'] self.datetime_cols = [col for col, t in column_types.items() if t == 'datetime'] # Set style for matplotlib plt.style.use('seaborn-v0_8-darkgrid') sns.set_palette("husl") def get_summary_statistics(self) -> Dict[str, pd.DataFrame]: """ Generate summary statistics for all columns. Returns: Dict containing statistics for different column types """ stats_dict = {} # Numeric columns statistics if self.numeric_cols: numeric_stats = self.df[self.numeric_cols].describe() # Add additional statistics numeric_stats.loc['skewness'] = self.df[self.numeric_cols].skew() numeric_stats.loc['kurtosis'] = self.df[self.numeric_cols].kurtosis() numeric_stats.loc['variance'] = self.df[self.numeric_cols].var() numeric_stats.loc['range'] = self.df[self.numeric_cols].max() - self.df[self.numeric_cols].min() stats_dict['numeric'] = numeric_stats # Categorical columns statistics if self.categorical_cols: cat_stats = pd.DataFrame({ col: { 'unique_count': self.df[col].nunique(), 'most_frequent': self.df[col].mode()[0] if not self.df[col].mode().empty else 'N/A', 'most_frequent_count': self.df[col].value_counts().iloc[0] if not self.df[col].value_counts().empty else 0, 'missing': self.df[col].isnull().sum() } for col in self.categorical_cols }).T stats_dict['categorical'] = cat_stats # Text columns statistics if self.text_cols: text_stats = pd.DataFrame({ col: { 'unique_count': self.df[col].nunique(), 'avg_length': self.df[col].dropna().astype(str).str.len().mean(), 'max_length': self.df[col].dropna().astype(str).str.len().max(), 'min_length': self.df[col].dropna().astype(str).str.len().min() } for col in self.text_cols }).T stats_dict['text'] = text_stats return stats_dict def get_correlation_matrix(self) -> Optional[pd.DataFrame]: """ Calculate correlation matrix for numeric columns. Returns: pd.DataFrame: Correlation matrix or None if no numeric columns """ if len(self.numeric_cols) < 2: return None return self.df[self.numeric_cols].corr() def plot_correlation_matrix(self, figsize: Tuple[int, int] = (10, 8)) -> plt.Figure: """ Create a correlation matrix heatmap. Args: figsize: Figure size tuple Returns: matplotlib Figure object """ corr_matrix = self.get_correlation_matrix() if corr_matrix is None: return None fig, ax = plt.subplots(figsize=figsize) mask = np.triu(np.ones_like(corr_matrix, dtype=bool)) sns.heatmap(corr_matrix, mask=mask, annot=True, fmt='.2f', cmap='RdBu_r', center=0, ax=ax, square=True, linewidths=0.5) ax.set_title('Correlation Matrix', fontsize=14, fontweight='bold') plt.tight_layout() return fig def plot_distribution(self, column: str, figsize: Tuple[int, int] = (10, 6)) -> plt.Figure: """ Plot distribution of a numeric column. Args: column: Column name to plot figsize: Figure size tuple Returns: matplotlib Figure object """ fig, axes = plt.subplots(1, 2, figsize=figsize) # Histogram with KDE sns.histplot(self.df[column].dropna(), kde=True, ax=axes[0], color='steelblue') axes[0].set_title(f'Distribution of {column}', fontweight='bold') axes[0].set_xlabel(column) axes[0].set_ylabel('Frequency') # Box plot sns.boxplot(y=self.df[column].dropna(), ax=axes[1], color='lightblue') axes[1].set_title(f'Box Plot of {column}', fontweight='bold') axes[1].set_ylabel(column) plt.tight_layout() return fig def plot_target_distribution(self, target_col: str, figsize: Tuple[int, int] = (10, 6)) -> plt.Figure: """ Plot distribution of the target variable. Args: target_col: Target column name figsize: Figure size tuple Returns: matplotlib Figure object """ fig, ax = plt.subplots(figsize=figsize) if self.column_types.get(target_col) == 'numeric': # For regression target sns.histplot(self.df[target_col].dropna(), kde=True, ax=ax, color='steelblue') ax.set_title(f'Target Distribution: {target_col}', fontsize=14, fontweight='bold') ax.set_xlabel(target_col) ax.set_ylabel('Frequency') else: # For classification target value_counts = self.df[target_col].value_counts() colors = sns.color_palette("husl", len(value_counts)) value_counts.plot(kind='bar', ax=ax, color=colors) ax.set_title(f'Target Distribution: {target_col}', fontsize=14, fontweight='bold') ax.set_xlabel(target_col) ax.set_ylabel('Count') ax.tick_params(axis='x', rotation=45) plt.tight_layout() return fig def detect_outliers(self, method: str = 'iqr') -> Dict[str, Dict]: """ Detect outliers in numeric columns. Args: method: Outlier detection method ('iqr' or 'zscore') Returns: Dict with outlier information for each column """ outliers_dict = {} for col in self.numeric_cols: col_data = self.df[col].dropna() outliers_info = { 'total_values': len(col_data), 'outlier_count': 0, 'outlier_percentage': 0, 'outlier_indices': [] } if method == 'iqr': Q1 = col_data.quantile(0.25) Q3 = col_data.quantile(0.75) IQR = Q3 - Q1 lower_bound = Q1 - 1.5 * IQR upper_bound = Q3 + 1.5 * IQR outlier_mask = (col_data < lower_bound) | (col_data > upper_bound) outliers_info['outlier_indices'] = col_data[outlier_mask].index.tolist() outliers_info['outlier_count'] = outlier_mask.sum() elif method == 'zscore': z_scores = np.abs(stats.zscore(col_data)) outlier_mask = z_scores > 3 outliers_info['outlier_indices'] = col_data[outlier_mask].index.tolist() outliers_info['outlier_count'] = outlier_mask.sum() outliers_info['outlier_percentage'] = (outliers_info['outlier_count'] / outliers_info['total_values']) * 100 outliers_dict[col] = outliers_info return outliers_dict def plot_outliers(self, columns: Optional[List[str]] = None, figsize: Tuple[int, int] = (12, 8)) -> plt.Figure: """ Create box plots to visualize outliers. Args: columns: List of columns to plot (default: all numeric) figsize: Figure size tuple Returns: matplotlib Figure object """ cols_to_plot = columns if columns else self.numeric_cols[:6] # Limit to 6 columns if not cols_to_plot: return None n_cols = min(3, len(cols_to_plot)) n_rows = (len(cols_to_plot) + n_cols - 1) // n_cols fig, axes = plt.subplots(n_rows, n_cols, figsize=figsize) if n_rows == 1 and n_cols == 1: axes = np.array([axes]) axes = axes.flatten() for i, col in enumerate(cols_to_plot): sns.boxplot(y=self.df[col].dropna(), ax=axes[i], color='lightcoral') axes[i].set_title(f'{col}', fontweight='bold') axes[i].set_ylabel('') # Hide unused subplots for i in range(len(cols_to_plot), len(axes)): axes[i].set_visible(False) plt.suptitle('Outlier Detection - Box Plots', fontsize=16, fontweight='bold') plt.tight_layout() return fig def get_feature_importance_preliminary(self, target_col: str) -> Optional[pd.DataFrame]: """ Calculate preliminary feature importance using correlation (for numeric features) and mutual information (for categorical features). Args: target_col: Target column name Returns: pd.DataFrame with feature importance scores """ importance_scores = [] target_type = self.column_types.get(target_col) # Correlation with target for numeric features if target_type == 'numeric': for col in self.numeric_cols: if col != target_col: corr = self.df[col].corr(self.df[target_col]) importance_scores.append({ 'feature': col, 'importance': abs(corr) if not pd.isna(corr) else 0, 'method': 'correlation' }) # For categorical target, use ANOVA F-value else: from sklearn.feature_selection import f_classif numeric_features = [col for col in self.numeric_cols if col != target_col] if numeric_features: X = self.df[numeric_features].fillna(self.df[numeric_features].mean()) y = self.df[target_col] # Remove rows where target is missing mask = y.notna() X = X[mask] y = y[mask] if len(X) > 0: f_scores, p_values = f_classif(X, y) for i, col in enumerate(numeric_features): importance_scores.append({ 'feature': col, 'importance': f_scores[i] if not pd.isna(f_scores[i]) else 0, 'method': 'f_classif' }) if importance_scores: importance_df = pd.DataFrame(importance_scores) importance_df = importance_df.sort_values('importance', ascending=False) return importance_df return None def generate_insights(self, target_col: Optional[str] = None) -> Dict[str, Any]: """ Generate automated insights about the dataset. Args: target_col: Optional target column for targeted insights Returns: Dict containing various insights """ insights = { 'dataset_shape': self.df.shape, 'total_missing': self.df.isnull().sum().sum(), 'missing_percentage': (self.df.isnull().sum().sum() / (self.df.shape[0] * self.df.shape[1])) * 100, 'duplicate_rows': self.df.duplicated().sum(), 'numeric_columns': len(self.numeric_cols), 'categorical_columns': len(self.categorical_cols), 'text_columns': len(self.text_cols) } # High correlation pairs if len(self.numeric_cols) >= 2: corr_matrix = self.get_correlation_matrix() high_corr_pairs = [] for i in range(len(corr_matrix.columns)): for j in range(i+1, len(corr_matrix.columns)): corr_val = corr_matrix.iloc[i, j] if abs(corr_val) > 0.8: high_corr_pairs.append({ 'feature1': corr_matrix.columns[i], 'feature2': corr_matrix.columns[j], 'correlation': corr_val }) insights['high_correlation_pairs'] = high_corr_pairs # Skewed features if self.numeric_cols: skewed_features = [] for col in self.numeric_cols: skewness = self.df[col].skew() if abs(skewness) > 2: skewed_features.append({'feature': col, 'skewness': skewness}) insights['highly_skewed_features'] = skewed_features # Target-specific insights if target_col and target_col in self.df.columns: target_type = self.column_types.get(target_col) if target_type == 'numeric': insights['target_stats'] = { 'mean': self.df[target_col].mean(), 'std': self.df[target_col].std(), 'min': self.df[target_col].min(), 'max': self.df[target_col].max() } else: class_balance = self.df[target_col].value_counts(normalize=True) insights['class_balance'] = class_balance.to_dict() insights['is_imbalanced'] = (class_balance.max() > 0.7) return insights # Streamlit display functions def display_eda_summary(eda: EDAEngine): """Display EDA summary in Streamlit.""" st.subheader("📊 Summary Statistics") stats = eda.get_summary_statistics() if 'numeric' in stats: with st.expander("Numeric Columns"): st.dataframe(stats['numeric'], use_container_width=True) if 'categorical' in stats: with st.expander("Categorical Columns"): st.dataframe(stats['categorical'], use_container_width=True) if 'text' in stats: with st.expander("Text Columns"): st.dataframe(stats['text'], use_container_width=True) def display_correlation_analysis(eda: EDAEngine): """Display correlation analysis in Streamlit.""" st.subheader("🔗 Correlation Analysis") corr_matrix = eda.get_correlation_matrix() if corr_matrix is not None: fig = eda.plot_correlation_matrix() st.pyplot(fig) # Show correlation table with st.expander("View Correlation Matrix"): st.dataframe(corr_matrix, use_container_width=True) else: st.info("Need at least 2 numeric columns for correlation analysis.") def display_outlier_analysis(eda: EDAEngine): """Display outlier analysis in Streamlit.""" st.subheader("📈 Outlier Analysis") outliers = eda.detect_outliers() if outliers: # Summary table outlier_summary = [] for col, info in outliers.items(): outlier_summary.append({ 'Column': col, 'Outlier Count': info['outlier_count'], 'Outlier %': f"{info['outlier_percentage']:.2f}%" }) st.dataframe(pd.DataFrame(outlier_summary), use_container_width=True) # Box plots fig = eda.plot_outliers() if fig: st.pyplot(fig) else: st.info("No numeric columns available for outlier analysis.")