""" My BI Dashboard - Main Application Built with Gradio for interactive data exploration and analysis. """ import gradio as gr import pandas as pd import matplotlib.pyplot as plt import plotly.express as px from typing import Optional, Tuple import os # My custom modules for data processing and visualization from data_processor import ( load_data, get_data_summary, get_correlation_matrix, apply_filters, clean_data, aggregate_data, get_data_preview ) from visualizations import ( create_plotly_timeseries, create_plotly_distribution, create_plotly_category, create_plotly_scatter, create_plotly_heatmap ) from insights import ( generate_all_insights, format_insights_for_display, generate_visualization_insights, generate_advanced_insights, format_advanced_insights, generate_smart_dashboard, compare_datasets ) from utils import get_column_types, get_missing_value_summary, get_dataframe_info # Store the current dataset globally current_df = None filtered_df = None def update_preview_pagination(offset=0): """Updates the data preview based on offset.""" global current_df if current_df is None: return "No data loaded.", None, 0 total_rows = len(current_df) # Clamp offset if offset >= total_rows: offset = max(0, total_rows - (total_rows % 10 or 10)) elif offset < 0: offset = 0 preview = current_df.iloc[offset : offset + 10] status_msg = f"Dataset Loaded Successfully! 
✅ (Rows {offset+1}-{min(offset+10, total_rows)} of {total_rows})" return status_msg, preview, offset def upload_and_preview_data(file): """Handles file upload and shows preview.""" global current_df, filtered_df if file is None: return ( "Please upload a file.", None, 0, 0, # status, preview, row_count, offset gr.update(value=None), gr.update(value=None), gr.update(value=None), # num filters gr.update(choices=[], value=None), gr.update(choices=[]), # cat filter 1 gr.update(choices=[], value=None), gr.update(choices=[]), # cat filter 2 gr.update(choices=[], value=None), gr.update(value=""), gr.update(value=""), # date filter gr.update(choices=[], value=None), gr.update(choices=[], value=None), gr.update(choices=[], value=None), # viz cols gr.update(choices=[], value=None), gr.update(choices=[], value=None), # comp cols gr.update(choices=[], value=None) # drill col ) try: # Load data df, error = load_data(file.name) if error: return [f"Error: {error}"] + [None]*19 if df is not None: current_df = df filtered_df = df.copy() # Get column types col_types = get_column_types(df) # Get preview (first 10 rows) preview = df.head(10) status_msg = f"Dataset Loaded Successfully! 
✅ (Rows 1-{min(10, len(df))} of {len(df)})" # Get all columns all_cols = df.columns.tolist() return ( status_msg, preview, len(df), 0, # status, preview, row_count, offset gr.update(choices=col_types['numerical'], value=col_types['numerical'][0] if col_types['numerical'] else None), # num_col gr.update(value=None), gr.update(value=None), # num_min, num_max gr.update(choices=col_types['categorical'], value=col_types['categorical'][0] if col_types['categorical'] else None), # cat_col gr.update(choices=[]), # cat_vals gr.update(choices=col_types['categorical'], value=None), # cat_col_2 gr.update(choices=[]), # cat_vals_2 gr.update(choices=col_types['datetime'], value=col_types['datetime'][0] if col_types['datetime'] else None), # date_col gr.update(value=""), gr.update(value=""), # date_start, date_end gr.update(choices=all_cols, value=None), # x_col gr.update(choices=all_cols, value=None), # y_col gr.update(choices=all_cols, value=None), # color_col gr.update(choices=col_types['categorical'], value=None), # comp_a gr.update(choices=col_types['categorical'], value=None), # comp_b gr.update(choices=col_types['categorical'], value=None) # drill ) else: return ["Error loading file."] + [None]*19 except Exception as e: return [f"Error: {str(e)}"] + [None]*19 def load_from_path_or_url(path_or_url): """Load data from a file path or URL.""" global current_df, filtered_df if not path_or_url or path_or_url.strip() == "": return "Please enter a file path or URL.", None, "", gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update() try: # Check if it's a URL if path_or_url.startswith('http://') or path_or_url.startswith('https://'): # Load from URL if path_or_url.endswith('.csv'): df = pd.read_csv(path_or_url) elif path_or_url.endswith(('.xlsx', '.xls')): df = pd.read_excel(path_or_url) else: # Try CSV by default df = pd.read_csv(path_or_url) else: # Load from local path df, error = load_data(path_or_url) if error: return f"Error: {error}", None, "", 
gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update() # Validate the loaded DataFrame is_valid, error_msg = validate_dataframe(df) if not is_valid: return f"Invalid data: {error_msg}", None, "", gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update() # Auto-detect and convert datetime columns datetime_cols = detect_datetime_columns(df) for col in datetime_cols: try: df[col] = pd.to_datetime(df[col], errors='coerce') except Exception: pass current_df = df filtered_df = df.copy() # Get basic info info = get_dataframe_info(df) col_types = get_column_types(df) # Create summary text summary = f""" ## Dataset Loaded Successfully! ✅ **Source:** {path_or_url} **Basic Information:** - Rows: {info['rows']:,} - Columns: {info['columns']} - Numerical Columns: {info['numerical_columns']} - Categorical Columns: {info['categorical_columns']} - DateTime Columns: {info['datetime_columns']} - Memory Usage: {info['memory_usage_mb']:.2f} MB - Missing Values: {info['total_missing']:,} ({info['missing_percentage']:.2f}%) **Column Names:** {', '.join(df.columns.tolist())} """ # Get preview preview = df.head(10) # Get all columns for dropdowns all_cols = df.columns.tolist() # Return updates for visualization dropdowns and filter dropdowns return ( summary, preview, "Data loaded successfully from path/URL!", gr.update(choices=all_cols, value=None), # x_column - clear selection gr.update(choices=all_cols, value=None), # y_column - clear selection gr.update(choices=all_cols, value=None), # color_column - clear selection gr.update(choices=col_types['numerical'], value=None), # num_filter_col - clear selection gr.update(choices=col_types['categorical'], value=None), # cat_filter_col - clear selection gr.update(choices=col_types['datetime'], value=None) # date_filter_col - clear selection ) except Exception as e: return f"Error loading data: {str(e)}", None, "", gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update() def 
def show_statistics():
    """Generate summary statistics for the currently loaded dataset.

    Returns:
        An 8-tuple matching the wired Gradio outputs:
        (rows, cols, duplicate_rows, missing_pct,
         numerical_stats_df, categorical_stats_df, missing_df, corr_plot).
    """
    global current_df
    if current_df is None:
        # BUGFIX: this path used to return only 7 values while the success
        # path (and the 8 wired output components) expect 8.
        return 0, 0, 0, 0, None, None, None, None

    summary = get_data_summary(current_df)

    # 1. Headline metrics.
    rows, cols = summary['shape'][0], summary['shape'][1]
    dupes = summary.get('duplicate_rows', 0)
    total_cells = rows * cols
    missing_cells = sum(summary['missing_values'].values())
    missing_pct = (missing_cells / total_cells * 100) if total_cells > 0 else 0

    # 2. Numerical stats, transposed so each row is one feature.
    if 'numerical_stats' in summary:
        num_df = pd.DataFrame(summary['numerical_stats']).T
        num_df = num_df.reset_index().rename(columns={'index': 'Feature'})
        for col in num_df.columns:
            if col != 'Feature':
                num_df[col] = num_df[col].apply(lambda x: round(x, 2))
    else:
        num_df = pd.DataFrame(columns=["No numerical columns found"])

    # 3. Categorical stats: one row per feature with its dominant value.
    if 'categorical_stats' in summary:
        cat_data = []
        for col, stats in summary['categorical_stats'].items():
            top_val = list(stats['top_values'].keys())[0] if stats['top_values'] else "N/A"
            top_count = list(stats['top_values'].values())[0] if stats['top_values'] else 0
            cat_data.append({
                'Feature': col,
                'Unique Values': stats['unique_count'],
                'Most Common': top_val,
                'Count': top_count,
                'Share (%)': round(top_count / rows * 100, 1)
            })
        cat_df = pd.DataFrame(cat_data)
    else:
        cat_df = pd.DataFrame(columns=["No categorical columns found"])

    # 4. Missing-value report, worst columns first.
    missing_data = [
        {'Feature': col, 'Missing Count': count,
         'Missing (%)': round(count / rows * 100, 2)}
        for col, count in summary['missing_values'].items() if count > 0
    ]
    if missing_data:
        missing_df = pd.DataFrame(missing_data).sort_values('Missing Count', ascending=False)
    else:
        missing_df = pd.DataFrame(columns=["No missing values found"])

    # 5. Correlation matrix heatmap.
    corr_plot = create_plotly_heatmap(current_df, title="Correlation Matrix")

    return rows, cols, dupes, round(missing_pct, 2), num_df, cat_df, missing_df, corr_plot


def update_filter_options():
    """Refresh the three filter dropdowns from the current dataset's column types."""
    global current_df
    if current_df is None:
        return gr.update(choices=[]), gr.update(choices=[]), gr.update(choices=[])
    col_types = get_column_types(current_df)
    return (
        gr.update(choices=col_types['numerical']),
        gr.update(choices=col_types['categorical']),
        gr.update(choices=col_types['datetime'])
    )


def _append_breakdown(lines, df, col, vals):
    """Append a per-value row-count breakdown for one categorical filter.

    Best effort: a breakdown failure must never block the filter itself,
    so any exception is deliberately swallowed.
    """
    try:
        counts = df[col].value_counts()
        lines.append(f"\n**{col} Breakdown:**")
        for val in vals:
            if val in counts:
                lines.append(f"- {val}: {counts[val]:,}")
            else:
                lines.append(f"- {val}: 0")
    except Exception:  # was a bare except -- narrowed
        pass


def apply_filters_and_update(num_col, num_min, num_max, cat_col, cat_vals,
                             cat_col_2, cat_vals_2, date_col, date_start,
                             date_end, offset=0):
    """Apply the configured filters and return a summary plus a preview page.

    Args:
        num_col/num_min/num_max: optional numerical range filter.
        cat_col/cat_vals, cat_col_2/cat_vals_2: optional categorical filters.
        date_col/date_start/date_end: optional date-range filter (YYYY-MM-DD).
        offset: first preview row (page size 20); clamped into range.

    Returns:
        (summary_markdown, preview_df, filtered_row_count, clamped_offset)
    """
    global filtered_df, current_df
    if current_df is None:
        return "No data loaded.", None, 0, offset

    # Build the filters dict consumed by data_processor.apply_filters.
    filters = {}
    if num_col and (num_min is not None or num_max is not None):
        filters[num_col] = {'type': 'numerical', 'min': num_min, 'max': num_max}
    if cat_col and cat_vals:
        filters[cat_col] = {'type': 'categorical', 'values': cat_vals}
    if cat_col_2 and cat_vals_2:
        filters[cat_col_2] = {'type': 'categorical', 'values': cat_vals_2}
    if date_col and (date_start or date_end):
        filters[date_col] = {'type': 'datetime',
                             'start_date': date_start, 'end_date': date_end}

    filtered_df = apply_filters(current_df, filters)

    summary_lines = [
        "## Filtered Data",
        f"**Original rows:** {len(current_df):,}",
        f"**Filtered rows:** {len(filtered_df):,}",
        f"**Rows removed:** {len(current_df) - len(filtered_df):,}"
    ]
    if cat_col and cat_vals and not filtered_df.empty:
        _append_breakdown(summary_lines, filtered_df, cat_col, cat_vals)
    if cat_col_2 and cat_vals_2 and not filtered_df.empty:
        _append_breakdown(summary_lines, filtered_df, cat_col_2, cat_vals_2)
    summary = "\n".join(summary_lines)

    # Pagination (page size 20): clamp offset, slice the preview.
    total_rows = len(filtered_df)
    if offset >= total_rows:
        offset = max(0, total_rows - (total_rows % 20 or 20))
    elif offset < 0:
        offset = 0
    preview = filtered_df.iloc[offset: offset + 20]

    if total_rows > 0:
        summary += f"\n\n**Showing Rows:** {offset + 1}-{min(offset + 20, total_rows)}"

    return summary, preview, len(filtered_df), offset


def create_visualization(viz_type, x_col, y_col, color_col, agg_method, top_n, offset=0):
    """Create the selected chart from the filtered data and generate insights.

    Args:
        viz_type: one of the chart-type labels from the Visualizations tab.
        x_col/y_col/color_col: column selections (requirements vary per chart).
        agg_method: aggregation for grouped charts ("sum", "mean", ...).
        top_n: category limit / page size for bar and pie charts.
        offset: pagination offset (Bar Chart only); clamped into range.

    Returns:
        (figure_or_None, status_message, insights_markdown, clamped_offset)
    """
    global filtered_df
    if filtered_df is None or len(filtered_df) == 0:
        return None, "Please upload and filter data first.", "", offset

    try:
        # Every branch sets `fig`; Bar Chart may also override the status
        # message with pagination info. (A duplicated, unreachable second
        # "Correlation Heatmap" branch was removed.)
        status_msg = "Visualization created successfully!"
        fig = None

        if viz_type == "Time Series":
            if not x_col or not y_col:
                return None, "Please select Date (X) and Value (Y) columns.", "", offset
            fig = create_plotly_timeseries(filtered_df, x_col, y_col, agg_method)

        elif viz_type == "Distribution (Histogram)":
            if not x_col:
                return None, "Please select Column.", "", offset
            fig = create_plotly_distribution(filtered_df, x_col)

        elif viz_type == "Correlation Heatmap":
            fig = create_plotly_heatmap(filtered_df)
            if fig is None:
                return None, "Need at least 2 numerical columns for correlation.", "", offset

        elif viz_type == "Distribution (Box Plot)":
            if not x_col:
                return None, "Please select Column.", "", offset
            fig = px.box(filtered_df, x=x_col, y=y_col, color=color_col,
                         title=f"Distribution of {x_col}")

        elif viz_type == "Bar Chart":
            if not x_col:
                return None, "Please select X-Axis column.", "", offset
            # Clamp pagination to the number of distinct categories.
            total_items = filtered_df[x_col].nunique()
            if offset >= total_items and total_items > 0:
                offset = max(0, total_items - (total_items % top_n or top_n))
                status_msg = f"Reached end of data. Showing items {offset+1}-{total_items}."
            elif offset < 0:
                offset = 0
                status_msg = "Start of data."
            fig = create_plotly_category(filtered_df, x_col, y_col, agg_method,
                                         top_n, offset=offset)

        elif viz_type == "Pie Chart":
            if not x_col:
                return None, "Please select Category column.", "", offset
            data = filtered_df[x_col].value_counts().head(top_n)
            fig = px.pie(values=data.values, names=data.index,
                         title=f"Top {top_n} {x_col}")

        elif viz_type == "Scatter Plot":
            if not x_col or not y_col:
                return None, "Please select X and Y columns.", "", offset
            fig = create_plotly_scatter(filtered_df, x_col, y_col, color_col)

        # Single common tail: the Bar Chart branch previously duplicated this
        # insight generation in an early return.
        insights = generate_visualization_insights(viz_type, filtered_df, x_col, y_col)
        insights_text = f"## 📊 Visualization Insights\n\n{insights}"
        return fig, status_msg, insights_text, offset

    except Exception as e:
        return None, f"Error: {str(e)}", "", offset


def generate_insights_report():
    """Generate the automated insights report for the filtered dataset."""
    global filtered_df
    if filtered_df is None or len(filtered_df) == 0:
        return "Please upload data first."
    try:
        insights = generate_all_insights(filtered_df)
        return format_insights_for_display(insights)
    except Exception as e:
        return f"Error generating insights: {str(e)}"
try: insights = generate_all_insights(filtered_df) formatted_insights = format_insights_for_display(insights) return formatted_insights except Exception as e: return f"Error generating insights: {str(e)}" def export_filtered_data(): """Export filtered data to CSV.""" global filtered_df if filtered_df is None: return None, "No data to export." output_path = "filtered_data_export.csv" filtered_df.to_csv(output_path, index=False) return output_path, f"Data exported successfully! ({len(filtered_df)} rows)" def export_visualization(fig): """Export current visualization.""" if fig is None: return None, "No visualization to export." output_path = "visualization_export.png" fig.savefig(output_path, dpi=300, bbox_inches='tight') return output_path, "Visualization exported successfully!" def create_dashboard(): """Creates my main Gradio dashboard interface.""" with gr.Blocks(title="Business Intelligence Dashboard") as demo: gr.Markdown(""" # 📊 Business Intelligence Dashboard ### Professional Data Analysis & Visualization Platform Upload your data, explore insights, create visualizations, and export results. 
""") # Dropdowns that will be populated across tabs x_column_viz = None y_column_viz = None color_column_viz = None num_filter_dropdown = None cat_filter_dropdown = None cat_filter_dropdown_2 = None date_filter_dropdown = None comp_cat_col_a = None comp_cat_col_b = None comp_cat_val_a = None comp_cat_val_b = None drill_col = None drill_val = None # Tab 1: Data Upload with gr.Tab("📁 Data Upload"): # Welcome Banner Removed gr.Markdown("### 📤 Upload File") file_input = gr.File(label="Drop CSV or Excel file here", file_types=[".csv", ".xlsx", ".xls"], height=100) upload_btn = gr.Button("Load Data", variant="primary", size="lg") # Status & Preview Section gr.Markdown("### 📋 Data Status") with gr.Row(): upload_message = gr.Textbox(label="System Status", value="Waiting for data...", interactive=False) upload_status = gr.Markdown() with gr.Accordion("👀 Data Preview", open=True): data_preview = gr.Dataframe(interactive=False) # Pagination Controls for Data Preview with gr.Row(): prev_preview_btn = gr.Button("⬅️ Prev Batch", size="sm") next_preview_btn = gr.Button("Next Batch ➡️", size="sm") preview_offset = gr.State(value=0) preview_batch_size = gr.State(value=10) # Tab 2: Statistics with gr.Tab("📈 Statistics"): gr.Markdown("## 📊 Data Health & Statistics") stats_btn = gr.Button("Generate Statistics", variant="primary") # Metric Cards Row with gr.Row(): stat_rows = gr.Number(label="Total Rows", value=0) stat_cols = gr.Number(label="Total Columns", value=0) stat_dupes = gr.Number(label="Duplicate Rows", value=0) stat_missing = gr.Number(label="Missing Cells (%)", value=0) gr.Markdown("### 🔢 Numerical Statistics") numerical_stats = gr.Dataframe(label="Descriptive Statistics (Transposed)", interactive=False) with gr.Row(): with gr.Column(): gr.Markdown("### 📋 Categorical Summary") categorical_stats = gr.Dataframe(label="Top Categories", interactive=False) with gr.Column(): gr.Markdown("### ⚠️ Missing Values Report") missing_stats = gr.Dataframe(label="Missing Data by Column", 
interactive=False) gr.Markdown("### 🔥 Correlation Matrix") corr_matrix_plot = gr.Plot(label="Correlation Matrix") stats_btn.click( fn=show_statistics, inputs=[], outputs=[stat_rows, stat_cols, stat_dupes, stat_missing, numerical_stats, categorical_stats, missing_stats, corr_matrix_plot] ) # Tab 3: Filter & Explore with gr.Tab("🔍 Filter & Explore"): gr.Markdown("## Interactive Data Filtering") gr.Markdown(""" **How to use filters:** 1. Select a column from any filter section below 2. Set your filter criteria (range, values, or dates) 3. Click "Apply Filters" to see filtered results 4. You can combine multiple filters together 5. The filtered data will be used in Visualizations and Insights tabs """) with gr.Row(): with gr.Column(): gr.Markdown("### 🔢 Numerical Filters") gr.Markdown("*Filter by number ranges (e.g., Sales, Price, Quantity)*") num_filter_dropdown = gr.Dropdown(label="Select Numerical Column", choices=[], interactive=True) with gr.Row(): num_min = gr.Number(label="Minimum Value", placeholder="Min") num_max = gr.Number(label="Maximum Value", placeholder="Max") with gr.Column(): gr.Markdown("### 📋 Categorical Filters") gr.Markdown("*Filter by categories (e.g., Product, Region)*") cat_filter_dropdown = gr.Dropdown(label="Select Categorical Column 1", choices=[], interactive=True) cat_filter_values = gr.CheckboxGroup(label="Select Values to Include", choices=[]) gr.Markdown("---") cat_filter_dropdown_2 = gr.Dropdown(label="Select Categorical Column 2 (Optional)", choices=[], interactive=True) cat_filter_values_2 = gr.CheckboxGroup(label="Select Values to Include", choices=[]) # Date filters in accordion (optional) with gr.Accordion("📅 Date Filters (Optional - Click to Expand)", open=False): gr.Markdown("*Filter by date ranges (format: YYYY-MM-DD)*") date_filter_dropdown = gr.Dropdown(label="Select Date Column", choices=[], interactive=True) with gr.Row(): date_start = gr.Textbox(label="Start Date", placeholder="YYYY-MM-DD") date_end = gr.Textbox(label="End 
Date", placeholder="YYYY-MM-DD") with gr.Row(): filter_btn = gr.Button("Apply Filters", variant="primary", size="lg") reset_filter_btn = gr.Button("Reset Filters", variant="secondary", size="lg") gr.Markdown("### Filter Results") filter_summary = gr.Markdown() with gr.Accordion("Filtered Data Preview", open=True): filtered_preview = gr.Dataframe(label="Filtered Data Preview") # Pagination Controls for Filtered Preview with gr.Row(): prev_filtered_btn = gr.Button("⬅️ Prev Batch", size="sm") next_filtered_btn = gr.Button("Next Batch ➡️", size="sm") filtered_offset = gr.State(value=0) filtered_batch_size = gr.State(value=20) row_count = gr.Number(label="Total Filtered Rows", interactive=False) # Update categorical values when column is selected def update_cat_values(col): if current_df is not None and col: values = current_df[col].unique().tolist() # Reset value to None to avoid "value not in choices" error return gr.update(choices=values, value=None) return gr.update(choices=[], value=None) cat_filter_dropdown.change( fn=update_cat_values, inputs=[cat_filter_dropdown], outputs=[cat_filter_values] ) cat_filter_dropdown_2.change( fn=update_cat_values, inputs=[cat_filter_dropdown_2], outputs=[cat_filter_values_2] ) filter_btn.click( fn=apply_filters_and_update, inputs=[num_filter_dropdown, num_min, num_max, cat_filter_dropdown, cat_filter_values, cat_filter_dropdown_2, cat_filter_values_2, date_filter_dropdown, date_start, date_end], outputs=[filter_summary, filtered_preview, row_count, filtered_offset] ) # Reset filters function def reset_filters(): global filtered_df, current_df if current_df is not None: filtered_df = current_df.copy() preview = filtered_df.head(20) return ( gr.update(value=None), # num_col gr.update(value=None), # num_min gr.update(value=None), # num_max gr.update(value=None), # cat_col gr.update(choices=[]), # cat_values gr.update(value=None), # cat_col_2 gr.update(choices=[]), # cat_values_2 gr.update(value=None), # date_col gr.update(value=""), # 
date_start gr.update(value=""), # date_end "Filters reset. Showing original data.", preview, len(filtered_df) ) return [gr.update()] * 10 + ["No data loaded.", None, 0] # Pagination Logic for Filtered Preview def update_filtered_offset(current_offset, direction, batch_size=20): if direction == "next": return current_offset + batch_size else: return max(0, current_offset - batch_size) prev_filtered_btn.click( fn=update_filtered_offset, inputs=[filtered_offset, gr.State("prev"), filtered_batch_size], outputs=[filtered_offset] ).then( fn=apply_filters_and_update, inputs=[ num_filter_dropdown, num_min, num_max, cat_filter_dropdown, cat_filter_values, cat_filter_dropdown_2, cat_filter_values_2, date_filter_dropdown, date_start, date_end, filtered_offset ], outputs=[filter_summary, filtered_preview, row_count, filtered_offset] ) next_filtered_btn.click( fn=update_filtered_offset, inputs=[filtered_offset, gr.State("next"), filtered_batch_size], outputs=[filtered_offset] ).then( fn=apply_filters_and_update, inputs=[ num_filter_dropdown, num_min, num_max, cat_filter_dropdown, cat_filter_values, cat_filter_dropdown_2, cat_filter_values_2, date_filter_dropdown, date_start, date_end, filtered_offset ], outputs=[filter_summary, filtered_preview, row_count, filtered_offset] ) # Update filter button to reset offset filter_btn.click( fn=lambda *args: apply_filters_and_update(*args, offset=0), inputs=[ num_filter_dropdown, num_min, num_max, cat_filter_dropdown, cat_filter_values, cat_filter_dropdown_2, cat_filter_values_2, date_filter_dropdown, date_start, date_end ], outputs=[filter_summary, filtered_preview, row_count, filtered_offset] ) # Pagination Logic for Data Preview def update_preview_offset(current_offset, direction, batch_size=10): if direction == "next": return current_offset + batch_size else: return max(0, current_offset - batch_size) prev_preview_btn.click( fn=update_preview_offset, inputs=[preview_offset, gr.State("prev"), preview_batch_size], 
outputs=[preview_offset] ).then( fn=update_preview_pagination, inputs=[preview_offset], outputs=[upload_status, data_preview, preview_offset] ) next_preview_btn.click( fn=update_preview_offset, inputs=[preview_offset, gr.State("next"), preview_batch_size], outputs=[preview_offset] ).then( fn=update_preview_pagination, inputs=[preview_offset], outputs=[upload_status, data_preview, preview_offset] ) # Update upload button to reset offset # Update upload button to reset offset # Tab 4: Visualizations with gr.Tab("📊 Visualizations"): gr.Markdown("## 🎨 Create Interactive Visualizations") with gr.Row(): # Left Column: Chart Settings with gr.Column(scale=1): gr.Markdown("### 1. Chart Settings") viz_type = gr.Dropdown( label="Select Chart Type", choices=["Time Series", "Distribution (Histogram)", "Distribution (Box Plot)", "Bar Chart", "Pie Chart", "Scatter Plot", "Correlation Heatmap"], value="Bar Chart" ) # Dynamic help text based on selection could be added here, but static for now gr.Markdown("""
Guide:
Bar/Pie: Compare categories
Time Series: Trends over time
Scatter: Relationships between numbers
Distribution: Spread of data
""") agg_method = gr.Dropdown( label="Aggregation (for grouped data)", choices=["sum", "mean", "count", "median"], value="sum" ) top_n = gr.Slider(label="Top N Categories", minimum=5, maximum=20, value=10, step=1) # Pagination Controls with gr.Row(): prev_batch_btn = gr.Button("⬅️ Prev Batch", size="sm") next_batch_btn = gr.Button("Next Batch ➡️", size="sm") viz_offset = gr.State(value=0) viz_btn = gr.Button("🚀 Create Visualization", variant="primary") # Right Column: Data Selection with gr.Column(scale=2): gr.Markdown("### 2. Select Data") with gr.Row(): x_column_viz = gr.Dropdown(label="X-Axis / Category Column", choices=[], interactive=True) y_column_viz = gr.Dropdown(label="Y-Axis / Value Column", choices=[], interactive=True) color_column_viz = gr.Dropdown(label="Color / Grouping (Optional)", choices=[], interactive=True) # Visualization Output Area with gr.Row(): with gr.Column(scale=3): viz_plot = gr.Plot(label="Interactive Chart") with gr.Column(scale=1): gr.Markdown("### 💡 AI Insights") viz_insights = gr.Markdown(value="*Insights will appear here after generating a chart.*") viz_status = gr.Textbox(label="Status", interactive=False, visible=True) # Wrapper to reset offset when creating new visualization def create_viz_reset(viz_type, x_col, y_col, color_col, agg_method, top_n): return create_visualization(viz_type, x_col, y_col, color_col, agg_method, top_n, offset=0) viz_btn.click( fn=create_viz_reset, inputs=[viz_type, x_column_viz, y_column_viz, color_column_viz, agg_method, top_n], outputs=[viz_plot, viz_status, viz_insights, viz_offset] ) # Export Toolbar with gr.Row(variant="panel"): with gr.Column(scale=3): gr.Markdown("**Export Options:**") with gr.Column(scale=1): export_viz_btn = gr.Button("💾 Download PNG", size="sm") export_viz_file = gr.File(label="Download File", visible=False) export_viz_status = gr.Textbox(visible=False) # Pagination Logic def update_viz_offset(current_offset, direction, top_n): if direction == "next": return current_offset 
+ top_n else: return max(0, current_offset - top_n) prev_batch_btn.click( fn=update_viz_offset, inputs=[viz_offset, gr.State("prev"), top_n], outputs=[viz_offset] ).then( fn=create_visualization, inputs=[viz_type, x_column_viz, y_column_viz, color_column_viz, agg_method, top_n, viz_offset], outputs=[viz_plot, viz_status, viz_insights, viz_offset] ) next_batch_btn.click( fn=update_viz_offset, inputs=[viz_offset, gr.State("next"), top_n], outputs=[viz_offset] ).then( fn=create_visualization, inputs=[viz_type, x_column_viz, y_column_viz, color_column_viz, agg_method, top_n, viz_offset], outputs=[viz_plot, viz_status, viz_insights, viz_offset] ) export_viz_btn.click( fn=lambda: export_visualization(viz_plot.value) if viz_plot.value else (None, "No visualization to export"), inputs=[], outputs=[export_viz_file, export_viz_status] ) # Show file download when ready def show_download(file, status): return gr.update(visible=True), status export_viz_btn.click( fn=show_download, inputs=[export_viz_file, export_viz_status], outputs=[export_viz_file, export_viz_status] ) # Tab 5: Insights with gr.Tab("💡 Insights"): gr.Markdown("## 🧠 Advanced AI Insights") gr.Markdown("Deep dive analysis of your data's performance, drivers, and risks.") with gr.Row(): insights_source = gr.Radio( choices=["Full Dataset", "Filtered Data"], value="Filtered Data", label="Analysis Scope", info="Choose whether to analyze the entire dataset or just the filtered subset." 
) insights_btn = gr.Button("Generate Advanced Insights", variant="primary", size="lg") # Executive Summary Row gr.Markdown("### 📋 Executive Summary") with gr.Row(): exec_card1 = gr.Markdown() exec_card2 = gr.Markdown() exec_card3 = gr.Markdown() # Detailed Analysis Rows with gr.Row(): with gr.Column(): gr.Markdown("### 🏆 Pareto Analysis (80/20 Rule)") pareto_output = gr.Markdown("*Click Generate to see vital few categories*") with gr.Column(): gr.Markdown("### 🔑 Key Drivers") drivers_output = gr.Markdown("*Click Generate to see what drives your metrics*") with gr.Row(): with gr.Column(): gr.Markdown("### 📊 Segment Performance") segments_output = gr.Markdown("*Click Generate to see segment analysis*") with gr.Column(): gr.Markdown("### ⚠️ Anomalies & Risks") anomalies_output = gr.Markdown("*Click Generate to see detected anomalies*") with gr.Accordion("📄 View Full Report", open=False): full_report_output = gr.Markdown("Generate insights to see the full report.") def update_insights(source): # from advanced_insights import generate_advanced_insights, format_advanced_insights global filtered_df, current_df target_df = filtered_df if source == "Filtered Data" else current_df if target_df is None: return ["Please upload data first."] * 8 try: insights = generate_advanced_insights(target_df) # Format Executive Summary cards cards = insights.get('executive_summary', []) card_outputs = [] for card in cards: html = f"""
{card['icon']}
{card['title']}
{card['value']}
{card['description']}
""" card_outputs.append(html) # Fill remaining cards if less than 3 while len(card_outputs) < 3: card_outputs.append("") # Format Pareto pareto = insights.get('pareto_analysis', []) if pareto: pareto_text = "" for p in pareto: pareto_text += f"#### {p['category']} Analysis\n" pareto_text += f"{p['insight']}\n\n" pareto_text += "**Vital Few (Top 5):**\n" + ", ".join([f"`{x}`" for x in p['vital_few']]) + "\n\n---\n" else: pareto_text = "No significant Pareto patterns found." # Format Drivers drivers = insights.get('key_drivers', []) if drivers: drivers_text = "" for d in drivers: drivers_text += f"- {d['insight']}\n" else: drivers_text = "No strong correlations found to identify key drivers." # Format Segments segments = insights.get('segment_analysis', []) if segments: seg_text = "" for s in segments: seg_text += f"- {s['insight']}\n" else: seg_text = "Not enough categorical data for segment analysis." # Format Anomalies anomalies = insights.get('anomalies', []) if anomalies: anom_text = "" for a in anomalies: anom_text += f"- {a['insight']}\n" else: anom_text = "✅ No significant anomalies detected." 
# Generate Full Report full_report = format_advanced_insights(insights) return card_outputs + [pareto_text, drivers_text, seg_text, anom_text, full_report] except Exception as e: return [f"Error: {str(e)}"] * 8 insights_btn.click( fn=update_insights, inputs=[insights_source], outputs=[exec_card1, exec_card2, exec_card3, pareto_output, drivers_output, segments_output, anomalies_output, full_report_output] ) # Tab 6: Smart Dashboard with gr.Tab("🚀 Smart Dashboard"): smart_btn = gr.Button("✨ Generate Smart Dashboard", variant="primary", size="lg") with gr.Row(): with gr.Column(): plot1 = gr.Plot() desc1 = gr.Markdown() with gr.Column(): plot2 = gr.Plot() desc2 = gr.Markdown() with gr.Row(): with gr.Column(): plot3 = gr.Plot() desc3 = gr.Markdown() with gr.Column(): plot4 = gr.Plot() desc4 = gr.Markdown() with gr.Row(): with gr.Column(): plot5 = gr.Plot() desc5 = gr.Markdown() with gr.Column(): plot6 = gr.Plot() desc6 = gr.Markdown() smart_status = gr.Textbox(label="Status", interactive=False) def update_smart_dashboard(): # from smart_dashboard import generate_smart_dashboard global filtered_df if filtered_df is None: return [None] * 12 + ["Please upload data first."] try: items = generate_smart_dashboard(filtered_df) outputs = [] # Fill up to 6 slots for i in range(6): if i < len(items): fig, title, insight = items[i] outputs.append(fig) outputs.append(f"### {title}\n\n{insight}") else: outputs.append(None) outputs.append("") outputs.append(f"Successfully generated {len(items)} visualizations based on your data patterns!") return outputs except Exception as e: return [None] * 12 + [f"Error generating dashboard: {str(e)}"] smart_btn.click( fn=update_smart_dashboard, inputs=[], outputs=[plot1, desc1, plot2, desc2, plot3, desc3, plot4, desc4, plot5, desc5, plot6, desc6, smart_status] ) # Tab 7: Comparison with gr.Tab("⚖️ Compare"): gr.Markdown("## Head-to-Head Comparison") gr.Markdown("Compare two segments of your data side-by-side (e.g., Region A vs Region B).") with 
gr.Row(): # Group A with gr.Column(variant="panel"): gr.Markdown("### Group A") comp_cat_col_a = gr.Dropdown(label="Filter Column", choices=[], interactive=True) comp_cat_val_a = gr.Dropdown(label="Filter Value", choices=[], interactive=True) # Group B with gr.Column(variant="panel"): gr.Markdown("### Group B") comp_cat_col_b = gr.Dropdown(label="Filter Column", choices=[], interactive=True) comp_cat_val_b = gr.Dropdown(label="Filter Value", choices=[], interactive=True) comp_btn = gr.Button("⚔️ Compare Groups", variant="primary", size="lg") gr.Markdown("### Comparison Results") with gr.Row(): comp_metric1 = gr.HTML() comp_metric2 = gr.HTML() comp_metric3 = gr.HTML() comp_metric4 = gr.HTML() comp_plot = gr.Plot(label="Side-by-Side Visualization") # Update values when column selected def update_comp_values(col): global current_df if current_df is not None and col: unique_vals = current_df[col].unique().tolist() if len(unique_vals) > 1000: # Limit to first 1000 to prevent freeze return gr.update(choices=unique_vals[:1000], label=f"Filter Value (Showing 1000/{len(unique_vals)})") return gr.update(choices=unique_vals, label="Filter Value") return gr.update(choices=[], label="Filter Value") comp_cat_col_a.change(update_comp_values, comp_cat_col_a, comp_cat_val_a) comp_cat_col_b.change(update_comp_values, comp_cat_col_b, comp_cat_val_b) def run_comparison(col_a, val_a, col_b, val_b): # from comparison import compare_datasets global current_df if current_df is None: return ["Please upload data first."] * 4 + [None] # Build filters filter_a = {col_a: {'type': 'categorical', 'values': [val_a]}} if col_a and val_a else {} filter_b = {col_b: {'type': 'categorical', 'values': [val_b]}} if col_b and val_b else {} label_a = f"{col_a}={val_a}" if col_a else "All Data" label_b = f"{col_b}={val_b}" if col_b else "All Data" metrics, fig = compare_datasets(current_df, filter_a, filter_b, label_a, label_b) # Pad metrics to 4 while len(metrics) < 4: metrics.append("") return 
metrics[:4] + [fig] comp_btn.click( fn=run_comparison, inputs=[comp_cat_col_a, comp_cat_val_a, comp_cat_col_b, comp_cat_val_b], outputs=[comp_metric1, comp_metric2, comp_metric3, comp_metric4, comp_plot] ) comp_btn.click( fn=run_comparison, inputs=[comp_cat_col_a, comp_cat_val_a, comp_cat_col_b, comp_cat_val_b], outputs=[comp_metric1, comp_metric2, comp_metric3, comp_metric4, comp_plot] ) # Tab 8: Segment Explorer with gr.Tab("🔍 Segment Explorer"): gr.Markdown("## Deep Dive Explorer") gr.Markdown("Select a category to see a detailed mini-dashboard for that specific segment.") with gr.Row(): drill_col = gr.Dropdown(label="Select Category Column", choices=[], interactive=True) drill_val = gr.Dropdown(label="Select Value", choices=[], interactive=True) drill_btn = gr.Button("🔎 Analyze Segment", variant="primary") gr.Markdown("### Segment Overview") with gr.Row(): drill_stat1 = gr.Number(label="Total Records") drill_stat2 = gr.Number(label="% of Total Data") drill_stat3 = gr.Number(label="Total Value (Sum of 1st Num Col)") with gr.Row(): drill_plot1 = gr.Plot(label="Trend over Time") drill_plot2 = gr.Plot(label="Top Associations") # Update values drill_col.change(update_comp_values, drill_col, drill_val) def run_drill_down(col, val): global current_df if current_df is None: return [0, 0, 0, None, None] if not col or not val: return [0, 0, 0, None, None] # Filter data subset = current_df[current_df[col] == val] # Stats count = len(subset) pct = (count / len(current_df)) * 100 col_types = get_column_types(current_df) total_val = 0 if col_types['numerical']: total_val = subset[col_types['numerical'][0]].sum() # Plot 1: Trend (if date exists) fig1 = None if col_types['datetime'] and col_types['numerical']: date_col = col_types['datetime'][0] num_col = col_types['numerical'][0] agg = subset.groupby(date_col)[num_col].sum().reset_index() fig1 = px.line(agg, x=date_col, y=num_col, title=f"{num_col} Trend for {val}") # Plot 2: Top Category (if another cat exists) fig2 = None 
other_cats = [c for c in col_types['categorical'] if c != col] if other_cats: cat2 = other_cats[0] top = subset[cat2].value_counts().head(10).reset_index() top.columns = [cat2, 'Count'] fig2 = px.bar(top, x=cat2, y='Count', title=f"Top {cat2} in {val}") return count, round(pct, 1), round(total_val, 2), fig1, fig2 drill_btn.click( fn=run_drill_down, inputs=[drill_col, drill_val], outputs=[drill_stat1, drill_stat2, drill_stat3, drill_plot1, drill_plot2] ) # Tab 9: Export with gr.Tab("💾 Export"): gr.Markdown("## Export Your Data") export_data_btn = gr.Button("Export Filtered Data as CSV", variant="primary") export_file = gr.File(label="Download CSV") export_status = gr.Textbox(label="Export Status", interactive=False) export_data_btn.click( fn=export_filtered_data, inputs=[], outputs=[export_file, export_status] ) # Connect upload button to update all dropdowns upload_btn.click( fn=upload_and_preview_data, inputs=[file_input], outputs=[ upload_status, data_preview, row_count, preview_offset, num_filter_dropdown, num_min, num_max, cat_filter_dropdown, cat_filter_values, cat_filter_dropdown_2, cat_filter_values_2, date_filter_dropdown, date_start, date_end, x_column_viz, y_column_viz, color_column_viz, comp_cat_col_a, comp_cat_col_b, drill_col ] ) return demo if __name__ == "__main__": demo = create_dashboard() demo.launch(share=False, server_name="0.0.0.0", server_port=7860)