# NOTE: the "Spaces: Runtime error" lines below the original header were a
# HuggingFace Spaces status banner captured by the page scrape, not source code.
from modelscope.pipelines import pipeline as pipeline_ali
from modelscope.utils.constant import Tasks
from moviepy.editor import VideoFileClip
import httpx, json
import os
# Absolute directory of this file; used to anchor model and output paths.
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
import ffmpeg
from faster_whisper import WhisperModel
import math
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM,pipeline
from slicer2 import Slicer
import librosa
import soundfile
from funasr import AutoModel
from funasr.utils.postprocess_utils import rich_transcription_postprocess
# Local cache directory for ModelScope downloads.
local_dir_root = "./models_from_modelscope"
# model_dir_cirm = snapshot_download('damo/speech_frcrn_ans_cirm_16k', cache_dir=local_dir_root)
# model_dir_ins = snapshot_download('damo/nlp_csanmt_translation_en2zh', cache_dir=local_dir_root)
# Pre-downloaded model directories: FRCRN noise suppression and en->zh translation.
model_dir_cirm = f'{ROOT_DIR}/models_from_modelscope/damo/speech_frcrn_ans_cirm_16k'
model_dir_ins = f'{ROOT_DIR}/models_from_modelscope/damo/nlp_csanmt_translation_en2zh'
# Prefer GPU when available; several functions below branch on this.
device = "cuda" if torch.cuda.is_available() else "cpu"
import ollama
def deep_tran(text, _s, _t):
    """Translate *text* from language ``_s`` to ``_t`` via a local DeepLX server.

    Args:
        text: Source text to translate.
        _s: Source language code (passed through to DeepLX).
        _t: Target language code.

    Returns:
        The translated text taken from the response's ``data`` field.
    """
    deeplx_api = "http://127.0.0.1:1188/translate"
    payload = {
        "text": text,
        "source_lang": _s,
        "target_lang": _t
    }
    # BUG FIX: the old code posted ``data=json.dumps(payload)`` — a raw string
    # body with no JSON Content-Type header, a usage deprecated by httpx.
    # ``json=`` serializes and sets the header correctly.
    r = httpx.post(url=deeplx_api, json=payload).json()
    print(r["data"])
    return r["data"]
# Burn subtitles into a video track
def merge_sub(video_path, srt_path):
    """Hard-code the subtitles in *srt_path* onto *video_path*.

    The result is written to ``test_srt.mp4`` in the working directory
    (any previous copy is removed first) and that filename is returned.
    """
    out_name = "test_srt.mp4"
    if os.path.exists(out_name):
        os.remove(out_name)
    stream = ffmpeg.input(video_path)
    stream.output(out_name, vf="subtitles=" + srt_path).run()
    return out_name
def make_tran_ja2zh_neverLife(srt_path):
    """Translate the Japanese SRT at *srt_path* to Chinese (NLLB-200 model).

    Writes a bilingual SRT (each original text line followed by its
    translation) to ``./two.srt`` and returns its full contents.
    """
    model_path = "neverLife/nllb-200-distilled-600M-ja-zh"
    model = AutoModelForSeq2SeqLM.from_pretrained(model_path, from_pt=True)
    tokenizer = AutoTokenizer.from_pretrained(model_path, src_lang="jpn_Jpan", tgt_lang="zho_Hans", from_pt=True)
    with open(srt_path, 'r', encoding="utf-8") as file:
        gweight_data = file.read()
    result = gweight_data.split("\n\n")
    if os.path.exists("./two.srt"):
        os.remove("./two.srt")
    for res in result:
        line_srt = res.split("\n")
        try:
            input_ids = tokenizer.encode(line_srt[2], max_length=128, padding=True, return_tensors='pt')
            outputs = model.generate(input_ids, num_beams=4, max_new_tokens=128)
            translated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
            print(translated_text)
        except IndexError:
            # A block without a third (text) line marks the end of the file.
            print("翻译完毕")
            break
        except Exception as e:
            # BUG FIX: the old code fell through and wrote the *previous*
            # segment's translation (or crashed with an unbound name on the
            # first segment). Skip the failed segment instead.
            print(str(e))
            continue
        with open("./two.srt", "a", encoding="utf-8") as f:
            f.write(f"{line_srt[0]}\n{line_srt[1]}\n{line_srt[2]}\n{translated_text}\n\n")
    with open("./two.srt", "r", encoding="utf-8") as f:
        content = f.read()
    return content
def make_tran_ko2zh(srt_path):
    """Translate the Korean SRT at *srt_path* to Chinese.

    Uses the local model under ``./model_from_hg/ko-zh/``. Writes a
    bilingual SRT to ``./two.srt`` and returns its full contents.
    """
    model_path = "./model_from_hg/ko-zh/"
    tokenizer = AutoTokenizer.from_pretrained(model_path, local_files_only=True)
    model = AutoModelForSeq2SeqLM.from_pretrained(model_path, local_files_only=True)
    with open(srt_path, 'r', encoding="utf-8") as file:
        gweight_data = file.read()
    result = gweight_data.split("\n\n")
    if os.path.exists("./two.srt"):
        os.remove("./two.srt")
    for res in result:
        line_srt = res.split("\n")
        try:
            input_ids = tokenizer.encode(line_srt[2], max_length=128, padding=True, return_tensors='pt')
            outputs = model.generate(input_ids, num_beams=4, max_new_tokens=128)
            translated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
            print(translated_text)
        except IndexError:
            # A block without a third (text) line marks the end of the file.
            print("翻译完毕")
            break
        except Exception as e:
            # BUG FIX: previously fell through and wrote a stale translation
            # (or hit an unbound name). Skip the failed segment instead.
            print(str(e))
            continue
        with open("./two.srt", "a", encoding="utf-8") as f:
            f.write(f"{line_srt[0]}\n{line_srt[1]}\n{line_srt[2]}\n{translated_text}\n\n")
    with open("./two.srt", "r", encoding="utf-8") as f:
        content = f.read()
    return content
def make_tran_ja2zh(srt_path):
    """Translate the Japanese SRT at *srt_path* to Chinese.

    Uses the local mt5-style model under ``./model_from_hg/ja-zh/`` (note
    the ``<-ja2zh->`` task prefix in the prompt). Writes a bilingual SRT
    to ``./two.srt`` and returns its full contents.
    """
    model_path = "./model_from_hg/ja-zh/"
    tokenizer = AutoTokenizer.from_pretrained(model_path, local_files_only=True)
    model = AutoModelForSeq2SeqLM.from_pretrained(model_path, local_files_only=True)
    with open(srt_path, 'r', encoding="utf-8") as file:
        gweight_data = file.read()
    result = gweight_data.split("\n\n")
    if os.path.exists("./two.srt"):
        os.remove("./two.srt")
    for res in result:
        line_srt = res.split("\n")
        try:
            input_ids = tokenizer.encode(f'<-ja2zh-> {line_srt[2]}', max_length=128, padding=True, return_tensors='pt')
            outputs = model.generate(input_ids, num_beams=4, max_new_tokens=128)
            translated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
            print(translated_text)
        except IndexError:
            # A block without a third (text) line marks the end of the file.
            print("翻译完毕")
            break
        except Exception as e:
            # BUG FIX: previously fell through and wrote a stale translation
            # (or hit an unbound name). Skip the failed segment instead.
            print(str(e))
            continue
        with open("./two.srt", "a", encoding="utf-8") as f:
            f.write(f"{line_srt[0]}\n{line_srt[1]}\n{line_srt[2]}\n{translated_text}\n\n")
    with open("./two.srt", "r", encoding="utf-8") as f:
        content = f.read()
    return content
def make_tran_zh2en(srt_path):
    """Translate the Chinese SRT at *srt_path* to English.

    Uses the local model under ``./model_from_hg/zh-en/``. Writes a
    bilingual SRT to ``./two.srt`` and returns its full contents.
    """
    model_path = "./model_from_hg/zh-en/"
    tokenizer = AutoTokenizer.from_pretrained(model_path, local_files_only=True)
    model = AutoModelForSeq2SeqLM.from_pretrained(model_path, local_files_only=True)
    with open(srt_path, 'r', encoding="utf-8") as file:
        gweight_data = file.read()
    result = gweight_data.split("\n\n")
    if os.path.exists("./two.srt"):
        os.remove("./two.srt")
    for res in result:
        line_srt = res.split("\n")
        try:
            tokenized_text = tokenizer.prepare_seq2seq_batch([line_srt[2]], return_tensors='pt')
            translation = model.generate(**tokenized_text)
            translated_text = tokenizer.batch_decode(translation, skip_special_tokens=False)[0]
            # Strip the special tokens that batch_decode keeps.
            translated_text = translated_text.replace("<pad>", "").replace("</s>", "").strip()
            print(translated_text)
        except IndexError:
            # A block without a third (text) line marks the end of the file.
            print("翻译完毕")
            break
        except Exception as e:
            # BUG FIX: previously fell through and wrote a stale translation
            # (or hit an unbound name). Skip the failed segment instead.
            print(str(e))
            continue
        with open("./two.srt", "a", encoding="utf-8") as f:
            f.write(f"{line_srt[0]}\n{line_srt[1]}\n{line_srt[2]}\n{translated_text}\n\n")
    with open("./two.srt", "r", encoding="utf-8") as f:
        content = f.read()
    return content
# Translate subtitles: English -> Chinese
def make_tran(srt_path):
    """Translate the English SRT at *srt_path* to Chinese.

    Uses the local model under ``./model_from_hg/en-zh/``. Writes a
    bilingual SRT to ``./two.srt`` and returns its full contents.
    """
    model_path = "./model_from_hg/en-zh/"
    tokenizer = AutoTokenizer.from_pretrained(model_path, local_files_only=True)
    model = AutoModelForSeq2SeqLM.from_pretrained(model_path, local_files_only=True)
    with open(srt_path, 'r', encoding="utf-8") as file:
        gweight_data = file.read()
    result = gweight_data.split("\n\n")
    if os.path.exists("./two.srt"):
        os.remove("./two.srt")
    for res in result:
        line_srt = res.split("\n")
        try:
            tokenized_text = tokenizer.prepare_seq2seq_batch([line_srt[2]], return_tensors='pt')
            translation = model.generate(**tokenized_text)
            translated_text = tokenizer.batch_decode(translation, skip_special_tokens=False)[0]
            # Strip the special tokens that batch_decode keeps.
            translated_text = translated_text.replace("<pad>", "").replace("</s>", "").strip()
            print(translated_text)
        except IndexError:
            # A block without a third (text) line marks the end of the file.
            print("翻译完毕")
            break
        except Exception as e:
            # BUG FIX: previously fell through and wrote a stale translation
            # (or hit an unbound name). Skip the failed segment instead.
            print(str(e))
            continue
        with open("./two.srt", "a", encoding="utf-8") as f:
            f.write(f"{line_srt[0]}\n{line_srt[1]}\n{line_srt[2]}\n{translated_text}\n\n")
    with open("./two.srt", "r", encoding="utf-8") as f:
        content = f.read()
    return content
# Translate subtitles via DeepLX
def make_tran_deep(srt_path, _s, _t):
    """Translate the SRT at *srt_path* from language ``_s`` to ``_t`` via DeepLX.

    Writes a bilingual SRT to ``{ROOT_DIR}/output/two.srt`` and a
    translation-only SRT to ``{ROOT_DIR}/output/t_sin_<target>.srt``.

    Returns:
        A tuple ``(bilingual content, translation-only content,
        translation-only file path)``.
    """
    with open(srt_path, 'r', encoding="utf-8") as file:
        gweight_data = file.read()
    result = gweight_data.split("\n\n")
    # ROBUSTNESS FIX: ensure the output directory exists before writing.
    os.makedirs(f"{ROOT_DIR}/output", exist_ok=True)
    if os.path.exists(f"{ROOT_DIR}/output/two.srt"):
        os.remove(f"{ROOT_DIR}/output/two.srt")
    if os.path.exists(f"{ROOT_DIR}/output/t_sin_{_t}.srt"):
        os.remove(f"{ROOT_DIR}/output/t_sin_{_t}.srt")
    for res in result:
        line_srt = res.split("\n")
        try:
            text = line_srt[2]
            translated_text = deep_tran(text, _s, _t)
            with open(f"{ROOT_DIR}/output/two.srt", "a", encoding="utf-8") as f:
                f.write(f"{line_srt[0]}\n{line_srt[1]}\n{line_srt[2]}\n{translated_text}\n\n")
            with open(f"{ROOT_DIR}/output/t_sin_{_t}.srt", "a", encoding="utf-8") as f:
                f.write(f"{line_srt[0]}\n{line_srt[1]}\n{translated_text}\n\n")
        except IndexError as e:
            print(str(e))
            # A block without a third (text) line marks the end of the file.
            print("翻译完毕")
            break
        except Exception as e:
            # Best-effort: log and continue with the remaining segments.
            print(str(e))
    with open(f"{ROOT_DIR}/output/two.srt", "r", encoding="utf-8") as f:
        content = f.read()
    with open(f"{ROOT_DIR}/output/t_sin_{_t}.srt", "r", encoding="utf-8") as f:
        content_2 = f.read()
    return content, content_2, f"{ROOT_DIR}/output/t_sin_{_t}.srt"
# Translate subtitles with a local Qwen2 model via Ollama
def make_tran_qwen2(model_name, srt_path, lang):
    """Translate the SRT at *srt_path* into *lang* using an Ollama chat model.

    Args:
        model_name: Name of the Ollama model to use.
        srt_path: Path of the source SRT file.
        lang: Target language code ("zh", "en", "ja", "ko"); other values
            are inserted into the prompt verbatim.

    Returns:
        A tuple ``(bilingual SRT content, translation-only SRT content)``.
    """
    with open(srt_path, 'r', encoding="utf-8") as file:
        gweight_data = file.read()
    result = gweight_data.split("\n\n")
    # ROBUSTNESS FIX: ensure the output directory exists before writing.
    os.makedirs(f"{ROOT_DIR}/output", exist_ok=True)
    if os.path.exists(f"{ROOT_DIR}/output/two.srt"):
        os.remove(f"{ROOT_DIR}/output/two.srt")
    if os.path.exists(f"{ROOT_DIR}/output/two_single.srt"):
        os.remove(f"{ROOT_DIR}/output/two_single.srt")
    # FIX: map the language code once, before the loop. The old code
    # re-ran the if/elif chain on every iteration against the
    # already-mapped value, which only worked by accident.
    lang_names = {"zh": "中文", "en": "英文", "ja": "日文", "ko": "韩文"}
    lang = lang_names.get(lang, lang)
    for res in result:
        line_srt = res.split("\n")
        try:
            text = line_srt[2]
            content = f'"{text}" 翻译为{lang},只给我文本的翻译,别添加其他的内容,因为我要做字幕,谢谢'
            response = ollama.chat(model=model_name, messages=[
                {
                    'role': 'user',
                    'content': content
                }])
            translated_text = response['message']['content']
            print(translated_text)
        except IndexError:
            # A block without a third (text) line marks the end of the file.
            print("翻译完毕")
            break
        except Exception as e:
            # BUG FIX: previously fell through and wrote a stale translation
            # (or hit an unbound name). Skip the failed segment instead.
            print(str(e))
            continue
        with open(f"{ROOT_DIR}/output/two.srt", "a", encoding="utf-8") as f:
            f.write(f"{line_srt[0]}\n{line_srt[1]}\n{line_srt[2]}\n{translated_text}\n\n")
        with open(f"{ROOT_DIR}/output/two_single.srt", "a", encoding="utf-8") as f:
            f.write(f"{line_srt[0]}\n{line_srt[1]}\n{translated_text}\n\n")
    with open(f"{ROOT_DIR}/output/two.srt", "r", encoding="utf-8") as f:
        content = f.read()
    with open(f"{ROOT_DIR}/output/two_single.srt", "r", encoding="utf-8") as f:
        content_2 = f.read()
    return content, content_2
| # # 翻译字幕 | |
| # def make_tran_ali(): | |
| # pipeline_ins = pipeline(task=Tasks.translation, model=model_dir_ins) | |
| # with open("./video.srt", 'r',encoding="utf-8") as file: | |
| # gweight_data = file.read() | |
| # result = gweight_data.split("\n\n") | |
| # if os.path.exists("./two.srt"): | |
| # os.remove("./two.srt") | |
| # for res in result: | |
| # line_srt = res.split("\n") | |
| # try: | |
| # outputs = pipeline_ins(input=line_srt[2]) | |
| # print(outputs['translation']) | |
| # except IndexError as e: | |
| # # 处理下标越界异常 | |
| # print(f"翻译完毕") | |
| # break | |
| # except Exception as e: | |
| # print(str(e)) | |
| # with open("./two.srt","a",encoding="utf-8")as f:f.write(f"{line_srt[0]}\n{line_srt[1]}\n{line_srt[2]}\n{outputs['translation']}\n\n") | |
| # return "翻译完毕" | |
def convert_seconds_to_hms(seconds):
    """Format a duration in seconds as an SRT timestamp ``HH:MM:SS,mmm``."""
    hrs, rem = divmod(seconds, 3600)
    mins, secs = divmod(rem, 60)
    # Milliseconds come from the fractional part, truncated (not rounded).
    millis = math.floor((secs % 1) * 1000)
    return f"{int(hrs):02}:{int(mins):02}:{int(secs):02},{millis:03}"
# Emotion token -> emoji appended to a transcript segment.
emo_dict = {
    "<|HAPPY|>": "😊",
    "<|SAD|>": "😔",
    "<|ANGRY|>": "😡",
    "<|NEUTRAL|>": "",
    "<|FEARFUL|>": "😰",
    "<|DISGUSTED|>": "🤢",
    "<|SURPRISED|>": "😮",
}
# Audio-event token -> emoji prepended to a transcript segment.
event_dict = {
    "<|BGM|>": "🎼",
    "<|Speech|>": "",
    "<|Applause|>": "👏",
    "<|Laughter|>": "😀",
    "<|Cry|>": "😭",
    "<|Sneeze|>": "🤧",
    "<|Breath|>": "",
    "<|Cough|>": "🤧",
}
# Every SenseVoice special token mapped to "" — i.e. the full list of
# tokens to strip from raw model output (see format_str / format_str_v2).
emoji_dict = {
    "<|nospeech|><|Event_UNK|>": "",
    "<|zh|>": "",
    "<|en|>": "",
    "<|yue|>": "",
    "<|ja|>": "",
    "<|ko|>": "",
    "<|nospeech|>": "",
    "<|HAPPY|>": "",
    "<|SAD|>": "",
    "<|ANGRY|>": "",
    "<|NEUTRAL|>": "",
    "<|BGM|>": "",
    "<|Speech|>": "",
    "<|Applause|>": "",
    "<|Laughter|>": "",
    "<|FEARFUL|>": "",
    "<|DISGUSTED|>": "",
    "<|SURPRISED|>": "",
    "<|Cry|>": "",
    "<|EMO_UNKNOWN|>": "",
    "<|Sneeze|>": "",
    "<|Breath|>": "",
    "<|Cough|>": "",
    "<|Sing|>": "",
    "<|Speech_Noise|>": "",
    "<|withitn|>": "",
    "<|woitn|>": "",
    "<|GBG|>": "",
    "<|Event_UNK|>": "",
}
# Language tokens, all collapsed to the single "<|lang|>" split marker
# used by format_str_v3.
lang_dict = {
    "<|zh|>": "<|lang|>",
    "<|en|>": "<|lang|>",
    "<|yue|>": "<|lang|>",
    "<|ja|>": "<|lang|>",
    "<|ko|>": "<|lang|>",
    "<|nospeech|>": "<|lang|>",
}
# Emoji sets used when tidying whitespace around inserted emoji.
# NOTE(review): event_set includes "😷", which no event_dict value produces —
# presumably kept for legacy transcripts; verify before removing.
emo_set = {"😊", "😔", "😡", "😰", "🤢", "😮"}
event_set = {"🎼", "👏", "😀", "😭", "🤧", "😷",}
# Language code -> "XX|" text tag; not referenced in this file chunk,
# presumably consumed by a caller elsewhere — TODO confirm.
lang2token = {
    'zh': "ZH|",
    'ja': "JP|",
    "en": "EN|",
    "ko": "KO|",
    "yue": "YUE|",
}
def format_str(s):
    """Remove every SenseVoice special token listed in ``emoji_dict`` from *s*."""
    for token, replacement in emoji_dict.items():
        s = s.replace(token, replacement)
    return s
def format_str_v2(s):
    """Strip SenseVoice special tokens from *s* and decorate it with emoji.

    Counts every token, removes them all, then prepends one emoji per
    detected audio event and appends the emoji of the dominant emotion.
    """
    sptk_dict = {}
    # Count then remove each token in emoji_dict's insertion order. The
    # order matters: "<|nospeech|><|Event_UNK|>" is handled before its
    # "<|nospeech|>" substring is counted separately.
    for sptk in emoji_dict:
        sptk_dict[sptk] = s.count(sptk)
        s = s.replace(sptk, "")
    # Dominant emotion: highest count wins; strict ">" keeps the earlier
    # key on ties, with NEUTRAL as the starting candidate.
    emo = "<|NEUTRAL|>"
    for e in emo_dict:
        if sptk_dict[e] > sptk_dict[emo]:
            emo = e
    # Prefix one event emoji for every event type that occurred.
    for e in event_dict:
        if sptk_dict[e] > 0:
            s = event_dict[e] + s
    s = s + emo_dict[emo]
    # Remove a single space hugging an emoji on either side.
    for emoji in emo_set.union(event_set):
        s = s.replace(" " + emoji, emoji)
        s = s.replace(emoji + " ", emoji)
    return s.strip()
def format_str_v3(s):
    """Format a multi-language SenseVoice transcript for display.

    Splits *s* at language-change tokens, formats each span with
    ``format_str_v2``, then re-joins the spans while dropping event-emoji
    prefixes and emotion-emoji suffixes that repeat across adjacent spans.
    """
    def get_emo(s):
        # Trailing emotion emoji of a span, if present.
        return s[-1] if s[-1] in emo_set else None
    def get_event(s):
        # Leading event emoji of a span, if present.
        return s[0] if s[0] in event_set else None
    s = s.replace("<|nospeech|><|Event_UNK|>", "❓")
    # Collapse every language token to the common split marker.
    for lang in lang_dict:
        s = s.replace(lang, "<|lang|>")
    s_list = [format_str_v2(s_i).strip(" ") for s_i in s.split("<|lang|>")]
    new_s = " " + s_list[0]
    cur_ent_event = get_event(new_s)
    for i in range(1, len(s_list)):
        if len(s_list[i]) == 0:
            continue
        if get_event(s_list[i]) == cur_ent_event and get_event(s_list[i]) != None:
            # Same event continues across the language change: drop the
            # duplicated leading event emoji from this span.
            s_list[i] = s_list[i][1:]
        #else:
        cur_ent_event = get_event(s_list[i])
        if get_emo(s_list[i]) != None and get_emo(s_list[i]) == get_emo(new_s):
            # Same emotion continues: remove the duplicate trailing emoji
            # from the accumulated text before appending this span.
            new_s = new_s[:-1]
        new_s += s_list[i].strip().lstrip()
    # NOTE(review): unclear why "The." is stripped — looks like a model
    # artifact workaround; confirm before removing.
    new_s = new_s.replace("The.", " ")
    return new_s.strip()
def ms_to_srt_time(ms):
    """Convert a millisecond count into an SRT timestamp ``HH:MM:SS,mmm``."""
    total = int(ms)
    hours = total // 3600000
    minutes = (total % 3600000) // 60000
    seconds = (total % 60000) // 1000
    millis = total % 1000
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{millis:03d}"
def time_to_srt(time_in_seconds):
    """Convert a duration in seconds to an SRT timestamp string.

    Args:
        time_in_seconds: Duration in seconds (may be fractional).

    Returns:
        An SRT timestamp of the form ``HH:MM:SS,mmm``.
    """
    total_ms = int(time_in_seconds * 1000)
    hours, remainder = divmod(total_ms, 3600000)
    minutes, remainder = divmod(remainder, 60000)
    seconds, millis = divmod(remainder, 1000)
    return f"{hours:02}:{minutes:02}:{seconds:02},{millis:03}"
# Transcribe with Alibaba SenseVoice
def make_srt_sv(file_path):
    """Transcribe *file_path* with the SenseVoiceSmall model.

    Runs FunASR's SenseVoiceSmall with FSMN-VAD segmentation and returns
    the rich-transcription post-processed text.

    NOTE(review): the original function carried a large commented-out
    slicing/SRT pipeline and, after ``return text``, an unreachable block
    that wrote an undefined ``srtlines`` variable to ./video.srt (a
    NameError had it ever executed). Both have been removed.
    """
    model_dir = "iic/SenseVoiceSmall"
    input_file = (file_path)
    model = AutoModel(model=model_dir,
                      vad_model="fsmn-vad",
                      vad_kwargs={"max_single_segment_time": 30000},
                      trust_remote_code=True, device="cuda:0")
    res = model.generate(
        input=input_file,
        cache={},
        language="auto",  # "zn", "en", "yue", "ja", "ko", "nospeech"
        use_itn=False,
        batch_size_s=0,
    )
    print(res)
    text = res[0]["text"]
    # text = format_str_v3(text)
    text = rich_transcription_postprocess(text)
    print(text)
    return text
def _load_whisper_model(model_name, download_root=None):
    """Build a faster-whisper model: float16 on GPU with an int8_float16 fallback, int8 on CPU."""
    kwargs = {}
    if download_root is not None:
        kwargs.update(download_root=download_root, local_files_only=False)
    if device == "cuda":
        try:
            return WhisperModel(model_name, device="cuda", compute_type="float16", **kwargs)
        except Exception:
            # GPUs without float16 support fall back to mixed int8/float16.
            return WhisperModel(model_name, device="cuda", compute_type="int8_float16", **kwargs)
    return WhisperModel(model_name, device="cpu", compute_type="int8", **kwargs)
# Build an SRT subtitle file with faster-whisper
def make_srt(file_path, model_name="small"):
    """Transcribe *file_path* and write an SRT to {ROOT_DIR}/output/video.srt.

    Args:
        file_path: Audio/video file to transcribe.
        model_name: A whisper model size name, or the special value
            "faster-whisper-large-v3-turbo-ct2" to use the bundled local
            model directory.

    Returns:
        The full text of the generated SRT file.
    """
    if model_name != "faster-whisper-large-v3-turbo-ct2":
        model = _load_whisper_model(model_name, download_root="./model_from_whisper")
    else:
        model_name = f"{ROOT_DIR}/faster-whisper-large-v3-turbo-ct2"
        print(model_name)
        model = _load_whisper_model(model_name)
    segments, info = model.transcribe(file_path, beam_size=5, vad_filter=True, vad_parameters=dict(min_silence_duration_ms=500))
    print("Detected language '%s' with probability %f" % (info.language, info.language_probability))
    count = 0
    with open(f'{ROOT_DIR}/output/video.srt', 'w', encoding="utf-8") as f:
        for segment in segments:
            count += 1
            duration = f"{convert_seconds_to_hms(segment.start)} --> {convert_seconds_to_hms(segment.end)}\n"
            text = f"{segment.text.lstrip()}\n\n"
            f.write(f"{count}\n{duration}{text}")
            print(f"{duration}{text}", end='')
    with open(f"{ROOT_DIR}/output/video.srt", "r", encoding="utf-8") as f:
        content = f.read()
    return content
# Extract the voice track from a video and denoise it
def movie2audio(video_path):
    """Extract the audio of *video_path* and run noise suppression on it.

    Saves the raw track to {ROOT_DIR}/audio.wav, denoises it with the
    FRCRN model and returns the path of the cleaned file.
    """
    video = VideoFileClip(video_path)
    try:
        # Save the extracted track as WAV.
        video.audio.write_audiofile(f"{ROOT_DIR}/audio.wav")
    finally:
        # BUG FIX: close the clip so its ffmpeg reader process and file
        # handles are released even if the export fails.
        video.close()
    ans = pipeline_ali(
        Tasks.acoustic_noise_suppression,
        model=model_dir_cirm)
    ans(f'{ROOT_DIR}/audio.wav', output_path=f'{ROOT_DIR}/output.wav')
    return f"{ROOT_DIR}/output.wav"