"""Gradio application wiring for the Business Intelligence dashboard.""" from __future__ import annotations import tempfile from typing import Any, Dict, Iterable, List, Optional, Tuple # Monkey-patch to fix gradio_client TypeError bug with boolean additionalProperties # This must be done BEFORE importing gradio import gradio_client.utils as _gc_utils _original_json_schema_to_python_type = _gc_utils._json_schema_to_python_type def _patched_json_schema_to_python_type(schema, defs=None): """Patched version that handles boolean schema values.""" # Handle boolean schemas (e.g., additionalProperties: true/false) if isinstance(schema, bool): return "Any" if schema else "None" return _original_json_schema_to_python_type(schema, defs) _gc_utils._json_schema_to_python_type = _patched_json_schema_to_python_type import gradio as gr import pandas as pd import matplotlib.figure as mpl_fig from data_processor import ( DatasetBundle, dataset_overview, dataset_preview, filter_dataframe, filter_metadata, load_dataset, load_sample_dataset, missing_value_report, numeric_summary, categorical_summary, correlation_matrix, sample_dataset_options, ) from insights import ( detect_anomalies, detect_trend, get_default_insight_columns, top_bottom_performers, ) from visualizations import ( create_category_plot, create_correlation_heatmap, create_distribution_plot, create_scatter_plot, create_time_series_plot, figure_to_png_bytes, ) def _format_overview_text(info: Dict[str, Any], source_name: str) -> str: """Render dataset information as Markdown.""" lines = [ f"**Source:** {source_name}", f"- Rows: {info['Rows']}", f"- Columns: {info['Columns']}", f"- Memory Usage: {info['Memory Usage (MB)']} MB", ] return "\n".join(lines) def _empty_dataframe(message: str = "No data available") -> pd.DataFrame: """Return a placeholder DataFrame for empty displays.""" return pd.DataFrame({"status": [message]}) DEFAULT_STATE = { "dataframe": None, "filtered_df": None, "column_types": None, "filter_meta": None, "source_name": None, "current_figure": None, } def _ensure_state(state) -> Dict[str, Any]: """Guarantee a dictionary-based state object.""" return state or DEFAULT_STATE.copy() def _current_dataframe(state, filtered: bool = True) -> pd.DataFrame: """Return the filtered or raw dataframe from state.""" state = _ensure_state(state) key = "filtered_df" if filtered else "dataframe" df = state.get(key) if isinstance(df, pd.DataFrame): return df raise ValueError("Please upload a dataset before performing this action.") def _finalize_dataset_load(bundle, state): """Populate shared outputs after a dataset is loaded.""" df = bundle.dataframe state = { "dataframe": df, "filtered_df": df, "column_types": { "numeric": bundle.column_types.numeric, "categorical": bundle.column_types.categorical, "datetime": bundle.column_types.datetime, }, "filter_meta": filter_metadata(df, bundle.column_types), "source_name": bundle.source_name, } overview = dataset_overview(df) preview = dataset_preview(df) status = f"✅ Loaded '{bundle.source_name}' with {df.shape[0]} rows and {df.shape[1]} columns." info_text = _format_overview_text(overview["info"], bundle.source_name) dtypes_df = overview["dtypes"] head_df = preview["head"] tail_df = preview["tail"] filter_preview = head_df row_count = f"Rows displayed: {len(df)}" return state, status, info_text, dtypes_df, head_df, tail_df, filter_preview, row_count def _handle_file_upload(file, state): """Load a dataset from the uploaded file.""" state = _ensure_state(state) try: bundle = load_dataset(file) except ValueError as exc: return ( state, f"❌ {exc}", "No dataset loaded.", _empty_dataframe(), _empty_dataframe(), _empty_dataframe(), _empty_dataframe(), "Rows displayed: 0", ) return _finalize_dataset_load(bundle, state) def _handle_sample_dataset(selection: Optional[str], state): """Load one of the bundled sample datasets.""" state = _ensure_state(state) if not selection: message = "Please choose a sample dataset before loading." empty = _empty_dataframe(message) return state, f"⚠️ {message}", "No dataset loaded.", empty, empty, empty, empty, "Rows displayed: 0" try: bundle = load_sample_dataset(selection) except ValueError as exc: empty = _empty_dataframe(str(exc)) return state, f"❌ {exc}", "No dataset loaded.", empty, empty, empty, empty, "Rows displayed: 0" return _finalize_dataset_load(bundle, state) def _populate_column_options( state, ): """Populate dropdown choices based on the uploaded dataset.""" state = _ensure_state(state) column_types = state.get("column_types") if not column_types: empty_dropdown = gr.update(choices=[], value=None, interactive=False, visible=True) hidden_checkbox = gr.update(choices=[], value=[], visible=False, interactive=False) return ( empty_dropdown, empty_dropdown, hidden_checkbox, empty_dropdown, empty_dropdown, empty_dropdown, empty_dropdown, empty_dropdown, empty_dropdown, empty_dropdown, empty_dropdown, empty_dropdown, empty_dropdown, empty_dropdown, empty_dropdown, empty_dropdown, ) numeric = list(column_types["numeric"]) categorical = list(column_types["categorical"]) datetime_cols = list(column_types["datetime"]) all_columns = list(state["dataframe"].columns) defaults = { "numeric": numeric[0] if numeric else None, "datetime": datetime_cols[0] if datetime_cols else None, } def dropdown(values: Iterable[str], default: Optional[str] = None): choices = list(values) value = default if default in choices else None return gr.update( choices=choices, value=value, interactive=bool(choices), visible=True, ) return ( dropdown(numeric), # numeric filter column dropdown(datetime_cols), # date filter column gr.update(choices=[], value=[], visible=False, interactive=False), # categorical values reset dropdown(categorical), # categorical filter column dropdown(all_columns, defaults.get("datetime")), # time series date dropdown(numeric, defaults.get("numeric")), # time series value dropdown(numeric), # distribution numeric dropdown(categorical), # category column dropdown(numeric), # category value dropdown(numeric), # scatter x dropdown(numeric), # scatter y gr.update(choices=all_columns, value=None, interactive=bool(all_columns), visible=True), # scatter color dropdown(numeric, defaults.get("numeric")), # insight numeric dropdown(datetime_cols, defaults.get("datetime")), # insight datetime dropdown(numeric, defaults.get("numeric")), # trend value dropdown(numeric, defaults.get("numeric")), # anomaly column ) def _update_numeric_inputs(column: Optional[str], state) -> Tuple[Any, Any]: """Update numeric min/max inputs when a column is selected.""" state = _ensure_state(state) hidden = gr.update(visible=False, value=None) if not column or "filter_meta" not in state: return hidden, hidden meta = state["filter_meta"]["numeric"].get(column) if not meta: return hidden, hidden minimum = float(meta["min"]) maximum = float(meta["max"]) return ( gr.update(value=minimum, visible=True, interactive=True, label=f"Min ({column})"), gr.update(value=maximum, visible=True, interactive=True, label=f"Max ({column})"), ) def _update_categorical_values(column: Optional[str], state): """Populate categorical value options for filtering.""" state = _ensure_state(state) if not column or "filter_meta" not in state: return gr.update(visible=False) values = state["filter_meta"]["categorical"].get(column, []) return gr.update( choices=values, value=values[: min(10, len(values))], visible=bool(values), interactive=bool(values), label=f"Values to include ({column})", ) def _update_date_bounds(column: Optional[str], state) -> Tuple[Any, Any]: """Populate date inputs when a date column is selected.""" state = _ensure_state(state) if not column or "filter_meta" not in state: hidden = gr.update(visible=False, value=None) return hidden, hidden meta = state["filter_meta"]["datetime"].get(column) if not meta: hidden = gr.update(visible=False, value=None) return hidden, hidden start = str(meta["min"]) end = str(meta["max"]) return ( gr.update(value=start, visible=True, label=f"Start date ({column})"), gr.update(value=end, visible=True, label=f"End date ({column})"), ) def _apply_filters( state, numeric_column: Optional[str], numeric_min: Optional[float], numeric_max: Optional[float], categorical_column: Optional[str], categorical_values: Optional[List[str]], date_column: Optional[str], start_date: Optional[str], end_date: Optional[str], ) -> Tuple[Dict[str, Any], pd.DataFrame, str]: """Filter the dataset according to user selections.""" state = _ensure_state(state) df = _current_dataframe(state, filtered=False) numeric_filters: Dict[str, Tuple[Optional[float], Optional[float]]] = {} categorical_filters: Dict[str, List[str]] = {} date_filters: Dict[str, Tuple[Optional[str], Optional[str]]] = {} if numeric_column and (numeric_min is not None or numeric_max is not None): lower = numeric_min upper = numeric_max if lower is not None and upper is not None and lower > upper: lower, upper = upper, lower numeric_filters[numeric_column] = (lower, upper) if categorical_column and categorical_values: categorical_filters[categorical_column] = categorical_values if date_column and (start_date or end_date): date_filters[date_column] = (start_date, end_date) filtered_df = filter_dataframe(df, numeric_filters, categorical_filters, date_filters) state["filtered_df"] = filtered_df row_count = f"Rows displayed: {len(filtered_df)}" preview = filtered_df.head(5) if not filtered_df.empty else _empty_dataframe("No rows match the filters.") return state, preview, row_count def _generate_statistics(state) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, str]: """Produce summary statistics for the Statistics tab.""" state = _ensure_state(state) try: df = _current_dataframe(state, filtered=False) except ValueError as exc: message = str(exc) empty = _empty_dataframe(message) return empty, empty, empty, empty, f"⚠️ {message}" num_summary = numeric_summary(df) cat_summary = categorical_summary(df) missing = missing_value_report(df) corr = correlation_matrix(df) message = "Statistics generated successfully." return ( num_summary if not num_summary.empty else _empty_dataframe("No numeric columns available."), cat_summary if not cat_summary.empty else _empty_dataframe("No categorical columns available."), missing if not missing.empty else _empty_dataframe("No missing values detected."), corr if not corr.empty else _empty_dataframe("Not enough numeric columns for correlation."), message, ) def _generate_chart( state, chart_type: str, ts_date: Optional[str], ts_value: Optional[str], ts_agg: str, dist_column: Optional[str], dist_type: str, cat_column: Optional[str], cat_value: Optional[str], cat_chart_type: str, cat_agg: str, scatter_x: Optional[str], scatter_y: Optional[str], scatter_color: Optional[str], ) -> Tuple[Dict[str, Any], Any, str]: """Create a visualization based on user selections.""" state = _ensure_state(state) try: df = _current_dataframe(state, filtered=True) except ValueError as exc: state["current_figure"] = None return state, None, f"⚠️ {exc}" try: if chart_type == "Time Series": if not ts_date or not ts_value: raise ValueError("Select both a date and value column.") fig = create_time_series_plot(df, ts_date, ts_value, aggregation=ts_agg) elif chart_type == "Distribution": if not dist_column: raise ValueError("Select a numeric column for the distribution plot.") fig = create_distribution_plot(df, dist_column, plot_type=dist_type) elif chart_type == "Category": if not cat_column or not cat_value: raise ValueError("Select both category and value columns.") fig = create_category_plot(df, cat_column, cat_value, aggregation=cat_agg, chart_type=cat_chart_type.lower()) elif chart_type == "Scatter": if not scatter_x or not scatter_y: raise ValueError("Select x and y columns for the scatter plot.") fig = create_scatter_plot(df, scatter_x, scatter_y, color_column=scatter_color) elif chart_type == "Correlation Heatmap": fig = create_correlation_heatmap(df) else: raise ValueError("Unsupported chart type.") except ValueError as exc: state["current_figure"] = None return state, None, f"⚠️ {exc}" state["current_figure"] = fig return state, fig, "✅ Visualization generated. Use 'Export Chart' to download." def _download_filtered(state) -> str: """Export the filtered dataset to a temporary CSV file.""" state = _ensure_state(state) df = _current_dataframe(state, filtered=True) if df.empty: raise ValueError("There are no rows to export. Adjust your filters and try again.") temp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv", prefix="filtered_", dir=".") df.to_csv(temp.name, index=False) temp.close() return temp.name def _download_chart(state) -> str: """Export the most recent chart to PNG.""" state = _ensure_state(state) fig = state.get("current_figure") if fig is None: raise ValueError("Generate a visualization before exporting.") buffer = figure_to_png_bytes(fig) temp = tempfile.NamedTemporaryFile(delete=False, suffix=".png", prefix="chart_", dir=".") with open(temp.name, "wb") as fp: fp.write(buffer.read()) return temp.name def _generate_insights( state, numeric_column: Optional[str], trend_date_column: Optional[str], trend_value_column: Optional[str], anomaly_column: Optional[str], ) -> Tuple[pd.DataFrame, pd.DataFrame, str, pd.DataFrame, str]: """Generate top/bottom performers, trends, and anomalies.""" state = _ensure_state(state) try: df = _current_dataframe(state, filtered=True) except ValueError as exc: empty = _empty_dataframe(str(exc)) return empty, empty, f"⚠️ {exc}", empty, f"⚠️ {exc}" status_messages: List[str] = [] top_df = bottom_df = _empty_dataframe("Select a numeric column for insights.") if numeric_column: try: performers = top_bottom_performers(df, numeric_column) top_df = performers["top"] bottom_df = performers["bottom"] status_messages.append(f"Top/bottom performers calculated for {numeric_column}.") except ValueError as exc: top_df = bottom_df = _empty_dataframe(str(exc)) status_messages.append(f"⚠️ {exc}") trend_text = "Select a date and value column to evaluate trend." if trend_date_column and trend_value_column: try: trend_text = detect_trend(df, trend_date_column, trend_value_column) except ValueError as exc: trend_text = f"⚠️ {exc}" anomaly_df = _empty_dataframe("Select a numeric column to detect anomalies.") if anomaly_column: anomalies = detect_anomalies(df, anomaly_column) anomaly_df = anomalies if not anomalies.empty else _empty_dataframe("No significant anomalies detected.") combined_status = "\n".join(status_messages) if status_messages else "Insights generated." return top_df, bottom_df, trend_text, anomaly_df, combined_status def _describe_sample_dataset(selection: Optional[str]) -> str: """Return a user-friendly description for the selected sample dataset.""" if not selection: return "Select a sample dataset to view its description." descriptions = sample_dataset_options() description = descriptions.get(selection) if not description: return "Sample dataset description unavailable. Ensure the file exists in the `data/` directory." return f"**{selection}**\n\n{description}" def create_dashboard(): with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Business Intelligence Dashboard") dataset_state = gr.State(DEFAULT_STATE.copy()) #last_figure_state = gr.State(None) sample_choices = list(sample_dataset_options().keys()) with gr.Tab("Data Upload"): with gr.Row(): file_input = gr.File(label="Upload CSV or Excel", file_types=[".csv", ".xlsx", ".xls"]) load_button = gr.Button("Load Data", variant="primary") gr.Markdown("Or load one of the curated datasets bundled with the project:") with gr.Row(): sample_dropdown = gr.Dropdown(label="Sample Dataset", choices=sample_choices, value=None, interactive=bool(sample_choices)) load_sample_button = gr.Button("Load Sample", variant="secondary", interactive=bool(sample_choices)) if sample_choices: sample_description = gr.Markdown("Select a sample dataset to view its description.") else: sample_description = gr.Markdown("⚠️ No sample datasets detected in the `data/` folder.") upload_status = gr.Markdown("No dataset loaded.") dataset_info = gr.Markdown() dtypes_table = gr.Dataframe(label="Column Types", interactive=False) with gr.Row(): head_table = gr.Dataframe(label="Preview (Head)", interactive=False) tail_table = gr.Dataframe(label="Preview (Tail)", interactive=False) with gr.Tab("Statistics"): stats_status = gr.Markdown() numeric_table = gr.Dataframe(label="Numeric Summary", interactive=False) categorical_table = gr.Dataframe(label="Categorical Summary", interactive=False) missing_table = gr.Dataframe(label="Missing Value Report", interactive=False) correlation_table = gr.Dataframe(label="Correlation Matrix", interactive=False) generate_stats_button = gr.Button("Generate Statistics", variant="secondary") with gr.Tab("Filter & Explore"): filter_status = gr.Markdown("Rows displayed: 0") with gr.Accordion("Numeric Filter", open=False): numeric_column_dropdown = gr.Dropdown(label="Numeric Column", choices=[]) numeric_min_input = gr.Number(label="Minimum Value", visible=False) numeric_max_input = gr.Number(label="Maximum Value", visible=False) with gr.Accordion("Categorical Filter", open=False): categorical_column_dropdown = gr.Dropdown(label="Category Column", choices=[]) categorical_values = gr.CheckboxGroup(label="Values", choices=[], visible=False) with gr.Accordion("Date Filter", open=False): date_column_dropdown = gr.Dropdown(label="Date Column", choices=[]) start_date_picker = gr.Textbox(label="Start Date (YYYY-MM-DD)", visible=False) end_date_picker = gr.Textbox(label="End Date (YYYY-MM-DD)", visible=False) apply_filters_button = gr.Button("Apply Filters", variant="primary") filter_preview_table = gr.Dataframe(label="Filtered Preview", interactive=False) export_filtered_button = gr.Button("Download Filtered Data", variant="secondary") export_filtered_file = gr.File(label="Filtered CSV", interactive=False) with gr.Tab("Visualizations"): viz_status = gr.Markdown() chart_type = gr.Radio( label="Chart Type", choices=["Time Series", "Distribution", "Category", "Scatter", "Correlation Heatmap"], value="Time Series", ) with gr.Column(visible=True) as time_series_controls: ts_date_column = gr.Dropdown(label="Date Column", choices=[]) ts_value_column = gr.Dropdown(label="Value Column", choices=[]) ts_aggregation = gr.Dropdown(label="Aggregation", choices=["sum", "mean", "median", "count"], value="sum") with gr.Column(visible=False) as distribution_controls: dist_column = gr.Dropdown(label="Numeric Column", choices=[]) dist_type = gr.Radio(label="Distribution Type", choices=["histogram", "box"], value="histogram") with gr.Column(visible=False) as category_controls: category_column = gr.Dropdown(label="Category Column", choices=[]) category_value_column = gr.Dropdown(label="Value Column", choices=[]) category_chart_type = gr.Radio(label="Chart Style", choices=["Bar", "Pie"], value="Bar") category_aggregation = gr.Dropdown(label="Aggregation", choices=["sum", "mean", "median", "count"], value="sum") with gr.Column(visible=False) as scatter_controls: scatter_x_column = gr.Dropdown(label="X Axis", choices=[]) scatter_y_column = gr.Dropdown(label="Y Axis", choices=[]) scatter_color_column = gr.Dropdown(label="Color (optional)", choices=[]) with gr.Row(): generate_chart_button = gr.Button("Generate Visualization", variant="primary") export_chart_button = gr.Button("Export Chart (PNG)", variant="secondary") chart_output = gr.Plot(label="Visualization") export_chart_file = gr.File(label="Exported Chart", interactive=False) with gr.Tab("Insights"): insights_status = gr.Markdown() insight_numeric_column = gr.Dropdown(label="Numeric Column", choices=[]) trend_date_column = gr.Dropdown(label="Date Column", choices=[]) trend_value_column = gr.Dropdown(label="Value Column", choices=[]) anomaly_column = gr.Dropdown(label="Column for Anomaly Detection", choices=[]) generate_insights_button = gr.Button("Generate Insights", variant="primary") top_table = gr.Dataframe(label="Top Performers", interactive=False) bottom_table = gr.Dataframe(label="Bottom Performers", interactive=False) trend_output = gr.Markdown() anomaly_table = gr.Dataframe(label="Potential Anomalies", interactive=False) # Interactions load_button.click( fn=_handle_file_upload, inputs=[file_input, dataset_state], outputs=[ dataset_state, upload_status, dataset_info, dtypes_table, head_table, tail_table, filter_preview_table, filter_status, ], ).then( fn=_populate_column_options, inputs=[dataset_state], outputs=[ numeric_column_dropdown, date_column_dropdown, categorical_values, categorical_column_dropdown, ts_date_column, ts_value_column, dist_column, category_column, category_value_column, scatter_x_column, scatter_y_column, scatter_color_column, insight_numeric_column, trend_date_column, trend_value_column, anomaly_column, ], ).then( fn=_generate_statistics, inputs=[dataset_state], outputs=[ numeric_table, categorical_table, missing_table, correlation_table, stats_status, ], ) load_sample_button.click( fn=_handle_sample_dataset, inputs=[sample_dropdown, dataset_state], outputs=[ dataset_state, upload_status, dataset_info, dtypes_table, head_table, tail_table, filter_preview_table, filter_status, ], ).then( fn=_populate_column_options, inputs=[dataset_state], outputs=[ numeric_column_dropdown, date_column_dropdown, categorical_values, categorical_column_dropdown, ts_date_column, ts_value_column, dist_column, category_column, category_value_column, scatter_x_column, scatter_y_column, scatter_color_column, insight_numeric_column, trend_date_column, trend_value_column, anomaly_column, ], ).then( fn=_generate_statistics, inputs=[dataset_state], outputs=[ numeric_table, categorical_table, missing_table, correlation_table, stats_status, ], ) sample_dropdown.change( fn=_describe_sample_dataset, inputs=[sample_dropdown], outputs=[sample_description], ) numeric_column_dropdown.change( fn=_update_numeric_inputs, inputs=[numeric_column_dropdown, dataset_state], outputs=[numeric_min_input, numeric_max_input], ) categorical_column_dropdown.change( fn=_update_categorical_values, inputs=[categorical_column_dropdown, dataset_state], outputs=[categorical_values], ) date_column_dropdown.change( fn=_update_date_bounds, inputs=[date_column_dropdown, dataset_state], outputs=[start_date_picker, end_date_picker], ) generate_stats_button.click( fn=_generate_statistics, inputs=[dataset_state], outputs=[numeric_table, categorical_table, missing_table, correlation_table, stats_status], ) apply_filters_button.click( fn=_apply_filters, inputs=[ dataset_state, numeric_column_dropdown, numeric_min_input, numeric_max_input, categorical_column_dropdown, categorical_values, date_column_dropdown, start_date_picker, end_date_picker, ], outputs=[dataset_state, filter_preview_table, filter_status], ) export_filtered_button.click( fn=_download_filtered, inputs=[dataset_state], outputs=[export_filtered_file], ) def _toggle_controls(selected: str) -> Tuple[Any, Any, Any, Any]: return ( gr.update(visible=selected == "Time Series"), gr.update(visible=selected == "Distribution"), gr.update(visible=selected == "Category"), gr.update(visible=selected == "Scatter"), ) chart_type.change( fn=_toggle_controls, inputs=[chart_type], outputs=[time_series_controls, distribution_controls, category_controls, scatter_controls], ) generate_chart_button.click( fn=_generate_chart, inputs=[ dataset_state, chart_type, ts_date_column, ts_value_column, ts_aggregation, dist_column, dist_type, category_column, category_value_column, category_chart_type, category_aggregation, scatter_x_column, scatter_y_column, scatter_color_column, ], outputs=[dataset_state, chart_output, viz_status], ) export_chart_button.click( fn=_download_chart, inputs=[dataset_state], outputs=[export_chart_file], ) generate_insights_button.click( fn=_generate_insights, inputs=[ dataset_state, insight_numeric_column, trend_date_column, trend_value_column, anomaly_column, ], outputs=[ top_table, bottom_table, trend_output, anomaly_table, insights_status, ], ) return demo if __name__ == "__main__": demo = create_dashboard() demo.launch(server_name="0.0.0.0", server_port=7860, share=True)