Girinath11's picture
Upload 8 files
4aa0277 verified
"""
Exploratory Data Analysis Agent - Handles comprehensive data analysis
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import warnings
warnings.filterwarnings('ignore')
class EDAAgent:
"""Agent for Exploratory Data Analysis"""
def __init__(self):
self.analysis_results = {}
def analyze_data(self, data, target_column=None):
"""
Comprehensive EDA analysis
Args:
data: Input DataFrame
target_column: Optional target variable for supervised analysis
Returns:
Dictionary containing comprehensive analysis results
"""
analysis = {}
# Basic statistics
analysis['basic_stats'] = self._basic_statistics(data)
# Correlation analysis
analysis['correlations'] = self._correlation_analysis(data)
# Distribution analysis
analysis['distributions'] = self._distribution_analysis(data)
# Feature insights
analysis['feature_insights'] = self._feature_insights(data)
# Target analysis (if target column provided)
if target_column and target_column in data.columns:
analysis['target_analysis'] = self._target_analysis(data, target_column)
# Data quality insights
analysis['data_quality'] = self._data_quality_insights(data)
return {
'status': 'success',
'analysis': analysis,
'visualization_recommendations': self._get_visualization_recommendations(data)
}
def _basic_statistics(self, data):
"""Generate comprehensive statistical summary"""
stats = {}
# Overall info
stats['shape'] = data.shape
stats['dtypes'] = data.dtypes.to_dict()
stats['memory_usage'] = f"{data.memory_usage(deep=True).sum() / 1024**2:.2f} MB"
# Numeric summary
numeric_data = data.select_dtypes(include=[np.number])
if not numeric_data.empty:
desc = numeric_data.describe()
stats['numeric_summary'] = desc.to_dict()
# Additional statistics
stats['numeric_extended'] = {}
for col in numeric_data.columns:
stats['numeric_extended'][col] = {
'variance': numeric_data[col].var(),
'skewness': numeric_data[col].skew(),
'kurtosis': numeric_data[col].kurtosis(),
'coefficient_of_variation': numeric_data[col].std() / numeric_data[col].mean() if numeric_data[col].mean() != 0 else np.inf
}
# Categorical summary
categorical_data = data.select_dtypes(include=['object', 'category'])
if not categorical_data.empty:
stats['categorical_summary'] = {}
for col in categorical_data.columns:
stats['categorical_summary'][col] = {
'unique_count': categorical_data[col].nunique(),
'most_frequent': categorical_data[col].mode().iloc[0] if len(categorical_data[col].mode()) > 0 else None,
'frequency_of_most_frequent': categorical_data[col].value_counts().iloc[0] if len(categorical_data[col]) > 0 else 0
}
# Missing values
stats['missing_values'] = data.isnull().sum().to_dict()
# Unique values count
stats['unique_values'] = {col: data[col].nunique() for col in data.columns}
return stats
def _correlation_analysis(self, data):
"""Analyze correlations between numeric variables"""
numeric_data = data.select_dtypes(include=[np.number])
if len(numeric_data.columns) < 2:
return {'message': 'Not enough numeric columns for correlation analysis'}
# Correlation matrix
corr_matrix = numeric_data.corr()
# Find strong correlations
strong_corr = []
for i in range(len(corr_matrix.columns)):
for j in range(i+1, len(corr_matrix.columns)):
corr_val = corr_matrix.iloc[i, j]
if not np.isnan(corr_val) and abs(corr_val) > 0.7:
strong_corr.append({
'var1': corr_matrix.columns[i],
'var2': corr_matrix.columns[j],
'correlation': corr_val,
'strength': 'very_strong' if abs(corr_val) > 0.9 else 'strong'
})
# Find moderate correlations
moderate_corr = []
for i in range(len(corr_matrix.columns)):
for j in range(i+1, len(corr_matrix.columns)):
corr_val = corr_matrix.iloc[i, j]
if not np.isnan(corr_val) and 0.3 <= abs(corr_val) <= 0.7:
moderate_corr.append({
'var1': corr_matrix.columns[i],
'var2': corr_matrix.columns[j],
'correlation': corr_val
})
return {
'correlation_matrix': corr_matrix.to_dict(),
'strong_correlations': strong_corr,
'moderate_correlations': moderate_corr[:10], # Limit to top 10
'summary': {
'total_pairs': len(corr_matrix.columns) * (len(corr_matrix.columns) - 1) // 2,
'strong_correlations_count': len(strong_corr),
'moderate_correlations_count': len(moderate_corr)
}
}
def _distribution_analysis(self, data):
"""Analyze distributions of all variables"""
distributions = {}
for col in data.columns:
col_info = {'column': col, 'dtype': str(data[col].dtype)}
if data[col].dtype in ['object', 'category']:
# Categorical distribution
value_counts = data[col].value_counts()
col_info.update({
'type': 'categorical',
'unique_count': len(value_counts),
'top_values': value_counts.head(10).to_dict(),
'entropy': stats.entropy(value_counts.values) if len(value_counts) > 1 else 0,
'most_frequent_percentage': (value_counts.iloc[0] / len(data)) * 100 if len(value_counts) > 0 else 0
})
else:
# Numerical distribution
col_data = data[col].dropna()
if len(col_data) > 0:
col_info.update({
'type': 'numerical',
'mean': col_data.mean(),
'median': col_data.median(),
'std': col_data.std(),
'min': col_data.min(),
'max': col_data.max(),
'skewness': col_data.skew(),
'kurtosis': col_data.kurtosis(),
'outliers_iqr': self._count_outliers_iqr(col_data),
'normality_test': self._test_normality(col_data)
})
distributions[col] = col_info
return distributions
def _feature_insights(self, data):
"""Generate feature insights and recommendations"""
insights = []
# Identify potential target variables
for col in data.columns:
unique_count = data[col].nunique()
if unique_count == 2:
insights.append({
'type': 'potential_target',
'feature': col,
'insight': f'{col} is binary - potential target for classification'
})
elif unique_count < 10 and data[col].dtype in ['object', 'string']:
insights.append({
'type': 'low_cardinality',
'feature': col,
'insight': f'{col} has low cardinality ({unique_count}) - good for classification target'
})
# Identify high cardinality categorical features
for col in data.select_dtypes(include=['object']).columns:
unique_count = data[col].nunique()
if unique_count > 50:
insights.append({
'type': 'high_cardinality',
'feature': col,
'insight': f'{col} has high cardinality ({unique_count}) - consider target encoding or grouping'
})
# Identify constant or near-constant features
for col in data.columns:
unique_count = data[col].nunique()
if unique_count == 1:
insights.append({
'type': 'constant_feature',
'feature': col,
'insight': f'{col} is constant - consider removing'
})
elif unique_count / len(data) < 0.01:
insights.append({
'type': 'near_constant',
'feature': col,
'insight': f'{col} is near-constant ({unique_count} unique values) - low information content'
})
# Identify features with many missing values
missing_threshold = 0.5
for col in data.columns:
missing_pct = data[col].isnull().sum() / len(data)
if missing_pct > missing_threshold:
insights.append({
'type': 'high_missing',
'feature': col,
'insight': f'{col} has {missing_pct:.1%} missing values - consider imputation or removal'
})
return insights
def _target_analysis(self, data, target_column):
"""Analyze target variable and its relationships"""
target = data[target_column]
analysis = {}
# Target distribution
if target.dtype in ['object', 'category']:
# Classification target
value_counts = target.value_counts()
analysis['type'] = 'classification'
analysis['classes'] = value_counts.to_dict()
analysis['class_balance'] = {
'balanced': max(value_counts) / min(value_counts) < 3,
'ratio': max(value_counts) / min(value_counts)
}
else:
# Regression target
analysis['type'] = 'regression'
analysis['distribution'] = {
'mean': target.mean(),
'median': target.median(),
'std': target.std(),
'skewness': target.skew(),
'kurtosis': target.kurtosis()
}
# Feature-target relationships
feature_relationships = []
other_features = [col for col in data.columns if col != target_column]
for feature in other_features[:20]: # Limit to first 20 features
if data[feature].dtype in [np.number]:
if analysis['type'] == 'classification':
# ANOVA F-test for numeric feature vs categorical target
try:
groups = [data[data[target_column] == cls][feature].dropna()
for cls in target.unique()]
f_stat, p_val = stats.f_oneway(*groups)
feature_relationships.append({
'feature': feature,
'test': 'ANOVA',
'f_statistic': f_stat,
'p_value': p_val,
'significant': p_val < 0.05
})
except:
pass
else:
# Correlation for numeric feature vs numeric target
corr, p_val = stats.pearsonr(data[feature].dropna(),
target[data[feature].notna()])
feature_relationships.append({
'feature': feature,
'test': 'Correlation',
'correlation': corr,
'p_value': p_val,
'significant': p_val < 0.05
})
analysis['feature_relationships'] = feature_relationships
return analysis
def _data_quality_insights(self, data):
"""Generate data quality insights"""
insights = []
# Overall data quality score
total_cells = data.shape[0] * data.shape[1]
missing_cells = data.isnull().sum().sum()
quality_score = (total_cells - missing_cells) / total_cells
insights.append({
'type': 'overall_quality',
'score': quality_score,
'interpretation': 'excellent' if quality_score > 0.95 else
'good' if quality_score > 0.85 else
'fair' if quality_score > 0.7 else 'poor'
})
# Duplicate rows
duplicate_count = data.duplicated().sum()
if duplicate_count > 0:
insights.append({
'type': 'duplicates',
'count': duplicate_count,
'percentage': (duplicate_count / len(data)) * 100
})
return insights
def _count_outliers_iqr(self, series):
"""Count outliers using IQR method"""
Q1 = series.quantile(0.25)
Q3 = series.quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers = series[(series < lower_bound) | (series > upper_bound)]
return len(outliers)
def _test_normality(self, series, max_samples=5000):
"""Test normality using Shapiro-Wilk test"""
try:
if len(series) > max_samples:
series_sample = series.sample(max_samples)
else:
series_sample = series
stat, p_value = stats.shapiro(series_sample)
return {
'test_statistic': stat,
'p_value': p_value,
'is_normal': p_value > 0.05
}
except:
return {'test_statistic': None, 'p_value': None, 'is_normal': None}
def _get_visualization_recommendations(self, data):
"""Generate visualization recommendations based on data characteristics"""
recommendations = []
numeric_cols = data.select_dtypes(include=[np.number]).columns
categorical_cols = data.select_dtypes(include=['object', 'category']).columns
# Distribution plots
if len(numeric_cols) > 0:
recommendations.append({
'type': 'histogram',
'purpose': 'Show distribution of numeric variables',
'columns': list(numeric_cols[:5])
})
recommendations.append({
'type': 'box_plot',
'purpose': 'Identify outliers in numeric variables',
'columns': list(numeric_cols[:5])
})
# Categorical plots
if len(categorical_cols) > 0:
recommendations.append({
'type': 'bar_chart',
'purpose': 'Show frequency of categorical variables',
'columns': list(categorical_cols[:5])
})
# Relationship plots
if len(numeric_cols) >= 2:
recommendations.append({
'type': 'correlation_heatmap',
'purpose': 'Show correlations between numeric variables',
'columns': list(numeric_cols)
})
recommendations.append({
'type': 'scatter_plot',
'purpose': 'Show relationships between numeric variables',
'columns': list(numeric_cols[:4])
})
# Mixed plots
if len(numeric_cols) > 0 and len(categorical_cols) > 0:
recommendations.append({
'type': 'grouped_box_plot',
'purpose': 'Show numeric distributions by categorical groups',
'numeric_columns': list(numeric_cols[:3]),
'categorical_columns': list(categorical_cols[:2])
})
return recommendations
def generate_insights_summary(self, analysis_results):
"""Generate a human-readable summary of key insights"""
if analysis_results['status'] != 'success':
return "Analysis failed"
analysis = analysis_results['analysis']
insights = []
# Basic stats insights
basic_stats = analysis['basic_stats']
insights.append(f"Dataset contains {basic_stats['shape'][0]:,} rows and {basic_stats['shape'][1]} columns")
# Missing values insight
missing_total = sum(basic_stats['missing_values'].values())
if missing_total > 0:
insights.append(f"Found {missing_total:,} missing values across the dataset")
# Correlation insights
if 'correlations' in analysis and 'strong_correlations' in analysis['correlations']:
strong_corr_count = len(analysis['correlations']['strong_correlations'])
if strong_corr_count > 0:
insights.append(f"Identified {strong_corr_count} strong correlations between variables")
# Feature insights
if 'feature_insights' in analysis:
feature_insights = analysis['feature_insights']
potential_targets = [i for i in feature_insights if i['type'] == 'potential_target']
if potential_targets:
insights.append(f"Found {len(potential_targets)} potential target variables for machine learning")
return insights