|
|
import gradio as gr |
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import matplotlib.pyplot as plt |
|
|
import seaborn as sns |
|
|
from sklearn.model_selection import train_test_split |
|
|
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder |
|
|
from sklearn.compose import ColumnTransformer |
|
|
from sklearn.pipeline import Pipeline |
|
|
from sklearn.impute import SimpleImputer |
|
|
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor |
|
|
from sklearn.linear_model import LogisticRegression, LinearRegression |
|
|
from sklearn.metrics import accuracy_score, classification_report, mean_squared_error, r2_score |
|
|
import xgboost as xgb |
|
|
from catboost import CatBoostClassifier, CatBoostRegressor |
|
|
import lightgbm as lgb |
|
|
import io |
|
|
import base64 |
|
|
from PIL import Image |
|
|
import os |
|
|
import pickle |
|
|
import warnings

# Globally silence warnings: the ML libraries used below (sklearn, xgboost,
# lightgbm, catboost) emit many convergence/deprecation warnings that would
# otherwise clutter the console while the Gradio app is running.
warnings.filterwarnings('ignore')
|
|
|
|
|
def infer_problem_type(df, target_col):
    """Determine if it's a classification or regression problem.

    Heuristic: a non-numeric target, or one with fewer than 10 distinct
    values, is treated as classification; everything else as regression.
    """
    target = df[target_col]
    few_distinct_values = target.nunique() < 10
    non_numeric = target.dtype in ['object', 'category', 'bool']
    return "Classification" if (few_distinct_values or non_numeric) else "Regression"
|
|
|
|
|
def generate_eda_report(df):
    """Generate an EDA summary for the dataset.

    Returns:
        dict with keys: 'shape', 'dtypes', 'null_counts', 'desc_stats'
        (HTML table), 'categorical_cols', 'numerical_cols', and — only when
        the frame has numeric columns — 'corr_heatmap' (base64 PNG of the
        correlation matrix).
    """
    report = {}
    report['shape'] = df.shape
    report['dtypes'] = df.dtypes.astype(str).to_dict()
    report['null_counts'] = df.isnull().sum().to_dict()
    report['desc_stats'] = df.describe().to_html()

    numeric_df = df.select_dtypes(include=['number'])
    if not numeric_df.empty:
        # Allocate the figure and buffer only when there is numeric data:
        # the original created the figure (and BytesIO) unconditionally and
        # never closed them when the dataset had no numeric columns,
        # leaking a matplotlib figure per upload.
        buffer = io.BytesIO()
        fig = plt.figure(figsize=(10, 8))
        try:
            sns.heatmap(numeric_df.corr(), annot=True, cmap='coolwarm', linewidths=0.5)
            plt.title('Correlation Matrix')
            plt.tight_layout()
            plt.savefig(buffer, format='png')
        finally:
            # Close even if seaborn/matplotlib raises, so figures never pile up.
            plt.close(fig)
        buffer.seek(0)
        report['corr_heatmap'] = base64.b64encode(buffer.getvalue()).decode('utf-8')
        buffer.close()

    report['categorical_cols'] = df.select_dtypes(include=['object', 'category']).columns.tolist()
    report['numerical_cols'] = df.select_dtypes(include=['number']).columns.tolist()

    return report
|
|
|
|
|
def clean_and_preprocess(df, problem_type, target_col):
    """Clean the dataset and build train/test feature matrices.

    Imputes missing values (median for numeric columns, mode otherwise),
    fits a ColumnTransformer (scaling + one-hot encoding) on the features,
    label-encodes the target for classification, and performs an 80/20 split.

    Returns:
        dict with the fitted 'preprocessor', 'X_train'/'X_test'/'y_train'/
        'y_test' splits, the feature column lists, and 'target_encoder'
        (a fitted LabelEncoder for classification, None for regression).
    """
    processed_df = df.copy()

    # Impute on the full frame so the target column is covered too
    # (the ColumnTransformer below only ever sees the feature matrix X).
    for col in processed_df.columns:
        if processed_df[col].dtype in ['int64', 'float64']:
            # Assignment instead of chained `fillna(..., inplace=True)`:
            # chained inplace operates on a temporary and is deprecated
            # under pandas copy-on-write.
            processed_df[col] = processed_df[col].fillna(processed_df[col].median())
        else:
            mode_values = processed_df[col].mode()
            # mode() returns an empty Series for an all-NaN column; the
            # original indexed [0] unconditionally and raised IndexError.
            if not mode_values.empty:
                processed_df[col] = processed_df[col].fillna(mode_values[0])

    X = processed_df.drop(columns=[target_col])
    y = processed_df[target_col]

    categorical_cols = X.select_dtypes(include=['object', 'category']).columns.tolist()
    numerical_cols = X.select_dtypes(include=['number']).columns.tolist()

    # Median-impute + scale numeric features; mode-impute + one-hot encode
    # categoricals (categories unseen at fit time are ignored at predict time).
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', Pipeline(steps=[
                ('imputer', SimpleImputer(strategy='median')),
                ('scaler', StandardScaler())
            ]), numerical_cols),
            ('cat', Pipeline(steps=[
                ('imputer', SimpleImputer(strategy='most_frequent')),
                ('onehot', OneHotEncoder(handle_unknown='ignore'))
            ]), categorical_cols)
        ]
    )

    X_processed = preprocessor.fit_transform(X)

    # Encode string/categorical targets as integers for classification
    # (XGBoost in particular requires numeric class labels). `le` is bound
    # unconditionally so the returned dict never references an unbound name.
    le = None
    if problem_type == "Classification":
        le = LabelEncoder()
        y = le.fit_transform(y)

    X_train, X_test, y_train, y_test = train_test_split(
        X_processed, y, test_size=0.2, random_state=42
    )

    return {
        'preprocessor': preprocessor,
        'X_train': X_train,
        'X_test': X_test,
        'y_train': y_train,
        'y_test': y_test,
        'categorical_cols': categorical_cols,
        'numerical_cols': numerical_cols,
        'target_encoder': le
    }
|
|
|
|
|
def train_and_evaluate_models(preprocessing_info, problem_type):
    """Train the candidate models and pick the best one.

    Classification models are ranked by accuracy; regression models by R².
    The original duplicated the fit/predict/score loop (and the best-model
    selection) once per problem type; a single loop now computes the
    per-type metrics, with behavior unchanged.

    Args:
        preprocessing_info: dict from clean_and_preprocess with the splits.
        problem_type: "Classification" or "Regression".

    Returns:
        dict with 'results' (per-model metrics), 'best_model_name',
        'best_score', 'metric_name', 'models' (all fitted estimators)
        and 'best_model'.
    """
    X_train = preprocessing_info['X_train']
    X_test = preprocessing_info['X_test']
    y_train = preprocessing_info['y_train']
    y_test = preprocessing_info['y_test']

    is_classification = problem_type == "Classification"

    if is_classification:
        models_to_train = {
            'RandomForest': RandomForestClassifier(n_estimators=100, random_state=42),
            'LogisticRegression': LogisticRegression(max_iter=1000, random_state=42),
            'XGBoost': xgb.XGBClassifier(random_state=42),
            'CatBoost': CatBoostClassifier(verbose=0, random_state=42),
            'LightGBM': lgb.LGBMClassifier(random_state=42)
        }
    else:
        models_to_train = {
            'RandomForest': RandomForestRegressor(n_estimators=100, random_state=42),
            'LinearRegression': LinearRegression(),
            'XGBoost': xgb.XGBRegressor(random_state=42),
            'CatBoost': CatBoostRegressor(verbose=0, random_state=42),
            'LightGBM': lgb.LGBMRegressor(random_state=42)
        }

    results = {}
    models = {}
    for name, model in models_to_train.items():
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        if is_classification:
            results[name] = {
                'accuracy': accuracy_score(y_test, y_pred),
                'report': classification_report(y_test, y_pred, output_dict=True)
            }
        else:
            results[name] = {
                'mse': mean_squared_error(y_test, y_pred),
                'r2': r2_score(y_test, y_pred)
            }
        models[name] = model

    # Rank on the headline metric for the problem type (higher is better
    # for both accuracy and R²).
    ranking_key = 'accuracy' if is_classification else 'r2'
    best_model_name = max(results, key=lambda n: results[n][ranking_key])

    return {
        'results': results,
        'best_model_name': best_model_name,
        'best_score': results[best_model_name][ranking_key],
        'metric_name': 'accuracy' if is_classification else 'R²',
        'models': models,
        'best_model': models[best_model_name]
    }
|
|
|
|
|
def save_model(model, preprocessor, target_encoder=None, output_dir='models'):
    """Serialize the trained model and preprocessing artifacts to disk.

    Args:
        model: fitted estimator to persist.
        preprocessor: fitted feature transformer used before the model.
        target_encoder: optional fitted target LabelEncoder; only written
            when provided (i.e. for classification problems).
        output_dir: directory for the pickle files (created if missing).
            Defaults to 'models' to preserve the original behavior.

    Returns:
        Filesystem path of the saved model pickle.
    """
    os.makedirs(output_dir, exist_ok=True)

    model_path = os.path.join(output_dir, 'model.pkl')
    with open(model_path, 'wb') as f:
        pickle.dump(model, f)

    with open(os.path.join(output_dir, 'preprocessor.pkl'), 'wb') as f:
        pickle.dump(preprocessor, f)

    if target_encoder is not None:
        with open(os.path.join(output_dir, 'target_encoder.pkl'), 'wb') as f:
            pickle.dump(target_encoder, f)

    return model_path
|
|
|
|
|
def process_dataset(df, target_col):
    """Run the end-to-end AutoML workflow for a single dataset.

    Steps: infer the problem type, build the EDA report, preprocess and
    split the data, train/evaluate candidate models, and persist the best
    one. Returns a dict aggregating every intermediate artifact.
    """
    pipeline_output = {'problem_type': infer_problem_type(df, target_col)}

    pipeline_output['eda_report'] = generate_eda_report(df)

    prep = clean_and_preprocess(df, pipeline_output['problem_type'], target_col)
    pipeline_output['preprocessing_info'] = prep

    trained = train_and_evaluate_models(prep, pipeline_output['problem_type'])
    pipeline_output['model_results'] = trained

    pipeline_output['model_path'] = save_model(
        trained['best_model'],
        prep['preprocessor'],
        prep.get('target_encoder')
    )

    return pipeline_output
|
|
|
|
|
def format_results_html(results_data):
    """Render the AutoML pipeline output as an HTML report string.

    Expects the dict produced by process_dataset, i.e. the keys
    'problem_type', 'eda_report' and 'model_results'.
    """
    problem_type = results_data['problem_type']
    eda = results_data['eda_report']
    model_results = results_data['model_results']

    # Accumulate fragments and join once at the end instead of repeated
    # string concatenation.
    parts = [f"""
    <h2>AutoML Analysis Results</h2>
    <h3>Problem Type: {problem_type}</h3>

    <h3>Dataset Information</h3>
    <p><strong>Shape:</strong> {eda['shape'][0]} rows, {eda['shape'][1]} columns</p>
    <p><strong>Numerical Columns:</strong> {', '.join(eda['numerical_cols'])}</p>
    <p><strong>Categorical Columns:</strong> {', '.join(eda['categorical_cols'])}</p>

    <h3>Missing Values</h3>
    <ul>
    """]

    # Only columns with at least one missing value are listed.
    parts.extend(
        f"<li>{col}: {count} missing values</li>"
        for col, count in eda['null_counts'].items()
        if count > 0
    )
    parts.append("</ul>")

    if 'corr_heatmap' in eda:
        parts.append(f"""
    <h3>Correlation Heatmap</h3>
    <img src="data:image/png;base64,{eda['corr_heatmap']}" alt="Correlation Heatmap" width="600">
    """)

    parts.append(f"""
    <h3>Model Results</h3>
    <p><strong>Best Model:</strong> {model_results['best_model_name']}</p>
    <p><strong>Best {model_results['metric_name']}:</strong> {model_results['best_score']:.4f}</p>

    <h4>All Models Performance</h4>
    <table border="1" cellpadding="5">
    <tr>
    <th>Model</th>
    """)

    if problem_type == "Classification":
        parts.append("<th>Accuracy</th></tr>")
        for model_name, result in model_results['results'].items():
            parts.append(
                f"<tr><td>{model_name}</td>"
                f"<td>{result['accuracy']:.4f}</td></tr>"
            )
    else:
        parts.append("<th>MSE</th><th>R²</th></tr>")
        for model_name, result in model_results['results'].items():
            parts.append(
                f"<tr><td>{model_name}</td>"
                f"<td>{result['mse']:.4f}</td>"
                f"<td>{result['r2']:.4f}</td></tr>"
            )

    parts.append("</table>")

    if problem_type == "Classification":
        best_model = model_results['best_model_name']
        report = model_results['results'][best_model]['report']

        parts.append(f"""
    <h4>Classification Report for {best_model}</h4>
    <table border="1" cellpadding="5">
    <tr>
    <th>Class</th>
    <th>Precision</th>
    <th>Recall</th>
    <th>F1-Score</th>
    <th>Support</th>
    </tr>
    """)

        # Skip the aggregate rows sklearn's classification_report appends.
        for class_name, metrics in report.items():
            if class_name in ['accuracy', 'macro avg', 'weighted avg']:
                continue
            parts.append(
                f"<tr><td>{class_name}</td>"
                f"<td>{metrics['precision']:.4f}</td>"
                f"<td>{metrics['recall']:.4f}</td>"
                f"<td>{metrics['f1-score']:.4f}</td>"
                f"<td>{metrics['support']}</td></tr>"
            )

        parts.append("</table>")

    parts.append("""
    <h3>Model Download</h3>
    <p>Your model has been saved and is ready for download.</p>
    """)

    return "".join(parts)
|
|
|
|
|
def process_file(file, target_col):
    """Handle a Gradio upload: validate input, run the pipeline, return HTML.

    Args:
        file: the uploaded file — either an object exposing a `.name` path
            attribute (older Gradio File behavior) or a plain path string
            (newer Gradio versions pass the path directly).
        target_col: name of the target column entered by the user.

    Returns:
        HTML results string on success, or a plain error message.
    """
    if file is None:
        return "Please upload a CSV file."

    # Gradio versions differ in what gr.File hands the callback: a tempfile
    # wrapper with a .name attribute, or just the path. Accept both; the
    # original assumed `.name` and crashed on plain string paths.
    file_path = getattr(file, 'name', file)

    try:
        df = pd.read_csv(file_path)
    except Exception as e:
        return f"Error reading the CSV file: {str(e)}"

    if target_col not in df.columns:
        return f"Target column '{target_col}' not found in the dataset. Available columns: {', '.join(df.columns)}"

    # Broad except is deliberate here: this is the UI boundary, and any
    # pipeline failure should surface as a readable message, not a traceback.
    try:
        results = process_dataset(df, target_col)
        return format_results_html(results)
    except Exception as e:
        return f"Error processing the dataset: {str(e)}"
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Gradio UI: two-column layout — upload/controls on the left, the rendered
# HTML report on the right. The single button drives the whole pipeline via
# process_file.
# ---------------------------------------------------------------------------
with gr.Blocks(title="AutoML for Structured Data") as demo:
    gr.Markdown("# AutoML for Structured Data")
    gr.Markdown("""
Upload a CSV file, specify the target column, and let AutoML do the rest! This app will:
1. Perform exploratory data analysis (EDA)
2. Determine if it's a regression or classification problem
3. Handle preprocessing (cleaning, encoding, etc.)
4. Train multiple models and select the best one
5. Display the results and allow you to download the model
""")

    with gr.Row():
        with gr.Column():
            # Input side: CSV upload, free-text target column name, trigger.
            file_input = gr.File(label="Upload CSV File")
            target_col = gr.Textbox(label="Target Column Name")
            submit_btn = gr.Button("Process Dataset")

        with gr.Column():
            # Output side: HTML produced by format_results_html (or an
            # error string from process_file).
            output = gr.HTML(label="Results")

    # Wire the button to the end-to-end handler.
    submit_btn.click(fn=process_file, inputs=[file_input, target_col], outputs=output)


# Start the Gradio server (blocking call).
demo.launch()