# hetchyy's picture
# Upload folder using huggingface_hub
# a3b9d0f verified
import os
import gradio as gr
from config import (MFA_SPACE_URL, MFA_TIMEOUT, MFA_PROGRESS_SEGMENT_RATE,
MFA_METHOD, MFA_BEAM, MFA_RETRY_BEAM, MFA_SHARED_CMVN)
# Lowercase special ref names for case-insensitive matching of non-Quranic
# "special" segments (opening formulas) that need ref-qualified keys.
_SPECIAL_REFS = {"basmala", "isti'adha"}
# Canonical Arabic text of the Basmala; used to detect fused segments whose
# matched text starts with this formula.
_BASMALA_TEXT = "ุจูุณู’ู…ู ูฑู„ู„ูŽู‘ู‡ู ูฑู„ุฑูŽู‘ุญู’ู…ูŽูฐู†ู ูฑู„ุฑูŽู‘ุญููŠู…"
# Canonical Arabic text of the Isti'adha; same fused-segment detection.
_ISTIATHA_TEXT = "ุฃูŽุนููˆุฐู ุจููฑู„ู„ูŽู‘ู‡ู ู…ูู†ูŽ ุงู„ุดูŽู‘ูŠู’ุทูŽุงู†ู ุงู„ุฑูŽู‘ุฌููŠู…"
def _mfa_upload_and_submit(refs, audio_paths,
                           method=MFA_METHOD, beam=MFA_BEAM, retry_beam=MFA_RETRY_BEAM,
                           shared_cmvn=MFA_SHARED_CMVN, padding="forward"):
    """Upload audio files and submit alignment batch to the MFA Space.

    Returns (event_id, headers, base_url) so the caller can yield a progress
    update before blocking on the SSE result stream.

    Args:
        refs: List of reference strings.
        audio_paths: List of audio file paths.
        method: Alignment method ("kalpy", "align_one", "python_api", "cli").
        beam: Viterbi beam width (default 10).
        retry_beam: Retry beam width (default 40).
        shared_cmvn: Whether CMVN stats are shared across the batch.
        padding: Gap-padding strategy ("forward", "symmetric", "none").

    Raises:
        gr.Error: When the Space answers with non-JSON (paused/restarting).
        requests.HTTPError: On non-2xx responses.
    """
    import requests
    from contextlib import ExitStack
    hf_token = os.environ.get("HF_TOKEN", "")
    headers = {}
    if hf_token:
        headers["Authorization"] = f"Bearer {hf_token}"
    base = MFA_SPACE_URL
    # Upload all audio files in a single batched request. ExitStack closes
    # every handle even if a later open() or the request itself fails — the
    # previous open-all-then-try pattern leaked handles opened before an
    # exception partway through the loop.
    with ExitStack() as stack:
        files_payload = [
            ("files",
             (os.path.basename(path), stack.enter_context(open(path, "rb")), "audio/wav"))
            for path in audio_paths
        ]
        resp = requests.post(
            f"{base}/gradio_api/upload",
            headers=headers,
            files=files_payload,
            timeout=MFA_TIMEOUT,
        )
        resp.raise_for_status()
        if "application/json" not in resp.headers.get("content-type", ""):
            raise gr.Error(
                "MFA Space is not running (may be paused or restarting). "
                "Please try again in a minute."
            )
        uploaded_paths = resp.json()
    # Build FileData objects
    file_data_list = [
        {"path": p, "meta": {"_type": "gradio.FileData"}}
        for p in uploaded_paths
    ]
    # Submit batch alignment (7 params: refs, files, method, beam, retry_beam, shared_cmvn, padding)
    submit_resp = requests.post(
        f"{base}/gradio_api/call/align_batch",
        headers={**headers, "Content-Type": "application/json"},
        json={"data": [refs, file_data_list, method, str(beam), str(retry_beam),
                       str(shared_cmvn).lower(), padding]},
        timeout=MFA_TIMEOUT,
    )
    submit_resp.raise_for_status()
    if "application/json" not in submit_resp.headers.get("content-type", ""):
        raise gr.Error(
            "MFA Space is not running (may be paused or restarting). "
            "Please try again in a minute."
        )
    event_id = submit_resp.json()["event_id"]
    return event_id, headers, base
def _mfa_wait_result(event_id, headers, base):
    """Wait for the MFA SSE stream and return parsed results list.

    Args:
        event_id: Event id returned by _mfa_upload_and_submit.
        headers: Auth headers to reuse for the SSE request.
        base: Base URL of the MFA Space.

    Returns:
        The "results" list from the Space's "status: ok" payload.

    Raises:
        RuntimeError: On SSE error events, an empty stream, or a non-ok payload.
    """
    import requests
    import json
    sse_resp = requests.get(
        f"{base}/gradio_api/call/align_batch/{event_id}",
        headers=headers,
        stream=True,
        timeout=MFA_TIMEOUT,
    )
    result_data = None
    current_event = None
    # Use the response as a context manager so the streamed connection is
    # released deterministically (previously it was left to the GC).
    with sse_resp:
        sse_resp.raise_for_status()
        for line in sse_resp.iter_lines(decode_unicode=True):
            if line and line.startswith("event: "):
                current_event = line[7:]
            elif line and line.startswith("data: "):
                data_str = line[6:]
                if current_event == "complete":
                    result_data = data_str
                elif current_event == "error":
                    # Gradio 6.x may send null as error data; provide actionable message
                    if data_str.strip() in ("null", ""):
                        raise RuntimeError(
                            "MFA align_batch failed: Space returned null error. "
                            "This usually means a parameter count mismatch or "
                            "Gradio input validation failure. Check that the "
                            "client sends all required parameters."
                        )
                    raise RuntimeError(f"MFA align_batch SSE error: {data_str}")
    if result_data is None:
        raise RuntimeError("No data received from MFA align_batch SSE stream")
    parsed = json.loads(result_data)
    # Gradio wraps the return value in a list
    if isinstance(parsed, list) and len(parsed) == 1:
        parsed = parsed[0]
    if parsed is None:
        raise RuntimeError("MFA align_batch returned null result")
    if not isinstance(parsed, dict) or parsed.get("status") != "ok":
        raise RuntimeError(f"MFA align_batch failed: {parsed}")
    return parsed["results"]
# ---------------------------------------------------------------------------
# MFA split helper (used by pipeline post-processing)
# ---------------------------------------------------------------------------
def mfa_split_timestamps(audio_int16, sample_rate, mfa_refs,
                         method=MFA_METHOD, beam=MFA_BEAM, retry_beam=MFA_RETRY_BEAM,
                         shared_cmvn=MFA_SHARED_CMVN):
    """Call MFA to get word timestamps for splitting segments.

    Args:
        audio_int16: List of int16 audio arrays (one per segment to split).
        sample_rate: Audio sample rate.
        mfa_refs: List of MFA ref strings (one per segment).
        method: Alignment method ("kalpy", "align_one", "python_api", "cli").
        beam: Viterbi beam width (default 10).
        retry_beam: Retry beam width (default 40).
        shared_cmvn: Whether to share CMVN stats across the batch.

    Returns:
        List of results (one per segment), each a list of
        {location, start, end} dicts, or None on failure for that segment.
    """
    import tempfile
    import wave
    if not mfa_refs or not audio_int16:
        return [None] * len(mfa_refs)
    # Write segment audio to temp WAV files
    audio_paths = []
    for audio in audio_int16:
        tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
        # Close the NamedTemporaryFile's own handle immediately: wave.open()
        # reopens by name, so keeping the fd open leaked it (and fails on
        # Windows, where the open handle locks the file).
        tmp.close()
        with wave.open(tmp.name, "wb") as wf:
            wf.setnchannels(1)
            wf.setsampwidth(2)
            wf.setframerate(sample_rate)
            wf.writeframes(audio.tobytes())
        audio_paths.append(tmp.name)
    try:
        event_id, headers, base = _mfa_upload_and_submit(
            mfa_refs, audio_paths, method=method, beam=beam, retry_beam=retry_beam,
            shared_cmvn=shared_cmvn)
        results = _mfa_wait_result(event_id, headers, base)
        print(f"[MFA_SPLIT] Got {len(results)} results from MFA API")
        out = []
        for result in results:
            if result.get("status") != "ok":
                print(f"[MFA_SPLIT] Segment failed: ref={result.get('ref')} error={result.get('error')}")
                out.append(None)
            else:
                out.append(result.get("words", []))
        return out
    except Exception as e:
        # Best-effort: a failed batch maps every segment to None.
        print(f"[MFA_SPLIT] MFA call failed: {e}")
        return [None] * len(mfa_refs)
    finally:
        # Always remove the temp WAVs, even on failure.
        import os as _os
        for p in audio_paths:
            try:
                _os.unlink(p)
            except OSError:
                pass
# ---------------------------------------------------------------------------
# Reusable helpers (shared by UI generator and API function)
# ---------------------------------------------------------------------------
def _make_ts_key(result_idx, ref, loc):
    """Build the composite key used in word/letter timestamp dicts."""
    special = ref.strip().lower() in _SPECIAL_REFS
    fused_prefix = "+" in ref and loc.startswith("0:0:")
    # Special and fused-prefix words need the ref baked into the key, since
    # their "0:0:*" locations are not globally unique across results.
    base_key = f"{ref}:{loc}" if (special or fused_prefix) else loc
    return f"{result_idx}:{base_key}"
def _build_mfa_ref(seg):
    """Build the MFA ref string for a single segment. Returns None to skip."""
    start_ref = seg.get("ref_from", "")
    end_ref = seg.get("ref_to", "")
    conf = seg.get("confidence", 0)
    if not start_ref:
        # Special segments (e.g. Basmala) carry their name in special_type.
        start_ref = seg.get("special_type", "")
        end_ref = start_ref
    if not start_ref or conf <= 0:
        return None
    mfa_ref = start_ref if start_ref == end_ref else f"{start_ref}-{end_ref}"
    if start_ref.strip().lower() not in _SPECIAL_REFS:
        # Fuse a leading opening formula into the ref when the matched text
        # begins with it.
        text = seg.get("matched_text", "")
        if text.startswith(_ISTIATHA_TEXT):
            mfa_ref = f"Isti'adha+{mfa_ref}"
        elif text.startswith(_BASMALA_TEXT):
            mfa_ref = f"Basmala+{mfa_ref}"
    return mfa_ref
def _ensure_segment_wavs(segments, segment_dir):
    """Write individual segment WAVs from full.wav on demand (for MFA).

    Segments are sliced from the full recording using soundfile's
    frame-level random access — no need to load the entire file.
    """
    if not segment_dir:
        return
    full_path = os.path.join(segment_dir, "full.wav")
    if not os.path.exists(full_path):
        return
    import soundfile as sf
    sr = sf.info(full_path).samplerate
    count = 0
    for seg in segments:
        seg_idx = seg.get("segment", 0) - 1
        out_path = os.path.join(segment_dir, f"seg_{seg_idx}.wav")
        if os.path.exists(out_path):
            continue  # already materialized earlier
        start = int(seg.get("time_from", 0) * sr)
        stop = int(seg.get("time_to", 0) * sr)
        chunk, _ = sf.read(full_path, start=start, stop=stop, dtype='int16')
        sf.write(out_path, chunk, sr, format='WAV', subtype='PCM_16')
        count += 1
    if count:
        print(f"[MFA] Wrote {count} segment WAVs on demand from full.wav")
def _build_mfa_refs(segments, segment_dir):
    """Build MFA refs and audio paths from segments.

    Returns (refs, audio_paths, seg_to_result_idx).
    """
    refs, audio_paths = [], []
    seg_to_result_idx = {}
    for seg in segments:
        seg_idx = seg.get("segment", 0) - 1
        ref = _build_mfa_ref(seg)
        if ref is None:
            continue  # no ref or zero confidence — nothing to align
        wav = os.path.join(segment_dir, f"seg_{seg_idx}.wav") if segment_dir else None
        if not wav or not os.path.exists(wav):
            continue  # segment audio missing on disk
        seg_to_result_idx[seg_idx] = len(refs)
        refs.append(ref)
        audio_paths.append(wav)
    return refs, audio_paths, seg_to_result_idx
def _assign_letter_groups(letters, word_location):
"""Assign group_id to letters sharing identical (start, end) timestamps."""
if not letters:
return []
result = []
group_id = 0
prev_ts = None
for letter in letters:
ts = (letter.get("start"), letter.get("end"))
if ts != prev_ts:
group_id += 1
prev_ts = ts
result.append({
"char": letter.get("char", ""),
"start": letter.get("start"),
"end": letter.get("end"),
"group_id": f"{word_location}:{group_id}",
})
return result
def _build_timestamp_lookups(results):
    """Build timestamp lookup dicts from MFA results.

    Returns (word_timestamps, letter_timestamps, word_to_all_results):
      word_timestamps: composite key -> (start, end)
      letter_timestamps: composite key -> letters with group ids
      word_to_all_results: plain location -> result indices containing it
    """
    word_timestamps = {}
    letter_timestamps = {}
    word_to_all_results = {}
    for res_idx, res in enumerate(results):
        if res.get("status") != "ok":
            continue
        ref = res.get("ref", "")
        special = ref.strip().lower() in _SPECIAL_REFS
        fused = "+" in ref
        for word in res.get("words", []):
            loc = word.get("location", "")
            if not loc:
                continue
            key = _make_ts_key(res_idx, ref, loc)
            word_timestamps[key] = (word["start"], word["end"])
            letters = word.get("letters")
            if letters:
                letter_timestamps[key] = _assign_letter_groups(letters, loc)
            # Track which results contain each real location; special and
            # fused-prefix "0:0:*" words are excluded (not globally unique).
            if not special and not (fused and loc.startswith("0:0:")):
                word_to_all_results.setdefault(loc, []).append(res_idx)
    return word_timestamps, letter_timestamps, word_to_all_results
def _build_crossword_groups(results, letter_ts_dict):
    """Build mapping of (key, letter_idx) -> cross-word group_id.

    Only checks word boundaries: last letter(s) of word N vs first
    letter(s) of word N+1.
    """
    groups = {}
    for res_idx, res in enumerate(results):
        if res.get("status") != "ok":
            continue
        ref = res.get("ref", "")
        words = res.get("words", [])
        for wi in range(len(words) - 1):
            left_loc = words[wi].get("location", "")
            right_loc = words[wi + 1].get("location", "")
            if not left_loc or not right_loc:
                continue
            left_key = _make_ts_key(res_idx, ref, left_loc)
            right_key = _make_ts_key(res_idx, ref, right_loc)
            left_letters = letter_ts_dict.get(left_key, [])
            right_letters = letter_ts_dict.get(right_key, [])
            if not left_letters or not right_letters:
                continue
            # Walk the last up-to-2 letters of word N backwards ...
            for ia in range(len(left_letters) - 1, max(len(left_letters) - 3, -1), -1):
                la = left_letters[ia]
                if la.get("start") is None or la.get("end") is None:
                    continue
                # ... against the first up-to-3 letters of word N+1.
                for ib in range(min(3, len(right_letters))):
                    lb = right_letters[ib]
                    if lb.get("start") is None or lb.get("end") is None:
                        continue
                    # Identical spans across the boundary share one group.
                    if la["start"] == lb["start"] and la["end"] == lb["end"]:
                        gid = f"xword-{res_idx}-{wi}"
                        groups[(left_key, ia)] = gid
                        groups[(right_key, ib)] = gid
    return groups
def _reconstruct_ref_key(seg):
    """Reconstruct the MFA ref key for a segment (for result matching)."""
    start_ref = seg.get("ref_from", "")
    end_ref = seg.get("ref_to", "")
    if not start_ref:
        # Special segments carry their name in special_type instead.
        start_ref = seg.get("special_type", "")
        end_ref = start_ref
    key = start_ref if start_ref == end_ref else f"{start_ref}-{end_ref}"
    if start_ref.strip().lower() not in _SPECIAL_REFS:
        # Mirror _build_mfa_ref's fused-formula prefixing.
        text = seg.get("matched_text", "")
        if text.startswith(_ISTIATHA_TEXT):
            key = f"Isti'adha+{key}"
        elif text.startswith(_BASMALA_TEXT):
            key = f"Basmala+{key}"
    return key
def _extend_word_timestamps(word_timestamps, segments, seg_to_result_idx,
                            results, segment_dir):
    """Extend word ends to fill gaps between consecutive words.

    Mutates word_timestamps in place. For every aligned segment: each word's
    end is pushed forward to the next word's start, the first word's start is
    pulled back to 0, and the last word's end is pushed to the segment WAV's
    duration (so UI highlighting has no dead gaps).
    """
    import wave
    for seg in segments:
        ref_from = seg.get("ref_from", "")
        confidence = seg.get("confidence", 0)
        if not ref_from:
            # Special segments keep their name in special_type.
            ref_from = seg.get("special_type", "")
        if not ref_from or confidence <= 0:
            continue
        seg_idx = seg.get("segment", 0) - 1
        result_idx = seg_to_result_idx.get(seg_idx)
        if result_idx is None:
            continue  # segment was never submitted for alignment
        ref_key = _reconstruct_ref_key(seg)
        # Collect this segment's word keys, in order, from the first
        # successful result whose ref matches.
        # NOTE(review): keys use this segment's result_idx but words come from
        # the first ref-matching result — assumes refs are unique per batch;
        # confirm if duplicate refs can occur.
        seg_word_locs = []
        for result in results:
            if result.get("ref") == ref_key and result.get("status") == "ok":
                for w in result.get("words", []):
                    loc = w.get("location", "")
                    if loc:
                        key = _make_ts_key(result_idx, ref_key, loc)
                        if key in word_timestamps:
                            seg_word_locs.append(key)
                break
        if not seg_word_locs:
            continue
        # Extend each word's end to the next word's start
        for i in range(len(seg_word_locs) - 1):
            cur_start, cur_end = word_timestamps[seg_word_locs[i]]
            nxt_start, _ = word_timestamps[seg_word_locs[i + 1]]
            if nxt_start > cur_end:
                word_timestamps[seg_word_locs[i]] = (cur_start, nxt_start)
        # Extend first word back to time 0 so highlight starts immediately
        first_loc = seg_word_locs[0]
        first_start, first_end = word_timestamps[first_loc]
        if first_start > 0:
            word_timestamps[first_loc] = (0, first_end)
        # Extend last word to segment audio duration
        last_loc = seg_word_locs[-1]
        last_start, last_end = word_timestamps[last_loc]
        audio_path = os.path.join(segment_dir, f"seg_{seg_idx}.wav") if segment_dir else None
        if audio_path and os.path.exists(audio_path):
            with wave.open(audio_path, 'rb') as wf:
                seg_duration = wf.getnframes() / wf.getframerate()
            if seg_duration > last_end:
                word_timestamps[last_loc] = (last_start, seg_duration)
def _build_enriched_json(segments, results, seg_to_result_idx,
                         word_timestamps, letter_timestamps, granularity,
                         *, minimal=False):
    """Build enriched segments with word (and optionally letter) timestamps.

    When *minimal* is True (API path), each segment only contains
    ``segment`` number + ``words`` array. When False (UI path), all
    original segment fields are preserved.

    Args:
        segments: Segment dicts from the alignment response.
        results: MFA per-segment result dicts.
        seg_to_result_idx: segment index -> index into results.
        word_timestamps / letter_timestamps: lookups (currently unused here,
            kept for interface stability with callers).
        granularity: "words" or "words+chars".
        minimal: Compact API output vs full UI output.

    Returns dict with "segments" key.
    """
    from src.core.quran_index import get_quran_index
    index = get_quran_index()
    include_letters = (granularity == "words+chars")

    def _get_word_text(location):
        # Resolve display text from the Quran index; "0:0:*" marks special
        # formula words that have no index entry.
        if not location or location.startswith("0:0:"):
            return ""
        try:
            parts = location.split(":")
            if len(parts) >= 3:
                key = (int(parts[0]), int(parts[1]), int(parts[2]))
                idx = index.word_lookup.get(key)
                if idx is not None:
                    return index.words[idx].display_text
        except (ValueError, IndexError):
            pass
        return ""

    enriched_segments = []
    for seg in segments:
        seg_idx = seg.get("segment", 0) - 1
        result_idx = seg_to_result_idx.get(seg_idx)
        if minimal:
            segment_data = {"segment": seg.get("segment", 0)}
        else:
            segment_data = dict(seg)
        # Direct bounds-checked index access instead of scanning all results
        # with enumerate (the old loop could only ever match result_idx).
        result = None
        if result_idx is not None and 0 <= result_idx < len(results):
            candidate = results[result_idx]
            if candidate.get("status") == "ok":
                result = candidate
        if result is not None:
            _ref = seg.get("ref_from", "") or seg.get("special_type", "")
            is_special = _ref.lower() in _SPECIAL_REFS
            # Special segments take word text from matched_text (the U+06DD
            # ayah-marker is stripped before splitting).
            special_words = seg.get("matched_text", "").replace(" \u06dd ", " ").split() if is_special else []
            words_with_ts = []
            for word_idx, word in enumerate(result.get("words", [])):
                if word.get("start") is None or word.get("end") is None:
                    continue
                location = word.get("location", "")
                if minimal:
                    # API: compact — [location, start, end] or [location, start, end, letters]
                    word_entry = [location, round(word["start"], 4), round(word["end"], 4)]
                    if include_letters and word.get("letters"):
                        word_entry.append([
                            [lt.get("char", ""), round(lt["start"], 4), round(lt["end"], 4)]
                            for lt in word.get("letters", [])
                            if lt.get("start") is not None
                        ])
                    words_with_ts.append(word_entry)
                else:
                    # UI: keyed objects with display text
                    if is_special or location.startswith("0:0:"):
                        word_text = special_words[word_idx] if word_idx < len(special_words) else ""
                    else:
                        word_text = _get_word_text(location)
                    word_data = {
                        "word": word_text,
                        "location": location,
                        "start": round(word["start"], 4),
                        "end": round(word["end"], 4),
                    }
                    if include_letters and word.get("letters"):
                        word_data["letters"] = [
                            {
                                "char": lt.get("char", ""),
                                "start": round(lt["start"], 4),
                                "end": round(lt["end"], 4),
                            }
                            for lt in word.get("letters", [])
                            if lt.get("start") is not None
                        ]
                    words_with_ts.append(word_data)
            if words_with_ts:
                segment_data["words"] = words_with_ts
        enriched_segments.append(segment_data)
    return {"segments": enriched_segments}
# ---------------------------------------------------------------------------
# Synchronous API function
# ---------------------------------------------------------------------------
def compute_mfa_timestamps_api(segments, segment_dir, granularity="words",
                               method=MFA_METHOD, beam=MFA_BEAM, retry_beam=MFA_RETRY_BEAM,
                               shared_cmvn=MFA_SHARED_CMVN):
    """Run MFA forced alignment and return enriched segments (no UI/HTML).

    Args:
        segments: List of segment dicts (same format as alignment response).
        segment_dir: Path to directory containing per-segment WAV files.
        granularity: "words" or "words+chars" (anything else falls back to "words").
        method: Alignment method ("kalpy", "align_one", "python_api", "cli").
        beam: Viterbi beam width (default 10).
        retry_beam: Retry beam width (default 40).
        shared_cmvn: Whether to share CMVN stats across the batch.

    Returns:
        Dict with "segments" key containing enriched segment data.
    """
    # Single membership test suffices ("" / None are not in the tuple either).
    if granularity not in ("words", "words+chars"):
        granularity = "words"
    # Write individual segment WAVs on demand (sliced from full.wav)
    _ensure_segment_wavs(segments, segment_dir)
    refs, audio_paths, seg_to_result_idx = _build_mfa_refs(segments, segment_dir)
    if not refs:
        return {"segments": segments}
    event_id, headers, base = _mfa_upload_and_submit(
        refs, audio_paths, method=method, beam=beam, retry_beam=retry_beam,
        shared_cmvn=shared_cmvn)
    results = _mfa_wait_result(event_id, headers, base)
    word_ts, letter_ts, _ = _build_timestamp_lookups(results)
    # Cross-word groups are only used by HTML injection; the previous
    # _build_crossword_groups call here discarded its return value, so the
    # dead computation is dropped.
    _extend_word_timestamps(word_ts, segments, seg_to_result_idx, results, segment_dir)
    return _build_enriched_json(segments, results, seg_to_result_idx,
                                word_ts, letter_ts, granularity, minimal=True)
# ---------------------------------------------------------------------------
# UI progress bar
# ---------------------------------------------------------------------------
def _ts_progress_bar_html(total_segments, rate, animated=True):
    """Return HTML for a progress bar showing Segment x/N.

    When *animated* is False the bar is static at 0 %. When True the CSS fill
    animation runs and an img-onerror trick drives the text counter (since
    Gradio innerHTML doesn't execute <script> tags).

    Args:
        total_segments: Number of segments (the N in "Segment x/N").
        rate: Seconds allotted per segment; total animation is N * rate.
        animated: Whether the fill animation and JS counter run.
    """
    import random
    # Total CSS animation duration for the fill bar.
    duration = total_segments * rate
    # Random id so multiple bars on one page don't collide.
    uid = f"tspb{random.randint(0, 999999)}"
    fill_anim = f"animation:{uid}-grow {duration}s linear forwards;" if animated else ""
    keyframes = f"""<style>
@keyframes {uid}-grow {{
from {{ width:0%; }}
to {{ width:100%; }}
}}
</style>""" if animated else ""
    # img onerror executes JS even when injected via innerHTML
    counter_js = f'''<img src="data:," style="display:none"
onerror="(function(){{
var t={total_segments},r={rate * 1000},c=0,
el=document.getElementById('{uid}-text');
if(!el)return;
var iv=setInterval(function(){{
c++;
if(c>t+1){{clearInterval(iv);return;}}
if(c>t){{el.textContent='Almost Done...';}}
else{{el.textContent='Segment '+c+'/'+t;}}
}},r);
}})()" />''' if animated else ""
    return f'''<div id="{uid}" style="
position:relative; width:100%; height:40px;
background:#e5e7eb; border-radius:8px; overflow:hidden;
font-family:system-ui,sans-serif; font-size:14px;
">
<div id="{uid}-fill" style="
position:absolute; top:0; left:0; height:100%;
width:0%; background:linear-gradient(90deg,#3b82f6,#2563eb);
border-radius:8px; {fill_anim}
"></div>
<span id="{uid}-text" style="
position:absolute; inset:0; display:flex;
align-items:center; justify-content:center;
color:#1f2937; font-weight:600; z-index:1;
text-shadow:0 0 4px rgba(255,255,255,0.8);
">{'Preparing Alignment...' if not animated else f'Segment 0/{total_segments}'}</span>
{keyframes}
{counter_js}
</div>'''
# ---------------------------------------------------------------------------
# UI generator (Gradio โ€” yields progress, injects HTML timestamps)
# ---------------------------------------------------------------------------
def compute_mfa_timestamps(current_html, json_output, segment_dir, cached_log_row=None,
                           method=MFA_METHOD, beam=MFA_BEAM, retry_beam=MFA_RETRY_BEAM,
                           shared_cmvn=MFA_SHARED_CMVN):
    """Compute word-level timestamps via MFA forced alignment and inject into HTML.

    Generator that yields (output_html, compute_ts_btn, animate_all_html,
    progress_bar, json_output) tuples. First yield shows the animated progress
    bar; final yield contains results with enriched JSON including word/letter
    timestamps.

    Args:
        current_html: Rendered segment HTML to enrich.
        json_output: Alignment JSON holding the "segments" list.
        segment_dir: Directory containing full.wav / per-segment WAVs.
        cached_log_row: Optional usage-logger row to update (best-effort).
        method, beam, retry_beam, shared_cmvn: MFA alignment parameters.
    """
    import re  # NOTE(review): appears unused here (regex work is in inject_timestamps_into_html) — confirm before removing
    import traceback
    # Nothing to align when there are no word spans in the rendered HTML.
    if not current_html or '<span class="word"' not in current_html:
        yield current_html, gr.update(), gr.update(), gr.update(), gr.update()
        return
    # Build refs and audio paths using shared helper
    segments = json_output.get("segments", []) if json_output else []
    # Write individual segment WAVs on demand (sliced from full.wav)
    _ensure_segment_wavs(segments, segment_dir)
    refs, audio_paths, seg_to_result_idx = _build_mfa_refs(segments, segment_dir)
    if not refs:
        yield current_html, gr.update(), gr.update(), gr.update(), gr.update()
        return
    # Yield 1: hide button, show static progress bar at 0/N
    total_segments = len(refs)
    static_bar = _ts_progress_bar_html(total_segments, MFA_PROGRESS_SEGMENT_RATE, animated=False)
    yield (
        gr.update(),
        gr.update(visible=False),
        gr.update(),
        gr.update(value=static_bar, visible=True),
        gr.update(),
    )
    # Upload files and submit batch (blocking — bar stays at 0/N)
    try:
        event_id, mfa_headers, mfa_base = _mfa_upload_and_submit(
            refs, audio_paths, method=method, beam=beam, retry_beam=retry_beam,
            shared_cmvn=shared_cmvn)
    except Exception as e:
        traceback.print_exc()
        # Restore the button and hide the bar before re-raising to Gradio.
        yield (
            gr.update(),
            gr.update(visible=True, interactive=True, variant="primary"),
            gr.update(),
            gr.update(visible=False),
            gr.update(),
        )
        raise
    # Yield 2: switch to animated bar (counter starts now)
    animated_bar = _ts_progress_bar_html(total_segments, MFA_PROGRESS_SEGMENT_RATE, animated=True)
    yield (
        gr.update(),
        gr.update(),
        gr.update(),
        gr.update(value=animated_bar),
        gr.update(),
    )
    # Wait for MFA result (blocking — animation runs client-side)
    try:
        results = _mfa_wait_result(event_id, mfa_headers, mfa_base)
    except Exception as e:
        traceback.print_exc()
        yield (
            gr.update(),
            gr.update(visible=True, interactive=True, variant="primary"),
            gr.update(),
            gr.update(visible=False),
            gr.update(),
        )
        raise
    html, enriched_json = inject_timestamps_into_html(
        current_html, segments, results, seg_to_result_idx, segment_dir
    )
    # Log word and char timestamps to usage logger
    if cached_log_row is not None:
        try:
            import json as _json
            from src.core.usage_logger import update_word_timestamps
            _ts_log = []
            _char_ts_log = []
            for result in results:
                if result.get("status") != "ok":
                    continue
                # Word-level log entry for this result.
                _ts_log.append({
                    "ref": result.get("ref", ""),
                    "words": [
                        {"word": w.get("word", ""), "start": round(w["start"], 4), "end": round(w["end"], 4)}
                        for w in result.get("words", []) if w.get("start") is not None and w.get("end") is not None
                    ],
                })
                # Char-level log entry (only words that carry letters).
                _char_ts_log.append({
                    "ref": result.get("ref", ""),
                    "words": [
                        {
                            "word": w.get("word", ""),
                            "location": w.get("location", ""),
                            "letters": [
                                {"char": lt.get("char", ""), "start": round(lt["start"], 4), "end": round(lt["end"], 4)}
                                for lt in w.get("letters", []) if lt.get("start") is not None and lt.get("end") is not None
                            ],
                        }
                        for w in result.get("words", []) if w.get("letters")
                    ],
                })
            update_word_timestamps(
                cached_log_row,
                _json.dumps(_ts_log),
                # Char log only sent when at least one entry has words.
                _json.dumps(_char_ts_log) if any(entry["words"] for entry in _char_ts_log) else None,
            )
        except Exception as e:
            # Logging is best-effort; never break the UI flow over it.
            print(f"[USAGE_LOG] Failed to log word timestamps: {e}")
    # Final yield: updated HTML, hide progress bar, show Animate All, enriched JSON
    animate_all_btn_html = '<button class="animate-all-btn">Animate All</button>'
    yield (
        html,
        gr.update(visible=False),
        gr.update(value=animate_all_btn_html, visible=True),
        gr.update(visible=False),
        enriched_json,
    )
# ---------------------------------------------------------------------------
# Reusable HTML timestamp injection (shared by UI generator and Dev tab)
# ---------------------------------------------------------------------------
def inject_timestamps_into_html(current_html, segments, results, seg_to_result_idx, segment_dir):
    """Inject word and char timestamps into rendered segment HTML.

    Builds lookups, cross-word groups, extends timestamps, then performs
    regex-based injection of data-start/data-end attributes into word and
    char spans. Reusable by both the main MFA flow and the Dev tab
    log-based flow.

    Returns (enriched_html, enriched_json).
    """
    import re
    import unicodedata
    # Build timestamp lookups
    word_timestamps, letter_timestamps, word_to_all_results = _build_timestamp_lookups(results)
    crossword_groups = _build_crossword_groups(results, letter_timestamps)
    _extend_word_timestamps(word_timestamps, segments, seg_to_result_idx, results, segment_dir)
    # Inject timestamps into word spans, using segment boundaries to determine result_idx.
    # Boundary positions are the character offsets of data-segment-idx markers.
    seg_boundaries = []
    for m in re.finditer(r'data-segment-idx="(\d+)"', current_html):
        seg_boundaries.append((m.start(), int(m.group(1))))
    seg_boundaries.sort(key=lambda x: x[0])
    # Map segment index -> absolute start time, to convert segment-relative
    # MFA times to recording-absolute times.
    seg_offset_map = {}
    for seg in segments:
        idx = seg.get("segment", 0) - 1
        seg_offset_map[idx] = seg.get("time_from", 0)

    def _get_seg_idx_at_pos(pos):
        # Latest boundary at or before pos wins; None before the first marker.
        seg_idx = None
        for boundary_pos, idx in seg_boundaries:
            if boundary_pos > pos:
                break
            seg_idx = idx
        return seg_idx

    word_open_re = r'<span class="word"[^>]*>'

    def _inject_word_ts(m):
        # Rewrite one word-span opening tag with absolute start/end times.
        orig = m.group(0)
        pos_m = re.search(r'data-pos="([^"]+)"', orig)
        if not pos_m:
            return orig
        pos = pos_m.group(1)
        seg_idx = _get_seg_idx_at_pos(m.start())
        if seg_idx is None:
            return orig
        expected_result_idx = seg_to_result_idx.get(seg_idx)
        result_idx = None
        # Pick a result containing this location; prefer the segment's own
        # result, otherwise the numerically closest candidate.
        if pos and not pos.startswith("0:0:"):
            candidates = word_to_all_results.get(pos, [])
            if candidates:
                if len(candidates) == 1:
                    result_idx = candidates[0]
                elif expected_result_idx in candidates:
                    result_idx = expected_result_idx
                else:
                    result_idx = min(candidates, key=lambda r: abs(r - (expected_result_idx or 0)))
        if result_idx is None:
            result_idx = expected_result_idx
        if result_idx is None:
            return orig
        key = f"{result_idx}:{pos}"
        ts = word_timestamps.get(key)
        if not ts:
            return orig
        # Convert segment-relative times to absolute recording times.
        seg_offset = seg_offset_map.get(seg_idx, 0)
        abs_start = ts[0] + seg_offset
        abs_end = ts[1] + seg_offset
        return orig[:-1] + f' data-result-idx="{result_idx}" data-start="{abs_start:.4f}" data-end="{abs_end:.4f}">'

    html = re.sub(word_open_re, _inject_word_ts, current_html)
    # Enable per-segment animate buttons
    html = re.sub(r'(<button class="animate-btn"[^>]*?)\s+disabled(?:="[^"]*")?', r'\1', html)
    # Create char spans for timestamped words that don't have them yet
    # (char spans are deferred from initial render to reduce HTML size)
    from src.ui.segments import split_into_char_groups, ZWSP, DAGGER_ALEF

    def _create_char_spans(m):
        word_open = m.group(1)
        inner = m.group(2)
        if '<span class="char">' in inner:
            return m.group(0)  # Already has char spans
        chars = []
        for g in split_into_char_groups(inner):
            # Dagger-alef groups get a zero-width space so they render standalone.
            if g.startswith(DAGGER_ALEF):
                chars.append(f'<span class="char">{ZWSP}{g}</span>')
            else:
                chars.append(f'<span class="char">{g}</span>')
        return f'{word_open}{"".join(chars)}</span>'

    html = re.sub(
        r'(<span class="word"[^>]*data-start="[\d.]+"[^>]*>)(.*?)</span>',
        _create_char_spans,
        html,
    )

    # Stamp char spans with MFA letter timestamps
    def _stamp_chars_with_mfa(word_m):
        # word_m groups: 1 = opening tag, 2 = data-start, 3 = data-end, 4 = inner HTML.
        word_open = word_m.group(1)
        word_abs_start = float(word_m.group(2))
        inner = word_m.group(4)
        pos_m = re.search(r'data-pos="([^"]+)"', word_open)
        word_pos = pos_m.group(1) if pos_m else None
        result_idx_m = re.search(r'data-result-idx="(\d+)"', word_open)
        if result_idx_m:
            # Prefer the result index already chosen by _inject_word_ts.
            result_idx = int(result_idx_m.group(1))
        else:
            result_idx = None
            # Fallback: derive from the location lookup (first candidate).
            if word_pos and not word_pos.startswith("0:0:"):
                candidates = word_to_all_results.get(word_pos, [])
                if candidates:
                    if len(candidates) == 1:
                        result_idx = candidates[0]
                    else:
                        result_idx = candidates[0]
        key = f"{result_idx}:{word_pos}" if result_idx is not None and word_pos else None
        word_ts = word_timestamps.get(key) if key else None
        mfa_letters = letter_timestamps.get(key) if key else None
        if not mfa_letters or not word_ts:
            return word_m.group(0)
        word_rel_start = word_ts[0]
        char_matches = list(re.finditer(r'<span class="char">([^<]*)</span>', inner))
        if not char_matches:
            return word_m.group(0)
        mfa_chars = [l["char"] for l in mfa_letters]
        # U+0640 (tatweel) is presentation-only; strip before matching.
        html_chars = [m.group(1).replace('\u0640', '') for m in char_matches]
        # Orthographic variants treated as equal (alef maqsura <-> yaa).
        CHAR_EQUIVALENTS = {
            'ู‰': 'ูŠ',
            'ูŠ': 'ู‰',
        }

        def _first_base(s):
            # First non-combining (non-Mark) character after NFD decomposition.
            for c in unicodedata.normalize("NFD", s):
                if not unicodedata.category(c).startswith('M'):
                    return c
            return s[0] if s else ''

        def chars_match(mfa_c, html_c):
            # Exact / substring match, declared equivalents, or same base letter.
            if mfa_c == html_c or html_c in mfa_c or mfa_c in html_c:
                return True
            if CHAR_EQUIVALENTS.get(mfa_c) == html_c:
                return True
            mb, hb = _first_base(mfa_c), _first_base(html_c)
            if mb and hb and (mb == hb or CHAR_EQUIVALENTS.get(mb) == hb):
                return True
            return False

        mfa_idx = 0
        char_replacements = []
        stamped_html = set()  # HTML char indices already stamped via peek-ahead
        for html_idx, cm in enumerate(char_matches):
            if html_idx in stamped_html:
                continue
            html_char = html_chars[html_idx]
            if mfa_idx < len(mfa_letters):
                mfa_char = mfa_chars[mfa_idx]
                if chars_match(mfa_char, html_char):
                    letter = mfa_letters[mfa_idx]
                    if letter["start"] is None or letter["end"] is None:
                        # Un-timestamped letter: advance the MFA cursor and skip.
                        if chars_match(mfa_char, html_char) or len(html_char) >= len(mfa_char):
                            mfa_idx += 1
                        continue
                    # Letter times are word-relative; rebase onto absolute time.
                    abs_start = word_abs_start + (letter["start"] - word_rel_start)
                    abs_end = word_abs_start + (letter["end"] - word_rel_start)
                    # Cross-word group (if any) overrides the in-word group.
                    crossword_gid = crossword_groups.get((key, mfa_idx), "")
                    final_group_id = crossword_gid or letter.get("group_id", "")
                    char_replacements.append((
                        cm.start(), cm.end(),
                        f'<span class="char" data-start="{abs_start:.4f}" data-end="{abs_end:.4f}" data-group-id="{final_group_id}">{cm.group(1)}</span>'
                    ))
                    # Peek ahead: trailing combining-mark-only spans that belong
                    # to this MFA letter inherit its timestamps and group.
                    mfa_nfd = unicodedata.normalize("NFD", letter["char"])
                    peek = html_idx + 1
                    while peek < len(char_matches):
                        peek_raw = char_matches[peek].group(1).replace('\u0640', '')
                        if not peek_raw or not all(unicodedata.category(c).startswith('M') for c in peek_raw):
                            break
                        if not any(c in mfa_nfd for c in peek_raw):
                            break
                        char_replacements.append((
                            char_matches[peek].start(), char_matches[peek].end(),
                            f'<span class="char" data-start="{abs_start:.4f}" data-end="{abs_end:.4f}" data-group-id="{final_group_id}">{char_matches[peek].group(1)}</span>'
                        ))
                        stamped_html.add(peek)
                        peek += 1
                # Advance the MFA cursor when consumed (match or long HTML char).
                if chars_match(mfa_char, html_char) or len(html_char) >= len(mfa_char):
                    mfa_idx += 1
        # Apply replacements back-to-front so earlier offsets stay valid.
        stamped_inner = inner
        for start, end, replacement in reversed(char_replacements):
            stamped_inner = stamped_inner[:start] + replacement + stamped_inner[end:]
        return f'{word_open}{stamped_inner}</span>'

    html = re.sub(
        r'(<span class="word"(?:\s+data-pos="[^"]*")?(?:\s+data-result-idx="\d+")?\s+data-start="([\d.]+)"\s+data-end="([\d.]+)">)((?:<span class="char">.*?</span>)+)</span>',
        _stamp_chars_with_mfa,
        html,
    )
    # Build enriched JSON (words only for download)
    enriched_json = _build_enriched_json(
        segments, results, seg_to_result_idx,
        word_timestamps, letter_timestamps, "words",
    )
    return html, enriched_json