Spaces:
Runtime error
Runtime error
| import streamlit as st | |
| import pandas as pd | |
| from transformers import AutoTokenizer, AutoModelForSequenceClassification | |
| from transformers import pipeline | |
| from fuzzywuzzy import fuzz | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| import torch.nn.functional as F | |
| import torch | |
| import io | |
| import base64 | |
| from stqdm import stqdm | |
| import nltk | |
| from nltk.corpus import stopwords | |
| nltk.download('stopwords') | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
# Configure the page before any other Streamlit call in this block; the
# cached loaders below may render a spinner, and set_page_config has to
# come first.
st.set_page_config(layout="wide")

# English stop words plus a placeholder slot for project-specific additions.
stopwords_list = stopwords.words('english') + ['your_additional_stopwords_here']

# Checkpoint used for the sentiment (star-rating) classification.
model_name = 'nlptown/bert-base-multilingual-uncased-sentiment'

# Streamlit re-executes this script on every widget interaction. Without
# caching, both models would be rebuilt on each rerun. Fall back to the older
# decorator name, and finally to no caching at all, on older Streamlit
# releases that lack cache_resource.
_cache_resource = (
    getattr(st, 'cache_resource', None)
    or getattr(st, 'experimental_singleton', None)
    or (lambda func: func)
)


@_cache_resource
def _load_sentiment_model(name):
    """Load the sequence-classification model and its tokenizer once."""
    return (
        AutoModelForSequenceClassification.from_pretrained(name),
        AutoTokenizer.from_pretrained(name),
    )


@_cache_resource
def _load_zero_shot_classifier():
    """Build the zero-shot classification pipeline once."""
    return pipeline("zero-shot-classification", model="facebook/bart-large-mnli")


model, tokenizer = _load_sentiment_model(model_name)
classifier = _load_zero_shot_classifier()

# Number of reviews sent through the sentiment model per forward pass.
BATCH_SIZE = 20
| #defs | |
def classify_reviews(reviews):
    """Return sentiment-model class probabilities for each review.

    Args:
        reviews: List of review strings.

    Returns:
        A list with one entry per review, in input order; each entry is the
        list of softmax probabilities over the model's output classes.
    """
    if len(reviews) == 0:
        return []

    probabilities = []
    # Inference only: without no_grad() every forward pass records an autograd
    # graph, which costs memory and time for nothing.
    with torch.no_grad():
        for start in range(0, len(reviews), BATCH_SIZE):
            batch = reviews[start:start + BATCH_SIZE]
            inputs = tokenizer(
                batch,
                return_tensors='pt',
                truncation=True,
                padding=True,
                max_length=512,
            )
            logits = model(**inputs).logits
            probabilities.extend(F.softmax(logits, dim=1).tolist())
    return probabilities
def top_rating(scores):
    """Return the 1-based position of the highest score.

    Args:
        scores: Non-empty list of scores, one per rating.

    Returns:
        The 1-based index of the maximum; the first one wins on ties.

    Raises:
        ValueError: If ``scores`` is empty.
    """
    return scores.index(max(scores)) + 1
def top_prob(scores):
    """Return the highest value in ``scores``.

    Raises:
        ValueError: If ``scores`` is empty.
    """
    return max(scores)
def get_table_download_link(df):
    """Return an HTML anchor that downloads ``df`` as a CSV file.

    The CSV (without the index) is embedded in the link as a base64 data URI.

    Args:
        df: DataFrame to export.

    Returns:
        An ``<a>`` element string with ``download="data.csv"``.
    """
    csv = df.to_csv(index=False)
    b64 = base64.b64encode(csv.encode()).decode()
    # "text/csv" is the registered media type for CSV; "file/csv" is not one.
    return f'<a href="data:text/csv;base64,{b64}" download="data.csv">Download csv file</a>'
def filter_dataframe(df, review_column, filter_words, threshold=70):
    """Keep only the rows whose review text fuzzily matches a filter word.

    Args:
        df: DataFrame holding the reviews.
        review_column: Name of the column containing the review text.
        filter_words: Words to match against; empty or whitespace-only
            entries are ignored.
        threshold: A row is kept when its best ``fuzz.token_set_ratio``
            against any filter word is strictly greater than this value.

    Returns:
        ``df`` itself when there is no usable filter word, otherwise the
        filtered rows.
    """
    # ''.isspace() is False, so empty strings must be rejected explicitly;
    # otherwise an input such as "," would filter out every row.
    words = [word for word in (filter_words or []) if word and not word.isspace()]
    if not words:
        return df

    filter_scores = df[review_column].apply(
        lambda text: max(fuzz.token_set_ratio(text, word) for word in words)
    )
    return df[filter_scores > threshold]
def process_filter_words(filter_words_input):
    """Split a comma-separated string into a list of filter words.

    Each word is stripped of surrounding whitespace. Empty entries, as
    produced by stray or trailing commas, are dropped so they cannot end up
    as filter words.

    Args:
        filter_words_input: Raw comma-separated text.

    Returns:
        The non-empty, stripped words in input order.
    """
    stripped_words = (word.strip() for word in filter_words_input.split(','))
    return [word for word in stripped_words if word]
def classify_with_new_classes(reviews, class_names):
    """Score every review against the class names with the zero-shot classifier.

    Args:
        reviews: List of review strings.
        class_names: Candidate class names.

    Returns:
        One list of scores per review, ordered like ``class_names``. With no
        class names there is nothing to score, so each review gets an empty
        list and the classifier is not called.
    """
    if not class_names:
        return [[] for _ in reviews]

    class_scores = []
    # Reviews are classified one at a time, so no batching loop is needed.
    for review in reviews:
        result = classifier(review, class_names)
        scores_by_label = dict(zip(result['labels'], result['scores']))
        # Reorder the scores to match the original class_names order.
        class_scores.append([scores_by_label[name] for name in class_names])
    return class_scores
def main():
    """Render the app: upload, input widgets, analysis and results.

    The user uploads an Excel file, picks the text column, optionally enters
    filter words, and enters the class names. Pressing the start button runs
    the classification and displays the ratings summary and the result table.
    """
    st.title('Sentiment Analysis')
    st.markdown('Upload an Excel file to get sentiment analytics')
    file = st.file_uploader("Upload an excel file", type=['xlsx'])
    review_column = None
    df = None
    class_names_input = ''

    if file is not None:
        # Only parsing is guarded, so the "invalid Excel file" message is not
        # shown for unrelated failures further down.
        try:
            df = pd.read_excel(file)
        except Exception as e:
            st.write("An error occurred while reading the uploaded file. Please make sure it's a valid Excel file.")
            st.exception(e)
            return

        # Drop rows where all columns are NaN, turn blank cells into NaN,
        # then drop the rows that became entirely empty.
        df = df.dropna(how='all')
        df = df.replace(r'^\s*$', np.nan, regex=True)
        df = df.dropna(how='all')
        if df.empty:
            st.write('The uploaded file does not contain any data.')
            return

        review_column = st.selectbox('Select the column from your excel file containing text', df.columns)
        # Remove missing reviews BEFORE the string cast: astype(str) turns NaN
        # into the literal text "nan", which would then be classified.
        df = df.dropna(subset=[review_column])
        df[review_column] = df[review_column].astype(str)

        filter_words_input = st.text_input('Enter words to filter the data by, separated by comma (or leave empty)')
        filter_words = [] if filter_words_input.strip() == "" else process_filter_words(filter_words_input)
        class_names_input = st.text_input('Enter the possible class names separated by comma')
        df = filter_dataframe(df, review_column, filter_words)

    start_button = st.button('Start Analysis')
    if not start_button or df is None:
        return

    # Drop rows with NaN or blank values in the review column; copy so the
    # column assignments below do not write into a slice of another frame.
    df = df[df[review_column].notna()]
    df = df[df[review_column].str.strip() != ''].copy()
    if df.empty:
        st.warning('There are no reviews to analyse.')
        return

    # Split the class names, ignoring blank entries and duplicates.
    class_names = list(dict.fromkeys(
        name.strip() for name in class_names_input.split(',') if name.strip()
    ))
    if not class_names:
        st.warning('Please enter at least one class name.')
        return

    for name in class_names:  # Add a new column for each class name
        if name not in df.columns:
            df[name] = 0.0

    if review_column in df.columns:
        with st.spinner('Performing sentiment analysis...'):
            df, df_display = process_reviews(df, review_column, class_names)
        display_ratings(df, review_column)
        display_dataframe(df, df_display)
    else:
        st.write(f'No column named "{review_column}" found in the uploaded file.')
def process_reviews(df, review_column, class_names):
    """Classify all reviews and build the numeric and display result frames.

    Args:
        df: DataFrame holding the reviews; it is not modified.
        review_column: Name of the column containing the review text.
        class_names: Class names for the zero-shot classification. Blank
            names and duplicates are ignored; with no usable name the class
            columns and 'Highest Class' are left out of the result.

    Returns:
        A tuple ``(df_new, df_display)``. Both have the review column first,
        then the rating columns, then the class columns, then every other
        input column. ``df_display`` holds the star and probability columns
        as percentage strings.
    """
    result = df.copy()  # keep the caller's frame untouched
    reviews = result[review_column].tolist()
    total_reviews = len(reviews)
    labels = list(dict.fromkeys(
        name for name in class_names if name and not name.isspace()
    ))

    with st.spinner('Classifying reviews...'):
        progress_bar = st.progress(0)
        raw_scores = []
        # Classify batch by batch so the bar reflects work actually done.
        for start in range(0, total_reviews, BATCH_SIZE):
            raw_scores.extend(classify_reviews(reviews[start:start + BATCH_SIZE]))
            # Pass a float: st.progress reads an int as a 0-100 percentage.
            progress_bar.progress(min(len(raw_scores) / total_reviews, 1.0))

    class_columns = []
    if labels:
        with st.spinner('Generating classes...'):
            class_scores = classify_with_new_classes(reviews, labels)
        for position, name in enumerate(labels):
            result[name] = [scores[position] for scores in class_scores]
        # Add a new column with the class that has the highest score.
        result['Highest Class'] = result[labels].idxmax(axis=1)
        class_columns = labels + ['Highest Class']

    result['raw_scores'] = raw_scores
    scores_to_df(result)
    df_display = scores_to_percent(result.copy())

    leading_columns = [
        review_column, 'Weighted Rating', 'Rating', 'Probability',
        '1 Star', '2 Star', '3 Star', '4 Star', '5 Star',
    ]
    # Everything that is neither created here nor a requested class column
    # (blank names included, so placeholder columns for them are dropped).
    excluded = set(leading_columns) | {'raw_scores', 'Highest Class'} | set(class_names)
    remaining_columns = [col for col in result.columns if col not in excluded]

    ordered_columns = leading_columns + class_columns + remaining_columns
    return result[ordered_columns], df_display[ordered_columns]
def scores_to_df(df):
    """Expand the 'raw_scores' column into rating columns, in place.

    Adds '1 Star' .. '5 Star' (rounded to 2 decimals), 'Rating' (1-based
    position of the highest score), 'Probability' (highest score, rounded to
    2 decimals) and 'Weighted Rating' (sum of score * stars, rounded to 2
    decimals), then drops 'raw_scores'.

    Args:
        df: DataFrame with a 'raw_scores' column holding five scores per row.
            It is modified in place.

    Returns:
        None.
    """
    star_columns = [f'{i} Star' for i in range(1, 6)]
    scores = pd.DataFrame(
        df['raw_scores'].tolist(),
        index=df.index,
        columns=star_columns,
        dtype=float,
    )

    for column in star_columns:
        df[column] = scores[column].round(2)
    df['Rating'] = scores.to_numpy().argmax(axis=1) + 1
    df['Probability'] = scores.max(axis=1).round(2)
    # Weight the unrounded scores: using the rounded star columns would
    # accumulate up to five rounding errors, each scaled by its star count.
    weighted = sum(scores[column] * stars for stars, column in enumerate(star_columns, start=1))
    df['Weighted Rating'] = weighted.round(2)
    df.drop(columns=['raw_scores'], inplace=True)
def scores_to_percent(df):
    """Format the star and probability columns as whole-percent strings.

    Args:
        df: DataFrame with '1 Star' .. '5 Star' and 'Probability' columns
            holding fractions. It is modified in place.

    Returns:
        The same DataFrame, with those columns replaced by strings such as
        '25%'.
    """
    percent_columns = [f'{i} Star' for i in range(1, 6)] + ['Probability']
    for column in percent_columns:
        df[column] = df[column].apply(lambda value: f'{value*100:.0f}%')
    return df
def convert_df_to_csv(df):
    """Return ``df`` serialized as UTF-8 encoded CSV bytes, without the index."""
    return df.to_csv(index=False).encode('utf-8')
def display_dataframe(df, df_display):
    """Show a CSV download button for ``df`` and render ``df_display``.

    Args:
        df: DataFrame offered for download as 'data.csv'.
        df_display: DataFrame rendered as the on-page table.
    """
    csv = convert_df_to_csv(df)
    # The row is split into nine columns and only the first one is used, so
    # the button occupies a narrow slot at the left.
    download_column = st.columns(9)[0]
    with download_column:
        st.download_button(
            "Download CSV",
            csv,
            "data.csv",
            "text/csv",
            key='download-csv',
        )
    st.dataframe(df_display)
def important_words(reviews, num_words=5):
    """Return the terms with the highest total TF-IDF weight in ``reviews``.

    Args:
        reviews: Collection of review strings (list or pandas Series).
        num_words: Maximum number of terms to return.

    Returns:
        Up to ``num_words`` terms, highest weight first. Empty when there are
        no reviews or when no term survives stop-word removal.
    """
    # len() rather than truthiness: a pandas Series has no unambiguous bool.
    if len(reviews) == 0:
        return []

    vectorizer = TfidfVectorizer(stop_words=stopwords_list, max_features=10000)
    try:
        vectors = vectorizer.fit_transform(reviews)
    except ValueError:
        # scikit-learn raises ValueError for an empty vocabulary, e.g. when
        # the reviews contain nothing but stop words.
        return []

    features = vectorizer.get_feature_names_out()
    # Rank by summed TF-IDF weight. Sorting by idf_ alone ignores how often a
    # term is used and simply surfaces the rarest tokens of the corpus.
    term_weights = np.asarray(vectors.sum(axis=0)).ravel()
    ranked = np.argsort(term_weights)[::-1]
    return [features[i] for i in ranked[:num_words]]
def display_ratings(df, review_column):
    """Show, for each rating from 1 to 5, its review count and top words.

    Args:
        df: DataFrame with a 'Rating' column and the review text column.
        review_column: Name of the column containing the review text.
    """
    for rating, column in enumerate(st.columns(5), start=1):
        rating_reviews = df.loc[df['Rating'] == rating, review_column]
        top_words = important_words(rating_reviews)

        column.markdown(f"### {len(rating_reviews)}")
        column.markdown('⭐' * rating)
        # Display the most important words for each rating
        column.markdown("#### Most Important Words:")
        if top_words:
            for word in top_words:
                column.markdown(f"**{word}**")
        else:
            column.markdown("No important words to display")
# Run the app when this file is executed as a script.
if __name__ == "__main__":
    main()