# Author: Saumith Devarsetty
# Commit 4032138 — Refactor: consolidated insights, updated visualizations
# to Plotly, and updated README.
"""
My BI Dashboard - Main Application

Built with Gradio for interactive data exploration and analysis.
"""
| import gradio as gr | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import plotly.express as px | |
| from typing import Optional, Tuple | |
| import os | |
| # My custom modules for data processing and visualization | |
| from data_processor import ( | |
| load_data, get_data_summary, get_correlation_matrix, | |
| apply_filters, clean_data, aggregate_data, get_data_preview | |
| ) | |
| from visualizations import ( | |
| create_plotly_timeseries, create_plotly_distribution, create_plotly_category, | |
| create_plotly_scatter, create_plotly_heatmap | |
| ) | |
| from insights import ( | |
| generate_all_insights, format_insights_for_display, | |
| generate_visualization_insights, generate_advanced_insights, format_advanced_insights, | |
| generate_smart_dashboard, compare_datasets | |
| ) | |
| from utils import get_column_types, get_missing_value_summary, get_dataframe_info | |
# Store the current dataset globally.
# ``current_df`` holds the dataset exactly as uploaded/loaded;
# ``filtered_df`` holds the subset produced by the Filter & Explore tab and
# is what the Visualizations and Insights tabs operate on.
current_df: Optional[pd.DataFrame] = None
filtered_df: Optional[pd.DataFrame] = None
def update_preview_pagination(offset=0):
    """Return a status line, a 10-row slice of the dataset, and the clamped offset.

    Used by the Prev/Next buttons on the Data Upload tab to page through
    ``current_df`` ten rows at a time.
    """
    global current_df
    if current_df is None:
        return "No data loaded.", None, 0
    total = len(current_df)
    # Snap an overshooting offset back to the start of the last (possibly
    # partial) 10-row page; negative offsets are floored at zero.
    if offset >= total:
        last_page_size = total % 10 or 10
        offset = max(0, total - last_page_size)
    offset = max(0, offset)
    page = current_df.iloc[offset : offset + 10]
    last_shown = min(offset + 10, total)
    status_msg = f"Dataset Loaded Successfully! β (Rows {offset+1}-{last_shown} of {total})"
    return status_msg, page, offset
def upload_and_preview_data(file):
    """Handles file upload and shows preview.

    Returns a 20-element tuple wired positionally to Gradio outputs:
    status message, preview dataframe, row count, preview offset, then
    ``gr.update`` payloads for the numerical filter (column/min/max), two
    categorical filters (column + value checklist each), the date filter
    (column/start/end), the three visualization dropdowns (x/y/color), the
    two comparison dropdowns, and the drill-down dropdown.

    NOTE(review): the error paths return plain ``None`` for the gr.update
    slots; Gradio treats ``None`` as "no change", which leaves stale
    dropdown choices from a previous upload in place — confirm intended.
    """
    global current_df, filtered_df
    if file is None:
        # No file selected: reset every control to its empty state.
        return (
            "Please upload a file.", None, 0, 0,  # status, preview, row_count, offset
            gr.update(value=None), gr.update(value=None), gr.update(value=None),  # num filters
            gr.update(choices=[], value=None), gr.update(choices=[]),  # cat filter 1
            gr.update(choices=[], value=None), gr.update(choices=[]),  # cat filter 2
            gr.update(choices=[], value=None), gr.update(value=""), gr.update(value=""),  # date filter
            gr.update(choices=[], value=None), gr.update(choices=[], value=None), gr.update(choices=[], value=None),  # viz cols
            gr.update(choices=[], value=None), gr.update(choices=[], value=None),  # comp cols
            gr.update(choices=[], value=None)  # drill col
        )
    try:
        # Load data via the shared loader; it returns (df, error_message).
        df, error = load_data(file.name)
        if error:
            # 1 status value + 19 ``None`` placeholders keeps the output arity at 20.
            return [f"Error: {error}"] + [None]*19
        if df is not None:
            current_df = df
            filtered_df = df.copy()
            # Get column types (numerical / categorical / datetime buckets).
            col_types = get_column_types(df)
            # Get preview (first 10 rows)
            preview = df.head(10)
            status_msg = f"Dataset Loaded Successfully! β (Rows 1-{min(10, len(df))} of {len(df)})"
            # Get all columns
            all_cols = df.columns.tolist()
            return (
                status_msg, preview, len(df), 0,  # status, preview, row_count, offset
                # Pre-select the first column of each type when one exists.
                gr.update(choices=col_types['numerical'], value=col_types['numerical'][0] if col_types['numerical'] else None),  # num_col
                gr.update(value=None), gr.update(value=None),  # num_min, num_max
                gr.update(choices=col_types['categorical'], value=col_types['categorical'][0] if col_types['categorical'] else None),  # cat_col
                gr.update(choices=[]),  # cat_vals
                gr.update(choices=col_types['categorical'], value=None),  # cat_col_2
                gr.update(choices=[]),  # cat_vals_2
                gr.update(choices=col_types['datetime'], value=col_types['datetime'][0] if col_types['datetime'] else None),  # date_col
                gr.update(value=""), gr.update(value=""),  # date_start, date_end
                gr.update(choices=all_cols, value=None),  # x_col
                gr.update(choices=all_cols, value=None),  # y_col
                gr.update(choices=all_cols, value=None),  # color_col
                gr.update(choices=col_types['categorical'], value=None),  # comp_a
                gr.update(choices=col_types['categorical'], value=None),  # comp_b
                gr.update(choices=col_types['categorical'], value=None)  # drill
            )
        else:
            return ["Error loading file."] + [None]*19
    except Exception as e:
        return [f"Error: {str(e)}"] + [None]*19
def _validate_dataframe(df):
    """Sanity-check a loaded DataFrame.

    Returns ``(is_valid, error_message)``; the message is empty when valid.
    """
    if df is None:
        return False, "no data was loaded"
    if len(df.columns) == 0:
        return False, "the dataset contains no columns"
    if df.empty:
        return False, "the dataset contains no rows"
    return True, ""


def _detect_datetime_columns(df, sample_size=50, threshold=0.8):
    """Heuristically detect object columns that look like dates.

    A column qualifies when at least ``threshold`` of a small sample of its
    non-null values parses successfully with ``pd.to_datetime``.
    """
    candidates = []
    for col in df.select_dtypes(include=["object"]).columns:
        sample = df[col].dropna().head(sample_size)
        if sample.empty:
            continue
        parsed = pd.to_datetime(sample, errors='coerce')
        if parsed.notna().mean() >= threshold:
            candidates.append(col)
    return candidates


def load_from_path_or_url(path_or_url):
    """Load data from a file path or URL.

    Accepts an http(s) URL (CSV or Excel, CSV assumed by default) or a
    local path handled by the shared ``load_data`` loader. On success the
    module globals ``current_df``/``filtered_df`` are replaced and a
    markdown summary, preview, status string, and dropdown updates are
    returned.

    BUG FIX: the original called ``validate_dataframe`` and
    ``detect_datetime_columns``, neither of which is imported or defined
    anywhere in this module — every load raised a NameError that the broad
    ``except`` reported as a confusing generic error. Private helpers
    ``_validate_dataframe`` / ``_detect_datetime_columns`` now provide that
    behavior.
    """
    global current_df, filtered_df
    if not path_or_url or path_or_url.strip() == "":
        return "Please enter a file path or URL.", None, "", gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
    try:
        # URLs are read directly with pandas; anything else goes through
        # the project's loader (which also reports a friendly error string).
        if path_or_url.startswith('http://') or path_or_url.startswith('https://'):
            if path_or_url.endswith('.csv'):
                df = pd.read_csv(path_or_url)
            elif path_or_url.endswith(('.xlsx', '.xls')):
                df = pd.read_excel(path_or_url)
            else:
                # Try CSV by default
                df = pd.read_csv(path_or_url)
        else:
            df, error = load_data(path_or_url)
            if error:
                return f"Error: {error}", None, "", gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
        # Validate the loaded DataFrame
        is_valid, error_msg = _validate_dataframe(df)
        if not is_valid:
            return f"Invalid data: {error_msg}", None, "", gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
        # Auto-detect and convert datetime columns (best-effort).
        for col in _detect_datetime_columns(df):
            try:
                df[col] = pd.to_datetime(df[col], errors='coerce')
            except Exception:
                pass
        current_df = df
        filtered_df = df.copy()
        # Build the markdown summary shown on the upload tab.
        info = get_dataframe_info(df)
        col_types = get_column_types(df)
        summary = f"""
## Dataset Loaded Successfully! β
**Source:** {path_or_url}
**Basic Information:**
- Rows: {info['rows']:,}
- Columns: {info['columns']}
- Numerical Columns: {info['numerical_columns']}
- Categorical Columns: {info['categorical_columns']}
- DateTime Columns: {info['datetime_columns']}
- Memory Usage: {info['memory_usage_mb']:.2f} MB
- Missing Values: {info['total_missing']:,} ({info['missing_percentage']:.2f}%)
**Column Names:**
{', '.join(df.columns.tolist())}
"""
        preview = df.head(10)
        all_cols = df.columns.tolist()
        # Return updates for visualization dropdowns and filter dropdowns,
        # clearing any selection left over from a previous dataset.
        return (
            summary,
            preview,
            "Data loaded successfully from path/URL!",
            gr.update(choices=all_cols, value=None),  # x_column - clear selection
            gr.update(choices=all_cols, value=None),  # y_column - clear selection
            gr.update(choices=all_cols, value=None),  # color_column - clear selection
            gr.update(choices=col_types['numerical'], value=None),  # num_filter_col - clear selection
            gr.update(choices=col_types['categorical'], value=None),  # cat_filter_col - clear selection
            gr.update(choices=col_types['datetime'], value=None)  # date_filter_col - clear selection
        )
    except Exception as e:
        return f"Error loading data: {str(e)}", None, "", gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
def show_statistics():
    """Generate and display statistics for the current dataset.

    Returns an 8-tuple matching the Statistics tab outputs:
    (rows, cols, duplicate rows, missing %, numerical stats df,
    categorical stats df, missing-values df, correlation heatmap figure).

    BUG FIX: the empty-state return previously had only 7 values for the
    8 Gradio output components (the correlation-plot slot was missing),
    which made the button error out before any data was loaded. Division
    by ``rows`` is now guarded for zero-row datasets.
    """
    global current_df
    if current_df is None:
        return 0, 0, 0, 0, None, None, None, None
    summary = get_data_summary(current_df)
    # 1. Headline metric cards.
    rows = summary['shape'][0]
    cols = summary['shape'][1]
    dupes = summary.get('duplicate_rows', 0)
    total_cells = rows * cols
    missing_cells = sum(summary['missing_values'].values())
    missing_pct = (missing_cells / total_cells * 100) if total_cells > 0 else 0
    # 2. Numerical stats, transposed so each feature is one row.
    if 'numerical_stats' in summary:
        num_df = pd.DataFrame(summary['numerical_stats']).T
        num_df = num_df.reset_index().rename(columns={'index': 'Feature'})
        # Round every numeric column for readability.
        for col in num_df.columns:
            if col != 'Feature':
                num_df[col] = num_df[col].apply(lambda x: round(x, 2))
    else:
        num_df = pd.DataFrame(columns=["No numerical columns found"])
    # 3. Categorical stats: mode, its count, and its share per feature.
    if 'categorical_stats' in summary:
        cat_data = []
        for col, stats in summary['categorical_stats'].items():
            top_val = list(stats['top_values'].keys())[0] if stats['top_values'] else "N/A"
            top_count = list(stats['top_values'].values())[0] if stats['top_values'] else 0
            cat_data.append({
                'Feature': col,
                'Unique Values': stats['unique_count'],
                'Most Common': top_val,
                'Count': top_count,
                # Guard against a zero-row dataset.
                'Share (%)': round(top_count / rows * 100, 1) if rows else 0.0
            })
        cat_df = pd.DataFrame(cat_data)
    else:
        cat_df = pd.DataFrame(columns=["No categorical columns found"])
    # 4. Missing-value report, worst-affected columns first.
    missing_data = [
        {
            'Feature': col,
            'Missing Count': count,
            'Missing (%)': round(count / rows * 100, 2) if rows else 0.0
        }
        for col, count in summary['missing_values'].items() if count > 0
    ]
    if missing_data:
        missing_df = pd.DataFrame(missing_data).sort_values('Missing Count', ascending=False)
    else:
        missing_df = pd.DataFrame(columns=["No missing values found"])
    # 5. Correlation heatmap over the numerical columns.
    corr_plot = create_plotly_heatmap(current_df, title="Correlation Matrix")
    return rows, cols, dupes, round(missing_pct, 2), num_df, cat_df, missing_df, corr_plot
def update_filter_options():
    """Refresh the three filter dropdowns from the loaded dataset.

    Returns gr.update payloads for the numerical, categorical and datetime
    column selectors, with empty choice lists when no data is loaded.
    """
    global current_df
    if current_df is None:
        return gr.update(choices=[]), gr.update(choices=[]), gr.update(choices=[])
    kinds = get_column_types(current_df)
    return tuple(
        gr.update(choices=kinds[bucket])
        for bucket in ("numerical", "categorical", "datetime")
    )
def _append_category_breakdown(summary_lines, df, col, selected_vals):
    """Append per-value row counts for one categorical filter to the summary.

    Best-effort: any lookup failure is swallowed so a breakdown problem
    never blocks the filtering itself.
    """
    try:
        counts = df[col].value_counts()
        summary_lines.append(f"\n**{col} Breakdown:**")
        for val in selected_vals:
            if val in counts:
                summary_lines.append(f"- {val}: {counts[val]:,}")
            else:
                summary_lines.append(f"- {val}: 0")
    except Exception:
        # Narrowed from a bare ``except:`` — still best-effort, but no
        # longer swallows KeyboardInterrupt/SystemExit.
        pass


def apply_filters_and_update(num_col, num_min, num_max, cat_col, cat_vals, cat_col_2, cat_vals_2, date_col, date_start, date_end, offset=0):
    """Apply filters and return summary and preview.

    Builds a filter spec from the UI controls, applies it via
    ``apply_filters`` (updating the module-global ``filtered_df``), and
    returns (markdown summary, 20-row preview slice, filtered row count,
    clamped offset).

    Fixes over the original: the two bare ``except:`` clauses are narrowed
    to ``except Exception:``, and the duplicated categorical-breakdown code
    is factored into ``_append_category_breakdown``.
    """
    global filtered_df, current_df
    if current_df is None:
        return "No data loaded.", None, 0, offset
    # Construct the filter spec understood by data_processor.apply_filters.
    filters = {}
    # Numerical range filter — either bound may be omitted.
    if num_col and (num_min is not None or num_max is not None):
        filters[num_col] = {
            'type': 'numerical',
            'min': num_min,
            'max': num_max
        }
    # Up to two categorical include-lists.
    if cat_col and cat_vals:
        filters[cat_col] = {
            'type': 'categorical',
            'values': cat_vals
        }
    if cat_col_2 and cat_vals_2:
        filters[cat_col_2] = {
            'type': 'categorical',
            'values': cat_vals_2
        }
    # Date range filter — either end may be blank.
    if date_col and (date_start or date_end):
        filters[date_col] = {
            'type': 'datetime',
            'start_date': date_start,
            'end_date': date_end
        }
    filtered_df = apply_filters(current_df, filters)
    summary_lines = [
        "## Filtered Data",
        f"**Original rows:** {len(current_df):,}",
        f"**Filtered rows:** {len(filtered_df):,}",
        f"**Rows removed:** {len(current_df) - len(filtered_df):,}"
    ]
    # Per-value breakdowns for the active categorical filters.
    if not filtered_df.empty:
        if cat_col and cat_vals:
            _append_category_breakdown(summary_lines, filtered_df, cat_col, cat_vals)
        if cat_col_2 and cat_vals_2:
            _append_category_breakdown(summary_lines, filtered_df, cat_col_2, cat_vals_2)
    summary = "\n".join(summary_lines)
    # Clamp the offset to the last full/partial 20-row page.
    total_rows = len(filtered_df)
    if offset >= total_rows:
        offset = max(0, total_rows - (total_rows % 20 or 20))
    elif offset < 0:
        offset = 0
    preview = filtered_df.iloc[offset : offset + 20]
    if total_rows > 0:
        summary += f"\n\n**Showing Rows:** {offset+1}-{min(offset+20, total_rows)}"
    return summary, preview, len(filtered_df), offset
def create_visualization(viz_type, x_col, y_col, color_col, agg_method, top_n, offset=0):
    """Create visualizations based on user selection and generate insights.

    Returns (figure, status message, insights markdown, offset). Missing
    column selections short-circuit with a helpful status and no figure.

    Fixes over the original: the "Correlation Heatmap" branch was
    duplicated (the second copy was unreachable), the Bar Chart branch
    duplicated the insights tail with an early return, and an unknown
    chart type silently reported success with a ``None`` figure — it now
    reports an explicit error.
    """
    global filtered_df
    if filtered_df is None or len(filtered_df) == 0:
        return None, "Please upload and filter data first.", "", offset
    try:
        # Default status; the Bar Chart branch may override it when paging.
        status_msg = "Visualization created successfully!"
        if viz_type == "Time Series":
            if not x_col or not y_col:
                return None, "Please select Date (X) and Value (Y) columns.", "", offset
            fig = create_plotly_timeseries(filtered_df, x_col, y_col, agg_method)
        elif viz_type == "Distribution (Histogram)":
            if not x_col:
                return None, "Please select Column.", "", offset
            fig = create_plotly_distribution(filtered_df, x_col)
        elif viz_type == "Distribution (Box Plot)":
            if not x_col:
                return None, "Please select Column.", "", offset
            fig = px.box(filtered_df, x=x_col, y=y_col, color=color_col, title=f"Distribution of {x_col}")
        elif viz_type == "Bar Chart":
            if not x_col:
                return None, "Please select X-Axis column.", "", offset
            # Clamp pagination to the last page of distinct categories.
            total_items = filtered_df[x_col].nunique()
            if offset >= total_items and total_items > 0:
                offset = max(0, total_items - (total_items % top_n or top_n))
                status_msg = f"Reached end of data. Showing items {offset+1}-{total_items}."
            elif offset < 0:
                offset = 0
                status_msg = "Start of data."
            fig = create_plotly_category(filtered_df, x_col, y_col, agg_method, top_n, offset=offset)
        elif viz_type == "Pie Chart":
            if not x_col:
                return None, "Please select Category column.", "", offset
            data = filtered_df[x_col].value_counts().head(top_n)
            fig = px.pie(values=data.values, names=data.index, title=f"Top {top_n} {x_col}")
        elif viz_type == "Scatter Plot":
            if not x_col or not y_col:
                return None, "Please select X and Y columns.", "", offset
            fig = create_plotly_scatter(filtered_df, x_col, y_col, color_col)
        elif viz_type == "Correlation Heatmap":
            fig = create_plotly_heatmap(filtered_df)
            if fig is None:
                return None, "Need at least 2 numerical columns for correlation.", "", offset
        else:
            return None, f"Error: unknown chart type '{viz_type}'.", "", offset
        # Generate insights for the visualization (common tail for all chart types).
        insights = generate_visualization_insights(viz_type, filtered_df, x_col, y_col)
        insights_text = f"## π Visualization Insights\n\n{insights}"
        return fig, status_msg, insights_text, offset
    except Exception as e:
        return None, f"Error: {str(e)}", "", offset
def generate_insights_report():
    """Build the formatted automated-insights report for the active dataset.

    Operates on the module-global ``filtered_df``; returns a display-ready
    string, or an error/status message when no data is available.
    """
    global filtered_df
    if filtered_df is None or len(filtered_df) == 0:
        return "Please upload data first."
    try:
        return format_insights_for_display(generate_all_insights(filtered_df))
    except Exception as e:
        return f"Error generating insights: {str(e)}"
def export_filtered_data():
    """Write the currently filtered rows to a CSV file.

    Returns (file path, status message); the path is ``None`` when there
    is nothing to export.
    """
    global filtered_df
    if filtered_df is None:
        return None, "No data to export."
    destination = "filtered_data_export.csv"
    filtered_df.to_csv(destination, index=False)
    return destination, f"Data exported successfully! ({len(filtered_df)} rows)"
def export_visualization(fig):
    """Save the current chart to a PNG on disk.

    Returns (file path, status message); the path is ``None`` on failure.

    BUG FIX: the original called ``fig.savefig`` (a matplotlib API), but
    this dashboard was migrated to Plotly — Plotly figures expose
    ``write_image`` instead, so every export raised AttributeError. Both
    figure types are now supported.
    """
    if fig is None:
        return None, "No visualization to export."
    output_path = "visualization_export.png"
    try:
        if hasattr(fig, "write_image"):
            # Plotly figure (static export requires the kaleido engine at runtime).
            fig.write_image(output_path)
        else:
            # matplotlib fallback for any remaining legacy figures.
            fig.savefig(output_path, dpi=300, bbox_inches='tight')
    except Exception as e:
        return None, f"Export failed: {e}"
    return output_path, "Visualization exported successfully!"
| def create_dashboard(): | |
| """Creates my main Gradio dashboard interface.""" | |
| with gr.Blocks(title="Business Intelligence Dashboard") as demo: | |
| gr.Markdown(""" | |
| # π Business Intelligence Dashboard | |
| ### Professional Data Analysis & Visualization Platform | |
| Upload your data, explore insights, create visualizations, and export results. | |
| """) | |
| # Dropdowns that will be populated across tabs | |
| x_column_viz = None | |
| y_column_viz = None | |
| color_column_viz = None | |
| num_filter_dropdown = None | |
| cat_filter_dropdown = None | |
| cat_filter_dropdown_2 = None | |
| date_filter_dropdown = None | |
| comp_cat_col_a = None | |
| comp_cat_col_b = None | |
| comp_cat_val_a = None | |
| comp_cat_val_b = None | |
| drill_col = None | |
| drill_val = None | |
| # Tab 1: Data Upload | |
| with gr.Tab("π Data Upload"): | |
| # Welcome Banner Removed | |
| gr.Markdown("### π€ Upload File") | |
| file_input = gr.File(label="Drop CSV or Excel file here", file_types=[".csv", ".xlsx", ".xls"], height=100) | |
| upload_btn = gr.Button("Load Data", variant="primary", size="lg") | |
| # Status & Preview Section | |
| gr.Markdown("### π Data Status") | |
| with gr.Row(): | |
| upload_message = gr.Textbox(label="System Status", value="Waiting for data...", interactive=False) | |
| upload_status = gr.Markdown() | |
| with gr.Accordion("π Data Preview", open=True): | |
| data_preview = gr.Dataframe(interactive=False) | |
| # Pagination Controls for Data Preview | |
| with gr.Row(): | |
| prev_preview_btn = gr.Button("β¬ οΈ Prev Batch", size="sm") | |
| next_preview_btn = gr.Button("Next Batch β‘οΈ", size="sm") | |
| preview_offset = gr.State(value=0) | |
| preview_batch_size = gr.State(value=10) | |
| # Tab 2: Statistics | |
| with gr.Tab("π Statistics"): | |
| gr.Markdown("## π Data Health & Statistics") | |
| stats_btn = gr.Button("Generate Statistics", variant="primary") | |
| # Metric Cards Row | |
| with gr.Row(): | |
| stat_rows = gr.Number(label="Total Rows", value=0) | |
| stat_cols = gr.Number(label="Total Columns", value=0) | |
| stat_dupes = gr.Number(label="Duplicate Rows", value=0) | |
| stat_missing = gr.Number(label="Missing Cells (%)", value=0) | |
| gr.Markdown("### π’ Numerical Statistics") | |
| numerical_stats = gr.Dataframe(label="Descriptive Statistics (Transposed)", interactive=False) | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### π Categorical Summary") | |
| categorical_stats = gr.Dataframe(label="Top Categories", interactive=False) | |
| with gr.Column(): | |
| gr.Markdown("### β οΈ Missing Values Report") | |
| missing_stats = gr.Dataframe(label="Missing Data by Column", interactive=False) | |
| gr.Markdown("### π₯ Correlation Matrix") | |
| corr_matrix_plot = gr.Plot(label="Correlation Matrix") | |
| stats_btn.click( | |
| fn=show_statistics, | |
| inputs=[], | |
| outputs=[stat_rows, stat_cols, stat_dupes, stat_missing, numerical_stats, categorical_stats, missing_stats, corr_matrix_plot] | |
| ) | |
| # Tab 3: Filter & Explore | |
| with gr.Tab("π Filter & Explore"): | |
| gr.Markdown("## Interactive Data Filtering") | |
| gr.Markdown(""" | |
| **How to use filters:** | |
| 1. Select a column from any filter section below | |
| 2. Set your filter criteria (range, values, or dates) | |
| 3. Click "Apply Filters" to see filtered results | |
| 4. You can combine multiple filters together | |
| 5. The filtered data will be used in Visualizations and Insights tabs | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### π’ Numerical Filters") | |
| gr.Markdown("*Filter by number ranges (e.g., Sales, Price, Quantity)*") | |
| num_filter_dropdown = gr.Dropdown(label="Select Numerical Column", choices=[], interactive=True) | |
| with gr.Row(): | |
| num_min = gr.Number(label="Minimum Value", placeholder="Min") | |
| num_max = gr.Number(label="Maximum Value", placeholder="Max") | |
| with gr.Column(): | |
| gr.Markdown("### π Categorical Filters") | |
| gr.Markdown("*Filter by categories (e.g., Product, Region)*") | |
| cat_filter_dropdown = gr.Dropdown(label="Select Categorical Column 1", choices=[], interactive=True) | |
| cat_filter_values = gr.CheckboxGroup(label="Select Values to Include", choices=[]) | |
| gr.Markdown("---") | |
| cat_filter_dropdown_2 = gr.Dropdown(label="Select Categorical Column 2 (Optional)", choices=[], interactive=True) | |
| cat_filter_values_2 = gr.CheckboxGroup(label="Select Values to Include", choices=[]) | |
| # Date filters in accordion (optional) | |
| with gr.Accordion("π Date Filters (Optional - Click to Expand)", open=False): | |
| gr.Markdown("*Filter by date ranges (format: YYYY-MM-DD)*") | |
| date_filter_dropdown = gr.Dropdown(label="Select Date Column", choices=[], interactive=True) | |
| with gr.Row(): | |
| date_start = gr.Textbox(label="Start Date", placeholder="YYYY-MM-DD") | |
| date_end = gr.Textbox(label="End Date", placeholder="YYYY-MM-DD") | |
| with gr.Row(): | |
| filter_btn = gr.Button("Apply Filters", variant="primary", size="lg") | |
| reset_filter_btn = gr.Button("Reset Filters", variant="secondary", size="lg") | |
| gr.Markdown("### Filter Results") | |
| filter_summary = gr.Markdown() | |
| with gr.Accordion("Filtered Data Preview", open=True): | |
| filtered_preview = gr.Dataframe(label="Filtered Data Preview") | |
| # Pagination Controls for Filtered Preview | |
| with gr.Row(): | |
| prev_filtered_btn = gr.Button("β¬ οΈ Prev Batch", size="sm") | |
| next_filtered_btn = gr.Button("Next Batch β‘οΈ", size="sm") | |
| filtered_offset = gr.State(value=0) | |
| filtered_batch_size = gr.State(value=20) | |
| row_count = gr.Number(label="Total Filtered Rows", interactive=False) | |
| # Update categorical values when column is selected | |
def update_cat_values(col):
    """Populate the value checklist for the chosen categorical column.

    Clearing the selection (value=None) avoids Gradio's
    "value not in choices" error when the column changes.
    """
    if current_df is None or not col:
        return gr.update(choices=[], value=None)
    options = current_df[col].unique().tolist()
    return gr.update(choices=options, value=None)
| cat_filter_dropdown.change( | |
| fn=update_cat_values, | |
| inputs=[cat_filter_dropdown], | |
| outputs=[cat_filter_values] | |
| ) | |
| cat_filter_dropdown_2.change( | |
| fn=update_cat_values, | |
| inputs=[cat_filter_dropdown_2], | |
| outputs=[cat_filter_values_2] | |
| ) | |
| filter_btn.click( | |
| fn=apply_filters_and_update, | |
| inputs=[num_filter_dropdown, num_min, num_max, cat_filter_dropdown, | |
| cat_filter_values, cat_filter_dropdown_2, cat_filter_values_2, | |
| date_filter_dropdown, date_start, date_end], | |
| outputs=[filter_summary, filtered_preview, row_count, filtered_offset] | |
| ) | |
| # Reset filters function | |
def reset_filters():
    """Clear every filter control and restore the unfiltered dataset.

    Returns 13 values: ten gr.update payloads for the filter widgets,
    then the summary message, the 20-row preview, and the row count.
    """
    global filtered_df, current_df
    # Guard clause: nothing to reset until a dataset is loaded.
    if current_df is None:
        return [gr.update()] * 10 + ["No data loaded.", None, 0]
    filtered_df = current_df.copy()
    head = filtered_df.head(20)
    return (
        gr.update(value=None),   # numerical column
        gr.update(value=None),   # minimum value
        gr.update(value=None),   # maximum value
        gr.update(value=None),   # categorical column 1
        gr.update(choices=[]),   # categorical values 1
        gr.update(value=None),   # categorical column 2
        gr.update(choices=[]),   # categorical values 2
        gr.update(value=None),   # date column
        gr.update(value=""),     # start date
        gr.update(value=""),     # end date
        "Filters reset. Showing original data.",
        head,
        len(filtered_df)
    )
| # Pagination Logic for Filtered Preview | |
def update_filtered_offset(current_offset, direction, batch_size=20):
    """Move the filtered-preview window one batch in the given direction."""
    if direction == "next":
        return current_offset + batch_size
    # Any other direction pages backwards, floored at zero.
    return max(0, current_offset - batch_size)
| prev_filtered_btn.click( | |
| fn=update_filtered_offset, | |
| inputs=[filtered_offset, gr.State("prev"), filtered_batch_size], | |
| outputs=[filtered_offset] | |
| ).then( | |
| fn=apply_filters_and_update, | |
| inputs=[ | |
| num_filter_dropdown, num_min, num_max, | |
| cat_filter_dropdown, cat_filter_values, | |
| cat_filter_dropdown_2, cat_filter_values_2, | |
| date_filter_dropdown, date_start, date_end, | |
| filtered_offset | |
| ], | |
| outputs=[filter_summary, filtered_preview, row_count, filtered_offset] | |
| ) | |
| next_filtered_btn.click( | |
| fn=update_filtered_offset, | |
| inputs=[filtered_offset, gr.State("next"), filtered_batch_size], | |
| outputs=[filtered_offset] | |
| ).then( | |
| fn=apply_filters_and_update, | |
| inputs=[ | |
| num_filter_dropdown, num_min, num_max, | |
| cat_filter_dropdown, cat_filter_values, | |
| cat_filter_dropdown_2, cat_filter_values_2, | |
| date_filter_dropdown, date_start, date_end, | |
| filtered_offset | |
| ], | |
| outputs=[filter_summary, filtered_preview, row_count, filtered_offset] | |
| ) | |
| # Update filter button to reset offset | |
| filter_btn.click( | |
| fn=lambda *args: apply_filters_and_update(*args, offset=0), | |
| inputs=[ | |
| num_filter_dropdown, num_min, num_max, | |
| cat_filter_dropdown, cat_filter_values, | |
| cat_filter_dropdown_2, cat_filter_values_2, | |
| date_filter_dropdown, date_start, date_end | |
| ], | |
| outputs=[filter_summary, filtered_preview, row_count, filtered_offset] | |
| ) | |
| # Pagination Logic for Data Preview | |
def update_preview_offset(current_offset, direction, batch_size=10):
    """Move the upload-tab preview window one batch in the given direction."""
    if direction == "next":
        return current_offset + batch_size
    # Any other direction pages backwards, floored at zero.
    return max(0, current_offset - batch_size)
| prev_preview_btn.click( | |
| fn=update_preview_offset, | |
| inputs=[preview_offset, gr.State("prev"), preview_batch_size], | |
| outputs=[preview_offset] | |
| ).then( | |
| fn=update_preview_pagination, | |
| inputs=[preview_offset], | |
| outputs=[upload_status, data_preview, preview_offset] | |
| ) | |
| next_preview_btn.click( | |
| fn=update_preview_offset, | |
| inputs=[preview_offset, gr.State("next"), preview_batch_size], | |
| outputs=[preview_offset] | |
| ).then( | |
| fn=update_preview_pagination, | |
| inputs=[preview_offset], | |
| outputs=[upload_status, data_preview, preview_offset] | |
| ) | |
| # Update upload button to reset offset | |
| # Update upload button to reset offset | |
| # Tab 4: Visualizations | |
| with gr.Tab("π Visualizations"): | |
| gr.Markdown("## π¨ Create Interactive Visualizations") | |
| with gr.Row(): | |
| # Left Column: Chart Settings | |
| with gr.Column(scale=1): | |
| gr.Markdown("### 1. Chart Settings") | |
| viz_type = gr.Dropdown( | |
| label="Select Chart Type", | |
| choices=["Time Series", "Distribution (Histogram)", "Distribution (Box Plot)", | |
| "Bar Chart", "Pie Chart", "Scatter Plot", "Correlation Heatmap"], | |
| value="Bar Chart" | |
| ) | |
| # Dynamic help text based on selection could be added here, but static for now | |
| gr.Markdown(""" | |
| <div style="font-size: 12px; color: #888; margin-bottom: 10px;"> | |
| <b>Guide:</b><br> | |
| β’ <b>Bar/Pie</b>: Compare categories<br> | |
| β’ <b>Time Series</b>: Trends over time<br> | |
| β’ <b>Scatter</b>: Relationships between numbers<br> | |
| β’ <b>Distribution</b>: Spread of data | |
| </div> | |
| """) | |
| agg_method = gr.Dropdown( | |
| label="Aggregation (for grouped data)", | |
| choices=["sum", "mean", "count", "median"], | |
| value="sum" | |
| ) | |
| top_n = gr.Slider(label="Top N Categories", minimum=5, maximum=20, value=10, step=1) | |
| # Pagination Controls | |
| with gr.Row(): | |
| prev_batch_btn = gr.Button("β¬ οΈ Prev Batch", size="sm") | |
| next_batch_btn = gr.Button("Next Batch β‘οΈ", size="sm") | |
| viz_offset = gr.State(value=0) | |
| viz_btn = gr.Button("π Create Visualization", variant="primary") | |
| # Right Column: Data Selection | |
| with gr.Column(scale=2): | |
| gr.Markdown("### 2. Select Data") | |
| with gr.Row(): | |
| x_column_viz = gr.Dropdown(label="X-Axis / Category Column", choices=[], interactive=True) | |
| y_column_viz = gr.Dropdown(label="Y-Axis / Value Column", choices=[], interactive=True) | |
| color_column_viz = gr.Dropdown(label="Color / Grouping (Optional)", choices=[], interactive=True) | |
| # Visualization Output Area | |
| with gr.Row(): | |
| with gr.Column(scale=3): | |
| viz_plot = gr.Plot(label="Interactive Chart") | |
| with gr.Column(scale=1): | |
| gr.Markdown("### π‘ AI Insights") | |
| viz_insights = gr.Markdown(value="*Insights will appear here after generating a chart.*") | |
| viz_status = gr.Textbox(label="Status", interactive=False, visible=True) | |
| # Wrapper to reset offset when creating new visualization | |
def create_viz_reset(viz_type, x_col, y_col, color_col, agg_method, top_n):
    # Delegate with offset=0 so a freshly requested chart always starts at page one.
    return create_visualization(viz_type, x_col, y_col, color_col, agg_method, top_n, offset=0)
| viz_btn.click( | |
| fn=create_viz_reset, | |
| inputs=[viz_type, x_column_viz, y_column_viz, color_column_viz, agg_method, top_n], | |
| outputs=[viz_plot, viz_status, viz_insights, viz_offset] | |
| ) | |
| # Export Toolbar | |
| with gr.Row(variant="panel"): | |
| with gr.Column(scale=3): | |
| gr.Markdown("**Export Options:**") | |
| with gr.Column(scale=1): | |
| export_viz_btn = gr.Button("πΎ Download PNG", size="sm") | |
| export_viz_file = gr.File(label="Download File", visible=False) | |
| export_viz_status = gr.Textbox(visible=False) | |
| # Pagination Logic | |
def update_viz_offset(current_offset, direction, top_n):
    """Shift the bar-chart pagination offset by one page of ``top_n`` items."""
    if direction != "next":
        # Backwards paging never goes below the first item.
        return max(0, current_offset - top_n)
    return current_offset + top_n
| prev_batch_btn.click( | |
| fn=update_viz_offset, | |
| inputs=[viz_offset, gr.State("prev"), top_n], | |
| outputs=[viz_offset] | |
| ).then( | |
| fn=create_visualization, | |
| inputs=[viz_type, x_column_viz, y_column_viz, color_column_viz, agg_method, top_n, viz_offset], | |
| outputs=[viz_plot, viz_status, viz_insights, viz_offset] | |
| ) | |
| next_batch_btn.click( | |
| fn=update_viz_offset, | |
| inputs=[viz_offset, gr.State("next"), top_n], | |
| outputs=[viz_offset] | |
| ).then( | |
| fn=create_visualization, | |
| inputs=[viz_type, x_column_viz, y_column_viz, color_column_viz, agg_method, top_n, viz_offset], | |
| outputs=[viz_plot, viz_status, viz_insights, viz_offset] | |
| ) | |
| export_viz_btn.click( | |
| fn=lambda: export_visualization(viz_plot.value) if viz_plot.value else (None, "No visualization to export"), | |
| inputs=[], | |
| outputs=[export_viz_file, export_viz_status] | |
| ) | |
| # Show file download when ready | |
def show_download(file, status):
    # Reveal the hidden File component once an export has produced a file;
    # the status text is passed through unchanged.
    return gr.update(visible=True), status
# Second handler registered on the same export button: reveals the file widget.
# NOTE(review): both click handlers fire concurrently, so this may read
# export_viz_file / export_viz_status before the export above completes.
export_viz_btn.click(
fn=show_download,
inputs=[export_viz_file, export_viz_status],
outputs=[export_viz_file, export_viz_status]
)
# Tab 5: Insights
with gr.Tab("π‘ Insights"):
gr.Markdown("## π§ Advanced AI Insights")
gr.Markdown("Deep dive analysis of your data's performance, drivers, and risks.")
with gr.Row():
# Scope selector: analyze everything, or only the currently filtered subset.
insights_source = gr.Radio(
choices=["Full Dataset", "Filtered Data"],
value="Filtered Data",
label="Analysis Scope",
info="Choose whether to analyze the entire dataset or just the filtered subset."
)
insights_btn = gr.Button("Generate Advanced Insights", variant="primary", size="lg")
# Executive Summary Row -- three HTML "metric card" slots filled by update_insights.
gr.Markdown("### π Executive Summary")
with gr.Row():
exec_card1 = gr.Markdown()
exec_card2 = gr.Markdown()
exec_card3 = gr.Markdown()
# Detailed Analysis Rows
with gr.Row():
with gr.Column():
gr.Markdown("### π Pareto Analysis (80/20 Rule)")
pareto_output = gr.Markdown("*Click Generate to see vital few categories*")
with gr.Column():
gr.Markdown("### π Key Drivers")
drivers_output = gr.Markdown("*Click Generate to see what drives your metrics*")
with gr.Row():
with gr.Column():
gr.Markdown("### π Segment Performance")
segments_output = gr.Markdown("*Click Generate to see segment analysis*")
with gr.Column():
gr.Markdown("### β οΈ Anomalies & Risks")
anomalies_output = gr.Markdown("*Click Generate to see detected anomalies*")
with gr.Accordion("π View Full Report", open=False):
full_report_output = gr.Markdown("Generate insights to see the full report.")
def update_insights(source):
    """Generate all content for the Advanced Insights tab.

    Args:
        source: "Filtered Data" to analyze the filtered subset; any other
            value (the "Full Dataset" radio choice) analyzes the full dataset.

    Returns:
        A list of exactly 8 markdown/HTML strings, in output-component order:
        [exec_card1, exec_card2, exec_card3, pareto, drivers, segments,
         anomalies, full_report].
    """
    global filtered_df, current_df
    target_df = filtered_df if source == "Filtered Data" else current_df
    if target_df is None:
        return ["Please upload data first."] * 8

    def _bullets(items, empty_msg):
        # Render a list of insight dicts as markdown bullets, or a fallback message.
        if not items:
            return empty_msg
        return "".join(f"- {item['insight']}\n" for item in items)

    try:
        insights = generate_advanced_insights(target_df)
        # Executive summary cards as inline-styled HTML (gr.Markdown renders raw HTML).
        card_outputs = []
        for card in insights.get('executive_summary', []):
            card_outputs.append(f"""
<div style="background-color: #2b2b2b; padding: 20px; border-radius: 10px; border: 1px solid #444;">
    <div style="font-size: 24px; margin-bottom: 10px;">{card['icon']}</div>
    <div style="color: #888; font-size: 14px; text-transform: uppercase;">{card['title']}</div>
    <div style="color: white; font-size: 28px; font-weight: bold; margin: 5px 0;">{card['value']}</div>
    <div style="color: #aaa; font-size: 12px;">{card['description']}</div>
</div>
""")
        # Exactly 3 card slots exist in the UI: pad with blanks AND truncate any
        # extras so the return length always matches the 8 declared outputs.
        # (The original only padded, so >3 summary cards would break the wiring.)
        card_outputs = (card_outputs + [""] * 3)[:3]
        # Pareto analysis: one section per analyzed category.
        pareto = insights.get('pareto_analysis', [])
        if pareto:
            pareto_text = ""
            for p in pareto:
                pareto_text += f"#### {p['category']} Analysis\n"
                pareto_text += f"{p['insight']}\n\n"
                pareto_text += "**Vital Few (Top 5):**\n" + ", ".join(f"`{x}`" for x in p['vital_few']) + "\n\n---\n"
        else:
            pareto_text = "No significant Pareto patterns found."
        drivers_text = _bullets(insights.get('key_drivers', []),
                                "No strong correlations found to identify key drivers.")
        seg_text = _bullets(insights.get('segment_analysis', []),
                            "Not enough categorical data for segment analysis.")
        anom_text = _bullets(insights.get('anomalies', []),
                             "β No significant anomalies detected.")
        full_report = format_advanced_insights(insights)
        return card_outputs + [pareto_text, drivers_text, seg_text, anom_text, full_report]
    except Exception as e:
        # Surface any analysis failure in every output slot instead of crashing the UI.
        return [f"Error: {str(e)}"] * 8
# One click populates all 8 insight outputs (3 cards + 4 sections + full report).
insights_btn.click(
fn=update_insights,
inputs=[insights_source],
outputs=[exec_card1, exec_card2, exec_card3, pareto_output, drivers_output, segments_output, anomalies_output, full_report_output]
)
# Tab 6: Smart Dashboard -- six (plot, description) slots in a 2-column grid.
with gr.Tab("π Smart Dashboard"):
smart_btn = gr.Button("β¨ Generate Smart Dashboard", variant="primary", size="lg")
with gr.Row():
with gr.Column():
plot1 = gr.Plot()
desc1 = gr.Markdown()
with gr.Column():
plot2 = gr.Plot()
desc2 = gr.Markdown()
with gr.Row():
with gr.Column():
plot3 = gr.Plot()
desc3 = gr.Markdown()
with gr.Column():
plot4 = gr.Plot()
desc4 = gr.Markdown()
with gr.Row():
with gr.Column():
plot5 = gr.Plot()
desc5 = gr.Markdown()
with gr.Column():
plot6 = gr.Plot()
desc6 = gr.Markdown()
smart_status = gr.Textbox(label="Status", interactive=False)
def update_smart_dashboard():
    """Auto-generate up to six chart/insight pairs for the Smart Dashboard tab.

    Returns a flat list of 13 values: (figure, markdown) for each of the six
    slots -- unused slots get (None, "") -- followed by a status message.
    """
    global filtered_df
    if filtered_df is None:
        return [None] * 12 + ["Please upload data first."]
    try:
        items = generate_smart_dashboard(filtered_df)
        outputs = []
        for slot in range(6):
            if slot < len(items):
                fig, title, insight = items[slot]
                outputs.extend([fig, f"### {title}\n\n{insight}"])
            else:
                # Blank out any slots beyond what was generated.
                outputs.extend([None, ""])
        outputs.append(f"Successfully generated {len(items)} visualizations based on your data patterns!")
        return outputs
    except Exception as e:
        return [None] * 12 + [f"Error generating dashboard: {str(e)}"]
smart_btn.click(
fn=update_smart_dashboard,
inputs=[],
outputs=[plot1, desc1, plot2, desc2, plot3, desc3, plot4, desc4, plot5, desc5, plot6, desc6, smart_status]
)
# Tab 7: Comparison -- two (column, value) filter pairs compared head-to-head.
with gr.Tab("βοΈ Compare"):
gr.Markdown("## Head-to-Head Comparison")
gr.Markdown("Compare two segments of your data side-by-side (e.g., Region A vs Region B).")
with gr.Row():
# Group A
with gr.Column(variant="panel"):
gr.Markdown("### Group A")
comp_cat_col_a = gr.Dropdown(label="Filter Column", choices=[], interactive=True)
comp_cat_val_a = gr.Dropdown(label="Filter Value", choices=[], interactive=True)
# Group B
with gr.Column(variant="panel"):
gr.Markdown("### Group B")
comp_cat_col_b = gr.Dropdown(label="Filter Column", choices=[], interactive=True)
comp_cat_val_b = gr.Dropdown(label="Filter Value", choices=[], interactive=True)
comp_btn = gr.Button("βοΈ Compare Groups", variant="primary", size="lg")
gr.Markdown("### Comparison Results")
with gr.Row():
comp_metric1 = gr.HTML()
comp_metric2 = gr.HTML()
comp_metric3 = gr.HTML()
comp_metric4 = gr.HTML()
comp_plot = gr.Plot(label="Side-by-Side Visualization")
# Update values when column selected
def update_comp_values(col):
    """Repopulate a value dropdown with the unique values of the chosen column.

    Caps the choice list at 1000 entries so very high-cardinality columns
    don't freeze the browser; the label reflects the truncation when applied.
    """
    global current_df
    if current_df is None or not col:
        return gr.update(choices=[], label="Filter Value")
    values = current_df[col].unique().tolist()
    if len(values) > 1000:
        # Limit to first 1000 to prevent freeze
        capped_label = f"Filter Value (Showing 1000/{len(values)})"
        return gr.update(choices=values[:1000], label=capped_label)
    return gr.update(choices=values, label="Filter Value")
# Refresh each group's value dropdown whenever its filter column changes.
comp_cat_col_a.change(update_comp_values, comp_cat_col_a, comp_cat_val_a)
comp_cat_col_b.change(update_comp_values, comp_cat_col_b, comp_cat_val_b)
def run_comparison(col_a, val_a, col_b, val_b):
    """Compare two filtered slices of the dataset head-to-head.

    Each group is defined by an optional (column, value) pair; a group with
    an incomplete pair compares against the whole dataset.

    Returns:
        Four metric HTML strings (padded with "" if fewer are produced),
        followed by the side-by-side figure.
    """
    global current_df
    if current_df is None:
        return ["Please upload data first."] * 4 + [None]
    # A group is only filtered when BOTH a column and a value are chosen.
    # The label must use the same condition, otherwise selecting a column
    # without a value showed e.g. "Region=None" while actually comparing
    # against All Data.
    if col_a and val_a:
        filter_a = {col_a: {'type': 'categorical', 'values': [val_a]}}
        label_a = f"{col_a}={val_a}"
    else:
        filter_a, label_a = {}, "All Data"
    if col_b and val_b:
        filter_b = {col_b: {'type': 'categorical', 'values': [val_b]}}
        label_b = f"{col_b}={val_b}"
    else:
        filter_b, label_b = {}, "All Data"
    metrics, fig = compare_datasets(current_df, filter_a, filter_b, label_a, label_b)
    # Pad to exactly four metric slots so the output count always matches.
    metrics = list(metrics) + [""] * max(0, 4 - len(metrics))
    return metrics[:4] + [fig]
# Register the comparison handler exactly once. (An identical duplicate
# .click() registration previously ran the comparison twice per press,
# doubling the work and re-rendering all five outputs a second time.)
comp_btn.click(
fn=run_comparison,
inputs=[comp_cat_col_a, comp_cat_val_a, comp_cat_col_b, comp_cat_val_b],
outputs=[comp_metric1, comp_metric2, comp_metric3, comp_metric4, comp_plot]
)
# Tab 8: Segment Explorer -- mini-dashboard for a single categorical segment.
with gr.Tab("π Segment Explorer"):
gr.Markdown("## Deep Dive Explorer")
gr.Markdown("Select a category to see a detailed mini-dashboard for that specific segment.")
with gr.Row():
drill_col = gr.Dropdown(label="Select Category Column", choices=[], interactive=True)
drill_val = gr.Dropdown(label="Select Value", choices=[], interactive=True)
drill_btn = gr.Button("π Analyze Segment", variant="primary")
gr.Markdown("### Segment Overview")
with gr.Row():
drill_stat1 = gr.Number(label="Total Records")
drill_stat2 = gr.Number(label="% of Total Data")
drill_stat3 = gr.Number(label="Total Value (Sum of 1st Num Col)")
with gr.Row():
drill_plot1 = gr.Plot(label="Trend over Time")
drill_plot2 = gr.Plot(label="Top Associations")
# Update values -- reuses the comparison tab's dropdown populator.
drill_col.change(update_comp_values, drill_col, drill_val)
def run_drill_down(col, val):
    """Build the mini-dashboard for one categorical segment.

    Args:
        col: Name of the categorical column to segment on.
        val: The segment value within that column.

    Returns:
        (record_count, pct_of_total, total_of_first_numeric_col,
         trend_figure_or_None, top_association_figure_or_None)
    """
    global current_df
    # Guard: no data loaded, an EMPTY dataset (which previously caused a
    # ZeroDivisionError in the percentage computation), or incomplete selection.
    if current_df is None or len(current_df) == 0 or not col or not val:
        return [0, 0, 0, None, None]
    # Filter data down to the selected segment.
    subset = current_df[current_df[col] == val]
    # Headline stats.
    count = len(subset)
    pct = (count / len(current_df)) * 100  # safe: len(current_df) > 0 per guard
    col_types = get_column_types(current_df)
    total_val = 0
    if col_types['numerical']:
        total_val = subset[col_types['numerical'][0]].sum()
    # Plot 1: trend of the first numeric column over the first datetime column.
    fig1 = None
    if col_types['datetime'] and col_types['numerical']:
        date_col = col_types['datetime'][0]
        num_col = col_types['numerical'][0]
        agg = subset.groupby(date_col)[num_col].sum().reset_index()
        fig1 = px.line(agg, x=date_col, y=num_col, title=f"{num_col} Trend for {val}")
    # Plot 2: top-10 values of the first OTHER categorical column in the segment.
    fig2 = None
    other_cats = [c for c in col_types['categorical'] if c != col]
    if other_cats:
        cat2 = other_cats[0]
        top = subset[cat2].value_counts().head(10).reset_index()
        top.columns = [cat2, 'Count']
        fig2 = px.bar(top, x=cat2, y='Count', title=f"Top {cat2} in {val}")
    return count, round(pct, 1), round(total_val, 2), fig1, fig2
drill_btn.click(
fn=run_drill_down,
inputs=[drill_col, drill_val],
outputs=[drill_stat1, drill_stat2, drill_stat3, drill_plot1, drill_plot2]
)
# Tab 9: Export -- download the currently filtered data as CSV.
with gr.Tab("πΎ Export"):
gr.Markdown("## Export Your Data")
export_data_btn = gr.Button("Export Filtered Data as CSV", variant="primary")
export_file = gr.File(label="Download CSV")
export_status = gr.Textbox(label="Export Status", interactive=False)
export_data_btn.click(
fn=export_filtered_data,
inputs=[],
outputs=[export_file, export_status]
)
# Connect upload button to update all dropdowns across every tab
# (preview, filters, visualization axes, comparison and drill-down columns).
upload_btn.click(
fn=upload_and_preview_data,
inputs=[file_input],
outputs=[
upload_status, data_preview, row_count, preview_offset,
num_filter_dropdown, num_min, num_max,
cat_filter_dropdown, cat_filter_values,
cat_filter_dropdown_2, cat_filter_values_2,
date_filter_dropdown, date_start, date_end,
x_column_viz, y_column_viz, color_column_viz,
comp_cat_col_a, comp_cat_col_b, drill_col
]
)
return demo
# Entry point. The variable is named `demo` by Gradio convention (the
# `gradio` CLI's reload mode looks for that name).
if __name__ == "__main__":
demo = create_dashboard()
# server_name="0.0.0.0" binds all interfaces (needed for container/Spaces
# deployment); share=False keeps the app off Gradio's public tunnel.
demo.launch(share=False, server_name="0.0.0.0", server_port=7860)