# Source provenance: app.py from a Hugging Face Space (author yogesh882, commit 81a727a, verified)
"""
Business Intelligence Dashboard - Main Application
A professional, interactive dashboard for ANY data analysis.
Works with any CSV or Excel dataset - fully generic!
Author: BI Dashboard Team
Version: 2.0.0
"""
import gradio as gr
import pandas as pd
import numpy as np
import tempfile
import os
# Import custom modules
from data_processor import (
load_data, clean_data, get_data_info, get_data_preview, get_summary_statistics,
get_missing_value_report, get_correlation_matrix,
get_column_unique_values, aggregate_data, export_to_csv
)
from visualizations import (
create_time_series_plot, create_distribution_plot, create_category_bar_chart,
create_pie_chart, create_scatter_plot, create_correlation_heatmap, save_plot
)
from insights import (
generate_all_insights, generate_top_performers, detect_anomalies
)
from utils import get_numeric_columns, get_categorical_columns, get_datetime_columns, AGGREGATION_METHODS
# Global state for data
current_data = {"df": None, "filtered_df": None}
def _empty_dropdown_updates(count=13):
    """Return `count` gr.update objects that clear a dropdown (no choices, no value)."""
    return tuple(gr.update(choices=[], value=None) for _ in range(count))


def upload_and_process(file):
    """Handle file upload and initial processing.

    Args:
        file: Gradio file payload (or None); only its ``.name`` path is used.

    Returns:
        A 16-tuple: (status_markdown, preview_df, dtypes_df) followed by 13
        dropdown updates — the same shape as ``create_ui_outputs`` returns,
        matching the output list wired up in ``create_dashboard``.
    """
    # No file selected: clear every dropdown and show a hint.
    if file is None:
        return ("⚠️ Please upload a CSV or Excel file.", None, None) + _empty_dropdown_updates()
    df, message = load_data(file.name)
    # Loader failed: surface its message, clear every dropdown.
    if df is None:
        return (f"❌ {message}", None, None) + _empty_dropdown_updates()
    # Success: both the full and the filtered view start as the loaded data.
    current_data["df"] = df
    current_data["filtered_df"] = df.copy()
    return create_ui_outputs(df, message)
def create_ui_outputs(df, message):
    """Build the 16-element output tuple refreshed after a load or clean.

    Args:
        df: The dataset to summarize.
        message: Accepted for interface symmetry with callers; not currently
            embedded in the rendered summary.

    Returns:
        (info_markdown, preview_df, dtypes_df) followed by 13 gr.update
        objects, in the exact order expected by the upload/clean wiring.
    """
    info = get_data_info(df)
    info_text = f"""## βœ… Data Loaded Successfully!
### Dataset Overview
- **Rows:** {info['rows']:,}
- **Columns:** {info['columns']}
- **Memory Usage:** {info['memory_usage'] / 1024 / 1024:.2f} MB
- **Total Missing Values:** {info['total_missing']:,}
### Column Types
- **Numeric:** {len(info['numeric_columns'])} columns
- **Categorical:** {len(info['categorical_columns'])} columns
- **DateTime:** {len(info['datetime_columns'])} columns
### Columns
{', '.join(info['column_names'])}
"""
    preview = get_data_preview(df, 10)

    # Per-column dtype / completeness summary (compute the NA counts once).
    na_counts = df.isna().sum()
    dtypes_df = pd.DataFrame({
        'Column': df.columns,
        'Type': df.dtypes.astype(str).values,
        'Non-Null': df.notna().sum().values,
        'Missing': na_counts.values,
        'Missing %': (na_counts / len(df) * 100).round(2).values,
    })

    num_cols = get_numeric_columns(df)
    cat_cols = get_categorical_columns(df)
    dt_cols = get_datetime_columns(df)

    # Sensible defaults: first column of each kind, second numeric for the
    # scatter Y axis when available.
    first_date = dt_cols[0] if dt_cols else None
    first_num = num_cols[0] if num_cols else None
    second_num = num_cols[1] if len(num_cols) > 1 else first_num
    first_cat = cat_cols[0] if cat_cols else None
    color_opts = ["None"] + cat_cols
    cat_values = get_column_unique_values(df, first_cat) if first_cat else []

    return (
        info_text,
        preview,
        dtypes_df,
        # Time series controls
        gr.update(choices=dt_cols, value=first_date),
        gr.update(choices=num_cols, value=first_num),
        # Distribution control
        gr.update(choices=num_cols, value=first_num),
        # Category chart controls
        gr.update(choices=cat_cols, value=first_cat),
        gr.update(choices=num_cols, value=first_num),
        # Scatter controls
        gr.update(choices=num_cols, value=first_num),
        gr.update(choices=num_cols, value=second_num),
        gr.update(choices=color_opts, value="None"),
        # Filter controls
        gr.update(choices=dt_cols, value=first_date),
        gr.update(choices=cat_cols, value=first_cat),
        gr.update(choices=cat_values, value=None),
        gr.update(choices=num_cols, value=first_num),
        # Insights control
        gr.update(choices=cat_cols, value=first_cat),
    )
def clean_data_action():
    """Run the cleaning pipeline on the loaded dataset and refresh all controls."""
    df = current_data.get("df")
    if df is None:
        # Nothing loaded: warn and leave the 13 dropdowns untouched.
        return ("⚠️ Please upload data first.", None, None) + tuple(gr.update() for _ in range(13))
    cleaned_df, report = clean_data(df)
    current_data["df"] = cleaned_df
    current_data["filtered_df"] = cleaned_df.copy()
    refreshed = create_ui_outputs(cleaned_df, "Data cleaned successfully!")
    # Swap the status slot for the cleaning report; keep everything else as-is.
    return (report,) + refreshed[1:]
def get_statistics():
    """Compute summary statistics, the missing-value report and correlations.

    Returns four dataframes (numeric stats, categorical stats, missing-value
    report, correlation matrix), or four Nones when no data is loaded.
    """
    df = current_data.get("df")
    if df is None:
        return None, None, None, None
    numeric_summary, categorical_summary = get_summary_statistics(df)
    return (
        numeric_summary,
        categorical_summary,
        get_missing_value_report(df),
        get_correlation_matrix(df),
    )
def update_filter_values(filter_cat_col):
    """Refresh the value-selection dropdown when the category column changes."""
    df = current_data.get("df")
    if df is not None and filter_cat_col:
        return gr.update(choices=get_column_unique_values(df, filter_cat_col), value=None)
    # No data or no column picked: present an empty dropdown.
    return gr.update(choices=[], value=None)
def _filter_by_date(frame, column, start_text, end_text, applied):
    """Apply inclusive start/end date bounds on `frame[column]`.

    Unparseable dates are logged and skipped (best-effort, matching the
    original behavior). Successful bounds are recorded in `applied`.
    """
    if start_text and str(start_text).strip():
        try:
            frame = frame[frame[column] >= pd.to_datetime(start_text)]
            applied.append(f"{column} >= {start_text}")
        except Exception as e:
            print(f"Date start error: {e}")
    if end_text and str(end_text).strip():
        try:
            frame = frame[frame[column] <= pd.to_datetime(end_text)]
            applied.append(f"{column} <= {end_text}")
        except Exception as e:
            print(f"Date end error: {e}")
    return frame


def _filter_by_range(frame, column, min_value, max_value, applied):
    """Apply inclusive numeric min/max bounds on `frame[column]`.

    Non-numeric bounds are logged and skipped. Successful bounds are
    recorded in `applied`.
    """
    if min_value is not None and min_value != "":
        try:
            frame = frame[frame[column] >= float(min_value)]
            applied.append(f"{column} >= {min_value}")
        except (ValueError, TypeError) as e:
            print(f"Num min error: {e}")
    if max_value is not None and max_value != "":
        try:
            frame = frame[frame[column] <= float(max_value)]
            applied.append(f"{column} <= {max_value}")
        except (ValueError, TypeError) as e:
            print(f"Num max error: {e}")
    return frame


def apply_filters(filter_date_col, filter_start_date, filter_end_date,
                  filter_cat_col, filter_cat_values,
                  filter_num_col, filter_num_min, filter_num_max,
                  filter_search_term):
    """Apply date, category, numeric and text filters to the loaded data.

    Filters are cumulative (logical AND). The result is stored in
    ``current_data["filtered_df"]`` for the other tabs to consume.

    Returns:
        (summary_markdown, preview_df, filtered_row_count)
    """
    df = current_data.get("df")
    if df is None:
        return "Please upload data first.", None, 0
    filtered_df = df.copy()
    filters_applied = []
    # Date filter
    if filter_date_col and filter_date_col in df.columns:
        filtered_df = _filter_by_date(filtered_df, filter_date_col,
                                      filter_start_date, filter_end_date,
                                      filters_applied)
    # Categorical filter
    if filter_cat_col and filter_cat_col in df.columns and filter_cat_values:
        # A single selection may arrive as a bare string; normalize to a list.
        if isinstance(filter_cat_values, str):
            filter_cat_values = [filter_cat_values]
        if len(filter_cat_values) > 0:
            filtered_df = filtered_df[filtered_df[filter_cat_col].isin(filter_cat_values)]
            filters_applied.append(f"{filter_cat_col} in {filter_cat_values}")
    # Numeric filter
    if filter_num_col and filter_num_col in df.columns:
        filtered_df = _filter_by_range(filtered_df, filter_num_col,
                                       filter_num_min, filter_num_max,
                                       filters_applied)
    # Text search filter — NOTE: searches only the FIRST categorical column.
    if filter_search_term and str(filter_search_term).strip():
        cat_cols = get_categorical_columns(df)
        if cat_cols:
            search_col = cat_cols[0]
            # Case-insensitive literal substring match; NaNs treated as "".
            mask = filtered_df[search_col].fillna('').astype(str).str.lower().str.contains(
                str(filter_search_term).lower(), regex=False
            )
            filtered_df = filtered_df[mask]
            filters_applied.append(f"Search '{filter_search_term}' in {search_col}")
    current_data["filtered_df"] = filtered_df
    if filters_applied:
        summary = f"βœ… **Filters applied:** {', '.join(filters_applied)}\n\n"
        summary += f"πŸ“Š **Results:** {len(filtered_df):,} rows (from {len(df):,} original rows)"
    else:
        summary = f"ℹ️ No filters applied. Showing all {len(filtered_df):,} rows."
    preview = get_data_preview(filtered_df, 20)
    return summary, preview, len(filtered_df)
def reset_filters():
    """Clear every filter control and restore the unfiltered dataset."""
    df = current_data.get("df")
    if df is None:
        # Nothing loaded: reset the widgets to their blank defaults anyway.
        return "No data loaded.", None, 0, "", "", None, None, None, ""
    current_data["filtered_df"] = df.copy()
    return (
        "ℹ️ Filters reset. Showing all data.",
        get_data_preview(df, 20),
        len(df),
        "",    # start date textbox
        "",    # end date textbox
        None,  # category values
        None,  # numeric min
        None,  # numeric max
        "",    # search term
    )
def _missing_column(frame, column, label):
    """Return the standard error message if `column` is not a usable column
    of `frame`, otherwise None."""
    if not column or column not in frame.columns:
        return f"❌ Please select a valid {label} column."
    return None


def create_visualization(viz_type, date_col, ts_value_col, time_freq, ts_agg_method,
                         dist_col, cat_col, cat_value_col, cat_agg_method, top_n,
                         scatter_x, scatter_y, color_col):
    """Create the visualization selected in the UI from the filtered data.

    Returns:
        (figure_or_None, status_message)
    """
    import matplotlib.pyplot as plt
    plt.close('all')  # release figures from previous clicks to avoid leaks
    df = current_data.get("filtered_df")
    if df is None:
        return None, "Please upload data first."
    try:
        if viz_type == "Time Series":
            error = _missing_column(df, date_col, "Date") or _missing_column(df, ts_value_col, "Value")
            if error:
                return None, error
            fig, _ = create_time_series_plot(
                df, date_col, ts_value_col, ts_agg_method, time_freq,
                f"{ts_value_col} Over Time ({ts_agg_method.capitalize()}) - {time_freq}"
            )
            return fig, f"βœ… Time Series created! Column: {ts_value_col}, Freq: {time_freq}, Agg: {ts_agg_method}"
        elif viz_type in ("Distribution (Histogram)", "Distribution (Box Plot)"):
            error = _missing_column(df, dist_col, "Numeric")
            if error:
                return None, error
            if viz_type == "Distribution (Histogram)":
                fig, _ = create_distribution_plot(df, dist_col, 'histogram')
                return fig, f"βœ… Histogram created for {dist_col}"
            fig, _ = create_distribution_plot(df, dist_col, 'boxplot')
            return fig, f"βœ… Box Plot created for {dist_col}"
        elif viz_type in ("Bar Chart", "Pie Chart"):
            error = _missing_column(df, cat_col, "Category") or _missing_column(df, cat_value_col, "Value")
            if error:
                return None, error
            if viz_type == "Bar Chart":
                fig, _ = create_category_bar_chart(
                    df, cat_col, cat_value_col, cat_agg_method, int(top_n),
                    f"Top {int(top_n)} {cat_col} by {cat_value_col} ({cat_agg_method})"
                )
                return fig, f"βœ… Bar Chart created! Category: {cat_col}, Value: {cat_value_col}"
            fig, _ = create_pie_chart(
                df, cat_col, cat_value_col, cat_agg_method, int(top_n),
                f"{cat_col} Distribution by {cat_value_col} ({cat_agg_method})"
            )
            return fig, f"βœ… Pie Chart created! Category: {cat_col}, Value: {cat_value_col}"
        elif viz_type == "Scatter Plot":
            error = _missing_column(df, scatter_x, "X-axis") or _missing_column(df, scatter_y, "Y-axis")
            if error:
                return None, error
            # "None" is the UI's explicit no-color sentinel.
            color = color_col if color_col and color_col != "None" else None
            fig, _ = create_scatter_plot(df, scatter_x, scatter_y, color)
            return fig, f"βœ… Scatter Plot created! X: {scatter_x}, Y: {scatter_y}"
        elif viz_type == "Correlation Heatmap":
            fig, _ = create_correlation_heatmap(df)
            return fig, "βœ… Correlation Heatmap created!"
        else:
            return None, "Please select a visualization type."
    except Exception as e:
        # Plotting errors are shown to the user rather than crashing the app.
        return None, f"❌ Error creating visualization: {str(e)}"
def generate_insights_report(group_col):
    """Produce the automated insights text plus top-performer and anomaly tables.

    Both tables use the first numeric column of the filtered dataset as the
    metric; the top-performer table additionally requires a valid group column.
    """
    df = current_data.get("filtered_df")
    if df is None:
        return "Please upload data first.", None, None
    insights_text = generate_all_insights(df)
    numeric_cols = get_numeric_columns(df)
    top_df = None
    anomaly_df = None
    if numeric_cols:
        if group_col and group_col in df.columns:
            top_df, _ = generate_top_performers(df, group_col, numeric_cols[0], n=10)
        anomaly_df, _ = detect_anomalies(df, numeric_cols[0])
    return insights_text, top_df, anomaly_df
def export_filtered_data():
    """Export the currently filtered dataset to a temporary CSV file.

    Returns:
        (path_or_None, status_message)
    """
    df = current_data.get("filtered_df")
    if df is None:
        return None, "No data to export."
    try:
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.csv', prefix='filtered_data_')
        # Close the handle before pandas writes to the path: leaving it open
        # leaks a file descriptor and fails on Windows, where an open
        # NamedTemporaryFile cannot be reopened by another writer.
        temp_file.close()
        df.to_csv(temp_file.name, index=False)
        return temp_file.name, f"βœ… Exported {len(df):,} rows to CSV"
    except Exception as e:
        return None, f"❌ Export failed: {str(e)}"
def export_chart(fig):
    """Save the given matplotlib figure to a temporary PNG file.

    Args:
        fig: The current chart figure, or None if nothing has been plotted.

    Returns:
        (path_or_None, status_message)
    """
    if fig is None:
        return None, "No chart to export."
    try:
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.png', prefix='chart_')
        # Release the handle so save_plot can write to the path (required on
        # Windows; avoids a descriptor leak everywhere).
        temp_file.close()
        save_plot(fig, temp_file.name)
        return temp_file.name, "βœ… Chart exported to PNG"
    except Exception as e:
        return None, f"❌ Export failed: {str(e)}"
def create_dashboard():
    """Create the main Gradio dashboard interface.

    Builds six tabs (Upload, Statistics, Filter & Explore, Visualizations,
    Insights, Export) and wires their buttons to the module-level handler
    functions. Returns the gr.Blocks app.
    """
    with gr.Blocks(
        title="Business Intelligence Dashboard"
    ) as demo:
        gr.Markdown("""
# πŸ“Š Business Intelligence Dashboard
### Universal Data Analysis Platform
Upload **any CSV or Excel file** to explore insights, filter records, create visualizations, and generate reports.
""")
        # ==================== TAB 1: DATA UPLOAD ====================
        with gr.Tab("πŸ“ Data Upload"):
            with gr.Row():
                with gr.Column(scale=1):
                    file_input = gr.File(
                        label="Upload CSV or Excel File",
                        file_types=[".csv", ".xlsx", ".xls"],
                        type="filepath"
                    )
                    with gr.Row():
                        upload_btn = gr.Button("πŸš€ Load Data", variant="primary", size="lg")
                        clean_btn = gr.Button("🧹 Clean Data", variant="secondary", size="lg")
                with gr.Column(scale=2):
                    upload_status = gr.Markdown("*Upload a file to begin*")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### πŸ“‹ Data Preview")
                    data_preview = gr.Dataframe(label="First 10 Rows", wrap=True)
                with gr.Column():
                    gr.Markdown("### πŸ“Š Column Information")
                    column_info = gr.Dataframe(label="Column Details", wrap=True)
        # ==================== TAB 2: STATISTICS ====================
        with gr.Tab("πŸ“ˆ Statistics"):
            stats_btn = gr.Button("πŸ”„ Generate Statistics", variant="primary")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### πŸ”’ Numerical Statistics")
                    num_stats_table = gr.Dataframe(label="Numerical Columns", wrap=True)
                with gr.Column():
                    gr.Markdown("### 🏷️ Categorical Statistics")
                    cat_stats_table = gr.Dataframe(label="Categorical Columns", wrap=True)
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### ⚠️ Missing Values Report")
                    missing_table = gr.Dataframe(label="Missing Values", wrap=True)
                with gr.Column():
                    gr.Markdown("### πŸ”— Correlation Matrix")
                    corr_table = gr.Dataframe(label="Correlations", wrap=True)
        # ==================== TAB 3: FILTER & EXPLORE ====================
        with gr.Tab("πŸ” Filter & Explore"):
            gr.Markdown("### Apply Dynamic Filters")
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("**πŸ“… Date Filter**")
                    filter_date_col = gr.Dropdown(label="Date Column", choices=[])
                    filter_start_date = gr.Textbox(label="Start Date (YYYY-MM-DD)", placeholder="2020-01-01")
                    filter_end_date = gr.Textbox(label="End Date (YYYY-MM-DD)", placeholder="2024-12-31")
                with gr.Column(scale=1):
                    gr.Markdown("**🏷️ Category Filter**")
                    filter_cat_col = gr.Dropdown(label="Category Column", choices=[])
                    filter_cat_values = gr.Dropdown(label="Select Values", choices=[], multiselect=True)
                with gr.Column(scale=1):
                    gr.Markdown("**πŸ”’ Numeric Filter**")
                    filter_num_col = gr.Dropdown(label="Numeric Column", choices=[])
                    filter_num_min = gr.Number(label="Min Value", value=None)
                    filter_num_max = gr.Number(label="Max Value", value=None)
                with gr.Column(scale=1):
                    gr.Markdown("**πŸ”Ž Text Search**")
                    filter_search_term = gr.Textbox(label="Search Term", placeholder="Enter search term...")
            with gr.Row():
                filter_btn = gr.Button("βœ… Apply Filters", variant="primary")
                reset_btn = gr.Button("πŸ”„ Reset Filters", variant="secondary")
            filter_status = gr.Markdown("*Apply filters to explore data*")
            row_count = gr.Number(label="Filtered Rows", interactive=False)
            gr.Markdown("### πŸ“‹ Filtered Data Preview")
            filtered_preview = gr.Dataframe(label="Filtered Results", wrap=True)
        # ==================== TAB 4: VISUALIZATIONS ====================
        with gr.Tab("πŸ“Š Visualizations"):
            gr.Markdown("### Select Visualization Type")
            viz_type = gr.Dropdown(
                label="Visualization Type",
                choices=[
                    "Time Series", "Distribution (Histogram)", "Distribution (Box Plot)",
                    "Bar Chart", "Pie Chart", "Scatter Plot", "Correlation Heatmap"
                ],
                value="Time Series"
            )
            with gr.Row():
                with gr.Column(scale=1):
                    # Only one of the option groups below is visible at a time;
                    # update_viz_options toggles them when viz_type changes.
                    with gr.Group(visible=True) as time_series_options:
                        gr.Markdown("### πŸ“ˆ Time Series Settings")
                        date_col = gr.Dropdown(label="πŸ“… Date Column", choices=[])
                        ts_value_col = gr.Dropdown(label="πŸ“Š Value Column", choices=[])
                        time_freq = gr.Dropdown(
                            label="⏱️ Frequency",
                            choices=["D", "W", "M"],
                            value="D"
                        )
                        gr.Markdown("*D=Daily, W=Weekly, M=Monthly*")
                        ts_agg_method = gr.Dropdown(
                            label="πŸ“ Aggregation Method",
                            choices=AGGREGATION_METHODS,
                            value="sum"
                        )
                    with gr.Group(visible=False) as distribution_options:
                        gr.Markdown("### πŸ“Š Distribution Settings")
                        dist_col = gr.Dropdown(label="πŸ“Š Numeric Column", choices=[])
                    with gr.Group(visible=False) as category_options:
                        gr.Markdown("### πŸ“Š Category Chart Settings")
                        cat_col = gr.Dropdown(label="🏷️ Category Column", choices=[])
                        cat_value_col = gr.Dropdown(label="πŸ“Š Value Column", choices=[])
                        cat_agg_method = gr.Dropdown(
                            label="πŸ“ Aggregation Method",
                            choices=AGGREGATION_METHODS,
                            value="sum"
                        )
                        top_n = gr.Slider(label="πŸ” Top N Items", minimum=5, maximum=20, value=10, step=1)
                    with gr.Group(visible=False) as scatter_options:
                        gr.Markdown("### πŸ“Š Scatter Plot Settings")
                        scatter_x = gr.Dropdown(label="πŸ“Š X-Axis Column", choices=[])
                        scatter_y = gr.Dropdown(label="πŸ“Š Y-Axis Column", choices=[])
                        color_col = gr.Dropdown(label="🎨 Color By (optional)", choices=["None"], value="None")
                    with gr.Group(visible=False) as heatmap_options:
                        gr.Markdown("### πŸ”₯ Correlation Heatmap")
                        gr.Markdown("*Shows correlations between all numeric columns*")
                    viz_btn = gr.Button("πŸ“ˆ Create Visualization", variant="primary", size="lg")
                with gr.Column(scale=2):
                    viz_status = gr.Markdown("*Select options and click Create*")
                    chart_output = gr.Plot(label="Visualization")
                    with gr.Row():
                        export_chart_btn = gr.Button("πŸ’Ύ Export Chart as PNG")
                        chart_file = gr.File(label="Download Chart")
            def update_viz_options(viz_type):
                # Show exactly the option group relevant to the selected chart type.
                return {
                    time_series_options: gr.update(visible=(viz_type == "Time Series")),
                    distribution_options: gr.update(visible=(viz_type in ["Distribution (Histogram)", "Distribution (Box Plot)"])),
                    category_options: gr.update(visible=(viz_type in ["Bar Chart", "Pie Chart"])),
                    scatter_options: gr.update(visible=(viz_type == "Scatter Plot")),
                    heatmap_options: gr.update(visible=(viz_type == "Correlation Heatmap"))
                }
            viz_type.change(
                fn=update_viz_options,
                inputs=[viz_type],
                outputs=[time_series_options, distribution_options, category_options, scatter_options, heatmap_options]
            )
        # ==================== TAB 5: INSIGHTS ====================
        with gr.Tab("πŸ’‘ Insights"):
            with gr.Row():
                insights_group_col = gr.Dropdown(label="Group By Column (for Top Performers)", choices=[])
                insights_btn = gr.Button("πŸ” Generate Insights", variant="primary", size="lg")
            insights_output = gr.Markdown("*Click to generate automated insights*")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### πŸ† Top Performers")
                    top_performers_table = gr.Dataframe(wrap=True)
                with gr.Column():
                    gr.Markdown("### ⚠️ Detected Anomalies")
                    anomalies_table = gr.Dataframe(wrap=True)
        # ==================== TAB 6: EXPORT ====================
        with gr.Tab("πŸ’Ύ Export"):
            gr.Markdown("### Export Your Data")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("**πŸ“„ Export Filtered Data**")
                    export_csv_btn = gr.Button("πŸ“₯ Export to CSV", variant="primary")
                    csv_file = gr.File(label="Download CSV")
                    csv_status = gr.Markdown("")
        # ==================== EVENT HANDLERS ====================
        # Upload handler
        # NOTE: the output order below must match the tuple returned by
        # create_ui_outputs / upload_and_process exactly (positional wiring).
        upload_btn.click(
            fn=upload_and_process,
            inputs=[file_input],
            outputs=[
                upload_status, data_preview, column_info,
                date_col, ts_value_col,
                dist_col,
                cat_col, cat_value_col,
                scatter_x, scatter_y, color_col,
                filter_date_col, filter_cat_col, filter_cat_values, filter_num_col,
                insights_group_col
            ]
        )
        # Clean data handler (same output shape as the upload handler)
        clean_btn.click(
            fn=clean_data_action,
            inputs=[],
            outputs=[
                upload_status, data_preview, column_info,
                date_col, ts_value_col,
                dist_col,
                cat_col, cat_value_col,
                scatter_x, scatter_y, color_col,
                filter_date_col, filter_cat_col, filter_cat_values, filter_num_col,
                insights_group_col
            ]
        )
        # Statistics handler
        stats_btn.click(
            fn=get_statistics,
            inputs=[],
            outputs=[num_stats_table, cat_stats_table, missing_table, corr_table]
        )
        # Update category filter values when column changes
        filter_cat_col.change(
            fn=update_filter_values,
            inputs=[filter_cat_col],
            outputs=[filter_cat_values]
        )
        # Filter handlers
        filter_btn.click(
            fn=apply_filters,
            inputs=[
                filter_date_col, filter_start_date, filter_end_date,
                filter_cat_col, filter_cat_values,
                filter_num_col, filter_num_min, filter_num_max,
                filter_search_term
            ],
            outputs=[filter_status, filtered_preview, row_count]
        )
        reset_btn.click(
            fn=reset_filters,
            inputs=[],
            outputs=[
                filter_status, filtered_preview, row_count,
                filter_start_date, filter_end_date,
                filter_cat_values,
                filter_num_min, filter_num_max,
                filter_search_term
            ]
        )
        # Visualization handler
        viz_btn.click(
            fn=create_visualization,
            inputs=[
                viz_type,
                date_col, ts_value_col, time_freq, ts_agg_method,
                dist_col,
                cat_col, cat_value_col, cat_agg_method, top_n,
                scatter_x, scatter_y, color_col
            ],
            outputs=[chart_output, viz_status]
        )
        # Insights handler
        insights_btn.click(
            fn=generate_insights_report,
            inputs=[insights_group_col],
            outputs=[insights_output, top_performers_table, anomalies_table]
        )
        # Export handlers
        export_csv_btn.click(
            fn=export_filtered_data,
            inputs=[],
            outputs=[csv_file, csv_status]
        )
        export_chart_btn.click(
            fn=export_chart,
            inputs=[chart_output],
            outputs=[chart_file, viz_status]
        )
    return demo
# ==================== MAIN ====================
if __name__ == "__main__":
    demo = create_dashboard()
    demo.launch(
        share=True,              # additionally expose a public gradio.live link
        server_name="0.0.0.0",   # listen on all interfaces (container/Space friendly)
        server_port=7860,
        show_error=True,         # surface handler tracebacks in the browser
    )