| import gradio as gr |
| import pandas as pd |
| import numpy as np |
| import matplotlib.pyplot as plt |
| import seaborn as sns |
| import plotly.express as px |
| import plotly.graph_objects as go |
| import io |
| from sklearn.decomposition import PCA |
| from sklearn.preprocessing import StandardScaler |
| import os |
| import json |
| import requests |
| import re |
| import torch |
| import openai |
| from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer |
| import base64 |
| from io import BytesIO |
|
|
| |
# Plotting defaults applied once at import time for seaborn/matplotlib figures.
sns.set(style="whitegrid")
plt.rcParams.update({"figure.figsize": (10, 6)})
|
|
| |
# Credentials are seeded from the environment; set_openai_key() and
# set_hf_token() overwrite them at runtime.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
HF_API_TOKEN = os.getenv("HF_API_TOKEN", "")
# Text-generation pipeline returned by initialize_ai_models(); None until loaded.
data_assistant = None
|
|
def set_openai_key(api_key):
    """Store the OpenAI API key used for cleaning recommendations.

    Args:
        api_key: Key text entered by the user. Surrounding whitespace is
            ignored; ``None`` is treated as blank.

    Returns:
        A status message suitable for display in the UI.
    """
    global OPENAI_API_KEY

    cleaned_key = (api_key or "").strip()
    if not cleaned_key:
        # Reporting success for a blank key would be misleading, so the
        # currently stored key is left untouched.
        return "Please enter a valid OpenAI API key."

    OPENAI_API_KEY = cleaned_key
    openai.api_key = cleaned_key
    return "OpenAI API key set successfully!"
|
|
def set_hf_token(api_token):
    """Store the Hugging Face token and (re)load the text-generation model.

    Args:
        api_token: Token text entered by the user. Surrounding whitespace is
            ignored; ``None`` is treated as blank.

    Returns:
        A status message suitable for display in the UI. The message says so
        when the token was saved but the model could not be loaded.
    """
    global HF_API_TOKEN, data_assistant

    cleaned_token = (api_token or "").strip()
    if not cleaned_token:
        # Nothing to store: skip the (slow) model reload entirely.
        return "Please enter a valid Hugging Face token."

    HF_API_TOKEN = cleaned_token
    # HF_TOKEN is the variable the Hugging Face hub client reads.
    # TRANSFORMERS_TOKEN is still exported because earlier versions of this
    # module set it and other code may rely on it.
    os.environ["HF_TOKEN"] = cleaned_token
    os.environ["TRANSFORMERS_TOKEN"] = cleaned_token

    data_assistant = initialize_ai_models()
    if data_assistant is None:
        return "Hugging Face token saved, but the text-generation model could not be loaded."
    return "Hugging Face token set successfully!"
|
|
| |
def initialize_ai_models():
    """Create the ``distilgpt2`` text-generation pipeline.

    The model and tokenizer are first loaded explicitly; if that fails, the
    pipeline is asked to resolve the model by name as a fallback.

    Returns:
        The text-generation pipeline, or ``None`` when both attempts fail.
    """
    model_name = "distilgpt2"

    try:
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForCausalLM.from_pretrained(model_name)
        return pipeline("text-generation", model=model, tokenizer=tokenizer)
    except Exception as e:
        print(f"Error loading model: {e}")

    try:
        return pipeline("text-generation", model=model_name)
    except Exception as fallback_error:
        # ``Exception`` rather than a bare ``except`` so Ctrl-C and
        # interpreter shutdown are not swallowed while a model downloads.
        print(f"Error loading fallback pipeline: {fallback_error}")
        return None
|
|
def _coerce_numeric_columns(df):
    """Convert text columns holding only numbers to a numeric dtype, in place."""
    for col in df.columns:
        if df[col].dtype == 'object':
            try:
                df[col] = pd.to_numeric(df[col])
            except (ValueError, TypeError):
                # Genuinely textual column: keep it exactly as it was read.
                continue
    return df


def read_file(file):
    """Read an uploaded data file into a pandas DataFrame.

    CSV files are tried with ``,``, ``;``, tab and ``|`` in that order, then
    with pandas' automatic separator detection. A parse only counts as
    successful when it yields more than one column.

    Args:
        file: A filesystem path, or an object with a ``name`` attribute
            (such as an open file or an upload wrapper). ``None`` is allowed.

    Returns:
        ``None`` when ``file`` is ``None``, a DataFrame on success, or a
        string describing the problem when the file cannot be read.
    """
    if file is None:
        return None

    if isinstance(file, (str, os.PathLike)):
        file_name = os.fspath(file)
    else:
        file_name = getattr(file, 'name', '') or ''
    file_name = str(file_name)
    print(f"Reading file: {file_name}")

    # Reading from the path lets every attempt start at the beginning of the
    # file. When only a handle is available it is rewound before each attempt;
    # otherwise the second and later attempts would read from end-of-file.
    source = file_name if os.path.isfile(file_name) else file
    suffix = file_name.lower()

    def load(reader, **kwargs):
        if hasattr(source, 'seek'):
            source.seek(0)
        return reader(source, **kwargs)

    try:
        if suffix.endswith('.csv'):
            errors = []
            for sep in (',', ';', '\t', '|'):
                try:
                    df = load(pd.read_csv, sep=sep)
                except Exception as e:
                    errors.append(f"Error with '{sep}' separator: {str(e)}")
                    continue

                if len(df.columns) > 1:
                    print(f"Successfully read CSV with separator '{sep}': {df.shape}")
                    return _coerce_numeric_columns(df)
                errors.append(f"Only got {len(df.columns)} columns with '{sep}' separator")

            error_msg = "\n".join(errors)
            print(f"All separators failed: {error_msg}")

            # Last resort: let the python engine sniff the separator.
            try:
                df = load(pd.read_csv, sep=None, engine='python')
            except Exception as e:
                print(f"Error with automatic separator detection: {e}")
                return "Could not read the CSV file. Please check the file format and try again."
            if len(df.columns) > 1:
                print(f"Read CSV with automatic separator detection: {df.shape}")
                return df
            return "Could not detect the appropriate separator for this CSV file."

        if suffix.endswith(('.xls', '.xlsx')):
            return load(pd.read_excel)

        if suffix.endswith('.json'):
            return load(pd.read_json)

        if suffix.endswith('.txt'):
            try:
                df = load(pd.read_csv, delimiter='\t')
                if len(df.columns) > 1:
                    return df
                return load(pd.read_csv, sep=None, engine='python')
            except Exception as e:
                print(f"Error reading text file: {e}")
                return f"Error reading text file: {str(e)}"

        return "Unsupported file format. Please upload .csv, .xlsx, .xls, .json, or .txt files."
    except Exception as e:
        print(f"Error reading file: {str(e)}")
        return f"Error reading file: {str(e)}"
|
|
def analyze_data(df):
    """Summarise a DataFrame: shape, dtypes, missing values, quality issues,
    numeric statistics/outliers and top categorical values.

    Anything that is not a DataFrame is handed back unchanged.
    """
    if not isinstance(df, pd.DataFrame):
        return df

    null_counts = df.isnull().sum()
    if null_counts.sum() > 0:
        missing_summary = null_counts[null_counts > 0].to_dict()
    else:
        missing_summary = "No missing values found"

    info = {
        'Shape': df.shape,
        'Columns': df.columns.tolist(),
        'Data Types': df.dtypes.astype(str).to_dict(),
        'Missing Values': missing_summary,
        'Data Quality Issues': identify_data_quality_issues(df),
    }

    # Numeric columns: descriptive statistics plus outlier report.
    number_columns = df.select_dtypes(include=[np.number]).columns.tolist()
    if number_columns:
        info['Numeric Columns'] = number_columns
        info['Statistics'] = df[number_columns].describe().to_html()

        flagged = detect_outliers(df, number_columns)
        if flagged:
            info['Outliers'] = flagged

    # Categorical columns: ten most frequent values for the first five columns.
    text_columns = df.select_dtypes(include=['object', 'category']).columns.tolist()
    if text_columns:
        info['Categorical Columns'] = text_columns
        info['Category Counts'] = {
            name: df[name].value_counts().head(10).to_dict()
            for name in text_columns[:5]
        }

    return info
|
|
def identify_data_quality_issues(df):
    """Identify common data quality issues in a DataFrame.

    Checks performed:
        * duplicate rows,
        * categorical columns with more than 50 distinct values,
        * text columns whose first ten non-null values look like dates,
        * columns with more than 50% missing values.

    Args:
        df: DataFrame to inspect.

    Returns:
        Dict mapping an issue label to its details; only issues that were
        actually found are present.
    """
    issues = {}

    duplicate_count = int(df.duplicated().sum())
    if duplicate_count > 0:
        issues['Duplicate Rows'] = duplicate_count

    high_cardinality = {}
    for col in df.select_dtypes(include=['object', 'category']).columns:
        unique_count = df[col].nunique()
        if unique_count > 50:
            high_cardinality[col] = unique_count
    if high_cardinality:
        issues['High Cardinality Columns'] = high_cardinality

    # Compiled once; the same pattern is applied to every text column.
    date_pattern = re.compile(r'\d{1,4}[-/\.]\d{1,2}[-/\.]\d{1,4}')
    potential_date_cols = []
    for col in df.select_dtypes(include=['object']).columns:
        sample = df[col].dropna().head(10).tolist()
        if not all(isinstance(x, str) for x in sample):
            continue
        if any(date_pattern.search(x) for x in sample):
            potential_date_cols.append(col)
    if potential_date_cols:
        issues['Potential Date Columns'] = potential_date_cols

    high_missing = {}
    for col in df.columns:
        missing_pct = df[col].isnull().mean() * 100
        if missing_pct > 50:
            high_missing[col] = f"{missing_pct:.2f}%"
    if high_missing:
        issues['Columns with >50% Missing'] = high_missing

    return issues
|
|
def detect_outliers(df, numeric_cols, iqr_multiplier=1.5, min_outlier_pct=1.0,
                    max_unique_ratio=0.9):
    """Detect outliers in numeric columns using the IQR method.

    Args:
        df: DataFrame to inspect.
        numeric_cols: Names of the numeric columns to check.
        iqr_multiplier: Width of the fences, in IQRs beyond Q1/Q3.
        min_outlier_pct: A column is reported only when more than this
            percentage of all rows are outliers.
        max_unique_ratio: Columns whose distinct-value count exceeds this
            fraction of the row count are skipped.

    Returns:
        Dict mapping column name to ``count``, ``percentage`` (formatted
        string), ``lower_bound`` and ``upper_bound``.
    """
    outliers = {}
    row_count = df.shape[0]
    if row_count == 0:
        return outliers

    for col in numeric_cols:
        column = df[col]
        if column.nunique() > row_count * max_unique_ratio:
            continue

        q1 = column.quantile(0.25)
        q3 = column.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - iqr_multiplier * iqr
        upper_bound = q3 + iqr_multiplier * iqr

        outlier_count = int(((column < lower_bound) | (column > upper_bound)).sum())
        if outlier_count == 0:
            continue

        outlier_pct = (outlier_count / row_count) * 100
        if outlier_pct > min_outlier_pct:
            outliers[col] = {
                'count': outlier_count,
                'percentage': f"{outlier_pct:.2f}%",
                'lower_bound': float(lower_bound),
                'upper_bound': float(upper_bound),
            }

    return outliers
|
|
def _find_date_columns(df):
    """Return the columns that are datetimes or whose every value parses as one."""
    found = []
    for col in df.columns:
        series = df[col]
        if pd.api.types.is_datetime64_any_dtype(series):
            found.append(col)
            continue
        # Only non-empty text columns are candidates; an empty column would
        # otherwise count as "all values parsed".
        if series.dtype != 'object' or series.empty:
            continue
        try:
            parsed = pd.to_datetime(series, errors='coerce')
        except (ValueError, TypeError, OverflowError):
            continue
        if parsed.notna().all():
            found.append(col)
    return found


def generate_visualizations(df):
    """Generate plotly figures appropriate to the column types of ``df``.

    Produces, where the data allows: a test histogram of the first column,
    distributions for up to five numeric columns, bar charts for up to five
    categorical columns, a correlation heatmap, a scatter matrix, a time
    series and a two-component PCA scatter. Each figure is built
    independently, so one failure does not prevent the others.

    The input DataFrame is never modified.

    Args:
        df: DataFrame to plot. Any other value is returned unchanged.

    Returns:
        Dict mapping a figure name to a plotly figure. When no figure could
        be built, the dict holds a single ``'fallback'`` entry.
    """
    if not isinstance(df, pd.DataFrame):
        print(f"Not a DataFrame: {type(df)}")
        return df

    print(f"Starting visualization generation for DataFrame with shape: {df.shape}")

    visualizations = {}

    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    categorical_cols = df.select_dtypes(include=['object', 'category']).columns.tolist()
    date_cols = _find_date_columns(df)

    print(f"Found {len(numeric_cols)} numeric columns: {numeric_cols}")
    print(f"Found {len(categorical_cols)} categorical columns: {categorical_cols}")
    print(f"Found {len(date_cols)} date columns: {date_cols}")

    try:
        # Test histogram of the first column.
        if len(df) > 0 and len(df.columns) > 0:
            col = df.columns[0]
            try:
                test_data = df[col].head(100)
                fig = px.histogram(x=test_data, title=f"Test Plot for {col}")
                visualizations['test_plot'] = fig
                print(f"Generated test plot for column: {col}")
            except Exception as e:
                print(f"Error creating test plot: {e}")

        # Distributions of the first five numeric columns.
        for col in numeric_cols[:5]:
            try:
                fig = px.histogram(df, x=col, marginal="box", title=f"Distribution of {col}")
                visualizations[f'dist_{col}'] = fig
                print(f"Generated distribution plot for {col}")
            except Exception as e:
                print(f"Error creating histogram for {col}: {e}")

        # Top-10 bar charts of the first five categorical columns.
        for col in categorical_cols[:5]:
            try:
                value_counts = df[col].value_counts().nlargest(10)
                # String labels keep plotly from treating categories as numbers.
                value_counts.index = value_counts.index.astype(str)

                fig = px.bar(x=value_counts.index, y=value_counts.values,
                             title=f"Top 10 categories in {col}")
                fig.update_xaxes(title=col)
                fig.update_yaxes(title="Count")
                visualizations[f'bar_{col}'] = fig
                print(f"Generated bar chart for {col}")
            except Exception as e:
                print(f"Error creating bar chart for {col}: {e}")

        # Correlation heatmap.
        if len(numeric_cols) > 1:
            try:
                corr_matrix = df[numeric_cols].corr()
                fig = px.imshow(corr_matrix, text_auto=True, aspect="auto",
                                title="Correlation Heatmap")
                visualizations['correlation'] = fig
                print("Generated correlation heatmap")
            except Exception as e:
                print(f"Error creating correlation heatmap: {e}")

        # Scatter matrix over the first three numeric columns.
        if len(numeric_cols) >= 2:
            try:
                plot_cols = numeric_cols[:3]
                fig = px.scatter_matrix(df, dimensions=plot_cols, title="Scatter Plot Matrix")
                visualizations['scatter_matrix'] = fig
                print("Generated scatter plot matrix")
            except Exception as e:
                print(f"Error creating scatter matrix: {e}")

        # Time series of the first numeric column over the first date column.
        if date_cols and numeric_cols:
            try:
                date_col = date_cols[0]
                num_col = numeric_cols[0]

                # Convert on a copy so the caller's frame keeps its dtypes.
                plot_df = df.copy()
                if not pd.api.types.is_datetime64_any_dtype(plot_df[date_col]):
                    plot_df[date_col] = pd.to_datetime(plot_df[date_col], errors='coerce')
                plot_df = plot_df.sort_values(by=date_col)

                fig = px.line(plot_df, x=date_col, y=num_col,
                              title=f"{num_col} over Time")
                visualizations['time_series'] = fig
                print("Generated time series plot")
            except Exception as e:
                print(f"Error creating time series plot: {e}")

        # Two-component PCA of the standardised numeric columns.
        if len(numeric_cols) >= 3:
            try:
                numeric_data = df[numeric_cols]
                # Mean-impute so the scaler and PCA receive no missing values.
                numeric_data = numeric_data.fillna(numeric_data.mean())

                scaled_data = StandardScaler().fit_transform(numeric_data)

                pca = PCA(n_components=2)
                pca_result = pca.fit_transform(scaled_data)
                pca_df = pd.DataFrame(data=pca_result, columns=['PC1', 'PC2'])

                if categorical_cols:
                    cat_col = categorical_cols[0]
                    pca_df[cat_col] = df[cat_col].values
                    fig = px.scatter(pca_df, x='PC1', y='PC2', color=cat_col,
                                     title="PCA Visualization")
                else:
                    fig = px.scatter(pca_df, x='PC1', y='PC2',
                                     title="PCA Visualization")

                variance_ratio = pca.explained_variance_ratio_
                fig.update_layout(
                    annotations=[
                        dict(
                            text=f"PC1 explained variance: {variance_ratio[0]:.2f}",
                            showarrow=False,
                            x=0.5,
                            y=1.05,
                            xref="paper",
                            yref="paper",
                        ),
                        dict(
                            text=f"PC2 explained variance: {variance_ratio[1]:.2f}",
                            showarrow=False,
                            x=0.5,
                            y=1.02,
                            xref="paper",
                            yref="paper",
                        ),
                    ]
                )

                visualizations['pca'] = fig
                print("Generated PCA visualization")
            except Exception as e:
                print(f"Error creating PCA visualization: {e}")

    except Exception as e:
        print(f"Error in visualization generation: {e}")

    print(f"Generated {len(visualizations)} visualizations")

    if not visualizations:
        visualizations['fallback'] = generate_fallback_visualization(df)

    return visualizations
|
|
def generate_fallback_visualization(df):
    """Build one simple plotly bar figure when no other figure was produced.

    Uses a 20-bin histogram of the first numeric column when there is one,
    otherwise the ten most frequent values of the first column.

    Args:
        df: DataFrame to plot.

    Returns:
        A plotly figure. If building it fails, a figure carrying only a
        "Could not generate visualization" annotation.
    """
    try:
        # Only the plotly figure is returned, so no matplotlib figure is
        # created here; one was previously opened on every call and never
        # closed.
        fig = go.Figure()

        numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        if numeric_cols:
            col = numeric_cols[0]
            hist, bin_edges = np.histogram(df[col].dropna(), bins=20)
            bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
            fig.add_trace(go.Bar(x=bin_centers, y=hist, name=col))
            fig.update_layout(title=f"Distribution of {col}")
        else:
            col = df.columns[0]
            counts = df[col].value_counts().nlargest(10)
            fig.add_trace(go.Bar(x=counts.index.astype(str), y=counts.values, name=col))
            fig.update_layout(title=f"Top values for {col}")

        return fig
    except Exception as e:
        print(f"Error generating fallback visualization: {e}")

        fig = go.Figure()
        fig.add_annotation(text="Could not generate visualization", showarrow=False)
        fig.update_layout(title="Visualization Error")
        return fig
|
|
def get_ai_cleaning_recommendations(df):
    """Get markdown data-cleaning recommendations for ``df`` from OpenAI.

    Args:
        df: DataFrame to describe to the model.

    Returns:
        Markdown text. Generic recommendations are returned when no API key
        is configured, or (with the error appended) when the request fails.
    """
    generic_tips = "\n".join([
        "* Handle missing values by either removing rows or imputing with mean/median/mode",
        "* Remove duplicate rows if present",
        "* Convert date-like string columns to proper datetime format",
        "* Standardize text data by removing extra spaces and converting to lowercase",
        "* Check for and handle outliers in numerical columns",
    ])

    if not OPENAI_API_KEY:
        return "\n".join([
            "## OpenAI API Key Not Configured",
            "",
            "Please set your OpenAI API key in the Settings tab to get AI-powered data cleaning recommendations.",
            "",
            "Without an API key, here are some general recommendations:",
            "",
            generic_tips,
        ])

    try:
        # Column labels may be ints or other non-strings (e.g. headerless data).
        column_names = [str(col) for col in df.columns]
        missing_values = df.isnull().sum().to_dict()
        duplicate_rows = int(df.duplicated().sum())
        # default=str keeps timestamps and other non-JSON cell values from
        # aborting the request; this text is only shown to the model.
        sample_json = json.dumps(df.head(5).to_dict(), indent=2, default=str)

        prompt = "\n".join([
            "I have a dataset with the following properties:",
            f"- Shape: {df.shape}",
            f"- Columns: {', '.join(column_names)}",
            f"- Missing values: {missing_values}",
            f"- Duplicate rows: {duplicate_rows}",
            "",
            "Here's a sample of the data:",
            sample_json,
            "",
            "Based on this information, provide specific data cleaning recommendations in a bulleted list.",
            "Include suggestions for handling missing values, outliers, data types, and duplicate rows.",
            "Format your response as markdown and ONLY include the cleaning recommendations.",
        ])

        messages = [
            {"role": "system", "content": "You are a data science assistant focused on data cleaning recommendations."},
            {"role": "user", "content": prompt},
        ]

        if hasattr(openai, "OpenAI"):
            # openai>=1.0 removed ChatCompletion in favour of a client object.
            client = openai.OpenAI(api_key=OPENAI_API_KEY)
            response = client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=messages,
                max_tokens=700,
            )
        else:
            openai.api_key = OPENAI_API_KEY
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=messages,
                max_tokens=700,
            )
        return response.choices[0].message.content
    except Exception as e:
        # The local text-generation model is deliberately not run here: its
        # output was never included in the returned text, so running it only
        # added latency and a note claiming it had been used.
        return "\n".join([
            "## Data Cleaning Recommendations",
            "",
            generic_tips,
            "",
            f"Note: Could not access AI models for customized recommendations. Error: {str(e)}",
        ])
|
|
def get_hf_model_insights(df):
    """Get markdown analysis suggestions for ``df`` from the local text model.

    Args:
        df: DataFrame to summarise for the model.

    Returns:
        Markdown text: the model's continuation of the prompt, or canned
        suggestions when no token/model is available, when the model's
        answer is shorter than 50 characters, or when anything fails.
    """
    global data_assistant

    basic_suggestions = [
        "1. Examine the distribution of each numeric column",
        "2. Analyze correlations between numeric features",
        "3. Look for patterns in categorical data",
        "4. Consider creating visualizations like histograms and scatter plots",
        "5. Explore relationships between different variables",
    ]

    try:
        if not HF_API_TOKEN and not data_assistant:
            return "\n".join([
                "## Hugging Face API Token Not Configured",
                "",
                "Please set your Hugging Face API token in the Settings tab to get AI-powered data analysis insights.",
                "",
                "Without an API token, here are some general analysis suggestions:",
                "",
                *basic_suggestions,
            ])

        # A token exists but the model is not loaded yet: load it now.
        if data_assistant is None:
            data_assistant = initialize_ai_models()

        if not data_assistant:
            return "\n".join([
                "## AI Model Not Available",
                "",
                "Could not initialize the Hugging Face model. Please check your API token or try again later.",
                "",
                "Here are some general analysis suggestions:",
                "",
                "1. Examine the distribution of each numeric column",
                "2. Analyze correlations between numeric features",
                "3. Look for patterns in categorical data",
                "4. Consider creating pivot tables to understand relationships",
                "5. Look for time-based patterns if datetime columns are present",
            ])

        numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        categorical_cols = df.select_dtypes(include=['object', 'category']).columns.tolist()

        # str() each label: joining non-string labels (e.g. integer headers)
        # would raise and silently downgrade the result to generic advice.
        numeric_names = ', '.join(str(col) for col in numeric_cols[:5])
        categorical_names = ', '.join(str(col) for col in categorical_cols[:5])

        dataset_summary = (
            f"\nDataset with {df.shape[0]} rows and {df.shape[1]} columns.\n"
            f"Numeric columns: {numeric_names}\n"
            f"Categorical columns: {categorical_names}\n"
        )

        prompt = f"Based on this dataset summary, suggest data analysis approaches: {dataset_summary}"

        response = data_assistant(
            prompt,
            max_length=300,
            num_return_sequences=1,
        )[0]['generated_text']

        # The pipeline echoes the prompt; keep only the continuation.
        analysis_insights = response.replace(prompt, "").strip()

        if not analysis_insights or len(analysis_insights) < 50:
            analysis_insights = "\n".join([
                "## Data Analysis Suggestions",
                "",
                "1. For numeric columns, calculate correlation matrices to identify relationships",
                "2. For categorical columns, analyze frequency distributions",
                "3. Consider creating pivot tables to understand how categories relate",
                "4. Look for time-based patterns if datetime columns are present",
                "5. Consider dimensionality reduction techniques like PCA for visualization",
            ])

        return analysis_insights

    except Exception as e:
        return "\n".join([
            "## Data Analysis Suggestions",
            "",
            *basic_suggestions,
            "",
            f"Note: Could not access AI models for customized recommendations. Error: {str(e)}",
        ])
|
|
def process_file(file):
    """Read an uploaded file and run the full analysis pipeline on it.

    Args:
        file: Uploaded file as accepted by ``read_file``.

    Returns:
        A 4-tuple ``(analysis, visualizations, cleaning_recommendations,
        analysis_insights)``. When the file is missing or unreadable the
        first element is an error message and the other three are ``None``.
    """
    df = read_file(file)

    if df is None:
        return "Please upload a file to begin analysis.", None, None, None
    if isinstance(df, str):
        return df, None, None, None

    # Promote text columns to datetime when every value parses as a date.
    for col in df.columns:
        if df[col].dtype != 'object':
            continue
        try:
            parsed = pd.to_datetime(df[col], errors='coerce')
        except (ValueError, TypeError, OverflowError):
            continue
        # An empty column trivially "parses"; leave it as it is.
        if len(parsed) > 0 and parsed.notna().all():
            df[col] = parsed

    analysis = analyze_data(df)
    visualizations = generate_visualizations(df)
    cleaning_recommendations = get_ai_cleaning_recommendations(df)
    analysis_insights = get_hf_model_insights(df)

    return analysis, visualizations, cleaning_recommendations, analysis_insights
|
|
def display_analysis(analysis):
    """Format the analysis results as an HTML fragment.

    Column names and cell values come from the uploaded file, so they are
    HTML-escaped before being placed in the markup.

    Args:
        analysis: Dict produced by ``analyze_data``, an error string, or
            ``None``.

    Returns:
        HTML text; the input itself when it is a string; a placeholder
        sentence when it is ``None``.
    """
    if analysis is None:
        return "No analysis available."

    if isinstance(analysis, str):
        return analysis

    from html import escape

    def esc(value):
        return escape(str(value))

    parts = ["<h2>Data Analysis</h2>"]

    parts.append(
        f"<p><strong>Shape:</strong> {analysis['Shape'][0]} rows, {analysis['Shape'][1]} columns</p>"
    )
    column_list = ', '.join(esc(col) for col in analysis['Columns'])
    parts.append(f"<p><strong>Columns:</strong> {column_list}</p>")

    parts.append("<h3>Missing Values</h3>")
    missing = analysis['Missing Values']
    if isinstance(missing, str):
        parts.append(f"<p>{esc(missing)}</p>")
    else:
        parts.append("<ul>")
        parts.extend(f"<li>{esc(col)}: {esc(count)}</li>" for col, count in missing.items())
        parts.append("</ul>")

    if 'Data Quality Issues' in analysis and analysis['Data Quality Issues']:
        parts.append("<h3>Data Quality Issues</h3>")
        for issue_type, issue_details in analysis['Data Quality Issues'].items():
            parts.append(f"<h4>{esc(issue_type)}</h4>")
            if isinstance(issue_details, dict):
                parts.append("<ul>")
                parts.extend(
                    f"<li>{esc(key)}: {esc(value)}</li>" for key, value in issue_details.items()
                )
                parts.append("</ul>")
            elif isinstance(issue_details, (list, tuple)):
                # Show the items themselves rather than a Python list repr.
                parts.append(f"<p>{', '.join(esc(item) for item in issue_details)}</p>")
            else:
                parts.append(f"<p>{esc(issue_details)}</p>")

    if 'Outliers' in analysis and analysis['Outliers']:
        parts.append("<h3>Outliers Detected</h3>")
        parts.append("<ul>")
        for col, details in analysis['Outliers'].items():
            parts.append(
                f"<li><strong>{esc(col)}:</strong> {esc(details['count'])} outliers "
                f"({esc(details['percentage'])})<br>"
            )
            parts.append(
                f"Values outside range: [{details['lower_bound']:.2f}, {details['upper_bound']:.2f}]</li>"
            )
        parts.append("</ul>")

    if 'Statistics' in analysis:
        parts.append("<h3>Numeric Statistics</h3>")
        # Inserted as-is: this entry is already an HTML table.
        parts.append(analysis['Statistics'])

    if 'Category Counts' in analysis:
        parts.append("<h3>Categorical Data (Top Values)</h3>")
        for col, counts in analysis['Category Counts'].items():
            parts.append(f"<h4>{esc(col)}</h4><ul>")
            parts.extend(f"<li>{esc(val)}: {esc(count)}</li>" for val, count in counts.items())
            parts.append("</ul>")

    return "".join(parts)
|
|
def apply_data_cleaning(df, cleaning_options):
    """Apply the selected cleaning operations to a copy of ``df``.

    Recognised ``cleaning_options`` keys:
        * ``handle_missing`` / ``missing_method`` (``drop``, ``mean``,
          ``median``, ``mode``, ``zero``; default ``drop``),
        * ``remove_duplicates``,
        * ``handle_outliers`` / ``outlier_method`` (``remove`` or ``cap``;
          default ``remove``), using 1.5 x IQR fences,
        * ``convert_dates`` / ``date_columns`` (list of column names),
        * ``normalize_columns`` / ``normalize_columns_list`` (min-max to [0, 1]).

    Args:
        df: Source DataFrame; it is not modified.
        cleaning_options: Dict of the options above.

    Returns:
        ``(cleaned_df, cleaning_log)`` where ``cleaning_log`` is a list of
        human-readable descriptions of what was done.
    """
    cleaned_df = df.copy()
    cleaning_log = []

    def is_numeric(series):
        # Any numeric dtype (int32, float32, nullable ints, ...) except bool.
        return (pd.api.types.is_numeric_dtype(series)
                and not pd.api.types.is_bool_dtype(series))

    def fill(series, value):
        try:
            return series.fillna(value)
        except (TypeError, ValueError):
            # e.g. a fractional mean cannot be stored in a nullable integer column.
            return series.astype("float64").fillna(value)

    # --- Missing values ---------------------------------------------------
    if cleaning_options.get("handle_missing"):
        method = cleaning_options.get("missing_method", "drop")
        for col in cleaned_df.columns:
            column = cleaned_df[col]
            missing_count_before = int(column.isnull().sum())
            if missing_count_before == 0:
                continue

            if method == "drop":
                cleaned_df = cleaned_df.dropna(subset=[col])
                cleaning_log.append(f"Dropped {missing_count_before} rows with missing values in column '{col}'")
            elif method in ("mean", "median") and is_numeric(column):
                fill_val = column.mean() if method == "mean" else column.median()
                if pd.isna(fill_val):
                    cleaning_log.append(f"Skipped column '{col}': no values available to compute the {method}")
                    continue
                cleaned_df[col] = fill(column, fill_val)
                cleaning_log.append(f"Filled {missing_count_before} missing values in column '{col}' with {method} ({float(fill_val):.2f})")
            elif method == "mode":
                modes = column.mode()
                if modes.empty:
                    # Column is entirely missing, so there is no mode to use.
                    cleaning_log.append(f"Skipped column '{col}': no values available to compute the mode")
                    continue
                mode_val = modes.iloc[0]
                cleaned_df[col] = column.fillna(mode_val)
                cleaning_log.append(f"Filled {missing_count_before} missing values in column '{col}' with mode ({mode_val})")
            elif method == "zero" and is_numeric(column):
                cleaned_df[col] = column.fillna(0)
                cleaning_log.append(f"Filled {missing_count_before} missing values in column '{col}' with 0")

    # --- Duplicates -------------------------------------------------------
    if cleaning_options.get("remove_duplicates"):
        dupe_count_before = int(cleaned_df.duplicated().sum())
        if dupe_count_before > 0:
            cleaned_df = cleaned_df.drop_duplicates()
            cleaning_log.append(f"Removed {dupe_count_before} duplicate rows")

    # --- Outliers ---------------------------------------------------------
    if cleaning_options.get("handle_outliers"):
        method = cleaning_options.get("outlier_method", "remove")
        numeric_cols = cleaned_df.select_dtypes(include=[np.number]).columns

        for col in numeric_cols:
            column = cleaned_df[col]
            q1 = column.quantile(0.25)
            q3 = column.quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr

            outliers = (column < lower_bound) | (column > upper_bound)
            outlier_count = int(outliers.sum())
            if outlier_count == 0:
                continue

            if method == "remove":
                cleaned_df = cleaned_df[~outliers]
                cleaning_log.append(f"Removed {outlier_count} rows with outliers in column '{col}'")
            elif method == "cap":
                # clip() returns a new column, upcasting when a fractional
                # bound cannot be stored in an integer column.
                try:
                    capped = column.clip(lower=lower_bound, upper=upper_bound)
                except (TypeError, ValueError):
                    capped = column.astype("float64").clip(lower=lower_bound, upper=upper_bound)
                cleaned_df[col] = capped
                cleaning_log.append(f"Capped {outlier_count} outliers in column '{col}' to range [{lower_bound:.2f}, {upper_bound:.2f}]")

    # --- Date conversion --------------------------------------------------
    if cleaning_options.get("convert_dates"):
        requested = cleaning_options.get("date_columns", []) or []
        # Accept names typed with stray spaces, e.g. "date, created_at".
        requested_names = set(requested) | {
            name.strip() for name in requested if isinstance(name, str)
        }
        for col in cleaned_df.columns:
            if col not in requested_names:
                continue
            try:
                cleaned_df[col] = pd.to_datetime(cleaned_df[col])
                cleaning_log.append(f"Converted column '{col}' to datetime format")
            except (ValueError, TypeError, OverflowError):
                cleaning_log.append(f"Failed to convert column '{col}' to datetime format")

    # --- Min-max normalisation --------------------------------------------
    if cleaning_options.get("normalize_columns"):
        normalize_list = cleaning_options.get("normalize_columns_list", []) or []
        for col in cleaned_df.columns:
            if col not in normalize_list or not is_numeric(cleaned_df[col]):
                continue
            min_val = cleaned_df[col].min()
            max_val = cleaned_df[col].max()
            # A constant column has no range to scale over.
            if max_val > min_val:
                cleaned_df[col] = (cleaned_df[col] - min_val) / (max_val - min_val)
                cleaning_log.append(f"Normalized column '{col}' to range [0, 1]")

    return cleaned_df, cleaning_log
|
|
def apply_cleaning_ui(file, handle_missing, missing_method, remove_duplicates,
                      handle_outliers, outlier_method, convert_dates, date_columns,
                      normalize_numeric):
    """Run the cleaning workflow for the UI and package its results.

    Args:
        file: Uploaded file, as accepted by ``read_file``.
        handle_missing: Whether to treat missing values.
        missing_method: One of ``drop``, ``mean``, ``median``, ``mode``, ``zero``.
        remove_duplicates: Whether to drop duplicate rows.
        handle_outliers: Whether to treat IQR outliers.
        outlier_method: ``remove`` or ``cap``.
        convert_dates: Whether to convert the listed columns to datetime.
        date_columns: Comma-separated column names; spaces around names are ignored.
        normalize_numeric: Whether to min-max scale every numeric column.

    Returns:
        ``(summary_html, csv_path)``. ``csv_path`` is the path of a temporary
        CSV file holding the cleaned data, or ``None`` when nothing was cleaned.
    """
    if file is None:
        return "Please upload a file before attempting to clean data.", None

    df = read_file(file)
    if isinstance(df, str):
        return df, None

    from html import escape
    import tempfile

    date_column_names = []
    if date_columns:
        date_column_names = [name.strip() for name in date_columns.split(",") if name.strip()]

    cleaning_options = {
        "handle_missing": handle_missing,
        "missing_method": missing_method,
        "remove_duplicates": remove_duplicates,
        "handle_outliers": handle_outliers,
        "outlier_method": outlier_method,
        "convert_dates": convert_dates,
        "date_columns": date_column_names,
        "normalize_columns": normalize_numeric,
        "normalize_columns_list": (
            df.select_dtypes(include=[np.number]).columns.tolist() if normalize_numeric else []
        ),
    }

    cleaned_df, cleaning_log = apply_data_cleaning(df, cleaning_options)

    # Log entries embed column names from the uploaded file, so escape them.
    log_items = "".join(f"<li>{escape(str(item))}</li>" for item in cleaning_log)
    result_summary = (
        "<h2>Data Cleaning Results</h2>"
        f"<p>Original data: {df.shape[0]} rows, {df.shape[1]} columns</p>"
        f"<p>Cleaned data: {cleaned_df.shape[0]} rows, {cleaned_df.shape[1]} columns</p>"
        "<h3>Cleaning Operations Applied:</h3>"
        f"<ul>{log_items}</ul>"
    )

    # A file path is returned rather than an in-memory buffer so the result
    # can be offered as a download. delete=False keeps the file on disk after
    # the handle is closed.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".csv", prefix="cleaned_data_",
        delete=False, newline="", encoding="utf-8",
    ) as handle:
        cleaned_df.to_csv(handle, index=False)
        output_path = handle.name

    return result_summary, output_path
|
|
def app_ui(file):
    """Analyse an uploaded file and build the outputs of the analysis tab.

    Args:
        file: Uploaded file, or ``None`` when nothing was uploaded.

    Returns:
        A 4-tuple ``(result_html, visualization_data, cleaning_recommendations,
        analysis_insights)``. ``visualization_data`` maps each figure name to
        the figure's JSON-compatible representation. On error the first
        element is the message and the other three are ``None``.
    """
    if file is None:
        return "Please upload a file to begin analysis.", None, None, None

    print(f"Processing file in app_ui: {file.name if hasattr(file, 'name') else 'unknown'}")

    analysis, visualizations, cleaning_recommendations, analysis_insights = process_file(file)

    if isinstance(analysis, str):
        print(f"Error in analysis: {analysis}")
        return analysis, None, None, None

    analysis_html = display_analysis(analysis)

    rendered = []
    viz_payload = {}
    if visualizations and not isinstance(visualizations, str):
        print(f"Processing {len(visualizations)} visualizations for display")
        for viz_name, fig in visualizations.items():
            try:
                html_content = fig.to_html(full_html=False, include_plotlyjs="cdn")
                rendered.append(f'<div style="margin-bottom: 30px;">{html_content}</div>')
                print(f"Added visualization: {viz_name}")
            except Exception as e:
                print(f"Error rendering visualization {viz_name}: {e}")

            # Figure objects are not plain JSON data; convert them so the
            # JSON output receives dicts and lists only.
            try:
                viz_payload[viz_name] = json.loads(fig.to_json())
            except Exception as e:
                viz_payload[viz_name] = f"Could not serialize figure: {e}"
    else:
        print(f"No visualizations to display: {visualizations}")

    if rendered:
        viz_html = "".join(rendered)
    else:
        # Also covers the case where every figure failed to render.
        viz_html = "<p>No visualizations could be generated for this dataset.</p>"

    result_html = f"""
<div style="display: flex; flex-direction: column;">
<div>{analysis_html}</div>
<h2>Data Visualizations</h2>
<div>{viz_html}</div>
</div>
"""

    return result_html, viz_payload, cleaning_recommendations, analysis_insights
|
|
def test_visualization():
    """Render a random 100-point scatter plot to an HTML fragment.

    Used as a smoke check that plotly can produce embeddable output.
    """
    import plotly.express as px
    import numpy as np

    # Two independent uniform samples, drawn in x-then-y order.
    xs = np.random.rand(100)
    ys = np.random.rand(100)

    scatter = px.scatter(x=xs, y=ys, title="Test Plot")
    return scatter.to_html(full_html=False, include_plotlyjs="cdn")
|
|
| |
# Gradio layout: three top-level tabs (analysis, cleaning, settings) followed
# by the event wiring that connects buttons to the functions above.
with gr.Blocks(title="Data Visualization & Cleaning AI") as demo:
    gr.Markdown("# Data Visualization & Cleaning AI")
    gr.Markdown("Upload your data file (CSV, Excel, JSON, or TXT) and get automatic analysis, visualizations, and AI-powered insights.")

    with gr.Tabs() as tabs:
        # ---- Tab 1: upload + analysis results ----
        with gr.TabItem("Data Analysis"):
            with gr.Row():
                file_input = gr.File(label="Upload Data File")
                analyze_button = gr.Button("Analyze Data")

            # Collapsible plotly smoke-test rendered once when the UI is built.
            test_viz_html = test_visualization()
            gr.HTML(f"<details><summary>Plotly Test (Click to expand)</summary>{test_viz_html}</details>", visible=True)

            with gr.Tabs():
                with gr.TabItem("Analysis & Visualizations"):
                    output = gr.HTML(label="Results")
                with gr.TabItem("AI Cleaning Recommendations"):
                    cleaning_recommendations_output = gr.Markdown(label="AI Recommendations")
                with gr.TabItem("AI Analysis Insights"):
                    analysis_insights_output = gr.Markdown(label="Analysis Insights")
                with gr.TabItem("Raw Visualization Objects"):
                    viz_output = gr.JSON(label="Visualization Objects")

        # ---- Tab 2: cleaning options + results ----
        with gr.TabItem("Data Cleaning"):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### Cleaning Options")
                    handle_missing = gr.Checkbox(label="Handle Missing Values", value=True)
                    missing_method = gr.Radio(
                        label="Missing Values Method",
                        choices=["drop", "mean", "median", "mode", "zero"],
                        value="mean",
                    )
                    remove_duplicates = gr.Checkbox(label="Remove Duplicate Rows", value=True)
                    handle_outliers = gr.Checkbox(label="Handle Outliers", value=False)
                    outlier_method = gr.Radio(
                        label="Outlier Method",
                        choices=["remove", "cap"],
                        value="cap",
                    )
                    convert_dates = gr.Checkbox(label="Convert Date Columns", value=False)
                    date_columns = gr.Textbox(
                        label="Date Columns (comma-separated)",
                        placeholder="e.g., date,created_at,timestamp",
                    )
                    normalize_numeric = gr.Checkbox(label="Normalize Numeric Columns", value=False)

                with gr.Column(scale=2):
                    clean_button = gr.Button("Clean Data")
                    cleaning_output = gr.HTML(label="Cleaning Results")
                    cleaned_file_output = gr.File(label="Download Cleaned Data")

        # ---- Tab 3: API credentials ----
        with gr.TabItem("Settings"):
            gr.Markdown("### API Key Configuration")
            gr.Markdown("Enter your API keys to enable AI-powered features.")

            with gr.Group():
                gr.Markdown("#### OpenAI API Key")
                gr.Markdown("Required for advanced data cleaning recommendations.")
                openai_key_input = gr.Textbox(
                    label="OpenAI API Key",
                    placeholder="sk-...",
                    type="password",
                )
                openai_key_button = gr.Button("Save OpenAI API Key")
                openai_key_status = gr.Markdown("Status: Not configured")

            with gr.Group():
                gr.Markdown("#### Hugging Face API Token")
                gr.Markdown("Required for AI-powered data analysis insights.")
                hf_token_input = gr.Textbox(
                    label="Hugging Face API Token",
                    placeholder="hf_...",
                    type="password",
                )
                hf_token_button = gr.Button("Save Hugging Face Token")
                hf_token_status = gr.Markdown("Status: Not configured")

    # ---- Event wiring (registered inside the Blocks context) ----
    analyze_button.click(
        app_ui,
        [file_input],
        [output, viz_output, cleaning_recommendations_output, analysis_insights_output],
    )

    # Order must match the parameter order of apply_cleaning_ui.
    clean_button.click(
        apply_cleaning_ui,
        [
            file_input, handle_missing, missing_method, remove_duplicates,
            handle_outliers, outlier_method, convert_dates, date_columns,
            normalize_numeric,
        ],
        [cleaning_output, cleaned_file_output],
    )

    openai_key_button.click(set_openai_key, [openai_key_input], [openai_key_status])

    hf_token_button.click(set_hf_token, [hf_token_input], [hf_token_status])
|
|
| |
# Load the text-generation pipeline once at import time; on failure the module
# keeps running with data_assistant set to None.
try:
    data_assistant = initialize_ai_models()
except Exception as startup_error:
    data_assistant = None
    print(f"Error initializing AI models: {startup_error}")


# Start the Gradio server only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()