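"""Mixed error rate (MER) scoring for Chinese/English code-switched ASR output.

English is scored at the word level and Chinese at the character level inside a single
edit distance; optional modes add per-language WER/CER, a phoneme-level MER, repetitive
hallucination counting, and a substitution/deletion/insertion breakdown.
"""
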
import argparse
import opencc
import csv
import editdistance
from tqdm import tqdm
from collections import defaultdict
from functools import partial
from pypinyin import pinyin, lazy_pinyin, Style
# from g2p_en import G2p  # too slow; a lexicon lookup is used instead (see en_wrd2phn below)
import edit_distance
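
# The lexicon is read as one "WORD<TAB>PHONE PHONE ..." entry per line (lookups are
# case-sensitive); the path below is specific to the original environment.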
lexicon_fpath = '/root/distil-whisper/utils/lexicon.lst'


def cal_complete_mer(ref_data, hyp_data):
    """Aggregate substitution/deletion/insertion counts over a corpus of (ref, hyp) pairs."""
    S, D, I, N = (0, 0, 0, 0)
    count = 0
    for ref, hyp in zip(ref_data, hyp_data):
        _S, _D, _I, _N = cal_single_complete_mer(ref, hyp)
        S += _S
        D += _D
        I += _I
        N += _N
        count += 1
    return S, D, I, N, count


def cal_single_complete_mer(ref, hyp):
    """Count substitutions, deletions, insertions and reference length for one (ref, hyp) pair."""
    sm = edit_distance.SequenceMatcher(a=ref, b=hyp)
    opcodes = sm.get_opcodes()
    # Substitution
    s = sum([(max(x[2] - x[1], x[4] - x[3]) if x[0] == 'replace' else 0) for x in opcodes])
    # Deletion
    d = sum([(max(x[2] - x[1], x[4] - x[3]) if x[0] == 'delete' else 0) for x in opcodes])
    # Insertion
    i = sum([(max(x[2] - x[1], x[4] - x[3]) if x[0] == 'insert' else 0) for x in opcodes])
    n = len(ref)
    return s, d, i, n
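
# Hand-checked example for cal_single_complete_mer: ref=['a', 'b', 'c'], hyp=['a', 'x', 'c', 'd']
# yields s=1 ('b' -> 'x'), d=0, i=1 ('d' inserted), n=3.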


class MixErrorRate(object):
    """Mixed error rate for Chinese/English code-switched text: English is scored at the
    word level and Chinese at the character level within a single edit distance."""

    def __init__(
        self,
        to_simplified_chinese=True,
        to_traditional_chinese=False,
        phonemize=False,
        separate_language=False,
        test_only=False,
        count_repetitive_hallucination=False,
        calculate_complete_mer=False
    ):
        self.converter = None
        if to_simplified_chinese and to_traditional_chinese:
            raise ValueError("Can't convert to both simplified and traditional Chinese at the same time.")
        if to_simplified_chinese:
            print("Convert to simplified Chinese")
            self.converter = opencc.OpenCC('t2s.json')
        elif to_traditional_chinese:
            print("Convert to traditional Chinese")
            self.converter = opencc.OpenCC('s2t.json')
        else:
            print("No Chinese conversion")
        if phonemize:
            if separate_language:
                raise NotImplementedError("Can't separate language and phonemize at the same time.")
            print("Phonemize Chinese and English words")
            print("Force traditional to simplified conversion")
            self.converter = opencc.OpenCC('t2s.json')
            self.zh_phonemizer = partial(lazy_pinyin, style=Style.BOPOMOFO, errors='ignore')
            self.zh_bopomofo_stress_marks = ['ˊ', 'ˇ', 'ˋ', '˙']
            # self.en_phonemizer = G2p()
            # self.en_valid_phonemes = [p for p in self.en_phonemizer.phonemes]
            # for p in self.en_phonemizer.phonemes:
            #     if p[-1].isnumeric():
            #         self.en_valid_phonemes.append(p[:-1])
            # use lexicon instead
            self.en_wrd2phn = defaultdict(list)
            with open(lexicon_fpath, 'r', encoding='utf-8') as f:
                for line in f:
                    word, phonemes = line.strip().split('\t')
                    self.en_wrd2phn[word] = phonemes.split()
        self.phonemize = phonemize
        self.test_only = test_only
        self.separate_language = separate_language
        self.count_repetitive_hallucination = count_repetitive_hallucination
        self.calculate_complete_mer = calculate_complete_mer
        if self.count_repetitive_hallucination:
            print("Count repetitive hallucination (6gram-5repeat)")

    def _from_str_to_list(self, cs_string):
        """Tokenize a code-switched string into Chinese characters and English words,
        dropping whitespace and punctuation."""
        cs_list = []
        cur_en_word = ''
        for s in cs_string:
            # whitespace and punctuation terminate the current English word and are dropped
            if s in [' ', '\t', '\n', '\r', ',', '.', '!', '?', '。', ',', '!', '?', '、', ';', ':', '「', '」', '『', '』', '(', ')', '(', ')', '[', ']', '{', '}', '<', '>', '《', '》', '“', '”', '‘', '’', '…', '—', '~', '·', '•']:
                if cur_en_word != '':
                    cs_list.append(cur_en_word)
                    cur_en_word = ''
                continue
            # a Chinese character becomes its own token (after optional OpenCC conversion)
            if u'\u4e00' <= s <= u'\u9fff':
                if cur_en_word != '':
                    cs_list.append(cur_en_word)
                    cur_en_word = ''
                if self.converter is not None:
                    s = self.converter.convert(s)
                cs_list.append(s)
            # alphanumeric characters (plus apostrophe and hyphen) extend the current English word
            elif s.isalnum() or s in ["'", "-"]:
                cur_en_word += s
            else:
                print(f"Unknown character during conversion: {s}")
        if cur_en_word != '':
            cs_list.append(cur_en_word)
        return cs_list
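
    # Example for _from_str_to_list: "hello 世界!" -> ['hello', '世', '界'] (both characters are
    # already simplified, so the default t2s converter passes them through unchanged).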

    def _unit_is_en(self, token):
        if u'\u4e00' <= token[0] <= u'\u9fff':
            return False
        return True

    def _unit_is_zh(self, token):
        if u'\u4e00' <= token[0] <= u'\u9fff':
            return True
        return False

    def _phonemized_cs_list(self, cs_list):
        """Map a token list to phonemes: runs of Chinese characters become individual bopomofo
        symbols (tone marks removed), English words are looked up in the lexicon."""
        cur_zh_chars = []
        phonemes = []
        for unit in cs_list:
            if u'\u4e00' <= unit[0] <= u'\u9fff':
                cur_zh_chars.append(unit)
            else:
                if cur_zh_chars:
                    zh_phns = ''.join(self.zh_phonemizer(''.join(cur_zh_chars)))
                    phonemes.extend(filter(lambda p: p not in self.zh_bopomofo_stress_marks, zh_phns))
                    cur_zh_chars = []
                phonemes.extend(self.en_wrd2phn[unit])
        if cur_zh_chars:
            zh_phns = ''.join(self.zh_phonemizer(''.join(cur_zh_chars)))
            phonemes.extend(filter(lambda p: p not in self.zh_bopomofo_stress_marks, zh_phns))
            cur_zh_chars = []
        return phonemes
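
    # Rough example for _phonemized_cs_list (pypinyin output shown for illustration only):
    # ['hello', '你', '好'] -> the lexicon phones for 'hello' (e.g. ['HH', 'AH', 'L', 'OW'] in an
    # ARPABET-style lexicon) followed by the individual bopomofo symbols ['ㄋ', 'ㄧ', 'ㄏ', 'ㄠ']
    # with tone marks stripped.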

    def _count_repetitive_hallucination(self, cs_str, n=6, repeat=5, reset_len=100):
        """Count how many times some n-gram repeats at least `repeat` times, resetting the
        n-gram table after each hit and every `reset_len` characters."""
        count = 0
        ngram_counts = defaultdict(int)
        if len(cs_str) < n:
            return 0
        prev_reset_idx = 0
        for i in range(len(cs_str) - n + 1):
            ngram = cs_str[i:i+n]
            if '|>' in ngram or '<|' in ngram:
                continue
            ngram_counts[ngram] += 1
            if ngram_counts[ngram] >= repeat:
                count += 1
                # reset for the next round of counting
                ngram_counts = defaultdict(int)
            if i - prev_reset_idx >= reset_len:
                ngram_counts = defaultdict(int)
                prev_reset_idx = i
        return count
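
    # Example for _count_repetitive_hallucination: '哈' * 10 contains five overlapping copies of
    # the same 6-gram, so it contributes exactly one hit with the defaults n=6, repeat=5.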

    def compute(self, predictions=None, references=None, show_progress=True, empty_error_rate=1.0, **kwargs):
        """Compute the mixed error rate over parallel lists of hypothesis/reference strings.

        Returns a float MER, or a dict of metrics when separate_language or
        count_repetitive_hallucination is enabled."""
        total_err = 0
        total_ref_len = 0
        total_en_err = 0
        total_en_ref_len = 0
        total_zh_err = 0
        total_zh_ref_len = 0
        repetitive_hallucination_count = 0
        ref_repetitive_hallucination_count = 0
        if self.test_only:
            predictions = predictions[:10]
            references = references[:10]
        if self.calculate_complete_mer:
            S, D, I, N = 0, 0, 0, 0
        iterator = enumerate(zip(predictions, references))
        if show_progress and len(predictions) > 20:
            iterator = tqdm(iterator, total=len(predictions), desc="Computing Mix Error Rate...")
        for i, (pred, ref) in iterator:
            # English is scored as words and Chinese as characters;
            # build the token lists used for editdistance computation
            if self.count_repetitive_hallucination:
                repetitive_hallucination_count += self._count_repetitive_hallucination(pred)
                ref_repetitive_hallucination_count += self._count_repetitive_hallucination(ref)
            pred_list = self._from_str_to_list(pred)
            ref_list = self._from_str_to_list(ref)
            if self.test_only:
                print(f"Prediction List First 20@{i}: {pred_list[:20]}")
                print(f"Reference List First 20@{i}: {ref_list[:20]}")
            if self.phonemize:
                pred_list = self._phonemized_cs_list(pred_list)
                ref_list = self._phonemized_cs_list(ref_list)
            if self.calculate_complete_mer:
                _S, _D, _I, _N = cal_single_complete_mer(ref_list, pred_list)
                S += _S
                D += _D
                I += _I
                N += _N
            # compute edit distance
            if self.separate_language:
                en_pred_list = list(filter(self._unit_is_en, pred_list))
                en_ref_list = list(filter(self._unit_is_en, ref_list))
                zh_pred_list = list(filter(self._unit_is_zh, pred_list))
                zh_ref_list = list(filter(self._unit_is_zh, ref_list))
                en_err = editdistance.eval(en_pred_list, en_ref_list)
                total_en_err += en_err
                total_en_ref_len += len(en_ref_list)
                zh_err = editdistance.eval(zh_pred_list, zh_ref_list)
                total_zh_err += zh_err
                total_zh_ref_len += len(zh_ref_list)
            err = editdistance.eval(pred_list, ref_list)
            total_err += err
            total_ref_len += len(ref_list)
            if self.test_only:
                local_mer = {"MER": err / len(ref_list) if len(ref_list) != 0 else 0}
                if self.separate_language:
                    # EN/ZH breakdowns are only available when separate_language is on
                    local_mer["EN WER"] = en_err / len(en_ref_list) if len(en_ref_list) != 0 else 0
                    local_mer["ZH CER"] = zh_err / len(zh_ref_list) if len(zh_ref_list) != 0 else 0
                print(f"Local MER@{i}: {local_mer}")
        if total_ref_len == 0:
            print(f"No reference found, return {empty_error_rate*100}% error rate instead")
            return empty_error_rate  # if no reference, return 100% error rate by default
        mer = total_err / total_ref_len
        if self.separate_language or self.count_repetitive_hallucination:
            result = {
                "MER": mer,
            }
            if self.separate_language:
                en_wer = total_en_err / total_en_ref_len if total_en_ref_len != 0 else 0
                zh_cer = total_zh_err / total_zh_ref_len if total_zh_ref_len != 0 else 0
                result["EN WER"] = en_wer
                result["ZH CER"] = zh_cer
            if self.count_repetitive_hallucination:
                result["Hyp Repetitive Hallucination Count"] = repetitive_hallucination_count
                result["Ref Repetitive Hallucination Count"] = ref_repetitive_hallucination_count
            return result
        if self.calculate_complete_mer:
            print(f"SUB={S/N}, DEL={D/N}, INS={I/N}, (S, D, I, N)={(S, D, I, N)}, total_len={total_ref_len}")
        return mer
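
# Programmatic usage sketch (hypothetical strings; with the default t2s conversion this pair
# should score 0.0 on all three metrics):
#   metric = MixErrorRate(separate_language=True)
#   result = metric.compute(predictions=["我 like 蘋果"], references=["我 like 苹果"])
#   # -> {"MER": 0.0, "EN WER": 0.0, "ZH CER": 0.0}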


def load_output_csv(fpath, skip_header=True, hyp_col=1, ref_col=2, delimiter='\t'):
    """Load the hypothesis and reference columns from a delimited output file."""
    with open(fpath, 'r', encoding='utf-8') as f:
        reader = csv.reader(f, delimiter=delimiter)
        predictions = []
        references = []
        if skip_header:
            columns = next(reader)
            print(f"Columns: {columns}")
        for row in reader:
            predictions.append(row[hyp_col])
            references.append(row[ref_col])
    return predictions, references
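
# The default column layout assumes a tab-separated file whose second column is the hypothesis
# and third column is the reference; column 0 (often an id or audio path) is not read.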


def main(args):
    print(args)
    # phonemize is not exposed as a CLI flag; pass phonemize=True here to enable the
    # phoneme-level metric
    mer = MixErrorRate(
        to_simplified_chinese=args.to_simplified_chinese,
        to_traditional_chinese=args.to_traditional_chinese,
        separate_language=args.separate_language,
        test_only=args.test_only,
        count_repetitive_hallucination=args.count_repetitive_hallucination,
        calculate_complete_mer=args.calculate_complete_mer
    )
    predictions, references = load_output_csv(args.csv_fpath)
    mer_value = mer.compute(predictions=predictions, references=references)
    print(f"Mix Error Rate: {mer_value}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Compute Mix Error Rate")
    parser.add_argument("--csv_fpath", type=str, required=True, help="Path to the csv file")
    parser.add_argument("--to_simplified_chinese", action="store_true", help="Convert Chinese to simplified Chinese")
    parser.add_argument("--to_traditional_chinese", action="store_true", help="Convert Chinese to traditional Chinese")
    parser.add_argument("--separate_language", action="store_true", help="Compute MER separately for Chinese and English")
    parser.add_argument("--test_only", action="store_true", help="Run a quick test and print some cs_list examples")
    parser.add_argument("--count_repetitive_hallucination", action="store_true", help="Count repetitive hallucination")
    parser.add_argument("--calculate_complete_mer", action="store_true", help="Calculate complete MER (S/D/I breakdown)")
    args = parser.parse_args()
    main(args)
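
# Example invocation (script and csv paths are placeholders):
#   python mixed_error_rate.py --csv_fpath outputs/results.tsv --to_simplified_chinese --separate_language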