""" Main Gradio application for the Business Intelligence Dashboard. This module creates a Tableau-like interactive dashboard interface for data exploration and analysis. """ import gradio as gr import pandas as pd import numpy as np from typing import Optional, Dict, List, Tuple, Any import io import base64 from PIL import Image import plotly.graph_objects as go from data_processor import DataLoader, DataFilter, DataProfiler from visualizations import VisualizationFactory from insights import InsightGenerator from utils import detect_column_types, get_missing_value_summary from constants import ( PREVIEW_ROWS, FILTERED_PREVIEW_ROWS, MAX_COLUMNS_DISPLAY, MAX_UNIQUE_VALUES_DISPLAY, EXPORT_IMAGE_WIDTH, EXPORT_IMAGE_HEIGHT, EXPORT_IMAGE_SCALE, EXPORT_IMAGE_FILENAME, EXPORT_HTML_FILENAME, DEFAULT_TOP_N, KB_CONVERSION, TEXTBOX_LINES_DEFAULT, TEXTBOX_LINES_INSIGHTS ) # Global state current_df: Optional[pd.DataFrame] = None current_filters: Dict[str, Any] = {} current_figure: Optional[go.Figure] = None def load_and_preview_data(file) -> Tuple[str, pd.DataFrame, str]: """ Load data file and return preview information. Args: file: Uploaded file object (can be string path or file object in Gradio 6.0.2) Returns: Tuple of (info_text, preview_df, error_message) """ global current_df, current_filters if file is None: return "No file uploaded", None, "" try: loader = DataLoader() # Handle both string paths and file objects (Gradio 6.0.2 compatibility) file_path = file if isinstance(file, str) else file.name df, error = loader.load_data(file_path) if error: return f"Error: {error}", None, error current_df = df current_filters = {} # Get basic info profiler = DataProfiler() info = profiler.get_basic_info(df) info_text = f""" **Dataset Information:** - **Shape:** {info['shape'][0]:,} rows × {info['shape'][1]} columns - **Memory Usage:** {info['memory_usage'] / KB_CONVERSION:.2f} KB - **Columns:** {', '.join(info['columns'][:MAX_COLUMNS_DISPLAY])}{'...' if len(info['columns']) > MAX_COLUMNS_DISPLAY else ''} """ # Preview first rows preview_df = df.head(PREVIEW_ROWS) return info_text, preview_df, "" except Exception as e: return f"Error loading file: {str(e)}", None, str(e) def get_statistics() -> Tuple[str, pd.DataFrame, pd.DataFrame, pd.DataFrame]: """ Generate comprehensive statistics for the loaded dataset. Returns: Tuple of (missing_values_text, numerical_stats, categorical_stats, correlation_matrix) """ global current_df if current_df is None or current_df.empty: return "No data loaded", pd.DataFrame(), pd.DataFrame(), pd.DataFrame() try: profiler = DataProfiler() # Missing values missing_df = get_missing_value_summary(current_df) if missing_df.empty: missing_text = "✅ No missing values found in the dataset." else: missing_text = "**Missing Values Summary:**\n\n" missing_text += missing_df.to_string(index=False) # Numerical statistics numerical_stats = profiler.get_numerical_stats(current_df) # Categorical statistics categorical_stats = profiler.get_categorical_stats(current_df) # Correlation matrix correlation_matrix = profiler.get_correlation_matrix(current_df) return missing_text, numerical_stats, categorical_stats, correlation_matrix except Exception as e: return f"Error generating statistics: {str(e)}", pd.DataFrame(), pd.DataFrame(), pd.DataFrame() def update_column_dropdowns(): """ Update column dropdown choices based on loaded data. Returns: Tuple of update dictionaries for x_column and y_column dropdowns """ global current_df if current_df is None or current_df.empty: return gr.update(choices=[]), gr.update(choices=[]) all_columns = list(current_df.columns) return gr.update(choices=all_columns), gr.update(choices=all_columns) def apply_simple_filters( filter_column: Optional[str], filter_type: str, min_val: Optional[float], max_val: Optional[float], selected_values: List[str] ) -> Tuple[str, pd.DataFrame, int]: """ Apply a single filter to the dataset. Args: filter_column: Column to filter on filter_type: Type of filter (numerical/categorical) min_val: Minimum value for numerical filter max_val: Maximum value for numerical filter selected_values: Selected values for categorical filter Returns: Tuple of (info_text, filtered_df, row_count) """ global current_df, current_filters if current_df is None or current_df.empty: return "No data loaded", pd.DataFrame(), 0 if filter_column is None or filter_column == "": # No filter applied, return original data current_filters = {} row_count = len(current_df) info_text = f"**Dataset:** {row_count:,} rows (no filters applied)" return info_text, current_df.head(FILTERED_PREVIEW_ROWS), row_count try: filters = {} numerical, categorical, date_columns = detect_column_types(current_df) if filter_type == "numerical" and filter_column in numerical: if min_val is not None and max_val is not None: original_min = float(current_df[filter_column].min()) original_max = float(current_df[filter_column].max()) if min_val != original_min or max_val != original_max: filters[filter_column] = (min_val, max_val) elif filter_type == "categorical" and filter_column in categorical: if selected_values: all_vals = sorted(current_df[filter_column].dropna().unique().tolist()) if set(selected_values) != set(all_vals): filters[filter_column] = selected_values # Apply filters data_filter = DataFilter() filtered_df = data_filter.apply_filters(current_df, filters) current_filters = filters row_count = len(filtered_df) info_text = f"**Filtered Dataset:** {row_count:,} rows (from {len(current_df):,} original rows)" return info_text, filtered_df.head(FILTERED_PREVIEW_ROWS), row_count except Exception as e: return f"Error applying filters: {str(e)}", pd.DataFrame(), 0 def get_filter_options() -> Tuple[List[str], str, Dict]: """ Get filter options based on current data. Returns: Tuple of (column_choices, default_type, filter_component_updates) """ global current_df if current_df is None or current_df.empty: return [], "numerical", {} numerical, categorical, date_columns = detect_column_types(current_df) all_columns = list(current_df.columns) # Determine default filter type default_type = "numerical" if numerical else "categorical" if categorical else "numerical" return all_columns, default_type, {} def create_visualization( chart_type: str, x_column: Optional[str], y_column: Optional[str], aggregation: str, category_chart_type: str = 'bar' ) -> go.Figure: """ Create visualization based on user selections. Args: chart_type: Type of chart to create x_column: X-axis column y_column: Y-axis column aggregation: Aggregation method category_chart_type: Type for category charts (bar/pie) Returns: Plotly figure object """ global current_df, current_filters, current_figure if current_df is None or current_df.empty: current_figure = None return None try: # Apply current filters if current_filters: data_filter = DataFilter() df = data_filter.apply_filters(current_df, current_filters) else: df = current_df.copy() if df.empty: current_figure = None return None # Validate required columns for specific chart types if chart_type in ['time_series', 'scatter']: if not x_column or not y_column: # Return a simple error message plot fig = go.Figure() fig.add_annotation( text="Please select both X and Y columns for this chart type", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False, font=dict(size=16) ) fig.update_layout(title="Missing Required Columns") current_figure = fig return fig factory = VisualizationFactory() # Handle category chart type and distribution chart type # Pass sub-type (bar/pie for category, histogram/box for distribution) in kwargs # Use 'sub_chart_type' key to avoid conflict with factory's 'chart_type' parameter kwargs = {} if chart_type == 'category': kwargs['sub_chart_type'] = category_chart_type elif chart_type == 'distribution': kwargs['sub_chart_type'] = 'histogram' fig = factory.create_visualization( chart_type=chart_type, df=df, x_column=x_column, y_column=y_column, aggregation=aggregation, **kwargs ) # Store the figure globally for export current_figure = fig return fig except Exception as e: print(f"Error creating visualization: {e}") # Return a simple error message plot fig = go.Figure() fig.add_annotation( text=f"Error creating visualization: {str(e)}", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False, font=dict(size=14) ) fig.update_layout(title="Visualization Error") current_figure = fig return fig def generate_insights() -> Tuple[str, str, str]: """ Generate automated insights from the data. Returns: Tuple of (summary_insights, top_performers, trend_analysis) """ global current_df, current_filters if current_df is None or current_df.empty: return "No data loaded", "", "" try: # Apply filters if any if current_filters: data_filter = DataFilter() df = data_filter.apply_filters(current_df, current_filters) else: df = current_df.copy() generator = InsightGenerator() # Summary insights summary = generator.generate_summary_insights(df) summary_text = "\n".join([f"• {insight}" for insight in summary]) # Top/Bottom performers numerical, _, _ = detect_column_types(df) top_bottom_text = "" if numerical: # Use first numerical column col = numerical[0] performers = generator.get_top_bottom_performers(df, col, top_n=DEFAULT_TOP_N) top_bottom_text = f"**Top {DEFAULT_TOP_N} Performers for '{col}':**\n" for idx, val in performers['top']: top_bottom_text += f" • Row {idx}: {val:,.2f}\n" top_bottom_text += f"\n**Bottom {DEFAULT_TOP_N} Performers for '{col}':**\n" for idx, val in performers['bottom']: top_bottom_text += f" • Row {idx}: {val:,.2f}\n" # Trend analysis date_cols = [col for col in df.columns if 'date' in col.lower() or 'time' in col.lower()] trend_text = "" if date_cols and numerical: date_col = date_cols[0] value_col = numerical[0] trend = generator.detect_trends(df, date_col, value_col) trend_text = f"**Trend Analysis ({value_col} over {date_col}):**\n" trend_text += f" • {trend.get('message', 'No trend detected')}\n" return summary_text, top_bottom_text, trend_text except Exception as e: return f"Error generating insights: {str(e)}", "", "" def export_data() -> str: """ Export filtered data as CSV. Returns: Path to exported CSV file """ global current_df, current_filters if current_df is None or current_df.empty: return None try: # Apply filters if current_filters: data_filter = DataFilter() df = data_filter.apply_filters(current_df, current_filters) else: df = current_df.copy() # Save to temporary file output_path = "filtered_data_export.csv" df.to_csv(output_path, index=False) return output_path except Exception as e: print(f"Error exporting data: {e}") return None def export_visualization(fig) -> Optional[str]: """ Export visualization as PNG or HTML. Args: fig: Plotly figure object or PlotData from Gradio (can be None) Returns: Path to exported file, or None if no figure """ global current_figure # Use the stored figure instead of the PlotData object from Gradio plotly_fig = current_figure if plotly_fig is None: return None try: output_path = EXPORT_IMAGE_FILENAME # Try to export as PNG, fallback to HTML if kaleido not available try: plotly_fig.write_image( output_path, width=EXPORT_IMAGE_WIDTH, height=EXPORT_IMAGE_HEIGHT, scale=EXPORT_IMAGE_SCALE ) except Exception as img_error: # If image export fails, save as HTML instead try: output_path = EXPORT_HTML_FILENAME plotly_fig.write_html(output_path) except Exception as html_error: print(f"Error exporting visualization: {html_error}") return None return output_path except Exception as e: print(f"Error exporting visualization: {e}") return None def create_dashboard(): """Create and configure the Gradio dashboard interface.""" with gr.Blocks(title="Business Intelligence Dashboard") as demo: gr.Markdown( """ # 📊 Business Intelligence Dashboard **Interactive Data Analysis and Visualization Platform** Upload your dataset and explore insights through an intuitive, Tableau-like interface. """ ) # State to store current dataframe df_state = gr.State(value=None) # Tab 1: Data Upload with gr.Tab("📁 Data Upload & Preview"): with gr.Row(): with gr.Column(scale=1): file_input = gr.File( label="Upload Dataset", file_types=[".csv", ".xlsx", ".xls"], type="filepath" ) upload_btn = gr.Button("Load Data", variant="primary", size="lg") with gr.Column(scale=2): info_output = gr.Markdown("Upload a CSV or Excel file to begin.") preview_output = gr.Dataframe( label=f"Data Preview (First {PREVIEW_ROWS} Rows)", interactive=False, wrap=True ) upload_btn.click( fn=load_and_preview_data, inputs=[file_input], outputs=[info_output, preview_output, df_state] ) # Tab 2: Statistics with gr.Tab("📈 Statistics & Profiling"): with gr.Row(): with gr.Column(): stats_btn = gr.Button("Generate Statistics", variant="primary") missing_output = gr.Textbox( label="Missing Values Report", lines=TEXTBOX_LINES_DEFAULT, interactive=False ) with gr.Column(): numerical_stats_output = gr.Dataframe( label="Numerical Statistics", interactive=False, wrap=True ) with gr.Row(): categorical_stats_output = gr.Dataframe( label="Categorical Statistics", interactive=False, wrap=True ) correlation_output = gr.Dataframe( label="Correlation Matrix", interactive=False, wrap=True ) stats_btn.click( fn=get_statistics, inputs=[], outputs=[missing_output, numerical_stats_output, categorical_stats_output, correlation_output] ) # Tab 3: Filter & Explore with gr.Tab("🔍 Filter & Explore"): with gr.Row(): with gr.Column(scale=1): filter_info = gr.Markdown("**Apply filters to explore your data:**") filter_column = gr.Dropdown( choices=[], label="Select Column to Filter", interactive=True ) filter_type = gr.Radio( choices=["numerical", "categorical"], label="Filter Type", value="numerical", interactive=True ) with gr.Group(visible=True) as numerical_filter_group: min_val_input = gr.Number(label="Minimum Value", interactive=True) max_val_input = gr.Number(label="Maximum Value", interactive=True) with gr.Group(visible=False) as categorical_filter_group: selected_values = gr.CheckboxGroup( choices=[], label="Select Values", interactive=True ) filter_btn = gr.Button("Apply Filter", variant="primary") clear_filter_btn = gr.Button("Clear Filters", variant="secondary") with gr.Column(scale=2): filter_result_info = gr.Markdown("") filtered_data_output = gr.Dataframe( label=f"Filtered Data Preview (First {FILTERED_PREVIEW_ROWS} Rows)", interactive=False, wrap=True ) row_count_output = gr.Number( label="Filtered Row Count", interactive=False ) def update_filter_ui(column, filter_type_val): """Update filter UI based on column and type selection.""" global current_df if current_df is None or current_df.empty or not column: return ( gr.update(visible=False), gr.update(visible=False), gr.update(value=None), gr.update(value=None), gr.update(choices=[]) ) numerical, categorical, _ = detect_column_types(current_df) if filter_type_val == "numerical" and column in numerical: min_val = float(current_df[column].min()) max_val = float(current_df[column].max()) return ( gr.update(visible=True), gr.update(visible=False), gr.update(value=min_val, label=f"Min {column}"), gr.update(value=max_val, label=f"Max {column}"), gr.update(choices=[]) ) elif filter_type_val == "categorical" and column in categorical: unique_vals = sorted( current_df[column].dropna().unique().tolist() )[:MAX_UNIQUE_VALUES_DISPLAY] return ( gr.update(visible=False), gr.update(visible=True), gr.update(value=None), gr.update(value=None), gr.update(choices=unique_vals, value=unique_vals) ) else: return ( gr.update(visible=False), gr.update(visible=False), gr.update(value=None), gr.update(value=None), gr.update(choices=[]) ) filter_column.change( fn=update_filter_ui, inputs=[filter_column, filter_type], outputs=[numerical_filter_group, categorical_filter_group, min_val_input, max_val_input, selected_values] ) filter_type.change( fn=update_filter_ui, inputs=[filter_column, filter_type], outputs=[numerical_filter_group, categorical_filter_group, min_val_input, max_val_input, selected_values] ) filter_btn.click( fn=apply_simple_filters, inputs=[filter_column, filter_type, min_val_input, max_val_input, selected_values], outputs=[filter_result_info, filtered_data_output, row_count_output] ) def clear_filters(): """Clear all filters.""" global current_filters current_filters = {} if current_df is not None: row_count = len(current_df) info_text = f"**Dataset:** {row_count:,} rows (filters cleared)" return info_text, current_df.head(FILTERED_PREVIEW_ROWS), row_count return "No data loaded", pd.DataFrame(), 0 clear_filter_btn.click( fn=clear_filters, inputs=[], outputs=[filter_result_info, filtered_data_output, row_count_output] ) def update_filter_column_choices(): """Update filter column dropdown when data is loaded.""" global current_df if current_df is not None and not current_df.empty: return gr.update(choices=list(current_df.columns)) return gr.update(choices=[]) # Update filter column choices when data is loaded upload_btn.click( fn=update_filter_column_choices, inputs=[], outputs=[filter_column], queue=False ) # Tab 4: Visualizations with gr.Tab("📊 Visualizations"): with gr.Row(): with gr.Column(scale=1): chart_type = gr.Dropdown( choices=[ ("Time Series", "time_series"), ("Distribution (Histogram)", "distribution"), ("Category Analysis", "category"), ("Scatter Plot", "scatter"), ("Correlation Heatmap", "correlation") ], label="Chart Type", value="time_series" ) x_column = gr.Dropdown( choices=[], label="X-Axis Column", interactive=True ) y_column = gr.Dropdown( choices=[], label="Y-Axis Column (Optional)", interactive=True ) aggregation = gr.Dropdown( choices=["sum", "mean", "count", "median", "none"], label="Aggregation Method", value="sum" ) category_chart_type = gr.Radio( choices=["bar", "pie"], label="Category Chart Type", value="bar", visible=False ) viz_btn = gr.Button("Generate Visualization", variant="primary") export_viz_btn = gr.Button("Export Visualization", variant="secondary") export_viz_file = gr.File(label="Download Visualization (PNG or HTML)") with gr.Column(scale=2): visualization_output = gr.Plot( label="Visualization", container=True ) def toggle_category_type(chart_type_val): """Show/hide category chart type based on selection.""" return gr.update(visible=(chart_type_val == "category")) def update_viz_column_choices(): """Update column dropdowns based on loaded data.""" global current_df if current_df is not None and not current_df.empty: all_columns = list(current_df.columns) return gr.update(choices=all_columns), gr.update(choices=all_columns) return gr.update(choices=[]), gr.update(choices=[]) chart_type.change( fn=toggle_category_type, inputs=[chart_type], outputs=[category_chart_type] ) # Update visualization column choices when data is loaded upload_btn.click( fn=update_viz_column_choices, inputs=[], outputs=[x_column, y_column], queue=False ) viz_btn.click( fn=create_visualization, inputs=[chart_type, x_column, y_column, aggregation, category_chart_type], outputs=[visualization_output] ) export_viz_btn.click( fn=export_visualization, inputs=[visualization_output], outputs=[export_viz_file] ) # Tab 5: Insights with gr.Tab("💡 Insights"): with gr.Row(): insights_btn = gr.Button("Generate Insights", variant="primary", size="lg") with gr.Row(): with gr.Column(): summary_insights = gr.Markdown("### Summary Insights") summary_output = gr.Textbox( label="", lines=TEXTBOX_LINES_DEFAULT, interactive=False ) with gr.Column(): top_bottom_output = gr.Textbox( label="Top/Bottom Performers", lines=TEXTBOX_LINES_DEFAULT, interactive=False ) trend_output = gr.Textbox( label="Trend Analysis", lines=TEXTBOX_LINES_INSIGHTS, interactive=False ) insights_btn.click( fn=generate_insights, inputs=[], outputs=[summary_output, top_bottom_output, trend_output] ) # Tab 6: Export with gr.Tab("💾 Export"): with gr.Row(): with gr.Column(): gr.Markdown("### Export Filtered Data") export_data_btn = gr.Button("Export as CSV", variant="primary") export_data_file = gr.File(label="Download CSV") export_data_btn.click( fn=export_data, inputs=[], outputs=[export_data_file] ) return demo if __name__ == "__main__": demo = create_dashboard() demo.launch( share=False, server_name="0.0.0.0", server_port=7860, theme=gr.themes.Soft() )