Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import pandas as pd | |
| import os | |
| from data_engine import ( | |
| clean_numeric, run_analysis, create_visualization, handle_missing_data, | |
| undo_last_change, undo_all_changes, download_dataset, | |
| display_data_format, display_text_format | |
| ) | |
| from ai_agent import initialize_llm, analyze_question | |
| from prompts import SAMPLE_QUESTIONS | |
# Module-level session state shared by every callback below.
# llm            - client returned by initialize_llm(); bound in the __main__ block.
# uploaded_df    - working copy of the dataset that the handlers read and replace.
# original_df    - snapshot taken at upload time; passed to undo_all_changes().
# dataset_name   - base name of the uploaded file; passed to download_dataset().
# change_history - undo history threaded through handle_missing_data()/undo_*().
llm = uploaded_df = original_df = dataset_name = None
change_history = []
def upload_dataset(file):
    """Load an uploaded CSV/Excel file into the module-level dataset state.

    The file is read and passed through ``clean_numeric`` before any global is
    touched, so a failed upload leaves the previously loaded dataset intact.
    A successful upload also resets ``change_history``: its entries belong to
    the previous dataset and must not be replayed by the undo buttons.

    Args:
        file: Value delivered by the file input. ``None`` when the input is
            empty; otherwise either an object exposing the path as ``.name``
            or a plain path string.

    Returns:
        A 4-tuple, one item per wired output: the status text, an update that
        hides the DataFrame preview, an update carrying the column choices,
        and an update controlling the column selector's visibility.
    """
    global uploaded_df, original_df, dataset_name, change_history

    def _failure(message):
        # Every non-success path answers with the same "hide everything" updates.
        return (
            message,
            gr.update(visible=False),
            gr.update(choices=[]),
            gr.update(visible=False),
        )

    if file is None:
        return _failure("No file uploaded")

    # Accept both file-like wrappers (``.name``) and bare path strings.
    path = getattr(file, "name", file)

    try:
        # Compare the extension case-insensitively so "DATA.CSV" is accepted.
        suffix = os.path.splitext(path)[1].lower()
        if suffix == ".csv":
            frame = pd.read_csv(path)
        elif suffix in (".xlsx", ".xls"):
            frame = pd.read_excel(path)
        else:
            return _failure("Unsupported file format. Please upload CSV or Excel files.")
        frame = clean_numeric(frame)
        snapshot = frame.copy()
        loaded_name = os.path.basename(path)
    except Exception as e:
        # UI boundary: report the problem to the user instead of crashing the callback.
        return _failure(f"Error loading file: {str(e)}")

    # Commit the new state only after everything above succeeded.
    uploaded_df = frame
    original_df = snapshot
    dataset_name = loaded_name
    change_history = []

    info_text = f" Dataset Loaded: {dataset_name} ({uploaded_df.shape[0]} rows × {uploaded_df.shape[1]} columns)"
    return (
        info_text,
        gr.update(visible=False),
        gr.update(choices=list(uploaded_df.columns), value=[]),
        gr.update(visible=True),
    )
def clear_dataset():
    """Forget the loaded dataset, its upload-time snapshot and the undo history.

    Returns:
        A 4-tuple, one item per wired output: the status text, an update that
        hides the DataFrame preview, an update that empties the column
        selector, and an update that hides the column selector.
    """
    global uploaded_df, original_df, dataset_name, change_history

    uploaded_df = original_df = dataset_name = None
    change_history = []

    status = "Dataset cleared. Please upload a new file."
    hide_preview = gr.update(visible=False)
    empty_columns = gr.update(choices=[], value=[])
    hide_columns = gr.update(visible=False)
    return status, hide_preview, empty_columns, hide_columns
def update_preview(format_type, selected_columns):
    """Render the dataset preview in the format picked in the dropdown.

    Args:
        format_type: ``"None"``, ``"DataFrame"``, or any other value, which is
            forwarded to ``display_text_format``.
        selected_columns: Columns currently ticked in the column selector.

    Returns:
        A 5-tuple: the table value, the text value, then visibility updates
        for the table, the text box and the preview heading.
    """
    if format_type == "None":
        # Nothing to show: blank both widgets and hide the whole preview area.
        return (
            None,
            "",
            gr.update(visible=False),
            gr.update(visible=False),
            gr.update(visible=False),
        )

    as_table = format_type == "DataFrame"
    if as_table:
        table_value = display_data_format(format_type, selected_columns, uploaded_df)
        text_value = ""
    else:
        table_value = None
        text_value = display_text_format(format_type, selected_columns, uploaded_df)

    # Exactly one of the two preview widgets is visible; the heading always is.
    return (
        table_value,
        text_value,
        gr.update(visible=as_table),
        gr.update(visible=not as_table),
        gr.update(visible=True),
    )
def handle_analysis_change(analysis_type, selected_columns):
    """Run the selected analysis and decide which result widgets to show.

    Args:
        analysis_type: Value of the analysis dropdown (``"None"`` hides results).
        selected_columns: Columns currently ticked in the column selector.

    Returns:
        A 3-tuple of updates for the analysis text box, the analysis heading
        and the analysis data table.
    """
    result_text, data_table = run_analysis(analysis_type, selected_columns, uploaded_df)

    has_text = bool(result_text and result_text.strip())
    if not has_text or analysis_type == "None":
        # No usable output: clear the text box and hide all three widgets.
        return (
            gr.update(value="", visible=False),
            gr.update(visible=False),
            gr.update(visible=False),
        )

    text_update = gr.update(value=result_text, visible=True)
    heading_update = gr.update(visible=True)
    if data_table is None:
        table_update = gr.update(visible=False)
    else:
        table_update = gr.update(value=data_table, visible=True)
    return text_update, heading_update, table_update
def handle_viz_change(viz_type, selected_columns):
    """Build the selected chart and decide whether chart widgets are shown.

    Args:
        viz_type: Value of the chart-type dropdown.
        selected_columns: Columns currently ticked in the column selector.

    Returns:
        A 4-tuple: the figure (or ``None``), a visibility update for the plot,
        the explanation text, and a visibility update for the explanation box.
    """
    outcome = create_visualization(viz_type, selected_columns, uploaded_df)

    # Anything other than a non-empty 3-item result is treated as a failure.
    if not outcome or len(outcome) != 3:
        return None, gr.update(visible=False), "Error in visualization", gr.update(visible=False)

    figure, explanation, _ = outcome
    if not explanation or figure is None:
        # Keep whatever message came back, falling back to the generic one.
        message = explanation or "Error in visualization"
        return None, gr.update(visible=False), message, gr.update(visible=False)

    return figure, gr.update(visible=True), explanation, gr.update(visible=True)
def show_constant_input(method):
    """Show the constant-value textbox only for the "Constant Fill" method."""
    needs_constant = method == "Constant Fill"
    return gr.update(visible=needs_constant)
# Placeholder strings treated as "missing", already stripped and lower-cased.
# Each token appears exactly once so a matching cell is counted exactly once;
# the empty string covers blank and whitespace-only cells.
_MISSING_TOKENS = (
    '', 'unknown', 'error', 'null', 'na', 'n/a', 'not given', '-', '?',
    'nan', 'none', '#n/a', 'n.a.',
)


def _missing_values_report(df):
    """Return the per-column "Missing Values Analysis" text for *df*."""
    total_rows = len(df)
    parts = ["Missing Values Analysis:\n" + "=" * 30 + "\n\n"]

    for col in df.columns:
        series = df[col]
        nan_count = int(series.isnull().sum())

        # Text placeholders can only occur among the non-null cells.
        present = series.dropna()
        if len(present) > 0:
            normalized = present.astype(str).str.strip().str.lower()
            pseudo_missing_count = int(normalized.isin(_MISSING_TOKENS).sum())
        else:
            pseudo_missing_count = 0

        total_missing = nan_count + pseudo_missing_count
        # An empty frame has nothing missing; avoid dividing by zero.
        missing_percent = (total_missing / total_rows) * 100 if total_rows else 0.0

        details = []
        if nan_count > 0:
            details.append(f"{nan_count} NaN")
        if pseudo_missing_count > 0:
            details.append(f"{pseudo_missing_count} text-missing")
        detail_str = f" ({', '.join(details)})" if details else ""

        parts.append(f"{col}: {total_missing} missing ({missing_percent:.2f}%){detail_str}\n")

    return "".join(parts)


def handle_data_and_refresh(method, selected_columns, constant_value, analysis_type):
    """Apply a data-handling method, then refresh the dependent widgets.

    Args:
        method: Value of the data-handling dropdown.
        selected_columns: Columns currently ticked in the column selector.
        constant_value: Text from the constant-value box.
        analysis_type: Value of the analysis dropdown; when it is
            ``"Missing Values"`` the missing-values report is recomputed.

    Returns:
        A 5-tuple: the result message, an update showing the result box, an
        update for the column selector, and updates for the analysis text box
        and analysis heading (no-op updates unless the report was refreshed).
    """
    global uploaded_df, change_history

    result, uploaded_df, change_history = handle_missing_data(
        method, selected_columns, constant_value, uploaded_df, change_history
    )

    if uploaded_df is None:
        # No dataset loaded: there are no columns to list and nothing to analyse.
        return result, gr.update(visible=True), gr.update(choices=[], value=[]), gr.update(), gr.update()

    columns_update = gr.update(choices=list(uploaded_df.columns), value=[])

    if analysis_type == "Missing Values":
        analysis_result = _missing_values_report(uploaded_df)
        return (
            result,
            gr.update(visible=True),
            columns_update,
            gr.update(value=analysis_result, visible=True),
            gr.update(visible=True),
        )

    return result, gr.update(visible=True), columns_update, gr.update(), gr.update()
def handle_undo_and_refresh(analysis_type, is_undo_all=False):
    """Undo the last change (or all changes), then refresh dependent widgets.

    Args:
        analysis_type: Value of the analysis dropdown; when it is
            ``"Missing Values"`` that analysis is re-run on the restored data.
        is_undo_all: ``True`` to restore via ``undo_all_changes`` using the
            upload-time snapshot, ``False`` to call ``undo_last_change``.

    Returns:
        A 5-tuple: the result message, an update showing the result box, an
        update for the column selector, and updates for the analysis text box
        and analysis heading (no-op updates unless the analysis was re-run).
    """
    global uploaded_df, change_history

    if is_undo_all:
        result, uploaded_df, change_history = undo_all_changes(original_df, change_history)
    else:
        result, uploaded_df, change_history = undo_last_change(uploaded_df, change_history)

    if uploaded_df is None:
        # No dataset loaded: there are no columns to list and nothing to analyse.
        return result, gr.update(visible=True), gr.update(choices=[], value=[]), gr.update(), gr.update()

    columns_update = gr.update(choices=list(uploaded_df.columns), value=[])

    if analysis_type == "Missing Values":
        # Only the text part of the analysis is shown here; the table is not wired.
        result_text, _ = run_analysis(analysis_type, [], uploaded_df)
        return (
            result,
            gr.update(visible=True),
            columns_update,
            gr.update(value=result_text, visible=True),
            gr.update(visible=True),
        )

    return result, gr.update(visible=True), columns_update, gr.update(), gr.update()
def handle_question_analysis(question, selected_columns):
    """Forward a free-text question to ``analyze_question``.

    The current dataset and the module-level LLM client are read from module
    state at call time and passed along with the question and column selection.
    """
    dataset, client = uploaded_df, llm
    return analyze_question(question, selected_columns, dataset, client)
# Stylesheet handed to gr.Blocks(css=...). It caps and centres the app container
# and defines the .header-box / .header-title / .section-box classes.
custom_css = """
.gradio-container {
max-width: 1400px !important;
margin: 0 auto !important;
}
.header-box {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border-radius: 15px;
padding: 25px;
margin: 20px auto;
text-align: center;
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1);
}
.header-title {
font-size: 36px;
font-weight: bold;
color: white;
margin: 0;
text-shadow: 2px 2px 4px rgba(0,0,0,0.3);
}
.section-box {
background-color: #f8f9fa;
padding: 20px;
border-radius: 12px;
margin: 15px 0;
border: 1px solid #e9ecef;
}
"""
# Build the UI and wire its events.
# NOTE(review): the copy of this file under review had lost all indentation, so
# the nesting below (which widgets sit in which Row/Column, and that the question
# section lives in the right-hand column) is a reconstruction - confirm it
# against the running app before relying on the exact layout.
with gr.Blocks(theme=gr.themes.Soft(), css=custom_css) as demo:
    # Page banner; uses the .header-box / .header-title classes from custom_css.
    gr.HTML("""
<div class="header-box">
<h1 class="header-title">SparkNova</h1>
<p style="color: white; font-size: 18px; margin: 10px 0 0 0;">Advanced Data Analysis Platform</p>
</div>
""")
    with gr.Row():
        # Left column: upload, column/format/analysis/chart selectors, data handling.
        with gr.Column(scale=1):
            gr.Markdown("### Upload Dataset")
            file_input = gr.File(label="Choose CSV or Excel File", file_types=[".csv", ".xlsx", ".xls"])
            dataset_info = gr.Markdown()
            with gr.Row():
                clear_btn = gr.Button("Clear Dataset", variant="secondary", size="sm")
            # Hidden until a dataset is loaded; upload_dataset fills the choices.
            column_selector = gr.CheckboxGroup(
                label="Select Columns (optional - for multi-column charts)",
                choices=[],
                visible=False
            )
            format_selector = gr.Dropdown(
                choices=["None", "DataFrame", "JSON", "Dictionary"],
                value="None",
                label="Display Format"
            )
            gr.Markdown("### Choose an Analysis Type")
            analysis_selector = gr.Dropdown(
                choices=["None", "Summary", "Describe", "Top 5 Rows", "Bottom 5 Rows", "Missing Values", "Group & Aggregate", "Calculate Expressions", "Highest Correlation"],
                value="None",
                label="Analysis Type"
            )
            gr.Markdown("### Visualization Types")
            viz_selector = gr.Dropdown(
                choices=["None", "Bar Chart", "Line Chart", "Scatter Plot", "Pie Chart", "Histogram", "Box Plot", "Heat Map"],
                value="None",
                label="Chart Type"
            )
            gr.Markdown("### Handling Data")
            data_handler = gr.Dropdown(
                choices=["None", "Forward Fill", "Backward Fill", "Constant Fill", "Mean Fill", "Median Fill", "Mode Fill", "Drop Columns"],
                value="None",
                label="Data Handling Method"
            )
            # Revealed by show_constant_input when "Constant Fill" is selected.
            constant_input = gr.Textbox(
                label="Constant Value (for Constant Fill)",
                placeholder="Enter value to fill missing data",
                visible=False
            )
            with gr.Row():
                apply_btn = gr.Button("Apply Change", variant="primary", size="sm")
                undo_last_btn = gr.Button("Undo Last", variant="secondary", size="sm")
            with gr.Row():
                undo_all_btn = gr.Button("Undo All", variant="secondary", size="sm")
                download_btn = gr.Button("Download", variant="secondary", size="sm")
            data_handling_output = gr.Textbox(label="Data Handling Results", lines=3, visible=False, interactive=False)
            download_file = gr.File(label="Download Modified Dataset", visible=False)
        # Right column: result widgets. All of them start hidden and are revealed
        # by the callbacks wired further down.
        with gr.Column(scale=2):
            preview_heading = gr.Markdown("### Dataset Preview", visible=False)
            dataset_preview = gr.Dataframe(wrap=True, visible=False)
            text_preview = gr.Textbox(label="Text Preview", lines=15, visible=False)
            analysis_heading = gr.Markdown("### Analysis Results", visible=False)
            analysis_output = gr.Textbox(label="Analysis Output", lines=10, visible=False, interactive=False)
            analysis_data_table = gr.Dataframe(label="Data Table", visible=False, wrap=True)
            chart_output_new = gr.Plot(label="Chart", visible=False)
            chart_explanation = gr.Textbox(label="Chart Analysis", lines=5, visible=False, interactive=False)
            gr.Markdown("### Sample Questions")
            with gr.Row():
                # Lay the sample questions out three per column.
                for i in range(0, len(SAMPLE_QUESTIONS), 3):
                    with gr.Column():
                        for j in range(3):
                            if i + j < len(SAMPLE_QUESTIONS):
                                gr.Markdown(f"• {SAMPLE_QUESTIONS[i + j]}")
            gr.Markdown("### Ask Your Question")
            user_question = gr.Textbox(
                label="Enter your question",
                placeholder="Ask anything about your data...",
                lines=3
            )
            submit_btn = gr.Button("Analyze", variant="primary", size="lg")
            gr.Markdown("### Analysis Results")
            with gr.Tabs():
                with gr.Tab("Response"):
                    output_text = gr.Textbox(
                        label="Analysis Response",
                        interactive=False,
                        lines=15,
                        show_copy_button=True
                    )
                with gr.Tab("Visualization"):
                    chart_output = gr.Plot(label="Generated Chart")
                with gr.Tab("Data"):
                    result_table = gr.Dataframe(label="Result Data", wrap=True)

    # --- Event wiring -------------------------------------------------------
    # Each handler returns one item per entry in ``outputs``. Several components
    # are listed twice on purpose: the handlers return separate items for a
    # component's value/choices and for its visibility.
    file_input.change(upload_dataset, inputs=file_input, outputs=[dataset_info, dataset_preview, column_selector, column_selector])
    clear_btn.click(clear_dataset, outputs=[dataset_info, dataset_preview, column_selector, column_selector])
    # The preview is recomputed when either the format or the column selection changes.
    format_selector.change(update_preview, inputs=[format_selector, column_selector], outputs=[dataset_preview, text_preview, dataset_preview, text_preview, preview_heading])
    column_selector.change(update_preview, inputs=[format_selector, column_selector], outputs=[dataset_preview, text_preview, dataset_preview, text_preview, preview_heading])
    # Likewise for the analysis output ...
    analysis_selector.change(handle_analysis_change, inputs=[analysis_selector, column_selector], outputs=[analysis_output, analysis_heading, analysis_data_table])
    column_selector.change(handle_analysis_change, inputs=[analysis_selector, column_selector], outputs=[analysis_output, analysis_heading, analysis_data_table])
    # ... and for the chart.
    viz_selector.change(handle_viz_change, inputs=[viz_selector, column_selector], outputs=[chart_output_new, chart_output_new, chart_explanation, chart_explanation])
    column_selector.change(handle_viz_change, inputs=[viz_selector, column_selector], outputs=[chart_output_new, chart_output_new, chart_explanation, chart_explanation])
    submit_btn.click(handle_question_analysis, inputs=[user_question, column_selector], outputs=[output_text, chart_output, result_table])
    data_handler.change(show_constant_input, inputs=data_handler, outputs=constant_input)
    apply_btn.click(handle_data_and_refresh, inputs=[data_handler, column_selector, constant_input, analysis_selector], outputs=[data_handling_output, data_handling_output, column_selector, analysis_output, analysis_heading])
    # The lambdas pin is_undo_all, which is not a UI input.
    undo_last_btn.click(lambda analysis_type: handle_undo_and_refresh(analysis_type, False), inputs=[analysis_selector], outputs=[data_handling_output, data_handling_output, column_selector, analysis_output, analysis_heading])
    undo_all_btn.click(lambda analysis_type: handle_undo_and_refresh(analysis_type, True), inputs=[analysis_selector], outputs=[data_handling_output, data_handling_output, column_selector, analysis_output, analysis_heading])

    def handle_download():
        """Export the current dataset and show the file widget if a path came back."""
        filepath = download_dataset(uploaded_df, dataset_name)
        # The widget is shown only when download_dataset returned a truthy path.
        return gr.File(value=filepath, visible=bool(filepath))

    download_btn.click(handle_download, outputs=download_file)
    gr.HTML("<div style='text-align: center; margin-top: 20px; color: #666;'>Powered by GROQ LLM & Gradio</div>")
if __name__ == "__main__":
    # Bind the module-level ``llm`` that handle_question_analysis reads, and
    # warn (without aborting) when no client could be created.
    if not (llm := initialize_llm()):
        print("Warning: Failed to initialize GROQ API")
    demo.launch(show_error=True, share=False)