| |
| """Sentiment_analysis.ipynb |
| |
| Automatically generated by Colaboratory. |
| |
| Original file is located at |
| https://colab.research.google.com/drive/1EHgMQQJzwbNja0JVMM2DVvrVTMHIS3Vg |
| """ |
|
|
| !pip install transformers |
|
|
| import pandas as pd |
| from wordcloud import WordCloud |
| import seaborn as sns |
| import re |
| import string |
| from collections import Counter, defaultdict |
|
|
| from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer |
|
|
| import plotly.express as px |
| from plotly.subplots import make_subplots |
| import plotly.graph_objects as go |
| from plotly.offline import plot |
|
|
| import matplotlib.gridspec as gridspec |
| from matplotlib.ticker import MaxNLocator |
| import matplotlib.patches as mpatches |
| import matplotlib.pyplot as plt |
| import warnings |
| warnings.filterwarnings('ignore') |
| import nltk |
| nltk.download('stopwords') |
| from nltk.corpus import stopwords |
| stopWords_nltk = set(stopwords.words('english')) |
|
|
|
|
| import re |
| from typing import Union, List |
|
|
class CleanText():
    """Replace every character matched by ``clean_pattern`` with a space.

    The default pattern is a negated character class: anything that is not
    an ASCII/Turkish letter, a digit, or one of ``. , ' " ( )`` is replaced
    by a single space (whitespace other than the substituted space is
    therefore normalised to spaces as well).
    """

    def __init__(self, clean_pattern = r"[^A-ZĞÜŞİÖÇIa-zğüı'şöç0-9.\"',()]"):
        self.clean_pattern = clean_pattern

    def __call__(self, text: Union[str, list]) -> str:
        """Clean *text* and return it as one space-joined string.

        Args:
            text: a single string, a list of documents where each document
                is a list of sentence strings, or a flat list of strings
                (each string is then treated as a one-sentence document).

        Returns:
            The cleaned sentences joined with single spaces.

        Raises:
            TypeError: if *text* is neither a ``str`` nor a ``list``.
        """
        if isinstance(text, str):
            docs = [[text]]
        elif isinstance(text, list):
            # Wrap bare strings so the inner loop never iterates a string
            # character by character.
            docs = [[sents] if isinstance(sents, str) else sents for sents in text]
        else:
            raise TypeError(
                f"CleanText expects a str or a list, got {type(text).__name__}"
            )

        cleaned = [
            [re.sub(self.clean_pattern, " ", sent) for sent in sents]
            for sents in docs
        ]
        return ' '.join(' '.join(sents) for sents in cleaned)
|
|
def remove_emoji(data):
    """Return *data* with every run of characters in the emoji class removed.

    The character class is assembled from the ranges / code points below,
    concatenated in order.
    """
    # NOTE(review): "\U000024C2-\U0001F251" and "\U00010000-\U0010ffff" are
    # very wide ranges; the former also covers non-emoji scripts such as CJK
    # ideographs, so such text is deleted too. Confirm this is intended.
    char_class = "".join((
        "\U0001F600-\U0001F64F",
        "\U0001F300-\U0001F5FF",
        "\U0001F680-\U0001F6FF",
        "\U0001F1E0-\U0001F1FF",
        "\U00002500-\U00002BEF",
        "\U00002702-\U000027B0",
        "\U00002702-\U000027B0",
        "\U000024C2-\U0001F251",
        "\U0001f926-\U0001f937",
        "\U00010000-\U0010ffff",
        "\u2640-\u2642",
        "\u2600-\u2B55",
        "\u200d",
        "\u23cf",
        "\u23e9",
        "\u231a",
        "\ufe0f",
        "\u3030",
    ))
    emoji_pattern = re.compile("[" + char_class + "]+", re.UNICODE)
    return emoji_pattern.sub('', data)
|
|
def tokenize(text):
    """Split *text* into digit runs, letter runs and single non-word characters.

    Runs of spaces are collapsed first; empty pieces and single-space pieces
    are dropped, and the remaining tokens are returned joined by one space.
    """
    collapsed = re.sub(r" +", " ", str(text))
    pieces = re.split(r"(\d+|[a-zA-ZğüşıöçĞÜŞİÖÇ]+|\W)", collapsed)
    tokens = [piece for piece in pieces if piece not in ('', ' ')]
    return ' '.join(tokens)
|
|
# Character class matching any single character of string.punctuation.
regex = re.compile("[{}]".format(re.escape(string.punctuation)))


def remove_punct(text):
    """Return *text* with each punctuation character replaced by a space."""
    return regex.sub(" ", text)
|
|
# Shared cleaner instance built with CleanText's default pattern.
clean = CleanText()
|
|
def label_encode(x):
    """Map a rating to a class id: 1/2 -> 0, 3 -> 1, 4/5 -> 2.

    Any other value yields ``None``.
    """
    rating_to_class = ((1, 0), (2, 0), (3, 1), (5, 2), (4, 2))
    for rating, class_id in rating_to_class:
        if x == rating:
            return class_id
    return None
|
|
def label2name(x):
    """Return the class name for a class id (0, 1 or 2), else ``None``."""
    names_by_id = ((0, "Negative"), (1, "Neutral"), (2, "Positive"))
    for class_id, class_name in names_by_id:
        if x == class_id:
            return class_name
    return None
|
|
from google.colab import files

# Upload via the Colab file widget, then read the CSV by its fixed file name.
uploaded = files.upload()
df = pd.read_csv('tripadvisor_hotel_reviews.csv')

print("df.columns: ", df.columns)

# Histogram of the raw 'Rating' column, one colour per rating value.
histogram_options = {
    'x': 'Rating',
    'title': 'Histogram of Review Rating',
    'template': 'ggplot2',
    'color': 'Rating',
    'color_discrete_sequence': px.colors.sequential.Blues_r,
    'opacity': 0.8,
    'height': 525,
    'width': 835,
}
fig = px.histogram(df, **histogram_options)
fig.update_yaxes(title='Count')
fig.show()

df.info()
|
|
# Derive the 3-class label id and its display name from the rating.
df["label"] = df["Rating"].apply(label_encode)
df["label_name"] = df["label"].apply(label2name)

# Normalise each review: strip emoji, lower-case, blank out characters the
# cleaner does not allow, then replace punctuation with spaces.
# clean() returns a plain string, so its result is used whole; indexing it
# with [0][0] kept only the first character of every review.
df["Review"] = df["Review"].apply(
    lambda x: remove_punct(clean(remove_emoji(x).lower()))
)

df.head()
|
|
# Class balance shown twice: a pie chart (left) and a bar chart (right).
colors = ['gold', 'mediumturquoise', 'lightgreen']
class_names = df.label_name.value_counts().index
class_counts = df.label.value_counts().values

fig = make_subplots(rows=1, cols=2, specs=[[{"type": "pie"}, {"type": "bar"}]])

fig.add_trace(go.Pie(labels=class_names, values=class_counts), 1, 1)

# Applied while the pie is the only trace in the figure.
fig.update_traces(
    hoverinfo='label+percent',
    textfont_size=20,
    marker=dict(colors=colors, line=dict(color='#000000', width=2)),
)

fig.add_trace(go.Bar(x=class_names, y=class_counts, marker_color=colors), 1, 2)

fig.show()
|
|
| import pandas as pd |
| import numpy as np |
| import os |
| import random |
| from pathlib import Path |
| import json |
|
|
| import torch |
| from tqdm.notebook import tqdm |
|
|
| from transformers import BertTokenizer |
| from torch.utils.data import TensorDataset |
|
|
| from transformers import BertForSequenceClassification |
|
|
class Config:
    """Hyper-parameters and tokenizer options shared by the whole script."""

    # Reproducibility and hardware
    seed_val = 17
    random_state = 42
    device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

    # Optimisation
    epochs = 5
    batch_size = 6
    lr = 2e-5
    eps = 1e-8

    # Model and data
    pretrained_model = 'bert-base-uncased'
    seq_length = 512
    # NOTE(review): the splits in this file pass test_size=0.10 literally.
    test_size = 0.15

    # Options forwarded to the tokenizer calls
    add_special_tokens = True
    return_attention_mask = True
    pad_to_max_length = True
    do_lower_case = False
    return_tensors = 'pt'


config = Config()
|
|
| |
# Snapshot of the configuration, in a fixed key order, for the JSON dump
# later in the file.
params = {
    name: getattr(config, name)
    for name in (
        "seed_val",
        "device",
        "epochs",
        "batch_size",
        "seq_length",
        "lr",
        "eps",
        "pretrained_model",
        "test_size",
        "random_state",
        "add_special_tokens",
        "return_attention_mask",
        "pad_to_max_length",
        "do_lower_case",
        "return_tensors",
    )
}
# The device is stored as its string form rather than as a torch.device.
params["device"] = str(config.device)
|
|
import random


device = config.device

# Seed Python, NumPy and PyTorch (CPU and all CUDA devices) with one value.
for _seed_fn in (
    random.seed,
    np.random.seed,
    torch.manual_seed,
    torch.cuda.manual_seed_all,
):
    _seed_fn(config.seed_val)

df.head()
|
|
from sklearn.model_selection import train_test_split


# First split: hold out 10% of all rows as the validation set, stratified
# by label.
# NOTE(review): config.test_size (0.15) is not used by either split.
train_df_, val_df = train_test_split(
    df,
    test_size=0.10,
    random_state=config.random_state,
    stratify=df.label.values,
)

train_df_.head()

# Second split: hold out 10% of the remaining rows as the test set.
train_df, test_df = train_test_split(
    train_df_,
    test_size=0.10,
    random_state=42,
    stratify=train_df_.label.values,
)

# Number of distinct labels and the shape of each split: train, val, test.
for _split in (train_df, val_df, test_df):
    print(len(_split['label'].unique()))
    print(_split.shape)
|
|
tokenizer = BertTokenizer.from_pretrained(
    config.pretrained_model,
    do_lower_case=config.do_lower_case,
)

# Encoding options shared by the train and validation sets.
# NOTE(review): `pad_to_max_length` is presumably deprecated in current
# transformers releases in favour of `padding=` / `truncation=` — confirm
# against the installed version.
encode_options = dict(
    add_special_tokens=config.add_special_tokens,
    return_attention_mask=config.return_attention_mask,
    pad_to_max_length=config.pad_to_max_length,
    max_length=config.seq_length,
    return_tensors=config.return_tensors,
)

encoded_data_train = tokenizer.batch_encode_plus(train_df.Review.values, **encode_options)
encoded_data_val = tokenizer.batch_encode_plus(val_df.Review.values, **encode_options)

# Training tensors
input_ids_train = encoded_data_train['input_ids']
attention_masks_train = encoded_data_train['attention_mask']
labels_train = torch.tensor(train_df.label.values)

# Validation tensors
input_ids_val = encoded_data_val['input_ids']
attention_masks_val = encoded_data_val['attention_mask']
labels_val = torch.tensor(val_df.label.values)

# Each dataset item is an (input_ids, attention_mask, label) triple.
dataset_train = TensorDataset(input_ids_train, attention_masks_train, labels_train)
dataset_val = TensorDataset(input_ids_val, attention_masks_val, labels_val)
|
|
# Pre-trained BERT with a 3-way classification head.
model = BertForSequenceClassification.from_pretrained(
    config.pretrained_model,
    num_labels=3,
    output_attentions=False,
    output_hidden_states=False,
)


from torch.utils.data import DataLoader, RandomSampler, SequentialSampler


# Training batches are drawn in random order.
train_sampler = RandomSampler(dataset_train)
dataloader_train = DataLoader(
    dataset_train,
    sampler=train_sampler,
    batch_size=config.batch_size,
)

# Validation batches keep the dataset order.
validation_sampler = SequentialSampler(dataset_val)
dataloader_validation = DataLoader(
    dataset_val,
    sampler=validation_sampler,
    batch_size=config.batch_size,
)
|
|
from transformers import get_linear_schedule_with_warmup


# Use PyTorch's AdamW: the AdamW class that used to live in `transformers`
# is deprecated and no longer importable from recent releases.
# weight_decay=0.0 keeps the default of the transformers implementation
# (torch's own default is 0.01).
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=config.lr,
    eps=config.eps,
    weight_decay=0.0,
)

# Linear decay of the learning rate over every optimisation step, no warm-up.
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=len(dataloader_train) * config.epochs,
)
|
|
from sklearn.metrics import f1_score


def f1_score_func(preds, labels):
    """Return the weighted F1 score of the arg-max of *preds* against *labels*.

    *preds* is reduced with ``argmax`` along axis 1; both arrays are
    flattened before scoring.
    """
    predicted_classes = np.argmax(preds, axis=1).flatten()
    return f1_score(labels.flatten(), predicted_classes, average='weighted')
|
|
def accuracy_per_class(preds, labels, label_dict):
    """Print, for each class present in *labels*, correct / total counts.

    Args:
        preds: per-class scores; the predicted class is the arg-max on axis 1.
        labels: true class ids.
        label_dict: mapping of class name -> class id, used to print names.
    """
    id_to_name = dict(zip(label_dict.values(), label_dict.keys()))

    predicted = np.argmax(preds, axis=1).flatten()
    actual = labels.flatten()

    for class_id in np.unique(actual):
        in_class = actual == class_id
        class_predictions = predicted[in_class]
        n_total = len(actual[in_class])
        n_correct = len(class_predictions[class_predictions == class_id])
        print(f'Class: {id_to_name[class_id]}')
        print(f'Accuracy: {n_correct}/{n_total}\n')
|
|
def evaluate(dataloader_val):
    """Run the module-level ``model`` over *dataloader_val* in eval mode.

    Each batch is indexed as (input_ids, attention_mask, labels) and moved
    to ``config.device``; the forward pass runs under ``torch.no_grad()``.

    Returns:
        A tuple ``(average loss per batch, logits, true labels)`` where the
        last two are NumPy arrays concatenated over all batches.
    """
    model.eval()

    running_loss = 0
    logit_chunks = []
    label_chunks = []

    for batch in dataloader_val:
        on_device = tuple(tensor.to(config.device) for tensor in batch)
        input_ids, attention_mask, labels = on_device[0], on_device[1], on_device[2]

        with torch.no_grad():
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=labels,
            )

        # outputs[0] is the loss, outputs[1] the logits.
        running_loss += outputs[0].item()
        logit_chunks.append(outputs[1].detach().cpu().numpy())
        label_chunks.append(labels.cpu().numpy())

    mean_loss = running_loss / len(dataloader_val)
    all_logits = np.concatenate(logit_chunks, axis=0)
    all_labels = np.concatenate(label_chunks, axis=0)

    return mean_loss, all_logits, all_labels
|
|
config.device

model.to(config.device)

# Fine-tune for config.epochs epochs; after each epoch save a checkpoint and
# report training loss, validation loss and weighted F1.
for epoch in tqdm(range(1, config.epochs+1)):

    model.train()

    loss_train_total = 0

    progress_bar = tqdm(dataloader_train, desc='Epoch {:1d}'.format(epoch), leave=False, disable=False)

    for batch in progress_bar:

        model.zero_grad()

        batch = tuple(b.to(config.device) for b in batch)

        inputs = {'input_ids': batch[0],
                  'attention_mask': batch[1],
                  'labels': batch[2],
                  }

        outputs = model(**inputs)

        loss = outputs[0]
        loss_train_total += loss.item()
        loss.backward()

        # Cap the gradient norm at 1.0 before the optimiser step.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

        optimizer.step()
        scheduler.step()

        # Show the batch loss as returned by the model. It used to be divided
        # by len(batch), which is the number of tensors in the batch tuple
        # (3), not a number of samples.
        progress_bar.set_postfix({'training_loss': '{:.3f}'.format(loss.item())})

    # One checkpoint per epoch; later cells load '_BERT_epoch_3.model'.
    torch.save(model.state_dict(), f'_BERT_epoch_{epoch}.model')

    tqdm.write(f'\nEpoch {epoch}')

    loss_train_avg = loss_train_total/len(dataloader_train)
    tqdm.write(f'Training loss: {loss_train_avg}')

    val_loss, predictions, true_vals = evaluate(dataloader_validation)
    val_f1 = f1_score_func(predictions, true_vals)
    tqdm.write(f'Validation loss: {val_loss}')

    tqdm.write(f'F1 Score (Weighted): {val_f1}')

# Persist the run configuration next to the checkpoints.
with Path('params.json').open("w") as f:
    json.dump(params, f, ensure_ascii=False, indent=4)
|
|
# Restore the weights saved after epoch 3.
model.load_state_dict(torch.load('./_BERT_epoch_3.model', map_location=torch.device('cpu')))


from sklearn.metrics import classification_report


# NOTE(review): `predictions` / `true_vals` come from the last evaluate()
# call of the training loop, not from the checkpoint loaded just above.
preds_flat = np.argmax(predictions, axis=1).flatten()
# classification_report takes (y_true, y_pred); passing them the other way
# round swaps precision and recall in the printed table.
print(classification_report(true_vals, preds_flat))
|
|
pred_final = []

# Tokenizer options used for every single-review encoding below.
single_encode_options = dict(
    add_special_tokens=config.add_special_tokens,
    return_attention_mask=config.return_attention_mask,
    pad_to_max_length=config.pad_to_max_length,
    max_length=config.seq_length,
    return_tensors=config.return_tensors,
)

# Predict each validation review on its own and collect the arg-max class.
for review in tqdm(val_df["Review"], total=len(val_df)):
    encoded_data_test_single = tokenizer.batch_encode_plus([review], **single_encode_options)

    with torch.no_grad():
        outputs = model(
            input_ids=encoded_data_test_single['input_ids'].to(device),
            attention_mask=encoded_data_test_single['attention_mask'].to(device),
        )

    # No labels are passed, so the logits are the first output.
    logits = outputs[0].detach().cpu().numpy()
    pred_final.append(np.argmax(logits, axis=1).flatten()[0])

val_df["pred"] = pred_final

# True where the prediction matches the label.
control = val_df.pred.values == val_df.label.values
val_df["control"] = control

# Keep only the misclassified rows for the analysis that follows.
val_df = val_df[~val_df["control"]]
|
|
|
|
|
|
name2label = {"Negative": 0, "Neutral": 1, "Positive": 2}
# NOTE: from here on `label2name` is this dict and no longer the function of
# the same name defined earlier in the file.
label2name = dict(zip(name2label.values(), name2label.keys()))

val_df["pred_name"] = val_df.pred.apply(label2name.get)
from sklearn.metrics import confusion_matrix

# Rows are true class names, columns are predicted class names.
label_values = val_df.label_name.values
pred_name_values = val_df.pred_name.values
confmat = confusion_matrix(
    label_values,
    pred_name_values,
    labels=list(name2label),
)

confmat

df_confusion_val = pd.crosstab(label_values, pred_name_values)
df_confusion_val

df_confusion_val.to_csv("val_df_confusion.csv")
|
|
test_df.head()

# Encode the held-out test reviews with the same options as train/val.
encoded_data_test = tokenizer.batch_encode_plus(
    test_df.Review.values,
    add_special_tokens=config.add_special_tokens,
    return_attention_mask=config.return_attention_mask,
    pad_to_max_length=config.pad_to_max_length,
    max_length=config.seq_length,
    return_tensors=config.return_tensors
)
input_ids_test = encoded_data_test['input_ids']
attention_masks_test = encoded_data_test['attention_mask']
labels_test = torch.tensor(test_df.label.values)

# Fresh model instance, then the weights saved after epoch 3.
model = BertForSequenceClassification.from_pretrained(config.pretrained_model,
                                                      num_labels=3,
                                                      output_attentions=False,
                                                      output_hidden_states=False)

model.to(config.device)

model.load_state_dict(torch.load('./_BERT_epoch_3.model', map_location=torch.device('cpu')))

# Evaluate on the test tensors built above. The validation loader used to be
# passed here, so the "test" predictions were validation predictions and the
# encoded test data was never used.
dataset_test = TensorDataset(input_ids_test, attention_masks_test, labels_test)
dataloader_test = DataLoader(dataset_test,
                             sampler=SequentialSampler(dataset_test),
                             batch_size=config.batch_size)

_, predictions_test, true_vals_test = evaluate(dataloader_test)
| |
|
|
def predict_sentiment(text):
    """Return the predicted sentiment name for a single piece of text.

    The text is passed through the same helpers as the review column
    (emoji removal, lower-casing, character cleaning, punctuation removal)
    before tokenisation. The tokenizer is created with
    ``do_lower_case=config.do_lower_case`` for an uncased checkpoint, so the
    input has to be lower-cased here.

    Args:
        text: raw input string.

    Returns:
        The value of ``label2name.get(...)`` for the arg-max class, i.e. a
        class name, or ``None`` if the id is not in the mapping.
    """
    normalized = remove_punct(clean(remove_emoji(text).lower()))

    encoded_text = tokenizer.encode_plus(
        normalized,
        add_special_tokens=config.add_special_tokens,
        return_attention_mask=config.return_attention_mask,
        pad_to_max_length=config.pad_to_max_length,
        max_length=config.seq_length,
        return_tensors=config.return_tensors
    )

    input_ids = encoded_text['input_ids'].to(config.device)
    attention_mask = encoded_text['attention_mask'].to(config.device)

    # Inference only: eval mode and no gradient tracking.
    model.eval()
    with torch.no_grad():
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)

    # No labels are passed, so the logits are the first output.
    logits = outputs[0].detach().cpu().numpy()
    pred = np.argmax(logits, axis=1).flatten()[0]

    return label2name.get(pred)
|
|
# Example call on a placeholder string.
text = "Your text here"
prediction = predict_sentiment(text)
print(f"The sentiment of the text is: {prediction}")


from sklearn.metrics import classification_report


preds_flat_test = np.argmax(predictions_test, axis=1).flatten()
# classification_report takes (y_true, y_pred); passing them the other way
# round swaps precision and recall in the printed table.
print(classification_report(true_vals_test, preds_flat_test))
|
|
pred_final = []

# Tokenizer options used for every single-review encoding below.
single_encode_options = dict(
    add_special_tokens=config.add_special_tokens,
    return_attention_mask=config.return_attention_mask,
    pad_to_max_length=config.pad_to_max_length,
    max_length=config.seq_length,
    return_tensors=config.return_tensors,
)

# Predict each test review on its own and collect the arg-max class.
for review in tqdm(test_df["Review"], total=len(test_df)):
    encoded_data_test_single = tokenizer.batch_encode_plus([review], **single_encode_options)

    with torch.no_grad():
        outputs = model(
            input_ids=encoded_data_test_single['input_ids'].to(device),
            attention_mask=encoded_data_test_single['attention_mask'].to(device),
        )

    # No labels are passed, so the logits are the first output.
    logits = outputs[0].detach().cpu().numpy()
    pred_final.append(np.argmax(logits, axis=1).flatten()[0])

test_df["pred"] = pred_final

# True where the prediction matches the label.
control = test_df.pred.values == test_df.label.values
test_df["control"] = control

# Keep only the misclassified rows and attach the predicted class name.
test_df = test_df[~test_df["control"]]
test_df["pred_name"] = test_df.pred.apply(label2name.get)
|
|
from sklearn.metrics import confusion_matrix


# Rows are true class names, columns are predicted class names.
label_values = test_df.label_name.values
pred_name_values = test_df.pred_name.values
confmat = confusion_matrix(
    label_values,
    pred_name_values,
    labels=list(name2label),
)
confmat

df_confusion_test = pd.crosstab(label_values, pred_name_values)
df_confusion_test


import matplotlib.pyplot as plt
import seaborn as sns


# Heatmap of the confusion matrix; the axis titles are French for
# "true values" (y) and "predictions" (x).
fig, ax = plt.subplots(figsize=(10,10))
class_names = name2label.keys()
sns.heatmap(
    confmat,
    annot=True,
    fmt='d',
    xticklabels=class_names,
    yticklabels=class_names,
)
plt.ylabel('Vraies valeurs')
plt.xlabel('Prédictions')
plt.show()