# Author: Saumith devarsetty
# Refactor: Consolidated insights, updated visualizations to Plotly, and updated README
# Commit: 4032138
"""
My BI Dashboard - Main Application
Built with Gradio for interactive data exploration and analysis.
"""
import gradio as gr
import pandas as pd
import matplotlib.pyplot as plt
import plotly.express as px
from typing import Optional, Tuple
import os
# My custom modules for data processing and visualization
from data_processor import (
load_data, get_data_summary, get_correlation_matrix,
apply_filters, clean_data, aggregate_data, get_data_preview
)
from visualizations import (
create_plotly_timeseries, create_plotly_distribution, create_plotly_category,
create_plotly_scatter, create_plotly_heatmap
)
from insights import (
generate_all_insights, format_insights_for_display,
generate_visualization_insights, generate_advanced_insights, format_advanced_insights,
generate_smart_dashboard, compare_datasets
)
from utils import get_column_types, get_missing_value_summary, get_dataframe_info
# Module-level state shared by all Gradio callbacks in this file.
current_df: Optional[pd.DataFrame] = None   # full dataset as last loaded
filtered_df: Optional[pd.DataFrame] = None  # current_df after the user's filters are applied
def update_preview_pagination(offset=0):
    """Return a 10-row page of the loaded dataset starting at ``offset``.

    Args:
        offset: requested starting row; clamped into a valid page boundary.

    Returns:
        (status message, preview DataFrame, clamped offset), or
        ("No data loaded.", None, 0) when nothing has been uploaded yet.
    """
    global current_df
    if current_df is None:
        return "No data loaded.", None, 0
    n_rows = len(current_df)
    # Clamp the offset: negative values snap to the start, overshoot snaps
    # back to the final (possibly partial) page of 10 rows.
    if offset < 0:
        offset = 0
    elif offset >= n_rows:
        offset = max(0, n_rows - (n_rows % 10 or 10))
    page = current_df.iloc[offset:offset + 10]
    last_row = min(offset + 10, n_rows)
    status = (
        f"Dataset Loaded Successfully! βœ… "
        f"(Rows {offset+1}-{last_row} of {n_rows})"
    )
    return status, page, offset
def upload_and_preview_data(file):
    """Handles file upload and shows preview.

    Args:
        file: the uploaded Gradio file object (exposes ``.name`` as a path),
            or None when nothing was uploaded.

    Returns:
        20 positional values: status message, preview DataFrame, row count,
        preview offset, followed by 16 component updates that repopulate the
        filter and visualization dropdowns. Error paths return the message
        plus 19 ``None`` placeholders so the output count still matches the
        button wiring.

    Side effects: rebinds the module-level ``current_df`` and ``filtered_df``.
    """
    global current_df, filtered_df
    if file is None:
        return (
            "Please upload a file.", None, 0, 0, # status, preview, row_count, offset
            gr.update(value=None), gr.update(value=None), gr.update(value=None), # num filters
            gr.update(choices=[], value=None), gr.update(choices=[]), # cat filter 1
            gr.update(choices=[], value=None), gr.update(choices=[]), # cat filter 2
            gr.update(choices=[], value=None), gr.update(value=""), gr.update(value=""), # date filter
            gr.update(choices=[], value=None), gr.update(choices=[], value=None), gr.update(choices=[], value=None), # viz cols
            gr.update(choices=[], value=None), gr.update(choices=[], value=None), # comp cols
            gr.update(choices=[], value=None) # drill col
        )
    try:
        # Load data via the shared project loader; it returns (df, error).
        df, error = load_data(file.name)
        if error:
            return [f"Error: {error}"] + [None]*19
        if df is not None:
            current_df = df
            filtered_df = df.copy()
            # Get column types (numerical / categorical / datetime buckets)
            col_types = get_column_types(df)
            # Get preview (first 10 rows)
            preview = df.head(10)
            status_msg = f"Dataset Loaded Successfully! βœ… (Rows 1-{min(10, len(df))} of {len(df)})"
            # Get all columns
            all_cols = df.columns.tolist()
            # NOTE: output order below must match the button's outputs wiring exactly.
            return (
                status_msg, preview, len(df), 0, # status, preview, row_count, offset
                gr.update(choices=col_types['numerical'], value=col_types['numerical'][0] if col_types['numerical'] else None), # num_col
                gr.update(value=None), gr.update(value=None), # num_min, num_max
                gr.update(choices=col_types['categorical'], value=col_types['categorical'][0] if col_types['categorical'] else None), # cat_col
                gr.update(choices=[]), # cat_vals
                gr.update(choices=col_types['categorical'], value=None), # cat_col_2
                gr.update(choices=[]), # cat_vals_2
                gr.update(choices=col_types['datetime'], value=col_types['datetime'][0] if col_types['datetime'] else None), # date_col
                gr.update(value=""), gr.update(value=""), # date_start, date_end
                gr.update(choices=all_cols, value=None), # x_col
                gr.update(choices=all_cols, value=None), # y_col
                gr.update(choices=all_cols, value=None), # color_col
                gr.update(choices=col_types['categorical'], value=None), # comp_a
                gr.update(choices=col_types['categorical'], value=None), # comp_b
                gr.update(choices=col_types['categorical'], value=None) # drill
            )
        else:
            return ["Error loading file."] + [None]*19
    except Exception as e:
        return [f"Error: {str(e)}"] + [None]*19
def _validate_loaded_dataframe(df):
    """Sanity-check a freshly loaded DataFrame.

    Returns:
        (bool, str): (is_valid, error_message); the message is empty when valid.
    """
    if df is None:
        return False, "no data could be loaded"
    if len(df.columns) == 0:
        return False, "the dataset has no columns"
    if df.empty:
        return False, "the dataset has no rows"
    return True, ""


def _detect_datetime_like_columns(df):
    """Heuristically find text columns that look like dates.

    A column qualifies when at least 80% of a small non-null sample parses
    with ``pandas.to_datetime``. Returns the list of matching column names.
    """
    candidates = []
    for col in df.select_dtypes(include=["object"]).columns:
        sample = df[col].dropna().head(20)
        if sample.empty:
            continue
        try:
            parsed = pd.to_datetime(sample, errors="coerce")
        except Exception:
            continue
        if parsed.notna().mean() >= 0.8:
            candidates.append(col)
    return candidates


def load_from_path_or_url(path_or_url):
    """Load a dataset from a local file path or an HTTP(S) URL.

    Args:
        path_or_url: local path handled by ``load_data``, or an http(s) URL
            read directly with pandas (extension picks CSV vs Excel).

    Returns:
        9 values: summary markdown, preview DataFrame, status message, then
        six ``gr.update`` objects refreshing the visualization and filter
        dropdowns. Error paths return the message plus empty placeholders.

    Side effects: rebinds the module-level ``current_df``/``filtered_df``.

    Fix: the original called ``validate_dataframe`` and
    ``detect_datetime_columns``, which were never imported (NameError on
    every successful load); equivalent private helpers are used instead.
    """
    global current_df, filtered_df
    if not path_or_url or path_or_url.strip() == "":
        return "Please enter a file path or URL.", None, "", gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
    try:
        if path_or_url.startswith('http://') or path_or_url.startswith('https://'):
            # Remote source: choose the reader from the extension, CSV by default.
            if path_or_url.endswith(('.xlsx', '.xls')):
                df = pd.read_excel(path_or_url)
            else:
                df = pd.read_csv(path_or_url)
        else:
            # Local source: delegate to the shared project loader.
            df, error = load_data(path_or_url)
            if error:
                return f"Error: {error}", None, "", gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
        # Validate before touching global state.
        is_valid, error_msg = _validate_loaded_dataframe(df)
        if not is_valid:
            return f"Invalid data: {error_msg}", None, "", gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
        # Auto-detect and convert datetime-looking text columns (best effort).
        for col in _detect_datetime_like_columns(df):
            try:
                df[col] = pd.to_datetime(df[col], errors='coerce')
            except Exception:
                pass  # leave the column untouched if conversion fails
        current_df = df
        filtered_df = df.copy()
        # Build the summary card from project helpers.
        info = get_dataframe_info(df)
        col_types = get_column_types(df)
        summary = f"""
## Dataset Loaded Successfully! βœ…
**Source:** {path_or_url}
**Basic Information:**
- Rows: {info['rows']:,}
- Columns: {info['columns']}
- Numerical Columns: {info['numerical_columns']}
- Categorical Columns: {info['categorical_columns']}
- DateTime Columns: {info['datetime_columns']}
- Memory Usage: {info['memory_usage_mb']:.2f} MB
- Missing Values: {info['total_missing']:,} ({info['missing_percentage']:.2f}%)
**Column Names:**
{', '.join(df.columns.tolist())}
"""
        preview = df.head(10)
        all_cols = df.columns.tolist()
        # Order must match the caller's outputs wiring.
        return (
            summary,
            preview,
            "Data loaded successfully from path/URL!",
            gr.update(choices=all_cols, value=None),                  # x_column - clear selection
            gr.update(choices=all_cols, value=None),                  # y_column - clear selection
            gr.update(choices=all_cols, value=None),                  # color_column - clear selection
            gr.update(choices=col_types['numerical'], value=None),    # num_filter_col - clear selection
            gr.update(choices=col_types['categorical'], value=None),  # cat_filter_col - clear selection
            gr.update(choices=col_types['datetime'], value=None)      # date_filter_col - clear selection
        )
    except Exception as e:
        return f"Error loading data: {str(e)}", None, "", gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
def show_statistics():
    """Build the Statistics tab contents for the current dataset.

    Returns:
        An 8-tuple matching the Generate Statistics button's outputs:
        (rows, cols, duplicate rows, missing-cell %, numerical stats
        DataFrame, categorical stats DataFrame, missing-values DataFrame,
        correlation heatmap figure).

    Fix: the no-data early return previously yielded only 7 values — one
    short of the 8 wired outputs — which broke the callback.
    """
    global current_df
    if current_df is None:
        return 0, 0, 0, 0, None, None, None, None
    summary = get_data_summary(current_df)
    # 1. Headline metric cards.
    rows = summary['shape'][0]
    cols = summary['shape'][1]
    dupes = summary.get('duplicate_rows', 0)
    total_cells = rows * cols
    missing_cells = sum(summary['missing_values'].values())
    missing_pct = (missing_cells / total_cells * 100) if total_cells > 0 else 0
    # 2. Numerical stats, transposed so each feature is one row.
    if 'numerical_stats' in summary:
        num_df = pd.DataFrame(summary['numerical_stats']).T
        num_df = num_df.reset_index().rename(columns={'index': 'Feature'})
        for col in num_df.columns:
            if col != 'Feature':
                num_df[col] = num_df[col].apply(lambda x: round(x, 2))
    else:
        num_df = pd.DataFrame(columns=["No numerical columns found"])
    # 3. Categorical stats: unique counts and the most common value per feature.
    if 'categorical_stats' in summary:
        cat_data = []
        for col, stats in summary['categorical_stats'].items():
            top_val = list(stats['top_values'].keys())[0] if stats['top_values'] else "N/A"
            top_count = list(stats['top_values'].values())[0] if stats['top_values'] else 0
            cat_data.append({
                'Feature': col,
                'Unique Values': stats['unique_count'],
                'Most Common': top_val,
                'Count': top_count,
                # Guard against a zero-row dataset to avoid ZeroDivisionError.
                'Share (%)': round(top_count / rows * 100, 1) if rows else 0.0
            })
        cat_df = pd.DataFrame(cat_data)
    else:
        cat_df = pd.DataFrame(columns=["No categorical columns found"])
    # 4. Missing-value report, worst columns first.
    missing_data = [
        {
            'Feature': col,
            'Missing Count': count,
            'Missing (%)': round(count / rows * 100, 2)
        }
        for col, count in summary['missing_values'].items()
        if count > 0
    ]
    if missing_data:
        missing_df = pd.DataFrame(missing_data).sort_values('Missing Count', ascending=False)
    else:
        missing_df = pd.DataFrame(columns=["No missing values found"])
    # 5. Correlation heatmap over the numerical columns.
    corr_plot = create_plotly_heatmap(current_df, title="Correlation Matrix")
    return rows, cols, dupes, round(missing_pct, 2), num_df, cat_df, missing_df, corr_plot
def update_filter_options():
    """Repopulate the three filter dropdowns from the loaded dataset.

    Returns numerical, categorical, and datetime choice updates in that
    order; all three are emptied when no dataset is loaded.
    """
    global current_df
    if current_df is None:
        return (
            gr.update(choices=[]),
            gr.update(choices=[]),
            gr.update(choices=[]),
        )
    kinds = get_column_types(current_df)
    return tuple(
        gr.update(choices=kinds[bucket])
        for bucket in ('numerical', 'categorical', 'datetime')
    )
def apply_filters_and_update(num_col, num_min, num_max, cat_col, cat_vals, cat_col_2, cat_vals_2, date_col, date_start, date_end, offset=0):
    """Apply filters and return summary and preview.

    Args:
        num_col/num_min/num_max: numerical range filter (either bound optional).
        cat_col/cat_vals, cat_col_2/cat_vals_2: up to two categorical
            include-lists (rows must match one of the checked values).
        date_col/date_start/date_end: date range filter ("YYYY-MM-DD" strings).
        offset: starting row for the 20-row paginated preview.

    Returns:
        (markdown summary, preview DataFrame, filtered row count, clamped offset).

    Side effect: rebinds the module-level ``filtered_df`` used by the
    Visualizations and Insights tabs.
    """
    global filtered_df, current_df
    if current_df is None:
        return "No data loaded.", None, 0, offset
    # Construct the filters dictionary consumed by data_processor.apply_filters.
    filters = {}
    # Numerical filter
    if num_col and (num_min is not None or num_max is not None):
        filters[num_col] = {
            'type': 'numerical',
            'min': num_min,
            'max': num_max
        }
    # Categorical filter 1
    if cat_col and cat_vals:
        filters[cat_col] = {
            'type': 'categorical',
            'values': cat_vals
        }
    # Categorical filter 2
    if cat_col_2 and cat_vals_2:
        filters[cat_col_2] = {
            'type': 'categorical',
            'values': cat_vals_2
        }
    # Date filter
    if date_col and (date_start or date_end):
        filters[date_col] = {
            'type': 'datetime',
            'start_date': date_start,
            'end_date': date_end
        }
    # Apply filters
    filtered_df = apply_filters(current_df, filters)
    # Create summary
    summary_lines = [
        "## Filtered Data",
        f"**Original rows:** {len(current_df):,}",
        f"**Filtered rows:** {len(filtered_df):,}",
        f"**Rows removed:** {len(current_df) - len(filtered_df):,}"
    ]
    # Add breakdown for categorical filter 1
    if cat_col and cat_vals and not filtered_df.empty:
        try:
            counts = filtered_df[cat_col].value_counts()
            summary_lines.append(f"\n**{cat_col} Breakdown:**")
            for val in cat_vals:
                if val in counts:
                    summary_lines.append(f"- {val}: {counts[val]:,}")
                else:
                    summary_lines.append(f"- {val}: 0")
        except:  # NOTE(review): breakdown is best-effort; broad except keeps failures silent
            pass
    # Add breakdown for categorical filter 2
    if cat_col_2 and cat_vals_2 and not filtered_df.empty:
        try:
            counts = filtered_df[cat_col_2].value_counts()
            summary_lines.append(f"\n**{cat_col_2} Breakdown:**")
            for val in cat_vals_2:
                if val in counts:
                    summary_lines.append(f"- {val}: {counts[val]:,}")
                else:
                    summary_lines.append(f"- {val}: 0")
        except:  # NOTE(review): same best-effort behavior as filter 1
            pass
    summary = "\n".join(summary_lines)
    # Pagination Logic: clamp overshoot back to the last 20-row page.
    total_rows = len(filtered_df)
    if offset >= total_rows:
        offset = max(0, total_rows - (total_rows % 20 or 20))
    elif offset < 0:
        offset = 0
    preview = filtered_df.iloc[offset : offset + 20]
    # Add pagination info to summary
    if total_rows > 0:
        summary += f"\n\n**Showing Rows:** {offset+1}-{min(offset+20, total_rows)}"
    return summary, preview, len(filtered_df), offset
def create_visualization(viz_type, x_col, y_col, color_col, agg_method, top_n, offset=0):
    """Create the requested chart from the filtered dataset plus AI insights.

    Args:
        viz_type: one of the chart-type labels offered by the UI dropdown.
        x_col, y_col, color_col: column selections (some charts ignore some).
        agg_method: aggregation for grouped charts ('sum', 'mean', ...).
        top_n: category page size for Bar/Pie charts.
        offset: category offset used by Bar Chart pagination.

    Returns:
        (figure or None, status message, insights markdown, clamped offset).

    Fix: removed an unreachable duplicate "Correlation Heatmap" branch and
    added an explicit error for unrecognized chart types (previously fell
    through and returned no figure with a success message). The insights
    tail that the Bar Chart branch duplicated is now shared by all branches.
    """
    global filtered_df
    if filtered_df is None or len(filtered_df) == 0:
        return None, "Please upload and filter data first.", "", offset
    try:
        status_msg = "Visualization created successfully!"
        if viz_type == "Time Series":
            if not x_col or not y_col:
                return None, "Please select Date (X) and Value (Y) columns.", "", offset
            fig = create_plotly_timeseries(filtered_df, x_col, y_col, agg_method)
        elif viz_type == "Distribution (Histogram)":
            if not x_col:
                return None, "Please select Column.", "", offset
            fig = create_plotly_distribution(filtered_df, x_col)
        elif viz_type == "Correlation Heatmap":
            fig = create_plotly_heatmap(filtered_df)
            if fig is None:
                return None, "Need at least 2 numerical columns for correlation.", "", offset
        elif viz_type == "Distribution (Box Plot)":
            if not x_col:
                return None, "Please select Column.", "", offset
            fig = px.box(filtered_df, x=x_col, y=y_col, color=color_col, title=f"Distribution of {x_col}")
        elif viz_type == "Bar Chart":
            if not x_col:
                return None, "Please select X-Axis column.", "", offset
            # Clamp the pagination offset to the available categories.
            total_items = filtered_df[x_col].nunique()
            if offset >= total_items and total_items > 0:
                offset = max(0, total_items - (total_items % top_n or top_n))
                status_msg = f"Reached end of data. Showing items {offset+1}-{total_items}."
            elif offset < 0:
                offset = 0
                status_msg = "Start of data."
            fig = create_plotly_category(filtered_df, x_col, y_col, agg_method, top_n, offset=offset)
        elif viz_type == "Pie Chart":
            if not x_col:
                return None, "Please select Category column.", "", offset
            data = filtered_df[x_col].value_counts().head(top_n)
            fig = px.pie(values=data.values, names=data.index, title=f"Top {top_n} {x_col}")
        elif viz_type == "Scatter Plot":
            if not x_col or not y_col:
                return None, "Please select X and Y columns.", "", offset
            fig = create_plotly_scatter(filtered_df, x_col, y_col, color_col)
        else:
            return None, f"Unknown chart type: {viz_type}", "", offset
        # Attach auto-generated insights for whichever chart was built.
        insights = generate_visualization_insights(viz_type, filtered_df, x_col, y_col)
        insights_text = f"## πŸ“Š Visualization Insights\n\n{insights}"
        return fig, status_msg, insights_text, offset
    except Exception as e:
        return None, f"Error: {str(e)}", "", offset
def generate_insights_report():
    """Run the automated insight engine on the filtered data and format it.

    Returns the formatted insight markdown, or an instructional/error
    message when no data is loaded or generation fails.
    """
    global filtered_df
    if filtered_df is None or len(filtered_df) == 0:
        return "Please upload data first."
    try:
        report = generate_all_insights(filtered_df)
        return format_insights_for_display(report)
    except Exception as e:
        return f"Error generating insights: {str(e)}"
def export_filtered_data():
    """Write the currently filtered rows to a CSV file in the working dir.

    Returns:
        (path, status message), or (None, message) when nothing is loaded.
    """
    global filtered_df
    if filtered_df is None:
        return None, "No data to export."
    destination = "filtered_data_export.csv"
    filtered_df.to_csv(destination, index=False)
    status = f"Data exported successfully! ({len(filtered_df)} rows)"
    return destination, status
def export_visualization(fig):
    """Export the given figure to ``visualization_export.png``.

    Args:
        fig: a Plotly figure (has ``write_image``) or a matplotlib figure
            (has ``savefig``); None when nothing has been plotted yet.

    Returns:
        (path, status message) on success, (None, message) otherwise.

    Fix: charts in this app are Plotly figures, but the original called the
    matplotlib-only ``fig.savefig`` — an AttributeError on every export.
    Plotly export goes through ``write_image`` (requires the kaleido
    package); matplotlib figures still fall back to ``savefig``.
    """
    if fig is None:
        return None, "No visualization to export."
    output_path = "visualization_export.png"
    try:
        if hasattr(fig, "write_image"):
            # Plotly figure: static image export via kaleido.
            fig.write_image(output_path)
        else:
            # matplotlib figure fallback.
            fig.savefig(output_path, dpi=300, bbox_inches='tight')
    except Exception as e:
        return None, f"Error exporting visualization: {str(e)}"
    return output_path, "Visualization exported successfully!"
def create_dashboard():
"""Creates my main Gradio dashboard interface."""
with gr.Blocks(title="Business Intelligence Dashboard") as demo:
gr.Markdown("""
# πŸ“Š Business Intelligence Dashboard
### Professional Data Analysis & Visualization Platform
Upload your data, explore insights, create visualizations, and export results.
""")
# Dropdowns that will be populated across tabs
x_column_viz = None
y_column_viz = None
color_column_viz = None
num_filter_dropdown = None
cat_filter_dropdown = None
cat_filter_dropdown_2 = None
date_filter_dropdown = None
comp_cat_col_a = None
comp_cat_col_b = None
comp_cat_val_a = None
comp_cat_val_b = None
drill_col = None
drill_val = None
# Tab 1: Data Upload
with gr.Tab("πŸ“ Data Upload"):
# Welcome Banner Removed
gr.Markdown("### πŸ“€ Upload File")
file_input = gr.File(label="Drop CSV or Excel file here", file_types=[".csv", ".xlsx", ".xls"], height=100)
upload_btn = gr.Button("Load Data", variant="primary", size="lg")
# Status & Preview Section
gr.Markdown("### πŸ“‹ Data Status")
with gr.Row():
upload_message = gr.Textbox(label="System Status", value="Waiting for data...", interactive=False)
upload_status = gr.Markdown()
with gr.Accordion("πŸ‘€ Data Preview", open=True):
data_preview = gr.Dataframe(interactive=False)
# Pagination Controls for Data Preview
with gr.Row():
prev_preview_btn = gr.Button("⬅️ Prev Batch", size="sm")
next_preview_btn = gr.Button("Next Batch ➑️", size="sm")
preview_offset = gr.State(value=0)
preview_batch_size = gr.State(value=10)
# Tab 2: Statistics
with gr.Tab("πŸ“ˆ Statistics"):
gr.Markdown("## πŸ“Š Data Health & Statistics")
stats_btn = gr.Button("Generate Statistics", variant="primary")
# Metric Cards Row
with gr.Row():
stat_rows = gr.Number(label="Total Rows", value=0)
stat_cols = gr.Number(label="Total Columns", value=0)
stat_dupes = gr.Number(label="Duplicate Rows", value=0)
stat_missing = gr.Number(label="Missing Cells (%)", value=0)
gr.Markdown("### πŸ”’ Numerical Statistics")
numerical_stats = gr.Dataframe(label="Descriptive Statistics (Transposed)", interactive=False)
with gr.Row():
with gr.Column():
gr.Markdown("### πŸ“‹ Categorical Summary")
categorical_stats = gr.Dataframe(label="Top Categories", interactive=False)
with gr.Column():
gr.Markdown("### ⚠️ Missing Values Report")
missing_stats = gr.Dataframe(label="Missing Data by Column", interactive=False)
gr.Markdown("### πŸ”₯ Correlation Matrix")
corr_matrix_plot = gr.Plot(label="Correlation Matrix")
stats_btn.click(
fn=show_statistics,
inputs=[],
outputs=[stat_rows, stat_cols, stat_dupes, stat_missing, numerical_stats, categorical_stats, missing_stats, corr_matrix_plot]
)
# Tab 3: Filter & Explore
with gr.Tab("πŸ” Filter & Explore"):
gr.Markdown("## Interactive Data Filtering")
gr.Markdown("""
**How to use filters:**
1. Select a column from any filter section below
2. Set your filter criteria (range, values, or dates)
3. Click "Apply Filters" to see filtered results
4. You can combine multiple filters together
5. The filtered data will be used in Visualizations and Insights tabs
""")
with gr.Row():
with gr.Column():
gr.Markdown("### πŸ”’ Numerical Filters")
gr.Markdown("*Filter by number ranges (e.g., Sales, Price, Quantity)*")
num_filter_dropdown = gr.Dropdown(label="Select Numerical Column", choices=[], interactive=True)
with gr.Row():
num_min = gr.Number(label="Minimum Value", placeholder="Min")
num_max = gr.Number(label="Maximum Value", placeholder="Max")
with gr.Column():
gr.Markdown("### πŸ“‹ Categorical Filters")
gr.Markdown("*Filter by categories (e.g., Product, Region)*")
cat_filter_dropdown = gr.Dropdown(label="Select Categorical Column 1", choices=[], interactive=True)
cat_filter_values = gr.CheckboxGroup(label="Select Values to Include", choices=[])
gr.Markdown("---")
cat_filter_dropdown_2 = gr.Dropdown(label="Select Categorical Column 2 (Optional)", choices=[], interactive=True)
cat_filter_values_2 = gr.CheckboxGroup(label="Select Values to Include", choices=[])
# Date filters in accordion (optional)
with gr.Accordion("πŸ“… Date Filters (Optional - Click to Expand)", open=False):
gr.Markdown("*Filter by date ranges (format: YYYY-MM-DD)*")
date_filter_dropdown = gr.Dropdown(label="Select Date Column", choices=[], interactive=True)
with gr.Row():
date_start = gr.Textbox(label="Start Date", placeholder="YYYY-MM-DD")
date_end = gr.Textbox(label="End Date", placeholder="YYYY-MM-DD")
with gr.Row():
filter_btn = gr.Button("Apply Filters", variant="primary", size="lg")
reset_filter_btn = gr.Button("Reset Filters", variant="secondary", size="lg")
gr.Markdown("### Filter Results")
filter_summary = gr.Markdown()
with gr.Accordion("Filtered Data Preview", open=True):
filtered_preview = gr.Dataframe(label="Filtered Data Preview")
# Pagination Controls for Filtered Preview
with gr.Row():
prev_filtered_btn = gr.Button("⬅️ Prev Batch", size="sm")
next_filtered_btn = gr.Button("Next Batch ➑️", size="sm")
filtered_offset = gr.State(value=0)
filtered_batch_size = gr.State(value=20)
row_count = gr.Number(label="Total Filtered Rows", interactive=False)
# Update categorical values when column is selected
def update_cat_values(col):
if current_df is not None and col:
values = current_df[col].unique().tolist()
# Reset value to None to avoid "value not in choices" error
return gr.update(choices=values, value=None)
return gr.update(choices=[], value=None)
cat_filter_dropdown.change(
fn=update_cat_values,
inputs=[cat_filter_dropdown],
outputs=[cat_filter_values]
)
cat_filter_dropdown_2.change(
fn=update_cat_values,
inputs=[cat_filter_dropdown_2],
outputs=[cat_filter_values_2]
)
filter_btn.click(
fn=apply_filters_and_update,
inputs=[num_filter_dropdown, num_min, num_max, cat_filter_dropdown,
cat_filter_values, cat_filter_dropdown_2, cat_filter_values_2,
date_filter_dropdown, date_start, date_end],
outputs=[filter_summary, filtered_preview, row_count, filtered_offset]
)
# Reset filters function
def reset_filters():
global filtered_df, current_df
if current_df is not None:
filtered_df = current_df.copy()
preview = filtered_df.head(20)
return (
gr.update(value=None), # num_col
gr.update(value=None), # num_min
gr.update(value=None), # num_max
gr.update(value=None), # cat_col
gr.update(choices=[]), # cat_values
gr.update(value=None), # cat_col_2
gr.update(choices=[]), # cat_values_2
gr.update(value=None), # date_col
gr.update(value=""), # date_start
gr.update(value=""), # date_end
"Filters reset. Showing original data.",
preview,
len(filtered_df)
)
return [gr.update()] * 10 + ["No data loaded.", None, 0]
# Pagination Logic for Filtered Preview
def update_filtered_offset(current_offset, direction, batch_size=20):
if direction == "next":
return current_offset + batch_size
else:
return max(0, current_offset - batch_size)
prev_filtered_btn.click(
fn=update_filtered_offset,
inputs=[filtered_offset, gr.State("prev"), filtered_batch_size],
outputs=[filtered_offset]
).then(
fn=apply_filters_and_update,
inputs=[
num_filter_dropdown, num_min, num_max,
cat_filter_dropdown, cat_filter_values,
cat_filter_dropdown_2, cat_filter_values_2,
date_filter_dropdown, date_start, date_end,
filtered_offset
],
outputs=[filter_summary, filtered_preview, row_count, filtered_offset]
)
next_filtered_btn.click(
fn=update_filtered_offset,
inputs=[filtered_offset, gr.State("next"), filtered_batch_size],
outputs=[filtered_offset]
).then(
fn=apply_filters_and_update,
inputs=[
num_filter_dropdown, num_min, num_max,
cat_filter_dropdown, cat_filter_values,
cat_filter_dropdown_2, cat_filter_values_2,
date_filter_dropdown, date_start, date_end,
filtered_offset
],
outputs=[filter_summary, filtered_preview, row_count, filtered_offset]
)
# Update filter button to reset offset
filter_btn.click(
fn=lambda *args: apply_filters_and_update(*args, offset=0),
inputs=[
num_filter_dropdown, num_min, num_max,
cat_filter_dropdown, cat_filter_values,
cat_filter_dropdown_2, cat_filter_values_2,
date_filter_dropdown, date_start, date_end
],
outputs=[filter_summary, filtered_preview, row_count, filtered_offset]
)
# Pagination Logic for Data Preview
def update_preview_offset(current_offset, direction, batch_size=10):
if direction == "next":
return current_offset + batch_size
else:
return max(0, current_offset - batch_size)
prev_preview_btn.click(
fn=update_preview_offset,
inputs=[preview_offset, gr.State("prev"), preview_batch_size],
outputs=[preview_offset]
).then(
fn=update_preview_pagination,
inputs=[preview_offset],
outputs=[upload_status, data_preview, preview_offset]
)
next_preview_btn.click(
fn=update_preview_offset,
inputs=[preview_offset, gr.State("next"), preview_batch_size],
outputs=[preview_offset]
).then(
fn=update_preview_pagination,
inputs=[preview_offset],
outputs=[upload_status, data_preview, preview_offset]
)
# Update upload button to reset offset
# Update upload button to reset offset
# Tab 4: Visualizations
with gr.Tab("πŸ“Š Visualizations"):
gr.Markdown("## 🎨 Create Interactive Visualizations")
with gr.Row():
# Left Column: Chart Settings
with gr.Column(scale=1):
gr.Markdown("### 1. Chart Settings")
viz_type = gr.Dropdown(
label="Select Chart Type",
choices=["Time Series", "Distribution (Histogram)", "Distribution (Box Plot)",
"Bar Chart", "Pie Chart", "Scatter Plot", "Correlation Heatmap"],
value="Bar Chart"
)
# Dynamic help text based on selection could be added here, but static for now
gr.Markdown("""
<div style="font-size: 12px; color: #888; margin-bottom: 10px;">
<b>Guide:</b><br>
β€’ <b>Bar/Pie</b>: Compare categories<br>
β€’ <b>Time Series</b>: Trends over time<br>
β€’ <b>Scatter</b>: Relationships between numbers<br>
β€’ <b>Distribution</b>: Spread of data
</div>
""")
agg_method = gr.Dropdown(
label="Aggregation (for grouped data)",
choices=["sum", "mean", "count", "median"],
value="sum"
)
top_n = gr.Slider(label="Top N Categories", minimum=5, maximum=20, value=10, step=1)
# Pagination Controls
with gr.Row():
prev_batch_btn = gr.Button("⬅️ Prev Batch", size="sm")
next_batch_btn = gr.Button("Next Batch ➑️", size="sm")
viz_offset = gr.State(value=0)
viz_btn = gr.Button("πŸš€ Create Visualization", variant="primary")
# Right Column: Data Selection
with gr.Column(scale=2):
gr.Markdown("### 2. Select Data")
with gr.Row():
x_column_viz = gr.Dropdown(label="X-Axis / Category Column", choices=[], interactive=True)
y_column_viz = gr.Dropdown(label="Y-Axis / Value Column", choices=[], interactive=True)
color_column_viz = gr.Dropdown(label="Color / Grouping (Optional)", choices=[], interactive=True)
# Visualization Output Area
with gr.Row():
with gr.Column(scale=3):
viz_plot = gr.Plot(label="Interactive Chart")
with gr.Column(scale=1):
gr.Markdown("### πŸ’‘ AI Insights")
viz_insights = gr.Markdown(value="*Insights will appear here after generating a chart.*")
viz_status = gr.Textbox(label="Status", interactive=False, visible=True)
# Wrapper to reset offset when creating new visualization
def create_viz_reset(viz_type, x_col, y_col, color_col, agg_method, top_n):
return create_visualization(viz_type, x_col, y_col, color_col, agg_method, top_n, offset=0)
viz_btn.click(
fn=create_viz_reset,
inputs=[viz_type, x_column_viz, y_column_viz, color_column_viz, agg_method, top_n],
outputs=[viz_plot, viz_status, viz_insights, viz_offset]
)
# Export Toolbar
with gr.Row(variant="panel"):
with gr.Column(scale=3):
gr.Markdown("**Export Options:**")
with gr.Column(scale=1):
export_viz_btn = gr.Button("πŸ’Ύ Download PNG", size="sm")
export_viz_file = gr.File(label="Download File", visible=False)
export_viz_status = gr.Textbox(visible=False)
# Pagination Logic
def update_viz_offset(current_offset, direction, top_n):
if direction == "next":
return current_offset + top_n
else:
return max(0, current_offset - top_n)
prev_batch_btn.click(
fn=update_viz_offset,
inputs=[viz_offset, gr.State("prev"), top_n],
outputs=[viz_offset]
).then(
fn=create_visualization,
inputs=[viz_type, x_column_viz, y_column_viz, color_column_viz, agg_method, top_n, viz_offset],
outputs=[viz_plot, viz_status, viz_insights, viz_offset]
)
next_batch_btn.click(
fn=update_viz_offset,
inputs=[viz_offset, gr.State("next"), top_n],
outputs=[viz_offset]
).then(
fn=create_visualization,
inputs=[viz_type, x_column_viz, y_column_viz, color_column_viz, agg_method, top_n, viz_offset],
outputs=[viz_plot, viz_status, viz_insights, viz_offset]
)
export_viz_btn.click(
fn=lambda: export_visualization(viz_plot.value) if viz_plot.value else (None, "No visualization to export"),
inputs=[],
outputs=[export_viz_file, export_viz_status]
)
# Show file download when ready
def show_download(file, status):
return gr.update(visible=True), status
export_viz_btn.click(
fn=show_download,
inputs=[export_viz_file, export_viz_status],
outputs=[export_viz_file, export_viz_status]
)
# Tab 5: Insights
with gr.Tab("πŸ’‘ Insights"):
gr.Markdown("## 🧠 Advanced AI Insights")
gr.Markdown("Deep dive analysis of your data's performance, drivers, and risks.")
with gr.Row():
insights_source = gr.Radio(
choices=["Full Dataset", "Filtered Data"],
value="Filtered Data",
label="Analysis Scope",
info="Choose whether to analyze the entire dataset or just the filtered subset."
)
insights_btn = gr.Button("Generate Advanced Insights", variant="primary", size="lg")
# Executive Summary Row
gr.Markdown("### πŸ“‹ Executive Summary")
with gr.Row():
exec_card1 = gr.Markdown()
exec_card2 = gr.Markdown()
exec_card3 = gr.Markdown()
# Detailed Analysis Rows
with gr.Row():
with gr.Column():
gr.Markdown("### πŸ† Pareto Analysis (80/20 Rule)")
pareto_output = gr.Markdown("*Click Generate to see vital few categories*")
with gr.Column():
gr.Markdown("### πŸ”‘ Key Drivers")
drivers_output = gr.Markdown("*Click Generate to see what drives your metrics*")
with gr.Row():
with gr.Column():
gr.Markdown("### πŸ“Š Segment Performance")
segments_output = gr.Markdown("*Click Generate to see segment analysis*")
with gr.Column():
gr.Markdown("### ⚠️ Anomalies & Risks")
anomalies_output = gr.Markdown("*Click Generate to see detected anomalies*")
with gr.Accordion("πŸ“„ View Full Report", open=False):
full_report_output = gr.Markdown("Generate insights to see the full report.")
def update_insights(source):
# from advanced_insights import generate_advanced_insights, format_advanced_insights
global filtered_df, current_df
target_df = filtered_df if source == "Filtered Data" else current_df
if target_df is None:
return ["Please upload data first."] * 8
try:
insights = generate_advanced_insights(target_df)
# Format Executive Summary cards
cards = insights.get('executive_summary', [])
card_outputs = []
for card in cards:
html = f"""
<div style="background-color: #2b2b2b; padding: 20px; border-radius: 10px; border: 1px solid #444;">
<div style="font-size: 24px; margin-bottom: 10px;">{card['icon']}</div>
<div style="color: #888; font-size: 14px; text-transform: uppercase;">{card['title']}</div>
<div style="color: white; font-size: 28px; font-weight: bold; margin: 5px 0;">{card['value']}</div>
<div style="color: #aaa; font-size: 12px;">{card['description']}</div>
</div>
"""
card_outputs.append(html)
# Fill remaining cards if less than 3
while len(card_outputs) < 3:
card_outputs.append("")
# Format Pareto
pareto = insights.get('pareto_analysis', [])
if pareto:
pareto_text = ""
for p in pareto:
pareto_text += f"#### {p['category']} Analysis\n"
pareto_text += f"{p['insight']}\n\n"
pareto_text += "**Vital Few (Top 5):**\n" + ", ".join([f"`{x}`" for x in p['vital_few']]) + "\n\n---\n"
else:
pareto_text = "No significant Pareto patterns found."
# Format Drivers
drivers = insights.get('key_drivers', [])
if drivers:
drivers_text = ""
for d in drivers:
drivers_text += f"- {d['insight']}\n"
else:
drivers_text = "No strong correlations found to identify key drivers."
# Format Segments
segments = insights.get('segment_analysis', [])
if segments:
seg_text = ""
for s in segments:
seg_text += f"- {s['insight']}\n"
else:
seg_text = "Not enough categorical data for segment analysis."
# Format Anomalies
anomalies = insights.get('anomalies', [])
if anomalies:
anom_text = ""
for a in anomalies:
anom_text += f"- {a['insight']}\n"
else:
anom_text = "βœ… No significant anomalies detected."
# Generate Full Report
full_report = format_advanced_insights(insights)
return card_outputs + [pareto_text, drivers_text, seg_text, anom_text, full_report]
except Exception as e:
return [f"Error: {str(e)}"] * 8
insights_btn.click(
fn=update_insights,
inputs=[insights_source],
outputs=[exec_card1, exec_card2, exec_card3, pareto_output, drivers_output, segments_output, anomalies_output, full_report_output]
)
# Tab 6: Smart Dashboard
with gr.Tab("πŸš€ Smart Dashboard"):
smart_btn = gr.Button("✨ Generate Smart Dashboard", variant="primary", size="lg")
with gr.Row():
with gr.Column():
plot1 = gr.Plot()
desc1 = gr.Markdown()
with gr.Column():
plot2 = gr.Plot()
desc2 = gr.Markdown()
with gr.Row():
with gr.Column():
plot3 = gr.Plot()
desc3 = gr.Markdown()
with gr.Column():
plot4 = gr.Plot()
desc4 = gr.Markdown()
with gr.Row():
with gr.Column():
plot5 = gr.Plot()
desc5 = gr.Markdown()
with gr.Column():
plot6 = gr.Plot()
desc6 = gr.Markdown()
smart_status = gr.Textbox(label="Status", interactive=False)
def update_smart_dashboard():
    """Build up to six auto-selected figures for the Smart Dashboard tab.

    Returns a flat list of 13 values: (figure, markdown description) for
    each of the 6 plot slots, followed by one status string. Unused slots
    receive (None, "") so every Gradio output component is always updated.
    """
    # Read-only access to the module-level filtered dataset; no `global`
    # declaration is needed because the name is never rebound here.
    if filtered_df is None:
        return [None] * 12 + ["Please upload data first."]
    try:
        items = generate_smart_dashboard(filtered_df)
        outputs = []
        # Fill exactly 6 slots so the output list length stays stable
        # regardless of how many visualizations were generated.
        for slot in range(6):
            if slot < len(items):
                fig, title, insight = items[slot]
                outputs.append(fig)
                outputs.append(f"### {title}\n\n{insight}")
            else:
                outputs.append(None)
                outputs.append("")
        outputs.append(f"Successfully generated {len(items)} visualizations based on your data patterns!")
        return outputs
    except Exception as e:
        # Surface failures in the status box instead of crashing the UI.
        return [None] * 12 + [f"Error generating dashboard: {str(e)}"]
# One click repopulates all six (plot, description) pairs plus the status box;
# the 13 outputs match the 13-element list returned by update_smart_dashboard.
smart_btn.click(
fn=update_smart_dashboard,
inputs=[],
outputs=[plot1, desc1, plot2, desc2, plot3, desc3, plot4, desc4, plot5, desc5, plot6, desc6, smart_status]
)
# Tab 7: Comparison
# Two (column, value) filter panels define groups A and B. Dropdown choices
# start empty and are filled once a dataset has been uploaded.
with gr.Tab("βš–οΈ Compare"):
gr.Markdown("## Head-to-Head Comparison")
gr.Markdown("Compare two segments of your data side-by-side (e.g., Region A vs Region B).")
with gr.Row():
# Group A
with gr.Column(variant="panel"):
gr.Markdown("### Group A")
comp_cat_col_a = gr.Dropdown(label="Filter Column", choices=[], interactive=True)
comp_cat_val_a = gr.Dropdown(label="Filter Value", choices=[], interactive=True)
# Group B
with gr.Column(variant="panel"):
gr.Markdown("### Group B")
comp_cat_col_b = gr.Dropdown(label="Filter Column", choices=[], interactive=True)
comp_cat_val_b = gr.Dropdown(label="Filter Value", choices=[], interactive=True)
comp_btn = gr.Button("βš”οΈ Compare Groups", variant="primary", size="lg")
gr.Markdown("### Comparison Results")
# Four HTML metric cards plus one side-by-side figure, filled by run_comparison.
with gr.Row():
comp_metric1 = gr.HTML()
comp_metric2 = gr.HTML()
comp_metric3 = gr.HTML()
comp_metric4 = gr.HTML()
comp_plot = gr.Plot(label="Side-by-Side Visualization")
# Update values when column selected
def update_comp_values(col):
    """Populate a value dropdown with the unique entries of *col*.

    NaN entries are dropped before listing: the downstream callbacks filter
    with exact equality (``df[col] == val``), which a missing value can
    never match, so offering NaN would be a dead option. Choice lists are
    capped at 1000 items to keep the UI responsive on high-cardinality
    columns.
    """
    # Read-only access to the module-level dataset; no `global` needed.
    if current_df is not None and col:
        unique_vals = current_df[col].dropna().unique().tolist()
        if len(unique_vals) > 1000:
            # Limit to first 1000 to prevent freeze
            return gr.update(choices=unique_vals[:1000], label=f"Filter Value (Showing 1000/{len(unique_vals)})")
        return gr.update(choices=unique_vals, label="Filter Value")
    return gr.update(choices=[], label="Filter Value")
# Refresh each group's value dropdown whenever its filter column changes.
comp_cat_col_a.change(update_comp_values, comp_cat_col_a, comp_cat_val_a)
comp_cat_col_b.change(update_comp_values, comp_cat_col_b, comp_cat_val_b)
def run_comparison(col_a, val_a, col_b, val_b):
    """Compare two filtered segments of the dataset head-to-head.

    Returns 4 HTML metric strings followed by a Plotly figure. A group
    with an incomplete (column without value) or empty selection falls
    back to the full dataset.
    """
    # Read-only access to the module-level dataset; no `global` needed.
    if current_df is None:
        return ["Please upload data first."] * 4 + [None]
    # Build categorical equality filters; an incomplete selection means "no filter".
    filter_a = {col_a: {'type': 'categorical', 'values': [val_a]}} if col_a and val_a else {}
    filter_b = {col_b: {'type': 'categorical', 'values': [val_b]}} if col_b and val_b else {}
    # The label must mirror the filter condition: only claim "col=val" when
    # both are set; otherwise the group is unfiltered and honestly labelled
    # "All Data" (previously `if col_a` alone could yield "col=None" for an
    # unfiltered group).
    label_a = f"{col_a}={val_a}" if col_a and val_a else "All Data"
    label_b = f"{col_b}={val_b}" if col_b and val_b else "All Data"
    metrics, fig = compare_datasets(current_df, filter_a, filter_b, label_a, label_b)
    # Pad to exactly 4 metric cards so every HTML output receives a value.
    while len(metrics) < 4:
        metrics.append("")
    return metrics[:4] + [fig]
# Run the head-to-head comparison on click. Registered exactly once: a
# duplicate registration of this same handler previously caused the
# comparison to execute twice per click.
comp_btn.click(
    fn=run_comparison,
    inputs=[comp_cat_col_a, comp_cat_val_a, comp_cat_col_b, comp_cat_val_b],
    outputs=[comp_metric1, comp_metric2, comp_metric3, comp_metric4, comp_plot]
)
# Tab 8: Segment Explorer
# Drill into a single categorical segment: headline stats plus two plots,
# all populated by run_drill_down.
with gr.Tab("πŸ” Segment Explorer"):
gr.Markdown("## Deep Dive Explorer")
gr.Markdown("Select a category to see a detailed mini-dashboard for that specific segment.")
with gr.Row():
drill_col = gr.Dropdown(label="Select Category Column", choices=[], interactive=True)
drill_val = gr.Dropdown(label="Select Value", choices=[], interactive=True)
drill_btn = gr.Button("πŸ”Ž Analyze Segment", variant="primary")
gr.Markdown("### Segment Overview")
# Headline stats for the selected segment.
with gr.Row():
drill_stat1 = gr.Number(label="Total Records")
drill_stat2 = gr.Number(label="% of Total Data")
drill_stat3 = gr.Number(label="Total Value (Sum of 1st Num Col)")
with gr.Row():
drill_plot1 = gr.Plot(label="Trend over Time")
drill_plot2 = gr.Plot(label="Top Associations")
# Update values
# Reuses the Compare tab's update_comp_values helper to populate drill_val.
drill_col.change(update_comp_values, drill_col, drill_val)
def run_drill_down(col, val):
    """Build the mini-dashboard for a single categorical segment (col == val).

    Returns a 5-tuple: (record count, % of all rows, sum of the first
    numeric column within the segment, trend figure or None,
    top-associations figure or None).
    """
    empty_result = (0, 0, 0, None, None)
    # Read-only access to the module-level dataset; no `global` needed.
    if current_df is None or not col or not val:
        return empty_result
    # Filter data down to the chosen segment.
    subset = current_df[current_df[col] == val]
    # Stats
    count = len(subset)
    total_rows = len(current_df)
    # Guard against an empty frame so the percentage cannot divide by zero.
    pct = (count / total_rows) * 100 if total_rows else 0
    col_types = get_column_types(current_df)
    total_val = 0
    if col_types['numerical']:
        total_val = subset[col_types['numerical'][0]].sum()
    # Plot 1: trend of the first numeric column over time, if a datetime
    # column exists.
    fig1 = None
    if col_types['datetime'] and col_types['numerical']:
        date_col = col_types['datetime'][0]
        num_col = col_types['numerical'][0]
        agg = subset.groupby(date_col)[num_col].sum().reset_index()
        fig1 = px.line(agg, x=date_col, y=num_col, title=f"{num_col} Trend for {val}")
    # Plot 2: top-10 counts of another categorical column, if one exists.
    fig2 = None
    other_cats = [c for c in col_types['categorical'] if c != col]
    if other_cats:
        cat2 = other_cats[0]
        top = subset[cat2].value_counts().head(10).reset_index()
        top.columns = [cat2, 'Count']
        fig2 = px.bar(top, x=cat2, y='Count', title=f"Top {cat2} in {val}")
    return count, round(pct, 1), round(total_val, 2), fig1, fig2
# Run the segment mini-dashboard when the user clicks "Analyze Segment".
drill_btn.click(
fn=run_drill_down,
inputs=[drill_col, drill_val],
outputs=[drill_stat1, drill_stat2, drill_stat3, drill_plot1, drill_plot2]
)
# Tab 9: Export
with gr.Tab("πŸ’Ύ Export"):
gr.Markdown("## Export Your Data")
export_data_btn = gr.Button("Export Filtered Data as CSV", variant="primary")
export_file = gr.File(label="Download CSV")
export_status = gr.Textbox(label="Export Status", interactive=False)
# export_filtered_data is defined elsewhere in this file; presumably it takes
# no inputs and returns (file, status message) — TODO confirm its signature.
export_data_btn.click(
fn=export_filtered_data,
inputs=[],
outputs=[export_file, export_status]
)
# Connect upload button to update all dropdowns
# A single upload refreshes the preview/pagination state plus every filter,
# visualization-axis, comparison, and drill-down dropdown across the tabs,
# keeping their choices in sync with the newly loaded dataset.
upload_btn.click(
fn=upload_and_preview_data,
inputs=[file_input],
outputs=[
upload_status, data_preview, row_count, preview_offset,
num_filter_dropdown, num_min, num_max,
cat_filter_dropdown, cat_filter_values,
cat_filter_dropdown_2, cat_filter_values_2,
date_filter_dropdown, date_start, date_end,
x_column_viz, y_column_viz, color_column_viz,
comp_cat_col_a, comp_cat_col_b, drill_col
]
)
return demo
if __name__ == "__main__":
demo = create_dashboard()
demo.launch(share=False, server_name="0.0.0.0", server_port=7860)