import re
import os
import sys
import copy
import librosa
import logging
import argparse
import numpy as np
import soundfile as sf
import whisper
import moviepy.editor as mpy
from moviepy.editor import TextClip, CompositeVideoClip, concatenate_videoclips
from moviepy.video.tools.subtitles import SubtitlesClip
from utils.subtitle_utils import generate_srt, generate_srt_clip, generate_audio_srt, trans_format
from utils.argparse_tools import ArgumentParser, get_commandline_args
from utils.trans_utils import pre_proc, proc, write_state, load_state, proc_spk, convert_pcm_to_float


class VideoClipper():
    def __init__(self, model):
        logging.warning("Initializing VideoClipper.")
        self.GLOBAL_COUNT = 0
        self.model = model

    def recog(self, audio_input, state=None, output_dir=None, text=None):
        '''
        Convert the audio input into text. Optionally performs speaker
        diarization (SD) and generates subtitles in SRT format.
        return:
            res_text: the recognized text.
            res_srt: the recognition result rendered as SRT subtitles.
            state: a state dict holding the raw recognition result, the
                timestamps and the sentence info.
        '''
        if state is None:
            state = {}
        sr, data = audio_input

        # Convert integer PCM samples to float
        data = convert_pcm_to_float(data)

        # Resample to the 16 kHz rate expected downstream
        if sr != 16000:
            data = librosa.resample(data, orig_sr=sr, target_sr=16000)
            sr = 16000
        if len(data.shape) == 2:
            logging.warning("Input wav shape: {}, only first channel reserved.".format(data.shape))
            data = data[:, 0]
        state['audio_input'] = (sr, data)
        rec_result = trans_format(text)
        res_srt = generate_srt(rec_result[0]['sentence_info'])
        state['recog_res_raw'] = rec_result[0]['raw_text']
        state['timestamp'] = rec_result[0]['timestamp']
        state['sentences'] = rec_result[0]['sentence_info']
        res_text = rec_result[0]['text']
        return res_text, res_srt, state

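    # Usage sketch (assumptions: the openai-whisper package is installed and
    # "example.wav" is a hypothetical input file; this only illustrates the
    # call shape, not a confirmed workflow from this repo):
    #
    #   model = whisper.load_model("base")
    #   clipper = VideoClipper(model)
    #   wav, sr = librosa.load("example.wav", sr=16000)
    #   result = model.transcribe("example.wav", language="zh", word_timestamps=True)
    #   res_text, res_srt, state = clipper.recog((sr, wav), text=result)
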
    def clip(self, dest_text, start_ost, end_ost, state, dest_spk=None, output_dir=None, timestamp_list=None):
        '''
        dest_text: target text; the matching audio segments are located by this text.
        start_ost / end_ost: start and end offsets (ms) to fine-tune segment boundaries.
        state: state dict carrying the data this function needs (audio data,
            recognition result, timestamps, ...).
        dest_spk: target speaker; if given, segments are extracted by speaker info.
        output_dir: output directory for results.
        timestamp_list: if given, segments are extracted directly at these timestamps.
        '''
        audio_input = state['audio_input']
        recog_res_raw = state['recog_res_raw']
        timestamp = state['timestamp']
        sentences = state['sentences']
        sr, data = audio_input
        data = data.astype(np.float64)

        log_append = ""
        if timestamp_list is None:
            all_ts = []
            if dest_spk is None or dest_spk == '' or 'sd_sentences' not in state:
                for _idx, _dest_text in enumerate(dest_text.split('#')):
                    match = None
                    if '[' in _dest_text:
                        # An optional "[begin,end]" suffix carries per-segment offsets in ms
                        match = re.search(r'\[(\d+),\s*(\d+)\]', _dest_text)
                        if match:
                            offset_b, offset_e = map(int, match.groups())
                        else:
                            offset_b, offset_e = 0, 0
                            log_append = "(Bracket detected in dest_text but offset time matching failed)"
                        _dest_text = _dest_text[:_dest_text.find('[')]
                    else:
                        offset_b, offset_e = 0, 0
                    _dest_text = pre_proc(_dest_text)
                    ts = proc(recog_res_raw, timestamp, _dest_text)
                    # Timestamps are in samples at 16 kHz, offsets in ms (1 ms == 16 samples)
                    for _ts in ts:
                        all_ts.append([_ts[0] + offset_b * 16, _ts[1] + offset_e * 16])
                    if len(ts) > 1 and match:
                        log_append += ('(offsets detected but No.{} sub-sentence matched to {} periods '
                                       'in audio, offsets are applied to all periods)').format(_idx + 1, len(ts))
            else:
                for _dest_spk in dest_spk.split('#'):
                    ts = proc_spk(_dest_spk, state['sd_sentences'])
                    for _ts in ts:
                        all_ts.append(_ts)
        else:
            all_ts = timestamp_list
        ts = all_ts

        srt_index = 0
        clip_srt = ""
        if len(ts):
            start, end = ts[0]
            start = min(max(0, start + start_ost * 16), len(data))
            end = min(max(0, end + end_ost * 16), len(data))
            res_audio = data[start:end]
            start_end_info = "from {} to {}".format(start / 16000, end / 16000)
            srt_clip, _, srt_index = generate_srt_clip(sentences, start / 16000.0, end / 16000.0, begin_index=srt_index)
            clip_srt += srt_clip
            for _ts in ts[1:]:
                start, end = _ts
                start = min(max(0, start + start_ost * 16), len(data))
                end = min(max(0, end + end_ost * 16), len(data))
                start_end_info += ", from {} to {}".format(start / 16000, end / 16000)
                # The offsets were already applied above, so slice with the clamped bounds directly
                res_audio = np.concatenate([res_audio, data[start:end]], -1)
                srt_clip, _, srt_index = generate_srt_clip(sentences, start / 16000.0, end / 16000.0, begin_index=srt_index - 1)
                clip_srt += srt_clip
        if len(ts):
            message = "{} periods found in the speech: ".format(len(ts)) + start_end_info + log_append
        else:
            message = "No period found in the speech, return raw speech. You may check the recognition result and try other destination text."
            res_audio = data
        return (sr, res_audio), message, clip_srt

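    # Usage sketch (assumptions: `state` comes from a prior `recog` call; the
    # dest_text below is illustrative). '#' separates sub-sentences, and an
    # optional "[begin,end]" suffix applies ms offsets to one sub-sentence:
    #
    #   (sr, audio), message, clip_srt = clipper.clip("你好#再见[100,200]", 0, 0, state)
    #   sf.write("clip.wav", audio, sr)  # hypothetical output path
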
    def video_recog(self, video_filename, output_dir=None, ASR="whisper"):
        '''Extract the audio track from a video, run recognition on it, and
        return the recognized text, the SRT subtitles and the resulting state.'''
        video = mpy.VideoFileClip(video_filename)

        if output_dir is not None:
            os.makedirs(output_dir, exist_ok=True)
            _, base_name = os.path.split(video_filename)
            base_name, _ = os.path.splitext(base_name)
            clip_video_file = base_name + '_clip.mp4'
            audio_file = base_name + '.wav'
            audio_file = os.path.join(output_dir, audio_file)
        else:
            base_name, _ = os.path.splitext(video_filename)
            clip_video_file = base_name + '_clip.mp4'
            audio_file = base_name + '.wav'
        video.audio.write_audiofile(audio_file)

        # NOTE: the ASR argument is currently unused; whisper is always called here
        result_audio = self.model.transcribe(audio_file, language="zh", word_timestamps=True)
        wav = librosa.load(audio_file, sr=16000)[0]

        if os.path.exists(audio_file):
            os.remove(audio_file)
        state = {
            'video_filename': video_filename,
            'clip_video_file': clip_video_file,
            'video': video,
        }
        return self.recog((16000, wav), state, output_dir, text=result_audio)

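    # Usage sketch (the file name and output directory are illustrative):
    #
    #   res_text, res_srt, state = clipper.video_recog("demo.mp4", output_dir="./output")
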
    def video_clip(self,
                   dest_text,
                   start_ost,
                   end_ost,
                   state,
                   font_size=32,
                   font_color='white',
                   add_sub=False,
                   dest_spk=None,
                   output_dir=None,
                   timestamp_list=None):

        recog_res_raw = state['recog_res_raw']
        timestamp = state['timestamp']
        sentences = state['sentences']
        video = state['video']
        clip_video_file = state['clip_video_file']
        video_filename = state['video_filename']

        if timestamp_list is None:
            all_ts = []
            if dest_spk is None or dest_spk == '' or 'sd_sentences' not in state:
                for _idx, _dest_text in enumerate(dest_text.split('#')):
                    match = None
                    if '[' in _dest_text:
                        match = re.search(r'\[(\d+),\s*(\d+)\]', _dest_text)
                        if match:
                            offset_b, offset_e = map(int, match.groups())
                            log_append = ""
                        else:
                            offset_b, offset_e = 0, 0
                            log_append = "(Bracket detected in dest_text but offset time matching failed)"
                        _dest_text = _dest_text[:_dest_text.find('[')]
                    else:
                        offset_b, offset_e = 0, 0
                        log_append = ""

                    _dest_text = pre_proc(_dest_text)
                    ts = proc(recog_res_raw, timestamp, _dest_text.lower())
                    for _ts in ts:
                        all_ts.append([_ts[0] + offset_b * 16, _ts[1] + offset_e * 16])
                    if len(ts) > 1 and match:
                        log_append += ('(offsets detected but No.{} sub-sentence matched to {} periods '
                                       'in audio, offsets are applied to all periods)').format(_idx + 1, len(ts))
            else:
                for _dest_spk in dest_spk.split('#'):
                    ts = proc_spk(_dest_spk, state['sd_sentences'])
                    for _ts in ts:
                        all_ts.append(_ts)
        else:
            # External timestamps are given in ms; convert to samples at 16 kHz
            all_ts = [[i[0] * 16.0, i[1] * 16.0] for i in timestamp_list]

        srt_index = 0
        time_acc_ost = 0.0
        ts = all_ts

        clip_srt = ""
        if len(ts):
            start, end = ts[0][0] / 16000, ts[0][1] / 16000
            srt_clip, subs, srt_index = generate_srt_clip(sentences, start, end, begin_index=srt_index, time_acc_ost=time_acc_ost)
            start, end = start + start_ost / 1000.0, end + end_ost / 1000.0
            video_clip = video.subclip(start, end)
            start_end_info = "from {} to {}".format(start, end)
            clip_srt += srt_clip
            if add_sub:
                generator = lambda txt: TextClip(txt, font='./font/STHeitiMedium.ttc', fontsize=font_size, color=font_color)
                subtitles = SubtitlesClip(subs, generator)
                video_clip = CompositeVideoClip([video_clip, subtitles.set_pos(('center', 'bottom'))])
            concate_clip = [video_clip]
            # start/end already include the offsets, so the accumulated time is the clip length
            time_acc_ost += end - start
            for _ts in ts[1:]:
                start, end = _ts[0] / 16000, _ts[1] / 16000
                srt_clip, subs, srt_index = generate_srt_clip(sentences, start, end, begin_index=srt_index - 1, time_acc_ost=time_acc_ost)
                if not len(subs):
                    continue
                # Re-base subtitle times so each sub-clip's subtitles start at zero
                chi_subs = []
                sub_starts = subs[0][0][0]
                for sub in subs:
                    chi_subs.append(((sub[0][0] - sub_starts, sub[0][1] - sub_starts), sub[1]))
                start, end = start + start_ost / 1000.0, end + end_ost / 1000.0
                _video_clip = video.subclip(start, end)
                start_end_info += ", from {} to {}".format(str(start)[:5], str(end)[:5])
                clip_srt += srt_clip
                if add_sub:
                    generator = lambda txt: TextClip(txt, font='./font/STHeitiMedium.ttc', fontsize=font_size, color=font_color)
                    subtitles = SubtitlesClip(chi_subs, generator)
                    _video_clip = CompositeVideoClip([_video_clip, subtitles.set_pos(('center', 'bottom'))])
                concate_clip.append(copy.copy(_video_clip))
                time_acc_ost += end - start
            message = "{} periods found in the audio: ".format(len(ts)) + start_end_info
            logging.warning("Concatenating...")
            if len(concate_clip) > 1:
                video_clip = concatenate_videoclips(concate_clip)

            if output_dir is not None:
                os.makedirs(output_dir, exist_ok=True)
                _, file_with_extension = os.path.split(clip_video_file)
                clip_video_file_name, _ = os.path.splitext(file_with_extension)
                clip_video_file = os.path.join(output_dir, "{}_no{}.mp4".format(clip_video_file_name, self.GLOBAL_COUNT))
                temp_audio_file = os.path.join(output_dir, "{}_tempaudio_no{}.mp4".format(clip_video_file_name, self.GLOBAL_COUNT))
            else:
                clip_video_file = clip_video_file[:-4] + '_no{}.mp4'.format(self.GLOBAL_COUNT)
                temp_audio_file = clip_video_file[:-4] + '_tempaudio_no{}.mp4'.format(self.GLOBAL_COUNT)
            video_clip.write_videofile(clip_video_file, audio_codec="aac", temp_audiofile=temp_audio_file, fps=25)
            self.GLOBAL_COUNT += 1
        else:
            clip_video_file = video_filename
            message = "No period found in the audio, return raw speech. You may check the recognition result and try other destination text."
            srt_clip = ''
        return clip_video_file, message, clip_srt

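
# Usage sketch (assumptions: `clipper` and `state` come from the video_recog
# example above; paths are illustrative). Clips the video by text and burns in
# subtitles:
#
#   clip_file, message, clip_srt = clipper.video_clip(
#       "你好", start_ost=0, end_ost=100, state=state,
#       add_sub=True, output_dir="./output")
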
def get_parser():
    parser = ArgumentParser(
        description="ClipVideo Argument",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "--stage",
        type=int,
        choices=(1, 2),
        help="Stage, 1 for recognizing and 2 for clipping",
        required=True
    )
    parser.add_argument(
        "--file",
        type=str,
        default=None,
        help="Input file path",
        required=True
    )
    parser.add_argument(
        "--sd_switch",
        type=str,
        choices=("no", "yes"),
        default="no",
        help="Whether to turn on speaker diarization",
    )
    parser.add_argument(
        "--output_dir",
        type=str,
        default='./output',
        help="Output files path",
    )
    parser.add_argument(
        "--dest_text",
        type=str,
        default=None,
        help="Destination text string for clipping",
    )
    parser.add_argument(
        "--dest_spk",
        type=str,
        default=None,
        help="Destination spk id for clipping",
    )
    parser.add_argument(
        "--start_ost",
        type=int,
        default=0,
        help="Offset time in ms at beginning for clipping"
    )
    parser.add_argument(
        "--end_ost",
        type=int,
        default=0,
        help="Offset time in ms at ending for clipping"
    )
    parser.add_argument(
        "--output_file",
        type=str,
        default=None,
        help="Output file path"
    )
    parser.add_argument(
        "--lang",
        type=str,
        default='zh',
        help="Language of the speech",
    )
    return parser
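
# Example invocations (the script name and paths are illustrative, not confirmed
# by this file):
#
#   python videoclipper.py --stage 1 --file examples/video.mp4 --output_dir ./output
#   python videoclipper.py --stage 2 --file examples/video.mp4 \
#       --dest_text '欢迎使用' --start_ost 0 --end_ost 100 --output_file ./output/res.mp4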