Spaces:
Runtime error
Runtime error
| import streamlit as st | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import os | |
| import pickle | |
| import re | |
| import string | |
| from pathlib import Path | |
# Streamlit page settings: browser-tab title, tab icon and wide layout.
st.set_page_config(
    page_title="No Code Text Classifier",
    page_icon="๐ค",
    layout="wide",
)
# scikit-learn is imported on demand so the page can render before it is needed.
def load_ml_libraries():
    """Import the scikit-learn pieces used by the app and return them in a dict.

    Returns:
        dict | None: a mapping with the keys 'LabelEncoder', 'TfidfVectorizer',
        'CountVectorizer', 'train_test_split', 'accuracy_score' and 'models'
        (display name -> estimator class). If any import fails, the error is
        reported with ``st.error`` and None is returned.
    """
    try:
        from sklearn.preprocessing import LabelEncoder
        from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
        from sklearn.model_selection import train_test_split
        from sklearn.metrics import accuracy_score
        from sklearn.linear_model import LogisticRegression
        from sklearn.tree import DecisionTreeClassifier
        from sklearn.svm import LinearSVC
        from sklearn.ensemble import RandomForestClassifier
        from sklearn.naive_bayes import MultinomialNB
    except ImportError as e:
        st.error(f"Error importing ML libraries: {e}")
        return None
    else:
        # Display names double as the lookup keys used when training.
        classifiers = {}
        classifiers["Logistic Regression"] = LogisticRegression
        classifiers["Decision Tree"] = DecisionTreeClassifier
        classifiers["Random Forest"] = RandomForestClassifier
        classifiers["Linear SVC"] = LinearSVC
        classifiers["Multinomial Naive Bayes"] = MultinomialNB

        libraries = dict(
            LabelEncoder=LabelEncoder,
            TfidfVectorizer=TfidfVectorizer,
            CountVectorizer=CountVectorizer,
            train_test_split=train_test_split,
            accuracy_score=accuracy_score,
        )
        libraries['models'] = classifiers
        return libraries
# Small built-in English stop-word list, so the app needs no NLTK download.
BASIC_STOPWORDS = set(
    """
    i me my myself we our ours ourselves you
    your yours yourself yourselves he him his
    himself she her hers herself it its itself
    they them their theirs themselves what which
    who whom this that these those am is are
    was were be been being have has had having
    do does did doing a an the and but if
    or because as until while of at by for
    with through during before after above below
    up down in out on off over under again
    further then once
    """.split()
)
class TextCleaner:
    """Lightweight text cleaner without NLTK dependencies.

    ``clean_text`` lower-cases the input, replaces currency signs with the
    token ``currency``, strips every non-word/non-space character and every
    digit run, collapses whitespace, and finally drops stop words and words
    of one or two characters.
    """

    # Regex character class for the currency signs: dollar, pound, euro, yen,
    # rupee, cent, ruble, won and shekel. Written with \u escapes so the
    # pattern cannot be corrupted by a wrongly decoded source file (a
    # mis-decoded class would match unrelated letters instead of currency signs).
    CURRENCY_SYMBOLS = "[$\u00a3\u20ac\u00a5\u20b9\u00a2\u20bd\u20a9\u20aa]"

    def __init__(self):
        # Pattern substituted by the word 'currency' in clean_text().
        self.currency_symbols = self.CURRENCY_SYMBOLS
        # Words removed from the cleaned output.
        self.stop_words = BASIC_STOPWORDS

    def remove_punctuation(self, text):
        """Return *text* with every ASCII punctuation character removed."""
        return text.translate(str.maketrans('', '', string.punctuation))

    def clean_text(self, text):
        """Normalise *text* for vectorisation.

        Args:
            text: the raw value; non-strings are converted with ``str`` and
                None is treated as an empty string.

        Returns:
            str: the cleaned, space-joined words ("" for blank input). If an
            unexpected error occurs, a warning is shown via ``st.warning``
            and the lower-cased input is returned instead.
        """
        if not isinstance(text, str):
            text = str(text) if text is not None else ""
        if not text.strip():
            return ""
        try:
            text = text.lower()
            text = re.sub(self.currency_symbols, 'currency', text)
            # Replace everything that is neither a word character nor
            # whitespace (punctuation, symbols, emojis) with a space.
            text = re.sub(r'[^\w\s]', ' ', text)
            text = re.sub(r'\d+', ' ', text)
            text = re.sub(r'\s+', ' ', text).strip()
            # Drop stop words and very short tokens (length <= 2).
            words = [
                word for word in text.split()
                if word not in self.stop_words and len(word) > 2
            ]
            return ' '.join(words)
        except Exception as e:
            st.warning(f"Text cleaning warning: {e}")
            return str(text).lower()
class DataAnalyzer:
    """Lightweight data analyzer for a text-classification DataFrame."""

    def __init__(self, df, text_column, target_column):
        # DataFrame under analysis and the names of its text / label columns.
        self.df = df
        self.text_column = text_column
        self.target_column = target_column

    def get_basic_info(self):
        """Return shape, per-column missing counts and label frequencies.

        Returns:
            dict: keys 'shape' (the DataFrame shape tuple), 'missing_values'
            (column -> number of nulls) and 'class_distribution'
            (label -> count, from ``value_counts`` on the target column).
        """
        return {
            'shape': self.df.shape,
            'missing_values': self.df.isnull().sum().to_dict(),
            'class_distribution': self.df[self.target_column].value_counts().to_dict(),
        }

    def _render_figure(self, draw):
        """Create an 8x5 figure, let *draw(ax)* fill it, and show it in Streamlit.

        Errors are reported with ``st.error``. The figure created here is
        always closed, even when drawing or rendering fails, so repeated
        reruns do not accumulate open matplotlib figures.
        """
        fig = None
        try:
            fig, ax = plt.subplots(figsize=(8, 5))
            draw(ax)
            fig.tight_layout()
            st.pyplot(fig)
        except Exception as e:
            st.error(f"Error creating plot: {e}")
        finally:
            if fig is not None:
                # Close this specific figure rather than "the current one".
                plt.close(fig)

    def plot_class_distribution(self):
        """Show a bar chart of how many rows each class has."""
        def draw(ax):
            counts = self.df[self.target_column].value_counts()
            counts.plot(kind='bar', ax=ax, color='steelblue')
            ax.set_title('Class Distribution')
            ax.set_xlabel('Classes')
            ax.set_ylabel('Count')
            ax.tick_params(axis='x', labelrotation=45)

        self._render_figure(draw)

    def plot_text_length_distribution(self):
        """Show a 30-bin histogram of text lengths measured in characters."""
        def draw(ax):
            text_lengths = self.df[self.text_column].astype(str).str.len()
            ax.hist(text_lengths, bins=30, alpha=0.7, color='lightcoral')
            ax.set_title('Text Length Distribution')
            ax.set_xlabel('Text Length (characters)')
            ax.set_ylabel('Frequency')

        self._render_figure(draw)
# Utility functions
def save_artifacts(obj, folder_name, file_name):
    """Pickle *obj* to ``folder_name/file_name``, creating the folder if needed.

    The pickle is first written to a sibling ``.tmp`` file and then moved
    into place with ``os.replace``, so a failed dump never leaves a truncated
    file under the final name (which a later load would otherwise pick up).

    Args:
        obj: any picklable object.
        folder_name: destination directory.
        file_name: file name inside that directory.

    Returns:
        bool: True on success; False after reporting the error via ``st.error``.
    """
    tmp_path = None
    try:
        os.makedirs(folder_name, exist_ok=True)
        file_path = os.path.join(folder_name, file_name)
        tmp_path = f"{file_path}.tmp"
        with open(tmp_path, 'wb') as f:
            pickle.dump(obj, f)
        os.replace(tmp_path, file_path)
        return True
    except Exception as e:
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                # Best-effort cleanup: the temp file may never have been created.
                pass
        st.error(f"Error saving {file_name}: {e}")
        return False
def load_artifacts(folder_name, file_name):
    """Unpickle and return the object stored at ``folder_name/file_name``.

    Returns:
        The unpickled object, or None when the file is missing or cannot be
        read; in both failure cases the problem is reported via ``st.error``.
    """
    try:
        with open(os.path.join(folder_name, file_name), 'rb') as handle:
            loaded = pickle.load(handle)
    except FileNotFoundError:
        st.error(f"File {file_name} not found in {folder_name}")
        return None
    except Exception as e:
        st.error(f"Error loading {file_name}: {e}")
        return None
    else:
        return loaded
def train_model(model_name, X_train, X_test, y_train, y_test, ml_libs):
    """Fit the chosen classifier, pickle it under ``models/`` and report accuracy.

    Args:
        model_name: display name used as the key into ``ml_libs['models']``.
        X_train, X_test: feature matrices for fitting and evaluation.
        y_train, y_test: the matching label vectors.
        ml_libs: the dict produced by ``load_ml_libraries``.

    Returns:
        str | None: the saved model's file name, or None if the model name is
        unknown, saving fails, or any exception is raised (reported via
        ``st.error``).
    """
    # Constructor arguments for the models that get explicit settings.
    preset_kwargs = {
        "Logistic Regression": {'max_iter': 500, 'random_state': 42, 'solver': 'liblinear'},
        "Random Forest": {'n_estimators': 20, 'random_state': 42, 'n_jobs': 1},
        "Linear SVC": {'random_state': 42, 'max_iter': 500},
    }
    try:
        os.makedirs("models", exist_ok=True)

        estimator_cls = ml_libs['models'].get(model_name)
        if not estimator_cls:
            st.error(f"Model {model_name} not supported")
            return None

        if model_name in preset_kwargs:
            model = estimator_cls(**preset_kwargs[model_name])
        elif 'random_state' in estimator_cls().get_params():
            # Seed any other estimator that exposes a random_state parameter.
            model = estimator_cls(random_state=42)
        else:
            model = estimator_cls()

        with st.spinner(f"Training {model_name}..."):
            model.fit(X_train, y_train)

        model_filename = f"{model_name.replace(' ', '_')}.pkl"
        if not save_artifacts(model, "models", model_filename):
            return None

        # Evaluate on the held-out split only once the model is safely stored.
        accuracy = ml_libs['accuracy_score'](y_test, model.predict(X_test))
        st.success("โ Model training completed!")
        st.write(f"**Accuracy**: {accuracy:.4f}")
        return model_filename
    except Exception as e:
        st.error(f"Error training model: {e}")
        return None
def predict_text(model_name, text, vectorizer_type="tfidf", ml_libs=None):
    """Classify *text* with a previously saved model.

    Loads the model from ``models/``, and the vectorizer
    (``<vectorizer_type>_vectorizer.pkl``) and label encoder (``encoder.pkl``)
    from ``artifacts/``; cleans the text with ``TextCleaner``; then predicts.

    Args:
        model_name: file name of the pickled model inside ``models/``.
        text: raw text to classify.
        vectorizer_type: prefix of the vectorizer pickle to load.
        ml_libs: accepted for call compatibility; not used by this function.

    Returns:
        tuple: ``(predicted_label, prediction_proba)``. ``prediction_proba``
        is None when the model has no usable ``predict_proba``. Returns
        ``(None, None)`` if an artifact is missing, the text is empty after
        cleaning, or an error occurs (reported via ``st.error``).
    """
    try:
        # Load components; load_artifacts reports its own errors.
        model = load_artifacts("models", model_name)
        if model is None:
            return None, None
        vectorizer = load_artifacts("artifacts", f"{vectorizer_type}_vectorizer.pkl")
        if vectorizer is None:
            return None, None
        encoder = load_artifacts("artifacts", "encoder.pkl")
        if encoder is None:
            return None, None

        # Apply the same cleaning that was used on the training data.
        clean_text = TextCleaner().clean_text(text)
        if not clean_text.strip():
            st.warning("Text became empty after cleaning")
            return None, None

        text_vector = vectorizer.transform([clean_text])
        prediction = model.predict(text_vector)

        # Probabilities are optional: not every estimator provides them.
        prediction_proba = None
        if hasattr(model, 'predict_proba'):
            try:
                prediction_proba = model.predict_proba(text_vector)[0]
            except Exception:
                # Deliberate best-effort: keep the label even if probabilities
                # fail, but do not swallow KeyboardInterrupt/SystemExit.
                prediction_proba = None

        predicted_label = encoder.inverse_transform(prediction)[0]
        return predicted_label, prediction_proba
    except Exception as e:
        st.error(f"Prediction error: {e}")
        return None, None
# Main Streamlit App
# NOTE(review): the non-ASCII prefixes in the UI strings below look mis-encoded
# (probably emoji decoded with the wrong charset). They are kept verbatim here;
# restore them from the original source if it is available.

def _read_uploaded_csv(uploaded_file):
    """Parse the uploaded CSV, trying several encodings in turn.

    The buffer is rewound before every attempt because a failed decode leaves
    the read position somewhere inside the file. cp1252 is tried before latin1
    since latin1 accepts every byte and would make any later fallback
    unreachable.

    Returns:
        tuple: ``(DataFrame, encoding)`` on success, ``(None, None)`` otherwise.
    """
    for encoding in ('utf-8', 'cp1252', 'latin1'):
        try:
            uploaded_file.seek(0)
            return pd.read_csv(uploaded_file, encoding=encoding), encoding
        except UnicodeDecodeError:
            continue
    return None, None


def _dataset_key(uploaded_file, text_column, target_column):
    """Build a key identifying an upload plus the chosen text/target columns."""
    return (
        getattr(uploaded_file, 'name', None),
        getattr(uploaded_file, 'size', None),
        text_column,
        target_column,
    )


def _process_dataset(train_df, text_column, target_column):
    """Add 'clean_text', 'text_length' and the encoded 'target' column to *train_df*.

    The fitted label encoder is saved as ``artifacts/encoder.pkl``. When the
    ML libraries cannot be loaded, the text columns are still added but no
    'target' column is created. Returns the same DataFrame.
    """
    text_cleaner = TextCleaner()
    total = len(train_df)
    # Refresh the progress bar roughly 100 times instead of once per row.
    step = max(1, total // 100)
    progress_bar = st.progress(0)
    cleaned_texts = []
    for i, text in enumerate(train_df[text_column], start=1):
        cleaned_texts.append(text_cleaner.clean_text(text) if pd.notna(text) else "")
        if i % step == 0 or i == total:
            progress_bar.progress(i / total)
    train_df['clean_text'] = cleaned_texts
    train_df['text_length'] = train_df[text_column].astype(str).str.len()

    ml_libs = load_ml_libraries()
    if ml_libs:
        label_encoder = ml_libs['LabelEncoder']()
        train_df['target'] = label_encoder.fit_transform(train_df[target_column].astype(str))
        save_artifacts(label_encoder, "artifacts", "encoder.pkl")
        st.success("โ Data processed successfully!")
    else:
        st.error("โ Could not load ML libraries")
    return train_df


def _render_analysis(train_df, text_data, target):
    """Render the metrics, tables and plots of the Data Analysis section."""
    st.subheader("๐ Data Insights")
    analyzer = DataAnalyzer(train_df, text_data, target)
    info = analyzer.get_basic_info()

    col1, col2, col3 = st.columns(3)
    with col1:
        st.metric("๐ Total Samples", f"{info['shape'][0]:,}")
    with col2:
        st.metric("๐ Features", info['shape'][1])
    with col3:
        st.metric("๐ท๏ธ Classes", len(info['class_distribution']))

    st.write("**Class Distribution:**")
    class_dist_df = pd.DataFrame(
        list(info['class_distribution'].items()), columns=['Class', 'Count']
    )
    st.dataframe(class_dist_df, use_container_width=True)

    st.write("**Processed Data Sample:**")
    sample_df = train_df[['clean_text', 'text_length', target]].head(5)
    st.dataframe(sample_df, use_container_width=True)

    st.subheader("๐ Data Visualizations")
    col1, col2 = st.columns(2)
    with col1:
        st.write("**Class Distribution**")
        analyzer.plot_class_distribution()
    with col2:
        st.write("**Text Length Distribution**")
        analyzer.plot_text_length_distribution()


def _render_training(train_df):
    """Render the Train Model section and run training when requested."""
    st.subheader("๐ Train Your Classification Model")
    col1, col2 = st.columns(2)
    with col1:
        model = st.selectbox("๐ค Choose Model", [
            "Logistic Regression",
            "Decision Tree",
            "Random Forest",
            "Linear SVC",
            "Multinomial Naive Bayes",
        ])
    with col2:
        vectorizer_choice = st.selectbox("๐ Choose Vectorizer",
                                         ["Tfidf Vectorizer", "Count Vectorizer"])

    # Rows whose text became empty during cleaning cannot be vectorised.
    valid_data = train_df[train_df['clean_text'].str.len() > 0].copy()
    if len(valid_data) < 10:
        st.error("โ Not enough valid text data after cleaning! Need at least 10 samples.")
        return
    st.info(f"โ Ready to train with {len(valid_data):,} valid samples")

    ml_libs = load_ml_libraries()
    if not ml_libs:
        st.error("โ Could not load ML libraries")
        return

    use_tfidf = vectorizer_choice == "Tfidf Vectorizer"
    vectorizer_type = "tfidf" if use_tfidf else "count"
    vectorizer_class = ml_libs['TfidfVectorizer' if use_tfidf else 'CountVectorizer']
    max_features = min(5000, len(valid_data) * 5)  # Conservative limit

    if not st.button("๐ฏ Start Training", type="primary"):
        return
    try:
        vectorizer = vectorizer_class(
            max_features=max_features, stop_words='english', ngram_range=(1, 1)
        )
        with st.spinner("Vectorizing text data..."):
            X = vectorizer.fit_transform(valid_data['clean_text'])
        y = valid_data['target']
        st.write(f"๐ **Feature matrix shape:** {X.shape}")

        # Hold out about 50 rows, bounded to between 10% and 30% of the data.
        test_size = min(0.3, max(0.1, 50 / len(valid_data)))
        X_train, X_test, y_train, y_test = ml_libs['train_test_split'](
            X, y, test_size=test_size, random_state=42, stratify=y
        )
        st.write(f"๐ **Data split** - Train: {X_train.shape[0]:,}, Test: {X_test.shape[0]:,}")

        vectorizer_filename = f"{vectorizer_type}_vectorizer.pkl"
        if save_artifacts(vectorizer, "artifacts", vectorizer_filename):
            # Record the type only once the matching vectorizer file exists,
            # so predictions never look for a vectorizer that was merely
            # selected in the UI but never trained.
            st.session_state.vectorizer_type = vectorizer_type
            model_filename = train_model(model, X_train, X_test, y_train, y_test, ml_libs)
            if model_filename:
                st.balloons()
                st.success("๐ Model ready! Go to 'Predictions' to test it.")
    except Exception as e:
        st.error(f"โ Training failed: {e}")


def _render_predictions():
    """Render the Predictions section for models saved under ``models/``."""
    st.subheader("๐ฎ Make Predictions")
    if not (os.path.isdir("models") and os.listdir("models")):
        st.info("๐ No models available. Please train a model first in the 'Train Model' section.")
        return
    available_models = sorted(f for f in os.listdir("models") if f.endswith('.pkl'))
    if not available_models:
        st.info("๐ No trained models found")
        return

    selected_model = st.selectbox("๐ค Choose trained model:", available_models)
    text_input = st.text_area("๐ Enter text to classify:",
                              height=120,
                              placeholder="Type your text here...")
    col1, col2 = st.columns([1, 3])
    with col1:
        predict_button = st.button("๐ฏ Predict", type="primary")

    if not predict_button:
        return
    if not text_input.strip():
        st.warning("โ ๏ธ Please enter some text to classify")
        return

    ml_libs = load_ml_libraries()
    if not ml_libs:
        return
    predicted_label, prediction_proba = predict_text(
        selected_model,
        text_input,
        st.session_state.get('vectorizer_type', 'tfidf'),
        ml_libs,
    )
    if predicted_label is None:
        return

    st.success("โ Prediction completed!")
    st.markdown("### ๐ท๏ธ Prediction Result")
    st.markdown(f"**Predicted Class:** `{predicted_label}`")

    if prediction_proba is None:
        return
    st.markdown("### ๐ Class Probabilities")
    encoder = load_artifacts("artifacts", "encoder.pkl")
    if encoder is None:
        return
    classes = encoder.classes_
    if len(classes) != len(prediction_proba):
        # The model saw fewer classes than the encoder knows (for example, a
        # class lost all rows during cleaning), so the two cannot be paired.
        st.info("Class probabilities are unavailable for this model.")
        return
    prob_df = pd.DataFrame({
        'Class': classes,
        'Probability': prediction_proba,
    }).sort_values('Probability', ascending=False)
    for class_name, confidence in zip(prob_df['Class'], prob_df['Probability']):
        st.write(f"**{class_name}**: {confidence:.1%}")
        # Clamp to [0, 1] so floating-point noise cannot fall outside the bar's range.
        st.progress(min(1.0, max(0.0, float(confidence))))


def main():
    """Entry point: draw the sidebar, load/process the upload, show one section.

    Processed data is kept in ``st.session_state`` keyed by the uploaded file
    and the selected columns. Streamlit reruns the script on every widget
    interaction and a button is only true for one run, so without this the
    processed columns would be gone as soon as another button is clicked.
    """
    st.title('๐ค No Code Text Classification App')
    st.write('Build and deploy text classification models without coding!')

    analysis_label = "๐ Data Analysis"
    training_label = "๐ Train Model"
    prediction_label = "๐ฎ Predictions"
    section = st.sidebar.radio("Choose Section",
                               [analysis_label, training_label, prediction_label])

    st.sidebar.subheader("๐ Upload Your Dataset")
    train_data = st.sidebar.file_uploader("Upload training data (CSV)", type=["csv"])

    if 'vectorizer_type' not in st.session_state:
        st.session_state.vectorizer_type = "tfidf"

    train_df = None
    text_data = None
    target = None
    if train_data is not None:
        try:
            train_df, encoding = _read_uploaded_csv(train_data)
            if train_df is None:
                st.error("โ Could not read the CSV file. Please check the file format.")
            else:
                st.success(f"โ File loaded successfully with {encoding} encoding")
                st.write("**Training Data Preview:**")
                st.dataframe(train_df.head(3))
                columns = train_df.columns.tolist()
                text_data = st.sidebar.selectbox("๐ Choose the text column:", columns)
                target = st.sidebar.selectbox("๐ฏ Choose the target column:", columns)

                dataset_key = _dataset_key(train_data, text_data, target)
                if text_data and target and st.sidebar.button("Process Data"):
                    with st.spinner("Processing data..."):
                        train_df = _process_dataset(train_df, text_data, target)
                    st.session_state['processed_key'] = dataset_key
                    st.session_state['processed_df'] = train_df
                elif st.session_state.get('processed_key') == dataset_key:
                    # Reuse what an earlier run produced for this file/columns.
                    train_df = st.session_state['processed_df']
        except Exception as e:
            st.error(f"โ Error processing data: {e}")
            train_df = None

    is_processed = train_df is not None and 'clean_text' in train_df.columns

    if section == analysis_label:
        if is_processed:
            _render_analysis(train_df, text_data, target)
        else:
            st.info("๐ Upload and process your data to see analysis")
    elif section == training_label:
        if is_processed:
            _render_training(train_df)
        else:
            st.info("๐ Please upload and process training data first")
    elif section == prediction_label:
        _render_predictions()

    # Footer
    st.markdown("---")
    st.markdown("๐ **Built with Streamlit** | Ready for deployment on Hugging Face Spaces")
# Start the app when this file is run as a script.
if __name__ == "__main__":
    main()