|
|
""" |
|
|
Utility functions for dealing with NER tagging. |
|
|
""" |
|
|
|
|
|
import logging |
|
|
|
|
|
from stanza.models.common.vocab import EMPTY |
|
|
|
|
|
logger = logging.getLogger('stanza')

# Tag values that are treated as "no tag": underscore, dash, blank, or None.
EMPTY_TAG = ('_', '-', '', None)
# Same as EMPTY_TAG, plus the explicit outside tag 'O'.
EMPTY_OR_O_TAG = EMPTY_TAG + ('O',)
|
|
|
|
|
def is_basic_scheme(all_tags):
    """
    Check if a basic tagging scheme is used. Return True if so.

    A tag counts as "non-basic" when it is longer than two characters and
    starts with one of the B/I/S/E prefixes (with either '-' or '_' as the
    separator).  None tags are skipped, since None is one of the accepted
    empty tags and has no length to inspect.

    Args:
        all_tags: a list of NER tags (entries may be None)

    Returns:
        True if the tagging scheme does not use B-, I-, etc, otherwise False
    """
    scheme_prefixes = ('B-', 'I-', 'S-', 'E-', 'B_', 'I_', 'S_', 'E_')
    for tag in all_tags:
        # None is a legal "empty" tag; len(None) would raise TypeError
        if tag is None:
            continue
        if len(tag) > 2 and tag[:2] in scheme_prefixes:
            return False
    return True
|
|
|
|
|
|
|
|
def is_bio_scheme(all_tags):
    """
    Check if BIO tagging scheme is used. Return True if so.

    Every tag must either be an empty/O tag, or be longer than two
    characters and start with a B or I prefix ('-' or '_' separator).

    Args:
        all_tags: a list of NER tags

    Returns:
        True if the tagging scheme is BIO, otherwise False
    """
    bio_prefixes = ('B-', 'I-', 'B_', 'I_')
    return all(
        tag in EMPTY_OR_O_TAG or (len(tag) > 2 and tag[:2] in bio_prefixes)
        for tag in all_tags
    )
|
|
|
|
|
def to_bio2(tags):
    """
    Convert the original tag sequence to BIO2 format. If the input is already in BIO2 format,
    an equal copy of the input is returned.

    An I- (or I_) tag is rewritten to start with B when it opens an entity:
    at the start of the sequence, after an empty or O tag, or after a tag
    whose text past the first character differs.  All other tags, including
    labels which merely happen to begin with the letter I, are copied as is.

    Args:
        tags: a list of tags in either BIO or BIO2 format

    Returns:
        new_tags: a list of tags in BIO2 format
    """
    new_tags = []
    for i, tag in enumerate(tags):
        # only genuine inside tags are candidates for rewriting; checking the
        # full prefix keeps labels such as 'INSTITUTION' untouched
        if tag in EMPTY_OR_O_TAG or tag[:2] not in ('I-', 'I_'):
            new_tags.append(tag)
            continue
        # the previous tag may be None (an allowed empty tag), which cannot
        # be sliced, so test for empty/O before comparing the suffixes
        starts_entity = (
            i == 0
            or tags[i - 1] in EMPTY_OR_O_TAG
            or tags[i - 1][1:] != tag[1:]
        )
        new_tags.append('B' + tag[1:] if starts_entity else tag)
    return new_tags
|
|
|
|
|
def basic_to_bio(tags):
    """
    Convert a basic tag sequence into a BIO sequence.

    Empty and O tags are copied unchanged.  Any other tag gets an 'I-'
    prefix when the tag immediately before it is identical, and a 'B-'
    prefix otherwise.  You can compose this with bio2_to_bioes to convert
    to bioes.

    Args:
        tags: a list of tags in basic (no B-, I-, etc) format

    Returns:
        new_tags: a list of tags in BIO format
    """
    converted = []
    for position, tag in enumerate(tags):
        if tag in EMPTY_OR_O_TAG:
            converted.append(tag)
            continue
        # tag is not 'O' here, so an identical predecessor cannot be 'O' either
        continues_entity = position > 0 and tags[position - 1] == tag
        converted.append(('I-' if continues_entity else 'B-') + tag)
    return converted
|
|
|
|
|
|
|
|
def bio2_to_bioes(tags):
    """
    Convert the BIO2 tag sequence into a BIOES sequence.

    Empty and O tags are copied unchanged.  A B tag becomes S when the
    following tag is not an inside tag; an I tag becomes E under the same
    condition.  Output prefixes always use '-' as the separator, even if the
    input used '_'.

    Args:
        tags: a list of tags in BIO2 format

    Returns:
        new_tags: a list of tags in BIOES format

    Raises:
        Exception: if a non-empty tag is shorter than two characters or does
            not start with a B or I prefix
    """
    inside_prefixes = ('I-', 'I_')
    begin_prefixes = ('B-', 'B_')

    def entity_continues(position):
        # A None tag (an allowed empty tag) cannot be sliced and never
        # continues an entity, so rule it out before looking at the prefix.
        if position + 1 >= len(tags):
            return False
        following = tags[position + 1]
        return following is not None and following[:2] in inside_prefixes

    new_tags = []
    for i, tag in enumerate(tags):
        if tag in EMPTY_OR_O_TAG:
            new_tags.append(tag)
            continue
        if len(tag) < 2:
            raise Exception(f"Invalid BIO2 tag found: {tag}")
        prefix = tag[:2]
        if prefix in inside_prefixes:
            new_prefix = 'I-' if entity_continues(i) else 'E-'
        elif prefix in begin_prefixes:
            new_prefix = 'B-' if entity_continues(i) else 'S-'
        else:
            raise Exception(f"Invalid IOB tag found: {tag}")
        new_tags.append(new_prefix + tag[2:])
    return new_tags
|
|
|
|
|
def normalize_empty_tags(sentences):
    """
    Replace any tag which is None, _, -, or blank with EMPTY

    The input should be a list(sentence) of list(word) of tuple(text, list(tag))
    which is the typical format for the data at the time data.py is preprocessing the tags

    The result has the same nesting, with each word's tags returned as a tuple.
    """
    normalized = []
    for sentence in sentences:
        fixed_sentence = []
        for word in sentence:
            fixed_tags = tuple(EMPTY if tag in EMPTY_TAG else tag for tag in word[1])
            fixed_sentence.append((word[0], fixed_tags))
        normalized.append(fixed_sentence)
    return normalized
|
|
|
|
|
def process_tags(sentences, scheme):
    """
    Normalize the tags in these sentences, converting to bioes if requested

    Each sentence is a list of (word, tags) pairs, where tags is either a
    single tag (a str or None) or a sequence of tags, one per tag column.
    The two layouts may not be mixed.  Single tags are wrapped for
    processing and unwrapped again in the result.

    Each column is handled separately.  If scheme is 'bioes' (case
    insensitive), columns in BIO format are converted to BIOES, and columns
    in basic format are converted basic -> BIO -> BIOES.  Every column which
    is not converted from basic is passed through to_bio2.

    We allow empty tags ('_', '-', None), which will represent tags
    that do not get any gradient when training

    Args:
        sentences: a list of sentences as described above
        scheme: name of the target tagging scheme

    Returns:
        a list of sentences of (word, tags) pairs, where tags is a tuple of
        tags, or a single tag if the input used single tags.  An empty
        input produces an empty list.

    Raises:
        ValueError: if single tags and tag sequences are mixed, or if the
            words do not all have the same number of tags
    """
    all_words = []
    all_tags = []
    converted_tuples = False
    for sent_idx, sent in enumerate(sentences):
        words, tags = zip(*sent)
        all_words.append(words)
        # One dimensional tags are only accepted if they show up from the
        # very first sentence; they are wrapped in 1-tuples so the rest of
        # the function can treat every input as multi-column.
        if not converted_tuples and any(tag is None or isinstance(tag, str) for tag in tags):
            if sent_idx > 0:
                raise ValueError("Got a mix of tags and lists of tags.  First non-list was in sentence %d" % sent_idx)
            converted_tuples = True
        if converted_tuples:
            if not all(tag is None or isinstance(tag, str) for tag in tags):
                raise ValueError("Got a mix of tags and lists of tags.  First tag as a list was in sentence %d" % sent_idx)
            tags = [(tag,) for tag in tags]
        all_tags.append(tags)

    # default=0 keeps an empty dataset from failing inside max()
    max_columns = max((len(x) for tags in all_tags for x in tags), default=0)
    for sent_idx, tags in enumerate(all_tags):
        if any(len(x) < max_columns for x in tags):
            raise ValueError("NER tags not uniform in length at sentence %d.  TODO: extend those columns with O" % sent_idx)

    all_convert_bio_to_bioes = []
    all_convert_basic_to_bioes = []

    # Decide per column, looking at the whole dataset at once, which
    # conversion (if any) is needed.
    for column_idx in range(max_columns):
        convert_bio_to_bioes = False
        convert_basic_to_bioes = False
        tag_column = [x[column_idx] for sent in all_tags for x in sent]
        is_bio = is_bio_scheme(tag_column)
        is_basic = not is_bio and is_basic_scheme(tag_column)
        if is_bio and scheme.lower() == 'bioes':
            convert_bio_to_bioes = True
            logger.debug("BIO tagging scheme found in input at column %d; converting into BIOES scheme...", column_idx)
        elif is_basic and scheme.lower() == 'bioes':
            convert_basic_to_bioes = True
            logger.debug("Basic tagging scheme found in input at column %d; converting into BIOES scheme...", column_idx)
        all_convert_bio_to_bioes.append(convert_bio_to_bioes)
        all_convert_basic_to_bioes.append(convert_basic_to_bioes)

    result = []
    for words, tags in zip(all_words, all_tags):
        # mutable per-word copies, so converted columns can be written back
        tags = [list(word_tags) for word_tags in tags]
        for column_idx, (convert_bio_to_bioes, convert_basic_to_bioes) in enumerate(zip(all_convert_bio_to_bioes, all_convert_basic_to_bioes)):
            tag_column = [x[column_idx] for x in tags]
            if convert_basic_to_bioes:
                # basic -> bio -> bioes
                tag_column = bio2_to_bioes(basic_to_bio(tag_column))
            else:
                # make sure the column is BIO2 before any further conversion
                tag_column = to_bio2(tag_column)
                if convert_bio_to_bioes:
                    tag_column = bio2_to_bioes(tag_column)
            for tag_idx, tag in enumerate(tag_column):
                tags[tag_idx][column_idx] = tag
        result.append([(w, tuple(t)) for w, t in zip(words, tags)])

    if converted_tuples:
        # the caller passed single tags, so hand back single tags
        result = [[(word[0], word[1][0]) for word in sentence] for sentence in result]
    return result
|
|
|
|
|
|
|
|
def decode_from_bioes(tags):
    """
    Decode entity spans from a sequence of BIOES tags.

    A None tag is treated as 'O'.  Tags which are neither 'O' nor start
    with B-, I-, E-, or S- are skipped without closing the current span.

    Args:
        tags: a list of BIOES tags

    Returns:
        A list of dict with 'start', 'end', and 'type' values, where
        'start' and 'end' are the indices of the first and last tag
        of the entity.
    """
    entities = []
    span = []
    label = None

    def close_span(indices, entity_type):
        # record the span if there is one, and hand back a fresh empty span
        if len(indices) > 0:
            entities.append({
                'start': indices[0],
                'end': indices[-1],
                'type': entity_type})
        return []

    for position, raw_tag in enumerate(tags):
        tag = 'O' if raw_tag is None else raw_tag
        if tag == 'O':
            span = close_span(span, label)
            continue

        opens = tag.startswith(('B-', 'S-'))
        closes = tag.startswith(('E-', 'S-'))
        if not (opens or closes or tag.startswith('I-')):
            continue

        if opens:
            # anything still pending ends before a new entity begins
            span = close_span(span, label)
        span.append(position)
        label = tag[2:]
        if closes:
            span = close_span(span, label)

    close_span(span, label)
    return entities
|
|
|
|
|
|
|
|
def merge_tags(*sequences):
    """
    Merge multiple sequences of NER tags into one sequence

    The result starts as a copy of the first sequence.  Entities from each
    later sequence (an S- tag, or a B- ... E- run of a single type) are
    copied in only where every position they cover is still 'O', so
    only O is replaced, and the earlier tags have precedence

    Raises:
        ValueError: if a later sequence has a tag which is not O, S-, or B-
            where an entity should start, an entity whose tags are not
            I-/E- of the same type, or an entity with no closing E- tag
    """
    merged = list(sequences[0])
    for candidate in sequences[1:]:
        total = len(candidate)
        pos = 0
        while pos < total:
            current = candidate[pos]
            if current == 'O':
                pos += 1
            elif current.startswith("S-"):
                # single token entity: only fills an empty slot
                if merged[pos] == 'O':
                    merged[pos] = current
                pos += 1
            elif current.startswith("B-"):
                entity_type = current[2:]
                # scan forward for the E- tag which closes this entity
                for last in range(pos + 1, total):
                    follower = candidate[last]
                    if follower[2:] != entity_type or not follower.startswith(("E-", "I-")):
                        raise ValueError("Unexpected tag sequence at idx {}: {}".format(last, candidate))
                    if follower.startswith("E-"):
                        break
                else:
                    raise ValueError("Got a sequence with an unclosed tag: {}".format(candidate))

                covered = slice(pos, last + 1)
                # all or nothing: never overwrite part of an earlier entity
                if all(x == 'O' for x in merged[covered]):
                    merged[covered] = candidate[covered]
                pos = last + 1
            else:
                raise ValueError("Got unexpected tag sequence at idx {}: {}".format(pos, candidate))

    return merged
|
|
|