Spaces:
Running
on
Zero
Running
on
Zero
| """ | |
| Segment processor for VAD-based audio segmentation, ASR, and text matching. | |
| Splits audio into segments using VAD, transcribes each with Whisper, | |
| matches to verse text using phonemizer, and returns segment info. | |
| """ | |
| import sys | |
| import torch | |
| import numpy as np | |
| import librosa | |
| import librosa.core.audio # Force eager load to avoid lazy_loader bug with Gradio hot-reload | |
| from pathlib import Path | |
| from dataclasses import dataclass | |
| from typing import List, Optional, Tuple | |
| # Add paths for imports | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
| from config import ( | |
| PROJECT_ROOT, SEGMENTER_MODEL, WHISPER_MODEL, | |
| MIN_SILENCE_DURATION_MS, MIN_SPEECH_DURATION_MS, PAD_DURATION_MS, | |
| MIN_MATCH_SCORE, | |
| ) | |
| # Centralized phonemizer utilities (single source of truth) | |
| from utils.phonemizer_utils import ( | |
| load_phonemizer, | |
| load_surah_info, | |
| match_text_to_verse, | |
| get_total_words_for_verse_range, | |
| ) | |
| # Add data directory for recitations_segmenter | |
| DATA_PATH = PROJECT_ROOT / "data" | |
| sys.path.insert(0, str(DATA_PATH)) | |
@dataclass
class VadSegment:
    """Raw VAD segment with just timing info (before Whisper processing)."""

    # Segment start, in seconds from the beginning of the audio
    start_time: float
    # Segment end, in seconds from the beginning of the audio
    end_time: float
    # 0-based position of this segment in the list of detected segments
    segment_idx: int
@dataclass
class SegmentInfo:
    """Information about a detected speech segment."""

    # Segment boundaries, in seconds
    start_time: float
    end_time: float
    # Text produced by the ASR model for this segment
    transcribed_text: str
    # The canonical text portion matched to this segment
    matched_text: str
    # The verse reference for this segment (e.g., "1:2" or "1:2-1:3")
    matched_ref: str
    # 0-based index of first word in this segment
    word_start_idx: int
    # 0-based index of last word (inclusive)
    word_end_idx: int
    # Phonemes of the matched canonical text
    canonical_phonemes: str
    # Score returned by the text matcher
    match_score: float
    # Human-readable failure reason; None when the segment was processed successfully
    error: Optional[str] = None
@dataclass
class SegmentationResult:
    """Result of segmenting and processing audio."""

    # One entry per processed speech segment
    segments: List[SegmentInfo]
    # True if segments cover all words
    full_coverage: bool
    # Explanation shown when coverage is incomplete or processing could not run
    coverage_warning: Optional[str] = None
    # Number of words expected for the selected verse range
    total_words: int = 0
@dataclass
class VadResult:
    """Result of VAD-only processing."""

    # Detected speech segments, in detection order
    vad_segments: List[VadSegment]
    # Preprocessed audio (float32, mono)
    audio: np.ndarray
    # Sample rate of `audio`, in Hz
    sample_rate: int
    # Whitespace-split canonical text
    canonical_words: List[str]
    # Number of words expected for the selected verse range
    total_words: int
| # Module-level caches for models (phonemizer cache is in utils/phonemizer_utils.py) | |
| _segmenter_cache = {"model": None, "processor": None, "loaded": False} | |
| _whisper_cache = {"model": None, "processor": None, "gen_config": None, "prompt_ids": None, "loaded": False} | |
def _get_device_and_dtype():
    """Choose the torch device and dtype used when a model is first loaded.

    When ``config.IS_HF_SPACE`` is truthy (HF Spaces with ZeroGPU) the result is
    always CPU / float32, so that CUDA initialisation is deferred until code
    runs inside a @gpu_decorator function.

    Returns:
        Tuple of (torch.device, torch.dtype): CUDA / float16 when CUDA is
        available and not on an HF Space, otherwise CPU / float32.
    """
    from config import IS_HF_SPACE

    if IS_HF_SPACE:
        # Defer GPU until inference
        return torch.device("cpu"), torch.float32

    if torch.cuda.is_available():
        return torch.device("cuda"), torch.float16
    return torch.device("cpu"), torch.float32
def initialize_segment_models():
    """
    Eagerly load the segmenter, Whisper and phonemizer.

    Intended to be called once during app initialization so that the first
    audio request does not pay the model-loading delay.
    """
    print("Loading segmentation models...")
    # load_phonemizer is the centralized loader from phonemizer_utils
    for loader in (_load_segmenter, _load_whisper, load_phonemizer):
        loader()
    print("Segmentation models ready.")
def move_segment_models_to_gpu():
    """Relocate the cached segmenter and Whisper models onto the GPU.

    Meant to be called inside @gpu_decorator functions on HF Spaces: on
    ZeroGPU the models are loaded on CPU at startup (to avoid CUDA init in the
    main process) and moved over once a GPU lease is active.

    Does nothing when CUDA is unavailable. Models that are already on a CUDA
    device are left untouched; moved models are cast to float16 and written
    back into their module-level cache.
    """
    if not torch.cuda.is_available():
        return

    device = torch.device("cuda")
    dtype = torch.float16

    # Segmenter
    segmenter = _segmenter_cache["model"]
    if segmenter is not None and next(segmenter.parameters()).device.type != "cuda":
        _segmenter_cache["model"] = segmenter.to(device, dtype=dtype)
        print(f"Moved segmenter to {device}")

    # Whisper
    whisper = _whisper_cache["model"]
    if whisper is not None and next(whisper.parameters()).device.type != "cuda":
        _whisper_cache["model"] = whisper.to(device, dtype=dtype)
        print(f"Moved Whisper to {device}")

    # The cached prompt token tensor has to live on the same device as Whisper
    cached_prompt = _whisper_cache["prompt_ids"]
    if cached_prompt is not None and cached_prompt.device.type != "cuda":
        _whisper_cache["prompt_ids"] = cached_prompt.to(device)
def _load_segmenter():
    """Return the cached VAD segmenter, loading it on first use.

    The model is placed on the device/dtype chosen by _get_device_and_dtype()
    at load time and is never moved afterwards; use
    move_segment_models_to_gpu() to move it inside a GPU context.

    Returns:
        Tuple of (model, processor), or (None, None) if loading failed.
        A failed load is not cached, so the next call tries again.
    """
    cache = _segmenter_cache

    # Cache hit: hand back whatever was loaded, wherever it currently lives
    if cache["loaded"]:
        return cache["model"], cache["processor"]

    device, dtype = _get_device_and_dtype()

    try:
        from transformers import AutoFeatureExtractor, AutoModelForAudioFrameClassification

        processor = AutoFeatureExtractor.from_pretrained(SEGMENTER_MODEL)
        model = AutoModelForAudioFrameClassification.from_pretrained(SEGMENTER_MODEL)
        model.to(device, dtype=dtype)
        model.eval()

        cache["model"] = model
        cache["processor"] = processor
        cache["loaded"] = True
        print(f"β Segmenter loaded: {SEGMENTER_MODEL}")
        return model, processor
    except Exception as e:
        print(f"β Failed to load segmenter: {e}")
        return None, None
def _load_whisper():
    """Load the Whisper ASR model.

    Note: This function only loads the model, it does NOT move it between devices.
    Use move_segment_models_to_gpu() to move models to GPU inside GPU contexts.

    Returns:
        Tuple of (model, processor, gen_config, prompt_ids). prompt_ids is a
        1-D long tensor of the special-token ids that could be resolved, or
        None when none could. On failure all four values are None and nothing
        is cached, so the next call tries again.
    """
    # If already loaded, just return it (don't move it)
    if _whisper_cache["loaded"]:
        return (_whisper_cache["model"], _whisper_cache["processor"],
                _whisper_cache["gen_config"], _whisper_cache["prompt_ids"])

    device, dtype = _get_device_and_dtype()

    try:
        from transformers import AutoProcessor, AutoModelForSpeechSeq2Seq, GenerationConfig

        processor = AutoProcessor.from_pretrained(WHISPER_MODEL)
        model = AutoModelForSpeechSeq2Seq.from_pretrained(
            WHISPER_MODEL, torch_dtype=dtype, low_cpu_mem_usage=True
        ).to(device).eval()

        # Resolve the Arabic-transcription special tokens to ids, skipping any
        # token the tokenizer does not know (i.e. one that maps to the unknown id).
        tok = processor.tokenizer
        unk = getattr(tok, "unk_token_id", None)
        prompt_tokens = ["<|startoftranscript|>", "<|ar|>", "<|transcribe|>", "<|notimestamps|>"]
        prompt_ids_list = []
        for t in prompt_tokens:
            try:
                tid = tok.convert_tokens_to_ids(t)
            except Exception:
                # Best effort: an unresolvable token is simply left out.
                # (Not a bare except, so Ctrl-C / SystemExit still propagate.)
                continue
            if tid is not None and (unk is None or tid != unk):
                prompt_ids_list.append(int(tid))

        # NOTE(review): a default-constructed GenerationConfig is what gets
        # passed to generate(); confirm this is intended rather than reusing
        # the model's own generation config.
        gen_config = GenerationConfig()

        prompt_ids_tensor = None
        if prompt_ids_list:
            prompt_ids_tensor = torch.tensor(prompt_ids_list, dtype=torch.long, device=device)

        _whisper_cache["model"] = model
        _whisper_cache["processor"] = processor
        _whisper_cache["gen_config"] = gen_config
        _whisper_cache["prompt_ids"] = prompt_ids_tensor
        _whisper_cache["loaded"] = True
        print(f"β Whisper loaded: {WHISPER_MODEL}")
        return model, processor, gen_config, prompt_ids_tensor
    except Exception as e:
        print(f"β Failed to load Whisper: {e}")
        return None, None, None, None
def _detect_speech_segments(audio: np.ndarray, sample_rate: int) -> List[Tuple[float, float]]:
    """
    Run the VAD segmenter over a waveform and return its speech intervals.

    If the segmenter cannot be loaded, produces no output, or raises, the
    whole recording is returned as one interval instead.

    Args:
        audio: Audio waveform (mono, float32)
        sample_rate: Sample rate of audio

    Returns:
        List of (start_time, end_time) tuples in seconds
    """
    # Working copies: these are rebound if the audio has to be resampled, and
    # the fallback interval is always computed from the current pair.
    wave, rate = audio, sample_rate

    model, processor = _load_segmenter()
    if model is None:
        # No segmenter available: treat whole audio as one segment
        return [(0, len(wave) / rate)]

    try:
        from recitations_segmenter import segment_recitations, clean_speech_intervals

        # The segmenter expects 16kHz input
        if rate != 16000:
            wave = librosa.resample(wave, orig_sr=rate, target_sr=16000)
            rate = 16000

        first_param = next(model.parameters())
        device = first_param.device
        dtype = next(model.parameters()).dtype

        # segment_recitations takes a list of waveform tensors
        waveform = torch.from_numpy(wave).float()
        outputs = segment_recitations(
            [waveform],
            model,
            processor,
            device=device,
            dtype=dtype,
            batch_size=1,
        )

        if not outputs:
            return [(0, len(wave) / rate)]

        # Clean the raw intervals using the configured silence/speech/padding durations
        first = outputs[0]
        cleaned = clean_speech_intervals(
            first.speech_intervals,
            first.is_complete,
            min_silence_duration_ms=MIN_SILENCE_DURATION_MS,
            min_speech_duration_ms=MIN_SPEECH_DURATION_MS,
            pad_duration_ms=PAD_DURATION_MS,
            return_seconds=True,
        )

        # Tensor of [start, end] rows -> list of tuples
        rows = cleaned.clean_speech_intervals.tolist()
        return [(begin, finish) for begin, finish in rows]

    except Exception as e:
        print(f"VAD error: {e}, using full audio as single segment")
        import traceback
        traceback.print_exc()
        return [(0, len(wave) / rate)]
def _transcribe_segment(audio: np.ndarray, sample_rate: int) -> str:
    """
    Run Whisper on one audio segment and return the decoded Arabic text.

    Args:
        audio: Audio waveform (mono, float32)
        sample_rate: Sample rate

    Returns:
        Transcribed Arabic text, or "" if Whisper is unavailable or
        transcription raised.
    """
    model, processor, gen_config, prompt_ids = _load_whisper()
    if model is None:
        return ""

    try:
        # Features are always extracted at 16kHz
        waveform = audio
        if sample_rate != 16000:
            waveform = librosa.resample(waveform, orig_sr=sample_rate, target_sr=16000)

        device = next(model.parameters()).device
        dtype = model.dtype

        encoded = processor(audio=waveform, sampling_rate=16000, return_tensors="pt")
        feats = encoded["input_features"].to(device=device, dtype=dtype)

        # Greedy decoding; the cached prompt ids are passed only when present
        generate_kwargs = dict(
            generation_config=gen_config,
            max_new_tokens=200,
            do_sample=False,
            num_beams=1,
        )
        if prompt_ids is not None:
            generate_kwargs["prompt_ids"] = prompt_ids

        with torch.no_grad():
            out_ids = model.generate(feats, **generate_kwargs)

        decoded = processor.batch_decode(out_ids, skip_special_tokens=True)
        return decoded[0].strip()

    except Exception as e:
        print(f"Whisper transcription error: {e}")
        return ""
def transcribe_segments_batched(
    segment_audios: List[np.ndarray],
    sample_rate: int
) -> List[str]:
    """
    Transcribe multiple audio segments in a single batched Whisper call.

    Besides transcribing, this prints timing, padding and (on CUDA) memory
    statistics for the batch.

    Args:
        segment_audios: List of audio waveforms (mono, float32)
        sample_rate: Sample rate of audio

    Returns:
        List of transcribed Arabic texts (one per segment). Every entry is ""
        when Whisper is unavailable or the batch raised; an empty input
        returns an empty list.
    """
    import time

    if not segment_audios:
        return []

    model, processor, gen_config, prompt_ids = _load_whisper()
    if model is None:
        return [""] * len(segment_audios)

    try:
        batch_start = time.time()

        # Collect segment duration stats
        segment_lengths = [len(audio) for audio in segment_audios]
        segment_durations = [length / sample_rate for length in segment_lengths]
        min_dur, max_dur = min(segment_durations), max(segment_durations)
        avg_dur = sum(segment_durations) / len(segment_durations)
        total_audio_dur = sum(segment_durations)

        # Resample all to 16kHz if needed
        if sample_rate != 16000:
            resampled = [
                librosa.resample(audio, orig_sr=sample_rate, target_sr=16000)
                for audio in segment_audios
            ]
        else:
            resampled = list(segment_audios)

        device = next(model.parameters()).device
        dtype = model.dtype

        # Extract features one segment at a time; each call yields a
        # (1, n_mels, n_frames) tensor padded to Whisper's fixed window, so
        # they can be concatenated along the batch axis.
        batch_features = [
            processor(audio=audio, sampling_rate=16000, return_tensors="pt")["input_features"]
            for audio in resampled
        ]
        batch_input = torch.cat(batch_features, dim=0).to(device=device, dtype=dtype)

        # All-ones attention mask. The mask runs over the frame (last) axis,
        # i.e. (batch, n_frames) -- not over the mel-bin axis.
        attention_mask = torch.ones(
            (batch_input.shape[0], batch_input.shape[-1]),
            dtype=torch.long,
            device=device,
        )

        # Generate transcriptions in batch
        # Note: prompt_ids doesn't work well with batched inference due to dimension issues
        # For batched mode, we use standard generation without custom prompts
        inference_start = time.time()
        with torch.no_grad():
            out_ids = model.generate(
                batch_input,
                attention_mask=attention_mask,
                generation_config=gen_config,
                max_new_tokens=200,
                do_sample=False,
                num_beams=1,
            )
        inference_time = time.time() - inference_start

        # Decode all transcriptions
        texts = processor.batch_decode(out_ids, skip_special_tokens=True)
        texts = [t.strip() for t in texts]

        batch_time = time.time() - batch_start

        # Calculate efficiency stats
        # Whisper's mel spectrogram clips to 30s max, compute padding waste for shorter segments
        whisper_window_s = 30.0
        padded_total = len(segment_audios) * whisper_window_s
        padding_waste = padded_total - total_audio_dur
        padding_pct = (padding_waste / padded_total) * 100 if padded_total > 0 else 0
        # Guarded so that a zero-length timer reading cannot raise here and
        # throw away the transcriptions that were already produced.
        throughput = total_audio_dur / batch_time if batch_time > 0 else float("inf")

        # Get GPU memory stats (if CUDA available)
        mem_allocated = 0
        mem_reserved = 0
        mem_peak = 0
        gpu_name = "CPU"
        if torch.cuda.is_available() and device.type == "cuda":
            torch.cuda.synchronize()
            mem_allocated = torch.cuda.memory_allocated(device) / (1024**3)  # GB
            mem_reserved = torch.cuda.memory_reserved(device) / (1024**3)  # GB
            mem_peak = torch.cuda.max_memory_allocated(device) / (1024**3)  # GB
            try:
                gpu_name = torch.cuda.get_device_name(device)
            except Exception:
                # The name is cosmetic; fall back to a generic label
                gpu_name = "GPU"

        # Log detailed stats
        print(f"[BATCHED WHISPER] ββββββββββββββββββββββββββββββββββββββ")
        print(f" Segments: {len(segment_audios)} | Total audio: {total_audio_dur:.2f}s")
        print(f" Duration range: {min_dur:.2f}s - {max_dur:.2f}s (avg: {avg_dur:.2f}s)")
        print(f" Padding waste: {padding_waste:.2f}s ({padding_pct:.1f}% of batch capacity)")
        print(f" Inference time: {inference_time:.2f}s | Total time: {batch_time:.2f}s")
        print(f" Throughput: {throughput:.2f}x realtime")
        if device.type == "cuda":
            print(f" GPU: {gpu_name}")
            print(f" Memory: {mem_allocated:.2f}GB allocated | {mem_reserved:.2f}GB reserved | {mem_peak:.2f}GB peak")
        print(f"[BATCHED WHISPER] ββββββββββββββββββββββββββββββββββββββ")

        return texts

    except Exception as e:
        print(f"Batched Whisper error: {e}")
        import traceback
        traceback.print_exc()
        return [""] * len(segment_audios)
def _match_text_to_verse(transcribed_text: str, verse_ref: str) -> Tuple[str, str, float, str]:
    """
    Match ASR output against a verse via the phonemizer.

    Backwards-compatibility shim: delegates to the centralized
    match_text_to_verse() from phonemizer_utils, always passing
    stops=["compulsory_stop"].

    Args:
        transcribed_text: Arabic text from ASR
        verse_ref: Verse reference (e.g., "1:2")

    Returns:
        Tuple of (matched_text, phonemes, match_score, matched_ref)
    """
    stop_kinds = ["compulsory_stop"]
    match = match_text_to_verse(transcribed_text, verse_ref, stops=stop_kinds)
    return match
| def _parse_ref_range(ref: str) -> Tuple[int, int, int, int]: | |
| """ | |
| Parse verse reference to get chapter, start verse, end verse. | |
| Args: | |
| ref: Reference like "1:2" or "1:2-1:5" | |
| Returns: | |
| Tuple of (chapter, start_verse, end_verse) | |
| """ | |
| if '-' in ref: | |
| start_ref, end_ref = ref.split('-') | |
| start_ch, start_v = map(int, start_ref.split(':')) | |
| end_ch, end_v = map(int, end_ref.split(':')) | |
| return start_ch, start_v, end_ch, end_v | |
| else: | |
| ch, v = map(int, ref.split(':')) | |
| return ch, v, ch, v | |
| def _parse_word_indices_from_ref(matched_ref: str, verse_ref: str) -> Tuple[Optional[int], Optional[int]]: | |
| """ | |
| Parse word indices from phonemizer reference and convert to global indices. | |
| The phonemizer returns verse:word references (1-based within each verse), | |
| but we need global word indices (0-based across all verses in the selection). | |
| Example: matched_ref="112:1:1-112:2:2" for selection "112:1-112:4" | |
| - Verse 112:1 has 4 words, verse 112:2 has 2 words | |
| - "112:1:1" = global index 0 (first word of verse 1) | |
| - "112:2:2" = global index 5 (4 words from verse 1 + word 2 of verse 2) | |
| Args: | |
| matched_ref: Reference like "112:1:1-112:2:2" (surah:verse:word_start-surah:verse:word_end) | |
| or "47:38:5" (single word) or "47:38" (just verse, no word indices) | |
| verse_ref: The full verse reference for the selection (e.g., "112:1-112:4") | |
| Returns: | |
| Tuple of (start_word_0based, end_word_0based) or (None, None) if no word indices | |
| """ | |
| if not matched_ref or ':' not in matched_ref: | |
| return None, None | |
| # Load surah info to get word counts per verse (uses centralized loader) | |
| surah_info = load_surah_info() | |
| if not surah_info: | |
| return None, None | |
| # Parse the verse_ref to get the starting verse | |
| try: | |
| if '-' in verse_ref: | |
| start_verse_ref = verse_ref.split('-')[0] | |
| else: | |
| start_verse_ref = verse_ref | |
| if ':' not in start_verse_ref: | |
| # Whole chapter, starts at verse 1 | |
| selection_start_surah = int(start_verse_ref) | |
| selection_start_verse = 1 | |
| else: | |
| selection_start_surah, selection_start_verse = map(int, start_verse_ref.split(':')) | |
| except Exception: | |
| return None, None | |
| # Parse matched_ref to get verse:word positions | |
| if '-' in matched_ref: | |
| # Range format: "112:1:1-112:2:2" | |
| start_part, end_part = matched_ref.split('-') | |
| start_parts = start_part.split(':') | |
| end_parts = end_part.split(':') | |
| if len(start_parts) >= 3 and len(end_parts) >= 3: | |
| try: | |
| start_surah = int(start_parts[0]) | |
| start_verse = int(start_parts[1]) | |
| start_word_in_verse = int(start_parts[2]) | |
| end_surah = int(end_parts[0]) | |
| end_verse = int(end_parts[1]) | |
| end_word_in_verse = int(end_parts[2]) | |
| # Convert to global 0-based indices | |
| start_global = _verse_word_to_global_index( | |
| start_surah, start_verse, start_word_in_verse, | |
| selection_start_surah, selection_start_verse, surah_info | |
| ) | |
| end_global = _verse_word_to_global_index( | |
| end_surah, end_verse, end_word_in_verse, | |
| selection_start_surah, selection_start_verse, surah_info | |
| ) | |
| if start_global is not None and end_global is not None: | |
| return start_global, end_global | |
| except ValueError: | |
| return None, None | |
| else: | |
| # Single reference: "47:38:1" or "47:38" | |
| parts = matched_ref.split(':') | |
| if len(parts) >= 3: | |
| try: | |
| surah = int(parts[0]) | |
| verse = int(parts[1]) | |
| word_in_verse = int(parts[2]) | |
| global_idx = _verse_word_to_global_index( | |
| surah, verse, word_in_verse, | |
| selection_start_surah, selection_start_verse, surah_info | |
| ) | |
| if global_idx is not None: | |
| return global_idx, global_idx | |
| except ValueError: | |
| return None, None | |
| return None, None | |
| def _verse_word_to_global_index( | |
| target_surah: int, | |
| target_verse: int, | |
| word_in_verse: int, | |
| selection_start_surah: int, | |
| selection_start_verse: int, | |
| surah_info: dict | |
| ) -> Optional[int]: | |
| """ | |
| Convert a verse:word reference (1-based) to a global word index (0-based). | |
| Args: | |
| target_surah: Surah number of the target word | |
| target_verse: Verse number of the target word | |
| word_in_verse: Word index within the verse (1-based) | |
| selection_start_surah: Surah number where the selection starts | |
| selection_start_verse: Verse number where the selection starts | |
| surah_info: Surah info dictionary | |
| Returns: | |
| Global word index (0-based) or None if calculation fails | |
| """ | |
| if target_surah != selection_start_surah: | |
| # Cross-surah not supported yet | |
| return None | |
| try: | |
| # Get surah data | |
| surah_data = surah_info.get(str(target_surah)) | |
| if not surah_data or "verses" not in surah_data: | |
| return None | |
| # Count words from selection start to target verse | |
| word_offset = 0 | |
| for verse_data in surah_data["verses"]: | |
| verse_num = verse_data["verse"] | |
| if verse_num < selection_start_verse: | |
| continue | |
| elif verse_num < target_verse: | |
| # Add all words from this verse | |
| word_offset += verse_data.get("num_words", 0) | |
| elif verse_num == target_verse: | |
| # Add the word index within this verse (convert 1-based to 0-based) | |
| return word_offset + (word_in_verse - 1) | |
| else: | |
| break | |
| return None | |
| except Exception: | |
| return None | |
def detect_vad_segments(
    audio_data: Tuple[int, np.ndarray],
    canonical_text: str,
    verse_ref: str = "",
) -> Optional[VadResult]:
    """
    Preprocess the audio and run VAD on it, without any transcription.

    int16 / int32 input is scaled to float32, and multi-channel input is
    averaged down to mono, before detection.

    Args:
        audio_data: Tuple of (sample_rate, audio_array) from Gradio
        canonical_text: Expected Arabic text for the verse
        verse_ref: Verse reference (e.g., "1:2") for word count lookup

    Returns:
        VadResult with speech intervals and preprocessed audio, or None when
        no audio was given or no speech interval was detected
    """
    import time

    if audio_data is None:
        return None

    sample_rate, audio = audio_data

    # Integer PCM -> float32
    if audio.dtype == np.int16:
        audio = audio.astype(np.float32) / 32768.0
    elif audio.dtype == np.int32:
        audio = audio.astype(np.float32) / 2147483648.0

    # Average the channels of multi-channel audio
    if len(audio.shape) > 1:
        audio = audio.mean(axis=1)

    # Words for display/matching (may include verse markers, that's fine)
    canonical_words = canonical_text.split()

    # Prefer the authoritative word count from surah_info.json
    total_words = 0
    if verse_ref:
        total_words = get_total_words_for_verse_range(verse_ref)

    if total_words != 0:
        print(f"[VAD] Word count from surah_info.json: {total_words} words")
    else:
        # Lookup failed or no verse_ref: fall back to the split text
        total_words = len(canonical_words)
        print(f"[VAD] Warning: Using text.split() for word count (surah_info lookup failed)")

    audio_duration = len(audio) / sample_rate
    print(f"\n[VAD] Processing {audio_duration:.2f}s of audio...")

    # Time the VAD pass
    vad_start = time.time()
    speech_intervals = _detect_speech_segments(audio, sample_rate)
    vad_time = time.time() - vad_start
    print(f"[VAD] Completed in {vad_time:.2f}s - detected {len(speech_intervals)} segments")

    if not speech_intervals:
        print("[VAD] No speech detected")
        return None

    # One VadSegment per interval, numbered in detection order
    vad_segments = [
        VadSegment(start_time=begin, end_time=finish, segment_idx=position)
        for position, (begin, finish) in enumerate(speech_intervals)
    ]

    return VadResult(
        vad_segments=vad_segments,
        audio=audio,
        sample_rate=sample_rate,
        canonical_words=canonical_words,
        total_words=total_words,
    )
def process_single_segment(
    vad_segment: VadSegment,
    audio: np.ndarray,
    sample_rate: int,
    verse_ref: str,
    canonical_words: List[str],
    words_covered: set,
) -> SegmentInfo:
    """
    Transcribe one VAD segment with Whisper and match it against the verse text.

    Args:
        vad_segment: VAD segment with timing info
        audio: Preprocessed audio array
        sample_rate: Audio sample rate
        verse_ref: Verse reference (e.g., "1:2")
        canonical_words: List of canonical words
        words_covered: Set to track which words are covered (modified in place)

    Returns:
        SegmentInfo with transcription and matching results. When the segment
        is too short, transcription is empty, or the match score is below
        MIN_MATCH_SCORE, the returned SegmentInfo carries an `error` message,
        zeroed word indices, and `words_covered` is left unchanged.
    """
    import time

    start_time, end_time = vad_segment.start_time, vad_segment.end_time
    seg_idx = vad_segment.segment_idx
    total_words = len(canonical_words)

    def _failed(reason, transcript="", score=0.0):
        # SegmentInfo for a segment that produced no usable match
        return SegmentInfo(
            start_time=start_time,
            end_time=end_time,
            transcribed_text=transcript,
            matched_text="",
            matched_ref="",
            word_start_idx=0,
            word_end_idx=0,
            canonical_phonemes="",
            match_score=score,
            error=reason,
        )

    # Slice this segment's samples out of the full recording
    first_sample = int(start_time * sample_rate)
    last_sample = int(end_time * sample_rate)
    segment_audio = audio[first_sample:last_sample]

    # NOTE(review): fixed sample-count threshold; it corresponds to 0.1s only
    # when sample_rate is 16000.
    if len(segment_audio) < 1600:
        print(f"[SEG {seg_idx+1}] Skipped (too short)")
        return _failed("Segment too short")

    # Whisper transcription (timed)
    whisper_start = time.time()
    transcribed_text = _transcribe_segment(segment_audio, sample_rate)
    whisper_time = time.time() - whisper_start

    if not transcribed_text:
        print(f"[SEG {seg_idx+1}] ({start_time:.1f}s-{end_time:.1f}s): Whisper {whisper_time:.2f}s - FAILED")
        return _failed("Transcription failed")

    # Phonemizer-based matching against the verse (timed)
    match_start = time.time()
    matched_text, phonemes, match_score, matched_ref = _match_text_to_verse(
        transcribed_text, verse_ref
    )
    match_time = time.time() - match_start

    # Debug logging
    print(f"[SEG {seg_idx+1}] Phonemizer results:")
    print(f" - Transcribed: '{transcribed_text}'")
    print(f" - Matched text: '{matched_text}'")
    print(f" - Matched ref: '{matched_ref}'")
    print(f" - Score: {match_score:.2f}")

    if match_score < MIN_MATCH_SCORE:
        print(f"[SEG {seg_idx+1}] ({start_time:.1f}s-{end_time:.1f}s): Whisper {whisper_time:.2f}s, Match {match_time:.2f}s - SCORE {match_score:.2f} (LOW)")
        return _failed(
            f"Low match score ({match_score:.2f})",
            transcript=transcribed_text,
            score=match_score,
        )

    # The matched reference carries verse:word positions; turn them into
    # 0-based indices relative to the start of the selection
    start_word_0based, end_word_0based = _parse_word_indices_from_ref(matched_ref, verse_ref)

    print(f"[SEG {seg_idx+1}] Word indices from phonemizer ref:")
    print(f" - Parsed from '{matched_ref}': global indices {start_word_0based}-{end_word_0based} (0-based)")

    last_valid = total_words - 1
    if start_word_0based is None or end_word_0based is None:
        # No word positions in the reference: assume the match starts at the
        # first word and spans as many words as the matched text has
        matched_words = matched_text.split()
        num_matched = len(matched_words)
        word_start_idx = 0
        word_end_idx = min(num_matched - 1, last_valid) if num_matched > 0 else 0
        print(f" - No word indices in ref, using matched text length: {num_matched} words")
        print(f" - Default indices: word_start_idx={word_start_idx}, word_end_idx={word_end_idx}")
    else:
        # Indices are already 0-based; clamp them into the canonical word list
        word_start_idx = max(0, min(start_word_0based, last_valid))
        word_end_idx = max(0, min(end_word_0based, last_valid))
        print(f" - Clamped indices: word_start_idx={word_start_idx}, word_end_idx={word_end_idx}")
        print(f" - Covering words: {canonical_words[word_start_idx:word_end_idx+1]}")

    # Record every word index in the (inclusive) matched span as covered
    words_covered.update(range(word_start_idx, word_end_idx + 1))

    total_time = whisper_time + match_time
    print(f"[SEG {seg_idx+1}] ({start_time:.1f}s-{end_time:.1f}s): Whisper {whisper_time:.2f}s, Match {match_time:.2f}s, Total {total_time:.2f}s - words {word_start_idx+1}-{word_end_idx+1} - SCORE {match_score:.2f}")

    return SegmentInfo(
        start_time=start_time,
        end_time=end_time,
        transcribed_text=transcribed_text,
        matched_text=matched_text,
        matched_ref=matched_ref,
        word_start_idx=word_start_idx,
        word_end_idx=word_end_idx,
        canonical_phonemes=phonemes,
        match_score=match_score,
        error=None,
    )
| def process_audio_segments( | |
| audio_data: Tuple[int, np.ndarray], | |
| verse_ref: str, | |
| canonical_text: str, | |
| canonical_phonemes: str, | |
| ) -> SegmentationResult: | |
| """ | |
| Process audio with VAD segmentation, ASR, and text matching. | |
| Args: | |
| audio_data: Tuple of (sample_rate, audio_array) from Gradio | |
| verse_ref: Verse reference (e.g., "1:2") | |
| canonical_text: Expected Arabic text for the verse | |
| canonical_phonemes: Expected phonemes for the verse | |
| Returns: | |
| SegmentationResult with segment info and coverage status | |
| """ | |
| import time | |
| if audio_data is None: | |
| return SegmentationResult(segments=[], full_coverage=False, | |
| coverage_warning="No audio provided") | |
| total_start = time.time() | |
| sample_rate, audio = audio_data | |
| # Convert to float32 if needed | |
| if audio.dtype == np.int16: | |
| audio = audio.astype(np.float32) / 32768.0 | |
| elif audio.dtype == np.int32: | |
| audio = audio.astype(np.float32) / 2147483648.0 | |
| # Convert stereo to mono | |
| if len(audio.shape) > 1: | |
| audio = audio.mean(axis=1) | |
| # Get canonical words for display/matching (includes verse markers, that's fine) | |
| canonical_words = canonical_text.split() | |
| # Get accurate word count from surah_info.json (this is the authoritative count) | |
| total_words = get_total_words_for_verse_range(verse_ref) if verse_ref else 0 | |
| if total_words == 0: | |
| # Fallback if surah_info lookup fails or no verse_ref | |
| total_words = len(canonical_words) | |
| print(f"[Segmentation] Warning: Using text.split() for word count (surah_info lookup failed)") | |
| else: | |
| print(f"[Segmentation] Word count from surah_info.json: {total_words} words") | |
| audio_duration = len(audio) / sample_rate | |
| print(f"\n[Segmentation] Processing {audio_duration:.2f}s of audio...") | |
| # Detect speech segments using VAD | |
| vad_start = time.time() | |
| speech_intervals = _detect_speech_segments(audio, sample_rate) | |
| vad_time = time.time() - vad_start | |
| print(f"[Segmentation] VAD: {vad_time:.2f}s - detected {len(speech_intervals)} segments") | |
| if not speech_intervals: | |
| return SegmentationResult( | |
| segments=[], | |
| full_coverage=False, | |
| coverage_warning="No speech detected in audio", | |
| total_words=total_words | |
| ) | |
| # Track which words are covered | |
| words_covered = set() | |
| segments = [] | |
| for seg_idx, (start_time, end_time) in enumerate(speech_intervals): | |
| seg_start = time.time() | |
| # Extract audio segment | |
| start_sample = int(start_time * sample_rate) | |
| end_sample = int(end_time * sample_rate) | |
| segment_audio = audio[start_sample:end_sample] | |
| if len(segment_audio) < 1600: # Less than 0.1s at 16kHz | |
| print(f"[Segmentation] Segment {seg_idx+1}: skipped (too short)") | |
| continue | |
| # Transcribe segment with Whisper | |
| whisper_start = time.time() | |
| transcribed_text = _transcribe_segment(segment_audio, sample_rate) | |
| whisper_time = time.time() - whisper_start | |
| if not transcribed_text: | |
| print(f"[SEG {seg_idx+1}] ({start_time:.1f}s-{end_time:.1f}s): Whisper {whisper_time:.2f}s - FAILED") | |
| segments.append(SegmentInfo( | |
| start_time=start_time, | |
| end_time=end_time, | |
| transcribed_text="", | |
| matched_text="", | |
| matched_ref="", | |
| word_start_idx=0, | |
| word_end_idx=0, | |
| canonical_phonemes="", | |
| match_score=0.0, | |
| error="Transcription failed" | |
| )) | |
| continue | |
| # Match transcribed text to verse using phonemizer | |
| match_start = time.time() | |
| matched_text, phonemes, match_score, matched_ref = _match_text_to_verse( | |
| transcribed_text, verse_ref | |
| ) | |
| match_time = time.time() - match_start | |
| if match_score < MIN_MATCH_SCORE: | |
| print(f"[Segmentation] Segment {seg_idx+1} ({start_time:.1f}s-{end_time:.1f}s): Whisper {whisper_time:.2f}s, Match {match_time:.2f}s - LOW SCORE ({match_score:.2f})") | |
| segments.append(SegmentInfo( | |
| start_time=start_time, | |
| end_time=end_time, | |
| transcribed_text=transcribed_text, | |
| matched_text="", | |
| matched_ref="", | |
| word_start_idx=0, | |
| word_end_idx=0, | |
| canonical_phonemes="", | |
| match_score=match_score, | |
| error=f"Low match score ({match_score:.2f})" | |
| )) | |
| continue | |
| # Parse word indices from matched_ref (phonemizer gives us this!) | |
| start_word_0based, end_word_0based = _parse_word_indices_from_ref(matched_ref, verse_ref) | |
| if start_word_0based is not None and end_word_0based is not None: | |
| # Already 0-based, just clamp to valid range | |
| word_start_idx = max(0, min(start_word_0based, total_words - 1)) | |
| word_end_idx = max(0, min(end_word_0based, total_words - 1)) | |
| else: | |
| # Fallback: no word indices in ref, use matched text length | |
| matched_words = matched_text.split() | |
| num_matched = len(matched_words) | |
| word_start_idx = 0 | |
| word_end_idx = min(num_matched - 1, total_words - 1) if num_matched > 0 else 0 | |
| # Mark words as covered | |
| for idx in range(word_start_idx, word_end_idx + 1): | |
| words_covered.add(idx) | |
| seg_total = time.time() - seg_start | |
| print(f"[SEG {seg_idx+1}] ({start_time:.1f}s-{end_time:.1f}s): Whisper {whisper_time:.2f}s, Match {match_time:.2f}s, Total {seg_total:.2f}s - words {word_start_idx+1}-{word_end_idx+1} - SCORE {match_score:.2f}") | |
| segments.append(SegmentInfo( | |
| start_time=start_time, | |
| end_time=end_time, | |
| transcribed_text=transcribed_text, | |
| matched_text=matched_text, | |
| matched_ref=matched_ref, | |
| word_start_idx=word_start_idx, | |
| word_end_idx=word_end_idx, | |
| canonical_phonemes=phonemes, | |
| match_score=match_score, | |
| error=None | |
| )) | |
| # Check coverage | |
| full_coverage = len(words_covered) == total_words | |
| coverage_warning = None | |
| if not full_coverage: | |
| missing_words = [i for i in range(total_words) if i not in words_covered] | |
| if missing_words: | |
| # Group consecutive missing indices | |
| groups = [] | |
| start = missing_words[0] | |
| end = missing_words[0] | |
| for idx in missing_words[1:]: | |
| if idx == end + 1: | |
| end = idx | |
| else: | |
| groups.append((start, end)) | |
| start = end = idx | |
| groups.append((start, end)) | |
| missing_ranges = ", ".join( | |
| f"words {s+1}-{e+1}" if s != e else f"word {s+1}" | |
| for s, e in groups | |
| ) | |
| coverage_warning = f"β οΈ Incomplete coverage: {missing_ranges} not detected in segments" | |
| total_time = time.time() - total_start | |
| coverage_pct = (len(words_covered) / total_words * 100) if total_words > 0 else 0 | |
| print(f"[SEGMENTATION] COMPLETE: {total_time:.2f}s - {len(segments)} segments, {len(words_covered)}/{total_words} words ({coverage_pct:.0f}% coverage)") | |
| return SegmentationResult( | |
| segments=segments, | |
| full_coverage=full_coverage, | |
| coverage_warning=coverage_warning, | |
| total_words=total_words | |
| ) | |
def run_text_matching(transcribed_texts, vad_segments, verse_ref, total_words):
    """
    Run CPU text matching with a CONSTRAINED SLIDING WINDOW.

    For continuous recitation:
    - Segments whose index is below GLOBAL_SEARCH_SEGMENTS search the full
      verse range (to establish position).
    - Later segments search a window around the last matched verse:
      last_verse - MATCH_LOOKBACK_VERSES .. last_verse + MATCH_LOOKAHEAD_VERSES,
      clamped to the requested verse range.
    - If the windowed search scores below MIN_MATCH_SCORE, the full range is
      searched once as a fallback.
    - Refs spanning more than one surah always search the full range (no
      windowing is attempted for that case).

    The window is intended to be cheaper than searching the full range for
    every segment.

    Args:
        transcribed_texts: List of transcribed text strings from Whisper
        vad_segments: List of VadSegment objects (only used to pair with
            transcribed_texts; iteration stops at the shorter list)
        verse_ref: Full verse reference for the recitation (e.g., "1:2-1:7")
        total_words: Total number of words in the verse range

    Returns:
        Tuple of (match_results, words_covered)
        - match_results: One tuple per processed segment.
            Unmatched/skipped: (None, transcribed_text, "", score, message)
              where message is "" for empty transcriptions,
              "SKIP_SPECIAL:<name>" for special segments, or a low-score note.
            Matched: ((word_start_idx, word_end_idx), transcribed_text,
              matched_text, score, matched_ref, phonemes)
        - words_covered: Set of 0-based word indices that were matched
    """
    import time
    from recitation_engine.special_segments import is_special_segment
    from config import MIN_MATCH_SCORE, GLOBAL_SEARCH_SEGMENTS, MATCH_LOOKBACK_VERSES, MATCH_LOOKAHEAD_VERSES

    match_start = time.time()
    num_segments = len(transcribed_texts)
    match_results = []
    words_covered = set()

    # Parse the full verse ref to get surah and verse range.
    # A malformed ref falls back to 1:1 (best-effort, as before), but only
    # parsing errors are caught so KeyboardInterrupt/SystemExit still propagate.
    try:
        if '-' in verse_ref:
            start_ref, end_ref = verse_ref.split('-')
            start_surah, start_verse = map(int, start_ref.split(':'))
            end_surah, end_verse = map(int, end_ref.split(':'))
        else:
            start_surah, start_verse = map(int, verse_ref.split(':'))
            end_surah, end_verse = start_surah, start_verse
    except (ValueError, TypeError, AttributeError):
        start_surah, start_verse = 1, 1
        end_surah, end_verse = 1, 1

    # Windowing is only implemented for refs within a single surah.
    single_surah = start_surah == end_surah

    # Track last matched verse for constrained search
    last_matched_verse = start_verse
    global_searches = 0
    constrained_searches = 0

    # zip() is kept so that iteration stops at the shorter of the two lists.
    for i, (_vad_seg, transcribed_text) in enumerate(zip(vad_segments, transcribed_texts)):
        if not transcribed_text:
            match_results.append((None, "", "", 0.0, ""))
            continue

        # Check if this is a special segment (Basmala/Isti'adha) to skip
        is_special, special_name = is_special_segment(transcribed_text, verse_ref)
        if is_special:
            # Mark as special segment to be filtered out later
            match_results.append((None, transcribed_text, "", 0.0, f"SKIP_SPECIAL:{special_name}"))
            continue

        # Determine search ref based on position
        if i < GLOBAL_SEARCH_SEGMENTS or not single_surah:
            # First N segments, or multi-surah refs: search full range (global)
            search_ref = verse_ref
            searched_full_range = True
            global_searches += 1
        else:
            # Constrained search: window around last matched verse
            window_start = max(start_verse, last_matched_verse - MATCH_LOOKBACK_VERSES)
            window_end = min(end_verse, last_matched_verse + MATCH_LOOKAHEAD_VERSES)
            search_ref = f"{start_surah}:{window_start}-{start_surah}:{window_end}"
            searched_full_range = False
            constrained_searches += 1

        # Match against verse
        matched_text, phonemes, match_score, matched_ref = _match_text_to_verse(
            transcribed_text, search_ref
        )

        # If the constrained search fails, try a global fallback. Skipped when
        # the full range was already searched: repeating the identical search
        # cannot produce a different result.
        if match_score < MIN_MATCH_SCORE and not searched_full_range:
            matched_text, phonemes, match_score, matched_ref = _match_text_to_verse(
                transcribed_text, verse_ref  # Full range fallback
            )
            global_searches += 1
            constrained_searches -= 1

        if match_score < MIN_MATCH_SCORE:
            match_results.append((None, transcribed_text, "", match_score, f"Low match score ({match_score:.2f})"))
            continue

        # Parse word indices and update last_matched_verse
        start_word, end_word = _parse_word_indices_from_ref(matched_ref, verse_ref)
        if start_word is not None and end_word is not None:
            # Clamp to the valid 0-based word range
            word_start_idx = max(0, min(start_word, total_words - 1))
            word_end_idx = max(0, min(end_word, total_words - 1))
            words_covered.update(range(word_start_idx, word_end_idx + 1))

            # Update last matched verse from the END of matched_ref; the second
            # ':'-separated field of the end ref is read as the verse number.
            # Best-effort: an unparseable ref leaves the window where it was.
            try:
                if ':' in matched_ref:
                    end_parts = matched_ref.split('-')[-1].split(':')
                    if len(end_parts) >= 2:
                        last_matched_verse = int(end_parts[1])
            except (ValueError, TypeError, AttributeError):
                pass
        else:
            # No word indices in the ref: report word 0 and cover nothing
            word_start_idx = 0
            word_end_idx = 0

        match_results.append((
            (word_start_idx, word_end_idx),
            transcribed_text,
            matched_text,
            match_score,
            matched_ref,
            phonemes
        ))

    match_time = time.time() - match_start
    avg_seg = match_time / num_segments if num_segments > 0 else 0
    print(f"[CPU TEXT MATCHING] ββββββββββββββββββββββββββββββββββββββ")
    print(f" Segments: {num_segments} | Total: {match_time:.2f}s | Avg: {avg_seg*1000:.0f}ms/seg")
    print(f" Global searches: {global_searches} | Constrained: {constrained_searches}")
    print(f" Window: {MATCH_LOOKBACK_VERSES} back / {MATCH_LOOKAHEAD_VERSES} ahead")
    print(f"[CPU TEXT MATCHING] ββββββββββββββββββββββββββββββββββββββ")
    return match_results, words_covered
def build_segment_infos(vad_segments, segment_audios, match_results, wav2vec_results):
    """
    Build SegmentInfo objects from match results and Wav2Vec2 transcriptions.

    Segments marked as SKIP_SPECIAL (Basmala/Isti'adha) are filtered out entirely.
    Segments without a valid match are kept as error entries with empty phonemes.

    Args:
        vad_segments: List of VadSegment objects
        segment_audios: List of audio arrays for each segment (only used to
            pair with the other lists; iteration stops at the shortest one)
        match_results: List of match result tuples from run_text_matching
        wav2vec_results: List of phoneme transcriptions from Wav2Vec2, indexed
            by original segment position; missing entries are treated as ""

    Returns:
        Tuple of (segment_infos, predicted_phonemes, kept_indices)
        - segment_infos: List of SegmentInfo objects (special segments filtered out)
        - predicted_phonemes: List of phoneme strings matching segment_infos
        - kept_indices: List of original segment indices that were kept (for FA alignment)
    """
    segment_infos = []
    predicted_phonemes = []
    kept_indices = []  # Track original indices for FA result alignment
    skipped_count = 0
    empty_phoneme_count = 0  # Matched segments whose Wav2Vec2 output is empty

    # Diagnostic logging: verify input lengths match
    print(f"[BUILD SEGMENT] Inputs: {len(vad_segments)} VAD segments, {len(segment_audios)} audios, "
          f"{len(match_results)} matches, {len(wav2vec_results)} wav2vec results")
    # zip() below silently stops at the shortest list, so flag a mismatch.
    if len({len(vad_segments), len(segment_audios), len(match_results)}) > 1:
        print("[BUILD SEGMENT] WARNING: input lengths differ; trailing entries are ignored")

    for i, (vad_seg, _audio, match_result) in enumerate(zip(vad_segments, segment_audios, match_results)):
        if match_result[0] is None:
            # Check if this is a special segment to skip entirely
            error_msg = match_result[4] if len(match_result) > 4 else ""
            if error_msg.startswith("SKIP_SPECIAL:"):
                # Split once so a name containing ':' is kept whole, and fall
                # back to "unknown" when nothing follows the marker.
                special_type = error_msg.split(":", 1)[1] or "unknown"
                print(f"[SEGMENT FILTER] Skipping {special_type} segment ({vad_seg.start_time:.1f}s-{vad_seg.end_time:.1f}s)")
                skipped_count += 1
                continue  # Don't add to segment_infos - completely invisible

            # Error case - no valid match (display error in UI)
            segment_infos.append(SegmentInfo(
                start_time=vad_seg.start_time,
                end_time=vad_seg.end_time,
                transcribed_text=match_result[1],
                matched_text="",
                matched_ref="",
                word_start_idx=0,
                word_end_idx=0,
                canonical_phonemes="",
                match_score=match_result[3] if len(match_result) > 3 else 0.0,
                error=error_msg or "Transcription failed"
            ))
            predicted_phonemes.append("")
            kept_indices.append(i)  # Track for FA alignment
            continue

        # Valid match - unpack the 6-field tuple (phonemes field is optional)
        word_start_idx, word_end_idx = match_result[0]
        transcribed_text = match_result[1]
        matched_text = match_result[2]
        match_score = match_result[3]
        matched_ref = match_result[4]
        canonical_phonemes = match_result[5] if len(match_result) > 5 else ""

        # Direct index - wav2vec_results has same ordering as segments
        seg_transcription = wav2vec_results[i] if i < len(wav2vec_results) else ""
        if not seg_transcription:
            empty_phoneme_count += 1
            print(f"[BUILD SEGMENT] Segment {i+1} ({vad_seg.start_time:.1f}s-{vad_seg.end_time:.1f}s): "
                  f"Empty phonemes (matched verse: {matched_ref})")

        predicted_phonemes.append(seg_transcription)
        segment_infos.append(SegmentInfo(
            start_time=vad_seg.start_time,
            end_time=vad_seg.end_time,
            transcribed_text=transcribed_text,
            matched_text=matched_text,
            matched_ref=matched_ref,
            word_start_idx=word_start_idx,
            word_end_idx=word_end_idx,
            canonical_phonemes=canonical_phonemes,
            match_score=match_score,
            error=None
        ))
        kept_indices.append(i)  # Track for FA alignment

    if skipped_count > 0:
        print(f"[SEGMENT FILTER] Skipped {skipped_count} special segment(s) (Basmala/Isti'adha)")

    # Summary logging. Count entries that actually carry phonemes, so error
    # segments (stored with "") are not reported as having phoneme data.
    valid_phoneme_count = sum(1 for phonemes in predicted_phonemes if phonemes)
    print(f"[BUILD SEGMENT] Output: {len(segment_infos)} segments, "
          f"{valid_phoneme_count}/{len(predicted_phonemes)} with phoneme data")
    if empty_phoneme_count > 0:
        print(f"[BUILD SEGMENT] WARNING: {empty_phoneme_count} segment(s) have no phoneme data")

    return segment_infos, predicted_phonemes, kept_indices