# turner_ml / ml_service.py
# (Scraped file-page header kept as a comment so the module parses:
#  uploader aamirtaymoor, commit 7669c5a "Update ml_service.py", verified)
import itertools
import json
import re
import fasttext
import pandas as pd
import spacy
from simpletransformers.ner import NERModel
from spacy.matcher import PhraseMatcher
from constants import POSITIVE_SENTIMENT_PATTERNS, LABEL_COLOR, CATEGORY_THRESHOLD
# from django.conf import settings
from emoji import demojize
import unicodedata
# base_directory = settings.BASE_DIR
# Relative paths to the serialized model artefacts shipped with the service.
labels_file = "ml_models/labels.json"
ner_model_directory = "ml_models/ner_model/"
sentiment_model_file = "ml_models/sentiment_model/model.ft"
class MlProcessing:
    """Run the NER + sentiment pipeline over a single comment dict.

    The ``comment_dict`` is mutated in place: the pipeline cleans the text,
    derives a rating (``score`` / ``score_str``), detects entity spans with a
    longformer NER model, scores each span's sentiment with a fastText model,
    and finally reformats the spans for display (``spans`` / ``color_map``).
    """

    def __init__(self, comment_dict, language_model, sentiment_model, labels):
        # comment_dict: raw review payload; must contain 'text' and may
        # contain 'star_rating', 'tali_score' and 'skip'.
        self.comment_dict = comment_dict
        self.is_cleaned = False
        self.language_model = language_model    # preloaded spaCy pipeline
        self.sentiment_model = sentiment_model  # preloaded fastText model
        self.labels = labels                    # NER label inventory

    def remove_prefix(self, label):
        """Strip a BIO prefix: 'B-FOOD' -> 'FOOD'; plain 'O' is unchanged."""
        return label.split('-')[-1]

    def labels_to_spans(self, tokens, labels):
        """Collapse per-token BIO labels into contiguous character spans.

        :param tokens: spaCy token dicts carrying 'start'/'end' char offsets
        :param labels: one BIO label per token, aligned with ``tokens``
        :return: list of {'label', 'start', 'end', 'n_tokens'} dicts;
            runs labelled 'O' are skipped
        """
        spans = []
        for label, group in itertools.groupby(zip(tokens, labels), key=lambda x: self.remove_prefix(x[1])):
            if label == 'O':
                continue
            group_tokens = [t for t, _ in group]
            spans.append({'label': label, 'start': group_tokens[0]['start'], 'end': group_tokens[-1]['end'],
                          'n_tokens': len(group_tokens)})
        return spans

    def score_to_str(self, score):
        """Render a numeric score as a 'RATING_<n>' token ('' when missing)."""
        if pd.isna(score):
            return ''
        return f'RATING_{int(score)}'

    def configure_matcher(self, nlp, patterns):
        """Build a case-insensitive PhraseMatcher over the given phrase list."""
        matcher = PhraseMatcher(nlp.vocab, attr='LOWER')
        patterns = [nlp.make_doc(p) for p in patterns]
        matcher.add('positive', patterns)
        return matcher

    def cleaner(self):
        """Normalize ``comment_dict['text']`` in place and mark it cleaned."""
        cleaner = ReviewsCleaner()
        self.comment_dict['text'] = cleaner.clean_text(self.comment_dict['text'])
        self.comment_dict['cleaned'] = True
        self.is_cleaned = True

    def clip(self, x, min_, max_):
        """Clamp ``x`` into the closed interval [min_, max_]."""
        if x < min_:
            return min_
        if x > max_:
            return max_
        return x

    def get_score(self):
        """Derive a 0-5 rating from 'star_rating' or 'tali_score' (halved).

        Only integer-looking strings pass ``str.isnumeric`` — e.g. '4.5'
        falls through to a ``None`` score.
        :return: {'score': float|None, 'score_str': 'RATING_<n>'|''}
        """
        record = dict()
        if "star_rating" in self.comment_dict and self.comment_dict['star_rating'] is not None and str(self.comment_dict['star_rating']).isnumeric():
            record["score"] = self.clip(float(self.comment_dict['star_rating']), 0, 5)
        elif 'tali_score' in self.comment_dict and self.comment_dict['tali_score'] is not None and str(self.comment_dict['tali_score']).isnumeric():
            # tali scores appear to be on a 0-10 scale, hence the halving
            record['score'] = self.clip(float(self.comment_dict['tali_score']) // 2, 0, 5)
        else:
            record['score'] = None
        record['score_str'] = self.score_to_str(record['score'])
        return record

    def reformat_output(self, data):
        """Convert char-offset spans into display segments covering all text.

        Inserts unlabelled filler segments between and after labelled spans
        so the concatenated 'label' fields reconstruct the full text.
        Mutates ``data['spans']`` in place.
        """
        text = data["text"]
        spans = data.get("spans", list())
        new_spans = list()
        previous_span_end = -1
        for i, span in enumerate(spans):
            span_start = span["start"]
            span_end = span["end"]
            # there's some unlabelled span between the last added span and present labelled span
            # this would work for first span as well
            # NOTE(review): the '+ 1' treats span ends as inclusive; spaCy char
            # offsets are exclusive, so a gap may drop one character — confirm
            # against downstream rendering before changing.
            if span_start != previous_span_end + 1:
                new_spans.append({
                    "label": text[previous_span_end + 1:span_start],
                    "color": "",
                    "value": "",
                    "sentiment": "",
                    "score": None
                })
            # Add the present span
            new_spans.append({
                "label": text[span_start:span_end],
                "color": LABEL_COLOR[span["label"]],
                "value": span["label"],
                "sentiment": span["sentiment"],
                "score": span["score"]
            })
            previous_span_end = span_end
            # If the added span is the last labelled span but there's unlabelled text remaining
            # that needs to be added
            if (i == len(spans) - 1) and span_end < len(text):
                new_spans.append({
                    "label": text[span_end:],
                    "color": "",
                    "value": "",
                    "sentiment": "",
                    "score": None,
                })
                previous_span_end = len(text)
        data.update({"spans": new_spans})

    def preprocess_text(self, text):
        """Lowercase and whitespace-normalize text before fastText scoring."""
        text = text.lower()
        # collapse a run of dots down to the first dot
        text = re.sub(r'(?<=\.)\.', ' ', text)
        text = text.strip().strip('. ",')
        text = text.replace('\n', ' ')
        text = text.replace('’', "'")
        text = re.sub(r'\s+', ' ', text)
        return text

    def predict(self, model, text, category):
        """Classify ``text`` with fastText, thresholded per entity category.

        Assumes a binary POSITIVE/NEGATIVE model: with k=2 the positive
        probability is probs[0] or probs[1] depending on label order.
        :return: {'label': 'POSITIVE'|'NEGATIVE', 'score': positive probability}
        """
        text = self.preprocess_text(text)
        labels, probs = model.predict(text, k=2)
        if labels[0] == '__label__POSITIVE':
            prob = probs[0]
        else:
            prob = probs[1]
        if prob >= CATEGORY_THRESHOLD[category]:
            label = 'POSITIVE'
        else:
            label = 'NEGATIVE'
        return {'label': label, 'score': prob}

    def apply_sentiment_model(self, review_dict_entities):
        """Attach a sentiment label + score to every entity span.

        ``review_dict_entities`` is accepted for interface compatibility but
        unused — the method reads and mutates ``self.comment_dict`` directly.
        Spans matching a positive phrase pattern are scored 1.0 outright;
        the rest are scored by fastText on their enclosing sentence, prefixed
        with the rating token and span label for context.
        """
        nlp = self.language_model
        sentence_finder = SentenceBoundsFinder(nlp)
        positive_sentiment_matcher = self.configure_matcher(nlp, POSITIVE_SENTIMENT_PATTERNS)
        sentiment_model = self.load_sentiment_model()
        if self.comment_dict.get('skip'):
            return self.comment_dict
        review = re.sub(r'["“”]|_x000D_', ' ', self.comment_dict['text'])
        sentence_bounds = sentence_finder(review)
        for span in self.comment_dict.get('spans', []):
            segment_text = self.comment_dict['text'][span['start']:span['end']].replace('\n', ' ')
            segment_doc = nlp(segment_text)
            matches = positive_sentiment_matcher(segment_doc)
            if matches:
                sentiments = {'label': 'POSITIVE', 'score': 1.}
                span['sentiment'] = sentiments.get('label')
                span['score'] = sentiments.get('score')
            else:
                span_start = self.get_sentence_start(sentence_bounds, span['start'])
                text = self.comment_dict['text'][span_start:span['end']].replace('\n', ' ')
                text = f"{self.comment_dict['score_str'].lower()} {span['label'].lower()} {text}"
                sentiments = self.predict(sentiment_model, text, span['label'])
                span['sentiment'] = sentiments.get('label')
                span['score'] = sentiments.get('score')
        return self.comment_dict

    def load_sentiment_model(self):
        """Return the injected fastText sentiment model."""
        return self.sentiment_model

    def get_sentence_start(self, sentence_bounds, position):
        """Return the start offset of the sentence containing ``position``.

        :raises RuntimeError: when no sentence bound covers ``position``
        """
        for start, end in sentence_bounds:
            if start <= position <= end:
                return start
        raise RuntimeError('Failed to get sentence bound')

    def load_ner_model(self, max_seq_len=500, use_multiprocessing=True):
        """Instantiate the longformer NERModel from ``ner_model_directory``."""
        args = {'overwrite_output_dir': False, 'reprocess_input_data': True, 'num_train_epochs': 30,
                'evaluation_strategy': 'epoch', 'evaluate_during_training': True, 'silent': True,
                'max_seq_length': max_seq_len, 'use_multiprocessing': use_multiprocessing,
                'use_multiprocessing_for_evaluation': use_multiprocessing, 'fp16': True}
        labels = self.labels
        return NERModel('longformer', ner_model_directory, args=args, use_cuda=False, labels=labels)

    def apply_ner_model(self):
        """Run NER over the comment text and store entity spans on the dict.

        Two model instances with different max sequence lengths are created
        so short texts avoid the long-sequence configuration.
        :return: ``self.comment_dict`` on every path (on failure the dict is
            flagged with ``skip=True`` instead of raising)
        """
        nlp = self.language_model
        # strip trailing '(original ...)' machine-translation blocks
        regex = re.compile(r'(\(original.{0,3}\).+)', re.IGNORECASE | re.MULTILINE | re.DOTALL)
        if self.comment_dict.get('skip'):
            return self.comment_dict
        self.comment_dict['text'] = regex.sub('', self.comment_dict['text'])
        self.comment_dict['_doc'] = nlp(self.comment_dict['text'])
        seq_lengths = [len(self.comment_dict['_doc'])]
        seq_lengths = sorted(seq_lengths)
        # NOTE(review): with a single length these two always coincide; the
        # 80th-percentile split only matters if batched lengths are added.
        len_1 = seq_lengths[int(len(seq_lengths) * 0.8)]
        len_2 = seq_lengths[-1]
        ner_model_1 = self.load_ner_model(int(1.5 * len_1))
        try:
            model = ner_model_1
            if len(self.comment_dict['_doc']) > len_1:
                model = self.load_ner_model(int(1.5 * len_2))
            self._apply_ner_model(model, self.comment_dict)
            return self.comment_dict
        except Exception:
            # Best-effort: mark the comment as skipped but still return the
            # dict so callers get the same shape on every path (the original
            # implicitly returned None here).
            self.comment_dict['skip'] = True
            return self.comment_dict

    def _apply_ner_model(self, ner_model, item):
        """Predict token labels for ``item['_doc']`` and store merged spans."""
        doc = item['_doc']
        del item['_doc']  # the spaCy Doc must not leak into the output dict
        predictions, _ = ner_model.predict([[t.text for t in doc]], split_on_space=False)
        predictions = predictions[0]
        tokens = doc.to_json()['tokens']
        if len(tokens) != len(predictions):
            # tokenization drift between spaCy and the NER model: bail out
            item['spans'] = []
            return
        for t, p in zip(tokens, predictions):
            t['label'] = list(p.values())[0]
        labels = [t['label'] for t in tokens]
        spans = self.labels_to_spans(tokens, labels)
        item['spans'] = self.postprocess_spans(spans)

    def postprocess_spans(self, spans):
        """Smooth short spans using neighbour labels, then merge same-label runs.

        Spans shorter than 3 tokens inherit a neighbouring label (or 'O' when
        the neighbours disagree); consecutive spans with the same label are
        merged into one {'start', 'end', 'label'} dict.
        """
        if spans:
            for j, span in enumerate(list(spans)):
                if span['n_tokens'] < 3:
                    if len(spans) > 1:
                        if j == 0:
                            spans[j]['label'] = spans[j + 1]['label']
                        elif j == len(spans) - 1:
                            spans[j]['label'] = spans[j - 1]['label']
                        elif spans[j - 1]['label'] == spans[j + 1]['label']:
                            spans[j]['label'] = spans[j - 1]['label']
                        else:
                            spans[j]['label'] = 'O'
                    else:
                        spans[j]['label'] = 'O'
        new_spans = []
        for label, label_spans in itertools.groupby(spans, key=lambda s: s['label']):
            if label == 'O':
                continue
            label_spans = list(label_spans)
            new_spans.append({'start': label_spans[0]['start'], 'end': label_spans[-1]['end'], 'label': label})
        return new_spans

    def process_comment(self):
        """Run the full pipeline for one comment and return the enriched dict."""
        sentiment = dict()
        score_dict = self.get_score()
        self.comment_dict.update(score_dict)
        self.cleaner()
        try:
            review_dict_entities = self.apply_ner_model()
            sentiment = self.apply_sentiment_model(review_dict_entities)
            self.reformat_output(sentiment)
        # for very small texts ner model errors
        except AssertionError:
            self.comment_dict["skip"] = True
        sentiment.update(self.comment_dict)
        label_color_mappings = list()
        for label, label_color in LABEL_COLOR.items():
            label_color_mappings.append({"label": label, "color": label_color})
        sentiment.update({"color_map": label_color_mappings})
        return sentiment

    def main(self):
        """Entry point used by ``process_single_comment``."""
        return self.process_comment()
class SentenceBoundsFinder:
    """Callable mapping text to a list of (start_char, end_char) sentence bounds."""

    def __init__(self, nlp=None):
        # Bug fix: the original fell back to ``self.language_model``, an
        # attribute that does not exist on this class, so calling
        # SentenceBoundsFinder() with no pipeline raised AttributeError.
        # Restore the spaCy fallback the commented-out code intended.
        if nlp is None:
            nlp = spacy.load('en_core_web_sm')
        self._nlp = nlp

    def __call__(self, text):
        """Return [(start_char, end_char), ...] for each sentence in ``text``."""
        bounds = []
        for sent in self._nlp(text).sents:
            bounds.append((sent.start_char, sent.end_char))
        return bounds
class ReviewsCleaner:
    """
    Class for the cleaning of review text and collecting statistics on cleaning
    :param replace_emojis: Replace emojis with text representing them
    :param unicode_normalize: Normalize unicode chars (NFD, strip combining marks)
    :param remove_non_regular_chars: Keep only chars with ordinal number < 128
    :param remove_junk: Remove characters that are not relevant for the reviews and often corrupt tokens (* \\n \\r \\t)
    :param remove_double_spaces: Remove double spaces
    :param remove_boundary_quotes: Remove quotes on the boundaries of the text
    :param same_quotes: Transform all quote marks into a single quote mark
    """

    def __init__(self, replace_emojis=True, unicode_normalize=True, remove_non_regular_chars=True, remove_junk=True,
                 remove_double_spaces=True, remove_boundary_quotes=True, same_quotes=True):
        self.methods = []
        # Add new methods here !!! MIND THE ORDER !!!
        if replace_emojis:
            self.methods.append(('Deemojize', lambda text: self.__demojize(text)))
        if unicode_normalize:
            self.methods.append(('Normalize', lambda text: ''.join(
                c for c in unicodedata.normalize('NFD', text) if unicodedata.category(c) != 'Mn')))
        if same_quotes:
            self.methods.append(('Same quotes', lambda text: re.sub('"|’|`|“', '\'', text)))
        if remove_boundary_quotes:
            self.methods.append(('Rm boundary quotes', lambda text: self.__remove_boundary(text)))
        if remove_junk:
            self.methods.append(('Remove junk', lambda text: re.sub(r'\*|\n|\r|\t|_x000D_', ' ', text)))
        if remove_non_regular_chars:
            self.methods.append(('Remove non-regular', lambda text: ''.join(c for c in text if ord(c) < 128)))
        if remove_double_spaces:
            self.methods.append(('Remove double spaces', lambda text: ' '.join(text.split())))
        # name -> [characters changed, reviews affected]
        # NOTE(review): clean_text never increments these counters, so they
        # stay at zero unless updated elsewhere — confirm intended.
        self.stats = {name: [0, 0] for name, _ in self.methods}
        self.analyzed_reviews = 0
        self.skipped = 0

    def clean_stats(self):
        """Reset statistics.

        Bug fix: the original built ``{[name, 0, 0] for ...}`` — a set
        comprehension over unhashable lists — which raised TypeError and did
        not match the dict shape created in ``__init__``.
        """
        self.stats = {name: [0, 0] for name, _ in self.methods}
        self.analyzed_reviews = 0
        self.skipped = 0

    def print_stats(self):
        """Print statistics of used methods."""
        print(f'Reviews analyzed: {self.analyzed_reviews}')
        if self.analyzed_reviews == 0:
            # avoid ZeroDivisionError before any text has been cleaned
            return
        print("{:<20} {:<10} {:<10}".format('Name', 'Avg. % of chars', '% of reviews affected'))
        for name, item in self.stats.items():
            print("{:<20} {:<10} {:<10}".format(name, f'{(100 * item[0] / self.analyzed_reviews):.2f}%',
                                                f'{(100 * item[1] / self.analyzed_reviews):.2f}%'))
        print(f'Language skip\t-\t{(100 * self.skipped / self.analyzed_reviews):.2f}%')

    def clean_text(self, text):
        """Clean one line of text by applying every configured method in order."""
        self.analyzed_reviews += 1
        if len(text) == 0:
            return text
        for method_name, method_fun in self.methods:
            text = method_fun(text)
        return text

    @staticmethod
    def __demojize(text):
        # convert emojis to space-delimited names and drop skin-tone modifiers
        text = demojize(text, delimiters=[' ', ' '])
        text = re.sub('_[a-z]*_skin_tone', '', text)
        return text

    @staticmethod
    def __remove_boundary(text):
        # strip a single leading/trailing quote; slicing is safe on '' too
        if text[:1] == '\'':
            text = text[1:]
        if text[-1:] == '\'':
            text = text[:-1]
        return text
def process_single_comment(raw_data, LANGUAGE_MODEL, SENTIMENT_MODEL, LABELS):
    """Run the full ML pipeline on one raw comment dict.

    :return: (processed_data, has_sentiments) — ``has_sentiments`` is False
        when no labelled spans were produced, in which case the whole text is
        emitted as a single unlabelled span.
    """
    processor = MlProcessing(comment_dict=raw_data, language_model=LANGUAGE_MODEL,
                             sentiment_model=SENTIMENT_MODEL, labels=LABELS)
    processed_data = processor.main()
    spans = processed_data.get('spans', list())
    has_sentiments = any(spans)
    if not has_sentiments:
        # Fall back to one unlabelled span covering the whole text.
        spans = [{'label': raw_data.get('text', str()), 'color': '', 'value': '',
                  'sentiment': '', 'score': ''}]
    processed_data['spans'] = spans
    return processed_data, has_sentiments