# Video-Fx / video_creator.py
# Provenance: uploaded by garyuzair ("Upload 6 files", commit ad6d387, verified).
import streamlit as st
import os
import tempfile
from moviepy.editor import ImageSequenceClip, AudioFileClip, concatenate_videoclips, TextClip, CompositeVideoClip
import numpy as np
from concurrent.futures import ThreadPoolExecutor
import time
class VideoCreator:
    """Builds MP4 videos from per-segment animated frame sequences,
    synchronized with an uploaded audio track and optional text captions.

    Rendering is delegated to moviepy; Streamlit (``st``) is used only to
    surface warnings/errors in the UI. Rendered outputs are memoized in
    ``self.video_cache``, keyed on the audio content and render settings.
    """

    def __init__(self):
        # Ensure output directory exists before any write_videofile call.
        os.makedirs("outputs", exist_ok=True)
        # cache key -> previously rendered output file path
        self.video_cache = {}
        self.aspect_ratio = "1:1"  # Default aspect ratio
        self.max_segment_duration = 5.0  # Maximum duration for any segment in seconds

    def set_aspect_ratio(self, aspect_ratio):
        """Set the aspect ratio for video creation ("1:1", "16:9" or "9:16")."""
        self.aspect_ratio = aspect_ratio

    def set_max_segment_duration(self, duration):
        """Set the maximum duration for any segment in seconds."""
        self.max_segment_duration = duration

    @staticmethod
    def _even(value):
        """Round *value* down to int, then up to the nearest even number.

        Video codecs (e.g. libx264 with default pixel formats) require even
        frame dimensions.
        """
        value = int(value)
        return value if value % 2 == 0 else value + 1

    def get_video_dimensions(self, base_size=None):
        """Return ``(width, height)`` for the current aspect ratio.

        When *base_size* is given, the result approximately preserves its
        pixel count while matching the aspect ratio; otherwise a fixed
        default resolution per ratio is returned. Computed dimensions are
        forced even for video codec compatibility.
        """
        if base_size is None:
            defaults = {
                "1:1": (640, 640),    # Square
                "16:9": (854, 480),   # Landscape HD
                "9:16": (480, 854),   # Portrait (mobile)
            }
            return defaults.get(self.aspect_ratio, (640, 640))

        # Preserve total pixel budget while reshaping to the target ratio.
        base_pixels = base_size[0] * base_size[1]
        if self.aspect_ratio == "1:1":
            side = self._even(np.sqrt(base_pixels))
            return (side, side)
        if self.aspect_ratio == "16:9":
            width = int(np.sqrt(base_pixels * 16 / 9))
            height = int(width * 9 / 16)
            return (self._even(width), self._even(height))
        if self.aspect_ratio == "9:16":
            height = int(np.sqrt(base_pixels * 16 / 9))
            width = int(height * 9 / 16)
            return (self._even(width), self._even(height))
        # Unknown aspect ratio: keep the caller-provided size unchanged.
        return base_size

    def create_segment_clip(self, frames, segment_duration, segment_text=None):
        """Create a video clip of (at most) *segment_duration* seconds.

        frames: sequence of frames accepted by ``ImageSequenceClip``
            (image file paths or arrays — TODO confirm which callers pass).
        segment_text: optional caption overlaid near the bottom of the clip.

        Falls back to a single-frame still, a generated black still, or a
        plain ``ColorClip`` if normal construction fails; never raises.
        """
        try:
            # Never exceed the configured per-segment cap.
            segment_duration = min(segment_duration, self.max_segment_duration)
            # Spread the segment duration evenly across its frames.
            frame_duration = segment_duration / len(frames)
            segment_clip = ImageSequenceClip(frames, durations=[frame_duration] * len(frames))

            if segment_text:
                try:
                    # Adjust text size and position based on aspect ratio.
                    fontsize = 24
                    position = ('center', 'bottom')
                    if self.aspect_ratio == "9:16":
                        # Portrait: smaller text, placed 90% from the top.
                        fontsize = 20
                        position = ('center', 0.9)
                    elif self.aspect_ratio == "16:9":
                        # Landscape: text placed 95% from the top.
                        position = ('center', 0.95)
                    txt_clip = TextClip(
                        segment_text,
                        fontsize=fontsize,
                        color='white',
                        bg_color='rgba(0,0,0,0.5)',
                        size=(segment_clip.w, None),
                        method='caption'
                    ).set_duration(segment_clip.duration)
                    txt_clip = txt_clip.set_position(position)
                    segment_clip = CompositeVideoClip([segment_clip, txt_clip])
                except Exception as e:
                    # TextClip can fail (e.g. missing ImageMagick); degrade
                    # gracefully and keep the clip without the overlay.
                    st.warning(f"Could not add text overlay: {str(e)}")
            return segment_clip
        except Exception as e:
            st.warning(f"Error creating segment clip: {str(e)}. Using fallback method.")
            try:
                # Fallback 1: a still clip from the first frame, but only when
                # it is a readable image path. The isinstance guard avoids the
                # ambiguous truth value of numpy-array frames and keeps
                # os.path.exists from receiving a non-path argument.
                first_frame = frames[0] if frames else None
                if isinstance(first_frame, str) and os.path.exists(first_frame):
                    return ImageSequenceClip([first_frame], durations=[segment_duration])
                # Fallback 2: a generated solid-black still at target size.
                # NamedTemporaryFile replaces the deprecated, race-prone
                # tempfile.mktemp.
                from PIL import Image
                blank_img = Image.new('RGB', self.get_video_dimensions(), color=(0, 0, 0))
                with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp:
                    blank_path = tmp.name
                blank_img.save(blank_path)
                return ImageSequenceClip([blank_path], durations=[segment_duration])
            except Exception as inner_e:
                st.error(f"Critical error in fallback clip creation: {str(inner_e)}")
                # Last resort: an extremely simple solid-color clip.
                from moviepy.editor import ColorClip
                return ColorClip(self.get_video_dimensions(), color=(0, 0, 0), duration=segment_duration)

    def create_video_from_frames(self, animated_frames, audio_file, segments=None, timestamps=None,
                                 output_dir="outputs", parallel=False, max_workers=4):
        """Create a video from animated frames synchronized with audio.

        animated_frames: list of frame sequences, one per segment.
        audio_file: uploaded-file-like object exposing ``getvalue()``.
        segments: optional caption strings, one per segment.
        timestamps: optional ``(start, end)`` pairs, one per segment.
        parallel: when True, segments are rendered on a thread pool.

        Returns the rendered MP4 path, or the path of a ``.txt`` error
        report when rendering fails irrecoverably.
        """
        import hashlib
        # Cache on audio content + render settings so identical requests
        # reuse the previously rendered file.
        cache_key = f"{hashlib.md5(audio_file.getvalue()).hexdigest()}_{len(animated_frames)}_{self.aspect_ratio}_{self.max_segment_duration}"
        if cache_key in self.video_cache:
            return self.video_cache[cache_key]

        # Persist the uploaded audio so moviepy can read it from disk.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
            tmp_file.write(audio_file.getvalue())
            audio_path = tmp_file.name

        try:
            audio_clip = AudioFileClip(audio_path)
            total_duration = audio_clip.duration

            # Per-segment durations, each capped at max_segment_duration.
            if timestamps:
                segment_durations = [min(end - start, self.max_segment_duration) for start, end in timestamps]
            else:
                # Distribute the audio duration evenly across segments.
                segment_durations = [min(total_duration / len(animated_frames), self.max_segment_duration)] * len(animated_frames)

            video_clips = []
            try:
                if parallel and len(animated_frames) > 1:
                    # Render segments concurrently on a thread pool.
                    with ThreadPoolExecutor(max_workers=max_workers) as executor:
                        args = []
                        for i, frames in enumerate(animated_frames):
                            # min() guards against a durations list shorter
                            # than the frame list.
                            segment_duration = segment_durations[min(i, len(segment_durations) - 1)]
                            segment_text = segments[i] if segments and i < len(segments) else None
                            args.append((frames, segment_duration, segment_text))
                        video_clips = list(executor.map(lambda x: self.create_segment_clip(*x), args))
                else:
                    # Render segments sequentially.
                    for i, frames in enumerate(animated_frames):
                        segment_duration = segment_durations[min(i, len(segment_durations) - 1)]
                        segment_text = segments[i] if segments and i < len(segments) else None
                        video_clips.append(self.create_segment_clip(frames, segment_duration, segment_text))
            except Exception as e:
                st.warning(f"Error processing video segments: {str(e)}. Using fallback method.")
                # Fallback: one solid-black clip per segment.
                from moviepy.editor import ColorClip
                video_clips = []
                for i, _ in enumerate(animated_frames):
                    segment_duration = min(segment_durations[min(i, len(segment_durations) - 1)], self.max_segment_duration)
                    video_clips.append(ColorClip(self.get_video_dimensions(), color=(0, 0, 0), duration=segment_duration))

            try:
                final_clip = concatenate_videoclips(video_clips)
                # Segment capping can make the video shorter than the audio;
                # trim the audio to match the final video duration.
                audio_clip = audio_clip.subclip(0, min(final_clip.duration, audio_clip.duration))
                final_clip = final_clip.set_audio(audio_clip)
                # Resize to the aspect-ratio target dimensions.
                final_clip = final_clip.resize(self.get_video_dimensions())
            except Exception as e:
                st.warning(f"Error creating final video: {str(e)}. Using fallback method.")
                # Fallback: black video carrying the full audio track.
                from moviepy.editor import ColorClip
                final_clip = ColorClip(self.get_video_dimensions(), color=(0, 0, 0), duration=total_duration)
                final_clip = final_clip.set_audio(audio_clip)

            output_path = f"{output_dir}/output_video_{self.aspect_ratio.replace(':', '_')}_{int(time.time())}.mp4"
            try:
                # Fast encode: ultrafast preset, modest bitrate, multi-threaded.
                final_clip.write_videofile(
                    output_path,
                    fps=24,
                    codec='libx264',
                    audio_codec='aac',
                    preset='ultrafast',  # Faster encoding
                    threads=max_workers,  # Use multiple threads for encoding
                    bitrate='1000k'  # Lower bitrate
                )
            except Exception as e:
                st.warning(f"Error writing video file: {str(e)}. Trying with simpler settings.")
                # Retry with even cheaper settings before giving up.
                try:
                    final_clip.write_videofile(
                        output_path,
                        fps=15,  # Lower fps
                        codec='libx264',
                        audio_codec='aac',
                        preset='ultrafast',
                        threads=2,  # Fewer threads
                        bitrate='800k'  # Lower bitrate
                    )
                except Exception as inner_e:
                    st.error(f"Critical error writing video: {str(inner_e)}")
                    # Emit a text report describing both failures.
                    error_path = f"{output_dir}/error_video_{int(time.time())}.txt"
                    with open(error_path, 'w') as f:
                        f.write(f"Error creating video: {str(e)}\nSecondary error: {str(inner_e)}")
                    return error_path

            # Cache the rendered path for identical future requests.
            self.video_cache[cache_key] = output_path
            return output_path
        except Exception as e:
            st.error(f"Critical error in video creation: {str(e)}")
            # Emit a text report instead of raising to the caller.
            error_path = f"{output_dir}/error_video_{int(time.time())}.txt"
            with open(error_path, 'w') as f:
                f.write(f"Error creating video: {str(e)}")
            return error_path
        finally:
            # Best-effort removal of the temporary audio file.
            if os.path.exists(audio_path):
                try:
                    os.unlink(audio_path)
                except OSError:
                    pass

    def optimize_video(self, video_path, target_size=None, bitrate='1000k', threads=2):
        """Re-encode *video_path* at *target_size*/*bitrate* for web delivery.

        Returns the optimized file path, or the original path when the input
        is missing, is an error report, or optimization fails.
        """
        if not os.path.exists(video_path) or video_path.endswith('.txt'):
            return video_path  # Return as is if it's an error file or doesn't exist
        try:
            from moviepy.editor import VideoFileClip
            clip = VideoFileClip(video_path)
            # Default to aspect-ratio-based dimensions.
            if target_size is None:
                target_size = self.get_video_dimensions()
            clip_resized = clip.resize(target_size)
            optimized_path = video_path.replace('.mp4', f'_optimized_{int(time.time())}.mp4')
            try:
                clip_resized.write_videofile(
                    optimized_path,
                    codec='libx264',
                    audio_codec='aac',
                    preset='ultrafast',
                    threads=threads,
                    bitrate=bitrate
                )
            except Exception as e:
                st.warning(f"Error optimizing video: {str(e)}. Using original video.")
                optimized_path = video_path
            # Close clips to release readers and free memory.
            clip.close()
            clip_resized.close()
            return optimized_path
        except Exception as e:
            st.warning(f"Error in video optimization: {str(e)}. Using original video.")
            return video_path

    def clear_cache(self):
        """Drop all memoized render results. Always returns True."""
        self.video_cache = {}
        return True