Spaces:
Sleeping
Sleeping
| import itertools | |
| import json | |
| import re | |
| import fasttext | |
| import pandas as pd | |
| import spacy | |
| from simpletransformers.ner import NERModel | |
| from spacy.matcher import PhraseMatcher | |
| from constants import POSITIVE_SENTIMENT_PATTERNS, LABEL_COLOR, CATEGORY_THRESHOLD | |
| # from django.conf import settings | |
| from emoji import demojize | |
| import unicodedata | |
| # base_directory = settings.BASE_DIR | |
# Locations of the pre-trained model artifacts, relative to the working
# directory. (Fix: these were f-strings with no placeholders — F541.)
labels_file = "ml_models/labels.json"
ner_model_directory = "ml_models/ner_model/"
sentiment_model_file = "ml_models/sentiment_model/model.ft"
class MlProcessing:
    """Runs the NER + sentiment pipeline over a single comment dict.

    The comment dict is mutated in place as it moves through scoring,
    cleaning, NER span extraction and per-span sentiment classification.
    NOTE(review): apply_ner_model/apply_sentiment_model index
    comment_dict['skip'] without a default — callers appear expected to
    provide that key; confirm upstream always sets it.
    """

    def __init__(self, comment_dict, language_model, sentiment_model, labels):
        # comment_dict: mutable dict holding at least 'text' (plus optional
        # 'star_rating' / 'tali_score' / 'skip'); mutated by the pipeline.
        self.comment_dict = comment_dict
        self.is_cleaned = False
        self.language_model = language_model    # pre-loaded spaCy pipeline
        self.sentiment_model = sentiment_model  # pre-loaded fastText model
        self.labels = labels                    # NER label set for NERModel

    def remove_prefix(self, label):
        """Strip the BIO prefix from a tag ('B-FOOD' -> 'FOOD', 'O' -> 'O')."""
        return label.split('-')[-1]

    def labels_to_spans(self, tokens, labels):
        """Collapse per-token BIO labels into contiguous character spans.

        :param tokens: list of dicts with 'start'/'end' char offsets
            (spaCy ``Doc.to_json()`` token format)
        :param labels: parallel list of BIO tags
        :return: list of {'label', 'start', 'end', 'n_tokens'} dicts; runs
            of the same base label are merged and 'O' runs are dropped
        """
        spans = []
        for label, group in itertools.groupby(
                zip(tokens, labels), key=lambda x: self.remove_prefix(x[1])):
            if label == 'O':
                continue
            group_tokens = [t for t, _ in group]
            spans.append({'label': label,
                          'start': group_tokens[0]['start'],
                          'end': group_tokens[-1]['end'],
                          'n_tokens': len(group_tokens)})
        return spans

    def score_to_str(self, score):
        """Format a numeric score as 'RATING_<n>'; empty string if missing."""
        if pd.isna(score):
            return ''
        return f'RATING_{int(score)}'

    def configure_matcher(self, nlp, patterns):
        """Build a case-insensitive PhraseMatcher for the given phrase list."""
        matcher = PhraseMatcher(nlp.vocab, attr='LOWER')
        patterns = [nlp.make_doc(p) for p in patterns]
        matcher.add('positive', patterns)
        return matcher

    def cleaner(self):
        """Normalize the comment text in place and mark it as cleaned."""
        cleaner = ReviewsCleaner()
        self.comment_dict['text'] = cleaner.clean_text(self.comment_dict['text'])
        self.comment_dict['cleaned'] = True
        self.is_cleaned = True

    def clip(self, x, min_, max_):
        """Clamp x to the inclusive range [min_, max_]."""
        if x < min_:
            return min_
        if x > max_:
            return max_
        return x

    def get_score(self):
        """Derive a 0-5 numeric score (and its string form) for the comment.

        Prefers 'star_rating'; falls back to 'tali_score' halved (0-10
        scale). NOTE(review): str.isnumeric() rejects decimal strings such
        as '4.5', so fractional ratings fall through — confirm intended.
        """
        record = dict()
        star = self.comment_dict.get('star_rating')
        tali = self.comment_dict.get('tali_score')
        if star is not None and str(star).isnumeric():
            record['score'] = self.clip(float(star), 0, 5)
        elif tali is not None and str(tali).isnumeric():
            record['score'] = self.clip(float(tali) // 2, 0, 5)
        else:
            record['score'] = None
        record['score_str'] = self.score_to_str(record['score'])
        return record

    def reformat_output(self, data):
        """Rewrite data['spans'] as a complete segmentation of data['text'].

        Labelled spans become colored segments; text between and around
        them becomes unlabelled filler segments, so concatenating all
        segment labels reproduces the text. Mutates ``data`` in place.

        Fix: the previous implementation compared against
        ``previous_span_end + 1`` although span ends are exclusive char
        offsets, which dropped one character of every gap between spans
        and emitted spurious empty segments between adjacent spans.
        """
        text = data["text"]
        spans = data.get("spans", list())
        new_spans = list()
        previous_span_end = 0  # exclusive end of the last emitted segment
        for span in spans:
            span_start = span["start"]
            span_end = span["end"]
            # Unlabelled text between the previous segment and this span.
            if span_start > previous_span_end:
                new_spans.append({
                    "label": text[previous_span_end:span_start],
                    "color": "",
                    "value": "",
                    "sentiment": "",
                    "score": None,
                })
            # The labelled span itself.
            new_spans.append({
                "label": text[span_start:span_end],
                "color": LABEL_COLOR[span["label"]],
                "value": span["label"],
                "sentiment": span["sentiment"],
                "score": span["score"],
            })
            previous_span_end = span_end
        # Trailing unlabelled text after the last labelled span.
        if spans and previous_span_end < len(text):
            new_spans.append({
                "label": text[previous_span_end:],
                "color": "",
                "value": "",
                "sentiment": "",
                "score": None,
            })
        data.update({"spans": new_spans})

    def preprocess_text(self, text):
        """Lower-case and normalize whitespace/punctuation before fastText."""
        text = text.lower()
        # Collapse runs of dots: every dot preceded by a dot becomes a space.
        text = re.sub(r'(?<=\.)\.', ' ', text)
        text = text.strip().strip('. ",')
        text = text.replace('\n', ' ')
        text = text.replace('’', "'")
        text = re.sub(r'\s+', ' ', text)
        return text

    def predict(self, model, text, category):
        """Binary sentiment prediction with a per-category threshold.

        Asks fastText for both labels (k=2) and thresholds the POSITIVE
        probability against CATEGORY_THRESHOLD[category].
        """
        text = self.preprocess_text(text)
        labels, probs = model.predict(text, k=2)
        # probs is ordered to match labels; pick the POSITIVE entry.
        if labels[0] == '__label__POSITIVE':
            prob = probs[0]
        else:
            prob = probs[1]
        label = 'POSITIVE' if prob >= CATEGORY_THRESHOLD[category] else 'NEGATIVE'
        return {'label': label, 'score': prob}

    def apply_sentiment_model(self, review_dict_entities):
        """Attach a sentiment label and score to every NER span.

        A span matching a positive phrase pattern is labelled POSITIVE
        outright; otherwise fastText classifies the sentence containing
        the span, prefixed with the rating string and the span label.
        Mutates and returns self.comment_dict.
        """
        # Check 'skip' first so no models are built for skipped comments.
        if self.comment_dict['skip']:
            return self.comment_dict
        nlp = self.language_model
        sentence_finder = SentenceBoundsFinder(nlp)
        positive_sentiment_matcher = self.configure_matcher(nlp, POSITIVE_SENTIMENT_PATTERNS)
        sentiment_model = self.load_sentiment_model()
        # NOTE(review): '_x000D_' is 7 chars replaced by 1, so sentence
        # bounds could drift from offsets into 'text'; in practice
        # ReviewsCleaner has already removed it — verify cleaner() runs first.
        review = re.sub(r'["“”]|_x000D_', ' ', self.comment_dict['text'])
        sentence_bounds = sentence_finder(review)
        for span in self.comment_dict.get('spans', []):
            segment_text = self.comment_dict['text'][span['start']:span['end']].replace('\n', ' ')
            segment_doc = nlp(segment_text)
            matches = positive_sentiment_matcher(segment_doc)
            if matches:
                sentiments = {'label': 'POSITIVE', 'score': 1.}
            else:
                span_start = self.get_sentence_start(sentence_bounds, span['start'])
                text = self.comment_dict['text'][span_start:span['end']].replace('\n', ' ')
                text = f"{self.comment_dict['score_str'].lower()} {span['label'].lower()} {text}"
                sentiments = self.predict(sentiment_model, text, span['label'])
            span['sentiment'] = sentiments.get('label')
            span['score'] = sentiments.get('score')
        return self.comment_dict

    def load_sentiment_model(self):
        """Return the injected fastText sentiment model."""
        return self.sentiment_model

    def get_sentence_start(self, sentence_bounds, position):
        """Return the start offset of the sentence containing ``position``.

        :raises RuntimeError: when position falls outside every bound
        """
        for start, end in sentence_bounds:
            if start <= position <= end:
                return start
        raise RuntimeError('Failed to get sentence bound')

    def load_ner_model(self, max_seq_len=500, use_multiprocessing=True):
        """Build a CPU longformer NERModel sized for ``max_seq_len`` tokens."""
        args = {'overwrite_output_dir': False, 'reprocess_input_data': True, 'num_train_epochs': 30,
                'evaluation_strategy': 'epoch', 'evaluate_during_training': True, 'silent': True,
                'max_seq_length': max_seq_len, 'use_multiprocessing': use_multiprocessing,
                'use_multiprocessing_for_evaluation': use_multiprocessing, 'fp16': True}
        labels = self.labels
        return NERModel('longformer', ner_model_directory, args=args, use_cuda=False, labels=labels)

    def apply_ner_model(self):
        """Run NER over the comment text and store the resulting spans.

        Returns self.comment_dict; on model failure marks the comment as
        skipped instead of raising. (Fix: the failure branch previously
        returned None.)
        """
        nlp = self.language_model
        # Strip trailing "(original ...)" translated-duplicate blocks.
        regex = re.compile(r'(\(original.{0,3}\).+)', re.IGNORECASE | re.MULTILINE | re.DOTALL)
        if self.comment_dict['skip']:
            return self.comment_dict
        self.comment_dict['text'] = regex.sub('', self.comment_dict['text'])
        self.comment_dict['_doc'] = nlp(self.comment_dict['text'])
        # Single-document "batch": the 80th-percentile and max lengths
        # coincide, so the larger model is only built when actually needed.
        seq_lengths = sorted([len(self.comment_dict['_doc'])])
        len_1 = seq_lengths[int(len(seq_lengths) * 0.8)]
        len_2 = seq_lengths[-1]
        try:
            model = self.load_ner_model(int(1.5 * len_1))
            if len(self.comment_dict['_doc']) > len_1:
                model = self.load_ner_model(int(1.5 * len_2))
            self._apply_ner_model(model, self.comment_dict)
            return self.comment_dict
        except Exception:
            # Model failures (e.g. sequence-length asserts) mark the
            # comment as skipped rather than aborting the pipeline.
            self.comment_dict['skip'] = True
            return self.comment_dict

    def _apply_ner_model(self, ner_model, item):
        """Predict token labels for item['_doc'] and store merged spans."""
        doc = item['_doc']
        del item['_doc']  # drop the non-serializable spaCy Doc from the dict
        predictions, _ = ner_model.predict([[t.text for t in doc]], split_on_space=False)
        predictions = predictions[0]
        tokens = doc.to_json()['tokens']
        if len(tokens) != len(predictions):
            # Tokenization mismatch between spaCy and the NER model.
            item['spans'] = []
            return
        for t, p in zip(tokens, predictions):
            # Each prediction is a single-entry {token_text: label} dict.
            t['label'] = list(p.values())[0]
        labels = [t['label'] for t in tokens]
        spans = self.labels_to_spans(tokens, labels)
        item['spans'] = self.postprocess_spans(spans)

    def postprocess_spans(self, spans):
        """Smooth spans shorter than 3 tokens and merge same-label runs.

        A short span inherits the label of its neighbours when that is
        unambiguous, otherwise it is dropped (relabelled 'O'); adjacent
        spans sharing a label are then merged. Always returns a list.
        (Fix: previously returned None for empty input.)
        """
        for j, span in enumerate(spans):
            if span['n_tokens'] >= 3:
                continue
            if len(spans) == 1:
                spans[j]['label'] = 'O'
            elif j == 0:
                spans[j]['label'] = spans[j + 1]['label']
            elif j == len(spans) - 1:
                spans[j]['label'] = spans[j - 1]['label']
            elif spans[j - 1]['label'] == spans[j + 1]['label']:
                spans[j]['label'] = spans[j - 1]['label']
            else:
                spans[j]['label'] = 'O'
        new_spans = []
        for label, label_spans in itertools.groupby(spans, key=lambda s: s['label']):
            if label == 'O':
                continue
            label_spans = list(label_spans)
            new_spans.append({'start': label_spans[0]['start'],
                              'end': label_spans[-1]['end'],
                              'label': label})
        return new_spans

    def process_comment(self):
        """Full pipeline: score -> clean -> NER -> sentiment -> reformat."""
        sentiment = dict()
        score_dict = self.get_score()
        self.comment_dict.update(score_dict)
        self.cleaner()
        try:
            review_dict_entities = self.apply_ner_model()
            sentiment = self.apply_sentiment_model(review_dict_entities)
            self.reformat_output(sentiment)
        # For very small texts the NER model raises AssertionError.
        except AssertionError:
            self.comment_dict["skip"] = True
        sentiment.update(self.comment_dict)
        # Expose the label -> color legend alongside the spans.
        label_color_mappings = [{"label": label, "color": label_color}
                                for label, label_color in LABEL_COLOR.items()]
        sentiment.update({"color_map": label_color_mappings})
        return sentiment

    def main(self):
        """Entry point used by process_single_comment()."""
        return self.process_comment()
class SentenceBoundsFinder:
    """Computes (start_char, end_char) bounds for each sentence in a text."""

    def __init__(self, nlp=None):
        """Store the spaCy pipeline used for sentence segmentation.

        Fix: the old fallback read ``self.language_model``, an attribute
        this class never defines, so nlp=None always raised
        AttributeError; fail explicitly instead.
        """
        if nlp is None:
            raise ValueError('SentenceBoundsFinder requires a spaCy pipeline')
        self._nlp = nlp

    def __call__(self, text):
        """Return a list of (start_char, end_char) tuples, one per sentence."""
        return [(sent.start_char, sent.end_char) for sent in self._nlp(text).sents]
class ReviewsCleaner:
    """
    Class for the cleaning of review dataset and collecting statistics on cleaning
    :param replace_emojis: Replace emojis to text representing them
    :param unicode_normalize: Normalize unicode chars (strip combining marks)
    :param remove_non_regular_chars: Keep only chars with ordinal number <128
    :param remove_junk: Remove characters that are not relevant for the reviews and often corrupt tokens (* \n \r \t)
    :param remove_double_spaces: Remove double spaces
    :param remove_boundary_quotes: Remove quotes which on boundaries of text
    :param same_quotes: Transform all quote marks into single quote mark
    """

    def __init__(self, replace_emojis=True, unicode_normalize=True, remove_non_regular_chars=True, remove_junk=True,
                 remove_double_spaces=True, remove_boundary_quotes=True, same_quotes=True):
        self.methods = []
        # Add new methods here !!! MIND THE ORDER !!!
        if replace_emojis:
            # Fix: __demojize/__remove_boundary lacked @staticmethod, so
            # calling them through self raised TypeError at clean time.
            self.methods.append(('Deemojize', self.__demojize))
        if unicode_normalize:
            self.methods.append(('Normalize', lambda text: ''.join(
                c for c in unicodedata.normalize('NFD', text) if unicodedata.category(c) != 'Mn')))
        if same_quotes:
            self.methods.append(('Same quotes', lambda text: re.sub('"|’|`|“', '\'', text)))
        if remove_boundary_quotes:
            self.methods.append(('Rm boundary quotes', self.__remove_boundary))
        if remove_junk:
            self.methods.append(('Remove junk', lambda text: re.sub(r'\*|\n|\r|\t|_x000D_', ' ', text)))
        if remove_non_regular_chars:
            self.methods.append(('Remove non-regular', lambda text: ''.join(c for c in text if ord(c) < 128)))
        if remove_double_spaces:
            self.methods.append(('Remove double spaces', lambda text: ' '.join(text.split())))
        # name -> [characters changed, reviews affected]
        # NOTE(review): these counters are never incremented by clean_text.
        self.stats = {name: [0, 0] for name, _ in self.methods}
        self.analyzed_reviews = 0
        self.skipped = 0

    def clean_stats(self):
        """Reset statistics.

        Fix: this previously built a *set of lists* ({[name, 0, 0] ...}),
        which raises TypeError (lists are unhashable) and did not match
        the dict shape created in __init__.
        """
        self.stats = {name: [0, 0] for name, _ in self.methods}
        self.analyzed_reviews = 0
        self.skipped = 0

    def print_stats(self):
        """Print statistics of used methods."""
        print(f'Reviews analyzed: {self.analyzed_reviews}')
        if self.analyzed_reviews == 0:
            # Guard: the percentages below divide by analyzed_reviews.
            return
        print("{:<20} {:<10} {:<10}".format('Name', 'Avg. % of chars', '% of reviews affected'))
        for name, item in self.stats.items():
            print("{:<20} {:<10} {:<10}".format(name, f'{(100 * item[0] / self.analyzed_reviews):.2f}%',
                                                f'{(100 * item[1] / self.analyzed_reviews):.2f}%'))
        print(f'Language skip\t-\t{(100 * self.skipped / self.analyzed_reviews):.2f}%')

    def clean_text(self, text):
        """Clean line of text by applying every configured method in order."""
        self.analyzed_reviews += 1
        if len(text) == 0:
            return text
        for method_name, method_fun in self.methods:
            text = method_fun(text)
        return text

    @staticmethod
    def __demojize(text):
        """Replace emojis with ' name '-style text; drop skin-tone suffixes."""
        text = demojize(text, delimiters=[' ', ' '])
        text = re.sub('_[a-z]*_skin_tone', '', text)
        return text

    @staticmethod
    def __remove_boundary(text):
        """Strip a single leading and/or trailing quote mark."""
        if text[:1] == '\'':
            text = text[1:]
        if text[-1:] == '\'':
            text = text[:-1]
        return text
def process_single_comment(raw_data, LANGUAGE_MODEL, SENTIMENT_MODEL, LABELS):
    """Run the full ML pipeline over one raw comment dict.

    Returns a (processed_data, has_sentiments) pair. When the pipeline
    yields no spans, the entire text is wrapped in a single unlabelled
    span and has_sentiments is False.
    """
    pipeline = MlProcessing(
        comment_dict=raw_data,
        language_model=LANGUAGE_MODEL,
        sentiment_model=SENTIMENT_MODEL,
        labels=LABELS,
    )
    processed_data = pipeline.main()
    spans = processed_data.get('spans', list())
    has_sentiments = any(spans)
    if not has_sentiments:
        spans = [{'label': raw_data.get('text', str()), 'color': '',
                  'value': '', 'sentiment': '', 'score': ''}]
    processed_data['spans'] = spans
    return processed_data, has_sentiments