"""
Tokenizers.
"""

from collections import defaultdict
import json
import logging
import re
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union

from tqdm import tqdm

from toolbox.string.character import Character, LowerCase, Pattern

logger = logging.getLogger(__name__)


_DEFAULT_SPLITTER_NAME = 'unknown'


class Splitter(object):
    def __init__(self, name=_DEFAULT_SPLITTER_NAME):
        self.name = name

    def split(self, text: str) -> List[str]:
        raise NotImplementedError()

    def post_process(self, tokens: List[List[str]]):
        return tokens


class ByCharSplitterV1(Splitter):
    """
    Split text into character-type runs: consecutive latin letters form one
    token, consecutive digits form one token, runs of spaces merge into one
    token, CJK characters and punctuation each become a single-character
    token, and a hyphen glues the tokens around it together.
    """
    def __init__(self, name=_DEFAULT_SPLITTER_NAME):
        super().__init__(name=name)

    def split(self, text: str) -> List[str]:
        return self._split(text)

    @staticmethod
    def _split(text: str) -> List[str]:
        flag = Character.f_unknown
        sep = '[sep]'
        ret = ''
        for c in text:
            if Character.is_hyphens(c):
                ret += c
                flag = Character.f_is_hyphens
            elif Character.is_punctuation(c) or Character.is_cjk_character(c) or Character.is_jap_character(c):
                if flag != Character.f_is_hyphens:
                    c = sep + c
                ret += c
                flag = Character.f_is_punctuation
            elif Character.is_space(c):
                if flag != Character.f_is_space:
                    c = sep + c
                ret += c
                flag = Character.f_is_space
            elif Character.is_alpha(c):
                if flag not in (Character.f_is_alpha, Character.f_is_hyphens):
                    c = sep + c
                ret += c
                flag = Character.f_is_alpha
            elif Character.is_num(c):
                if flag not in (Character.f_is_num, Character.f_is_hyphens):
                    c = sep + c
                ret += c
                flag = Character.f_is_num
            else:
                if flag not in (Character.f_unknown, Character.f_is_hyphens):
                    c = sep + c
                ret += c
                flag = Character.f_unknown

        ret = ret.split(sep)
        ret = [ch for ch in ret if ch != '']

        if len(''.join(ret)) != len(text):
            raise AssertionError('this method should not change the char num. '
                                 'text: {}, ret: {}'.format(text, ''.join(ret)))
        return ret
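

# A minimal usage sketch (assuming the usual Character semantics: is_alpha for
# latin letters, is_num for digits, is_cjk_character for Chinese characters):
# letter runs and digit runs stay whole, spaces and CJK characters split out.
def demo_by_char_splitter_v1():
    splitter = ByCharSplitterV1()
    print(splitter.split('eid 123'))   # expected: ['eid', ' ', '123']
    print(splitter.split('手机p30'))    # expected: ['手', '机', 'p', '30']
    return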


class ByCharSplitterV2(Splitter):
    """
    When matching regex anchors, `3000-3999` must still yield the anchor
    `000`, so every digit has to be its own token. This class was defined to
    distinguish that behaviour from ByCharSplitterV1, where a digit run such
    as `3000` stays together as the single token `3000`.
    """
    def __init__(self, name=_DEFAULT_SPLITTER_NAME):
        super().__init__(name=name)

    def split(self, text: str) -> List[str]:
        return self._split(text)

    @staticmethod
    def _split(text: str) -> List[str]:
        """Split text into a token list; the tokens are then matched against the trie to segment words."""
        flag = Character.f_unknown
        sep = '[sep]'
        ret = ''
        for c in text:
            if Character.is_hyphens(c):
                c = sep + c
                ret += c
                flag = Character.f_is_hyphens
            elif Character.is_punctuation(c) or Character.is_cjk_character(c) or Character.is_jap_character(c):
                if flag != Character.f_is_hyphens:
                    c = sep + c
                ret += c
                flag = Character.f_is_punctuation
            elif Character.is_space(c):
                if flag != Character.f_is_space:
                    c = sep + c
                ret += c
                flag = Character.f_is_space
            elif Character.is_alpha(c):
                if flag not in (Character.f_is_alpha, Character.f_is_hyphens):
                    c = sep + c
                ret += c
                flag = Character.f_is_alpha
            elif Character.is_num(c):
                # unlike V1, a digit always starts a new token (except after a hyphen)
                if flag not in (Character.f_is_hyphens,):
                    c = sep + c
                ret += c
                flag = Character.f_is_num
            else:
                if flag not in (Character.f_unknown, Character.f_is_hyphens):
                    c = sep + c
                ret += c
                flag = Character.f_unknown

        ret = ret.split(sep)
        ret = [ch for ch in ret if ch != '']

        if len(''.join(ret)) != len(text):
            raise AssertionError('this method should not change the char num. '
                                 'text: {}, ret: {}'.format(text, ''.join(ret)))
        return ret
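

# A minimal sketch of the V1/V2 difference on digits (same Character
# assumptions as above): V2 emits one token per digit so a trie built from a
# regex index such as `000` can match inside `3000`.
def demo_by_char_splitter_v2():
    print(ByCharSplitterV1().split('3000-3999'))  # expected: ['3000-3999'] (hyphen glues the runs)
    print(ByCharSplitterV2().split('3000-3999'))  # expected: ['3', '0', '0', '0', '-3', '9', '9', '9']
    return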


class ListSplitter(Splitter):
    def split(self, text: str):
        return list(text)


class ListEncodeOneSplitter(Splitter):
    """Encode every character as '[<code point>]'; post_process decodes the tokens back."""
    def split(self, text: str):
        result = list()
        for c in text:
            dummy = '[{}]'.format(ord(c))
            result.append(dummy)
        return result

    def post_process(self, tokens: List[List[str]]):
        tokens_ = list()
        for token in tokens:
            token_ = list()
            for t in token:
                idx = t[1:-1]
                t = chr(int(idx))
                token_.append(t)
            tokens_.append(token_)
        return tokens_
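

# Round-trip sketch: split() encodes each character by code point (useful for
# scripts such as Thai, see FastTokenizer.demo2), post_process() decodes.
def demo_list_encode_one_splitter():
    splitter = ListEncodeOneSplitter()
    encoded = splitter.split('ab')
    print(encoded)                           # ['[97]', '[98]']
    print(splitter.post_process([encoded]))  # [['a', 'b']]
    return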


_DEFAULT_SPLITTER_NAME_TO_SPLITTER = {
    'by_char_splitter_v1': ByCharSplitterV1(),
    'by_char_splitter_v2': ByCharSplitterV2(),
    'list_splitter': ListSplitter(),
    'list_encoder_one_splitter': ListEncodeOneSplitter(),
}


_DEFAULT_TOKENIZER_NAME = 'unknown'


class Tokenizer(object):
    """Abstract base class for tokenizers."""
    @staticmethod
    def lowercase(string: str) -> str:
        string = LowerCase.lowercase(string)
        return string

    def __init__(self, name=_DEFAULT_TOKENIZER_NAME, case_sensitive=False):
        self.name = name
        self.case_sensitive = case_sensitive

    def insert(self, word: str) -> None:
        raise NotImplementedError()

    def insert_from_list(self, words: Iterable[Any]) -> None:
        words = list(words)
        if len(words) == 0:
            return None
        for word in tqdm(words):
            self.insert(word)

    def insert_black(self, word: str) -> None:
        raise NotImplementedError()

    def insert_black_from_list(self, words: Iterable[Any]) -> None:
        words = list(words)
        if len(words) == 0:
            return None
        for word in tqdm(words):
            self.insert_black(word)

    def tokenize(self, text: str, full_mode: bool = False) -> Tuple[List[str], List[bool]]:
        raise NotImplementedError()

    @staticmethod
    def _merge_tokens(tokens: List[str], isword_list: List[bool]) -> Tuple[List[str], List[bool]]:
        """
        After tokenizing, the black list may have marked some segmented words
        as False, which can leave two or more consecutive False entries in
        the result. In a segmenter, several tokenizers run one after another,
        so consecutive False entries should be merged to improve the later
        tokenizers' results. Only consecutive False entries are merged;
        everything else is left untouched.
        """
        tokens2, isword_list2 = list(), list()
        false_token = ''
        for token, isword in zip(tokens, isword_list):
            if isword is False:
                false_token += str(token)
                continue

            if false_token != '':
                tokens2.append(false_token)
                isword_list2.append(False)

            tokens2.append(token)
            isword_list2.append(isword)
            false_token = ''
        if false_token != '':
            tokens2.append(false_token)
            isword_list2.append(False)
        return tokens2, isword_list2
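

# Sketch of _merge_tokens: consecutive non-word (False) tokens collapse into
# a single False token.
def demo_merge_tokens():
    print(Tokenizer._merge_tokens(['foo', ' ', 'bar'], [False, False, True]))
    # expected: (['foo ', 'bar'], [False, True])
    return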


class TrieNode(object):
    """Node of the dictionary trie."""

    def __init__(self, t_word=None):
        self.t_word = t_word
        self.children = dict()

    def add_children(self, k, v):
        self.children[k] = v

    @property
    def text(self):
        if self.t_word is None:
            return None
        return ''.join(self.t_word)

    @property
    def isword(self):
        if self.t_word is None:
            return False
        return True

    def __repr__(self):
        return '<{}.{} t_word={}>'.format(self.__module__, self.__class__.__name__, self.t_word)


class FastTokenizer(Tokenizer):
    """Trie-based maximum-matching tokenizer."""

    @staticmethod
    def demo1():
        fast = FastTokenizer()
        fast.insert('我要退款')
        fast.insert('色彩显示')
        fast.insert('我要')
        fast.insert('退款')
        fast.insert('eid')
        fast.insert('手机')
        fast.insert('机不')
        text = '手机不错我要退款'

        c = fast.tokenize(text, full_mode=True)
        print(c)
        return

    @staticmethod
    def demo2():
        fast = FastTokenizer(splitter=ListEncodeOneSplitter())
        fast.insert('พูดว่า')
        fast.insert('นะ')
        fast.insert('พูดถึง')
        fast.insert('คำพูด')
        fast.insert('บอ')
        text = 'พูดว่าอะไรนะ'

        c = fast.tokenize(text, full_mode=False)
        print(c)
        return

    @staticmethod
    def token_list_to_string_list(token_list: List[List[str]]) -> List[str]:
        """The splitter splits a sentence into List[str]; tokenize merges those substrings back into words."""
        ret = list()
        for tokens in token_list:
            ret.append(''.join(tokens))
        return ret

    def __init__(self, splitter: Optional[Union[Splitter, str]] = None, name=_DEFAULT_TOKENIZER_NAME, case_sensitive=False):
        if isinstance(splitter, str):
            splitter = _DEFAULT_SPLITTER_NAME_TO_SPLITTER[splitter]
        self.splitter = splitter or ByCharSplitterV1()
        self.trie = TrieNode()
        self._black_list: List[str] = list()
        super(FastTokenizer, self).__init__(name=name, case_sensitive=case_sensitive)

    def insert(self, word: str) -> None:
        word = str(word)

        if not self.case_sensitive:
            word = self.lowercase(word)

        t_word = self.splitter.split(word)
        self._insert_node(t_word)

    def insert_black(self, word: str) -> None:
        """
        Black list.
        E.g. when the text contains `watch tv`, do not recognize `watch` on
        its own.
        Note: because matching is longest-match, `watch tv` is still
        recognized even while `watch` is on the black list.
        """
        if word not in self._black_list:
            self.insert(word)
            self._black_list.append(word)

    def _insert_node(self, t_word: List[str]) -> None:
        now = self.trie
        for t in t_word[:-1]:
            if t not in now.children:
                now.add_children(t, TrieNode())
            now = now.children[t]
        t = t_word[-1]

        if t not in now.children:
            now.add_children(t, TrieNode(t_word))
        else:
            now.children[t].t_word = t_word

    def _tokenize(self, t_word: list, full_mode: bool = False):
        outlst, iswlst = list(), list()
        l = len(t_word)
        b_idx = 0
        l_idx = 0
        max_e_idx = 0
        while b_idx < l:
            now = self.trie
            found = False
            ptr = b_idx
            e_idx = None
            while True:
                t = t_word[ptr]
                if not self.case_sensitive:
                    t = self.lowercase(t)

                if t not in now.children and e_idx is not None:
                    found = True
                    break
                if t not in now.children and e_idx is None:
                    break
                if now.isword and full_mode:
                    # full mode also emits the shorter match before extending it
                    outlst.append(t_word[b_idx: ptr])
                    iswlst.append(True)

                now = now.children[t]
                ptr += 1
                if now.isword:
                    e_idx = ptr

                if ptr == l and e_idx is None:
                    break
                if ptr == l and e_idx is not None:
                    found = True
                    break

            if found is True:
                if l_idx != b_idx:
                    outlst.append(t_word[l_idx: b_idx])
                    iswlst.append(False)

                outlst.append(t_word[b_idx: e_idx])
                iswlst.append(True)
                max_e_idx = max(max_e_idx, e_idx)
                if full_mode:
                    b_idx += 1
                else:
                    b_idx = e_idx
                l_idx = b_idx
            else:
                b_idx += 1

        if max_e_idx < l:
            outlst.append(t_word[l_idx:l])
            iswlst.append(False)
        return outlst, iswlst

    def tokenize(self, text: Union[str, List[str]], full_mode=False) -> Tuple[List[str], List[bool]]:
        if isinstance(text, list):
            text_list = text
        else:
            text_list = [text]

        outlst, iswlst = list(), list()
        for text in text_list:
            t_word = self.splitter.split(text)
            outlst_tmp, iswlst_tmp = self._tokenize(t_word, full_mode)

            outlst.extend(outlst_tmp)
            iswlst.extend(iswlst_tmp)

        outlst = self.splitter.post_process(outlst)
        outlst = self.token_list_to_string_list(outlst)

        for idx, out in enumerate(outlst):
            if out in self._black_list:
                iswlst[idx] = False

        outlst, iswlst = self._merge_tokens(outlst, iswlst)
        return outlst, iswlst
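

# Black-list sketch (a minimal example, assuming the default ByCharSplitterV1
# splitter): 'watch' alone is demoted to a non-word, but the longer entry
# 'watch tv' still matches, as described in insert_black.
def demo_black_list():
    fast = FastTokenizer()
    fast.insert('watch tv')
    fast.insert_black('watch')
    print(fast.tokenize('watch tv'))   # expected: (['watch tv'], [True])
    print(fast.tokenize('watch out'))  # expected: (['watch out'], [False])
    return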


class TagTokenizer(FastTokenizer):
    def __init__(self, name=_DEFAULT_TOKENIZER_NAME, case_sensitive=False):
        super().__init__(name=name, case_sensitive=case_sensitive)
        self._word2flags_dict = defaultdict(list)

    def insert(self, word: str, tag: Optional[str] = None) -> None:
        if tag is not None:
            self._word2flags_dict[word].append(tag)
        super().insert(word)

    def tokenize(self, text: Union[str, List[str]], full_mode: bool = False) -> Tuple[List[str], List[Union[bool, List[str]]]]:
        outlst, iswlst = super().tokenize(text, full_mode)

        iswlst2 = list()
        for out, isw in zip(outlst, iswlst):
            if isw is True:
                # a recognized word is replaced by its tag list (or True if it has no tags)
                iswlst2.append(self._word2flags_dict.get(out, True))
            else:
                iswlst2.append(False)
        return outlst, iswlst2
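

# Tag sketch (hypothetical tags): a tagged word comes back with its tag list
# in place of the plain True flag.
def demo_tag_tokenizer():
    tagger = TagTokenizer()
    tagger.insert('华为', tag='brand')
    tagger.insert('手表', tag='product')
    print(tagger.tokenize('买华为手表'))
    # expected: (['买', '华为', '手表'], [False, ['brand'], ['product']])
    return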


class RegularTokenizer(Tokenizer):
    """
    Unlike FastTokenizer, matching here uses regular expressions instead of
    words.

    Optimizations:
    1. Quick candidate lookup based on regex indexes.
    2. re.compile; an invalid regular expression raises an error, which is
       logged and the pattern skipped.
    """
    @staticmethod
    def demo1():
        regular = RegularTokenizer()
        regular.insert('我要退款')
        regular.insert('色彩显示')
        regular.insert('我要')
        regular.insert('退款')
        regular.insert('eid')
        regular.insert('手机')
        regular.insert('机不')
        regular.insert(r'\d+左右')

        text = '1500左右的手机不错我要退款'

        ret = regular.tokenize(text, full_mode=False)
        print(ret)
        return

    @staticmethod
    def _outlst_iswlst_append(token, isword, outlst, iswlst):
        if len(token) > 0:
            outlst.append(token)
            iswlst.append(isword)
        return outlst, iswlst

    def __init__(self, name=_DEFAULT_TOKENIZER_NAME, case_sensitive=False):
        self.regular_quick_find_tokenizer = RegularQuickFindTokenizer()
        self._black_list = list()
        super(RegularTokenizer, self).__init__(name=name, case_sensitive=case_sensitive)

    def insert(self, word: str) -> None:
        """
        :param word: a regular expression.
        """
        self.regular_quick_find_tokenizer.insert(pattern=str(word))

    def insert_black(self, word: str) -> None:
        """Add a word to the black list."""
        if word not in self._black_list:
            self._black_list.append(word)

    def tokenize(self, text: str, full_mode: bool = False) -> Tuple[List[str], List[bool]]:
        text = str(text)
        if not self.case_sensitive:
            text_ = self.lowercase(text)
        else:
            text_ = text

        potential_pattern, no_index_pattern = self.regular_quick_find_tokenizer.get_potential_pattern(text=text_)

        pattern_set = potential_pattern | no_index_pattern
        span_list = list()
        for pattern in pattern_set:
            try:
                if self.case_sensitive:
                    pattern = re.compile(pattern)
                else:
                    pattern = re.compile(pattern, re.I)
            except re.error as e:
                logger.error('{}, pattern: {}'.format(e, pattern))
                continue
            match_iter = re.finditer(pattern, text_)
            for match in match_iter:
                match_str = match.group(0).strip()
                # only keep matches that are at least two characters long after stripping
                if len(match_str) >= 2:
                    span_list.append(match.span())

        if full_mode:
            span_accept = span_list
        else:
            # order spans by start position (ties broken by longer span, since
            # the sort is stable), then accept them greedily without overlap
            span_list = sorted(span_list, key=lambda x: x[1] - x[0], reverse=True)
            span_list = sorted(span_list, key=lambda x: x[0], reverse=False)

            span_accept = [(0, 0)]
            for span in span_list:
                if span[0] >= span_accept[-1][1]:
                    span_accept.append(span)

        outlst, iswlst = list(), list()
        last_idx = None
        for b, e in span_accept:
            if last_idx is None:
                outlst, iswlst = self._outlst_iswlst_append(text[:b], False, outlst, iswlst)
            else:
                outlst, iswlst = self._outlst_iswlst_append(text[last_idx:b], False, outlst, iswlst)
            outlst, iswlst = self._outlst_iswlst_append(text[b:e], True, outlst, iswlst)
            last_idx = e
        if last_idx is None:
            # no span was accepted (possible in full mode): the whole text is a non-word
            last_idx = 0
        outlst, iswlst = self._outlst_iswlst_append(text[last_idx:], False, outlst, iswlst)

        for idx, out in enumerate(outlst):
            if out in self._black_list:
                iswlst[idx] = False
        return self._merge_tokens(outlst, iswlst)


class RegularQuickFindTokenizer(FastTokenizer):
    """
    Use the anchors (indexes) of regular expressions to quickly find the
    patterns that could match a given text.

    1. insert a regular expression,
    2. extract its indexes and insert them into the tokenizer,
    3. tokenize the sentence with the tokenizer; the matched parts indicate
       which regular expressions could match.
    """
    @staticmethod
    def demo1():
        quick = RegularQuickFindTokenizer()
        # note: some of the patterns below are malformed regexes;
        # RegularTokenizer logs and skips any pattern that fails to compile
        quick.insert('.*[0-9]000.*到[0-9]999.*')
        quick.insert('^(?=.*(华为|苹果).*(手机|手表)).*(电脑|平板).*(?=.*小米(手机|手表)).*$')
        quick.insert(r'.*(输入密码)0米(\d{2.10}).*')
        quick.insert(r'.*(输入|密码)(\d{2.10}).*')
        quick.insert('^(?=.*(华为|苹果).*(电脑|平板|手表).*$')
        quick.insert('*0米.*(左|右).*')
        quick.insert('.*[0-9].*[0-9].*')
        quick.insert(r'\d+左右')

        text = '3000-3999 的华为手表, 有没有, 1500左右的也可以. '

        ret = quick.tokenize(text)
        print(ret)
        ret = quick.get_potential_pattern(text)
        print(ret)
        return

    def __init__(self, splitter: Optional[Splitter] = None, name=_DEFAULT_TOKENIZER_NAME, case_sensitive=False):
        splitter = splitter or ByCharSplitterV2()
        self._no_index_pattern: Set[str] = set()
        self._index_to_pattern: Dict[str, Set[str]] = defaultdict(set)
        super().__init__(splitter=splitter, name=name, case_sensitive=case_sensitive)

    def insert(self, pattern: str) -> None:
        indexes: Optional[List[str]] = RegularIndexParse.get_indexes(pattern)
        if indexes is None:
            self._no_index_pattern.add(pattern)
        else:
            for index in indexes:
                self._index_to_pattern[index].add(pattern)
                super(RegularQuickFindTokenizer, self).insert(index)

    def get_potential_pattern(self, text: str) -> Tuple[Set[str], Set[str]]:
        """
        :return: two sets; the first holds the potentially matching patterns,
            the second holds the inserted patterns that have no index.
        """
        pattern = set()

        outlst, iswlst = self.tokenize(text, full_mode=True)
        for out, isw in zip(outlst, iswlst):
            if isw is True:
                pattern.update(self._index_to_pattern[out])
        return pattern, self._no_index_pattern
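

# Pipeline sketch for the quick-find flow described above: an inserted pattern
# is reduced to literal index strings; only patterns whose index occurs in the
# text come back as candidates, plus every index-less pattern.
def demo_quick_find_pipeline():
    quick = RegularQuickFindTokenizer()
    quick.insert(r'\d+左右')
    potential, no_index = quick.get_potential_pattern('1500左右的手机')
    print(potential)  # should contain r'\d+左右' if '左右' was extracted as its index
    print(no_index)   # patterns from which no index could be extracted
    return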


class RegularIndexParse(object):
    """Extract literal anchor (index) strings from a regular expression."""
    alp_num_ch = re.compile(Pattern.alp_num_ch)
    brackets = re.compile(Pattern.brackets)
    square_brackets = re.compile(Pattern.square_brackets)
    regex_dsw_find = re.compile(Pattern.regex_dsw_find)

    @staticmethod
    def demo1():
        pattern = r'\d+左右'

        ret = RegularIndexParse.get_indexes(pattern)
        print(ret)
        return

    def __init__(self):
        pass

    @classmethod
    def _split_by_brackets(cls, text):
        # split the pattern into pieces at top-level round brackets
        brackets = ['(', ')']
        result = []
        tmp = ''
        flag = 0
        for s in text:
            if s not in brackets:
                tmp += s
            elif s == '(':
                if tmp and flag == 0:
                    result.append(tmp)
                    tmp = ''
                tmp += s
                flag = flag + 1
            else:
                tmp += s
                flag = flag - 1
                if flag == 0:
                    result.append(tmp)
                    tmp = ''
        if tmp:
            result.append(tmp)
        return result

    @classmethod
    def _get_index_in_brackets(cls, text):
        index = cls._get_index_out_of_brackets(text)
        if index:
            return [index.group()]

        # otherwise pick the bracket group that yields the fewest literal alternatives
        tmps = cls.brackets.findall(text)
        index = []
        for tmp in tmps:
            tmp_index = cls.alp_num_ch.findall(tmp)
            if len(index) == 0:
                index = tmp_index
            elif len(tmp_index) < len(index):
                index = tmp_index
        return index

    @classmethod
    def _get_index_out_of_brackets(cls, text):
        # strip bracket groups, character classes and (per the pattern name)
        # \d/\s/\w escapes, then search for the first remaining literal run
        tmp1 = cls.brackets.sub('', text)
        tmp2 = cls.square_brackets.sub('', tmp1)
        tmp3 = cls.regex_dsw_find.sub('', tmp2)
        tmp4 = cls.alp_num_ch.search(tmp3)
        return tmp4

    @classmethod
    def get_indexes(cls, text: str) -> Optional[List[str]]:
        """Return the literal index strings of the pattern, or None if none can be extracted."""
        indexes = cls._get_index_out_of_brackets(text)
        if indexes:
            return [indexes.group()]
        pieces = cls._split_by_brackets(text)
        for p in pieces:
            if '(' in p:
                if '(' in p[1:-1]:
                    tmp_index = cls._get_index_in_brackets(p[1:-1])
                else:
                    tmp_index = cls.alp_num_ch.findall(p)

                if indexes is None:
                    indexes = tmp_index
                else:
                    if len(tmp_index) < len(indexes):
                        indexes = tmp_index
        return indexes


class IndivisibleTokenizer(FastTokenizer):
    def __init__(self,
                 indivisible_dict: Dict[str, Tuple[List[str], List[List[str]]]],
                 case_sensitive=False):
        """
        Specify forced splits / indivisible words.
        Each word is split exactly as specified: the first item of the tuple
        is the list of sub-words, the second is the part-of-speech tags for
        each sub-word (a sub-word may have several tags).
        """
        super(IndivisibleTokenizer, self).__init__(case_sensitive=case_sensitive)
        self.word2tags = defaultdict(list)
        for word, t_words in indivisible_dict.items():
            self.insert(word, t_words)

    @classmethod
    def from_json_file(cls, filename, case_sensitive=False):
        with open(filename, 'r', encoding='utf-8') as f:
            indivisible_dict = json.load(f)
        return cls(indivisible_dict=indivisible_dict, case_sensitive=case_sensitive)

    def insert(self, word: str, tag: Optional[Tuple[List[str], List[List[str]]]] = None) -> None:
        # note: a word inserted without a tag cannot be unpacked in tokenize;
        # always provide the (sub-words, tags) tuple
        if tag is None:
            tag = list()
        self.word2tags[word] = tag
        super().insert(word)

    def tokenize(self, text: Union[str, List[str]], full_mode: bool = False) -> Tuple[List[str], List[bool]]:
        outlst, iswlst = super().tokenize(text, full_mode)
        outlst2, iswlst2 = list(), list()
        for out, isw in zip(outlst, iswlst):
            if isw is True:
                word_list, tags_list = self.word2tags[out]
                outlst2.extend(word_list)
                iswlst2.extend(tags_list)
            else:
                outlst2.append(out)
                iswlst2.append(isw)
        return outlst2, iswlst2
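

# Usage sketch (hypothetical entry): force '华为手表' to always come out as the
# specified sub-words with their tag lists.
def demo_indivisible_tokenizer():
    tok = IndivisibleTokenizer({'华为手表': (['华为', '手表'], [['brand'], ['product']])})
    print(tok.tokenize('买华为手表'))
    # expected: (['买', '华为', '手表'], [False, ['brand'], ['product']])
    return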


def whitespace_tokenize(text):
    """Runs basic whitespace cleaning and splitting on a piece of text."""
    text = text.strip()
    if not text:
        return []
    tokens = text.split()
    return tokens


def demo1():
    text = '我想买一个老人用的, 1500左右, huawei watch gt 感觉还可以, 它性价比高吗, 有优惠活动吗?'
    fast = FastTokenizer()

    fast.insert_from_list(['huawei watch gt', 'huawei p30系列', 'huawei p30 pro'])

    result = fast.tokenize(text)
    print(result)
    return


def demo2():
    text = '我想买一个老人用的, 1500左右, huawei watch gt 感觉还可以, 它性价比高吗, 有优惠活动吗?'
    fast = RegularTokenizer()
    fast.insert_from_list([r'\d+'])

    result = fast.tokenize(text)
    print(result)
    return


def demo3():
    RegularIndexParse.get_indexes('')
    ret = RegularIndexParse.get_indexes('.*[0-9]000.*到[0-9]999.*')
    print(ret)
    ret = RegularIndexParse.get_indexes('.*[0-9].*[0-9].*')
    print(ret)
    return


def demo4():
    FastTokenizer.demo2()
    return


if __name__ == '__main__':
    demo4()