rikhoffbauer2 commited on
Commit
63c7ca6
·
verified ·
1 Parent(s): 1314870

Upload lyric_sync/align.py

Browse files
Files changed (1) hide show
  1. lyric_sync/align.py +406 -0
lyric_sync/align.py ADDED
@@ -0,0 +1,406 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Sequence alignment: map timed ASR transcript onto reference lyrics.
3
+
4
+ Transfers word-level timestamps from the imperfect ASR output to the correct
5
+ reference lyrics text. Handles ASR errors (substitutions, insertions, deletions)
6
+ using edit-distance-based alignment.
7
+
8
+ Approach:
9
+ 1. Normalize both word sequences (lowercase, strip punctuation)
10
+ 2. Compute optimal alignment using difflib SequenceMatcher (global LCS-based)
11
+ 3. For 'equal' blocks: direct timestamp transfer
12
+ 4. For 'replace' blocks: linear interpolation across block
13
+ 5. For 'insert' blocks (ASR missed words): interpolate from neighbors
14
+ 6. For 'delete' blocks (ASR hallucinated): skip
15
+
16
+ Optional enhancement: Use rapidfuzz for phonetic fuzzy matching before structural
17
+ alignment to handle common ASR phonetic errors ("gonna"→"going to", etc.)
18
+ """
19
+
20
+ import logging
21
+ import re
22
+ import unicodedata
23
+ from dataclasses import dataclass
24
+ from difflib import SequenceMatcher
25
+ from typing import Optional
26
+
27
+ from lyric_sync.transcribe import TimedWord
28
+
29
+ logger = logging.getLogger(__name__)
30
+
31
+
32
+ @dataclass
33
+ class AlignmentStats:
34
+ """Statistics about the alignment quality."""
35
+ total_ref_words: int
36
+ directly_matched: int # exact timestamp transfer
37
+ interpolated: int # timing estimated via interpolation
38
+ unmatched: int # no timing could be assigned
39
+
40
+ @property
41
+ def match_rate(self) -> float:
42
+ """Fraction of reference words with direct ASR timestamp matches."""
43
+ if self.total_ref_words == 0:
44
+ return 0.0
45
+ return self.directly_matched / self.total_ref_words
46
+
47
+ @property
48
+ def coverage(self) -> float:
49
+ """Fraction of reference words that got any timing (direct or interpolated)."""
50
+ if self.total_ref_words == 0:
51
+ return 0.0
52
+ return (self.directly_matched + self.interpolated) / self.total_ref_words
53
+
54
+
55
+ def normalize_word(word: str) -> str:
56
+ """
57
+ Normalize a word for alignment matching.
58
+ Lowercase, strip punctuation, normalize unicode, expand contractions.
59
+ """
60
+ # Unicode normalize
61
+ word = unicodedata.normalize("NFKD", word)
62
+ # Lowercase
63
+ word = word.lower()
64
+ # Strip common punctuation (preserve apostrophes for contractions)
65
+ word = re.sub(r"[^\w']", "", word)
66
+ # Remove leading/trailing apostrophes
67
+ word = word.strip("'")
68
+ return word
69
+
70
+
71
+ def normalize_for_matching(words: list[str]) -> list[str]:
72
+ """Normalize a word list for sequence matching."""
73
+ normalized = []
74
+ for w in words:
75
+ n = normalize_word(w)
76
+ if n: # skip empty strings from punctuation-only tokens
77
+ normalized.append(n)
78
+ return normalized
79
+
80
+
81
+ def expand_contractions(text: str) -> str:
82
+ """Expand common English contractions for better ASR↔lyrics matching."""
83
+ contractions = {
84
+ "don't": "do not", "doesn't": "does not", "didn't": "did not",
85
+ "won't": "will not", "wouldn't": "would not", "couldn't": "could not",
86
+ "shouldn't": "should not", "can't": "cannot", "isn't": "is not",
87
+ "aren't": "are not", "wasn't": "was not", "weren't": "were not",
88
+ "haven't": "have not", "hasn't": "has not", "hadn't": "had not",
89
+ "i'm": "i am", "you're": "you are", "we're": "we are",
90
+ "they're": "they are", "he's": "he is", "she's": "she is",
91
+ "it's": "it is", "that's": "that is", "what's": "what is",
92
+ "i've": "i have", "you've": "you have", "we've": "we have",
93
+ "they've": "they have", "i'll": "i will", "you'll": "you will",
94
+ "we'll": "we will", "they'll": "they will", "he'll": "he will",
95
+ "she'll": "she will", "it'll": "it will", "let's": "let us",
96
+ "gonna": "going to", "wanna": "want to", "gotta": "got to",
97
+ "'cause": "because", "cause": "because",
98
+ }
99
+ lower = text.lower()
100
+ for contraction, expansion in contractions.items():
101
+ lower = lower.replace(contraction, expansion)
102
+ return lower
103
+
104
+
105
+ def align_words(
106
+ asr_words: list[TimedWord],
107
+ ref_words: list[str],
108
+ fuzzy_threshold: float = 0.75,
109
+ use_fuzzy_prepass: bool = True,
110
+ ) -> tuple[list[TimedWord], AlignmentStats]:
111
+ """
112
+ Align ASR-timed words onto reference lyrics, transferring timestamps.
113
+
114
+ This is the core alignment function. It handles:
115
+ - Exact matches (direct timestamp transfer)
116
+ - Substitution errors (phonetic variants, interpolation)
117
+ - Insertions in ASR (hallucinated words, skipped)
118
+ - Deletions in ASR (missed words, timestamps interpolated)
119
+
120
+ Args:
121
+ asr_words: Word-level timed transcript from ASR
122
+ ref_words: Reference (correct) lyrics word list
123
+ fuzzy_threshold: Minimum similarity for fuzzy pre-matching (0-1)
124
+ use_fuzzy_prepass: Whether to fuzzy-normalize ASR words before alignment
125
+
126
+ Returns:
127
+ (aligned_words, stats) — ref_words with timestamps, and quality metrics
128
+ """
129
+ if not asr_words or not ref_words:
130
+ return [], AlignmentStats(len(ref_words), 0, 0, len(ref_words))
131
+
132
+ # Normalize both sequences for matching
133
+ asr_normalized = normalize_for_matching([w.word for w in asr_words])
134
+ ref_normalized = normalize_for_matching(ref_words)
135
+
136
+ # Build index mapping: normalized position → original position
137
+ # (normalization may remove empty-string words from punctuation-only tokens)
138
+ asr_to_orig = _build_index_map([w.word for w in asr_words], asr_normalized)
139
+ ref_to_orig = _build_index_map(ref_words, ref_normalized)
140
+
141
+ # Optional fuzzy pre-pass: try to pre-match phonetically similar words
142
+ if use_fuzzy_prepass:
143
+ asr_normalized = _fuzzy_normalize(asr_normalized, ref_normalized, fuzzy_threshold)
144
+
145
+ # Compute alignment using SequenceMatcher (LCS-based global alignment)
146
+ sm = SequenceMatcher(None, asr_normalized, ref_normalized, autojunk=False)
147
+ opcodes = sm.get_opcodes()
148
+
149
+ # Initialize result with None timestamps
150
+ result = [TimedWord(word=w, start=0.0, end=0.0, confidence=0.0) for w in ref_words]
151
+
152
+ stats = AlignmentStats(total_ref_words=len(ref_words), directly_matched=0, interpolated=0, unmatched=0)
153
+
154
+ for tag, i1, i2, j1, j2 in opcodes:
155
+ if tag == "equal":
156
+ # Direct timestamp transfer — highest confidence
157
+ for asr_idx, ref_idx in zip(range(i1, i2), range(j1, j2)):
158
+ orig_asr_idx = asr_to_orig[asr_idx]
159
+ orig_ref_idx = ref_to_orig[ref_idx]
160
+ if orig_asr_idx < len(asr_words) and orig_ref_idx < len(result):
161
+ result[orig_ref_idx] = TimedWord(
162
+ word=ref_words[orig_ref_idx],
163
+ start=asr_words[orig_asr_idx].start,
164
+ end=asr_words[orig_asr_idx].end,
165
+ confidence=asr_words[orig_asr_idx].confidence,
166
+ )
167
+ stats.directly_matched += 1
168
+
169
+ elif tag == "replace":
170
+ # ASR has different words — interpolate timestamps across the block
171
+ orig_asr_start = asr_to_orig[i1]
172
+ orig_asr_end = asr_to_orig[i2 - 1]
173
+ t_start = asr_words[orig_asr_start].start
174
+ t_end = asr_words[orig_asr_end].end
175
+
176
+ n_ref = j2 - j1
177
+ duration = t_end - t_start
178
+
179
+ for k, ref_idx in enumerate(range(j1, j2)):
180
+ orig_ref_idx = ref_to_orig[ref_idx]
181
+ if orig_ref_idx < len(result):
182
+ result[orig_ref_idx] = TimedWord(
183
+ word=ref_words[orig_ref_idx],
184
+ start=t_start + k * duration / n_ref,
185
+ end=t_start + (k + 1) * duration / n_ref,
186
+ confidence=0.5, # interpolated = lower confidence
187
+ )
188
+ stats.interpolated += 1
189
+
190
+ elif tag == "delete":
191
+ # ASR produced words not in reference — skip them
192
+ pass
193
+
194
+ elif tag == "insert":
195
+ # Reference has words ASR missed — interpolate from context
196
+ for ref_idx in range(j1, j2):
197
+ orig_ref_idx = ref_to_orig[ref_idx]
198
+ stats.interpolated += 1
199
+ # Will be filled in the gap-filling pass below
200
+
201
+ # Gap-filling pass: interpolate timestamps for any words still at (0, 0)
202
+ _fill_gaps(result)
203
+
204
+ # Count unmatched
205
+ stats.unmatched = sum(1 for w in result if w.start == 0.0 and w.end == 0.0)
206
+
207
+ logger.info(
208
+ f"Alignment: {stats.directly_matched}/{stats.total_ref_words} direct matches "
209
+ f"({stats.match_rate:.1%}), {stats.interpolated} interpolated, "
210
+ f"{stats.unmatched} unmatched"
211
+ )
212
+
213
+ return result, stats
214
+
215
+
216
+ def _build_index_map(original: list[str], normalized: list[str]) -> list[int]:
217
+ """
218
+ Build mapping from normalized index → original index.
219
+ Handles cases where normalization removes words (punctuation-only tokens).
220
+ """
221
+ mapping = []
222
+ orig_idx = 0
223
+ for norm_word in normalized:
224
+ while orig_idx < len(original):
225
+ if normalize_word(original[orig_idx]) == norm_word:
226
+ mapping.append(orig_idx)
227
+ orig_idx += 1
228
+ break
229
+ orig_idx += 1
230
+ return mapping
231
+
232
+
233
+ def _fuzzy_normalize(
234
+ asr_words: list[str],
235
+ ref_words: list[str],
236
+ threshold: float = 0.75,
237
+ ) -> list[str]:
238
+ """
239
+ Pre-normalize ASR words to their closest reference word if similar enough.
240
+ This helps SequenceMatcher find more 'equal' blocks.
241
+
242
+ Uses character-level edit distance ratio (no external dependency).
243
+ """
244
+ ref_set = set(ref_words)
245
+ if not ref_set:
246
+ return asr_words
247
+
248
+ result = []
249
+ for asr_w in asr_words:
250
+ if asr_w in ref_set:
251
+ result.append(asr_w)
252
+ continue
253
+
254
+ # Find closest reference word by edit distance
255
+ best_match = asr_w
256
+ best_ratio = 0.0
257
+
258
+ for ref_w in ref_set:
259
+ # Quick length filter
260
+ if abs(len(asr_w) - len(ref_w)) > max(len(asr_w), len(ref_w)) * 0.4:
261
+ continue
262
+ ratio = SequenceMatcher(None, asr_w, ref_w).ratio()
263
+ if ratio > best_ratio:
264
+ best_ratio = ratio
265
+ best_match = ref_w
266
+
267
+ if best_ratio >= threshold:
268
+ result.append(best_match)
269
+ else:
270
+ result.append(asr_w)
271
+
272
+ return result
273
+
274
+
275
+ def _fill_gaps(words: list[TimedWord]):
276
+ """
277
+ Fill in timestamps for words that didn't get assigned during alignment.
278
+ Uses linear interpolation between neighboring timed words.
279
+ """
280
+ # Find anchor points (words with valid timestamps)
281
+ anchors = [(i, w) for i, w in enumerate(words) if w.start > 0 or w.end > 0]
282
+
283
+ if not anchors:
284
+ return
285
+
286
+ # Fill gaps between anchors
287
+ for gap_start_idx in range(len(words)):
288
+ if words[gap_start_idx].start > 0 or words[gap_start_idx].end > 0:
289
+ continue
290
+
291
+ # Find surrounding anchors
292
+ prev_anchor = None
293
+ next_anchor = None
294
+
295
+ for i, w in reversed(list(enumerate(words[:gap_start_idx]))):
296
+ if w.start > 0 or w.end > 0:
297
+ prev_anchor = (i, w)
298
+ break
299
+
300
+ for i, w in enumerate(words[gap_start_idx:], gap_start_idx):
301
+ if (w.start > 0 or w.end > 0) and i != gap_start_idx:
302
+ next_anchor = (i, w)
303
+ break
304
+
305
+ # Interpolate
306
+ if prev_anchor and next_anchor:
307
+ prev_end = prev_anchor[1].end
308
+ next_start = next_anchor[1].start
309
+ gap_size = next_anchor[0] - prev_anchor[0] - 1
310
+ position_in_gap = gap_start_idx - prev_anchor[0] - 1
311
+
312
+ if gap_size > 0:
313
+ t_per_word = (next_start - prev_end) / (gap_size + 1)
314
+ words[gap_start_idx].start = prev_end + position_in_gap * t_per_word
315
+ words[gap_start_idx].end = prev_end + (position_in_gap + 1) * t_per_word
316
+ words[gap_start_idx].confidence = 0.3
317
+ elif prev_anchor:
318
+ # After last anchor — estimate ~0.3s per word
319
+ offset = gap_start_idx - prev_anchor[0]
320
+ words[gap_start_idx].start = prev_anchor[1].end + (offset - 1) * 0.3
321
+ words[gap_start_idx].end = prev_anchor[1].end + offset * 0.3
322
+ words[gap_start_idx].confidence = 0.2
323
+ elif next_anchor:
324
+ # Before first anchor — estimate backwards
325
+ offset = next_anchor[0] - gap_start_idx
326
+ words[gap_start_idx].start = max(0.0, next_anchor[1].start - offset * 0.3)
327
+ words[gap_start_idx].end = max(0.0, next_anchor[1].start - (offset - 1) * 0.3)
328
+ words[gap_start_idx].confidence = 0.2
329
+
330
+
331
+ def align_with_repeated_sections(
332
+ asr_words: list[TimedWord],
333
+ ref_words: list[str],
334
+ ref_lines: Optional[list[str]] = None,
335
+ ) -> tuple[list[TimedWord], AlignmentStats]:
336
+ """
337
+ Enhanced alignment that handles repeated sections (chorus, verse repeats).
338
+
339
+ Songs often have repeated lyrics (chorus appears 2-3 times). Naive global
340
+ alignment can misalign the second occurrence to the first's timestamps.
341
+
342
+ Strategy: Detect repeated line groups, then align each section independently
343
+ using time-windowed local alignment.
344
+
345
+ Args:
346
+ asr_words: Timed ASR words
347
+ ref_words: Full reference word list
348
+ ref_lines: Optional line-level structure for section detection
349
+
350
+ Returns:
351
+ (aligned_words, stats)
352
+ """
353
+ if not ref_lines:
354
+ # Fall back to simple alignment
355
+ return align_words(asr_words, ref_words)
356
+
357
+ # Detect repeated sections
358
+ sections = _detect_sections(ref_lines)
359
+
360
+ if not sections or len(sections) <= 1:
361
+ return align_words(asr_words, ref_words)
362
+
363
+ # Align each section independently with time windows
364
+ all_aligned = []
365
+ asr_cursor = 0
366
+ total_stats = AlignmentStats(len(ref_words), 0, 0, 0)
367
+
368
+ for section_words in sections:
369
+ # Estimate how many ASR words correspond to this section
370
+ ratio = len(section_words) / max(len(ref_words), 1)
371
+ estimated_asr_count = int(len(asr_words) * ratio * 1.3) # 30% margin
372
+
373
+ section_asr = asr_words[asr_cursor:asr_cursor + estimated_asr_count]
374
+ aligned_section, section_stats = align_words(section_asr, section_words)
375
+
376
+ all_aligned.extend(aligned_section)
377
+ asr_cursor += int(estimated_asr_count * 0.8) # advance with some overlap
378
+
379
+ total_stats.directly_matched += section_stats.directly_matched
380
+ total_stats.interpolated += section_stats.interpolated
381
+
382
+ total_stats.unmatched = total_stats.total_ref_words - total_stats.directly_matched - total_stats.interpolated
383
+ return all_aligned, total_stats
384
+
385
+
386
+ def _detect_sections(lines: list[str]) -> list[list[str]]:
387
+ """
388
+ Detect repeated sections in lyrics and split into alignable chunks.
389
+ Returns list of word-lists, one per section.
390
+ """
391
+ # Simple heuristic: split on blank lines or repeated line groups
392
+ sections = []
393
+ current_section = []
394
+
395
+ for line in lines:
396
+ if not line.strip():
397
+ if current_section:
398
+ sections.append(current_section)
399
+ current_section = []
400
+ else:
401
+ current_section.extend(line.split())
402
+
403
+ if current_section:
404
+ sections.append(current_section)
405
+
406
+ return sections if len(sections) > 1 else [sum(sections, [])]