""" Utilities for fetching reference reciter audio per verse and slicing it for comparison. """ from __future__ import annotations import base64 import io import json from functools import lru_cache from pathlib import Path from typing import Dict, List, Optional, Tuple from urllib.request import Request, urlopen import librosa import numpy as np import soundfile as sf from config import PROJECT_ROOT, SURAH_INFO_PATH from utils.phonemizer_utils import load_surah_info # Paths to metadata files VERSES_AUDIO_PATH = PROJECT_ROOT / "data" / "minshawi_audio.json" SEGMENTS_JSONL_PATH = PROJECT_ROOT / "data" / "minshawi_pause_segments.jsonl" WORD_SEGMENTS_JSON_PATH = PROJECT_ROOT / "data" / "minshawi_word_segments.json" _audio_cache: Dict[Tuple[str, int], np.ndarray] = {} _mp3_cache: Dict[str, bytes] = {} # Cache raw MP3 bytes by URL def clear_audio_caches() -> None: """Clear cached audio data to free memory.""" _audio_cache.clear() _mp3_cache.clear() def _safe_read_json(path: Path) -> Optional[dict]: try: return json.loads(path.read_text()) except Exception as exc: print(f"[REFERENCE] Failed to load {path.name}: {exc}") return None @lru_cache(maxsize=1) def _load_verse_audio_meta() -> Optional[dict]: return _safe_read_json(VERSES_AUDIO_PATH) @lru_cache(maxsize=1) def _load_surah_info() -> Optional[dict]: """Load surah_info.json with caching.""" return _safe_read_json(SURAH_INFO_PATH) @lru_cache(maxsize=1) def _load_segments_index() -> Dict[str, List[dict]]: """ Load precomputed word-span -> time ranges from segments.jsonl. Returns a dict keyed by "surah:ayah" with list of segments. 
""" index: Dict[str, List[dict]] = {} if not SEGMENTS_JSONL_PATH.exists(): return index try: with SEGMENTS_JSONL_PATH.open("r", encoding="utf-8") as f: for line in f: try: obj = json.loads(line.strip()) idx_from = obj.get("index_from") idx_to = obj.get("index_to") if not idx_from or not idx_to: continue surah_from, ayah_from, word_from = map(int, idx_from.split(":")) surah_to, ayah_to, word_to = map(int, idx_to.split(":")) # Only keep spans that live within a single ayah to keep mapping simple if surah_from != surah_to or ayah_from != ayah_to: continue verse_key = f"{surah_from}:{ayah_from}" segment = { "start_word": min(word_from, word_to), "end_word": max(word_from, word_to), "time_from": float(obj.get("time_from", 0.0)), "time_to": float(obj.get("time_to", 0.0)), } index.setdefault(verse_key, []).append(segment) except Exception: # Skip malformed lines but keep reading continue except Exception as exc: print(f"[REFERENCE] Failed to read segments index: {exc}") return index @lru_cache(maxsize=1) def _load_word_segments_index() -> Dict[str, List[dict]]: """ Load word-level timestamps from minshawi_word_segments.json. Returns a dict keyed by "surah:ayah" with list of segment info. Each entry has 'segments' where each segment is [word_idx_0based, word_idx_1based, start_ms, end_ms]. 
""" index: Dict[str, List[dict]] = {} if not WORD_SEGMENTS_JSON_PATH.exists(): print(f"[REFERENCE] Word segments file not found: {WORD_SEGMENTS_JSON_PATH}") return index try: with WORD_SEGMENTS_JSON_PATH.open("r", encoding="utf-8") as f: data = json.load(f) for entry in data: surah = entry.get("surah") ayah = entry.get("ayah") segments = entry.get("segments", []) if surah is not None and ayah is not None: verse_key = f"{surah}:{ayah}" index[verse_key] = segments except Exception as exc: print(f"[REFERENCE] Failed to read word segments: {exc}") return index def _get_word_time_range_from_segments( verse_key: str, target_start_word: int, target_end_word: int, verse_offsets: Dict[str, float], word_segments_index: Dict[str, List[list]], pad_ms: float = 100.0, ) -> Optional[Tuple[float, float]]: """ Get time range for a word span using word-level timestamps. Word indices are 1-based as used in the app. Returns (start_sec, end_sec) with padding applied, or None if not found. """ word_list = word_segments_index.get(verse_key, []) if not word_list: return None offset = verse_offsets.get(verse_key, 0.0) pad_sec = pad_ms / 1000.0 # Find earliest start and latest end for the word range # Each segment is [word_idx_0based, word_idx_1based, start_ms, end_ms] earliest_start_ms = None latest_end_ms = None for seg in word_list: if len(seg) < 4: continue word_idx_1based = seg[1] # 1-based word index start_ms = seg[2] end_ms = seg[3] if target_start_word <= word_idx_1based <= target_end_word: if earliest_start_ms is None or start_ms < earliest_start_ms: earliest_start_ms = start_ms if latest_end_ms is None or end_ms > latest_end_ms: latest_end_ms = end_ms if earliest_start_ms is None or latest_end_ms is None: return None # Convert ms to seconds and add offset + padding start_sec = max(0.0, offset + (earliest_start_ms / 1000.0) - pad_sec) end_sec = offset + (latest_end_ms / 1000.0) + pad_sec return (start_sec, end_sec) def _download_mp3_bytes(url: str) -> Optional[bytes]: """ Download 
MP3 from URL and return raw bytes. Uses cache to avoid repeated downloads. """ import time if url in _mp3_cache: print(f"[PROFILE] MP3 cache hit") return _mp3_cache[url] from urllib.error import URLError, HTTPError try: t0 = time.perf_counter() headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Accept": "audio/mpeg,audio/*;q=0.9,*/*;q=0.8", } request = Request(url, headers=headers) with urlopen(request, timeout=30) as resp: data = resp.read() t1 = time.perf_counter() print(f"[PROFILE] HTTP download: {t1-t0:.3f}s ({len(data)} bytes)") if data and len(data) > 100: _mp3_cache[url] = data return data except (URLError, HTTPError) as exc: print(f"[REFERENCE] Failed to download {url}: {exc}") except Exception as exc: print(f"[REFERENCE] Error downloading {url}: {exc}") return None def _mp3_bytes_to_data_uri(mp3_bytes: bytes) -> str: """Convert raw MP3 bytes to data URI for browser playback.""" import time t0 = time.perf_counter() encoded = base64.b64encode(mp3_bytes).decode("ascii") t1 = time.perf_counter() print(f"[PROFILE] Base64 encode: {t1-t0:.3f}s") return f"data:audio/mpeg;base64,{encoded}" def get_verse_audio_uri(verse_ref: str) -> Optional[str]: """ Get audio data URI for a verse - fast path for playback only. Downloads MP3 and returns data URI directly without any processing. For single verse, returns the verse audio. For verse range, returns None (use lazy loading per verse). 
""" import time t_start = time.perf_counter() print(f"[PROFILE] get_verse_audio_uri({verse_ref}) started") if not verse_ref or "-" in verse_ref: return None # Ranges need per-verse loading t0 = time.perf_counter() audio_meta = _load_verse_audio_meta() t1 = time.perf_counter() print(f"[PROFILE] Load verse meta: {t1-t0:.3f}s") if not audio_meta: return None meta = audio_meta.get(verse_ref) if not meta: return None url = meta.get("audio_url") if not url: return None mp3_bytes = _download_mp3_bytes(url) if mp3_bytes: result = _mp3_bytes_to_data_uri(mp3_bytes) t_end = time.perf_counter() print(f"[PROFILE] get_verse_audio_uri TOTAL: {t_end-t_start:.3f}s") return result return None def get_single_verse_audio_uri(verse_key: str) -> Optional[str]: """ Get audio data URI for a single verse (for lazy loading). This is used by the lazy loading system to fetch individual verses on-demand as the user navigates through a verse range. Args: verse_key: Verse key like "2:5" Returns: Data URI string or None if failed """ audio_meta = _load_verse_audio_meta() if not audio_meta: return None meta = audio_meta.get(verse_key) if not meta or not meta.get("audio_url"): return None mp3_bytes = _download_mp3_bytes(meta["audio_url"]) if mp3_bytes: return _mp3_bytes_to_data_uri(mp3_bytes) return None def get_verse_audio_path(verse_key: str) -> Optional[str]: """ Get audio file path for a single verse (for native Gradio audio player). Downloads the verse audio to a temporary file and returns the path. This is used by the native Gradio gr.Audio component which requires a file path rather than a data URI. 
Args: verse_key: Verse key like "2:5" Returns: Path to temporary MP3 file or None if failed """ import tempfile import time audio_meta = _load_verse_audio_meta() if not audio_meta: return None meta = audio_meta.get(verse_key) if not meta or not meta.get("audio_url"): return None t0 = time.perf_counter() mp3_bytes = _download_mp3_bytes(meta["audio_url"]) t1 = time.perf_counter() if not mp3_bytes: return None # Write to temp file that Gradio can serve # Note: Gradio handles cleanup of temp files try: with tempfile.NamedTemporaryFile( suffix=".mp3", delete=False, prefix=f"verse_{verse_key.replace(':', '_')}_" ) as tmp: tmp.write(mp3_bytes) tmp.flush() t2 = time.perf_counter() print(f"[PROFILE] get_verse_audio_path({verse_key}): download={t1-t0:.3f}s, write={t2-t1:.3f}s") return tmp.name except Exception as exc: print(f"[REFERENCE] Failed to write temp file for {verse_key}: {exc}") return None def get_verse_audio_urls_for_range(verse_ref: str) -> dict: """ Get audio URLs (not data URIs) for all verses in a range. This is the lazy-loading version - returns direct CDN URLs that the browser will fetch on-demand when the user plays each verse. 
Supports: - Single verse: '2:5' -> Returns dict with one URL - Verse range: '2:5-2:8' -> Returns dict with URLs for verses 2:5, 2:6, 2:7, 2:8 - Whole chapter: '2' -> Returns URLs for all verses in chapter 2 Args: verse_ref: Verse reference string (e.g., "2:5", "2:5-2:8", "2") Returns: Dict with keys: urls: List[str] - Direct audio URLs for each verse from_verse: int - Starting verse number to_verse: int - Ending verse number chapter: int - Chapter number verse_keys: List[str] - Verse keys (e.g., ["2:5", "2:6", "2:7"]) """ if not verse_ref: return {"urls": [], "from_verse": 0, "to_verse": 0, "chapter": 0, "verse_keys": []} # Parse verse range into individual verse keys verse_keys = parse_verse_range(verse_ref) if not verse_keys: print(f"[REFERENCE] Failed to parse verse range: {verse_ref}") return {"urls": [], "from_verse": 0, "to_verse": 0, "chapter": 0, "verse_keys": []} # Extract chapter and verse numbers from first and last verse keys try: first_parts = verse_keys[0].split(":") last_parts = verse_keys[-1].split(":") chapter = int(first_parts[0]) from_verse = int(first_parts[1]) to_verse = int(last_parts[1]) except (ValueError, IndexError): print(f"[REFERENCE] Failed to extract verse numbers from: {verse_keys}") return {"urls": [], "from_verse": 0, "to_verse": 0, "chapter": 0, "verse_keys": []} # Load metadata (cached) audio_meta = _load_verse_audio_meta() if not audio_meta: return {"urls": [], "from_verse": from_verse, "to_verse": to_verse, "chapter": chapter, "verse_keys": []} # Get URLs for each verse (no downloading) urls = [] successful_verses = [] # Add cache-busting parameter to bypass HF Spaces cache issues # (net::ERR_CACHE_OPERATION_NOT_SUPPORTED) import time cache_buster = int(time.time()) for verse_key in verse_keys: meta = audio_meta.get(verse_key) if meta: url = meta.get("audio_url") if url: urls.append(f"{url}?t={cache_buster}") successful_verses.append(verse_key) else: print(f"[REFERENCE] No audio URL for verse: {verse_key}") else: 
print(f"[REFERENCE] No metadata for verse: {verse_key}") return { "urls": urls, "from_verse": from_verse, "to_verse": to_verse, "chapter": chapter, "verse_keys": successful_verses, } def get_verse_audio_uris_for_range(verse_ref: str) -> dict: """ Get audio data URIs for all verses in a range. NOTE: This downloads all audio upfront. For lazy loading, use get_verse_audio_urls_for_range() instead. Supports: - Single verse: '2:5' -> Returns dict with one URI - Verse range: '2:5-2:8' -> Returns dict with URIs for verses 2:5, 2:6, 2:7, 2:8 - Whole chapter: '2' -> Returns URIs for all verses in chapter 2 Args: verse_ref: Verse reference string (e.g., "2:5", "2:5-2:8", "2") Returns: Dict with keys: uris: List[str] - Data URIs for each verse in the range from_verse: int - Starting verse number to_verse: int - Ending verse number chapter: int - Chapter number verse_keys: List[str] - Verse keys (e.g., ["2:5", "2:6", "2:7"]) Returns empty dict with empty uris list if: - verse_ref is None or empty - No verses can be parsed from verse_ref - All verses fail to download """ import time t_start = time.perf_counter() print(f"[PROFILE] get_verse_audio_uris_for_range({verse_ref}) started") if not verse_ref: return {"uris": [], "from_verse": 0, "to_verse": 0, "chapter": 0, "verse_keys": []} # Parse verse range into individual verse keys verse_keys = parse_verse_range(verse_ref) if not verse_keys: print(f"[REFERENCE] Failed to parse verse range: {verse_ref}") return {"uris": [], "from_verse": 0, "to_verse": 0, "chapter": 0, "verse_keys": []} # Extract chapter and verse numbers from first and last verse keys try: first_parts = verse_keys[0].split(":") last_parts = verse_keys[-1].split(":") chapter = int(first_parts[0]) from_verse = int(first_parts[1]) to_verse = int(last_parts[1]) except (ValueError, IndexError): print(f"[REFERENCE] Failed to extract verse numbers from: {verse_keys}") return {"uris": [], "from_verse": 0, "to_verse": 0, "chapter": 0, "verse_keys": []} # Load audio URIs 
for each verse uris = [] successful_verses = [] for verse_key in verse_keys: try: uri = get_verse_audio_uri(verse_key) if uri: uris.append(uri) successful_verses.append(verse_key) else: print(f"[REFERENCE] Failed to load audio for verse: {verse_key}") except Exception as e: print(f"[REFERENCE] Error loading audio for verse {verse_key}: {e}") t_end = time.perf_counter() print(f"[PROFILE] get_verse_audio_uris_for_range TOTAL: {t_end-t_start:.3f}s (loaded {len(uris)}/{len(verse_keys)} verses)") # Return structured data return { "uris": uris, "from_verse": from_verse, "to_verse": to_verse, "chapter": chapter, "verse_keys": successful_verses, } def _download_audio(url: str, target_sr: int) -> Optional[np.ndarray]: """ Download audio from URL and return mono waveform at target_sr. Uses a tiny in-memory cache to avoid repeated downloads. Uses audioread backend for MP3 support via ffmpeg. """ cache_key = (url, target_sr) if cache_key in _audio_cache: return _audio_cache[cache_key] import tempfile import time from urllib.error import URLError, HTTPError # Patch soundfile to avoid librosa's exception handling bug # Some versions of soundfile don't have SoundFileRuntimeError if not hasattr(sf, "SoundFileRuntimeError"): sf.SoundFileRuntimeError = RuntimeError # type: ignore[attr-defined] max_retries = 3 retry_delay = 1.0 # seconds for attempt in range(max_retries): try: # Download with proper headers headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Accept": "audio/mpeg,audio/*;q=0.9,*/*;q=0.8", "Accept-Encoding": "identity", # Disable compression to avoid issues } request = Request(url, headers=headers) with urlopen(request, timeout=30) as resp: data = resp.read() # Validate we got actual data if not data or len(data) < 100: raise ValueError(f"Downloaded data too small ({len(data)} bytes)") # MP3 files need to be saved to disk for audioread/ffmpeg to process them # Soundfile (the default librosa backend) doesn't support MP3 format # So 
            # we use a temp file which allows librosa to fall back to audioread backend
            with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
                tmp_path = tmp.name
                tmp.write(data)
                tmp.flush()
            try:
                # Load with librosa - it will use audioread backend for MP3 files
                # The audioread backend uses ffmpeg which handles MP3 perfectly
                # Suppress expected warnings about soundfile failing and audioread deprecation
                import warnings
                with warnings.catch_warnings():
                    warnings.filterwarnings("ignore", message="PySoundFile failed")
                    warnings.filterwarnings("ignore", category=FutureWarning, module="librosa")
                    audio, _ = librosa.load(tmp_path, sr=target_sr, mono=True, dtype=np.float32)
                _audio_cache[cache_key] = audio
                return audio
            finally:
                # Clean up temp file
                try:
                    Path(tmp_path).unlink()
                except Exception:
                    pass
        except (URLError, HTTPError) as exc:
            if attempt < max_retries - 1:
                print(f"[REFERENCE] Network error on attempt {attempt + 1}: {exc}, retrying...")
                time.sleep(retry_delay)
                retry_delay *= 2  # exponential backoff
            else:
                print(f"[REFERENCE] Failed to download audio {url} after {max_retries} attempts: {exc}")
                return None
        except Exception as exc:
            # Covers decode failures and the too-small-payload ValueError above.
            if attempt < max_retries - 1:
                print(f"[REFERENCE] Attempt {attempt + 1} failed: {exc}, retrying in {retry_delay}s...")
                time.sleep(retry_delay)
                retry_delay *= 2
            else:
                print(f"[REFERENCE] Failed to download audio {url}: {exc}")
                return None
    return None


def _audio_to_data_uri(audio: np.ndarray, sample_rate: int) -> Optional[str]:
    """Encode audio array as MP3 data URI for embedding in HTML.

    Uses MP3 at 32kbps mono - roughly 8x smaller than WAV.
    Falls back to WAV if pydub is not available.
""" if audio is None or len(audio) == 0: return None try: from pydub import AudioSegment # Convert to int16 audio_int16 = (audio * 32767).astype(np.int16) # Create AudioSegment from raw data audio_seg = AudioSegment( data=audio_int16.tobytes(), sample_width=2, frame_rate=sample_rate, channels=1 ) # Export as MP3 at low bitrate buf = io.BytesIO() audio_seg.export(buf, format="mp3", bitrate="32k") encoded = base64.b64encode(buf.getvalue()).decode("ascii") return f"data:audio/mp3;base64,{encoded}" except ImportError: # Fallback to WAV if pydub not available try: buf = io.BytesIO() sf.write(buf, audio, sample_rate, format="WAV") encoded = base64.b64encode(buf.getvalue()).decode("ascii") return f"data:audio/wav;base64,{encoded}" except Exception as exc: print(f"[REFERENCE] Failed to encode audio: {exc}") return None except Exception as exc: print(f"[REFERENCE] Failed to encode audio as MP3: {exc}") return None def _get_num_words(surah_info: dict, surah: int, ayah: int) -> int: """Look up the number of words for a given verse.""" surah_entry = surah_info.get(str(surah), {}) for verse_info in surah_entry.get("verses", []): if verse_info.get("verse") == ayah: return int(verse_info.get("num_words", 0)) return 0 def parse_verse_range(verse_ref: str) -> List[str]: """ Convert a verse reference into a list of verse keys. Supports: - Single verse: '2:5' -> ['2:5'] - Verse range: '2:2-2:5' -> ['2:2', '2:3', '2:4', '2:5'] - Whole chapter: '2' -> ['2:1', '2:2', ..., '2:286'] Only ranges within a single surah are supported. 
""" if not verse_ref: return [] # Handle verse range (e.g., "2:2-2:5") if "-" in verse_ref: try: start_ref, end_ref = verse_ref.split("-", 1) start_surah, start_ayah = map(int, start_ref.split(":")) end_surah, end_ayah = map(int, end_ref.split(":")) if start_surah != end_surah or start_ayah > end_ayah: return [] return [f"{start_surah}:{v}" for v in range(start_ayah, end_ayah + 1)] except Exception: return [] # Handle single verse (e.g., "2:5") or whole chapter (e.g., "2") try: if ":" in verse_ref: # Single verse surah, ayah = map(int, verse_ref.split(":")) return [f"{surah}:{ayah}"] else: # Whole chapter - get all verses from surah_info surah_num = int(verse_ref) surah_info = _load_surah_info() or _safe_read_json(SURAH_INFO_PATH) or {} surah_data = surah_info.get(str(surah_num)) if surah_data and "num_verses" in surah_data: num_verses = surah_data["num_verses"] return [f"{surah_num}:{v}" for v in range(1, num_verses + 1)] return [] except Exception: return [] def build_reference_audio_for_selection( verse_ref: str, target_sr: int = 16000 ) -> Optional[dict]: """ Download and stitch reference audio for the selected verses. 
Returns a dict with: full_uri: data URI for the full stitched audio audio: numpy array waveform sample_rate: sample rate used word_map: list of (surah, ayah, word_idx) per word in selection word_timings: list of (start_sec, end_sec) per word (None if unknown) verse_offsets: map of verse_key -> start time (sec) within stitched audio verse_durations: map of verse_key -> duration (sec) segments_index: segments lookups for slicing """ verse_keys = parse_verse_range(verse_ref) if not verse_keys: return None surah_info = _load_surah_info() or {} audio_meta = _load_verse_audio_meta() if not audio_meta: return None segments_index = _load_segments_index() combined_audio: List[np.ndarray] = [] word_map: List[Tuple[int, int, int]] = [] word_timings: List[Tuple[Optional[float], Optional[float]]] = [] verse_offsets: Dict[str, float] = {} verse_durations: Dict[str, float] = {} total_offset = 0.0 for verse_key in verse_keys: meta = audio_meta.get(verse_key) if not meta: continue url = meta.get("audio_url") if not url: continue audio = _download_audio(url, target_sr) if audio is None or len(audio) == 0: continue duration = len(audio) / float(target_sr) verse_offsets[verse_key] = total_offset verse_durations[verse_key] = duration surah_num, ayah_num = map(int, verse_key.split(":")) num_words = _get_num_words(surah_info, surah_num, ayah_num) # Map word timings from metadata seg_map = { int(entry[0]): (float(entry[1]) / 1000.0, float(entry[2]) / 1000.0) for entry in (meta.get("segments") or []) if isinstance(entry, (list, tuple)) and len(entry) == 3 } for word_idx in range(1, num_words + 1): times = seg_map.get(word_idx) if times: start, end = times word_timings.append((total_offset + start, total_offset + end)) else: word_timings.append((None, None)) word_map.append((surah_num, ayah_num, word_idx)) combined_audio.append(audio) total_offset += duration if not combined_audio: return None stitched = np.concatenate(combined_audio) full_uri = _audio_to_data_uri(stitched, target_sr) 
return { "full_uri": full_uri, "audio": stitched, "sample_rate": target_sr, "word_map": word_map, "word_timings": word_timings, "verse_offsets": verse_offsets, "verse_durations": verse_durations, "segments_index": segments_index, "word_segments_index": _load_word_segments_index(), } def _choose_segment_from_index( verse_key: str, target_start_word: int, target_end_word: int, verse_offsets: Dict[str, float], segments_index: Dict[str, List[dict]], ) -> Optional[Tuple[float, float]]: """ Select the tightest segment(s) covering the target word span using precomputed index. If no single segment covers the range, merges multiple consecutive segments. """ candidates = segments_index.get(verse_key, []) if not candidates: return None # Strategy 1: Try to find a single segment that covers the entire range best_single = None best_extra = None for seg in candidates: start_word = int(seg.get("start_word", 0)) end_word = int(seg.get("end_word", 0)) if start_word <= target_start_word and end_word >= target_end_word: extra = (target_start_word - start_word) + (end_word - target_end_word) if best_single is None or extra < best_extra or (extra == best_extra and (end_word - start_word) < (best_single["end_word"] - best_single["start_word"])): best_single = seg best_extra = extra if best_single: # Found a single segment that covers everything offset = verse_offsets.get(verse_key, 0.0) return offset + float(best_single["time_from"]), offset + float(best_single["time_to"]) # Strategy 2: Merge multiple segments to cover the range # Sort segments by start word sorted_segs = sorted(candidates, key=lambda s: int(s.get("start_word", 0))) # Find all segments that overlap with our target range overlapping = [] for seg in sorted_segs: start_word = int(seg.get("start_word", 0)) end_word = int(seg.get("end_word", 0)) # Check if segment overlaps with target range if not (end_word < target_start_word or start_word > target_end_word): overlapping.append(seg) if not overlapping: return None # Merge: 
take earliest start time and latest end time offset = verse_offsets.get(verse_key, 0.0) earliest_time = min(float(seg.get("time_from", 0)) for seg in overlapping) latest_time = max(float(seg.get("time_to", 0)) for seg in overlapping) return offset + earliest_time, offset + latest_time def get_clip_for_word_span( reference_data: dict, start_idx: int, end_idx: int, pad_seconds: float = 0.05, ) -> Optional[str]: """ Return a data URI for the reference reciter covering the requested word span. Uses precomputed segments from segments.jsonl, selecting the smallest segment that covers the target words (may include extra words). """ if not reference_data: return None audio = reference_data.get("audio") sr = reference_data.get("sample_rate") word_map: List[Tuple[int, int, int]] = reference_data.get("word_map", []) verse_offsets: Dict[str, float] = reference_data.get("verse_offsets", {}) segments_index = reference_data.get("segments_index", {}) word_segments_index = reference_data.get("word_segments_index", {}) # Import config settings for word segment mode try: from config import USE_WORD_SEGMENT_TIMESTAMPS, WORD_SEGMENT_PAD_MS except ImportError: USE_WORD_SEGMENT_TIMESTAMPS = False WORD_SEGMENT_PAD_MS = 100 if audio is None or sr is None or not word_map: return None total_words = len(word_map) if total_words == 0: return None start_idx = max(0, min(start_idx, total_words - 1)) end_idx = max(0, min(end_idx, total_words - 1)) if start_idx > end_idx: start_idx, end_idx = end_idx, start_idx start_word = word_map[start_idx] end_word = word_map[end_idx] # Handle both single-verse and multi-verse spans if start_word[0] == end_word[0] and start_word[1] == end_word[1]: # Single verse verse_key = f"{start_word[0]}:{start_word[1]}" # Try word-level timestamps first if enabled segment_range = None if USE_WORD_SEGMENT_TIMESTAMPS and word_segments_index: segment_range = _get_word_time_range_from_segments( verse_key=verse_key, target_start_word=start_word[2], target_end_word=end_word[2], 
verse_offsets=verse_offsets, word_segments_index=word_segments_index, pad_ms=WORD_SEGMENT_PAD_MS, ) if segment_range: clip_start = max(0.0, segment_range[0]) clip_end = min(len(audio) / float(sr), segment_range[1]) # Fall back to pause-based segments if word segments didn't work if segment_range is None: segment_range = _choose_segment_from_index( verse_key=verse_key, target_start_word=start_word[2], target_end_word=end_word[2], verse_offsets=verse_offsets, segments_index=segments_index, ) if segment_range is None: return None clip_start = max(0.0, segment_range[0] - pad_seconds) clip_end = min(len(audio) / float(sr), segment_range[1] + pad_seconds) else: # Multi-verse span - merge segments across verses # Collect all verses involved verses_involved = [] for idx in range(start_idx, end_idx + 1): word = word_map[idx] verse_key = f"{word[0]}:{word[1]}" if not verses_involved or verses_involved[-1] != verse_key: verses_involved.append(verse_key) if not verses_involved: return None # Find time range for each verse earliest_time = None latest_time = None for verse_key in verses_involved: # Determine which words from this verse to include verse_words = [w for w in word_map[start_idx:end_idx+1] if f"{w[0]}:{w[1]}" == verse_key] if not verse_words: continue # Get the word range within this verse verse_start_word = min(w[2] for w in verse_words) verse_end_word = max(w[2] for w in verse_words) # Try word-level timestamps first if enabled verse_segment_range = None if USE_WORD_SEGMENT_TIMESTAMPS and word_segments_index: verse_segment_range = _get_word_time_range_from_segments( verse_key=verse_key, target_start_word=verse_start_word, target_end_word=verse_end_word, verse_offsets=verse_offsets, word_segments_index=word_segments_index, pad_ms=WORD_SEGMENT_PAD_MS, ) # Fall back to pause-based segments if verse_segment_range is None: verse_segment_range = _choose_segment_from_index( verse_key=verse_key, target_start_word=verse_start_word, target_end_word=verse_end_word, 
verse_offsets=verse_offsets, segments_index=segments_index, ) if verse_segment_range: if earliest_time is None or verse_segment_range[0] < earliest_time: earliest_time = verse_segment_range[0] if latest_time is None or verse_segment_range[1] > latest_time: latest_time = verse_segment_range[1] if earliest_time is None or latest_time is None: return None # Note: padding for word segments is already applied by _get_word_time_range_from_segments # For pause-based segments, we apply pad_seconds here if USE_WORD_SEGMENT_TIMESTAMPS and word_segments_index: clip_start = max(0.0, earliest_time) clip_end = min(len(audio) / float(sr), latest_time) else: clip_start = max(0.0, earliest_time - pad_seconds) clip_end = min(len(audio) / float(sr), latest_time + pad_seconds) start_sample = int(clip_start * sr) end_sample = max(start_sample + 1, int(clip_end * sr)) end_sample = min(end_sample, len(audio)) snippet = audio[start_sample:end_sample] return _audio_to_data_uri(snippet, sr) def render_audio_player_html(data_uri: str, title: str) -> str: """Small helper to render an HTML audio player with a label.""" if not data_uri: return "" # Detect format from data URI audio_type = "audio/mpeg" if "audio/mp3" in data_uri else "audio/wav" return f'''