# Scraped repository-page header preserved as comments (was breaking the file as bare text):
# MENG21's picture
# Enhance README.md with detailed application overview, features, installation instructions, and usage guidelines for synthetic data generation and ML model training.
# d6212ac
import streamlit as st
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import plotly.express as px
from sklearn.linear_model import LogisticRegression, RidgeClassifier
from sklearn.ensemble import AdaBoostClassifier, ExtraTreesClassifier
from sklearn.svm import SVC, LinearSVC
from sklearn.naive_bayes import GaussianNB, MultinomialNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import MaxAbsScaler, StandardScaler, MinMaxScaler
import time
import warnings
import joblib
import os
from datetime import datetime
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap
from sklearn.model_selection import learning_curve
import pickle
warnings.filterwarnings('ignore')
class DataGenerator:
    """Generates labelled synthetic feature data from per-feature / per-class Gaussian configs."""

    def __init__(self):
        # Slots kept for API compatibility; callers may populate them externally.
        self.features = None
        self.feature_configs = None
        self.classes = None
        self.class_configs = None

    def generate_synthetic_data(self, n_samples, feature_configs, classes, class_configs=None):
        """Generate synthetic data based on configurations.

        Args:
            n_samples: total number of rows to generate. Honoured exactly:
                the remainder of ``n_samples // n_classes`` is spread over the
                first classes (the original floor division silently dropped
                up to ``n_classes - 1`` rows).
            feature_configs: ``{feature_name: {'type', 'center', 'std'}}``;
                a ``'random'`` type draws the class center from N(0, 5),
                otherwise ``'center'`` is used directly.
            classes: list of class labels.
            class_configs: optional ``{class_name: {'mean': [...], 'std': [...]}}``
                that overrides ``feature_configs`` per class.

        Returns:
            ``(X, y)``: X is an ``(n_samples, n_features)`` array rounded to
            2 decimals; y is an array of class labels aligned with X's rows.
        """
        n_classes = len(classes)
        X_parts = []
        y = []
        base, remainder = divmod(n_samples, n_classes)
        for i, class_name in enumerate(classes):
            # First `remainder` classes get one extra row so totals match n_samples.
            count = base + (1 if i < remainder else 0)
            class_columns = []
            for j, (feature_name, config) in enumerate(feature_configs.items()):
                if class_configs and class_name in class_configs:
                    center = class_configs[class_name]['mean'][j]
                    std = class_configs[class_name]['std'][j]
                elif config['type'] == 'random':
                    center = np.random.randn() * 5
                    std = config['std']
                else:
                    center = config['center']
                    std = config['std']
                class_columns.append(
                    np.round(np.random.normal(loc=center, scale=std, size=count), decimals=2)
                )
            X_parts.append(np.column_stack(class_columns))
            y.extend([class_name] * count)
        return np.vstack(X_parts), np.array(y)
class ModelManager:
    """Builds the classifier suite and handles training, evaluation and persistence.

    Trained models and their fitted scalers are written to the local ``models/``
    directory via joblib so they can be reloaded later for plotting and downloads.
    """

    @staticmethod
    def get_classifiers():
        """Return dictionary of classifiers with appropriate preprocessing.

        Each value is ``{'model': estimator, 'scaler': scaler}``.  StandardScaler
        is used everywhere except MultinomialNB, which rejects negative inputs
        and therefore pairs with MaxAbsScaler.
        """
        return {
            'LogisticRegression': {
                'model': LogisticRegression(max_iter=1000),
                'scaler': StandardScaler()
            },
            'RidgeClassifier': {
                'model': RidgeClassifier(),
                'scaler': StandardScaler()
            },
            'RandomForestClassifier': {
                'model': RandomForestClassifier(random_state=42),
                'scaler': StandardScaler()
            },
            'AdaBoostClassifier': {
                'model': AdaBoostClassifier(),
                'scaler': StandardScaler()
            },
            'ExtraTreesClassifier': {
                'model': ExtraTreesClassifier(),
                'scaler': StandardScaler()
            },
            'SVC': {
                'model': SVC(),
                'scaler': StandardScaler()
            },
            'LinearSVC': {
                'model': LinearSVC(max_iter=2000),
                'scaler': StandardScaler()
            },
            'GaussianNB': {
                'model': GaussianNB(),
                'scaler': StandardScaler()
            },
            'KNeighborsClassifier': {
                'model': KNeighborsClassifier(),
                'scaler': StandardScaler()
            },
            'MLPClassifier': {
                'model': MLPClassifier(max_iter=1000),
                'scaler': StandardScaler()
            },
            'MultinomialNB': {
                'model': MultinomialNB(),
                # MaxAbsScaler preserves sign, so shifted data stays non-negative.
                'scaler': MaxAbsScaler()
            }
        }

    @staticmethod
    def ensure_non_negative(X):
        """Ensure data is non-negative by shifting the whole matrix up by |min|.

        Required for MultinomialNB.  Accepts either a DataFrame or an ndarray;
        returns the input unchanged when it is already non-negative.
        """
        if isinstance(X, pd.DataFrame):
            min_val = X.values.min()
            if min_val < 0:
                return X + abs(min_val)
            return X
        else:
            min_val = X.min()
            if min_val < 0:
                # Subtracting a negative min is the same shift as adding |min|.
                return X - min_val
            return X

    def save_model(self, model_dict, model_name):
        """Save model and its scaler to files.

        Writes ``models/<name>_model.joblib`` and ``models/<name>_scaler.joblib``
        (creating ``models/`` if missing) and returns ``(model_path, scaler_path)``.
        """
        if not os.path.exists('models'):
            os.makedirs('models')
        base_filename = f"{model_name}"
        # Attach feature names to the scaler so downloaded scalers carry them;
        # falls back to the names stored in the Streamlit session, if any.
        if hasattr(model_dict['model'], 'feature_names_in_'):
            model_dict['scaler'].feature_names_in_ = model_dict['model'].feature_names_in_
        elif hasattr(st.session_state, 'features'):
            model_dict['scaler'].feature_names_in_ = np.array(st.session_state.features)
        model_path = os.path.join('models', f"{base_filename}_model.joblib")
        scaler_path = os.path.join('models', f"{base_filename}_scaler.joblib")
        joblib.dump(model_dict['model'], model_path)
        joblib.dump(model_dict['scaler'], scaler_path)
        return model_path, scaler_path

    def train_and_evaluate_model(self, clf_dict, X_train, X_test, y_train, y_test, model_name):
        """Train and evaluate a single model.

        Scales the data with the model's paired scaler (shifting to
        non-negative first for MultinomialNB), fits, predicts, persists the
        model/scaler pair, and returns a result record.  Never raises: any
        failure is captured in the record's ``'status'`` field so one broken
        model does not abort the whole training loop.
        """
        start_time = time.time()
        try:
            scaler = clf_dict['scaler']
            feature_names = st.session_state.features if hasattr(st.session_state, 'features') else None
            if model_name == 'MultinomialNB':
                # MultinomialNB requires non-negative input both before and
                # after scaling; the sanity check below guards the contract.
                X_train_positive = self.ensure_non_negative(X_train)
                X_test_positive = self.ensure_non_negative(X_test)
                X_train_scaled = scaler.fit_transform(X_train_positive)
                X_test_scaled = scaler.transform(X_test_positive)
                if np.any(X_train_scaled < 0) or np.any(X_test_scaled < 0):
                    raise ValueError("Negative values in scaled data")
            else:
                X_train_scaled = scaler.fit_transform(X_train)
                X_test_scaled = scaler.transform(X_test)
            if feature_names is not None:
                # NOTE(review): assigning feature_names_in_ before fit() is
                # likely overwritten by fit; kept byte-identical here — verify.
                if hasattr(clf_dict['model'], 'feature_names_in_'):
                    clf_dict['model'].feature_names_in_ = np.array(feature_names)
                scaler.feature_names_in_ = np.array(feature_names)
            clf_dict['model'].fit(X_train_scaled, y_train)
            y_pred = clf_dict['model'].predict(X_test_scaled)
            accuracy = accuracy_score(y_test, y_pred)
            training_time = time.time() - start_time
            model_path, scaler_path = self.save_model(clf_dict, model_name)
            conf_matrix = confusion_matrix(y_test, y_pred)
            return {
                'model_name': model_name,
                'accuracy': accuracy,
                'training_time': training_time,
                'model': clf_dict['model'],
                'predictions': y_pred,
                'status': 'success',
                'scaler': scaler_path,
                'model_path': model_path,
                'confusion_matrix': conf_matrix
            }
        except Exception as e:
            # Deliberate broad catch: report the failure as data, don't crash.
            return {
                'model_name': model_name,
                'accuracy': 0,
                'training_time': 0,
                'model': None,
                'predictions': None,
                'status': f'failed: {str(e)}',
                'scaler': None,
                'model_path': None,
                'confusion_matrix': None
            }
class Visualizer:
    """Plotting helpers: matplotlib/seaborn figures and plotly charts.

    Cleanup: removed the dead custom-colormap code (unused ``colors``/``n_bins``
    locals and the commented-out ``LinearSegmentedColormap`` call) and hoisted
    the class-label computation out of the heatmap loop.
    """

    @staticmethod
    def plot_learning_curve(estimator, X, y, title, ax):
        """Plot train/CV learning curves (5-fold CV, accuracy) for one model onto ``ax``."""
        train_sizes, train_scores, test_scores = learning_curve(
            estimator, X, y,
            train_sizes=np.linspace(0.1, 1.0, 10),
            cv=5,
            n_jobs=-1,
            scoring='accuracy'
        )
        train_mean = np.mean(train_scores, axis=1)
        train_std = np.std(train_scores, axis=1)
        test_mean = np.mean(test_scores, axis=1)
        test_std = np.std(test_scores, axis=1)
        ax.plot(train_sizes, train_mean, label='Training score')
        ax.plot(train_sizes, test_mean, label='Cross-validation score')
        # Shaded bands: +/- one standard deviation across CV folds.
        ax.fill_between(train_sizes, train_mean - train_std, train_mean + train_std, alpha=0.1)
        ax.fill_between(train_sizes, test_mean - test_std, test_mean + test_std, alpha=0.1)
        ax.set_xlabel('Training Examples')
        ax.set_ylabel('Score')
        ax.set_title(title)
        ax.legend(loc='lower right')
        ax.grid(True)

    def create_confusion_matrices_plot(self, successful_results, y_test):
        """Create a figure with one confusion-matrix heatmap per successful model.

        Matrices are laid out two per row; each subplot is titled with the
        model name and its test accuracy.  Returns the matplotlib figure.
        """
        n_models = len(successful_results)
        n_cols = 2
        n_rows = (n_models + n_cols - 1) // n_cols  # ceiling division
        fig = plt.figure(figsize=(15, 5 * n_rows))
        # Same label order for every subplot; computed once, not per model.
        class_labels = sorted(set(y_test))
        for idx, result in enumerate(successful_results):
            ax = plt.subplot(n_rows, n_cols, idx + 1)
            sns.heatmap(
                result['confusion_matrix'],
                annot=True,
                fmt='d',
                cmap='viridis',
                ax=ax,
                xticklabels=class_labels,
                yticklabels=class_labels
            )
            ax.set_xlabel('Predicted')
            ax.set_ylabel('Actual')
            ax.set_title(f"{result['model_name']}\nAccuracy: {result['accuracy']:.4f}")
        plt.tight_layout()
        return fig

    def create_performance_summary_plot(self, successful_df, selected_models):
        """Create performance metrics summary plot.

        Builds a grouped plotly bar chart of Accuracy/Precision/Recall/F1 for
        the selected models, with the x-axis ordered by each model's mean
        score (best first).  Returns the plotly figure.
        """
        metrics_to_compare = ['Accuracy', 'Precision', 'Recall', 'F1-Score']
        summary_df = successful_df[successful_df['Model'].isin(selected_models)].melt(
            id_vars=['Model'],
            value_vars=metrics_to_compare,
            var_name='Metric',
            value_name='Score'
        )
        fig_summary = px.bar(
            summary_df,
            x='Model',
            y='Score',
            color='Metric',
            barmode='group',
            title="Model Performance Metrics Comparison",
            text='Score'
        )
        fig_summary.update_layout(
            xaxis_tickangle=-45,
            showlegend=True,
            height=600,
            yaxis=dict(
                range=[0, 1],
                title='Score'
            ),
            legend=dict(
                title='Metric',
                orientation='h',
                yanchor='bottom',
                y=1.02,
                xanchor='right',
                x=1
            )
        )
        fig_summary.update_traces(
            texttemplate='%{text:.4f}',
            textposition='outside',
            textangle=0
        )
        # Order bars by mean score over the four metrics, best model first.
        summary_df['Avg_Score'] = summary_df.groupby('Model')['Score'].transform('mean')
        models_order = summary_df.drop_duplicates('Model').sort_values('Avg_Score', ascending=False)['Model']
        fig_summary.update_layout(xaxis={'categoryorder': 'array', 'categoryarray': models_order})
        return fig_summary
class StreamlitUI:
    """Top-level Streamlit controller wiring data generation, training and display."""

    def __init__(self):
        """Build collaborators, default produce configs, and seed session-state slots."""
        self.data_generator = DataGenerator()
        self.model_manager = ModelManager()
        self.visualizer = Visualizer()
        # Add default configurations as class attribute
        self.default_configs = {
            # Features: [length (mm), width (mm), density (g/cm³), pH]
            # AMPALAYA: Medium length (150-180mm), thin width (40-50mm)
            # Medium density (95 g/cm³) due to hollow interior, slightly basic pH (6.8-7.0)
            "Ampalaya": {'mean': [165, 45, 95, 6.9], 'std': [15, 5, 10, 0.1]},
            # BANANA: Long length (180-220mm), medium width (30-40mm)
            # Low density (85 g/cm³), acidic pH (4.5-5.2)
            "Banana": {'mean': [200, 35, 85, 4.8], 'std': [20, 5, 8, 0.3]},
            # CABBAGE: Round shape - similar length/width (150-200mm x 150-200mm)
            # Very low density (65 g/cm³) due to layered leaves, neutral pH (6.5-7.0)
            "Cabbage": {'mean': [175, 175, 65, 6.8], 'std': [25, 25, 5, 0.2]},
            # CARROT: Medium length (140-180mm), narrow width (25-35mm)
            # High density (115 g/cm³) due to dense flesh, slightly acidic pH (6.0-6.5)
            "Carrot": {'mean': [160, 30, 115, 6.3], 'std': [20, 5, 10, 0.2]},
            # CASSAVA: Long length (200-300mm), thick width (50-80mm)
            # High density (125 g/cm³) due to starchy flesh, slightly acidic pH (6.0-6.5)
            "Cassava": {'mean': [250, 65, 125, 6.2], 'std': [50, 15, 12, 0.2]}
        }
        # Default feature names that match the measurements in default_configs
        self.default_features = [
            'length (mm)',
            'width (mm)',
            'density (g/cm³)',
            'pH'
        ]
        # Add new session state variables for static visualizations
        self.initialize_static_visualizations()
        # Add new session state variable for data source
        if 'data_source' not in st.session_state:
            st.session_state.data_source = 'synthetic'
def initialize_static_visualizations(self):
"""Initialize session state variables for static visualizations"""
if 'confusion_matrices_fig' not in st.session_state:
st.session_state.confusion_matrices_fig = None
if 'learning_curves_fig' not in st.session_state:
st.session_state.learning_curves_fig = None
def initialize_session_state(self):
"""Initialize all session state variables"""
session_vars = {
'data_generated': False,
'df': None,
'features': None,
'feature_configs': None,
'X_train': None,
'X_test': None,
'y_train': None,
'y_test': None,
'y_pred': None,
'model_results': None,
'best_model': None,
'accuracy': None,
'feature_importance': None,
'split_info': None
}
for var, value in session_vars.items():
if var not in st.session_state:
st.session_state[var] = value
def setup_page_config(self):
"""Configure the Streamlit page"""
st.set_page_config(
page_title="ML Model Generator & Implementation",
page_icon="🤖",
layout="wide",
menu_items={
'About': """
## Final project in Modeling and Simulation \n
### Juan Dela Cruz - BSCS 4A"""
}
)
def get_sidebar_inputs(self):
"""Get all inputs from the sidebar"""
st.sidebar.header("Data Generation Parameters")
# Feature configuration
st.sidebar.subheader("Feature Configuration")
# Initialize default features if not in session state
if 'features_input' not in st.session_state:
st.session_state.features_input = ", ".join(self.default_features)
features_input = st.sidebar.text_input(
"Enter feature names (comma-separated)",
key='features_input'
)
features = [f.strip() for f in features_input.split(",")]
# Initialize default classes if not in session state
if 'classes_input' not in st.session_state:
st.session_state.classes_input = ", ".join(self.default_configs.keys())
classes_input = st.sidebar.text_input(
"Enter class names (comma-separated)",
key='classes_input'
)
classes = [c.strip() for c in classes_input.split(",")]
# Generate feature configs
feature_configs = {}
for feature in features:
feature_configs[feature] = {
'type': 'random',
'std': 20.0,
'center': None
}
return features, feature_configs, classes
def get_class_configs(self, classes, features):
"""Get class-specific configurations from the sidebar"""
class_configs = {}
st.sidebar.subheader("Class-Specific Settings")
for class_name in classes:
with st.sidebar.expander(f"{class_name} Settings", expanded=False):
checkbox_key = f"use_specific_{class_name}"
# Initialize checkbox state if not in session state
if checkbox_key not in st.session_state:
st.session_state[checkbox_key] = True
use_specific = st.checkbox(
f"Set specific values for {class_name}",
key=checkbox_key
)
means = []
stds = []
# Generate unique means for each class if not in default configs
if class_name not in self.default_configs:
# Generate random means between 0-100 that are different from other classes
random_means = []
for _ in range(len(features)):
mean = np.random.uniform(0, 100)
# Ensure means are unique across classes
while any(abs(mean - c['mean'][_]) < 10 for c in class_configs.values() if 'mean' in c):
mean = np.random.uniform(0, 100)
random_means.append(mean)
default_values = {'mean': random_means, 'std': [20.0] * len(features)}
else:
# Ensure default values match the number of features
default_means = self.default_configs[class_name]['mean']
default_stds = self.default_configs[class_name]['std']
# If we have more features than default values, extend with random values
if len(features) > len(default_means):
additional_means = [np.random.uniform(0, 100) for _ in range(len(features) - len(default_means))]
additional_stds = [20.0 for _ in range(len(features) - len(default_stds))]
default_means.extend(additional_means)
default_stds.extend(additional_stds)
# If we have fewer features than default values, truncate
elif len(features) < len(default_means):
default_means = default_means[:len(features)]
default_stds = default_stds[:len(features)]
default_values = {'mean': default_means, 'std': default_stds}
if use_specific:
for idx, feature in enumerate(features):
mean_key = f"mean_{class_name}_{feature}"
std_key = f"std_{class_name}_{feature}"
if mean_key not in st.session_state:
st.session_state[mean_key] = float(default_values['mean'][idx])
if std_key not in st.session_state:
st.session_state[std_key] = float(default_values['std'][idx])
col1, col2 = st.columns(2)
with col1:
mean = st.number_input(
f"Mean for {feature}",
key=mean_key
)
means.append(mean)
with col2:
std = st.number_input(
f"Std Dev for {feature}",
min_value=0.1,
key=std_key
)
stds.append(std)
else:
# Use default values if specific values not requested
means = default_values['mean']
stds = default_values['std']
class_configs[class_name] = {
'mean': means,
'std': stds
}
return class_configs
def get_training_params(self):
"""Get training parameters from the sidebar"""
st.sidebar.subheader("Sample Size & Train/Test Split Configuration")
# Initialize default values if not in session state
if 'n_samples' not in st.session_state:
st.session_state.n_samples = 10000
col1, col2 = st.sidebar.columns(2)
with col1:
n_samples = st.slider(
"Number of samples",
500,
50000,
step=500,
key='n_samples'
)
with col2:
test_size = st.slider(
"Test Size",
min_value=10,
max_value=50,
value=30, # Default value directly in the widget
step=5,
key='test_size',
format="%d%%",
help="Percentage of data to use for testing"
)
st.write(f"Test: {test_size}% / Train: {100 - test_size}%")
return n_samples, test_size
    def generate_and_train(self, n_samples, feature_configs, classes, class_configs, test_size):
        """Generate data and train models.

        Generates the synthetic dataset, splits it, trains every classifier
        from ModelManager, and stores all results — including pre-rendered
        confusion-matrix and learning-curve figures — in st.session_state.

        Args:
            n_samples: total number of synthetic rows to generate.
            feature_configs: per-feature generation settings.
            classes: list of class labels.
            class_configs: per-class mean/std overrides.
            test_size: test split as a whole percentage (e.g. 30 for 30%).
        """
        X, y = self.data_generator.generate_synthetic_data(
            n_samples,
            feature_configs,
            classes,
            class_configs
        )
        # NOTE(review): assumes st.session_state.features was set by the caller
        # (run()) before this method executes — verify if called elsewhere.
        st.session_state.df = pd.DataFrame(X, columns=st.session_state.features)
        st.session_state.df['target'] = y
        # Train test split (test_size arrives as a percentage, hence /100)
        X_train, X_test, y_train, y_test = train_test_split(
            X, y,
            test_size=test_size/100,
            random_state=42
        )
        # Store split data
        st.session_state.X_train = X_train
        st.session_state.X_test = X_test
        st.session_state.y_train = y_train
        st.session_state.y_test = y_test
        # Get classifiers and train models
        classifiers = self.model_manager.get_classifiers()
        results = []
        with st.spinner('Training models... Please wait.'):
            progress_bar = st.progress(0)
            for idx, (name, clf_dict) in enumerate(classifiers.items()):
                result = self.model_manager.train_and_evaluate_model(
                    clf_dict,
                    X_train,
                    X_test,
                    y_train,
                    y_test,
                    name
                )
                results.append(result)
                progress_bar.progress((idx + 1) / len(classifiers))
        st.session_state.model_results = results
        st.session_state.data_generated = True
        # Find best model (highest accuracy among runs that did not fail)
        successful_results = [r for r in results if r['status'] == 'success']
        if successful_results:
            best_model = max(successful_results, key=lambda x: x['accuracy'])
            st.session_state.best_model = best_model
        # Store split information
        st.session_state.split_info = {
            'total_samples': len(X),
            'train_samples': len(X_train),
            'test_samples': len(X_test),
            'test_percentage': test_size
        }
        st.session_state.feature_configs = feature_configs
        # Generate static visualizations after training
        successful_results = [r for r in st.session_state.model_results if r['status'] == 'success']
        if successful_results:
            # Generate and store confusion matrices
            st.session_state.confusion_matrices_fig = self.visualizer.create_confusion_matrices_plot(
                successful_results,
                st.session_state.y_test
            )
            # Generate and store learning curves
            st.session_state.learning_curves_fig = self.generate_learning_curves_figure(successful_results)
def generate_learning_curves_figure(self, successful_results):
"""Generate learning curves figure"""
successful_results.sort(key=lambda x: x['accuracy'], reverse=True)
n_models = len(successful_results)
n_cols = 2
n_rows = (n_models + n_cols - 1) // n_cols
fig_learning = plt.figure(figsize=(15, 5 * n_rows))
for idx, result in enumerate(successful_results):
ax = plt.subplot(n_rows, n_cols, idx + 1)
model_name = result['model_name']
model = result['model']
scaler = joblib.load(result['scaler'])
if model_name == 'MultinomialNB':
X_scaled = self.model_manager.ensure_non_negative(
st.session_state.df.drop('target', axis=1)
)
X_scaled = scaler.transform(X_scaled)
else:
X_scaled = scaler.transform(st.session_state.df.drop('target', axis=1))
y = st.session_state.df['target']
self.visualizer.plot_learning_curve(
model,
X_scaled,
y,
f'Learning Curve - {model_name}\nFinal Accuracy: {result["accuracy"]:.4f}',
ax
)
plt.tight_layout()
return fig_learning
def display_model_comparison(self):
"""Display model comparison section"""
st.subheader("Model Comparison")
comparison_data = []
for result in st.session_state.model_results:
if result['status'] == 'success':
report_dict = classification_report(
st.session_state.y_test,
result['predictions'],
output_dict=True
)
macro_avg = report_dict['macro avg']
comparison_data.append({
'Model': result['model_name'],
'Accuracy': float(f"{result['accuracy']:.4f}"),
'Precision': float(f"{macro_avg['precision']:.4f}"),
'Recall': float(f"{macro_avg['recall']:.4f}"),
'F1-Score': float(f"{macro_avg['f1-score']:.4f}"),
'Training Time (s)': float(f"{result['training_time']:.3f}"),
'Status': 'Success'
})
else:
comparison_data.append({
'Model': result['model_name'],
'Accuracy': 0,
'Precision': 0,
'Recall': 0,
'F1-Score': 0,
'Training Time (s)': 0,
'Status': result['status']
})
comparison_df = pd.DataFrame(comparison_data)
comparison_df = comparison_df.sort_values('Accuracy', ascending=False)
st.dataframe(comparison_df.style.format({
'Accuracy': '{:.4f}',
'Precision': '{:.4f}',
'Recall': '{:.4f}',
'F1-Score': '{:.4f}',
'Training Time (s)': '{:.3f}'
}))
return comparison_df
def display_metric_visualization(self, comparison_df):
"""Display metric visualization section"""
metric_to_plot = st.selectbox(
"Select metric to visualize",
['Accuracy', 'Precision', 'Recall', 'F1-Score', 'Training Time (s)']
)
successful_df = comparison_df[comparison_df['Status'] == 'Success']
if metric_to_plot == 'Training Time (s)':
successful_df = successful_df.sort_values(metric_to_plot)
else:
successful_df = successful_df.sort_values(metric_to_plot, ascending=False)
fig_comparison = px.bar(
successful_df,
x='Model',
y=metric_to_plot,
title=f"Model {metric_to_plot} Comparison",
color=metric_to_plot,
text=metric_to_plot
)
fig_comparison.update_layout(
xaxis_tickangle=-45,
showlegend=True,
height=500,
yaxis=dict(
range=[0, 1] if metric_to_plot != 'Training Time (s)' else None
)
)
fig_comparison.update_traces(
texttemplate='%{text:.4f}',
textposition='outside',
textangle=0
)
st.plotly_chart(fig_comparison)
return successful_df
def display_best_model_performance(self):
"""Display best model performance section"""
if hasattr(st.session_state, 'best_model'):
st.subheader("Best Model Performance")
best_model = st.session_state.best_model
st.write(f"Best Model: **{best_model['model_name']}**")
st.write(f"Accuracy: {best_model['accuracy']:.4f}")
st.write("Classification Report (Best Model):")
report_dict = classification_report(
st.session_state.y_test,
best_model['predictions'],
output_dict=True
)
report_df = pd.DataFrame(report_dict).transpose()
st.dataframe(report_df.style.format('{:.4f}'))
def display_dataset_info(self):
"""Display dataset split information"""
if st.session_state.split_info:
st.subheader("Dataset Split Information")
col1, col2, col3 = st.columns(3)
with col1:
st.metric(
"Total Samples",
st.session_state.split_info['total_samples']
)
with col2:
st.metric(
"Training Samples",
f"{st.session_state.split_info['train_samples']} "
f"({100 - st.session_state.split_info['test_percentage']}%)"
)
with col3:
st.metric(
"Testing Samples",
f"{st.session_state.split_info['test_samples']} "
f"({st.session_state.split_info['test_percentage']}%)"
)
def display_feature_configs(self):
"""Display feature configurations"""
st.subheader("Feature Configurations")
config_data = []
for feature, config in st.session_state.feature_configs.items():
config_data.append({
'Feature': feature,
'Type': config['type'],
'Std Dev': config['std'],
'Center': config['center'] if config['type'] == 'user-defined' else 'Random'
})
st.table(pd.DataFrame(config_data))
def display_data_samples(self):
"""Display original and scaled data samples"""
st.subheader("Generated Data Sample")
# Get random samples from each class
unique_classes = st.session_state.df['target'].unique()
samples_per_class = 2 # Number of samples to show per class
sampled_data = []
for class_name in unique_classes:
class_data = st.session_state.df[st.session_state.df['target'] == class_name]
sampled_data.append(class_data.sample(n=min(samples_per_class, len(class_data))))
sampled_df = pd.concat(sampled_data).sample(frac=1).reset_index(drop=True)
col1, col2 = st.columns(2)
with col1:
st.write("Original Data (Random samples from each class):")
st.write(sampled_df)
with col2:
st.write("Scaled Data (using best model's scaler):")
if st.session_state.best_model and st.session_state.best_model['status'] == 'success':
best_model_name = st.session_state.best_model['model_name']
scaler = joblib.load(st.session_state.best_model['scaler'])
features_df = sampled_df.drop('target', axis=1)
if best_model_name == 'MultinomialNB':
features_scaled = self.model_manager.ensure_non_negative(features_df)
features_scaled = scaler.transform(features_scaled)
else:
features_scaled = scaler.transform(features_df)
scaled_df = pd.DataFrame(
features_scaled,
columns=features_df.columns,
index=features_df.index
)
scaled_df['target'] = sampled_df['target']
st.write(scaled_df)
else:
st.write("No scaled data available (best model not found)")
def display_confusion_matrices(self):
"""Display confusion matrices section"""
st.subheader("Confusion Matrices")
st.write("""
Confusion matrices show the model's prediction performance across different classes.
- Each row represents the actual class
- Each column represents the predicted class
- Diagonal elements represent correct predictions (True Positives for each class)
- Off-diagonal elements represent incorrect predictions
- Numbers show how many samples were classified for each combination
- Colors range from yellow (high values) to green-blue (low values) using the viridis colormap
""")
if st.session_state.confusion_matrices_fig is not None:
st.pyplot(st.session_state.confusion_matrices_fig)
plt.close()
def display_performance_summary(self, successful_df):
"""Display performance metrics summary"""
st.subheader("Performance Metrics Summary")
all_models = successful_df['Model'].unique().tolist()
default_selection = all_models
col1, col2 = st.columns([3, 1])
with col1:
selected_models = st.multiselect(
"Select models to compare",
all_models,
default=default_selection
)
if not selected_models:
st.warning("Please select at least one model to display the comparison.")
return
fig_summary = self.visualizer.create_performance_summary_plot(
successful_df,
selected_models
)
st.plotly_chart(fig_summary, use_container_width=True)
def display_saved_models(self):
"""Display saved models information and download buttons"""
st.subheader("Saved Models")
saved_models = []
for result in st.session_state.model_results:
if result['status'] == 'success' and result['model_path']:
# Load model and scaler
model = joblib.load(result['model_path'])
scaler = joblib.load(result['scaler'])
# Create binary data for download using pickle
model_bytes = pickle.dumps(model)
scaler_bytes = pickle.dumps(scaler)
saved_models.append({
'Model': result['model_name'],
'Accuracy': result['accuracy'],
'Model_Binary': model_bytes,
'Scaler_Binary': scaler_bytes
})
if saved_models:
# Display models table
display_df = pd.DataFrame([{
'Model': m['Model'],
'Accuracy': m['Accuracy']
} for m in saved_models])
st.dataframe(display_df.style.format({
'Accuracy': '{:.4f}'
}))
# Add download buttons for each model
st.write("Download Models:")
for model_data in saved_models:
col1, col2 = st.columns(2)
with col1:
st.download_button(
label=f"Download {model_data['Model']} Model",
data=model_data['Model_Binary'],
file_name=f"{model_data['Model']}_model.pkl",
mime="application/octet-stream"
)
with col2:
st.download_button(
label=f"Download {model_data['Model']} Scaler",
data=model_data['Scaler_Binary'],
file_name=f"{model_data['Model']}_scaler.pkl",
mime="application/octet-stream"
)
else:
st.info("No models were saved. Models are saved automatically when accuracy exceeds 0.5")
def display_download_section(self):
"""Display dataset download section"""
st.subheader("Download Dataset")
col1, col2 = st.columns(2)
with col1:
if st.session_state.df is not None:
csv = st.session_state.df.to_csv(index=False)
st.download_button(
label="Download Original Dataset (CSV)",
data=csv,
file_name=f"synthetic_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
mime='text/csv',
help="Download the original unscaled dataset"
)
with col2:
if st.session_state.best_model and st.session_state.best_model['status'] == 'success':
best_model_name = st.session_state.best_model['model_name']
scaler = joblib.load(st.session_state.best_model['scaler'])
features_df = st.session_state.df.drop('target', axis=1)
if best_model_name == 'MultinomialNB':
features_scaled = self.model_manager.ensure_non_negative(features_df)
features_scaled = scaler.transform(features_scaled)
else:
features_scaled = scaler.transform(features_df)
scaled_df = pd.DataFrame(
features_scaled,
columns=features_df.columns,
index=features_df.index
)
scaled_df['target'] = st.session_state.df['target']
csv_scaled = scaled_df.to_csv(index=False)
st.download_button(
label="Download Scaled Dataset (CSV)",
data=csv_scaled,
file_name=f"synthetic_data_scaled_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
mime='text/csv',
help="Download the scaled dataset (using best model's scaler)"
)
def display_dataset_statistics(self):
"""Display dataset statistics"""
with st.expander("Dataset Statistics"):
col1, col2 = st.columns(2)
with col1:
st.write("Original Dataset Statistics:")
st.write(st.session_state.df.describe())
with col2:
if st.session_state.best_model and st.session_state.best_model['status'] == 'success':
st.write("Scaled Dataset Statistics:")
best_model_name = st.session_state.best_model['model_name']
scaler = joblib.load(st.session_state.best_model['scaler'])
features_df = st.session_state.df.drop('target', axis=1)
if best_model_name == 'MultinomialNB':
features_scaled = self.model_manager.ensure_non_negative(features_df)
features_scaled = scaler.transform(features_scaled)
else:
features_scaled = scaler.transform(features_df)
scaled_df = pd.DataFrame(
features_scaled,
columns=features_df.columns,
index=features_df.index
)
scaled_df['target'] = st.session_state.df['target']
st.write(scaled_df.describe())
def display_learning_curves(self):
"""Display learning curves section"""
st.subheader("Learning Curves")
st.write("""
Learning curves show how model performance changes with increasing training data.
- Blue line: Training score
- Orange line: Cross-validation score
- Shaded areas represent standard deviation
""")
if st.session_state.learning_curves_fig is not None:
st.pyplot(st.session_state.learning_curves_fig)
plt.close()
    def display_feature_visualization(self):
        """Display 2D and 3D feature visualizations.

        Lets the user pick axis features — each selector excludes features
        already chosen for another axis — and renders a plotly scatter plot
        colored by class.
        """
        st.subheader("Feature Visualization")
        plot_type = st.radio("Select plot type", ["2D Plot", "3D Plot"], index=1)
        if plot_type == "2D Plot":
            col1, col2 = st.columns(2)
            with col1:
                x_feature = st.selectbox(
                    "Select X-axis feature",
                    st.session_state.features,
                    index=0,
                    key='x_2d'
                )
            with col2:
                # Exclude the X feature so the two axes always differ.
                y_features = [f for f in st.session_state.features if f != x_feature]
                y_feature = st.selectbox(
                    "Select Y-axis feature",
                    y_features,
                    index=0,
                    key='y_2d'
                )
            fig = px.scatter(
                st.session_state.df,
                x=x_feature,
                y=y_feature,
                color='target',
                title=f"2D Visualization of {x_feature} vs {y_feature}",
                labels={'target': 'Class'}
            )
            st.plotly_chart(fig, use_container_width=True)
        else:  # 3D Plot
            col1, col2, col3 = st.columns(3)
            with col1:
                x_feature = st.selectbox(
                    "Select X-axis feature",
                    st.session_state.features,
                    index=0,
                    key='x_3d'
                )
            with col2:
                y_features = [f for f in st.session_state.features if f != x_feature]
                y_feature = st.selectbox(
                    "Select Y-axis feature",
                    y_features,
                    index=0,
                    key='y_3d'
                )
            with col3:
                z_features = [f for f in st.session_state.features if f not in [x_feature, y_feature]]
                z_feature = st.selectbox(
                    "Select Z-axis feature",
                    z_features,
                    index=0,
                    key='z_3d'
                )
            fig = px.scatter_3d(
                st.session_state.df,
                x=x_feature,
                y=y_feature,
                z=z_feature,
                color='target',
                title=f"3D Visualization of {x_feature} vs {y_feature} vs {z_feature}",
                labels={'target': 'Class'}
            )
            # Fixed camera keeps the initial 3D view consistent between reruns.
            fig.update_layout(
                scene = dict(
                    xaxis_title=x_feature,
                    yaxis_title=y_feature,
                    zaxis_title=z_feature
                ),
                scene_camera=dict(
                    up=dict(x=0, y=0, z=1),
                    center=dict(x=0, y=0, z=0),
                    eye=dict(x=1.5, y=1.5, z=1.5)
                )
            )
            st.plotly_chart(fig, use_container_width=True)
def get_data_source(self):
"""Get user's choice of data source"""
st.sidebar.header("Data Source")
data_source = st.sidebar.radio(
"Choose data source",
['Generate Synthetic Data', 'Upload Dataset'],
key='data_source_radio'
)
st.session_state.data_source = 'synthetic' if data_source == 'Generate Synthetic Data' else 'upload'
return st.session_state.data_source
    def upload_dataset(self):
        """Handle dataset upload.

        Renders the CSV uploader plus target-column and test-size widgets,
        performs the train/test split, and stores everything in session state.

        Returns:
            True when a file was parsed and split successfully, else False.
        """
        st.sidebar.header("Upload Dataset")
        uploaded_file = st.sidebar.file_uploader(
            "Choose a CSV file",
            type="csv",
            help="Upload a CSV file with features and target column"
        )
        if uploaded_file is not None:
            try:
                df = pd.read_csv(uploaded_file)
                # Let user select target column
                target_col = st.sidebar.selectbox(
                    "Select target column",
                    df.columns.tolist()
                )
                # Store features and target (every non-target column is a feature)
                features = [col for col in df.columns if col != target_col]
                X = df[features]
                y = df[target_col]
                # Store in session state
                st.session_state.df = df
                st.session_state.features = features
                # Train test split (slider value is a whole percentage)
                test_size = st.sidebar.slider(
                    "Test Size",
                    min_value=10,
                    max_value=50,
                    value=30,
                    step=5,
                    format="%d%%",
                    help="Percentage of data to use for testing"
                )
                X_train, X_test, y_train, y_test = train_test_split(
                    X, y,
                    test_size=test_size/100,
                    random_state=42
                )
                # Store split data
                st.session_state.X_train = X_train
                st.session_state.X_test = X_test
                st.session_state.y_train = y_train
                st.session_state.y_test = y_test
                # Store split information
                st.session_state.split_info = {
                    'total_samples': len(X),
                    'train_samples': len(X_train),
                    'test_samples': len(X_test),
                    'test_percentage': test_size
                }
                return True
            except Exception as e:
                # Deliberate broad catch: any parse/split failure is surfaced
                # in the sidebar instead of crashing the app.
                st.sidebar.error(f"Error loading dataset: {str(e)}")
                return False
        return False
    def run(self):
        """Main application logic.

        Orchestrates the full page: page config, session-state setup, data
        source selection, synthetic generation or CSV upload, training, and
        every result/visualization section.
        """
        self.setup_page_config()
        self.initialize_session_state()
        st.title("ML Model Generator")
        # Get data source choice
        data_source = self.get_data_source()
        if data_source == 'synthetic':
            st.sidebar.header("Synthetic Data Generation")
            # Get inputs from sidebar for synthetic data
            features, feature_configs, classes = self.get_sidebar_inputs()
            class_configs = self.get_class_configs(classes, features)
            n_samples, test_size = self.get_training_params()
            # Store features in session state (generate_and_train reads them)
            st.session_state.features = features
            # Generate Data button
            if st.sidebar.button("Generate Data and Train Models"):
                self.generate_and_train(n_samples, feature_configs, classes, class_configs, test_size)
        else:  # upload
            # Handle dataset upload
            if self.upload_dataset():
                if st.sidebar.button("Train Models"):
                    # Get classifiers and train models.
                    # NOTE(review): this duplicates the training loop inside
                    # generate_and_train; keep the two code paths in sync.
                    classifiers = self.model_manager.get_classifiers()
                    results = []
                    with st.spinner('Training models... Please wait.'):
                        progress_bar = st.progress(0)
                        for idx, (name, clf_dict) in enumerate(classifiers.items()):
                            result = self.model_manager.train_and_evaluate_model(
                                clf_dict,
                                st.session_state.X_train,
                                st.session_state.X_test,
                                st.session_state.y_train,
                                st.session_state.y_test,
                                name
                            )
                            results.append(result)
                            progress_bar.progress((idx + 1) / len(classifiers))
                    st.session_state.model_results = results
                    st.session_state.data_generated = True
                    # Find best model
                    successful_results = [r for r in results if r['status'] == 'success']
                    if successful_results:
                        best_model = max(successful_results, key=lambda x: x['accuracy'])
                        st.session_state.best_model = best_model
                        # Generate static visualizations
                        st.session_state.confusion_matrices_fig = self.visualizer.create_confusion_matrices_plot(
                            successful_results,
                            st.session_state.y_test
                        )
                        st.session_state.learning_curves_fig = self.generate_learning_curves_figure(successful_results)
        # Display results if data has been generated/uploaded and trained
        if st.session_state.data_generated:
            self.display_dataset_info()
            self.display_data_samples()
            self.display_feature_visualization()
            self.display_download_section()
            self.display_dataset_statistics()
            self.display_best_model_performance()
            successful_df = self.display_model_comparison()
            if successful_df is not None and not successful_df.empty:
                self.display_performance_summary(successful_df)
            self.display_saved_models()
            self.display_learning_curves()
            self.display_confusion_matrices()
        else:
            if data_source == 'synthetic':
                st.info("Please generate data using the sidebar button to view visualizations and results.")
            else:
                st.info("Please upload a dataset and click 'Train Models' to view visualizations and results.")
def main():
    """Application entry point: build the UI controller and run it."""
    StreamlitUI().run()


if __name__ == "__main__":
    main()