import itertools
import json
import re
import unicodedata

import fasttext
import pandas as pd
import spacy
from emoji import demojize
from simpletransformers.ner import NERModel
from spacy.matcher import PhraseMatcher

from constants import POSITIVE_SENTIMENT_PATTERNS, LABEL_COLOR, CATEGORY_THRESHOLD

# from django.conf import settings
# base_directory = settings.BASE_DIR
labels_file = "ml_models/labels.json"
ner_model_directory = "ml_models/ner_model/"
sentiment_model_file = "ml_models/sentiment_model/model.ft"


class MlProcessing:
    """Pipeline that cleans a review comment, tags entity spans with a NER
    model and scores each span's sentiment with a fastText classifier.

    :param comment_dict: mutable dict holding at least a ``text`` key; the
        pipeline writes its results back into this dict.
    :param language_model: loaded spaCy pipeline (tokenizer + sentencizer).
    :param sentiment_model: loaded fastText sentiment classifier.
    :param labels: label list for the NER model.
    """

    def __init__(self, comment_dict, language_model, sentiment_model, labels):
        self.comment_dict = comment_dict
        self.is_cleaned = False
        self.language_model = language_model
        self.sentiment_model = sentiment_model
        self.labels = labels

    def remove_prefix(self, label):
        """Strip a BIO prefix, e.g. ``'B-FOOD' -> 'FOOD'`` (``'O'`` stays ``'O'``)."""
        return label.split('-')[-1]

    def labels_to_spans(self, tokens, labels):
        """Collapse per-token BIO labels into character spans.

        Consecutive tokens sharing the same (prefix-stripped) label become
        one span; runs of ``'O'`` are dropped.  Token dicts must carry
        spaCy-style ``start``/``end`` character offsets (``end`` exclusive).
        """
        spans = []
        grouped = itertools.groupby(
            zip(tokens, labels), key=lambda pair: self.remove_prefix(pair[1]))
        for label, group in grouped:
            if label == 'O':
                continue
            group_tokens = [token for token, _ in group]
            spans.append({
                'label': label,
                'start': group_tokens[0]['start'],
                'end': group_tokens[-1]['end'],
                'n_tokens': len(group_tokens),
            })
        return spans

    def score_to_str(self, score):
        """Format a numeric score as ``'RATING_<n>'``; empty string for None/NaN."""
        if pd.isna(score):
            return ''
        return f'RATING_{int(score)}'

    def configure_matcher(self, nlp, patterns):
        """Build a case-insensitive PhraseMatcher for the given phrase list."""
        matcher = PhraseMatcher(nlp.vocab, attr='LOWER')
        patterns = [nlp.make_doc(p) for p in patterns]
        matcher.add('positive', patterns)
        return matcher

    def cleaner(self):
        """Clean the comment text in place and mark the dict as cleaned."""
        cleaner = ReviewsCleaner()
        self.comment_dict['text'] = cleaner.clean_text(self.comment_dict['text'])
        self.comment_dict['cleaned'] = True
        self.is_cleaned = True

    def clip(self, x, min_, max_):
        """Clamp ``x`` into the inclusive range [``min_``, ``max_``]."""
        if x < min_:
            return min_
        if x > max_:
            return max_
        return x

    def _numeric_field(self, key):
        """Return ``comment_dict[key]`` as a float, or None when the field is
        absent, None, or not a plain digit string.

        NOTE(review): ``str.isnumeric`` rejects values like ``'4.5'`` —
        presumably ratings always arrive as whole numbers; confirm upstream.
        """
        value = self.comment_dict.get(key)
        if value is None or not str(value).isnumeric():
            return None
        return float(value)

    def get_score(self):
        """Derive a 0-5 score from ``star_rating`` (preferred) or from
        ``tali_score`` halved (a 0-10 scale), plus its ``RATING_<n>`` form.

        :returns: ``{'score': float|None, 'score_str': str}``
        """
        star = self._numeric_field('star_rating')
        tali = self._numeric_field('tali_score')
        if star is not None:
            score = self.clip(star, 0, 5)
        elif tali is not None:
            score = self.clip(tali // 2, 0, 5)
        else:
            score = None
        return {'score': score, 'score_str': self.score_to_str(score)}

    def reformat_output(self, data):
        """Rewrite ``data['spans']`` into a full cover of the text.

        Labelled spans keep their colour/value/sentiment; the unlabelled
        gaps between them become blank filler spans, so concatenating every
        span's ``label`` reproduces the original text.

        Span offsets use exclusive ``end`` (spaCy char offsets).  The
        previous version advanced the cursor by ``end + 1``, dropping one
        character of each inter-span gap and emitting an empty filler span
        between adjacent labelled spans; fixed here.
        """
        text = data["text"]
        spans = data.get("spans", list())
        new_spans = list()
        cursor = 0  # exclusive end of the last emitted span
        for i, span in enumerate(spans):
            span_start = span["start"]
            span_end = span["end"]
            # Unlabelled text between the previous span and this one
            # (also covers any leading unlabelled text before the first span).
            if span_start > cursor:
                new_spans.append({
                    "label": text[cursor:span_start],
                    "color": "",
                    "value": "",
                    "sentiment": "",
                    "score": None
                })
            # The labelled span itself.
            new_spans.append({
                "label": text[span_start:span_end],
                "color": LABEL_COLOR[span["label"]],
                "value": span["label"],
                "sentiment": span["sentiment"],
                "score": span["score"]
            })
            cursor = span_end
            # Trailing unlabelled text after the last labelled span.
            if (i == len(spans) - 1) and span_end < len(text):
                new_spans.append({
                    "label": text[span_end:],
                    "color": "",
                    "value": "",
                    "sentiment": "",
                    "score": None,
                })
                cursor = len(text)
        data.update({"spans": new_spans})

    def preprocess_text(self, text):
        """Normalise a segment before fastText prediction: lower-case,
        break up doubled full stops, trim boundary punctuation, flatten
        newlines/whitespace and straighten curly apostrophes."""
        text = text.lower()
        text = re.sub(r'(?<=\.)\.', ' ', text)  # '..' -> '. '
        text = text.strip().strip('. ",')
        text = text.replace('\n', ' ')
        text = text.replace('’', "'")
        text = re.sub(r'\s+', ' ', text)
        return text

    def predict(self, model, text, category):
        """Classify ``text`` as POSITIVE/NEGATIVE using a per-category
        probability threshold.

        The fastText model returns its top-2 labels; the POSITIVE
        probability is read from whichever of the two positions holds it.

        :returns: ``{'label': 'POSITIVE'|'NEGATIVE', 'score': float}``
        """
        text = self.preprocess_text(text)
        labels, probs = model.predict(text, k=2)
        prob = probs[0] if labels[0] == '__label__POSITIVE' else probs[1]
        label = 'POSITIVE' if prob >= CATEGORY_THRESHOLD[category] else 'NEGATIVE'
        return {'label': label, 'score': prob}

    def apply_sentiment_model(self, review_dict_entities):
        """Attach ``sentiment``/``score`` to every NER span of the comment.

        A span matching a known positive phrase is scored 1.0 directly;
        otherwise the fastText model classifies the span's sentence,
        prefixed with the rating string and the span label.

        ``review_dict_entities`` is accepted for interface compatibility;
        the method reads and writes ``self.comment_dict``.
        """
        # nlp = spacy.load('en_core_web_sm')
        nlp = self.language_model
        sentence_finder = SentenceBoundsFinder(nlp)
        positive_sentiment_matcher = self.configure_matcher(nlp, POSITIVE_SENTIMENT_PATTERNS)
        sentiment_model = self.load_sentiment_model()
        if self.comment_dict.get('skip'):  # .get: 'skip' may be unset
            return self.comment_dict
        # Blank out quote characters and stray '_x000D_' markers before
        # sentence splitting.  Replacements are padded to the match length so
        # sentence bounds stay aligned with span offsets in the original text
        # (a plain single-space substitute would shift every later offset).
        review = re.sub(r'["“”]|_x000D_',
                        lambda m: ' ' * len(m.group()),
                        self.comment_dict['text'])
        sentence_bounds = sentence_finder(review)
        for span in self.comment_dict.get('spans', []):
            segment_text = self.comment_dict['text'][span['start']:span['end']].replace('\n', ' ')
            segment_doc = nlp(segment_text)
            if positive_sentiment_matcher(segment_doc):
                # Known positive phrase: short-circuit with full confidence.
                sentiments = {'label': 'POSITIVE', 'score': 1.}
            else:
                span_start = self.get_sentence_start(sentence_bounds, span['start'])
                text = self.comment_dict['text'][span_start:span['end']].replace('\n', ' ')
                text = f"{self.comment_dict['score_str'].lower()} {span['label'].lower()} {text}"
                sentiments = self.predict(sentiment_model, text, span['label'])
            span['sentiment'] = sentiments.get('label')
            span['score'] = sentiments.get('score')
        return self.comment_dict

    def load_sentiment_model(self):
        """Return the injected fastText sentiment model."""
        # return fasttext.load_model(sentiment_model_file)
        return self.sentiment_model

    def get_sentence_start(self, sentence_bounds, position):
        """Return the start offset of the sentence containing ``position``.

        :raises RuntimeError: when no sentence bound covers ``position``.
        """
        for start, end in sentence_bounds:
            if start <= position <= end:
                return start
        raise RuntimeError('Failed to get sentence bound')

    def load_ner_model(self, max_seq_len=500, use_multiprocessing=True):
        """Load the longformer NER model from ``ner_model_directory``.

        :param max_seq_len: maximum token sequence length for the model.
        :param use_multiprocessing: forwarded for both prediction and
            evaluation.
        """
        args = {
            'overwrite_output_dir': False,
            'reprocess_input_data': True,
            'num_train_epochs': 30,
            'evaluation_strategy': 'epoch',
            'evaluate_during_training': True,
            'silent': True,
            'max_seq_length': max_seq_len,
            'use_multiprocessing': use_multiprocessing,
            'use_multiprocessing_for_evaluation': use_multiprocessing,
            'fp16': True,
        }
        return NERModel('longformer', ner_model_directory, args=args,
                        use_cuda=False, labels=self.labels)

    def apply_ner_model(self):
        """Run NER over the comment text and store entity spans in it.

        A trailing ``(original...)`` block is stripped first.  The model's
        max sequence length is sized from the tokenised document.  Any
        failure flags the comment with ``skip=True`` instead of aborting.
        """
        nlp = self.language_model
        # nlp = spacy.load('en_core_web_sm')
        # nlp.add_pipe('sentencizer')
        regex = re.compile(r'(\(original.{0,3}\).+)',
                           re.IGNORECASE | re.MULTILINE | re.DOTALL)
        if self.comment_dict.get('skip'):  # .get: 'skip' may be unset
            return self.comment_dict
        self.comment_dict['text'] = regex.sub('', self.comment_dict['text'])
        self.comment_dict['_doc'] = nlp(self.comment_dict['text'])
        # NOTE(review): with a single document these two lengths are always
        # equal; the 80th-percentile pick only matters if this is ever
        # extended to batches.
        seq_lengths = sorted([len(self.comment_dict['_doc'])])
        len_1 = seq_lengths[int(len(seq_lengths) * 0.8)]
        len_2 = seq_lengths[-1]
        try:
            model = self.load_ner_model(int(1.5 * len_1))
            if len(self.comment_dict['_doc']) > len_1:
                # Document longer than expected: reload with a larger limit.
                model = self.load_ner_model(int(1.5 * len_2))
            self._apply_ner_model(model, self.comment_dict)
            return self.comment_dict
        except Exception:
            # NER failed (e.g. text too short or too long); mark the comment
            # for skipping so the rest of the pipeline passes it through.
            self.comment_dict['skip'] = True
            return self.comment_dict

    def _apply_ner_model(self, ner_model, item):
        """Predict per-token labels for ``item['_doc']`` and convert them to
        character spans in ``item['spans']``.  Consumes (deletes) ``_doc``."""
        doc = item['_doc']
        del item['_doc']
        predictions, _ = ner_model.predict(
            [[t.text for t in doc]], split_on_space=False)
        predictions = predictions[0]
        tokens = doc.to_json()['tokens']
        if len(tokens) != len(predictions):
            # Token/prediction mismatch: treat as "no entities found".
            # set_failed(db, task, 'Failed to apply NER model.')
            item['spans'] = []
            return
        for token, prediction in zip(tokens, predictions):
            # Each prediction is a single-entry {token_text: label} dict.
            token['label'] = list(prediction.values())[0]
        labels = [token['label'] for token in tokens]
        spans = self.labels_to_spans(tokens, labels)
        item['spans'] = self.postprocess_spans(spans)

    def postprocess_spans(self, spans):
        """Smooth span labels and merge adjacent same-label spans.

        Spans shorter than 3 tokens inherit a neighbour's label (or become
        'O') to suppress spurious tiny entities; adjacent spans that end up
        with the same label are then merged into one.
        """
        if spans:
            last = len(spans) - 1
            for j, span in enumerate(list(spans)):
                if span['n_tokens'] >= 3:
                    continue
                if len(spans) == 1:
                    spans[j]['label'] = 'O'
                elif j == 0:
                    spans[j]['label'] = spans[j + 1]['label']
                elif j == last:
                    spans[j]['label'] = spans[j - 1]['label']
                elif spans[j - 1]['label'] == spans[j + 1]['label']:
                    spans[j]['label'] = spans[j - 1]['label']
                else:
                    spans[j]['label'] = 'O'
        new_spans = []
        for label, label_spans in itertools.groupby(spans, key=lambda s: s['label']):
            if label == 'O':
                continue
            label_spans = list(label_spans)
            new_spans.append({'start': label_spans[0]['start'],
                              'end': label_spans[-1]['end'],
                              'label': label})
        return new_spans

    def process_comment(self):
        """Run the full pipeline (score -> clean -> NER -> sentiment) and
        return the reformatted result with a label/colour legend attached."""
        sentiment = dict()
        score_dict = self.get_score()
        self.comment_dict.update(score_dict)
        self.cleaner()
        try:
            review_dict_entities = self.apply_ner_model()
            sentiment = self.apply_sentiment_model(review_dict_entities)
            self.reformat_output(sentiment)
        # for very small texts ner model errors
        except AssertionError:
            self.comment_dict["skip"] = True
        sentiment.update(self.comment_dict)
        # sentiment.update({"spans": [{"label": review_json_cleaned["text"], "color": "", "value": "", "sentiment": "", "score": None}]})
        label_color_mappings = [{"label": label, "color": label_color}
                                for label, label_color in LABEL_COLOR.items()]
        sentiment.update({"color_map": label_color_mappings})
        return sentiment

    def main(self):
        """Entry point; alias for :meth:`process_comment`."""
        return self.process_comment()


class SentenceBoundsFinder:
    """Callable that returns ``(start_char, end_char)`` sentence bounds for
    a text using the supplied spaCy pipeline."""

    def __init__(self, nlp=None):
        # self._nlp = nlp or spacy.load('en_core_web_sm')
        # self._nlp.add_pipe('sentencizer')
        # Bug fix: the fallback referenced self.language_model, an attribute
        # this class never has, so nlp=None always crashed with
        # AttributeError.  Fail with an explicit message instead.
        if nlp is None:
            raise ValueError('SentenceBoundsFinder requires a loaded spaCy pipeline')
        self._nlp = nlp

    def __call__(self, text):
        """Return a list of (start_char, end_char) tuples, one per sentence."""
        return [(sent.start_char, sent.end_char) for sent in self._nlp(text).sents]


class ReviewsCleaner:
    """Clean review texts and collect statistics on the cleaning.

    :param replace_emojis: replace emojis with their text names
    :param unicode_normalize: strip combining marks via NFD normalisation
    :param remove_non_regular_chars: drop chars with ordinal >= 128
    :param remove_junk: replace chars that often corrupt tokens
        (asterisk, newline, carriage return, tab, '_x000D_') with spaces
    :param remove_double_spaces: collapse runs of whitespace
    :param remove_boundary_quotes: strip quotes at the text boundaries
    :param same_quotes: normalise all quote marks to a single apostrophe
    """

    def __init__(self, replace_emojis=True, unicode_normalize=True,
                 remove_non_regular_chars=True, remove_junk=True,
                 remove_double_spaces=True, remove_boundary_quotes=True,
                 same_quotes=True):
        self.methods = []
        # Add new methods here !!! MIND THE ORDER !!!
        if replace_emojis:
            self.methods.append(('Deemojize', lambda text: self.__demojize(text)))
        if unicode_normalize:
            self.methods.append(('Normalize', lambda text: ''.join(
                c for c in unicodedata.normalize('NFD', text)
                if unicodedata.category(c) != 'Mn')))
        if same_quotes:
            self.methods.append(('Same quotes', lambda text: re.sub('"|’|`|“', '\'', text)))
        if remove_boundary_quotes:
            self.methods.append(('Rm boundary quotes', lambda text: self.__remove_boundary(text)))
        if remove_junk:
            self.methods.append(('Remove junk', lambda text: re.sub(r'\*|\n|\r|\t|_x000D_', ' ', text)))
        if remove_non_regular_chars:
            self.methods.append(('Remove non-regular', lambda text: ''.join(c for c in text if ord(c) < 128)))
        if remove_double_spaces:
            self.methods.append(('Remove double spaces', lambda text: ' '.join(text.split())))
        # name -> [characters changed, reviews affected]
        # NOTE(review): these counters are never incremented by clean_text
        # yet, so print_stats always reports zeros.
        self.stats = {name: [0, 0] for name, _ in self.methods}
        self.analyzed_reviews = 0
        self.skipped = 0

    def clean_stats(self):
        """Reset all statistics counters.

        Bug fix: this previously built a *set* comprehension of lists
        (``{[name, 0, 0] for ...}``), which raises TypeError because lists
        are unhashable; rebuild the same dict shape as ``__init__`` and also
        reset the language-skip counter.
        """
        self.stats = {name: [0, 0] for name, _ in self.methods}
        self.analyzed_reviews = 0
        self.skipped = 0

    def print_stats(self):
        """Print per-method cleaning statistics."""
        print(f'Reviews analyzed: {self.analyzed_reviews}')
        if not self.analyzed_reviews:
            # Nothing cleaned yet; avoid ZeroDivisionError below.
            return
        print("{:<20} {:<10} {:<10}".format('Name', 'Avg. % of chars', '% of reviews affected'))
        for name, item in self.stats.items():
            print("{:<20} {:<10} {:<10}".format(
                name,
                f'{(100 * item[0] / self.analyzed_reviews):.2f}%',
                f'{(100 * item[1] / self.analyzed_reviews):.2f}%'))
        print(f'Language skip\t-\t{(100 * self.skipped / self.analyzed_reviews):.2f}%')

    def clean_text(self, text):
        """Apply every enabled cleaning method, in order, to one review."""
        self.analyzed_reviews += 1
        if not text:
            # Empty (or missing) text passes through untouched.
            return text
        for _method_name, method_fun in self.methods:
            text = method_fun(text)
        return text

    @staticmethod
    def __demojize(text):
        """Replace emojis with space-delimited names; drop skin-tone suffixes."""
        text = demojize(text, delimiters=[' ', ' '])
        text = re.sub('_[a-z]*_skin_tone', '', text)
        return text

    @staticmethod
    def __remove_boundary(text):
        """Strip one leading and/or one trailing apostrophe, if present."""
        if text[:1] == '\'':
            text = text[1:]
        if text[-1:] == '\'':
            text = text[:-1]
        return text


def process_single_comment(raw_data, LANGUAGE_MODEL, SENTIMENT_MODEL, LABELS):
    """Process one raw comment dict through :class:`MlProcessing`.

    :returns: ``(processed_data, has_sentiments)``; when the pipeline
        produced no (truthy) spans, a single blank span wrapping the raw
        text is substituted and ``has_sentiments`` is False.
    """
    ml = MlProcessing(comment_dict=raw_data,
                      language_model=LANGUAGE_MODEL,
                      sentiment_model=SENTIMENT_MODEL,
                      labels=LABELS)
    processed_data = ml.main()
    spans = processed_data.get('spans', list())
    has_sentiments = True
    if not any(spans):
        spans = [{'label': raw_data.get('text', str()), 'color': '',
                  'value': '', 'sentiment': '', 'score': ''}]
        has_sentiments = False
    processed_data['spans'] = spans
    return processed_data, has_sentiments