Spaces: Running on Zero
| """ | |
| Utilities for fetching reference reciter audio per verse and slicing it for comparison. | |
| """ | |
| from __future__ import annotations | |
| import base64 | |
| import io | |
| import json | |
| from functools import lru_cache | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple | |
| from urllib.request import Request, urlopen | |
| import librosa | |
| import numpy as np | |
| import soundfile as sf | |
| from config import PROJECT_ROOT, SURAH_INFO_PATH | |
| from utils.phonemizer_utils import load_surah_info | |
# Paths to metadata files
VERSES_AUDIO_PATH = PROJECT_ROOT / "data" / "minshawi_audio.json"  # per-verse audio_url + word segment metadata
SEGMENTS_JSONL_PATH = PROJECT_ROOT / "data" / "minshawi_pause_segments.jsonl"  # pause-based word-span time ranges
WORD_SEGMENTS_JSON_PATH = PROJECT_ROOT / "data" / "minshawi_word_segments.json"  # word-level timestamps
# In-process caches; both are emptied by clear_audio_caches().
_audio_cache: Dict[Tuple[str, int], np.ndarray] = {}  # decoded waveforms keyed by (url, target_sr)
_mp3_cache: Dict[str, bytes] = {}  # Cache raw MP3 bytes by URL
def clear_audio_caches() -> None:
    """Release all cached audio data (decoded waveforms and raw MP3 bytes)."""
    for cache in (_audio_cache, _mp3_cache):
        cache.clear()
| def _safe_read_json(path: Path) -> Optional[dict]: | |
| try: | |
| return json.loads(path.read_text()) | |
| except Exception as exc: | |
| print(f"[REFERENCE] Failed to load {path.name}: {exc}") | |
| return None | |
def _load_verse_audio_meta() -> Optional[dict]:
    """Load the per-verse audio metadata JSON, or None when unreadable."""
    meta = _safe_read_json(VERSES_AUDIO_PATH)
    return meta
@lru_cache(maxsize=1)
def _load_surah_info() -> Optional[dict]:
    """Load surah_info.json with caching.

    The docstring previously promised caching but none was applied (and the
    module imports lru_cache without using it); the file is static per
    process, so a single-slot lru_cache makes repeated calls free.
    Callers treat the result as read-only.
    """
    return _safe_read_json(SURAH_INFO_PATH)
def _load_segments_index() -> Dict[str, List[dict]]:
    """
    Build an index of precomputed word-span time ranges from segments.jsonl.

    Returns a dict keyed by "surah:ayah"; each value is a list of segment
    dicts with 1-based start_word/end_word and time_from/time_to (seconds).
    Spans crossing an ayah boundary are dropped and malformed lines skipped.
    """
    index: Dict[str, List[dict]] = {}
    if not SEGMENTS_JSONL_PATH.exists():
        return index
    try:
        with SEGMENTS_JSONL_PATH.open("r", encoding="utf-8") as handle:
            for raw_line in handle:
                try:
                    record = json.loads(raw_line.strip())
                    span_from = record.get("index_from")
                    span_to = record.get("index_to")
                    if not span_from or not span_to:
                        continue
                    s_from, a_from, w_from = map(int, span_from.split(":"))
                    s_to, a_to, w_to = map(int, span_to.split(":"))
                    # Only keep spans that live within a single ayah to keep mapping simple
                    if (s_from, a_from) != (s_to, a_to):
                        continue
                    index.setdefault(f"{s_from}:{a_from}", []).append({
                        "start_word": min(w_from, w_to),
                        "end_word": max(w_from, w_to),
                        "time_from": float(record.get("time_from", 0.0)),
                        "time_to": float(record.get("time_to", 0.0)),
                    })
                except Exception:
                    # Skip malformed lines but keep reading
                    continue
    except Exception as exc:
        print(f"[REFERENCE] Failed to read segments index: {exc}")
    return index
def _load_word_segments_index() -> Dict[str, List[dict]]:
    """
    Load word-level timestamps from minshawi_word_segments.json.

    Returns a dict keyed by "surah:ayah"; each value is that entry's
    'segments' list, where a segment is
    [word_idx_0based, word_idx_1based, start_ms, end_ms].
    """
    index: Dict[str, List[dict]] = {}
    if not WORD_SEGMENTS_JSON_PATH.exists():
        print(f"[REFERENCE] Word segments file not found: {WORD_SEGMENTS_JSON_PATH}")
        return index
    try:
        with WORD_SEGMENTS_JSON_PATH.open("r", encoding="utf-8") as handle:
            payload = json.load(handle)
        for entry in payload:
            surah = entry.get("surah")
            ayah = entry.get("ayah")
            if surah is None or ayah is None:
                continue
            index[f"{surah}:{ayah}"] = entry.get("segments", [])
    except Exception as exc:
        print(f"[REFERENCE] Failed to read word segments: {exc}")
    return index
| def _get_word_time_range_from_segments( | |
| verse_key: str, | |
| target_start_word: int, | |
| target_end_word: int, | |
| verse_offsets: Dict[str, float], | |
| word_segments_index: Dict[str, List[list]], | |
| pad_ms: float = 100.0, | |
| ) -> Optional[Tuple[float, float]]: | |
| """ | |
| Get time range for a word span using word-level timestamps. | |
| Word indices are 1-based as used in the app. | |
| Returns (start_sec, end_sec) with padding applied, or None if not found. | |
| """ | |
| word_list = word_segments_index.get(verse_key, []) | |
| if not word_list: | |
| return None | |
| offset = verse_offsets.get(verse_key, 0.0) | |
| pad_sec = pad_ms / 1000.0 | |
| # Find earliest start and latest end for the word range | |
| # Each segment is [word_idx_0based, word_idx_1based, start_ms, end_ms] | |
| earliest_start_ms = None | |
| latest_end_ms = None | |
| for seg in word_list: | |
| if len(seg) < 4: | |
| continue | |
| word_idx_1based = seg[1] # 1-based word index | |
| start_ms = seg[2] | |
| end_ms = seg[3] | |
| if target_start_word <= word_idx_1based <= target_end_word: | |
| if earliest_start_ms is None or start_ms < earliest_start_ms: | |
| earliest_start_ms = start_ms | |
| if latest_end_ms is None or end_ms > latest_end_ms: | |
| latest_end_ms = end_ms | |
| if earliest_start_ms is None or latest_end_ms is None: | |
| return None | |
| # Convert ms to seconds and add offset + padding | |
| start_sec = max(0.0, offset + (earliest_start_ms / 1000.0) - pad_sec) | |
| end_sec = offset + (latest_end_ms / 1000.0) + pad_sec | |
| return (start_sec, end_sec) | |
def _download_mp3_bytes(url: str) -> Optional[bytes]:
    """
    Fetch an MP3 over HTTP and return its raw bytes, caching per URL.
    Responses of 100 bytes or fewer are treated as failures and not cached.
    """
    import time
    from urllib.error import URLError, HTTPError
    cached = _mp3_cache.get(url)
    if cached is not None:
        print(f"[PROFILE] MP3 cache hit")
        return cached
    try:
        started = time.perf_counter()
        request = Request(
            url,
            headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
                "Accept": "audio/mpeg,audio/*;q=0.9,*/*;q=0.8",
            },
        )
        with urlopen(request, timeout=30) as resp:
            payload = resp.read()
        elapsed = time.perf_counter() - started
        print(f"[PROFILE] HTTP download: {elapsed:.3f}s ({len(payload)} bytes)")
        if payload and len(payload) > 100:
            _mp3_cache[url] = payload
            return payload
    except (URLError, HTTPError) as exc:
        print(f"[REFERENCE] Failed to download {url}: {exc}")
    except Exception as exc:
        print(f"[REFERENCE] Error downloading {url}: {exc}")
    return None
| def _mp3_bytes_to_data_uri(mp3_bytes: bytes) -> str: | |
| """Convert raw MP3 bytes to data URI for browser playback.""" | |
| import time | |
| t0 = time.perf_counter() | |
| encoded = base64.b64encode(mp3_bytes).decode("ascii") | |
| t1 = time.perf_counter() | |
| print(f"[PROFILE] Base64 encode: {t1-t0:.3f}s") | |
| return f"data:audio/mpeg;base64,{encoded}" | |
def get_verse_audio_uri(verse_ref: str) -> Optional[str]:
    """
    Fast-path playback helper: download a single verse's MP3 and return
    it as a data URI, with no audio processing.

    Verse ranges (containing "-") return None; ranges are lazy-loaded
    per verse elsewhere.
    """
    import time
    t_start = time.perf_counter()
    print(f"[PROFILE] get_verse_audio_uri({verse_ref}) started")
    if not verse_ref or "-" in verse_ref:
        return None  # Ranges need per-verse loading
    t0 = time.perf_counter()
    audio_meta = _load_verse_audio_meta()
    t1 = time.perf_counter()
    print(f"[PROFILE] Load verse meta: {t1-t0:.3f}s")
    if not audio_meta:
        return None
    entry = audio_meta.get(verse_ref)
    if not entry:
        return None
    url = entry.get("audio_url")
    if not url:
        return None
    raw = _download_mp3_bytes(url)
    if not raw:
        return None
    uri = _mp3_bytes_to_data_uri(raw)
    t_end = time.perf_counter()
    print(f"[PROFILE] get_verse_audio_uri TOTAL: {t_end-t_start:.3f}s")
    return uri
def get_single_verse_audio_uri(verse_key: str) -> Optional[str]:
    """
    Fetch one verse's audio as a data URI (lazy-loading path).

    Used by the lazy loading system to pull individual verses on-demand
    while the user navigates a verse range.

    Args:
        verse_key: Verse key like "2:5"

    Returns:
        Data URI string, or None when metadata lookup or download fails.
    """
    audio_meta = _load_verse_audio_meta()
    if not audio_meta:
        return None
    entry = audio_meta.get(verse_key)
    if not entry or not entry.get("audio_url"):
        return None
    raw = _download_mp3_bytes(entry["audio_url"])
    return _mp3_bytes_to_data_uri(raw) if raw else None
def get_verse_audio_path(verse_key: str) -> Optional[str]:
    """
    Download one verse's MP3 to a temp file and return its path.

    The native Gradio gr.Audio component needs a file path rather than a
    data URI; Gradio handles cleanup of the temp file it is given.

    Args:
        verse_key: Verse key like "2:5"

    Returns:
        Path to a temporary MP3 file, or None on failure.
    """
    import tempfile
    import time
    audio_meta = _load_verse_audio_meta()
    if not audio_meta:
        return None
    entry = audio_meta.get(verse_key)
    if not entry or not entry.get("audio_url"):
        return None
    t0 = time.perf_counter()
    mp3_bytes = _download_mp3_bytes(entry["audio_url"])
    t1 = time.perf_counter()
    if not mp3_bytes:
        return None
    try:
        name_prefix = f"verse_{verse_key.replace(':', '_')}_"
        with tempfile.NamedTemporaryFile(
            suffix=".mp3",
            delete=False,
            prefix=name_prefix,
        ) as tmp:
            tmp.write(mp3_bytes)
            tmp.flush()
            t2 = time.perf_counter()
            print(f"[PROFILE] get_verse_audio_path({verse_key}): download={t1-t0:.3f}s, write={t2-t1:.3f}s")
            return tmp.name
    except Exception as exc:
        print(f"[REFERENCE] Failed to write temp file for {verse_key}: {exc}")
        return None
def get_verse_audio_urls_for_range(verse_ref: str) -> dict:
    """
    Get direct CDN audio URLs (not data URIs) for all verses in a range.

    Lazy-loading variant: the browser fetches each URL on-demand when the
    user plays that verse. A cache-busting query parameter is appended to
    each URL to sidestep HF Spaces cache issues
    (net::ERR_CACHE_OPERATION_NOT_SUPPORTED).

    Supports single verse ("2:5"), verse range ("2:5-2:8"), and whole
    chapter ("2").

    Returns:
        Dict with: urls (List[str]), from_verse (int), to_verse (int),
        chapter (int), verse_keys (List[str] of verses that resolved).
    """
    import time

    def _empty(chapter: int = 0, from_verse: int = 0, to_verse: int = 0) -> dict:
        # Shared shape for all failure paths
        return {
            "urls": [],
            "from_verse": from_verse,
            "to_verse": to_verse,
            "chapter": chapter,
            "verse_keys": [],
        }

    if not verse_ref:
        return _empty()
    verse_keys = parse_verse_range(verse_ref)
    if not verse_keys:
        print(f"[REFERENCE] Failed to parse verse range: {verse_ref}")
        return _empty()
    try:
        chapter = int(verse_keys[0].split(":")[0])
        from_verse = int(verse_keys[0].split(":")[1])
        to_verse = int(verse_keys[-1].split(":")[1])
    except (ValueError, IndexError):
        print(f"[REFERENCE] Failed to extract verse numbers from: {verse_keys}")
        return _empty()
    audio_meta = _load_verse_audio_meta()
    if not audio_meta:
        return _empty(chapter, from_verse, to_verse)
    cache_buster = int(time.time())
    collected_urls: List[str] = []
    resolved_keys: List[str] = []
    for key in verse_keys:
        entry = audio_meta.get(key)
        if not entry:
            print(f"[REFERENCE] No metadata for verse: {key}")
            continue
        url = entry.get("audio_url")
        if not url:
            print(f"[REFERENCE] No audio URL for verse: {key}")
            continue
        collected_urls.append(f"{url}?t={cache_buster}")
        resolved_keys.append(key)
    return {
        "urls": collected_urls,
        "from_verse": from_verse,
        "to_verse": to_verse,
        "chapter": chapter,
        "verse_keys": resolved_keys,
    }
def get_verse_audio_uris_for_range(verse_ref: str) -> dict:
    """
    Get audio data URIs for all verses in a range.

    NOTE: This downloads all audio upfront. For lazy loading, use
    get_verse_audio_urls_for_range() instead.

    Supports single verse ("2:5"), verse range ("2:5-2:8"), and whole
    chapter ("2").

    Returns:
        Dict with: uris (List[str]), from_verse (int), to_verse (int),
        chapter (int), verse_keys (List[str] of verses that loaded).
        All-empty dict when verse_ref is empty, unparseable, or every
        verse fails to download.
    """
    import time
    t_start = time.perf_counter()
    print(f"[PROFILE] get_verse_audio_uris_for_range({verse_ref}) started")

    def _empty(chapter: int = 0, from_verse: int = 0, to_verse: int = 0) -> dict:
        return {
            "uris": [],
            "from_verse": from_verse,
            "to_verse": to_verse,
            "chapter": chapter,
            "verse_keys": [],
        }

    if not verse_ref:
        return _empty()
    verse_keys = parse_verse_range(verse_ref)
    if not verse_keys:
        print(f"[REFERENCE] Failed to parse verse range: {verse_ref}")
        return _empty()
    try:
        chapter = int(verse_keys[0].split(":")[0])
        from_verse = int(verse_keys[0].split(":")[1])
        to_verse = int(verse_keys[-1].split(":")[1])
    except (ValueError, IndexError):
        print(f"[REFERENCE] Failed to extract verse numbers from: {verse_keys}")
        return _empty()
    loaded_uris: List[str] = []
    resolved_keys: List[str] = []
    for key in verse_keys:
        try:
            uri = get_verse_audio_uri(key)
            if uri:
                loaded_uris.append(uri)
                resolved_keys.append(key)
            else:
                print(f"[REFERENCE] Failed to load audio for verse: {key}")
        except Exception as e:
            print(f"[REFERENCE] Error loading audio for verse {key}: {e}")
    t_end = time.perf_counter()
    print(f"[PROFILE] get_verse_audio_uris_for_range TOTAL: {t_end-t_start:.3f}s (loaded {len(loaded_uris)}/{len(verse_keys)} verses)")
    return {
        "uris": loaded_uris,
        "from_verse": from_verse,
        "to_verse": to_verse,
        "chapter": chapter,
        "verse_keys": resolved_keys,
    }
def _download_audio(url: str, target_sr: int) -> Optional[np.ndarray]:
    """
    Download audio from URL and return mono waveform at target_sr.
    Uses a tiny in-memory cache to avoid repeated downloads.
    Uses audioread backend for MP3 support via ffmpeg.

    Retries up to 3 times with exponential backoff on network errors and
    decode failures. Returns None when every attempt fails.
    """
    cache_key = (url, target_sr)
    if cache_key in _audio_cache:
        return _audio_cache[cache_key]
    import tempfile
    import time
    from urllib.error import URLError, HTTPError
    # Patch soundfile to avoid librosa's exception handling bug:
    # some versions of soundfile don't have SoundFileRuntimeError,
    # and librosa references that attribute when catching decode errors.
    if not hasattr(sf, "SoundFileRuntimeError"):
        sf.SoundFileRuntimeError = RuntimeError  # type: ignore[attr-defined]
    max_retries = 3
    retry_delay = 1.0  # seconds; doubled after each failed attempt
    for attempt in range(max_retries):
        try:
            # Download with browser-like headers; identity encoding avoids
            # compressed responses that can confuse the decode step.
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
                "Accept": "audio/mpeg,audio/*;q=0.9,*/*;q=0.8",
                "Accept-Encoding": "identity",  # Disable compression to avoid issues
            }
            request = Request(url, headers=headers)
            with urlopen(request, timeout=30) as resp:
                data = resp.read()
            # Validate we got actual data (tiny responses are error pages)
            if not data or len(data) < 100:
                raise ValueError(f"Downloaded data too small ({len(data)} bytes)")
            # MP3 files need to be saved to disk for audioread/ffmpeg to process them.
            # Soundfile (the default librosa backend) doesn't support MP3 format,
            # so a temp file lets librosa fall back to the audioread backend.
            with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
                tmp_path = tmp.name
                tmp.write(data)
                tmp.flush()
            try:
                # Load with librosa - it will use the audioread backend (ffmpeg)
                # for MP3 files. Suppress the expected warnings about soundfile
                # failing and audioread deprecation.
                import warnings
                with warnings.catch_warnings():
                    warnings.filterwarnings("ignore", message="PySoundFile failed")
                    warnings.filterwarnings("ignore", category=FutureWarning, module="librosa")
                    audio, _ = librosa.load(tmp_path, sr=target_sr, mono=True, dtype=np.float32)
                    _audio_cache[cache_key] = audio
                    return audio
            finally:
                # Clean up temp file regardless of decode success
                try:
                    Path(tmp_path).unlink()
                except Exception:
                    pass
        except (URLError, HTTPError) as exc:
            if attempt < max_retries - 1:
                print(f"[REFERENCE] Network error on attempt {attempt + 1}: {exc}, retrying...")
                time.sleep(retry_delay)
                retry_delay *= 2
            else:
                print(f"[REFERENCE] Failed to download audio {url} after {max_retries} attempts: {exc}")
                return None
        except Exception as exc:
            if attempt < max_retries - 1:
                print(f"[REFERENCE] Attempt {attempt + 1} failed: {exc}, retrying in {retry_delay}s...")
                time.sleep(retry_delay)
                retry_delay *= 2
            else:
                print(f"[REFERENCE] Failed to download audio {url}: {exc}")
                return None
    return None
| def _audio_to_data_uri(audio: np.ndarray, sample_rate: int) -> Optional[str]: | |
| """Encode audio array as MP3 data URI for embedding in HTML. | |
| Uses MP3 at 32kbps mono - roughly 8x smaller than WAV. | |
| Falls back to WAV if pydub is not available. | |
| """ | |
| if audio is None or len(audio) == 0: | |
| return None | |
| try: | |
| from pydub import AudioSegment | |
| # Convert to int16 | |
| audio_int16 = (audio * 32767).astype(np.int16) | |
| # Create AudioSegment from raw data | |
| audio_seg = AudioSegment( | |
| data=audio_int16.tobytes(), | |
| sample_width=2, | |
| frame_rate=sample_rate, | |
| channels=1 | |
| ) | |
| # Export as MP3 at low bitrate | |
| buf = io.BytesIO() | |
| audio_seg.export(buf, format="mp3", bitrate="32k") | |
| encoded = base64.b64encode(buf.getvalue()).decode("ascii") | |
| return f"data:audio/mp3;base64,{encoded}" | |
| except ImportError: | |
| # Fallback to WAV if pydub not available | |
| try: | |
| buf = io.BytesIO() | |
| sf.write(buf, audio, sample_rate, format="WAV") | |
| encoded = base64.b64encode(buf.getvalue()).decode("ascii") | |
| return f"data:audio/wav;base64,{encoded}" | |
| except Exception as exc: | |
| print(f"[REFERENCE] Failed to encode audio: {exc}") | |
| return None | |
| except Exception as exc: | |
| print(f"[REFERENCE] Failed to encode audio as MP3: {exc}") | |
| return None | |
| def _get_num_words(surah_info: dict, surah: int, ayah: int) -> int: | |
| """Look up the number of words for a given verse.""" | |
| surah_entry = surah_info.get(str(surah), {}) | |
| for verse_info in surah_entry.get("verses", []): | |
| if verse_info.get("verse") == ayah: | |
| return int(verse_info.get("num_words", 0)) | |
| return 0 | |
def parse_verse_range(verse_ref: str) -> List[str]:
    """
    Convert a verse reference into a list of verse keys.

    Supports:
    - Single verse: '2:5' -> ['2:5']
    - Verse range: '2:2-2:5' -> ['2:2', '2:3', '2:4', '2:5']
    - Whole chapter: '2' -> ['2:1', '2:2', ..., '2:286']

    Only ranges within a single surah are supported; malformed or
    cross-surah references yield an empty list.
    """
    if not verse_ref:
        return []
    # Handle verse range (e.g., "2:2-2:5")
    if "-" in verse_ref:
        try:
            start_ref, end_ref = verse_ref.split("-", 1)
            start_surah, start_ayah = map(int, start_ref.split(":"))
            end_surah, end_ayah = map(int, end_ref.split(":"))
            if start_surah != end_surah or start_ayah > end_ayah:
                return []
            return [f"{start_surah}:{v}" for v in range(start_ayah, end_ayah + 1)]
        except Exception:
            return []
    # Handle single verse (e.g., "2:5") or whole chapter (e.g., "2")
    try:
        if ":" in verse_ref:
            # Single verse
            surah, ayah = map(int, verse_ref.split(":"))
            return [f"{surah}:{ayah}"]
        # Whole chapter - enumerate verses from surah_info.
        # _load_surah_info() already reads SURAH_INFO_PATH, so the previous
        # "or _safe_read_json(SURAH_INFO_PATH)" fallback was a redundant
        # second read of the same file and has been dropped.
        surah_num = int(verse_ref)
        surah_info = _load_surah_info() or {}
        surah_data = surah_info.get(str(surah_num))
        if surah_data and "num_verses" in surah_data:
            num_verses = surah_data["num_verses"]
            return [f"{surah_num}:{v}" for v in range(1, num_verses + 1)]
        return []
    except Exception:
        return []
def build_reference_audio_for_selection(
    verse_ref: str, target_sr: int = 16000
) -> Optional[dict]:
    """
    Download and stitch reference audio for the selected verses.
    Returns a dict with:
        full_uri: data URI for the full stitched audio
        audio: numpy array waveform
        sample_rate: sample rate used
        word_map: list of (surah, ayah, word_idx) per word in selection
        word_timings: list of (start_sec, end_sec) per word (None if unknown)
        verse_offsets: map of verse_key -> start time (sec) within stitched audio
        verse_durations: map of verse_key -> duration (sec)
        segments_index: segments lookups for slicing
        word_segments_index: word-level timestamp lookups
    Returns None when the reference cannot be parsed or no verse downloads.
    """
    verse_keys = parse_verse_range(verse_ref)
    if not verse_keys:
        return None
    surah_info = _load_surah_info() or {}
    audio_meta = _load_verse_audio_meta()
    if not audio_meta:
        return None
    segments_index = _load_segments_index()
    combined_audio: List[np.ndarray] = []
    word_map: List[Tuple[int, int, int]] = []
    word_timings: List[Tuple[Optional[float], Optional[float]]] = []
    verse_offsets: Dict[str, float] = {}
    verse_durations: Dict[str, float] = {}
    # Running start time (sec) of the current verse within the stitched audio
    total_offset = 0.0
    for verse_key in verse_keys:
        meta = audio_meta.get(verse_key)
        if not meta:
            continue  # verses without metadata are silently skipped
        url = meta.get("audio_url")
        if not url:
            continue
        audio = _download_audio(url, target_sr)
        if audio is None or len(audio) == 0:
            continue  # download/decode failure: skip verse, keep going
        duration = len(audio) / float(target_sr)
        verse_offsets[verse_key] = total_offset
        verse_durations[verse_key] = duration
        surah_num, ayah_num = map(int, verse_key.split(":"))
        num_words = _get_num_words(surah_info, surah_num, ayah_num)
        # Map word timings from metadata; entries look like
        # [word_idx, start_ms, end_ms] (times converted to seconds here)
        seg_map = {
            int(entry[0]): (float(entry[1]) / 1000.0, float(entry[2]) / 1000.0)
            for entry in (meta.get("segments") or [])
            if isinstance(entry, (list, tuple)) and len(entry) == 3
        }
        for word_idx in range(1, num_words + 1):
            times = seg_map.get(word_idx)
            if times:
                start, end = times
                # Shift per-verse times into the stitched timeline
                word_timings.append((total_offset + start, total_offset + end))
            else:
                word_timings.append((None, None))
            word_map.append((surah_num, ayah_num, word_idx))
        combined_audio.append(audio)
        total_offset += duration
    if not combined_audio:
        return None
    stitched = np.concatenate(combined_audio)
    full_uri = _audio_to_data_uri(stitched, target_sr)
    return {
        "full_uri": full_uri,
        "audio": stitched,
        "sample_rate": target_sr,
        "word_map": word_map,
        "word_timings": word_timings,
        "verse_offsets": verse_offsets,
        "verse_durations": verse_durations,
        "segments_index": segments_index,
        "word_segments_index": _load_word_segments_index(),
    }
| def _choose_segment_from_index( | |
| verse_key: str, | |
| target_start_word: int, | |
| target_end_word: int, | |
| verse_offsets: Dict[str, float], | |
| segments_index: Dict[str, List[dict]], | |
| ) -> Optional[Tuple[float, float]]: | |
| """ | |
| Select the tightest segment(s) covering the target word span using precomputed index. | |
| If no single segment covers the range, merges multiple consecutive segments. | |
| """ | |
| candidates = segments_index.get(verse_key, []) | |
| if not candidates: | |
| return None | |
| # Strategy 1: Try to find a single segment that covers the entire range | |
| best_single = None | |
| best_extra = None | |
| for seg in candidates: | |
| start_word = int(seg.get("start_word", 0)) | |
| end_word = int(seg.get("end_word", 0)) | |
| if start_word <= target_start_word and end_word >= target_end_word: | |
| extra = (target_start_word - start_word) + (end_word - target_end_word) | |
| if best_single is None or extra < best_extra or (extra == best_extra and (end_word - start_word) < (best_single["end_word"] - best_single["start_word"])): | |
| best_single = seg | |
| best_extra = extra | |
| if best_single: | |
| # Found a single segment that covers everything | |
| offset = verse_offsets.get(verse_key, 0.0) | |
| return offset + float(best_single["time_from"]), offset + float(best_single["time_to"]) | |
| # Strategy 2: Merge multiple segments to cover the range | |
| # Sort segments by start word | |
| sorted_segs = sorted(candidates, key=lambda s: int(s.get("start_word", 0))) | |
| # Find all segments that overlap with our target range | |
| overlapping = [] | |
| for seg in sorted_segs: | |
| start_word = int(seg.get("start_word", 0)) | |
| end_word = int(seg.get("end_word", 0)) | |
| # Check if segment overlaps with target range | |
| if not (end_word < target_start_word or start_word > target_end_word): | |
| overlapping.append(seg) | |
| if not overlapping: | |
| return None | |
| # Merge: take earliest start time and latest end time | |
| offset = verse_offsets.get(verse_key, 0.0) | |
| earliest_time = min(float(seg.get("time_from", 0)) for seg in overlapping) | |
| latest_time = max(float(seg.get("time_to", 0)) for seg in overlapping) | |
| return offset + earliest_time, offset + latest_time | |
def get_clip_for_word_span(
    reference_data: dict,
    start_idx: int,
    end_idx: int,
    pad_seconds: float = 0.05,
) -> Optional[str]:
    """
    Return a data URI for the reference reciter covering the requested word span.
    Uses precomputed segments from segments.jsonl, selecting the smallest segment
    that covers the target words (may include extra words).

    Args:
        reference_data: dict produced by build_reference_audio_for_selection().
        start_idx: 0-based inclusive index into reference_data["word_map"].
        end_idx: 0-based inclusive index into reference_data["word_map"].
        pad_seconds: padding applied around pause-based segments only; word-level
            segments are already padded inside _get_word_time_range_from_segments().

    Returns:
        Data URI of the sliced audio, or None when no timing info is available.
    """
    if not reference_data:
        return None
    audio = reference_data.get("audio")
    sr = reference_data.get("sample_rate")
    word_map: List[Tuple[int, int, int]] = reference_data.get("word_map", [])
    verse_offsets: Dict[str, float] = reference_data.get("verse_offsets", {})
    segments_index = reference_data.get("segments_index", {})
    word_segments_index = reference_data.get("word_segments_index", {})
    # Import config settings for word segment mode; default to pause-based
    # segments when the flags are unavailable.
    try:
        from config import USE_WORD_SEGMENT_TIMESTAMPS, WORD_SEGMENT_PAD_MS
    except ImportError:
        USE_WORD_SEGMENT_TIMESTAMPS = False
        WORD_SEGMENT_PAD_MS = 100
    if audio is None or sr is None or not word_map:
        return None
    total_words = len(word_map)
    if total_words == 0:
        return None
    # Clamp indices into range and normalize their order
    start_idx = max(0, min(start_idx, total_words - 1))
    end_idx = max(0, min(end_idx, total_words - 1))
    if start_idx > end_idx:
        start_idx, end_idx = end_idx, start_idx
    start_word = word_map[start_idx]  # (surah, ayah, word_idx)
    end_word = word_map[end_idx]
    # Handle both single-verse and multi-verse spans
    if start_word[0] == end_word[0] and start_word[1] == end_word[1]:
        # Single verse
        verse_key = f"{start_word[0]}:{start_word[1]}"
        # Try word-level timestamps first if enabled
        segment_range = None
        if USE_WORD_SEGMENT_TIMESTAMPS and word_segments_index:
            segment_range = _get_word_time_range_from_segments(
                verse_key=verse_key,
                target_start_word=start_word[2],
                target_end_word=end_word[2],
                verse_offsets=verse_offsets,
                word_segments_index=word_segments_index,
                pad_ms=WORD_SEGMENT_PAD_MS,
            )
            if segment_range:
                # Word-level timestamps already include their own padding
                clip_start = max(0.0, segment_range[0])
                clip_end = min(len(audio) / float(sr), segment_range[1])
        # Fall back to pause-based segments if word segments didn't work
        if segment_range is None:
            segment_range = _choose_segment_from_index(
                verse_key=verse_key,
                target_start_word=start_word[2],
                target_end_word=end_word[2],
                verse_offsets=verse_offsets,
                segments_index=segments_index,
            )
            if segment_range is None:
                return None
            clip_start = max(0.0, segment_range[0] - pad_seconds)
            clip_end = min(len(audio) / float(sr), segment_range[1] + pad_seconds)
    else:
        # Multi-verse span - merge segments across verses
        # Collect all verses involved (ordered, consecutive duplicates removed)
        verses_involved = []
        for idx in range(start_idx, end_idx + 1):
            word = word_map[idx]
            verse_key = f"{word[0]}:{word[1]}"
            if not verses_involved or verses_involved[-1] != verse_key:
                verses_involved.append(verse_key)
        if not verses_involved:
            return None
        # Find time range for each verse
        earliest_time = None
        latest_time = None
        for verse_key in verses_involved:
            # Determine which words from this verse to include
            verse_words = [w for w in word_map[start_idx:end_idx+1] if f"{w[0]}:{w[1]}" == verse_key]
            if not verse_words:
                continue
            # Get the word range within this verse
            verse_start_word = min(w[2] for w in verse_words)
            verse_end_word = max(w[2] for w in verse_words)
            # Try word-level timestamps first if enabled
            verse_segment_range = None
            if USE_WORD_SEGMENT_TIMESTAMPS and word_segments_index:
                verse_segment_range = _get_word_time_range_from_segments(
                    verse_key=verse_key,
                    target_start_word=verse_start_word,
                    target_end_word=verse_end_word,
                    verse_offsets=verse_offsets,
                    word_segments_index=word_segments_index,
                    pad_ms=WORD_SEGMENT_PAD_MS,
                )
            # Fall back to pause-based segments
            if verse_segment_range is None:
                verse_segment_range = _choose_segment_from_index(
                    verse_key=verse_key,
                    target_start_word=verse_start_word,
                    target_end_word=verse_end_word,
                    verse_offsets=verse_offsets,
                    segments_index=segments_index,
                )
            if verse_segment_range:
                if earliest_time is None or verse_segment_range[0] < earliest_time:
                    earliest_time = verse_segment_range[0]
                if latest_time is None or verse_segment_range[1] > latest_time:
                    latest_time = verse_segment_range[1]
        if earliest_time is None or latest_time is None:
            return None
        # Note: padding for word segments is already applied by _get_word_time_range_from_segments
        # For pause-based segments, we apply pad_seconds here
        if USE_WORD_SEGMENT_TIMESTAMPS and word_segments_index:
            clip_start = max(0.0, earliest_time)
            clip_end = min(len(audio) / float(sr), latest_time)
        else:
            clip_start = max(0.0, earliest_time - pad_seconds)
            clip_end = min(len(audio) / float(sr), latest_time + pad_seconds)
    start_sample = int(clip_start * sr)
    end_sample = max(start_sample + 1, int(clip_end * sr))  # guarantee >= 1 sample
    end_sample = min(end_sample, len(audio))
    snippet = audio[start_sample:end_sample]
    return _audio_to_data_uri(snippet, sr)
def render_audio_player_html(data_uri: str, title: str) -> str:
    """Render a labelled HTML <audio> player for the given data URI."""
    if not data_uri:
        return ""
    # MP3 URIs are tagged "audio/mp3" upstream; browsers expect "audio/mpeg"
    if "audio/mp3" in data_uri:
        audio_type = "audio/mpeg"
    else:
        audio_type = "audio/wav"
    return f'''
<div style="margin-top: 12px; padding: 12px; border-radius: 10px; background: var(--background-fill-secondary, #f3f4f6); border: 1px solid var(--border-color-primary, #e5e7eb);">
<div style="font-weight: 700; margin-bottom: 8px; color: var(--body-text-color, inherit);">{title}</div>
<audio controls style="width: 100%;">
<source src="{data_uri}" type="{audio_type}">
Your browser does not support the audio element.
</audio>
</div>
'''