# Data_Analyzer / app.py
# Tamannathakur's picture
# Upload 5 files
# 15930e9 verified
import gradio as gr
import pandas as pd
import os
from data_engine import (
clean_numeric, run_analysis, create_visualization, handle_missing_data,
undo_last_change, undo_all_changes, download_dataset,
display_data_format, display_text_format
)
from ai_agent import initialize_llm, analyze_question
from prompts import SAMPLE_QUESTIONS
# Shared application state read and rebound by the Gradio callbacks below.
#   llm           - LLM handle; assigned under the __main__ guard at startup
#   uploaded_df   - working copy of the loaded dataset
#   original_df   - copy taken right after loading, used by "Undo All"
#   dataset_name  - base name of the uploaded file
llm = uploaded_df = original_df = dataset_name = None
# History object handed to, and returned by, the data_engine change/undo helpers.
change_history = list()
def upload_dataset(file):
    """Load an uploaded CSV or Excel file into the shared dataset state.

    The shared globals are only rebound after the file has been read and
    cleaned successfully, so a failed upload leaves the previous dataset
    untouched. A successful upload also resets ``change_history`` (as
    ``clear_dataset`` does), so "Undo" cannot reach back into a previously
    loaded dataset.

    Args:
        file: Value delivered by the ``gr.File`` input: an object exposing the
            file path as ``.name``, a plain path, or ``None`` when nothing is
            selected.

    Returns:
        A 4-tuple for the wired outputs: the status text, an update hiding
        the preview table, an update carrying the column choices, and an
        update setting the column selector's visibility.
    """
    global uploaded_df, original_df, dataset_name, change_history

    def _rejected(message):
        # Every non-success path hides the preview and the column selector.
        return message, gr.update(visible=False), gr.update(choices=[]), gr.update(visible=False)

    if file is None:
        return _rejected("No file uploaded")

    try:
        path = os.fspath(getattr(file, "name", file))
        # Compare extensions case-insensitively so "DATA.CSV" is accepted too.
        lowered = path.lower()
        if lowered.endswith('.csv'):
            loaded = pd.read_csv(path)
        elif lowered.endswith(('.xlsx', '.xls')):
            loaded = pd.read_excel(path)
        else:
            return _rejected("Unsupported file format. Please upload CSV or Excel files.")
        loaded = clean_numeric(loaded)
        snapshot = loaded.copy()
        name = os.path.basename(path)
        info_text = f" Dataset Loaded: {name} ({loaded.shape[0]} rows × {loaded.shape[1]} columns)"
    except Exception as e:
        # UI boundary: report any load/parse failure to the user instead of raising.
        return _rejected(f"Error loading file: {str(e)}")

    # Commit the new dataset only once everything above has succeeded.
    uploaded_df = loaded
    original_df = snapshot
    dataset_name = name
    change_history = []
    return info_text, gr.update(visible=False), gr.update(choices=list(loaded.columns), value=[]), gr.update(visible=True)
def clear_dataset():
    """Forget the loaded dataset and reset the dependent widgets.

    Returns:
        A 4-tuple: the status message, an update hiding the preview table,
        an update emptying the column selector, and an update hiding it.
    """
    global uploaded_df, original_df, dataset_name, change_history
    uploaded_df = original_df = dataset_name = None
    change_history = []

    status = "Dataset cleared. Please upload a new file."
    hide_preview = gr.update(visible=False)
    empty_columns = gr.update(choices=[], value=[])
    hide_columns = gr.update(visible=False)
    return status, hide_preview, empty_columns, hide_columns
def update_preview(format_type, selected_columns):
    """Render the dataset preview in the requested display format.

    Args:
        format_type: Selected display format; "None" hides the preview,
            "DataFrame" uses the table widget, anything else the text widget.
        selected_columns: Columns currently ticked in the column selector.

    Returns:
        A 5-tuple: table value, text value, table visibility update, text
        visibility update, and preview-heading visibility update.
    """
    if format_type == "None":
        return None, "", gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)

    if format_type == "DataFrame":
        table = display_data_format(format_type, selected_columns, uploaded_df)
        return table, "", gr.update(visible=True), gr.update(visible=False), gr.update(visible=True)

    # Every remaining format is rendered as text.
    rendered = display_text_format(format_type, selected_columns, uploaded_df)
    return None, rendered, gr.update(visible=False), gr.update(visible=True), gr.update(visible=True)
def handle_analysis_change(analysis_type, selected_columns):
    """Run the selected analysis and decide which result widgets to show.

    Args:
        analysis_type: Value of the analysis dropdown ("None" shows nothing).
        selected_columns: Columns currently ticked in the column selector.

    Returns:
        A 3-tuple of updates for the analysis text box, the analysis heading,
        and the analysis data table.
    """
    result_text, data_table = run_analysis(analysis_type, selected_columns, uploaded_df)

    # Nothing to display: blank/missing text, or no analysis selected.
    if not (result_text and result_text.strip()) or analysis_type == "None":
        return gr.update(value="", visible=False), gr.update(visible=False), gr.update(visible=False)

    if data_table is None:
        table_update = gr.update(visible=False)
    else:
        table_update = gr.update(value=data_table, visible=True)
    return gr.update(value=result_text, visible=True), gr.update(visible=True), table_update
def handle_viz_change(viz_type, selected_columns):
    """Build the selected chart and the updates for the chart widgets.

    Args:
        viz_type: Value of the chart-type dropdown.
        selected_columns: Columns currently ticked in the column selector.

    Returns:
        A 4-tuple: figure (or None), plot visibility update, explanation
        text, and explanation visibility update.
    """
    fallback_text = "Error in visualization"
    result = create_visualization(viz_type, selected_columns, uploaded_df)

    # Anything other than a 3-item result is treated as a failure.
    if not result or len(result) != 3:
        return None, gr.update(visible=False), fallback_text, gr.update(visible=False)

    fig, explanation, _chart_obj = result
    if explanation and fig is not None:
        return fig, gr.update(visible=True), explanation, gr.update(visible=True)
    return None, gr.update(visible=False), explanation or fallback_text, gr.update(visible=False)
def show_constant_input(method):
    """Show the constant-value box only for the "Constant Fill" method."""
    needs_constant = method == "Constant Fill"
    return gr.update(visible=needs_constant)
# Case-folded text markers counted as "missing" in the report. Each marker is
# listed once: cells are lower-cased before comparison, so listing case
# variants separately would count the same cell several times. The empty
# string also covers whitespace-only cells, which are stripped first.
_PSEUDO_MISSING_TOKENS = (
    'unknown', 'error', 'null', 'na', 'n/a', 'not given', '', '-', '?',
    'nan', 'none', '#n/a', 'n.a.',
)


def _missing_values_report(df):
    """Return the per-column missing-values text report for *df*."""
    row_count = len(df)
    column_lines = []
    for col in df.columns:
        column = df[col]
        nan_count = column.isnull().sum()

        pseudo_missing_count = 0
        present = column.dropna()
        if len(present) > 0:
            normalized = present.astype(str).str.strip().str.lower()
            # One membership test per cell: each cell counts at most once.
            pseudo_missing_count = normalized.isin(_PSEUDO_MISSING_TOKENS).sum()

        total_missing = nan_count + pseudo_missing_count
        # Guard against a frame with columns but no rows.
        missing_percent = (total_missing / row_count) * 100 if row_count else 0.0

        detail_str = ""
        if total_missing > 0:
            details = []
            if nan_count > 0:
                details.append(f"{nan_count} NaN")
            if pseudo_missing_count > 0:
                details.append(f"{pseudo_missing_count} text-missing")
            detail_str = f" ({', '.join(details)})"

        column_lines.append(f"{col}: {total_missing} missing ({missing_percent:.2f}%){detail_str}\n")

    header = "Missing Values Analysis:\n" + "=" * 30 + "\n\n"
    return header + "".join(column_lines)


def handle_data_and_refresh(method, selected_columns, constant_value, analysis_type):
    """Apply a data-handling method and refresh the dependent widgets.

    Args:
        method: Value of the data-handling dropdown.
        selected_columns: Columns currently ticked in the column selector.
        constant_value: Text from the constant-fill input box.
        analysis_type: Value of the analysis dropdown; when it is
            "Missing Values" the missing-values report is regenerated.

    Returns:
        A 5-tuple: result message, update showing the result box, update for
        the column selector choices, update for the analysis text, and
        update for the analysis heading. The last two are no-op updates
        unless the missing-values report was regenerated.
    """
    global uploaded_df, change_history
    result, uploaded_df, change_history = handle_missing_data(
        method, selected_columns, constant_value, uploaded_df, change_history
    )

    # No dataset (e.g. nothing uploaded yet): there are no columns to offer.
    if uploaded_df is None:
        return result, gr.update(visible=True), gr.update(choices=[], value=[]), gr.update(), gr.update()

    columns_update = gr.update(choices=list(uploaded_df.columns), value=[])
    if analysis_type == "Missing Values":
        analysis_result = _missing_values_report(uploaded_df)
        return (
            result,
            gr.update(visible=True),
            columns_update,
            gr.update(value=analysis_result, visible=True),
            gr.update(visible=True),
        )
    return result, gr.update(visible=True), columns_update, gr.update(), gr.update()
def handle_undo_and_refresh(analysis_type, is_undo_all=False):
    """Undo the last change (or all changes) and refresh the dependent widgets.

    Args:
        analysis_type: Value of the analysis dropdown; when it is
            "Missing Values" that analysis is re-run on the restored data.
        is_undo_all: When true, restore from ``original_df`` via
            ``undo_all_changes``; otherwise call ``undo_last_change``.

    Returns:
        A 5-tuple: result message, update showing the result box, update for
        the column selector choices, update for the analysis text, and
        update for the analysis heading. The last two are no-op updates
        unless the missing-values analysis was re-run.
    """
    global uploaded_df, change_history
    if is_undo_all:
        result, uploaded_df, change_history = undo_all_changes(original_df, change_history)
    else:
        result, uploaded_df, change_history = undo_last_change(uploaded_df, change_history)

    # No dataset to show (e.g. undo pressed before any upload): offer no columns.
    if uploaded_df is None:
        return result, gr.update(visible=True), gr.update(choices=[], value=[]), gr.update(), gr.update()

    columns_update = gr.update(choices=list(uploaded_df.columns), value=[])
    if analysis_type == "Missing Values":
        # Only the text part of the analysis is displayed here.
        result_text, _ = run_analysis(analysis_type, [], uploaded_df)
        return (
            result,
            gr.update(visible=True),
            columns_update,
            gr.update(value=result_text, visible=True),
            gr.update(visible=True),
        )
    return result, gr.update(visible=True), columns_update, gr.update(), gr.update()
def handle_question_analysis(question, selected_columns):
    """Forward a free-text question to ``analyze_question``.

    The current dataset and the module-level ``llm`` are supplied from shared
    state; the return value is passed through unchanged to the outputs wired
    to the "Analyze" button.
    """
    answer = analyze_question(question, selected_columns, uploaded_df, llm)
    return answer
# Stylesheet passed to gr.Blocks(css=...): caps the app width and defines the
# header banner (.header-box / .header-title) and a .section-box panel style.
custom_css = """
.gradio-container {
max-width: 1400px !important;
margin: 0 auto !important;
}
.header-box {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border-radius: 15px;
padding: 25px;
margin: 20px auto;
text-align: center;
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1);
}
.header-title {
font-size: 36px;
font-weight: bold;
color: white;
margin: 0;
text-shadow: 2px 2px 4px rgba(0,0,0,0.3);
}
.section-box {
background-color: #f8f9fa;
padding: 20px;
border-radius: 12px;
margin: 15px 0;
border: 1px solid #e9ecef;
}
"""
# Build the Gradio UI: create the components, then wire their events to the
# callbacks defined earlier in this file.
# NOTE(review): indentation was lost in this copy of the file, so the nesting
# of the `with` contexts below must be confirmed against the original.
with gr.Blocks(theme=gr.themes.Soft(), css=custom_css) as demo:
# Header banner; styled by the .header-box / .header-title rules in custom_css.
gr.HTML("""
<div class="header-box">
<h1 class="header-title">SparkNova</h1>
<p style="color: white; font-size: 18px; margin: 10px 0 0 0;">Advanced Data Analysis Platform</p>
</div>
""")
with gr.Row():
# Controls: upload, column selection, and the format / analysis / chart /
# data-handling dropdowns.
with gr.Column(scale=1):
gr.Markdown("### Upload Dataset")
file_input = gr.File(label="Choose CSV or Excel File", file_types=[".csv", ".xlsx", ".xls"])
dataset_info = gr.Markdown()
with gr.Row():
clear_btn = gr.Button("Clear Dataset", variant="secondary", size="sm")
# Starts hidden and empty; upload_dataset supplies the choices and shows it.
column_selector = gr.CheckboxGroup(
label="Select Columns (optional - for multi-column charts)",
choices=[],
visible=False
)
format_selector = gr.Dropdown(
choices=["None", "DataFrame", "JSON", "Dictionary"],
value="None",
label="Display Format"
)
gr.Markdown("### Choose an Analysis Type")
analysis_selector = gr.Dropdown(
choices=["None", "Summary", "Describe", "Top 5 Rows", "Bottom 5 Rows", "Missing Values", "Group & Aggregate", "Calculate Expressions", "Highest Correlation"],
value="None",
label="Analysis Type"
)
gr.Markdown("### Visualization Types")
viz_selector = gr.Dropdown(
choices=["None", "Bar Chart", "Line Chart", "Scatter Plot", "Pie Chart", "Histogram", "Box Plot", "Heat Map"],
value="None",
label="Chart Type"
)
gr.Markdown("### Handling Data")
data_handler = gr.Dropdown(
choices=["None", "Forward Fill", "Backward Fill", "Constant Fill", "Mean Fill", "Median Fill", "Mode Fill", "Drop Columns"],
value="None",
label="Data Handling Method"
)
# Shown only while "Constant Fill" is selected (see show_constant_input).
constant_input = gr.Textbox(
label="Constant Value (for Constant Fill)",
placeholder="Enter value to fill missing data",
visible=False
)
with gr.Row():
apply_btn = gr.Button("Apply Change", variant="primary", size="sm")
undo_last_btn = gr.Button("Undo Last", variant="secondary", size="sm")
with gr.Row():
undo_all_btn = gr.Button("Undo All", variant="secondary", size="sm")
download_btn = gr.Button("Download", variant="secondary", size="sm")
# Both start hidden; the apply/undo and download callbacks reveal them.
data_handling_output = gr.Textbox(label="Data Handling Results", lines=3, visible=False, interactive=False)
download_file = gr.File(label="Download Modified Dataset", visible=False)
# Result widgets: preview, analysis output and chart all start hidden and are
# revealed by their callbacks.
with gr.Column(scale=2):
preview_heading = gr.Markdown("### Dataset Preview", visible=False)
dataset_preview = gr.Dataframe(wrap=True, visible=False)
text_preview = gr.Textbox(label="Text Preview", lines=15, visible=False)
analysis_heading = gr.Markdown("### Analysis Results", visible=False)
analysis_output = gr.Textbox(label="Analysis Output", lines=10, visible=False, interactive=False)
analysis_data_table = gr.Dataframe(label="Data Table", visible=False, wrap=True)
chart_output_new = gr.Plot(label="Chart", visible=False)
chart_explanation = gr.Textbox(label="Chart Analysis", lines=5, visible=False, interactive=False)
gr.Markdown("### Sample Questions")
# Lay the sample questions out as bullet points, three per gr.Column.
with gr.Row():
for i in range(0, len(SAMPLE_QUESTIONS), 3):
with gr.Column():
for j in range(3):
if i + j < len(SAMPLE_QUESTIONS):
gr.Markdown(f"• {SAMPLE_QUESTIONS[i + j]}")
gr.Markdown("### Ask Your Question")
user_question = gr.Textbox(
label="Enter your question",
placeholder="Ask anything about your data...",
lines=3
)
submit_btn = gr.Button("Analyze", variant="primary", size="lg")
gr.Markdown("### Analysis Results")
# Outputs of the free-text question flow (filled by handle_question_analysis).
with gr.Tabs():
with gr.Tab("Response"):
output_text = gr.Textbox(
label="Analysis Response",
interactive=False,
lines=15,
show_copy_button=True
)
with gr.Tab("Visualization"):
chart_output = gr.Plot(label="Generated Chart")
with gr.Tab("Data"):
result_table = gr.Dataframe(label="Result Data", wrap=True)
# --- Event wiring ---
# Several output lists name the same component twice because the callbacks
# return separate values for a component's content and for its visibility.
file_input.change(upload_dataset, inputs=file_input, outputs=[dataset_info, dataset_preview, column_selector, column_selector])
clear_btn.click(clear_dataset, outputs=[dataset_info, dataset_preview, column_selector, column_selector])
# column_selector.change is bound three times so that a new column selection
# refreshes the preview, the analysis result and the chart.
format_selector.change(update_preview, inputs=[format_selector, column_selector], outputs=[dataset_preview, text_preview, dataset_preview, text_preview, preview_heading])
column_selector.change(update_preview, inputs=[format_selector, column_selector], outputs=[dataset_preview, text_preview, dataset_preview, text_preview, preview_heading])
analysis_selector.change(handle_analysis_change, inputs=[analysis_selector, column_selector], outputs=[analysis_output, analysis_heading, analysis_data_table])
column_selector.change(handle_analysis_change, inputs=[analysis_selector, column_selector], outputs=[analysis_output, analysis_heading, analysis_data_table])
viz_selector.change(handle_viz_change, inputs=[viz_selector, column_selector], outputs=[chart_output_new, chart_output_new, chart_explanation, chart_explanation])
column_selector.change(handle_viz_change, inputs=[viz_selector, column_selector], outputs=[chart_output_new, chart_output_new, chart_explanation, chart_explanation])
submit_btn.click(handle_question_analysis, inputs=[user_question, column_selector], outputs=[output_text, chart_output, result_table])
data_handler.change(show_constant_input, inputs=data_handler, outputs=constant_input)
apply_btn.click(handle_data_and_refresh, inputs=[data_handler, column_selector, constant_input, analysis_selector], outputs=[data_handling_output, data_handling_output, column_selector, analysis_output, analysis_heading])
# The lambdas fix is_undo_all, since only analysis_selector is passed as input.
undo_last_btn.click(lambda analysis_type: handle_undo_and_refresh(analysis_type, False), inputs=[analysis_selector], outputs=[data_handling_output, data_handling_output, column_selector, analysis_output, analysis_heading])
undo_all_btn.click(lambda analysis_type: handle_undo_and_refresh(analysis_type, True), inputs=[analysis_selector], outputs=[data_handling_output, data_handling_output, column_selector, analysis_output, analysis_heading])
# Export the current dataset through download_dataset; the file widget is made
# visible only when a truthy path comes back.
def handle_download():
filepath = download_dataset(uploaded_df, dataset_name)
return gr.File(value=filepath, visible=bool(filepath))
download_btn.click(handle_download, outputs=download_file)
gr.HTML("<div style='text-align: center; margin-top: 20px; color: #666;'>Powered by GROQ LLM & Gradio</div>")
if __name__ == "__main__":
    # Script entry point: set up the module-level LLM handle, then serve the UI.
    llm = initialize_llm()

    # A falsy handle is reported but does not stop the app from starting.
    if not llm:
        print("Warning: Failed to initialize GROQ API")

    demo.launch(
        show_error=True,
        share=False,
    )