Spaces:
Sleeping
Sleeping
File size: 7,791 Bytes
45de075 d2c5e0d 45de075 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
import gradio as gr
import whisper
import subprocess
import os
import time
import re
import datetime
from pyannote.audio import Pipeline
print("Gradio version:", gr.__version__)

# Hugging Face access token used to download the pyannote models; it is read
# from the HF_TOKEN environment variable (set it to your own token).
hf_token = os.environ.get("HF_TOKEN")

# Whisper could be loaded once up front to speed up requests; currently each
# transcribe function loads the requested model size itself.
# asr_model = whisper.load_model("base").to("cuda")

# Speaker-diarization pipeline shared by every multi-speaker request.
diarization_pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization",
    use_auth_token=hf_token,
)
def get_audio_duration(filename):
    """Return the duration of a media file in seconds, as reported by ffprobe.

    Args:
        filename: Path of the audio/video file to probe.

    Returns:
        The container duration in seconds as a float.

    Raises:
        ValueError: If ffprobe exits with an error, prints nothing, or prints
            something that is not a number (e.g. the file is missing).
        FileNotFoundError: If the ``ffprobe`` executable is not installed.
    """
    result = subprocess.run(
        [
            'ffprobe', '-v', 'error', '-show_entries', 'format=duration',
            '-of', 'default=noprint_wrappers=1:nokey=1', filename,
        ],
        stdout=subprocess.PIPE,
        # Keep diagnostics out of stdout so a warning can never corrupt the
        # number we are about to parse.
        stderr=subprocess.PIPE,
        encoding="utf-8",
        errors="replace",
    )
    output = result.stdout.strip()
    if result.returncode != 0 or not output:
        details = result.stderr.strip() or "no output"
        raise ValueError(
            f"ffprobe could not read the duration of {filename!r}: {details}"
        )
    try:
        return float(output)
    except ValueError as err:
        raise ValueError(
            f"ffprobe reported a non-numeric duration for {filename!r}: {output!r}"
        ) from err
def transcribe_single(audio_path, language="auto", model_size="base"):
    """Transcribe a single-speaker recording with Whisper, chunk by chunk.

    The audio is scanned with ffmpeg's ``silencedetect`` filter and cut at
    silences into chunks of roughly ``MIN_TARGET``..``MAX_TARGET`` seconds.
    A stretch with no usable silence is cut every ``MAX_TARGET`` seconds so
    that no audio is skipped. Each chunk is transcribed separately and the
    texts are written, one chunk per line, to ``transcription_output.txt``.

    Args:
        audio_path: Path of the audio file to transcribe. The file is read in
            place; it is not moved or renamed.
        language: Whisper language code, or ``"auto"`` to let Whisper detect
            the language.
        model_size: Name of the Whisper model to load (e.g. ``"base"``).

    Returns:
        The path of the written transcript, ``"transcription_output.txt"``.

    Raises:
        RuntimeError: If ffmpeg fails to extract a chunk.
        ValueError: If the duration of the audio cannot be determined.
    """
    asr_model = whisper.load_model(model_size)
    # Whisper has no "auto" language code; None asks it to detect the language.
    whisper_language = None if language in (None, "", "auto") else language

    # Step 1: silence detection (the report is printed on ffmpeg's stderr).
    detect = subprocess.run(
        [
            "ffmpeg", "-nostdin", "-i", audio_path,
            "-af", "silencedetect=noise=-30dB:d=1", "-f", "null", "-",
        ],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.PIPE,
        encoding="utf-8",
        errors="replace",
    )
    audio_duration = get_audio_duration(audio_path)

    # Step 2: parse the silence report. A silence at the very beginning can be
    # reported with a slightly negative start, so accept a sign and clamp.
    silence_starts = [
        max(0.0, float(value))
        for value in re.findall(r"silence_start: (-?\d+\.?\d*)", detect.stderr)
    ]
    silence_ends = [
        float(value)
        for value in re.findall(r"silence_end: (-?\d+\.?\d*)", detect.stderr)
    ]
    # A silence that was never closed is treated as lasting until the end.
    silence_ends.extend(
        [audio_duration] * (len(silence_starts) - len(silence_ends))
    )
    boundaries = list(zip(silence_starts, silence_ends))
    # The end of the file acts as a final, zero-length silence.
    boundaries.append((audio_duration, audio_duration))

    # Step 3: plan the chunks.
    MIN_TARGET, MAX_TARGET = 480, 600
    segments = []
    current_start = 0.0
    for silence_start, silence_end in boundaries:
        # No usable silence within MAX_TARGET: force cuts, continuing right
        # where the previous chunk stopped so nothing is dropped.
        while silence_start - current_start > MAX_TARGET:
            segments.append((current_start, current_start + MAX_TARGET))
            current_start += MAX_TARGET
        if silence_start - current_start >= MIN_TARGET:
            segments.append((current_start, silence_start))
            # Resume after the silence; the silent gap itself is skipped.
            current_start = silence_end
    if current_start < audio_duration:
        # Remaining tail: no end time, run to the end of the file.
        segments.append((current_start, None))

    # Step 4: extract each chunk and transcribe it.
    output_lines = []
    for idx, (start, end) in enumerate(segments):
        # 16 kHz mono WAV works for any input format; Whisper resamples to
        # that format anyway.
        chunk_file = f"chunk_{idx:03d}.wav"
        cmd = [
            "ffmpeg", "-nostdin", "-y", "-loglevel", "error",
            "-i", audio_path, "-ss", f"{start:.2f}",
        ]
        if end is not None:
            cmd += ["-to", f"{end:.2f}"]
        cmd += ["-ar", "16000", "-ac", "1", chunk_file]
        try:
            cut = subprocess.run(
                cmd,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                encoding="utf-8",
                errors="replace",
            )
            if cut.returncode != 0:
                raise RuntimeError(
                    f"ffmpeg failed to extract chunk {idx}: {cut.stderr.strip()}"
                )
            result = asr_model.transcribe(chunk_file, language=whisper_language)
        finally:
            # Never leave chunk files behind, even when a step fails.
            if os.path.exists(chunk_file):
                os.remove(chunk_file)
        output_lines.append(result["text"].strip())

    with open("transcription_output.txt", "w", encoding="utf-8") as f:
        f.write("\n".join(output_lines))
    return "transcription_output.txt"
def transcribe_multi(audio_path, language="auto", model_size="base"):
    """Transcribe a multi-speaker recording with speaker labels.

    The module-level pyannote ``diarization_pipeline`` splits the audio into
    speaker turns. Each turn is extracted with ffmpeg as 16 kHz mono WAV and
    transcribed with Whisper. Turns that yield text are written to
    ``transcription_with_speakers.txt`` as
    ``[start - end] 说话人N:text``, with speakers numbered in order of first
    appearance.

    Args:
        audio_path: Path of the audio file to transcribe. The file is read in
            place; it is not moved or renamed.
        language: Whisper language code, or ``"auto"`` to let Whisper detect
            the language.
        model_size: Name of the Whisper model to load (e.g. ``"base"``).

    Returns:
        The path of the written transcript,
        ``"transcription_with_speakers.txt"``.

    Raises:
        RuntimeError: If ffmpeg fails to extract a speaker turn.
    """
    def format_ts(seconds):
        # Work in whole milliseconds so float error cannot drop a millisecond.
        whole_seconds, millis = divmod(int(round(seconds * 1000)), 1000)
        return f"{datetime.timedelta(seconds=whole_seconds)}.{millis:03d}"

    # load_model picks CUDA when available and falls back to CPU otherwise,
    # so the app also runs on CPU-only hosts.
    asr_model = whisper.load_model(model_size)
    # Whisper has no "auto" language code; None asks it to detect the language.
    whisper_language = None if language in (None, "", "auto") else language

    diarization = diarization_pipeline(audio_path)

    segments = []
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        start_time = turn.start
        end_time = turn.end
        tmp_chunk = f"tmp_{start_time:.2f}_{end_time:.2f}.wav"
        try:
            cut = subprocess.run(
                [
                    "ffmpeg", "-nostdin", "-y", "-i", audio_path,
                    "-ss", f"{start_time:.3f}", "-to", f"{end_time:.3f}",
                    "-ar", "16000", "-ac", "1", "-loglevel", "error",
                    tmp_chunk,
                ],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                encoding="utf-8",
                errors="replace",
            )
            if cut.returncode != 0:
                raise RuntimeError(
                    f"ffmpeg failed to extract {start_time:.3f}-{end_time:.3f}: "
                    f"{cut.stderr.strip()}"
                )
            result = asr_model.transcribe(tmp_chunk, language=whisper_language)
        finally:
            # Never leave the temporary chunk behind, even when a step fails.
            if os.path.exists(tmp_chunk):
                os.remove(tmp_chunk)
        text = result['text'].strip()
        if text:
            segments.append({
                "start": start_time,
                "end": end_time,
                "speaker": speaker,
                "text": text,
            })

    # Map pyannote's labels to friendly names in order of first appearance.
    speaker_map = {}
    output_lines = []
    for seg in segments:
        speaker = seg["speaker"]
        if speaker not in speaker_map:
            speaker_map[speaker] = f"说话人{len(speaker_map) + 1}"
        speaker_name = speaker_map[speaker]
        start_str = format_ts(seg["start"])
        end_str = format_ts(seg["end"])
        output_lines.append(
            f"[{start_str} - {end_str}] {speaker_name}:{seg['text']}"
        )

    with open("transcription_with_speakers.txt", "w", encoding="utf-8") as f:
        f.write("\n".join(output_lines))
    return "transcription_with_speakers.txt"
# def main(audio_file, is_multispeaker, language, model_size):
# start_time = time.time()
# result_file = transcribe_multi(audio_file, language, model_size) if is_multispeaker else transcribe_single(audio_file, language, model_size)
# end_time = time.time()
# elapsed = end_time - start_time
# time_info = f"⏱️ 转录耗时:{elapsed:.2f} 秒"
# return result_file, time_info
# Run the transcription task in a background thread.
import threading
def main_with_progress(audio_file, is_multispeaker, language, model_size):
    """Gradio generator handler: transcribe in a thread and stream progress.

    Every yielded value is a ``(result_file, status_text)`` pair, matching
    the two output components wired to the button. ``result_file`` stays
    ``None`` until the transcription has finished successfully.

    Args:
        audio_file: Path of the uploaded audio, or ``None``/empty when nothing
            was uploaded.
        is_multispeaker: When true, use ``transcribe_multi`` (speaker
            diarization); otherwise use ``transcribe_single``.
        language: Language code forwarded to the transcribe function.
        model_size: Whisper model name forwarded to the transcribe function.

    Yields:
        ``(None, status)`` while the work is running or after a failure, and
        finally ``(path_to_transcript, status)`` on success.
    """
    if not audio_file:
        # Nothing to transcribe; say so instead of failing inside the worker.
        yield None, "⚠️ 请先上传音频文件"
        return

    start_time = time.time()
    yield None, "⏳ 正在转录,请稍等..."

    outcome = {"file": None, "error": None}

    def transcribe_task():
        try:
            worker = transcribe_multi if is_multispeaker else transcribe_single
            outcome["file"] = worker(audio_file, language, model_size)
        except Exception as exc:
            # Thread boundary: keep the error so the UI can report it instead
            # of showing a "finished" message with no file.
            outcome["error"] = exc

    thread = threading.Thread(target=transcribe_task)
    thread.start()

    # Refresh the status about once per second until the task completes;
    # join() returns early as soon as the worker finishes.
    while thread.is_alive():
        elapsed = time.time() - start_time
        yield None, f"⏳ 正在转录中... 已耗时 {elapsed:.1f} 秒"
        thread.join(timeout=1)

    elapsed = time.time() - start_time
    if outcome["error"] is not None:
        yield None, f"❌ 转录失败:{outcome['error']}"
        return

    # Finished: hand over the file and the total time.
    yield outcome["file"], f"✅ 转录完成,⏱️总耗时:{elapsed:.2f} 秒"
# Dropdown options as (label shown in the UI, value passed to the handler).
LANGUAGE_CHOICES = [
    ("自动识别", "auto"),
    ("英语 (English)", "en"),
    ("中文 (Chinese)", "zh"),
    ("法语 (French)", "fr"),
    ("德语 (German)", "de"),
    ("西班牙语 (Spanish)", "es"),
    ("日语 (Japanese)", "ja"),
    ("韩语 (Korean)", "ko"),
    ("葡萄牙语 (Portuguese)", "pt"),
    ("俄语 (Russian)", "ru"),
]
MODEL_SIZE_CHOICES = [
    ("tiny (39M)", "tiny"),
    ("base (74M)", "base"),
    ("small (244M)", "small"),
    ("medium (769M)", "medium"),
    ("large (1550M)", "large"),
]

# Build the page; components appear in the order they are created.
with gr.Blocks() as demo:
    gr.Markdown("# Whisper + PyAnnote 音频转录系统")

    # Inputs
    audio_input = gr.Audio(type="filepath", label="上传音频")
    is_multi = gr.Checkbox(label="是否为多人对话音频(启用说话人分离)")
    language = gr.Dropdown(
        choices=LANGUAGE_CHOICES, value="auto", label="音频语言"
    )
    model_size = gr.Dropdown(
        choices=MODEL_SIZE_CHOICES, value="base", label="Whisper 模型规模"
    )

    # Outputs
    output_file = gr.File(label="转录结果(.txt)")
    elapsed_time = gr.Textbox(label="处理用时", interactive=False)

    # The button starts the streaming handler.
    run_btn = gr.Button("开始转录")
    run_btn.click(
        fn=main_with_progress,
        inputs=[audio_input, is_multi, language, model_size],
        outputs=[output_file, elapsed_time],
    )

if __name__ == "__main__":
    demo.launch()
|