| """ |
| Sequence alignment: map timed ASR transcript onto reference lyrics. |
| |
| Transfers word-level timestamps from the imperfect ASR output to the correct |
| reference lyrics text. Handles ASR errors (substitutions, insertions, deletions) |
| using edit-distance-based alignment. |
| |
| Approach: |
| 1. Normalize both word sequences (lowercase, strip punctuation) |
| 2. Compute an alignment using difflib SequenceMatcher (longest-matching-block heuristic; not guaranteed to be a minimal edit script) |
| 3. For 'equal' blocks: direct timestamp transfer |
| 4. For 'replace' blocks: linear interpolation across block |
| 5. For 'insert' blocks (ASR missed words): interpolate from neighbors |
| 6. For 'delete' blocks (ASR hallucinated): skip |
| |
| Optional enhancement: Use rapidfuzz for phonetic fuzzy matching before structural |
| alignment to handle common ASR phonetic errors ("gonna"→"going to", etc.) |
| """ |
|
|
| import logging |
| import re |
| import unicodedata |
| from dataclasses import dataclass |
| from difflib import SequenceMatcher |
| from typing import Optional |
|
|
| from lyric_sync.transcribe import TimedWord |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
@dataclass
class AlignmentStats:
    """Counters describing the outcome of one alignment run."""

    # Number of reference words the counters below refer to.
    total_ref_words: int
    # Reference words counted as direct ASR timestamp matches.
    directly_matched: int
    # Reference words counted as interpolated.
    interpolated: int
    # Reference words counted as having no timing.
    unmatched: int

    @property
    def match_rate(self) -> float:
        """Return directly_matched / total_ref_words, or 0.0 when there are no reference words."""
        total = self.total_ref_words
        return 0.0 if total == 0 else self.directly_matched / total

    @property
    def coverage(self) -> float:
        """Return (directly_matched + interpolated) / total_ref_words, or 0.0 when there are no reference words."""
        total = self.total_ref_words
        if total == 0:
            return 0.0
        timed = self.directly_matched + self.interpolated
        return timed / total
|
|
|
|
# Typographic apostrophes commonly found in lyric sheets, mapped to ASCII "'"
# so that e.g. "don\u2019t" and "don't" normalize to the same token.
_APOSTROPHE_TABLE = str.maketrans({"\u2018": "'", "\u2019": "'", "\u02bc": "'"})

# Matches every character that is neither a word character nor an apostrophe.
_NON_WORD_RE = re.compile(r"[^\w']")


def normalize_word(word: str) -> str:
    """
    Normalize a single word for alignment matching.

    Steps, in order:
    1. Unicode NFKD decomposition (splits accented letters into base letter +
       combining mark; the marks are removed by step 4, so "café" -> "cafe").
    2. Lowercasing.
    3. Typographic apostrophes (U+2018, U+2019, U+02BC) become ASCII "'".
    4. Everything except word characters and apostrophes is removed.
    5. Leading/trailing apostrophes are stripped ("'cause" -> "cause").

    Contractions are NOT expanded here; inner apostrophes are kept
    ("don't" stays "don't").

    Args:
        word: Raw token from the ASR transcript or the reference lyrics.

    Returns:
        The normalized token, or "" if nothing but punctuation was present.
    """
    word = unicodedata.normalize("NFKD", word).lower()
    # Without this step a curly apostrophe would be deleted by the filter
    # below, turning "don\u2019t" into "dont", which never matches "don't".
    word = word.translate(_APOSTROPHE_TABLE)
    word = _NON_WORD_RE.sub("", word)
    return word.strip("'")
|
|
|
|
def normalize_for_matching(words: list[str]) -> list[str]:
    """Return normalize_word() of each input word, dropping words whose normalized form is empty."""
    return [token for token in map(normalize_word, words) if token]
|
|
|
|
# Lower-case contraction / colloquialism -> expansion.
_CONTRACTIONS = {
    "don't": "do not", "doesn't": "does not", "didn't": "did not",
    "won't": "will not", "wouldn't": "would not", "couldn't": "could not",
    "shouldn't": "should not", "can't": "cannot", "isn't": "is not",
    "aren't": "are not", "wasn't": "was not", "weren't": "were not",
    "haven't": "have not", "hasn't": "has not", "hadn't": "had not",
    "i'm": "i am", "you're": "you are", "we're": "we are",
    "they're": "they are", "he's": "he is", "she's": "she is",
    "it's": "it is", "that's": "that is", "what's": "what is",
    "i've": "i have", "you've": "you have", "we've": "we have",
    "they've": "they have", "i'll": "i will", "you'll": "you will",
    "we'll": "we will", "they'll": "they will", "he'll": "he will",
    "she'll": "she will", "it'll": "it will", "let's": "let us",
    "gonna": "going to", "wanna": "want to", "gotta": "got to",
    "'cause": "because", "cause": "because",
}

# One alternation over all keys, longest first so "'cause" is tried before
# "cause" and "she's" before "he's". The lookarounds require that the match is
# not glued to another word character or apostrophe, i.e. it is a whole token.
_CONTRACTION_RE = re.compile(
    r"(?<![\w'])(?:"
    + "|".join(re.escape(key) for key in sorted(_CONTRACTIONS, key=len, reverse=True))
    + r")(?![\w'])"
)


def expand_contractions(text: str) -> str:
    """
    Expand common English contractions for better ASR↔lyrics matching.

    The text is lower-cased, then every whole-token occurrence of a known
    contraction is replaced by its expansion in a single pass. Matching whole
    tokens (rather than substrings) keeps words that merely contain a
    contraction intact: "because" stays "because" instead of becoming
    "bebecause", and an already-expanded "'cause" is not expanded a second time.

    Args:
        text: Arbitrary text (a word, a line, or whole lyrics).

    Returns:
        The lower-cased text with contractions expanded.
    """
    return _CONTRACTION_RE.sub(lambda m: _CONTRACTIONS[m.group(0)], text.lower())
|
|
|
|
def align_words(
    asr_words: list[TimedWord],
    ref_words: list[str],
    fuzzy_threshold: float = 0.75,
    use_fuzzy_prepass: bool = True,
) -> tuple[list[TimedWord], AlignmentStats]:
    """
    Align ASR-timed words onto reference lyrics, transferring timestamps.

    Both word lists are normalized, optionally fuzzy-snapped (ASR words are
    rewritten to a similar reference word), and compared with
    difflib.SequenceMatcher. Each opcode is handled as follows:

    - 'equal':   the ASR word's start/end/confidence are copied onto the
                 reference word (counted as directly matched; this includes
                 words that only became equal through the fuzzy pre-pass).
    - 'replace': the time span of the ASR block is divided evenly among the
                 reference words of the block (confidence 0.5, counted as
                 interpolated).
    - 'insert':  reference words with no ASR counterpart are left untimed and
                 handed to _fill_gaps(); they are counted as interpolated only
                 if they actually received a timestamp.
    - 'delete':  extra ASR words are ignored.

    Args:
        asr_words: Word-level timed transcript from ASR
        ref_words: Reference (correct) lyrics word list
        fuzzy_threshold: Minimum similarity for fuzzy pre-matching (0-1)
        use_fuzzy_prepass: Whether to fuzzy-normalize ASR words before alignment

    Returns:
        (aligned_words, stats). aligned_words has one TimedWord per entry of
        ref_words, in order. If either input is empty, an empty list is
        returned together with stats that mark every reference word unmatched.
    """
    if not asr_words or not ref_words:
        return [], AlignmentStats(len(ref_words), 0, 0, len(ref_words))

    asr_texts = [w.word for w in asr_words]
    asr_normalized = normalize_for_matching(asr_texts)
    ref_normalized = normalize_for_matching(ref_words)

    # Normalization drops punctuation-only tokens, so indices into the
    # normalized lists must be mapped back to indices into the originals.
    asr_to_orig = _build_index_map(asr_texts, asr_normalized)
    ref_to_orig = _build_index_map(ref_words, ref_normalized)

    # The fuzzy pre-pass returns a list of the same length, so the index
    # maps built above remain valid.
    if use_fuzzy_prepass:
        asr_normalized = _fuzzy_normalize(asr_normalized, ref_normalized, fuzzy_threshold)

    sm = SequenceMatcher(None, asr_normalized, ref_normalized, autojunk=False)
    opcodes = sm.get_opcodes()

    # Every reference word starts untimed (start == end == 0.0).
    result = [TimedWord(word=w, start=0.0, end=0.0, confidence=0.0) for w in ref_words]
    stats = AlignmentStats(total_ref_words=len(ref_words), directly_matched=0, interpolated=0, unmatched=0)

    # Original reference indices of 'insert' words, waiting for _fill_gaps().
    pending_gap_indices: list[int] = []

    for tag, i1, i2, j1, j2 in opcodes:
        if tag == "equal":
            for asr_idx, ref_idx in zip(range(i1, i2), range(j1, j2)):
                source = asr_words[asr_to_orig[asr_idx]]
                orig_ref_idx = ref_to_orig[ref_idx]
                result[orig_ref_idx] = TimedWord(
                    word=ref_words[orig_ref_idx],
                    start=source.start,
                    end=source.end,
                    confidence=source.confidence,
                )
                stats.directly_matched += 1

        elif tag == "replace":
            # Spread the reference words evenly over the time covered by the
            # ASR words they replace.
            t_start = asr_words[asr_to_orig[i1]].start
            t_end = asr_words[asr_to_orig[i2 - 1]].end

            n_ref = j2 - j1
            duration = t_end - t_start

            for k, ref_idx in enumerate(range(j1, j2)):
                orig_ref_idx = ref_to_orig[ref_idx]
                result[orig_ref_idx] = TimedWord(
                    word=ref_words[orig_ref_idx],
                    start=t_start + k * duration / n_ref,
                    end=t_start + (k + 1) * duration / n_ref,
                    confidence=0.5,
                )
                stats.interpolated += 1

        elif tag == "insert":
            # No ASR counterpart: leave untimed for now and remember the word
            # so it is only counted once it has really been given a time.
            pending_gap_indices.extend(ref_to_orig[ref_idx] for ref_idx in range(j1, j2))

        # tag == "delete": ASR produced words that are not in the lyrics; skip.

    _fill_gaps(result)

    # Counting after the fill keeps `coverage` honest when _fill_gaps() had
    # no anchor to interpolate from (e.g. the ASR contained only punctuation).
    stats.interpolated += sum(
        1
        for idx in pending_gap_indices
        if result[idx].start > 0 or result[idx].end > 0
    )

    stats.unmatched = sum(1 for w in result if w.start == 0.0 and w.end == 0.0)

    logger.info(
        f"Alignment: {stats.directly_matched}/{stats.total_ref_words} direct matches "
        f"({stats.match_rate:.1%}), {stats.interpolated} interpolated, "
        f"{stats.unmatched} unmatched"
    )

    return result, stats
|
|
|
|
def _build_index_map(original: list[str], normalized: list[str]) -> list[int]:
    """
    Map each position in `normalized` to a position in `original`.

    `original` is walked once, left to right. For every normalized word the
    walk continues until an original word with that normalized form is found;
    originals skipped on the way (e.g. punctuation-only tokens that normalize
    to nothing) get no entry. If `original` runs out, the remaining normalized
    words get no entry, so the result may be shorter than `normalized`.
    """
    index_of: list[int] = []
    # Shared iterator: each original word is consumed at most once.
    remaining = enumerate(original)
    for target in normalized:
        for position, raw in remaining:
            if normalize_word(raw) == target:
                index_of.append(position)
                break
    return index_of
|
|
|
|
def _fuzzy_normalize(
    asr_words: list[str],
    ref_words: list[str],
    threshold: float = 0.75,
) -> list[str]:
    """
    Pre-normalize ASR words to their closest reference word if similar enough.
    This helps SequenceMatcher find more 'equal' blocks.

    Similarity is difflib.SequenceMatcher(...).ratio() on the characters of the
    two words (no external dependency). Reference words whose length differs
    from the ASR word by more than 40% of the longer length are not considered.

    Candidates are tried in order of first appearance in `ref_words` and only a
    strictly higher ratio replaces the current best, so ties always resolve to
    the earliest reference word and the result is reproducible between runs.

    Args:
        asr_words: Normalized ASR words.
        ref_words: Normalized reference words.
        threshold: Minimum ratio (0-1) required to rewrite an ASR word.

    Returns:
        A list with one entry per ASR word: the word itself if it occurs in
        the reference or has no sufficiently similar candidate, otherwise the
        best-matching reference word. If `ref_words` is empty, `asr_words` is
        returned unchanged (same object).
    """
    # dict.fromkeys de-duplicates while preserving first-occurrence order;
    # iterating a set here would make tie-breaking depend on hash ordering.
    candidates = list(dict.fromkeys(ref_words))
    if not candidates:
        return asr_words

    known = set(candidates)
    # Lyrics repeat a lot; remember the outcome for each distinct ASR word.
    resolved: dict[str, str] = {}

    result = []
    for asr_w in asr_words:
        if asr_w in known:
            result.append(asr_w)
            continue

        if asr_w not in resolved:
            best_match = asr_w
            best_ratio = 0.0
            for ref_w in candidates:
                # Cheap length pre-filter before the character comparison.
                if abs(len(asr_w) - len(ref_w)) > max(len(asr_w), len(ref_w)) * 0.4:
                    continue
                ratio = SequenceMatcher(None, asr_w, ref_w).ratio()
                if ratio > best_ratio:
                    best_ratio = ratio
                    best_match = ref_w
            resolved[asr_w] = best_match if best_ratio >= threshold else asr_w

        result.append(resolved[asr_w])

    return result
|
|
|
|
def _fill_gaps(words: list[TimedWord]):
    """
    Fill in timestamps for words that didn't get assigned during alignment.

    A word counts as timed (an "anchor") when its start or its end is > 0;
    all other words are gaps. The list is modified in place; nothing is
    returned. If there is no anchor at all, the list is left untouched.

    Anchors are determined once, before anything is modified, so every gap
    word is placed relative to real anchors only:

    - Between two anchors: the time from the previous anchor's end to the next
      anchor's start is split into (gap_size + 1) equal slots and the gap
      words take the first gap_size of them (confidence 0.3).
    - Before the first anchor: words are laid out backwards from the anchor's
      start, 0.3 s each, clamped at 0.0 (confidence 0.2).
    - After the last anchor: words are laid out forwards from the anchor's
      end, 0.3 s each (confidence 0.2).

    Args:
        words: Objects with mutable `start`, `end` and `confidence` attributes.
    """
    # Duration assumed for a word when only one neighbouring anchor exists.
    edge_word_seconds = 0.3

    anchor_indices = [i for i, w in enumerate(words) if w.start > 0 or w.end > 0]
    if not anchor_indices:
        return

    first_anchor = anchor_indices[0]
    last_anchor = anchor_indices[-1]

    # Leading gap: step backwards from the first anchor.
    first_start = words[first_anchor].start
    for idx in range(first_anchor):
        offset = first_anchor - idx
        words[idx].start = max(0.0, first_start - offset * edge_word_seconds)
        words[idx].end = max(0.0, first_start - (offset - 1) * edge_word_seconds)
        words[idx].confidence = 0.2

    # Interior gaps: linear interpolation between consecutive anchors.
    for left, right in zip(anchor_indices, anchor_indices[1:]):
        gap_size = right - left - 1
        if gap_size <= 0:
            continue
        prev_end = words[left].end
        t_per_word = (words[right].start - prev_end) / (gap_size + 1)
        for position_in_gap in range(gap_size):
            gap_word = words[left + 1 + position_in_gap]
            gap_word.start = prev_end + position_in_gap * t_per_word
            gap_word.end = prev_end + (position_in_gap + 1) * t_per_word
            gap_word.confidence = 0.3

    # Trailing gap: step forwards from the last anchor.
    last_end = words[last_anchor].end
    for idx in range(last_anchor + 1, len(words)):
        offset = idx - last_anchor
        words[idx].start = last_end + (offset - 1) * edge_word_seconds
        words[idx].end = last_end + offset * edge_word_seconds
        words[idx].confidence = 0.2
|
|
|
|
def align_with_repeated_sections(
    asr_words: list[TimedWord],
    ref_words: list[str],
    ref_lines: Optional[list[str]] = None,
) -> tuple[list[TimedWord], AlignmentStats]:
    """
    Enhanced alignment that handles repeated sections (chorus, verse repeats).

    Songs often have repeated lyrics (chorus appears 2-3 times). Naive global
    alignment can misalign the second occurrence to the first's timestamps.

    Strategy: split `ref_lines` into sections (see _detect_sections) and align
    each section with align_words() against its own window of ASR words:

    - The window length is the section's share of the reference words applied
      to the ASR word count, widened by 30% and never shorter than one word.
    - After each section the cursor advances by 80% of the window length, so
      consecutive windows overlap.
    - The window is shifted back when it would run past the end of the ASR
      list. Without this, a section late in the song could receive an empty
      window, for which align_words() returns no words at all, silently
      dropping that section from the result.

    Falls back to a single global align_words() call when `ref_lines` is
    missing, when there are no ASR words, or when fewer than two sections are
    detected.

    Args:
        asr_words: Timed ASR words
        ref_words: Full reference word list
        ref_lines: Optional line-level structure for section detection

    Returns:
        (aligned_words, stats). In the sectioned path the aligned words are the
        concatenation of the per-section results, i.e. the words come from
        `ref_lines`; stats.total_ref_words is len(ref_words).
    """
    if not ref_lines or not asr_words:
        return align_words(asr_words, ref_words)

    sections = _detect_sections(ref_lines)
    if len(sections) <= 1:
        return align_words(asr_words, ref_words)

    all_aligned = []
    asr_cursor = 0
    total_stats = AlignmentStats(len(ref_words), 0, 0, 0)

    n_asr = len(asr_words)
    n_ref = max(len(ref_words), 1)

    for section_words in sections:
        ratio = len(section_words) / n_ref
        estimated_asr_count = max(1, int(n_asr * ratio * 1.3))

        # Keep the whole window inside the ASR list.
        window_start = min(asr_cursor, max(0, n_asr - estimated_asr_count))
        section_asr = asr_words[window_start:window_start + estimated_asr_count]
        aligned_section, section_stats = align_words(section_asr, section_words)

        all_aligned.extend(aligned_section)
        asr_cursor += int(estimated_asr_count * 0.8)

        total_stats.directly_matched += section_stats.directly_matched
        total_stats.interpolated += section_stats.interpolated

    # Clamp: the section words come from ref_lines and may outnumber ref_words.
    total_stats.unmatched = max(
        0,
        total_stats.total_ref_words - total_stats.directly_matched - total_stats.interpolated,
    )
    return all_aligned, total_stats
|
|
|
|
def _detect_sections(lines: list[str]) -> list[list[str]]:
    """
    Split lyric lines into sections at blank lines.

    Each run of consecutive non-blank lines becomes one section, represented
    as the flat list of its whitespace-separated words. When fewer than two
    sections are found, a single-element list holding all words (possibly an
    empty list) is returned instead.
    """
    chunks: list[list[str]] = []
    pending: list[str] = []

    for raw_line in lines:
        tokens = raw_line.split()
        if tokens:
            pending += tokens
            continue
        # Blank line: close the section collected so far, if any.
        if pending:
            chunks.append(pending)
            pending = []

    if pending:
        chunks.append(pending)

    if len(chunks) > 1:
        return chunks
    return [[word for chunk in chunks for word in chunk]]
|
|