Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import seaborn as sns | |
| from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.preprocessing import LabelEncoder | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.tree import DecisionTreeClassifier | |
| from sklearn.ensemble import RandomForestClassifier | |
| from sklearn.svm import LinearSVC, SVC | |
| from sklearn.naive_bayes import MultinomialNB, GaussianNB | |
| from sklearn.metrics import accuracy_score, classification_report, confusion_matrix | |
| import os | |
| import pickle | |
| import re | |
| import string | |
| from collections import Counter | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
# Page-level settings: full-width layout plus the title and icon passed to Streamlit.
st.set_page_config(
    layout="wide",
    page_icon="📝",
    page_title="Text Classification App",
)
# Text preprocessing class
class TextCleaner:
    """Normalise raw text into a lowercase, letters-only string without stop words."""

    def __init__(self):
        # Small fixed stop list; tokens found here are dropped by clean_text.
        self.stop_words = {
            'the', 'a', 'an', 'and', 'or', 'but', 'in',
            'on', 'at', 'to', 'for', 'of', 'with', 'by',
        }

    def clean_text(self, text):
        """Return the cleaned form of ``text``; a missing value yields ``""``.

        Steps, in order: lowercase, drop ``http...`` tokens, drop every
        character that is neither an ASCII letter nor whitespace, collapse
        whitespace runs, trim, and finally remove stop words.
        """
        if pd.isna(text):
            return ""

        normalised = str(text).lower()
        substitutions = (
            (r'http\S+', ''),        # URLs
            (r'[^a-zA-Z\s]', ''),    # anything that is not a letter or whitespace
            (r'\s+', ' '),           # whitespace runs -> one space
        )
        for pattern, replacement in substitutions:
            normalised = re.sub(pattern, replacement, normalised)

        tokens = normalised.strip().split()
        kept = filter(lambda token: token not in self.stop_words, tokens)
        return ' '.join(kept)
# Data analysis functions
def get_data_insights(df, text_col, target_col):
    """Summarise a dataset as a dict.

    Keys: ``shape`` (``df.shape``), ``missing_values`` (null count per
    column), ``class_distribution`` (value counts of ``target_col``) and
    ``text_length_stats`` (mean/median/min/max of the string lengths in
    ``text_col``).
    """
    summary = {}
    summary['shape'] = df.shape
    summary['missing_values'] = df.isnull().sum().to_dict()
    summary['class_distribution'] = df[target_col].value_counts().to_dict()

    # One length series feeds all four statistics.
    char_counts = df[text_col].str.len()
    summary['text_length_stats'] = dict(
        mean=char_counts.mean(),
        median=char_counts.median(),
        min=char_counts.min(),
        max=char_counts.max(),
    )
    return summary
# Model training functions
def train_model(model_name, X_train, X_test, y_train, y_test):
    """Fit the estimator named ``model_name``, score it and pickle it under ``models/``.

    Returns ``(model, accuracy, y_pred, model_filename)`` where ``accuracy``
    and ``y_pred`` come from predicting on ``X_test`` and ``model_filename``
    is the file name (not the full path) written inside ``models/``.
    An unknown ``model_name`` raises ``KeyError``.
    """
    seed = 42
    estimators = {
        'Logistic Regression': LogisticRegression(random_state=seed, max_iter=1000),
        'Decision Tree': DecisionTreeClassifier(random_state=seed),
        'Random Forest': RandomForestClassifier(random_state=seed, n_estimators=100),
        'Linear SVC': LinearSVC(random_state=seed, max_iter=1000),
        'SVC': SVC(random_state=seed, probability=True),
        'Multinomial Naive Bayes': MultinomialNB(),
        'Gaussian Naive Bayes': GaussianNB(),
    }
    estimator = estimators[model_name]

    # Gaussian NB is given dense arrays instead of the sparse matrices.
    wants_dense = model_name == 'Gaussian Naive Bayes'
    train_features = X_train.toarray() if wants_dense else X_train
    test_features = X_test.toarray() if wants_dense else X_test

    estimator.fit(train_features, y_train)
    predicted = estimator.predict(test_features)
    score = accuracy_score(y_test, predicted)

    # Persist the fitted estimator, e.g. "Linear SVC" -> models/linear_svc.pkl
    os.makedirs("models", exist_ok=True)
    model_filename = f"{model_name.lower().replace(' ', '_')}.pkl"
    target_path = os.path.join("models", model_filename)
    with open(target_path, 'wb') as handle:
        pickle.dump(estimator, handle)

    return estimator, score, predicted, model_filename
# Utility functions
def save_artifacts(obj, folder_name, file_name):
    """Pickle ``obj`` to ``folder_name/file_name``, creating the folder if it is absent."""
    os.makedirs(folder_name, exist_ok=True)
    destination = os.path.join(folder_name, file_name)
    with open(destination, 'wb') as sink:
        pickle.dump(obj, sink)
def load_artifacts(folder_name, file_name):
    """Unpickle and return ``folder_name/file_name``.

    When the file does not exist, an error is shown via ``st.error`` and
    ``None`` is returned instead.
    """
    source = os.path.join(folder_name, file_name)
    try:
        # NOTE: unpickling can execute code stored in the file, so the folder
        # should only hold files written by this app.
        with open(source, 'rb') as handle:
            loaded = pickle.load(handle)
    except FileNotFoundError:
        st.error(f"File {file_name} not found in {folder_name} folder")
        loaded = None
    return loaded
def predict_text(model_filename, text, vectorizer_type="tfidf"):
    """Classify one piece of text with a previously saved model.

    Loads ``models/<model_filename>`` plus the
    ``<vectorizer_type>_vectorizer.pkl`` and ``encoder.pkl`` artifacts from
    ``artifacts/``, cleans ``text`` with ``TextCleaner`` and runs the model on
    the vectorized result.

    Args:
        model_filename: File name of the pickled model inside ``models/``.
        text: Raw text to classify.
        vectorizer_type: Prefix of the vectorizer artifact file name.

    Returns:
        ``(predicted_label, prediction_proba)``. ``prediction_proba`` is the
        first row returned by the model's ``predict_proba``, or ``None`` when
        the model has no such method or the call fails. ``(None, None)`` is
        returned when an artifact is missing or any other step raises; that
        failure is reported with ``st.error``.
    """
    try:
        # NOTE: unpickling can execute code stored in the file, so the models
        # folder should only hold files written by this app.
        with open(os.path.join('models', model_filename), 'rb') as f:
            model = pickle.load(f)

        # load_artifacts reports a missing file itself and returns None.
        vectorizer = load_artifacts("artifacts", f"{vectorizer_type}_vectorizer.pkl")
        if vectorizer is None:
            return None, None

        encoder = load_artifacts("artifacts", "encoder.pkl")
        if encoder is None:
            return None, None

        # Apply the same cleaning that was used on the training text.
        cleaned = TextCleaner().clean_text(text)
        text_vector = vectorizer.transform([cleaned])

        # Gaussian NB models are identified by file name and fed a dense array;
        # the match is case-insensitive so a renamed file is still recognised.
        if 'gaussian' in model_filename.lower():
            text_vector = text_vector.toarray()

        prediction = model.predict(text_vector)

        # Probabilities are optional: a failure here must not discard the label.
        prediction_proba = None
        if hasattr(model, 'predict_proba'):
            try:
                prediction_proba = model.predict_proba(text_vector)[0]
            except Exception:
                # Deliberately `Exception`, not a bare `except`, so that
                # KeyboardInterrupt, SystemExit and other BaseException
                # subclasses are not swallowed.
                prediction_proba = None

        predicted_label = encoder.inverse_transform(prediction)[0]
        return predicted_label, prediction_proba
    except Exception as e:
        st.error(f"Error during prediction: {str(e)}")
        return None, None
# Streamlit App: page heading
st.title('📝 No Code Text Classification App')
st.markdown('---')
st.write('Analyze your text data and train machine learning models without coding!')

# Sidebar: section picker followed by the dataset uploaders
with st.sidebar:
    st.title("Navigation")
    section = st.radio("Choose Section", ["📊 Data Analysis", "🤖 Train Model", "🔮 Predictions"])

    st.markdown("---")
    st.subheader("📁 Upload Your Dataset")
    train_data = st.file_uploader("Upload training data", type=["csv"])
    test_data = st.file_uploader("Upload test data (optional)", type=["csv"])

# Session default: vectorizer type is "tfidf" until another value is stored.
if 'vectorizer_type' not in st.session_state:
    st.session_state['vectorizer_type'] = "tfidf"
def _read_uploaded_csv(uploaded_file, encodings):
    """Return the uploaded CSV as a DataFrame, or ``None`` if no encoding decodes it.

    The buffer is rewound before every attempt: an earlier read (a failed
    decode, or a read during a previous run of the script) may have left the
    read position somewhere other than the start of the file.
    """
    for encoding in encodings:
        uploaded_file.seek(0)
        try:
            return pd.read_csv(uploaded_file, encoding=encoding)
        except UnicodeDecodeError:
            continue
    return None


# Load the uploaded training data (and optional test data), let the user pick
# the text and target columns, then derive the cleaned text, the text length
# and the encoded target. On any failure train_df is reset to None so the
# sections below treat the data as absent.
if train_data is not None:
    try:
        # 'latin1' decodes every byte sequence, so it has to be the last
        # fallback; tried earlier it would make the entries after it
        # unreachable ('iso-8859-1' is an alias of 'latin1').
        encodings = ['utf-8', 'cp1252', 'latin1']
        train_df = _read_uploaded_csv(train_data, encodings)

        if train_df is None:
            st.error("Unable to read the CSV file. Please check the file encoding.")
        else:
            # test_df is always bound: None when no test file was uploaded or
            # when it could not be decoded.
            test_df = None
            if test_data is not None:
                test_df = _read_uploaded_csv(test_data, encodings)

            # Show data preview
            with st.sidebar.expander("📋 Data Preview", expanded=True):
                st.write("Shape:", train_df.shape)
                st.write(train_df.head(2))

            columns = train_df.columns.tolist()
            text_data = st.sidebar.selectbox("📝 Choose the text column:", columns)
            target = st.sidebar.selectbox("🎯 Choose the target column:", columns)

            # Process data
            if text_data and target:
                # Clean text
                text_cleaner = TextCleaner()
                train_df['clean_text'] = train_df[text_data].apply(text_cleaner.clean_text)
                train_df['text_length'] = train_df[text_data].str.len()

                # Encode the target labels as integers
                label_encoder = LabelEncoder()
                train_df['target_encoded'] = label_encoder.fit_transform(train_df[target])

                # Save the encoder so predictions can be decoded back to labels
                save_artifacts(label_encoder, "artifacts", "encoder.pkl")
    except Exception as e:
        st.error(f"Error loading data: {str(e)}")
        train_df = None
# Section router: renders the section chosen in the sidebar. The first two
# sections need the training DataFrame prepared earlier in the script; the
# Predictions section only needs model files saved under "models".

# Data Analysis Section
if section == "📊 Data Analysis":
    if train_data is not None and 'train_df' in locals() and train_df is not None:
        st.header("📊 Data Analysis")

        # Get insights
        insights = get_data_insights(train_df, text_data, target)

        # Display headline numbers in columns
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric("Total Samples", insights['shape'][0])
        with col2:
            st.metric("Features", insights['shape'][1])
        with col3:
            st.metric("Classes", len(insights['class_distribution']))
        with col4:
            st.metric("Avg Text Length", f"{insights['text_length_stats']['mean']:.1f}")

        st.markdown("---")

        # Data quality section
        col1, col2 = st.columns(2)
        with col1:
            st.subheader("📋 Dataset Overview")
            st.write("**Shape:**", insights['shape'])
            st.write("**Missing Values:**")
            missing_df = pd.DataFrame.from_dict(insights['missing_values'], orient='index', columns=['Count'])
            # Only columns that actually contain missing values are listed.
            st.dataframe(missing_df[missing_df['Count'] > 0])
            st.write("**Sample Data:**")
            st.dataframe(train_df[[text_data, target, 'text_length']].head())
        with col2:
            st.subheader("📊 Class Distribution")
            class_dist = pd.DataFrame.from_dict(insights['class_distribution'], orient='index', columns=['Count'])
            st.dataframe(class_dist)
            # Plot class distribution
            fig = px.bar(
                x=class_dist.index,
                y=class_dist['Count'],
                title="Class Distribution",
                labels={'x': 'Class', 'y': 'Count'}
            )
            st.plotly_chart(fig, use_container_width=True)

        st.markdown("---")

        # Text analysis section
        st.subheader("📝 Text Analysis")
        col1, col2 = st.columns(2)
        with col1:
            # Text length distribution
            fig = px.histogram(
                train_df,
                x='text_length',
                title="Text Length Distribution",
                nbins=30
            )
            st.plotly_chart(fig, use_container_width=True)
        with col2:
            # Text length by class
            fig = px.box(
                train_df,
                x=target,
                y='text_length',
                title="Text Length by Class"
            )
            st.plotly_chart(fig, use_container_width=True)

        # Word frequency analysis over the cleaned text
        st.subheader("🔤 Most Common Words")
        all_text = ' '.join(train_df['clean_text'].astype(str))
        word_freq = Counter(all_text.split())
        top_words = word_freq.most_common(20)
        if top_words:
            words_df = pd.DataFrame(top_words, columns=['Word', 'Frequency'])
            fig = px.bar(
                words_df,
                x='Frequency',
                y='Word',
                orientation='h',
                title="Top 20 Most Common Words"
            )
            fig.update_layout(yaxis={'categoryorder': 'total ascending'})
            st.plotly_chart(fig, use_container_width=True)
    else:
        st.warning("📁 Please upload training data to perform analysis")

# Train Model Section
elif section == "🤖 Train Model":
    if train_data is not None and 'train_df' in locals() and train_df is not None:
        st.header("🤖 Train Machine Learning Model")

        col1, col2 = st.columns(2)
        with col1:
            st.subheader("⚙️ Model Configuration")
            model_name = st.selectbox("Choose Model", [
                "Logistic Regression", "Decision Tree",
                "Random Forest", "Linear SVC", "SVC",
                "Multinomial Naive Bayes", "Gaussian Naive Bayes"
            ])
        with col2:
            st.subheader("📊 Vectorization Method")
            vectorizer_choice = st.selectbox("Choose Vectorizer", ["TF-IDF", "Count Vectorizer"])

        # Model parameters
        st.subheader("🔧 Parameters")
        col1, col2 = st.columns(2)
        with col1:
            max_features = st.slider("Max Features", 1000, 20000, 10000, step=1000)
            test_size = st.slider("Test Size", 0.1, 0.4, 0.2, step=0.05)
        with col2:
            random_state = st.number_input("Random State", 0, 1000, 42)
            min_df = st.slider("Min Document Frequency", 1, 10, 1)

        # Initialize vectorizer; the chosen type is remembered in the session
        # because it also determines the artifact file name used later.
        if vectorizer_choice == "TF-IDF":
            vectorizer = TfidfVectorizer(
                max_features=max_features,
                min_df=min_df,
                stop_words='english'
            )
            st.session_state.vectorizer_type = "tfidf"
        else:
            vectorizer = CountVectorizer(
                max_features=max_features,
                min_df=min_df,
                stop_words='english'
            )
            st.session_state.vectorizer_type = "count"

        # Show data info
        st.subheader("📋 Training Data Info")
        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Total Samples", len(train_df))
        with col2:
            st.metric("Unique Classes", train_df[target].nunique())
        with col3:
            st.metric("Avg Text Length", f"{train_df['text_length'].mean():.1f}")

        if st.button("🚀 Start Training", type="primary"):
            with st.spinner("Training model... This may take a few minutes."):
                try:
                    y = train_df['target_encoded']

                    # Split the raw text first, then fit the vectorizer on the
                    # training part only. Fitting it on the full dataset would
                    # let held-out documents shape the vocabulary/weights and
                    # make the reported test accuracy optimistic.
                    texts_train, texts_test, y_train, y_test = train_test_split(
                        train_df['clean_text'], y,
                        test_size=test_size,
                        random_state=random_state,
                        stratify=y
                    )
                    X_train = vectorizer.fit_transform(texts_train)
                    X_test = vectorizer.transform(texts_test)
                    st.success(f"✅ Data split - Train: {X_train.shape}, Test: {X_test.shape}")

                    # Save vectorizer
                    vectorizer_filename = f"{st.session_state.vectorizer_type}_vectorizer.pkl"
                    save_artifacts(vectorizer, "artifacts", vectorizer_filename)

                    # Train model
                    model, accuracy, y_pred, model_filename = train_model(
                        model_name, X_train, X_test, y_train, y_test
                    )
                    st.success("🎉 Model training completed!")

                    # Display results
                    col1, col2 = st.columns(2)
                    with col1:
                        st.metric("🎯 Test Accuracy", f"{accuracy:.4f}")
                        # Classification report
                        st.subheader("📊 Classification Report")
                        report = classification_report(
                            y_test, y_pred,
                            target_names=label_encoder.classes_,
                            output_dict=True
                        )
                        report_df = pd.DataFrame(report).transpose()
                        st.dataframe(report_df.round(4))
                    with col2:
                        # Confusion matrix
                        st.subheader("🔄 Confusion Matrix")
                        cm = confusion_matrix(y_test, y_pred)
                        fig = px.imshow(
                            cm,
                            text_auto=True,
                            aspect="auto",
                            title="Confusion Matrix",
                            labels=dict(x="Predicted", y="Actual"),
                            x=label_encoder.classes_,
                            y=label_encoder.classes_
                        )
                        st.plotly_chart(fig, use_container_width=True)

                    st.info(f"✅ Model saved as: {model_filename}")
                    st.info("🔮 You can now use the 'Predictions' section to classify new text!")
                except Exception as e:
                    st.error(f"❌ Error during training: {str(e)}")
    else:
        st.warning("📁 Please upload training data to train a model")

# Predictions Section
elif section == "🔮 Predictions":
    st.header("🔮 Text Classification Predictions")

    # Check if models exist
    if os.path.exists("models") and os.listdir("models"):
        available_models = [f for f in os.listdir("models") if f.endswith('.pkl')]

        if available_models:
            # Single prediction
            st.subheader("📝 Single Text Classification")
            col1, col2 = st.columns([2, 1])
            with col1:
                text_input = st.text_area("Enter text to classify:", height=150)
            with col2:
                selected_model = st.selectbox("Choose model:", available_models)
                predict_button = st.button("🔮 Predict", type="primary")

            if predict_button and text_input.strip():
                with st.spinner("Making prediction..."):
                    predicted_label, prediction_proba = predict_text(
                        selected_model,
                        text_input,
                        st.session_state.get('vectorizer_type', 'tfidf')
                    )

                if predicted_label is not None:
                    st.success("✅ Prediction completed!")

                    # Display results
                    col1, col2 = st.columns(2)
                    with col1:
                        st.markdown("### 🎯 Results")
                        st.markdown(f"**Input Text:** {text_input[:200]}{'...' if len(text_input) > 200 else ''}")
                        st.markdown(f"**Predicted Class:** `{predicted_label}`")
                    with col2:
                        # Display probabilities if available
                        if prediction_proba is not None:
                            st.markdown("### 📊 Class Probabilities")
                            encoder = load_artifacts("artifacts", "encoder.pkl")
                            if encoder is not None:
                                if len(encoder.classes_) == len(prediction_proba):
                                    prob_df = pd.DataFrame({
                                        'Class': encoder.classes_,
                                        'Probability': prediction_proba
                                    }).sort_values('Probability', ascending=False)
                                    fig = px.bar(
                                        prob_df,
                                        x='Probability',
                                        y='Class',
                                        orientation='h',
                                        title="Prediction Confidence"
                                    )
                                    fig.update_layout(yaxis={'categoryorder': 'total ascending'})
                                    st.plotly_chart(fig, use_container_width=True)
                                else:
                                    # The encoder file is rewritten whenever a
                                    # dataset is loaded, so it can describe a
                                    # different label set than the model was
                                    # trained on; pairing them would raise.
                                    st.warning("⚠️ Class probabilities are unavailable: the saved label encoder does not match this model.")
            elif predict_button:
                st.warning("⚠️ Please enter some text to classify")

            # Batch predictions
            st.markdown("---")
            st.subheader("📊 Batch Predictions")
            uploaded_file = st.file_uploader("Upload CSV file with texts to classify", type=['csv'])

            if uploaded_file is not None:
                try:
                    # Try different encodings for the batch file. 'latin1'
                    # decodes every byte sequence, so it is the last fallback.
                    encodings = ['utf-8', 'cp1252', 'latin1']
                    batch_df = None
                    for encoding in encodings:
                        # Rewind: an earlier read may have moved the position.
                        uploaded_file.seek(0)
                        try:
                            batch_df = pd.read_csv(uploaded_file, encoding=encoding)
                            break
                        except UnicodeDecodeError:
                            continue

                    if batch_df is not None:
                        st.write("📋 Uploaded data preview:")
                        st.dataframe(batch_df.head())

                        col1, col2 = st.columns(2)
                        with col1:
                            text_column = st.selectbox("Select text column:", batch_df.columns.tolist())
                        with col2:
                            batch_model = st.selectbox("Choose model:", available_models, key="batch_model")

                        if st.button("🚀 Run Batch Predictions", type="primary"):
                            with st.spinner("Processing batch predictions..."):
                                predictions = []
                                confidences = []
                                progress_bar = st.progress(0)
                                total_texts = len(batch_df)
                                # Same for every row, so looked up once.
                                vectorizer_type = st.session_state.get('vectorizer_type', 'tfidf')

                                for i, text in enumerate(batch_df[text_column]):
                                    pred, proba = predict_text(
                                        batch_model,
                                        str(text),
                                        vectorizer_type
                                    )
                                    predictions.append(pred if pred is not None else "Error")
                                    # Confidence is the highest class probability;
                                    # 0.0 when the model gave no probabilities.
                                    if proba is not None:
                                        confidences.append(max(proba))
                                    else:
                                        confidences.append(0.0)
                                    progress_bar.progress((i + 1) / total_texts)

                                batch_df['Predicted_Class'] = predictions
                                batch_df['Confidence'] = confidences

                            st.success("✅ Batch predictions completed!")

                            # Show results
                            st.subheader("📊 Results")
                            result_df = batch_df[[text_column, 'Predicted_Class', 'Confidence']]
                            st.dataframe(result_df)

                            # Summary statistics
                            st.subheader("📈 Summary")
                            col1, col2, col3 = st.columns(3)
                            with col1:
                                st.metric("Total Predictions", len(predictions))
                            with col2:
                                successful_preds = sum(1 for p in predictions if p != "Error")
                                st.metric("Successful", successful_preds)
                            with col3:
                                avg_confidence = sum(confidences) / len(confidences) if confidences else 0
                                st.metric("Avg Confidence", f"{avg_confidence:.3f}")

                            # Class distribution of predictions
                            pred_counts = pd.Series(predictions).value_counts()
                            if len(pred_counts) > 0:
                                fig = px.pie(
                                    values=pred_counts.values,
                                    names=pred_counts.index,
                                    title="Distribution of Predictions"
                                )
                                st.plotly_chart(fig, use_container_width=True)

                            # Download results
                            csv = batch_df.to_csv(index=False)
                            st.download_button(
                                label="📥 Download Results as CSV",
                                data=csv,
                                file_name="batch_predictions.csv",
                                mime="text/csv"
                            )
                    else:
                        st.error("❌ Unable to read the CSV file. Please check the file encoding.")
                except Exception as e:
                    st.error(f"❌ Error in batch prediction: {str(e)}")
        else:
            st.warning("⚠️ No trained models found. Please train a model first.")
    else:
        st.warning("⚠️ No models directory found. Please go to 'Train Model' section to train a model first.")

# Footer
st.markdown("---")
st.markdown("🚀 Built with Streamlit | 📊 No-Code Text Classification")