Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import pandas as pd | |
| import os | |
| import matplotlib.pyplot as plt | |
| import io | |
| from PIL import Image | |
| import base64 | |
| import re | |
| import numpy as np | |
| from llama_index.llms.groq import Groq | |
| from llama_index.core.query_pipeline import ( | |
| QueryPipeline as QP, | |
| Link, | |
| InputComponent, | |
| ) | |
| from llama_index.experimental.query_engine.pandas import ( | |
| PandasInstructionParser, | |
| ) | |
| from llama_index.core import PromptTemplate | |
| # Example datasets | |
| EXAMPLE_DATASETS = { | |
| "Hotel Bookings": "hotel_bookings.csv", | |
| } | |
| def load_dataframe(file_path): | |
| try: | |
| if isinstance(file_path, str): | |
| # If it's a URL or file path | |
| df = pd.read_csv(file_path) | |
| else: | |
| # If it's an uploaded file | |
| df = pd.read_csv(file_path.name) | |
| return df, f"Successfully loaded dataset with {df.shape[0]} rows and {df.shape[1]} columns." | |
| except Exception as e: | |
| return None, f"Error loading dataset: {str(e)}" | |
| def create_query_pipeline(df, api_key, model="llama-3.3-70b-versatile"): | |
| # Create Groq LLM with the provided API key | |
| try: | |
| llm = Groq(model=model, api_key=api_key) | |
| except Exception as e: | |
| return None, f"Error initializing Groq LLM: {str(e)}" | |
| instruction_str = ( | |
| "1. Convert the query to executable Python code using Pandas.\n" | |
| "2. The final line of code should be a Python expression that can be called with the `eval()` function.\n" | |
| "3. The code should represent a solution to the query.\n" | |
| "4. PRINT ONLY THE EXPRESSION.\n" | |
| "5. Do not quote the expression.\n" | |
| ) | |
| pandas_prompt_str = ( | |
| "You are working with a pandas dataframe in Python.\n" | |
| "The name of the dataframe is `df`.\n" | |
| "This is the result of `print(df.head())`:\n" | |
| "{df_str}\n\n" | |
| "Follow these instructions:\n" | |
| "{instruction_str}\n" | |
| "Query: {query_str}\n\n" | |
| "Expression:" | |
| ) | |
| response_synthesis_prompt_str = ( | |
| "Given an input question, synthesize a response from the query results.\n" | |
| "Query: {query_str}\n\n" | |
| "Pandas Instructions (optional):\n{pandas_instructions}\n\n" | |
| "Pandas Output: {pandas_output}\n\n" | |
| "Response: " | |
| ) | |
| pandas_prompt = PromptTemplate(pandas_prompt_str).partial_format( | |
| instruction_str=instruction_str, df_str=df.head(5) | |
| ) | |
| pandas_output_parser = PandasInstructionParser(df) | |
| response_synthesis_prompt = PromptTemplate(response_synthesis_prompt_str) | |
| qp = QP( | |
| modules={ | |
| "input": InputComponent(), | |
| "pandas_prompt": pandas_prompt, | |
| "llm1": llm, | |
| "pandas_output_parser": pandas_output_parser, | |
| "response_synthesis_prompt": response_synthesis_prompt, | |
| "llm2": llm, | |
| }, | |
| verbose=True, | |
| ) | |
| qp.add_chain(["input", "pandas_prompt", "llm1", "pandas_output_parser"]) | |
| qp.add_links( | |
| [ | |
| Link("input", "response_synthesis_prompt", dest_key="query_str"), | |
| Link( | |
| "llm1", "response_synthesis_prompt", dest_key="pandas_instructions" | |
| ), | |
| Link( | |
| "pandas_output_parser", | |
| "response_synthesis_prompt", | |
| dest_key="pandas_output", | |
| ), | |
| ] | |
| ) | |
| qp.add_link("response_synthesis_prompt", "llm2") | |
| return qp, "Query pipeline created successfully!" | |
| def enhance_visualization(df, query): | |
| """ | |
| Create an enhanced visualization based on the dataframe and query | |
| This function attempts to create a better visualization with proper labels and formatting | |
| """ | |
| try: | |
| # Close any existing figures to avoid conflicts | |
| plt.close('all') | |
| # Create a new figure with larger size for better quality | |
| plt.figure(figsize=(12, 8), dpi=100) | |
| # Time-related visualization handling (for bookings over time, trends, etc.) | |
| if any(term in query.lower() for term in ['trend', 'time', 'year', 'month', 'booking', 'reservation']): | |
| # Try to detect date columns | |
| date_cols = [col for col in df.columns if any(term in col.lower() for term in | |
| ['date', 'year', 'month', 'time', 'arrival', 'reservation'])] | |
| if 'arrival_date_year' in df.columns and 'arrival_date_month' in df.columns: | |
| try: | |
| # Create a year-month based visualization | |
| # Convert month names to numbers for sorting | |
| month_order = { | |
| 'January': 1, 'February': 2, 'March': 3, 'April': 4, 'May': 5, 'June': 6, | |
| 'July': 7, 'August': 8, 'September': 9, 'October': 10, 'November': 11, 'December': 12 | |
| } | |
| # Count bookings by year and month | |
| booking_counts = df.groupby(['arrival_date_year', 'arrival_date_month']).size().reset_index(name='count') | |
| # Add month order for sorting | |
| booking_counts['month_order'] = booking_counts['arrival_date_month'].map(month_order) | |
| booking_counts = booking_counts.sort_values(['arrival_date_year', 'month_order']) | |
| # Create pivot table for visualization | |
| pivot_data = booking_counts.pivot(index='arrival_date_year', columns='arrival_date_month', values='count') | |
| # Reorder columns by month | |
| months = sorted(booking_counts['arrival_date_month'].unique(), key=lambda x: month_order.get(x, 13)) | |
| if len(months) > 0: # Check if the months list is not empty | |
| pivot_data = pivot_data[months] | |
| # Plot the data | |
| ax = pivot_data.plot(kind='bar', figsize=(14, 8), width=0.8) | |
| # Enhance the plot | |
| plt.title('Bookings by Month and Year', fontsize=16) | |
| plt.xlabel('Year', fontsize=14) | |
| plt.ylabel('Number of Bookings', fontsize=14) | |
| plt.legend(title='Month', fontsize=12) | |
| plt.grid(axis='y', linestyle='--', alpha=0.7) | |
| plt.tight_layout() | |
| # Add value labels on top of bars | |
| for container in ax.containers: | |
| ax.bar_label(container, fontsize=9, fmt='%d') | |
| else: | |
| return None # No months data found | |
| except Exception as e: | |
| print(f"Error in time visualization: {str(e)}") | |
| return None | |
| elif len(date_cols) > 0 and any(col in df.columns for col in date_cols): | |
| try: | |
| # Handle other time-based visualizations | |
| date_col = [col for col in date_cols if col in df.columns][0] | |
| df_count = df.groupby(date_col).size().reset_index(name='count') | |
| plt.bar(df_count[date_col], df_count['count'], color='steelblue') | |
| plt.title(f'Distribution by {date_col}', fontsize=16) | |
| plt.xlabel(date_col, fontsize=14) | |
| plt.ylabel('Count', fontsize=14) | |
| plt.grid(axis='y', linestyle='--', alpha=0.7) | |
| plt.xticks(rotation=45) | |
| plt.tight_layout() | |
| except Exception as e: | |
| print(f"Error in date column visualization: {str(e)}") | |
| return None | |
| else: | |
| # Default time visualization if we can't find specific columns | |
| return None # Let matplotlib handle it | |
| # Distribution visualization (for questions about distributions) | |
| elif any(term in query.lower() for term in ['distribution', 'histogram', 'spread']): | |
| try: | |
| numeric_cols = df.select_dtypes(include=['number']).columns.tolist() | |
| if len(numeric_cols) > 0: | |
| # Choose a relevant column based on query or the first numeric column | |
| target_col = None | |
| for col in numeric_cols: | |
| if col.lower() in query.lower(): | |
| target_col = col | |
| break | |
| if target_col is None and numeric_cols: | |
| target_col = numeric_cols[0] | |
| if target_col: | |
| # Create histogram | |
| plt.hist(df[target_col].dropna(), bins=30, color='steelblue', edgecolor='black', alpha=0.7) | |
| plt.title(f'Distribution of {target_col}', fontsize=16) | |
| plt.xlabel(target_col, fontsize=14) | |
| plt.ylabel('Frequency', fontsize=14) | |
| plt.grid(axis='y', linestyle='--', alpha=0.7) | |
| plt.tight_layout() | |
| else: | |
| return None # Let matplotlib handle it | |
| else: | |
| return None # Let matplotlib handle it | |
| except Exception as e: | |
| print(f"Error in distribution visualization: {str(e)}") | |
| return None | |
| # Comparison visualization (for questions comparing categories) | |
| elif any(term in query.lower() for term in ['compare', 'comparison', 'versus', 'vs', 'most', 'least']): | |
| try: | |
| categorical_cols = df.select_dtypes(include=['object']).columns.tolist() | |
| if len(categorical_cols) > 0: | |
| # Choose a relevant column based on query or the first categorical column | |
| target_col = None | |
| for col in categorical_cols: | |
| if col.lower() in query.lower(): | |
| target_col = col | |
| break | |
| if target_col is None and categorical_cols: | |
| target_col = categorical_cols[0] | |
| if target_col: | |
| # Get top categories by count | |
| top_categories = df[target_col].value_counts().nlargest(10) | |
| # Create bar chart | |
| plt.bar(top_categories.index, top_categories.values, color='steelblue') | |
| plt.title(f'Top Categories by {target_col}', fontsize=16) | |
| plt.xlabel(target_col, fontsize=14) | |
| plt.ylabel('Count', fontsize=14) | |
| plt.grid(axis='y', linestyle='--', alpha=0.7) | |
| plt.xticks(rotation=45, ha='right') | |
| plt.tight_layout() | |
| else: | |
| return None # Let matplotlib handle it | |
| else: | |
| return None # Let matplotlib handle it | |
| except Exception as e: | |
| print(f"Error in comparison visualization: {str(e)}") | |
| return None | |
| else: | |
| # For other types of queries, let the default matplotlib handle it | |
| return None | |
| # Save figure to buffer | |
| buf = io.BytesIO() | |
| plt.savefig(buf, format='png') | |
| buf.seek(0) | |
| # Create an image from the buffer | |
| img = Image.open(buf) | |
| plt.close('all') # Close the figure to free memory | |
| return img | |
| except Exception as e: | |
| print(f"Error in enhance_visualization: {str(e)}") | |
| plt.close('all') # Make sure to close any figures in case of error | |
| return None | |
| def process_query(query, api_key, df, model_choice): | |
| if df is None: | |
| return "Please load a dataset first.", None | |
| if not api_key: | |
| return "Please provide your Groq API key.", None | |
| try: | |
| # First, try to create an enhanced visualization based on the query | |
| enhanced_img = enhance_visualization(df, query) | |
| # Create and run the query pipeline | |
| pipeline, message = create_query_pipeline(df, api_key, model_choice) | |
| if pipeline is None: | |
| return message, None | |
| # Run the query | |
| response = pipeline.run(query_str=query) | |
| # If we already have an enhanced visualization, use it | |
| if enhanced_img is not None: | |
| return response.message.content, enhanced_img | |
| # Otherwise check if any matplotlib figures were created by the query | |
| figures = plt.get_fignums() | |
| if figures: | |
| try: | |
| # Improve any existing figure if possible | |
| fig = plt.figure(figures[0]) | |
| axes = fig.axes | |
| if axes and len(axes) > 0: # Make sure axes list isn't empty | |
| ax = axes[0] | |
| # Add grid lines | |
| ax.grid(axis='y', linestyle='--', alpha=0.7) | |
| # Enhance title and labels if they exist | |
| if ax.get_title(): | |
| ax.set_title(ax.get_title(), fontsize=16) | |
| if ax.get_xlabel(): | |
| ax.set_xlabel(ax.get_xlabel(), fontsize=14) | |
| if ax.get_ylabel(): | |
| ax.set_ylabel(ax.get_ylabel(), fontsize=14) | |
| # Handle legend if it exists | |
| if ax.get_legend(): | |
| ax.legend(fontsize=12) | |
| fig.tight_layout() | |
| # Save the figure to a bytes buffer | |
| buf = io.BytesIO() | |
| plt.savefig(buf, format='png', dpi=100) | |
| buf.seek(0) | |
| # Create an image from the buffer | |
| img = Image.open(buf) | |
| plt.close('all') # Close the figure to free memory | |
| return response.message.content, img | |
| except Exception as e: | |
| plt.close('all') | |
| # Log the error but continue without crashing | |
| print(f"Visualization error: {str(e)}") | |
| return response.message.content, None | |
| else: | |
| # No visualization was generated | |
| return response.message.content, None | |
| except Exception as e: | |
| plt.close('all') # Make sure to close any figures in case of error | |
| return f"Error processing query: {str(e)}", None | |
| def handle_example_selection(example_name): | |
| if example_name in EXAMPLE_DATASETS: | |
| file_path = EXAMPLE_DATASETS[example_name] | |
| df, message = load_dataframe(file_path) | |
| return df, message, gr.update(value=f"Dataset preview:\n{df.head().to_string()}") | |
| return None, "Please select a valid example dataset.", gr.update(value="") | |
| def handle_file_upload(file): | |
| if file is not None: | |
| df, message = load_dataframe(file) | |
| return df, message, gr.update(value=f"Dataset preview:\n{df.head().to_string()}") | |
| return None, "No file uploaded.", gr.update(value="") | |
| # Create Gradio interface | |
| with gr.Blocks(title="Pandas Data Analysis with Groq LLM") as app: | |
| gr.Markdown("# Pandas Data Analysis with Groq LLM") | |
| gr.Markdown("Upload your CSV data or choose an example dataset, then ask questions about it.") | |
| # State variables | |
| df_state = gr.State(value=None) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| with gr.Group(): | |
| gr.Markdown("### Data Selection") | |
| with gr.Tab("Upload Data"): | |
| file_input = gr.File(label="Upload CSV File", file_types=[".csv"]) | |
| upload_button = gr.Button("Load Uploaded Data") | |
| with gr.Tab("Example Datasets"): | |
| example_dropdown = gr.Dropdown( | |
| choices=list(EXAMPLE_DATASETS.keys()), | |
| label="Select Example Dataset" | |
| ) | |
| example_button = gr.Button("Load Example Dataset") | |
| data_status = gr.Textbox(label="Data Loading Status", interactive=False) | |
| with gr.Group(): | |
| gr.Markdown("### Groq API Configuration") | |
| api_key = gr.Textbox( | |
| label="Enter your Groq API Key", | |
| placeholder="gsk_...", | |
| type="password" | |
| ) | |
| model_choice = gr.Dropdown( | |
| choices=["llama-3.3-70b-versatile", "mixtral-8x7b-32768", "gemma-7b-it"], | |
| value="llama-3.3-70b-versatile", | |
| label="Select Groq Model" | |
| ) | |
| with gr.Column(scale=1): | |
| data_preview = gr.Textbox(label="Dataset Preview", interactive=False, lines=10) | |
| query_input = gr.Textbox( | |
| label="Ask a question about your data", | |
| placeholder="e.g., What is the trend of monthly bookings over time?", | |
| lines=2 | |
| ) | |
| query_button = gr.Button("Submit Query") | |
| # Output display with tabs for text and visualization | |
| with gr.Tabs(): | |
| with gr.TabItem("Text Response"): | |
| response_output = gr.Textbox(label="Response", interactive=False, lines=10) | |
| with gr.TabItem("Visualization"): | |
| image_output = gr.Image(label="Data Visualization", interactive=False) | |
| # Handle events | |
| upload_button.click( | |
| handle_file_upload, | |
| inputs=[file_input], | |
| outputs=[df_state, data_status, data_preview] | |
| ) | |
| example_button.click( | |
| handle_example_selection, | |
| inputs=[example_dropdown], | |
| outputs=[df_state, data_status, data_preview] | |
| ) | |
| query_button.click( | |
| process_query, | |
| inputs=[query_input, api_key, df_state, model_choice], | |
| outputs=[response_output, image_output] | |
| ) | |
| gr.Markdown(""" | |
| ### Instructions | |
| 1. Upload your CSV file or select an example dataset | |
| 2. Enter your Groq API key (get one at [https://console.groq.com](https://console.groq.com)) | |
| 3. Ask questions about your data in natural language | |
| 4. Get AI-powered insights and visualizations based on your data | |
| ### Example Questions | |
| - What is the trend of monthly bookings over time? | |
| - What's the distribution of stay duration? | |
| - Which country has the most bookings? | |
| - Is there a correlation between lead time and cancellations? | |
| - Show me bookings by month and year | |
| """) | |
| # Launch the app | |
| if __name__ == "__main__": | |
| app.launch() |