Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import os | |
| import pickle | |
| import io | |
| import traceback | |
| import sys | |
| import base64 | |
| from datetime import datetime | |
# Set page config
# Streamlit documents set_page_config as the first Streamlit command on a
# page. The import status banners below (st.success / st.error) are Streamlit
# commands too, so the page must be configured before they run.
st.set_page_config(page_title="Fixed Text Classification", page_icon="๐ง", layout="wide")

# Import ML libraries with error handling
# A failed import is reported in the UI instead of crashing at startup; the
# names stay undefined, so sections that need them will fail later.
try:
    from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
    from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
    from sklearn.preprocessing import LabelEncoder
    st.success("โ Sklearn imported successfully")
except ImportError as e:
    st.error(f"โ Sklearn import error: {e}")

# Import custom modules with error handling
try:
    from NoCodeTextClassifier.EDA import Informations, Visualizations
    from NoCodeTextClassifier.preprocessing import process, TextCleaner, Vectorization
    from NoCodeTextClassifier.models import Models
    st.success("โ NoCodeTextClassifier imported successfully")
except ImportError as e:
    st.error(f"โ NoCodeTextClassifier import error: {e}")
    st.info("Please ensure NoCodeTextClassifier package is installed")

# Debug section
# debug_mode is read by debug_log() and by the environment-info sidebar panel.
st.sidebar.header("๐ Debug Information")
debug_mode = st.sidebar.checkbox("Enable Debug Mode", value=True)
def debug_log(message, level="INFO"):
    """Write a timestamped, level-tagged message to the sidebar.

    Does nothing unless the module-level ``debug_mode`` checkbox is ticked.
    """
    if not debug_mode:
        return
    clock = datetime.now().strftime("%H:%M:%S")
    st.sidebar.write(f"**{clock} [{level}]:** {message}")
# Alternative file upload methods
def alternative_file_upload():
    """Offer ways of loading a CSV that do not use the file uploader.

    Renders three options: pasting raw CSV text, pasting a base64-encoded
    CSV, and loading a small built-in sentiment sample.

    Returns:
        ``(DataFrame, method)`` where ``method`` is ``"text_area"``,
        ``"base64"`` or ``"sample"``. ``(None, None)`` is returned when no
        option was triggered on this run or the pasted content could not be
        parsed (the parse error is shown in the UI).
    """
    st.subheader("๐ง Alternative File Upload Methods")

    # Method 1: Text area paste
    st.markdown("### Method 1: Copy-Paste CSV Content")
    st.info("Copy your CSV content and paste it in the text area below")
    csv_content = st.text_area(
        "Paste your CSV content here:",
        height=200,
        placeholder="name,age,city\nJohn,25,New York\nJane,30,London"
    )
    if csv_content and st.button("Load from Text Area", type="primary"):
        try:
            df = pd.read_csv(io.StringIO(csv_content))
        except Exception as e:
            st.error(f"Error parsing CSV: {e}")
            return None, None
        st.success("โ CSV loaded from text area!")
        return df, "text_area"

    # Method 2: Base64 upload (for advanced users)
    st.markdown("### Method 2: Base64 Upload")
    with st.expander("For Advanced Users - Base64 Upload"):
        st.info("Convert your CSV to base64 and paste here")
        # The snippet is built line by line so the ``with`` body keeps its
        # indentation and the displayed code can be copied and run as-is.
        st.code(
            "\n"
            "# Python code to convert CSV to base64:\n"
            "import base64\n"
            "with open('your_file.csv', 'rb') as f:\n"
            "    encoded = base64.b64encode(f.read()).decode()\n"
            "print(encoded)\n"
        )
        base64_content = st.text_area("Paste base64 encoded CSV:", height=100)
        if base64_content and st.button("Load from Base64"):
            try:
                # Pasted base64 is often line-wrapped: drop all whitespace,
                # then decode strictly so stray characters raise an error
                # instead of being silently discarded into a corrupt payload.
                compact = "".join(base64_content.split())
                decoded = base64.b64decode(compact, validate=True)
                df = pd.read_csv(io.BytesIO(decoded))
            except Exception as e:
                st.error(f"Error decoding base64: {e}")
                return None, None
            st.success("โ CSV loaded from base64!")
            return df, "base64"

    # Method 3: Sample data
    st.markdown("### Method 3: Use Sample Data")
    if st.button("Load Sample Text Classification Data"):
        # Create sample data
        sample_data = {
            'text': [
                'I love this product, it works great!',
                'This is terrible, waste of money',
                'Good quality and fast delivery',
                'Not satisfied with the purchase',
                'Excellent service and support',
                'Poor quality, arrived damaged',
                'Amazing product, highly recommend',
                'Disappointed with the results',
            ],
            'label': ['positive', 'negative', 'positive', 'negative',
                      'positive', 'negative', 'positive', 'negative'],
        }
        df = pd.DataFrame(sample_data)
        st.success("โ Sample data loaded!")
        return df, "sample"

    return None, None
def safe_file_uploader_with_fallback():
    """Show the standard CSV uploader, then the alternative loaders.

    Returns ``(DataFrame, "standard")`` when an uploaded file parses.
    Otherwise (no file, or the file could not be read) the alternative
    methods are rendered and their ``(DataFrame, method)`` result, possibly
    ``(None, None)``, is returned.
    """
    st.markdown("### ๐ Upload Your CSV File")

    # Try standard uploader first
    csv_file = st.file_uploader(
        "Choose a CSV file",
        type=['csv'],
        help="If upload fails with 403 error, use alternative methods below"
    )

    if csv_file is not None:
        try:
            debug_log("๐ File uploaded successfully via standard method")
            frame = pd.read_csv(csv_file)
            st.success("โ File uploaded successfully!")
            return frame, "standard"
        except Exception as exc:
            st.error(f"Error reading uploaded file: {exc}")
            debug_log(f"โ Standard upload failed: {exc}", "ERROR")

    # Reached when nothing was uploaded or the upload could not be parsed:
    # present the alternatives.
    st.markdown("---")
    st.markdown("### ๐ Alternative Upload Methods")
    st.warning("If you're getting a 403 error, try one of these alternative methods:")
    fallback_result = alternative_file_upload()
    return fallback_result
# Utility functions (same as before but with debug)
def save_artifacts(obj, folder_name, file_name):
    """Pickle ``obj`` to ``folder_name/file_name``, creating the folder if needed.

    Returns:
        True when the object was written; False when any exception occurred
        (the error is logged via ``debug_log`` and shown with ``st.error``).
    """
    debug_log(f"๐พ Saving {file_name} to {folder_name}")
    try:
        os.makedirs(folder_name, exist_ok=True)
        destination = os.path.join(folder_name, file_name)
        with open(destination, 'wb') as sink:
            pickle.dump(obj, sink)
        debug_log(f"โ Successfully saved {file_name}")
    except Exception as exc:
        debug_log(f"โ Error saving {file_name}: {str(exc)}", "ERROR")
        st.error(f"Save error: {str(exc)}")
        return False
    return True
def load_artifacts(folder_name, file_name):
    """Unpickle and return the object stored at ``folder_name/file_name``.

    Returns None when the file does not exist or loading raises; both cases
    are reported through ``debug_log``.

    NOTE: ``pickle.load`` can execute arbitrary code, so this should only be
    pointed at files this app wrote itself.
    """
    debug_log(f"๐ Loading {file_name} from {folder_name}")
    try:
        source_path = os.path.join(folder_name, file_name)
        if os.path.exists(source_path):
            with open(source_path, 'rb') as handle:
                loaded = pickle.load(handle)
            debug_log(f"โ Successfully loaded {file_name}")
            return loaded
        debug_log(f"โ File not found: {source_path}", "ERROR")
        return None
    except Exception as exc:
        debug_log(f"โ Error loading {file_name}: {str(exc)}", "ERROR")
        return None
def load_model(model_name):
    """Load the pickled model file ``model_name`` from the ``models`` folder.

    Returns whatever ``load_artifacts`` returns (None on failure).
    """
    debug_log(f"๐ค Loading model: {model_name}")
    model = load_artifacts("models", model_name)
    return model
def predict_text(model_name, text, vectorizer_type="tfidf"):
    """Classify a single text with a saved model.

    Loads the model from ``models/<model_name>`` and the matching vectorizer
    (``artifacts/<vectorizer_type>_vectorizer.pkl``) and label encoder
    (``artifacts/encoder.pkl``), cleans the text with ``TextCleaner``,
    vectorizes it and predicts.

    Args:
        model_name: File name of the pickled model inside ``models``.
        text: Raw text to classify.
        vectorizer_type: Prefix of the vectorizer artifact file name.

    Returns:
        ``(predicted_label, probabilities)``. ``probabilities`` is the first
        row of ``predict_proba`` or None when the model cannot provide it.
        ``(None, None)`` is returned when an artifact is missing or any step
        raises (the error is logged and shown in the UI).
    """
    debug_log(f"๐ฎ Starting prediction with {model_name}")
    try:
        # Load components
        model = load_model(model_name)
        if model is None:
            return None, None
        vectorizer_file = f"{vectorizer_type}_vectorizer.pkl"
        vectorizer = load_artifacts("artifacts", vectorizer_file)
        if vectorizer is None:
            return None, None
        encoder = load_artifacts("artifacts", "encoder.pkl")
        if encoder is None:
            return None, None

        debug_log("๐งน Cleaning text...")
        text_cleaner = TextCleaner()
        clean_text = text_cleaner.clean_text(text)

        debug_log("๐ข Vectorizing text...")
        text_vector = vectorizer.transform([clean_text])

        debug_log("๐ฏ Making prediction...")
        prediction = model.predict(text_vector)

        prediction_proba = None
        if hasattr(model, 'predict_proba'):
            try:
                prediction_proba = model.predict_proba(text_vector)[0]
            except Exception as proba_error:
                # Probabilities are optional, so keep the label and carry on.
                # Exception (not a bare except) lets KeyboardInterrupt and
                # SystemExit propagate, and the cause is logged.
                debug_log(
                    f"No prediction probabilities available: {proba_error}",
                    "WARNING",
                )

        predicted_label = encoder.inverse_transform(prediction)[0]
        debug_log(f"โ Prediction complete: {predicted_label}")
        return predicted_label, prediction_proba
    except Exception as e:
        debug_log(f"โ Prediction error: {str(e)}", "ERROR")
        st.error(f"Prediction error: {str(e)}")
        return None, None
# Main App
st.title('๐ง Fixed Text Classification App')
st.write('Workaround version to bypass 403 upload errors')

# Show environment info in sidebar if debug mode
if debug_mode:
    st.sidebar.subheader("๐ฅ๏ธ Environment Info")
    for _env_line in (
        f"Python version: {sys.version}",
        f"Streamlit version: {st.__version__}",
        f"Current directory: {os.getcwd()}",
    ):
        st.sidebar.write(_env_line)

# Navigation
_section_names = ["Upload Data", "Data Analysis", "Train Model", "Predictions"]
section = st.sidebar.radio("Choose Section", _section_names)

# Session state: seed each key once so later sections can read it safely.
for _state_key, _initial_value in (
    ('train_df', None),
    ('upload_method', None),
    ('vectorizer_type', "tfidf"),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
# Upload Data Section
if section == "Upload Data":
    st.subheader("๐ Upload Your Dataset")
    df, method = safe_file_uploader_with_fallback()
    if df is not None:
        st.session_state.train_df = df
        st.session_state.upload_method = method
    else:
        # Button-based loaders (paste / base64 / sample) return data only on
        # the run in which the button was clicked. Reuse the stored frame so
        # the preview and column pickers survive the rerun triggered by any
        # other widget, such as changing a column selectbox.
        df = st.session_state.get('train_df')

    if df is not None:
        st.write("### ๐ Data Preview")
        st.dataframe(df.head())

        st.write("### ๐ Basic Info")
        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Rows", df.shape[0])
        with col2:
            st.metric("Columns", df.shape[1])
        with col3:
            st.metric("Missing Values", df.isnull().sum().sum())

        st.write("### ๐ท๏ธ Select Columns")
        columns = df.columns.tolist()
        # Default to the previous selection when it still exists; otherwise
        # first column for text and last column for the label, so the two
        # pickers do not start on the same column.
        saved_text = st.session_state.get('text_column')
        saved_target = st.session_state.get('target_column')
        text_index = columns.index(saved_text) if saved_text in columns else 0
        if saved_target in columns:
            target_index = columns.index(saved_target)
        else:
            target_index = max(len(columns) - 1, 0)
        col1, col2 = st.columns(2)
        with col1:
            text_column = st.selectbox("Select text column:", columns, index=text_index)
        with col2:
            target_column = st.selectbox("Select target/label column:", columns, index=target_index)

        if text_column and target_column and text_column == target_column:
            # Using one column as both feature and label is never valid, and
            # selecting it twice yields a frame with duplicate column names.
            st.warning("Text and target columns must be different.")
        elif text_column and target_column:
            st.session_state.text_column = text_column
            st.session_state.target_column = target_column

            # Show sample data
            st.write("### ๐ Sample Data")
            sample_df = df[[text_column, target_column]].head()
            st.dataframe(sample_df)

            # Show target distribution
            st.write("### ๐ฏ Target Distribution")
            target_counts = df[target_column].value_counts()
            st.bar_chart(target_counts)

            st.success("โ Data ready for processing!")
| # Data Analysis Section | |
| elif section == "Data Analysis": | |
| if st.session_state.train_df is not None: | |
| df = st.session_state.train_df | |
| text_col = st.session_state.get('text_column') | |
| target_col = st.session_state.get('target_column') | |
| if text_col and target_col: | |
| st.subheader("๐ Data Analysis") | |
| try: | |
| # Process data using custom classes | |
| info = Informations(df, text_col, target_col) | |
| df['clean_text'] = info.clean_text() | |
| df['text_length'] = info.text_length() | |
| # Update session state | |
| st.session_state.train_df = df | |
| # Show analysis | |
| st.write("**Data Shape:**", info.shape()) | |
| st.write("**Class Distribution:**", info.class_imbalanced()) | |
| st.write("**Missing Values:**", info.missing_values()) | |
| # Text length analysis | |
| st.write("**Text Length Analysis:**") | |
| st.write(info.analysis_text_length('text_length')) | |
| # Visualizations | |
| vis = Visualizations(df, text_col, target_col) | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| st.write("**Class Distribution:**") | |
| vis.class_distribution() | |
| with col2: | |
| st.write("**Text Length Distribution:**") | |
| vis.text_length_distribution() | |
| except Exception as e: | |
| st.error(f"Error in analysis: {e}") | |
| debug_log(f"Analysis error: {e}", "ERROR") | |
| else: | |
| st.warning("Please select text and target columns in the Upload Data section.") | |
| else: | |
| st.warning("Please upload data first.") | |
| # Train Model Section | |
| elif section == "Train Model": | |
| if st.session_state.train_df is not None: | |
| df = st.session_state.train_df | |
| text_col = st.session_state.get('text_column') | |
| target_col = st.session_state.get('target_column') | |
| if text_col and target_col and 'clean_text' in df.columns: | |
| st.subheader("๐ค Train Model") | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| model_choice = st.selectbox("Choose Model:", [ | |
| "Logistic Regression", "Decision Tree", "Random Forest", | |
| "Linear SVC", "SVC", "Multinomial Naive Bayes" | |
| ]) | |
| with col2: | |
| vectorizer_choice = st.selectbox("Choose Vectorizer:", | |
| ["Tfidf Vectorizer", "Count Vectorizer"]) | |
| if st.button("๐ Train Model", type="primary"): | |
| with st.spinner("Training model..."): | |
| try: | |
| # Prepare data | |
| if vectorizer_choice == "Tfidf Vectorizer": | |
| vectorizer = TfidfVectorizer(max_features=10000) | |
| st.session_state.vectorizer_type = "tfidf" | |
| else: | |
| vectorizer = CountVectorizer(max_features=10000) | |
| st.session_state.vectorizer_type = "count" | |
| # Label encoding | |
| label_encoder = LabelEncoder() | |
| y = label_encoder.fit_transform(df[target_col]) | |
| X = vectorizer.fit_transform(df['clean_text']) | |
| # Split data | |
| X_train, X_test, y_train, y_test = process.split_data(X, y) | |
| # Save artifacts | |
| save_artifacts(vectorizer, "artifacts", f"{st.session_state.vectorizer_type}_vectorizer.pkl") | |
| save_artifacts(label_encoder, "artifacts", "encoder.pkl") | |
| # Train model | |
| models = Models(X_train=X_train, X_test=X_test, y_train=y_train, y_test=y_test) | |
| if model_choice == "Logistic Regression": | |
| models.LogisticRegression() | |
| elif model_choice == "Decision Tree": | |
| models.DecisionTree() | |
| elif model_choice == "Random Forest": | |
| models.RandomForestClassifier() | |
| elif model_choice == "Linear SVC": | |
| models.LinearSVC() | |
| elif model_choice == "SVC": | |
| models.SVC() | |
| elif model_choice == "Multinomial Naive Bayes": | |
| models.MultinomialNB() | |
| st.success("๐ Model trained successfully!") | |
| except Exception as e: | |
| st.error(f"Training error: {e}") | |
| debug_log(f"Training error: {e}", "ERROR") | |
| else: | |
| st.warning("Please complete data analysis first to process the text data.") | |
| else: | |
| st.warning("Please upload data first.") | |
| # Predictions Section | |
| elif section == "Predictions": | |
| st.subheader("๐ฎ Make Predictions") | |
| # Check for models | |
| if os.path.exists("models") and os.listdir("models"): | |
| available_models = [f for f in os.listdir("models") if f.endswith('.pkl')] | |
| if available_models: | |
| selected_model = st.selectbox("Choose trained model:", available_models) | |
| # Single prediction | |
| st.write("### Single Text Prediction") | |
| text_input = st.text_area("Enter text to classify:", height=100) | |
| if st.button("๐ฏ Predict") and text_input: | |
| prediction, probabilities = predict_text( | |
| selected_model, | |
| text_input, | |
| st.session_state.get('vectorizer_type', 'tfidf') | |
| ) | |
| if prediction is not None: | |
| st.success(f"**Prediction:** {prediction}") | |
| if probabilities is not None: | |
| encoder = load_artifacts("artifacts", "encoder.pkl") | |
| if encoder is not None: | |
| prob_df = pd.DataFrame({ | |
| 'Class': encoder.classes_, | |
| 'Probability': probabilities | |
| }).sort_values('Probability', ascending=False) | |
| st.bar_chart(prob_df.set_index('Class')) | |
| else: | |
| st.info("No trained models found. Train a model first.") | |
| else: | |
| st.info("No models directory found. Train a model first.") | |
# Show upload method used in sidebar
_active_upload_method = st.session_state.upload_method
if _active_upload_method:
    st.sidebar.success(f"โ Data loaded via: {_active_upload_method}")