""" Precision alignment service - Word-center-based speaker assignment. Merges word-level transcription with speaker diarization using precise timestamps. """ import logging from pathlib import Path from typing import List, Tuple, Optional from dataclasses import dataclass from app.core.config import get_settings from app.services.transcription import WordTimestamp from app.services.diarization import SpeakerSegment from app.schemas.models import TranscriptSegment logger = logging.getLogger(__name__) settings = get_settings() @dataclass class WordWithSpeaker: """A word with assigned speaker.""" word: str start: float end: float speaker: str class AlignmentService: """ Precision alignment service. Uses word-center-based algorithm for accurate speaker-to-text mapping. """ PAUSE_THRESHOLD = 0.45 CENTER_TOL = 0.15 # s (150 ms) OVERLAP_TH = 0.12 # > x% segments DIA_MERGE_GAP = 0.25 MAX_SEGMENT_DURATION = 7.5 @staticmethod def get_word_center(word: WordTimestamp) -> float: """Calculate the center time of a word.""" return (word.start + word.end) / 2 @staticmethod def overlap_ratio(w_start, w_end, s_start, s_end): overlap = max(0.0, min(w_end, s_end) - max(w_start, s_start)) dur = max(1e-6, w_end - w_start) return overlap / dur # Diarization merge @classmethod def merge_dia_segments(cls, segments: List[SpeakerSegment]) -> List[SpeakerSegment]: if not segments: return [] segments = sorted(segments, key=lambda s: s.start) merged = [segments[0]] for s in segments[1:]: p = merged[-1] if s.speaker == p.speaker and (s.start - p.end) <= cls.DIA_MERGE_GAP: p.end = s.end else: merged.append(s) return merged @classmethod def find_speaker_center( cls, time: float, speaker_segments: List[SpeakerSegment], ) -> Optional[str]: for seg in speaker_segments: if seg.start - cls.CENTER_TOL <= time <= seg.end + cls.CENTER_TOL: return seg.speaker return None @staticmethod def find_closest_speaker(time: float, speaker_segments: List[SpeakerSegment]) -> str: if not speaker_segments: return "Unknown" min_dist = float("inf") closest = "Unknown" for seg in speaker_segments: d = min(abs(time - seg.start), abs(time - seg.end)) if d < min_dist: min_dist = d closest = seg.speaker return closest @classmethod def assign_speakers_to_words( cls, words: List[WordTimestamp], speaker_segments: List[SpeakerSegment], ) -> List[WordWithSpeaker]: words = [w for w in words if w.word and w.word.strip()] if not speaker_segments: logger.warning("No diarization, fallback single speaker") return [ WordWithSpeaker(w.word, w.start, w.end, "Speaker 1") for w in words ] speaker_segments = cls.merge_dia_segments(speaker_segments) results = [] for word in words: center = cls.get_word_center(word) # 1. CENTER speaker = cls.find_speaker_center(center, speaker_segments) if speaker is None: # 2. OVERLAP best_ratio = 0 best_spk = None for seg in speaker_segments: r = cls.overlap_ratio(word.start, word.end, seg.start, seg.end) if r > best_ratio: best_ratio = r best_spk = seg.speaker if best_ratio >= cls.OVERLAP_TH: speaker = best_spk else: # 3. CLOSEST speaker = cls.find_closest_speaker(center, speaker_segments) results.append( WordWithSpeaker(word.word, word.start, word.end, speaker) ) return results @classmethod def reconstruct_segments( cls, words_with_speakers: List[WordWithSpeaker] ) -> List[TranscriptSegment]: """ Step 3d: Reconstruct sentence segments from words. Groups consecutive words of the same speaker into segments. Creates new segment when: - Speaker changes - Pause > PAUSE_THRESHOLD between words Args: words_with_speakers: List of words with speaker assignments Returns: List of TranscriptSegment with complete sentences """ if not words_with_speakers: return [] segments = [] # Start first segment current_speaker = words_with_speakers[0].speaker current_start = words_with_speakers[0].start current_end = words_with_speakers[0].end current_words = [words_with_speakers[0].word] for i in range(1, len(words_with_speakers)): word = words_with_speakers[i] prev_word = words_with_speakers[i - 1] # Calculate pause between words pause = word.start - prev_word.end # Check if we need to start a new segment speaker_changed = word.speaker != current_speaker significant_pause = pause > cls.PAUSE_THRESHOLD segment_duration = current_end - current_start too_long = segment_duration > cls.MAX_SEGMENT_DURATION and pause > 0.15 if speaker_changed or significant_pause or too_long: # Save current segment segments.append(TranscriptSegment( start=current_start, end=current_end, speaker=current_speaker, role="UNKNOWN", text=" ".join(current_words) )) # Start new segment current_speaker = word.speaker current_start = word.start current_end = word.end current_words = [word.word] else: # Continue current segment current_end = word.end current_words.append(word.word) if current_words: segments.append(TranscriptSegment( start=current_start, end=current_end, speaker=current_speaker, role="UNKNOWN", text=" ".join(current_words) )) logger.debug(f"Reconstructed {len(segments)} segments from {len(words_with_speakers)} words") return segments @classmethod def resize_and_merge_segments( cls, segments: List[TranscriptSegment] ) -> List[TranscriptSegment]: """ Merge consecutive segments of the same speaker if the gap is small. Also filters out extremely short segments. """ if not segments: return [] # Filter 1: Remove extremely short blips (noise) segments = [s for s in segments if (s.end - s.start) >= settings.min_segment_duration_s] if not segments: return [] merged = [] curr = segments[0] for i in range(1, len(segments)): next_seg = segments[i] # If same speaker and gap is small, merge gap = next_seg.start - curr.end if next_seg.speaker == curr.speaker and gap < settings.merge_threshold_s: curr.end = next_seg.end curr.text += " " + next_seg.text else: merged.append(curr) curr = next_seg merged.append(curr) logger.debug(f"Merged segments: {len(segments)} -> {len(merged)}") return merged @classmethod def align_precision( cls, words: List[WordTimestamp], speaker_segments: List[SpeakerSegment] ) -> List[TranscriptSegment]: """ Full precision alignment pipeline. Args: words: Word-level timestamps from transcription speaker_segments: Speaker segments from diarization Returns: List of TranscriptSegment with proper speaker assignments """ # Step 3c: Assign speakers to words words_with_speakers = cls.assign_speakers_to_words(words, speaker_segments) # Step 3d: Reconstruct segments segments = cls.reconstruct_segments(words_with_speakers) # Step 3e: Clustering/Merging (Optimization) segments = cls.resize_and_merge_segments(segments) return segments