# Spaces: Sleeping -- hosting-page status banner captured along with the source; not part of the program.
import hashlib
import hmac
import os
import pickle

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import streamlit as st
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

from NoCodeTextClassifier.EDA import Informations, Visualizations
from NoCodeTextClassifier.models import Models
from NoCodeTextClassifier.preprocessing import process, TextCleaner, Vectorization
# Authentication configuration: username -> password lookup used by the
# login form.
# NOTE(review): these are plain-text demo credentials committed to source;
# move them to a secrets store and keep only hashes before real use.
USERS = dict(
    admin="admin123",
    user1="password123",
    demo="demo123",
)
def check_password():
    """Gate the app behind a username/password form.

    Returns:
        bool: ``True`` when ``st.session_state["password_correct"]`` is
        already truthy. Otherwise the login form is rendered and ``False``
        is returned (a successful login triggers a rerun instead of
        returning).
    """

    def password_entered():
        """Check the submitted credentials and record the result in session state."""
        username = st.session_state.get("username", "")
        password = st.session_state.get("password", "")
        expected = USERS.get(username)
        # hmac.compare_digest only accepts ASCII-only ``str`` values and
        # raises TypeError otherwise, so compare UTF-8 bytes instead.
        if expected is not None and hmac.compare_digest(
            expected.encode("utf-8"), password.encode("utf-8")
        ):
            st.session_state["password_correct"] = True
            st.session_state["authenticated_user"] = username
            if "password" in st.session_state:
                del st.session_state["password"]  # Don't store passwords
        else:
            st.session_state["password_correct"] = False

    # Return True if password is validated
    if st.session_state.get("password_correct", False):
        return True

    # Show login form
    st.markdown("## 🔐 Login Required")
    st.markdown("Please enter your credentials to access the Text Classification App")
    col1, col2, col3 = st.columns([1, 2, 1])
    with col2:
        st.text_input("Username", key="username", placeholder="Enter username")
        st.text_input("Password", type="password", key="password", placeholder="Enter password")
        if st.button("Login", use_container_width=True):
            password_entered()
            if st.session_state.get("password_correct"):
                # Rerun so the app replaces the login form right away
                # instead of waiting for another user interaction.
                st.rerun()
        # Show demo credentials
        with st.expander("Demo Credentials"):
            st.info("""
            **Demo Account:**
            - Username: `demo`
            - Password: `demo123`
            **Admin Account:**
            - Username: `admin`
            - Password: `admin123`
            """)

    # The key is absent until a login has been attempted, so the error is
    # shown only after an explicit failure, not on the first page load.
    if st.session_state.get("password_correct") is False:
        st.error("😞 Username or password incorrect")
    return False
# Utility functions
def save_artifacts(obj, folder_name, file_name):
    """Pickle ``obj`` to ``folder_name/file_name``.

    The folder is created when it does not exist yet.

    Returns:
        bool: ``True`` when the file was written; ``False`` when any
        exception occurred (the error is reported through ``st.error``).
    """
    try:
        os.makedirs(folder_name, exist_ok=True)
        destination = os.path.join(folder_name, file_name)
        with open(destination, 'wb') as handle:
            pickle.dump(obj, handle)
    except Exception as exc:
        st.error(f"Error saving {file_name}: {exc!s}")
        return False
    return True
def load_artifacts(folder_name, file_name):
    """Unpickle and return the object stored at ``folder_name/file_name``.

    Returns:
        The unpickled object, or ``None`` when the file is missing (reported
        with ``st.warning``) or any other exception occurs (reported with
        ``st.error``).
    """
    # NOTE(review): pickle.load can execute code embedded in the file; only
    # point this at artifacts the app itself produced.
    try:
        source = os.path.join(folder_name, file_name)
        with open(source, 'rb') as handle:
            loaded = pickle.load(handle)
        return loaded
    except FileNotFoundError:
        st.warning(f"File {file_name} not found in {folder_name} folder")
    except Exception as exc:
        st.error(f"Error loading {file_name}: {exc!s}")
    return None
def load_model(model_name):
    """Unpickle and return the model stored at ``models/<model_name>``.

    Returns:
        The unpickled object, or ``None`` when the file is missing or
        cannot be loaded (both cases are reported with ``st.error``).
    """
    # NOTE(review): pickle.load can execute code embedded in the file; only
    # load model files the app itself produced.
    try:
        model_path = os.path.join('models', model_name)
        with open(model_path, 'rb') as handle:
            loaded = pickle.load(handle)
        return loaded
    except FileNotFoundError:
        st.error(f"Model {model_name} not found. Please train a model first.")
    except Exception as exc:
        st.error(f"Error loading model: {exc!s}")
    return None
def safe_file_upload(uploaded_file, encoding='utf-8'):
    """Read an uploaded CSV into a DataFrame, trying several encodings.

    Args:
        uploaded_file: Seekable file-like object accepted by
            ``pd.read_csv`` (or ``None``).
        encoding: Encoding to try first; the built-in fallbacks are tried
            afterwards.

    Returns:
        The parsed ``DataFrame`` from the first encoding that works, or
        ``None`` when ``uploaded_file`` is ``None`` or every attempt fails.
    """
    if uploaded_file is None:
        return None

    # De-duplicate while keeping order: when the caller's choice is one of
    # the fallbacks, the same read would otherwise be retried and its error
    # reported twice.
    encodings_to_try = list(
        dict.fromkeys([encoding, 'latin1', 'cp1252', 'iso-8859-1'])
    )

    for enc in encodings_to_try:
        try:
            # Reset file pointer so each attempt reads from the start
            uploaded_file.seek(0)
            df = pd.read_csv(uploaded_file, encoding=enc)
        except UnicodeDecodeError:
            # Wrong encoding guess: silently move on to the next candidate
            continue
        except Exception as e:
            st.error(f"Error reading file with {enc}: {str(e)}")
            continue
        st.success(f"File loaded successfully with {enc} encoding")
        return df

    st.error("Could not read file with any common encoding. Please check your file format.")
    return None
def predict_text(model_name, text, vectorizer_type="tfidf"):
    """Classify a single text with a previously saved model.

    Args:
        model_name: File name of the pickled model inside ``models``.
        text: Raw text to classify.
        vectorizer_type: Prefix of the saved vectorizer file
            (``artifacts/<vectorizer_type>_vectorizer.pkl``).

    Returns:
        tuple: ``(predicted_label, prediction_proba)``. ``prediction_proba``
        is the first row of ``model.predict_proba`` or ``None`` when the
        model has no usable ``predict_proba``. ``(None, None)`` is returned
        when the model, vectorizer or encoder cannot be loaded, or when any
        step raises (the error is reported with ``st.error``).
    """
    try:
        # Load model
        model = load_model(model_name)
        if model is None:
            return None, None

        # Load vectorizer
        vectorizer_file = f"{vectorizer_type}_vectorizer.pkl"
        vectorizer = load_artifacts("artifacts", vectorizer_file)
        if vectorizer is None:
            return None, None

        # Load label encoder
        encoder = load_artifacts("artifacts", "encoder.pkl")
        if encoder is None:
            return None, None

        # Clean and vectorize text
        text_cleaner = TextCleaner()
        clean_text = text_cleaner.clean_text(text)
        # Transform with the saved (already fitted) vectorizer
        text_vector = vectorizer.transform([clean_text])

        # Make prediction
        prediction = model.predict(text_vector)

        # Probabilities are optional: a failure here must not discard the
        # label. Catch Exception rather than using a bare ``except`` so that
        # KeyboardInterrupt/SystemExit and other BaseException-based control
        # flow still propagate.
        prediction_proba = None
        if hasattr(model, 'predict_proba'):
            try:
                prediction_proba = model.predict_proba(text_vector)[0]
            except Exception:
                prediction_proba = None

        # Decode prediction back to the original label
        predicted_label = encoder.inverse_transform(prediction)[0]
        return predicted_label, prediction_proba
    except Exception as e:
        st.error(f"Error during prediction: {str(e)}")
        return None, None
# Main App Logic

# Radio label -> name of the training method on ``Models``. Methods are
# looked up by name so only the selected one has to exist at call time.
_MODEL_TRAINERS = {
    "Logistic Regression": "LogisticRegression",
    "Decision Tree": "DecisionTree",
    "Random Forest": "RandomForestClassifier",
    "Linear SVC": "LinearSVC",
    "SVC": "SVC",
    "Multinomial Naive Bayes": "MultinomialNB",
    "Gaussian Naive Bayes": "GaussianNB",
}


def _render_header():
    """Draw the title, the current user and the logout button."""
    col1, col2 = st.columns([3, 1])
    with col1:
        st.title('🤖 No Code Text Classification App')
        st.write('Understand the behavior of your text data and train a model to classify the text data')
    with col2:
        st.markdown(f"**👤 User:** {st.session_state.get('authenticated_user', 'Unknown')}")
        if st.button("Logout", type="secondary"):
            # Wipe the whole session so the login form is shown again
            for key in list(st.session_state.keys()):
                del st.session_state[key]
            st.rerun()


def _prepare_training_data(train_data, test_data, encoding_choice):
    """Load the uploaded training CSV and derive the processed columns.

    Returns:
        tuple: ``(train_df, info, text_data, target)``. ``train_df`` is
        ``None`` when nothing was uploaded, loading failed, or processing
        raised. ``info`` is ``None`` until distinct text and target columns
        are selected.
    """
    from sklearn.preprocessing import LabelEncoder

    if train_data is None:
        return None, None, None, None

    with st.spinner("Loading training data..."):
        train_df = safe_file_upload(train_data, encoding_choice)
    if train_df is None:
        return None, None, None, None

    info = None
    try:
        if test_data is not None:
            # The optional test set is only read (which reports load
            # status); its contents are not used anywhere else yet.
            safe_file_upload(test_data, encoding_choice)

        st.sidebar.success(f"✅ Training data loaded: {train_df.shape[0]} rows, {train_df.shape[1]} columns")
        st.write("📋 Training Data Preview:")
        st.dataframe(train_df.head(3), use_container_width=True)

        columns = train_df.columns.tolist()
        text_data = st.sidebar.selectbox("📝 Choose the text column:", columns)
        target = st.sidebar.selectbox("🎯 Choose the target column:", columns)

        # Process data
        if text_data and target and text_data != target:
            with st.spinner("Processing data..."):
                info = Informations(train_df, text_data, target)
                train_df['clean_text'] = info.clean_text()
                train_df['text_length'] = info.text_length()

                # Encode labels here so the fitted encoder can be saved
                label_encoder = LabelEncoder()
                train_df['target'] = label_encoder.fit_transform(train_df[target])

                # Save label encoder for later use at prediction time
                if save_artifacts(label_encoder, "artifacts", "encoder.pkl"):
                    st.sidebar.success("✅ Data processed successfully")
        else:
            st.sidebar.warning("Please select different columns for text and target")
    except Exception as e:
        st.error(f"❌ Error processing data: {str(e)}")
        return None, None, None, None

    return train_df, info, text_data, target


def _render_data_analysis(train_df, info, text_data, target):
    """Draw the Data Analysis section (stats, text analysis, charts)."""
    st.header("📊 Data Analysis & Insights")
    if train_df is None or info is None:
        st.info("👆 Please upload training data in the sidebar to get insights")
        return

    try:
        # Create tabs for better organization
        tab1, tab2, tab3 = st.tabs(["📈 Basic Stats", "📝 Text Analysis", "📊 Visualizations"])

        with tab1:
            col1, col2, col3 = st.columns(3)
            with col1:
                st.metric("📊 Data Shape", f"{info.shape()[0]} x {info.shape()[1]}")
            with col2:
                imbalance_info = info.class_imbalanced()
                st.metric("⚖️ Class Balance", "Balanced" if not imbalance_info else "Imbalanced")
            with col3:
                missing_info = info.missing_values()
                total_missing = sum(missing_info.values()) if isinstance(missing_info, dict) else 0
                st.metric("❌ Missing Values", str(total_missing))
            st.subheader("📋 Processed Data Preview")
            st.dataframe(train_df[['clean_text', 'text_length', 'target']].head(), use_container_width=True)

        with tab2:
            st.subheader("📏 Text Length Analysis")
            text_analysis = info.analysis_text_length('text_length')
            # Display stats in a nice format
            stats_col1, stats_col2 = st.columns(2)
            with stats_col1:
                st.json(text_analysis)
            with stats_col2:
                correlation = train_df[['text_length', 'target']].corr().iloc[0, 1]
                st.metric("🔗 Text Length-Target Correlation", f"{correlation:.4f}")

        with tab3:
            st.subheader("📊 Data Visualizations")
            vis = Visualizations(train_df, text_data, target)
            col1, col2 = st.columns(2)
            with col1:
                st.write("**Class Distribution**")
                vis.class_distribution()
            with col2:
                st.write("**Text Length Distribution**")
                vis.text_length_distribution()
    except Exception as e:
        st.error(f"❌ Error in data analysis: {str(e)}")


def _render_training(train_df, info):
    """Draw the Train Model section and train the selected model on demand."""
    st.header("🚀 Train Classification Model")
    if train_df is None:
        st.info("👆 Please upload training data in the sidebar to train a model")
        return
    if info is None:
        # Without distinct text/target columns the 'clean_text' and 'target'
        # columns were never created; stop here instead of failing with a
        # KeyError further down.
        st.warning("⚠️ Please select different text and target columns in the sidebar before training")
        return

    try:
        # Create two columns for model selection
        col1, col2 = st.columns(2)
        with col1:
            st.subheader("🤖 Choose Model")
            model = st.radio("Select Algorithm:", list(_MODEL_TRAINERS))
        with col2:
            st.subheader("🔤 Choose Vectorizer")
            vectorizer_choice = st.radio("Select Vectorizer:", ["Tfidf Vectorizer", "Count Vectorizer"])

        # Initialize vectorizer and remember the choice for predictions
        if vectorizer_choice == "Tfidf Vectorizer":
            vectorizer = TfidfVectorizer(max_features=10000)
            st.session_state.vectorizer_type = "tfidf"
        else:
            vectorizer = CountVectorizer(max_features=10000)
            st.session_state.vectorizer_type = "count"

        st.subheader("📋 Training Data Preview")
        st.dataframe(train_df[['clean_text', 'target']].head(3), use_container_width=True)

        # Vectorize text data
        with st.spinner("Preparing data..."):
            X = vectorizer.fit_transform(train_df['clean_text'])
            y = train_df['target']
            # Split data
            X_train, X_test, y_train, y_test = process.split_data(X, y)
        st.success(f"✅ Data prepared - Train: {X_train.shape}, Test: {X_test.shape}")

        # Save vectorizer for later use
        vectorizer_filename = f"{st.session_state.vectorizer_type}_vectorizer.pkl"
        save_artifacts(vectorizer, "artifacts", vectorizer_filename)

        if st.button("🚀 Start Training", type="primary", use_container_width=True):
            progress_bar = st.progress(0)
            status_text = st.empty()
            with st.spinner(f"Training {model} model..."):
                status_text.text("Initializing model...")
                progress_bar.progress(20)
                models = Models(X_train=X_train, X_test=X_test, y_train=y_train, y_test=y_test)
                status_text.text("Training in progress...")
                progress_bar.progress(50)
                # Train selected model
                getattr(models, _MODEL_TRAINERS[model])()
                progress_bar.progress(100)
                status_text.text("Training completed!")
            st.success("🎉 Model training completed successfully!")
            st.balloons()
            st.info("💡 You can now use the 'Predictions' section to classify new text.")
    except Exception as e:
        st.error(f"❌ Error in model training: {str(e)}")
        st.exception(e)


def _list_trained_models():
    """Return the ``.pkl`` file names found in the ``models`` folder."""
    return [f for f in os.listdir("models") if f.endswith('.pkl')]


def _render_single_prediction():
    """Draw the single-text prediction tab."""
    st.subheader("🎯 Classify Single Text")
    # Text input for prediction
    text_input = st.text_area("Enter the text to classify:", height=100, placeholder="Type or paste your text here...")

    # Model selection
    available_models = _list_trained_models()
    if not available_models:
        st.warning("⚠️ No trained models found. Please train a model first.")
        return
    selected_model = st.selectbox("🤖 Choose the trained model:", available_models)

    # Prediction button
    if not st.button("🔮 Predict", key="single_predict", type="primary"):
        return
    if not text_input.strip():
        st.warning("⚠️ Please enter some text to classify")
        return

    with st.spinner("Making prediction..."):
        predicted_label, prediction_proba = predict_text(
            selected_model,
            text_input,
            st.session_state.get('vectorizer_type', 'tfidf'),
        )
    if predicted_label is None:
        # predict_text has already reported the problem
        return

    st.success("🎉 Prediction completed!")
    # Display results
    st.markdown("### 📋 Prediction Results")
    with st.container():
        st.markdown(f"**📝 Input Text:** {text_input}")
        st.markdown(f"**🏷️ Predicted Class:** `{predicted_label}`")

        # Display probabilities if available
        if prediction_proba is not None:
            st.markdown("**📊 Class Probabilities:**")
            # Load encoder to get class names
            encoder = load_artifacts("artifacts", "encoder.pkl")
            if encoder is not None:
                classes = encoder.classes_
                prob_df = pd.DataFrame({
                    'Class': classes,
                    'Probability': prediction_proba,
                }).sort_values('Probability', ascending=False)
                st.bar_chart(prob_df.set_index('Class'))
                st.dataframe(prob_df, use_container_width=True)


def _render_batch_prediction():
    """Draw the batch (CSV) prediction tab."""
    st.subheader("📊 Batch Classification")
    uploaded_file = st.file_uploader(
        "Upload a CSV file with text to classify",
        type=['csv'],
        help="Upload a CSV file containing text data for batch classification",
    )
    if uploaded_file is None:
        return

    try:
        batch_df = safe_file_upload(uploaded_file)
        if batch_df is None:
            return

        st.write("📋 Uploaded data preview:")
        st.dataframe(batch_df.head(), use_container_width=True)

        # Select text column
        text_column = st.selectbox("📝 Select the text column:", batch_df.columns.tolist())

        available_models = _list_trained_models()
        if not available_models:
            # An empty selectbox would yield None as the model name
            st.warning("⚠️ No trained models found. Please train a model first.")
            return
        batch_model = st.selectbox("🤖 Choose model for batch prediction:", available_models, key="batch_model")

        if st.button("🚀 Run Batch Predictions", key="batch_predict", type="primary"):
            progress_bar = st.progress(0)
            status_text = st.empty()
            with st.spinner("Processing batch predictions..."):
                predictions = []
                total_texts = len(batch_df)
                for i, text in enumerate(batch_df[text_column]):
                    status_text.text(f"Processing {i+1}/{total_texts} texts...")
                    progress_bar.progress((i+1)/total_texts)
                    pred, _ = predict_text(
                        batch_model,
                        str(text),
                        st.session_state.get('vectorizer_type', 'tfidf'),
                    )
                    # Keep row alignment: failed rows are marked, not dropped
                    predictions.append(pred if pred is not None else "Error")
                batch_df['Predicted_Class'] = predictions

            st.success("🎉 Batch predictions completed!")
            st.write("📊 Results:")
            st.dataframe(batch_df[[text_column, 'Predicted_Class']], use_container_width=True)

            # Download results
            csv = batch_df.to_csv(index=False)
            st.download_button(
                label="📥 Download predictions as CSV",
                data=csv,
                file_name="batch_predictions.csv",
                mime="text/csv",
                type="primary",
            )
    except Exception as e:
        st.error(f"❌ Error in batch prediction: {str(e)}")


def _render_predictions():
    """Draw the Predictions section (single and batch tabs)."""
    st.header("🔮 Text Classification Predictions")
    # Check if models exist
    if not (os.path.exists("models") and os.listdir("models")):
        st.info("⚠️ No trained models found. Please go to 'Train Model' section to train a model first.")
        return

    tab1, tab2 = st.tabs(["🎯 Single Prediction", "📊 Batch Predictions"])
    with tab1:
        _render_single_prediction()
    with tab2:
        _render_batch_prediction()


def main_app():
    """Render the authenticated application.

    Draws the header and sidebar (section picker, encoding picker and the
    two CSV uploaders), prepares the uploaded training data, then renders
    the section selected in the sidebar.
    """
    # Header with user info
    _render_header()

    # Sidebar
    section = st.sidebar.radio("Choose Section", ["📊 Data Analysis", "🚀 Train Model", "🔮 Predictions"])

    # Upload Data with improved error handling
    st.sidebar.subheader("📁 Upload Your Dataset")

    # File encoding selection
    encoding_choice = st.sidebar.selectbox(
        "File Encoding",
        ["utf-8", "latin1", "cp1252", "iso-8859-1"],
        help="If file upload fails, try different encodings",
    )
    train_data = st.sidebar.file_uploader(
        "Upload training data",
        type=["csv"],
        help="Upload a CSV file with your training data",
    )
    test_data = st.sidebar.file_uploader(
        "Upload test data (optional)",
        type=["csv"],
        help="Optional: Upload separate test data",
    )

    # Default vectorizer choice, kept in session state across reruns
    if 'vectorizer_type' not in st.session_state:
        st.session_state.vectorizer_type = "tfidf"

    train_df, info, text_data, target = _prepare_training_data(
        train_data, test_data, encoding_choice
    )

    if section == "📊 Data Analysis":
        _render_data_analysis(train_df, info, text_data, target)
    elif section == "🚀 Train Model":
        _render_training(train_df, info)
    elif section == "🔮 Predictions":
        _render_predictions()
# Main execution
def main():
    """Configure the page, inject the custom CSS and show the app after login."""
    # Page config
    page_settings = {
        "page_title": "Text Classification App",
        "page_icon": "🤖",
        "layout": "wide",
        "initial_sidebar_state": "expanded",
    }
    st.set_page_config(**page_settings)

    # Custom CSS for better styling
    custom_css = """
    <style>
    .main {
    padding-top: 1rem;
    }
    .stAlert {
    margin-top: 1rem;
    }
    .metric-container {
    background-color: #f0f2f6;
    padding: 1rem;
    border-radius: 0.5rem;
    margin: 0.5rem 0;
    }
    </style>
    """
    st.markdown(custom_css, unsafe_allow_html=True)

    # Only authenticated sessions get past the login form
    if not check_password():
        return
    main_app()


if __name__ == "__main__":
    main()