# AUDIO_ONE / app.py
import json
import numpy as np
import gradio as gr
from huggingface_hub import hf_hub_download
import soundfile as sf
# =====================================================
# Utility functions
# =====================================================
def download_from_dataset(repo_id: str, file_path: str) -> str:
    repo_id = repo_id.strip()
    file_path = file_path.strip()
    if not repo_id:
        raise gr.Error("Audio / JSON dataset repo must not be empty")
    if not file_path:
        raise gr.Error("Audio / JSON file path must not be empty")
    try:
        return hf_hub_download(
            repo_id=repo_id,
            filename=file_path,
            repo_type="dataset",
        )
    except Exception as e:
        raise gr.Error(
            f"Failed to download the file from the dataset:\n\n"
            f"- repo: {repo_id}\n"
            f"- path: {file_path}\n\n"
            f"Error: {e}"
        )
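

# A minimal usage sketch for the helper above. The repo id and file path are
# placeholders (not real files), so this function is illustrative only and is
# never called by the app. hf_hub_download caches each file under the local
# HF cache and returns the cached path, so repeated loads of the same file do
# not re-download it.
def _download_example() -> str:
    # Placeholder repo/path, for illustration only.
    return download_from_dataset("someuser/some-dataset", "audio/clip.wav")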
def load_audio(local_audio_path: str):
    audio, sr = sf.read(local_audio_path)
    if audio.ndim == 2:
        audio = audio.mean(axis=1)
    return audio.astype(np.float32), sr
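

# Small self-check for the downmix above: soundfile returns multi-channel
# audio with shape (n_frames, n_channels), so mean(axis=1) averages the
# channels of each frame into a single mono sample. Not called by the app;
# safe to run manually.
def _downmix_example():
    stereo = np.array([[0.2, 0.4], [1.0, 0.0]], dtype=np.float32)
    mono = stereo.mean(axis=1)
    assert np.allclose(mono, [0.3, 0.5])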
def parse_segments(json_path: str):
    with open(json_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    segments = []
    for i, s in enumerate(data.get("segments", [])):
        # Convert once up front: this also keeps "dur" correct when the JSON
        # stores start/end as strings instead of numbers.
        start = float(s.get("start", 0.0))
        end = float(s.get("end", 0.0))
        segments.append({
            "row_id": s.get("index", i),
            "start": start,
            "end": end,
            "dur": end - start,
            "status": s.get("status", ""),
            "speaker": s.get("speaker", ""),
            "gender": s.get("gender", ""),
            "age_group": s.get("age_group", ""),
            "emotion": s.get("emotion", ""),
            "text": s.get("text", "") or "",
        })
    return segments, data.get("audio_name", "")
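

# A sketch of the JSON layout parse_segments() expects, inferred from the
# .get() calls above; the field values here are invented for illustration.
# "index" is optional (the enumerate position is the fallback row_id).
_EXAMPLE_SEGMENTS_JSON = {
    "audio_name": "example.wav",
    "segments": [
        {
            "index": 0, "start": 0.0, "end": 1.5,
            "status": "ok", "speaker": "spk1", "gender": "F",
            "age_group": "adult", "emotion": "neutral", "text": "hello",
        },
    ],
}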
def slice_audio(audio, sr, start, end):
    s = max(0, int(start * sr))
    e = min(len(audio), int(end * sr))
    return sr, audio[s:e]
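

# Sanity check for the sample-index math above: at sr = 16_000, a window of
# 0.5s-1.0s maps to samples 8_000..16_000, i.e. half a second of audio; the
# max/min clamps keep the slice inside the buffer. Not called by the app.
def _slice_audio_example():
    sr = 16_000
    audio = np.zeros(2 * sr, dtype=np.float32)  # two seconds of silence
    out_sr, seg = slice_audio(audio, sr, 0.5, 1.0)
    assert out_sr == sr and len(seg) == sr // 2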
# =====================================================
# Gradio callbacks
# =====================================================
def on_load(
    audio_repo,
    audio_path,
    json_repo,
    json_path,
):
    # ---- Normalize inputs ----
    audio_repo = audio_repo.strip()
    audio_path = audio_path.strip()
    json_repo = json_repo.strip()
    json_path = json_path.strip()
    if not audio_repo or not audio_path:
        raise gr.Error("Please fill in the audio dataset repo and file path")
    if not json_repo or not json_path:
        raise gr.Error("Please fill in the subtitle JSON dataset repo and file path")
    # ---- Download ----
    local_audio = download_from_dataset(audio_repo, audio_path)
    local_json = download_from_dataset(json_repo, json_path)
    # ---- Load audio ----
    try:
        audio, sr = load_audio(local_audio)
    except Exception as e:
        raise gr.Error(f"Failed to load audio: {e}")
    # ---- Parse JSON ----
    try:
        segments, audio_name = parse_segments(local_json)
    except Exception as e:
        raise gr.Error(f"Failed to parse the subtitle JSON: {e}")
    if not segments:
        raise gr.Error("No segments found in the subtitle file")
    rows = [
        [
            s["row_id"], s["start"], s["end"], s["dur"],
            s["status"], s["speaker"], s["gender"],
            s["age_group"], s["emotion"], s["text"],
        ]
        for s in segments
    ]
    info = (
        f"**Audio repo**: `{audio_repo}`  \n"
        f"**Audio path**: `{audio_path}`  \n"
        f"**JSON repo**: `{json_repo}`  \n"
        f"**JSON path**: `{json_path}`  \n"
        f"**Segments**: {len(segments)}  \n"
        f"**Sample rate**: {sr} Hz"
    )
    state = {
        "audio": audio,
        "sr": sr,
        "segments": segments,
    }
    return state, rows, info
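

# For reference, the state dict built above and consumed by
# on_select_segment below has the shape (a description, not executed code):
#     {"audio": 1-D float32 np.ndarray, "sr": int, "segments": list[dict]}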
def on_select_segment(evt: gr.SelectData, state):
    if not state:
        raise gr.Error("Please load the audio and subtitles first")
    row = evt.row_value
    start, end = float(row[1]), float(row[2])
    sr, audio_seg = slice_audio(
        state["audio"],
        state["sr"],
        start,
        end,
    )
    meta = (
        f"- **speaker**: `{row[5]}`\n"
        f"- **gender**: `{row[6]}`\n"
        f"- **age_group**: `{row[7]}`\n"
        f"- **emotion**: `{row[8]}`\n"
        f"- **status**: `{row[4]}`"
    )
    text = str(row[9]) if str(row[9]).strip() else "(empty)"
    return (sr, audio_seg), meta, text
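

# Note on the indexing above: for Dataframe select events, recent Gradio
# versions expose the full selected row as evt.row_value, in the same column
# order as the Dataframe headers defined below ("row_id", "start", "end",
# "dur", "status", "speaker", "gender", "age_group", "emotion", "text"),
# which is why start/end live at row[1]/row[2] and the text at row[9].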
# =====================================================
# UI
# =====================================================
with gr.Blocks(title="HF Dataset Audio + Subtitle Explorer") as demo:
    gr.Markdown(
        "# 🎧 Audio & subtitle segment explorer (dataset-path mode)\n"
        "Specify the dataset repo and file path for the **audio** and the "
        "**subtitle JSON** separately."
    )
    state = gr.State()
    with gr.Row():
        audio_repo = gr.Textbox(
            label="Audio Dataset repo",
            value="AlexTYJ/Multilingual-ASR-Benchmark",
        )
        audio_path = gr.Textbox(
            label="Audio file path",
            placeholder="audio/testbatch/ARE/xxx.wav",
        )
    with gr.Row():
        json_repo = gr.Textbox(
            label="JSON Dataset repo",
            value="AlexTYJ/Multilingual-ASR-Benchmark",
        )
        json_path = gr.Textbox(
            label="JSON file path",
            placeholder="text/ref/testbatch/ARE/xxx.json",
        )
    load_btn = gr.Button("Load audio & subtitles", variant="primary")
    info = gr.Markdown()
    df = gr.Dataframe(
        headers=[
            "row_id", "start", "end", "dur",
            "status", "speaker", "gender",
            "age_group", "emotion", "text",
        ],
        wrap=True,
        interactive=False,
        max_height=420,
    )
    with gr.Row():
        audio_out = gr.Audio(label="Segment playback", type="numpy")
        meta = gr.Markdown()
    text = gr.Textbox(label="Subtitle text", lines=4)
    load_btn.click(
        on_load,
        inputs=[audio_repo, audio_path, json_repo, json_path],
        outputs=[state, df, info],
    )
    df.select(
        on_select_segment,
        inputs=state,
        outputs=[audio_out, meta, text],
    )

demo.launch()