Spaces:
Sleeping
Sleeping
| from pathlib import Path | |
| from typing import List, Union | |
| import random | |
| import re | |
| from datetime import timedelta | |
| from tqdm import trange | |
| import numpy as np | |
| import librosa | |
| import cv2 | |
| from zhon.hanzi import punctuation as zh_punc | |
| from moviepy.editor import ImageClip, AudioFileClip, CompositeAudioClip, \ | |
| CompositeVideoClip, ColorClip, VideoFileClip, VideoClip, TextClip, concatenate_audioclips | |
| import moviepy.video.compositing.transitions as transfx | |
| from moviepy.audio.AudioClip import AudioArrayClip | |
| from moviepy.audio.fx.all import audio_loop | |
| from moviepy.video.tools.subtitles import SubtitlesClip | |
| def generate_srt(timestamps: List, | |
| captions: List, | |
| save_path: Union[str, Path], | |
| max_single_length: int = 30): | |
| def format_time(seconds: float) -> str: | |
| td = timedelta(seconds=seconds) | |
| total_seconds = int(td.total_seconds()) | |
| millis = int((td.total_seconds() - total_seconds) * 1000) | |
| hours, remainder = divmod(total_seconds, 3600) | |
| minutes, seconds = divmod(remainder, 60) | |
| return f"{hours:02}:{minutes:02}:{seconds:02},{millis:03}" | |
| srt_content = [] | |
| num_caps = len(timestamps) | |
| for idx in range(num_caps): | |
| start_time, end_time = timestamps[idx] | |
| caption_chunks = split_caption(captions[idx], max_single_length).split("\n") | |
| num_chunks = len(caption_chunks) | |
| if num_chunks == 0: | |
| continue | |
| segment_duration = (end_time - start_time) / num_chunks | |
| for chunk_idx, chunk in enumerate(caption_chunks): | |
| chunk_start_time = start_time + segment_duration * chunk_idx | |
| chunk_end_time = start_time + segment_duration * (chunk_idx + 1) | |
| start_time_str = format_time(chunk_start_time) | |
| end_time_str = format_time(chunk_end_time) | |
| srt_content.append(f"{len(srt_content) // 2 + 1}\n{start_time_str} --> {end_time_str}\n{chunk}\n\n") | |
| with open(save_path, 'w') as srt_file: | |
| srt_file.writelines(srt_content) | |
| def add_caption(captions: List, | |
| srt_path: Union[str, Path], | |
| timestamps: List, | |
| video_clip: VideoClip, | |
| max_single_length: int = 30, | |
| **caption_config): | |
| generate_srt(timestamps, captions, srt_path, max_single_length) | |
| generator = lambda txt: TextClip(txt, **caption_config) | |
| subtitles = SubtitlesClip(srt_path.__str__(), generator) | |
| captioned_clip = CompositeVideoClip([video_clip, | |
| subtitles.set_position(("center", "bottom"), relative=True)]) | |
| return captioned_clip | |
| def split_keep_separator(text, separator): | |
| pattern = f'([{re.escape(separator)}])' | |
| pieces = re.split(pattern, text) | |
| return pieces | |
| def split_caption(caption, max_length=30): | |
| lines = [] | |
| if ord(caption[0]) >= ord("a") and ord(caption[0]) <= ord("z") or ord(caption[0]) >= ord("A") and ord(caption[0]) <= ord("Z"): | |
| words = caption.split(" ") | |
| current_words = [] | |
| for word in words: | |
| if len(" ".join(current_words + [word])) <= max_length: | |
| current_words += [word] | |
| else: | |
| if current_words: | |
| lines.append(" ".join(current_words)) | |
| current_words = [word] | |
| if current_words: | |
| lines.append(" ".join(current_words)) | |
| else: | |
| sentences = split_keep_separator(caption, zh_punc) | |
| current_line = "" | |
| for sentence in sentences: | |
| if len(current_line + sentence) <= max_length: | |
| current_line += sentence | |
| else: | |
| if current_line: | |
| lines.append(current_line) | |
| current_line = "" | |
| if sentence.startswith(tuple(zh_punc)): | |
| if lines: | |
| lines[-1] += sentence[0] | |
| current_line = sentence[1:] | |
| else: | |
| current_line = sentence | |
| if current_line: | |
| lines.append(current_line.strip()) | |
| return '\n'.join(lines) | |
| def add_bottom_black_area(clip: VideoFileClip, | |
| black_area_height: int = 64): | |
| """ | |
| Add a black area at the bottom of the video clip (for captions). | |
| Args: | |
| clip (VideoFileClip): Video clip to be processed. | |
| black_area_height (int): Height of the black area. | |
| Returns: | |
| VideoFileClip: Processed video clip. | |
| """ | |
| black_bar = ColorClip(size=(clip.w, black_area_height), color=(0, 0, 0), duration=clip.duration) | |
| extended_clip = CompositeVideoClip([clip, black_bar.set_position(("center", "bottom"))]) | |
| return extended_clip | |
| def add_zoom_effect(clip, speed=1.0, mode='in', position='center'): | |
| fps = clip.fps | |
| duration = clip.duration | |
| total_frames = int(duration * fps) | |
| def main(getframe, t): | |
| frame = getframe(t) | |
| h, w = frame.shape[: 2] | |
| i = t * fps | |
| if mode == 'out': | |
| i = total_frames - i | |
| zoom = 1 + (i * ((0.1 * speed) / total_frames)) | |
| positions = {'center': [(w - (w * zoom)) / 2, (h - (h * zoom)) / 2], | |
| 'left': [0, (h - (h * zoom)) / 2], | |
| 'right': [(w - (w * zoom)), (h - (h * zoom)) / 2], | |
| 'top': [(w - (w * zoom)) / 2, 0], | |
| 'topleft': [0, 0], | |
| 'topright': [(w - (w * zoom)), 0], | |
| 'bottom': [(w - (w * zoom)) / 2, (h - (h * zoom))], | |
| 'bottomleft': [0, (h - (h * zoom))], | |
| 'bottomright': [(w - (w * zoom)), (h - (h * zoom))]} | |
| tx, ty = positions[position] | |
| M = np.array([[zoom, 0, tx], [0, zoom, ty]]) | |
| frame = cv2.warpAffine(frame, M, (w, h)) | |
| return frame | |
| return clip.fl(main) | |
| def add_move_effect(clip, direction="left", move_raito=0.95): | |
| orig_width = clip.size[0] | |
| orig_height = clip.size[1] | |
| new_width = int(orig_width / move_raito) | |
| new_height = int(orig_height / move_raito) | |
| clip = clip.resize(width=new_width, height=new_height) | |
| if direction == "left": | |
| start_position = (0, 0) | |
| end_position = (orig_width - new_width, 0) | |
| elif direction == "right": | |
| start_position = (orig_width - new_width, 0) | |
| end_position = (0, 0) | |
| duration = clip.duration | |
| moving_clip = clip.set_position( | |
| lambda t: (start_position[0] + ( | |
| end_position[0] - start_position[0]) / duration * t, start_position[1]) | |
| ) | |
| final_clip = CompositeVideoClip([moving_clip], size=(orig_width, orig_height)) | |
| return final_clip | |
| def add_slide_effect(clips, slide_duration): | |
| ####### CAUTION: requires at least `slide_duration` of silence at the end of each clip ####### | |
| durations = [clip.duration for clip in clips] | |
| first_clip = CompositeVideoClip( | |
| [clips[0].fx(transfx.slide_out, duration=slide_duration, side="left")] | |
| ).set_start(0) | |
| slide_out_sides = ["left"] | |
| videos = [first_clip] | |
| out_to_in_mapping = {"left": "right", "right": "left"} | |
| for idx, clip in enumerate(clips[1: -1], start=1): | |
| # For all other clips in the middle, we need them to slide in to the previous clip and out for the next one | |
| # determine `slide_in_side` according to the `slide_out_side` of the previous clip | |
| slide_in_side = out_to_in_mapping[slide_out_sides[-1]] | |
| slide_out_side = "left" if random.random() <= 0.5 else "right" | |
| slide_out_sides.append(slide_out_side) | |
| videos.append( | |
| ( | |
| CompositeVideoClip( | |
| [clip.fx(transfx.slide_in, duration=slide_duration, side=slide_in_side)] | |
| ) | |
| .set_start(sum(durations[:idx]) - (slide_duration) * idx) | |
| .fx(transfx.slide_out, duration=slide_duration, side=slide_out_side) | |
| ) | |
| ) | |
| last_clip = CompositeVideoClip( | |
| [clips[-1].fx(transfx.slide_in, duration=slide_duration, side=out_to_in_mapping[slide_out_sides[-1]])] | |
| ).set_start(sum(durations[:-1]) - slide_duration * (len(clips) - 1)) | |
| videos.append(last_clip) | |
| video = CompositeVideoClip(videos) | |
| return video | |
| def compose_video(story_dir: Union[str, Path], | |
| save_path: Union[str, Path], | |
| captions: List, | |
| music_path: Union[str, Path], | |
| num_pages: int, | |
| fps: int = 10, | |
| audio_sample_rate: int = 16000, | |
| audio_codec: str = "mp3", | |
| caption_config: dict = {}, | |
| max_single_caption_length: int = 30, | |
| fade_duration: float = 1.0, | |
| slide_duration: float = 0.4, | |
| zoom_speed: float = 0.5, | |
| move_ratio: float = 0.95, | |
| sound_volume: float = 0.2, | |
| music_volume: float = 0.2, | |
| bg_speech_ratio: float = 0.4): | |
| if not isinstance(story_dir, Path): | |
| story_dir = Path(story_dir) | |
| sound_dir = story_dir / "sound" | |
| image_dir = story_dir / "image" | |
| speech_dir = story_dir / "speech" | |
| video_clips = [] | |
| # audio_durations = [] | |
| cur_duration = 0 | |
| timestamps = [] | |
| for page in trange(1, num_pages + 1): | |
| ##### speech track | |
| slide_silence = AudioArrayClip(np.zeros((int(audio_sample_rate * slide_duration), 2)), fps=audio_sample_rate) | |
| fade_silence = AudioArrayClip(np.zeros((int(audio_sample_rate * fade_duration), 2)), fps=audio_sample_rate) | |
| if (speech_dir / f"p{page}.wav").exists(): # single speech file | |
| single_utterance = True | |
| speech_file = (speech_dir / f"./p{page}.wav").__str__() | |
| speech_clip = AudioFileClip(speech_file, fps=audio_sample_rate) | |
| # speech_clip = speech_clip.audio_fadein(fade_duration) | |
| speech_clip = concatenate_audioclips([fade_silence, speech_clip, fade_silence]) | |
| else: # multiple speech files | |
| single_utterance = False | |
| speech_files = list(speech_dir.glob(f"p{page}_*.wav")) | |
| speech_files = sorted(speech_files, key=lambda x: int(x.stem.split("_")[-1])) | |
| speech_clips = [] | |
| for utt_idx, speech_file in enumerate(speech_files): | |
| speech_clip = AudioFileClip(speech_file.__str__(), fps=audio_sample_rate) | |
| # add multiple timestamps of the same speech clip | |
| if utt_idx == 0: | |
| timestamps.append([cur_duration + fade_duration, | |
| cur_duration + fade_duration + speech_clip.duration]) | |
| cur_duration += speech_clip.duration + fade_duration | |
| elif utt_idx == len(speech_files) - 1: | |
| timestamps.append([ | |
| cur_duration, | |
| cur_duration + speech_clip.duration | |
| ]) | |
| cur_duration += speech_clip.duration + fade_duration + slide_duration | |
| else: | |
| timestamps.append([ | |
| cur_duration, | |
| cur_duration + speech_clip.duration | |
| ]) | |
| cur_duration += speech_clip.duration | |
| speech_clips.append(speech_clip) | |
| speech_clip = concatenate_audioclips([fade_silence] + speech_clips + [fade_silence]) | |
| speech_file = speech_files[0] # for energy calculation | |
| # add slide silence | |
| if page == 1: | |
| speech_clip = concatenate_audioclips([speech_clip, slide_silence]) | |
| else: | |
| speech_clip = concatenate_audioclips([slide_silence, speech_clip, slide_silence]) | |
| # add the timestamp of the whole clip as a single element | |
| if single_utterance: | |
| if page == 1: | |
| timestamps.append([cur_duration + fade_duration, | |
| cur_duration + speech_clip.duration - fade_duration - slide_duration]) | |
| cur_duration += speech_clip.duration - slide_duration | |
| else: | |
| timestamps.append([cur_duration + fade_duration + slide_duration, | |
| cur_duration + speech_clip.duration - fade_duration - slide_duration]) | |
| cur_duration += speech_clip.duration - slide_duration | |
| speech_array, _ = librosa.core.load(speech_file, sr=None) | |
| speech_rms = librosa.feature.rms(y=speech_array)[0].mean() | |
| # set image as the main content, align the duration | |
| image_file = (image_dir / f"./p{page}.png").__str__() | |
| image_clip = ImageClip(image_file) | |
| image_clip = image_clip.set_duration(speech_clip.duration).set_fps(fps) | |
| image_clip = image_clip.crossfadein(fade_duration).crossfadeout(fade_duration) | |
| if random.random() <= 0.5: # zoom in or zoom out | |
| if random.random() <= 0.5: | |
| zoom_mode = "in" | |
| else: | |
| zoom_mode = "out" | |
| image_clip = add_zoom_effect(image_clip, zoom_speed, zoom_mode) | |
| else: # move left or right | |
| if random.random() <= 0.5: | |
| direction = "left" | |
| else: | |
| direction = "right" | |
| image_clip = add_move_effect(image_clip, direction=direction, move_raito=move_ratio) | |
| # sound track | |
| sound_file = sound_dir / f"p{page}.wav" | |
| if sound_file.exists(): | |
| sound_clip = AudioFileClip(sound_file.__str__(), fps=audio_sample_rate) | |
| sound_clip = sound_clip.audio_fadein(fade_duration) | |
| if sound_clip.duration < speech_clip.duration: | |
| sound_clip = audio_loop(sound_clip, duration=speech_clip.duration) | |
| else: | |
| sound_clip = sound_clip.subclip(0, speech_clip.duration) | |
| sound_array, _ = librosa.core.load(sound_file.__str__(), sr=None) | |
| sound_rms = librosa.feature.rms(y=sound_array)[0].mean() | |
| ratio = speech_rms / sound_rms * bg_speech_ratio | |
| audio_clip = CompositeAudioClip([speech_clip, sound_clip.volumex(sound_volume * ratio).audio_fadeout(fade_duration)]) | |
| else: | |
| audio_clip = speech_clip | |
| video_clip = image_clip.set_audio(audio_clip) | |
| video_clips.append(video_clip) | |
| # audio_durations.append(audio_clip.duration) | |
| # final_clip = concatenate_videoclips(video_clips, method="compose") | |
| composite_clip = add_slide_effect(video_clips, slide_duration=slide_duration) | |
| composite_clip = add_bottom_black_area(composite_clip, black_area_height=caption_config["area_height"]) | |
| del caption_config["area_height"] | |
| composite_clip = add_caption( | |
| captions, | |
| story_dir / "captions.srt", | |
| timestamps, | |
| composite_clip, | |
| max_single_caption_length, | |
| **caption_config | |
| ) | |
| # add music track, align the duration | |
| music_clip = AudioFileClip(music_path.__str__(), fps=audio_sample_rate) | |
| music_array, _ = librosa.core.load(music_path.__str__(), sr=None) | |
| music_rms = librosa.feature.rms(y=music_array)[0].mean() | |
| ratio = speech_rms / music_rms * bg_speech_ratio | |
| if music_clip.duration < composite_clip.duration: | |
| music_clip = audio_loop(music_clip, duration=composite_clip.duration) | |
| else: | |
| music_clip = music_clip.subclip(0, composite_clip.duration) | |
| all_audio_clip = CompositeAudioClip([composite_clip.audio, music_clip.volumex(music_volume * ratio)]) | |
| composite_clip = composite_clip.set_audio(all_audio_clip) | |
| composite_clip.write_videofile(save_path.__str__(), | |
| audio_fps=audio_sample_rate, | |
| audio_codec=audio_codec,) | |
| class VideoComposeAgent: | |
| def adjust_caption_config(self, width, height): | |
| area_height = int(height * 0.06) | |
| fontsize = int((width + height) / 2 * 0.025) | |
| return { | |
| "fontsize": fontsize, | |
| "area_height": area_height | |
| } | |
| def call(self, pages, config): | |
| height = config["image_generation"]["obj_cfg"]["height"] | |
| width = config["image_generation"]["obj_cfg"]["width"] | |
| config["caption_config"].update(self.adjust_caption_config(width, height)) | |
| compose_video( | |
| story_dir=Path(config["story_dir"]), | |
| save_path=Path(config["story_dir"]) / "output.mp4", | |
| captions=pages, | |
| music_path=Path(config["story_dir"]) / "music/music.wav", | |
| num_pages=len(pages), | |
| audio_sample_rate=config["audio_sample_rate"], | |
| audio_codec=config["audio_codec"], | |
| caption_config=config["caption_config"], | |
| max_single_caption_length=config["max_single_caption_length"], | |
| **config["slideshow_effect"] | |
| ) |