"""Gradio front end for the SparkNova data analysis app.

The module keeps the currently loaded dataset in module-level globals, exposes
one handler function per UI event, builds the ``demo`` Blocks layout, and
launches it when run as a script.
"""

import gradio as gr
import pandas as pd
import os
from data_engine import (
    clean_numeric,
    run_analysis,
    create_visualization,
    handle_missing_data,
    undo_last_change,
    undo_all_changes,
    download_dataset,
    display_data_format,
    display_text_format
)
from ai_agent import initialize_llm, analyze_question
from prompts import SAMPLE_QUESTIONS

# Shared application state, read and rebound by the handlers below.
llm = None            # set in the __main__ guard via initialize_llm()
uploaded_df = None    # working copy of the dataset (None when nothing is loaded)
original_df = None    # copy taken right after loading, handed to undo_all_changes
dataset_name = None   # base name of the uploaded file
change_history = []   # passed to / returned by the data_engine change helpers

# Lower-cased, stripped tokens that count as "text-missing" cells. Each token
# appears exactly once so a matching cell is counted exactly once.
_PSEUDO_MISSING_TOKENS = (
    '', 'unknown', 'error', 'null', 'na', 'n/a', 'not given',
    '-', '?', 'nan', 'none', '#n/a', 'n.a.',
)


def _column_choices():
    """Return the loaded dataset's column names, or [] when no dataset is loaded."""
    return [] if uploaded_df is None else list(uploaded_df.columns)


def _upload_failure(message):
    """Build the four-output tuple used by upload_dataset for every failure path."""
    return (
        message,
        gr.update(visible=False),
        gr.update(choices=[]),
        gr.update(visible=False),
    )


def upload_dataset(file):
    """Load an uploaded CSV/Excel file into the module state.

    Args:
        file: the value delivered by the ``gr.File`` input; its ``name``
            attribute is used as the path to read. ``None`` means no file.

    Returns:
        A 4-tuple for ``[dataset_info, dataset_preview, column_selector,
        column_selector]``: a status message, an update hiding the preview,
        an update with the column choices, and an update toggling the column
        selector's visibility.
    """
    global uploaded_df, original_df, dataset_name, change_history

    if file is None:
        return _upload_failure("No file uploaded")

    try:
        path = file.name
        # Case-insensitive so that e.g. "DATA.CSV" is accepted as well.
        lowered = path.lower()
        if lowered.endswith('.csv'):
            loaded = pd.read_csv(path)
        elif lowered.endswith(('.xlsx', '.xls')):
            loaded = pd.read_excel(path)
        else:
            return _upload_failure(
                "Unsupported file format. Please upload CSV or Excel files."
            )
        loaded = clean_numeric(loaded)
    except Exception as e:
        # UI boundary: report the problem instead of crashing the event.
        return _upload_failure(f"Error loading file: {str(e)}")

    # Commit the new state only after the whole load succeeded, so a failed
    # upload cannot leave the name and the data out of sync.
    dataset_name = os.path.basename(path)
    uploaded_df = loaded
    original_df = loaded.copy()
    # History recorded against the previous dataset must not be replayed by
    # the undo buttons on the new one.
    change_history = []

    info_text = f" Dataset Loaded: {dataset_name} ({uploaded_df.shape[0]} rows × {uploaded_df.shape[1]} columns)"
    return (
        info_text,
        gr.update(visible=False),
        gr.update(choices=list(uploaded_df.columns), value=[]),
        gr.update(visible=True),
    )


def clear_dataset():
    """Forget the loaded dataset and its history; returns the same 4 outputs as upload_dataset."""
    global uploaded_df, original_df, dataset_name, change_history
    uploaded_df = original_df = dataset_name = None
    change_history = []
    return (
        "Dataset cleared. Please upload a new file.",
        gr.update(visible=False),
        gr.update(choices=[], value=[]),
        gr.update(visible=False),
    )


def update_preview(format_type, selected_columns):
    """Refresh the dataset preview for the chosen display format.

    Returns a 5-tuple for ``[dataset_preview, text_preview, dataset_preview,
    text_preview, preview_heading]``: table value, text value, then the three
    visibility updates.
    """
    if format_type == "None":
        return None, "", gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)

    if format_type == "DataFrame":
        table = display_data_format(format_type, selected_columns, uploaded_df)
        return table, "", gr.update(visible=True), gr.update(visible=False), gr.update(visible=True)

    text = display_text_format(format_type, selected_columns, uploaded_df)
    return None, text, gr.update(visible=False), gr.update(visible=True), gr.update(visible=True)


def handle_analysis_change(analysis_type, selected_columns):
    """Run the selected analysis and show or hide the analysis widgets.

    Returns updates for ``[analysis_output, analysis_heading,
    analysis_data_table]``. Everything is hidden when the analysis type is
    "None" or the analysis produced no (non-blank) text.
    """
    result_text, data_table = run_analysis(analysis_type, selected_columns, uploaded_df)

    has_text = bool(result_text and result_text.strip())
    if not has_text or analysis_type == "None":
        return gr.update(value="", visible=False), gr.update(visible=False), gr.update(visible=False)

    if data_table is None:
        table_update = gr.update(visible=False)
    else:
        table_update = gr.update(value=data_table, visible=True)
    return gr.update(value=result_text, visible=True), gr.update(visible=True), table_update


def handle_viz_change(viz_type, selected_columns):
    """Build the selected chart.

    Returns values for ``[chart_output_new, chart_output_new,
    chart_explanation, chart_explanation]``: figure, figure visibility,
    explanation text, explanation visibility.
    """
    result = create_visualization(viz_type, selected_columns, uploaded_df)
    if not result or len(result) != 3:
        return None, gr.update(visible=False), "Error in visualization", gr.update(visible=False)

    fig, explanation, _chart_obj = result
    if explanation and fig is not None:
        return fig, gr.update(visible=True), explanation, gr.update(visible=True)
    return None, gr.update(visible=False), explanation or "Error in visualization", gr.update(visible=False)


def show_constant_input(method):
    """Show the constant-value textbox only for the "Constant Fill" method."""
    return gr.update(visible=(method == "Constant Fill"))


def _missing_values_report(df):
    """Summarise NaN and text-missing cells per column of ``df`` as plain text."""
    row_count = len(df)
    parts = ["Missing Values Analysis:\n" + "=" * 30 + "\n\n"]

    for col in df.columns:
        column = df[col]
        nan_count = column.isnull().sum()

        pseudo_missing_count = 0
        present = column.dropna()
        if len(present) > 0:
            # Normalise once per column, then test membership in one pass.
            normalised = present.astype(str).str.strip().str.lower()
            pseudo_missing_count = normalised.isin(_PSEUDO_MISSING_TOKENS).sum()

        total_missing = nan_count + pseudo_missing_count
        # An empty frame has nothing missing; avoid dividing by zero rows.
        missing_percent = (total_missing / row_count) * 100 if row_count else 0.0

        if total_missing > 0:
            details = []
            if nan_count > 0:
                details.append(f"{nan_count} NaN")
            if pseudo_missing_count > 0:
                details.append(f"{pseudo_missing_count} text-missing")
            detail_str = f" ({', '.join(details)})"
        else:
            detail_str = ""

        parts.append(f"{col}: {total_missing} missing ({missing_percent:.2f}%){detail_str}\n")

    return "".join(parts)


def handle_data_and_refresh(method, selected_columns, constant_value, analysis_type):
    """Apply a data-handling method and refresh the dependent widgets.

    Returns values for ``[data_handling_output, data_handling_output,
    column_selector, analysis_output, analysis_heading]``. The analysis
    widgets are only rewritten when the "Missing Values" analysis is selected
    and a dataset is available; otherwise they receive no-op updates.
    """
    global uploaded_df, change_history
    result, uploaded_df, change_history = handle_missing_data(
        method, selected_columns, constant_value, uploaded_df, change_history
    )

    # _column_choices() tolerates uploaded_df being None (no dataset loaded).
    selector_update = gr.update(choices=_column_choices(), value=[])

    if analysis_type == "Missing Values" and uploaded_df is not None:
        # NOTE(review): handle_undo_and_refresh refreshes this same panel via
        # run_analysis instead - confirm the two report formats agree.
        report = _missing_values_report(uploaded_df)
        return (
            result,
            gr.update(visible=True),
            selector_update,
            gr.update(value=report, visible=True),
            gr.update(visible=True),
        )

    return result, gr.update(visible=True), selector_update, gr.update(), gr.update()


def handle_undo_and_refresh(analysis_type, is_undo_all=False):
    """Undo the last change (or all changes) and refresh the dependent widgets.

    Args:
        analysis_type: currently selected analysis type.
        is_undo_all: when true, call undo_all_changes with ``original_df``;
            otherwise call undo_last_change with the working dataset.

    Returns:
        The same five outputs as handle_data_and_refresh.
    """
    global uploaded_df, change_history
    if is_undo_all:
        result, uploaded_df, change_history = undo_all_changes(original_df, change_history)
    else:
        result, uploaded_df, change_history = undo_last_change(uploaded_df, change_history)

    # _column_choices() tolerates uploaded_df being None (no dataset loaded).
    selector_update = gr.update(choices=_column_choices(), value=[])

    if analysis_type == "Missing Values" and uploaded_df is not None:
        result_text, _data_table = run_analysis(analysis_type, [], uploaded_df)
        return (
            result,
            gr.update(visible=True),
            selector_update,
            gr.update(value=result_text, visible=True),
            gr.update(visible=True),
        )

    return result, gr.update(visible=True), selector_update, gr.update(), gr.update()


def handle_question_analysis(question, selected_columns):
    """Forward a free-text question to analyze_question with the current dataset and LLM."""
    return analyze_question(question, selected_columns, uploaded_df, llm)


def handle_download():
    """Export the working dataset; the file widget is shown only if a path came back."""
    filepath = download_dataset(uploaded_df, dataset_name)
    return gr.File(value=filepath, visible=bool(filepath))


custom_css = """
.gradio-container {
    max-width: 1400px !important;
    margin: 0 auto !important;
}
.header-box {
    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
    border-radius: 15px;
    padding: 25px;
    margin: 20px auto;
    text-align: center;
    box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1);
}
.header-title {
    font-size: 36px;
    font-weight: bold;
    color: white;
    margin: 0;
    text-shadow: 2px 2px 4px rgba(0,0,0,0.3);
}
.section-box {
    background-color: #f8f9fa;
    padding: 20px;
    border-radius: 12px;
    margin: 15px 0;
    border: 1px solid #e9ecef;
}
"""

# NOTE(review): the nesting of the layout containers below was reconstructed
# from statement order - confirm it matches the intended page layout.
with gr.Blocks(theme=gr.themes.Soft(), css=custom_css) as demo:
    # NOTE(review): custom_css defines .header-box / .header-title, but this
    # markup carries no elements using them - confirm whether wrapper tags
    # are missing here.
    gr.HTML("""

SparkNova

Advanced Data Analysis Platform

""")

    with gr.Row():
        # Left column: dataset upload and all controls.
        with gr.Column(scale=1):
            gr.Markdown("### Upload Dataset")
            file_input = gr.File(
                label="Choose CSV or Excel File",
                file_types=[".csv", ".xlsx", ".xls"]
            )
            dataset_info = gr.Markdown()
            with gr.Row():
                clear_btn = gr.Button("Clear Dataset", variant="secondary", size="sm")

            column_selector = gr.CheckboxGroup(
                label="Select Columns (optional - for multi-column charts)",
                choices=[],
                visible=False
            )
            format_selector = gr.Dropdown(
                choices=["None", "DataFrame", "JSON", "Dictionary"],
                value="None",
                label="Display Format"
            )

            gr.Markdown("### Choose an Analysis Type")
            analysis_selector = gr.Dropdown(
                choices=["None", "Summary", "Describe", "Top 5 Rows", "Bottom 5 Rows",
                         "Missing Values", "Group & Aggregate", "Calculate Expressions",
                         "Highest Correlation"],
                value="None",
                label="Analysis Type"
            )

            gr.Markdown("### Visualization Types")
            viz_selector = gr.Dropdown(
                choices=["None", "Bar Chart", "Line Chart", "Scatter Plot", "Pie Chart",
                         "Histogram", "Box Plot", "Heat Map"],
                value="None",
                label="Chart Type"
            )

            gr.Markdown("### Handling Data")
            data_handler = gr.Dropdown(
                choices=["None", "Forward Fill", "Backward Fill", "Constant Fill",
                         "Mean Fill", "Median Fill", "Mode Fill", "Drop Columns"],
                value="None",
                label="Data Handling Method"
            )
            constant_input = gr.Textbox(
                label="Constant Value (for Constant Fill)",
                placeholder="Enter value to fill missing data",
                visible=False
            )
            with gr.Row():
                apply_btn = gr.Button("Apply Change", variant="primary", size="sm")
                undo_last_btn = gr.Button("Undo Last", variant="secondary", size="sm")
            with gr.Row():
                undo_all_btn = gr.Button("Undo All", variant="secondary", size="sm")
                download_btn = gr.Button("Download", variant="secondary", size="sm")

            data_handling_output = gr.Textbox(
                label="Data Handling Results", lines=3, visible=False, interactive=False
            )
            download_file = gr.File(label="Download Modified Dataset", visible=False)

        # Right column: previews, analysis results, and the question area.
        with gr.Column(scale=2):
            preview_heading = gr.Markdown("### Dataset Preview", visible=False)
            dataset_preview = gr.Dataframe(wrap=True, visible=False)
            text_preview = gr.Textbox(label="Text Preview", lines=15, visible=False)

            analysis_heading = gr.Markdown("### Analysis Results", visible=False)
            analysis_output = gr.Textbox(
                label="Analysis Output", lines=10, visible=False, interactive=False
            )
            analysis_data_table = gr.Dataframe(label="Data Table", visible=False, wrap=True)

            chart_output_new = gr.Plot(label="Chart", visible=False)
            chart_explanation = gr.Textbox(
                label="Chart Analysis", lines=5, visible=False, interactive=False
            )

            gr.Markdown("### Sample Questions")
            with gr.Row():
                # One column per group of up to three sample questions.
                for i in range(0, len(SAMPLE_QUESTIONS), 3):
                    with gr.Column():
                        for j in range(3):
                            if i + j < len(SAMPLE_QUESTIONS):
                                gr.Markdown(f"• {SAMPLE_QUESTIONS[i + j]}")

            gr.Markdown("### Ask Your Question")
            user_question = gr.Textbox(
                label="Enter your question",
                placeholder="Ask anything about your data...",
                lines=3
            )
            submit_btn = gr.Button("Analyze", variant="primary", size="lg")

            gr.Markdown("### Analysis Results")
            with gr.Tabs():
                with gr.Tab("Response"):
                    output_text = gr.Textbox(
                        label="Analysis Response",
                        interactive=False,
                        lines=15,
                        show_copy_button=True
                    )
                with gr.Tab("Visualization"):
                    chart_output = gr.Plot(label="Generated Chart")
                with gr.Tab("Data"):
                    result_table = gr.Dataframe(label="Result Data", wrap=True)

    # Several handlers return one value update plus one visibility update for
    # the same component, which is why components repeat in the output lists.
    dataset_outputs = [dataset_info, dataset_preview, column_selector, column_selector]
    preview_outputs = [dataset_preview, text_preview, dataset_preview, text_preview, preview_heading]
    analysis_outputs = [analysis_output, analysis_heading, analysis_data_table]
    chart_outputs = [chart_output_new, chart_output_new, chart_explanation, chart_explanation]
    refresh_outputs = [data_handling_output, data_handling_output, column_selector,
                       analysis_output, analysis_heading]

    file_input.change(upload_dataset, inputs=file_input, outputs=dataset_outputs)
    clear_btn.click(clear_dataset, outputs=dataset_outputs)

    format_selector.change(update_preview, inputs=[format_selector, column_selector],
                           outputs=preview_outputs)
    column_selector.change(update_preview, inputs=[format_selector, column_selector],
                           outputs=preview_outputs)

    analysis_selector.change(handle_analysis_change, inputs=[analysis_selector, column_selector],
                             outputs=analysis_outputs)
    column_selector.change(handle_analysis_change, inputs=[analysis_selector, column_selector],
                           outputs=analysis_outputs)

    viz_selector.change(handle_viz_change, inputs=[viz_selector, column_selector],
                        outputs=chart_outputs)
    column_selector.change(handle_viz_change, inputs=[viz_selector, column_selector],
                           outputs=chart_outputs)

    submit_btn.click(handle_question_analysis, inputs=[user_question, column_selector],
                     outputs=[output_text, chart_output, result_table])

    data_handler.change(show_constant_input, inputs=data_handler, outputs=constant_input)
    apply_btn.click(handle_data_and_refresh,
                    inputs=[data_handler, column_selector, constant_input, analysis_selector],
                    outputs=refresh_outputs)
    undo_last_btn.click(lambda analysis_type: handle_undo_and_refresh(analysis_type, False),
                        inputs=[analysis_selector], outputs=refresh_outputs)
    undo_all_btn.click(lambda analysis_type: handle_undo_and_refresh(analysis_type, True),
                       inputs=[analysis_selector], outputs=refresh_outputs)

    download_btn.click(handle_download, outputs=download_file)

    gr.HTML("""
Powered by GROQ LLM & Gradio
""")

if __name__ == "__main__":
    llm = initialize_llm()
    if not llm:
        print("Warning: Failed to initialize GROQ API")
    demo.launch(show_error=True, share=False)