"""
Sequence alignment: map timed ASR transcript onto reference lyrics.

Transfers word-level timestamps from the imperfect ASR output to the correct
reference lyrics text. Handles ASR errors (substitutions, insertions,
deletions) using edit-distance-based alignment.

Approach:
1. Normalize both word sequences (lowercase, strip punctuation)
2. Compute an alignment using difflib SequenceMatcher (matching-block based)
3. For 'equal' blocks: direct timestamp transfer
4. For 'replace' blocks: linear interpolation across block
5. For 'insert' blocks (ASR missed words): interpolate from neighbors
6. For 'delete' blocks (ASR hallucinated): skip

An optional fuzzy pre-pass snaps each ASR word to a sufficiently similar
reference word (difflib similarity ratio, no external dependency) before the
structural alignment, which helps with common ASR spelling/phonetic errors.
"""

import logging
import re
import unicodedata
from dataclasses import dataclass
from difflib import SequenceMatcher
from typing import Optional

from lyric_sync.transcribe import TimedWord

logger = logging.getLogger(__name__)

# Typographic apostrophes that NFKD does not fold into the ASCII apostrophe.
_APOSTROPHE_TABLE = str.maketrans({"\u2019": "'", "\u2018": "'", "\u02bc": "'"})

# Everything except word characters and the ASCII apostrophe.
_NON_WORD_RE = re.compile(r"[^\w']")

# Contraction / colloquialism -> expansion, all lowercase.
_CONTRACTIONS = {
    "don't": "do not",
    "doesn't": "does not",
    "didn't": "did not",
    "won't": "will not",
    "wouldn't": "would not",
    "couldn't": "could not",
    "shouldn't": "should not",
    "can't": "cannot",
    "isn't": "is not",
    "aren't": "are not",
    "wasn't": "was not",
    "weren't": "were not",
    "haven't": "have not",
    "hasn't": "has not",
    "hadn't": "had not",
    "i'm": "i am",
    "you're": "you are",
    "we're": "we are",
    "they're": "they are",
    "he's": "he is",
    "she's": "she is",
    "it's": "it is",
    "that's": "that is",
    "what's": "what is",
    "i've": "i have",
    "you've": "you have",
    "we've": "we have",
    "they've": "they have",
    "i'll": "i will",
    "you'll": "you will",
    "we'll": "we will",
    "they'll": "they will",
    "he'll": "he will",
    "she'll": "she will",
    "it'll": "it will",
    "let's": "let us",
    "gonna": "going to",
    "wanna": "want to",
    "gotta": "got to",
    "'cause": "because",
    "cause": "because",
}

# Single-pass matcher: longest alternatives first so "'cause" wins over
# "cause", and word boundaries on both sides so "because" or "she's" are never
# rewritten from the inside.
_CONTRACTION_RE = re.compile(
    r"(?<!\w)(?:"
    + "|".join(re.escape(c) for c in sorted(_CONTRACTIONS, key=len, reverse=True))
    + r")(?!\w)"
)

# Heuristics used when a word has no timed neighbour on one side.
_ESTIMATED_WORD_SECONDS = 0.3
_CONFIDENCE_REPLACED = 0.5
_CONFIDENCE_INTERPOLATED = 0.3
_CONFIDENCE_EXTRAPOLATED = 0.2


@dataclass
class AlignmentStats:
    """Statistics about the alignment quality."""

    total_ref_words: int
    directly_matched: int  # exact timestamp transfer
    interpolated: int  # timing estimated via interpolation
    unmatched: int  # no timing could be assigned

    @property
    def match_rate(self) -> float:
        """Fraction of reference words with direct ASR timestamp matches."""
        if self.total_ref_words == 0:
            return 0.0
        return self.directly_matched / self.total_ref_words

    @property
    def coverage(self) -> float:
        """Fraction of reference words that got any timing (direct or interpolated)."""
        if self.total_ref_words == 0:
            return 0.0
        return (self.directly_matched + self.interpolated) / self.total_ref_words


def normalize_word(word: str) -> str:
    """
    Normalize a single word for alignment matching.

    Applies NFKD unicode normalization, folds typographic apostrophes to the
    ASCII apostrophe, lowercases, removes every character that is neither a
    word character nor an apostrophe, and finally strips leading/trailing
    apostrophes. Contractions are NOT expanded here (see expand_contractions).

    Returns:
        The normalized word; an empty string for punctuation-only tokens.
    """
    word = unicodedata.normalize("NFKD", word)
    # Without this, "don’t" would lose its apostrophe and never match "don't".
    word = word.translate(_APOSTROPHE_TABLE)
    word = word.lower()
    # Inner apostrophes are preserved so contractions keep their shape.
    word = _NON_WORD_RE.sub("", word)
    return word.strip("'")


def normalize_for_matching(words: list[str]) -> list[str]:
    """
    Normalize a word list for sequence matching.

    Tokens that normalize to the empty string (punctuation-only) are dropped,
    so the result may be shorter than the input.
    """
    return [norm for w in words if (norm := normalize_word(w))]


def expand_contractions(text: str) -> str:
    """
    Expand common English contractions for better ASR↔lyrics matching.

    The text is lowercased, typographic apostrophes are folded to "'", and
    every whole-word occurrence of a known contraction is replaced in a single
    pass. Matching on word boundaries prevents corrupting longer words (for
    example "because" must not have its inner "cause" expanded), and the single
    pass prevents an expansion from being expanded again.

    Returns:
        The lowercased text with contractions expanded.
    """
    lowered = text.lower().translate(_APOSTROPHE_TABLE)
    return _CONTRACTION_RE.sub(lambda m: _CONTRACTIONS[m.group(0)], lowered)


def align_words(
    asr_words: list[TimedWord],
    ref_words: list[str],
    fuzzy_threshold: float = 0.75,
    use_fuzzy_prepass: bool = True,
) -> tuple[list[TimedWord], AlignmentStats]:
    """
    Align ASR-timed words onto reference lyrics, transferring timestamps.

    This is the core alignment function. It handles:
    - Exact matches (direct timestamp transfer)
    - Substitution errors (timestamps interpolated across the block)
    - Insertions in ASR (hallucinated words, skipped)
    - Deletions in ASR (missed words, timestamps filled from neighbours)

    Args:
        asr_words: Word-level timed transcript from ASR
        ref_words: Reference (correct) lyrics word list
        fuzzy_threshold: Minimum similarity for fuzzy pre-matching (0-1)
        use_fuzzy_prepass: Whether to fuzzy-normalize ASR words before alignment

    Returns:
        (aligned_words, stats) — one TimedWord per entry of ref_words, and
        quality metrics. If either input is empty, returns an empty list and
        stats that report every reference word as unmatched. In the stats,
        directly_matched + interpolated + unmatched == total_ref_words.
    """
    if not asr_words or not ref_words:
        return [], AlignmentStats(len(ref_words), 0, 0, len(ref_words))

    # Normalize both sequences for matching.
    asr_tokens = [w.word for w in asr_words]
    asr_normalized = normalize_for_matching(asr_tokens)
    ref_normalized = normalize_for_matching(ref_words)

    # Normalization drops punctuation-only tokens, so keep a map from each
    # normalized position back to its position in the original list.
    asr_to_orig = _build_index_map(asr_tokens, asr_normalized)
    ref_to_orig = _build_index_map(ref_words, ref_normalized)

    # The pre-pass rewrites words in place (same length), so the index maps
    # built above stay valid.
    if use_fuzzy_prepass:
        asr_normalized = _fuzzy_normalize(asr_normalized, ref_normalized, fuzzy_threshold)

    matcher = SequenceMatcher(None, asr_normalized, ref_normalized, autojunk=False)

    # Every reference word starts untimed; (0.0, 0.0) marks "no timing yet".
    result = [TimedWord(word=w, start=0.0, end=0.0, confidence=0.0) for w in ref_words]
    direct_indices: set[int] = set()

    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        if tag == "equal":
            # Direct timestamp transfer — highest confidence.
            for asr_idx, ref_idx in zip(range(i1, i2), range(j1, j2)):
                source = asr_words[asr_to_orig[asr_idx]]
                target = ref_to_orig[ref_idx]
                result[target] = TimedWord(
                    word=ref_words[target],
                    start=source.start,
                    end=source.end,
                    confidence=source.confidence,
                )
                direct_indices.add(target)
        elif tag == "replace":
            # ASR heard different words: spread the block's time span evenly
            # over the reference words of the block.
            t_start = asr_words[asr_to_orig[i1]].start
            t_end = asr_words[asr_to_orig[i2 - 1]].end
            n_ref = j2 - j1
            duration = t_end - t_start
            for k, ref_idx in enumerate(range(j1, j2)):
                target = ref_to_orig[ref_idx]
                result[target] = TimedWord(
                    word=ref_words[target],
                    start=t_start + k * duration / n_ref,
                    end=t_start + (k + 1) * duration / n_ref,
                    confidence=_CONFIDENCE_REPLACED,
                )
        # "delete": ASR produced words not in the reference — nothing to do.
        # "insert": reference words the ASR missed — left untimed here and
        # handled by the gap-filling pass below.

    # Gap-filling pass: estimate timestamps for any words still at (0, 0).
    _fill_gaps(result)

    # Derive the counters from the final state so they always add up, even
    # when gap filling could not assign a time or filled a punctuation-only
    # token that never took part in the alignment.
    unmatched = sum(
        1
        for idx, w in enumerate(result)
        if idx not in direct_indices and w.start == 0.0 and w.end == 0.0
    )
    stats = AlignmentStats(
        total_ref_words=len(ref_words),
        directly_matched=len(direct_indices),
        interpolated=len(ref_words) - len(direct_indices) - unmatched,
        unmatched=unmatched,
    )

    logger.info(
        "Alignment: %s/%s direct matches (%s), %s interpolated, %s unmatched",
        stats.directly_matched,
        stats.total_ref_words,
        format(stats.match_rate, ".1%"),
        stats.interpolated,
        stats.unmatched,
    )
    return result, stats


def _build_index_map(original: list[str], normalized: list[str]) -> list[int]:
    """
    Build mapping from normalized index → original index.

    Handles cases where normalization removes words (punctuation-only tokens).
    Walks both lists in order: for each normalized word, advances through
    `original` until a token normalizes to that word. A normalized word with
    no remaining match contributes no entry.
    """
    mapping = []
    orig_idx = 0
    for norm_word in normalized:
        while orig_idx < len(original):
            found = normalize_word(original[orig_idx]) == norm_word
            orig_idx += 1
            if found:
                mapping.append(orig_idx - 1)
                break
    return mapping


def _fuzzy_normalize(
    asr_words: list[str],
    ref_words: list[str],
    threshold: float = 0.75,
) -> list[str]:
    """
    Pre-normalize ASR words to their closest reference word if similar enough.

    This helps SequenceMatcher find more 'equal' blocks. Uses the difflib
    character-level similarity ratio (no external dependency).

    Candidates are visited in first-occurrence order of `ref_words`, so ties
    are resolved the same way on every run (iterating a set of strings would
    depend on hash randomization). Results are cached per distinct ASR word.

    Returns:
        A new list of the same length as `asr_words`; `asr_words` itself when
        there are no reference words.
    """
    candidates = list(dict.fromkeys(ref_words))  # unique, order-preserving
    if not candidates:
        return asr_words
    known = set(candidates)

    cache: dict[str, str] = {}
    result = []
    for asr_w in asr_words:
        if asr_w in known:
            result.append(asr_w)
            continue
        if asr_w not in cache:
            best_match = asr_w
            best_ratio = 0.0
            for ref_w in candidates:
                # Quick length filter: very different lengths cannot be close.
                longest = max(len(asr_w), len(ref_w))
                if abs(len(asr_w) - len(ref_w)) > longest * 0.4:
                    continue
                ratio = SequenceMatcher(None, asr_w, ref_w).ratio()
                if ratio > best_ratio:
                    best_ratio = ratio
                    best_match = ref_w
            cache[asr_w] = best_match if best_ratio >= threshold else asr_w
        result.append(cache[asr_w])
    return result


def _fill_gaps(words: list[TimedWord]) -> None:
    """
    Fill in timestamps for words that didn't get assigned during alignment.

    A word counts as timed (an "anchor") when its start or end is > 0. The
    anchor set is fixed up front, so words filled by this function never act
    as anchors for later words. Mutates `words` in place.

    - Between two anchors: linear interpolation from the previous anchor's end
      towards the next anchor's start. The span is divided into gap_size + 1
      slots, leaving one slot of slack before the next anchor.
    - Before the first anchor: estimated backwards at ~0.3s per word, clamped
      at 0.0 (words clamped to (0, 0) remain untimed).
    - After the last anchor: estimated forwards at ~0.3s per word.

    Does nothing when no word is timed.
    """
    anchor_indices = [i for i, w in enumerate(words) if w.start > 0 or w.end > 0]
    if not anchor_indices:
        return

    # Before the first anchor — estimate backwards.
    first = anchor_indices[0]
    first_start = words[first].start
    for idx in range(first):
        offset = first - idx
        word = words[idx]
        word.start = max(0.0, first_start - offset * _ESTIMATED_WORD_SECONDS)
        word.end = max(0.0, first_start - (offset - 1) * _ESTIMATED_WORD_SECONDS)
        word.confidence = _CONFIDENCE_EXTRAPOLATED

    # Between consecutive anchors — interpolate.
    for prev_idx, next_idx in zip(anchor_indices, anchor_indices[1:]):
        gap_size = next_idx - prev_idx - 1
        if gap_size <= 0:
            continue
        prev_end = words[prev_idx].end
        next_start = words[next_idx].start
        t_per_word = (next_start - prev_end) / (gap_size + 1)
        for position in range(gap_size):
            word = words[prev_idx + 1 + position]
            word.start = prev_end + position * t_per_word
            word.end = prev_end + (position + 1) * t_per_word
            word.confidence = _CONFIDENCE_INTERPOLATED

    # After the last anchor — estimate forwards.
    last = anchor_indices[-1]
    last_end = words[last].end
    for idx in range(last + 1, len(words)):
        offset = idx - last
        word = words[idx]
        word.start = last_end + (offset - 1) * _ESTIMATED_WORD_SECONDS
        word.end = last_end + offset * _ESTIMATED_WORD_SECONDS
        word.confidence = _CONFIDENCE_EXTRAPOLATED


def align_with_repeated_sections(
    asr_words: list[TimedWord],
    ref_words: list[str],
    ref_lines: Optional[list[str]] = None,
) -> tuple[list[TimedWord], AlignmentStats]:
    """
    Section-wise alignment intended to cope with repeated lyrics.

    Songs often have repeated lyrics (chorus appears 2-3 times), and a naive
    global alignment can map a later occurrence onto an earlier one's
    timestamps. To limit that, the reference is split into sections (blank
    lines in `ref_lines`), and each section is aligned on its own against a
    window of ASR words whose size is proportional to the section's share of
    the reference (plus a 30% margin). Consecutive windows overlap.

    Falls back to plain align_words when `ref_lines` is missing or yields a
    single section.

    Args:
        asr_words: Timed ASR words
        ref_words: Full reference word list
        ref_lines: Optional line-level structure for section detection

    Returns:
        (aligned_words, stats). Every section word appears in the output; a
        section whose ASR window is empty is emitted untimed and counted as
        unmatched.
    """
    if not ref_lines:
        return align_words(asr_words, ref_words)

    sections = _detect_sections(ref_lines)
    if len(sections) <= 1:
        return align_words(asr_words, ref_words)

    all_aligned: list[TimedWord] = []
    total_stats = AlignmentStats(len(ref_words), 0, 0, 0)
    asr_cursor = 0

    for section_words in sections:
        # Estimate how many ASR words correspond to this section.
        share = len(section_words) / max(len(ref_words), 1)
        window = int(len(asr_words) * share * 1.3)  # 30% margin
        section_asr = asr_words[asr_cursor:asr_cursor + window]

        aligned_section, section_stats = align_words(section_asr, section_words)
        if not aligned_section:
            # align_words returns [] for an empty ASR window; keep the words
            # so the output stays in step with the reference.
            aligned_section = [
                TimedWord(word=w, start=0.0, end=0.0, confidence=0.0)
                for w in section_words
            ]
        all_aligned.extend(aligned_section)

        asr_cursor += int(window * 0.8)  # advance with some overlap
        total_stats.directly_matched += section_stats.directly_matched
        total_stats.interpolated += section_stats.interpolated
        total_stats.unmatched += section_stats.unmatched

    return all_aligned, total_stats


def _detect_sections(lines: list[str]) -> list[list[str]]:
    """
    Split lyric lines into alignable sections.

    Sections are separated by blank (whitespace-only) lines; no repeat
    detection is performed. Each section is the flat list of
    whitespace-separated words of its lines.

    Returns:
        One word list per section. When there are fewer than two sections, a
        single-element list holding all words (possibly empty) is returned.
    """
    sections: list[list[str]] = []
    current: list[str] = []
    for line in lines:
        if line.strip():
            current.extend(line.split())
        elif current:
            sections.append(current)
            current = []
    if current:
        sections.append(current)

    if len(sections) > 1:
        return sections
    return [[word for section in sections for word in section]]