# TextClassifier / app.py
# Header copied from the Hugging Face file viewer (not program code):
#   Alamgirapi's picture | Update app.py | b97bac9 verified | raw | history blame | 19.8 kB
import streamlit as st
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import os
import pickle
import re
import string
from pathlib import Path
from sklearn.preprocessing import LabelEncoder
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import LinearSVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import MultinomialNB
# Page-level Streamlit settings: tab title, tab icon, and wide layout.
st.set_page_config(
    page_title="No Code Text Classifier",
    page_icon="🤖",
    layout="wide",
)
# Initialize NLTK components with fallbacks

# Minimal English stop-word list used whenever the NLTK corpus cannot be used.
_BASIC_STOPWORDS = frozenset({
    'i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you',
    'your', 'yours', 'yourself', 'yourselves', 'he', 'him', 'his',
    'himself', 'she', 'her', 'hers', 'herself', 'it', 'its', 'itself',
    'they', 'them', 'their', 'theirs', 'themselves', 'what', 'which',
    'who', 'whom', 'this', 'that', 'these', 'those', 'am', 'is', 'are',
    'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'having',
    'do', 'does', 'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if',
    'or', 'because', 'as', 'until', 'while', 'of', 'at', 'by', 'for',
    'with', 'through', 'during', 'before', 'after', 'above', 'below',
    'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under', 'again',
    'further', 'then', 'once',
})


def _load_nltk_resources():
    """Build ``(stop_words, lemmatizer)`` from locally available NLTK data.

    Any exception raised by NLTK (for example when a corpus is missing) is
    left to propagate so the caller can decide how to fall back.
    """
    from nltk.corpus import stopwords
    from nltk.stem import WordNetLemmatizer

    stop_words = set(stopwords.words('english'))
    lemmatizer = WordNetLemmatizer()
    # Smoke-test the lemmatizer so a broken WordNet install is detected here
    # rather than on every later clean_text() call.
    lemmatizer.lemmatize('test')
    return stop_words, lemmatizer


@st.cache_resource
def init_nltk_components():
    """Initialize NLTK components with fallbacks.

    Returns:
        A ``(stop_words, lemmatizer, nltk_available)`` tuple.  When NLTK and
        its data are usable, ``stop_words`` is NLTK's English list,
        ``lemmatizer`` is a ``WordNetLemmatizer`` and the flag is ``True``.
        Otherwise ``stop_words`` is a copy of the built-in basic list,
        ``lemmatizer`` is ``None`` and the flag is ``False``.
    """
    try:
        import nltk
    except ImportError:
        # NLTK is not installed: use the same basic list as the data-missing
        # fallback below so both degraded modes behave alike.
        return set(_BASIC_STOPWORDS), None, False

    try:
        # Try to use pre-downloaded data first.
        stop_words, lemmatizer = _load_nltk_resources()
    except Exception:
        # Data missing or unreadable: attempt a download, then retry with the
        # same smoke test as the first attempt.
        try:
            for resource in ('stopwords', 'wordnet', 'omw-1.4'):
                nltk.download(resource, quiet=True)
            stop_words, lemmatizer = _load_nltk_resources()
        except Exception:
            # Final fallback: basic English stopwords, no lemmatizer.
            return set(_BASIC_STOPWORDS), None, False

    return stop_words, lemmatizer, True
# Module-level handles to the stop-word set, the lemmatizer (or None), and the
# flag telling whether NLTK could be used.
(STOP_WORDS, LEMMATIZER, NLTK_AVAILABLE) = init_nltk_components()
class TextCleaner:
    """Normalise raw text into a lowercase, space-separated token string.

    Stop-word removal and lemmatisation are applied only when the
    module-level ``STOP_WORDS`` / ``LEMMATIZER`` / ``NLTK_AVAILABLE`` values
    captured in ``__init__`` provide them.
    """

    # Patterns are compiled once at class creation instead of on every call.
    _HTML_TAG = re.compile(r'<.*?>')
    # Deliberately narrow ranges: a single span from U+24C2 to U+1F251 would
    # also match CJK, Hangul and many other letters and erase non-Latin text.
    _EMOJI = re.compile(
        "["
        "\U0001F600-\U0001F64F"  # emoticons
        "\U0001F300-\U0001F5FF"  # symbols & pictographs
        "\U0001F680-\U0001F6FF"  # transport & map symbols
        "\U0001F1E0-\U0001F1FF"  # flags (iOS)
        "\U00002702-\U000027B0"  # dingbats
        "\U000024C2"             # circled latin capital letter M
        "\U0001F200-\U0001F251"  # enclosed ideographic supplement
        "]+",
        flags=re.UNICODE,
    )
    _NON_WORD = re.compile(r'[^\w\s]')
    _DIGIT = re.compile(r'\d')
    _WHITESPACE = re.compile(r'\s+')
    _PUNCTUATION_TABLE = str.maketrans('', '', string.punctuation)

    def __init__(self):
        self.currency_symbols = r'[\$\£\€\¥\₹\¢\₽\₩\₪]'
        self.stop_words = STOP_WORDS
        self.lemmatizer = LEMMATIZER
        self.nltk_available = NLTK_AVAILABLE

    def remove_punctuation(self, text):
        """Return ``text`` with every ``string.punctuation`` character removed."""
        return text.translate(self._PUNCTUATION_TABLE)

    def clean_text(self, text):
        """Clean ``text`` and return the normalised string.

        Non-string input is converted with ``str()``; ``None`` and
        whitespace-only input yield ``""``.  If an unexpected error occurs a
        Streamlit warning is shown and the text, as processed up to that
        point, is returned.
        """
        if not isinstance(text, str):
            text = str(text) if text is not None else ""
        if not text.strip():
            return ""
        try:
            text = text.lower()
            # Tags must be stripped while '<' and '>' still exist; once
            # punctuation is removed the tag names would remain as words.
            text = self._HTML_TAG.sub('', text)
            text = re.sub(self.currency_symbols, 'currency', text)
            text = self._EMOJI.sub('', text)
            # string.punctuation includes '_', which \w would otherwise keep.
            text = self.remove_punctuation(text)
            text = self._NON_WORD.sub('', text)
            text = self._DIGIT.sub(' ', text)
            text = self._WHITESPACE.sub(' ', text).strip()

            # Remove stopwords if available
            if self.stop_words:
                text = ' '.join(
                    word for word in text.split() if word not in self.stop_words
                )

            # Lemmatize if available
            if self.lemmatizer and self.nltk_available:
                try:
                    text = ' '.join(
                        self.lemmatizer.lemmatize(word) for word in text.split()
                    )
                except Exception:
                    # Best effort: keep the un-lemmatized text if NLTK fails.
                    pass
            return text
        except Exception as e:
            st.warning(f"Text cleaning warning: {e}")
            return str(text)
class DataAnalyzer:
    """Summary statistics and Streamlit plots for a labelled text dataset.

    Args:
        df: DataFrame holding the dataset.
        text_column: Name of the column containing the raw text.
        target_column: Name of the column containing the class labels.
    """

    def __init__(self, df, text_column, target_column):
        self.df = df
        self.text_column = text_column
        self.target_column = target_column

    def get_basic_info(self):
        """Return a dict with ``shape``, per-column ``missing_values`` counts
        and the ``class_distribution`` of the target column."""
        return {
            'shape': self.df.shape,
            'missing_values': self.df.isnull().sum().to_dict(),
            'class_distribution': self.df[self.target_column].value_counts().to_dict(),
        }

    def plot_class_distribution(self):
        """Render a bar chart of label counts; report failures via ``st.error``."""
        fig = None
        try:
            fig, ax = plt.subplots(figsize=(10, 6))
            self.df[self.target_column].value_counts().plot(kind='bar', ax=ax)
            ax.set_title('Class Distribution')
            ax.set_xlabel('Classes')
            ax.set_ylabel('Count')
            # Operate on this figure's own axes rather than pyplot's global
            # "current" figure.
            ax.tick_params(axis='x', labelrotation=45)
            fig.tight_layout()
            st.pyplot(fig)
        except Exception as e:
            st.error(f"Error creating plot: {e}")
        finally:
            # pyplot keeps every figure registered until it is closed, so
            # close it here to avoid piling figures up on each rerun.
            if fig is not None:
                plt.close(fig)

    def plot_text_length_distribution(self):
        """Render a histogram of text lengths; report failures via ``st.error``."""
        fig = None
        try:
            fig, ax = plt.subplots(figsize=(10, 6))
            # Drop missing rows and coerce to str: ``.str.len()`` alone gives
            # NaN for missing values and raises on non-string columns.
            text_lengths = self.df[self.text_column].dropna().astype(str).str.len()
            ax.hist(text_lengths, bins=50, alpha=0.7)
            ax.set_title('Text Length Distribution')
            ax.set_xlabel('Text Length')
            ax.set_ylabel('Frequency')
            fig.tight_layout()
            st.pyplot(fig)
        except Exception as e:
            st.error(f"Error creating plot: {e}")
        finally:
            if fig is not None:
                plt.close(fig)
# Pickle-based persistence helpers that report failures through Streamlit
def save_artifacts(obj, folder_name, file_name):
    """Pickle ``obj`` to ``folder_name/file_name``, creating the folder if needed.

    Returns ``True`` when the file was written.  On any exception the error
    is shown with ``st.error`` and ``False`` is returned.
    """
    try:
        os.makedirs(folder_name, exist_ok=True)
        destination = os.path.join(folder_name, file_name)
        with open(destination, 'wb') as handle:
            pickle.dump(obj, handle)
    except Exception as exc:
        st.error(f"Error saving {file_name}: {exc}")
        return False
    return True
def load_artifacts(folder_name, file_name):
    """Unpickle and return the object stored at ``folder_name/file_name``.

    Returns ``None`` after showing an ``st.error`` message when the file does
    not exist or cannot be loaded.
    """
    loaded = None
    try:
        with open(os.path.join(folder_name, file_name), 'rb') as handle:
            loaded = pickle.load(handle)
    except FileNotFoundError:
        st.error(f"File {file_name} not found in {folder_name}")
    except Exception as exc:
        st.error(f"Error loading {file_name}: {exc}")
    return loaded
def train_model(model_name, X_train, X_test, y_train, y_test):
    """Fit the chosen classifier, save it under ``models/`` and report accuracy.

    Args:
        model_name: One of "Logistic Regression", "Decision Tree",
            "Random Forest", "Linear SVC" or "Multinomial Naive Bayes".
        X_train, X_test: Feature matrices passed to ``fit`` / ``predict``.
        y_train, y_test: Matching label vectors.

    Returns:
        The pickle file name (``model_name`` with spaces replaced by
        underscores plus ``.pkl``) on success, otherwise ``None`` after an
        ``st.error`` message.
    """
    # Factories rather than instances, so only the selected estimator is built.
    model_factories = {
        "Logistic Regression": lambda: LogisticRegression(max_iter=1000, random_state=42),
        "Decision Tree": lambda: DecisionTreeClassifier(random_state=42),
        # n_estimators reduced for speed
        "Random Forest": lambda: RandomForestClassifier(n_estimators=50, random_state=42),
        "Linear SVC": lambda: LinearSVC(random_state=42, max_iter=1000),
        "Multinomial Naive Bayes": lambda: MultinomialNB(),
    }
    try:
        factory = model_factories.get(model_name)
        if factory is None:
            st.error(f"Model {model_name} not supported")
            return None

        model = factory()
        model.fit(X_train, y_train)

        # save_artifacts creates the "models" folder itself.
        model_filename = f"{model_name.replace(' ', '_')}.pkl"
        if not save_artifacts(model, "models", model_filename):
            return None

        # Evaluate on the held-out split.
        accuracy = accuracy_score(y_test, model.predict(X_test))
        st.success("✅ Model training completed!")
        st.write(f"**Accuracy**: {accuracy:.4f}")
        return model_filename
    except Exception as e:
        st.error(f"Error training model: {e}")
        return None
def predict_text(model_name, text, vectorizer_type="tfidf"):
    """Classify ``text`` with a previously saved model.

    Args:
        model_name: File name of the pickled model inside ``models/``.
        text: Raw input text; it is cleaned with ``TextCleaner`` first.
        vectorizer_type: Prefix of the vectorizer pickle in ``artifacts/``
            (``"<vectorizer_type>_vectorizer.pkl"``).

    Returns:
        ``(predicted_label, prediction_proba)``.  ``prediction_proba`` is
        ``None`` when the model cannot provide probabilities.  ``(None, None)``
        is returned when an artifact is missing, the cleaned text is empty,
        or prediction fails.
    """
    try:
        # Load components
        model = load_artifacts("models", model_name)
        if model is None:
            return None, None

        vectorizer = load_artifacts("artifacts", f"{vectorizer_type}_vectorizer.pkl")
        if vectorizer is None:
            return None, None

        encoder = load_artifacts("artifacts", "encoder.pkl")
        if encoder is None:
            return None, None

        # Process text
        clean_text = TextCleaner().clean_text(text)
        if not clean_text.strip():
            st.warning("Text became empty after cleaning")
            return None, None

        # Vectorize and predict
        text_vector = vectorizer.transform([clean_text])
        prediction = model.predict(text_vector)

        # Probabilities are optional: not every estimator exposes them.
        prediction_proba = None
        if hasattr(model, 'predict_proba'):
            try:
                prediction_proba = model.predict_proba(text_vector)[0]
            except Exception:
                # Best effort: the label is still usable without probabilities.
                prediction_proba = None

        # Decode prediction
        predicted_label = encoder.inverse_transform(prediction)[0]
        return predicted_label, prediction_proba
    except Exception as e:
        st.error(f"Prediction error: {e}")
        return None, None
# Main Streamlit App: header, sidebar controls, and dataset loading/cleaning.
st.title('🤖 No Code Text Classification App')

# Show NLTK status
if not NLTK_AVAILABLE:
    st.warning("⚠️ NLTK not fully available. Using basic text processing.")
st.write('Understand the behavior of your text data and train a model to classify text data')

# Sidebar
section = st.sidebar.radio("Choose Section", ["Data Analysis", "Train Model", "Predictions"])

# Upload Data
st.sidebar.subheader("📁 Upload Your Dataset")
train_data = st.sidebar.file_uploader("Upload training data", type=["csv"])

# Initialize session state
if 'vectorizer_type' not in st.session_state:
    st.session_state.vectorizer_type = "tfidf"

# Load and process data
train_df = None
if train_data is not None:
    try:
        # latin1 can decode any byte sequence, so it is the last resort
        # (iso-8859-1 is the same codec and would never be reached).
        for encoding in ('utf-8', 'latin1'):
            try:
                # Rewind first: a failed attempt leaves the upload buffer
                # partly consumed, so a retry would read from mid-file.
                train_data.seek(0)
                train_df = pd.read_csv(train_data, encoding=encoding)
                break
            except UnicodeDecodeError:
                continue

        if train_df is None:
            st.error("Could not read the CSV file. Please check the encoding.")
        else:
            st.write("**Training Data Preview:**")
            st.dataframe(train_df.head(3))

            columns = train_df.columns.tolist()
            text_data = st.sidebar.selectbox("Choose the text column:", columns)
            # Default the target to the second column when there is one, so
            # the two selectors do not start on the same column.
            target = st.sidebar.selectbox(
                "Choose the target column:",
                columns,
                index=1 if len(columns) > 1 else 0,
            )
            if text_data == target:
                st.sidebar.warning("⚠️ Text and target columns are the same.")

            # Process data
            if text_data and target:
                with st.spinner("Processing data..."):
                    text_cleaner = TextCleaner()
                    train_df['clean_text'] = train_df[text_data].apply(
                        lambda x: text_cleaner.clean_text(x) if pd.notna(x) else ""
                    )
                    train_df['text_length'] = train_df[text_data].astype(str).str.len()

                    # Handle label encoding
                    label_encoder = LabelEncoder()
                    train_df['target'] = label_encoder.fit_transform(
                        train_df[target].astype(str)
                    )

                    # Save encoder
                    save_artifacts(label_encoder, "artifacts", "encoder.pkl")
    except Exception as e:
        st.error(f"Error processing data: {e}")
        train_df = None
# Render the page for the sidebar-selected section.

# Data Analysis Section
if section == "Data Analysis":
    if train_df is not None:
        st.subheader("📊 Data Insights")
        analyzer = DataAnalyzer(train_df, text_data, target)
        info = analyzer.get_basic_info()

        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Total Samples", info['shape'][0])
        with col2:
            st.metric("Features", info['shape'][1])
        with col3:
            st.metric("Classes", len(info['class_distribution']))

        st.write("**Class Distribution:**")
        st.write(info['class_distribution'])

        # Show sample of processed data
        st.write("**Processed Data Preview:**")
        sample_df = train_df[['clean_text', 'text_length', 'target']].head(10)
        st.dataframe(sample_df)

        st.subheader("📈 Visualizations")
        col1, col2 = st.columns(2)
        with col1:
            st.write("**Class Distribution**")
            analyzer.plot_class_distribution()
        with col2:
            st.write("**Text Length Distribution**")
            analyzer.plot_text_length_distribution()
    else:
        st.warning("⚠️ Please upload training data to see analysis")

# Train Model Section
elif section == "Train Model":
    if train_df is not None and 'clean_text' in train_df.columns:
        st.subheader("🚀 Train a Model")
        col1, col2 = st.columns(2)
        with col1:
            model_choice = st.selectbox("Choose the Model", [
                "Logistic Regression",
                "Decision Tree",
                "Random Forest",
                "Linear SVC",
                "Multinomial Naive Bayes",
            ])
        with col2:
            vectorizer_choice = st.selectbox(
                "Choose Vectorizer", ["Tfidf Vectorizer", "Count Vectorizer"]
            )

        # Filter out empty texts
        valid_data = train_df[train_df['clean_text'].str.len() > 0].copy()
        if len(valid_data) == 0:
            st.error("No valid text data after cleaning!")
        else:
            st.write(f"**Valid samples**: {len(valid_data)}")

            # Adaptive vocabulary cap: at most 10 features per sample.
            max_features = min(10000, len(valid_data) * 10)
            if vectorizer_choice == "Tfidf Vectorizer":
                vectorizer = TfidfVectorizer(max_features=max_features, stop_words='english')
                vectorizer_type = "tfidf"
            else:
                vectorizer = CountVectorizer(max_features=max_features, stop_words='english')
                vectorizer_type = "count"

            if st.button("🎯 Start Training", type="primary"):
                with st.spinner("Training model..."):
                    try:
                        # Vectorize
                        X = vectorizer.fit_transform(valid_data['clean_text'])
                        y = valid_data['target']

                        # Hold out 20% of the samples for evaluation.
                        test_size = 0.2
                        try:
                            split = train_test_split(
                                X, y, test_size=test_size, random_state=42, stratify=y
                            )
                        except ValueError:
                            # Stratification is rejected when a class is too
                            # small; fall back to a plain random split.
                            st.warning(
                                "⚠️ Some classes have too few samples for a "
                                "stratified split; using a random split instead."
                            )
                            split = train_test_split(
                                X, y, test_size=test_size, random_state=42
                            )
                        X_train, X_test, y_train, y_test = split
                        st.write(
                            f"**Data split** - Train: {X_train.shape[0]}, Test: {X_test.shape[0]}"
                        )

                        # Save vectorizer
                        vectorizer_filename = f"{vectorizer_type}_vectorizer.pkl"
                        if save_artifacts(vectorizer, "artifacts", vectorizer_filename):
                            # Record the type only once a fitted vectorizer is
                            # on disk, so merely changing the selectbox cannot
                            # point Predictions at a stale or missing file.
                            st.session_state.vectorizer_type = vectorizer_type

                            # Train model
                            model_filename = train_model(
                                model_choice, X_train, X_test, y_train, y_test
                            )
                            if model_filename:
                                st.success("✅ Model ready! Go to 'Predictions' to test it.")
                    except Exception as e:
                        st.error(f"Training failed: {e}")
    else:
        st.warning("⚠️ Please upload and process training data first")

# Predictions Section
elif section == "Predictions":
    st.subheader("🔮 Make Predictions")
    model_dir_entries = os.listdir("models") if os.path.isdir("models") else []
    if model_dir_entries:
        available_models = sorted(f for f in model_dir_entries if f.endswith('.pkl'))
        if available_models:
            selected_model = st.selectbox("Choose trained model:", available_models)
            text_input = st.text_area(
                "Enter text to classify:",
                height=100,
                placeholder="Type your text here...",
            )

            if st.button("🎯 Predict", type="primary"):
                if text_input.strip():
                    with st.spinner("Making prediction..."):
                        # NOTE(review): the vectorizer is chosen from session
                        # state, not from the selected model file — confirm
                        # they match when several models have been trained.
                        predicted_label, prediction_proba = predict_text(
                            selected_model,
                            text_input,
                            st.session_state.get('vectorizer_type', 'tfidf'),
                        )

                    if predicted_label is not None:
                        st.success("✅ Prediction completed!")
                        st.markdown(f"**Predicted Class:** `{predicted_label}`")

                        if prediction_proba is not None:
                            st.markdown("**Class Probabilities:**")
                            encoder = load_artifacts("artifacts", "encoder.pkl")
                            if encoder is not None:
                                classes = encoder.classes_
                                if len(classes) == len(prediction_proba):
                                    prob_df = pd.DataFrame({
                                        'Class': classes,
                                        'Probability': prediction_proba,
                                    }).sort_values('Probability', ascending=False)
                                    st.dataframe(prob_df, use_container_width=True)
                                else:
                                    # Building the table with mismatched
                                    # lengths would raise.
                                    st.info(
                                        "Class probabilities cannot be shown: the saved "
                                        "label encoder does not match this model."
                                    )
                else:
                    st.warning("⚠️ Please enter some text")
        else:
            st.warning("⚠️ No trained models found")
    else:
        st.warning("⚠️ No models available. Please train a model first.")
# Page footer: a horizontal rule followed by the credits line.
for footer_line in (
    "---",
    "🚀 Built with Streamlit | Ready for 🤗 Hugging Face Spaces",
):
    st.markdown(footer_line)