Albin Thörn Cleland
Clean initial commit with LFS
19b8775
"""
Utility functions for dealing with NER tagging.
"""
import logging
from stanza.models.common.vocab import EMPTY
logger = logging.getLogger('stanza')

# Tag values which mean "no annotation was provided for this token"
EMPTY_TAG = ('_', '-', '', None)
# The empty markers above plus the explicit "outside any entity" tag
EMPTY_OR_O_TAG = EMPTY_TAG + ('O',)
def is_basic_scheme(all_tags):
    """
    Check if a basic tagging scheme is used. Return True if so.

    A tag counts as non-basic when it is longer than two characters and
    starts with one of the B/I/S/E prefixes, written with either a dash
    or an underscore.  None tags are skipped instead of being measured
    with len(), so a column containing missing tags does not raise.

    Args:
        all_tags: a list of NER tags; individual entries may be None

    Returns:
        True if the tagging scheme does not use B-, I-, etc, otherwise False
    """
    scheme_prefixes = ('B-', 'I-', 'S-', 'E-', 'B_', 'I_', 'S_', 'E_')
    for tag in all_tags:
        if tag is None:
            # a missing tag carries no information about the scheme
            continue
        if len(tag) > 2 and tag[:2] in scheme_prefixes:
            return False
    return True
def is_bio_scheme(all_tags):
    """
    Report whether every tag is compatible with the BIO tagging scheme.

    A tag is compatible when it is one of the empty / O markers, or when
    it is longer than two characters and begins with B or I followed by
    a dash or an underscore.

    Args:
        all_tags: a list of NER tags

    Returns:
        True if the tagging scheme is BIO, otherwise False
    """
    def fits_bio(candidate):
        # empty markers and O are allowed in any scheme
        if candidate in EMPTY_OR_O_TAG:
            return True
        return len(candidate) > 2 and candidate[:2] in ('B-', 'I-', 'B_', 'I_')

    return all(fits_bio(candidate) for candidate in all_tags)
def to_bio2(tags):
    """
    Convert the original tag sequence to BIO2 format.

    An I- (or I_) tag which does not continue an entity of the same type
    is rewritten to start with B.  Every other tag is copied unchanged,
    so a sequence already in BIO2 format comes back with the same values
    (in a new list).

    Only tags which really carry the I- / I_ prefix are rewritten; a tag
    which merely begins with the letter I is left alone.  A preceding
    empty tag (including None) is treated like O: the I- tag after it
    starts a new entity.

    Args:
        tags: a list of tags in either BIO or BIO2 format

    Returns:
        new_tags: a list of tags in BIO2 format
    """
    new_tags = []
    for i, tag in enumerate(tags):
        if tag in EMPTY_OR_O_TAG or tag[:2] not in ('I-', 'I_'):
            new_tags.append(tag)
            continue

        previous = tags[i - 1] if i > 0 else None
        # the membership test comes first so that a None predecessor is never sliced
        starts_entity = previous in EMPTY_OR_O_TAG or previous[1:] != tag[1:]
        new_tags.append('B' + tag[1:] if starts_entity else tag)
    return new_tags
def basic_to_bio(tags):
    """
    Turn a sequence of basic tags into a BIO sequence.

    The first token of each run of identical tags gets a B- prefix and
    the remaining tokens of the run get an I- prefix.  Empty and O tags
    are passed through untouched.

    The result can be fed to bio2_to_bioes to obtain BIOES tags.

    Args:
        tags: a list of tags in basic (no B-, I-, etc) format

    Returns:
        new_tags: a list of tags in BIO format
    """
    converted = []
    for position, label in enumerate(tags):
        if label in EMPTY_OR_O_TAG:
            converted.append(label)
            continue
        # label is known not to be O here, so equality with the previous
        # tag is enough to tell that the same entity is continuing
        continues_run = position > 0 and tags[position - 1] == label
        prefix = 'I-' if continues_run else 'B-'
        converted.append(prefix + label)
    return converted
def bio2_to_bioes(tags):
    """
    Convert the BIO2 tag sequence into a BIOES sequence.

    Underscore prefixes (B_, I_) are accepted on input; the output always
    uses dash prefixes.  Empty and O tags are copied through unchanged.
    A following tag which is None is treated as "not an I- tag", so the
    current entity is closed instead of the lookup raising.

    Args:
        tags: a list of tags in BIO2 format

    Returns:
        new_tags: a list of tags in BIOES format

    Raises:
        ValueError: if a non-empty tag is shorter than two characters or
            does not start with a B or I prefix
    """
    inside_prefixes = ('I-', 'I_')
    begin_prefixes = ('B-', 'B_')

    new_tags = []
    for i, tag in enumerate(tags):
        if tag in EMPTY_OR_O_TAG:
            new_tags.append(tag)
            continue
        if len(tag) < 2:
            raise ValueError(f"Invalid BIO2 tag found: {tag}")

        # the entity continues only if the next tag exists and is an I- tag
        next_tag = tags[i + 1] if i + 1 < len(tags) else None
        continues = next_tag is not None and next_tag[:2] in inside_prefixes

        prefix, label = tag[:2], tag[2:]
        if prefix in inside_prefixes:
            # I- becomes E- when this is the last token of the entity
            new_tags.append(('I-' if continues else 'E-') + label)
        elif prefix in begin_prefixes:
            # B- becomes S- when the entity is a single token
            new_tags.append(('B-' if continues else 'S-') + label)
        else:
            raise ValueError(f"Invalid IOB tag found: {tag}")
    return new_tags
def normalize_empty_tags(sentences):
    """
    Replace every tag found in EMPTY_TAG (None, _, -, or blank) with EMPTY

    The input should be a list(sentence) of list(word) of tuple(text, list(tag)),
    which is the typical format for the data at the time data.py is preprocessing the tags

    Returns a new list of sentences; each word becomes (text, tuple(tags))
    """
    normalized = []
    for sentence in sentences:
        cleaned_sentence = []
        for word in sentence:
            cleaned_tags = tuple(EMPTY if tag in EMPTY_TAG else tag for tag in word[1])
            cleaned_sentence.append((word[0], cleaned_tags))
        normalized.append(cleaned_sentence)
    return normalized
def process_tags(sentences, scheme):
    """
    Normalize the tags in these sentences, converting to BIOES if requested

    Each tag column is first inspected over the whole dataset.  If scheme
    is 'bioes' (case insensitive), columns in BIO or basic format are
    converted to BIOES.  Columns which are not converted from basic are
    always passed through to_bio2.

    We allow empty tags ('_', '-', None), which will represent tags
    that do not get any gradient when training

    Args:
        sentences: a list of sentences, each a list of (word, tags) pairs.
            tags is either a single tag (str or None) for every word in
            the dataset, or a list / tuple of tags for every word.
        scheme: name of the target scheme; only 'bioes' triggers conversion

    Returns:
        A list of sentences, each a list of (word, tags) pairs.  tags is a
        tuple of tags, unless the input used single tags, in which case
        it is a single tag again.  An empty input gives an empty list.

    Raises:
        ValueError: if single tags and lists of tags are mixed, or if the
            number of tags per word is not uniform
    """
    all_words = []
    all_tags = []
    converted_tuples = False
    for sent_idx, sent in enumerate(sentences):
        words, tags = zip(*sent)
        all_words.append(words)
        # if we got one dimension tags w/o tuples or lists, make them tuples
        # but we also check that the format is consistent,
        # as otherwise the result being converted might be confusing
        if not converted_tuples and any(tag is None or isinstance(tag, str) for tag in tags):
            if sent_idx > 0:
                raise ValueError("Got a mix of tags and lists of tags.  First non-list was in sentence %d" % sent_idx)
            converted_tuples = True
        if converted_tuples:
            if not all(tag is None or isinstance(tag, str) for tag in tags):
                raise ValueError("Got a mix of tags and lists of tags.  First tag as a list was in sentence %d" % sent_idx)
            tags = [(tag,) for tag in tags]
        all_tags.append(tags)

    # default=0 keeps an empty dataset from raising inside max();
    # with zero columns the loops below do nothing and [] is returned
    max_columns = max((len(x) for tags in all_tags for x in tags), default=0)
    for sent_idx, tags in enumerate(all_tags):
        if any(len(x) < max_columns for x in tags):
            raise ValueError("NER tags not uniform in length at sentence %d.  TODO: extend those columns with O" % sent_idx)

    all_convert_bio_to_bioes = []
    all_convert_basic_to_bioes = []
    for column_idx in range(max_columns):
        # check if tag conversion is needed for each column
        # we treat each column separately, although practically
        # speaking it would be pretty weird for a dataset to have BIO
        # in one column and basic in another, for example
        convert_bio_to_bioes = False
        convert_basic_to_bioes = False
        tag_column = [x[column_idx] for sent in all_tags for x in sent]
        is_bio = is_bio_scheme(tag_column)
        is_basic = not is_bio and is_basic_scheme(tag_column)
        if is_bio and scheme.lower() == 'bioes':
            convert_bio_to_bioes = True
            logger.debug("BIO tagging scheme found in input at column %d; converting into BIOES scheme...", column_idx)
        elif is_basic and scheme.lower() == 'bioes':
            convert_basic_to_bioes = True
            logger.debug("Basic tagging scheme found in input at column %d; converting into BIOES scheme...", column_idx)
        all_convert_bio_to_bioes.append(convert_bio_to_bioes)
        all_convert_basic_to_bioes.append(convert_basic_to_bioes)

    result = []
    for words, tags in zip(all_words, all_tags):
        # TODO: add a convert_basic_to_bio option as well
        # tags holds, for each word in this sentence, the tags of every column
        # copy each word's tags to a list so we can edit them in place
        tags = [list(word_tags) for word_tags in tags]
        for column_idx, (convert_bio_to_bioes, convert_basic_to_bioes) in enumerate(zip(all_convert_bio_to_bioes, all_convert_basic_to_bioes)):
            tag_column = [x[column_idx] for x in tags]
            if convert_basic_to_bioes:
                # if basic, convert tags -> bio -> bioes
                tag_column = bio2_to_bioes(basic_to_bio(tag_column))
            else:
                # first ensure BIO2 scheme
                tag_column = to_bio2(tag_column)
                # then convert to BIOES
                if convert_bio_to_bioes:
                    tag_column = bio2_to_bioes(tag_column)
            for tag_idx, tag in enumerate(tag_column):
                tags[tag_idx][column_idx] = tag
        result.append([(w, tuple(t)) for w, t in zip(words, tags)])

    if converted_tuples:
        # the input used bare tags, so unwrap the 1-tuples again
        result = [[(word[0], word[1][0]) for word in sentence] for sentence in result]
    return result
def decode_from_bioes(tags):
    """
    Extract entity spans from a sequence of BIOES tags.

    None is treated as 'O'.  Tags which are neither 'O' nor prefixed with
    B-, I-, E- or S- are ignored and leave the current entity open.  The
    type reported for an entity is the type of the last tag added to it.

    Args:
        tags: a list of BIOES tags

    Returns:
        A list of dict with 'start', 'end', and 'type' values, where
        start and end are the (inclusive) indices of the first and last
        token of the entity.
    """
    entities = []
    first = last = None  # bounds of the entity being built; first is None when idle
    entity_type = None

    for position, raw_tag in enumerate(tags):
        label = 'O' if raw_tag is None else raw_tag
        if label != 'O' and not label.startswith(('B-', 'I-', 'E-', 'S-')):
            # unrecognized tags do not affect the entity in progress
            continue

        # O, B- and S- all terminate whatever entity was open before them
        if label == 'O' or label.startswith(('B-', 'S-')):
            if first is not None:
                entities.append({'start': first, 'end': last, 'type': entity_type})
            first = last = None
        if label == 'O':
            continue

        # every remaining tag contributes this token to the current entity
        if first is None:
            first = position
        last = position
        entity_type = label[2:]

        # E- and S- complete the entity on this very token
        if label.startswith(('E-', 'S-')):
            entities.append({'start': first, 'end': last, 'type': entity_type})
            first = last = None

    # an entity still open at the end of the sentence is kept as well
    if first is not None:
        entities.append({'start': first, 'end': last, 'type': entity_type})
    return entities
def merge_tags(*sequences):
    """
    Combine several BIOES tag sequences into a single list of tags

    The first sequence is the starting point.  Entities from each later
    sequence are copied in only where every position they cover is still
    'O', so earlier sequences win any conflict.

    Raises ValueError if a later sequence has a non-O tag which is not
    S- or B-, a B- entity which changes type or is interrupted, or a B-
    entity which is never closed by an E- tag.
    """
    merged = list(sequences[0])
    for later in sequences[1:]:
        total = len(later)
        pos = 0
        while pos < total:
            current = later[pos]

            # nothing to contribute at this position
            if current == 'O':
                pos += 1
                continue

            # single token entity: fill the slot only if it is free
            if current.startswith("S-"):
                if merged[pos] == 'O':
                    merged[pos] = current
                pos += 1
                continue

            # anything else has to open a multi token entity
            if not current.startswith("B-"):
                raise ValueError("Got unexpected tag sequence at idx {}: {}".format(pos, later))

            # look for the E- tag which closes the entity opened at pos
            entity_type = current[2:]
            for scan in range(pos + 1, total):
                candidate = later[scan]
                if candidate[2:] != entity_type:
                    raise ValueError("Unexpected tag sequence at idx {}: {}".format(scan, later))
                if candidate.startswith("E-"):
                    break
                if not candidate.startswith("I-"):
                    raise ValueError("Unexpected tag sequence at idx {}: {}".format(scan, later))
            else:
                # ran off the end of the sequence without seeing E-
                raise ValueError("Got a sequence with an unclosed tag: {}".format(later))

            stop = scan + 1
            # overwrite only if the whole span is unclaimed so far
            if all(existing == 'O' for existing in merged[pos:stop]):
                merged[pos:stop] = later[pos:stop]
            pos = stop
    return merged