Spaces:
Sleeping
Sleeping
| from video.media import MediaUtils | |
| import time | |
| from loguru import logger | |
| class VideoBuilder: | |
| """ | |
| Builder class for constructing FFmpeg video commands with a fluent interface. | |
| """ | |
| def __init__(self, dimensions: tuple[int, int], ffmpeg_path="ffmpeg"): | |
| if not isinstance(dimensions, tuple) or len(dimensions) != 2: | |
| raise ValueError("Dimensions must be a tuple of (width, height).") | |
| self.width, self.height = dimensions | |
| self.ffmpeg_path = ffmpeg_path | |
| # Components | |
| self.background = None | |
| self.audio_file = None | |
| self.captions = None | |
| self.output_path = "output.mp4" | |
| # Internal state | |
| self.media_utils = None | |
| def set_media_utils(self, media_utils: MediaUtils): | |
| """Set the media manager for duration calculations.""" | |
| self.media_utils = media_utils | |
| return self | |
| def set_background_image(self, file_path: str, effect_config: dict = None): | |
| """Set background as an image with optional visual effects. | |
| Args: | |
| file_path: Path to the image file | |
| effect_config: Configuration for visual effects. Supported effects: | |
| - Ken Burns (zoom): {"effect": "ken_burns", "zoom_factor": 0.001, "direction": "zoom-to-top-left"} | |
| - Pan: {"effect": "pan", "direction": "left-to-right", "speed": "normal"} | |
| """ | |
| self.background = { | |
| "type": "image", | |
| "file": file_path, | |
| "effect_config": effect_config or {"effect": "ken_burns"}, # Default to Ken Burns for backward compatibility | |
| } | |
| return self | |
| def set_background_video(self, file_path: str): | |
| """Set background as a video file.""" | |
| self.background = {"type": "video", "file": file_path} | |
| return self | |
| def set_audio(self, file_path: str): | |
| """Set audio file.""" | |
| self.audio_file = file_path | |
| return self | |
| def set_captions( | |
| self, | |
| file_path: str = None, | |
| config: dict = None, | |
| ): | |
| """Set caption subtitles | |
| Args: | |
| file_path: Path to subtitle file | |
| config: Optional configuration dict | |
| """ | |
| self.captions = { | |
| "file": file_path, | |
| **(config or {}), | |
| } | |
| return self | |
| def set_output_path(self, output_path: str): | |
| """Set output file path.""" | |
| self.output_path = output_path | |
| return self | |
| def build_command(self): | |
| """Build the complete FFmpeg command.""" | |
| if not self.background: | |
| raise ValueError("Background must be set (image or video).") | |
| if not self.audio_file and not self.captions: | |
| raise ValueError( | |
| "At least one of audio_file, or captions must be provided." | |
| ) | |
| # Validate combinations | |
| if self.background["type"] == "image" and not self.audio_file: | |
| raise ValueError("Audio file must be provided if background is an image.") | |
| if ( | |
| self.background["type"] == "video" | |
| and not self.audio_file | |
| and self.captions is None | |
| ): | |
| raise ValueError( | |
| "Audio file or captions must be provided if background is a video." | |
| ) | |
| # Get audio duration if audio file is provided | |
| audio_duration = None | |
| if self.audio_file: | |
| if not self.media_utils: | |
| raise ValueError( | |
| "Media manager must be set to determine audio duration." | |
| ) | |
| media_info = self.media_utils.get_audio_info(self.audio_file) | |
| audio_duration = media_info.get("duration") | |
| if not audio_duration: | |
| raise ValueError("Could not determine audio duration") | |
| # Build command | |
| cmd = [self.ffmpeg_path, "-y"] | |
| filter_parts = [] | |
| input_index = 0 | |
| # Add background input | |
| if self.background["type"] == "image": | |
| cmd.extend( | |
| ["-loop", "1", "-t", str(audio_duration), "-i", self.background["file"]] | |
| ) | |
| # Get effect configuration with backward compatibility | |
| effect_config = self.background.get("effect_config", {"effect": "ken_burns"}) | |
| # Handle backward compatibility for old ken_burns config | |
| if "ken_burns" in self.background and "effect_config" not in self.background: | |
| # Old format: {"ken_burns": {"zoom_factor": 0.001, "direction": "zoom-to-top-left"}} | |
| old_ken_burns = self.background.get("ken_burns", {}) | |
| effect_config = { | |
| "effect": "ken_burns", | |
| "zoom_factor": old_ken_burns.get("zoom_factor", 0.001), | |
| "direction": old_ken_burns.get("direction", "zoom-to-top-left") | |
| } | |
| effect_type = effect_config.get("effect", "ken_burns") | |
| fps = 25 | |
| duration_frames = int(audio_duration * fps) | |
| if effect_type == "ken_burns": | |
| # Ken Burns (zoom) effect | |
| zoom_factor = effect_config.get("zoom_factor", 0.001) | |
| direction = effect_config.get("direction", "zoom-to-top-left") | |
| # todo without upscaling we can't use the top and center zooms. upscaling increases the render time | |
| zoom_expressions = { | |
| "zoom-to-top": f"z='zoom+{zoom_factor}':x=iw/2-(iw/zoom/2):y=0", | |
| "zoom-to-center": f"z='zoom+{zoom_factor}':x=iw/2-(iw/zoom/2):y=ih/2-(ih/zoom/2)", | |
| "zoom-to-top-left": f"z='zoom+{zoom_factor}':x=0:y=0", | |
| } | |
| zoom_expr = zoom_expressions.get(direction, zoom_expressions["zoom-to-top-left"]) | |
| zoompan_d = duration_frames + 1 | |
| filter_parts.append( | |
| f"[{input_index}]scale={self.width}:-2,setsar=1:1," | |
| f"crop={self.width}:{self.height}," | |
| f"zoompan={zoom_expr}:d={zoompan_d}:s={self.width}x{self.height}:fps={fps}[bg]" | |
| ) | |
| elif effect_type == "pan": | |
| # Pan effect - camera moves across the image | |
| direction = effect_config.get("direction", "left-to-right") | |
| speed = effect_config.get("speed", "normal") | |
| # Speed multipliers | |
| speed_multipliers = { | |
| "slow": 0.5, | |
| "normal": 1.0, | |
| "fast": 2.0 | |
| } | |
| speed_mult = speed_multipliers.get(speed, 1.0) | |
| # Calculate pan distance based on direction | |
| # We'll scale the image larger to allow for panning | |
| scale_factor = 1.3 # Scale image 30% larger to allow room for panning | |
| scaled_width = int(self.width * scale_factor) | |
| scaled_height = int(self.height * scale_factor) | |
| # Pan expressions for different directions | |
| if direction == "left-to-right": | |
| # Start from left, move to right | |
| start_x = 0 | |
| end_x = scaled_width - self.width | |
| start_y = (scaled_height - self.height) // 2 | |
| end_y = start_y | |
| elif direction == "right-to-left": | |
| # Start from right, move to left | |
| start_x = scaled_width - self.width | |
| end_x = 0 | |
| start_y = (scaled_height - self.height) // 2 | |
| end_y = start_y | |
| elif direction == "top-to-bottom": | |
| # Start from top, move to bottom | |
| start_x = (scaled_width - self.width) // 2 | |
| end_x = start_x | |
| start_y = 0 | |
| end_y = scaled_height - self.height | |
| elif direction == "bottom-to-top": | |
| # Start from bottom, move to top | |
| start_x = (scaled_width - self.width) // 2 | |
| end_x = start_x | |
| start_y = scaled_height - self.height | |
| end_y = 0 | |
| else: | |
| # Default to left-to-right | |
| start_x = 0 | |
| end_x = scaled_width - self.width | |
| start_y = (scaled_height - self.height) // 2 | |
| end_y = start_y | |
| # Create pan expression | |
| # Linear interpolation from start to end position over the duration | |
| pan_x_expr = f"{start_x}+({end_x}-{start_x})*t/{audio_duration}*{speed_mult}" | |
| pan_y_expr = f"{start_y}+({end_y}-{start_y})*t/{audio_duration}*{speed_mult}" | |
| filter_parts.append( | |
| f"[{input_index}]scale={scaled_width}:{scaled_height},setsar=1:1," | |
| f"crop={self.width}:{self.height}:{pan_x_expr}:{pan_y_expr}[bg]" | |
| ) | |
| else: | |
| # No effect, just scale and crop | |
| filter_parts.append( | |
| f"[{input_index}]scale={self.width}:{self.height},setsar=1:1[bg]" | |
| ) | |
| elif self.background["type"] == "video": | |
| if audio_duration: | |
| cmd.extend( | |
| [ | |
| "-stream_loop", | |
| "-1", | |
| "-t", | |
| str(audio_duration), | |
| "-i", | |
| self.background["file"], | |
| ] | |
| ) | |
| else: | |
| cmd.extend(["-i", self.background["file"]]) | |
| filter_parts.append(f"[{input_index}]scale={self.width}:{self.height}[bg]") | |
| input_index += 1 | |
| current_video = "[bg]" | |
| # Add audio input | |
| audio_input_index = None | |
| if self.audio_file: | |
| cmd.extend(["-i", self.audio_file]) | |
| audio_input_index = input_index | |
| input_index += 1 | |
| # Add subtitles or caption images if provided | |
| if self.captions: | |
| subtitle_file = self.captions.get("file") | |
| if subtitle_file: | |
| filter_parts.append(f"{current_video}subtitles={subtitle_file}[v]") | |
| current_video = "[v]" | |
| else: | |
| # Rename final video output | |
| if current_video == "[bg]": | |
| current_video = "[v]" | |
| filter_parts.append(f"[bg]copy[v]") | |
| # Build filter complex | |
| if filter_parts: | |
| cmd.extend(["-filter_complex", ";".join(filter_parts)]) | |
| # Map video and audio | |
| cmd.extend(["-map", current_video]) | |
| if audio_input_index is not None: | |
| cmd.extend(["-map", f"{audio_input_index}:a"]) | |
| # Video codec settings | |
| cmd.extend(["-c:v", "libx264", "-preset", "ultrafast"]) | |
| cmd.extend(["-crf", "23", "-pix_fmt", "yuv420p"]) | |
| # Audio codec settings | |
| if self.audio_file: | |
| cmd.extend(["-c:a", "aac", "-b:a", "192k"]) | |
| if audio_duration: | |
| cmd.extend(["-t", str(audio_duration)]) | |
| cmd.append(self.output_path) | |
| return cmd | |
| def execute(self): | |
| """Build and execute the FFmpeg command using MediaUtils for progress tracking.""" | |
| if not self.media_utils: | |
| logger.error("MediaUtils must be set before executing video build") | |
| return False | |
| start = time.time() | |
| context_logger = logger.bind( | |
| dimensions=(self.width, self.height), | |
| background_type=self.background.get("type") if self.background else None, | |
| has_audio=bool(self.audio_file), | |
| has_captions=bool(self.captions), | |
| output_path=self.output_path, | |
| youtube_channel="https://www.youtube.com/@aiagentsaz" | |
| ) | |
| try: | |
| context_logger.debug("building video with VideoBuilder") | |
| cmd = self.build_command() | |
| # Calculate expected duration for progress tracking | |
| expected_duration = None | |
| if self.audio_file: | |
| audio_info = self.media_utils.get_audio_info(self.audio_file) | |
| expected_duration = audio_info.get("duration") | |
| elif self.background and self.background.get("type") == "video": | |
| video_info = self.media_utils.get_video_info(self.background["file"]) | |
| expected_duration = video_info.get("duration") | |
| context_logger.bind( | |
| command=" ".join(cmd), | |
| expected_duration=expected_duration, | |
| ).debug("executing video build command") | |
| # Execute using MediaUtils for proper logging and progress tracking | |
| success = self.media_utils.execute_ffmpeg_command( | |
| cmd, | |
| "build video", | |
| expected_duration=expected_duration, | |
| show_progress=True, | |
| ) | |
| if success: | |
| context_logger.bind(execution_time=time.time() - start).info( | |
| "video built successfully" | |
| ) | |
| return True | |
| else: | |
| context_logger.error("failed to build video") | |
| return False | |
| except Exception as e: | |
| context_logger.bind(error=str(e), execution_time=time.time() - start).error( | |
| "error during video rendering" | |
| ) | |
| return False | |