# Alamgirapi's picture
# Update app.py
# a5bc77a verified
# raw
# history blame
# 22.4 kB
import streamlit as st
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import LinearSVC, SVC
from sklearn.naive_bayes import MultinomialNB, GaussianNB
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import re
import string
import nltk
import os
import pickle
import io
import base64
# Make sure the NLTK corpora needed for text cleaning are present locally.
# Each entry pairs the lookup path with the package name to download when
# that lookup fails.
for _resource_path, _package in (
    ('corpora/stopwords', 'stopwords'),
    ('corpora/wordnet', 'wordnet'),
):
    try:
        nltk.data.find(_resource_path)
    except LookupError:
        nltk.download(_package, quiet=True)

# Imported only after the corpora are known to be available.
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
# Page-level settings (browser tab title, icon and wide layout), applied in a
# single set_page_config call.
_PAGE_SETTINGS = dict(
    page_title="No Code Text Classification",
    page_icon="📝",
    layout="wide",
)
st.set_page_config(**_PAGE_SETTINGS)
# Seed session state with defaults; keys that already exist keep their
# current value so trained artefacts survive script reruns.
_SESSION_DEFAULTS = {
    'trained_model': None,
    'vectorizer': None,
    'label_encoder': None,
    'vectorizer_type': 'tfidf',
    'train_df': None,
}
for _state_key, _default_value in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _default_value
# Text cleaning class
class TextCleaner:
    """Normalise raw text before it is vectorized.

    Attributes:
        stop_words: set of English stop words removed from the text.
        lemmatizer: object whose ``lemmatize(word)`` maps a word to its lemma.
    """

    # Compiled/built once for the whole class: clean_text runs once per row,
    # so rebuilding these on every call is pure overhead.
    # ``http\S+`` already covers ``https...`` so no separate alternative is
    # needed.
    _URL_PATTERN = re.compile(r'http\S+|www\S+')
    _MENTION_HASHTAG_PATTERN = re.compile(r'@\w+|#\w+')
    _PUNCTUATION_TABLE = str.maketrans('', '', string.punctuation)

    def __init__(self):
        self.stop_words = set(stopwords.words('english'))
        self.lemmatizer = WordNetLemmatizer()

    def clean_text(self, text):
        """Return a cleaned, lemmatized version of ``text``.

        Steps: lowercase, strip URLs, strip @mentions and #hashtags, strip
        ASCII punctuation, drop stop words, lemmatize the remaining words.

        Args:
            text: any value; it is converted with ``str()``. Missing values
                (as judged by ``pd.isna``) yield an empty string.

        Returns:
            The remaining words joined by single spaces.
        """
        if pd.isna(text):
            return ""

        text = str(text).lower()
        text = self._URL_PATTERN.sub('', text)
        text = self._MENTION_HASHTAG_PATTERN.sub('', text)
        text = text.translate(self._PUNCTUATION_TABLE)

        # str.split() with no argument already collapses whitespace runs and
        # ignores leading/trailing whitespace, so no separate normalisation
        # pass is required before tokenising.
        lemmatize = self.lemmatizer.lemmatize
        stop_words = self.stop_words
        return ' '.join(
            lemmatize(word) for word in text.split() if word not in stop_words
        )
# Utility functions
def create_download_link(val, filename):
    """Build an HTML anchor that downloads ``val`` as ``filename``.

    The payload is embedded in the link as a base64 ``data:`` URI.

    Args:
        val: file content as ``bytes`` (or ``str``, which is encoded as UTF-8).
        filename: name offered to the browser for the download.

    Returns:
        An ``<a>`` element as a string.
    """
    import html

    payload = val.encode('utf-8') if isinstance(val, str) else val
    b64 = base64.b64encode(payload).decode('ascii')
    # The filename lands inside an HTML attribute and the link text, so it
    # must be escaped to keep quotes/angle brackets from breaking the markup.
    safe_name = html.escape(str(filename), quote=True)
    return (
        f'<a href="data:application/octet-stream;base64,{b64}" '
        f'download="{safe_name}">Download {safe_name}</a>'
    )
def safe_file_read(uploaded_file):
    """Read an uploaded CSV, trying several text encodings in turn.

    Encodings are tried in the order UTF-8, cp1252, latin1. cp1252 comes
    before latin1 because latin1 can decode any byte sequence and would
    otherwise make the cp1252 attempt unreachable.

    Args:
        uploaded_file: a file-like object (or anything ``pd.read_csv``
            accepts). If it has ``seek``, it is rewound before every attempt
            so a buffer that was already consumed can be read again.

    Returns:
        The parsed ``DataFrame``, or ``None`` (after reporting through
        ``st.error``) if no encoding could decode the file.

    Raises:
        Any non-decoding error from ``pd.read_csv`` (for example an empty or
        malformed file) is propagated to the caller.
    """
    for encoding in ('utf-8', 'cp1252', 'latin1'):
        if hasattr(uploaded_file, 'seek'):
            uploaded_file.seek(0)  # Reset file pointer
        try:
            return pd.read_csv(uploaded_file, encoding=encoding)
        except UnicodeDecodeError:
            continue
    st.error("Error reading file: could not decode it as UTF-8, cp1252 or latin1")
    return None
# Data Analysis Functions
def get_data_insights(df, text_col, target_col):
    """Compute summary statistics for a labelled text dataset.

    The input frame is left untouched: text lengths are computed on a
    separate Series rather than written back as a new column.

    Args:
        df: the dataset.
        text_col: name of the column holding the text.
        target_col: name of the column holding the class labels.

    Returns:
        A dict with the keys ``shape``, ``missing_values`` (per-column null
        counts), ``class_distribution`` (label -> count), ``avg_text_length``,
        ``min_text_length`` and ``max_text_length``.
    """
    texts = df[text_col]
    # Missing text counts as length 0 instead of the length of the string
    # "nan" that astype(str) would otherwise produce.
    text_lengths = texts.astype(str).str.len().where(texts.notna(), 0)

    return {
        'shape': df.shape,
        'missing_values': df.isnull().sum().to_dict(),
        'class_distribution': df[target_col].value_counts().to_dict(),
        'avg_text_length': text_lengths.mean(),
        'min_text_length': text_lengths.min(),
        'max_text_length': text_lengths.max(),
    }
def create_visualizations(df, text_col, target_col):
    """Plot the class distribution and the text-length histogram.

    Draws both charts side by side on one figure and renders it with
    ``st.pyplot``. The input frame is not modified.

    Args:
        df: the dataset.
        text_col: name of the column holding the text.
        target_col: name of the column holding the class labels.
    """
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
    try:
        # Class distribution bar plot. Labels are plotted as strings so that
        # numeric class ids are treated as categories, not x positions.
        class_counts = df[target_col].value_counts()
        ax1.bar(class_counts.index.astype(str), class_counts.values)
        ax1.set_title('Class Distribution')
        ax1.set_xlabel('Classes')
        ax1.set_ylabel('Count')
        ax1.tick_params(axis='x', rotation=45)

        # Text length distribution; missing text counts as length 0.
        texts = df[text_col]
        text_lengths = texts.astype(str).str.len().where(texts.notna(), 0)
        ax2.hist(text_lengths, bins=30, alpha=0.7)
        ax2.set_title('Text Length Distribution')
        ax2.set_xlabel('Text Length')
        ax2.set_ylabel('Frequency')

        fig.tight_layout()
        st.pyplot(fig)
    finally:
        # pyplot keeps every figure alive until it is closed; without this a
        # new figure would accumulate on each script rerun.
        plt.close(fig)
# Model Training Functions
def train_model(X_train, X_test, y_train, y_test, model_name):
    """Fit the selected classifier and score it on the test split.

    Args:
        X_train: training feature matrix (sparse or dense).
        X_test: test feature matrix (sparse or dense).
        y_train: training labels.
        y_test: test labels.
        model_name: one of 'Logistic Regression', 'Decision Tree',
            'Random Forest', 'Linear SVC', 'SVC', 'Multinomial Naive Bayes',
            'Gaussian Naive Bayes'.

    Returns:
        A tuple ``(model, accuracy, y_pred)``: the fitted estimator, its
        accuracy on the test split, and its predictions for ``X_test``.

    Raises:
        ValueError: if ``model_name`` is not one of the supported names.
    """
    # Factories rather than instances: only the requested estimator is built.
    model_factories = {
        'Logistic Regression': lambda: LogisticRegression(random_state=42, max_iter=1000),
        'Decision Tree': lambda: DecisionTreeClassifier(random_state=42),
        'Random Forest': lambda: RandomForestClassifier(random_state=42, n_estimators=100),
        'Linear SVC': lambda: LinearSVC(random_state=42, max_iter=1000),
        'SVC': lambda: SVC(random_state=42, probability=True),
        'Multinomial Naive Bayes': MultinomialNB,
        'Gaussian Naive Bayes': GaussianNB,
    }
    try:
        build_model = model_factories[model_name]
    except KeyError:
        supported = ', '.join(model_factories)
        raise ValueError(
            f"Unknown model '{model_name}'. Supported models: {supported}"
        ) from None
    model = build_model()

    # Gaussian NB does not accept sparse input; densify each split on its own
    # so a dense X_train paired with a sparse X_test is handled too.
    if model_name == 'Gaussian Naive Bayes':
        if hasattr(X_train, 'toarray'):
            X_train = X_train.toarray()
        if hasattr(X_test, 'toarray'):
            X_test = X_test.toarray()

    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    return model, accuracy, y_pred
# Main App: page heading and one-line description
st.title('🔤 No Code Text Classification App')
st.markdown('Upload your data, analyze it, train models, and make predictions without writing any code!')

# Sidebar: the training CSV is uploaded here; `train_data` stays None until
# the user provides a file.
_upload_panel = st.sidebar
_upload_panel.header("📁 Data Upload")
train_data = _upload_panel.file_uploader(
    label="Upload training data (CSV)",
    type=["csv"],
    help="Upload a CSV file with text and labels",
)
# Process uploaded data

# Number of rows densified at a time during batch prediction for models that
# need dense input; bounds peak memory on large uploads.
_BATCH_CHUNK_ROWS = 512


def _model_input(model, features):
    """Return ``features`` in a form ``model`` accepts (GaussianNB needs dense arrays)."""
    if isinstance(model, GaussianNB) and hasattr(features, 'toarray'):
        return features.toarray()
    return features


def _vectorize_texts(texts):
    """Clean ``texts`` and transform them with the vectorizer kept in session state."""
    cleaner = TextCleaner()
    cleaned = [cleaner.clean_text(text) for text in texts]
    return st.session_state.vectorizer.transform(cleaned)


def _render_analysis_tab(train_df, source_columns, text_col, target_col):
    """Render dataset metrics, raw/cleaned previews and the plots."""
    st.header("📊 Data Analysis")
    col1, col2 = st.columns(2)
    with col1:
        st.subheader("📈 Dataset Overview")
        # Summarise only the uploaded columns so the helper columns added
        # during preprocessing do not inflate the feature count.
        insights = get_data_insights(train_df[source_columns].copy(), text_col, target_col)
        st.metric("Total Samples", insights['shape'][0])
        st.metric("Number of Features", insights['shape'][1])
        st.metric("Average Text Length", f"{insights['avg_text_length']:.1f}")
        st.subheader("🎯 Class Distribution")
        class_dist_df = pd.DataFrame(
            list(insights['class_distribution'].items()),
            columns=['Class', 'Count'],
        )
        st.dataframe(class_dist_df, use_container_width=True)
    with col2:
        st.subheader("📋 Data Preview")
        preview_df = train_df[[text_col, target_col]].head()
        st.dataframe(preview_df, use_container_width=True)
        st.subheader("🧹 Cleaned Text Preview")
        cleaned_preview = train_df[['clean_text', target_col]].head()
        st.dataframe(cleaned_preview, use_container_width=True)
    st.subheader("📊 Visualizations")
    create_visualizations(train_df, text_col, target_col)


def _render_training_tab(train_df, label_encoder):
    """Render model/vectorizer choices and train on demand.

    On success the model, vectorizer and the label encoder that produced the
    training targets are stored together in session state, so predictions
    always decode labels with the encoder the model was trained against.
    """
    st.header("🤖 Train Model")
    col1, col2 = st.columns(2)
    with col1:
        st.subheader("🔧 Model Selection")
        model_name = st.selectbox(
            "Choose a model:",
            ["Logistic Regression", "Decision Tree", "Random Forest",
             "Linear SVC", "SVC", "Multinomial Naive Bayes", "Gaussian Naive Bayes"]
        )
    with col2:
        st.subheader("📊 Vectorizer Selection")
        vectorizer_type = st.selectbox(
            "Choose vectorizer:",
            ["TF-IDF Vectorizer", "Count Vectorizer"]
        )

    # Training parameters
    st.subheader("⚙️ Training Parameters")
    col3, col4 = st.columns(2)
    with col3:
        test_size = st.slider("Test size", 0.1, 0.5, 0.2, 0.05)
    with col4:
        max_features = st.number_input("Max features", 1000, 20000, 10000, 1000)

    if not st.button("🚀 Train Model", type="primary"):
        return

    try:
        with st.spinner("Training model... This may take a few minutes."):
            # Initialize vectorizer
            if vectorizer_type == "TF-IDF Vectorizer":
                vectorizer = TfidfVectorizer(max_features=max_features, stop_words='english')
                vectorizer_key = 'tfidf'
            else:
                vectorizer = CountVectorizer(max_features=max_features, stop_words='english')
                vectorizer_key = 'count'

            # Vectorize text
            X = vectorizer.fit_transform(train_df['clean_text'])
            y = train_df['encoded_target']

            # A stratified split needs at least two samples in every class;
            # fall back to a plain random split otherwise.
            can_stratify = y.value_counts().min() >= 2
            if not can_stratify:
                st.warning("⚠️ Some classes have fewer than 2 samples; splitting without stratification.")
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=test_size, random_state=42,
                stratify=y if can_stratify else None
            )

            # Train model
            model, accuracy, y_pred = train_model(X_train, X_test, y_train, y_test, model_name)

        # Store the artefacts that belong together in one place
        st.session_state.trained_model = model
        st.session_state.vectorizer = vectorizer
        st.session_state.vectorizer_type = vectorizer_key
        st.session_state.label_encoder = label_encoder

        # Display results
        st.success("🎉 Model training completed!")
        col5, col6 = st.columns(2)
        with col5:
            st.metric("🎯 Accuracy", f"{accuracy:.4f}")
            # len() is not defined for sparse matrices; use the row count.
            st.metric("🏋️ Training Samples", X_train.shape[0])
            st.metric("🧪 Test Samples", X_test.shape[0])
        with col6:
            st.subheader("📊 Classification Report")
            # Names must be strings, and the label list is passed explicitly
            # so classes missing from the test split do not break the report.
            class_names = [str(name) for name in label_encoder.classes_]
            report = classification_report(
                y_test, y_pred,
                labels=list(range(len(class_names))),
                target_names=class_names,
                output_dict=True,
                zero_division=0
            )
            report_df = pd.DataFrame(report).transpose()
            st.dataframe(report_df.round(3), use_container_width=True)
    except Exception as e:
        st.error(f"❌ Error during training: {str(e)}")


def _render_single_prediction():
    """Classify one piece of user-entered text and show class probabilities if available."""
    st.subheader("📝 Single Text Prediction")
    user_input = st.text_area("Enter text to classify:", height=100)
    if not st.button("🔮 Predict", type="primary"):
        return
    if not user_input.strip():
        st.warning("⚠️ Please enter some text to classify")
        return

    try:
        with st.spinner("Making prediction..."):
            model = st.session_state.trained_model
            label_encoder = st.session_state.label_encoder

            # Clean and vectorize input
            input_vector = _model_input(model, _vectorize_texts([user_input]))

            # Make prediction
            prediction = model.predict(input_vector)[0]
            predicted_label = label_encoder.inverse_transform([prediction])[0]

            # Get probabilities if available; failing to compute them must
            # not hide the prediction itself.
            proba = None
            if hasattr(model, 'predict_proba'):
                try:
                    proba = model.predict_proba(input_vector)[0]
                except Exception:
                    proba = None

        st.success("🎉 Prediction completed!")
        st.write(f"**Input:** {user_input}")
        st.write(f"**Predicted Class:** {predicted_label}")

        if proba is not None:
            # Show probabilities. Columns follow model.classes_, which may be
            # a subset of the encoder's classes, so decode those directly.
            st.subheader("📊 Class Probabilities")
            prob_df = pd.DataFrame({
                'Class': label_encoder.inverse_transform(model.classes_),
                'Probability': proba
            }).sort_values('Probability', ascending=False)
            st.bar_chart(prob_df.set_index('Class'))
            st.dataframe(prob_df.round(4), use_container_width=True)
    except Exception as e:
        st.error(f"❌ Error during prediction: {str(e)}")


def _render_batch_predictions():
    """Classify every row of an uploaded CSV and offer the result for download."""
    st.subheader("📊 Batch Predictions")
    batch_file = st.file_uploader("Upload CSV for batch predictions", type=["csv"])
    if batch_file is None:
        return

    try:
        batch_df = safe_file_read(batch_file)
        if batch_df is None:
            return

        st.write("**Preview:**")
        st.dataframe(batch_df.head(), use_container_width=True)
        batch_text_col = st.selectbox("Select text column for prediction:",
                                      batch_df.columns.tolist())
        if not st.button("🚀 Run Batch Predictions"):
            return
        if batch_df.empty:
            st.warning("⚠️ The uploaded file contains no rows to classify.")
            return

        with st.spinner("Processing batch predictions..."):
            model = st.session_state.trained_model
            # Vectorize the whole column in one call, then predict in row
            # slices so dense-only models never materialise the full matrix.
            features = _vectorize_texts(batch_df[batch_text_col])
            encoded = np.concatenate([
                model.predict(_model_input(model, features[start:start + _BATCH_CHUNK_ROWS]))
                for start in range(0, features.shape[0], _BATCH_CHUNK_ROWS)
            ])
            batch_df['Predicted_Class'] = st.session_state.label_encoder.inverse_transform(encoded)

        st.success("🎉 Batch predictions completed!")
        st.dataframe(batch_df, use_container_width=True)

        # Download results
        csv_data = batch_df.to_csv(index=False)
        st.download_button(
            label="📥 Download Results",
            data=csv_data,
            file_name="batch_predictions.csv",
            mime="text/csv"
        )
    except Exception as e:
        st.error(f"❌ Error processing batch file: {str(e)}")


def _render_predictions_tab():
    """Render single and batch prediction once a trained model exists."""
    st.header("🔍 Make Predictions")
    if st.session_state.trained_model is None:
        st.warning("⚠️ No trained model found. Please train a model first in the 'Train Model' tab.")
        return
    _render_single_prediction()
    _render_batch_predictions()


def _render_workflow(train_df):
    """Drive column selection, preprocessing and the three tabs for a loaded dataset."""
    st.session_state.train_df = train_df
    st.sidebar.success(f"✅ Data loaded: {train_df.shape[0]} rows, {train_df.shape[1]} columns")

    # Column selection. The target defaults to the second column (when there
    # is one) so the two selectors do not start on the same column.
    columns = train_df.columns.tolist()
    text_col = st.sidebar.selectbox("📝 Select text column:", columns, key="text_col")
    target_col = st.sidebar.selectbox(
        "🎯 Select target column:", columns,
        index=min(1, len(columns) - 1), key="target_col"
    )
    if not (text_col and target_col) or text_col == target_col:
        st.warning("⚠️ Please select different columns for text and target.")
        return

    # Rows without a label cannot be encoded or learned from.
    missing_target = train_df[target_col].isna()
    if missing_target.any():
        st.sidebar.warning(f"⚠️ Dropped {int(missing_target.sum())} rows with missing target values")
        train_df = train_df.loc[~missing_target].copy()
    if train_df.empty:
        st.warning("⚠️ The selected target column has no labelled rows.")
        return

    # Clean and prepare data
    with st.spinner("Preprocessing data..."):
        text_cleaner = TextCleaner()
        train_df['clean_text'] = train_df[text_col].apply(text_cleaner.clean_text)
        # Encode labels. The encoder is handed to the training tab and only
        # stored in session state together with the model it was used for.
        label_encoder = LabelEncoder()
        train_df['encoded_target'] = label_encoder.fit_transform(train_df[target_col])

    # Main sections
    tab1, tab2, tab3 = st.tabs(["📊 Data Analysis", "🤖 Train Model", "🔍 Predictions"])
    with tab1:
        _render_analysis_tab(train_df, columns, text_col, target_col)
    with tab2:
        _render_training_tab(train_df, label_encoder)
    with tab3:
        _render_predictions_tab()


if train_data is not None:
    try:
        with st.spinner("Loading data..."):
            train_df = safe_file_read(train_data)
        if train_df is not None:
            _render_workflow(train_df)
    except Exception as e:
        st.error(f"❌ Error loading file: {str(e)}")
        st.info("💡 Try these solutions:")
        st.write("- Check if the file is a valid CSV")
        st.write("- Ensure the file is not corrupted")
        st.write("- Try saving the file with UTF-8 encoding")
else:
    st.info("👆 Please upload a CSV file to get started")
    # Show example data format
    st.subheader("📋 Expected Data Format")
    example_df = pd.DataFrame({
        'text': [
            "This product is amazing! I love it.",
            "Terrible quality, waste of money.",
            "Good value for the price.",
            "Not what I expected, disappointed."
        ],
        'sentiment': ['positive', 'negative', 'positive', 'negative']
    })
    st.dataframe(example_df, use_container_width=True)
# Footer: a horizontal rule followed by the credit line
for _footer_line in (
    "---",
    "Built with ❤️ using Streamlit | No Code Text Classification App",
):
    st.markdown(_footer_line)