# import re # import os # import json # import math # import tempfile # from dataclasses import dataclass # from typing import List, Optional, Tuple, Dict, Any # import gradio as gr # import numpy as np # import srt # from pydub import AudioSegment # from langdetect import DetectorFactory, detect_langs # DetectorFactory.seed = 0 # deterministic # # ----------------------------- # # Data structures # # ----------------------------- # @dataclass # class Cue: # start: float # end: float # text: str # # ----------------------------- # # Language utilities # # ----------------------------- # LANG_LABELS = ["原文(不指定)", "中文", "English", "日本語"] # def normalize_lang_code(code: str) -> str: # c = (code or "").lower() # if c.startswith("zh"): # return "中文" # if c == "en": # return "English" # if c == "ja": # return "日本語" # return "其他" # def detect_language_label(text: str) -> Dict[str, Any]: # t = (text or "").strip() # if not t: # return {"raw_code": None, "raw_prob": None, "label": None, "normalized_label": None} # t = t.replace("\n", " ").strip()[:400] # try: # langs = detect_langs(t) # if not langs: # return {"raw_code": None, "raw_prob": None, "label": None, "normalized_label": None} # top = langs[0] # raw_code = getattr(top, "lang", None) # raw_prob = float(getattr(top, "prob", 0.0)) if top else None # norm = normalize_lang_code(raw_code) # return { # "raw_code": raw_code, # "raw_prob": round(raw_prob, 3) if raw_prob is not None else None, # "label": norm if norm != "其他" else "原文(不指定)", # "normalized_label": norm, # } # except Exception: # return {"raw_code": None, "raw_prob": None, "label": None, "normalized_label": None} # def check_declared_mismatch(declared: str, detected_norm: Optional[str]) -> bool: # if declared == "原文(不指定)": # return False # if detected_norm is None: # return True # if detected_norm == "其他": # return True # return detected_norm != declared # def analyze_language_for_cues(cues: List[Cue]) -> Dict[str, Any]: # counts = {"中文": 0, "English": 0, "日本語": 0, "其他": 0} # for c in cues: # d = detect_language_label(c.text) # norm = d.get("normalized_label") # if norm in counts: # counts[norm] += 1 # dominant = max(counts.items(), key=lambda x: x[1])[0] if cues else None # return {"counts": counts, "dominant_norm": dominant} # # ----------------------------- # # Subtitle parsing # # ----------------------------- # _TAG_RE = re.compile(r"]+?>", re.IGNORECASE) # _VTT_TIME_RE = re.compile( # r"(?P\d{2}:\d{2}:\d{2}\.\d{3}|\d{1,2}:\d{2}\.\d{3})\s*-->\s*" # r"(?P\d{2}:\d{2}:\d{2}\.\d{3}|\d{1,2}:\d{2}\.\d{3})" # ) # def _strip_tags(text: str) -> str: # text = _TAG_RE.sub("", text) # text = text.replace("", "").replace("", "") # return text.strip() # def _time_to_seconds(t: str) -> float: # t = t.strip().split()[0] # parts = t.split(":") # if len(parts) == 3: # h = int(parts[0]) # m = int(parts[1]) # s = float(parts[2]) # return h * 3600 + m * 60 + s # if len(parts) == 2: # m = int(parts[0]) # s = float(parts[1]) # return m * 60 + s # raise ValueError(f"Unsupported time format: {t}") # def parse_vtt(content: str) -> List[Cue]: # content = content.replace("\ufeff", "") # content = re.sub(r"^\s*WEBVTT.*?\n", "", content, flags=re.IGNORECASE) # blocks = re.split(r"\r?\n\r?\n", content.strip()) # cues: List[Cue] = [] # for block in blocks: # lines = [ln.rstrip("\n") for ln in re.split(r"\r?\n", block) if ln.strip() != ""] # if not lines: # continue # time_line_idx = None # for idx in range(min(2, len(lines))): # if "-->" in lines[idx]: # time_line_idx = idx # break # if time_line_idx is None: # continue # m = _VTT_TIME_RE.search(lines[time_line_idx]) # if not m: # continue # start = _time_to_seconds(m.group("start")) # end = _time_to_seconds(m.group("end")) # if not (math.isfinite(start) and math.isfinite(end)) or end <= start: # continue # text_lines = lines[time_line_idx + 1 :] # text = _strip_tags("\n".join(text_lines)).strip() # if not text: # continue # cues.append(Cue(start=start, end=end, text=text)) # cues.sort(key=lambda x: x.start) # return cues # def parse_srt(content: str) -> List[Cue]: # content = content.replace("\ufeff", "") # subs = list(srt.parse(content)) # cues: List[Cue] = [] # for sub in subs: # cues.append( # Cue( # start=sub.start.total_seconds(), # end=sub.end.total_seconds(), # text=sub.content.strip(), # ) # ) # cues.sort(key=lambda x: x.start) # return cues # def parse_subtitle_file(path: Optional[str]) -> List[Cue]: # if not path: # return [] # with open(path, "r", encoding="utf-8") as f: # content = f.read() # head = content.lstrip()[:80].upper() # ext = os.path.splitext(path)[1].lower() # if "WEBVTT" in head or ext == ".vtt": # return parse_vtt(content) # return parse_srt(content) # # ----------------------------- # # Alignment # # ----------------------------- # def align_by_time( # a: List[Cue], # b: List[Cue], # max_mid_diff: float = 1.5, # ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: # aligned: List[Dict[str, Any]] = [] # i, j = 0, 0 # matched_a = set() # matched_b = set() # while i < len(a) and j < len(b): # x = a[i] # y = b[j] # mid_x = (x.start + x.end) / 2 # mid_y = (y.start + y.end) / 2 # diff = mid_x - mid_y # if abs(diff) <= max_mid_diff: # # global window for "corresponding part" # g_start = min(x.start, y.start) # g_end = max(x.end, y.end) # aligned.append( # { # "idx": len(aligned) + 1, # "start": g_start, # "end": g_end, # "en_start": x.start, # "en_end": x.end, # "zh_start": y.start, # "zh_end": y.end, # "text_en": x.text, # "text_zh": y.text, # } # ) # matched_a.add(i) # matched_b.add(j) # i += 1 # j += 1 # elif diff < 0: # i += 1 # else: # j += 1 # stats = { # "trackA_total": len(a), # "trackB_total": len(b), # "aligned_pairs": len(aligned), # "trackA_unmatched": len(a) - len(matched_a), # "trackB_unmatched": len(b) - len(matched_b), # "max_mid_diff_sec": max_mid_diff, # } # return aligned, stats # # ----------------------------- # # Audio processing / QC # # ----------------------------- # def load_audio(path: str) -> AudioSegment: # return AudioSegment.from_file(path) # def segment_to_wav_file(audio: AudioSegment, start_s: float, end_s: float) -> str: # start_ms = max(0, int(start_s * 1000)) # end_ms = max(start_ms + 1, int(end_s * 1000)) # seg = audio[start_ms:end_ms] # tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") # tmp.close() # seg.export(tmp.name, format="wav") # return tmp.name # def compute_dbfs(audio: AudioSegment) -> float: # v = audio.dBFS # if v == float("-inf"): # return -120.0 # return float(v) # def qc_on_aligned_segments( # audio_a: AudioSegment, # audio_b: AudioSegment, # aligned: List[Dict[str, Any]], # silence_dbfs_threshold: float = -50.0, # low_dbfs_threshold: float = -40.0, # ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: # issues: List[Dict[str, Any]] = [] # a_levels = [] # b_levels = [] # for seg in aligned: # a_start, a_end = seg["en_start"], seg["en_end"] # b_start, b_end = seg["zh_start"], seg["zh_end"] # a = audio_a[int(a_start * 1000) : int(a_end * 1000)] # b = audio_b[int(b_start * 1000) : int(b_end * 1000)] # a_dbfs = compute_dbfs(a) # b_dbfs = compute_dbfs(b) # a_levels.append(a_dbfs) # b_levels.append(b_dbfs) # problems = [] # if a_dbfs <= silence_dbfs_threshold: # problems.append("Track A 静音/近静音") # elif a_dbfs <= low_dbfs_threshold: # problems.append("Track A 音量偏低") # if b_dbfs <= silence_dbfs_threshold: # problems.append("Track B 静音/近静音") # elif b_dbfs <= low_dbfs_threshold: # problems.append("Track B 音量偏低") # if abs(a_dbfs - b_dbfs) >= 12.0: # problems.append("两路音量差异过大(≥12dB)") # if problems: # issues.append( # { # "segment": seg["idx"], # "time": f"{seg['start']:.2f}-{seg['end']:.2f}s", # "A_time": f"{a_start:.2f}-{a_end:.2f}s", # "B_time": f"{b_start:.2f}-{b_end:.2f}s", # "problems": problems, # "text_A": seg["text_en"][:120] + ("..." if len(seg["text_en"]) > 120 else ""), # "text_B": seg["text_zh"][:120] + ("..." if len(seg["text_zh"]) > 120 else ""), # "A_dbfs": round(a_dbfs, 1), # "B_dbfs": round(b_dbfs, 1), # } # ) # def _safe_mean(xs): # return float(np.mean(xs)) if xs else 0.0 # qc_stats = { # "aligned_pairs": len(aligned), # "issue_segments": len(issues), # "A_avg_dbfs": round(_safe_mean(a_levels), 1), # "B_avg_dbfs": round(_safe_mean(b_levels), 1), # "A_min_dbfs": round(float(np.min(a_levels)), 1) if a_levels else None, # "B_min_dbfs": round(float(np.min(b_levels)), 1) if b_levels else None, # "silence_dbfs_threshold": silence_dbfs_threshold, # "low_dbfs_threshold": low_dbfs_threshold, # } # return issues, qc_stats # # ----------------------------- # # Interactive HTML table (buttons per row) # # ----------------------------- # def _escape_html(s: str) -> str: # return (s or "").replace("&", "&").replace("<", "<").replace(">", ">") # def build_interactive_table_html(aligned: List[Dict[str, Any]]) -> str: # max_rows = 2000 # aligned_view = aligned[:max_rows] # rows_html = [] # for seg in aligned_view: # idx = seg["idx"] # t = f'{seg["start"]:.2f}-{seg["end"]:.2f}' # a_t = f'{seg["en_start"]:.2f}-{seg["en_end"]:.2f}' # b_t = f'{seg["zh_start"]:.2f}-{seg["zh_end"]:.2f}' # a_txt = _escape_html(seg["text_en"]) # b_txt = _escape_html(seg["text_zh"]) # a_lang = _escape_html(seg.get("en_lang_label") or "-") # b_lang = _escape_html(seg.get("zh_lang_label") or "-") # a_conf = seg.get("en_lang_prob") # b_conf = seg.get("zh_lang_prob") # a_conf_s = f"{a_conf:.3f}" if isinstance(a_conf, (int, float)) else "-" # b_conf_s = f"{b_conf:.3f}" if isinstance(b_conf, (int, float)) else "-" # a_bad = seg.get("en_lang_mismatch", False) # b_bad = seg.get("zh_lang_mismatch", False) # a_cls = "bad" if a_bad else "ok" # b_cls = "bad" if b_bad else "ok" # btns = ( # f'' # f'' # f'' # ) # rows_html.append( # f""" # # {idx:04d} # {t} # {btns} # {a_txt} # {b_txt} # {a_t} # {b_t} # {a_lang} ({a_conf_s}) # {b_lang} ({b_conf_s}) # # """.strip() # ) # # IMPORTANT FIX: # # - __qs must be `#${id}`, not `#${{id}}` # # - action_box/action_btn must exist in DOM (we keep them and hide via CSS) # # - Wait for audio src to change before play (avoid playing old segment) # table_html = f""" #
#
# 点击每行按钮可生成并播放片段(A / B / 同时)。若浏览器阻止自动播放,请在右侧播放器手动点一次播放键。 #
#
# # # # # # # # # # # # # # # # {"".join(rows_html)} # #
#Global(s)PlayTrack ATrack BA timeB timeA LangB Lang
#
#
# # # """.strip() # if len(aligned) > max_rows: # table_html = ( # f"
" # f"提示:对齐片段数为 {len(aligned)},为保证渲染性能仅展示前 {max_rows} 行。
\n" # + table_html # ) # return table_html # # ----------------------------- # # Segment generation (core fix: global window + offset) # # ----------------------------- # def _pick_window(seg: Dict[str, Any], mode: str, offset_a: float, offset_b: float) -> Tuple[float, float, float, float]: # """ # mode: # - "global": use seg["start"]/["end"] for BOTH tracks # - "per_track": use seg["en_start"]/["en_end"] for A, seg["zh_start"]/["zh_end"] for B # offsets are applied per track. # Returns: (a_start, a_end, b_start, b_end) # """ # if mode == "per_track": # a_start, a_end = seg["en_start"], seg["en_end"] # b_start, b_end = seg["zh_start"], seg["zh_end"] # else: # # global window (recommended) # a_start, a_end = seg["start"], seg["end"] # b_start, b_end = seg["start"], seg["end"] # a_start = max(0.0, a_start + float(offset_a)) # a_end = max(a_start + 0.01, a_end + float(offset_a)) # b_start = max(0.0, b_start + float(offset_b)) # b_end = max(b_start + 0.01, b_end + float(offset_b)) # return a_start, a_end, b_start, b_end # # ----------------------------- # # Gradio callbacks # # ----------------------------- # def parse_align_and_qc( # audio_a_path: Optional[str], # audio_b_path: Optional[str], # sub_a_path: Optional[str], # sub_b_path: Optional[str], # declared_a_lang: str, # declared_b_lang: str, # crop_mode: str, # offset_a: float, # offset_b: float, # max_mid_diff: float, # silence_th: float, # low_th: float, # ): # if not audio_a_path or not audio_b_path: # return ( # [], # "", # {"error": "请同时上传 Track A 与 Track B 音频。"}, # [], # None, # None, # gr.update(choices=[], value=None), # None, # ) # try: # cues_a = parse_subtitle_file(sub_a_path) if sub_a_path else [] # cues_b = parse_subtitle_file(sub_b_path) if sub_b_path else [] # if not cues_a or not cues_b: # return ( # [], # "", # {"error": "请同时提供两路字幕(SRT/VTT)。", "A_cues": len(cues_a), "B_cues": len(cues_b)}, # [], # audio_a_path, # audio_b_path, # gr.update(choices=[], value=None), # None, # ) # # overall language stats # a_lang_stats = analyze_language_for_cues(cues_a) # b_lang_stats = analyze_language_for_cues(cues_b) # aligned, align_stats = align_by_time(cues_a, cues_b, max_mid_diff=max_mid_diff) # # per-segment language + mismatch issues # lang_mismatch_issues = [] # for seg in aligned: # da = detect_language_label(seg["text_en"]) # db = detect_language_label(seg["text_zh"]) # seg["en_lang_label"] = da.get("label") # seg["en_lang_prob"] = da.get("raw_prob") # seg["en_lang_norm"] = da.get("normalized_label") # seg["zh_lang_label"] = db.get("label") # seg["zh_lang_prob"] = db.get("raw_prob") # seg["zh_lang_norm"] = db.get("normalized_label") # seg["en_lang_mismatch"] = check_declared_mismatch(declared_a_lang, seg["en_lang_norm"]) # seg["zh_lang_mismatch"] = check_declared_mismatch(declared_b_lang, seg["zh_lang_norm"]) # if seg["en_lang_mismatch"] or seg["zh_lang_mismatch"]: # problems = [] # if seg["en_lang_mismatch"]: # problems.append(f"Track A 声明={declared_a_lang} 检测={seg['en_lang_label']}") # if seg["zh_lang_mismatch"]: # problems.append(f"Track B 声明={declared_b_lang} 检测={seg['zh_lang_label']}") # lang_mismatch_issues.append( # { # "segment": seg["idx"], # "time": f"{seg['start']:.2f}-{seg['end']:.2f}s", # "type": "LanguageMismatch", # "problems": problems, # "text_A": seg["text_en"][:120] + ("..." if len(seg["text_en"]) > 120 else ""), # "text_B": seg["text_zh"][:120] + ("..." if len(seg["text_zh"]) > 120 else ""), # } # ) # # dataframe rows # rows = [] # for seg in aligned: # rows.append( # [ # seg["idx"], # f'{seg["start"]:.2f}', # f'{seg["end"]:.2f}', # seg["text_en"], # seg["text_zh"], # f'{seg["en_start"]:.2f}-{seg["en_end"]:.2f}', # f'{seg["zh_start"]:.2f}-{seg["zh_end"]:.2f}', # ] # ) # # audio QC (still based on per-track subtitle times; keep original semantics) # audio_a = load_audio(audio_a_path) # audio_b = load_audio(audio_b_path) # issues_qc, qc_stats = qc_on_aligned_segments( # audio_a, audio_b, aligned, # silence_dbfs_threshold=silence_th, # low_dbfs_threshold=low_th # ) # issues_all = lang_mismatch_issues + issues_qc # stats = { # "alignment": align_stats, # "qc": qc_stats, # "language": { # "declared": {"TrackA": declared_a_lang, "TrackB": declared_b_lang}, # "detected_overall": { # "TrackA_dominant_norm": a_lang_stats["dominant_norm"], # "TrackB_dominant_norm": b_lang_stats["dominant_norm"], # "TrackA_counts": a_lang_stats["counts"], # "TrackB_counts": b_lang_stats["counts"], # }, # "segment_mismatch_count": len(lang_mismatch_issues), # }, # "segment_crop": { # "mode": crop_mode, # "offset_a_sec": offset_a, # "offset_b_sec": offset_b, # "note": "若对比播放不对应:优先使用“对齐全局时间(推荐)”,并微调 Track A/B 时间偏移。", # } # } # choices = [f'{seg["idx"]:04d} | {seg["start"]:.2f}-{seg["end"]:.2f}s' for seg in aligned] # selector_update = gr.update(choices=choices, value=(choices[0] if choices else None)) # state = { # "aligned": aligned, # "audio_a_path": audio_a_path, # "audio_b_path": audio_b_path, # "declared": {"TrackA": declared_a_lang, "TrackB": declared_b_lang}, # "crop_mode": crop_mode, # "offset_a": float(offset_a), # "offset_b": float(offset_b), # } # html_table = build_interactive_table_html(aligned) # return rows, html_table, stats, issues_all, audio_a_path, audio_b_path, selector_update, state # except Exception as e: # return ( # [], # "", # {"error": f"解析/对齐失败: {str(e)}"}, # [{"error": str(e)}], # audio_a_path, # audio_b_path, # gr.update(choices=[], value=None), # None, # ) # def _parse_selector_value(v: Optional[str]) -> Optional[int]: # if not v: # return None # m = re.match(r"^\s*(\d+)\s*\|", v) # if not m: # return None # return int(m.group(1)) # def make_segment_audio_by_idx(idx: int, state: Optional[Dict[str, Any]]): # if not state or "aligned" not in state: # return None, None, {"error": "请先完成字幕解析与对齐。"} # aligned = state["aligned"] # if idx < 1 or idx > len(aligned): # return None, None, {"error": "片段索引越界。"} # seg = aligned[idx - 1] # audio_a_path = state["audio_a_path"] # audio_b_path = state["audio_b_path"] # crop_mode = state.get("crop_mode", "global") # offset_a = float(state.get("offset_a", 0.0)) # offset_b = float(state.get("offset_b", 0.0)) # try: # audio_a = load_audio(audio_a_path) # audio_b = load_audio(audio_b_path) # a_start, a_end, b_start, b_end = _pick_window(seg, crop_mode, offset_a, offset_b) # a_wav = segment_to_wav_file(audio_a, a_start, a_end) # b_wav = segment_to_wav_file(audio_b, b_start, b_end) # info = { # "segment": idx, # "global_time": f'{seg["start"]:.2f}-{seg["end"]:.2f}s', # "crop_mode": crop_mode, # "offset_a_sec": offset_a, # "offset_b_sec": offset_b, # "crop_A_time": f"{a_start:.2f}-{a_end:.2f}s", # "crop_B_time": f"{b_start:.2f}-{b_end:.2f}s", # "subtitle_A_time": f'{seg["en_start"]:.2f}-{seg["en_end"]:.2f}s', # "subtitle_B_time": f'{seg["zh_start"]:.2f}-{seg["zh_end"]:.2f}s', # "text_A": seg["text_en"], # "text_B": seg["text_zh"], # } # return a_wav, b_wav, info # except Exception as e: # return None, None, {"error": f"生成片段音频失败: {str(e)}"} # def make_segment_audio(selector_value: Optional[str], state: Optional[Dict[str, Any]]): # idx = _parse_selector_value(selector_value) # if idx is None: # return None, None, {"error": "请选择一个有效片段。"} # return make_segment_audio_by_idx(idx, state) # def make_segment_audio_from_action(action_json: str, state: Optional[Dict[str, Any]]): # """ # action_json: {"idx": , "mode": "a"/"b"/"both"} # mode is only used by frontend for play policy; backend always returns both segment audios. # """ # try: # payload = json.loads(action_json or "{}") # idx = int(payload.get("idx")) # except Exception: # return None, None, {"error": "动作解析失败(action_json 无效)。"} # return make_segment_audio_by_idx(idx, state) # def clear_all(): # return ( # None, None, None, None, # "原文(不指定)", "原文(不指定)", # "global", 0.0, 0.0, # [], "", {}, [], # None, None, # gr.update(choices=[], value=None), # None, # None, None, {}, # "", # ) # # ----------------------------- # # UI # # ----------------------------- # CSS = """ # /* Keep components in DOM for injected JS to locate them */ # .dom-hidden { display: none !important; } # """ # with gr.Blocks(css=CSS) as demo: # gr.Markdown( # """ # # 可视化语音质检平台(交互逐行播放 + 语言一致性 + 对齐裁剪) # - 交互表格每行播放:A / B / 同时 # - “对齐全局时间(推荐)”可显著减少“对比不对应”的听感问题 # - “时间偏移”用于修正音频头部静音/时间轴偏差 # """.strip() # ) # state = gr.State(value=None) # # Must exist in DOM (NOT visible=False) # action_box = gr.Textbox( # label="__action_box", # value="", # elem_id="action_box", # elem_classes=["dom-hidden"], # ) # action_btn = gr.Button( # "__action_btn", # elem_id="action_btn", # elem_classes=["dom-hidden"], # ) # with gr.Row(): # with gr.Column(scale=1): # gr.Markdown("## 1) 上传文件") # audio_a = gr.File( # label="Track A 音频/视频", # file_types=[".mp3", ".wav", ".m4a", ".flac", ".ogg", ".aac", ".mp4", ".mov", ".avi"], # type="filepath", # ) # audio_b = gr.File( # label="Track B 音频/视频", # file_types=[".mp3", ".wav", ".m4a", ".flac", ".ogg", ".aac", ".mp4", ".mov", ".avi"], # type="filepath", # ) # sub_a = gr.File( # label="Track A 字幕(.srt/.vtt)", # file_types=[".srt", ".vtt", ".txt"], # type="filepath", # ) # sub_b = gr.File( # label="Track B 字幕(.srt/.vtt)", # file_types=[".srt", ".vtt", ".txt"], # type="filepath", # ) # gr.Markdown("## 2) 声明语言(用于一致性检查)") # declared_a_lang = gr.Dropdown( # label="Track A 声明语言", # choices=LANG_LABELS, # value="原文(不指定)", # ) # declared_b_lang = gr.Dropdown( # label="Track B 声明语言", # choices=LANG_LABELS, # value="原文(不指定)", # ) # gr.Markdown("## 3) 片段裁剪设置(解决“不对应”)") # crop_mode = gr.Radio( # label="裁剪基准", # choices=[ # ("对齐全局时间(推荐)", "global"), # ("各自字幕时间", "per_track"), # ], # value="global", # ) # offset_a = gr.Slider( # label="Track A 时间偏移(秒,可为负)", # minimum=-30.0, # maximum=30.0, # value=0.0, # step=0.1, # ) # offset_b = gr.Slider( # label="Track B 时间偏移(秒,可为负)", # minimum=-30.0, # maximum=30.0, # value=0.0, # step=0.1, # ) # gr.Markdown("## 4) 对齐与质检参数") # max_mid_diff = gr.Slider( # label="两路字幕对齐阈值(中点时间差,秒)", # minimum=0.3, # maximum=5.0, # value=1.5, # step=0.1, # ) # silence_th = gr.Slider( # label="静音/近静音阈值(dBFS)", # minimum=-80, # maximum=-20, # value=-50, # step=1, # ) # low_th = gr.Slider( # label="音量偏低阈值(dBFS)", # minimum=-80, # maximum=-20, # value=-40, # step=1, # ) # with gr.Row(): # btn_run = gr.Button("解析对齐 + 质检", variant="primary") # btn_clear = gr.Button("清空", variant="secondary") # with gr.Column(scale=2): # with gr.Tabs(): # with gr.TabItem("交互播放表格(推荐)"): # interactive_table = gr.HTML() # with gr.TabItem("分段对照表(Dataframe)"): # seg_table = gr.Dataframe( # headers=["#", "start(s)", "end(s)", "Track A", "Track B", "A time", "B time"], # datatype=["number", "str", "str", "str", "str", "str", "str"], # interactive=False, # wrap=True, # ) # with gr.TabItem("统计概览"): # stats = gr.JSON(label="统计信息(对齐 + QC + 语言一致性 + 裁剪设置)") # with gr.TabItem("问题片段"): # issues = gr.JSON(label="问题列表(含 LanguageMismatch)") # with gr.TabItem("音频对比播放"): # gr.Markdown("### 原始音频(整段)") # player_a_full = gr.Audio(label="Track A 原始音频", interactive=False) # player_b_full = gr.Audio(label="Track B 原始音频", interactive=False) # gr.Markdown("### 选择片段并生成对比音频(备用方式)") # selector = gr.Dropdown(label="片段选择", choices=[], value=None) # btn_make = gr.Button("生成所选片段(两路)", variant="primary") # # elem_id used by injected JS # player_a_seg = gr.Audio(label="Track A 片段", interactive=False, elem_id="a_audio_seg") # player_b_seg = gr.Audio(label="Track B 片段", interactive=False, elem_id="b_audio_seg") # seg_info = gr.JSON(label="片段信息(含裁剪窗口)") # btn_run.click( # fn=parse_align_and_qc, # inputs=[ # audio_a, audio_b, sub_a, sub_b, # declared_a_lang, declared_b_lang, # crop_mode, offset_a, offset_b, # max_mid_diff, silence_th, low_th # ], # outputs=[ # seg_table, interactive_table, stats, issues, # player_a_full, player_b_full, # selector, state # ], # ) # btn_make.click( # fn=make_segment_audio, # inputs=[selector, state], # outputs=[player_a_seg, player_b_seg, seg_info], # ) # # triggered by table buttons # action_btn.click( # fn=make_segment_audio_from_action, # inputs=[action_box, state], # outputs=[player_a_seg, player_b_seg, seg_info], # ) # btn_clear.click( # fn=clear_all, # inputs=[], # outputs=[ # audio_a, audio_b, sub_a, sub_b, # declared_a_lang, declared_b_lang, # crop_mode, offset_a, offset_b, # seg_table, interactive_table, stats, issues, # player_a_full, player_b_full, # selector, state, # player_a_seg, player_b_seg, seg_info, # action_box # ], # ) # if __name__ == "__main__": # demo.launch() import json import numpy as np import gradio as gr from huggingface_hub import hf_hub_download import soundfile as sf # ===================================================== # 工具函数 # ===================================================== def download_from_dataset(repo_id: str, file_path: str) -> str: repo_id = repo_id.strip() file_path = file_path.strip() if not repo_id: raise gr.Error("Audio / JSON Dataset repo 不能为空") if not file_path: raise gr.Error("Audio / JSON 文件路径不能为空") try: return hf_hub_download( repo_id=repo_id, filename=file_path, repo_type="dataset", ) except Exception as e: raise gr.Error( f"无法从 Dataset 下载文件:\n\n" f"- repo: {repo_id}\n" f"- path: {file_path}\n\n" f"错误信息:{str(e)}" ) def load_audio(local_audio_path: str): audio, sr = sf.read(local_audio_path) if audio.ndim == 2: audio = audio.mean(axis=1) return audio.astype(np.float32), sr def parse_segments(json_path: str): with open(json_path, "r", encoding="utf-8") as f: data = json.load(f) segments = [] for i, s in enumerate(data.get("segments", [])): segments.append({ "row_id": s.get("index", i), "start": float(s.get("start", 0.0)), "end": float(s.get("end", 0.0)), "dur": float(s.get("end", 0.0) - s.get("start", 0.0)), "status": s.get("status", ""), "speaker": s.get("speaker", ""), "gender": s.get("gender", ""), "age_group": s.get("age_group", ""), "emotion": s.get("emotion", ""), "text": s.get("text", "") or "", }) return segments, data.get("audio_name", "") def slice_audio(audio, sr, start, end): s = max(0, int(start * sr)) e = min(len(audio), int(end * sr)) return sr, audio[s:e] # ===================================================== # Gradio 回调 # ===================================================== def on_load( audio_repo, audio_path, json_repo, json_path, ): # ---- 清理输入 ---- audio_repo = audio_repo.strip() audio_path = audio_path.strip() json_repo = json_repo.strip() json_path = json_path.strip() if not audio_repo or not audio_path: raise gr.Error("请填写音频 Dataset repo 和文件路径") if not json_repo or not json_path: raise gr.Error("请填写字幕 JSON Dataset repo 和文件路径") # ---- 下载 ---- local_audio = download_from_dataset(audio_repo, audio_path) local_json = download_from_dataset(json_repo, json_path) # ---- 加载音频 ---- try: audio, sr = load_audio(local_audio) except Exception as e: raise gr.Error(f"音频加载失败:{str(e)}") # ---- 加载 JSON ---- try: segments, audio_name = parse_segments(local_json) except Exception as e: raise gr.Error(f"字幕 JSON 解析失败:{str(e)}") if not segments: raise gr.Error("字幕文件中未找到任何 segments") rows = [ [ s["row_id"], s["start"], s["end"], s["dur"], s["status"], s["speaker"], s["gender"], s["age_group"], s["emotion"], s["text"] ] for s in segments ] info = ( f"**Audio repo**: `{audio_repo}` \n" f"**Audio path**: `{audio_path}` \n" f"**JSON repo**: `{json_repo}` \n" f"**JSON path**: `{json_path}` \n" f"**Segments**: {len(segments)} \n" f"**Sample rate**: {sr} Hz" ) state = { "audio": audio, "sr": sr, "segments": segments, } return state, rows, info def on_select_segment(evt: gr.SelectData, state): row = evt.row_value start, end = float(row[1]), float(row[2]) sr, audio_seg = slice_audio( state["audio"], state["sr"], start, end, ) meta = ( f"- **speaker**: `{row[5]}`\n" f"- **gender**: `{row[6]}`\n" f"- **age_group**: `{row[7]}`\n" f"- **emotion**: `{row[8]}`\n" f"- **status**: `{row[4]}`" ) text = row[9] if row[9].strip() else "(empty)" return (sr, audio_seg), meta, text # ===================================================== # UI # ===================================================== with gr.Blocks(title="HF Dataset Audio + Subtitle Explorer") as demo: gr.Markdown( "# 🎧 音频 & 字幕分段可视化(Dataset 路径模式)\n" "分别指定 **音频** 与 **字幕 JSON** 所在的 Dataset 与路径。" ) state = gr.State() with gr.Row(): audio_repo = gr.Textbox( label="Audio Dataset repo", value="AlexTYJ/Multilingual-ASR-Benchmark", ) audio_path = gr.Textbox( label="Audio file path", placeholder="audio/testbatch/ARE/xxx.wav", ) with gr.Row(): json_repo = gr.Textbox( label="JSON Dataset repo", value="AlexTYJ/Multilingual-ASR-Benchmark", ) json_path = gr.Textbox( label="JSON file path", placeholder="text/ref/testbatch/ARE/xxx.json", ) load_btn = gr.Button("加载音频 & 字幕", variant="primary") info = gr.Markdown() df = gr.Dataframe( headers=[ "row_id", "start", "end", "dur", "status", "speaker", "gender", "age_group", "emotion", "text" ], wrap=True, interactive=False, max_height=420, ) with gr.Row(): audio_out = gr.Audio(label="分段播放", type="numpy") meta = gr.Markdown() text = gr.Textbox(label="字幕文本", lines=4) load_btn.click( on_load, inputs=[audio_repo, audio_path, json_repo, json_path], outputs=[state, df, info], ) df.select( on_select_segment, inputs=state, outputs=[audio_out, meta, text], ) demo.launch()