import gradio as gr
import whisper
import subprocess
import os
import shutil
import time
import re
import datetime
import threading

import torch
from pyannote.audio import Pipeline

print("Gradio version:", gr.__version__)

# Hugging Face token used to access the pyannote model (replace with your own)
hf_token = os.getenv("HF_TOKEN")

# Use the GPU when one is available; fall back to CPU otherwise
device = "cuda" if torch.cuda.is_available() else "cpu"

# Whisper model loading (preload here to speed up the first request)
# asr_model = whisper.load_model("base", device=device)

diarization_pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token=hf_token)
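# Note: pyannote/speaker-diarization is a gated model on the Hugging Face Hub;
# the token must belong to an account that has accepted its user conditions.
# On a Space, HF_TOKEN would typically be configured as a repository secret.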
def get_audio_duration(filename):
    """Return the audio duration in seconds, as reported by ffprobe."""
    result = subprocess.run(
        ['ffprobe', '-v', 'error', '-show_entries', 'format=duration',
         '-of', 'default=noprint_wrappers=1:nokey=1', filename],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)
    return float(result.stdout)
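# With -of default=noprint_wrappers=1:nokey=1, ffprobe prints the duration as
# a bare number (e.g. "1234.567000"), so the output parses directly as float.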
def transcribe_single(audio_path, language="auto", model_size="base"):
    input_path = "audio.mp3"
    shutil.move(audio_path, input_path)  # move works across filesystems, unlike os.rename
    asr_model = whisper.load_model(model_size, device=device)
    # Whisper expects None (not "auto") for automatic language detection
    lang = None if language == "auto" else language

    # Step 1: silence detection
    silence_cmd = f"ffmpeg -i {input_path} -af silencedetect=noise=-30dB:d=1 -f null - 2> silence_log.txt"
    os.system(silence_cmd)
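    # silencedetect: noise=-30dB treats anything quieter than -30 dB as silence,
    # and d=1 requires at least one second of it. ffmpeg logs lines like
    #   [silencedetect @ 0x...] silence_start: 612.32
    #   [silencedetect @ 0x...] silence_end: 613.84 | silence_duration: 1.52
    # which the parsing step below matches with regexes.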
    audio_duration = get_audio_duration(input_path)

    # Step 2: parse silence_log.txt
    silence_starts, silence_ends = [], []
    with open("silence_log.txt", "r") as f:
        for line in f:
            if "silence_start" in line:
                match = re.search(r"silence_start: (\d+\.?\d*)", line)
                if match:
                    silence_starts.append(float(match.group(1)))
            elif "silence_end" in line:
                match = re.search(r"silence_end: (\d+\.?\d*)", line)
                if match:
                    silence_ends.append(float(match.group(1)))
    # Sentinel entries so the end of file counts as a final silence point
    silence_starts.append(audio_duration)
    silence_ends.append(audio_duration)
    # Step 3: segmentation — aim for chunks of 8 to 10 minutes, cutting at
    # silence points where possible
    MIN_TARGET, MAX_TARGET = 480, 600
    segments = []
    current_start = 0.0
    for i in range(len(silence_starts)):
        silence_point = silence_starts[i]
        segment_length = silence_point - current_start
        if segment_length >= MIN_TARGET:
            if segment_length <= MAX_TARGET:
                segments.append((current_start, silence_point))
                current_start = silence_ends[i]
            else:
                # No usable silence in range: hard-cut at MAX_TARGET and
                # resume from the cut point so no audio is dropped
                segment_end = current_start + MAX_TARGET
                segments.append((current_start, segment_end))
                current_start = segment_end
    if current_start < audio_duration:
        segments.append((current_start, None))
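    # Worked example (illustrative numbers): on a 1200 s file with silences
    # detected at 500-502 s and 950-952 s, the loop emits (0.0, 500.0); the
    # next candidate (502 -> 950, 448 s) is under MIN_TARGET and is skipped;
    # the end-of-file sentinel then forces a 600 s hard cut (502.0, 1102.0),
    # and the final check appends the tail (1102.0, None).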
    # Step 4: cut each chunk with ffmpeg and transcribe it with Whisper
    output_lines = []
    for idx, (start, end) in enumerate(segments):
        chunk_file = f"chunk_{idx:03d}.mp3"
        cmd = f"ffmpeg -y -loglevel error -i {input_path} -ss {start:.2f}"
        if end is not None:
            cmd += f" -to {end:.2f}"
        cmd += f" -c copy {chunk_file}"  # stream copy: no re-encode
        os.system(cmd)
        result = asr_model.transcribe(chunk_file, language=lang)
        output_lines.append(result["text"].strip())
        os.remove(chunk_file)
    with open("transcription_output.txt", "w", encoding="utf-8") as f:
        f.write("\n".join(output_lines))
    return "transcription_output.txt"
def transcribe_multi(audio_path, language="auto", model_size="base"):
    input_path = "audio_multi.mp3"
    shutil.move(audio_path, input_path)
    asr_model = whisper.load_model(model_size, device=device)
    lang = None if language == "auto" else language
    diarization = diarization_pipeline(input_path)
    segments = []
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        start_time = turn.start
        end_time = turn.end
        # Re-cut each diarized turn as 16 kHz mono WAV for Whisper
        tmp_chunk = f"tmp_{start_time:.2f}_{end_time:.2f}.wav"
        os.system(f"ffmpeg -y -i {input_path} -ss {start_time:.3f} -to {end_time:.3f} -ar 16000 -ac 1 -loglevel error {tmp_chunk}")
        result = asr_model.transcribe(tmp_chunk, language=lang)
        text = result['text'].strip()
        os.remove(tmp_chunk)
        if text:
            segments.append({
                "start": start_time,
                "end": end_time,
                "speaker": speaker,
                "text": text
            })
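    # Each diarized turn triggers its own ffmpeg cut and Whisper call; that is
    # simple but slow for conversations with many short turns. Batching turns
    # would be a possible optimisation (not implemented here).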
    def format_ts(seconds):
        # e.g. format_ts(83.5) -> "0:01:23.500"
        return str(datetime.timedelta(seconds=int(seconds))) + f".{int((seconds % 1) * 1000):03d}"

    # Map raw diarization labels (e.g. SPEAKER_00) to friendly names in order
    # of first appearance
    speaker_map = {}
    speaker_counter = 1
    output_lines = []
    for seg in segments:
        speaker = seg["speaker"]
        if speaker not in speaker_map:
            speaker_map[speaker] = f"Speaker {speaker_counter}"
            speaker_counter += 1
        speaker_name = speaker_map[speaker]
        start_str = format_ts(seg["start"])
        end_str = format_ts(seg["end"])
        output_lines.append(f"[{start_str} - {end_str}] {speaker_name}: {seg['text']}")
    with open("transcription_with_speakers.txt", "w", encoding="utf-8") as f:
        f.write("\n".join(output_lines))
    return "transcription_with_speakers.txt"
# def main(audio_file, is_multispeaker, language, model_size):
#     start_time = time.time()
#     result_file = transcribe_multi(audio_file, language, model_size) if is_multispeaker else transcribe_single(audio_file, language, model_size)
#     end_time = time.time()
#     elapsed = end_time - start_time
#     time_info = f"⏱️ Transcription took {elapsed:.2f} s"
#     return result_file, time_info
# Run the transcription task in a background thread so the UI can stream
# progress updates while it works
def main_with_progress(audio_file, is_multispeaker, language, model_size):
    start_time = time.time()
    yield None, "⏳ Transcribing, please wait..."

    result_file_holder = {"file": None}

    def transcribe_task():
        result_file_holder["file"] = transcribe_multi(audio_file, language, model_size) if is_multispeaker else transcribe_single(audio_file, language, model_size)

    thread = threading.Thread(target=transcribe_task)
    thread.start()

    # Update the status once per second until the task finishes; every yield
    # must match the two bound outputs (file, status text)
    while thread.is_alive():
        elapsed = time.time() - start_time
        yield None, f"⏳ Transcribing... {elapsed:.1f} s elapsed"
        time.sleep(1)

    # Show the final result and timing once done
    elapsed = time.time() - start_time
    yield result_file_holder["file"], f"✅ Transcription complete, ⏱️ total time: {elapsed:.2f} s"
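# Gradio treats generator callbacks as streaming functions: each `yield`
# pushes fresh values into the bound output components. This relies on the
# request queue, which recent Gradio releases enable by default.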
with gr.Blocks() as demo:
    gr.Markdown("# Whisper + PyAnnote Audio Transcription System")
    audio_input = gr.Audio(type="filepath", label="Upload audio")
    is_multi = gr.Checkbox(label="Multi-speaker audio (enable speaker diarization)")
    language = gr.Dropdown(
        choices=[
            ("Auto-detect", "auto"),
            ("English", "en"),
            ("Chinese", "zh"),
            ("French", "fr"),
            ("German", "de"),
            ("Spanish", "es"),
            ("Japanese", "ja"),
            ("Korean", "ko"),
            ("Portuguese", "pt"),
            ("Russian", "ru"),
        ],
        value="auto",
        label="Audio language"
    )
    model_size = gr.Dropdown(
        choices=[
            ("tiny (39M)", "tiny"),
            ("base (74M)", "base"),
            ("small (244M)", "small"),
            ("medium (769M)", "medium"),
            ("large (1550M)", "large")
        ],
        value="base",
        label="Whisper model size"
    )
    # status_box = gr.Textbox(label="Status updates", interactive=False)
    output_file = gr.File(label="Transcription result (.txt)")
    elapsed_time = gr.Textbox(label="Elapsed time", interactive=False)
    run_btn = gr.Button("Start transcription")
    run_btn.click(
        fn=main_with_progress,
        inputs=[audio_input, is_multi, language, model_size],
        outputs=[output_file, elapsed_time]
    )

if __name__ == "__main__":
    demo.launch()
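# Assumed runtime dependencies (not pinned anywhere in this script): gradio,
# openai-whisper, pyannote.audio, torch, and the ffmpeg/ffprobe binaries on PATH.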