import gradio as gr
import whisper
import subprocess
import os
import time
import re
import datetime
import threading
from pyannote.audio import Pipeline

print("Gradio version:", gr.__version__)

# Hugging Face token used to access the pyannote model (replace with your own)
hf_token = os.getenv("HF_TOKEN")

# Whisper model loading (load ahead of time to speed things up)
# asr_model = whisper.load_model("base").to("cuda")
diarization_pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token=hf_token)


def get_audio_duration(filename):
    """Return the audio duration in seconds via ffprobe."""
    result = subprocess.run(
        ['ffprobe', '-v', 'error', '-show_entries', 'format=duration',
         '-of', 'default=noprint_wrappers=1:nokey=1', filename],
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    return float(result.stdout)


def format_ts(seconds):
    """Format seconds as H:MM:SS.mmm."""
    return str(datetime.timedelta(seconds=int(seconds))) + f".{int((seconds % 1) * 1000):03d}"


def transcribe_single(audio_path, language="auto", model_size="base"):
    input_path = "audio.mp3"
    os.rename(audio_path, input_path)
    asr_model = whisper.load_model(model_size)  # .to("cuda")
    # Whisper expects language=None (not "auto") for automatic language detection
    whisper_language = None if language == "auto" else language

    # Step 1: silence detection
    silence_cmd = f"ffmpeg -i {input_path} -af silencedetect=noise=-30dB:d=1 -f null - 2> silence_log.txt"
    os.system(silence_cmd)
    audio_duration = get_audio_duration(input_path)

    # Step 2: parse silence_log.txt
    silence_starts, silence_ends = [], []
    with open("silence_log.txt", "r") as f:
        for line in f:
            if "silence_start" in line:
                match = re.search(r"silence_start: (\d+\.?\d*)", line)
                if match:
                    silence_starts.append(float(match.group(1)))
            elif "silence_end" in line:
                match = re.search(r"silence_end: (\d+\.?\d*)", line)
                if match:
                    silence_ends.append(float(match.group(1)))
    silence_starts.append(audio_duration)
    silence_ends.append(audio_duration)

    # Step 3: split into segments of roughly 8-10 minutes, ending at silences
    MIN_TARGET, MAX_TARGET = 480, 600
    segments = []
    current_start = 0.0
    for i in range(len(silence_starts)):
        silence_point = silence_starts[i]
        segment_length = silence_point - current_start
        if segment_length >= MIN_TARGET:
            segment_end = silence_point if segment_length <= MAX_TARGET else current_start + MAX_TARGET
            segments.append((current_start, segment_end))
            current_start = silence_ends[i]
    if current_start < audio_duration:
        segments.append((current_start, None))

    # Step 4: cut each segment and run Whisper on it
    output_lines = []
    for idx, (start, end) in enumerate(segments):
        chunk_file = f"chunk_{idx:03d}.mp3"
        cmd = f"ffmpeg -y -i {input_path} -ss {start:.2f}"
        if end:
            cmd += f" -to {end:.2f}"
        cmd += f" -c copy {chunk_file}"
        os.system(cmd)
        result = asr_model.transcribe(chunk_file, language=whisper_language)
        output_lines.append(result["text"].strip())
        os.remove(chunk_file)

    with open("transcription_output.txt", "w", encoding="utf-8") as f:
        f.write("\n".join(output_lines))
    return "transcription_output.txt"


def transcribe_multi(audio_path, language="auto", model_size="base"):
    input_path = "audio_multi.mp3"
    os.rename(audio_path, input_path)
    asr_model = whisper.load_model(model_size).to("cuda")  # requires a CUDA GPU
    whisper_language = None if language == "auto" else language

    # Speaker diarization: yields one (turn, track, speaker) per speech segment
    diarization = diarization_pipeline(input_path)

    segments = []
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        start_time = turn.start
        end_time = turn.end
        speaker_label = speaker
        tmp_chunk = f"tmp_{start_time:.2f}_{end_time:.2f}.wav"
        os.system(f"ffmpeg -y -i {input_path} -ss {start_time:.3f} -to {end_time:.3f} "
                  f"-ar 16000 -ac 1 -loglevel error {tmp_chunk}")
        result = asr_model.transcribe(tmp_chunk, language=whisper_language)
        text = result['text'].strip()
        os.remove(tmp_chunk)
        if text:
            segments.append({
                "start": start_time,
                "end": end_time,
                "speaker": speaker_label,
                "text": text
            })

    # Map pyannote speaker labels to "Speaker 1", "Speaker 2", ...
    speaker_map = {}
    speaker_counter = 1
    output_lines = []
    for seg in segments:
        speaker = seg["speaker"]
        if speaker not in speaker_map:
            speaker_map[speaker] = f"Speaker {speaker_counter}"
            speaker_counter += 1
        speaker_name = speaker_map[speaker]
        start_str = format_ts(seg["start"])
        end_str = format_ts(seg["end"])
        line = f"[{start_str} - {end_str}] {speaker_name}: {seg['text']}"
        output_lines.append(line)

    with open("transcription_with_speakers.txt", "w", encoding="utf-8") as f:
        f.write("\n".join(output_lines))
    return "transcription_with_speakers.txt"


# def main(audio_file, is_multispeaker, language, model_size):
#     start_time = time.time()
#     result_file = transcribe_multi(audio_file, language, model_size) if is_multispeaker else transcribe_single(audio_file, language, model_size)
#     end_time = time.time()
#     elapsed = end_time - start_time
#     time_info = f"⏱️ Transcription took {elapsed:.2f} s"
#     return result_file, time_info


# Run the transcription task in a background thread and stream progress updates
def main_with_progress(audio_file, is_multispeaker, language, model_size):
    start_time = time.time()
    yield None, "⏳ Transcribing, please wait..."

    result_file_holder = {"file": None}

    def transcribe_task():
        result_file_holder["file"] = (
            transcribe_multi(audio_file, language, model_size)
            if is_multispeaker
            else transcribe_single(audio_file, language, model_size)
        )

    thread = threading.Thread(target=transcribe_task)
    thread.start()

    # Update the status once per second until the task finishes
    while thread.is_alive():
        elapsed = time.time() - start_time
        yield None, f"⏳ Transcribing... {elapsed:.1f} s elapsed"
        time.sleep(1)

    # Show the final result and total time
    elapsed = time.time() - start_time
    result_file = result_file_holder["file"]
    yield result_file, f"✅ Transcription finished, ⏱️ total time: {elapsed:.2f} s"


with gr.Blocks() as demo:
    gr.Markdown("# Whisper + PyAnnote Audio Transcription System")
    audio_input = gr.Audio(type="filepath", label="Upload audio")
    is_multi = gr.Checkbox(label="Multi-speaker conversation audio (enable speaker diarization)")
    language = gr.Dropdown(
        choices=[
            ("Auto-detect", "auto"),
            ("English", "en"),
            ("Chinese", "zh"),
            ("French", "fr"),
            ("German", "de"),
            ("Spanish", "es"),
            ("Japanese", "ja"),
            ("Korean", "ko"),
            ("Portuguese", "pt"),
            ("Russian", "ru"),
        ],
        value="auto",
        label="Audio language"
    )
    model_size = gr.Dropdown(
        choices=[
            ("tiny (39M)", "tiny"),
            ("base (74M)", "base"),
            ("small (244M)", "small"),
            ("medium (769M)", "medium"),
            ("large (1550M)", "large")
        ],
        value="base",
        label="Whisper model size"
    )
    # status_box = gr.Textbox(label="Status updates", interactive=False)
    output_file = gr.File(label="Transcription result (.txt)")
    elapsed_time = gr.Textbox(label="Processing time", interactive=False)
    run_btn = gr.Button("Start transcription")
    run_btn.click(
        fn=main_with_progress,
        inputs=[audio_input, is_multi, language, model_size],
        outputs=[output_file, elapsed_time]
    )

if __name__ == "__main__":
    demo.launch()