Spaces:
Running
Running
improved cantonese version
Browse files- analyzer/ASR_zh_hk.py +122 -142
analyzer/ASR_zh_hk.py
CHANGED
|
@@ -14,223 +14,230 @@ print(f"INFO: ASR_zh_hk.py is configured to use device: {DEVICE}")
|
|
| 14 |
|
| 15 |
MODEL_NAME = "HK0712/Wav2Vec2_Cantonese"
|
| 16 |
|
| 17 |
-
# --- 1.
|
| 18 |
-
def
|
| 19 |
"""
|
| 20 |
-
|
| 21 |
-
|
|
|
|
| 22 |
"""
|
| 23 |
-
|
| 24 |
-
|
| 25 |
-
|
| 26 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 27 |
|
| 28 |
# --- 2. 智慧 G2P 歸屬邏輯 (中文版) ---
|
| 29 |
def _get_target_jyutping_by_char(sentence: str) -> (list, list):
|
| 30 |
"""
|
| 31 |
將中文句子轉換為「字」級別的粵拼目標。
|
| 32 |
-
邏輯:
|
| 33 |
-
1. 使用 pycantonese 進行分詞與標音 (考慮變調)。
|
| 34 |
-
2. 將分詞結果(如 '蛋糕' -> 'daan6gou1')拆解回單字('蛋'->'daan6', '糕'->'gou1')。
|
| 35 |
-
3. 回傳 (原始字列表, 每個字的粵拼 component 列表)。
|
| 36 |
"""
|
| 37 |
-
# pycantonese
|
|
|
|
| 38 |
segmented_result = pycantonese.characters_to_jyutping(sentence)
|
| 39 |
|
| 40 |
original_chars_flat = []
|
| 41 |
target_jyutping_groups = []
|
| 42 |
|
| 43 |
-
|
|
|
|
| 44 |
|
| 45 |
for word_segment, jyutping_segment in segmented_result:
|
| 46 |
-
# 如果是標點符號或無讀音字符,pycantonese 可能回傳 None 或原字符
|
| 47 |
if not jyutping_segment:
|
| 48 |
-
# 對於標點,我們暫時忽略或保留,這裡選擇忽略以專注於發音
|
| 49 |
continue
|
| 50 |
|
| 51 |
-
|
| 52 |
-
syllables = jyutping_pattern.findall(jyutping_segment)
|
| 53 |
|
| 54 |
-
#
|
| 55 |
-
# 注意:這在極少數多音字或特殊情況下可能不完美,但對絕大多數情況適用
|
| 56 |
if len(word_segment) == len(syllables):
|
| 57 |
for char, syl in zip(word_segment, syllables):
|
| 58 |
original_chars_flat.append(char)
|
| 59 |
-
#
|
| 60 |
-
target_jyutping_groups.append(
|
| 61 |
else:
|
| 62 |
-
#
|
| 63 |
-
print(f"WARNING:
|
| 64 |
-
|
| 65 |
-
for i, char in enumerate(word_segment):
|
| 66 |
original_chars_flat.append(char)
|
| 67 |
-
|
| 68 |
-
|
|
|
|
|
|
|
| 69 |
else:
|
| 70 |
-
target_jyutping_groups.append([])
|
| 71 |
|
| 72 |
return original_chars_flat, target_jyutping_groups
|
| 73 |
|
| 74 |
# --- 3. 核心分析函數 (主入口) ---
|
| 75 |
def analyze(audio_file_path: str, target_sentence: str, cache: dict = {}) -> dict:
|
| 76 |
-
"""
|
| 77 |
-
接收音訊檔案路徑和目標句子,回傳詳細的發音分析字典。
|
| 78 |
-
"""
|
| 79 |
-
# 檢查快取中是否已有模型,如果沒有則載入
|
| 80 |
if "model" not in cache:
|
| 81 |
print(f"Cache miss (ASR_zh_hk). Loading model '{MODEL_NAME}'...")
|
| 82 |
try:
|
| 83 |
-
# 不需要顯式傳遞 token,依賴環境變數或 Hugging Face Space 登入狀態
|
| 84 |
cache["processor"] = AutoProcessor.from_pretrained(MODEL_NAME)
|
| 85 |
model = AutoModelForCTC.from_pretrained(MODEL_NAME)
|
| 86 |
|
| 87 |
-
# 【【【 CPU 加速優化 】】】
|
| 88 |
if DEVICE == "cpu":
|
| 89 |
-
print("⚠️ CPU
|
| 90 |
-
model = torch.quantization.quantize_dynamic(
|
| 91 |
-
model,
|
| 92 |
-
{torch.nn.Linear},
|
| 93 |
-
dtype=torch.qint8
|
| 94 |
-
)
|
| 95 |
|
| 96 |
model.to(DEVICE)
|
| 97 |
cache["model"] = model
|
| 98 |
-
print(f"Model '{MODEL_NAME}' loaded
|
| 99 |
except Exception as e:
|
| 100 |
-
|
| 101 |
-
raise RuntimeError(f"Failed to load model '{MODEL_NAME}': {e}")
|
| 102 |
|
| 103 |
processor = cache["processor"]
|
| 104 |
model = cache["model"]
|
| 105 |
|
| 106 |
-
# 1.
|
| 107 |
-
# target_chars: ['檔', '案']
|
| 108 |
-
# target_jyutping_by_char: [['d','o','n','g','2'], ['o','n','3']]
|
| 109 |
target_chars, target_jyutping_by_char = _get_target_jyutping_by_char(target_sentence)
|
| 110 |
|
| 111 |
-
# 2.
|
| 112 |
try:
|
| 113 |
speech, sample_rate = sf.read(audio_file_path)
|
| 114 |
if sample_rate != 16000:
|
| 115 |
speech = librosa.resample(y=speech, orig_sr=sample_rate, target_sr=16000)
|
| 116 |
except Exception as e:
|
| 117 |
-
raise IOError(f"
|
| 118 |
|
| 119 |
input_values = processor(speech, sampling_rate=16000, return_tensors="pt").input_values
|
| 120 |
-
|
| 121 |
-
# 如果是量化模型(CPU),不需要 input.to(device)
|
| 122 |
-
if DEVICE == "cuda":
|
| 123 |
-
input_values = input_values.to(DEVICE)
|
| 124 |
|
| 125 |
with torch.no_grad():
|
| 126 |
logits = model(input_values).logits
|
| 127 |
predicted_ids = torch.argmax(logits, dim=-1)
|
| 128 |
|
| 129 |
-
# 3.
|
| 130 |
-
#
|
| 131 |
raw_output_str = processor.decode(predicted_ids[0])
|
| 132 |
|
| 133 |
-
#
|
| 134 |
-
#
|
| 135 |
-
#
|
| 136 |
-
user_jyutping_full_str = raw_output_str.replace(" ", "")
|
| 137 |
|
| 138 |
-
#
|
| 139 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 140 |
|
| 141 |
-
# 5. 格式化輸出
|
| 142 |
return _format_to_json_structure(word_alignments, target_sentence, target_chars)
|
| 143 |
|
|
|
|
|
|
|
|
|
|
|
|
|
| 144 |
|
| 145 |
-
#
|
| 146 |
-
|
| 147 |
-
|
| 148 |
-
|
| 149 |
-
|
| 150 |
-
"""
|
| 151 |
-
# 將使用者字串轉為列表: "dong2" -> ['d','o','n','g','2']
|
| 152 |
-
user_phonemes = list(user_phoneme_str)
|
| 153 |
-
|
| 154 |
target_phonemes_flat = []
|
| 155 |
word_boundaries_indices = []
|
| 156 |
current_idx = 0
|
| 157 |
|
| 158 |
-
# 展平目標發音以便進行全局對齊
|
| 159 |
for word_ipa_tokens in target_words_ipa_tokenized:
|
| 160 |
target_phonemes_flat.extend(word_ipa_tokens)
|
| 161 |
current_idx += len(word_ipa_tokens)
|
| 162 |
word_boundaries_indices.append(current_idx - 1)
|
| 163 |
|
| 164 |
-
# DP
|
| 165 |
dp = np.zeros((len(user_phonemes) + 1, len(target_phonemes_flat) + 1))
|
| 166 |
for i in range(1, len(user_phonemes) + 1): dp[i][0] = i
|
| 167 |
for j in range(1, len(target_phonemes_flat) + 1): dp[0][j] = j
|
| 168 |
|
| 169 |
-
# 填充 DP 表
|
| 170 |
for i in range(1, len(user_phonemes) + 1):
|
| 171 |
for j in range(1, len(target_phonemes_flat) + 1):
|
| 172 |
cost = 0 if user_phonemes[i-1] == target_phonemes_flat[j-1] else 1
|
| 173 |
dp[i][j] = min(dp[i-1][j] + 1, dp[i][j-1] + 1, dp[i-1][j-1] + cost)
|
| 174 |
|
| 175 |
-
# 回溯 (Backtracking) 找最佳路徑
|
| 176 |
i, j = len(user_phonemes), len(target_phonemes_flat)
|
| 177 |
user_path, target_path = [], []
|
| 178 |
while i > 0 or j > 0:
|
| 179 |
cost = float('inf') if i == 0 or j == 0 else (0 if user_phonemes[i-1] == target_phonemes_flat[j-1] else 1)
|
| 180 |
if i > 0 and j > 0 and dp[i][j] == dp[i-1][j-1] + cost:
|
| 181 |
-
user_path.insert(0, user_phonemes[i-1])
|
| 182 |
-
target_path.insert(0, target_phonemes_flat[j-1])
|
| 183 |
-
i -= 1; j -= 1
|
| 184 |
elif i > 0 and dp[i][j] == dp[i-1][j] + 1:
|
| 185 |
-
user_path.insert(0, user_phonemes[i-1])
|
| 186 |
-
target_path.insert(0, '-')
|
| 187 |
-
i -= 1
|
| 188 |
else:
|
| 189 |
-
user_path.insert(0, '-')
|
| 190 |
-
target_path.insert(0, target_phonemes_flat[j-1])
|
| 191 |
-
j -= 1
|
| 192 |
|
| 193 |
-
# 根據單字邊界切分對齊結果
|
| 194 |
alignments_by_word = []
|
| 195 |
word_start_idx_in_path = 0
|
| 196 |
target_phoneme_counter_in_path = 0
|
| 197 |
-
|
| 198 |
num_words_to_align = len(target_words_ipa_tokenized)
|
| 199 |
current_word_idx = 0
|
| 200 |
|
| 201 |
-
if not target_path:
|
| 202 |
-
return []
|
| 203 |
|
| 204 |
for path_idx, p in enumerate(target_path):
|
| 205 |
if p != '-':
|
| 206 |
if target_phoneme_counter_in_path in word_boundaries_indices:
|
| 207 |
if current_word_idx < num_words_to_align:
|
| 208 |
-
target_alignment = target_path[word_start_idx_in_path : path_idx + 1]
|
| 209 |
-
user_alignment = user_path[word_start_idx_in_path : path_idx + 1]
|
| 210 |
-
|
| 211 |
alignments_by_word.append({
|
| 212 |
-
"target":
|
| 213 |
-
"user":
|
| 214 |
})
|
| 215 |
-
|
| 216 |
word_start_idx_in_path = path_idx + 1
|
| 217 |
current_word_idx += 1
|
| 218 |
-
|
| 219 |
target_phoneme_counter_in_path += 1
|
| 220 |
|
| 221 |
-
# 處理最後一個字(如果有的話)
|
| 222 |
if word_start_idx_in_path < len(target_path) and current_word_idx < num_words_to_align:
|
| 223 |
-
target_alignment = target_path[word_start_idx_in_path:]
|
| 224 |
-
user_alignment = user_path[word_start_idx_in_path:]
|
| 225 |
alignments_by_word.append({
|
| 226 |
-
"target":
|
| 227 |
-
"user":
|
| 228 |
})
|
| 229 |
|
| 230 |
return alignments_by_word
|
| 231 |
|
| 232 |
-
|
| 233 |
-
# --- 5. 格式化函數 (JSON Output) ---
|
| 234 |
def _format_to_json_structure(alignments, sentence, original_words) -> dict:
|
| 235 |
total_phonemes = 0
|
| 236 |
total_errors = 0
|
|
@@ -251,63 +258,36 @@ def _format_to_json_structure(alignments, sentence, original_words) -> dict:
|
|
| 251 |
target_p = alignment['target'][j]
|
| 252 |
user_p = alignment['user'][j]
|
| 253 |
is_match = (user_p == target_p)
|
| 254 |
-
|
| 255 |
-
phonemes_data.append({
|
| 256 |
-
"target": target_p,
|
| 257 |
-
"user": user_p,
|
| 258 |
-
"isMatch": is_match
|
| 259 |
-
})
|
| 260 |
-
|
| 261 |
if not is_match:
|
| 262 |
word_is_correct = False
|
| 263 |
-
if not (user_p == '-' and target_p == '-'):
|
| 264 |
-
total_errors += 1
|
| 265 |
total_phonemes += sum(1 for p in alignment['target'] if p != '-')
|
| 266 |
|
| 267 |
-
if word_is_correct and phonemes_data:
|
| 268 |
-
|
| 269 |
-
|
| 270 |
-
words_data.append({
|
| 271 |
-
"word": original_words[i],
|
| 272 |
-
"isCorrect": word_is_correct,
|
| 273 |
-
"phonemes": phonemes_data
|
| 274 |
-
})
|
| 275 |
|
| 276 |
total_words = len(original_words)
|
| 277 |
-
# 處理漏讀的字
|
| 278 |
if len(words_data) < total_words:
|
| 279 |
-
# 需要計算剩餘字的預期 phonemes
|
| 280 |
_, remaining_targets = _get_target_jyutping_by_char("".join(original_words[len(words_data):]))
|
| 281 |
-
|
| 282 |
for i, target_group in enumerate(remaining_targets):
|
| 283 |
-
|
| 284 |
-
|
| 285 |
-
|
| 286 |
-
phonemes_data.append({"target": p_char, "user": "-", "isMatch": False})
|
| 287 |
-
total_errors += 1
|
| 288 |
-
total_phonemes += 1
|
| 289 |
-
|
| 290 |
-
words_data.append({
|
| 291 |
-
"word": original_words[current_word_idx],
|
| 292 |
-
"isCorrect": False,
|
| 293 |
-
"phonemes": phonemes_data
|
| 294 |
-
})
|
| 295 |
|
| 296 |
-
|
| 297 |
-
|
| 298 |
|
| 299 |
-
|
| 300 |
"sentence": sentence,
|
| 301 |
"analysisTimestampUTC": datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S (UTC)'),
|
| 302 |
"summary": {
|
| 303 |
-
"overallScore": round(
|
| 304 |
"totalWords": total_words,
|
| 305 |
"correctWords": correct_words_count,
|
| 306 |
-
"phonemeErrorRate": round(
|
| 307 |
"total_errors": total_errors,
|
| 308 |
"total_target_phonemes": total_phonemes
|
| 309 |
},
|
| 310 |
"words": words_data
|
| 311 |
-
}
|
| 312 |
-
|
| 313 |
-
return final_result
|
|
|
|
| 14 |
|
| 15 |
MODEL_NAME = "HK0712/Wav2Vec2_Cantonese"
|
| 16 |
|
| 17 |
+
# --- 1. 輔助函數:粵拼智慧切分器 (Linguistic Split) ---
|
| 18 |
+
def _tokenize_jyutping_smart(jyutping_str: str) -> list:
|
| 19 |
"""
|
| 20 |
+
將單個粵拼音節 (如 'gwong2') 根據聲韻學結構切分為 token。
|
| 21 |
+
Target: 'gwong2' -> ['gw', 'o', 'ng', '2']
|
| 22 |
+
這樣前端顯示時會是 "gw o ng 2",比 "g w o n g 2" 易讀得多。
|
| 23 |
"""
|
| 24 |
+
try:
|
| 25 |
+
# pycantonese.parse_jyutping 回傳的是一個列表,包含 Jyutping 物件
|
| 26 |
+
# 例如: parse_jyutping('gwong2') -> [Jyutping(onset='gw', nucleus='o', coda='ng', tone='2')]
|
| 27 |
+
parsed = pycantonese.parse_jyutping(jyutping_str)
|
| 28 |
+
|
| 29 |
+
tokens = []
|
| 30 |
+
for jp in parsed:
|
| 31 |
+
if jp.onset: tokens.append(jp.onset)
|
| 32 |
+
if jp.nucleus: tokens.append(jp.nucleus)
|
| 33 |
+
if jp.coda: tokens.append(jp.coda)
|
| 34 |
+
if jp.tone: tokens.append(jp.tone)
|
| 35 |
+
|
| 36 |
+
return tokens
|
| 37 |
+
except:
|
| 38 |
+
# 萬一解析失敗(例如模型輸出的拼音不標準),回退到簡單切分
|
| 39 |
+
# 但保留數字作為獨立 token
|
| 40 |
+
return re.findall(r'[a-z]+|[0-9]', jyutping_str)
|
| 41 |
|
| 42 |
# --- 2. 智慧 G2P 歸屬邏輯 (中文版) ---
|
| 43 |
def _get_target_jyutping_by_char(sentence: str) -> (list, list):
|
| 44 |
"""
|
| 45 |
將中文句子轉換為「字」級別的粵拼目標。
|
|
|
|
|
|
|
|
|
|
|
|
|
| 46 |
"""
|
| 47 |
+
# pycantonese.characters_to_jyutping 會處理變調與分詞
|
| 48 |
+
# 範例: "廣東話" -> [('廣東話', 'gwong2dung1waa2')]
|
| 49 |
segmented_result = pycantonese.characters_to_jyutping(sentence)
|
| 50 |
|
| 51 |
original_chars_flat = []
|
| 52 |
target_jyutping_groups = []
|
| 53 |
|
| 54 |
+
# 簡單的正則表達式,用來把連在一起的拼音分開 (e.g. 'gwong2dung1' -> 'gwong2', 'dung1')
|
| 55 |
+
jyutping_syllable_pattern = re.compile(r'([a-z]+[1-6])')
|
| 56 |
|
| 57 |
for word_segment, jyutping_segment in segmented_result:
|
|
|
|
| 58 |
if not jyutping_segment:
|
|
|
|
| 59 |
continue
|
| 60 |
|
| 61 |
+
syllables = jyutping_syllable_pattern.findall(jyutping_segment)
|
|
|
|
| 62 |
|
| 63 |
+
# 嘗試將分詞後的結果對齊回單個漢字
|
|
|
|
| 64 |
if len(word_segment) == len(syllables):
|
| 65 |
for char, syl in zip(word_segment, syllables):
|
| 66 |
original_chars_flat.append(char)
|
| 67 |
+
# 使用智慧切分:'gwong2' -> ['gw', 'o', 'ng', '2']
|
| 68 |
+
target_jyutping_groups.append(_tokenize_jyutping_smart(syl))
|
| 69 |
else:
|
| 70 |
+
# 長度不匹配時的備用方案 (逐字處理)
|
| 71 |
+
print(f"WARNING: Mismatch length for {word_segment}. Fallback to char-by-char G2P.")
|
| 72 |
+
for char in word_segment:
|
|
|
|
| 73 |
original_chars_flat.append(char)
|
| 74 |
+
# 對單字再做一次 G2P
|
| 75 |
+
single_res = pycantonese.characters_to_jyutping(char)
|
| 76 |
+
if single_res and single_res[0][1]:
|
| 77 |
+
target_jyutping_groups.append(_tokenize_jyutping_smart(single_res[0][1]))
|
| 78 |
else:
|
| 79 |
+
target_jyutping_groups.append([])
|
| 80 |
|
| 81 |
return original_chars_flat, target_jyutping_groups
|
| 82 |
|
| 83 |
# --- 3. 核心分析函數 (主入口) ---
|
| 84 |
def analyze(audio_file_path: str, target_sentence: str, cache: dict = {}) -> dict:
|
|
|
|
|
|
|
|
|
|
|
|
|
| 85 |
if "model" not in cache:
|
| 86 |
print(f"Cache miss (ASR_zh_hk). Loading model '{MODEL_NAME}'...")
|
| 87 |
try:
|
|
|
|
| 88 |
cache["processor"] = AutoProcessor.from_pretrained(MODEL_NAME)
|
| 89 |
model = AutoModelForCTC.from_pretrained(MODEL_NAME)
|
| 90 |
|
|
|
|
| 91 |
if DEVICE == "cpu":
|
| 92 |
+
print("⚠️ CPU detected. Quantizing model...")
|
| 93 |
+
model = torch.quantization.quantize_dynamic(model, {torch.nn.Linear}, dtype=torch.qint8)
|
|
|
|
|
|
|
|
|
|
|
|
|
| 94 |
|
| 95 |
model.to(DEVICE)
|
| 96 |
cache["model"] = model
|
| 97 |
+
print(f"Model '{MODEL_NAME}' loaded.")
|
| 98 |
except Exception as e:
|
| 99 |
+
raise RuntimeError(f"Failed to load model: {e}")
|
|
|
|
| 100 |
|
| 101 |
processor = cache["processor"]
|
| 102 |
model = cache["model"]
|
| 103 |
|
| 104 |
+
# 1. 準備目標 (Target)
|
|
|
|
|
|
|
| 105 |
target_chars, target_jyutping_by_char = _get_target_jyutping_by_char(target_sentence)
|
| 106 |
|
| 107 |
+
# 2. 推理 (Inference)
|
| 108 |
try:
|
| 109 |
speech, sample_rate = sf.read(audio_file_path)
|
| 110 |
if sample_rate != 16000:
|
| 111 |
speech = librosa.resample(y=speech, orig_sr=sample_rate, target_sr=16000)
|
| 112 |
except Exception as e:
|
| 113 |
+
raise IOError(f"Audio error: {e}")
|
| 114 |
|
| 115 |
input_values = processor(speech, sampling_rate=16000, return_tensors="pt").input_values
|
| 116 |
+
if DEVICE == "cuda": input_values = input_values.to(DEVICE)
|
|
|
|
|
|
|
|
|
|
| 117 |
|
| 118 |
with torch.no_grad():
|
| 119 |
logits = model(input_values).logits
|
| 120 |
predicted_ids = torch.argmax(logits, dim=-1)
|
| 121 |
|
| 122 |
+
# 3. 獲取使用者輸出 (User Output)
|
| 123 |
+
# 模型輸出: "gwong2 dung1 waa2" (字串)
|
| 124 |
raw_output_str = processor.decode(predicted_ids[0])
|
| 125 |
|
| 126 |
+
# 清理並準備對齊
|
| 127 |
+
# 我們需要把用戶的輸出也變成 ['gw', 'o', 'ng', '2', 'd', 'u', 'ng', '1'...] 的流
|
| 128 |
+
# 這樣才能跟 Target 的結構對齊
|
|
|
|
| 129 |
|
| 130 |
+
# 步驟 A: 移除空格,變成連續字串 "gwong2dung1waa2"
|
| 131 |
+
# 注意:這一步假設模型輸出的拼音是標準的。如果模型輸出亂碼,tokenize 可能會切得不完美,
|
| 132 |
+
# 但 Needleman-Wunsch 算法會處理這些 mismatch,所以沒關係。
|
| 133 |
+
user_jyutping_clean = raw_output_str.replace(" ", "")
|
| 134 |
+
|
| 135 |
+
# 步驟 B: 使用相同的邏輯切分用戶輸入
|
| 136 |
+
# 因為用戶輸入是一長串,我們用正則表達式把 [a-z] 和 [0-9] 分開,或者嘗試 parse
|
| 137 |
+
# 這裡用一個簡單的策略:把它當作一連串的 components
|
| 138 |
+
# 為了最佳對齊,我們這裡還是用 "Character + Number" 的粒度比較好,
|
| 139 |
+
# 因為用戶可能讀錯導致無法形成合法的 onset/nucleus。
|
| 140 |
+
#
|
| 141 |
+
# ★ 關鍵決策:為了避免用戶讀錯導致 crash,用戶端我們使用較細的粒度 (Regex Split),
|
| 142 |
+
# 然後讓對齊算法去匹配 Target 的 "gw", "o", "ng"。
|
| 143 |
+
# 等等,如果 Target 是 "gw" (1個token),User 是 "g", "w" (2個 tokens),對齊會錯位。
|
| 144 |
+
#
|
| 145 |
+
# ★ 修正策略:
|
| 146 |
+
# 我們也嘗試用 pycantonese.parse_jyutping 去解析用戶的整句輸出。
|
| 147 |
+
# 如果解析成功,我們就��結構化 token。如果失敗(亂讀),回退到字母切分。
|
| 148 |
+
|
| 149 |
+
user_tokens = []
|
| 150 |
+
# 嘗試把用戶輸出拆成音節 (e.g. "gwong2", "dung1")
|
| 151 |
+
user_syllables = re.findall(r'[a-z]+[0-9]', raw_output_str)
|
| 152 |
+
|
| 153 |
+
if user_syllables:
|
| 154 |
+
# 如果能抓到音節,就用結構化切分
|
| 155 |
+
for syl in user_syllables:
|
| 156 |
+
user_tokens.extend(_tokenize_jyutping_smart(syl))
|
| 157 |
+
else:
|
| 158 |
+
# 如果抓不到(例如沒聲調),就退化成字母切分
|
| 159 |
+
# 但這會導致跟 Target (gw) 對不上。
|
| 160 |
+
# 為了保險,我們這裡對於 Target 也許應該退化成簡單切分?
|
| 161 |
+
# 不,Target 是 Ground Truth,應該保持結構。
|
| 162 |
+
#
|
| 163 |
+
# 最終方案:讓 User stream 盡量 "粘" 在一起。
|
| 164 |
+
# 實際上,Wav2Vec2 輸出的通常是標準拼音。我們直接用 smart parse。
|
| 165 |
+
user_tokens = _tokenize_jyutping_smart(raw_output_str)
|
| 166 |
+
|
| 167 |
+
|
| 168 |
+
# 4. 對齊 (Alignment)
|
| 169 |
+
word_alignments = _get_phoneme_alignments_by_word(user_tokens, target_jyutping_by_char)
|
| 170 |
|
|
|
|
| 171 |
return _format_to_json_structure(word_alignments, target_sentence, target_chars)
|
| 172 |
|
| 173 |
+
# --- 4. 對齊與格式化 (保持原樣或微調) ---
|
| 174 |
+
# 這裡的邏輯與之前相同,不需要大改,因為它只是比較兩個 list 的相似度。
|
| 175 |
+
# 只要 user_tokens 和 target_jyutping_by_char 的元素 (token) 粒度一致即可。
|
| 176 |
+
# ... ( _get_phoneme_alignments_by_word 與 _format_to_json_structure 代碼同上) ...
|
| 177 |
|
| 178 |
+
# 為了節省篇幅,請使用上一版提供的 _get_phoneme_alignments_by_word 和 _format_to_json_structure
|
| 179 |
+
# 只需要替換上面的 _tokenize_jyutping_smart 和 analyze 函數即可。
|
| 180 |
+
# 下面我會把完整的 _get_phoneme_alignments_by_word 貼上以確保完整性。
|
| 181 |
+
|
| 182 |
+
def _get_phoneme_alignments_by_word(user_phonemes, target_words_ipa_tokenized):
|
|
|
|
|
|
|
|
|
|
|
|
|
| 183 |
target_phonemes_flat = []
|
| 184 |
word_boundaries_indices = []
|
| 185 |
current_idx = 0
|
| 186 |
|
|
|
|
| 187 |
for word_ipa_tokens in target_words_ipa_tokenized:
|
| 188 |
target_phonemes_flat.extend(word_ipa_tokens)
|
| 189 |
current_idx += len(word_ipa_tokens)
|
| 190 |
word_boundaries_indices.append(current_idx - 1)
|
| 191 |
|
| 192 |
+
# DP Matrix
|
| 193 |
dp = np.zeros((len(user_phonemes) + 1, len(target_phonemes_flat) + 1))
|
| 194 |
for i in range(1, len(user_phonemes) + 1): dp[i][0] = i
|
| 195 |
for j in range(1, len(target_phonemes_flat) + 1): dp[0][j] = j
|
| 196 |
|
|
|
|
| 197 |
for i in range(1, len(user_phonemes) + 1):
|
| 198 |
for j in range(1, len(target_phonemes_flat) + 1):
|
| 199 |
cost = 0 if user_phonemes[i-1] == target_phonemes_flat[j-1] else 1
|
| 200 |
dp[i][j] = min(dp[i-1][j] + 1, dp[i][j-1] + 1, dp[i-1][j-1] + cost)
|
| 201 |
|
|
|
|
| 202 |
i, j = len(user_phonemes), len(target_phonemes_flat)
|
| 203 |
user_path, target_path = [], []
|
| 204 |
while i > 0 or j > 0:
|
| 205 |
cost = float('inf') if i == 0 or j == 0 else (0 if user_phonemes[i-1] == target_phonemes_flat[j-1] else 1)
|
| 206 |
if i > 0 and j > 0 and dp[i][j] == dp[i-1][j-1] + cost:
|
| 207 |
+
user_path.insert(0, user_phonemes[i-1]); target_path.insert(0, target_phonemes_flat[j-1]); i -= 1; j -= 1
|
|
|
|
|
|
|
| 208 |
elif i > 0 and dp[i][j] == dp[i-1][j] + 1:
|
| 209 |
+
user_path.insert(0, user_phonemes[i-1]); target_path.insert(0, '-'); i -= 1
|
|
|
|
|
|
|
| 210 |
else:
|
| 211 |
+
user_path.insert(0, '-'); target_path.insert(0, target_phonemes_flat[j-1]); j -= 1
|
|
|
|
|
|
|
| 212 |
|
|
|
|
| 213 |
alignments_by_word = []
|
| 214 |
word_start_idx_in_path = 0
|
| 215 |
target_phoneme_counter_in_path = 0
|
|
|
|
| 216 |
num_words_to_align = len(target_words_ipa_tokenized)
|
| 217 |
current_word_idx = 0
|
| 218 |
|
| 219 |
+
if not target_path: return []
|
|
|
|
| 220 |
|
| 221 |
for path_idx, p in enumerate(target_path):
|
| 222 |
if p != '-':
|
| 223 |
if target_phoneme_counter_in_path in word_boundaries_indices:
|
| 224 |
if current_word_idx < num_words_to_align:
|
|
|
|
|
|
|
|
|
|
| 225 |
alignments_by_word.append({
|
| 226 |
+
"target": target_path[word_start_idx_in_path : path_idx + 1],
|
| 227 |
+
"user": user_path[word_start_idx_in_path : path_idx + 1]
|
| 228 |
})
|
|
|
|
| 229 |
word_start_idx_in_path = path_idx + 1
|
| 230 |
current_word_idx += 1
|
|
|
|
| 231 |
target_phoneme_counter_in_path += 1
|
| 232 |
|
|
|
|
| 233 |
if word_start_idx_in_path < len(target_path) and current_word_idx < num_words_to_align:
|
|
|
|
|
|
|
| 234 |
alignments_by_word.append({
|
| 235 |
+
"target": target_path[word_start_idx_in_path:],
|
| 236 |
+
"user": user_path[word_start_idx_in_path:]
|
| 237 |
})
|
| 238 |
|
| 239 |
return alignments_by_word
|
| 240 |
|
|
|
|
|
|
|
| 241 |
def _format_to_json_structure(alignments, sentence, original_words) -> dict:
|
| 242 |
total_phonemes = 0
|
| 243 |
total_errors = 0
|
|
|
|
| 258 |
target_p = alignment['target'][j]
|
| 259 |
user_p = alignment['user'][j]
|
| 260 |
is_match = (user_p == target_p)
|
| 261 |
+
phonemes_data.append({"target": target_p, "user": user_p, "isMatch": is_match})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 262 |
if not is_match:
|
| 263 |
word_is_correct = False
|
| 264 |
+
if not (user_p == '-' and target_p == '-'): total_errors += 1
|
|
|
|
| 265 |
total_phonemes += sum(1 for p in alignment['target'] if p != '-')
|
| 266 |
|
| 267 |
+
if word_is_correct and phonemes_data: correct_words_count += 1
|
| 268 |
+
words_data.append({"word": original_words[i], "isCorrect": word_is_correct, "phonemes": phonemes_data})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 269 |
|
| 270 |
total_words = len(original_words)
|
|
|
|
| 271 |
if len(words_data) < total_words:
|
|
|
|
| 272 |
_, remaining_targets = _get_target_jyutping_by_char("".join(original_words[len(words_data):]))
|
|
|
|
| 273 |
for i, target_group in enumerate(remaining_targets):
|
| 274 |
+
phonemes_data = [{"target": p, "user": "-", "isMatch": False} for p in target_group]
|
| 275 |
+
for _ in target_group: total_errors += 1; total_phonemes += 1
|
| 276 |
+
words_data.append({"word": original_words[len(words_data)], "isCorrect": False, "phonemes": phonemes_data})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 277 |
|
| 278 |
+
score = (correct_words_count / total_words) * 100 if total_words > 0 else 0
|
| 279 |
+
per = (total_errors / total_phonemes) * 100 if total_phonemes > 0 else 0
|
| 280 |
|
| 281 |
+
return {
|
| 282 |
"sentence": sentence,
|
| 283 |
"analysisTimestampUTC": datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S (UTC)'),
|
| 284 |
"summary": {
|
| 285 |
+
"overallScore": round(score, 1),
|
| 286 |
"totalWords": total_words,
|
| 287 |
"correctWords": correct_words_count,
|
| 288 |
+
"phonemeErrorRate": round(per, 2),
|
| 289 |
"total_errors": total_errors,
|
| 290 |
"total_target_phonemes": total_phonemes
|
| 291 |
},
|
| 292 |
"words": words_data
|
| 293 |
+
}
|
|
|
|
|
|