| |
| """ |
| Модель частеречной разметки для русскоязычных текстов (проект https://github.com/Koziev/rupostagger) |
| 03.08.2019 небольшой баг с нормализацией (замена "ё" на "е") перед поиском в грамматическом словаре |
| """ |
|
|
| from __future__ import print_function |
| from __future__ import division |
|
|
| import os |
| import json |
| import pathlib |
| import re |
|
|
| import pycrfsuite |
| from .ruword2tags import RuWord2Tags |
| from .rusyllab import split_word |
|
|
|
|
# Sentinel tokens padding the context window at sentence boundaries.
BEG_TOKEN = '<beg>'
END_TOKEN = '<end>'

# Sentinel tokens are "tagged" with themselves in the feature extractor.
token2tag = {BEG_TOKEN: BEG_TOKEN, END_TOKEN: END_TOKEN}
|
|
|
|
def is_num(token):
    """Return True if token consists entirely of ASCII digits 0-9.

    The original returned the raw ``re.Match``/``None`` result; callers
    only use it in boolean context, so returning an explicit bool is
    backward-compatible and clearer.
    """
    return re.match(r'^[0-9]+$', token) is not None
|
|
|
|
class RuPosTagger(object):
    """CRF-based part-of-speech tagger for Russian text.

    Typical usage::

        tagger = RuPosTagger()
        tagger.load()
        for word, label in tagger.tag(u'кошки спят'.split()):
            ...
    """

    def __init__(self):
        # Hyperparameters; real values are read from the JSON config in load().
        self.winspan = -1      # context window half-width, in tokens
        self.use_w2v = -1      # whether word2vec features are enabled
        self.use_syllabs = -1  # whether syllable features are enabled
        self.ending_len = -1   # length of the word-ending feature
        self.use_gren = -1     # whether grammar-dictionary features are enabled
        self.word2tags = None  # grammar dictionary (RuWord2Tags), set in load()
        self.config = None     # parsed config dict, set in load()
        self.tagger = None     # pycrfsuite.Tagger, set in load()

    def load(self, word2tags_path=None):
        """Load the config, the grammar dictionary and the CRF model.

        :param word2tags_path: optional explicit path to the grammar
            dictionary file; ``None`` selects the default bundled one.
        """
        module_folder = str(pathlib.Path(__file__).resolve().parent)
        data_folder = os.path.join(module_folder, '../tmp')

        # Prefer the ../tmp layout (development checkout); fall back to
        # the module folder itself (installed-package layout).
        config_path = os.path.join(data_folder, 'rupostagger.config')
        if not os.path.exists(config_path):
            data_folder = module_folder
            config_path = os.path.join(data_folder, 'rupostagger.config')

        with open(config_path, 'r') as rdr:
            self.config = json.load(rdr)
            self.winspan = self.config['winspan']
            self.use_gren = self.config['use_gren']
            self.use_w2v = self.config['use_w2v']
            self.use_syllabs = self.config['use_syllabs']
            self.ending_len = self.config['ending_len']

        self.word2tags = RuWord2Tags()
        self.word2tags.load(word2tags_path)

        model_path = os.path.join(data_folder, 'rupostagger.model')
        self.tagger = pycrfsuite.Tagger()
        self.tagger.open(model_path)

    @staticmethod
    def __normalize_word(word):
        """Normalize a word before grammar-dictionary lookup.

        BUGFIX: lowercase FIRST and only then replace 'ё' with 'е', so
        that an uppercase 'Ё' is normalized too.  The original order
        (`replace` before `lower`) left words starting with 'Ё'
        un-normalized and missed in the dictionary.
        """
        return word.replace(' - ', '-').lower().replace(u'ё', u'е')

    def get_word_features(self, word, prefix):
        """Return the list of (feature_name, weight) pairs for one token.

        :param word: non-empty token string.
        :param prefix: string encoding the token's offset inside the
            context window, e.g. '-1', '0', '1'.
        """
        assert(len(word) > 0)
        features = []
        if word in token2tag:
            # Sentence-boundary sentinels are tagged with themselves.
            features.append((u'tag[{}]={}'.format(prefix, token2tag[word]), 1.0))
        elif is_num(word):
            # Numbers: generic <num> tag plus the last digit (the last
            # digit drives agreement patterns of Russian numerals).
            features.append((u'tag[{}]=<num> tag[{}]=<num_{}>'.format(prefix, prefix, word[-1]), 1.0))
        elif len(word) == 1 and word[0] in u'‼≠™®•·[¡+<>`~;.,‚?!-…№”“„{}|‹›/\'"–—_:«»*]()‘’≈':
            # Single punctuation character: encode by its codepoint.
            features.append((u'tag[{}]=punct_{}'.format(prefix, ord(word[0])), 1.0))
        else:
            uword = self.__normalize_word(word)
            first_char = word[0]
            if first_char in u'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ':
                features.append((u'word[{}]=<latin>'.format(prefix), 1.0))
            elif first_char in u'АБВГДЕЁЖЗИЙКЛМНОПРСТУФХЦЧШЩЪЫЬЭЮЯ':
                # Capitalized Cyrillic word — a proper-noun hint.
                features.append((u'word[{}]=<upper1>'.format(prefix), 1.0))

            if self.ending_len > 0:
                # '~' marks a true suffix; words shorter than the ending
                # length are used whole (original precedence preserved).
                ending = '~' + uword[-self.ending_len:] if len(uword) > self.ending_len else uword
                features.append((u'ending[{}]={}'.format(prefix, ending), 1.0))

            if self.use_syllabs and first_char.lower() in u'абвгдеёжзийклмнопрстуфхцчшщъыьэюя':
                # Syllable features; '~' marks a word-internal boundary.
                # The original had two identical branches for the
                # single-syllable case; they are merged here (same output).
                syllabs = split_word(uword)
                if len(syllabs) > 0:
                    features.append((u'slb[{}]={}'.format(prefix, syllabs[0] + '~'), 1.0))
                    for s in syllabs[1:-1]:
                        features.append((u'slb[{}]={}'.format(prefix, '~' + s + '~'), 1.0))
                    if len(syllabs) > 1:
                        features.append((u'slb[{}]={}'.format(prefix, '~' + syllabs[-1]), 1.0))

            if self.use_gren:
                # Union of grammatical tags over all homonyms of the word.
                tags = set()
                for tagset in self.word2tags[uword]:
                    tags.update(tagset.split(' '))

                for tag in tags:
                    features.append((u'tag[{}]={}'.format(prefix, tag), 1.0))

        return features

    def vectorize_sample(self, words):
        """Convert a token sequence into per-token feature dicts for the CRF."""
        lines2 = []
        nb_words = len(words)
        for iword, word in enumerate(words):
            word_features = dict()
            # Gather features from the whole context window around iword,
            # padding out-of-range positions with <beg>/<end> features.
            for j in range(-self.winspan, self.winspan + 1):
                iword2 = iword + j
                if iword2 < 0:
                    features = [('word[{}]=<beg>'.format(j), 1.0)]
                elif iword2 >= nb_words:
                    features = [('word[{}]=<end>'.format(j), 1.0)]
                else:
                    features = self.get_word_features(words[iword2], str(j))
                word_features.update(features)

            lines2.append(word_features)

        return lines2

    def tag(self, words):
        """Tag a tokenized sentence.

        :param words: list of token strings.
        :return: iterable of (word, label) pairs; each label is a string
            like 'NOUN|Case=Nom|Number=Sing'.
        """
        X = self.vectorize_sample(words)
        y_pred = self.tagger.tag(X)
        return zip(words, y_pred)
|
|
|
|
| def test1(tagger, phrase, required_labels): |
| pred_labels = list(tagger.tag(phrase.split())) |
| assert(len(required_labels.split()) == len(pred_labels)) |
| for required_label, (word, pred_label) in zip(required_labels.split(), pred_labels): |
| for tag in required_label.split('|'): |
| if tag not in pred_label: |
| print(u'Error: phrase={} word={} required_label={} pred_label={}'.format(phrase, word, required_label, pred_label)) |
| return False |
|
|
| return True |
|
|
|
|
def run_tests():
    """Smoke-test the tagger on a few hand-verified phrases."""
    tagger = RuPosTagger()
    tagger.load()

    checks = [
        (u'Кошки спят', u'NOUN|Number=Plur|Case=Nom VERB|Mood=Ind|Number=Plur|Person=3|Tense=Notpast|VerbForm=Fin'),
        (u'Я рою колодец', u'PRON VERB NOUN|Number=Sing|Case=Acc'),
        (u'Я мою окно', u'PRON VERB NOUN|Number=Sing|Case=Acc'),
        (u'Ира мыла окно', u'NOUN|Case=Nom VERB NOUN|Number=Sing|Case=Acc'),
        (u'Возьми мою пилу', u'VERB ADJ|Case=Acc NOUN|Case=Acc'),
        (u'рой колодец', u'VERB NOUN|Number=Sing|Case=Acc'),
        (u'У меня живёт черепаха', u'ADP PRON VERB NOUN'),
        (u'какую еду ты любишь ?', u'ADJ NOUN PRON VERB PUNCT'),
    ]

    for phrase, required_labels in checks:
        # Stop at the first failing phrase; test1 already printed details.
        if not test1(tagger, phrase, required_labels):
            print('Tests FAILED')
            return

    print('Tests PASSED OK')
|
|
|
|
# Running this module directly executes the built-in smoke tests.
if __name__ == '__main__':
    run_tests()
|
|
|
|