# automl/app.py — AutoML for structured data (Hugging Face Space by shvy, commit 994aeff)
import gradio as gr
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.metrics import accuracy_score, classification_report, mean_squared_error, r2_score
import xgboost as xgb
from catboost import CatBoostClassifier, CatBoostRegressor
import lightgbm as lgb
import io
import base64
from PIL import Image
import os
import pickle
import warnings
warnings.filterwarnings('ignore')
def infer_problem_type(df, target_col):
    """Infer whether predicting `target_col` is classification or regression.

    Heuristic: a target with fewer than 10 distinct values, or with a
    discrete dtype (object/category/bool), is treated as classification;
    anything else is treated as regression.
    """
    target = df[target_col]
    looks_discrete = target.dtype in ('object', 'category', 'bool')
    if looks_discrete or target.nunique() < 10:
        return "Classification"
    return "Regression"
def generate_eda_report(df):
    """Build an exploratory-data-analysis summary for *df*.

    Returns a dict with:
      - 'shape': (rows, cols)
      - 'dtypes': column name -> dtype string
      - 'null_counts': column name -> missing-value count
      - 'desc_stats': df.describe() rendered as an HTML table
      - 'corr_heatmap': base64 PNG of the numeric correlation matrix
        (key only present when the frame has numeric columns)
      - 'categorical_cols' / 'numerical_cols': column-name lists
    """
    report = {}

    # Basic info
    report['shape'] = df.shape
    report['dtypes'] = df.dtypes.astype(str).to_dict()
    report['null_counts'] = df.isnull().sum().to_dict()
    report['desc_stats'] = df.describe().to_html()

    # Correlation heatmap. Create the figure only when there is numeric
    # data: the original opened a figure unconditionally and never closed
    # it for all-categorical frames, leaking a matplotlib figure per call.
    numeric_df = df.select_dtypes(include=['number'])
    if not numeric_df.empty:
        buffer = io.BytesIO()
        fig = plt.figure(figsize=(10, 8))
        try:
            sns.heatmap(numeric_df.corr(), annot=True, cmap='coolwarm', linewidths=0.5)
            plt.title('Correlation Matrix')
            plt.tight_layout()
            plt.savefig(buffer, format='png')
        finally:
            plt.close(fig)  # release the figure even if plotting raises
        buffer.seek(0)
        report['corr_heatmap'] = base64.b64encode(buffer.getvalue()).decode('utf-8')
        buffer.close()

    # Column-type summaries
    report['categorical_cols'] = df.select_dtypes(include=['object', 'category']).columns.tolist()
    report['numerical_cols'] = numeric_df.columns.tolist()
    return report
def clean_and_preprocess(df, problem_type, target_col):
    """Impute, encode, scale and split the dataset for model training.

    Parameters
    ----------
    df : pandas.DataFrame — raw input data.
    problem_type : str — "Classification" or "Regression".
    target_col : str — name of the target column.

    Returns a dict with the fitted preprocessor, the 80/20 train/test
    splits, the feature-column groupings, and (for classification) the
    fitted LabelEncoder applied to the target.
    """
    processed_df = df.copy()

    # Handle missing values up front (the ColumnTransformer below also
    # imputes features, but the target column is not routed through it).
    # NOTE: assign the result instead of `.fillna(..., inplace=True)` on a
    # column view — chained in-place fills are deprecated and are removed
    # in pandas 3.0.
    for col in processed_df.columns:
        if pd.api.types.is_numeric_dtype(processed_df[col]):
            # covers int32/float32 etc., not only int64/float64
            processed_df[col] = processed_df[col].fillna(processed_df[col].median())
        else:
            mode = processed_df[col].mode()
            if not mode.empty:  # an all-NaN column has no mode; leave it to the imputer
                processed_df[col] = processed_df[col].fillna(mode[0])

    # Split features and target
    X = processed_df.drop(columns=[target_col])
    y = processed_df[target_col]

    # Identify categorical and numerical feature columns
    categorical_cols = X.select_dtypes(include=['object', 'category']).columns.tolist()
    numerical_cols = X.select_dtypes(include=['number']).columns.tolist()

    # Median-impute + scale numerics; mode-impute + one-hot categoricals.
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', Pipeline(steps=[
                ('imputer', SimpleImputer(strategy='median')),
                ('scaler', StandardScaler())
            ]), numerical_cols),
            ('cat', Pipeline(steps=[
                ('imputer', SimpleImputer(strategy='most_frequent')),
                ('onehot', OneHotEncoder(handle_unknown='ignore'))
            ]), categorical_cols)
        ]
    )
    X_processed = preprocessor.fit_transform(X)

    # Encode a (possibly string) target for classification
    target_encoder = None
    if problem_type == "Classification":
        target_encoder = LabelEncoder()
        y = target_encoder.fit_transform(y)

    # 80/20 split with a fixed seed for reproducibility
    X_train, X_test, y_train, y_test = train_test_split(
        X_processed, y, test_size=0.2, random_state=42
    )

    return {
        'preprocessor': preprocessor,
        'X_train': X_train,
        'X_test': X_test,
        'y_train': y_train,
        'y_test': y_test,
        'categorical_cols': categorical_cols,
        'numerical_cols': numerical_cols,
        'target_encoder': target_encoder,
    }
def train_and_evaluate_models(preprocessing_info, problem_type):
    """Fit a suite of candidate models and pick the best performer.

    For classification the candidates are scored by accuracy (with a full
    classification report kept per model); for regression by R² (with MSE
    also recorded). Returns the per-model results, the fitted models, and
    the name/score of the winner.
    """
    X_train = preprocessing_info['X_train']
    X_test = preprocessing_info['X_test']
    y_train = preprocessing_info['y_train']
    y_test = preprocessing_info['y_test']

    # Candidate estimators per problem type (fixed seeds for reproducibility)
    if problem_type == "Classification":
        candidates = {
            'RandomForest': RandomForestClassifier(n_estimators=100, random_state=42),
            'LogisticRegression': LogisticRegression(max_iter=1000, random_state=42),
            'XGBoost': xgb.XGBClassifier(random_state=42),
            'CatBoost': CatBoostClassifier(verbose=0, random_state=42),
            'LightGBM': lgb.LGBMClassifier(random_state=42),
        }
    else:
        candidates = {
            'RandomForest': RandomForestRegressor(n_estimators=100, random_state=42),
            'LinearRegression': LinearRegression(),
            'XGBoost': xgb.XGBRegressor(random_state=42),
            'CatBoost': CatBoostRegressor(verbose=0, random_state=42),
            'LightGBM': lgb.LGBMRegressor(random_state=42),
        }

    # Single fit/score loop; only the recorded metrics differ by task
    results = {}
    models = {}
    for name, estimator in candidates.items():
        estimator.fit(X_train, y_train)
        predictions = estimator.predict(X_test)
        if problem_type == "Classification":
            results[name] = {
                'accuracy': accuracy_score(y_test, predictions),
                'report': classification_report(y_test, predictions, output_dict=True),
            }
        else:
            results[name] = {
                'mse': mean_squared_error(y_test, predictions),
                'r2': r2_score(y_test, predictions),
            }
        models[name] = estimator

    # Select the winner by the task's headline metric
    if problem_type == "Classification":
        metric_name, score_key = 'accuracy', 'accuracy'
    else:
        metric_name, score_key = 'R²', 'r2'
    best_model_name = max(results, key=lambda m: results[m][score_key])

    return {
        'results': results,
        'best_model_name': best_model_name,
        'best_score': results[best_model_name][score_key],
        'metric_name': metric_name,
        'models': models,
        'best_model': models[best_model_name],
    }
def save_model(model, preprocessor, target_encoder=None):
    """Persist the trained model plus its preprocessing artifacts.

    Pickles each artifact into the ``models/`` directory (created on
    demand) and returns the path of the saved model file.
    """
    os.makedirs('models', exist_ok=True)

    # Map each output path to the object that belongs there; the target
    # encoder only exists for classification problems.
    artifacts = {
        'models/model.pkl': model,
        'models/preprocessor.pkl': preprocessor,
    }
    if target_encoder is not None:
        artifacts['models/target_encoder.pkl'] = target_encoder

    for path, obj in artifacts.items():
        with open(path, 'wb') as fh:
            pickle.dump(obj, fh)

    return 'models/model.pkl'
def process_dataset(df, target_col):
    """Run the full AutoML pipeline on *df* and return all artifacts.

    Orchestrates problem-type inference, EDA, preprocessing, model
    training/selection, and persistence of the winning model.
    """
    problem_type = infer_problem_type(df, target_col)           # classification vs regression
    eda_report = generate_eda_report(df)                        # dataset summary + heatmap
    prep = clean_and_preprocess(df, problem_type, target_col)   # impute/encode/scale/split
    model_results = train_and_evaluate_models(prep, problem_type)

    # Persist the best model together with its preprocessing artifacts
    model_path = save_model(
        model_results['best_model'],
        prep['preprocessor'],
        prep.get('target_encoder'),
    )

    return {
        'problem_type': problem_type,
        'eda_report': eda_report,
        'preprocessing_info': prep,
        'model_results': model_results,
        'model_path': model_path,
    }
def format_results_html(results_data):
    """Render the pipeline output of process_dataset() as an HTML report.

    Includes dataset shape/columns, missing-value counts, the embedded
    correlation heatmap (when present), a per-model score table, and —
    for classification — a per-class report for the winning model.
    """
    problem_type = results_data['problem_type']
    eda = results_data['eda_report']
    model_results = results_data['model_results']

    parts = []

    # Header + dataset overview
    parts.append(f"""
    <h2>AutoML Analysis Results</h2>
    <h3>Problem Type: {problem_type}</h3>
    <h3>Dataset Information</h3>
    <p><strong>Shape:</strong> {eda['shape'][0]} rows, {eda['shape'][1]} columns</p>
    <p><strong>Numerical Columns:</strong> {', '.join(eda['numerical_cols'])}</p>
    <p><strong>Categorical Columns:</strong> {', '.join(eda['categorical_cols'])}</p>
    <h3>Missing Values</h3>
    <ul>
    """)

    # One bullet per column that actually has missing values
    for col, count in eda['null_counts'].items():
        if count > 0:
            parts.append(f"<li>{col}: {count} missing values</li>")
    parts.append("</ul>")

    # Embedded heatmap image (key only present when numeric columns exist)
    if 'corr_heatmap' in eda:
        parts.append(f"""
    <h3>Correlation Heatmap</h3>
    <img src="data:image/png;base64,{eda['corr_heatmap']}" alt="Correlation Heatmap" width="600">
    """)

    # Best-model summary + score table header
    parts.append(f"""
    <h3>Model Results</h3>
    <p><strong>Best Model:</strong> {model_results['best_model_name']}</p>
    <p><strong>Best {model_results['metric_name']}:</strong> {model_results['best_score']:.4f}</p>
    <h4>All Models Performance</h4>
    <table border="1" cellpadding="5">
    <tr>
    <th>Model</th>
    """)

    # Score rows differ by task: accuracy vs MSE/R²
    if problem_type == "Classification":
        parts.append("<th>Accuracy</th></tr>")
        for model_name, scores in model_results['results'].items():
            parts.append(f"""
    <tr>
    <td>{model_name}</td>
    <td>{scores['accuracy']:.4f}</td>
    </tr>
    """)
    else:
        parts.append("<th>MSE</th><th>R²</th></tr>")
        for model_name, scores in model_results['results'].items():
            parts.append(f"""
    <tr>
    <td>{model_name}</td>
    <td>{scores['mse']:.4f}</td>
    <td>{scores['r2']:.4f}</td>
    </tr>
    """)
    parts.append("</table>")

    # Per-class precision/recall/F1 for the winning classifier
    if problem_type == "Classification":
        best_name = model_results['best_model_name']
        class_report = model_results['results'][best_name]['report']
        parts.append(f"""
    <h4>Classification Report for {best_name}</h4>
    <table border="1" cellpadding="5">
    <tr>
    <th>Class</th>
    <th>Precision</th>
    <th>Recall</th>
    <th>F1-Score</th>
    <th>Support</th>
    </tr>
    """)
        for class_name, metrics in class_report.items():
            # skip the aggregate rows classification_report appends
            if class_name in ('accuracy', 'macro avg', 'weighted avg'):
                continue
            parts.append(f"""
    <tr>
    <td>{class_name}</td>
    <td>{metrics['precision']:.4f}</td>
    <td>{metrics['recall']:.4f}</td>
    <td>{metrics['f1-score']:.4f}</td>
    <td>{metrics['support']}</td>
    </tr>
    """)
        parts.append("</table>")

    # Download note (the file itself is written by save_model)
    parts.append("""
    <h3>Model Download</h3>
    <p>Your model has been saved and is ready for download.</p>
    """)

    return "".join(parts)
def process_file(file, target_col):
    """Handle a CSV upload from the Gradio UI and return result HTML.

    Parameters
    ----------
    file : Gradio file payload — either a plain path string (newer Gradio
        versions) or an object exposing a ``.name`` attribute (older
        versions' tempfile wrapper).
    target_col : str — name of the column to predict.

    Returns an HTML results string, or a plain error message when the
    input is missing or invalid.
    """
    if file is None:
        return "Please upload a CSV file."

    # Accept both payload shapes Gradio has used across versions.
    path = file if isinstance(file, str) else file.name

    # Read the CSV file
    try:
        df = pd.read_csv(path)
    except Exception as e:
        return f"Error reading the CSV file: {str(e)}"

    # Validate the target column before running the expensive pipeline
    if target_col not in df.columns:
        return f"Target column '{target_col}' not found in the dataset. Available columns: {', '.join(df.columns)}"

    # Process the dataset; surface any pipeline failure as a message
    try:
        results = process_dataset(df, target_col)
        return format_results_html(results)
    except Exception as e:
        return f"Error processing the dataset: {str(e)}"
# Define Gradio interface
# Layout: left column takes the inputs (CSV + target name + button),
# right column shows the rendered HTML report.
with gr.Blocks(title="AutoML for Structured Data") as demo:
    gr.Markdown("# AutoML for Structured Data")
    gr.Markdown("""
    Upload a CSV file, specify the target column, and let AutoML do the rest! This app will:
    1. Perform exploratory data analysis (EDA)
    2. Determine if it's a regression or classification problem
    3. Handle preprocessing (cleaning, encoding, etc.)
    4. Train multiple models and select the best one
    5. Display the results and allow you to download the model
    """)
    with gr.Row():
        with gr.Column():
            # User inputs
            file_input = gr.File(label="Upload CSV File")
            target_col = gr.Textbox(label="Target Column Name")
            submit_btn = gr.Button("Process Dataset")
        with gr.Column():
            # Rendered results
            output = gr.HTML(label="Results")
    # Wire the button to the full processing pipeline
    submit_btn.click(fn=process_file, inputs=[file_input, target_col], outputs=output)
# Launch the app
demo.launch()