import hashlib
import os
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import streamlit as st
from moviepy.editor import (
    AudioFileClip,
    ColorClip,
    CompositeVideoClip,
    ImageSequenceClip,
    TextClip,
    VideoFileClip,
    concatenate_videoclips,
)


class VideoCreator:
    """Assemble MP4 videos from per-segment image frames and an audio track.

    Each segment is a sequence of frames (image paths or arrays accepted by
    ``ImageSequenceClip``) that is shown for a capped duration, optionally
    with a caption, and the segments are concatenated under the audio.
    Failures are reported through Streamlit (``st.warning`` / ``st.error``)
    and degrade to black placeholder clips instead of raising.
    """

    def __init__(self):
        # The default output directory must exist before any video is written.
        os.makedirs("outputs", exist_ok=True)
        # Maps a digest of the inputs to the path of the rendered video.
        self.video_cache = {}
        # One of "1:1", "16:9", "9:16"; anything else falls back to square.
        self.aspect_ratio = "1:1"
        # Upper bound, in seconds, applied to every segment's duration.
        self.max_segment_duration = 5.0

    def set_aspect_ratio(self, aspect_ratio):
        """Set the aspect ratio ("1:1", "16:9" or "9:16") for video creation."""
        self.aspect_ratio = aspect_ratio

    def set_max_segment_duration(self, duration):
        """Set the maximum duration, in seconds, allowed for any segment."""
        self.max_segment_duration = duration

    @staticmethod
    def _round_up_to_even(value):
        """Return ``value`` unchanged if even, otherwise ``value + 1``."""
        return value if value % 2 == 0 else value + 1

    def get_video_dimensions(self, base_size=None):
        """Return ``(width, height)`` for the current aspect ratio.

        Args:
            base_size: Optional ``(width, height)``. When given, the result
                keeps roughly the same pixel count reshaped to the current
                aspect ratio, with both sides rounded up to even numbers.
                When omitted, a fixed preset per aspect ratio is returned.

        Returns:
            A ``(width, height)`` tuple. For an unrecognised aspect ratio the
            preset is ``(640, 640)`` and a supplied ``base_size`` is returned
            unchanged.
        """
        if base_size is None:
            presets = {
                "1:1": (640, 640),   # Square
                "16:9": (854, 480),  # Landscape
                "9:16": (480, 854),  # Portrait (mobile)
            }
            return presets.get(self.aspect_ratio, (640, 640))

        base_pixels = base_size[0] * base_size[1]
        even = self._round_up_to_even

        if self.aspect_ratio == "1:1":
            side = even(int(np.sqrt(base_pixels)))
            return (side, side)

        if self.aspect_ratio == "16:9":
            width = int(np.sqrt(base_pixels * 16 / 9))
            # The short side is derived from the un-rounded long side.
            height = int(width * 9 / 16)
            return (even(width), even(height))

        if self.aspect_ratio == "9:16":
            height = int(np.sqrt(base_pixels * 16 / 9))
            width = int(height * 9 / 16)
            return (even(width), even(height))

        return base_size

    def _add_text_overlay(self, segment_clip, segment_text):
        """Return ``segment_clip`` with a caption, or unchanged on failure."""
        try:
            fontsize = 24
            position = ("center", "bottom")
            relative = False
            if self.aspect_ratio == "9:16":
                # Portrait: smaller text, top edge at 90% of the frame height.
                fontsize = 20
                position = ("center", 0.9)
                relative = True
            elif self.aspect_ratio == "16:9":
                # Landscape: top edge at 95% of the frame height.
                position = ("center", 0.95)
                relative = True

            txt_clip = TextClip(
                segment_text,
                fontsize=fontsize,
                color="white",
                bg_color="rgba(0,0,0,0.5)",
                size=(segment_clip.w, None),
                method="caption",
            ).set_duration(segment_clip.duration)
            # Fractional coordinates are only honoured with relative=True;
            # without it 0.9 would be read as a pixel offset from the top.
            txt_clip = txt_clip.set_position(position, relative=relative)
            return CompositeVideoClip([segment_clip, txt_clip])
        except Exception as e:
            # A missing caption is preferable to a missing segment.
            st.warning(f"Could not add text overlay: {e}")
            return segment_clip

    def _fallback_segment_clip(self, frames, segment_duration):
        """Return a single-frame clip, or a black clip if no frame is usable."""
        try:
            first_frame = None
            if frames is not None and len(frames) > 0:
                first_frame = frames[0]
            # Avoid truth-testing the frame: numpy arrays raise on bool().
            usable = isinstance(first_frame, np.ndarray) or (
                isinstance(first_frame, str) and os.path.exists(first_frame)
            )
            if usable:
                return ImageSequenceClip([first_frame], durations=[segment_duration])
        except Exception as inner_e:
            st.error(f"Critical error in fallback clip creation: {inner_e}")
        # A ColorClip needs no temporary image file on disk.
        return ColorClip(
            self.get_video_dimensions(), color=(0, 0, 0), duration=segment_duration
        )

    def create_segment_clip(self, frames, segment_duration, segment_text=None):
        """Create a video clip from frames with an optional text overlay.

        Args:
            frames: Sequence of frames for ``ImageSequenceClip`` (image paths
                or arrays).
            segment_duration: Desired duration in seconds; capped at
                ``self.max_segment_duration``.
            segment_text: Optional caption drawn over the clip.

        Returns:
            A MoviePy clip. If the frame sequence cannot be used, a warning is
            shown and a single-frame or black clip of the same duration is
            returned instead.
        """
        try:
            segment_duration = min(segment_duration, self.max_segment_duration)
            frame_count = len(frames)
            if frame_count == 0:
                raise ValueError("segment has no frames")

            # Every frame is shown for an equal share of the segment.
            frame_duration = segment_duration / frame_count
            segment_clip = ImageSequenceClip(
                frames, durations=[frame_duration] * frame_count
            )
            if segment_text:
                segment_clip = self._add_text_overlay(segment_clip, segment_text)
            return segment_clip
        except Exception as e:
            st.warning(f"Error creating segment clip: {e}. Using fallback method.")
            return self._fallback_segment_clip(frames, segment_duration)

    @staticmethod
    def _close_quietly(clip):
        """Close a MoviePy clip, ignoring errors and ``None``."""
        if clip is None:
            return
        try:
            clip.close()
        except Exception:
            # Cleanup is best-effort; it must not mask the real result.
            pass

    @staticmethod
    def _write_error_file(output_dir, message):
        """Write ``message`` to a timestamped error file and return its path."""
        error_path = f"{output_dir}/error_video_{int(time.time())}.txt"
        with open(error_path, "w", encoding="utf-8") as f:
            f.write(message)
        return error_path

    def create_video_from_frames(self, animated_frames, audio_file, segments=None,
                                 timestamps=None, output_dir="outputs",
                                 parallel=False, max_workers=4):
        """Create a video from animated frames synchronized with audio.

        Args:
            animated_frames: Sequence of frame sequences, one per segment.
            audio_file: Object exposing ``getvalue()`` that returns the audio
                bytes (for example a Streamlit upload).
            segments: Optional per-segment caption strings.
            timestamps: Optional ``(start, end)`` pairs in seconds; when
                absent the audio duration is split evenly across segments.
            output_dir: Directory for the result; created if missing.
            parallel: Build segment clips in a thread pool when True.
            max_workers: Thread count for the pool and for encoding.

        Returns:
            Path to the rendered ``.mp4``, or to a ``.txt`` file describing
            the error when the video could not be produced.
        """
        audio_bytes = audio_file.getvalue()

        # Captions and timings change the output, so they belong in the key.
        overlay_digest = hashlib.md5(
            repr((segments, timestamps)).encode("utf-8")
        ).hexdigest()
        cache_key = (
            f"{hashlib.md5(audio_bytes).hexdigest()}_{len(animated_frames)}"
            f"_{self.aspect_ratio}_{self.max_segment_duration}_{overlay_digest}"
        )
        cached_path = self.video_cache.get(cache_key)
        if cached_path is not None:
            if os.path.exists(cached_path):
                return cached_path
            # The file was removed behind our back; render it again.
            del self.video_cache[cache_key]

        os.makedirs(output_dir, exist_ok=True)

        # MoviePy reads audio from a path, so persist the bytes first.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
            tmp_file.write(audio_bytes)
            audio_path = tmp_file.name

        source_audio = None
        final_clip = None
        try:
            source_audio = AudioFileClip(audio_path)
            audio_clip = source_audio
            total_duration = audio_clip.duration
            cap = self.max_segment_duration

            if timestamps:
                segment_durations = [min(end - start, cap) for start, end in timestamps]
            else:
                if len(animated_frames) == 0:
                    raise ValueError("No animated frames were provided")
                even_share = min(total_duration / len(animated_frames), cap)
                segment_durations = [even_share] * len(animated_frames)

            last_duration = len(segment_durations) - 1

            try:
                # Segments beyond the last known duration reuse that duration.
                jobs = [
                    (
                        frames,
                        segment_durations[min(i, last_duration)],
                        segments[i] if segments and i < len(segments) else None,
                    )
                    for i, frames in enumerate(animated_frames)
                ]
                if parallel and len(jobs) > 1:
                    with ThreadPoolExecutor(max_workers=max_workers) as executor:
                        video_clips = list(
                            executor.map(lambda job: self.create_segment_clip(*job), jobs)
                        )
                else:
                    video_clips = [self.create_segment_clip(*job) for job in jobs]
            except Exception as e:
                st.warning(f"Error processing video segments: {e}. Using fallback method.")
                # Fallback: one black clip per segment.
                video_clips = [
                    ColorClip(
                        self.get_video_dimensions(),
                        color=(0, 0, 0),
                        duration=min(segment_durations[min(i, last_duration)], cap),
                    )
                    for i, _ in enumerate(animated_frames)
                ]

            try:
                final_clip = concatenate_videoclips(video_clips)
                # Capped segments can make the video shorter than the audio,
                # so trim the audio to the video length.
                audio_clip = audio_clip.subclip(
                    0, min(final_clip.duration, audio_clip.duration)
                )
                final_clip = final_clip.set_audio(audio_clip)
                final_clip = final_clip.resize(self.get_video_dimensions())
            except Exception as e:
                st.warning(f"Error creating final video: {e}. Using fallback method.")
                # Fallback: black video carrying the audio.
                final_clip = ColorClip(
                    self.get_video_dimensions(), color=(0, 0, 0), duration=total_duration
                )
                final_clip = final_clip.set_audio(audio_clip)

            output_path = (
                f"{output_dir}/output_video_"
                f"{self.aspect_ratio.replace(':', '_')}_{int(time.time())}.mp4"
            )

            try:
                # Low bitrate and the fastest preset favour processing speed.
                final_clip.write_videofile(
                    output_path,
                    fps=24,
                    codec="libx264",
                    audio_codec="aac",
                    preset="ultrafast",
                    threads=max_workers,
                    bitrate="1000k",
                )
            except Exception as e:
                st.warning(f"Error writing video file: {e}. Trying with simpler settings.")
                try:
                    final_clip.write_videofile(
                        output_path,
                        fps=15,
                        codec="libx264",
                        audio_codec="aac",
                        preset="ultrafast",
                        threads=2,
                        bitrate="800k",
                    )
                except Exception as inner_e:
                    st.error(f"Critical error writing video: {inner_e}")
                    return self._write_error_file(
                        output_dir,
                        f"Error creating video: {e}\nSecondary error: {inner_e}",
                    )

            self.video_cache[cache_key] = output_path
            return output_path

        except Exception as e:
            st.error(f"Critical error in video creation: {e}")
            return self._write_error_file(output_dir, f"Error creating video: {e}")

        finally:
            # Release the ffmpeg readers before deleting the temp file;
            # an open handle blocks the unlink on Windows.
            self._close_quietly(final_clip)
            self._close_quietly(source_audio)
            try:
                os.unlink(audio_path)
            except OSError:
                pass

    def optimize_video(self, video_path, target_size=None, bitrate='1000k', threads=2):
        """Re-encode a video at a target size and bitrate for web delivery.

        Args:
            video_path: Path of the video to optimize.
            target_size: Optional ``(width, height)``; defaults to the preset
                for the current aspect ratio.
            bitrate: Target bitrate string passed to the encoder.
            threads: Number of encoding threads.

        Returns:
            Path of the optimized copy, or ``video_path`` unchanged when the
            input is missing, is a ``.txt`` error file, or optimization fails.
        """
        if not os.path.exists(video_path) or video_path.endswith('.txt'):
            return video_path

        clip = None
        clip_resized = None
        try:
            clip = VideoFileClip(video_path)
            if target_size is None:
                target_size = self.get_video_dimensions()
            clip_resized = clip.resize(target_size)

            # Split on the real extension so the output can never be the
            # input file itself, whatever the path contains.
            root, ext = os.path.splitext(video_path)
            optimized_path = f"{root}_optimized_{int(time.time())}{ext or '.mp4'}"

            try:
                clip_resized.write_videofile(
                    optimized_path,
                    codec="libx264",
                    audio_codec="aac",
                    preset="ultrafast",
                    threads=threads,
                    bitrate=bitrate,
                )
            except Exception as e:
                st.warning(f"Error optimizing video: {e}. Using original video.")
                optimized_path = video_path

            return optimized_path
        except Exception as e:
            st.warning(f"Error in video optimization: {e}. Using original video.")
            return video_path
        finally:
            # Free the readers on failure paths too.
            self._close_quietly(clip_resized)
            self._close_quietly(clip)

    def clear_cache(self):
        """Forget all cached video paths. Always returns True."""
        self.video_cache = {}
        return True