# AI_Data_Analyst / app.py
# (Hugging Face Hub page artifacts captured with this file: uploaded by
#  Hanan-Tabak, commit message "Update app.py", revision 484434f verified.)
import gradio as gr
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.impute import SimpleImputer
from scipy import stats
import seaborn as sns
from sklearn.feature_selection import f_regression, mutual_info_classif
import warnings
warnings.filterwarnings('ignore')
class AIDataAnalyst:
    """Automated exploratory analysis of a tabular pandas DataFrame.

    Typical usage:
        analyst = AIDataAnalyst()
        analyst.preprocess_data(df)
        results = analyst.perform_comprehensive_analysis(df)

    State:
        preprocessed_data: cleaned copy produced by preprocess_data()
        numerical_cols / categorical_cols: column groups found by detect_column_types()
        analysis_results: dict accumulated by perform_comprehensive_analysis()
    """

    def __init__(self):
        self.data = None  # kept for interface compatibility; not set by this class
        self.preprocessed_data = None
        self.numerical_cols = []
        self.categorical_cols = []
        self.analysis_results = {}

    def detect_column_types(self, df):
        """Return (numerical_cols, categorical_cols) for *df*.

        Numeric columns with very low cardinality (<= 10 distinct values AND
        fewer than 5% of the row count) are treated as categorical codes.
        """
        numerical_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        categorical_cols = df.select_dtypes(include=['object', 'category']).columns.tolist()
        for col in numerical_cols[:]:  # iterate a copy; the list is mutated below
            if df[col].nunique() <= 10 and df[col].nunique() < len(df) * 0.05:
                categorical_cols.append(col)
                numerical_cols.remove(col)
        return numerical_cols, categorical_cols

    def preprocess_data(self, df):
        """Impute missing values and drop constant columns; return the cleaned copy.

        Numeric NaNs are filled with the column median (robust to outliers);
        categorical NaNs with the column mode.  Equivalent to the previous
        sklearn SimpleImputer(median / most_frequent) behavior, but keeps
        original dtypes and avoids the sklearn runtime dependency.
        The cleaned frame is cached on self.preprocessed_data.
        """
        df_clean = df.copy()
        self.numerical_cols, self.categorical_cols = self.detect_column_types(df_clean)
        # Median imputation for numeric columns.
        for col in self.numerical_cols:
            if df_clean[col].isna().any():
                df_clean[col] = df_clean[col].fillna(df_clean[col].median())
        # Mode (most frequent) imputation for categorical columns.
        for col in self.categorical_cols:
            if df_clean[col].isna().any():
                mode = df_clean[col].mode()
                if not mode.empty:
                    df_clean[col] = df_clean[col].fillna(mode.iloc[0])
        # Constant columns carry no information for any downstream analysis.
        constant_cols = [col for col in df_clean.columns if df_clean[col].nunique() <= 1]
        if constant_cols:
            df_clean = df_clean.drop(columns=constant_cols)
            self.numerical_cols = [c for c in self.numerical_cols if c not in constant_cols]
            self.categorical_cols = [c for c in self.categorical_cols if c not in constant_cols]
        self.preprocessed_data = df_clean
        return df_clean

    def analyze_numerical_features(self, df):
        """Per-column descriptive stats, outlier counts (z-score & IQR) and normality tests."""
        analysis = {}
        for col in self.numerical_cols:
            try:
                series = df[col]
                q1 = series.quantile(0.25)
                q3 = series.quantile(0.75)
                iqr = q3 - q1
                basic_stats = {
                    'count': len(series),
                    'mean': series.mean(),
                    'median': series.median(),
                    'std': series.std(),
                    'min': series.min(),
                    'max': series.max(),
                    'q1': q1,
                    'q3': q3,
                    'iqr': iqr,
                    'skewness': series.skew(),
                    'kurtosis': series.kurtosis(),
                }
                # Outliers by |z| > 3; NaN z-scores (zero-variance column) compare False.
                z_scores = np.abs(stats.zscore(series))
                z_score_outliers = int((z_scores > 3).sum())
                # Outliers by Tukey fences (1.5 * IQR beyond the quartiles).
                iqr_outliers = int(((series < q1 - 1.5 * iqr) | (series > q3 + 1.5 * iqr)).sum())
                # Normality tests only for smaller samples (Shapiro-Wilk is
                # documented as unreliable for N > 5000).
                shapiro_pvalue = None
                dagostino_pvalue = None
                if len(series) < 5000:
                    try:
                        shapiro_pvalue = stats.shapiro(series)[1]
                    except Exception:
                        shapiro_pvalue = None
                    try:
                        dagostino_pvalue = stats.normaltest(series)[1]
                    except Exception:
                        dagostino_pvalue = None
                analysis[col] = {
                    'basic_stats': basic_stats,
                    'outliers': {
                        'z_score_outliers': z_score_outliers,
                        'iqr_outliers': iqr_outliers,
                    },
                    'normality': {
                        'shapiro_pvalue': shapiro_pvalue,
                        'dagostino_pvalue': dagostino_pvalue,
                    },
                }
            except Exception as e:
                print(f"Error analyzing numerical column {col}: {str(e)}")
                continue
        return analysis

    def analyze_categorical_features(self, df):
        """Per-column cardinality, mode, entropy and rare-category counts."""
        analysis = {}
        for col in self.categorical_cols:
            try:
                value_counts = df[col].value_counts()
                mode_value = df[col].mode()
                # Share of each category; used to flag rare levels.
                value_percentages = value_counts / len(df[col])
                categories_less_than_1pct = len(value_percentages[value_percentages < 0.01])
                categories_less_than_5pct = len(value_percentages[value_percentages < 0.05])
                analysis[col] = {
                    'basic_stats': {
                        'count': len(df[col]),
                        'unique_count': df[col].nunique(),
                        'mode': mode_value.iloc[0] if not mode_value.empty else None,
                        'mode_frequency': value_counts.iloc[0] if not value_counts.empty else 0,
                        'mode_percentage': (value_counts.iloc[0] / len(df[col]) * 100) if not value_counts.empty else 0,
                        # Shannon entropy of the level distribution (0 for a single level).
                        'entropy': stats.entropy(value_counts / len(df[col])) if len(value_counts) > 1 else 0,
                        'categories_less_than_1pct': categories_less_than_1pct,
                        'categories_less_than_5pct': categories_less_than_5pct,
                    },
                    # Only the 10 most frequent levels, to bound output size.
                    'value_distribution': value_counts.head(10).to_dict(),
                }
            except Exception as e:
                print(f"Error analyzing categorical column {col}: {str(e)}")
                continue
        return analysis

    def analyze_relationships(self, df):
        """Pairwise relationships: num-num correlations, cat-cat associations, num-cat ANOVA.

        Each sub-analysis is independently guarded so one failure does not
        discard the others.
        """
        relationships = {}
        try:
            if len(self.numerical_cols) > 1:
                corr_matrix = df[self.numerical_cols].corr()
                relationships['numerical_correlations'] = {
                    'pearson': corr_matrix,
                    'top_correlations': self.get_top_correlations(corr_matrix),
                }
        except Exception as e:
            print(f"Error in numerical correlations: {str(e)}")
        try:
            if len(self.categorical_cols) > 1:
                relationships['categorical_associations'] = self.analyze_categorical_associations(df)
        except Exception as e:
            print(f"Error in categorical associations: {str(e)}")
        try:
            if self.numerical_cols and self.categorical_cols:
                relationships['mixed_relationships'] = self.analyze_mixed_relationships(df)
        except Exception as e:
            print(f"Error in mixed relationships: {str(e)}")
        return relationships

    def get_top_correlations(self, corr_matrix, n=10):
        """Return the *n* strongest correlations (by |r|) from the upper triangle."""
        corr_pairs = []
        try:
            cols = corr_matrix.columns
            for i in range(len(cols)):
                for j in range(i + 1, len(cols)):
                    corr_pairs.append({
                        'features': (cols[i], cols[j]),
                        'correlation': corr_matrix.iloc[i, j],
                    })
            corr_pairs.sort(key=lambda pair: abs(pair['correlation']), reverse=True)
            return corr_pairs[:n]
        except Exception as e:
            print(f"Error getting top correlations: {str(e)}")
            return []

    def analyze_categorical_associations(self, df):
        """Chi-square test plus Cramér's V for every pair of categorical columns."""
        from scipy.stats import chi2_contingency
        associations = {}
        try:
            for i, col1 in enumerate(self.categorical_cols):
                for col2 in self.categorical_cols[i + 1:]:
                    try:
                        contingency_table = pd.crosstab(df[col1], df[col2])
                        if contingency_table.size > 0:
                            chi2, p_value, _, _ = chi2_contingency(contingency_table)
                            # Cramér's V = sqrt((chi2/n) / min(k-1, r-1))
                            n = contingency_table.sum().sum()
                            phi2 = chi2 / n
                            r, k = contingency_table.shape
                            denom = min(k - 1, r - 1)
                            # Guard: a 1xN table would otherwise divide by zero.
                            cramers_v = np.sqrt(phi2 / denom) if denom > 0 else 0.0
                            associations[f"{col1}_vs_{col2}"] = {
                                'chi2_statistic': chi2,
                                'p_value': p_value,
                                'cramers_v': cramers_v,
                                'association_strength': self.interpret_cramers_v(cramers_v),
                            }
                    except Exception as e:
                        print(f"Error analyzing association {col1} vs {col2}: {str(e)}")
                        continue
        except Exception as e:
            print(f"Error in categorical associations analysis: {str(e)}")
        return associations

    def interpret_cramers_v(self, v):
        """Map a Cramér's V value onto a verbal strength label."""
        if v < 0.1:
            return "Very Weak"
        elif v < 0.2:
            return "Weak"
        elif v < 0.4:
            return "Moderate"
        elif v < 0.6:
            return "Relatively Strong"
        else:
            return "Strong"

    def analyze_mixed_relationships(self, df):
        """One-way ANOVA of each numerical column across each categorical grouping."""
        relationships = {}
        try:
            for num_col in self.numerical_cols:
                for cat_col in self.categorical_cols:
                    # Hoist the groupby: reused for the test and both summaries.
                    grouped = df.groupby(cat_col)[num_col]
                    groups = [group.values for _, group in grouped]
                    if len(groups) > 1:
                        try:
                            f_stat, p_value = stats.f_oneway(*groups)
                            relationships[f"{num_col}_by_{cat_col}"] = {
                                'anova_f_statistic': f_stat,
                                'anova_p_value': p_value,
                                'group_means': grouped.mean().to_dict(),
                                'group_std': grouped.std().to_dict(),
                            }
                        except Exception as e:
                            print(f"Error in ANOVA for {num_col} by {cat_col}: {str(e)}")
                            continue
        except Exception as e:
            print(f"Error in mixed relationships analysis: {str(e)}")
        return relationships

    def perform_comprehensive_analysis(self, df):
        """Run the full pipeline and return self.analysis_results.

        *df* is the ORIGINAL frame (used only for shape / memory reporting);
        all analyses run on self.preprocessed_data.  If preprocess_data() has
        not been called yet, it is run automatically (previously this raised
        AttributeError on None.shape).
        """
        print("Starting comprehensive analysis...")
        if self.preprocessed_data is None:
            self.preprocess_data(df)
        self.analysis_results['dataset_info'] = {
            'original_shape': df.shape,
            'preprocessed_shape': self.preprocessed_data.shape,
            'numerical_columns_count': len(self.numerical_cols),
            'categorical_columns_count': len(self.categorical_cols),
            'total_memory_usage_mb': df.memory_usage(deep=True).sum() / 1024**2,
        }
        print("Analyzing numerical features...")
        self.analysis_results['numerical_analysis'] = self.analyze_numerical_features(self.preprocessed_data)
        print("Analyzing categorical features...")
        self.analysis_results['categorical_analysis'] = self.analyze_categorical_features(self.preprocessed_data)
        print("Analyzing relationships...")
        self.analysis_results['relationship_analysis'] = self.analyze_relationships(self.preprocessed_data)
        print("Generating insights...")
        self.analysis_results['key_insights'] = self.generate_insights()
        return self.analysis_results

    def generate_insights(self):
        """Distill the accumulated analysis_results into human-readable bullet strings."""
        insights = []
        # Data quality / overview.
        total_columns = len(self.numerical_cols) + len(self.categorical_cols)
        insights.append(f"πŸ“Š Dataset Overview: {self.preprocessed_data.shape[0]} rows Γ— {total_columns} columns "
                        f"({len(self.numerical_cols)} numerical, {len(self.categorical_cols)} categorical)")
        # Numerical insights: heavy skew and columns with > 5% z-score outliers.
        if self.numerical_cols:
            high_skew_cols = [col for col, analysis in self.analysis_results['numerical_analysis'].items()
                              if abs(analysis['basic_stats']['skewness']) > 2]
            if high_skew_cols:
                insights.append(f"⚑ High Skewness Detected: {', '.join(high_skew_cols)} show significant skewness "
                                "(|skewness| > 2), suggesting non-normal distributions")
            outlier_cols = []
            for col, analysis in self.analysis_results['numerical_analysis'].items():
                if analysis['outliers']['z_score_outliers'] > len(self.preprocessed_data) * 0.05:
                    outlier_cols.append(col)
            if outlier_cols:
                insights.append(f"🚨 Significant Outliers: {', '.join(outlier_cols)} contain more than 5% outliers")
        # Categorical insights: high cardinality and many rare levels.
        if self.categorical_cols:
            high_cardinality_cols = [col for col, analysis in self.analysis_results['categorical_analysis'].items()
                                     if analysis['basic_stats']['unique_count'] > 50]
            if high_cardinality_cols:
                insights.append(f"🎯 High Cardinality: {', '.join(high_cardinality_cols)} have many unique values "
                                "which might need feature engineering")
            high_rare_cats_cols = []
            for col, analysis in self.analysis_results['categorical_analysis'].items():
                if analysis['basic_stats']['categories_less_than_5pct'] > 5:
                    high_rare_cats_cols.append(col)
            if high_rare_cats_cols:
                insights.append(f"πŸ” Many Rare Categories: {', '.join(high_rare_cats_cols)} have multiple categories with <5% frequency")
        # Relationship insights: strong correlations and significant ANOVA results.
        if 'relationship_analysis' in self.analysis_results:
            rel_analysis = self.analysis_results['relationship_analysis']
            if 'numerical_correlations' in rel_analysis:
                top_corr = rel_analysis['numerical_correlations']['top_correlations']
                strong_corrs = [corr for corr in top_corr if abs(corr['correlation']) > 0.7]
                if strong_corrs:
                    insights.append("πŸ”— Strong Correlations: " + "; ".join(
                        [f"{corr['features'][0]} & {corr['features'][1]} (r={corr['correlation']:.3f})"
                         for corr in strong_corrs[:3]]
                    ))
            if 'mixed_relationships' in rel_analysis:
                sig_anova = []
                for key, analysis in rel_analysis['mixed_relationships'].items():
                    if analysis['anova_p_value'] < 0.05:
                        sig_anova.append(key.replace('_by_', ' varies significantly by '))
                if sig_anova:
                    insights.append("πŸ“ˆ Significant Group Differences: " + "; ".join(sig_anova[:2]))
        return insights
def format_analysis_results(analysis_results):
    """Render the analyst's results dict as a Markdown report string."""
    if not analysis_results:
        return "No analysis performed yet."
    report = []
    # Dataset overview section.
    info = analysis_results['dataset_info']
    report.append("## πŸ“Š Dataset Overview")
    report.append(f"- **Original Dimensions**: {info['original_shape'][0]} rows Γ— {info['original_shape'][1]} columns")
    report.append(f"- **After Preprocessing**: {info['preprocessed_shape'][0]} rows Γ— {info['preprocessed_shape'][1]} columns")
    report.append(f"- **Numerical Features**: {info['numerical_columns_count']}")
    report.append(f"- **Categorical Features**: {info['categorical_columns_count']}")
    report.append(f"- **Memory Usage**: {info['total_memory_usage_mb']:.2f} MB")
    # Key insights (already formatted as sentences).
    report.append("\n## πŸ” Key Insights")
    report.extend(f"- {insight}" for insight in analysis_results.get('key_insights', []))
    # Per-column numerical summaries.
    numerical = analysis_results.get('numerical_analysis')
    if numerical:
        report.append("\n## πŸ”’ Numerical Features Analysis")
        for col, result in numerical.items():
            s = result['basic_stats']
            report.append(f"\n### {col}")
            report.append(f"- **Distribution**: Mean={s['mean']:.3f}, Median={s['median']:.3f}, Std={s['std']:.3f}")
            report.append(f"- **Range**: [{s['min']:.3f}, {s['max']:.3f}] | IQR: {s['iqr']:.3f}")
            report.append(f"- **Shape**: Skewness={s['skewness']:.3f}, Kurtosis={s['kurtosis']:.3f}")
            report.append(f"- **Outliers**: {result['outliers']['z_score_outliers']} (Z-score), {result['outliers']['iqr_outliers']} (IQR)")
    # Per-column categorical summaries.
    categorical = analysis_results.get('categorical_analysis')
    if categorical:
        report.append("\n## πŸ“ˆ Categorical Features Analysis")
        for col, result in categorical.items():
            s = result['basic_stats']
            report.append(f"\n### {col}")
            report.append(f"- **Cardinality**: {s['unique_count']} unique values")
            report.append(f"- **Most Frequent**: '{s['mode']}' ({s['mode_percentage']:.1f}%)")
            report.append(f"- **Entropy**: {s['entropy']:.3f}")
            report.append(f"- **Rare Categories**: {s['categories_less_than_5pct']} with <5% frequency")
    # Relationship summaries (top correlations, significant ANOVA results).
    rel = analysis_results.get('relationship_analysis')
    if rel:
        report.append("\n## πŸ”— Feature Relationships")
        if 'numerical_correlations' in rel:
            report.append("\n### Top Numerical Correlations")
            for corr in rel['numerical_correlations']['top_correlations'][:5]:
                magnitude = abs(corr['correlation'])
                if magnitude > 0.7:
                    strength = "Strong"
                elif magnitude > 0.3:
                    strength = "Moderate"
                else:
                    strength = "Weak"
                report.append(f"- {corr['features'][0]} ↔ {corr['features'][1]}: {corr['correlation']:.3f} ({strength})")
        if 'mixed_relationships' in rel:
            report.append("\n### Significant Numerical-Categorical Relationships (ANOVA p < 0.05)")
            shown = 0
            for key, result in rel['mixed_relationships'].items():
                if shown >= 3:
                    break  # report at most three significant pairs
                if result['anova_p_value'] < 0.05:
                    num_col, cat_col = key.split('_by_')
                    report.append(f"- {num_col} varies significantly by {cat_col} (p={result['anova_p_value']:.4f})")
                    shown += 1
    return "\n".join(report)
def analyze_data(file, use_sample=False):
    """Gradio callback: read an uploaded CSV/Excel file and return a Markdown report.

    Args:
        file: value from gr.File(type="filepath") — a plain path string in
            current Gradio versions, or a tempfile wrapper with a .name
            attribute in older ones.  May be None if nothing was uploaded.
        use_sample: when True, randomly sample 1000 rows from large datasets.

    Returns:
        Markdown report string, or an error message starting with "❌".
    """
    try:
        if file is None:
            return "❌ No file uploaded. Please upload a CSV or Excel file."
        # BUG FIX: with type="filepath" Gradio passes a str, and str has no
        # .name attribute — the original `file.name` raised AttributeError on
        # every upload.  Accept both the string and the legacy file object.
        path = file if isinstance(file, str) else file.name
        if path.endswith('.csv'):
            df = pd.read_csv(path)
        elif path.endswith(('.xlsx', '.xls')):
            df = pd.read_excel(path)
        else:
            return "❌ Unsupported file format. Please upload CSV or Excel file."
        # Optional down-sampling keeps the analysis fast on large files.
        if use_sample and len(df) > 1000:
            df = df.sample(n=1000, random_state=42)
        analyst = AIDataAnalyst()
        analyst.preprocess_data(df)
        analysis_results = analyst.perform_comprehensive_analysis(df)
        return format_analysis_results(analysis_results)
    except Exception as e:
        # Surface the failure in the UI instead of crashing the app.
        return f"❌ Error during analysis: {str(e)}"
# Create Gradio interface with corrected Gradio 6.0 syntax
# Top-level UI definition: two rows (branding/intro, then inputs | results)
# and a single click handler wiring analyze_data to the output panel.
with gr.Blocks(title="AI Data Analyst") as demo:
    with gr.Row():
        with gr.Column(scale=1):
            # Add your image
            # Optional hero image; skipped silently when the asset is missing.
            # NOTE(review): bare except also hides non-file errors — consider
            # narrowing to Exception.
            try:
                gr.Image("data_analyst.png",
                         label="AI Data Analyst",
                         show_label=False,
                         height=300,
                         container=False)
            except:
                pass
        with gr.Column(scale=3):
            # Intro / feature list shown next to the image.
            gr.Markdown("""
# 🧠 AI Data Analyst
### Comprehensive Tabular Data Analysis
Upload your dataset (CSV or Excel) and get detailed statistical analysis, feature insights, and relationship discovery.
**Supported analyses:**
- πŸ“Š Dataset overview and data quality assessment
- πŸ”’ Numerical feature statistics and outlier detection
- πŸ“ˆ Categorical feature analysis and cardinality assessment
- πŸ”— Feature relationships and correlation analysis
- ⚑ Automated insights and pattern detection
""")
    with gr.Row():
        with gr.Column():
            # Inputs: dataset upload + optional down-sampling toggle.
            # type="filepath" means analyze_data receives a path string.
            file_input = gr.File(
                label="πŸ“ Upload Dataset",
                file_types=[".csv", ".xlsx", ".xls"],
                type="filepath"
            )
            # NOTE(review): label says "first 1000 rows" but analyze_data uses
            # a random sample (df.sample) — confirm which wording is intended.
            sample_checkbox = gr.Checkbox(
                label="Use sample (first 1000 rows) for large datasets",
                value=False
            )
            analyze_btn = gr.Button("πŸš€ Analyze Data", variant="primary")
        with gr.Column():
            # Output: the Markdown report produced by format_analysis_results.
            output_text = gr.Markdown(
                label="πŸ“‹ Analysis Results",
                value="Upload a dataset to begin analysis..."
            )
    # Set up event handling
    analyze_btn.click(
        fn=analyze_data,
        inputs=[file_input, sample_checkbox],
        outputs=output_text
    )

if __name__ == "__main__":
    demo.launch(
        # Bind on all interfaces (container / Hugging Face Space deployment).
        server_name="0.0.0.0",
        # NOTE(review): mcp_server=True requires the gradio[mcp] extra —
        # confirm the deployed Gradio installation supports it.
        mcp_server=True,
        # NOTE(review): footer_links is not a documented launch() parameter in
        # stable Gradio releases — verify against the pinned Gradio version.
        footer_links=["api", "gradio", "settings"]
    )