Spaces:
Runtime error
Runtime error
| import streamlit as st | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import os | |
| import pickle | |
| import re | |
| import string | |
| from pathlib import Path | |
| from sklearn.preprocessing import LabelEncoder | |
| from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.metrics import accuracy_score | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.tree import DecisionTreeClassifier | |
| from sklearn.svm import LinearSVC | |
| from sklearn.ensemble import RandomForestClassifier | |
| from sklearn.naive_bayes import MultinomialNB | |
# Configure Streamlit page.
# NOTE(review): per Streamlit's API, set_page_config must be the first
# Streamlit call in the script — keep it ahead of any other st.* usage.
st.set_page_config(page_title="No Code Text Classifier", page_icon="🤖", layout="wide")
| # Initialize NLTK components with fallbacks | |
def init_nltk_components():
    """Locate NLTK stopwords/lemmatizer, degrading gracefully when unavailable.

    Returns:
        tuple: ``(stop_words, lemmatizer, nltk_available)`` where
            stop_words (set[str]): NLTK's English stopwords, a small built-in
                fallback list, or an empty set when NLTK is missing entirely;
            lemmatizer: a ``WordNetLemmatizer`` instance, or None;
            nltk_available (bool): True only when full NLTK data is usable.
    """
    try:
        import nltk
        # Fast path: use corpora that are already downloaded (no network).
        try:
            from nltk.corpus import stopwords
            from nltk.stem import WordNetLemmatizer
            stop_words = set(stopwords.words('english'))
            lemmatizer = WordNetLemmatizer()
            # Exercise the lemmatizer once so a missing 'wordnet' corpus
            # surfaces here instead of mid-cleaning later on.
            _ = lemmatizer.lemmatize('test')
            return stop_words, lemmatizer, True
        except Exception:  # narrowed from a bare except; still best-effort
            # Fallback: try to download the corpora on the fly.
            try:
                nltk.download('stopwords', quiet=True)
                nltk.download('wordnet', quiet=True)
                nltk.download('omw-1.4', quiet=True)
                from nltk.corpus import stopwords
                from nltk.stem import WordNetLemmatizer
                stop_words = set(stopwords.words('english'))
                lemmatizer = WordNetLemmatizer()
                return stop_words, lemmatizer, True
            except Exception:  # narrowed from a bare except
                # Final fallback: use basic English stopwords
                basic_stopwords = {
                    'i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you',
                    'your', 'yours', 'yourself', 'yourselves', 'he', 'him', 'his',
                    'himself', 'she', 'her', 'hers', 'herself', 'it', 'its', 'itself',
                    'they', 'them', 'their', 'theirs', 'themselves', 'what', 'which',
                    'who', 'whom', 'this', 'that', 'these', 'those', 'am', 'is', 'are',
                    'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'having',
                    'do', 'does', 'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if',
                    'or', 'because', 'as', 'until', 'while', 'of', 'at', 'by', 'for',
                    'with', 'through', 'during', 'before', 'after', 'above', 'below',
                    'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under', 'again',
                    'further', 'then', 'once'
                }
                return basic_stopwords, None, False
    except ImportError:
        # NLTK not installed at all: no stopwords, no lemmatization.
        basic_stopwords = set()
        return basic_stopwords, None, False
# Module-level NLP resources shared by every TextCleaner instance.
# NLTK_AVAILABLE is False when only the fallback stopword list is in use.
STOP_WORDS, LEMMATIZER, NLTK_AVAILABLE = init_nltk_components()
class TextCleaner:
    """Best-effort text normalizer applied before vectorization.

    Pipeline (clean_text): lowercase -> currency-symbol substitution ->
    emoji removal -> punctuation/HTML/digit stripping -> whitespace
    collapse -> optional stopword removal and lemmatization (only when
    NLTK components were initialized successfully).
    """

    # Compiled once at class-definition time instead of on every
    # clean_text() call (they were previously rebuilt per invocation).
    _EMOJI_PATTERN = re.compile("["
        u"\U0001F600-\U0001F64F"  # emoticons
        u"\U0001F300-\U0001F5FF"  # symbols & pictographs
        u"\U0001F680-\U0001F6FF"  # transport & map symbols
        u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
        u"\U00002702-\U000027B0"
        u"\U000024C2-\U0001F251"
        "]+", flags=re.UNICODE)
    _HTML_TAG_PATTERN = re.compile('<.*?>')

    def __init__(self):
        # Character class of currency symbols replaced by the token 'currency'.
        self.currency_symbols = r'[\$\£\€\¥\₹\¢\₽\₩\₪]'
        self.stop_words = STOP_WORDS
        self.lemmatizer = LEMMATIZER
        self.nltk_available = NLTK_AVAILABLE

    def remove_punctuation(self, text):
        """Strip all ASCII punctuation in a single translate() pass."""
        return text.translate(str.maketrans('', '', string.punctuation))

    def clean_text(self, text):
        """Normalize *text* to a lowercase, token-friendly string.

        Non-string input is coerced with str(); None becomes "". On any
        unexpected failure a UI warning is shown and the (possibly
        partially cleaned) text is returned as a string.
        """
        if not isinstance(text, str):
            text = str(text) if text is not None else ""
        if not text.strip():
            return ""
        try:
            # Basic cleaning
            text = text.lower()
            text = re.sub(self.currency_symbols, 'currency', text)
            # Remove emojis (precompiled class-level pattern).
            text = self._EMOJI_PATTERN.sub(r'', text)
            # Remove punctuation, HTML tags, underscores, digits.
            text = self.remove_punctuation(text)
            text = self._HTML_TAG_PATTERN.sub('', text)
            text = text.replace('_', '')
            text = re.sub(r'[^\w\s]', '', text)
            text = re.sub(r'\d', ' ', text)
            text = re.sub(r'\s+', ' ', text).strip()
            # Remove stopwords if a stopword list is available.
            if self.stop_words:
                text = ' '.join(word for word in text.split() if word not in self.stop_words)
            # Lemmatize if available.
            if self.lemmatizer and self.nltk_available:
                try:
                    text = ' '.join(self.lemmatizer.lemmatize(word) for word in text.split())
                except Exception:  # narrowed from a bare except; stays best-effort
                    pass  # Skip lemmatization if it fails
            return text
        except Exception as e:
            st.warning(f"Text cleaning warning: {e}")
            return str(text)
class DataAnalyzer:
    """Quick exploratory statistics and plots for a labelled text dataset."""

    def __init__(self, df, text_column, target_column):
        self.df = df
        self.text_column = text_column
        self.target_column = target_column

    def get_basic_info(self):
        """Return shape, per-column missing counts, and label frequencies."""
        frame = self.df
        return {
            'shape': frame.shape,
            'missing_values': frame.isnull().sum().to_dict(),
            'class_distribution': frame[self.target_column].value_counts().to_dict(),
        }

    def plot_class_distribution(self):
        """Render a bar chart of label counts into the Streamlit page."""
        try:
            figure, axes = plt.subplots(figsize=(10, 6))
            counts = self.df[self.target_column].value_counts()
            counts.plot(kind='bar', ax=axes)
            axes.set_title('Class Distribution')
            axes.set_xlabel('Classes')
            axes.set_ylabel('Count')
            plt.xticks(rotation=45)
            plt.tight_layout()
            st.pyplot(figure)
        except Exception as e:
            st.error(f"Error creating plot: {e}")

    def plot_text_length_distribution(self):
        """Render a histogram of raw text lengths into the Streamlit page."""
        try:
            figure, axes = plt.subplots(figsize=(10, 6))
            lengths = self.df[self.text_column].str.len()
            axes.hist(lengths, bins=50, alpha=0.7)
            axes.set_title('Text Length Distribution')
            axes.set_xlabel('Text Length')
            axes.set_ylabel('Frequency')
            plt.tight_layout()
            st.pyplot(figure)
        except Exception as e:
            st.error(f"Error creating plot: {e}")
| # Utility functions with better error handling | |
def save_artifacts(obj, folder_name, file_name):
    """Pickle *obj* to folder_name/file_name, creating the folder if needed.

    Returns True on success; on any failure reports the error in the UI
    and returns False instead of raising.
    """
    try:
        os.makedirs(folder_name, exist_ok=True)
        target = os.path.join(folder_name, file_name)
        with open(target, 'wb') as handle:
            pickle.dump(obj, handle)
    except Exception as e:
        st.error(f"Error saving {file_name}: {e}")
        return False
    return True
def load_artifacts(folder_name, file_name):
    """Unpickle and return folder_name/file_name, or None on any failure.

    NOTE: pickle trusts its input; only load artifacts this app wrote itself.
    """
    try:
        path = os.path.join(folder_name, file_name)
        with open(path, 'rb') as handle:
            return pickle.load(handle)
    except FileNotFoundError:
        st.error(f"File {file_name} not found in {folder_name}")
    except Exception as e:
        st.error(f"Error loading {file_name}: {e}")
    return None
def train_model(model_name, X_train, X_test, y_train, y_test):
    """Train the named classifier, persist it under models/, and report accuracy.

    Args:
        model_name: one of the keys of the supported-model table below.
        X_train, X_test: vectorized feature matrices.
        y_train, y_test: label-encoded targets.

    Returns:
        str | None: the saved model's file name on success, None on any error.
    """
    try:
        os.makedirs("models", exist_ok=True)
        # Supported classifiers (fixed seeds for reproducible runs).
        models_dict = {
            "Logistic Regression": LogisticRegression(max_iter=1000, random_state=42),
            "Decision Tree": DecisionTreeClassifier(random_state=42),
            "Random Forest": RandomForestClassifier(n_estimators=50, random_state=42),  # Reduced for speed
            "Linear SVC": LinearSVC(random_state=42, max_iter=1000),
            "Multinomial Naive Bayes": MultinomialNB(),
        }
        if model_name not in models_dict:
            st.error(f"Model {model_name} not supported")
            return None
        model = models_dict[model_name]
        # Train model
        model.fit(X_train, y_train)
        # Persist the fitted model (removed an unused save_path local here).
        model_filename = f"{model_name.replace(' ', '_')}.pkl"
        if save_artifacts(model, "models", model_filename):
            # Evaluate on the held-out split only after a successful save.
            y_pred = model.predict(X_test)
            accuracy = accuracy_score(y_test, y_pred)
            st.success("✅ Model training completed!")
            st.write(f"**Accuracy**: {accuracy:.4f}")
            return model_filename
        else:
            return None
    except Exception as e:
        st.error(f"Error training model: {e}")
        return None
def predict_text(model_name, text, vectorizer_type="tfidf"):
    """Classify *text* using a previously trained model.

    Args:
        model_name: pickle file name under models/.
        text: raw user text; normalized with TextCleaner before vectorizing.
        vectorizer_type: "tfidf" or "count" — selects the saved vectorizer.

    Returns:
        tuple: (label, probabilities); probabilities is None for models
        without predict_proba. (None, None) on any failure.
    """
    try:
        # Load persisted components; each loader reports its own UI error.
        model = load_artifacts("models", model_name)
        if model is None:
            return None, None
        vectorizer_file = f"{vectorizer_type}_vectorizer.pkl"
        vectorizer = load_artifacts("artifacts", vectorizer_file)
        if vectorizer is None:
            return None, None
        encoder = load_artifacts("artifacts", "encoder.pkl")
        if encoder is None:
            return None, None
        # Process text the same way training data was processed.
        text_cleaner = TextCleaner()
        clean_text = text_cleaner.clean_text(text)
        if not clean_text.strip():
            st.warning("Text became empty after cleaning")
            return None, None
        # Vectorize and predict
        text_vector = vectorizer.transform([clean_text])
        prediction = model.predict(text_vector)
        # Get probabilities if the model supports them (e.g. LinearSVC does not).
        prediction_proba = None
        if hasattr(model, 'predict_proba'):
            try:
                prediction_proba = model.predict_proba(text_vector)[0]
            except Exception:  # narrowed from a bare except; probabilities stay optional
                pass
        # Decode the integer prediction back to the original label.
        predicted_label = encoder.inverse_transform(prediction)[0]
        return predicted_label, prediction_proba
    except Exception as e:
        st.error(f"Prediction error: {e}")
        return None, None
# --- Main Streamlit app (flat script: Streamlit reruns it top-to-bottom
# on every user interaction) ---
st.title('🤖 No Code Text Classification App')

# Surface degraded-NLP mode (decided at import time by init_nltk_components).
if not NLTK_AVAILABLE:
    st.warning("⚠️ NLTK not fully available. Using basic text processing.")

st.write('Understand the behavior of your text data and train a model to classify text data')

# Sidebar: switch between the three app pages.
section = st.sidebar.radio("Choose Section", ["Data Analysis", "Train Model", "Predictions"])

# Sidebar: CSV upload shared by all sections.
st.sidebar.subheader("📁 Upload Your Dataset")
train_data = st.sidebar.file_uploader("Upload training data", type=["csv"])

# Remember which vectorizer was used for training so the Predictions page
# can load the matching pickle across script reruns.
if 'vectorizer_type' not in st.session_state:
    st.session_state.vectorizer_type = "tfidf"
# Load and process the uploaded CSV: read with encoding fallbacks, clean the
# chosen text column, and label-encode the chosen target column.
train_df = None
if train_data is not None:
    try:
        # Try different encodings until one parses.
        for encoding in ['utf-8', 'latin1', 'iso-8859-1']:
            try:
                # BUGFIX: rewind the upload buffer before each attempt — a
                # failed read_csv leaves the file-like object mid-stream, so
                # the next encoding would otherwise parse truncated data.
                train_data.seek(0)
                train_df = pd.read_csv(train_data, encoding=encoding)
                break
            except UnicodeDecodeError:
                continue
        if train_df is None:
            st.error("Could not read the CSV file. Please check the encoding.")
        else:
            st.write("**Training Data Preview:**")
            st.dataframe(train_df.head(3))
            columns = train_df.columns.tolist()
            text_data = st.sidebar.selectbox("Choose the text column:", columns)
            target = st.sidebar.selectbox("Choose the target column:", columns)
            # Process data once both columns are chosen.
            if text_data and target:
                with st.spinner("Processing data..."):
                    text_cleaner = TextCleaner()
                    # NaN cells become empty strings rather than "nan" tokens.
                    train_df['clean_text'] = train_df[text_data].apply(
                        lambda x: text_cleaner.clean_text(x) if pd.notna(x) else ""
                    )
                    train_df['text_length'] = train_df[text_data].astype(str).str.len()
                    # Handle label encoding (stringified so mixed types encode).
                    label_encoder = LabelEncoder()
                    train_df['target'] = label_encoder.fit_transform(train_df[target].astype(str))
                    # Persist the encoder for the Predictions page.
                    save_artifacts(label_encoder, "artifacts", "encoder.pkl")
    except Exception as e:
        st.error(f"Error processing data: {e}")
        train_df = None
# Data Analysis Section
if section == "Data Analysis":
    if train_df is not None:
        st.subheader("📊 Data Insights")
        # text_data / target were chosen in the sidebar while loading the CSV.
        analyzer = DataAnalyzer(train_df, text_data, target)
        info = analyzer.get_basic_info()
        # Headline metrics: rows, columns, number of distinct labels.
        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Total Samples", info['shape'][0])
        with col2:
            st.metric("Features", info['shape'][1])
        with col3:
            st.metric("Classes", len(info['class_distribution']))
        st.write("**Class Distribution:**")
        st.write(info['class_distribution'])
        # Show a sample of processed data (columns added during CSV loading).
        st.write("**Processed Data Preview:**")
        sample_df = train_df[['clean_text', 'text_length', 'target']].head(10)
        st.dataframe(sample_df)
        st.subheader("📈 Visualizations")
        col1, col2 = st.columns(2)
        with col1:
            st.write("**Class Distribution**")
            analyzer.plot_class_distribution()
        with col2:
            st.write("**Text Length Distribution**")
            analyzer.plot_text_length_distribution()
    else:
        st.warning("⚠️ Please upload training data to see analysis")
| # Train Model Section | |
| elif section == "Train Model": | |
| if train_df is not None and 'clean_text' in train_df.columns: | |
| st.subheader("🚀 Train a Model") | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| model = st.selectbox("Choose the Model", [ | |
| "Logistic Regression", | |
| "Decision Tree", | |
| "Random Forest", | |
| "Linear SVC", | |
| "Multinomial Naive Bayes" | |
| ]) | |
| with col2: | |
| vectorizer_choice = st.selectbox("Choose Vectorizer", | |
| ["Tfidf Vectorizer", "Count Vectorizer"]) | |
| # Filter out empty texts | |
| valid_data = train_df[train_df['clean_text'].str.len() > 0].copy() | |
| if len(valid_data) == 0: | |
| st.error("No valid text data after cleaning!") | |
| else: | |
| st.write(f"**Valid samples**: {len(valid_data)}") | |
| # Initialize vectorizer | |
| max_features = min(10000, len(valid_data) * 10) # Adaptive max_features | |
| if vectorizer_choice == "Tfidf Vectorizer": | |
| vectorizer = TfidfVectorizer(max_features=max_features, stop_words='english') | |
| st.session_state.vectorizer_type = "tfidf" | |
| else: | |
| vectorizer = CountVectorizer(max_features=max_features, stop_words='english') | |
| st.session_state.vectorizer_type = "count" | |
| if st.button("🎯 Start Training", type="primary"): | |
| with st.spinner("Training model..."): | |
| try: | |
| # Vectorize | |
| X = vectorizer.fit_transform(valid_data['clean_text']) | |
| y = valid_data['target'] | |
| # Split data | |
| test_size = min(0.3, max(0.1, len(valid_data) * 0.2 / len(valid_data))) | |
| X_train, X_test, y_train, y_test = train_test_split( | |
| X, y, test_size=test_size, random_state=42, stratify=y | |
| ) | |
| st.write(f"**Data split** - Train: {X_train.shape[0]}, Test: {X_test.shape[0]}") | |
| # Save vectorizer | |
| vectorizer_filename = f"{st.session_state.vectorizer_type}_vectorizer.pkl" | |
| if save_artifacts(vectorizer, "artifacts", vectorizer_filename): | |
| # Train model | |
| model_filename = train_model(model, X_train, X_test, y_train, y_test) | |
| if model_filename: | |
| st.success("✅ Model ready! Go to 'Predictions' to test it.") | |
| except Exception as e: | |
| st.error(f"Training failed: {e}") | |
| else: | |
| st.warning("⚠️ Please upload and process training data first") | |
# Predictions Section
elif section == "Predictions":
    st.subheader("🔮 Make Predictions")
    # Only offer prediction when at least one trained model pickle exists.
    if os.path.exists("models") and os.listdir("models"):
        available_models = [f for f in os.listdir("models") if f.endswith('.pkl')]
        if available_models:
            selected_model = st.selectbox("Choose trained model:", available_models)
            text_input = st.text_area("Enter text to classify:",
                                      height=100,
                                      placeholder="Type your text here...")
            if st.button("🎯 Predict", type="primary"):
                if text_input.strip():
                    with st.spinner("Making prediction..."):
                        # Falls back to "tfidf" when training never ran this session.
                        predicted_label, prediction_proba = predict_text(
                            selected_model,
                            text_input,
                            st.session_state.get('vectorizer_type', 'tfidf')
                        )
                        if predicted_label is not None:
                            st.success("✅ Prediction completed!")
                            st.markdown(f"**Predicted Class:** `{predicted_label}`")
                            # Show per-class probabilities when the model supports them.
                            if prediction_proba is not None:
                                st.markdown("**Class Probabilities:**")
                                encoder = load_artifacts("artifacts", "encoder.pkl")
                                if encoder is not None:
                                    classes = encoder.classes_
                                    prob_df = pd.DataFrame({
                                        'Class': classes,
                                        'Probability': prediction_proba
                                    }).sort_values('Probability', ascending=False)
                                    st.dataframe(prob_df, use_container_width=True)
                else:
                    st.warning("⚠️ Please enter some text")
        else:
            st.warning("⚠️ No trained models found")
    else:
        st.warning("⚠️ No models available. Please train a model first.")

# Footer
st.markdown("---")
st.markdown("🚀 Built with Streamlit | Ready for 🤗 Hugging Face Spaces")