| # import re | |
| # import os | |
| # import json | |
| # import math | |
| # import tempfile | |
| # from dataclasses import dataclass | |
| # from typing import List, Optional, Tuple, Dict, Any | |
| # import gradio as gr | |
| # import numpy as np | |
| # import srt | |
| # from pydub import AudioSegment | |
| # from langdetect import DetectorFactory, detect_langs | |
| # DetectorFactory.seed = 0 # deterministic | |
| # # ----------------------------- | |
| # # Data structures | |
| # # ----------------------------- | |
| # @dataclass | |
| # class Cue: | |
| # start: float | |
| # end: float | |
| # text: str | |
| # # ----------------------------- | |
| # # Language utilities | |
| # # ----------------------------- | |
| # LANG_LABELS = ["原文(不指定)", "中文", "English", "日本語"] | |
| # def normalize_lang_code(code: str) -> str: | |
| # c = (code or "").lower() | |
| # if c.startswith("zh"): | |
| # return "中文" | |
| # if c == "en": | |
| # return "English" | |
| # if c == "ja": | |
| # return "日本語" | |
| # return "其他" | |
| # def detect_language_label(text: str) -> Dict[str, Any]: | |
| # t = (text or "").strip() | |
| # if not t: | |
| # return {"raw_code": None, "raw_prob": None, "label": None, "normalized_label": None} | |
| # t = t.replace("\n", " ").strip()[:400] | |
| # try: | |
| # langs = detect_langs(t) | |
| # if not langs: | |
| # return {"raw_code": None, "raw_prob": None, "label": None, "normalized_label": None} | |
| # top = langs[0] | |
| # raw_code = getattr(top, "lang", None) | |
| # raw_prob = float(getattr(top, "prob", 0.0)) if top else None | |
| # norm = normalize_lang_code(raw_code) | |
| # return { | |
| # "raw_code": raw_code, | |
| # "raw_prob": round(raw_prob, 3) if raw_prob is not None else None, | |
| # "label": norm if norm != "其他" else "原文(不指定)", | |
| # "normalized_label": norm, | |
| # } | |
| # except Exception: | |
| # return {"raw_code": None, "raw_prob": None, "label": None, "normalized_label": None} | |
| # def check_declared_mismatch(declared: str, detected_norm: Optional[str]) -> bool: | |
| # if declared == "原文(不指定)": | |
| # return False | |
| # if detected_norm is None: | |
| # return True | |
| # if detected_norm == "其他": | |
| # return True | |
| # return detected_norm != declared | |
| # def analyze_language_for_cues(cues: List[Cue]) -> Dict[str, Any]: | |
| # counts = {"中文": 0, "English": 0, "日本語": 0, "其他": 0} | |
| # for c in cues: | |
| # d = detect_language_label(c.text) | |
| # norm = d.get("normalized_label") | |
| # if norm in counts: | |
| # counts[norm] += 1 | |
| # dominant = max(counts.items(), key=lambda x: x[1])[0] if cues else None | |
| # return {"counts": counts, "dominant_norm": dominant} | |
| # # ----------------------------- | |
| # # Subtitle parsing | |
| # # ----------------------------- | |
| # _TAG_RE = re.compile(r"</?[^>]+?>", re.IGNORECASE) | |
| # _VTT_TIME_RE = re.compile( | |
| # r"(?P<start>\d{2}:\d{2}:\d{2}\.\d{3}|\d{1,2}:\d{2}\.\d{3})\s*-->\s*" | |
| # r"(?P<end>\d{2}:\d{2}:\d{2}\.\d{3}|\d{1,2}:\d{2}\.\d{3})" | |
| # ) | |
| # def _strip_tags(text: str) -> str: | |
| # text = _TAG_RE.sub("", text) | |
| # text = text.replace("<c>", "").replace("</c>", "") | |
| # return text.strip() | |
| # def _time_to_seconds(t: str) -> float: | |
| # t = t.strip().split()[0] | |
| # parts = t.split(":") | |
| # if len(parts) == 3: | |
| # h = int(parts[0]) | |
| # m = int(parts[1]) | |
| # s = float(parts[2]) | |
| # return h * 3600 + m * 60 + s | |
| # if len(parts) == 2: | |
| # m = int(parts[0]) | |
| # s = float(parts[1]) | |
| # return m * 60 + s | |
| # raise ValueError(f"Unsupported time format: {t}") | |
| # def parse_vtt(content: str) -> List[Cue]: | |
| # content = content.replace("\ufeff", "") | |
| # content = re.sub(r"^\s*WEBVTT.*?\n", "", content, flags=re.IGNORECASE) | |
| # blocks = re.split(r"\r?\n\r?\n", content.strip()) | |
| # cues: List[Cue] = [] | |
| # for block in blocks: | |
| # lines = [ln.rstrip("\n") for ln in re.split(r"\r?\n", block) if ln.strip() != ""] | |
| # if not lines: | |
| # continue | |
| # time_line_idx = None | |
| # for idx in range(min(2, len(lines))): | |
| # if "-->" in lines[idx]: | |
| # time_line_idx = idx | |
| # break | |
| # if time_line_idx is None: | |
| # continue | |
| # m = _VTT_TIME_RE.search(lines[time_line_idx]) | |
| # if not m: | |
| # continue | |
| # start = _time_to_seconds(m.group("start")) | |
| # end = _time_to_seconds(m.group("end")) | |
| # if not (math.isfinite(start) and math.isfinite(end)) or end <= start: | |
| # continue | |
| # text_lines = lines[time_line_idx + 1 :] | |
| # text = _strip_tags("\n".join(text_lines)).strip() | |
| # if not text: | |
| # continue | |
| # cues.append(Cue(start=start, end=end, text=text)) | |
| # cues.sort(key=lambda x: x.start) | |
| # return cues | |
| # def parse_srt(content: str) -> List[Cue]: | |
| # content = content.replace("\ufeff", "") | |
| # subs = list(srt.parse(content)) | |
| # cues: List[Cue] = [] | |
| # for sub in subs: | |
| # cues.append( | |
| # Cue( | |
| # start=sub.start.total_seconds(), | |
| # end=sub.end.total_seconds(), | |
| # text=sub.content.strip(), | |
| # ) | |
| # ) | |
| # cues.sort(key=lambda x: x.start) | |
| # return cues | |
| # def parse_subtitle_file(path: Optional[str]) -> List[Cue]: | |
| # if not path: | |
| # return [] | |
| # with open(path, "r", encoding="utf-8") as f: | |
| # content = f.read() | |
| # head = content.lstrip()[:80].upper() | |
| # ext = os.path.splitext(path)[1].lower() | |
| # if "WEBVTT" in head or ext == ".vtt": | |
| # return parse_vtt(content) | |
| # return parse_srt(content) | |
| # # ----------------------------- | |
| # # Alignment | |
| # # ----------------------------- | |
| # def align_by_time( | |
| # a: List[Cue], | |
| # b: List[Cue], | |
| # max_mid_diff: float = 1.5, | |
| # ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: | |
| # aligned: List[Dict[str, Any]] = [] | |
| # i, j = 0, 0 | |
| # matched_a = set() | |
| # matched_b = set() | |
| # while i < len(a) and j < len(b): | |
| # x = a[i] | |
| # y = b[j] | |
| # mid_x = (x.start + x.end) / 2 | |
| # mid_y = (y.start + y.end) / 2 | |
| # diff = mid_x - mid_y | |
| # if abs(diff) <= max_mid_diff: | |
| # # global window for "corresponding part" | |
| # g_start = min(x.start, y.start) | |
| # g_end = max(x.end, y.end) | |
| # aligned.append( | |
| # { | |
| # "idx": len(aligned) + 1, | |
| # "start": g_start, | |
| # "end": g_end, | |
| # "en_start": x.start, | |
| # "en_end": x.end, | |
| # "zh_start": y.start, | |
| # "zh_end": y.end, | |
| # "text_en": x.text, | |
| # "text_zh": y.text, | |
| # } | |
| # ) | |
| # matched_a.add(i) | |
| # matched_b.add(j) | |
| # i += 1 | |
| # j += 1 | |
| # elif diff < 0: | |
| # i += 1 | |
| # else: | |
| # j += 1 | |
| # stats = { | |
| # "trackA_total": len(a), | |
| # "trackB_total": len(b), | |
| # "aligned_pairs": len(aligned), | |
| # "trackA_unmatched": len(a) - len(matched_a), | |
| # "trackB_unmatched": len(b) - len(matched_b), | |
| # "max_mid_diff_sec": max_mid_diff, | |
| # } | |
| # return aligned, stats | |
| # # ----------------------------- | |
| # # Audio processing / QC | |
| # # ----------------------------- | |
| # def load_audio(path: str) -> AudioSegment: | |
| # return AudioSegment.from_file(path) | |
| # def segment_to_wav_file(audio: AudioSegment, start_s: float, end_s: float) -> str: | |
| # start_ms = max(0, int(start_s * 1000)) | |
| # end_ms = max(start_ms + 1, int(end_s * 1000)) | |
| # seg = audio[start_ms:end_ms] | |
| # tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") | |
| # tmp.close() | |
| # seg.export(tmp.name, format="wav") | |
| # return tmp.name | |
| # def compute_dbfs(audio: AudioSegment) -> float: | |
| # v = audio.dBFS | |
| # if v == float("-inf"): | |
| # return -120.0 | |
| # return float(v) | |
| # def qc_on_aligned_segments( | |
| # audio_a: AudioSegment, | |
| # audio_b: AudioSegment, | |
| # aligned: List[Dict[str, Any]], | |
| # silence_dbfs_threshold: float = -50.0, | |
| # low_dbfs_threshold: float = -40.0, | |
| # ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: | |
| # issues: List[Dict[str, Any]] = [] | |
| # a_levels = [] | |
| # b_levels = [] | |
| # for seg in aligned: | |
| # a_start, a_end = seg["en_start"], seg["en_end"] | |
| # b_start, b_end = seg["zh_start"], seg["zh_end"] | |
| # a = audio_a[int(a_start * 1000) : int(a_end * 1000)] | |
| # b = audio_b[int(b_start * 1000) : int(b_end * 1000)] | |
| # a_dbfs = compute_dbfs(a) | |
| # b_dbfs = compute_dbfs(b) | |
| # a_levels.append(a_dbfs) | |
| # b_levels.append(b_dbfs) | |
| # problems = [] | |
| # if a_dbfs <= silence_dbfs_threshold: | |
| # problems.append("Track A 静音/近静音") | |
| # elif a_dbfs <= low_dbfs_threshold: | |
| # problems.append("Track A 音量偏低") | |
| # if b_dbfs <= silence_dbfs_threshold: | |
| # problems.append("Track B 静音/近静音") | |
| # elif b_dbfs <= low_dbfs_threshold: | |
| # problems.append("Track B 音量偏低") | |
| # if abs(a_dbfs - b_dbfs) >= 12.0: | |
| # problems.append("两路音量差异过大(≥12dB)") | |
| # if problems: | |
| # issues.append( | |
| # { | |
| # "segment": seg["idx"], | |
| # "time": f"{seg['start']:.2f}-{seg['end']:.2f}s", | |
| # "A_time": f"{a_start:.2f}-{a_end:.2f}s", | |
| # "B_time": f"{b_start:.2f}-{b_end:.2f}s", | |
| # "problems": problems, | |
| # "text_A": seg["text_en"][:120] + ("..." if len(seg["text_en"]) > 120 else ""), | |
| # "text_B": seg["text_zh"][:120] + ("..." if len(seg["text_zh"]) > 120 else ""), | |
| # "A_dbfs": round(a_dbfs, 1), | |
| # "B_dbfs": round(b_dbfs, 1), | |
| # } | |
| # ) | |
| # def _safe_mean(xs): | |
| # return float(np.mean(xs)) if xs else 0.0 | |
| # qc_stats = { | |
| # "aligned_pairs": len(aligned), | |
| # "issue_segments": len(issues), | |
| # "A_avg_dbfs": round(_safe_mean(a_levels), 1), | |
| # "B_avg_dbfs": round(_safe_mean(b_levels), 1), | |
| # "A_min_dbfs": round(float(np.min(a_levels)), 1) if a_levels else None, | |
| # "B_min_dbfs": round(float(np.min(b_levels)), 1) if b_levels else None, | |
| # "silence_dbfs_threshold": silence_dbfs_threshold, | |
| # "low_dbfs_threshold": low_dbfs_threshold, | |
| # } | |
| # return issues, qc_stats | |
| # # ----------------------------- | |
| # # Interactive HTML table (buttons per row) | |
| # # ----------------------------- | |
| # def _escape_html(s: str) -> str: | |
| # return (s or "").replace("&", "&").replace("<", "<").replace(">", ">") | |
| # def build_interactive_table_html(aligned: List[Dict[str, Any]]) -> str: | |
| # max_rows = 2000 | |
| # aligned_view = aligned[:max_rows] | |
| # rows_html = [] | |
| # for seg in aligned_view: | |
| # idx = seg["idx"] | |
| # t = f'{seg["start"]:.2f}-{seg["end"]:.2f}' | |
| # a_t = f'{seg["en_start"]:.2f}-{seg["en_end"]:.2f}' | |
| # b_t = f'{seg["zh_start"]:.2f}-{seg["zh_end"]:.2f}' | |
| # a_txt = _escape_html(seg["text_en"]) | |
| # b_txt = _escape_html(seg["text_zh"]) | |
| # a_lang = _escape_html(seg.get("en_lang_label") or "-") | |
| # b_lang = _escape_html(seg.get("zh_lang_label") or "-") | |
| # a_conf = seg.get("en_lang_prob") | |
| # b_conf = seg.get("zh_lang_prob") | |
| # a_conf_s = f"{a_conf:.3f}" if isinstance(a_conf, (int, float)) else "-" | |
| # b_conf_s = f"{b_conf:.3f}" if isinstance(b_conf, (int, float)) else "-" | |
| # a_bad = seg.get("en_lang_mismatch", False) | |
| # b_bad = seg.get("zh_lang_mismatch", False) | |
| # a_cls = "bad" if a_bad else "ok" | |
| # b_cls = "bad" if b_bad else "ok" | |
| # btns = ( | |
| # f'<button class="segbtn a" onclick="__playSeg({idx}, \'a\')">A</button>' | |
| # f'<button class="segbtn b" onclick="__playSeg({idx}, \'b\')">B</button>' | |
| # f'<button class="segbtn both" onclick="__playSeg({idx}, \'both\')">同时</button>' | |
| # ) | |
| # rows_html.append( | |
| # f""" | |
| # <tr> | |
| # <td class="mono">{idx:04d}</td> | |
| # <td class="mono">{t}</td> | |
| # <td class="btncell">{btns}</td> | |
| # <td class="textcell">{a_txt}</td> | |
| # <td class="textcell">{b_txt}</td> | |
| # <td class="mono">{a_t}</td> | |
| # <td class="mono">{b_t}</td> | |
| # <td class="mono {a_cls}">{a_lang} ({a_conf_s})</td> | |
| # <td class="mono {b_cls}">{b_lang} ({b_conf_s})</td> | |
| # </tr> | |
| # """.strip() | |
| # ) | |
| # # IMPORTANT FIX: | |
| # # - __qs must be `#${id}`, not `#${{id}}` | |
| # # - action_box/action_btn must exist in DOM (we keep them and hide via CSS) | |
| # # - Wait for audio src to change before play (avoid playing old segment) | |
| # table_html = f""" | |
| # <div class="segwrap"> | |
| # <div class="note"> | |
| # 点击每行按钮可生成并播放片段(A / B / 同时)。若浏览器阻止自动播放,请在右侧播放器手动点一次播放键。 | |
| # </div> | |
| # <div class="tablewrap"> | |
| # <table class="segtable"> | |
| # <thead> | |
| # <tr> | |
| # <th>#</th> | |
| # <th>Global(s)</th> | |
| # <th>Play</th> | |
| # <th>Track A</th> | |
| # <th>Track B</th> | |
| # <th>A time</th> | |
| # <th>B time</th> | |
| # <th>A Lang</th> | |
| # <th>B Lang</th> | |
| # </tr> | |
| # </thead> | |
| # <tbody> | |
| # {"".join(rows_html)} | |
| # </tbody> | |
| # </table> | |
| # </div> | |
| # </div> | |
| # <style> | |
| # .segwrap {{ margin-top: 8px; }} | |
| # .note {{ font-size: 12px; opacity: 0.85; margin-bottom: 8px; }} | |
| # .tablewrap {{ overflow: auto; max-height: 560px; border: 1px solid rgba(127,127,127,0.25); border-radius: 8px; }} | |
| # table.segtable {{ width: 100%; border-collapse: collapse; }} | |
| # table.segtable th, table.segtable td {{ border-bottom: 1px solid rgba(127,127,127,0.18); padding: 8px; vertical-align: top; }} | |
| # table.segtable thead th {{ position: sticky; top: 0; background: rgba(250,250,250,0.95); z-index: 1; }} | |
| # .mono {{ font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace; white-space: nowrap; }} | |
| # .textcell {{ min-width: 320px; white-space: pre-wrap; }} | |
| # .btncell {{ white-space: nowrap; min-width: 150px; }} | |
| # .segbtn {{ | |
| # padding: 4px 10px; | |
| # margin-right: 6px; | |
| # border-radius: 8px; | |
| # border: 1px solid rgba(127,127,127,0.35); | |
| # background: white; | |
| # cursor: pointer; | |
| # font-size: 12px; | |
| # }} | |
| # .segbtn:hover {{ background: rgba(0,0,0,0.04); }} | |
| # .segbtn.both {{ font-weight: 800; }} | |
| # .ok {{ color: inherit; }} | |
| # .bad {{ color: #b00020; font-weight: 800; }} | |
| # </style> | |
| # <script> | |
| # function __gradioAppRoot() {{ | |
| # const ga = document.querySelector("gradio-app"); | |
| # return ga ? ga.shadowRoot : document; | |
| # }} | |
| # function __qs(id) {{ | |
| # const root = __gradioAppRoot(); | |
| # return root.querySelector(`#${{id}}`); | |
| # }} | |
| # function __setTextboxValue(elemId, value) {{ | |
| # const box = __qs(elemId); | |
| # if (!box) return false; | |
| # const input = box.querySelector("textarea, input"); | |
| # if (!input) return false; | |
| # input.value = value; | |
| # input.dispatchEvent(new Event("input", {{ bubbles: true }})); | |
| # return true; | |
| # }} | |
| # function __clickButton(elemId) {{ | |
| # const btn = __qs(elemId); | |
| # if (!btn) return false; | |
| # const realBtn = btn.querySelector("button"); | |
| # if (!realBtn) return false; | |
| # realBtn.click(); | |
| # return true; | |
| # }} | |
| # function __getAudioTag(containerElemId) {{ | |
| # const c = __qs(containerElemId); | |
| # if (!c) return null; | |
| # return c.querySelector("audio"); | |
| # }} | |
| # window.__desiredPlayMode = null; // "a" | "b" | "both" | |
| # window.__desiredPlayNonce = 0; | |
| # function __waitAndPlay(nonce, prevASrc, prevBSrc) {{ | |
| # let tries = 0; | |
| # const maxTries = 80; // ~16s | |
| # const intervalMs = 200; | |
| # const timer = setInterval(() => {{ | |
| # tries += 1; | |
| # if (window.__desiredPlayNonce !== nonce) {{ | |
| # clearInterval(timer); | |
| # return; | |
| # }} | |
| # const aAudio = __getAudioTag("a_audio_seg"); | |
| # const bAudio = __getAudioTag("b_audio_seg"); | |
| # if (!aAudio && !bAudio) {{ | |
| # if (tries >= maxTries) clearInterval(timer); | |
| # return; | |
| # }} | |
| # const mode = window.__desiredPlayMode; | |
| # const aReady = aAudio && (aAudio.src && aAudio.src !== prevASrc) && aAudio.readyState >= 2; | |
| # const bReady = bAudio && (bAudio.src && bAudio.src !== prevBSrc) && bAudio.readyState >= 2; | |
| # const canPlay = | |
| # (mode === "a" && aReady) || | |
| # (mode === "b" && bReady) || | |
| # (mode === "both" && aReady && bReady); | |
| # if (canPlay) {{ | |
| # try {{ | |
| # if (mode === "a") {{ | |
| # if (bAudio) bAudio.pause(); | |
| # aAudio.currentTime = 0; | |
| # aAudio.play(); | |
| # }} else if (mode === "b") {{ | |
| # if (aAudio) aAudio.pause(); | |
| # bAudio.currentTime = 0; | |
| # bAudio.play(); | |
| # }} else if (mode === "both") {{ | |
| # aAudio.currentTime = 0; | |
| # bAudio.currentTime = 0; | |
| # aAudio.play(); | |
| # bAudio.play(); | |
| # }} | |
| # }} catch (e) {{}} | |
| # clearInterval(timer); | |
| # return; | |
| # }} | |
| # if (tries >= maxTries) {{ | |
| # clearInterval(timer); | |
| # }} | |
| # }}, intervalMs); | |
| # }} | |
| # function __playSeg(idx, mode) {{ | |
| # window.__desiredPlayMode = mode; | |
| # window.__desiredPlayNonce += 1; | |
| # const nonce = window.__desiredPlayNonce; | |
| # const aAudio = __getAudioTag("a_audio_seg"); | |
| # const bAudio = __getAudioTag("b_audio_seg"); | |
| # const prevASrc = aAudio ? aAudio.src : ""; | |
| # const prevBSrc = bAudio ? bAudio.src : ""; | |
| # const payload = JSON.stringify({{ idx: idx, mode: mode }}); | |
| # const ok1 = __setTextboxValue("action_box", payload); | |
| # const ok2 = __clickButton("action_btn"); | |
| # if (ok1 && ok2) {{ | |
| # __waitAndPlay(nonce, prevASrc, prevBSrc); | |
| # }} else {{ | |
| # console.warn("Failed to trigger backend action.", ok1, ok2); | |
| # }} | |
| # }} | |
| # </script> | |
| # """.strip() | |
| # if len(aligned) > max_rows: | |
| # table_html = ( | |
| # f"<div style='margin:8px 0;font-size:12px;opacity:.85;'>" | |
| # f"提示:对齐片段数为 {len(aligned)},为保证渲染性能仅展示前 {max_rows} 行。</div>\n" | |
| # + table_html | |
| # ) | |
| # return table_html | |
| # # ----------------------------- | |
| # # Segment generation (core fix: global window + offset) | |
| # # ----------------------------- | |
| # def _pick_window(seg: Dict[str, Any], mode: str, offset_a: float, offset_b: float) -> Tuple[float, float, float, float]: | |
| # """ | |
| # mode: | |
| # - "global": use seg["start"]/["end"] for BOTH tracks | |
| # - "per_track": use seg["en_start"]/["en_end"] for A, seg["zh_start"]/["zh_end"] for B | |
| # offsets are applied per track. | |
| # Returns: (a_start, a_end, b_start, b_end) | |
| # """ | |
| # if mode == "per_track": | |
| # a_start, a_end = seg["en_start"], seg["en_end"] | |
| # b_start, b_end = seg["zh_start"], seg["zh_end"] | |
| # else: | |
| # # global window (recommended) | |
| # a_start, a_end = seg["start"], seg["end"] | |
| # b_start, b_end = seg["start"], seg["end"] | |
| # a_start = max(0.0, a_start + float(offset_a)) | |
| # a_end = max(a_start + 0.01, a_end + float(offset_a)) | |
| # b_start = max(0.0, b_start + float(offset_b)) | |
| # b_end = max(b_start + 0.01, b_end + float(offset_b)) | |
| # return a_start, a_end, b_start, b_end | |
| # # ----------------------------- | |
| # # Gradio callbacks | |
| # # ----------------------------- | |
| # def parse_align_and_qc( | |
| # audio_a_path: Optional[str], | |
| # audio_b_path: Optional[str], | |
| # sub_a_path: Optional[str], | |
| # sub_b_path: Optional[str], | |
| # declared_a_lang: str, | |
| # declared_b_lang: str, | |
| # crop_mode: str, | |
| # offset_a: float, | |
| # offset_b: float, | |
| # max_mid_diff: float, | |
| # silence_th: float, | |
| # low_th: float, | |
| # ): | |
| # if not audio_a_path or not audio_b_path: | |
| # return ( | |
| # [], | |
| # "", | |
| # {"error": "请同时上传 Track A 与 Track B 音频。"}, | |
| # [], | |
| # None, | |
| # None, | |
| # gr.update(choices=[], value=None), | |
| # None, | |
| # ) | |
| # try: | |
| # cues_a = parse_subtitle_file(sub_a_path) if sub_a_path else [] | |
| # cues_b = parse_subtitle_file(sub_b_path) if sub_b_path else [] | |
| # if not cues_a or not cues_b: | |
| # return ( | |
| # [], | |
| # "", | |
| # {"error": "请同时提供两路字幕(SRT/VTT)。", "A_cues": len(cues_a), "B_cues": len(cues_b)}, | |
| # [], | |
| # audio_a_path, | |
| # audio_b_path, | |
| # gr.update(choices=[], value=None), | |
| # None, | |
| # ) | |
| # # overall language stats | |
| # a_lang_stats = analyze_language_for_cues(cues_a) | |
| # b_lang_stats = analyze_language_for_cues(cues_b) | |
| # aligned, align_stats = align_by_time(cues_a, cues_b, max_mid_diff=max_mid_diff) | |
| # # per-segment language + mismatch issues | |
| # lang_mismatch_issues = [] | |
| # for seg in aligned: | |
| # da = detect_language_label(seg["text_en"]) | |
| # db = detect_language_label(seg["text_zh"]) | |
| # seg["en_lang_label"] = da.get("label") | |
| # seg["en_lang_prob"] = da.get("raw_prob") | |
| # seg["en_lang_norm"] = da.get("normalized_label") | |
| # seg["zh_lang_label"] = db.get("label") | |
| # seg["zh_lang_prob"] = db.get("raw_prob") | |
| # seg["zh_lang_norm"] = db.get("normalized_label") | |
| # seg["en_lang_mismatch"] = check_declared_mismatch(declared_a_lang, seg["en_lang_norm"]) | |
| # seg["zh_lang_mismatch"] = check_declared_mismatch(declared_b_lang, seg["zh_lang_norm"]) | |
| # if seg["en_lang_mismatch"] or seg["zh_lang_mismatch"]: | |
| # problems = [] | |
| # if seg["en_lang_mismatch"]: | |
| # problems.append(f"Track A 声明={declared_a_lang} 检测={seg['en_lang_label']}") | |
| # if seg["zh_lang_mismatch"]: | |
| # problems.append(f"Track B 声明={declared_b_lang} 检测={seg['zh_lang_label']}") | |
| # lang_mismatch_issues.append( | |
| # { | |
| # "segment": seg["idx"], | |
| # "time": f"{seg['start']:.2f}-{seg['end']:.2f}s", | |
| # "type": "LanguageMismatch", | |
| # "problems": problems, | |
| # "text_A": seg["text_en"][:120] + ("..." if len(seg["text_en"]) > 120 else ""), | |
| # "text_B": seg["text_zh"][:120] + ("..." if len(seg["text_zh"]) > 120 else ""), | |
| # } | |
| # ) | |
| # # dataframe rows | |
| # rows = [] | |
| # for seg in aligned: | |
| # rows.append( | |
| # [ | |
| # seg["idx"], | |
| # f'{seg["start"]:.2f}', | |
| # f'{seg["end"]:.2f}', | |
| # seg["text_en"], | |
| # seg["text_zh"], | |
| # f'{seg["en_start"]:.2f}-{seg["en_end"]:.2f}', | |
| # f'{seg["zh_start"]:.2f}-{seg["zh_end"]:.2f}', | |
| # ] | |
| # ) | |
| # # audio QC (still based on per-track subtitle times; keep original semantics) | |
| # audio_a = load_audio(audio_a_path) | |
| # audio_b = load_audio(audio_b_path) | |
| # issues_qc, qc_stats = qc_on_aligned_segments( | |
| # audio_a, audio_b, aligned, | |
| # silence_dbfs_threshold=silence_th, | |
| # low_dbfs_threshold=low_th | |
| # ) | |
| # issues_all = lang_mismatch_issues + issues_qc | |
| # stats = { | |
| # "alignment": align_stats, | |
| # "qc": qc_stats, | |
| # "language": { | |
| # "declared": {"TrackA": declared_a_lang, "TrackB": declared_b_lang}, | |
| # "detected_overall": { | |
| # "TrackA_dominant_norm": a_lang_stats["dominant_norm"], | |
| # "TrackB_dominant_norm": b_lang_stats["dominant_norm"], | |
| # "TrackA_counts": a_lang_stats["counts"], | |
| # "TrackB_counts": b_lang_stats["counts"], | |
| # }, | |
| # "segment_mismatch_count": len(lang_mismatch_issues), | |
| # }, | |
| # "segment_crop": { | |
| # "mode": crop_mode, | |
| # "offset_a_sec": offset_a, | |
| # "offset_b_sec": offset_b, | |
| # "note": "若对比播放不对应:优先使用“对齐全局时间(推荐)”,并微调 Track A/B 时间偏移。", | |
| # } | |
| # } | |
| # choices = [f'{seg["idx"]:04d} | {seg["start"]:.2f}-{seg["end"]:.2f}s' for seg in aligned] | |
| # selector_update = gr.update(choices=choices, value=(choices[0] if choices else None)) | |
| # state = { | |
| # "aligned": aligned, | |
| # "audio_a_path": audio_a_path, | |
| # "audio_b_path": audio_b_path, | |
| # "declared": {"TrackA": declared_a_lang, "TrackB": declared_b_lang}, | |
| # "crop_mode": crop_mode, | |
| # "offset_a": float(offset_a), | |
| # "offset_b": float(offset_b), | |
| # } | |
| # html_table = build_interactive_table_html(aligned) | |
| # return rows, html_table, stats, issues_all, audio_a_path, audio_b_path, selector_update, state | |
| # except Exception as e: | |
| # return ( | |
| # [], | |
| # "", | |
| # {"error": f"解析/对齐失败: {str(e)}"}, | |
| # [{"error": str(e)}], | |
| # audio_a_path, | |
| # audio_b_path, | |
| # gr.update(choices=[], value=None), | |
| # None, | |
| # ) | |
| # def _parse_selector_value(v: Optional[str]) -> Optional[int]: | |
| # if not v: | |
| # return None | |
| # m = re.match(r"^\s*(\d+)\s*\|", v) | |
| # if not m: | |
| # return None | |
| # return int(m.group(1)) | |
| # def make_segment_audio_by_idx(idx: int, state: Optional[Dict[str, Any]]): | |
| # if not state or "aligned" not in state: | |
| # return None, None, {"error": "请先完成字幕解析与对齐。"} | |
| # aligned = state["aligned"] | |
| # if idx < 1 or idx > len(aligned): | |
| # return None, None, {"error": "片段索引越界。"} | |
| # seg = aligned[idx - 1] | |
| # audio_a_path = state["audio_a_path"] | |
| # audio_b_path = state["audio_b_path"] | |
| # crop_mode = state.get("crop_mode", "global") | |
| # offset_a = float(state.get("offset_a", 0.0)) | |
| # offset_b = float(state.get("offset_b", 0.0)) | |
| # try: | |
| # audio_a = load_audio(audio_a_path) | |
| # audio_b = load_audio(audio_b_path) | |
| # a_start, a_end, b_start, b_end = _pick_window(seg, crop_mode, offset_a, offset_b) | |
| # a_wav = segment_to_wav_file(audio_a, a_start, a_end) | |
| # b_wav = segment_to_wav_file(audio_b, b_start, b_end) | |
| # info = { | |
| # "segment": idx, | |
| # "global_time": f'{seg["start"]:.2f}-{seg["end"]:.2f}s', | |
| # "crop_mode": crop_mode, | |
| # "offset_a_sec": offset_a, | |
| # "offset_b_sec": offset_b, | |
| # "crop_A_time": f"{a_start:.2f}-{a_end:.2f}s", | |
| # "crop_B_time": f"{b_start:.2f}-{b_end:.2f}s", | |
| # "subtitle_A_time": f'{seg["en_start"]:.2f}-{seg["en_end"]:.2f}s', | |
| # "subtitle_B_time": f'{seg["zh_start"]:.2f}-{seg["zh_end"]:.2f}s', | |
| # "text_A": seg["text_en"], | |
| # "text_B": seg["text_zh"], | |
| # } | |
| # return a_wav, b_wav, info | |
| # except Exception as e: | |
| # return None, None, {"error": f"生成片段音频失败: {str(e)}"} | |
| # def make_segment_audio(selector_value: Optional[str], state: Optional[Dict[str, Any]]): | |
| # idx = _parse_selector_value(selector_value) | |
| # if idx is None: | |
| # return None, None, {"error": "请选择一个有效片段。"} | |
| # return make_segment_audio_by_idx(idx, state) | |
| # def make_segment_audio_from_action(action_json: str, state: Optional[Dict[str, Any]]): | |
| # """ | |
| # action_json: {"idx": <int>, "mode": "a"/"b"/"both"} | |
| # mode is only used by frontend for play policy; backend always returns both segment audios. | |
| # """ | |
| # try: | |
| # payload = json.loads(action_json or "{}") | |
| # idx = int(payload.get("idx")) | |
| # except Exception: | |
| # return None, None, {"error": "动作解析失败(action_json 无效)。"} | |
| # return make_segment_audio_by_idx(idx, state) | |
| # def clear_all(): | |
| # return ( | |
| # None, None, None, None, | |
| # "原文(不指定)", "原文(不指定)", | |
| # "global", 0.0, 0.0, | |
| # [], "", {}, [], | |
| # None, None, | |
| # gr.update(choices=[], value=None), | |
| # None, | |
| # None, None, {}, | |
| # "", | |
| # ) | |
| # # ----------------------------- | |
| # # UI | |
| # # ----------------------------- | |
| # CSS = """ | |
| # /* Keep components in DOM for injected JS to locate them */ | |
| # .dom-hidden { display: none !important; } | |
| # """ | |
| # with gr.Blocks(css=CSS) as demo: | |
| # gr.Markdown( | |
| # """ | |
| # # 可视化语音质检平台(交互逐行播放 + 语言一致性 + 对齐裁剪) | |
| # - 交互表格每行播放:A / B / 同时 | |
| # - “对齐全局时间(推荐)”可显著减少“对比不对应”的听感问题 | |
| # - “时间偏移”用于修正音频头部静音/时间轴偏差 | |
| # """.strip() | |
| # ) | |
| # state = gr.State(value=None) | |
| # # Must exist in DOM (NOT visible=False) | |
| # action_box = gr.Textbox( | |
| # label="__action_box", | |
| # value="", | |
| # elem_id="action_box", | |
| # elem_classes=["dom-hidden"], | |
| # ) | |
| # action_btn = gr.Button( | |
| # "__action_btn", | |
| # elem_id="action_btn", | |
| # elem_classes=["dom-hidden"], | |
| # ) | |
| # with gr.Row(): | |
| # with gr.Column(scale=1): | |
| # gr.Markdown("## 1) 上传文件") | |
| # audio_a = gr.File( | |
| # label="Track A 音频/视频", | |
| # file_types=[".mp3", ".wav", ".m4a", ".flac", ".ogg", ".aac", ".mp4", ".mov", ".avi"], | |
| # type="filepath", | |
| # ) | |
| # audio_b = gr.File( | |
| # label="Track B 音频/视频", | |
| # file_types=[".mp3", ".wav", ".m4a", ".flac", ".ogg", ".aac", ".mp4", ".mov", ".avi"], | |
| # type="filepath", | |
| # ) | |
| # sub_a = gr.File( | |
| # label="Track A 字幕(.srt/.vtt)", | |
| # file_types=[".srt", ".vtt", ".txt"], | |
| # type="filepath", | |
| # ) | |
| # sub_b = gr.File( | |
| # label="Track B 字幕(.srt/.vtt)", | |
| # file_types=[".srt", ".vtt", ".txt"], | |
| # type="filepath", | |
| # ) | |
| # gr.Markdown("## 2) 声明语言(用于一致性检查)") | |
| # declared_a_lang = gr.Dropdown( | |
| # label="Track A 声明语言", | |
| # choices=LANG_LABELS, | |
| # value="原文(不指定)", | |
| # ) | |
| # declared_b_lang = gr.Dropdown( | |
| # label="Track B 声明语言", | |
| # choices=LANG_LABELS, | |
| # value="原文(不指定)", | |
| # ) | |
| # gr.Markdown("## 3) 片段裁剪设置(解决“不对应”)") | |
| # crop_mode = gr.Radio( | |
| # label="裁剪基准", | |
| # choices=[ | |
| # ("对齐全局时间(推荐)", "global"), | |
| # ("各自字幕时间", "per_track"), | |
| # ], | |
| # value="global", | |
| # ) | |
| # offset_a = gr.Slider( | |
| # label="Track A 时间偏移(秒,可为负)", | |
| # minimum=-30.0, | |
| # maximum=30.0, | |
| # value=0.0, | |
| # step=0.1, | |
| # ) | |
| # offset_b = gr.Slider( | |
| # label="Track B 时间偏移(秒,可为负)", | |
| # minimum=-30.0, | |
| # maximum=30.0, | |
| # value=0.0, | |
| # step=0.1, | |
| # ) | |
| # gr.Markdown("## 4) 对齐与质检参数") | |
| # max_mid_diff = gr.Slider( | |
| # label="两路字幕对齐阈值(中点时间差,秒)", | |
| # minimum=0.3, | |
| # maximum=5.0, | |
| # value=1.5, | |
| # step=0.1, | |
| # ) | |
| # silence_th = gr.Slider( | |
| # label="静音/近静音阈值(dBFS)", | |
| # minimum=-80, | |
| # maximum=-20, | |
| # value=-50, | |
| # step=1, | |
| # ) | |
| # low_th = gr.Slider( | |
| # label="音量偏低阈值(dBFS)", | |
| # minimum=-80, | |
| # maximum=-20, | |
| # value=-40, | |
| # step=1, | |
| # ) | |
| # with gr.Row(): | |
| # btn_run = gr.Button("解析对齐 + 质检", variant="primary") | |
| # btn_clear = gr.Button("清空", variant="secondary") | |
| # with gr.Column(scale=2): | |
| # with gr.Tabs(): | |
| # with gr.TabItem("交互播放表格(推荐)"): | |
| # interactive_table = gr.HTML() | |
| # with gr.TabItem("分段对照表(Dataframe)"): | |
| # seg_table = gr.Dataframe( | |
| # headers=["#", "start(s)", "end(s)", "Track A", "Track B", "A time", "B time"], | |
| # datatype=["number", "str", "str", "str", "str", "str", "str"], | |
| # interactive=False, | |
| # wrap=True, | |
| # ) | |
| # with gr.TabItem("统计概览"): | |
| # stats = gr.JSON(label="统计信息(对齐 + QC + 语言一致性 + 裁剪设置)") | |
| # with gr.TabItem("问题片段"): | |
| # issues = gr.JSON(label="问题列表(含 LanguageMismatch)") | |
| # with gr.TabItem("音频对比播放"): | |
| # gr.Markdown("### 原始音频(整段)") | |
| # player_a_full = gr.Audio(label="Track A 原始音频", interactive=False) | |
| # player_b_full = gr.Audio(label="Track B 原始音频", interactive=False) | |
| # gr.Markdown("### 选择片段并生成对比音频(备用方式)") | |
| # selector = gr.Dropdown(label="片段选择", choices=[], value=None) | |
| # btn_make = gr.Button("生成所选片段(两路)", variant="primary") | |
| # # elem_id used by injected JS | |
| # player_a_seg = gr.Audio(label="Track A 片段", interactive=False, elem_id="a_audio_seg") | |
| # player_b_seg = gr.Audio(label="Track B 片段", interactive=False, elem_id="b_audio_seg") | |
| # seg_info = gr.JSON(label="片段信息(含裁剪窗口)") | |
| # btn_run.click( | |
| # fn=parse_align_and_qc, | |
| # inputs=[ | |
| # audio_a, audio_b, sub_a, sub_b, | |
| # declared_a_lang, declared_b_lang, | |
| # crop_mode, offset_a, offset_b, | |
| # max_mid_diff, silence_th, low_th | |
| # ], | |
| # outputs=[ | |
| # seg_table, interactive_table, stats, issues, | |
| # player_a_full, player_b_full, | |
| # selector, state | |
| # ], | |
| # ) | |
| # btn_make.click( | |
| # fn=make_segment_audio, | |
| # inputs=[selector, state], | |
| # outputs=[player_a_seg, player_b_seg, seg_info], | |
| # ) | |
| # # triggered by table buttons | |
| # action_btn.click( | |
| # fn=make_segment_audio_from_action, | |
| # inputs=[action_box, state], | |
| # outputs=[player_a_seg, player_b_seg, seg_info], | |
| # ) | |
| # btn_clear.click( | |
| # fn=clear_all, | |
| # inputs=[], | |
| # outputs=[ | |
| # audio_a, audio_b, sub_a, sub_b, | |
| # declared_a_lang, declared_b_lang, | |
| # crop_mode, offset_a, offset_b, | |
| # seg_table, interactive_table, stats, issues, | |
| # player_a_full, player_b_full, | |
| # selector, state, | |
| # player_a_seg, player_b_seg, seg_info, | |
| # action_box | |
| # ], | |
| # ) | |
| # if __name__ == "__main__": | |
| # demo.launch() | |
| import json | |
| import numpy as np | |
| import gradio as gr | |
| from huggingface_hub import hf_hub_download | |
| import soundfile as sf | |
| # ===================================================== | |
| # 工具函数 | |
| # ===================================================== | |
| def download_from_dataset(repo_id: str, file_path: str) -> str: | |
| repo_id = repo_id.strip() | |
| file_path = file_path.strip() | |
| if not repo_id: | |
| raise gr.Error("Audio / JSON Dataset repo 不能为空") | |
| if not file_path: | |
| raise gr.Error("Audio / JSON 文件路径不能为空") | |
| try: | |
| return hf_hub_download( | |
| repo_id=repo_id, | |
| filename=file_path, | |
| repo_type="dataset", | |
| ) | |
| except Exception as e: | |
| raise gr.Error( | |
| f"无法从 Dataset 下载文件:\n\n" | |
| f"- repo: {repo_id}\n" | |
| f"- path: {file_path}\n\n" | |
| f"错误信息:{str(e)}" | |
| ) | |
def load_audio(local_audio_path: str):
    """Read an audio file and return (mono float32 samples, sample rate)."""
    samples, rate = sf.read(local_audio_path)
    # soundfile returns (frames, channels) for multichannel input; downmix by averaging.
    if samples.ndim == 2:
        samples = samples.mean(axis=1)
    return samples.astype(np.float32), rate
def parse_segments(json_path: str):
    """Parse a subtitle JSON file into normalized segment dicts.

    Expects a top-level object with an optional ``segments`` list and an
    optional ``audio_name`` string. Each segment may carry ``start``/``end``
    timings, an ``index``, and label fields; missing keys fall back to
    defaults (0.0 for timings, "" for labels).

    Args:
        json_path: path to a UTF-8 encoded JSON file.

    Returns:
        (segments, audio_name): ``segments`` is a list of dicts with keys
        row_id, start, end, dur, status, speaker, gender, age_group,
        emotion, text; ``audio_name`` is ``data["audio_name"]`` or "".

    Raises:
        OSError / json.JSONDecodeError on unreadable or invalid files;
        ValueError if a timing value is not numeric.
    """
    with open(json_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    segments = []
    for i, s in enumerate(data.get("segments", [])):
        # Convert BEFORE subtracting: the original subtracted raw JSON values,
        # which blows up on numeric strings ("1.5") that float() would accept.
        # This also guarantees dur == end - start exactly.
        start = float(s.get("start", 0.0))
        end = float(s.get("end", 0.0))
        segments.append({
            "row_id": s.get("index", i),  # fall back to list position
            "start": start,
            "end": end,
            "dur": end - start,
            "status": s.get("status", ""),
            "speaker": s.get("speaker", ""),
            "gender": s.get("gender", ""),
            "age_group": s.get("age_group", ""),
            "emotion": s.get("emotion", ""),
            "text": s.get("text", "") or "",  # normalize explicit null to ""
        })
    return segments, data.get("audio_name", "")
def slice_audio(audio, sr, start, end):
    """Return (sr, samples) for the [start, end) second window, clamped to the clip."""
    first = max(int(start * sr), 0)
    last = min(int(end * sr), len(audio))
    return sr, audio[first:last]
| # ===================================================== | |
| # Gradio 回调 | |
| # ===================================================== | |
def on_load(
    audio_repo,
    audio_path,
    json_repo,
    json_path,
):
    """Download and parse the audio + subtitle JSON; build the segment table.

    Returns (state, rows, info):
      - state: dict with "audio" (mono float32), "sr", and "segments"
      - rows: list-of-lists matching the Dataframe headers
      - info: markdown summary for display
    """
    # Normalize user-entered values once up front.
    audio_repo, audio_path, json_repo, json_path = (
        v.strip() for v in (audio_repo, audio_path, json_repo, json_path)
    )
    if not (audio_repo and audio_path):
        raise gr.Error("请填写音频 Dataset repo 和文件路径")
    if not (json_repo and json_path):
        raise gr.Error("请填写字幕 JSON Dataset repo 和文件路径")

    # Fetch both files from their datasets (raises gr.Error on failure).
    local_audio = download_from_dataset(audio_repo, audio_path)
    local_json = download_from_dataset(json_repo, json_path)

    try:
        audio, sr = load_audio(local_audio)
    except Exception as e:
        raise gr.Error(f"音频加载失败:{str(e)}")

    try:
        segments, audio_name = parse_segments(local_json)
    except Exception as e:
        raise gr.Error(f"字幕 JSON 解析失败:{str(e)}")
    if not segments:
        raise gr.Error("字幕文件中未找到任何 segments")

    # Table rows in the exact column order declared by the Dataframe headers.
    columns = (
        "row_id", "start", "end", "dur",
        "status", "speaker", "gender",
        "age_group", "emotion", "text",
    )
    rows = [[seg[col] for col in columns] for seg in segments]

    info = (
        f"**Audio repo**: `{audio_repo}` \n"
        f"**Audio path**: `{audio_path}` \n"
        f"**JSON repo**: `{json_repo}` \n"
        f"**JSON path**: `{json_path}` \n"
        f"**Segments**: {len(segments)} \n"
        f"**Sample rate**: {sr} Hz"
    )

    state = {"audio": audio, "sr": sr, "segments": segments}
    return state, rows, info
def on_select_segment(evt: gr.SelectData, state):
    """Play the clicked table row: return ((sr, samples), metadata markdown, text).

    Triggered by Dataframe.select; ``evt.row_value`` is the full clicked row
    in the column order produced by ``on_load``
    (row_id, start, end, dur, status, speaker, gender, age_group, emotion, text).

    Raises:
        gr.Error: if the table is clicked before audio/subtitles were loaded.
    """
    # Guard: gr.State is None until "加载音频 & 字幕" has succeeded; the
    # original crashed with TypeError here instead of a readable message.
    if not state:
        raise gr.Error("请先加载音频与字幕")
    row = evt.row_value
    start, end = float(row[1]), float(row[2])
    sr, audio_seg = slice_audio(
        state["audio"],
        state["sr"],
        start,
        end,
    )
    meta = (
        f"- **speaker**: `{row[5]}`\n"
        f"- **gender**: `{row[6]}`\n"
        f"- **age_group**: `{row[7]}`\n"
        f"- **emotion**: `{row[8]}`\n"
        f"- **status**: `{row[4]}`"
    )
    # Dataframe cells are not guaranteed to come back as str (None/numbers
    # are possible); coerce before stripping instead of calling row[9].strip().
    raw_text = row[9]
    text = str(raw_text) if raw_text is not None and str(raw_text).strip() else "(empty)"
    return (sr, audio_seg), meta, text
| # ===================================================== | |
| # UI | |
| # ===================================================== | |
with gr.Blocks(title="HF Dataset Audio + Subtitle Explorer") as demo:
    gr.Markdown(
        "# 🎧 音频 & 字幕分段可视化(Dataset 路径模式)\n"
        "分别指定 **音频** 与 **字幕 JSON** 所在的 Dataset 与路径。"
    )
    # Holds the dict returned by on_load ({"audio", "sr", "segments"}).
    state = gr.State()
    with gr.Row():
        audio_repo = gr.Textbox(
            label="Audio Dataset repo",
            value="AlexTYJ/Multilingual-ASR-Benchmark",
        )
        audio_path = gr.Textbox(
            label="Audio file path",
            placeholder="audio/testbatch/ARE/xxx.wav",
        )
    with gr.Row():
        json_repo = gr.Textbox(
            label="JSON Dataset repo",
            value="AlexTYJ/Multilingual-ASR-Benchmark",
        )
        json_path = gr.Textbox(
            label="JSON file path",
            placeholder="text/ref/testbatch/ARE/xxx.json",
        )
    load_btn = gr.Button("加载音频 & 字幕", variant="primary")
    info = gr.Markdown()
    # Column order must match the rows built by on_load.
    df = gr.Dataframe(
        headers=[
            "row_id", "start", "end", "dur",
            "status", "speaker", "gender",
            "age_group", "emotion", "text"
        ],
        wrap=True,
        interactive=False,
        max_height=420,
    )
    with gr.Row():
        audio_out = gr.Audio(label="分段播放", type="numpy")
        meta = gr.Markdown()
    text = gr.Textbox(label="字幕文本", lines=4)
    load_btn.click(
        on_load,
        inputs=[audio_repo, audio_path, json_repo, json_path],
        outputs=[state, df, info],
    )
    # Clicking a table row slices and plays that segment.
    df.select(
        on_select_segment,
        inputs=state,
        outputs=[audio_out, meta, text],
    )

# Guard the launch so the module can be imported without starting a server
# (the earlier, commented-out version of this app used the same guard).
if __name__ == "__main__":
    demo.launch()