# BI-dashboard / app.py
# Author: Lohith Venkat Chamakura
# Initial commit (48909ac)
"""
Main Gradio application for the Business Intelligence Dashboard.
This module creates a Tableau-like interactive dashboard interface
for data exploration and analysis.
"""
import gradio as gr
import pandas as pd
import numpy as np
from typing import Optional, Dict, List, Tuple, Any
import io
import base64
from PIL import Image
import plotly.graph_objects as go
from data_processor import DataLoader, DataFilter, DataProfiler
from visualizations import VisualizationFactory
from insights import InsightGenerator
from utils import detect_column_types, get_missing_value_summary
from constants import (
PREVIEW_ROWS,
FILTERED_PREVIEW_ROWS,
MAX_COLUMNS_DISPLAY,
MAX_UNIQUE_VALUES_DISPLAY,
EXPORT_IMAGE_WIDTH,
EXPORT_IMAGE_HEIGHT,
EXPORT_IMAGE_SCALE,
EXPORT_IMAGE_FILENAME,
EXPORT_HTML_FILENAME,
DEFAULT_TOP_N,
KB_CONVERSION,
TEXTBOX_LINES_DEFAULT,
TEXTBOX_LINES_INSIGHTS
)
# Global state shared by the Gradio event handlers below.
# NOTE(review): module-level state is shared across every browser session of
# the app, so concurrent users would overwrite each other's data — acceptable
# for a single-user dashboard, worth confirming for multi-user deployments.
current_df: Optional[pd.DataFrame] = None  # most recently loaded dataset
current_filters: Dict[str, Any] = {}  # active filter spec: column -> constraint
current_figure: Optional[go.Figure] = None  # last generated figure (used by export)
def load_and_preview_data(file) -> Tuple[str, pd.DataFrame, str]:
    """
    Load an uploaded data file and build preview information for the UI.

    Args:
        file: Uploaded file object (string path or file object, depending
            on the Gradio version in use).

    Returns:
        Tuple of (info_text, preview_df, error_message).
    """
    global current_df, current_filters

    if file is None:
        return "No file uploaded", None, ""

    try:
        # Gradio 6.0.2 may hand us either a plain path or a file wrapper.
        path = file.name if not isinstance(file, str) else file
        df, error = DataLoader().load_data(path)
        if error:
            return f"Error: {error}", None, error

        current_df = df
        current_filters = {}

        # Summarise shape / memory / columns for the info panel.
        info = DataProfiler().get_basic_info(df)
        shown_cols = ', '.join(info['columns'][:MAX_COLUMNS_DISPLAY])
        suffix = '...' if len(info['columns']) > MAX_COLUMNS_DISPLAY else ''
        info_text = f"""
**Dataset Information:**
- **Shape:** {info['shape'][0]:,} rows × {info['shape'][1]} columns
- **Memory Usage:** {info['memory_usage'] / KB_CONVERSION:.2f} KB
- **Columns:** {shown_cols}{suffix}
"""
        return info_text, df.head(PREVIEW_ROWS), ""
    except Exception as e:
        return f"Error loading file: {str(e)}", None, str(e)
def get_statistics() -> Tuple[str, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Build a full statistical profile of the currently loaded dataset.

    Returns:
        Tuple of (missing_values_text, numerical_stats, categorical_stats,
        correlation_matrix).
    """
    global current_df

    if current_df is None or current_df.empty:
        return "No data loaded", pd.DataFrame(), pd.DataFrame(), pd.DataFrame()

    try:
        profiler = DataProfiler()

        # Missing-value report rendered as plain text.
        missing_df = get_missing_value_summary(current_df)
        if missing_df.empty:
            missing_text = "✅ No missing values found in the dataset."
        else:
            missing_text = (
                "**Missing Values Summary:**\n\n"
                + missing_df.to_string(index=False)
            )

        return (
            missing_text,
            profiler.get_numerical_stats(current_df),
            profiler.get_categorical_stats(current_df),
            profiler.get_correlation_matrix(current_df),
        )
    except Exception as e:
        return f"Error generating statistics: {str(e)}", pd.DataFrame(), pd.DataFrame(), pd.DataFrame()
def update_column_dropdowns():
    """
    Refresh the x/y axis dropdown choices from the loaded dataframe.

    Returns:
        Tuple of gr.update objects for the x_column and y_column dropdowns.
    """
    global current_df

    has_data = current_df is not None and not current_df.empty
    choices = list(current_df.columns) if has_data else []
    return gr.update(choices=choices), gr.update(choices=choices)
def apply_simple_filters(
    filter_column: Optional[str],
    filter_type: str,
    min_val: Optional[float],
    max_val: Optional[float],
    selected_values: List[str]
) -> Tuple[str, pd.DataFrame, int]:
    """
    Apply a single filter to the dataset.

    Args:
        filter_column: Column to filter on (None/"" clears filters).
        filter_type: Type of filter ("numerical" or "categorical").
        min_val: Minimum value for a numerical filter.
        max_val: Maximum value for a numerical filter.
        selected_values: Selected values for a categorical filter.

    Returns:
        Tuple of (info_text, filtered_df_preview, row_count).
    """
    global current_df, current_filters

    if current_df is None or current_df.empty:
        return "No data loaded", pd.DataFrame(), 0

    if not filter_column:
        # Empty selection means "no filter": reset state and show all rows.
        current_filters = {}
        total = len(current_df)
        return (
            f"**Dataset:** {total:,} rows (no filters applied)",
            current_df.head(FILTERED_PREVIEW_ROWS),
            total,
        )

    try:
        active: Dict[str, Any] = {}
        numerical, categorical, _ = detect_column_types(current_df)

        if filter_type == "numerical" and filter_column in numerical:
            # Record the filter only when it actually narrows the range.
            if min_val is not None and max_val is not None:
                col = current_df[filter_column]
                full_range = (float(col.min()), float(col.max()))
                if (min_val, max_val) != full_range:
                    active[filter_column] = (min_val, max_val)
        elif filter_type == "categorical" and filter_column in categorical:
            # Record the filter only when a strict subset of values is chosen.
            if selected_values:
                every_value = sorted(
                    current_df[filter_column].dropna().unique().tolist()
                )
                if set(selected_values) != set(every_value):
                    active[filter_column] = selected_values

        filtered = DataFilter().apply_filters(current_df, active)
        current_filters = active

        count = len(filtered)
        info = f"**Filtered Dataset:** {count:,} rows (from {len(current_df):,} original rows)"
        return info, filtered.head(FILTERED_PREVIEW_ROWS), count
    except Exception as e:
        return f"Error applying filters: {str(e)}", pd.DataFrame(), 0
def get_filter_options() -> Tuple[List[str], str, Dict]:
    """
    Compute filter widget options for the currently loaded dataset.

    Returns:
        Tuple of (column_choices, default_type, filter_component_updates).
    """
    global current_df

    if current_df is None or current_df.empty:
        return [], "numerical", {}

    numerical, categorical, _ = detect_column_types(current_df)

    # Prefer a numerical default; fall back to categorical only when the
    # dataset has no numeric columns at all.
    if numerical:
        default_type = "numerical"
    elif categorical:
        default_type = "categorical"
    else:
        default_type = "numerical"

    return list(current_df.columns), default_type, {}
def create_visualization(
    chart_type: str,
    x_column: Optional[str],
    y_column: Optional[str],
    aggregation: str,
    category_chart_type: str = 'bar'
) -> go.Figure:
    """
    Build a Plotly figure from the current (optionally filtered) dataset.

    Args:
        chart_type: Type of chart to create.
        x_column: X-axis column.
        y_column: Y-axis column.
        aggregation: Aggregation method.
        category_chart_type: Sub-type for category charts (bar/pie).

    Returns:
        Plotly figure object, or None when no data is available.
    """
    global current_df, current_filters, current_figure

    if current_df is None or current_df.empty:
        current_figure = None
        return None

    try:
        # Work on the filtered view when filters are active.
        if current_filters:
            df = DataFilter().apply_filters(current_df, current_filters)
        else:
            df = current_df.copy()

        if df.empty:
            current_figure = None
            return None

        # Scatter and time-series charts need both axes; render an
        # annotated placeholder figure otherwise.
        needs_both = chart_type in ('time_series', 'scatter')
        if needs_both and (not x_column or not y_column):
            fig = go.Figure()
            fig.add_annotation(
                text="Please select both X and Y columns for this chart type",
                xref="paper", yref="paper",
                x=0.5, y=0.5, showarrow=False,
                font=dict(size=16)
            )
            fig.update_layout(title="Missing Required Columns")
            current_figure = fig
            return fig

        # The factory expects the bar/pie or histogram sub-type under the
        # 'sub_chart_type' key so it does not clash with 'chart_type'.
        extra = {}
        if chart_type == 'category':
            extra['sub_chart_type'] = category_chart_type
        elif chart_type == 'distribution':
            extra['sub_chart_type'] = 'histogram'

        fig = VisualizationFactory().create_visualization(
            chart_type=chart_type,
            df=df,
            x_column=x_column,
            y_column=y_column,
            aggregation=aggregation,
            **extra
        )
        current_figure = fig  # kept around for the export button
        return fig
    except Exception as e:
        print(f"Error creating visualization: {e}")
        # Surface the failure as an annotated figure instead of crashing.
        fig = go.Figure()
        fig.add_annotation(
            text=f"Error creating visualization: {str(e)}",
            xref="paper", yref="paper",
            x=0.5, y=0.5, showarrow=False,
            font=dict(size=14)
        )
        fig.update_layout(title="Visualization Error")
        current_figure = fig
        return fig
def generate_insights() -> Tuple[str, str, str]:
    """
    Produce automated text insights from the (optionally filtered) data.

    Returns:
        Tuple of (summary_insights, top_performers, trend_analysis).
    """
    global current_df, current_filters

    if current_df is None or current_df.empty:
        return "No data loaded", "", ""

    try:
        # Respect any active filters.
        if current_filters:
            df = DataFilter().apply_filters(current_df, current_filters)
        else:
            df = current_df.copy()

        generator = InsightGenerator()

        # Bullet list of high-level observations.
        summary_text = "\n".join(
            f"• {line}" for line in generator.generate_summary_insights(df)
        )

        # Top/bottom rows for the first numerical column, if any.
        numerical, _, _ = detect_column_types(df)
        top_bottom_text = ""
        if numerical:
            col = numerical[0]
            performers = generator.get_top_bottom_performers(df, col, top_n=DEFAULT_TOP_N)
            parts = [f"**Top {DEFAULT_TOP_N} Performers for '{col}':**\n"]
            parts.extend(f" • Row {idx}: {val:,.2f}\n" for idx, val in performers['top'])
            parts.append(f"\n**Bottom {DEFAULT_TOP_N} Performers for '{col}':**\n")
            parts.extend(f" • Row {idx}: {val:,.2f}\n" for idx, val in performers['bottom'])
            top_bottom_text = "".join(parts)

        # Trend analysis on the first date-like column name, if present.
        date_like = [c for c in df.columns if 'date' in c.lower() or 'time' in c.lower()]
        trend_text = ""
        if date_like and numerical:
            date_col, value_col = date_like[0], numerical[0]
            trend = generator.detect_trends(df, date_col, value_col)
            trend_text = (
                f"**Trend Analysis ({value_col} over {date_col}):**\n"
                f" • {trend.get('message', 'No trend detected')}\n"
            )

        return summary_text, top_bottom_text, trend_text
    except Exception as e:
        return f"Error generating insights: {str(e)}", "", ""
def export_data() -> str:
    """
    Export the (optionally filtered) dataset as a CSV file.

    Returns:
        Path to the exported CSV file, or None when there is no data or
        the export fails.
    """
    global current_df, current_filters

    if current_df is None or current_df.empty:
        return None

    try:
        # Apply any active filters before exporting.
        if current_filters:
            data_filter = DataFilter()
            df = data_filter.apply_filters(current_df, current_filters)
        else:
            df = current_df.copy()

        # Write to the system temp directory instead of the working
        # directory, which may be read-only in deployed environments;
        # a unique file name also avoids collisions between concurrent
        # export requests.
        import os
        import tempfile
        fd, output_path = tempfile.mkstemp(
            prefix="filtered_data_export_", suffix=".csv"
        )
        os.close(fd)  # pandas reopens the path itself
        df.to_csv(output_path, index=False)
        return output_path
    except Exception as e:
        print(f"Error exporting data: {e}")
        return None
def export_visualization(fig) -> Optional[str]:
    """
    Export the most recently generated visualization to disk.

    Gradio passes the Plot component's value back as a PlotData object,
    which cannot be exported directly, so the Plotly figure stored in the
    module-level `current_figure` is used instead.

    Args:
        fig: Plot component value from Gradio (ignored; may be None).

    Returns:
        Path to the exported PNG (or HTML fallback), or None on failure.
    """
    global current_figure

    figure = current_figure
    if figure is None:
        return None

    try:
        # Prefer a static PNG export (requires the optional kaleido package).
        path = EXPORT_IMAGE_FILENAME
        try:
            figure.write_image(
                path,
                width=EXPORT_IMAGE_WIDTH,
                height=EXPORT_IMAGE_HEIGHT,
                scale=EXPORT_IMAGE_SCALE
            )
        except Exception:
            # Image export unavailable — fall back to standalone HTML.
            try:
                path = EXPORT_HTML_FILENAME
                figure.write_html(path)
            except Exception as html_error:
                print(f"Error exporting visualization: {html_error}")
                return None
        return path
    except Exception as e:
        print(f"Error exporting visualization: {e}")
        return None
def create_dashboard():
    """Create and configure the Gradio dashboard interface.

    Assembles a six-tab Blocks UI — upload/preview, statistics, filtering,
    visualizations, insights, and export — and wires each control to the
    module-level handler functions. Handlers communicate through the
    current_df / current_filters / current_figure globals rather than
    through Gradio state.

    Returns:
        The configured gr.Blocks application, ready for .launch().
    """
    with gr.Blocks(title="Business Intelligence Dashboard") as demo:
        gr.Markdown(
            """
            # 📊 Business Intelligence Dashboard
            **Interactive Data Analysis and Visualization Platform**
            Upload your dataset and explore insights through an intuitive, Tableau-like interface.
            """
        )
        # State to store current dataframe.
        # NOTE(review): handlers actually read the module-level current_df;
        # this State receives load_and_preview_data's third return value
        # (an error-message string), not the dataframe — confirm df_state
        # is intentionally unused.
        df_state = gr.State(value=None)

        # Tab 1: Data Upload
        with gr.Tab("📁 Data Upload & Preview"):
            with gr.Row():
                with gr.Column(scale=1):
                    file_input = gr.File(
                        label="Upload Dataset",
                        file_types=[".csv", ".xlsx", ".xls"],
                        type="filepath"
                    )
                    upload_btn = gr.Button("Load Data", variant="primary", size="lg")
                with gr.Column(scale=2):
                    info_output = gr.Markdown("Upload a CSV or Excel file to begin.")
                    preview_output = gr.Dataframe(
                        label=f"Data Preview (First {PREVIEW_ROWS} Rows)",
                        interactive=False,
                        wrap=True
                    )
            upload_btn.click(
                fn=load_and_preview_data,
                inputs=[file_input],
                outputs=[info_output, preview_output, df_state]
            )

        # Tab 2: Statistics
        with gr.Tab("📈 Statistics & Profiling"):
            with gr.Row():
                with gr.Column():
                    stats_btn = gr.Button("Generate Statistics", variant="primary")
                    missing_output = gr.Textbox(
                        label="Missing Values Report",
                        lines=TEXTBOX_LINES_DEFAULT,
                        interactive=False
                    )
                with gr.Column():
                    numerical_stats_output = gr.Dataframe(
                        label="Numerical Statistics",
                        interactive=False,
                        wrap=True
                    )
            with gr.Row():
                categorical_stats_output = gr.Dataframe(
                    label="Categorical Statistics",
                    interactive=False,
                    wrap=True
                )
                correlation_output = gr.Dataframe(
                    label="Correlation Matrix",
                    interactive=False,
                    wrap=True
                )
            stats_btn.click(
                fn=get_statistics,
                inputs=[],
                outputs=[missing_output, numerical_stats_output, categorical_stats_output, correlation_output]
            )

        # Tab 3: Filter & Explore
        with gr.Tab("🔍 Filter & Explore"):
            with gr.Row():
                with gr.Column(scale=1):
                    filter_info = gr.Markdown("**Apply filters to explore your data:**")
                    filter_column = gr.Dropdown(
                        choices=[],
                        label="Select Column to Filter",
                        interactive=True
                    )
                    filter_type = gr.Radio(
                        choices=["numerical", "categorical"],
                        label="Filter Type",
                        value="numerical",
                        interactive=True
                    )
                    # Only one of the two groups below is visible at a time,
                    # toggled by update_filter_ui based on the filter type.
                    with gr.Group(visible=True) as numerical_filter_group:
                        min_val_input = gr.Number(label="Minimum Value", interactive=True)
                        max_val_input = gr.Number(label="Maximum Value", interactive=True)
                    with gr.Group(visible=False) as categorical_filter_group:
                        selected_values = gr.CheckboxGroup(
                            choices=[],
                            label="Select Values",
                            interactive=True
                        )
                    filter_btn = gr.Button("Apply Filter", variant="primary")
                    clear_filter_btn = gr.Button("Clear Filters", variant="secondary")
                with gr.Column(scale=2):
                    filter_result_info = gr.Markdown("")
                    filtered_data_output = gr.Dataframe(
                        label=f"Filtered Data Preview (First {FILTERED_PREVIEW_ROWS} Rows)",
                        interactive=False,
                        wrap=True
                    )
                    row_count_output = gr.Number(
                        label="Filtered Row Count",
                        interactive=False
                    )

            def update_filter_ui(column, filter_type_val):
                """Update filter UI based on column and type selection.

                Returns updates for (numerical group visibility, categorical
                group visibility, min value, max value, checkbox choices).
                """
                global current_df
                if current_df is None or current_df.empty or not column:
                    # No data or no column chosen: hide both filter groups.
                    return (
                        gr.update(visible=False),
                        gr.update(visible=False),
                        gr.update(value=None),
                        gr.update(value=None),
                        gr.update(choices=[])
                    )
                numerical, categorical, _ = detect_column_types(current_df)
                if filter_type_val == "numerical" and column in numerical:
                    # Pre-fill min/max with the column's full value range.
                    min_val = float(current_df[column].min())
                    max_val = float(current_df[column].max())
                    return (
                        gr.update(visible=True),
                        gr.update(visible=False),
                        gr.update(value=min_val, label=f"Min {column}"),
                        gr.update(value=max_val, label=f"Max {column}"),
                        gr.update(choices=[])
                    )
                elif filter_type_val == "categorical" and column in categorical:
                    # Offer (and pre-select) the column's unique values,
                    # capped to keep the checkbox list manageable.
                    unique_vals = sorted(
                        current_df[column].dropna().unique().tolist()
                    )[:MAX_UNIQUE_VALUES_DISPLAY]
                    return (
                        gr.update(visible=False),
                        gr.update(visible=True),
                        gr.update(value=None),
                        gr.update(value=None),
                        gr.update(choices=unique_vals, value=unique_vals)
                    )
                else:
                    # Column/type mismatch (e.g. numerical type selected for
                    # a text column): hide both groups.
                    return (
                        gr.update(visible=False),
                        gr.update(visible=False),
                        gr.update(value=None),
                        gr.update(value=None),
                        gr.update(choices=[])
                    )

            filter_column.change(
                fn=update_filter_ui,
                inputs=[filter_column, filter_type],
                outputs=[numerical_filter_group, categorical_filter_group,
                         min_val_input, max_val_input, selected_values]
            )
            filter_type.change(
                fn=update_filter_ui,
                inputs=[filter_column, filter_type],
                outputs=[numerical_filter_group, categorical_filter_group,
                         min_val_input, max_val_input, selected_values]
            )
            filter_btn.click(
                fn=apply_simple_filters,
                inputs=[filter_column, filter_type, min_val_input, max_val_input, selected_values],
                outputs=[filter_result_info, filtered_data_output, row_count_output]
            )

            def clear_filters():
                """Clear all filters and restore the unfiltered preview."""
                global current_filters
                current_filters = {}
                if current_df is not None:
                    row_count = len(current_df)
                    info_text = f"**Dataset:** {row_count:,} rows (filters cleared)"
                    return info_text, current_df.head(FILTERED_PREVIEW_ROWS), row_count
                return "No data loaded", pd.DataFrame(), 0

            clear_filter_btn.click(
                fn=clear_filters,
                inputs=[],
                outputs=[filter_result_info, filtered_data_output, row_count_output]
            )

            def update_filter_column_choices():
                """Update filter column dropdown when data is loaded."""
                global current_df
                if current_df is not None and not current_df.empty:
                    return gr.update(choices=list(current_df.columns))
                return gr.update(choices=[])

            # Update filter column choices when data is loaded.
            upload_btn.click(
                fn=update_filter_column_choices,
                inputs=[],
                outputs=[filter_column],
                queue=False
            )

        # Tab 4: Visualizations
        with gr.Tab("📊 Visualizations"):
            with gr.Row():
                with gr.Column(scale=1):
                    chart_type = gr.Dropdown(
                        choices=[
                            ("Time Series", "time_series"),
                            ("Distribution (Histogram)", "distribution"),
                            ("Category Analysis", "category"),
                            ("Scatter Plot", "scatter"),
                            ("Correlation Heatmap", "correlation")
                        ],
                        label="Chart Type",
                        value="time_series"
                    )
                    x_column = gr.Dropdown(
                        choices=[],
                        label="X-Axis Column",
                        interactive=True
                    )
                    y_column = gr.Dropdown(
                        choices=[],
                        label="Y-Axis Column (Optional)",
                        interactive=True
                    )
                    aggregation = gr.Dropdown(
                        choices=["sum", "mean", "count", "median", "none"],
                        label="Aggregation Method",
                        value="sum"
                    )
                    # Only shown when "Category Analysis" is selected
                    # (see toggle_category_type below).
                    category_chart_type = gr.Radio(
                        choices=["bar", "pie"],
                        label="Category Chart Type",
                        value="bar",
                        visible=False
                    )
                    viz_btn = gr.Button("Generate Visualization", variant="primary")
                    export_viz_btn = gr.Button("Export Visualization", variant="secondary")
                    export_viz_file = gr.File(label="Download Visualization (PNG or HTML)")
                with gr.Column(scale=2):
                    visualization_output = gr.Plot(
                        label="Visualization",
                        container=True
                    )

            def toggle_category_type(chart_type_val):
                """Show/hide category chart type based on selection."""
                return gr.update(visible=(chart_type_val == "category"))

            def update_viz_column_choices():
                """Update column dropdowns based on loaded data."""
                global current_df
                if current_df is not None and not current_df.empty:
                    all_columns = list(current_df.columns)
                    return gr.update(choices=all_columns), gr.update(choices=all_columns)
                return gr.update(choices=[]), gr.update(choices=[])

            chart_type.change(
                fn=toggle_category_type,
                inputs=[chart_type],
                outputs=[category_chart_type]
            )
            # Update visualization column choices when data is loaded.
            upload_btn.click(
                fn=update_viz_column_choices,
                inputs=[],
                outputs=[x_column, y_column],
                queue=False
            )
            viz_btn.click(
                fn=create_visualization,
                inputs=[chart_type, x_column, y_column, aggregation, category_chart_type],
                outputs=[visualization_output]
            )
            # export_visualization ignores the Plot value it receives and
            # exports the module-level current_figure instead.
            export_viz_btn.click(
                fn=export_visualization,
                inputs=[visualization_output],
                outputs=[export_viz_file]
            )

        # Tab 5: Insights
        with gr.Tab("💡 Insights"):
            with gr.Row():
                insights_btn = gr.Button("Generate Insights", variant="primary", size="lg")
            with gr.Row():
                with gr.Column():
                    summary_insights = gr.Markdown("### Summary Insights")
                    summary_output = gr.Textbox(
                        label="",
                        lines=TEXTBOX_LINES_DEFAULT,
                        interactive=False
                    )
                with gr.Column():
                    top_bottom_output = gr.Textbox(
                        label="Top/Bottom Performers",
                        lines=TEXTBOX_LINES_DEFAULT,
                        interactive=False
                    )
                    trend_output = gr.Textbox(
                        label="Trend Analysis",
                        lines=TEXTBOX_LINES_INSIGHTS,
                        interactive=False
                    )
            insights_btn.click(
                fn=generate_insights,
                inputs=[],
                outputs=[summary_output, top_bottom_output, trend_output]
            )

        # Tab 6: Export
        with gr.Tab("💾 Export"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Export Filtered Data")
                    export_data_btn = gr.Button("Export as CSV", variant="primary")
                    export_data_file = gr.File(label="Download CSV")
            export_data_btn.click(
                fn=export_data,
                inputs=[],
                outputs=[export_data_file]
            )

    return demo
if __name__ == "__main__":
    demo = create_dashboard()
    # Blocks.launch() does not accept a `theme` argument — themes are set on
    # gr.Blocks(...) at construction time — so passing one here raised a
    # TypeError before the server could start. The invalid kwarg is removed.
    demo.launch(
        share=False,
        server_name="0.0.0.0",
        server_port=7860
    )