import gradio as gr
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.metrics import accuracy_score, classification_report, mean_squared_error, r2_score
import xgboost as xgb
from catboost import CatBoostClassifier, CatBoostRegressor
import lightgbm as lgb
import io
import base64
from PIL import Image
import os
import pickle
import warnings

warnings.filterwarnings('ignore')


def infer_problem_type(df, target_col):
    """Determine whether the task is "Classification" or "Regression".

    A target with fewer than 10 unique values OR a non-numeric dtype
    (object/category/bool) is treated as classification; everything
    else as regression.

    Args:
        df: Input DataFrame.
        target_col: Name of the target column (must exist in df).

    Returns:
        The string "Classification" or "Regression".
    """
    unique_values = df[target_col].nunique()
    if unique_values < 10 or df[target_col].dtype in ['object', 'category', 'bool']:
        return "Classification"
    return "Regression"


def generate_eda_report(df):
    """Generate an exploratory-data-analysis summary for the dataset.

    Args:
        df: Input DataFrame.

    Returns:
        A dict with keys: 'shape', 'dtypes', 'null_counts', 'desc_stats'
        (HTML table), 'categorical_cols', 'numerical_cols', and — only
        when the frame has numeric columns — 'corr_heatmap' (base64 PNG).
    """
    report = {}

    # Basic info
    report['shape'] = df.shape
    report['dtypes'] = df.dtypes.astype(str).to_dict()
    report['null_counts'] = df.isnull().sum().to_dict()
    report['desc_stats'] = df.describe().to_html()

    # Correlation heatmap — only built when numeric columns exist, so we
    # never leak an empty matplotlib figure or an unused buffer.
    numeric_df = df.select_dtypes(include=['number'])
    if not numeric_df.empty:
        buffer = io.BytesIO()
        plt.figure(figsize=(10, 8))
        sns.heatmap(numeric_df.corr(), annot=True, cmap='coolwarm', linewidths=0.5)
        plt.title('Correlation Matrix')
        plt.tight_layout()
        plt.savefig(buffer, format='png')
        plt.close()
        buffer.seek(0)
        report['corr_heatmap'] = base64.b64encode(buffer.getvalue()).decode('utf-8')
        buffer.close()

    # Column-type summaries
    report['categorical_cols'] = df.select_dtypes(include=['object', 'category']).columns.tolist()
    report['numerical_cols'] = numeric_df.columns.tolist()

    return report


def clean_and_preprocess(df, problem_type, target_col):
    """Clean the dataset and build train/test matrices plus the fitted preprocessor.

    Missing values are filled (median for numeric, mode for everything
    else), features are scaled / one-hot encoded via a ColumnTransformer,
    and — for classification — the target is label-encoded.

    Args:
        df: Input DataFrame.
        problem_type: "Classification" or "Regression".
        target_col: Name of the target column.

    Returns:
        A dict with the fitted 'preprocessor', the 'X_train'/'X_test'/
        'y_train'/'y_test' splits, 'categorical_cols', 'numerical_cols',
        and 'target_encoder' (a fitted LabelEncoder, or None for regression).
    """
    processed_df = df.copy()

    # Coarse missing-value fill. Direct assignment instead of
    # `inplace=True` on a column view, which is deprecated chained
    # assignment in modern pandas. (The pipeline imputers below handle
    # any values these lines miss.)
    for col in processed_df.columns:
        if processed_df[col].dtype in ['int64', 'float64']:
            processed_df[col] = processed_df[col].fillna(processed_df[col].median())
        else:
            processed_df[col] = processed_df[col].fillna(processed_df[col].mode()[0])

    # Split features and target
    X = processed_df.drop(columns=[target_col])
    y = processed_df[target_col]

    categorical_cols = X.select_dtypes(include=['object', 'category']).columns.tolist()
    numerical_cols = X.select_dtypes(include=['number']).columns.tolist()

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', Pipeline(steps=[
                ('imputer', SimpleImputer(strategy='median')),
                ('scaler', StandardScaler())
            ]), numerical_cols),
            ('cat', Pipeline(steps=[
                ('imputer', SimpleImputer(strategy='most_frequent')),
                ('onehot', OneHotEncoder(handle_unknown='ignore'))
            ]), categorical_cols)
        ]
    )

    X_processed = preprocessor.fit_transform(X)

    # Encode the target for classification. `target_encoder` defaults to
    # None so the dict below never references an undefined name for
    # regression problems (the original raised NameError on `le` there).
    target_encoder = None
    if problem_type == "Classification":
        target_encoder = LabelEncoder()
        y = target_encoder.fit_transform(y)

    X_train, X_test, y_train, y_test = train_test_split(
        X_processed, y, test_size=0.2, random_state=42
    )

    return {
        'preprocessor': preprocessor,
        'X_train': X_train,
        'X_test': X_test,
        'y_train': y_train,
        'y_test': y_test,
        'categorical_cols': categorical_cols,
        'numerical_cols': numerical_cols,
        'target_encoder': target_encoder,
    }


def train_and_evaluate_models(preprocessing_info, problem_type):
    """Train a suite of models and pick the best one for the problem type.

    Classification trains RandomForest / LogisticRegression / XGBoost /
    CatBoost / LightGBM and ranks by accuracy; regression trains the
    analogous regressors and ranks by R².

    Args:
        preprocessing_info: Output of clean_and_preprocess().
        problem_type: "Classification" or "Regression".

    Returns:
        A dict with per-model 'results', 'best_model_name', 'best_score',
        'metric_name', all fitted 'models', and the fitted 'best_model'.
    """
    X_train = preprocessing_info['X_train']
    X_test = preprocessing_info['X_test']
    y_train = preprocessing_info['y_train']
    y_test = preprocessing_info['y_test']

    results = {}
    models = {}

    if problem_type == "Classification":
        models_to_train = {
            'RandomForest': RandomForestClassifier(n_estimators=100, random_state=42),
            'LogisticRegression': LogisticRegression(max_iter=1000, random_state=42),
            'XGBoost': xgb.XGBClassifier(random_state=42),
            'CatBoost': CatBoostClassifier(verbose=0, random_state=42),
            'LightGBM': lgb.LGBMClassifier(random_state=42),
        }
        for name, model in models_to_train.items():
            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)
            results[name] = {
                'accuracy': accuracy_score(y_test, y_pred),
                'report': classification_report(y_test, y_pred, output_dict=True),
            }
            models[name] = model
    else:
        models_to_train = {
            'RandomForest': RandomForestRegressor(n_estimators=100, random_state=42),
            'LinearRegression': LinearRegression(),
            'XGBoost': xgb.XGBRegressor(random_state=42),
            'CatBoost': CatBoostRegressor(verbose=0, random_state=42),
            'LightGBM': lgb.LGBMRegressor(random_state=42),
        }
        for name, model in models_to_train.items():
            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)
            results[name] = {
                'mse': mean_squared_error(y_test, y_pred),
                'r2': r2_score(y_test, y_pred),
            }
            models[name] = model

    # Select the best model by the problem-appropriate metric.
    if problem_type == "Classification":
        best_model_name = max(results, key=lambda x: results[x]['accuracy'])
        best_score = results[best_model_name]['accuracy']
        metric_name = 'accuracy'
    else:
        best_model_name = max(results, key=lambda x: results[x]['r2'])
        best_score = results[best_model_name]['r2']
        metric_name = 'R²'

    return {
        'results': results,
        'best_model_name': best_model_name,
        'best_score': best_score,
        'metric_name': metric_name,
        'models': models,
        'best_model': models[best_model_name],
    }


def save_model(model, preprocessor, target_encoder=None):
    """Pickle the model, preprocessor, and optional target encoder to models/.

    Args:
        model: Fitted estimator.
        preprocessor: Fitted ColumnTransformer.
        target_encoder: Optional fitted LabelEncoder (classification only).

    Returns:
        Path to the saved model pickle ('models/model.pkl').
    """
    os.makedirs('models', exist_ok=True)

    with open('models/model.pkl', 'wb') as f:
        pickle.dump(model, f)

    with open('models/preprocessor.pkl', 'wb') as f:
        pickle.dump(preprocessor, f)

    if target_encoder is not None:
        with open('models/target_encoder.pkl', 'wb') as f:
            pickle.dump(target_encoder, f)

    return 'models/model.pkl'


def process_dataset(df, target_col):
    """Run the full AutoML pipeline: EDA → preprocessing → training → save.

    Args:
        df: Input DataFrame.
        target_col: Name of the target column.

    Returns:
        A dict with 'problem_type', 'eda_report', 'preprocessing_info',
        'model_results', and 'model_path'.
    """
    problem_type = infer_problem_type(df, target_col)
    eda_report = generate_eda_report(df)
    preprocessing_info = clean_and_preprocess(df, problem_type, target_col)
    model_results = train_and_evaluate_models(preprocessing_info, problem_type)

    model_path = save_model(
        model_results['best_model'],
        preprocessing_info['preprocessor'],
        preprocessing_info.get('target_encoder'),
    )

    return {
        'problem_type': problem_type,
        'eda_report': eda_report,
        'preprocessing_info': preprocessing_info,
        'model_results': model_results,
        'model_path': model_path,
    }


def format_results_html(results_data):
    """Render the pipeline results as an HTML report string.

    Reconstructed from a garbled template: the original interleaved the
    classification and regression table fragments and used f-string
    placeholders outside f-strings.

    Args:
        results_data: Output of process_dataset().

    Returns:
        An HTML string for display in the Gradio HTML component.
    """
    problem_type = results_data['problem_type']
    eda_report = results_data['eda_report']
    model_results = results_data['model_results']

    html = f"""
    <h2>AutoML Results</h2>
    <h3>Problem Type: {problem_type}</h3>
    <h3>Dataset Overview</h3>
    <p><b>Shape:</b> {eda_report['shape'][0]} rows, {eda_report['shape'][1]} columns</p>
    <p><b>Numerical Columns:</b> {', '.join(eda_report['numerical_cols'])}</p>
    <p><b>Categorical Columns:</b> {', '.join(eda_report['categorical_cols'])}</p>
    """

    # The heatmap key only exists when the frame had numeric columns.
    if 'corr_heatmap' in eda_report:
        html += (
            '<h3>Correlation Heatmap</h3>'
            f'<img src="data:image/png;base64,{eda_report["corr_heatmap"]}" '
            'style="max-width:100%;"/>'
        )

    html += f"""
    <h3>Model Comparison</h3>
    <p><b>Best Model:</b> {model_results['best_model_name']}</p>
    <p><b>Best {model_results['metric_name']}:</b> {model_results['best_score']:.4f}</p>
    """

    if problem_type == "Classification":
        html += "<table border='1'><tr><th>Model</th><th>Accuracy</th></tr>"
        for model, result in model_results['results'].items():
            html += f"<tr><td>{model}</td><td>{result['accuracy']:.4f}</td></tr>"
        html += "</table>"

        # Per-class breakdown for the best model. Skip scalar entries
        # ('accuracy' in the report dict is a float, not a per-class dict).
        best_report = model_results['results'][model_results['best_model_name']]['report']
        html += "<h3>Classification Report (Best Model)</h3>"
        html += (
            "<table border='1'><tr><th>Class</th><th>Precision</th>"
            "<th>Recall</th><th>F1-Score</th><th>Support</th></tr>"
        )
        for class_name, metrics in best_report.items():
            if not isinstance(metrics, dict):
                continue
            html += (
                f"<tr><td>{class_name}</td>"
                f"<td>{metrics['precision']:.4f}</td>"
                f"<td>{metrics['recall']:.4f}</td>"
                f"<td>{metrics['f1-score']:.4f}</td>"
                f"<td>{metrics['support']}</td></tr>"
            )
        html += "</table>"
    else:
        html += "<table border='1'><tr><th>Model</th><th>MSE</th><th>R²</th></tr>"
        for model, result in model_results['results'].items():
            html += (
                f"<tr><td>{model}</td><td>{result['mse']:.4f}</td>"
                f"<td>{result['r2']:.4f}</td></tr>"
            )
        html += "</table>"

    html += "<p>Your model has been saved and is ready for download.</p>"
    return html


def process_file(file, target_col):
    """Gradio callback: validate the upload and run the pipeline.

    Args:
        file: Gradio file object (has a .name path) or None.
        target_col: Target column name typed by the user.

    Returns:
        An HTML result string, or a plain error message on failure.
    """
    if file is None:
        return "Please upload a CSV file."

    try:
        df = pd.read_csv(file.name)
    except Exception as e:
        return f"Error reading the CSV file: {str(e)}"

    if target_col not in df.columns:
        return (
            f"Target column '{target_col}' not found in the dataset. "
            f"Available columns: {', '.join(df.columns)}"
        )

    try:
        results = process_dataset(df, target_col)
        return format_results_html(results)
    except Exception as e:
        return f"Error processing the dataset: {str(e)}"


# Define Gradio interface
with gr.Blocks(title="AutoML for Structured Data") as demo:
    gr.Markdown("# AutoML for Structured Data")
    gr.Markdown("""
    Upload a CSV file, specify the target column, and let AutoML do the rest! This app will:
    1. Perform exploratory data analysis (EDA)
    2. Determine if it's a regression or classification problem
    3. Handle preprocessing (cleaning, encoding, etc.)
    4. Train multiple models and select the best one
    5. Display the results and allow you to download the model
    """)

    with gr.Row():
        with gr.Column():
            file_input = gr.File(label="Upload CSV File")
            target_col = gr.Textbox(label="Target Column Name")
            submit_btn = gr.Button("Process Dataset")
        with gr.Column():
            output = gr.HTML(label="Results")

    submit_btn.click(fn=process_file, inputs=[file_input, target_col], outputs=output)

# Launch the app
demo.launch()