Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| from NoCodeTextClassifier.EDA import Informations, Visualizations | |
| from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer | |
| from NoCodeTextClassifier.preprocessing import process, TextCleaner, Vectorization | |
| from NoCodeTextClassifier.models import Models | |
| import os | |
| import pickle | |
| import io | |
| import base64 | |
| from sklearn.metrics import accuracy_score, classification_report, confusion_matrix | |
| from sklearn.preprocessing import LabelEncoder | |
# Configure the Streamlit page: title, icon and wide layout
st.set_page_config(
    page_title="Text Classifier",
    page_icon="π",
    layout="wide",
)
| # Utility functions | |
def safe_read_csv(uploaded_file, encoding_options=('utf-8', 'latin1', 'iso-8859-1', 'cp1252')):
    """Read an uploaded CSV, trying several text encodings in turn.

    The raw bytes are read from the upload exactly once and every encoding is
    tried against that same buffer. Each failed attempt is reported with
    ``st.warning`` (decode / parse problems) or ``st.error`` (anything else),
    and the first encoding that yields a parseable CSV wins.

    Args:
        uploaded_file: Binary file-like object exposing ``seek`` and ``read``,
            or ``None`` when nothing was uploaded.
        encoding_options: Encodings to try, in order. The default is an
            immutable tuple so it cannot be altered between calls.

    Returns:
        The parsed ``pandas.DataFrame``, or ``None`` when no file was supplied
        or no encoding produced a readable CSV.
    """
    if uploaded_file is None:
        return None

    # Read the payload once, up front. Reading inside the loop would leave the
    # file pointer at EOF after the first attempt, so every later encoding
    # would be handed an empty buffer and could never succeed.
    uploaded_file.seek(0)
    raw_bytes = uploaded_file.read()

    for encoding in encoding_options:
        try:
            decoded_text = raw_bytes.decode(encoding)
            # pandas needs a file-like object, so wrap the decoded text
            df = pd.read_csv(io.StringIO(decoded_text))
        except (UnicodeDecodeError, pd.errors.EmptyDataError, pd.errors.ParserError) as e:
            st.warning(f"Failed to read with {encoding} encoding: {str(e)}")
            continue
        except Exception as e:
            st.error(f"Unexpected error with {encoding} encoding: {str(e)}")
            continue
        else:
            st.success(f"File loaded successfully with {encoding} encoding")
            return df

    st.error("Failed to read the file with any supported encoding")
    return None
def create_sample_data():
    """Build the small built-in sentiment dataset used for demos and testing.

    Returns:
        A ``pandas.DataFrame`` with eight rows and two columns, ``text`` and
        ``sentiment``; the rows alternate between positive and negative.
    """
    labelled_reviews = [
        ("I love this product, it's amazing!", 'positive'),
        ("This is the worst thing I've ever bought", 'negative'),
        ("Great quality and fast delivery", 'positive'),
        ("Terrible customer service, very disappointed", 'negative'),
        ("Excellent value for money", 'positive'),
        ("Poor quality, broke after one day", 'negative'),
        ("Highly recommend this to everyone", 'positive'),
        ("Waste of money, don't buy this", 'negative'),
    ]
    return pd.DataFrame(labelled_reviews, columns=['text', 'sentiment'])
def save_artifacts(obj, folder_name, file_name):
    """Pickle ``obj`` to ``folder_name/file_name``, creating the folder if needed.

    Args:
        obj: Any picklable object (the app stores encoders and vectorizers).
        folder_name: Destination directory; created when it does not exist.
        file_name: Name of the pickle file inside ``folder_name``.

    Returns:
        ``True`` when the file was written, ``False`` after reporting the
        failure through ``st.error``.
    """
    try:
        os.makedirs(folder_name, exist_ok=True)
        destination = os.path.join(folder_name, file_name)
        with open(destination, 'wb') as handle:
            pickle.dump(obj, handle)
    except Exception as e:
        st.error(f"Error saving {file_name}: {str(e)}")
        return False
    return True
def load_artifacts(folder_name, file_name):
    """Unpickle and return the object stored at ``folder_name/file_name``.

    Args:
        folder_name: Directory holding the pickle file.
        file_name: Name of the pickle file to read.

    Returns:
        The unpickled object, or ``None`` after reporting a missing file or
        any other load failure through ``st.error``.
    """
    try:
        source = os.path.join(folder_name, file_name)
        # pickle can execute code while loading: only read files this app wrote
        with open(source, 'rb') as handle:
            artifact = pickle.load(handle)
    except FileNotFoundError:
        st.error(f"File {file_name} not found in {folder_name} folder")
        return None
    except Exception as e:
        st.error(f"Error loading {file_name}: {str(e)}")
        return None
    return artifact
def load_model(model_name):
    """Unpickle a trained model from the ``models`` directory.

    Args:
        model_name: File name of the pickled model inside ``models``.

    Returns:
        The unpickled model object, or ``None`` after reporting a missing
        file or any other load failure through ``st.error``.
    """
    try:
        model_path = os.path.join('models', model_name)
        with open(model_path, 'rb') as handle:
            trained_model = pickle.load(handle)
    except FileNotFoundError:
        st.error(f"Model {model_name} not found. Please train a model first.")
        return None
    except Exception as e:
        st.error(f"Error loading model: {str(e)}")
        return None
    return trained_model
def predict_text(model_name, text, vectorizer_type="tfidf"):
    """Classify a single piece of text with a previously trained model.

    Loads the model via ``load_model`` and the fitted vectorizer and label
    encoder from the ``artifacts`` folder, cleans ``text`` with
    ``TextCleaner.clean_text``, vectorizes it and decodes the predicted class
    back to its original label.

    Args:
        model_name: File name of the pickled model inside ``models``.
        text: Raw text to classify.
        vectorizer_type: Prefix of the vectorizer pickle to load, i.e.
            ``"<vectorizer_type>_vectorizer.pkl"``.

    Returns:
        ``(predicted_label, probabilities)``. ``probabilities`` is the first
        row returned by ``model.predict_proba`` or ``None`` when the model
        cannot provide it. ``(None, None)`` is returned when any artifact is
        missing or the prediction fails (the failure is shown via ``st.error``).
    """
    try:
        model = load_model(model_name)
        if model is None:
            return None, None

        vectorizer = load_artifacts("artifacts", f"{vectorizer_type}_vectorizer.pkl")
        if vectorizer is None:
            return None, None

        encoder = load_artifacts("artifacts", "encoder.pkl")
        if encoder is None:
            return None, None

        clean_text = TextCleaner().clean_text(text)
        # transform (not fit_transform): reuse the vocabulary fitted at training time
        text_vector = vectorizer.transform([clean_text])

        prediction = model.predict(text_vector)

        prediction_proba = None
        if hasattr(model, 'predict_proba'):
            try:
                prediction_proba = model.predict_proba(text_vector)[0]
            except Exception:
                # Probabilities are a best-effort extra: fall back to None.
                # Unlike a bare ``except``, this lets KeyboardInterrupt and
                # SystemExit propagate.
                prediction_proba = None

        predicted_label = encoder.inverse_transform(prediction)[0]
        return predicted_label, prediction_proba
    except Exception as e:
        st.error(f"Error during prediction: {str(e)}")
        return None, None
def download_sample_csv():
    """Return an HTML download link that embeds the sample dataset as CSV.

    The CSV produced from ``create_sample_data()`` is base64-encoded into a
    ``data:`` URL, so the link works without any file being served.

    Returns:
        The ``<a>`` element as a string.
    """
    csv_text = create_sample_data().to_csv(index=False)
    encoded_csv = base64.b64encode(csv_text.encode()).decode()
    return f'<a href="data:file/csv;base64,{encoded_csv}" download="sample_data.csv">Download Sample CSV</a>'
# Main App: heading, divider and one-line description
st.title('π No Code Text Classification App')
st.markdown('---')
st.write('Understand the behavior of your text data and train a model to classify the text data')

# Seed session state on the first run only; values already present are kept
for state_key, initial_value in (("vectorizer_type", "tfidf"), ("train_df", None)):
    if state_key not in st.session_state:
        st.session_state[state_key] = initial_value
# Sidebar: navigation first, then every way of supplying training data
sidebar = st.sidebar
sidebar.title("Navigation")
section = sidebar.radio(
    "Choose Section",
    ["π Data Analysis", "π§ Train Model", "π― Predictions"],
)

# Data upload area
sidebar.markdown("---")
sidebar.subheader("π Data Upload")

# Built-in demo dataset, stored in session state when the button is clicked
if sidebar.button("Use Sample Data"):
    st.session_state.train_df = create_sample_data()
    sidebar.success("Sample data loaded!")

# Link for downloading the same sample as a CSV file
sidebar.markdown("**Download Sample Data:**")
sidebar.markdown(download_sample_csv(), unsafe_allow_html=True)
sidebar.markdown("**Or upload your own data:**")

# CSV uploads: training data plus an optional separate test set
train_data = sidebar.file_uploader(
    "Upload training data",
    type=["csv"],
    help="Upload a CSV file with text and target columns",
)
test_data = sidebar.file_uploader(
    "Upload test data (optional)",
    type=["csv"],
    help="Optional: Upload separate test data",
)

# Third option: paste raw CSV text
sidebar.markdown("**Or paste CSV data:**")
if sidebar.checkbox("Enter data manually"):
    csv_text = sidebar.text_area(
        "Paste CSV data here:",
        height=100,
        placeholder="text,sentiment\n\"Great product!\",positive\n\"Poor quality\",negative",
    )
    # The load button is only rendered once some text has been entered
    if csv_text and sidebar.button("Load from text"):
        try:
            train_df = pd.read_csv(io.StringIO(csv_text))
            st.session_state.train_df = train_df
            sidebar.success("Data loaded from text!")
        except Exception as e:
            sidebar.error(f"Error parsing CSV text: {str(e)}")
# Load data
train_df = None
test_df = None

# A freshly uploaded file, when it parses, replaces whatever is in session state
if train_data is not None:
    train_df = safe_read_csv(train_data)
    if train_df is not None:
        st.session_state.train_df = train_df

# Work on a copy of the stored data. The processing below adds the derived
# columns 'clean_text', 'text_length' and 'target'; writing them into the
# session-state DataFrame itself would make them show up as selectable columns
# on the next rerun and would overwrite a user column literally named 'target'
# with its encoded values.
if st.session_state.train_df is not None:
    train_df = st.session_state.train_df.copy()

if test_data is not None:
    test_df = safe_read_csv(test_data)

# Process data if available
if train_df is not None:
    try:
        st.sidebar.success("β Training data loaded successfully!")

        # Show data info in sidebar
        st.sidebar.write(f"**Rows:** {len(train_df)}")
        st.sidebar.write(f"**Columns:** {len(train_df.columns)}")

        with st.expander("π Data Preview", expanded=False):
            st.write("**Training Data Preview:**")
            st.dataframe(train_df.head())

        columns = train_df.columns.tolist()

        # Column selection with validation: two distinct columns are required
        if len(columns) >= 2:
            text_data = st.sidebar.selectbox("Choose the text column:", columns, index=0)
            # Default to the second column for the target unless it is the text column
            target_default = 1 if len(columns) > 1 and columns[1] != text_data else 0
            target = st.sidebar.selectbox("Choose the target column:", columns, index=target_default)
            if text_data == target:
                st.sidebar.error("Text and target columns must be different!")
                st.stop()
        else:
            st.sidebar.error("Data must have at least 2 columns (text and target)")
            st.stop()

        # Derive the cleaned text, its length and the integer-encoded target
        try:
            info = Informations(train_df, text_data, target)
            train_df['clean_text'] = info.clean_text()
            train_df['text_length'] = info.text_length()

            label_encoder = LabelEncoder()
            train_df['target'] = label_encoder.fit_transform(train_df[target])

            # Persist the encoder so predictions can map class ids back to labels
            save_artifacts(label_encoder, "artifacts", "encoder.pkl")
        except Exception as e:
            st.error(f"Error processing data: {str(e)}")
            st.stop()
    except Exception as e:
        st.error(f"Error loading data: {str(e)}")
        train_df = None
# Main Content Based on Section
if section == "π Data Analysis":
    if train_df is not None:
        try:
            st.header("π Data Analysis & Insights")

            # Headline metrics, one per column
            col1, col2, col3, col4 = st.columns(4)
            with col1:
                st.metric("Total Samples", info.shape()[0])
            with col2:
                st.metric("Features", info.shape()[1])
            with col3:
                st.metric("Classes", len(train_df[target].unique()))
            with col4:
                missing_pct = (info.missing_values().sum() / len(train_df)) * 100
                st.metric("Missing Data %", f"{missing_pct:.1f}%")

            st.markdown("---")

            # Class distribution next to the text-length histogram
            col1, col2 = st.columns(2)
            with col1:
                st.subheader("Class Distribution")
                class_dist = train_df[target].value_counts()
                st.bar_chart(class_dist)

                # Flag imbalance when the largest class is more than twice the smallest
                imbalance_ratio = class_dist.max() / class_dist.min()
                if imbalance_ratio > 2:
                    st.warning(f"β οΈ Class imbalance detected (ratio: {imbalance_ratio:.1f}:1)")
                else:
                    st.success("β Classes are relatively balanced")
            with col2:
                st.subheader("Text Length Distribution")
                fig, ax = plt.subplots(figsize=(8, 6))
                try:
                    ax.hist(train_df['text_length'], bins=30, alpha=0.7, color='skyblue')
                    ax.set_xlabel('Text Length (characters)')
                    ax.set_ylabel('Frequency')
                    ax.set_title('Distribution of Text Lengths')
                    st.pyplot(fig)
                finally:
                    # pyplot keeps every figure registered until it is closed;
                    # without this, each rerun of the script would leave one
                    # more figure in memory.
                    plt.close(fig)

            # Detailed analysis
            with st.expander("π Detailed Analysis", expanded=False):
                st.write("**Class Imbalance Analysis:**")
                st.write(info.class_imbalanced())
                st.write("**Missing Values:**")
                st.write(info.missing_values())
                st.write("**Text Length Statistics:**")
                st.write(info.analysis_text_length('text_length'))

                # Correlation between text length and the encoded target
                correlation = train_df[['text_length', 'target']].corr().iloc[0, 1]
                st.write(f"**Correlation between Text Length and Target:** {correlation:.4f}")
                if abs(correlation) > 0.3:
                    st.info(f"π Moderate correlation detected ({correlation:.3f})")
                elif abs(correlation) > 0.1:
                    st.info(f"π Weak correlation detected ({correlation:.3f})")
                else:
                    st.info("π No significant correlation between text length and target")
        except Exception as e:
            st.error(f"Error in data analysis: {str(e)}")
    else:
        st.warning("π€ Please upload training data or use sample data to get insights")
        # Show instructions
        st.info(
            "\n"
            "**To get started:**\n"
            "1. Click \"Use Sample Data\" in the sidebar, OR\n"
            "2. Upload your own CSV file with text and target columns, OR\n"
            "3. Use the manual text input option in the sidebar\n"
        )
| # Train Model Section | |
| elif section == "π§ Train Model": | |
| if train_df is not None: | |
| try: | |
| st.header("π§ Train Classification Model") | |
| # Model and vectorizer selection | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| st.subheader("Choose Model") | |
| model = st.selectbox("Select Algorithm:", [ | |
| "Logistic Regression", "Decision Tree", | |
| "Random Forest", "Linear SVC", "SVC", | |
| "Multinomial Naive Bayes", "Gaussian Naive Bayes" | |
| ], help="Different algorithms have different strengths") | |
| with col2: | |
| st.subheader("Choose Vectorizer") | |
| vectorizer_choice = st.selectbox("Select Vectorization Method:", | |
| ["Tfidf Vectorizer", "Count Vectorizer"], | |
| help="TF-IDF is usually better for text classification") | |
| # Initialize vectorizer | |
| if vectorizer_choice == "Tfidf Vectorizer": | |
| vectorizer = TfidfVectorizer(max_features=10000, stop_words='english') | |
| st.session_state.vectorizer_type = "tfidf" | |
| else: | |
| vectorizer = CountVectorizer(max_features=10000, stop_words='english') | |
| st.session_state.vectorizer_type = "count" | |
| # Show processed data preview | |
| with st.expander("π Processed Data Preview", expanded=False): | |
| preview_df = train_df[['clean_text', 'target']].head(10) | |
| st.dataframe(preview_df) | |
| st.markdown("---") | |
| # Training section | |
| if st.button("π Start Training", type="primary"): | |
| with st.spinner("Training model... This may take a few moments."): | |
| try: | |
| # Progress bar | |
| progress_bar = st.progress(0) | |
| status_text = st.empty() | |
| status_text.text("Vectorizing text data...") | |
| progress_bar.progress(20) | |
| # Vectorize text data | |
| X = vectorizer.fit_transform(train_df['clean_text']) | |
| y = train_df['target'] | |
| status_text.text("Splitting data...") | |
| progress_bar.progress(40) | |
| # Split data | |
| X_train, X_test, y_train, y_test = process.split_data(X, y) | |
| status_text.text("Saving vectorizer...") | |
| progress_bar.progress(50) | |
| # Save vectorizer | |
| vectorizer_filename = f"{st.session_state.vectorizer_type}_vectorizer.pkl" | |
| save_artifacts(vectorizer, "artifacts", vectorizer_filename) | |
| status_text.text(f"Training {model}...") | |
| progress_bar.progress(70) | |
| # Train model | |
| models = Models(X_train=X_train, X_test=X_test, y_train=y_train, y_test=y_test) | |
| if model == "Logistic Regression": | |
| models.LogisticRegression() | |
| elif model == "Decision Tree": | |
| models.DecisionTree() | |
| elif model == "Linear SVC": | |
| models.LinearSVC() | |
| elif model == "SVC": | |
| models.SVC() | |
| elif model == "Multinomial Naive Bayes": | |
| models.MultinomialNB() | |
| elif model == "Random Forest": | |
| models.RandomForestClassifier() | |
| elif model == "Gaussian Naive Bayes": | |
| models.GaussianNB() | |
| progress_bar.progress(100) | |
| status_text.text("Training completed!") | |
| st.success("π Model training completed successfully!") | |
| st.balloons() | |
| # Show training info | |
| st.info(f""" | |
| **Training Summary:** | |
| - Model: {model} | |
| - Vectorizer: {vectorizer_choice} | |
| - Training samples: {X_train.shape[0]} | |
| - Test samples: {X_test.shape[0]} | |
| - Features: {X_train.shape[1]} | |
| """) | |
| except Exception as e: | |
| st.error(f"Training failed: {str(e)}") | |
| except Exception as e: | |
| st.error(f"Error in model training setup: {str(e)}") | |
| else: | |
| st.warning("π€ Please upload training data to train a model") | |
| # Predictions Section | |
| elif section == "π― Predictions": | |
| st.header("π― Make Predictions") | |
| # Check if models exist | |
| if os.path.exists("models") and os.listdir("models"): | |
| available_models = [f for f in os.listdir("models") if f.endswith('.pkl')] | |
| if available_models: | |
| # Single prediction | |
| st.subheader("Single Text Prediction") | |
| col1, col2 = st.columns([3, 1]) | |
| with col1: | |
| text_input = st.text_area( | |
| "Enter text to classify:", | |
| height=100, | |
| placeholder="Type or paste your text here..." | |
| ) | |
| with col2: | |
| selected_model = st.selectbox("Choose model:", available_models) | |
| predict_btn = st.button("π― Predict", type="primary") | |
| if predict_btn and text_input.strip(): | |
| with st.spinner("Making prediction..."): | |
| predicted_label, prediction_proba = predict_text( | |
| selected_model, | |
| text_input, | |
| st.session_state.get('vectorizer_type', 'tfidf') | |
| ) | |
| if predicted_label is not None: | |
| st.success("Prediction completed!") | |
| # Results in columns | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| st.markdown("### π Input Text") | |
| st.text_area("", value=text_input, height=100, disabled=True) | |
| with col2: | |
| st.markdown("### π― Prediction Result") | |
| st.markdown(f"**Predicted Class:** `{predicted_label}`") | |
| # Show probabilities if available | |
| if prediction_proba is not None: | |
| encoder = load_artifacts("artifacts", "encoder.pkl") | |
| if encoder is not None: | |
| classes = encoder.classes_ | |
| prob_df = pd.DataFrame({ | |
| 'Class': classes, | |
| 'Probability': prediction_proba | |
| }).sort_values('Probability', ascending=False) | |
| st.markdown("**Confidence Scores:**") | |
| # Show as progress bars | |
| for _, row in prob_df.iterrows(): | |
| st.write(f"{row['Class']}: {row['Probability']:.3f}") | |
| st.progress(row['Probability']) | |
| elif predict_btn and not text_input.strip(): | |
| st.warning("Please enter some text to classify") | |
| st.markdown("---") | |
| # Batch prediction | |
| st.subheader("Batch Predictions") | |
| uploaded_file = st.file_uploader( | |
| "Upload CSV file for batch predictions", | |
| type=['csv'], | |
| help="Upload a CSV with a text column to classify multiple texts at once" | |
| ) | |
| if uploaded_file is not None: | |
| batch_df = safe_read_csv(uploaded_file) | |
| if batch_df is not None: | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| text_column = st.selectbox("Select text column:", batch_df.columns.tolist()) | |
| with col2: | |
| batch_model = st.selectbox("Choose model:", available_models, key="batch_model") | |
| st.write("**Data Preview:**") | |
| st.dataframe(batch_df.head()) | |
| if st.button("π Run Batch Predictions"): | |
| with st.spinner("Processing batch predictions..."): | |
| predictions = [] | |
| # Progress tracking | |
| progress_bar = st.progress(0) | |
| total_texts = len(batch_df) | |
| for i, text in enumerate(batch_df[text_column]): | |
| pred, _ = predict_text( | |
| batch_model, | |
| str(text), | |
| st.session_state.get('vectorizer_type', 'tfidf') | |
| ) | |
| predictions.append(pred if pred is not None else "Error") | |
| progress_bar.progress((i + 1) / total_texts) | |
| batch_df['Predicted_Class'] = predictions | |
| st.success("β Batch predictions completed!") | |
| # Results | |
| st.write("**Results:**") | |
| st.dataframe(batch_df[[text_column, 'Predicted_Class']]) | |
| # Download button | |
| csv = batch_df.to_csv(index=False) | |
| st.download_button( | |
| label="β¬οΈ Download Results", | |
| data=csv, | |
| file_name="batch_predictions.csv", | |
| mime="text/csv" | |
| ) | |
| # Show prediction distribution | |
| pred_dist = batch_df['Predicted_Class'].value_counts() | |
| st.bar_chart(pred_dist) | |
| else: | |
| st.warning("No trained models found.") | |
| else: | |
| st.warning("π§ No models available. Please train a model first in the 'Train Model' section.") | |
# Footer: a divider followed by the attribution line
for footer_markdown in ("---", "*Built with Streamlit β’ No-Code Text Classification*"):
    st.markdown(footer_markdown)