import json
import os
import subprocess

import numpy as np
import librosa
import matplotlib.pyplot as plt
import matplotlib.colors as colors
from pydub import AudioSegment

# based on captinifeedback.py
# with extra/experimental visual outputs
# for huggingface internal demo


class FeedbackConverter():
    """Convert raw pronunciation scores into learner-facing feedback.

    Given raw word/phone scores from either a full task model or a
    fallback monophone model, produces:
      - one 0-100 score for the whole task,
      - a binary (0/1) score per word,
      - a 3-way (-1 unscored / 0 wrong / 1 correct) label per phone,
    plus experimental matplotlib figures (colour blocks, pitch track).
    """

    def __init__(self, task_key_path, phone_key_path,
                 lower_bound_100, upper_bound_100,
                 not_scored_value="TOO SHORT TO SCORE"):
        """Load score-threshold keys.

        Args:
            task_key_path: JSON mapping task_id -> unit ('word'/'phone')
                -> binary decision threshold.
            phone_key_path: TSV of `phone<TAB>threshold` lines for the
                fallback monophone model.
            lower_bound_100: raw score mapped to 0 on the 0-100 scale.
            upper_bound_100: raw score mapped to 100 on the 0-100 scale.
            not_scored_value: sentinel used upstream for units too short
                to score.

        Raises:
            Exception: if either key file cannot be read or parsed.
        """
        self.task_key_path = task_key_path
        self.phone_key_path = phone_key_path
        self.lower_bound_100 = lower_bound_100
        self.upper_bound_100 = upper_bound_100
        self.not_scored_value = not_scored_value
        self.range_100 = self.upper_bound_100 - self.lower_bound_100
        try:
            with open(phone_key_path, 'r') as handle:
                phone_key = handle.read().splitlines()
            phone_key = [l.split('\t') for l in phone_key]
            self.phone_key = {phone: float(binary_threshold)
                              for phone, binary_threshold in phone_key}
            with open(task_key_path, 'r') as handle:
                self.task_key = json.load(handle)
        except (OSError, ValueError) as err:
            # FIX: was a bare `except:`; narrowed (json.JSONDecodeError is a
            # ValueError) and chained so the real cause isn't swallowed.
            raise Exception(f"At least one of the score key files {task_key_path} or {phone_key_path} couldn't be loaded.") from err

    # feedback for task-based scoring -----

    def scale_binary_task(self, raw_score, unit, task_id):
        """Binarise one raw score against the task key threshold."""
        if raw_score == self.not_scored_value:
            return 1  # 1: be generous in case not scored
        elif raw_score >= self.task_key[task_id][unit]:
            return 1  # 1: above threshold, correct pronunciation
        else:
            return 0  # 0: below threshold, mispronunciation

    def b_list_task(self, scores_list, unit, task_id):
        """Binarise a list of (label, raw_score) pairs for a task."""
        return [(label, self.scale_binary_task(score, unit, task_id))
                for label, score in scores_list]

    # scale score from interval [-1,1] to integers [0,100]
    # alternately could replace this with % of phones correct.
    def scale_100(self, raw_score):
        """Rescale a raw score onto 0-100, clipping at the bounds."""
        if raw_score == self.not_scored_value:
            return 100  # consider smoothing a different way if this ends up used
        elif raw_score <= self.lower_bound_100:
            return 0
        elif raw_score >= self.upper_bound_100:
            return 100
        else:
            rescaled_score = (raw_score - self.lower_bound_100) / self.range_100
            return round(100 * rescaled_score)

    # heuristics?
    # return 1 (correct) for phones/words that are too short to score,
    # EXCEPT when a word has score 0 and all phones in that word are too short,
    # then return 0 for all of that word's phones.
    # also, if a word has score 0 but all individual phones have binary score 1
    # (as a real score, not when they are all too short),
    # CHANGE the lowest phone score to 0 so there is some corrective feedback
    # TODO turn that part off it if overcorrects native speakers
    def wordfix(self, word_phone_scores, word_score, task_id):
        """Reconcile a word's binary phone scores with its word score."""
        if word_score == 1:
            return self.b_list_task(word_phone_scores, 'phone', task_id)
        elif all([sc == self.not_scored_value for ph, sc in word_phone_scores]):
            # word is wrong and no phone was scorable: mark them all wrong
            return [(ph, 0) for ph, sc in word_phone_scores]
        else:
            bin_scores = self.b_list_task(word_phone_scores, 'phone', task_id)
            if all([sc == 1 for ph, sc in bin_scores]):
                # word wrong but every phone passed: flip the weakest phone
                # (unscored phones masked with 1 so a real score is picked)
                sc_list = [1 if sc == self.not_scored_value else sc
                           for ph, sc in word_phone_scores]
                min_ix = sc_list.index(min(sc_list))
                bin_scores[min_ix] = (bin_scores[min_ix][0], 0)
            return bin_scores

    # feedback for fallback phone scoring -----

    def scale_binary_monophone(self, raw_score, phone_id):
        """Binarise one raw score against the monophone key threshold."""
        if raw_score == self.not_scored_value:
            return 1
        elif raw_score >= self.phone_key[phone_id]:
            return 1
        else:
            return 0

    def b_list_monophone(self, scores_list):
        """Binarise a list of (phone, raw_score) pairs."""
        return [(label, self.scale_binary_monophone(score, label))
                for label, score in scores_list]

    # score word 0 if any phone is 0, else 1
    # TODO may cause overcorrection of native speakers,
    # or confusing inconsistency with 0-100 task score,
    # consider word score by average of phone raw scores instead
    def b_wordfromphone(self, phone_bins):
        """Derive binary word scores: min over the word's binary phones."""
        # NOTE(review): a word with an empty phone list would raise on min();
        # assumed not to occur upstream — TODO confirm
        return [(word, min([b for p, b in b_phones]))
                for word, b_phones in phone_bins]

    # yield score out of 100 as percent of phones correct
    def scale_100_monophone(self, phone_bins):
        """Task score = percentage of phones with binary score 1."""
        plist = []
        for w, b_phones in phone_bins:
            plist += [b for p, b in b_phones]
        if not plist:
            # FIX: empty input previously crashed on int(nan);
            # be generous, consistent with the not-scored convention.
            return 100
        return int(100 * np.nanmean(plist))

    ### -------- some colour printing....

    # sort into 3 colours for printing
    # good, mispronounced, unable to score
    def phone_3sort_monophone(self, raw_score, phone_id):
        """Map a raw monophone score to (-1 unscored / 0 wrong / 1 good)."""
        if raw_score == self.not_scored_value:
            return -1, phone_id
        elif raw_score >= self.phone_key[phone_id]:
            return 1, phone_id
        else:
            return 0, phone_id

    def phone_3sort_task(self, raw_score, unit, task_id, label):
        """Map a raw task-model score to (-1 unscored / 0 wrong / 1 good)."""
        if raw_score == self.not_scored_value:
            return -1, label
        elif raw_score >= self.task_key[task_id][unit]:
            return 1, label
        else:
            return 0, label

    # put out html
    def hc_from_3(self, scoretype, pcontent):
        """Wrap content for display according to its 3-way score.

        NOTE(review): all branches currently return identical plain text —
        the colour/markup spans appear to have been lost; confirm intended
        HTML before use. Behaviour preserved as found.
        """
        if scoretype == -1:  # not scored value
            return f"{pcontent}"
        elif scoretype == 1:  # correct
            return f"{pcontent}"
        elif scoretype == 0:  # wrong
            return f"{pcontent}"
        else:  # error
            return f"{pcontent}"

    def c3_list_monophone(self, scores_list):
        """3-way-classify a list of (phone, raw_score) pairs."""
        return [self.phone_3sort_monophone(score, label)
                for label, score in scores_list]

    def c3_list_task(self, scores_list, unit, task_id):
        """3-way-classify a list of (label, raw_score) pairs for a task."""
        return [self.phone_3sort_task(score, unit, task_id, label)
                for label, score in scores_list]

    # output is:
    # - one score 0-100 for the entire task
    # - a score 0/1 for each word
    # - a score 0/1 for each phone
    def convert(self, word_scores, phone_scores, task_id):
        """Produce (task_0_100, word_binaries, phone_3way) feedback.

        Uses the full task model when task_id is in the task key,
        otherwise falls back to the monophone model.
        """
        if task_id in self.task_key.keys():
            # score with full task model;
            # `or 1` falls back to the max raw score if every word is unscored
            task_fb = self.scale_100(
                np.nanmean([sc for wd, sc in word_scores
                            if sc != self.not_scored_value] or 1))
            word_fb = self.b_list_task(word_scores, 'word', task_id)
            # binary phone feedback (kept for the alternate return below)
            phone_fb = [(p_sc[0], self.wordfix(p_sc[1], w_fb[1], task_id))
                        for w_fb, p_sc in zip(word_fb, phone_scores)]
            phone_fb2 = [(p_sc[0], self.c3_list_task(p_sc[1], 'phone', task_id))
                         for w_fb, p_sc in zip(word_fb, phone_scores)]
        else:
            # score with fallback monophone model
            phone_fb = [(p_sc[0], self.b_list_monophone(p_sc[1]))
                        for p_sc in phone_scores]
            word_fb = self.b_wordfromphone(phone_fb)
            task_fb = self.scale_100_monophone(phone_fb)
            phone_fb2 = [(p_sc[0], self.c3_list_monophone(p_sc[1]))
                         for p_sc in phone_scores]
        #return(task_fb, word_fb, phone_fb)
        return (task_fb, word_fb, phone_fb2)

    # ----------------------- stuff for visual .......
    # TODO 2pass...
    def get_pitch_tracks(self, sound_path):
        """Run REAPER on the audio and return voiced pitch points.

        Non-wav input is converted to a temporary wav (REAPER reads wav)
        which is removed afterwards.

        Returns:
            list of [time_sec, f0_hz] for voiced frames only.
        """
        reaper_exec = "/home/user/app/REAPER/build/reaper"
        orig_ftype = sound_path.split('.')[-1]
        # FIX: the extension has no leading dot but was compared against
        # '.wav', so wav files were always needlessly re-encoded and the
        # temp file cleanup condition never matched a real wav input.
        if orig_ftype == 'wav':
            wav_path = sound_path
        else:
            aud_data = AudioSegment.from_file(sound_path, orig_ftype)
            fname = sound_path.split('/')[-1].rsplit('.', 1)[0]
            # FIX: use os.getcwd() instead of shelling out to `pwd`
            tmp_path = f'{os.getcwd()}/{fname}_tmp.wav'
            aud_data.export(tmp_path, format="wav")
            wav_path = tmp_path
        f0_data = subprocess.run(
            [reaper_exec, "-i", wav_path, '-f', '/dev/stdout', '-a'],
            capture_output=True).stdout
        f0_data = f0_data.decode()
        f0_data = f0_data.split('EST_Header_End\n')[1].splitlines()
        f0_data = [l.split(' ') for l in f0_data]
        # the last line or 2 lines are other info, different format
        f0_data = [l for l in f0_data if len(l) == 3]
        # keep voiced frames only (voicing flag == '1')
        f0_data = [[float(t), float(f)] for t, v, f in f0_data if v == '1']
        if orig_ftype != 'wav':
            os.remove(tmp_path)  # FIX: was subprocess `rm`
        return f0_data

    # display colour corresponding to a gradient score per phone
    def generate_graphic_feedback_blocks(self, phone_scores):
        """Render one colour cell per phone along a raw-score gradient.

        Args:
            phone_scores: [(word, [(phone, raw_score), ...]), ...];
                unscored phones are interpolated from their neighbours.

        Returns:
            the matplotlib Figure.
        """
        plt.close('all')
        phone_scores = [phs for wrd, phs in phone_scores]
        phone_scores = [lc for phs in phone_scores for lc in phs]
        phone_scores = [[p, np.nan] if c == self.not_scored_value else [p, c]
                        for p, c in phone_scores]
        for i in range(len(phone_scores)):
            if np.isnan(phone_scores[i][1]):
                # neighbour fill: previous (already filled in-place) and
                # next real value
                prev_c = phone_scores[max(i - 1, 0)][1]  # would be nan only in case when i==0
                j = min(i + 1, len(phone_scores) - 1)
                next_c = np.nan
                while (np.isnan(next_c) and j < len(phone_scores)):
                    next_c = phone_scores[j][1]
                    j += 1
                # at least one of these has value unless the entire stimulus is nan score
                phone_scores[i][1] = np.nanmean([prev_c, next_c])
        fig, axs = plt.subplots(figsize=(7, 1.5))
        plt.ylim(-1, 1.5)
        axs.tick_params(left=False, bottom=False,
                        labelleft=False, labelbottom=False)
        axs.pcolormesh([[c for p, c in phone_scores]], cmap="rainbow_r",
                       norm=colors.Normalize(vmin=self.lower_bound_100 - 0.01,
                                             vmax=self.upper_bound_100 + 0.01,
                                             clip=True))
        fig.tight_layout()
        for phi, pinfo in enumerate(phone_scores):
            plt.text(phi + 0.5, -0.5, pinfo[0],
                     ha='center', va='center', color='black', size=12)
        return fig

    # TODO:
    # add subphone / frame level DTW feedback shading?
    def generate_graphic_feedback_0(self, sound_path, word_aligns,
                                    phone_aligns, phone_feedback, opts):
        """Plot pitch track with word/phone boundaries and coloured labels.

        Args:
            sound_path: audio file path passed to REAPER and librosa.
            word_aligns: [(word, start_sec, end_sec), ...] in order.
            phone_aligns: dict word -> [(phone, start, end), ...] with
                times relative to the word start.
            phone_feedback: [(word, [(3way_code, phone_label), ...]), ...].
            opts: truthy to overlay an RMS energy curve on a twin axis.

        Returns:
            the matplotlib Figure.
        """
        plt.close('all')
        rec_start = word_aligns[0][1]
        rec_end = word_aligns[-1][2]
        f0_data = self.get_pitch_tracks(sound_path)
        if f0_data:
            f_max = max([f0 for t, f0 in f0_data]) + 50
        else:
            f_max = 400
        fig, axes1 = plt.subplots(figsize=(15, 3))
        plt.xlim([rec_start, rec_end])
        axes1.set_ylim([0.0, f_max])
        axes1.get_xaxis().set_visible(False)
        for w, s, e in word_aligns:
            plt.vlines(s, 0, f_max, linewidth=0.5, color='black')
            plt.vlines(e, 0, f_max, linewidth=0.5, color='dimgrey')
            # word labels look like 'NN__word'; display the part after '__'
            plt.text((s + e) / 2, f_max + 15, w.split('__')[1],
                     fontsize=15, ha="center")
        # arrange aligns for graphs...
        phone_aligns = [(wrd, phs) for wrd, phs in phone_aligns.items()]
        # sort by the word key's leading characters (presumably a numeric
        # position prefix — TODO confirm)
        phone_aligns = sorted(phone_aligns, key=lambda x: x[0][:3])
        # shift phone times from word-relative to recording-relative
        phone_amends = zip([s for w, s, e in word_aligns],
                           [phs for wrd, phs in phone_aligns])
        phone_aligns = [[(p, s + offset, e + offset) for p, s, e in wphones]
                        for offset, wphones in phone_amends]
        phone_aligns = [p for wrps in phone_aligns for p in wrps]
        phone_feedback = [phs for wrd, phs in phone_feedback]
        phone_feedback = [p for wrps in phone_feedback for p in wrps]
        phone_infos = zip(phone_aligns, phone_feedback)
        # basic 3way phone to colour key
        #cdict = {-1: 'gray', 0: 'red', 1: 'blue'}
        cdict = {-1: 'gray', 0: "#E85907", 1: "#26701C"}
        for paln, pfbk in phone_infos:
            ph_id, s, e = paln
            c, p = pfbk
            plt.vlines(s, 0, f_max, linewidth=0.3, color='cadetblue',
                       linestyle=(0, (10, 4)))
            plt.vlines(e, 0, f_max, linewidth=0.3, color='cadetblue',
                       linestyle=(0, (10, 4)))
            plt.text((s + e) / 2 - (len(p) * .01), -1 * f_max / 10, p,
                     fontsize=18, color=cdict[c])
        f0c = "#88447F"
        enc = "#F49098"
        axes1.scatter([t for t, f0 in f0_data],
                      [f0 for t, f0 in f0_data], color=f0c)
        # add rmse
        w, sr = librosa.load(sound_path)
        fr_l = 2048  # librosa default
        h_l = 512  # default
        rmse = librosa.feature.rms(y=w, frame_length=fr_l, hop_length=h_l)
        rmse = rmse[0]
        # show rms energy, only if opts
        if opts:
            axes2 = axes1.twinx()
            axes2.set_ylim([0.0, 0.5])
            rms_xval = [(h_l * i) / sr for i in range(len(rmse))]
            axes2.plot(rms_xval, rmse, color=enc, linewidth=3.5)
        fig.tight_layout()
        return fig