lyric-sync / lyric_sync /align.py
rikhoffbauer2's picture
Upload lyric_sync/align.py
63c7ca6 verified
"""
Sequence alignment: map timed ASR transcript onto reference lyrics.
Transfers word-level timestamps from the imperfect ASR output to the correct
reference lyrics text. Handles ASR errors (substitutions, insertions, deletions)
using edit-distance-based alignment.
Approach:
1. Normalize both word sequences (lowercase, strip punctuation)
2. Compute optimal alignment using difflib SequenceMatcher (global LCS-based)
3. For 'equal' blocks: direct timestamp transfer
4. For 'replace' blocks: linear interpolation across block
5. For 'insert' blocks (ASR missed words): interpolate from neighbors
6. For 'delete' blocks (ASR hallucinated): skip
Optional enhancement: Use rapidfuzz for phonetic fuzzy matching before structural
alignment to handle common ASR phonetic errors ("gonna"→"going to", etc.)
"""
import logging
import re
import unicodedata
from dataclasses import dataclass
from difflib import SequenceMatcher
from typing import Optional
from lyric_sync.transcribe import TimedWord
logger = logging.getLogger(__name__)
@dataclass
class AlignmentStats:
"""Statistics about the alignment quality."""
total_ref_words: int
directly_matched: int # exact timestamp transfer
interpolated: int # timing estimated via interpolation
unmatched: int # no timing could be assigned
@property
def match_rate(self) -> float:
"""Fraction of reference words with direct ASR timestamp matches."""
if self.total_ref_words == 0:
return 0.0
return self.directly_matched / self.total_ref_words
@property
def coverage(self) -> float:
"""Fraction of reference words that got any timing (direct or interpolated)."""
if self.total_ref_words == 0:
return 0.0
return (self.directly_matched + self.interpolated) / self.total_ref_words
def normalize_word(word: str) -> str:
"""
Normalize a word for alignment matching.
Lowercase, strip punctuation, normalize unicode, expand contractions.
"""
# Unicode normalize
word = unicodedata.normalize("NFKD", word)
# Lowercase
word = word.lower()
# Strip common punctuation (preserve apostrophes for contractions)
word = re.sub(r"[^\w']", "", word)
# Remove leading/trailing apostrophes
word = word.strip("'")
return word
def normalize_for_matching(words: list[str]) -> list[str]:
"""Normalize a word list for sequence matching."""
normalized = []
for w in words:
n = normalize_word(w)
if n: # skip empty strings from punctuation-only tokens
normalized.append(n)
return normalized
def expand_contractions(text: str) -> str:
"""Expand common English contractions for better ASR↔lyrics matching."""
contractions = {
"don't": "do not", "doesn't": "does not", "didn't": "did not",
"won't": "will not", "wouldn't": "would not", "couldn't": "could not",
"shouldn't": "should not", "can't": "cannot", "isn't": "is not",
"aren't": "are not", "wasn't": "was not", "weren't": "were not",
"haven't": "have not", "hasn't": "has not", "hadn't": "had not",
"i'm": "i am", "you're": "you are", "we're": "we are",
"they're": "they are", "he's": "he is", "she's": "she is",
"it's": "it is", "that's": "that is", "what's": "what is",
"i've": "i have", "you've": "you have", "we've": "we have",
"they've": "they have", "i'll": "i will", "you'll": "you will",
"we'll": "we will", "they'll": "they will", "he'll": "he will",
"she'll": "she will", "it'll": "it will", "let's": "let us",
"gonna": "going to", "wanna": "want to", "gotta": "got to",
"'cause": "because", "cause": "because",
}
lower = text.lower()
for contraction, expansion in contractions.items():
lower = lower.replace(contraction, expansion)
return lower
def align_words(
asr_words: list[TimedWord],
ref_words: list[str],
fuzzy_threshold: float = 0.75,
use_fuzzy_prepass: bool = True,
) -> tuple[list[TimedWord], AlignmentStats]:
"""
Align ASR-timed words onto reference lyrics, transferring timestamps.
This is the core alignment function. It handles:
- Exact matches (direct timestamp transfer)
- Substitution errors (phonetic variants, interpolation)
- Insertions in ASR (hallucinated words, skipped)
- Deletions in ASR (missed words, timestamps interpolated)
Args:
asr_words: Word-level timed transcript from ASR
ref_words: Reference (correct) lyrics word list
fuzzy_threshold: Minimum similarity for fuzzy pre-matching (0-1)
use_fuzzy_prepass: Whether to fuzzy-normalize ASR words before alignment
Returns:
(aligned_words, stats) — ref_words with timestamps, and quality metrics
"""
if not asr_words or not ref_words:
return [], AlignmentStats(len(ref_words), 0, 0, len(ref_words))
# Normalize both sequences for matching
asr_normalized = normalize_for_matching([w.word for w in asr_words])
ref_normalized = normalize_for_matching(ref_words)
# Build index mapping: normalized position → original position
# (normalization may remove empty-string words from punctuation-only tokens)
asr_to_orig = _build_index_map([w.word for w in asr_words], asr_normalized)
ref_to_orig = _build_index_map(ref_words, ref_normalized)
# Optional fuzzy pre-pass: try to pre-match phonetically similar words
if use_fuzzy_prepass:
asr_normalized = _fuzzy_normalize(asr_normalized, ref_normalized, fuzzy_threshold)
# Compute alignment using SequenceMatcher (LCS-based global alignment)
sm = SequenceMatcher(None, asr_normalized, ref_normalized, autojunk=False)
opcodes = sm.get_opcodes()
# Initialize result with None timestamps
result = [TimedWord(word=w, start=0.0, end=0.0, confidence=0.0) for w in ref_words]
stats = AlignmentStats(total_ref_words=len(ref_words), directly_matched=0, interpolated=0, unmatched=0)
for tag, i1, i2, j1, j2 in opcodes:
if tag == "equal":
# Direct timestamp transfer — highest confidence
for asr_idx, ref_idx in zip(range(i1, i2), range(j1, j2)):
orig_asr_idx = asr_to_orig[asr_idx]
orig_ref_idx = ref_to_orig[ref_idx]
if orig_asr_idx < len(asr_words) and orig_ref_idx < len(result):
result[orig_ref_idx] = TimedWord(
word=ref_words[orig_ref_idx],
start=asr_words[orig_asr_idx].start,
end=asr_words[orig_asr_idx].end,
confidence=asr_words[orig_asr_idx].confidence,
)
stats.directly_matched += 1
elif tag == "replace":
# ASR has different words — interpolate timestamps across the block
orig_asr_start = asr_to_orig[i1]
orig_asr_end = asr_to_orig[i2 - 1]
t_start = asr_words[orig_asr_start].start
t_end = asr_words[orig_asr_end].end
n_ref = j2 - j1
duration = t_end - t_start
for k, ref_idx in enumerate(range(j1, j2)):
orig_ref_idx = ref_to_orig[ref_idx]
if orig_ref_idx < len(result):
result[orig_ref_idx] = TimedWord(
word=ref_words[orig_ref_idx],
start=t_start + k * duration / n_ref,
end=t_start + (k + 1) * duration / n_ref,
confidence=0.5, # interpolated = lower confidence
)
stats.interpolated += 1
elif tag == "delete":
# ASR produced words not in reference — skip them
pass
elif tag == "insert":
# Reference has words ASR missed — interpolate from context
for ref_idx in range(j1, j2):
orig_ref_idx = ref_to_orig[ref_idx]
stats.interpolated += 1
# Will be filled in the gap-filling pass below
# Gap-filling pass: interpolate timestamps for any words still at (0, 0)
_fill_gaps(result)
# Count unmatched
stats.unmatched = sum(1 for w in result if w.start == 0.0 and w.end == 0.0)
logger.info(
f"Alignment: {stats.directly_matched}/{stats.total_ref_words} direct matches "
f"({stats.match_rate:.1%}), {stats.interpolated} interpolated, "
f"{stats.unmatched} unmatched"
)
return result, stats
def _build_index_map(original: list[str], normalized: list[str]) -> list[int]:
"""
Build mapping from normalized index → original index.
Handles cases where normalization removes words (punctuation-only tokens).
"""
mapping = []
orig_idx = 0
for norm_word in normalized:
while orig_idx < len(original):
if normalize_word(original[orig_idx]) == norm_word:
mapping.append(orig_idx)
orig_idx += 1
break
orig_idx += 1
return mapping
def _fuzzy_normalize(
asr_words: list[str],
ref_words: list[str],
threshold: float = 0.75,
) -> list[str]:
"""
Pre-normalize ASR words to their closest reference word if similar enough.
This helps SequenceMatcher find more 'equal' blocks.
Uses character-level edit distance ratio (no external dependency).
"""
ref_set = set(ref_words)
if not ref_set:
return asr_words
result = []
for asr_w in asr_words:
if asr_w in ref_set:
result.append(asr_w)
continue
# Find closest reference word by edit distance
best_match = asr_w
best_ratio = 0.0
for ref_w in ref_set:
# Quick length filter
if abs(len(asr_w) - len(ref_w)) > max(len(asr_w), len(ref_w)) * 0.4:
continue
ratio = SequenceMatcher(None, asr_w, ref_w).ratio()
if ratio > best_ratio:
best_ratio = ratio
best_match = ref_w
if best_ratio >= threshold:
result.append(best_match)
else:
result.append(asr_w)
return result
def _fill_gaps(words: list[TimedWord]):
"""
Fill in timestamps for words that didn't get assigned during alignment.
Uses linear interpolation between neighboring timed words.
"""
# Find anchor points (words with valid timestamps)
anchors = [(i, w) for i, w in enumerate(words) if w.start > 0 or w.end > 0]
if not anchors:
return
# Fill gaps between anchors
for gap_start_idx in range(len(words)):
if words[gap_start_idx].start > 0 or words[gap_start_idx].end > 0:
continue
# Find surrounding anchors
prev_anchor = None
next_anchor = None
for i, w in reversed(list(enumerate(words[:gap_start_idx]))):
if w.start > 0 or w.end > 0:
prev_anchor = (i, w)
break
for i, w in enumerate(words[gap_start_idx:], gap_start_idx):
if (w.start > 0 or w.end > 0) and i != gap_start_idx:
next_anchor = (i, w)
break
# Interpolate
if prev_anchor and next_anchor:
prev_end = prev_anchor[1].end
next_start = next_anchor[1].start
gap_size = next_anchor[0] - prev_anchor[0] - 1
position_in_gap = gap_start_idx - prev_anchor[0] - 1
if gap_size > 0:
t_per_word = (next_start - prev_end) / (gap_size + 1)
words[gap_start_idx].start = prev_end + position_in_gap * t_per_word
words[gap_start_idx].end = prev_end + (position_in_gap + 1) * t_per_word
words[gap_start_idx].confidence = 0.3
elif prev_anchor:
# After last anchor — estimate ~0.3s per word
offset = gap_start_idx - prev_anchor[0]
words[gap_start_idx].start = prev_anchor[1].end + (offset - 1) * 0.3
words[gap_start_idx].end = prev_anchor[1].end + offset * 0.3
words[gap_start_idx].confidence = 0.2
elif next_anchor:
# Before first anchor — estimate backwards
offset = next_anchor[0] - gap_start_idx
words[gap_start_idx].start = max(0.0, next_anchor[1].start - offset * 0.3)
words[gap_start_idx].end = max(0.0, next_anchor[1].start - (offset - 1) * 0.3)
words[gap_start_idx].confidence = 0.2
def align_with_repeated_sections(
asr_words: list[TimedWord],
ref_words: list[str],
ref_lines: Optional[list[str]] = None,
) -> tuple[list[TimedWord], AlignmentStats]:
"""
Enhanced alignment that handles repeated sections (chorus, verse repeats).
Songs often have repeated lyrics (chorus appears 2-3 times). Naive global
alignment can misalign the second occurrence to the first's timestamps.
Strategy: Detect repeated line groups, then align each section independently
using time-windowed local alignment.
Args:
asr_words: Timed ASR words
ref_words: Full reference word list
ref_lines: Optional line-level structure for section detection
Returns:
(aligned_words, stats)
"""
if not ref_lines:
# Fall back to simple alignment
return align_words(asr_words, ref_words)
# Detect repeated sections
sections = _detect_sections(ref_lines)
if not sections or len(sections) <= 1:
return align_words(asr_words, ref_words)
# Align each section independently with time windows
all_aligned = []
asr_cursor = 0
total_stats = AlignmentStats(len(ref_words), 0, 0, 0)
for section_words in sections:
# Estimate how many ASR words correspond to this section
ratio = len(section_words) / max(len(ref_words), 1)
estimated_asr_count = int(len(asr_words) * ratio * 1.3) # 30% margin
section_asr = asr_words[asr_cursor:asr_cursor + estimated_asr_count]
aligned_section, section_stats = align_words(section_asr, section_words)
all_aligned.extend(aligned_section)
asr_cursor += int(estimated_asr_count * 0.8) # advance with some overlap
total_stats.directly_matched += section_stats.directly_matched
total_stats.interpolated += section_stats.interpolated
total_stats.unmatched = total_stats.total_ref_words - total_stats.directly_matched - total_stats.interpolated
return all_aligned, total_stats
def _detect_sections(lines: list[str]) -> list[list[str]]:
"""
Detect repeated sections in lyrics and split into alignable chunks.
Returns list of word-lists, one per section.
"""
# Simple heuristic: split on blank lines or repeated line groups
sections = []
current_section = []
for line in lines:
if not line.strip():
if current_section:
sections.append(current_section)
current_section = []
else:
current_section.extend(line.split())
if current_section:
sections.append(current_section)
return sections if len(sections) > 1 else [sum(sections, [])]