import gradio as gr
import whisper
import subprocess
import os
import time
import re
import datetime
import shutil
import threading
from pyannote.audio import Pipeline

print("Gradio version:", gr.__version__)

# Hugging Face token used to access the pyannote model (replace with your own via HF_TOKEN)
hf_token = os.getenv("HF_TOKEN")

# Whisper model loading (loading in advance would speed things up; here it is
# loaded per request so the user-selected size takes effect)
# asr_model = whisper.load_model("base").to("cuda")
diarization_pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token=hf_token)
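# Note: "pyannote/speaker-diarization" is a gated model; loading fails unless the
# token's account has accepted the model's terms on the Hugging Face Hub.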

def get_audio_duration(filename):
    result = subprocess.run(
        ['ffprobe', '-v', 'error', '-show_entries', 'format=duration',
         '-of', 'default=noprint_wrappers=1:nokey=1', filename],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)
    return float(result.stdout)
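# With these flags ffprobe prints only the duration (e.g. "3625.433000"), so the
# captured stdout parses directly as a float; ffprobe must be on PATH.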

def transcribe_single(audio_path, language="auto", model_size="base"):
    input_path = "audio.mp3"
    # shutil.move works across filesystems; os.rename fails when the Gradio temp
    # dir lives on a different device than the working directory
    shutil.move(audio_path, input_path)
    asr_model = whisper.load_model(model_size)  # load_model selects CUDA automatically when available
    # Whisper expects language=None for auto-detection, not the string "auto"
    lang = None if language == "auto" else language

    # Step 1: silence detection
    silence_cmd = f"ffmpeg -i {input_path} -af silencedetect=noise=-30dB:d=1 -f null - 2> silence_log.txt"
    os.system(silence_cmd)
    audio_duration = get_audio_duration(input_path)

    # Step 2: parse silence_log.txt
    silence_starts, silence_ends = [], []
    with open("silence_log.txt", "r") as f:
        for line in f:
            if "silence_start" in line:
                match = re.search(r"silence_start: (\d+\.?\d*)", line)
                if match:
                    silence_starts.append(float(match.group(1)))
            elif "silence_end" in line:
                match = re.search(r"silence_end: (\d+\.?\d*)", line)
                if match:
                    silence_ends.append(float(match.group(1)))
    silence_starts.append(audio_duration)
    silence_ends.append(audio_duration)
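    # The parsing above relies on silencedetect's stderr format, e.g.
    # "[silencedetect @ 0x55d...] silence_start: 123.456" and
    # "[silencedetect @ 0x55d...] silence_end: 125.0 | silence_duration: 1.544";
    # the trailing appends add the end of the file as a final sentinel boundary.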
    # Step 3: pick segment boundaries at silences, targeting chunks of MIN_TARGET to MAX_TARGET seconds
    MIN_TARGET, MAX_TARGET = 480, 600
    segments = []
    current_start = 0.0
    for i in range(len(silence_starts)):
        silence_point = silence_starts[i]
        segment_length = silence_point - current_start
        if segment_length >= MIN_TARGET:
            segment_end = silence_point if segment_length <= MAX_TARGET else current_start + MAX_TARGET
            segments.append((current_start, segment_end))
            # resume at the silence end when we cut at the silence, otherwise at the cap,
            # so the audio between the cap and the next silence is not dropped
            current_start = silence_ends[i] if segment_end == silence_point else segment_end
    if current_start < audio_duration:
        segments.append((current_start, None))
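    # Worked example of the policy above, assuming each silence is brief: with silences
    # at 300 s, 500 s, and 1200 s, the 300 s one is skipped (the segment would be shorter
    # than MIN_TARGET), the 500 s one closes (0, 500), and the following segment is
    # capped at MAX_TARGET as (500, 1100), with transcription resuming at 1100 s.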
    # Step 4: cut each segment with ffmpeg and run Whisper on it
    output_lines = []
    for idx, (start, end) in enumerate(segments):
        chunk_file = f"chunk_{idx:03d}.mp3"
        cmd = f"ffmpeg -y -i {input_path} -ss {start:.2f}"
        if end is not None:  # the final segment has end=None and runs to the end of the file
            cmd += f" -to {end:.2f}"
        cmd += f" -c copy {chunk_file}"
        os.system(cmd)
        result = asr_model.transcribe(chunk_file, language=lang)
        output_lines.append(result["text"].strip())
        os.remove(chunk_file)
    with open("transcription_output.txt", "w", encoding="utf-8") as f:
        f.write("\n".join(output_lines))
    return "transcription_output.txt"

def transcribe_multi(audio_path, language="auto", model_size="base"):
    input_path = "audio_multi.mp3"
    shutil.move(audio_path, input_path)
    asr_model = whisper.load_model(model_size)  # load_model selects CUDA automatically when available
    lang = None if language == "auto" else language
    diarization = diarization_pipeline(input_path)
    segments = []
    # itertracks(yield_label=True) yields (segment, track, label) triples per speaker turn
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        start_time = turn.start
        end_time = turn.end
        speaker_label = speaker
        # resample each turn to 16 kHz mono, the format Whisper expects
        tmp_chunk = f"tmp_{start_time:.2f}_{end_time:.2f}.wav"
        os.system(f"ffmpeg -y -i {input_path} -ss {start_time:.3f} -to {end_time:.3f} -ar 16000 -ac 1 -loglevel error {tmp_chunk}")
        result = asr_model.transcribe(tmp_chunk, language=lang)
        text = result['text'].strip()
        os.remove(tmp_chunk)
        if text:
            segments.append({
                "start": start_time,
                "end": end_time,
                "speaker": speaker_label,
                "text": text
            })
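    # Example element of `segments` (values illustrative; pyannote labels look like "SPEAKER_00"):
    #   {"start": 12.3, "end": 15.8, "speaker": "SPEAKER_00", "text": "hello there"}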
    # Map raw diarization labels to numbered display names in order of first appearance
    speaker_map = {}
    speaker_counter = 1
    output_lines = []

    def format_ts(seconds):
        return str(datetime.timedelta(seconds=int(seconds))) + f".{int((seconds % 1) * 1000):03d}"

    for seg in segments:
        speaker = seg["speaker"]
        if speaker not in speaker_map:
            speaker_map[speaker] = f"Speaker {speaker_counter}"
            speaker_counter += 1
        speaker_name = speaker_map[speaker]
        start_str = format_ts(seg["start"])
        end_str = format_ts(seg["end"])
        line = f"[{start_str} - {end_str}] {speaker_name}: {seg['text']}"
        output_lines.append(line)
    with open("transcription_with_speakers.txt", "w", encoding="utf-8") as f:
        f.write("\n".join(output_lines))
    return "transcription_with_speakers.txt"

# def main(audio_file, is_multispeaker, language, model_size):
#     start_time = time.time()
#     result_file = transcribe_multi(audio_file, language, model_size) if is_multispeaker else transcribe_single(audio_file, language, model_size)
#     end_time = time.time()
#     elapsed = end_time - start_time
#     time_info = f"⏱️ Transcription took {elapsed:.2f} s"
#     return result_file, time_info

# Run the transcription task in a background thread so the UI can stream status updates
def main_with_progress(audio_file, is_multispeaker, language, model_size):
    start_time = time.time()
    # Each yield must match the two output components bound below (file, status);
    # the original yielded a third value for the now-commented-out status_box
    yield None, "⏳ Transcribing, please wait..."
    result_file_holder = {"file": None}

    def transcribe_task():
        result_file_holder["file"] = transcribe_multi(audio_file, language, model_size) if is_multispeaker else transcribe_single(audio_file, language, model_size)

    thread = threading.Thread(target=transcribe_task)
    thread.start()
    # Update the status once per second until the task finishes
    while thread.is_alive():
        elapsed = time.time() - start_time
        yield None, f"⏳ Transcribing... {elapsed:.1f} s elapsed"
        time.sleep(1)
    # Show the final result and total time once done
    elapsed = time.time() - start_time
    result_file = result_file_holder["file"]
    yield result_file, f"✅ Transcription finished, ⏱️ total time: {elapsed:.2f} s"
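# Gradio runs a generator passed to .click() as a streaming event handler: each yielded
# tuple updates the bound outputs in order, which drives the live elapsed-time counter
# above. On older Gradio 3.x releases this requires enabling the event queue
# (demo.queue() before launch); newer releases enable it by default.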

with gr.Blocks() as demo:
    gr.Markdown("# Whisper + PyAnnote Audio Transcription")
    audio_input = gr.Audio(type="filepath", label="Upload audio")
    is_multi = gr.Checkbox(label="Multi-speaker audio (enable speaker diarization)")
    language = gr.Dropdown(
        choices=[
            ("Auto-detect", "auto"),
            ("English", "en"),
            ("Chinese", "zh"),
            ("French", "fr"),
            ("German", "de"),
            ("Spanish", "es"),
            ("Japanese", "ja"),
            ("Korean", "ko"),
            ("Portuguese", "pt"),
            ("Russian", "ru"),
        ],
        value="auto",
        label="Audio language"
    )
    model_size = gr.Dropdown(
        choices=[
            ("tiny (39M)", "tiny"),
            ("base (74M)", "base"),
            ("small (244M)", "small"),
            ("medium (769M)", "medium"),
            ("large (1550M)", "large")
        ],
        value="base",
        label="Whisper model size"
    )
    # status_box = gr.Textbox(label="Status updates", interactive=False)
    output_file = gr.File(label="Transcription result (.txt)")
    elapsed_time = gr.Textbox(label="Processing time", interactive=False)
    run_btn = gr.Button("Start transcription")
    run_btn.click(
        fn=main_with_progress,
        inputs=[audio_input, is_multi, language, model_size],
        outputs=[output_file, elapsed_time]
    )

if __name__ == "__main__":
    demo.launch()