# Hugging Face Spaces app source (the hosted Space page reported a runtime
# error; see the missing-import fix in configure_optimizers' dependencies).
import math
import random
import re

import numpy as np
import pandas as pd
import pytorch_lightning as pl
import streamlit as st
import torch
import torch.nn as nn
import torch.nn.functional as F
from bs4 import BeautifulSoup
from torch.optim import AdamW
from torch.utils.data import Dataset, DataLoader
from transformers import (
    AutoModel,
    AutoModelForSequenceClassification,
    AutoTokenizer,
    get_cosine_schedule_with_warmup,
)
| device = "cuda:0" if torch.cuda.is_available() else "cpu" | |
| train_path = "train.csv" | |
| test_path = "test.csv" | |
| test_labels_paths = "test_labels.csv" | |
| test_df = pd.read_csv(test_path) | |
| test_labels_df = pd.read_csv(test_labels_paths) | |
| test_df = pd.concat([test_df.iloc[:, 1], test_labels_df.iloc[:, 1:]], axis = 1) | |
| test_df.to_csv("test-dataset.csv") | |
| test_dataset_path = "test-dataset.csv" | |
| #Lets make a new column labeled "healthy" | |
| def healthy_filter(df): | |
| if (df["toxic"]==0) and (df["severe_toxic"]==0) and (df["obscene"]==0) and (df["threat"]==0) and (df["insult"]==0) and (df["identity_hate"]==0): | |
| return 1 | |
| else: | |
| return 0 | |
| attributes = ['toxic', 'severe_toxic', 'obscene', 'threat', | |
| 'insult', 'identity_hate', 'healthy'] | |
| class Comments_Dataset(Dataset): | |
| def __init__(self, data_path, tokenizer, attributes, max_token_len = 128, sample=5000): | |
| self.data_path = data_path | |
| self.tokenizer = tokenizer | |
| self.attributes = attributes | |
| self.max_token_len = max_token_len | |
| self.sample = sample | |
| self._prepare_data() | |
| def _prepare_data(self): | |
| data = pd.read_csv(self.data_path) | |
| data["healthy"] = data.apply(healthy_filter,axis=1) | |
| data["unhealthy"] = np.where(data['healthy']==1, 0, 1) | |
| if self.sample is not None: | |
| unhealthy = data.loc[data["healthy"] == 0] | |
| healthy = data.loc[data["healthy"] ==1] | |
| self.data = pd.concat([unhealthy, healthy.sample(self.sample, random_state=42)]) | |
| else: | |
| self.data = data | |
| def __len__(self): | |
| return len(self.data) | |
| def __getitem__(self,index): | |
| item = self.data.iloc[index] | |
| comment = str(item.comment_text) | |
| attributes = torch.FloatTensor(item[self.attributes]) | |
| tokens = self.tokenizer.encode_plus(comment, | |
| add_special_tokens=True, | |
| return_tensors='pt', | |
| truncation=True, | |
| padding='max_length', | |
| max_length=self.max_token_len, | |
| return_attention_mask = True) | |
| return {'input_ids': tokens.input_ids.flatten(), 'attention_mask': tokens.attention_mask.flatten(), 'labels': attributes} | |
| class Comments_Data_Module(pl.LightningDataModule): | |
| def __init__(self, train_path, val_path, attributes, batch_size: int = 16, max_token_length: int = 128, model_name='roberta-base'): | |
| super().__init__() | |
| self.train_path = train_path | |
| self.val_path = val_path | |
| self.attributes = attributes | |
| self.batch_size = batch_size | |
| self.max_token_length = max_token_length | |
| self.model_name = model_name | |
| self.tokenizer = AutoTokenizer.from_pretrained(model_name) | |
| def setup(self, stage = None): | |
| if stage in (None, "fit"): | |
| self.train_dataset = Comments_Dataset(self.train_path, attributes=self.attributes, tokenizer=self.tokenizer) | |
| self.val_dataset = Comments_Dataset(self.val_path, attributes=self.attributes, tokenizer=self.tokenizer, sample=None) | |
| if stage == 'predict': | |
| self.val_dataset = Comments_Dataset(self.val_path, attributes=self.attributes, tokenizer=self.tokenizer, sample=None) | |
| def train_dataloader(self): | |
| return DataLoader(self.train_dataset, batch_size = self.batch_size, num_workers=4, shuffle=True) | |
| def val_dataloader(self): | |
| return DataLoader(self.val_dataset, batch_size = self.batch_size, num_workers=4, shuffle=False) | |
| def predict_dataloader(self): | |
| return DataLoader(self.val_dataset, batch_size = self.batch_size, num_workers=4, shuffle=False) | |
| comments_data_module = Comments_Data_Module(train_path, test_dataset_path, attributes=attributes) | |
| comments_data_module.setup() | |
| comments_data_module.train_dataloader() | |
| class Comment_Classifier(pl.LightningModule): | |
| #the config dict has the hugginface parameters in it | |
| def __init__(self, config: dict): | |
| super().__init__() | |
| self.config = config | |
| self.pretrained_model = AutoModel.from_pretrained(config['model_name'], return_dict = True) | |
| self.hidden = torch.nn.Linear(self.pretrained_model.config.hidden_size, self.pretrained_model.config.hidden_size) | |
| self.classifier = torch.nn.Linear(self.pretrained_model.config.hidden_size, self.config['n_labels']) | |
| torch.nn.init.xavier_uniform_(self.classifier.weight) | |
| self.loss_func = nn.CrossEntropyLoss() | |
| self.dropout = nn.Dropout() | |
| def forward(self, input_ids, attention_mask, labels=None): | |
| # roberta layer | |
| output = self.pretrained_model(input_ids=input_ids, attention_mask=attention_mask) | |
| pooled_output = torch.mean(output.last_hidden_state, 1) | |
| # final logits / classification layers | |
| pooled_output = self.dropout(pooled_output) | |
| pooled_output = self.hidden(pooled_output) | |
| pooled_output = F.relu(pooled_output) | |
| pooled_output = self.dropout(pooled_output) | |
| logits = self.classifier(pooled_output) | |
| # calculate loss | |
| loss = 0 | |
| if labels is not None: | |
| loss = self.loss_func(logits.view(-1, self.config['n_labels']), labels.view(-1, self.config['n_labels'])) | |
| return loss, logits | |
| def training_step(self, batch, batch_index): | |
| loss, outputs = self(**batch) | |
| self.log("train loss ", loss, prog_bar = True, logger=True) | |
| return {"loss":loss, "predictions":outputs, "labels": batch["labels"]} | |
| def validation_step(self, batch, batch_index): | |
| loss, outputs = self(**batch) | |
| self.log("validation loss ", loss, prog_bar = True, logger=True) | |
| return {"val_loss": loss, "predictions":outputs, "labels": batch["labels"]} | |
| def predict_step(self, batch, batch_index): | |
| loss, outputs = self(**batch) | |
| return outputs | |
| def configure_optimizers(self): | |
| optimizer = AdamW(self.parameters(), lr=self.config['lr'], weight_decay=self.config['weight_decay']) | |
| total_steps = self.config['train_size']/self.config['batch_size'] | |
| warmup_steps = math.floor(total_steps * self.config['warmup']) | |
| scheduler = get_cosine_schedule_with_warmup(optimizer, warmup_steps, total_steps) | |
| return [optimizer],[scheduler] | |
| config = { | |
| 'model_name': 'distilroberta-base', | |
| 'n_labels': len(attributes), | |
| 'batch_size': 128, | |
| 'lr': 1.5e-6, | |
| 'warmup': 0.2, | |
| 'train_size': len(comments_data_module.train_dataloader()), | |
| 'weight_decay': 0.001, | |
| 'n_epochs': 100 | |
| } | |
| model_name = 'distilroberta-base' | |
| tokenizer = AutoTokenizer.from_pretrained(model_name) | |
| model = Comment_Classifier(config=config) | |
| model.load_state_dict(torch.load("model_state_dict.pt")) | |
| model.eval() | |
| def prepare_tokenized_review(raw_review): | |
| # Remove HTML tags with BS | |
| review_text = BeautifulSoup(raw_review).get_text() | |
| # Removing non-letters using a regular expression | |
| review_text = re.sub("[^a-zA-Z!?]"," ", review_text) | |
| # Convert words to lower case and split them | |
| words = review_text.lower().split() | |
| return " ".join(words) | |
| def get_encodings(text): | |
| MAX_LEN=256 | |
| encodings = tokenizer.encode_plus( | |
| text, | |
| None, | |
| add_special_tokens=True, | |
| max_length=MAX_LEN, | |
| padding='max_length', | |
| truncation=True, | |
| return_attention_mask=True, | |
| return_tensors='pt') | |
| return encodings | |
| def run_inference(encoding): | |
| with torch.no_grad(): | |
| input_ids = encoding['input_ids'].to(device, dtype=torch.long) | |
| attention_mask = encoding['attention_mask'].to(device, dtype=torch.long) | |
| output = model(input_ids, attention_mask) | |
| final_output = torch.softmax(output[1][0],dim=0).cpu() | |
| print(final_output.numpy().tolist()) | |
| return final_output.numpy().tolist() | |
| test_tweets = test_df["comment_text"].values | |
| #streamlit section | |
| models = ["distilroberta-base"] | |
| model_pointers = ["default: distilroberta-base"] | |
| # current_random_tweet = test_tweets[random.randint(0,len(test_tweets))] | |
| # current_random_tweet = prepare_tokenized_review(current_random_tweet) | |
| st.write("1. Hit the button to view and see the analyis of a random tweet") | |
| with st.form(key="init_form"): | |
| current_random_tweet = test_tweets[random.randint(0,len(test_tweets))] | |
| current_random_tweet = prepare_tokenized_review(current_random_tweet) | |
| choice = st.selectbox("Choose Model", model_pointers) | |
| user_picked_model = models[model_pointers.index(choice)] | |
| with st.spinner("Analyzing..."): | |
| text_encoding = get_encodings(current_random_tweet) | |
| result = run_inference(text_encoding) | |
| df = pd.DataFrame({"Tweet":current_random_tweet}, index=[0]) | |
| df["Highest Toxicity Class"] = attributes[result.index(max(result))] | |
| df["Sentiment Score"] = max(result) | |
| st.table(df) | |
| next_tweet = st.form_submit_button("Next Tweet") | |
| if next_tweet: | |
| with st.spinner("Analyzing..."): | |
| st.write("") |