| |
| |
|
|
| import gradio as gr |
| import pandas as pd |
| import numpy as np |
| import matplotlib.pyplot as plt |
| import seaborn as sns |
| import plotly.express as px |
| import plotly.graph_objects as go |
| import io |
| from sklearn.decomposition import PCA |
| from sklearn.preprocessing import StandardScaler |
| import os |
| import json |
| import re |
|
|
| |
# Global plotting defaults: seaborn "whitegrid" theme and a 10x6 inch
# default figure size for matplotlib figures.
sns.set(style="whitegrid")
plt.rc("figure", figsize=(10, 6))
|
|
def read_file(file):
    """Read an uploaded data file into a pandas DataFrame.

    The format is chosen from the file name's extension (case-insensitive):
    ``.csv``, ``.xls``/``.xlsx``, ``.json`` or ``.txt``.  CSV and TXT input
    goes through several separator fallbacks (comma, semicolon, tab, and
    finally pandas' separator sniffing).

    Args:
        file: ``None``, a filesystem path (``str`` / ``os.PathLike``), or an
            object exposing a ``name`` attribute that pandas can read from
            (for example an open file handle).

    Returns:
        ``None`` when ``file`` is ``None``; a ``pandas.DataFrame`` on
        success; otherwise a ``str`` describing the problem (unsupported
        extension or a read error).  Callers distinguish the cases with
        ``isinstance``.
    """
    if file is None:
        return None

    # Plain paths carry no ``.name`` attribute; use the path itself so the
    # extension can still be inspected.
    if isinstance(file, (str, os.PathLike)):
        file_name = os.fspath(file)
    else:
        file_name = getattr(file, 'name', '') or ''
    file_name = str(file_name)
    print(f"Reading file: {file_name}")

    # Compare extensions case-insensitively so "DATA.CSV" is accepted.
    extension_probe = file_name.lower()

    def _rewind():
        """Reset a file-like input to its start; no-op for path inputs."""
        seek = getattr(file, 'seek', None)
        if callable(seek):
            seek(0)

    def _sniffed_csv():
        """Let pandas' python engine guess the separator."""
        _rewind()
        return pd.read_csv(file, sep=None, engine='python')

    try:
        if extension_probe.endswith('.csv'):
            try:
                df = pd.read_csv(file)

                # A single column whose header contains ';' almost certainly
                # means the file is semicolon-separated.
                if len(df.columns) == 1 and ';' in str(df.columns[0]):
                    print("Detected potential semicolon-separated file")
                    _rewind()
                    df = pd.read_csv(file, sep=';')
                    print(f"Read file with semicolon separator: {df.shape}")
                else:
                    print(f"Read file with comma separator: {df.shape}")

                # Promote object columns that are fully numeric.  Columns
                # that cannot be converted are left untouched (explicit
                # replacement for the deprecated ``errors='ignore'``).
                for col in df.columns:
                    if df[col].dtype == 'object':
                        try:
                            df[col] = pd.to_numeric(df[col])
                        except (ValueError, TypeError):
                            pass

                return df
            except Exception as e:
                print(f"Error with standard separators: {e}")

                _rewind()
                try:
                    df = pd.read_csv(file, sep=';')
                    print(f"Read file with semicolon separator after error: {df.shape}")
                    return df
                except Exception:
                    # Last resort; a failure here is reported by the outer
                    # handler as an error string.
                    return _sniffed_csv()

        elif extension_probe.endswith(('.xls', '.xlsx')):
            return pd.read_excel(file)
        elif extension_probe.endswith('.json'):
            return pd.read_json(file)
        elif extension_probe.endswith('.txt'):
            try:
                df = pd.read_csv(file, delimiter='\t')
                if len(df.columns) <= 1:
                    # Tab did not split anything; let pandas guess instead.
                    df = _sniffed_csv()
                return df
            except Exception:
                return _sniffed_csv()
        else:
            return "Unsupported file format. Please upload .csv, .xlsx, .xls, .json, or .txt files."
    except Exception as e:
        print(f"Error reading file: {str(e)}")
        return f"Error reading file: {str(e)}"
|
|
def analyze_data(df):
    """Summarise a DataFrame: shape, dtypes, missing values and basic stats.

    Anything that is not a DataFrame (``None`` or an error string from the
    reader) is passed through unchanged.

    Returns:
        A dict with the keys 'Shape', 'Columns', 'Data Types',
        'Missing Values' and 'Data Quality Issues'.  When numeric columns
        exist, 'Numeric Columns', 'Statistics' (an HTML table) and possibly
        'Outliers' are added; when object/category columns exist,
        'Categorical Columns' and 'Category Counts' are added.
    """
    if not isinstance(df, pd.DataFrame):
        return df

    null_counts = df.isnull().sum()
    if null_counts.sum() > 0:
        missing_summary = null_counts[null_counts > 0].to_dict()
    else:
        missing_summary = "No missing values found"

    info = {
        'Shape': df.shape,
        'Columns': df.columns.tolist(),
        'Data Types': df.dtypes.astype(str).to_dict(),
        'Missing Values': missing_summary,
        'Data Quality Issues': identify_data_quality_issues(df),
    }

    numeric_names = df.select_dtypes(include=[np.number]).columns.tolist()
    if numeric_names:
        info['Numeric Columns'] = numeric_names
        info['Statistics'] = df[numeric_names].describe().to_html()

        found_outliers = detect_outliers(df, numeric_names)
        if found_outliers:
            info['Outliers'] = found_outliers

    category_names = df.select_dtypes(include=['object', 'category']).columns.tolist()
    if category_names:
        info['Categorical Columns'] = category_names
        # Only the first five categorical columns, ten most frequent values each.
        info['Category Counts'] = {
            name: df[name].value_counts().head(10).to_dict()
            for name in category_names[:5]
        }

    return info
|
|
def identify_data_quality_issues(df):
    """Collect common data-quality warnings for a DataFrame.

    Returns a dict that may contain, in this order:
      * 'Duplicate Rows' - number of fully duplicated rows,
      * 'High Cardinality Columns' - object/category columns with more than
        50 distinct values, mapped to that count,
      * 'Potential Date Columns' - object columns whose first ten non-null
        values are all strings and at least one looks like a date,
      * 'Columns with >50% Missing' - column -> formatted missing percentage.
    Keys are only present when the corresponding issue was found.
    """
    issues = {}

    n_duplicates = df.duplicated().sum()
    if n_duplicates > 0:
        issues['Duplicate Rows'] = n_duplicates

    text_like = df.select_dtypes(include=['object', 'category']).columns.tolist()
    cardinalities = {name: df[name].nunique() for name in text_like}
    too_many_levels = {name: n for name, n in cardinalities.items() if n > 50}
    if too_many_levels:
        issues['High Cardinality Columns'] = too_many_levels

    # Matches things like 2020-01-31, 31/01/2020 or 1.2.99.
    date_like = re.compile(r'\d{1,4}[-/\.]\d{1,2}[-/\.]\d{1,4}')
    date_candidates = []
    for name in df.select_dtypes(include=['object']).columns:
        probe = df[name].dropna().head(10).tolist()
        if not all(isinstance(value, str) for value in probe):
            continue
        if any(date_like.search(str(value)) for value in probe):
            date_candidates.append(name)
    if date_candidates:
        issues['Potential Date Columns'] = date_candidates

    missing_share = {name: df[name].isnull().mean() * 100 for name in df.columns}
    mostly_missing = {
        name: f"{pct:.2f}%" for name, pct in missing_share.items() if pct > 50
    }
    if mostly_missing:
        issues['Columns with >50% Missing'] = mostly_missing

    return issues
|
|
def detect_outliers(df, numeric_cols):
    """Flag numeric columns with IQR outliers (1.5 * IQR beyond Q1/Q3).

    Columns where more than 90% of the rows hold distinct values are
    skipped.  A column is reported only if its outliers make up more than
    1% of all rows.

    Returns:
        dict mapping column name to a dict with 'count', 'percentage'
        (formatted string), 'lower_bound' and 'upper_bound'.
    """
    total_rows = df.shape[0]
    flagged = {}

    for name in numeric_cols:
        series = df[name]

        if series.nunique() > total_rows * 0.9:
            continue

        q_low = series.quantile(0.25)
        q_high = series.quantile(0.75)
        spread = q_high - q_low
        floor = q_low - 1.5 * spread
        ceiling = q_high + 1.5 * spread

        n_outside = ((series < floor) | (series > ceiling)).sum()
        if not n_outside > 0:
            continue

        share = (n_outside / total_rows) * 100
        if not share > 1:
            continue

        flagged[name] = {
            'count': n_outside,
            'percentage': f"{share:.2f}%",
            'lower_bound': floor,
            'upper_bound': ceiling,
        }

    return flagged
|
|
def generate_visualizations(df):
    """Build a set of plotly figures appropriate for the DataFrame's columns.

    The input DataFrame is never modified; date parsing for the time-series
    plot happens on a copy.

    Args:
        df: The data to plot.  Anything that is not a ``pandas.DataFrame``
            is returned unchanged (so reader error strings pass through).

    Returns:
        dict mapping a figure key to a plotly figure.  Possible keys:
        'test_plot', 'dist_<col>' (first five numeric columns),
        'bar_<col>' (first five categorical columns), 'correlation',
        'scatter_matrix', 'time_series', 'pca', and 'fallback' when nothing
        else could be produced.  Each figure is built independently; a
        failure is printed and that figure is skipped.
    """
    if not isinstance(df, pd.DataFrame):
        print(f"Not a DataFrame: {type(df)}")
        return df

    print(f"Starting visualization generation for DataFrame with shape: {df.shape}")

    visualizations = {}

    def _is_date_column(series):
        """True for datetime columns or object columns that fully parse as dates."""
        # Covers any resolution / timezone, not just 'datetime64[ns]'.
        if pd.api.types.is_datetime64_any_dtype(series):
            return True
        if series.dtype == 'object':
            try:
                return bool(pd.to_datetime(series, errors='coerce').notna().all())
            except (ValueError, TypeError, OverflowError):
                # Unparseable content simply means "not a date column".
                return False
        return False

    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    categorical_cols = df.select_dtypes(include=['object', 'category']).columns.tolist()
    date_cols = [col for col in df.columns if _is_date_column(df[col])]

    print(f"Found {len(numeric_cols)} numeric columns: {numeric_cols}")
    print(f"Found {len(categorical_cols)} categorical columns: {categorical_cols}")
    print(f"Found {len(date_cols)} date columns: {date_cols}")

    try:
        # Smoke-test plot on the first column (first 100 rows).
        if len(df) > 0 and len(df.columns) > 0:
            col = df.columns[0]
            try:
                test_data = df[col].head(100)
                fig = px.histogram(x=test_data, title=f"Test Plot for {col}")
                visualizations['test_plot'] = fig
                print(f"Generated test plot for column: {col}")
            except Exception as e:
                print(f"Error creating test plot: {e}")

        # Distribution (histogram + marginal box) for up to five numeric columns.
        for col in numeric_cols[:5]:
            try:
                fig = px.histogram(df, x=col, marginal="box", title=f"Distribution of {col}")
                visualizations[f'dist_{col}'] = fig
                print(f"Generated distribution plot for {col}")
            except Exception as e:
                print(f"Error creating histogram for {col}: {e}")

        # Top-10 bar chart for up to five categorical columns.
        for col in categorical_cols[:5]:
            try:
                value_counts = df[col].value_counts().nlargest(10)
                # Stringify labels so mixed-type categories plot consistently.
                value_counts.index = value_counts.index.astype(str)

                fig = px.bar(x=value_counts.index, y=value_counts.values,
                             title=f"Top 10 categories in {col}")
                fig.update_xaxes(title=col)
                fig.update_yaxes(title="Count")
                visualizations[f'bar_{col}'] = fig
                print(f"Generated bar chart for {col}")
            except Exception as e:
                print(f"Error creating bar chart for {col}: {e}")

        # Correlation heatmap across all numeric columns.
        if len(numeric_cols) > 1:
            try:
                corr_matrix = df[numeric_cols].corr()
                fig = px.imshow(corr_matrix, text_auto=True, aspect="auto",
                                title="Correlation Heatmap")
                visualizations['correlation'] = fig
                print("Generated correlation heatmap")
            except Exception as e:
                print(f"Error creating correlation heatmap: {e}")

        # Scatter matrix over the first three numeric columns.
        if len(numeric_cols) >= 2:
            try:
                plot_cols = numeric_cols[:3]
                fig = px.scatter_matrix(df, dimensions=plot_cols, title="Scatter Plot Matrix")
                visualizations['scatter_matrix'] = fig
                print("Generated scatter plot matrix")
            except Exception as e:
                print(f"Error creating scatter matrix: {e}")

        # Time series: first date column against first numeric column.
        if date_cols and numeric_cols:
            try:
                date_col = date_cols[0]
                num_col = numeric_cols[0]

                # Work on a copy so the caller's column keeps its dtype.
                ts_frame = df[[date_col, num_col]].copy()
                if not pd.api.types.is_datetime64_any_dtype(ts_frame[date_col]):
                    ts_frame[date_col] = pd.to_datetime(ts_frame[date_col], errors='coerce')

                ts_sorted = ts_frame.sort_values(by=date_col)
                fig = px.line(ts_sorted, x=date_col, y=num_col,
                              title=f"{num_col} over Time")
                visualizations['time_series'] = fig
                print("Generated time series plot")
            except Exception as e:
                print(f"Error creating time series plot: {e}")

        # 2-component PCA projection when there are at least three numeric columns.
        if len(numeric_cols) >= 3:
            try:
                numeric_data = df[numeric_cols]
                # Mean-impute missing values before scaling.
                numeric_data = numeric_data.fillna(numeric_data.mean())

                scaled_data = StandardScaler().fit_transform(numeric_data)

                pca = PCA(n_components=2)
                pca_result = pca.fit_transform(scaled_data)
                pca_df = pd.DataFrame(data=pca_result, columns=['PC1', 'PC2'])

                if categorical_cols:
                    # Colour the points by the first categorical column.
                    cat_col = categorical_cols[0]
                    pca_df[cat_col] = df[cat_col].values
                    fig = px.scatter(pca_df, x='PC1', y='PC2', color=cat_col,
                                     title="PCA Visualization")
                else:
                    fig = px.scatter(pca_df, x='PC1', y='PC2',
                                     title="PCA Visualization")

                variance_ratio = pca.explained_variance_ratio_
                fig.update_layout(
                    annotations=[
                        dict(
                            text=f"PC1 explained variance: {variance_ratio[0]:.2f}",
                            showarrow=False,
                            x=0.5,
                            y=1.05,
                            xref="paper",
                            yref="paper",
                        ),
                        dict(
                            text=f"PC2 explained variance: {variance_ratio[1]:.2f}",
                            showarrow=False,
                            x=0.5,
                            y=1.02,
                            xref="paper",
                            yref="paper",
                        ),
                    ]
                )

                visualizations['pca'] = fig
                print("Generated PCA visualization")
            except Exception as e:
                print(f"Error creating PCA visualization: {e}")

    except Exception as e:
        print(f"Error in visualization generation: {e}")

    print(f"Generated {len(visualizations)} visualizations")

    # Guarantee at least one figure so the UI has something to show.
    if not visualizations:
        print("No visualizations generated, creating fallback")
        try:
            fig = go.Figure()

            if len(df) > 0:
                sample_size = min(20, len(df))
                positions = list(range(sample_size))
                fig.add_trace(go.Scatter(
                    x=positions,
                    y=df.iloc[:sample_size, 0] if len(df.columns) > 0 else positions,
                    mode='markers',
                    name='Fallback Plot',
                ))
            else:
                fig.add_annotation(text="No data to visualize", showarrow=False)

            fig.update_layout(title="Fallback Visualization")
            visualizations['fallback'] = fig
        except Exception as e:
            print(f"Error creating fallback visualization: {e}")

    return visualizations
|
|
def display_analysis(analysis):
    """Render the analysis dict as an HTML fragment.

    Column names, category values and other text that originates from the
    uploaded file are HTML-escaped before being embedded.  The 'Statistics'
    entry is inserted as-is because it is already an HTML table.

    Args:
        analysis: ``None``, an error/message string, or a dict containing at
            least 'Shape', 'Columns' and 'Missing Values', and optionally
            'Data Quality Issues', 'Outliers', 'Statistics' and
            'Category Counts'.

    Returns:
        "No analysis available." for ``None``, the string itself for string
        input, otherwise the generated HTML.
    """
    # Imported locally so the block does not depend on file-level imports.
    from html import escape

    if analysis is None:
        return "No analysis available."

    if isinstance(analysis, str):
        return analysis

    def esc(value):
        """Stringify and HTML-escape an arbitrary value."""
        return escape(str(value))

    parts = ["<h2>Data Analysis</h2>"]

    shape = analysis['Shape']
    parts.append(f"<p><strong>Shape:</strong> {shape[0]} rows, {shape[1]} columns</p>")
    # Column labels may be non-strings (e.g. integer headers), so stringify
    # each one before joining.
    column_list = ', '.join(esc(col) for col in analysis['Columns'])
    parts.append(f"<p><strong>Columns:</strong> {column_list}</p>")

    parts.append("<h3>Missing Values</h3>")
    missing = analysis['Missing Values']
    if isinstance(missing, str):
        parts.append(f"<p>{esc(missing)}</p>")
    else:
        parts.append("<ul>")
        parts.extend(f"<li>{esc(col)}: {esc(count)}</li>" for col, count in missing.items())
        parts.append("</ul>")

    quality_issues = analysis.get('Data Quality Issues')
    if quality_issues:
        parts.append("<h3>Data Quality Issues</h3>")
        for issue_type, issue_details in quality_issues.items():
            parts.append(f"<h4>{esc(issue_type)}</h4>")
            if isinstance(issue_details, dict):
                parts.append("<ul>")
                parts.extend(
                    f"<li>{esc(key)}: {esc(value)}</li>"
                    for key, value in issue_details.items()
                )
                parts.append("</ul>")
            else:
                parts.append(f"<p>{esc(issue_details)}</p>")

    outliers = analysis.get('Outliers')
    if outliers:
        parts.append("<h3>Outliers Detected</h3>")
        parts.append("<ul>")
        for col, details in outliers.items():
            parts.append(
                f"<li><strong>{esc(col)}:</strong> {esc(details['count'])} outliers "
                f"({esc(details['percentage'])})<br>"
            )
            parts.append(
                f"Values outside range: [{details['lower_bound']:.2f}, "
                f"{details['upper_bound']:.2f}]</li>"
            )
        parts.append("</ul>")

    if 'Statistics' in analysis:
        parts.append("<h3>Numeric Statistics</h3>")
        parts.append(analysis['Statistics'])

    if 'Category Counts' in analysis:
        parts.append("<h3>Categorical Data (Top Values)</h3>")
        for col, counts in analysis['Category Counts'].items():
            parts.append(f"<h4>{esc(col)}</h4><ul>")
            parts.extend(f"<li>{esc(val)}: {esc(count)}</li>" for val, count in counts.items())
            parts.append("</ul>")

    return "".join(parts)
|
|
def simple_process_file(file):
    """Run the demo pipeline (no AI models): read, analyse, visualise.

    Returns a 4-tuple ``(analysis, visualizations, cleaning_recommendations,
    analysis_insights)``.  If reading produced an error string, the tuple is
    ``(error_string, None, None, None)``.  The two text entries are fixed
    demo-mode markdown snippets.
    """
    loaded = read_file(file)

    # A string from the reader is an error message, not data.
    if isinstance(loaded, str):
        return loaded, None, None, None

    summary = analyze_data(loaded)
    figures = generate_visualizations(loaded)

    cleaning_recommendations = """
## Data Cleaning Recommendations

* Handle missing values by either removing rows or imputing with mean/median/mode
* Remove duplicate rows if present
* Convert date-like string columns to proper datetime format
* Standardize text data by removing extra spaces and converting to lowercase
* Check for and handle outliers in numerical columns

Note: This is a demo recommendation (AI model not connected in demo mode)
"""

    analysis_insights = """
## Data Analysis Insights

1. Examine the distribution of each numeric column
2. Analyze correlations between numeric features
3. Look for patterns in categorical data
4. Consider creating visualizations like histograms and scatter plots
5. Explore relationships between different variables

Note: This is a demo insight (AI model not connected in demo mode)
"""

    return summary, figures, cleaning_recommendations, analysis_insights
|
|
def demo_ui(file):
    """Gradio callback for demo mode.

    Returns a 4-tuple ``(html, visualizations, cleaning_recommendations,
    analysis_insights)``.  With no file, or when processing yields an error
    string, the first element is the message and the rest are ``None``.
    """
    if file is None:
        return "Please upload a file to begin analysis.", None, None, None

    print(f"Processing file in demo_ui: {file.name if hasattr(file, 'name') else 'unknown'}")

    processed = simple_process_file(file)
    analysis, visualizations, cleaning_recommendations, analysis_insights = processed

    if isinstance(analysis, str):
        print(f"Error in analysis: {analysis}")
        return analysis, None, None, None

    analysis_html = display_analysis(analysis)

    have_figures = bool(visualizations) and not isinstance(visualizations, str)
    if not have_figures:
        print(f"No visualizations to display: {visualizations}")
        viz_html = "<p>No visualizations could be generated for this dataset.</p>"
    else:
        print(f"Processing {len(visualizations)} visualizations for display")
        fragments = []
        for viz_name, fig in visualizations.items():
            try:
                print(f"Visualization {viz_name}: type={type(fig)}")

                # Each figure is rendered as an embeddable HTML snippet.
                html_content = fig.to_html(full_html=False, include_plotlyjs="cdn")
                print(f"Generated HTML for {viz_name}, length: {len(html_content)}")

                fragments.append(f'<div style="margin-bottom: 30px;">{html_content}</div>')
                print(f"Added visualization: {viz_name}")
            except Exception as e:
                # A figure that fails to render is skipped, not fatal.
                print(f"Error rendering visualization {viz_name}: {e}")
        viz_html = "".join(fragments)

    result_html = f"""
<div style="display: flex; flex-direction: column;">
<div>{analysis_html}</div>
<h2>Data Visualizations</h2>
<div>{viz_html}</div>
</div>
"""

    return result_html, visualizations, cleaning_recommendations, analysis_insights
|
|
def test_visualization():
    """Return an HTML snippet with a random scatter plot (plotly sanity check)."""
    import numpy as np
    import plotly.express as px

    # 100 random points in the unit square.
    xs = np.random.rand(100)
    ys = np.random.rand(100)

    scatter = px.scatter(x=xs, y=ys, title="Test Plot")
    return scatter.to_html(full_html=False, include_plotlyjs="cdn")
|
|
| |
# Gradio layout: file upload, a collapsible plotly self-test, and a tabbed
# results area fed by the "Analyze Data" button.
with gr.Blocks(title="Data Visualization & Cleaning AI (Demo Mode)") as demo:
    gr.Markdown("# Data Visualization & Cleaning AI")
    gr.Markdown("**DEMO MODE** - Upload your data file (CSV, Excel, JSON, or TXT) and get automatic analysis and visualizations.")

    with gr.Row():
        file_input = gr.File(label="Upload Data File")

    # Self-test plot, built once while the layout is being constructed.
    test_viz_html = test_visualization()
    gr.HTML(
        f"<details><summary>Plotly Test (Click to expand)</summary>{test_viz_html}</details>",
        visible=True,
    )

    with gr.Tabs():
        with gr.TabItem("Data Analysis"):
            with gr.Row():
                analyze_button = gr.Button("Analyze Data")

            with gr.Tabs():
                with gr.TabItem("Analysis & Visualizations"):
                    output = gr.HTML(label="Results")
                with gr.TabItem("AI Cleaning Recommendations"):
                    cleaning_recommendations_output = gr.Markdown(label="AI Recommendations")
                with gr.TabItem("AI Analysis Insights"):
                    analysis_insights_output = gr.Markdown(label="Analysis Insights")
                with gr.TabItem("Raw Visualization Objects"):
                    viz_output = gr.JSON(label="Visualization Objects")

    # Output order must match the 4-tuple returned by demo_ui.
    analyze_button.click(
        fn=demo_ui,
        inputs=[file_input],
        outputs=[
            output,
            viz_output,
            cleaning_recommendations_output,
            analysis_insights_output,
        ],
    )


# Start the app only when executed as a script.
if __name__ == "__main__":
    demo.launch()