Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
import concurrent.futures
import os
import subprocess
import tempfile
import time
import traceback
from datetime import datetime

import gradio as gr
import numpy as np
import pandas as pd
import soundfile as sf
import whisper
from docx import Document
from scipy import signal
# One row per Whisper model: (name, speed factor, feature description).
# The speed factor is the divisor applied to the media duration when estimating
# transcription time (estimated seconds = duration / factor).
# Earlier factors, kept for reference: tiny 2.0, base 1.5, small 1.0,
# medium 0.75, large 0.5, turbo 0.6. An alternative scale that was also tried:
# tiny 32.0, base 16.0, small 6.0, medium 2.0, large 1.0, turbo 1.5.
_MODEL_TABLE = (
    ("tiny.en", 1.0, "最小且最快的英文專用模型,適合簡單的英文語音。"),
    ("tiny", 0.8, "最小且最快的多語言模型,適合簡單的多語言語音。"),
    ("base.en", 0.7, "基礎英文專用模型,平衡速度和準確度。"),
    ("base", 0.6, "基礎多語言模型,平衡速度和準確度。"),
    ("small.en", 0.5, "較準確的英文專用模型,適合一般英文轉錄。"),
    ("small", 0.4, "較準確的多語言模型,適合一般多語言轉錄。"),
    ("medium.en", 0.3, "高準確度的英文專用模型,適合複雜英文內容。"),
    ("medium", 0.25, "高準確度的多語言模型,適合複雜多語言內容。"),
    ("large-v1", 0.2, "最早版本的大型模型,提供最佳準確度。"),
    ("large-v2", 0.2, "改進版大型模型,提供更好的多語言支援。"),
    ("large-v3", 0.2, "最新版大型模型,整體性能更優。"),
    ("large", 0.2, "大型模型的最新版本別名。"),
    ("large-v3-turbo", 0.25, "針對速度優化的 large-v3 模型。"),
    ("turbo", 0.25, "針對速度優化的最新模型別名。"),
)

# Speed factor per model name.
model_speed = {name: factor for name, factor, _ in _MODEL_TABLE}

# Short description shown next to each model in the estimate table.
model_features = {name: feature for name, _, feature in _MODEL_TABLE}

# Model names offered in the dropdown, in table order.
model_choices = [name for name, _, _ in _MODEL_TABLE]

# File extensions recognised as audio / video input.
supported_audio_formats = [".mp3", ".wav", ".flac", ".aac", ".ogg", ".m4a"]
supported_video_formats = [".mp4", ".mov", ".avi", ".mkv", ".webm"]

# Export formats offered for the finished transcript.
export_formats = ['.txt', '.md', '.srt', '.docx']

# Process-wide cache of loaded models, keyed by model name.
loaded_models = {}
def save_audio(audio_data, sr):
    """Write a recorded clip to a temporary 16 kHz WAV file and return its path.

    Gradio's ``Audio(type="numpy")`` value is ``(sample_rate, samples)``, and the
    callers in this file unpack that tuple as ``audio_data, sr``. The samples
    therefore arrive in ``sr`` and the sample rate in ``audio_data``. Both
    orderings are accepted: whichever argument is a NumPy array is treated as
    the samples, and the other one as the sample rate.

    Args:
        audio_data: Sample rate (as current callers pass it) or the sample array.
        sr: Sample array (as current callers pass it) or the sample rate.

    Returns:
        Path of the written WAV file, or ``None`` when the input is unusable or
        the file could not be written.
    """
    target_sr = 16000
    # Rate assumed by the previous implementation; used only when no usable
    # sample rate accompanies the samples.
    fallback_sr = 44100
    wav_path = None
    try:
        if audio_data is None or sr is None:
            print("無效的音頻數據")
            return None

        if isinstance(audio_data, np.ndarray):
            samples, source_sr = audio_data, sr
        elif isinstance(sr, np.ndarray):
            samples, source_sr = sr, audio_data
        else:
            print("無法獲取有效的音頻數據")
            return None

        if samples.size == 0:
            print("無法獲取有效的音頻數據")
            return None

        rate_is_numeric = (
            isinstance(source_sr, (int, float, np.integer, np.floating))
            and not isinstance(source_sr, bool)
        )
        if not rate_is_numeric or source_sr <= 0:
            source_sr = fallback_sr

        print(f"原始音頻數據類型: {type(samples)}")
        print(f"原始採樣率: {source_sr}")

        samples = samples.astype(np.float32)
        # Keep a (frames, channels) layout so resampling always runs along axis 0.
        if samples.ndim == 1:
            samples = samples.reshape(-1, 1)

        if source_sr != target_sr:
            new_length = max(1, int(round(samples.shape[0] * target_sr / source_sr)))
            samples = signal.resample(samples, new_length, axis=0)

        # Peak-normalise; silent input is left untouched to avoid dividing by zero.
        peak = np.abs(samples).max()
        if peak > 0:
            samples = samples / peak

        print(f"處理後的音頻數據形狀: {samples.shape}, 採樣率: {target_sr}, 數據類型: {samples.dtype}")
        print(f"音頻數據範圍: [{samples.min():.3f}, {samples.max():.3f}]")

        # Reserve a file name, then close the handle before writing so the
        # writer is not competing with an open descriptor on the same path.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio:
            wav_path = temp_audio.name
        sf.write(wav_path, samples, target_sr)

        if os.path.exists(wav_path) and os.path.getsize(wav_path) > 0:
            print(f"成功創建音頻文件: {wav_path}")
            return wav_path
        print("音頻文件創建失敗")
    except Exception as e:
        print(f"保存音頻失敗: {str(e)}")
        traceback.print_exc()

    # Failure path: do not leave an empty or partial temp file behind.
    if wav_path is not None:
        try:
            os.remove(wav_path)
        except OSError:
            pass
    return None
def estimate_all_models_transcription_time(file_path):
    """Estimate the transcription time of a media file for every model.

    For video input the audio track is first extracted into a throw-away
    directory, which is removed again whether or not probing succeeds.

    Args:
        file_path: Path of the audio or video file.

    Returns:
        A ``pandas.DataFrame`` with one row per entry of ``model_choices``
        (model name, feature description, estimated seconds), or an error
        message string when the estimate could not be computed.
    """
    try:
        file_extension = os.path.splitext(file_path)[1].lower()

        if file_extension in supported_video_formats:
            # Extract into a fresh directory: the output path does not exist yet,
            # so ffmpeg has nothing to refuse to overwrite, and the directory is
            # deleted even if extraction or probing raises.
            with tempfile.TemporaryDirectory() as work_dir:
                audio_path = extract_audio_from_video(
                    file_path, os.path.join(work_dir, "audio.wav")
                )
                audio_duration = get_media_duration(audio_path)
        else:
            audio_duration = get_media_duration(file_path)

        estimates = [
            {
                "模型名稱": model_name,
                "模型特色": model_features[model_name],
                "預估轉錄時間 (秒)": f"{audio_duration / model_speed[model_name]:.2f}",
            }
            for model_name in model_choices
        ]
        return pd.DataFrame(estimates)
    except Exception as e:
        print(f"估算轉錄時間失敗: {str(e)}\n{traceback.format_exc()}")
        return f"估算轉錄時間失敗: {str(e)}"
def display_model_estimations(file):
    """Return the per-model time estimates for an uploaded file.

    Args:
        file: Uploaded file object exposing a ``name`` attribute, or ``None``.

    Returns:
        Whatever ``estimate_all_models_transcription_time`` returns for
        ``file.name``, or a prompt string when no file was supplied.
    """
    if file is None:
        return "請上傳音訊或影片文件"
    return estimate_all_models_transcription_time(file.name)
| # 加載遠端模型 | |
| # def load_model(model_name): | |
| # if model_name not in loaded_models: | |
| # print(f"正在加載模型:{model_name}") | |
| # try: | |
| # loaded_models[model_name] = whisper.load_model(model_name) | |
| # except Exception as e: | |
| # print(f"模型加載失敗: {str(e)}\n{traceback.format_exc()}") | |
| # raise RuntimeError(f"模型加載失敗: {str(e)}") | |
| # return loaded_models[model_name] | |
def load_model(model_name):
    """Return the Whisper model for ``model_name``, loading it on first use.

    The local checkpoint ``./models/<model_name>.pt`` is preferred. When that
    file is missing, the name itself is handed to ``whisper.load_model`` instead
    of failing outright. Loaded models are cached in ``loaded_models``.

    Args:
        model_name: Key identifying the model, e.g. an entry of ``model_choices``.

    Returns:
        The loaded (or cached) model object.

    Raises:
        RuntimeError: If the model cannot be loaded; the original exception is
            attached as the cause.
    """
    if model_name not in loaded_models:
        print(f"正在加載本地模型:{model_name}")
        try:
            model_path = f"./models/{model_name}.pt"
            # Fall back to the bare name so a missing checkpoint file is not fatal.
            source = model_path if os.path.isfile(model_path) else model_name
            loaded_models[model_name] = whisper.load_model(source)
        except Exception as e:
            print(f"模型加載失敗: {str(e)}\n{traceback.format_exc()}")
            raise RuntimeError(f"模型加載失敗: {str(e)}") from e
    return loaded_models[model_name]
def get_media_duration(file_path):
    """Return the duration of a media file in seconds, as reported by ffprobe.

    Args:
        file_path: Path of the audio or video file to probe.

    Returns:
        The container duration as a float.

    Raises:
        RuntimeError: If ffprobe exits with an error, or for any other failure
            (for example unparsable output).
    """
    probe_command = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        file_path,
    ]
    try:
        completed = subprocess.run(
            probe_command,
            capture_output=True,
            text=True,
            check=True,
        )
        return float(completed.stdout.strip())
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"獲取媒體時長失敗: {e.stderr.strip()}")
    except Exception as e:
        raise RuntimeError(f"其他錯誤: {str(e)}\n{traceback.format_exc()}")
def extract_audio_from_video(video_path, output_audio_path="extracted_audio.wav"):
    """Extract the audio track of a video into a 16-bit PCM WAV file via ffmpeg.

    Args:
        video_path: Path of the source video.
        output_audio_path: Destination WAV path; an existing file is overwritten.

    Returns:
        ``output_audio_path``.

    Raises:
        RuntimeError: If ffmpeg is not installed or exits with an error.
    """
    command = [
        "ffmpeg",
        # Never wait on stdin, and overwrite an existing output instead of
        # prompting: callers commonly pass the path of a pre-created temp file.
        "-nostdin", "-y",
        "-i", video_path,
        "-vn",
        "-acodec", "pcm_s16le",
        "-ar", "44100",
        "-ac", "2",
        output_audio_path,
    ]
    try:
        subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except FileNotFoundError as e:
        # The ffmpeg executable itself could not be found.
        raise RuntimeError(f"音訊提取失敗: {e}") from e
    except subprocess.CalledProcessError as e:
        # ffmpeg diagnostics are not guaranteed to be valid UTF-8.
        raise RuntimeError(f"音訊提取失敗: {e.stderr.decode(errors='replace')}") from e
    return output_audio_path
def _srt_timestamp(seconds):
    """Format a time offset in seconds as an SRT timestamp ``HH:MM:SS,mmm``."""
    total_ms = max(0, int(round(float(seconds) * 1000)))
    hours, remainder = divmod(total_ms, 3_600_000)
    minutes, remainder = divmod(remainder, 60_000)
    secs, millis = divmod(remainder, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"


def save_transcription_to_file(transcription, export_format, include_timestamps, result, file_name):
    """Write a transcript to ``file_name`` in the requested export format.

    Args:
        transcription: Transcript text as displayed. It is written as-is for
            timestamped ``.txt``/``.md`` exports, and used as the fallback text
            when ``result`` has no segments.
        export_format: One of ``.txt``, ``.md``, ``.srt``, ``.docx``. Any other
            value is written as plain text so the returned path always exists.
        include_timestamps: Whether per-segment times are included.
        result: Mapping that may hold a ``"segments"`` list of dicts with
            ``start``, ``end`` and ``text`` keys.
        file_name: Destination path.

    Returns:
        ``file_name``.
    """
    if include_timestamps and "segments" in result:
        if export_format == ".srt":
            # SRT cues need HH:MM:SS,mmm timestamps; the mm:ss display label
            # is not a valid cue time.
            transcription = "".join(
                f"{index}\n"
                f"{_srt_timestamp(segment['start'])} --> {_srt_timestamp(segment['end'])}\n"
                f"{segment['text'].strip()}\n\n"
                for index, segment in enumerate(result["segments"], start=1)
            )
        elif export_format == ".docx":
            # One paragraph per segment, prefixed with its mm:ss time range.
            doc = Document()
            for segment in result["segments"]:
                start_label = format_srt_time(segment["start"])
                end_label = format_srt_time(segment["end"])
                doc.add_paragraph(f"[{start_label} - {end_label}] {segment['text']}")
            doc.save(file_name)
            return file_name
    else:
        # Without timestamps: plain segment text, one segment per line.
        segments = result.get("segments", [{"text": transcription}])
        transcription = "\n".join(segment["text"] for segment in segments)

    if export_format == ".docx":
        doc = Document()
        doc.add_paragraph(transcription)
        doc.save(file_name)
    else:
        # .txt, .md and .srt are all written as UTF-8 text.
        with open(file_name, "w", encoding="utf-8") as f:
            f.write(transcription or "")
    return file_name
def format_srt_time(seconds):
    """Format a time offset in seconds as a zero-padded ``mm:ss`` label.

    Fractions of a second are dropped, and minutes are not wrapped into hours.
    """
    whole_minutes, leftover = divmod(seconds, 60)
    return "{:02}:{:02}".format(int(whole_minutes), int(leftover))
# Most recent transcription result; read later when the transcript is exported.
transcription_result = {}


def transcribe_and_export_with_progress(model_name, file, prompt):
    """Transcribe an uploaded file, yielding progress updates while it runs.

    The transcription runs in a worker thread, and this generator reports the
    elapsed and estimated remaining time about once per second until the
    worker finishes. Audio extracted from a video goes into a temporary
    directory that is removed when the generator ends.

    Args:
        model_name: Key of the model to use (see ``model_speed``).
        file: Object exposing the media path as ``file.name``, or ``None``.
        prompt: Text passed to the model as ``initial_prompt``.

    Yields:
        ``(status_text, transcript)`` tuples. ``transcript`` is ``None`` until
        the final tuple, which carries the timestamped transcript. Errors are
        reported as a status text with ``None`` as the transcript.
    """
    global transcription_result
    if file is None:
        yield "請上傳音訊或影片檔案", None
        return

    work_dir = None
    try:
        file_extension = os.path.splitext(file.name)[1].lower()

        # For video input, extract the audio track first.
        if file_extension in supported_video_formats:
            yield "開始提取音訊...", None
            work_dir = tempfile.TemporaryDirectory()
            audio_path = extract_audio_from_video(
                file.name, os.path.join(work_dir.name, "audio.wav")
            )
        else:
            audio_path = file.name

        total_duration = get_media_duration(audio_path)
        model_speed_factor = model_speed.get(model_name, 1.0)
        estimated_total_time = total_duration / model_speed_factor
        yield f"準備開始轉錄\n音訊總長度:{total_duration:.2f} 秒,預計完成時間:{estimated_total_time:.2f} 秒", None

        print("開始轉錄音訊...")
        model = load_model(model_name)

        start_time = time.time()

        def update_progress():
            current_time = time.time() - start_time
            if current_time <= estimated_total_time:
                remaining_time = max(0, estimated_total_time - current_time)
                return (f"轉錄進行中...\n"
                        f"已經過時間:{current_time:.1f} 秒\n"
                        f"預估剩餘時間:{remaining_time:.1f} 秒")
            return (f"轉錄進行中...\n"
                    f"已經過時間:{current_time:.1f} 秒\n"
                    f"尚在處理中,請耐心等候~")

        # Run the transcription concurrently with the progress reports.
        # Previously the progress loop slept for the whole estimate *before*
        # transcription started, so every job took at least twice as long.
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(
                model.transcribe,
                audio_path,
                initial_prompt=prompt,
                word_timestamps=True,
            )
            while not future.done():
                yield update_progress(), None
                concurrent.futures.wait([future], timeout=1)
            result = future.result()  # re-raises any transcription error

        # Build the timestamped transcript shown in the UI.
        transcription = "".join(
            f"[{format_srt_time(segment['start'])} - {format_srt_time(segment['end'])}] {segment['text']}\n"
            for segment in result["segments"]
        )

        total_time = time.time() - start_time

        # Keep the raw result for the export step.
        transcription_result = result

        yield f"轉錄完成!\n總耗時:{total_time:.1f} 秒", transcription
    except Exception as e:
        error_message = f"處理過程中出現錯誤: {str(e)}\n{traceback.format_exc()}"
        print(error_message)
        yield error_message, None
    finally:
        if work_dir is not None:
            work_dir.cleanup()
# Gradio interface: layout, event handlers and launch.
with gr.Blocks(theme=gr.themes.Default(primary_hue=gr.themes.colors.yellow, secondary_hue=gr.themes.colors.red)) as interface:
    # Title and description.
    gr.HTML("""
<div style='width: 800px; color: white;'>
<h1>Whisper工具:音訊/影片轉錄成逐字稿</h1>
<h3>使用者選擇音訊輸入方式:上傳檔案或是透過麥克風錄音,平台以Whisper模型進行內容轉錄。</h3>
</div>
""")

    # Input row: file upload on the left, microphone recording on the right.
    with gr.Row():
        with gr.Column(scale=2):
            gr.Markdown("請上傳音訊或影片檔案")
            upload_file = gr.File(label="上傳音訊或影片檔案", file_count="single")
        with gr.Column(scale=3):
            gr.Markdown("請使用麥克風進行錄音作輸入")
            audio_input = gr.Audio(
                label="以麥克風錄音",
                sources=["microphone"],
                type="numpy",
                format="wav",
                streaming=False,  # deliver the complete recording, not chunks
                show_download_button=True,  # lets the user save the recording
            )

    # Model selection and per-model time estimates.
    with gr.Row():
        with gr.Column(scale=2):
            model_dropdown = gr.Dropdown(choices=model_choices, label="選擇模型")
            prompt_input = gr.Textbox(label="提示詞(非必填)", placeholder="可輸入領域專用詞或提示詞,幫助轉錄精確化")
            transcribe_button = gr.Button("開始轉錄")
        with gr.Column(scale=3):
            estimation_output = gr.Dataframe(label="模型特色 / 轉錄時間預估")

    progress_text = gr.Textbox(label="進度狀態(本數據會因網路及裝置等資源條件異動,僅估算供參考)", interactive=False, lines=3)
    transcription_output = gr.Textbox(label="轉錄結果", interactive=False, lines=10)

    # Export options; hidden until a transcript is available.
    with gr.Row(visible=False) as options_row:
        with gr.Column(scale=2):
            export_dropdown = gr.Dropdown(choices=export_formats, label="選擇導出格式")
            include_timestamps_export = gr.Checkbox(label="導出時包含時間軸", value=True)
            file_name_input = gr.Textbox(label="自定義檔案名稱(非必填)", placeholder="輸入您欲存檔的檔案名稱(不含副檔名)")
        with gr.Column(scale=3):
            generate_button = gr.Button("生成檔案")
            download_link = gr.File(label="下載結果", interactive=False, visible=False)

    def _empty_estimates():
        """Return an empty estimate table with the expected column headers."""
        return pd.DataFrame(columns=["模型名稱", "模型特色", "預估轉錄時間 (秒)"])

    def _remove_quietly(path):
        """Delete a temporary file, ignoring errors if it is already gone."""
        try:
            os.remove(path)
        except OSError:
            pass

    class AudioFile:
        """Minimal stand-in exposing a saved recording's path as ``name``."""

        def __init__(self, name):
            self.name = name

    def handle_input(file_or_audio, is_file=True):
        """Fill the estimate table for an uploaded file or a finished recording.

        Always returns a DataFrame: the Dataframe output cannot display the
        error strings that the estimation helpers return on failure.
        """
        try:
            if file_or_audio is None:
                return _empty_estimates()
            if is_file:
                estimates = display_model_estimations(file_or_audio)
            else:
                # The numpy audio value is (sample_rate, samples); the positional
                # order expected by save_audio is kept unchanged.
                sample_rate, samples = file_or_audio
                audio_path = save_audio(sample_rate, samples)
                if not audio_path:
                    return _empty_estimates()
                try:
                    estimates = estimate_all_models_transcription_time(audio_path)
                finally:
                    # The WAV was only needed to measure the duration.
                    _remove_quietly(audio_path)
            if isinstance(estimates, pd.DataFrame):
                return estimates
            return _empty_estimates()
        except Exception as e:
            print(f"處理輸入時發生錯誤: {str(e)}")
            return _empty_estimates()

    # Refresh the estimates whenever a new input arrives.
    upload_file.upload(
        fn=lambda x: handle_input(x, True),
        inputs=upload_file,
        outputs=estimation_output
    )
    audio_input.stop_recording(
        fn=lambda x: handle_input(x, False),
        inputs=audio_input,
        outputs=estimation_output
    )

    def handle_transcription(model_name, uploaded_file, recorded_audio, prompt):
        """Transcribe the uploaded file, or else the recording, streaming progress."""
        try:
            if uploaded_file is not None:
                yield from transcribe_and_export_with_progress(model_name, uploaded_file, prompt)
            elif recorded_audio is not None:
                sample_rate, samples = recorded_audio
                audio_path = save_audio(sample_rate, samples)
                if audio_path:
                    try:
                        yield from transcribe_and_export_with_progress(
                            model_name, AudioFile(audio_path), prompt
                        )
                    finally:
                        # Remove the temporary WAV once transcription has ended.
                        _remove_quietly(audio_path)
                else:
                    yield "音頻保存失敗或數據無效,請重新錄音", None
            else:
                yield "請提供音訊輸入", None
        except Exception as e:
            error_msg = f"轉錄錯誤: {str(e)}"
            print(error_msg)
            traceback.print_exc()
            yield error_msg, None

    transcribe_button.click(
        fn=handle_transcription,
        inputs=[model_dropdown, upload_file, audio_input, prompt_input],
        outputs=[progress_text, transcription_output],
        show_progress=True
    )

    # Reveal the export options once there is a transcript to export.
    transcription_output.change(
        fn=lambda x: gr.update(visible=True) if x else gr.update(visible=False),
        inputs=transcription_output,
        outputs=options_row
    )

    def handle_file_generation(transcription, export_format, include_timestamps, custom_file_name, uploaded_file=None, is_recording=False):
        """Write the transcript to a downloadable file and return its path.

        ``is_recording`` receives the value of the recording component (or a
        boolean); anything other than ``None``/``False`` counts as "a recording
        exists". Problems are raised as ``gr.Error`` because the File output
        cannot display a message string.
        """
        global transcription_result
        if export_format not in export_formats:
            raise gr.Error("請選擇導出格式")
        extension = export_format[1:]

        # Keep only the final path component of the user-supplied name so it
        # cannot point outside the output directory.
        custom_stem = os.path.basename((custom_file_name or "").strip())
        if custom_stem:
            file_name = f"{custom_stem}.{extension}"
        elif uploaded_file is not None:
            # Name the export after the uploaded file, without its extension.
            base_name = os.path.splitext(os.path.basename(uploaded_file.name))[0]
            file_name = f"{base_name}_transcription.{extension}"
        elif is_recording is not None and is_recording is not False:
            # Recordings are named after the current time.
            current_time = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
            file_name = f"recording_{current_time}.{extension}"
        else:
            raise gr.Error("無效的輸入來源")

        # Write into a fresh temp directory rather than the working directory.
        output_path = os.path.join(tempfile.mkdtemp(), file_name)
        return save_transcription_to_file(
            transcription,
            export_format,
            include_timestamps,
            result=transcription_result,
            file_name=output_path
        )

    generate_button.click(
        fn=handle_file_generation,
        inputs=[
            transcription_output,
            export_dropdown,
            include_timestamps_export,
            file_name_input,
            upload_file,
            audio_input  # passed as is_recording
        ],
        outputs=download_link
    )

    # Show the download component only once it holds a file.
    download_link.change(
        fn=lambda x: gr.update(visible=True) if x else gr.update(visible=False),
        inputs=download_link,
        outputs=download_link
    )

interface.queue().launch()