import numpy as np
import json
import os
import subprocess

import librosa
import matplotlib.colors as colors
import matplotlib.pyplot as plt
from pydub import AudioSegment
# based on captinifeedback.py
# with extra/experimental visual outputs
# for huggingface internal demo
class FeedbackConverter():
def __init__(self, task_key_path, phone_key_path, lower_bound_100, upper_bound_100, not_scored_value = "TOO SHORT TO SCORE"):
self.task_key_path = task_key_path
self.phone_key_path = phone_key_path
self.lower_bound_100 = lower_bound_100
self.upper_bound_100 = upper_bound_100
self.not_scored_value = not_scored_value
self.range_100 = self.upper_bound_100 - self.lower_bound_100
try:
with open(phone_key_path,'r') as handle:
phone_key = handle.read().splitlines()
phone_key=[l.split('\t') for l in phone_key]
self.phone_key = {phone : float(binary_threshold) for phone, binary_threshold in phone_key}
with open(task_key_path,'r') as handle:
self.task_key = json.load(handle)
except:
raise Exception(f"At least one of the score key files {task_key_path} or {phone_key_path} couldn't be loaded.")
# feedback for task-based scoring -----
def scale_binary_task(self,raw_score,unit,task_id):
if raw_score == self.not_scored_value:
return 1 # 1: be generous in case not scored
elif raw_score >= self.task_key[task_id][unit]:
return 1 # 1: above threshold, correct pronunciation
else:
return 0 # 0: below threshold, mispronunciation
def b_list_task(self,scores_list,unit,task_id):
return [(label, self.scale_binary_task(score,unit,task_id)) for label,score in scores_list]
# scale score from interval [-1,1] to integers [0,100]
# alternately could replace this with % of phones correct.
def scale_100(self,raw_score):
if raw_score == self.not_scored_value:
return 100 # consider smoothing a different way if this ends up used
elif raw_score <= self.lower_bound_100:
return 0
elif raw_score >= self.upper_bound_100:
return 100
else:
rescaled_score = (raw_score - self.lower_bound_100) / self.range_100
return round(100*rescaled_score)
# heuristics?
# return 1 (correct) for phones/words that are too short to score,
# EXCEPT when a word has score 0 and all phones in that word are too short,
# then return 0 for all of that word's phones.
# also, if a word has score 0 but all individual phones have binary score 1
# (as a real score, not when they are all too short),
# CHANGE the lowest phone score to 0 so there is some corrective feedback
# TODO turn that part off it if overcorrects native speakers
def wordfix(self,word_phone_scores, word_score, task_id):
if word_score == 1:
return self.b_list_task(word_phone_scores,'phone',task_id)
elif all([sc == self.not_scored_value for ph,sc in word_phone_scores]):
return [(ph, 0) for ph,sc in word_phone_scores ]
else:
bin_scores = self.b_list_task(word_phone_scores,'phone',task_id)
if all([sc == 1 for ph,sc in bin_scores]):
sc_list = [1 if sc == self.not_scored_value
else sc for ph,sc in word_phone_scores]
min_ix = sc_list.index(min(sc_list))
bin_scores[min_ix] = (bin_scores[min_ix][0],0)
return bin_scores
# feedback for fallback phone scoring -----
def scale_binary_monophone(self,raw_score,phone_id):
if raw_score == self.not_scored_value:
return 1
elif raw_score >= self.phone_key[phone_id]:
return 1
else:
return 0
def b_list_monophone(self,scores_list):
return [(label, self.scale_binary_monophone(score,label)) for label,score in scores_list]
# score word 0 if any phone is 0, else 1
# TODO may cause overcorrection of native speakers,
# or confusing inconsistency with 0-100 task score,
# consider word score by average of phone raw scores instead
def b_wordfromphone(self,phone_bins):
return [( word, min([b for p,b in b_phones]) ) for word, b_phones in phone_bins]
# yield score out of 100 as percent of phones correct
def scale_100_monophone(self,phone_bins):
plist = []
for w, b_phones in phone_bins:
plist += [b for p,b in b_phones]
return int(100*np.nanmean(plist))
### -------- some colour printing....
# sort into 3 colours for printing
# good, mispronounced, unable to score
def phone_3sort_monophone(self,raw_score,phone_id):
if raw_score == self.not_scored_value:
return -1, phone_id
elif raw_score >= self.phone_key[phone_id]:
return 1, phone_id
else:
return 0, phone_id
def phone_3sort_task(self,raw_score,unit,task_id, label):
if raw_score == self.not_scored_value:
return -1, label
elif raw_score >= self.task_key[task_id][unit]:
return 1, label
else:
return 0, label
# put out html
def hc_from_3(self, scoretype, pcontent):
if scoretype == -1: # not scored value
return f"<span style='color:#BBBBBB;'>{pcontent}</span>"
elif scoretype == 1: # correct
return f"<span style='color:#0000FF;'>{pcontent}</span>"
elif scoretype == 0: # wrong
return f"<span style='color:#FF0000;'>{pcontent}</span>"
else: # error
return f"<span>{pcontent}</span>"
def c3_list_monophone(self,scores_list):
#return ''.join([ hc_from_3(self.phone_3sort_monophone(score,label)) for label,score in scores_list ])
return [self.phone_3sort_monophone(score,label) for label,score in scores_list]
def c3_list_task(self,scores_list,unit,task_id):
#return ''.join([ hc_from_3(self.phone_3sort_task(score,unit,task_id,label)) for label,score in scores_list])
return [self.phone_3sort_task(score,unit,task_id,label) for label,score in scores_list]
# output is:
# - one score 0-100 for the entire task
# - a score 0/1 for each word
# - a score 0/1 for each phone
def convert(self,word_scores,phone_scores,task_id):
if task_id in self.task_key.keys(): # score with full task model
task_fb = self.scale_100( np.nanmean([sc for wd,sc in word_scores if sc != self.not_scored_value]
or 1) )
word_fb = self.b_list_task(word_scores,'word',task_id)
phone_fb = [(p_sc[0], self.wordfix(p_sc[1],w_fb[1],task_id) )
for w_fb, p_sc in zip(word_fb,phone_scores)]
phone_fb2 = [(p_sc[0], self.c3_list_task(p_sc[1],'phone',task_id) )
for w_fb, p_sc in zip(word_fb,phone_scores)]
else: # score with fallback monophone model
phone_fb = [(p_sc[0], self.b_list_monophone(p_sc[1]) ) for p_sc in phone_scores]
word_fb = self.b_wordfromphone(phone_fb)
task_fb = self.scale_100_monophone(phone_fb)
phone_fb2 = [(p_sc[0], self.c3_list_monophone(p_sc[1]) ) for p_sc in phone_scores]
#return(task_fb, word_fb, phone_fb)
return(task_fb, word_fb, phone_fb2)
# ----------------------- stuff for visual .......
# TODO 2pass...
def get_pitch_tracks(self,sound_path):
reaper_exec = "/home/user/app/REAPER/build/reaper"
orig_ftype = sound_path.split('.')[-1]
if orig_ftype == '.wav':
wav_path = sound_path
else:
aud_data = AudioSegment.from_file(sound_path, orig_ftype)
curdir = subprocess.run(["pwd"], capture_output=True, text=True)
curdir = curdir.stdout.splitlines()[0]
fname = sound_path.split('/')[-1].replace(orig_ftype,'')
tmp_path = f'{curdir}/{fname}_tmp.wav'
aud_data.export(tmp_path, format="wav")
wav_path = tmp_path
f0_data = subprocess.run([reaper_exec, "-i", wav_path, '-f', '/dev/stdout', '-a'],capture_output=True).stdout
f0_data = f0_data.decode()
f0_data = f0_data.split('EST_Header_End\n')[1].splitlines()
f0_data = [l.split(' ') for l in f0_data]
f0_data = [l for l in f0_data if len(l) == 3] # the last line or 2 lines are other info, different format
f0_data = [ [float(t), float(f)] for t,v,f in f0_data if v=='1']
if orig_ftype != '.wav':
subprocess.run(["rm", tmp_path])
return f0_data
# display colour corresponding to a gradient score per phone
def generate_graphic_feedback_blocks(self,phone_scores):
plt.close('all')
phone_scores = [phs for wrd, phs in phone_scores]
phone_scores = [lc for phs in phone_scores for lc in phs]
phone_scores = [[p,np.nan] if c == self.not_scored_value else [p,c] for p,c in phone_scores]
for i in range(len(phone_scores)):
if np.isnan(phone_scores[i][1]):
prev_c = phone_scores[max(i-1,0)][1] # would be nan only in case when i==0
j = min(i+1,len(phone_scores)-1)
next_c = np.nan
while (np.isnan(next_c) and j < len(phone_scores)):
next_c = phone_scores[j][1]
j += 1
# at least one of these has value unless the entire stimulus is nan score
phone_scores[i][1] = np.nanmean([prev_c, next_c])
fig, axs = plt.subplots( figsize=(7, 1.5 ))
#plt.gca().set_aspect(1)
plt.ylim(-1,1.5)
axs.tick_params(left=False, bottom=False, labelleft=False, labelbottom=False)
axs.pcolormesh([[c for p,c in phone_scores]],
cmap="rainbow_r", norm=colors.Normalize(vmin=self.lower_bound_100-0.01, vmax=self.upper_bound_100+0.01,clip=True ))
#cmap="plasma")
fig.tight_layout()
for phi, pinfo in enumerate(phone_scores):
plt.text(phi+0.5,-0.5,pinfo[0], ha='center',va='center',color='black',size=12)#, rotation=rt)
return fig
# TODO:
# add subphone / frame level DTW feedback shading?
def generate_graphic_feedback_0(self, sound_path, word_aligns, phone_aligns, phone_feedback, opts):
plt.close('all')
rec_start = word_aligns[0][1]
rec_end = word_aligns[-1][2]
f0_data = self.get_pitch_tracks(sound_path)
if f0_data:
f_max = max([f0 for t,f0 in f0_data]) + 50
else:
f_max = 400
fig, axes1 = plt.subplots(figsize=(15,3))
plt.xlim([rec_start, rec_end])
axes1.set_ylim([0.0, f_max])
axes1.get_xaxis().set_visible(False)
for w,s,e in word_aligns:
plt.vlines(s,0,f_max,linewidth=0.5,color='black')
plt.vlines(e,0,f_max,linewidth=0.5,color='dimgrey')
#plt.text( (s+e)/2 - (len(w)*.01), f_max+15, w, fontsize=15)
plt.text( (s+e)/2, f_max+15, w.split('__')[1], fontsize=15, ha="center")
# arrange aligns for graphs...
phone_aligns = [(wrd,phs) for wrd, phs in phone_aligns.items()]
phone_aligns = sorted(phone_aligns, key = lambda x: x[0][:3])
phone_amends = zip([s for w,s,e in word_aligns], [phs for wrd, phs in phone_aligns])
phone_aligns = [[(p, s+offset, e+offset) for p,s,e in wphones] for offset, wphones in phone_amends]
phone_aligns = [p for wrps in phone_aligns for p in wrps]
phone_feedback = [phs for wrd, phs in phone_feedback]
phone_feedback = [p for wrps in phone_feedback for p in wrps]
phone_infos = zip(phone_aligns, phone_feedback)
# basic 3way phone to colour key
#cdict = {-1: 'gray', 0: 'red', 1: 'blue'}
cdict = {-1: 'gray', 0: "#E85907", 1: "#26701C"}
for paln, pfbk in phone_infos:
ph_id, s, e = paln
c, p = pfbk
plt.vlines(s,0,f_max,linewidth=0.3,color='cadetblue',linestyle=(0,(10,4)))
plt.vlines(e,0,f_max,linewidth=0.3,color='cadetblue',linestyle=(0,(10,4)))
plt.text( (s+e)/2 - (len(p)*.01), -1*f_max/10, p, fontsize=18, color = cdict[c])#color='teal')
#f0c = "blueviolet"
#enc = 'peachpuff'
f0c = "#88447F"
enc = "#F49098"
axes1.scatter([t for t,f0 in f0_data], [f0 for t,f0 in f0_data], color=f0c)
# add rmse
w, sr = librosa.load(sound_path)
fr_l = 2048 # librosa default
h_l = 512 # default
rmse = librosa.feature.rms(y=w, frame_length = fr_l, hop_length = h_l)
rmse = rmse[0]
# show rms energy, only if opts
if opts:
axes2 = axes1.twinx()
axes2.set_ylim([0.0, 0.5])
rms_xval = [(h_l*i)/sr for i in range(len(rmse))]
axes2.plot(rms_xval,rmse,color=enc,linewidth=3.5)
fig.tight_layout()
return fig