| | from transformers import AutoTokenizer |
| | from transformers import AutoModelForSequenceClassification |
| | from distutils.dir_util import copy_tree |
| | from underthesea import word_tokenize |
| | from utils.data_preprocessing import * |
| | from vncorenlp import VnCoreNLP |
| | from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler |
| | import torch |
| | import pandas as pd |
| | import numpy as np |
| | from optimum.bettertransformer import BetterTransformer |
| | from stqdm import stqdm |
| |
|
| | MODEL_PATH = "D:\\Thesis Topic modelling\\Phobert-base-v2-shopee" |
| | TOKENIZE_PATH = "./vncorenlp/VnCoreNLP-1.1.1.jar" |
| |
|
def get_prediction(predictions, threshold=0.5):
    """
    Convert raw classifier logits into hard 0/1 multi-label predictions.

    Parameters
    ----------
    predictions : torch.Tensor or array-like
        Raw logits from the model's last linear layer, shape
        (batch, num_labels).

    threshold : float, optional
        A label is predicted (set to 1) when its sigmoid probability is
        >= this value. Defaults to 0.5.

    Returns
    ----------
    numpy.ndarray
        Float array of 0s and 1s with the same shape as ``predictions``,
        one entry per label.
    """
    # as_tensor avoids copying when `predictions` is already a tensor.
    probs = torch.sigmoid(torch.as_tensor(predictions, dtype=torch.float32))

    # Threshold the independent per-label probabilities into binary decisions.
    y_pred = np.zeros(probs.shape)
    y_pred[np.where(probs >= threshold)] = 1
    return y_pred
| |
|
| |
|
class InferencePhobert:
    """
    Inference wrapper around a fine-tuned PhoBERT multi-label classifier
    for Vietnamese shop reviews.

    Pipeline: word-segment the text (underthesea or VnCoreNLP), encode it
    with the PhoBERT tokenizer, run the model, and threshold the sigmoid
    outputs into the six labels: Quality, Serve, Pack, Shipping, Price,
    Other.
    """

    def __init__(self, tokenize_model = "underthesea", classification_model = MODEL_PATH):
        """
        A class for inferencing PhoBERT model

        Parameters
        ----------
        tokenize_model : string
            Either the literal string "underthesea" to segment with
            underthesea, or a path to a VnCoreNLP jar (rdrsegmenter).

        classification_model : string
            Path to the fine-tuned model weights / tokenizer files.
        """
        labels = ["Quality", "Serve", "Pack", "Shipping", "Price", "Other"]
        id2label = {idx: label for idx, label in enumerate(labels)}
        label2id = {label: idx for idx, label in enumerate(labels)}
        model = AutoModelForSequenceClassification.from_pretrained(
            classification_model,
            problem_type="multi_label_classification",
            num_labels=len(labels),
            id2label=id2label,
            label2id=label2id,
        )
        model.eval()
        # BetterTransformer fuses attention kernels for faster CPU inference.
        self.model = BetterTransformer.transform(model, keep_original_model=True)
        self.tokenizer = AutoTokenizer.from_pretrained(classification_model)
        self.segmenter_path = tokenize_model
        # Lazily-created VnCoreNLP handle; creating it spawns a JVM, so we
        # build it at most once instead of once per call (previous behavior
        # leaked a new JVM process on every preprocess/predict_sentence call).
        self.rdrsegmenter = None

    def _get_rdrsegmenter(self):
        """Create the VnCoreNLP segmenter on first use and cache it."""
        if self.rdrsegmenter is None:
            self.rdrsegmenter = VnCoreNLP(self.segmenter_path, annotators="wseg",
                                          max_heap_size='-Xmx500m')
        return self.rdrsegmenter

    def rdrsegment(self, text):
        """
        Tokenize text using rdrsegmenter

        Parameters
        ----------
        text : string
            input text

        Returns
        ----------
        string
            tokenized text (For example, "san pham tot" to "san_pham tot")
        """
        sentences = self._get_rdrsegmenter().tokenize(text)
        # VnCoreNLP returns a list of sentences, each a list of tokens;
        # flatten to one space-joined string with multiword tokens kept as w1_w2.
        return ' '.join(' '.join(tokens) for tokens in sentences)

    def _segment(self, text):
        """Word-segment one review with whichever backend was configured."""
        if self.segmenter_path == "underthesea":
            return word_tokenize(text, format="text")
        return self.rdrsegment(text)

    def preprocess(self, data):
        """
        Reformat text to fit the PhoBERT model: word segmentation,
        byte-pair encoding, truncation and padding.

        Parameters
        ----------
        data : list
            input text data (one string per review)

        Returns
        ----------
        dictionary
            Tokenizer encoding containing input ids and attention masks.
        """
        text_list = [self._segment(text) for text in data]
        # max_length=125 matches the sequence length used at training time.
        return self.tokenizer(text_list, padding="max_length",
                              truncation=True, max_length=125)

    def generate_dataset(self, processed_data, batch_size = 10):
        """
        Generate a torch DataLoader from preprocessed data.

        Parameters
        ----------
        processed_data : dictionary
            output from preprocess function

        batch_size : int
            How many reviews to include per iteration.

        Returns
        ----------
        torch.utils.data.DataLoader
            Sequential loader yielding (input_ids, attention_mask) batches.
        """
        inputs = torch.tensor(processed_data["input_ids"])
        masks = torch.tensor(processed_data["attention_mask"])
        dataset = TensorDataset(inputs, masks)
        # SequentialSampler keeps predictions aligned with the input order.
        return DataLoader(dataset, sampler=SequentialSampler(dataset),
                          batch_size=batch_size)

    def predict(self, dataset):
        """
        Get binary multi-label predictions from the PhoBERT model.

        Parameters
        ----------
        dataset : torch.utils.data.DataLoader
            output from generate_dataset function

        Returns
        ----------
        numpy.ndarray
            0/1 array of shape (num_reviews, num_labels).
        """
        predictions = []
        # Model is already in eval mode (set in __init__); no_grad wraps the
        # whole loop so no autograd state is built for any batch.
        with torch.no_grad():
            for b_input_ids, b_input_mask in stqdm(dataset, total=len(dataset)):
                # Batch elements are already tensors — no re-wrapping needed.
                outputs = self.model(b_input_ids,
                                     token_type_ids=None,
                                     attention_mask=b_input_mask)
                predictions.append(get_prediction(outputs[0], threshold=0.5))
        return np.concatenate(predictions)

    def predict_sentence(self, text):
        """
        Get label probabilities from the PhoBERT model for a single review.

        Parameters
        ----------
        text : string
            raw review text

        Returns
        ----------
        torch.Tensor
            sigmoid probabilities for each label, shape (1, num_labels)
        """
        segmented = self._segment(text)
        encoding = self.tokenizer([segmented], padding="max_length",
                                  truncation=True, max_length=125)
        inputs = torch.tensor(encoding["input_ids"])
        masks = torch.tensor(encoding["attention_mask"])
        with torch.no_grad():
            output = self.model(inputs,
                                token_type_ids=None,
                                attention_mask=masks)
        # output[0] (the logits) is already a tensor; apply sigmoid directly.
        return torch.sigmoid(output[0])