| | import paddle |
| | import numpy as np |
| | import random |
| | from paddlenlp.transformers import SkepTokenizer, SkepModel |
| | import gradio as gr |
| | from seqeval.metrics.sequence_labeling import get_entities |
# Label vocabulary files, read by ext_load_dict / cls_load_dict (one entry per line).
label_ext_path = "./data/data121190/label_ext.dict"
label_cls_path = "./data/data121242/label_cls.dict"

# Saved parameter files loaded into the extraction and classification models.
ext_model_path = "./best_ext.pdparams"
cls_model_path = "./best_cls.pdparams"
def set_seed(seed):
    """Seed the Paddle, Python ``random`` and NumPy generators with ``seed``."""
    # Same three seeders, same order as before; each receives the identical value.
    for seeder in (paddle.seed, random.seed, np.random.seed):
        seeder(seed)
def format_print(results):
    """Print one "aspect / opinion" line (followed by a blank line) per result.

    Each item of ``results`` is indexed as a sequence: element 0 is the aspect
    and the remaining elements are opinion words, shown as a set so duplicates
    collapse.

    NOTE(review): a second ``format_print`` defined later in this file rebinds
    the name, so this version is not the one later callers resolve to.
    """
    for item in results:
        head = item[0]
        tail = set(item[1:])
        message = f"aspect: {head}, opinion: {tail}\n"
        print(message)
| |
|
def decoding(text, tag_seq):
    """Group tagged aspect and opinion spans of ``text`` into aspect-opinion lists.

    The text is cut into clauses at punctuation marks, entities are extracted
    per clause with ``get_entities`` (prefix-style tags, ``suffix=False``), and
    opinions are attached to aspects as follows:

    * inside a clause, an opinion joins the most recent aspect of that clause;
      opinions seen before the clause's first aspect join that first aspect;
    * a clause with opinions but no aspect hands them to the last aspect found
      so far, or keeps them pending when no aspect has been seen yet;
    * pending opinions join the last aspect of the next clause that has one;
    * opinions still pending at the end are returned under the aspect "None".

    Args:
        text: the input string.
        tag_seq: one tag per character of ``text``; sliced per clause and
            handed to ``get_entities``.

    Returns:
        A list of lists, each ``[aspect, opinion, opinion, ...]``.

    Raises:
        AssertionError: if ``text`` and ``tag_seq`` differ in length.
    """
    assert len(text) == len(tag_seq), f"text len: {len(text)}, tag_seq len: {len(tag_seq)}"

    puncs = set(",.?;!,。?;!")
    splits = [idx for idx, char in enumerate(text) if char in puncs]

    # Clause i runs from one split point up to the next; the punctuation mark
    # itself is the first character of the clause that follows it.
    starts = [0, *splits]
    stops = [*splits, None]
    clauses = [
        (text[start:stop], get_entities(tag_seq[start:stop], suffix=False))
        for start, stop in zip(starts, stops)
    ]

    aps = []
    no_a_words = []  # opinions seen before any aspect in the whole text
    for clause_text, entities in clauses:
        sub_aps = []
        sub_no_a_words = []  # opinions seen before this clause's first aspect

        for ent_name, start, end in entities:
            # Entity offsets are relative to the clause and ``end`` is inclusive.
            span = clause_text[start:end + 1]
            if ent_name == "Aspect":
                sub_aps.append([span])
                if sub_no_a_words:
                    sub_aps[-1].extend(sub_no_a_words)
                    sub_no_a_words.clear()
            elif ent_name == "Opinion":
                # Explicit check: the previous bare ``ent_name == "Opinion"``
                # expression had no effect, so any other entity type was
                # silently recorded as an opinion.
                if sub_aps:
                    sub_aps[-1].append(span)
                else:
                    sub_no_a_words.append(span)

        if sub_aps:
            aps.extend(sub_aps)
            if no_a_words:
                aps[-1].extend(no_a_words)
                no_a_words.clear()
        elif sub_no_a_words:
            if aps:
                aps[-1].extend(sub_no_a_words)
            else:
                no_a_words.extend(sub_no_a_words)

    if no_a_words:
        no_a_words.insert(0, "None")
        aps.append(no_a_words)

    return aps
| | |
def is_aspect_first(text, aspect, opinion_word):
    """Return True when ``aspect`` first occurs in ``text`` no later than ``opinion_word``.

    Positions come from ``str.find``, so a substring that is absent counts as
    position -1 and compares as earlier than any substring that is present.
    """
    aspect_pos = text.find(aspect)
    opinion_pos = text.find(opinion_word)
    return not aspect_pos > opinion_pos
| |
|
def concate_aspect_and_opinion(text, aspect, opinion_words):
    """Build a comma-separated string pairing ``aspect`` with each opinion word.

    For every opinion word the pair is written in the order the two appear in
    ``text`` (as decided by ``is_aspect_first``): aspect then opinion, or
    opinion then aspect. An empty ``opinion_words`` yields an empty string.
    """
    pairs = [
        aspect + word if is_aspect_first(text, aspect, word) else word + aspect
        for word in opinion_words
    ]
    return ",".join(pairs)
| |
|
def format_print(results):
    """Print every analysed aspect and return the same report as text.

    Args:
        results: iterable of dicts with "aspect", "opinions" and "sentiment"
            keys.

    Returns:
        One "aspect: ..., opinions: ..., sentiment: ..." line per result,
        joined with newlines. An empty ``results`` gives an empty string
        (previously the function referenced unbound names in that case, and
        for several results only the last line was returned).
    """
    lines = []
    for result in results:
        aspect, opinions, sentiment = result["aspect"], result["opinions"], result["sentiment"]
        line = f"aspect: {aspect}, opinions: {opinions}, sentiment: {sentiment}"
        print(line)
        lines.append(line)
    print()
    return "\n".join(lines)
| |
|
def is_target_first(text, target, word):
    """Return True when ``target`` first occurs in ``text`` no later than ``word``.

    Uses ``str.find`` positions, so an absent substring counts as position -1.
    """
    return text.find(word) >= text.find(target)
| |
|
| |
|
def ext_load_dict(dict_path):
    """Read a one-entry-per-line vocabulary file and build both lookup tables.

    Each line is stripped of surrounding whitespace and mapped to its
    zero-based line index. When an entry repeats, the later index wins, and the
    reverse table is derived from the forward table.

    Returns:
        ``(word2id, id2word)`` as two dicts.
    """
    with open(dict_path, "r", encoding="utf-8") as handle:
        entries = [raw.strip() for raw in handle]

    word2id = {entry: index for index, entry in enumerate(entries)}
    id2word = {index: entry for entry, index in word2id.items()}
    return word2id, id2word
| |
|
| |
|
def cls_load_dict(dict_path):
    """Load a vocabulary file (one entry per line) into forward and reverse maps.

    Lines are whitespace-stripped; an entry's id is its zero-based line number,
    with a repeated entry keeping the id of its last occurrence. The reverse
    map is built from the forward map.

    Returns:
        ``(word2id, id2word)`` as two dicts.
    """
    word2id = {}
    with open(dict_path, "r", encoding="utf-8") as fin:
        for line_no, line in enumerate(fin):
            word2id[line.strip()] = line_no

    id2word = {}
    for word, word_id in word2id.items():
        id2word[word_id] = word

    return word2id, id2word
| |
|
| |
|
def read(data_path):
    """Yield one example dict per line of a tab-separated data file.

    Every stripped line must split into exactly three tab-separated fields:
    an integer label, the target text and the full text.

    Yields:
        ``{"label": int, "target_text": str, "text": str}``

    Raises:
        AssertionError: if a line does not have exactly three fields.
    """
    with open(data_path, "r", encoding="utf-8") as fin:
        rows = fin.readlines()
        for row in rows:
            fields = row.strip().split("\t")
            assert len(fields) == 3
            yield {
                "label": int(fields[0]),
                "target_text": fields[1],
                "text": fields[2],
            }
| |
|
| |
|
def convert_example_to_feature(example, tokenizer, label2id, max_seq_len=512, is_test=False):
    """Tokenize an example's target text / text pair and return its features.

    Args:
        example: dict with "target_text" and "text" (plus "label" unless
            ``is_test`` is true).
        tokenizer: callable invoked with the target text, ``text_pair``,
            ``max_seq_len`` and ``return_length=True``; its result is indexed
            by "input_ids", "token_type_ids" and "seq_len".
        label2id: accepted but not used by this function.
        max_seq_len: forwarded to the tokenizer.
        is_test: when true, the label is left out of the result.

    Returns:
        ``(input_ids, token_type_ids, seq_len, label)``, or the first three
        items only when ``is_test`` is true.
    """
    encoded = tokenizer(
        example["target_text"],
        text_pair=example["text"],
        max_seq_len=max_seq_len,
        return_length=True,
    )

    label_part = () if is_test else (example["label"],)
    return (
        encoded["input_ids"],
        encoded["token_type_ids"],
        encoded["seq_len"],
        *label_part,
    )
class SkepForTokenClassification(paddle.nn.Layer):
    """SKEP encoder followed by dropout and a linear classifier over its sequence output.

    Args:
        skep: encoder layer; its call result is unpacked as a pair whose first
            item (the sequence output) is used here.
        num_classes: output size of the linear classifier.
        dropout: dropout probability; when None, the encoder config's
            "hidden_dropout_prob" is used.
    """

    def __init__(self, skep, num_classes=2, dropout=None):
        super().__init__()
        self.num_classes = num_classes
        self.skep = skep

        drop_prob = dropout
        if drop_prob is None:
            drop_prob = self.skep.config["hidden_dropout_prob"]
        self.dropout = paddle.nn.Dropout(drop_prob)

        hidden_size = self.skep.config["hidden_size"]
        self.classifier = paddle.nn.Linear(hidden_size, num_classes)

    def forward(self, input_ids, token_type_ids=None, position_ids=None, attention_mask=None):
        """Return classifier logits computed from the encoder's sequence output."""
        token_states, _ = self.skep(
            input_ids,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            attention_mask=attention_mask,
        )
        return self.classifier(self.dropout(token_states))
class SkepForSequenceClassification(paddle.nn.Layer):
    """SKEP encoder followed by dropout and a linear classifier over its pooled output.

    Args:
        skep: encoder layer; its call result is unpacked as a pair whose second
            item (the pooled output) is used here.
        num_classes: output size of the linear classifier.
        dropout: dropout probability; when None, the encoder config's
            "hidden_dropout_prob" is used.
    """

    def __init__(self, skep, num_classes=2, dropout=None):
        super().__init__()
        self.num_classes = num_classes
        self.skep = skep

        if dropout is None:
            dropout = self.skep.config["hidden_dropout_prob"]
        self.dropout = paddle.nn.Dropout(dropout)

        self.classifier = paddle.nn.Linear(
            self.skep.config["hidden_size"],
            num_classes,
        )

    def forward(self, input_ids, token_type_ids=None, position_ids=None, attention_mask=None):
        """Return classifier logits computed from the encoder's pooled output."""
        encoder_kwargs = {
            "token_type_ids": token_type_ids,
            "position_ids": position_ids,
            "attention_mask": attention_mask,
        }
        _, pooled = self.skep(input_ids, **encoder_kwargs)
        pooled = self.dropout(pooled)
        return self.classifier(pooled)
| | |
# NOTE(review): model_name is not referenced by the code visible in this file;
# the tokenizer and encoder are loaded from the local directories below.
model_name = "skep_ernie_1.0_large_ch"
target1_dir = "./skepTokenizer"
target2_dir = "./skepModel"

# Label vocabularies and the tokenizer used by predict().
ext_label2id, ext_id2label = ext_load_dict(label_ext_path)
cls_label2id, cls_id2label = cls_load_dict(label_cls_path)
tokenizer = SkepTokenizer.from_pretrained(target1_dir)
print("label dict loaded.")

# Extraction (token classification) model.
ext_state_dict = paddle.load(ext_model_path)
ext_skep = SkepModel.from_pretrained(target2_dir)
ext_model = SkepForTokenClassification(ext_skep, num_classes=len(ext_label2id))
ext_model.load_dict(ext_state_dict)
print("extraction model loaded.")

# Classification (sequence classification) model.
# It must own a separate encoder instance: if it reused ext_skep, both models
# would share the same parameters and loading the classification state dict
# would overwrite the encoder weights just loaded for the extraction model.
cls_state_dict = paddle.load(cls_model_path)
cls_skep = SkepModel.from_pretrained(target2_dir)
cls_model = SkepForSequenceClassification(
    cls_skep, num_classes=len(cls_label2id))
cls_model.load_dict(cls_state_dict)
print("classification model loaded.")
def predict(input_text):
    """Extract aspects/opinions from ``input_text`` and classify each aspect's sentiment.

    Steps: tag every character with the extraction model, group the tags into
    aspect-opinion lists with ``decoding``, then for each aspect build an
    "aspect + opinion" string and classify it against the full text with the
    classification model.

    Args:
        input_text: the raw text to analyse.

    Returns:
        The report string produced by ``format_print``, or an empty string when
        the input is empty or nothing was extracted.
    """
    if not input_text:
        return ""

    ext_model.eval()
    cls_model.eval()

    # Inference only: no gradients are needed for either model.
    with paddle.no_grad():
        # Extraction: the text is passed as a list of single characters.
        encoded_inputs = tokenizer(list(input_text), is_split_into_words=True, max_seq_len=max_seq_len)
        input_ids = paddle.to_tensor([encoded_inputs["input_ids"]])
        token_type_ids = paddle.to_tensor([encoded_inputs["token_type_ids"]])

        logits = ext_model(input_ids, token_type_ids=token_type_ids)
        predictions = logits.argmax(axis=2).numpy()[0]
        # Drop the first and last positions (presumably the special tokens the
        # tokenizer adds around the sequence -- TODO confirm).
        tag_seq = [ext_id2label[idx] for idx in predictions][1:-1]
        # NOTE(review): decoding() asserts len(input_text) == len(tag_seq); this
        # will fail if the tokenizer truncates at max_seq_len or does not emit
        # exactly one token per character -- confirm how long inputs should be handled.
        aps = decoding(input_text, tag_seq)

        # Classification: one forward pass per extracted aspect.
        results = []
        for ap in aps:
            aspect = ap[0]
            # De-duplicate while keeping first-seen order so the text fed to the
            # classifier (and the report) is the same on every run.
            opinion_words = list(dict.fromkeys(ap[1:]))
            aspect_text = concate_aspect_and_opinion(input_text, aspect, opinion_words)

            encoded_inputs = tokenizer(aspect_text, text_pair=input_text, max_seq_len=max_seq_len, return_length=True)
            input_ids = paddle.to_tensor([encoded_inputs["input_ids"]])
            token_type_ids = paddle.to_tensor([encoded_inputs["token_type_ids"]])

            logits = cls_model(input_ids, token_type_ids=token_type_ids)
            prediction = logits.argmax(axis=1).numpy()[0]

            results.append({"aspect": aspect, "opinions": opinion_words, "sentiment": cls_id2label[prediction]})

    if not results:
        return ""

    return format_print(results)
# Sequence-length limit passed to the tokenizer; predict() reads this
# module-level name when it is called, so it only has to exist before launch.
max_seq_len = 512

# Serve predict() through a text-in / text-out Gradio interface.
gr.Interface(fn=predict, inputs=["text"], outputs=["text"]).launch()