# insightgenai/modules/eda_engine.py
"""
EDA Engine Module - InsightGenAI
================================
Performs comprehensive Exploratory Data Analysis including
summary statistics, correlation analysis, distribution analysis,
and outlier detection.
Author: InsightGenAI Team
Version: 1.0.0
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Tuple, Optional, Any
import streamlit as st
from scipy import stats
import warnings
warnings.filterwarnings('ignore')
class EDAEngine:
    """
    Perform comprehensive Exploratory Data Analysis on a tabular dataset.

    Attributes:
        df (pd.DataFrame): Working copy of the dataset to analyze.
        column_types (Dict[str, str]): Mapping of column name -> declared type.
        numeric_cols (List[str]): Columns declared 'numeric'.
        categorical_cols (List[str]): Columns declared 'categorical'.
        text_cols (List[str]): Columns declared 'text'.
        datetime_cols (List[str]): Columns declared 'datetime'.
    """

    def __init__(self, df: pd.DataFrame, column_types: Dict[str, str]):
        """
        Initialize the EDA Engine.

        Args:
            df: The dataset to analyze. A copy is taken so the caller's
                frame is never mutated.
            column_types: Dictionary mapping column names to one of
                'numeric', 'categorical', 'text' or 'datetime'.
        """
        self.df = df.copy()
        self.column_types = column_types
        self.numeric_cols = [col for col, t in column_types.items() if t == 'numeric']
        self.categorical_cols = [col for col, t in column_types.items() if t == 'categorical']
        self.text_cols = [col for col, t in column_types.items() if t == 'text']
        self.datetime_cols = [col for col, t in column_types.items() if t == 'datetime']
        # Set style for matplotlib. The 'seaborn-v0_8-*' style names only
        # exist in matplotlib >= 3.6; fall back to the default style instead
        # of crashing engine construction on older installs.
        try:
            plt.style.use('seaborn-v0_8-darkgrid')
        except OSError:
            pass
        sns.set_palette("husl")

    def get_summary_statistics(self) -> Dict[str, pd.DataFrame]:
        """
        Generate summary statistics for all columns.

        Returns:
            Dict with up to three keys ('numeric', 'categorical', 'text'),
            each holding a DataFrame of per-column statistics. Families
            with no columns are omitted.
        """
        stats_dict: Dict[str, pd.DataFrame] = {}

        # Numeric columns: describe() plus higher-moment statistics.
        if self.numeric_cols:
            numeric = self.df[self.numeric_cols]
            numeric_stats = numeric.describe()
            numeric_stats.loc['skewness'] = numeric.skew()
            numeric_stats.loc['kurtosis'] = numeric.kurtosis()
            numeric_stats.loc['variance'] = numeric.var()
            numeric_stats.loc['range'] = numeric.max() - numeric.min()
            stats_dict['numeric'] = numeric_stats

        # Categorical columns: cardinality / mode / missing counts.
        if self.categorical_cols:
            cat_rows = {}
            for col in self.categorical_cols:
                modes = self.df[col].mode()            # computed once per column
                counts = self.df[col].value_counts()   # computed once per column
                cat_rows[col] = {
                    'unique_count': self.df[col].nunique(),
                    'most_frequent': modes[0] if not modes.empty else 'N/A',
                    'most_frequent_count': counts.iloc[0] if not counts.empty else 0,
                    'missing': self.df[col].isnull().sum()
                }
            stats_dict['categorical'] = pd.DataFrame(cat_rows).T

        # Text columns: cardinality and string-length statistics.
        if self.text_cols:
            text_rows = {}
            for col in self.text_cols:
                lengths = self.df[col].dropna().astype(str).str.len()
                text_rows[col] = {
                    'unique_count': self.df[col].nunique(),
                    'avg_length': lengths.mean(),
                    'max_length': lengths.max(),
                    'min_length': lengths.min()
                }
            stats_dict['text'] = pd.DataFrame(text_rows).T

        return stats_dict

    def get_correlation_matrix(self) -> Optional[pd.DataFrame]:
        """
        Calculate the correlation matrix for numeric columns.

        Returns:
            pd.DataFrame: Correlation matrix, or None if fewer than two
            numeric columns are available.
        """
        if len(self.numeric_cols) < 2:
            return None
        return self.df[self.numeric_cols].corr()

    def plot_correlation_matrix(self, figsize: Tuple[int, int] = (10, 8)) -> Optional["plt.Figure"]:
        """
        Create a lower-triangle correlation heatmap.

        Args:
            figsize: Figure size tuple (width, height in inches).

        Returns:
            matplotlib Figure object, or None when no correlation matrix
            can be computed (fewer than two numeric columns).
        """
        corr_matrix = self.get_correlation_matrix()
        if corr_matrix is None:
            return None
        fig, ax = plt.subplots(figsize=figsize)
        # Mask the upper triangle (incl. diagonal) so each pair shows once.
        mask = np.triu(np.ones_like(corr_matrix, dtype=bool))
        sns.heatmap(corr_matrix, mask=mask, annot=True, fmt='.2f',
                    cmap='RdBu_r', center=0, ax=ax,
                    square=True, linewidths=0.5)
        ax.set_title('Correlation Matrix', fontsize=14, fontweight='bold')
        plt.tight_layout()
        return fig

    def plot_distribution(self, column: str, figsize: Tuple[int, int] = (10, 6)) -> "plt.Figure":
        """
        Plot the distribution of a numeric column as a histogram (with KDE)
        next to a box plot.

        Args:
            column: Column name to plot.
            figsize: Figure size tuple.

        Returns:
            matplotlib Figure object.
        """
        fig, axes = plt.subplots(1, 2, figsize=figsize)
        # Histogram with kernel density estimate overlay.
        sns.histplot(self.df[column].dropna(), kde=True, ax=axes[0], color='steelblue')
        axes[0].set_title(f'Distribution of {column}', fontweight='bold')
        axes[0].set_xlabel(column)
        axes[0].set_ylabel('Frequency')
        # Box plot of the same data for a quartile/outlier view.
        sns.boxplot(y=self.df[column].dropna(), ax=axes[1], color='lightblue')
        axes[1].set_title(f'Box Plot of {column}', fontweight='bold')
        axes[1].set_ylabel(column)
        plt.tight_layout()
        return fig

    def plot_target_distribution(self, target_col: str, figsize: Tuple[int, int] = (10, 6)) -> "plt.Figure":
        """
        Plot the distribution of the target variable.

        Numeric targets get a histogram + KDE; any other declared type is
        treated as a classification target and plotted as a bar chart of
        class counts.

        Args:
            target_col: Target column name.
            figsize: Figure size tuple.

        Returns:
            matplotlib Figure object.
        """
        fig, ax = plt.subplots(figsize=figsize)
        if self.column_types.get(target_col) == 'numeric':
            # Regression target: continuous distribution.
            sns.histplot(self.df[target_col].dropna(), kde=True, ax=ax, color='steelblue')
            ax.set_title(f'Target Distribution: {target_col}', fontsize=14, fontweight='bold')
            ax.set_xlabel(target_col)
            ax.set_ylabel('Frequency')
        else:
            # Classification target: one bar per class.
            value_counts = self.df[target_col].value_counts()
            colors = sns.color_palette("husl", len(value_counts))
            value_counts.plot(kind='bar', ax=ax, color=colors)
            ax.set_title(f'Target Distribution: {target_col}', fontsize=14, fontweight='bold')
            ax.set_xlabel(target_col)
            ax.set_ylabel('Count')
            ax.tick_params(axis='x', rotation=45)
        plt.tight_layout()
        return fig

    def detect_outliers(self, method: str = 'iqr') -> Dict[str, Dict]:
        """
        Detect outliers in numeric columns.

        Args:
            method: Outlier detection method: 'iqr' (Tukey's fences at
                1.5 * IQR) or 'zscore' (|z| > 3). Any other value yields
                zero outliers for every column.

        Returns:
            Dict keyed by column name with 'total_values', 'outlier_count',
            'outlier_percentage' and 'outlier_indices'.
        """
        outliers_dict: Dict[str, Dict] = {}
        for col in self.numeric_cols:
            col_data = self.df[col].dropna()
            outliers_info = {
                'total_values': len(col_data),
                'outlier_count': 0,
                'outlier_percentage': 0,
                'outlier_indices': []
            }
            outlier_mask = None
            if method == 'iqr':
                # Tukey's fences: anything beyond 1.5 * IQR from the quartiles.
                q1 = col_data.quantile(0.25)
                q3 = col_data.quantile(0.75)
                iqr = q3 - q1
                outlier_mask = (col_data < q1 - 1.5 * iqr) | (col_data > q3 + 1.5 * iqr)
            elif method == 'zscore':
                # Standard-score rule: |z| > 3 (population z-score, ddof=0).
                outlier_mask = np.abs(stats.zscore(col_data)) > 3
            if outlier_mask is not None:
                outliers_info['outlier_indices'] = col_data[outlier_mask].index.tolist()
                outliers_info['outlier_count'] = int(outlier_mask.sum())
            # Guard the division: an all-NaN column has zero non-null values,
            # which previously produced NaN / ZeroDivisionError here.
            if outliers_info['total_values'] > 0:
                outliers_info['outlier_percentage'] = (
                    outliers_info['outlier_count'] / outliers_info['total_values']
                ) * 100
            outliers_dict[col] = outliers_info
        return outliers_dict

    def plot_outliers(self, columns: Optional[List[str]] = None,
                      figsize: Tuple[int, int] = (12, 8)) -> Optional["plt.Figure"]:
        """
        Create box plots to visualize outliers.

        Args:
            columns: Columns to plot; defaults to the first 6 numeric columns.
            figsize: Figure size tuple.

        Returns:
            matplotlib Figure object, or None when there is nothing to plot.
        """
        cols_to_plot = columns if columns else self.numeric_cols[:6]  # cap default at 6
        if not cols_to_plot:
            return None
        n_cols = min(3, len(cols_to_plot))
        n_rows = (len(cols_to_plot) + n_cols - 1) // n_cols  # ceiling division
        # squeeze=False keeps `axes` a 2-D array regardless of grid shape,
        # so the single-subplot case needs no special-casing before flatten().
        fig, axes = plt.subplots(n_rows, n_cols, figsize=figsize, squeeze=False)
        axes = axes.flatten()
        for i, col in enumerate(cols_to_plot):
            sns.boxplot(y=self.df[col].dropna(), ax=axes[i], color='lightcoral')
            axes[i].set_title(f'{col}', fontweight='bold')
            axes[i].set_ylabel('')
        # Hide unused subplots in the grid's final row.
        for i in range(len(cols_to_plot), len(axes)):
            axes[i].set_visible(False)
        plt.suptitle('Outlier Detection - Box Plots', fontsize=16, fontweight='bold')
        plt.tight_layout()
        return fig

    def get_feature_importance_preliminary(self, target_col: str) -> Optional[pd.DataFrame]:
        """
        Calculate preliminary feature importance scores against the target.

        Numeric targets use absolute Pearson correlation; any other target
        type uses the ANOVA F-statistic of each numeric feature.

        Args:
            target_col: Target column name.

        Returns:
            pd.DataFrame with 'feature', 'importance' and 'method' columns
            sorted by descending importance, or None if no score could be
            computed.
        """
        importance_scores = []
        target_type = self.column_types.get(target_col)
        if target_type == 'numeric':
            # |correlation| with the target for every other numeric feature.
            for col in self.numeric_cols:
                if col != target_col:
                    corr = self.df[col].corr(self.df[target_col])
                    importance_scores.append({
                        'feature': col,
                        'importance': abs(corr) if not pd.isna(corr) else 0,
                        'method': 'correlation'
                    })
        else:
            # Classification target: ANOVA F-value per numeric feature.
            # Imported lazily so the module loads without scikit-learn until
            # this path is actually exercised.
            from sklearn.feature_selection import f_classif
            numeric_features = [col for col in self.numeric_cols if col != target_col]
            if numeric_features:
                # Mean-impute feature NaNs; drop rows with a missing target.
                X = self.df[numeric_features].fillna(self.df[numeric_features].mean())
                y = self.df[target_col]
                mask = y.notna()
                X = X[mask]
                y = y[mask]
                if len(X) > 0:
                    f_scores, p_values = f_classif(X, y)
                    for i, col in enumerate(numeric_features):
                        importance_scores.append({
                            'feature': col,
                            'importance': f_scores[i] if not pd.isna(f_scores[i]) else 0,
                            'method': 'f_classif'
                        })
        if importance_scores:
            importance_df = pd.DataFrame(importance_scores)
            return importance_df.sort_values('importance', ascending=False)
        return None

    def generate_insights(self, target_col: Optional[str] = None) -> Dict[str, Any]:
        """
        Generate automated insights about the dataset.

        Args:
            target_col: Optional target column for targeted insights.

        Returns:
            Dict with shape/missingness/duplication counts, highly
            correlated pairs, highly skewed features and (when a target
            column is given) target-specific statistics.
        """
        total_missing = self.df.isnull().sum().sum()
        n_cells = self.df.shape[0] * self.df.shape[1]
        insights = {
            'dataset_shape': self.df.shape,
            'total_missing': total_missing,
            # Guard the division: an empty frame has zero cells.
            'missing_percentage': (total_missing / n_cells) * 100 if n_cells else 0.0,
            'duplicate_rows': self.df.duplicated().sum(),
            'numeric_columns': len(self.numeric_cols),
            'categorical_columns': len(self.categorical_cols),
            'text_columns': len(self.text_cols)
        }
        # Pairs of numeric features with |r| > 0.8 — likely redundancy.
        if len(self.numeric_cols) >= 2:
            corr_matrix = self.get_correlation_matrix()
            high_corr_pairs = []
            for i in range(len(corr_matrix.columns)):
                for j in range(i + 1, len(corr_matrix.columns)):
                    corr_val = corr_matrix.iloc[i, j]
                    if abs(corr_val) > 0.8:
                        high_corr_pairs.append({
                            'feature1': corr_matrix.columns[i],
                            'feature2': corr_matrix.columns[j],
                            'correlation': corr_val
                        })
            insights['high_correlation_pairs'] = high_corr_pairs
        # Features with |skewness| > 2 — candidates for transformation.
        if self.numeric_cols:
            skewed_features = []
            for col in self.numeric_cols:
                skewness = self.df[col].skew()
                if abs(skewness) > 2:
                    skewed_features.append({'feature': col, 'skewness': skewness})
            insights['highly_skewed_features'] = skewed_features
        # Target-specific insights.
        if target_col and target_col in self.df.columns:
            target_type = self.column_types.get(target_col)
            if target_type == 'numeric':
                insights['target_stats'] = {
                    'mean': self.df[target_col].mean(),
                    'std': self.df[target_col].std(),
                    'min': self.df[target_col].min(),
                    'max': self.df[target_col].max()
                }
            else:
                class_balance = self.df[target_col].value_counts(normalize=True)
                insights['class_balance'] = class_balance.to_dict()
                # Flag as imbalanced when one class holds > 70% of rows.
                insights['is_imbalanced'] = (class_balance.max() > 0.7)
        return insights
# Streamlit display functions
def display_eda_summary(eda: EDAEngine):
    """Render the summary-statistics tables produced by the EDA engine."""
    st.subheader("πŸ“Š Summary Statistics")
    summary = eda.get_summary_statistics()
    # One collapsible table per column family; families absent from the
    # summary dict are simply skipped.
    sections = [
        ('numeric', "Numeric Columns"),
        ('categorical', "Categorical Columns"),
        ('text', "Text Columns"),
    ]
    for key, label in sections:
        if key in summary:
            with st.expander(label):
                st.dataframe(summary[key], use_container_width=True)
def display_correlation_analysis(eda: EDAEngine):
    """Render the correlation heatmap plus the raw matrix table."""
    st.subheader("πŸ”— Correlation Analysis")
    matrix = eda.get_correlation_matrix()
    if matrix is None:
        # Fewer than two numeric columns: nothing to correlate.
        st.info("Need at least 2 numeric columns for correlation analysis.")
        return
    st.pyplot(eda.plot_correlation_matrix())
    # The exact numbers behind the heatmap, tucked into an expander.
    with st.expander("View Correlation Matrix"):
        st.dataframe(matrix, use_container_width=True)
def display_outlier_analysis(eda: EDAEngine):
    """Render the per-column outlier summary table and box plots."""
    st.subheader("πŸ“ˆ Outlier Analysis")
    outliers = eda.detect_outliers()
    if not outliers:
        # detect_outliers returns {} when there are no numeric columns.
        st.info("No numeric columns available for outlier analysis.")
        return
    # Flatten the per-column dicts into one display row each.
    summary_rows = [
        {
            'Column': name,
            'Outlier Count': details['outlier_count'],
            'Outlier %': f"{details['outlier_percentage']:.2f}%",
        }
        for name, details in outliers.items()
    ]
    st.dataframe(pd.DataFrame(summary_rows), use_container_width=True)
    fig = eda.plot_outliers()
    if fig:
        st.pyplot(fig)