Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| import numpy as np | |
| import seaborn as sns | |
| import matplotlib.pyplot as plt | |
| from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.svm import SVC | |
| from sklearn.neural_network import MLPClassifier | |
| from sklearn.model_selection import train_test_split, GridSearchCV | |
| from sklearn.pipeline import Pipeline | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.metrics import ( | |
| roc_auc_score, precision_score, recall_score, | |
| f1_score, confusion_matrix, accuracy_score | |
| ) | |
| # Configurazione della pagina | |
| st.set_page_config(page_title="APA - AI Parts Analyzer", page_icon="🔍", layout="wide") | |
| ######################################### | |
| # Definizione dei modelli Transformer dummy | |
| ######################################### | |
| class DummyTabTransformerClassifier: | |
| def __init__(self): | |
| # Placeholder per TabTransformer | |
| self.clf = MLPClassifier(hidden_layer_sizes=(100,), max_iter=500, random_state=42) | |
| def fit(self, X, y): | |
| self.clf.fit(X, y) | |
| return self | |
| def predict(self, X): | |
| return self.clf.predict(X) | |
| def predict_proba(self, X): | |
| return self.clf.predict_proba(X) | |
| class DummySAINTClassifier: | |
| def __init__(self): | |
| # Placeholder per SAINT | |
| self.clf = MLPClassifier(hidden_layer_sizes=(50,50), max_iter=500, random_state=42) | |
| def fit(self, X, y): | |
| self.clf.fit(X, y) | |
| return self | |
| def predict(self, X): | |
| return self.clf.predict(X) | |
| def predict_proba(self, X): | |
| return self.clf.predict_proba(X) | |
| # Dizionario dei modelli con alcune configurazioni iniziali | |
| MODELS = { | |
| "Random Forest": RandomForestClassifier(random_state=42, n_estimators=100), | |
| "Gradient Boosting": GradientBoostingClassifier(random_state=42, n_estimators=100), | |
| "Logistic Regression": LogisticRegression(random_state=42, max_iter=1000), | |
| "Support Vector Machine": SVC(probability=True, random_state=42), | |
| "TabTransformer": DummyTabTransformerClassifier(), | |
| "SAINT": DummySAINTClassifier() | |
| } | |
| def reset_app(): | |
| """Resetta lo stato dell'applicazione""" | |
| st.session_state.clear() | |
| st.success("Applicazione resettata. Puoi iniziare da capo.") | |
| def clean_dataset(data): | |
| """Pulizia iniziale del dataset: conversione di colonne 'object' in numerico se possibile.""" | |
| for col in data.columns: | |
| if data[col].dtype == "object": | |
| converted_col = pd.to_numeric(data[col], errors='coerce') | |
| if converted_col.notnull().mean() > 0.5: | |
| data[col] = converted_col | |
| st.write(f"Colonna '{col}' convertita in formato numerico.") | |
| return data | |
| def prepare_dataset(): | |
| """Fase di preparazione e esplorazione del dataset""" | |
| st.title("📊 Preparazione del Dataset") | |
| with st.expander("Configura il tuo dataset", expanded=True): | |
| data_option = st.radio("Scegli la fonte del dataset", | |
| ["Genera dati sintetici", "Carica un CSV"], | |
| horizontal=True) | |
| if data_option == "Genera dati sintetici": | |
| st.subheader("Generazione Dati Sintetici") | |
| n_samples = st.slider("Numero di campioni", 100, 2000, 500) | |
| np.random.seed(42) | |
| eta_uso = np.random.randint(0, 15, size=n_samples) | |
| frequenza_uso = np.random.randint(1, 24, size=n_samples) | |
| costo_riparazione = np.random.randint(50, 500, size=n_samples) | |
| valore_residuo = np.random.randint(100, 1000, size=n_samples) | |
| profittevole = [1 if vr - cr - (e * 10) > 0 else 0 | |
| for e, fr, cr, vr in zip(eta_uso, frequenza_uso, costo_riparazione, valore_residuo)] | |
| data = pd.DataFrame({ | |
| "eta_uso": eta_uso, | |
| "frequenza_uso": frequenza_uso, | |
| "costo_riparazione": costo_riparazione, | |
| "valore_residuo": valore_residuo, | |
| "Profittevole": profittevole | |
| }) | |
| else: | |
| uploaded_file = st.file_uploader("Carica un file CSV", type=['csv']) | |
| if uploaded_file is None: | |
| st.info("Carica un file CSV per procedere.") | |
| return None | |
| try: | |
| data = pd.read_csv(uploaded_file) | |
| except Exception as e: | |
| st.error(f"Errore nel caricamento del file: {e}") | |
| return None | |
| # Se il dataset ha una sola colonna e il primo record contiene il delimitatore ";", | |
| # si tenta di rileggerlo usando il separatore corretto. | |
| if data.shape[1] == 1 and data.iloc[0, 0].find(';') != -1: | |
| st.info("Il file sembra essere tabulato male. Rilettura con separatore ';'.") | |
| uploaded_file.seek(0) # Resetto il puntatore del file | |
| try: | |
| data = pd.read_csv(uploaded_file, sep=';', engine='python') | |
| except Exception as e: | |
| st.error(f"Errore nella riletura del file con il separatore ';': {e}") | |
| return None | |
| # Pulizia iniziale del dataset: conversione di colonne 'object' in numerico se possibile | |
| data = clean_dataset(data) | |
| st.subheader("Anteprima dei Dati") | |
| st.dataframe(data.head(10).style.background_gradient(cmap='Blues')) | |
| csv = data.to_csv(index=False) | |
| if data_option == "Genera dati sintetici": | |
| st.download_button("Scarica dati sintetici", csv, "synthetic_data.csv", "text/csv") | |
| else: | |
| st.download_button("Scarica il CSV", csv, "uploaded_data.csv", "text/csv") | |
| # Panoramica del dataset | |
| st.subheader("Panoramica del Dataset") | |
| col1, col2, col3 = st.columns(3) | |
| col1.metric("Campioni totali", len(data)) | |
| col2.metric("Features", len(data.columns) - 1) | |
| col3.metric("Campioni profittevoli", data['Profittevole'].sum()) | |
| # Heatmap delle correlazioni | |
| st.subheader("Correlazioni tra Features") | |
| numeric_cols = data.select_dtypes(include=[np.number]).columns | |
| fig, ax = plt.subplots(figsize=(10, 8)) | |
| sns.heatmap(data[numeric_cols].corr(), annot=True, cmap='viridis', ax=ax, linewidths=0.5) | |
| st.pyplot(fig) | |
| return data | |
| def train_models(data): | |
| """Fase di addestramento dei modelli""" | |
| st.title("🤖 Addestramento dei Modelli") | |
| # Preparazione dei dati | |
| X = data.drop(columns=['Profittevole']) | |
| y = data['Profittevole'] | |
| X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) | |
| # Per abilitare il tuning dei modelli | |
| tune_models = st.checkbox("Abilita ottimizzazione dei modelli (GridSearchCV)") | |
| model_results = [] | |
| trained_pipelines = {} | |
| progress_bar = st.progress(0) | |
| for i, (name, model) in enumerate(MODELS.items()): | |
| with st.expander(f"{name}", expanded=False): | |
| # Preparazione del pipeline | |
| pipeline = Pipeline([ | |
| ('scaler', StandardScaler()), | |
| ('classifier', model) | |
| ]) | |
| # Tuning del modello se abilitato (solo per Random Forest in questo esempio) | |
| if tune_models: | |
| if name == "Random Forest": | |
| param_grid = { | |
| 'classifier__n_estimators': [50, 100, 200], | |
| 'classifier__max_depth': [None, 10, 20, 30], | |
| 'classifier__min_samples_split': [2, 5, 10] | |
| } | |
| grid_search = GridSearchCV(pipeline, param_grid, cv=3, scoring='accuracy') | |
| grid_search.fit(X_train, y_train) | |
| pipeline = grid_search.best_estimator_ | |
| st.write("Migliori parametri:", grid_search.best_params_) | |
| else: | |
| pipeline.fit(X_train, y_train) | |
| else: | |
| pipeline.fit(X_train, y_train) | |
| # Predizione e valutazione | |
| y_pred = pipeline.predict(X_test) | |
| y_pred_proba = pipeline.predict_proba(X_test)[:, 1] | |
| # Calcolo delle metriche | |
| results = { | |
| 'Model': name, | |
| 'Accuracy': accuracy_score(y_test, y_pred), | |
| 'AUC-ROC': roc_auc_score(y_test, y_pred_proba), | |
| 'Precision': precision_score(y_test, y_pred), | |
| 'Recall': recall_score(y_test, y_pred), | |
| 'F1 Score': f1_score(y_test, y_pred) | |
| } | |
| model_results.append(results) | |
| trained_pipelines[name] = pipeline | |
| # Visualizzazione delle metriche | |
| col1, col2, col3, col4, col5 = st.columns(5) | |
| col1.metric("Accuracy", f"{results['Accuracy']:.2%}") | |
| col2.metric("AUC-ROC", f"{results['AUC-ROC']:.2f}") | |
| col3.metric("Precision", f"{results['Precision']:.2f}") | |
| col4.metric("Recall", f"{results['Recall']:.2f}") | |
| col5.metric("F1 Score", f"{results['F1 Score']:.2f}") | |
| # Matrice di confusione | |
| cm = confusion_matrix(y_test, y_pred) | |
| fig, ax = plt.subplots(figsize=(5, 4)) | |
| sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax) | |
| ax.set_title(f'Confusion Matrix - {name}') | |
| ax.set_xlabel('Predicted') | |
| ax.set_ylabel('Actual') | |
| st.pyplot(fig) | |
| progress_bar.progress((i + 1) / len(MODELS)) | |
| # Confronto dei modelli | |
| st.subheader("Confronto Modelli") | |
| results_df = pd.DataFrame(model_results) | |
| st.dataframe(results_df.style.background_gradient(cmap='Blues').highlight_max(axis=0)) | |
| return results_df, trained_pipelines | |
| def model_inference(trained_pipelines, data): | |
| """Fase di inferenza del modello""" | |
| st.title("🔮 Inferenza del Modello") | |
| # Rimozione della colonna target per l'input | |
| input_features = data.drop(columns=['Profittevole']).columns | |
| with st.form(key='inference_form'): | |
| st.subheader("Inserisci i dati del nuovo campione") | |
| # Input in due colonne per migliorare l'impaginazione | |
| input_values = {} | |
| cols = st.columns(2) | |
| for i, feature in enumerate(input_features): | |
| with cols[i % 2]: | |
| input_values[feature] = st.number_input( | |
| f"{feature}", | |
| value=float(data[feature].median()), | |
| step=1.0, | |
| key=f"input_{feature}" | |
| ) | |
| # Selezione multipla dei modelli | |
| selected_models = st.multiselect( | |
| "Scegli i modelli per l'inferenza", | |
| list(trained_pipelines.keys()), | |
| default=list(trained_pipelines.keys()) | |
| ) | |
| submit_button = st.form_submit_button(label="Prevedi Profittabilità") | |
| if submit_button and selected_models: | |
| input_df = pd.DataFrame([input_values]) | |
| st.subheader("Risultati della Previsione") | |
| results = [] | |
| for model_name in selected_models: | |
| pipeline = trained_pipelines[model_name] | |
| prediction = pipeline.predict(input_df)[0] | |
| proba = pipeline.predict_proba(input_df)[0] | |
| results.append({ | |
| "Modello": model_name, | |
| "Previsione": "Profittevole" if prediction == 1 else "Non Profittevole", | |
| "Prob. Profit": f"{proba[1]:.2%}", | |
| "Prob. Non Profit": f"{proba[0]:.2%}" | |
| }) | |
| if prediction == 1: | |
| st.success(f"✅ {model_name}: Il componente è PROFITTEVOLE (Prob: {proba[1]:.2%})") | |
| else: | |
| st.error(f"❌ {model_name}: Il componente NON è PROFITTEVOLE (Prob: {proba[0]:.2%})") | |
| # Tabella di confronto | |
| st.subheader("Confronto tra Modelli") | |
| st.dataframe(pd.DataFrame(results).style.background_gradient(cmap='Blues')) | |
| def main(): | |
| """Funzione principale dell'applicazione""" | |
| # Sidebar con opzioni di navigazione | |
| st.sidebar.title("🔍 APA - AI Parts Analyzer") | |
| # Opzioni di navigazione | |
| phase = st.sidebar.radio( | |
| "Fase dell'analisi", | |
| [ | |
| "📊 1. Preparazione Dataset", | |
| "🤖 2. Addestramento Modelli", | |
| "🔮 3. Inferenza Modello" | |
| ], | |
| label_visibility="collapsed" | |
| ) | |
| # Pulsante di reset | |
| st.sidebar.button("🔄 Azzera Applicazione", on_click=reset_app) | |
| # Logica di navigazione | |
| if "1. Preparazione Dataset" in phase: | |
| st.session_state.data = prepare_dataset() | |
| elif "2. Addestramento Modelli" in phase: | |
| if hasattr(st.session_state, 'data') and st.session_state.data is not None: | |
| st.session_state.model_results, st.session_state.trained_pipelines = train_models(st.session_state.data) | |
| else: | |
| st.error("⚠️ Prepara prima un dataset!") | |
| elif "3. Inferenza Modello" in phase: | |
| if hasattr(st.session_state, 'trained_pipelines') and hasattr(st.session_state, 'data'): | |
| model_inference(st.session_state.trained_pipelines, st.session_state.data) | |
| else: | |
| st.error("⚠️ Addestra prima i modelli!") | |
| # Punto di ingresso dell'applicazione | |
| if __name__ == "__main__": | |
| main() | |