Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.ensemble import RandomForestClassifier | |
| from sklearn.metrics import accuracy_score, classification_report, confusion_matrix | |
| import plotly.express as px | |
| from sklearn.linear_model import LogisticRegression, RidgeClassifier | |
| from sklearn.ensemble import AdaBoostClassifier, ExtraTreesClassifier | |
| from sklearn.svm import SVC, LinearSVC | |
| from sklearn.naive_bayes import GaussianNB, MultinomialNB | |
| from sklearn.neighbors import KNeighborsClassifier | |
| from sklearn.neural_network import MLPClassifier | |
| from sklearn.preprocessing import MaxAbsScaler, StandardScaler, MinMaxScaler | |
| import time | |
| import warnings | |
| import joblib | |
| import os | |
| from datetime import datetime | |
| import seaborn as sns | |
| import matplotlib.pyplot as plt | |
| from matplotlib.colors import LinearSegmentedColormap | |
| from sklearn.model_selection import learning_curve | |
| import pickle | |
| warnings.filterwarnings('ignore') | |
class DataGenerator:
    """Generate synthetic, class-labelled feature matrices from Gaussian draws."""

    def __init__(self):
        # Placeholders only: nothing in this class assigns them after construction.
        self.features = None
        self.feature_configs = None
        self.classes = None
        self.class_configs = None

    def generate_synthetic_data(self, n_samples, feature_configs, classes, class_configs=None):
        """Generate synthetic data based on configurations.

        Args:
            n_samples: Total number of rows to generate across all classes.
            feature_configs: Mapping of feature name to a dict with keys
                'type', 'std' and 'center'. Used for classes that have no
                entry in ``class_configs``.
            classes: Sequence of class labels.
            class_configs: Optional mapping of class label to
                ``{'mean': [...], 'std': [...]}``, indexed by feature position.

        Returns:
            Tuple ``(X, y)``: ``X`` is a 2-D array with one column per feature
            (values rounded to two decimals), ``y`` is an array of class
            labels, grouped class by class in the order of ``classes``.

        Raises:
            ValueError: If ``classes`` or ``feature_configs`` is empty.
        """
        n_classes = len(classes)
        if n_classes == 0:
            raise ValueError("At least one class is required to generate data")
        if not feature_configs:
            raise ValueError("At least one feature is required to generate data")

        # Spread the remainder over the first classes so that exactly
        # n_samples rows are produced instead of silently dropping some.
        base_size, remainder = divmod(n_samples, n_classes)

        blocks = []
        labels = []
        for class_index, class_name in enumerate(classes):
            class_size = base_size + (1 if class_index < remainder else 0)
            has_specific = bool(class_configs) and class_name in class_configs

            columns = []
            for j, config in enumerate(feature_configs.values()):
                if has_specific:
                    center = class_configs[class_name]['mean'][j]
                    std = class_configs[class_name]['std'][j]
                elif config['type'] == 'random':
                    # No explicit centre: draw one per class and feature.
                    center = np.random.randn() * 5
                    std = config['std']
                else:
                    center = config['center']
                    std = config['std']

                draws = np.random.normal(loc=center, scale=std, size=class_size)
                columns.append(np.round(draws, decimals=2))

            blocks.append(np.column_stack(columns))
            labels.extend([class_name] * class_size)

        return np.vstack(blocks), np.array(labels)
class ModelManager:
    """Build, train, evaluate and persist the classifiers compared by the app."""

    @staticmethod
    def get_classifiers():
        """Return dictionary of classifiers with appropriate preprocessing.

        Declared as a static method so that both ``ModelManager.get_classifiers()``
        and ``instance.get_classifiers()`` work; without the decorator the
        instance call raises TypeError because the function takes no ``self``.

        Returns:
            dict mapping a model name to ``{'model': estimator, 'scaler': scaler}``.
            New, unfitted instances are created on every call.
        """
        standard_scaled_models = {
            'LogisticRegression': LogisticRegression(max_iter=1000),
            'RidgeClassifier': RidgeClassifier(),
            'RandomForestClassifier': RandomForestClassifier(random_state=42),
            'AdaBoostClassifier': AdaBoostClassifier(),
            'ExtraTreesClassifier': ExtraTreesClassifier(),
            'SVC': SVC(),
            'LinearSVC': LinearSVC(max_iter=2000),
            'GaussianNB': GaussianNB(),
            'KNeighborsClassifier': KNeighborsClassifier(),
            'MLPClassifier': MLPClassifier(max_iter=1000),
        }
        classifiers = {
            name: {'model': model, 'scaler': StandardScaler()}
            for name, model in standard_scaled_models.items()
        }
        # MultinomialNB is the only entry paired with MaxAbsScaler; the
        # training code additionally shifts its input to be non-negative.
        classifiers['MultinomialNB'] = {
            'model': MultinomialNB(),
            'scaler': MaxAbsScaler(),
        }
        return classifiers

    @staticmethod
    def ensure_non_negative(X, offset=None):
        """Ensure data is non-negative by shifting.

        Args:
            X: ``pandas.DataFrame`` or NumPy array of feature values.
            offset: Optional constant to add. When omitted, the offset is
                derived from the global minimum of ``X`` itself (zero if the
                minimum is already non-negative). Pass the offset computed on
                the training data to shift other data consistently.

        Returns:
            ``X`` shifted by the offset; ``X`` itself when the offset is zero.
        """
        if offset is None:
            min_val = X.values.min() if isinstance(X, pd.DataFrame) else X.min()
            offset = -min_val if min_val < 0 else 0
        if not offset:
            return X
        return X + offset

    def save_model(self, model_dict, model_name):
        """Save model and its scaler to files.

        Args:
            model_dict: ``{'model': fitted estimator, 'scaler': fitted scaler}``.
            model_name: Base name used for the two files under ``models/``.

        Returns:
            Tuple ``(model_path, scaler_path)`` of the written joblib files.
        """
        os.makedirs('models', exist_ok=True)

        model = model_dict['model']
        scaler = model_dict['scaler']

        # Give the scaler the same feature names as the model (or the session's
        # feature list) so later transforms of named DataFrames line up.
        if hasattr(model, 'feature_names_in_'):
            scaler.feature_names_in_ = model.feature_names_in_
        else:
            session_features = getattr(st.session_state, 'features', None)
            # Skip None: np.array(None) would be a 0-d array, not a name list.
            if session_features is not None:
                scaler.feature_names_in_ = np.array(session_features)

        model_path = os.path.join('models', f"{model_name}_model.joblib")
        scaler_path = os.path.join('models', f"{model_name}_scaler.joblib")
        joblib.dump(model, model_path)
        joblib.dump(scaler, scaler_path)
        return model_path, scaler_path

    def train_and_evaluate_model(self, clf_dict, X_train, X_test, y_train, y_test, model_name):
        """Train and evaluate a single model.

        Args:
            clf_dict: ``{'model': estimator, 'scaler': scaler}`` as returned
                by ``get_classifiers``.
            X_train, X_test: Feature matrices for fitting and evaluation.
            y_train, y_test: Matching label arrays.
            model_name: Name used in the result and for the saved files.

        Returns:
            dict with keys 'model_name', 'accuracy', 'training_time', 'model',
            'predictions', 'status', 'scaler' (path to the saved scaler),
            'model_path' and 'confusion_matrix'. On any exception 'status' is
            ``'failed: <message>'`` and the remaining values are 0 or None, so
            one failing model does not abort the whole comparison.
        """
        start_time = time.time()
        try:
            scaler = clf_dict['scaler']
            model = clf_dict['model']
            feature_names = getattr(st.session_state, 'features', None)

            if model_name == 'MultinomialNB':
                # Derive the shift from the training data only and apply the
                # same shift to the test data; shifting each set by its own
                # minimum would put them on different scales.
                offset = max(0.0, -float(np.min(np.asarray(X_train))))
                X_train_positive = self.ensure_non_negative(X_train, offset)
                # Test values below the training minimum are clipped to zero.
                X_test_positive = self.ensure_non_negative(X_test, offset).clip(0)
                X_train_scaled = scaler.fit_transform(X_train_positive)
                X_test_scaled = scaler.transform(X_test_positive)
                if np.any(X_train_scaled < 0) or np.any(X_test_scaled < 0):
                    raise ValueError("Negative values in scaled data")
            else:
                X_train_scaled = scaler.fit_transform(X_train)
                X_test_scaled = scaler.transform(X_test)

            if feature_names is not None:
                if hasattr(model, 'feature_names_in_'):
                    model.feature_names_in_ = np.array(feature_names)
                scaler.feature_names_in_ = np.array(feature_names)

            model.fit(X_train_scaled, y_train)
            y_pred = model.predict(X_test_scaled)
            accuracy = accuracy_score(y_test, y_pred)
            # Timing covers scaling, fitting and prediction, but not saving.
            training_time = time.time() - start_time

            model_path, scaler_path = self.save_model(clf_dict, model_name)
            conf_matrix = confusion_matrix(y_test, y_pred)

            return {
                'model_name': model_name,
                'accuracy': accuracy,
                'training_time': training_time,
                'model': model,
                'predictions': y_pred,
                'status': 'success',
                'scaler': scaler_path,
                'model_path': model_path,
                'confusion_matrix': conf_matrix
            }
        except Exception as e:
            # Deliberate best-effort: report the failure in the result row.
            return {
                'model_name': model_name,
                'accuracy': 0,
                'training_time': 0,
                'model': None,
                'predictions': None,
                'status': f'failed: {str(e)}',
                'scaler': None,
                'model_path': None,
                'confusion_matrix': None
            }
class Visualizer:
    """Plot helpers for learning curves, confusion matrices and metric summaries."""

    @staticmethod
    def plot_learning_curve(estimator, X, y, title, ax):
        """Plot learning curves for a model.

        Declared as a static method because the function takes no ``self``
        but is also called on ``Visualizer`` instances.

        Args:
            estimator: Estimator passed to ``learning_curve``.
            X: Feature matrix.
            y: Target labels.
            title: Title for the axes.
            ax: Matplotlib axes to draw on.
        """
        train_sizes, train_scores, test_scores = learning_curve(
            estimator, X, y,
            train_sizes=np.linspace(0.1, 1.0, 10),
            cv=5,
            n_jobs=-1,
            scoring='accuracy'
        )

        # Mean and spread across the CV folds for each training size.
        train_mean = np.mean(train_scores, axis=1)
        train_std = np.std(train_scores, axis=1)
        test_mean = np.mean(test_scores, axis=1)
        test_std = np.std(test_scores, axis=1)

        ax.plot(train_sizes, train_mean, label='Training score')
        ax.plot(train_sizes, test_mean, label='Cross-validation score')
        ax.fill_between(train_sizes, train_mean - train_std, train_mean + train_std, alpha=0.1)
        ax.fill_between(train_sizes, test_mean - test_std, test_mean + test_std, alpha=0.1)

        ax.set_xlabel('Training Examples')
        ax.set_ylabel('Score')
        ax.set_title(title)
        ax.legend(loc='lower right')
        ax.grid(True)

    def create_confusion_matrices_plot(self, successful_results, y_test):
        """Create and display confusion matrices for successful models.

        Args:
            successful_results: Result dicts containing 'confusion_matrix',
                'model_name' and 'accuracy'.
            y_test: Test labels; their sorted unique values label both axes.

        Returns:
            The Matplotlib figure with one heatmap per model in a 2-column grid.
        """
        n_cols = 2
        n_rows = (len(successful_results) + n_cols - 1) // n_cols
        fig = plt.figure(figsize=(15, 5 * n_rows))

        # Computed once; identical for every subplot.
        class_labels = sorted(set(y_test))

        for idx, result in enumerate(successful_results):
            ax = fig.add_subplot(n_rows, n_cols, idx + 1)
            sns.heatmap(
                result['confusion_matrix'],
                annot=True,
                fmt='d',
                cmap='viridis',
                ax=ax,
                xticklabels=class_labels,
                yticklabels=class_labels
            )
            ax.set_xlabel('Predicted')
            ax.set_ylabel('Actual')
            ax.set_title(f"{result['model_name']}\nAccuracy: {result['accuracy']:.4f}")

        plt.tight_layout()
        return fig

    def create_performance_summary_plot(self, successful_df, selected_models):
        """Create performance metrics summary plot.

        Args:
            successful_df: DataFrame with a 'Model' column plus 'Accuracy',
                'Precision', 'Recall' and 'F1-Score' columns.
            selected_models: Model names to include.

        Returns:
            A grouped Plotly bar figure, models ordered by mean score (descending).
        """
        metrics_to_compare = ['Accuracy', 'Precision', 'Recall', 'F1-Score']
        selected_rows = successful_df[successful_df['Model'].isin(selected_models)]
        summary_df = selected_rows.melt(
            id_vars=['Model'],
            value_vars=metrics_to_compare,
            var_name='Metric',
            value_name='Score'
        )

        fig_summary = px.bar(
            summary_df,
            x='Model',
            y='Score',
            color='Metric',
            barmode='group',
            title="Model Performance Metrics Comparison",
            text='Score'
        )
        fig_summary.update_layout(
            xaxis_tickangle=-45,
            showlegend=True,
            height=600,
            yaxis=dict(
                range=[0, 1],
                title='Score'
            ),
            legend=dict(
                title='Metric',
                orientation='h',
                yanchor='bottom',
                y=1.02,
                xanchor='right',
                x=1
            )
        )
        fig_summary.update_traces(
            texttemplate='%{text:.4f}',
            textposition='outside',
            textangle=0
        )

        # Order the x axis by each model's mean score over the four metrics.
        summary_df['Avg_Score'] = summary_df.groupby('Model')['Score'].transform('mean')
        models_order = (
            summary_df.drop_duplicates('Model')
            .sort_values('Avg_Score', ascending=False)['Model']
            .tolist()
        )
        fig_summary.update_layout(xaxis={'categoryorder': 'array', 'categoryarray': models_order})
        return fig_summary
| class StreamlitUI: | |
def __init__(self):
    """Create the helper objects, the built-in presets and session defaults."""
    self.data_generator = DataGenerator()
    self.model_manager = ModelManager()
    self.visualizer = Visualizer()

    # Feature names, in the same order as the values in each preset below.
    self.default_features = [
        'length (mm)',
        'width (mm)',
        'density (g/cm³)',
        'pH'
    ]

    # Built-in class presets: (name, per-feature means, per-feature std devs).
    produce_presets = (
        # Ampalaya: medium length, thin width, medium density, near-neutral pH
        ("Ampalaya", [165, 45, 95, 6.9], [15, 5, 10, 0.1]),
        # Banana: long, medium width, low density, acidic pH
        ("Banana", [200, 35, 85, 4.8], [20, 5, 8, 0.3]),
        # Cabbage: round (length ~ width), very low density, neutral pH
        ("Cabbage", [175, 175, 65, 6.8], [25, 25, 5, 0.2]),
        # Carrot: medium length, narrow width, high density, slightly acidic pH
        ("Carrot", [160, 30, 115, 6.3], [20, 5, 10, 0.2]),
        # Cassava: long, thick, high density, slightly acidic pH
        ("Cassava", [250, 65, 125, 6.2], [50, 15, 12, 0.2]),
    )
    self.default_configs = {
        name: {'mean': means, 'std': stds}
        for name, means, stds in produce_presets
    }

    # Session slots for the cached figures.
    self.initialize_static_visualizations()

    # Data source defaults to synthetic data unless already chosen.
    if 'data_source' not in st.session_state:
        st.session_state['data_source'] = 'synthetic'
def initialize_static_visualizations(self):
    """Create the session-state slots that cache the static figures."""
    for figure_key in ('confusion_matrices_fig', 'learning_curves_fig'):
        # Only fill missing slots so figures from an earlier run are kept.
        if figure_key not in st.session_state:
            st.session_state[figure_key] = None
def initialize_session_state(self):
    """Seed every session-state variable that is not set yet."""
    unset_keys = (
        'df',
        'features',
        'feature_configs',
        'X_train',
        'X_test',
        'y_train',
        'y_test',
        'y_pred',
        'model_results',
        'best_model',
        'accuracy',
        'feature_importance',
        'split_info',
    )
    # Everything starts as None except the "data generated" flag.
    defaults = {'data_generated': False, **dict.fromkeys(unset_keys)}

    for name, initial in defaults.items():
        if name in st.session_state:
            continue
        st.session_state[name] = initial
def setup_page_config(self):
    """Apply the page title, icon, layout and About text."""
    about_text = """
## Final project in Modeling and Simulation \n
### Juan Dela Cruz - BSCS 4A"""
    page_options = dict(
        page_title="ML Model Generator & Implementation",
        page_icon="🤖",
        layout="wide",
        menu_items={'About': about_text},
    )
    st.set_page_config(**page_options)
def get_sidebar_inputs(self):
    """Get all inputs from the sidebar.

    Returns:
        Tuple ``(features, feature_configs, classes)``. ``features`` and
        ``classes`` are lists of non-empty, unique names in the order typed.
        ``feature_configs`` maps every feature name to a config dict.
    """
    def parse_names(raw):
        # Drop blanks (e.g. from a trailing comma) and duplicates.
        # Duplicates would collapse in feature_configs while staying in the
        # feature list, so the generated columns would not match the names.
        stripped = (part.strip() for part in raw.split(","))
        return list(dict.fromkeys(name for name in stripped if name))

    st.sidebar.header("Data Generation Parameters")

    # Feature configuration
    st.sidebar.subheader("Feature Configuration")

    # Seed the widget value once; afterwards the user's text is kept.
    if 'features_input' not in st.session_state:
        st.session_state.features_input = ", ".join(self.default_features)
    features_input = st.sidebar.text_input(
        "Enter feature names (comma-separated)",
        key='features_input'
    )
    features = parse_names(features_input)

    if 'classes_input' not in st.session_state:
        st.session_state.classes_input = ", ".join(self.default_configs.keys())
    classes_input = st.sidebar.text_input(
        "Enter class names (comma-separated)",
        key='classes_input'
    )
    classes = parse_names(classes_input)

    # Every feature gets the same generic config.
    feature_configs = {
        feature: {'type': 'random', 'std': 20.0, 'center': None}
        for feature in features
    }

    return features, feature_configs, classes
def get_class_configs(self, classes, features):
    """Get class-specific configurations from the sidebar.

    Args:
        classes: Class names; each gets its own expander in the sidebar.
        features: Feature names; one mean and one std input per feature.

    Returns:
        dict mapping class name to ``{'mean': [...], 'std': [...]}``, each
        list holding one value per feature.
    """
    class_configs = {}
    n_features = len(features)
    # Cap on re-draws when looking for a mean at least 10 away from those of
    # earlier classes. Without a cap the search can loop forever once the
    # earlier classes cover the whole 0-100 range.
    max_attempts = 100

    st.sidebar.subheader("Class-Specific Settings")

    for class_name in classes:
        with st.sidebar.expander(f"{class_name} Settings", expanded=False):
            checkbox_key = f"use_specific_{class_name}"
            # Seed the checkbox once; afterwards the user's choice is kept.
            if checkbox_key not in st.session_state:
                st.session_state[checkbox_key] = True
            use_specific = st.checkbox(
                f"Set specific values for {class_name}",
                key=checkbox_key
            )

            if class_name in self.default_configs:
                preset = self.default_configs[class_name]
                # Work on copies: padding in place would permanently grow
                # the shared preset lists in self.default_configs.
                default_means = list(preset['mean'][:n_features])
                default_stds = list(preset['std'][:n_features])
                # More features than preset values: pad with random means
                # and the generic std.
                default_means += [
                    np.random.uniform(0, 100)
                    for _ in range(n_features - len(default_means))
                ]
                default_stds += [20.0] * (n_features - len(default_stds))
            else:
                # Unknown class: draw means in 0-100, trying to keep each
                # at least 10 away from earlier classes' means.
                default_means = []
                for feature_idx in range(n_features):
                    taken = [
                        c['mean'][feature_idx]
                        for c in class_configs.values()
                        if 'mean' in c
                    ]
                    candidate = np.random.uniform(0, 100)
                    for _ in range(max_attempts):
                        if all(abs(candidate - other) >= 10 for other in taken):
                            break
                        candidate = np.random.uniform(0, 100)
                    default_means.append(candidate)
                default_stds = [20.0] * n_features

            if use_specific:
                means = []
                stds = []
                for idx, feature in enumerate(features):
                    mean_key = f"mean_{class_name}_{feature}"
                    std_key = f"std_{class_name}_{feature}"
                    if mean_key not in st.session_state:
                        st.session_state[mean_key] = float(default_means[idx])
                    if std_key not in st.session_state:
                        st.session_state[std_key] = float(default_stds[idx])

                    col1, col2 = st.columns(2)
                    with col1:
                        means.append(st.number_input(
                            f"Mean for {feature}",
                            key=mean_key
                        ))
                    with col2:
                        stds.append(st.number_input(
                            f"Std Dev for {feature}",
                            min_value=0.1,
                            key=std_key
                        ))
            else:
                # Use default values if specific values not requested
                means = default_means
                stds = default_stds

        class_configs[class_name] = {
            'mean': means,
            'std': stds
        }

    return class_configs
def get_training_params(self):
    """Read the sample count and test-split percentage from the sidebar.

    Returns:
        Tuple ``(n_samples, test_size)``; ``test_size`` is a percentage.
    """
    st.sidebar.subheader("Sample Size & Train/Test Split Configuration")

    # Seed the sample-count slider once via session state.
    if 'n_samples' not in st.session_state:
        st.session_state.n_samples = 10000

    samples_col, split_col = st.sidebar.columns(2)

    with samples_col:
        n_samples = st.slider(
            "Number of samples",
            min_value=500,
            max_value=50000,
            step=500,
            key='n_samples'
        )

    with split_col:
        test_size = st.slider(
            "Test Size",
            min_value=10,
            max_value=50,
            value=30,  # default supplied directly by the widget
            step=5,
            key='test_size',
            format="%d%%",
            help="Percentage of data to use for testing"
        )

    train_size = 100 - test_size
    st.write(f"Test: {test_size}% / Train: {train_size}%")

    return n_samples, test_size
def generate_and_train(self, n_samples, feature_configs, classes, class_configs, test_size):
    """Generate data and train models.

    Generates the dataset, splits it, trains every classifier, and stores
    the dataset, split, results, best model, split summary and cached
    figures in ``st.session_state``.

    Args:
        n_samples: Total number of rows to generate.
        feature_configs: Per-feature generation settings.
        classes: Class names.
        class_configs: Per-class means and standard deviations.
        test_size: Share of the data used for testing, as a percentage.
    """
    X, y = self.data_generator.generate_synthetic_data(
        n_samples,
        feature_configs,
        classes,
        class_configs
    )

    # Column names come from the session's feature list.
    dataset = pd.DataFrame(X, columns=st.session_state.features)
    dataset['target'] = y
    st.session_state.df = dataset

    # Train test split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y,
        test_size=test_size / 100,
        random_state=42
    )
    st.session_state.X_train = X_train
    st.session_state.X_test = X_test
    st.session_state.y_train = y_train
    st.session_state.y_test = y_test

    # Get classifiers and train models
    classifiers = self.model_manager.get_classifiers()
    results = []
    with st.spinner('Training models... Please wait.'):
        progress_bar = st.progress(0)
        for done, (name, clf_dict) in enumerate(classifiers.items(), start=1):
            results.append(
                self.model_manager.train_and_evaluate_model(
                    clf_dict,
                    X_train,
                    X_test,
                    y_train,
                    y_test,
                    name
                )
            )
            progress_bar.progress(done / len(classifiers))

    st.session_state.model_results = results
    st.session_state.data_generated = True

    successful_results = [r for r in results if r['status'] == 'success']

    # Always overwrite the best model (None when nothing succeeded) so a
    # winner from a previous run is never shown next to new results.
    st.session_state.best_model = (
        max(successful_results, key=lambda r: r['accuracy'])
        if successful_results else None
    )

    # Store split information
    st.session_state.split_info = {
        'total_samples': len(X),
        'train_samples': len(X_train),
        'test_samples': len(X_test),
        'test_percentage': test_size
    }
    st.session_state.feature_configs = feature_configs

    # Rebuild the cached figures, or clear them when nothing succeeded.
    if successful_results:
        st.session_state.confusion_matrices_fig = self.visualizer.create_confusion_matrices_plot(
            successful_results,
            st.session_state.y_test
        )
        st.session_state.learning_curves_fig = self.generate_learning_curves_figure(successful_results)
    else:
        st.session_state.confusion_matrices_fig = None
        st.session_state.learning_curves_fig = None
def generate_learning_curves_figure(self, successful_results):
    """Generate learning curves figure.

    Args:
        successful_results: Result dicts with 'model_name', 'model',
            'accuracy' and 'scaler' (path to a saved scaler file).

    Returns:
        A Matplotlib figure with one learning curve per model, ordered by
        accuracy (best first), or ``None`` when the list is empty.
    """
    if not successful_results:
        return None

    # sorted() instead of list.sort(): the caller's list is left untouched.
    ranked_results = sorted(successful_results, key=lambda r: r['accuracy'], reverse=True)

    n_cols = 2
    n_rows = (len(ranked_results) + n_cols - 1) // n_cols
    fig_learning = plt.figure(figsize=(15, 5 * n_rows))

    # Features and labels are the same for every model.
    features_df = st.session_state.df.drop('target', axis=1)
    y = st.session_state.df['target']

    for idx, result in enumerate(ranked_results):
        ax = fig_learning.add_subplot(n_rows, n_cols, idx + 1)
        model_name = result['model_name']
        scaler = joblib.load(result['scaler'])

        if model_name == 'MultinomialNB':
            # Shift to non-negative values before scaling for this model.
            X_scaled = scaler.transform(
                self.model_manager.ensure_non_negative(features_df)
            )
        else:
            X_scaled = scaler.transform(features_df)

        self.visualizer.plot_learning_curve(
            result['model'],
            X_scaled,
            y,
            f'Learning Curve - {model_name}\nFinal Accuracy: {result["accuracy"]:.4f}',
            ax
        )

    plt.tight_layout()
    return fig_learning
def display_model_comparison(self):
    """Render the model comparison table and return its DataFrame.

    Returns:
        DataFrame with one row per trained model, sorted by accuracy
        (descending). Failed models carry zeros and their failure status.
    """
    st.subheader("Model Comparison")

    def build_row(result):
        # Failed runs get zeroed metrics and keep their status text.
        if result['status'] != 'success':
            return {
                'Model': result['model_name'],
                'Accuracy': 0,
                'Precision': 0,
                'Recall': 0,
                'F1-Score': 0,
                'Training Time (s)': 0,
                'Status': result['status']
            }
        report = classification_report(
            st.session_state.y_test,
            result['predictions'],
            output_dict=True
        )
        macro = report['macro avg']
        return {
            'Model': result['model_name'],
            'Accuracy': float(f"{result['accuracy']:.4f}"),
            'Precision': float(f"{macro['precision']:.4f}"),
            'Recall': float(f"{macro['recall']:.4f}"),
            'F1-Score': float(f"{macro['f1-score']:.4f}"),
            'Training Time (s)': float(f"{result['training_time']:.3f}"),
            'Status': 'Success'
        }

    rows = [build_row(result) for result in st.session_state.model_results]
    comparison_df = pd.DataFrame(rows).sort_values('Accuracy', ascending=False)

    column_formats = {
        'Accuracy': '{:.4f}',
        'Precision': '{:.4f}',
        'Recall': '{:.4f}',
        'F1-Score': '{:.4f}',
        'Training Time (s)': '{:.3f}'
    }
    st.dataframe(comparison_df.style.format(column_formats))
    return comparison_df
def display_metric_visualization(self, comparison_df):
    """Plot one user-selected metric for the successful models.

    Args:
        comparison_df: Table produced by ``display_model_comparison``.

    Returns:
        The successful rows, sorted by the selected metric.
    """
    time_metric = 'Training Time (s)'
    metric_to_plot = st.selectbox(
        "Select metric to visualize",
        ['Accuracy', 'Precision', 'Recall', 'F1-Score', time_metric]
    )
    is_time = metric_to_plot == time_metric

    # Shortest time first; for every other metric, highest score first.
    only_success = comparison_df[comparison_df['Status'] == 'Success']
    successful_df = only_success.sort_values(metric_to_plot, ascending=is_time)

    fig_comparison = px.bar(
        successful_df,
        x='Model',
        y=metric_to_plot,
        title=f"Model {metric_to_plot} Comparison",
        color=metric_to_plot,
        text=metric_to_plot
    )

    # Score metrics use a fixed 0-1 axis; training time is left unbounded.
    y_range = None if is_time else [0, 1]
    fig_comparison.update_layout(
        xaxis_tickangle=-45,
        showlegend=True,
        height=500,
        yaxis=dict(range=y_range)
    )
    fig_comparison.update_traces(
        texttemplate='%{text:.4f}',
        textposition='outside',
        textangle=0
    )

    st.plotly_chart(fig_comparison)
    return successful_df
def display_best_model_performance(self):
    """Display best model performance section.

    Renders nothing when no best model is available. ``best_model`` is
    initialised to ``None`` in session state, so testing only for the
    attribute's existence would pass and then fail on subscripting.
    """
    best_model = getattr(st.session_state, 'best_model', None)
    if not best_model:
        return

    st.subheader("Best Model Performance")
    st.write(f"Best Model: **{best_model['model_name']}**")
    st.write(f"Accuracy: {best_model['accuracy']:.4f}")

    st.write("Classification Report (Best Model):")
    report_dict = classification_report(
        st.session_state.y_test,
        best_model['predictions'],
        output_dict=True
    )
    # Transpose so that each class / average becomes one row.
    report_df = pd.DataFrame(report_dict).transpose()
    st.dataframe(report_df.style.format('{:.4f}'))
def display_dataset_info(self):
    """Show total, training and testing sample counts as three metrics."""
    split = st.session_state.split_info
    if not split:
        return

    st.subheader("Dataset Split Information")
    test_pct = split['test_percentage']
    total_col, train_col, test_col = st.columns(3)

    with total_col:
        st.metric("Total Samples", split['total_samples'])

    with train_col:
        train_label = f"{split['train_samples']} " + f"({100 - test_pct}%)"
        st.metric("Training Samples", train_label)

    with test_col:
        test_label = f"{split['test_samples']} " + f"({test_pct}%)"
        st.metric("Testing Samples", test_label)
def display_feature_configs(self):
    """Render a table with one row per configured feature."""
    st.subheader("Feature Configurations")

    rows = [
        {
            'Feature': name,
            'Type': cfg['type'],
            'Std Dev': cfg['std'],
            # Only user-defined features carry a fixed centre.
            'Center': cfg['center'] if cfg['type'] == 'user-defined' else 'Random'
        }
        for name, cfg in st.session_state.feature_configs.items()
    ]
    st.table(pd.DataFrame(rows))
def display_data_samples(self):
    """Show a few random rows per class, raw and scaled, side by side."""
    st.subheader("Generated Data Sample")

    df = st.session_state.df
    rows_per_class = 2  # how many rows to show for each class

    # Draw up to rows_per_class rows from every class, then shuffle them.
    per_class = []
    for label in df['target'].unique():
        subset = df[df['target'] == label]
        per_class.append(subset.sample(n=min(rows_per_class, len(subset))))
    sampled_df = pd.concat(per_class).sample(frac=1).reset_index(drop=True)

    raw_col, scaled_col = st.columns(2)

    with raw_col:
        st.write("Original Data (Random samples from each class):")
        st.write(sampled_df)

    with scaled_col:
        st.write("Scaled Data (using best model's scaler):")
        best = st.session_state.best_model
        if not (best and best['status'] == 'success'):
            st.write("No scaled data available (best model not found)")
        else:
            scaler = joblib.load(best['scaler'])
            features_df = sampled_df.drop('target', axis=1)

            # MultinomialNB's scaler expects data shifted to be non-negative.
            to_scale = features_df
            if best['model_name'] == 'MultinomialNB':
                to_scale = self.model_manager.ensure_non_negative(features_df)

            scaled_df = pd.DataFrame(
                scaler.transform(to_scale),
                columns=features_df.columns,
                index=features_df.index
            )
            scaled_df['target'] = sampled_df['target']
            st.write(scaled_df)
def display_confusion_matrices(self):
    """Display confusion matrices section.

    Shows an explanatory legend and, when available, the figure cached in
    ``st.session_state.confusion_matrices_fig``.
    """
    st.subheader("Confusion Matrices")
    # The colour bullet now matches viridis: dark purple is low, yellow is high.
    st.write("""
Confusion matrices show the model's prediction performance across different classes.
- Each row represents the actual class
- Each column represents the predicted class
- Diagonal elements represent correct predictions (True Positives for each class)
- Off-diagonal elements represent incorrect predictions
- Numbers show how many samples were classified for each combination
- Colors range from dark purple (low values) through green to yellow (high values) using the viridis colormap
""")
    if st.session_state.confusion_matrices_fig is not None:
        st.pyplot(st.session_state.confusion_matrices_fig)
        plt.close()
def display_performance_summary(self, successful_df):
    """Let the user pick models and chart their four score metrics.

    Args:
        successful_df: Comparison rows for the models that trained successfully.
    """
    st.subheader("Performance Metrics Summary")

    model_names = successful_df['Model'].unique().tolist()

    # The picker sits in the wide column; the narrow one stays empty.
    picker_col, _spacer_col = st.columns([3, 1])
    with picker_col:
        selected_models = st.multiselect(
            "Select models to compare",
            model_names,
            default=model_names
        )

    if len(selected_models) == 0:
        st.warning("Please select at least one model to display the comparison.")
        return

    chart = self.visualizer.create_performance_summary_plot(successful_df, selected_models)
    st.plotly_chart(chart, use_container_width=True)
def display_saved_models(self):
    """Display saved models information and download buttons.

    Lists every successfully trained model that has a saved file, with its
    accuracy and one download button each for the model and its scaler.
    """
    st.subheader("Saved Models")

    saved_models = []
    # model_results is None until the first training run has finished.
    for result in st.session_state.model_results or []:
        if result['status'] == 'success' and result['model_path']:
            # Load the saved joblib files and re-serialise them with pickle
            # for the .pkl downloads offered below.
            model = joblib.load(result['model_path'])
            scaler = joblib.load(result['scaler'])
            saved_models.append({
                'Model': result['model_name'],
                'Accuracy': result['accuracy'],
                'Model_Binary': pickle.dumps(model),
                'Scaler_Binary': pickle.dumps(scaler)
            })

    if not saved_models:
        # The old text named a 0.5 accuracy threshold that the code never
        # applies: every successfully trained model is saved.
        st.info("No models were saved. Models are saved automatically after each successful training run.")
        return

    # Display models table
    display_df = pd.DataFrame(
        [{'Model': m['Model'], 'Accuracy': m['Accuracy']} for m in saved_models]
    )
    st.dataframe(display_df.style.format({
        'Accuracy': '{:.4f}'
    }))

    # Add download buttons for each model
    st.write("Download Models:")
    for model_data in saved_models:
        col1, col2 = st.columns(2)
        with col1:
            st.download_button(
                label=f"Download {model_data['Model']} Model",
                data=model_data['Model_Binary'],
                file_name=f"{model_data['Model']}_model.pkl",
                mime="application/octet-stream"
            )
        with col2:
            st.download_button(
                label=f"Download {model_data['Model']} Scaler",
                data=model_data['Scaler_Binary'],
                file_name=f"{model_data['Model']}_scaler.pkl",
                mime="application/octet-stream"
            )
def display_download_section(self):
    """Offer the raw dataset and its scaled version as CSV downloads."""
    st.subheader("Download Dataset")
    raw_col, scaled_col = st.columns(2)

    with raw_col:
        df = st.session_state.df
        if df is not None:
            raw_name = f"synthetic_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            st.download_button(
                label="Download Original Dataset (CSV)",
                data=df.to_csv(index=False),
                file_name=raw_name,
                mime='text/csv',
                help="Download the original unscaled dataset"
            )

    with scaled_col:
        best = st.session_state.best_model
        if best and best['status'] == 'success':
            scaler = joblib.load(best['scaler'])
            features_df = st.session_state.df.drop('target', axis=1)

            # MultinomialNB's scaler expects data shifted to be non-negative.
            to_scale = features_df
            if best['model_name'] == 'MultinomialNB':
                to_scale = self.model_manager.ensure_non_negative(features_df)

            scaled_df = pd.DataFrame(
                scaler.transform(to_scale),
                columns=features_df.columns,
                index=features_df.index
            )
            scaled_df['target'] = st.session_state.df['target']

            scaled_name = f"synthetic_data_scaled_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            st.download_button(
                label="Download Scaled Dataset (CSV)",
                data=scaled_df.to_csv(index=False),
                file_name=scaled_name,
                mime='text/csv',
                help="Download the scaled dataset (using best model's scaler)"
            )
def display_dataset_statistics(self):
    """Show ``describe()`` summaries of the raw and the scaled dataset.

    Everything is rendered inside a "Dataset Statistics" expander. The first
    column always summarises ``st.session_state.df``. The second column is
    filled only when a best model with ``status == 'success'`` is recorded;
    its scaler (loaded via ``joblib``) is applied to all columns except
    ``target`` before summarising.
    """
    with st.expander("Dataset Statistics"):
        raw_col, scaled_col = st.columns(2)

        with raw_col:
            st.write("Original Dataset Statistics:")
            st.write(st.session_state.df.describe())

        with scaled_col:
            best = st.session_state.best_model
            if best and best['status'] == 'success':
                st.write("Scaled Dataset Statistics:")
                model_name = best['model_name']
                fitted_scaler = joblib.load(best['scaler'])
                inputs = st.session_state.df.drop('target', axis=1)

                # For MultinomialNB the features go through the model
                # manager's ensure_non_negative step before scaling.
                scaler_input = (
                    self.model_manager.ensure_non_negative(inputs)
                    if model_name == 'MultinomialNB'
                    else inputs
                )

                scaled_frame = pd.DataFrame(
                    fitted_scaler.transform(scaler_input),
                    columns=inputs.columns,
                    index=inputs.index
                )
                scaled_frame['target'] = st.session_state.df['target']
                st.write(scaled_frame.describe())
def display_learning_curves(self):
    """Show the learning-curves section.

    Writes a short legend and, when a figure is stored in
    ``st.session_state.learning_curves_fig``, renders it with ``st.pyplot``
    and then closes pyplot's current figure.
    """
    st.subheader("Learning Curves")
    st.write("""
Learning curves show how model performance changes with increasing training data.
- Blue line: Training score
- Orange line: Cross-validation score
- Shaded areas represent standard deviation
""")

    curves_fig = st.session_state.learning_curves_fig
    if curves_fig is None:
        return

    st.pyplot(curves_fig)
    plt.close()
def display_feature_visualization(self):
    """Display 2D and 3D feature visualizations.

    The user picks a distinct feature for each axis from
    ``st.session_state.features``; the points of ``st.session_state.df`` are
    drawn as a Plotly scatter coloured by the ``target`` column.

    The 3D view stays the default but is only offered when at least three
    features exist. With exactly two features only the 2D view is available,
    and with fewer than two an informational message replaces the plot.
    Previously those cases produced axis selectors with no options and a plot
    built with a missing axis.
    """
    st.subheader("Feature Visualization")

    stored_features = st.session_state.features
    features = [] if stored_features is None else list(stored_features)

    if len(features) < 2:
        # A scatter plot needs two distinct axes.
        st.info("At least two features are required for feature visualization.")
        return

    # Offer 3D only when a third, distinct feature exists for the Z axis.
    plot_types = ["2D Plot", "3D Plot"] if len(features) >= 3 else ["2D Plot"]
    plot_type = st.radio("Select plot type", plot_types, index=len(plot_types) - 1)

    is_3d = plot_type == "3D Plot"
    axes = "xyz" if is_3d else "xy"
    key_suffix = "3d" if is_3d else "2d"

    selected = {}
    for axis, column in zip(axes, st.columns(len(axes))):
        with column:
            # Features already assigned to an earlier axis are not offered again.
            taken = list(selected.values())
            options = [f for f in features if f not in taken]
            selected[axis] = st.selectbox(
                f"Select {axis.upper()}-axis feature",
                options,
                index=0,
                key=f"{axis}_{key_suffix}"
            )

    x_feature = selected["x"]
    y_feature = selected["y"]

    if is_3d:
        z_feature = selected["z"]
        fig = px.scatter_3d(
            st.session_state.df,
            x=x_feature,
            y=y_feature,
            z=z_feature,
            color='target',
            title=f"3D Visualization of {x_feature} vs {y_feature} vs {z_feature}",
            labels={'target': 'Class'}
        )
        fig.update_layout(
            scene=dict(
                xaxis_title=x_feature,
                yaxis_title=y_feature,
                zaxis_title=z_feature
            ),
            scene_camera=dict(
                up=dict(x=0, y=0, z=1),
                center=dict(x=0, y=0, z=0),
                eye=dict(x=1.5, y=1.5, z=1.5)
            )
        )
    else:
        fig = px.scatter(
            st.session_state.df,
            x=x_feature,
            y=y_feature,
            color='target',
            title=f"2D Visualization of {x_feature} vs {y_feature}",
            labels={'target': 'Class'}
        )

    st.plotly_chart(fig, use_container_width=True)
def get_data_source(self):
    """Ask in the sidebar where the data comes from.

    Records the answer in ``st.session_state.data_source`` and returns it:
    ``'synthetic'`` when "Generate Synthetic Data" is selected, ``'upload'``
    otherwise.
    """
    st.sidebar.header("Data Source")
    choice = st.sidebar.radio(
        "Choose data source",
        ['Generate Synthetic Data', 'Upload Dataset'],
        key='data_source_radio'
    )

    if choice == 'Generate Synthetic Data':
        st.session_state.data_source = 'synthetic'
    else:
        st.session_state.data_source = 'upload'
    return st.session_state.data_source
def upload_dataset(self):
    """Handle a CSV upload from the sidebar and prepare the train/test split.

    Renders the file uploader, the target-column selector and the test-size
    slider in the sidebar. On success the following are written to
    ``st.session_state``: ``df``, ``features``, ``X_train``, ``X_test``,
    ``y_train``, ``y_test`` and ``split_info``.

    The chosen label column is stored under the name ``'target'`` because
    the display methods in this file (scatter colouring, scaled-dataset
    export, statistics) read the label from a column with exactly that name.
    If the file already has a ``target`` column it is pre-selected; choosing
    a different label column in that case is rejected, since two columns
    would otherwise compete for the name.

    Returns:
        bool: ``True`` when a dataset was loaded and split. ``False`` when
        no file has been uploaded yet or loading/validation failed (the
        reason is shown in the sidebar).
    """
    st.sidebar.header("Upload Dataset")
    uploaded_file = st.sidebar.file_uploader(
        "Choose a CSV file",
        type="csv",
        help="Upload a CSV file with features and target column"
    )
    if uploaded_file is None:
        return False

    try:
        # Rewind defensively: the upload is a file-like buffer and this
        # method runs again on every script rerun.
        uploaded_file.seek(0)
        df = pd.read_csv(uploaded_file)

        if df.shape[1] < 2:
            st.sidebar.error(
                "The dataset needs at least one feature column and one target column."
            )
            return False

        # Let user select target column (defaults to an existing 'target').
        columns = df.columns.tolist()
        default_index = columns.index('target') if 'target' in columns else 0
        target_col = st.sidebar.selectbox(
            "Select target column",
            columns,
            index=default_index
        )

        if target_col != 'target':
            if 'target' in columns:
                st.sidebar.error(
                    "The dataset already contains a column named 'target'. "
                    "Select it as the target column or rename it in the file."
                )
                return False
            # Normalise the label column name to what the rest of the app reads.
            df = df.rename(columns={target_col: 'target'})

        # Store features and target
        features = [col for col in df.columns if col != 'target']
        X = df[features]
        y = df['target']

        # Store in session state
        st.session_state.df = df
        st.session_state.features = features

        # Train test split (slider value is a percentage)
        test_size = st.sidebar.slider(
            "Test Size",
            min_value=10,
            max_value=50,
            value=30,
            step=5,
            format="%d%%",
            help="Percentage of data to use for testing"
        )
        X_train, X_test, y_train, y_test = train_test_split(
            X, y,
            test_size=test_size/100,
            random_state=42
        )

        # Store split data
        st.session_state.X_train = X_train
        st.session_state.X_test = X_test
        st.session_state.y_train = y_train
        st.session_state.y_test = y_test

        # Store split information
        st.session_state.split_info = {
            'total_samples': len(X),
            'train_samples': len(X_train),
            'test_samples': len(X_test),
            'test_percentage': test_size
        }
        return True
    except Exception as e:
        # UI boundary: report any parsing/splitting failure in the sidebar
        # instead of crashing the app.
        st.sidebar.error(f"Error loading dataset: {str(e)}")
        return False
def run(self):
    """Drive one pass of the application.

    Sets up the page and session state, asks for the data source, then either
    collects the synthetic-data settings (training starts on the sidebar
    button) or handles an uploaded dataset (training every classifier from
    the model manager on the sidebar button). Once
    ``st.session_state.data_generated`` is set, all result sections are
    rendered; otherwise a hint for the active data source is shown.
    """
    self.setup_page_config()
    self.initialize_session_state()
    st.title("ML Model Generator")

    # Get data source choice
    data_source = self.get_data_source()

    if data_source == 'synthetic':
        st.sidebar.header("Synthetic Data Generation")
        # Sidebar inputs describing the synthetic dataset
        features, feature_configs, classes = self.get_sidebar_inputs()
        class_configs = self.get_class_configs(classes, features)
        n_samples, test_size = self.get_training_params()
        st.session_state.features = features

        if st.sidebar.button("Generate Data and Train Models"):
            self.generate_and_train(n_samples, feature_configs, classes, class_configs, test_size)
    elif self.upload_dataset() and st.sidebar.button("Train Models"):
        # Upload flow: the button is only rendered once a dataset is loaded.
        classifiers = self.model_manager.get_classifiers()
        total = len(classifiers)
        results = []

        with st.spinner('Training models... Please wait.'):
            progress_bar = st.progress(0)
            for done, (name, clf_dict) in enumerate(classifiers.items(), start=1):
                outcome = self.model_manager.train_and_evaluate_model(
                    clf_dict,
                    st.session_state.X_train,
                    st.session_state.X_test,
                    st.session_state.y_train,
                    st.session_state.y_test,
                    name
                )
                results.append(outcome)
                progress_bar.progress(done / total)

        st.session_state.model_results = results
        st.session_state.data_generated = True

        # Pick the best model among those that trained successfully
        successful_results = [r for r in results if r['status'] == 'success']
        if successful_results:
            st.session_state.best_model = max(
                successful_results, key=lambda r: r['accuracy']
            )
            # Generate static visualizations
            st.session_state.confusion_matrices_fig = self.visualizer.create_confusion_matrices_plot(
                successful_results,
                st.session_state.y_test
            )
            st.session_state.learning_curves_fig = self.generate_learning_curves_figure(successful_results)

    # Nothing trained yet: show a hint matching the active data source.
    if not st.session_state.data_generated:
        if data_source == 'synthetic':
            st.info("Please generate data using the sidebar button to view visualizations and results.")
        else:
            st.info("Please upload a dataset and click 'Train Models' to view visualizations and results.")
        return

    # Display results once data has been generated/uploaded and trained
    self.display_dataset_info()
    self.display_data_samples()
    self.display_feature_visualization()
    self.display_download_section()
    self.display_dataset_statistics()
    self.display_best_model_performance()

    successful_df = self.display_model_comparison()
    if successful_df is not None and not successful_df.empty:
        self.display_performance_summary(successful_df)

    self.display_saved_models()
    self.display_learning_curves()
    self.display_confusion_matrices()
def main():
    """Create the Streamlit UI object and run the application."""
    StreamlitUI().run()


# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    main()