Spaces:
Runtime error
Runtime error
| import glob | |
| import itertools | |
| import os | |
| import random | |
| import gc | |
| import shutil | |
| from typing import List | |
| from loguru import logger | |
| from moviepy import ( | |
| AudioFileClip, | |
| ColorClip, | |
| CompositeAudioClip, | |
| CompositeVideoClip, | |
| ImageClip, | |
| TextClip, | |
| VideoFileClip, | |
| afx, | |
| concatenate_videoclips, | |
| ) | |
| from moviepy.video.tools.subtitles import SubtitlesClip | |
| from PIL import ImageFont | |
| from app.models import const | |
| from app.models.schema import ( | |
| MaterialInfo, | |
| VideoAspect, | |
| VideoConcatMode, | |
| VideoParams, | |
| VideoTransitionMode, | |
| ) | |
| from app.services.utils import video_effects | |
| from app.utils import utils | |
class SubClippedVideoClip:
    """Lightweight record describing a sub-clip window of a video file.

    Stores the source path, the [start_time, end_time] window, and the
    frame size. The duration is derived from the window unless supplied
    explicitly (used for already-rendered temp clips).
    """

    def __init__(self, file_path, start_time=None, end_time=None, width=None, height=None, duration=None):
        self.file_path = file_path
        self.start_time = start_time
        self.end_time = end_time
        self.width = width
        self.height = height
        # Fall back to the window length when no explicit duration is given.
        self.duration = (end_time - start_time) if duration is None else duration

    def __str__(self):
        return (
            f"SubClippedVideoClip(file_path={self.file_path}, "
            f"start_time={self.start_time}, end_time={self.end_time}, "
            f"duration={self.duration}, width={self.width}, height={self.height})"
        )
# Encoder settings shared by every write_videofile call in this module.
audio_codec = "aac"
video_codec = "libx264"
fps = 30
def close_clip(clip):
    """Best-effort teardown of a moviepy clip's readers and children.

    Closes the video, audio and mask readers, recurses into composite
    child clips, and clears the child list. Safe to call with None;
    any error is logged and swallowed so cleanup never aborts the
    caller's pipeline.
    """
    if clip is None:
        return

    try:
        # Release the main video reader.
        reader = getattr(clip, 'reader', None)
        if reader is not None:
            reader.close()

        # Release the audio reader, then drop the audio object.
        audio = getattr(clip, 'audio', None)
        if audio is not None:
            audio_reader = getattr(audio, 'reader', None)
            if audio_reader is not None:
                audio_reader.close()
            del clip.audio

        # Release the mask reader, then drop the mask object.
        mask = getattr(clip, 'mask', None)
        if mask is not None:
            mask_reader = getattr(mask, 'reader', None)
            if mask_reader is not None:
                mask_reader.close()
            del clip.mask

        # Recurse into composite children, skipping self-references
        # to avoid infinite recursion on circular structures.
        children = getattr(clip, 'clips', None)
        if children:
            for child in children:
                if child is not clip:
                    close_clip(child)

        # Empty the child list so nothing stays referenced.
        if hasattr(clip, 'clips'):
            clip.clips = []
    except Exception as e:
        logger.error(f"failed to close clip: {str(e)}")

    del clip
    gc.collect()
| def delete_files(files: List[str] | str): | |
| if isinstance(files, str): | |
| files = [files] | |
| for file in files: | |
| try: | |
| os.remove(file) | |
| except: | |
| pass | |
def get_bgm_file(bgm_type: str = "random", bgm_file: str = ""):
    """Resolve which background-music file to use.

    Returns the explicit `bgm_file` when it exists on disk, a random
    .mp3 from the song directory when `bgm_type` is "random", and ""
    (no BGM) otherwise.
    """
    if not bgm_type:
        return ""

    if bgm_file and os.path.exists(bgm_file):
        return bgm_file

    if bgm_type == "random":
        suffix = "*.mp3"
        song_dir = utils.song_dir()
        files = glob.glob(os.path.join(song_dir, suffix))
        # Fix: random.choice([]) raises IndexError when the song
        # directory has no mp3 files — degrade to "no BGM" instead.
        if not files:
            logger.warning(f"no mp3 files found in {song_dir}")
            return ""
        return random.choice(files)

    return ""
def combine_videos(
    combined_video_path: str,
    video_paths: List[str],
    audio_file: str,
    video_aspect: VideoAspect = VideoAspect.portrait,
    video_concat_mode: VideoConcatMode = VideoConcatMode.random,
    video_transition_mode: VideoTransitionMode = None,
    max_clip_duration: int = 5,
    threads: int = 2,
) -> str:
    """Cut, resize and concatenate source videos to match an audio track.

    Splits every source video into sub-clips of at most max_clip_duration
    seconds, letterboxes them to the target aspect ratio, applies the
    requested transition, renders each to a temp file, and merges them
    pairwise (to bound memory use) until the total duration covers the
    audio. Returns combined_video_path.
    """
    audio_clip = AudioFileClip(audio_file)
    audio_duration = audio_clip.duration
    # Fix: only the duration is needed — release the audio reader now
    # instead of leaking it for the whole (potentially long) run.
    close_clip(audio_clip)
    logger.info(f"audio duration: {audio_duration} seconds")
    # Required duration of each clip.
    # Fix: removed the dead per-video average (it was immediately
    # overwritten, and divided by zero when video_paths was empty).
    req_dur = max_clip_duration
    logger.info(f"maximum clip duration: {req_dur} seconds")
    output_dir = os.path.dirname(combined_video_path)

    aspect = VideoAspect(video_aspect)
    video_width, video_height = aspect.to_resolution()

    processed_clips = []
    subclipped_items = []
    video_duration = 0

    # Probe each source video and slice it into candidate sub-clip windows.
    for video_path in video_paths:
        clip = VideoFileClip(video_path)
        clip_duration = clip.duration
        clip_w, clip_h = clip.size
        close_clip(clip)

        start_time = 0
        while start_time < clip_duration:
            end_time = min(start_time + max_clip_duration, clip_duration)
            # Only full-length windows are kept; a trailing remainder
            # shorter than max_clip_duration is dropped.
            if clip_duration - start_time >= max_clip_duration:
                subclipped_items.append(SubClippedVideoClip(file_path=video_path, start_time=start_time, end_time=end_time, width=clip_w, height=clip_h))
            start_time = end_time
            # Sequential mode takes at most one sub-clip per source video.
            if video_concat_mode.value == VideoConcatMode.sequential.value:
                break

    # random subclipped_items order
    if video_concat_mode.value == VideoConcatMode.random.value:
        random.shuffle(subclipped_items)

    logger.debug(f"total subclipped items: {len(subclipped_items)}")

    # Add downloaded clips over and over until the duration of the audio (max_duration) has been reached
    for i, subclipped_item in enumerate(subclipped_items):
        if video_duration > audio_duration:
            break
        logger.debug(f"processing clip {i+1}: {subclipped_item.width}x{subclipped_item.height}, current duration: {video_duration:.2f}s, remaining: {audio_duration - video_duration:.2f}s")
        try:
            clip = VideoFileClip(subclipped_item.file_path).subclipped(subclipped_item.start_time, subclipped_item.end_time)
            clip_duration = clip.duration
            # Not all videos are same size, so we need to resize them
            clip_w, clip_h = clip.size
            if clip_w != video_width or clip_h != video_height:
                clip_ratio = clip.w / clip.h
                video_ratio = video_width / video_height
                logger.debug(f"resizing clip, source: {clip_w}x{clip_h}, ratio: {clip_ratio:.2f}, target: {video_width}x{video_height}, ratio: {video_ratio:.2f}")

                if clip_ratio == video_ratio:
                    clip = clip.resized(new_size=(video_width, video_height))
                else:
                    # Letterbox: scale to fit inside the target frame and
                    # center the result on a black background.
                    if clip_ratio > video_ratio:
                        scale_factor = video_width / clip_w
                    else:
                        scale_factor = video_height / clip_h

                    new_width = int(clip_w * scale_factor)
                    new_height = int(clip_h * scale_factor)

                    background = ColorClip(size=(video_width, video_height), color=(0, 0, 0)).with_duration(clip_duration)
                    clip_resized = clip.resized(new_size=(new_width, new_height)).with_position("center")
                    clip = CompositeVideoClip([background, clip_resized])

            shuffle_side = random.choice(["left", "right", "top", "bottom"])
            # Fix: the parameter defaults to None, and the original
            # dereferenced .value unconditionally, raising AttributeError
            # for callers that omit video_transition_mode.
            if video_transition_mode is None or video_transition_mode.value == VideoTransitionMode.none.value:
                pass  # no transition
            elif video_transition_mode.value == VideoTransitionMode.fade_in.value:
                clip = video_effects.fadein_transition(clip, 1)
            elif video_transition_mode.value == VideoTransitionMode.fade_out.value:
                clip = video_effects.fadeout_transition(clip, 1)
            elif video_transition_mode.value == VideoTransitionMode.slide_in.value:
                clip = video_effects.slidein_transition(clip, 1, shuffle_side)
            elif video_transition_mode.value == VideoTransitionMode.slide_out.value:
                clip = video_effects.slideout_transition(clip, 1, shuffle_side)
            elif video_transition_mode.value == VideoTransitionMode.shuffle.value:
                transition_funcs = [
                    lambda c: video_effects.fadein_transition(c, 1),
                    lambda c: video_effects.fadeout_transition(c, 1),
                    lambda c: video_effects.slidein_transition(c, 1, shuffle_side),
                    lambda c: video_effects.slideout_transition(c, 1, shuffle_side),
                ]
                shuffle_transition = random.choice(transition_funcs)
                clip = shuffle_transition(clip)

            if clip.duration > max_clip_duration:
                clip = clip.subclipped(0, max_clip_duration)

            # write clip to temp file
            clip_file = f"{output_dir}/temp-clip-{i+1}.mp4"
            clip.write_videofile(clip_file, logger=None, fps=fps, codec=video_codec)

            # Fix: capture the duration BEFORE closing — the original
            # read clip.duration after close_clip had torn the clip down.
            clip_duration = clip.duration
            close_clip(clip)

            processed_clips.append(SubClippedVideoClip(file_path=clip_file, duration=clip_duration, width=clip_w, height=clip_h))
            video_duration += clip_duration

        except Exception as e:
            logger.error(f"failed to process clip: {str(e)}")

    # loop processed clips until the video duration matches or exceeds the audio duration.
    if video_duration < audio_duration:
        logger.warning(f"video duration ({video_duration:.2f}s) is shorter than audio duration ({audio_duration:.2f}s), looping clips to match audio length.")
        base_clips = processed_clips.copy()
        for clip in itertools.cycle(base_clips):
            if video_duration >= audio_duration:
                break
            processed_clips.append(clip)
            video_duration += clip.duration
        logger.info(f"video duration: {video_duration:.2f}s, audio duration: {audio_duration:.2f}s, looped {len(processed_clips)-len(base_clips)} clips")

    # merge video clips progressively, avoid loading all videos at once to avoid memory overflow
    logger.info("starting clip merging process")
    if not processed_clips:
        logger.warning("no clips available for merging")
        return combined_video_path

    # if there is only one clip, use it directly
    if len(processed_clips) == 1:
        logger.info("using single clip directly")
        shutil.copy(processed_clips[0].file_path, combined_video_path)
        # Fix: pass the temp-file paths, not SubClippedVideoClip objects —
        # os.remove() on the objects silently failed and leaked temp files.
        delete_files([item.file_path for item in processed_clips])
        logger.info("video combining completed")
        return combined_video_path

    # create initial video file as base
    base_clip_path = processed_clips[0].file_path
    temp_merged_video = f"{output_dir}/temp-merged-video.mp4"
    temp_merged_next = f"{output_dir}/temp-merged-next.mp4"

    # copy first clip as initial merged video
    shutil.copy(base_clip_path, temp_merged_video)

    # merge remaining video clips one by one
    for i, clip in enumerate(processed_clips[1:], 1):
        logger.info(f"merging clip {i}/{len(processed_clips)-1}, duration: {clip.duration:.2f}s")
        try:
            # load current base video and next clip to merge
            base_clip = VideoFileClip(temp_merged_video)
            next_clip = VideoFileClip(clip.file_path)

            # merge these two clips
            merged_clip = concatenate_videoclips([base_clip, next_clip])

            # save merged result to temp file
            merged_clip.write_videofile(
                filename=temp_merged_next,
                threads=threads,
                logger=None,
                temp_audiofile_path=output_dir,
                audio_codec=audio_codec,
                fps=fps,
            )
            close_clip(base_clip)
            close_clip(next_clip)
            close_clip(merged_clip)

            # replace base file with new merged file
            delete_files(temp_merged_video)
            os.rename(temp_merged_next, temp_merged_video)
        except Exception as e:
            logger.error(f"failed to merge clip: {str(e)}")
            continue

    # after merging, rename final result to target file name
    os.rename(temp_merged_video, combined_video_path)

    # clean temp files
    clip_files = [clip.file_path for clip in processed_clips]
    delete_files(clip_files)

    logger.info("video combining completed")
    return combined_video_path
def wrap_text(text, max_width, font="Arial", fontsize=60):
    """Wrap `text` so its rendered pixel width fits within `max_width`.

    Measures text with the given TrueType font and returns a tuple
    (wrapped_text, total_pixel_height). Wraps on spaces first; when a
    single word is itself wider than max_width (e.g. spaceless CJK
    text), falls back to wrapping character by character.
    """
    # Create ImageFont
    font = ImageFont.truetype(font, fontsize)

    def get_text_size(inner_text):
        # Pixel (width, height) of the stripped text in this font.
        inner_text = inner_text.strip()
        left, top, right, bottom = font.getbbox(inner_text)
        return right - left, bottom - top

    width, height = get_text_size(text)
    # Fast path: the whole text already fits on one line.
    if width <= max_width:
        return text, height

    # First pass: greedy word wrapping on spaces. `processed` stays True
    # unless a single word alone overflows the line.
    processed = True

    _wrapped_lines_ = []
    words = text.split(" ")
    _txt_ = ""
    for word in words:
        _before = _txt_
        _txt_ += f"{word} "
        _width, _height = get_text_size(_txt_)
        if _width <= max_width:
            continue
        else:
            # If the overflowing line is just this one word, word-level
            # wrapping cannot succeed — switch to character wrapping.
            if _txt_.strip() == word.strip():
                processed = False
                break
            # Commit the line as it was before this word, then start a
            # new line with the word that overflowed.
            _wrapped_lines_.append(_before)
            _txt_ = f"{word} "
    _wrapped_lines_.append(_txt_)
    if processed:
        _wrapped_lines_ = [line.strip() for line in _wrapped_lines_]
        result = "\n".join(_wrapped_lines_).strip()
        # Total height = single-line height times the number of lines.
        height = len(_wrapped_lines_) * height
        return result, height

    # Second pass: wrap character by character (handles spaceless text).
    _wrapped_lines_ = []
    chars = list(text)
    _txt_ = ""
    for word in chars:
        _txt_ += word
        _width, _height = get_text_size(_txt_)
        if _width <= max_width:
            continue
        else:
            _wrapped_lines_.append(_txt_)
            _txt_ = ""
    _wrapped_lines_.append(_txt_)
    result = "\n".join(_wrapped_lines_).strip()
    height = len(_wrapped_lines_) * height
    return result, height
def generate_video(
    video_path: str,
    audio_path: str,
    subtitle_path: str,
    output_file: str,
    params: VideoParams,
):
    """Compose the final video from footage, narration, subtitles and BGM.

    Renders `video_path` (with its own audio stripped), adds the
    voice-over from `audio_path` at params.voice_volume, burns in
    subtitles from `subtitle_path` when params.subtitle_enabled, mixes
    in looping background music per params, and writes the result to
    `output_file`.
    """
    aspect = VideoAspect(params.video_aspect)
    video_width, video_height = aspect.to_resolution()

    logger.info(f"generating video: {video_width} x {video_height}")
    logger.info(f" ① video: {video_path}")
    logger.info(f" ② audio: {audio_path}")
    logger.info(f" ③ subtitle: {subtitle_path}")
    logger.info(f" ④ output: {output_file}")

    # https://github.com/harry0703/MoneyPrinterTurbo/issues/217
    # PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: 'final-1.mp4.tempTEMP_MPY_wvf_snd.mp3'
    # write into the same directory as the output file
    output_dir = os.path.dirname(output_file)

    font_path = ""
    if params.subtitle_enabled:
        if not params.font_name:
            params.font_name = "STHeitiMedium.ttc"
        font_path = os.path.join(utils.font_dir(), params.font_name)
        if os.name == "nt":
            # Normalize path separators on Windows.
            font_path = font_path.replace("\\", "/")
        logger.info(f" ⑤ font: {font_path}")

    def create_text_clip(subtitle_item):
        # Build a positioned TextClip for one ((start, end), text) item.
        # NOTE: mutates params.font_size / params.stroke_width in place
        # (coerces them to int on every call).
        params.font_size = int(params.font_size)
        params.stroke_width = int(params.stroke_width)
        phrase = subtitle_item[1]
        max_width = video_width * 0.9
        wrapped_txt, txt_height = wrap_text(
            phrase, max_width=max_width, font=font_path, fontsize=params.font_size
        )
        interline = int(params.font_size * 0.25)
        size=(int(max_width), int(txt_height + params.font_size * 0.25 + (interline * (wrapped_txt.count("\n") + 1))))

        _clip = TextClip(
            text=wrapped_txt,
            font=font_path,
            font_size=params.font_size,
            color=params.text_fore_color,
            bg_color=params.text_background_color,
            stroke_color=params.stroke_color,
            stroke_width=params.stroke_width,
            # interline=interline,
            # size=size,
        )
        # Timing comes from the subtitle item: ((start, end), text).
        duration = subtitle_item[0][1] - subtitle_item[0][0]
        _clip = _clip.with_start(subtitle_item[0][0])
        _clip = _clip.with_end(subtitle_item[0][1])
        _clip = _clip.with_duration(duration)
        if params.subtitle_position == "bottom":
            _clip = _clip.with_position(("center", video_height * 0.95 - _clip.h))
        elif params.subtitle_position == "top":
            _clip = _clip.with_position(("center", video_height * 0.05))
        elif params.subtitle_position == "custom":
            # Ensure the subtitle is fully within the screen bounds
            margin = 10  # Additional margin, in pixels
            max_y = video_height - _clip.h - margin
            min_y = margin
            # params.custom_position is a percentage of the usable height.
            custom_y = (video_height - _clip.h) * (params.custom_position / 100)
            custom_y = max(
                min_y, min(custom_y, max_y)
            )  # Constrain the y value within the valid range
            _clip = _clip.with_position(("center", custom_y))
        else:  # center
            _clip = _clip.with_position(("center", "center"))
        return _clip

    video_clip = VideoFileClip(video_path).without_audio()
    audio_clip = AudioFileClip(audio_path).with_effects(
        [afx.MultiplyVolume(params.voice_volume)]
    )

    def make_textclip(text):
        # Minimal TextClip factory handed to SubtitlesClip; the fully
        # styled clips are built later by create_text_clip.
        return TextClip(
            text=text,
            font=font_path,
            font_size=params.font_size,
        )

    if subtitle_path and os.path.exists(subtitle_path):
        # Parse the subtitle file for its timing, then render each item
        # with the full styling/positioning logic.
        sub = SubtitlesClip(
            subtitles=subtitle_path, encoding="utf-8", make_textclip=make_textclip
        )
        text_clips = []
        for item in sub.subtitles:
            clip = create_text_clip(subtitle_item=item)
            text_clips.append(clip)
        video_clip = CompositeVideoClip([video_clip, *text_clips])

    bgm_file = get_bgm_file(bgm_type=params.bgm_type, bgm_file=params.bgm_file)
    if bgm_file:
        try:
            # Loop the BGM to cover the whole video and fade it out at the end.
            bgm_clip = AudioFileClip(bgm_file).with_effects(
                [
                    afx.MultiplyVolume(params.bgm_volume),
                    afx.AudioFadeOut(3),
                    afx.AudioLoop(duration=video_clip.duration),
                ]
            )
            audio_clip = CompositeAudioClip([audio_clip, bgm_clip])
        except Exception as e:
            # BGM is optional: log and continue with narration only.
            logger.error(f"failed to add bgm: {str(e)}")

    video_clip = video_clip.with_audio(audio_clip)
    video_clip.write_videofile(
        output_file,
        audio_codec=audio_codec,
        temp_audiofile_path=output_dir,
        threads=params.n_threads or 2,
        logger=None,
        fps=fps,
    )
    video_clip.close()
    del video_clip
def preprocess_video(materials: List[MaterialInfo], clip_duration=4):
    """Convert image materials into short zooming video clips.

    Probes each material for resolution, skips anything smaller than
    480x480, and renders every image material into an mp4 of
    `clip_duration` seconds with a gradual zoom-in, rewriting
    material.url to the generated file. Video materials are left
    untouched. Returns the (mutated) materials list.
    """
    for material in materials:
        if not material.url:
            continue

        ext = utils.parse_extension(material.url)
        # Probe the material: try it as a video first, fall back to a
        # still image.
        try:
            clip = VideoFileClip(material.url)
        except Exception:
            clip = ImageClip(material.url)

        width = clip.size[0]
        height = clip.size[1]
        if width < 480 or height < 480:
            logger.warning(f"low resolution material: {width}x{height}, minimum 480x480 required")
            # Fix: release the probe reader before skipping (was leaked).
            close_clip(clip)
            continue

        if ext in const.FILE_TYPE_IMAGES:
            logger.info(f"processing image: {material.url}")
            # Fix: release the probe clip before building the fresh
            # ImageClip used for rendering (was left open).
            close_clip(clip)
            # Create an image clip and set its duration to clip_duration seconds
            clip = (
                ImageClip(material.url)
                .with_duration(clip_duration)
                .with_position("center")
            )
            # Apply a zoom effect using the resize method.
            # A lambda makes the zoom dynamic over time t: the scale grows
            # linearly from 100% up by 3% per second of clip duration
            # (1 means 100% size).
            zoom_clip = clip.resized(
                lambda t: 1 + (clip_duration * 0.03) * (t / clip.duration)
            )
            # Composite so the zoomed frame is rendered on a full canvas.
            final_clip = CompositeVideoClip([zoom_clip])
            # Render next to the source image and point the material at it.
            video_file = f"{material.url}.mp4"
            final_clip.write_videofile(video_file, fps=30, logger=None)
            # Fix: close the composite as well as the source clip.
            close_clip(final_clip)
            close_clip(clip)
            material.url = video_file
            logger.success(f"image processed: {video_file}")
        else:
            # Video material: nothing to render — just release the reader
            # (fix: the probe clip was leaked for video materials).
            close_clip(clip)
    return materials