# AI Data Analyst — Gradio Space for automated exploratory analysis of tabular data.
| import gradio as gr | |
| import pandas as pd | |
| import numpy as np | |
| from sklearn.preprocessing import StandardScaler, LabelEncoder | |
| from sklearn.impute import SimpleImputer | |
| from scipy import stats | |
| import seaborn as sns | |
| from sklearn.feature_selection import f_regression, mutual_info_classif | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
| class AIDataAnalyst: | |
| def __init__(self): | |
| self.data = None | |
| self.preprocessed_data = None | |
| self.numerical_cols = [] | |
| self.categorical_cols = [] | |
| self.analysis_results = {} | |
| def detect_column_types(self, df): | |
| """Detect numerical and categorical columns""" | |
| numerical_cols = df.select_dtypes(include=[np.number]).columns.tolist() | |
| categorical_cols = df.select_dtypes(include=['object', 'category']).columns.tolist() | |
| # Check for numerical columns with low cardinality that might be categorical | |
| for col in numerical_cols[:]: | |
| if df[col].nunique() <= 10 and df[col].nunique() < len(df) * 0.05: | |
| categorical_cols.append(col) | |
| numerical_cols.remove(col) | |
| return numerical_cols, categorical_cols | |
| def preprocess_data(self, df): | |
| """Comprehensive data preprocessing""" | |
| df_clean = df.copy() | |
| # Detect column types | |
| self.numerical_cols, self.categorical_cols = self.detect_column_types(df_clean) | |
| # Handle missing values for numerical columns | |
| if self.numerical_cols: | |
| num_imputer = SimpleImputer(strategy='median') | |
| df_clean[self.numerical_cols] = num_imputer.fit_transform(df_clean[self.numerical_cols]) | |
| # Handle missing values for categorical columns | |
| if self.categorical_cols: | |
| cat_imputer = SimpleImputer(strategy='most_frequent') | |
| df_clean[self.categorical_cols] = cat_imputer.fit_transform(df_clean[self.categorical_cols]) | |
| # Remove constant columns | |
| constant_cols = [col for col in df_clean.columns if df_clean[col].nunique() <= 1] | |
| if constant_cols: | |
| df_clean = df_clean.drop(columns=constant_cols) | |
| self.numerical_cols = [col for col in self.numerical_cols if col not in constant_cols] | |
| self.categorical_cols = [col for col in self.categorical_cols if col not in constant_cols] | |
| self.preprocessed_data = df_clean | |
| return df_clean | |
| def analyze_numerical_features(self, df): | |
| """Comprehensive numerical feature analysis""" | |
| analysis = {} | |
| for col in self.numerical_cols: | |
| try: | |
| # Basic statistics | |
| basic_stats = { | |
| 'count': len(df[col]), | |
| 'mean': df[col].mean(), | |
| 'median': df[col].median(), | |
| 'std': df[col].std(), | |
| 'min': df[col].min(), | |
| 'max': df[col].max(), | |
| 'q1': df[col].quantile(0.25), | |
| 'q3': df[col].quantile(0.75), | |
| 'iqr': df[col].quantile(0.75) - df[col].quantile(0.25), | |
| 'skewness': df[col].skew(), | |
| 'kurtosis': df[col].kurtosis() | |
| } | |
| # Outlier detection | |
| z_scores = np.abs(stats.zscore(df[col])) | |
| z_score_outliers = len(df[z_scores > 3]) | |
| Q1 = df[col].quantile(0.25) | |
| Q3 = df[col].quantile(0.75) | |
| IQR = Q3 - Q1 | |
| iqr_outliers = len(df[(df[col] < (Q1 - 1.5 * IQR)) | (df[col] > (Q3 + 1.5 * IQR))]) | |
| # Normality tests (for smaller datasets) | |
| shapiro_pvalue = None | |
| dagostino_pvalue = None | |
| if len(df[col]) < 5000: | |
| try: | |
| shapiro_pvalue = stats.shapiro(df[col])[1] | |
| except: | |
| shapiro_pvalue = None | |
| try: | |
| dagostino_pvalue = stats.normaltest(df[col])[1] | |
| except: | |
| dagostino_pvalue = None | |
| analysis[col] = { | |
| 'basic_stats': basic_stats, | |
| 'outliers': { | |
| 'z_score_outliers': z_score_outliers, | |
| 'iqr_outliers': iqr_outliers | |
| }, | |
| 'normality': { | |
| 'shapiro_pvalue': shapiro_pvalue, | |
| 'dagostino_pvalue': dagostino_pvalue | |
| } | |
| } | |
| except Exception as e: | |
| print(f"Error analyzing numerical column {col}: {str(e)}") | |
| continue | |
| return analysis | |
| def analyze_categorical_features(self, df): | |
| """Comprehensive categorical feature analysis""" | |
| analysis = {} | |
| for col in self.categorical_cols: | |
| try: | |
| value_counts = df[col].value_counts() | |
| mode_value = df[col].mode() | |
| # Calculate rare categories | |
| value_percentages = value_counts / len(df[col]) | |
| categories_less_than_1pct = len(value_percentages[value_percentages < 0.01]) | |
| categories_less_than_5pct = len(value_percentages[value_percentages < 0.05]) | |
| analysis[col] = { | |
| 'basic_stats': { | |
| 'count': len(df[col]), | |
| 'unique_count': df[col].nunique(), | |
| 'mode': mode_value.iloc[0] if not mode_value.empty else None, | |
| 'mode_frequency': value_counts.iloc[0] if not value_counts.empty else 0, | |
| 'mode_percentage': (value_counts.iloc[0] / len(df[col]) * 100) if not value_counts.empty else 0, | |
| 'entropy': stats.entropy(value_counts / len(df[col])) if len(value_counts) > 1 else 0, | |
| 'categories_less_than_1pct': categories_less_than_1pct, | |
| 'categories_less_than_5pct': categories_less_than_5pct | |
| }, | |
| 'value_distribution': value_counts.head(10).to_dict() # Top 10 only | |
| } | |
| except Exception as e: | |
| print(f"Error analyzing categorical column {col}: {str(e)}") | |
| continue | |
| return analysis | |
| def analyze_relationships(self, df): | |
| """Analyze relationships between features""" | |
| relationships = {} | |
| try: | |
| # Numerical-numerical correlations | |
| if len(self.numerical_cols) > 1: | |
| corr_matrix = df[self.numerical_cols].corr() | |
| relationships['numerical_correlations'] = { | |
| 'pearson': corr_matrix, | |
| 'top_correlations': self.get_top_correlations(corr_matrix) | |
| } | |
| except Exception as e: | |
| print(f"Error in numerical correlations: {str(e)}") | |
| try: | |
| # Categorical-categorical associations | |
| if len(self.categorical_cols) > 1: | |
| relationships['categorical_associations'] = self.analyze_categorical_associations(df) | |
| except Exception as e: | |
| print(f"Error in categorical associations: {str(e)}") | |
| try: | |
| # Numerical-categorical relationships | |
| if self.numerical_cols and self.categorical_cols: | |
| relationships['mixed_relationships'] = self.analyze_mixed_relationships(df) | |
| except Exception as e: | |
| print(f"Error in mixed relationships: {str(e)}") | |
| return relationships | |
| def get_top_correlations(self, corr_matrix, n=10): | |
| """Get top n correlations from correlation matrix""" | |
| corr_pairs = [] | |
| try: | |
| for i in range(len(corr_matrix.columns)): | |
| for j in range(i+1, len(corr_matrix.columns)): | |
| corr_pairs.append({ | |
| 'features': (corr_matrix.columns[i], corr_matrix.columns[j]), | |
| 'correlation': corr_matrix.iloc[i, j] | |
| }) | |
| # Sort by absolute correlation value | |
| corr_pairs.sort(key=lambda x: abs(x['correlation']), reverse=True) | |
| return corr_pairs[:n] | |
| except Exception as e: | |
| print(f"Error getting top correlations: {str(e)}") | |
| return [] | |
| def analyze_categorical_associations(self, df): | |
| """Analyze associations between categorical variables using CramΓ©r's V""" | |
| from scipy.stats import chi2_contingency | |
| associations = {} | |
| try: | |
| for i, col1 in enumerate(self.categorical_cols): | |
| for j, col2 in enumerate(self.categorical_cols[i+1:], i+1): | |
| try: | |
| contingency_table = pd.crosstab(df[col1], df[col2]) | |
| if contingency_table.size > 0: | |
| chi2, p_value, _, _ = chi2_contingency(contingency_table) | |
| # Calculate CramΓ©r's V | |
| n = contingency_table.sum().sum() | |
| phi2 = chi2 / n | |
| r, k = contingency_table.shape | |
| cramers_v = np.sqrt(phi2 / min((k-1), (r-1))) | |
| associations[f"{col1}_vs_{col2}"] = { | |
| 'chi2_statistic': chi2, | |
| 'p_value': p_value, | |
| 'cramers_v': cramers_v, | |
| 'association_strength': self.interpret_cramers_v(cramers_v) | |
| } | |
| except Exception as e: | |
| print(f"Error analyzing association {col1} vs {col2}: {str(e)}") | |
| continue | |
| except Exception as e: | |
| print(f"Error in categorical associations analysis: {str(e)}") | |
| return associations | |
| def interpret_cramers_v(self, v): | |
| """Interpret CramΓ©r's V value""" | |
| if v < 0.1: return "Very Weak" | |
| elif v < 0.2: return "Weak" | |
| elif v < 0.4: return "Moderate" | |
| elif v < 0.6: return "Relatively Strong" | |
| else: return "Strong" | |
| def analyze_mixed_relationships(self, df): | |
| """Analyze relationships between numerical and categorical variables""" | |
| relationships = {} | |
| try: | |
| for num_col in self.numerical_cols: | |
| for cat_col in self.categorical_cols: | |
| # ANOVA for numerical vs categorical | |
| groups = [group.values for name, group in df.groupby(cat_col)[num_col]] | |
| if len(groups) > 1: | |
| try: | |
| f_stat, p_value = stats.f_oneway(*groups) | |
| relationships[f"{num_col}_by_{cat_col}"] = { | |
| 'anova_f_statistic': f_stat, | |
| 'anova_p_value': p_value, | |
| 'group_means': df.groupby(cat_col)[num_col].mean().to_dict(), | |
| 'group_std': df.groupby(cat_col)[num_col].std().to_dict() | |
| } | |
| except Exception as e: | |
| print(f"Error in ANOVA for {num_col} by {cat_col}: {str(e)}") | |
| continue | |
| except Exception as e: | |
| print(f"Error in mixed relationships analysis: {str(e)}") | |
| return relationships | |
| def perform_comprehensive_analysis(self, df): | |
| """Perform comprehensive data analysis""" | |
| print("Starting comprehensive analysis...") | |
| # Store basic dataset info | |
| self.analysis_results['dataset_info'] = { | |
| 'original_shape': df.shape, | |
| 'preprocessed_shape': self.preprocessed_data.shape, | |
| 'numerical_columns_count': len(self.numerical_cols), | |
| 'categorical_columns_count': len(self.categorical_cols), | |
| 'total_memory_usage_mb': df.memory_usage(deep=True).sum() / 1024**2 | |
| } | |
| print("Analyzing numerical features...") | |
| # Numerical feature analysis | |
| self.analysis_results['numerical_analysis'] = self.analyze_numerical_features(self.preprocessed_data) | |
| print("Analyzing categorical features...") | |
| # Categorical feature analysis | |
| self.analysis_results['categorical_analysis'] = self.analyze_categorical_features(self.preprocessed_data) | |
| print("Analyzing relationships...") | |
| # Relationship analysis | |
| self.analysis_results['relationship_analysis'] = self.analyze_relationships(self.preprocessed_data) | |
| print("Generating insights...") | |
| # Generate insights | |
| self.analysis_results['key_insights'] = self.generate_insights() | |
| return self.analysis_results | |
| def generate_insights(self): | |
| """Generate key insights from the analysis""" | |
| insights = [] | |
| # Data quality insights | |
| total_columns = len(self.numerical_cols) + len(self.categorical_cols) | |
| insights.append(f"π Dataset Overview: {self.preprocessed_data.shape[0]} rows Γ {total_columns} columns " | |
| f"({len(self.numerical_cols)} numerical, {len(self.categorical_cols)} categorical)") | |
| # Numerical insights | |
| if self.numerical_cols: | |
| high_skew_cols = [col for col, analysis in self.analysis_results['numerical_analysis'].items() | |
| if abs(analysis['basic_stats']['skewness']) > 2] | |
| if high_skew_cols: | |
| insights.append(f"β‘ High Skewness Detected: {', '.join(high_skew_cols)} show significant skewness " | |
| "(|skewness| > 2), suggesting non-normal distributions") | |
| # Outlier insights | |
| outlier_cols = [] | |
| for col, analysis in self.analysis_results['numerical_analysis'].items(): | |
| if analysis['outliers']['z_score_outliers'] > len(self.preprocessed_data) * 0.05: | |
| outlier_cols.append(col) | |
| if outlier_cols: | |
| insights.append(f"π¨ Significant Outliers: {', '.join(outlier_cols)} contain more than 5% outliers") | |
| # Categorical insights | |
| if self.categorical_cols: | |
| high_cardinality_cols = [col for col, analysis in self.analysis_results['categorical_analysis'].items() | |
| if analysis['basic_stats']['unique_count'] > 50] | |
| if high_cardinality_cols: | |
| insights.append(f"π― High Cardinality: {', '.join(high_cardinality_cols)} have many unique values " | |
| "which might need feature engineering") | |
| # Rare categories insights | |
| high_rare_cats_cols = [] | |
| for col, analysis in self.analysis_results['categorical_analysis'].items(): | |
| if analysis['basic_stats']['categories_less_than_5pct'] > 5: | |
| high_rare_cats_cols.append(col) | |
| if high_rare_cats_cols: | |
| insights.append(f"π Many Rare Categories: {', '.join(high_rare_cats_cols)} have multiple categories with <5% frequency") | |
| # Relationship insights | |
| if 'relationship_analysis' in self.analysis_results: | |
| rel_analysis = self.analysis_results['relationship_analysis'] | |
| if 'numerical_correlations' in rel_analysis: | |
| top_corr = rel_analysis['numerical_correlations']['top_correlations'] | |
| strong_corrs = [corr for corr in top_corr if abs(corr['correlation']) > 0.7] | |
| if strong_corrs: | |
| insights.append("π Strong Correlations: " + "; ".join( | |
| [f"{corr['features'][0]} & {corr['features'][1]} (r={corr['correlation']:.3f})" | |
| for corr in strong_corrs[:3]] | |
| )) | |
| if 'mixed_relationships' in rel_analysis: | |
| sig_anova = [] | |
| for key, analysis in rel_analysis['mixed_relationships'].items(): | |
| if analysis['anova_p_value'] < 0.05: | |
| sig_anova.append(key.replace('_by_', ' varies significantly by ')) | |
| if sig_anova: | |
| insights.append("π Significant Group Differences: " + "; ".join(sig_anova[:2])) | |
| return insights | |
| def format_analysis_results(analysis_results): | |
| """Format analysis results for display""" | |
| if not analysis_results: | |
| return "No analysis performed yet." | |
| output = [] | |
| # Dataset Information | |
| dataset_info = analysis_results['dataset_info'] | |
| output.append("## π Dataset Overview") | |
| output.append(f"- **Original Dimensions**: {dataset_info['original_shape'][0]} rows Γ {dataset_info['original_shape'][1]} columns") | |
| output.append(f"- **After Preprocessing**: {dataset_info['preprocessed_shape'][0]} rows Γ {dataset_info['preprocessed_shape'][1]} columns") | |
| output.append(f"- **Numerical Features**: {dataset_info['numerical_columns_count']}") | |
| output.append(f"- **Categorical Features**: {dataset_info['categorical_columns_count']}") | |
| output.append(f"- **Memory Usage**: {dataset_info['total_memory_usage_mb']:.2f} MB") | |
| # Key Insights | |
| output.append("\n## π Key Insights") | |
| for insight in analysis_results.get('key_insights', []): | |
| output.append(f"- {insight}") | |
| # Numerical Analysis Summary | |
| if analysis_results.get('numerical_analysis'): | |
| output.append("\n## π’ Numerical Features Analysis") | |
| for col, analysis in analysis_results['numerical_analysis'].items(): | |
| stats = analysis['basic_stats'] | |
| output.append(f"\n### {col}") | |
| output.append(f"- **Distribution**: Mean={stats['mean']:.3f}, Median={stats['median']:.3f}, Std={stats['std']:.3f}") | |
| output.append(f"- **Range**: [{stats['min']:.3f}, {stats['max']:.3f}] | IQR: {stats['iqr']:.3f}") | |
| output.append(f"- **Shape**: Skewness={stats['skewness']:.3f}, Kurtosis={stats['kurtosis']:.3f}") | |
| output.append(f"- **Outliers**: {analysis['outliers']['z_score_outliers']} (Z-score), {analysis['outliers']['iqr_outliers']} (IQR)") | |
| # Categorical Analysis Summary | |
| if analysis_results.get('categorical_analysis'): | |
| output.append("\n## π Categorical Features Analysis") | |
| for col, analysis in analysis_results['categorical_analysis'].items(): | |
| stats = analysis['basic_stats'] | |
| output.append(f"\n### {col}") | |
| output.append(f"- **Cardinality**: {stats['unique_count']} unique values") | |
| output.append(f"- **Most Frequent**: '{stats['mode']}' ({stats['mode_percentage']:.1f}%)") | |
| output.append(f"- **Entropy**: {stats['entropy']:.3f}") | |
| output.append(f"- **Rare Categories**: {stats['categories_less_than_5pct']} with <5% frequency") | |
| # Relationship Analysis Summary | |
| if analysis_results.get('relationship_analysis'): | |
| rel_analysis = analysis_results['relationship_analysis'] | |
| output.append("\n## π Feature Relationships") | |
| if 'numerical_correlations' in rel_analysis: | |
| output.append("\n### Top Numerical Correlations") | |
| top_corrs = rel_analysis['numerical_correlations']['top_correlations'][:5] | |
| for corr in top_corrs: | |
| strength = "Strong" if abs(corr['correlation']) > 0.7 else "Moderate" if abs(corr['correlation']) > 0.3 else "Weak" | |
| output.append(f"- {corr['features'][0]} β {corr['features'][1]}: {corr['correlation']:.3f} ({strength})") | |
| if 'mixed_relationships' in rel_analysis: | |
| output.append("\n### Significant Numerical-Categorical Relationships (ANOVA p < 0.05)") | |
| sig_count = 0 | |
| for key, analysis in rel_analysis['mixed_relationships'].items(): | |
| if analysis['anova_p_value'] < 0.05 and sig_count < 3: | |
| num_col, cat_col = key.split('_by_') | |
| output.append(f"- {num_col} varies significantly by {cat_col} (p={analysis['anova_p_value']:.4f})") | |
| sig_count += 1 | |
| return "\n".join(output) | |
| def analyze_data(file, use_sample=False): | |
| """Main analysis function""" | |
| try: | |
| # Read file | |
| if file.name.endswith('.csv'): | |
| df = pd.read_csv(file) | |
| elif file.name.endswith(('.xlsx', '.xls')): | |
| df = pd.read_excel(file) | |
| else: | |
| return "β Unsupported file format. Please upload CSV or Excel file." | |
| # Sample data if requested | |
| if use_sample and len(df) > 1000: | |
| df = df.sample(n=1000, random_state=42) | |
| # Initialize analyzer | |
| analyst = AIDataAnalyst() | |
| # Preprocess data | |
| processed_df = analyst.preprocess_data(df) | |
| # Perform comprehensive analysis | |
| analysis_results = analyst.perform_comprehensive_analysis(df) | |
| # Format results | |
| return format_analysis_results(analysis_results) | |
| except Exception as e: | |
| return f"β Error during analysis: {str(e)}" | |
| # Create Gradio interface with corrected Gradio 6.0 syntax | |
| with gr.Blocks(title="AI Data Analyst") as demo: | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| # Add your image | |
| try: | |
| gr.Image("data_analyst.png", | |
| label="AI Data Analyst", | |
| show_label=False, | |
| height=300, | |
| container=False) | |
| except: | |
| pass | |
| with gr.Column(scale=3): | |
| gr.Markdown(""" | |
| # π§ AI Data Analyst | |
| ### Comprehensive Tabular Data Analysis | |
| Upload your dataset (CSV or Excel) and get detailed statistical analysis, feature insights, and relationship discovery. | |
| **Supported analyses:** | |
| - π Dataset overview and data quality assessment | |
| - π’ Numerical feature statistics and outlier detection | |
| - π Categorical feature analysis and cardinality assessment | |
| - π Feature relationships and correlation analysis | |
| - β‘ Automated insights and pattern detection | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| file_input = gr.File( | |
| label="π Upload Dataset", | |
| file_types=[".csv", ".xlsx", ".xls"], | |
| type="filepath" | |
| ) | |
| sample_checkbox = gr.Checkbox( | |
| label="Use sample (first 1000 rows) for large datasets", | |
| value=False | |
| ) | |
| analyze_btn = gr.Button("π Analyze Data", variant="primary") | |
| with gr.Column(): | |
| output_text = gr.Markdown( | |
| label="π Analysis Results", | |
| value="Upload a dataset to begin analysis..." | |
| ) | |
| # Set up event handling | |
| analyze_btn.click( | |
| fn=analyze_data, | |
| inputs=[file_input, sample_checkbox], | |
| outputs=output_text | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch( | |
| server_name="0.0.0.0", | |
| mcp_server=True, | |
| footer_links=["api", "gradio", "settings"] | |
| ) |