# File size: 15,318 Bytes
import streamlit as st
import os
import tempfile
from moviepy.editor import ImageSequenceClip, AudioFileClip, concatenate_videoclips, TextClip, CompositeVideoClip
import numpy as np
from concurrent.futures import ThreadPoolExecutor
import time
class VideoCreator:
    """Assemble animated frame sequences and an audio track into an MP4 file.

    Each entry of ``animated_frames`` becomes one video segment (optionally
    with a caption), the segments are concatenated, the audio is attached and
    the result is encoded with libx264/aac into ``output_dir``.  Failures are
    reported through Streamlit (``st.warning`` / ``st.error``) and degrade to
    progressively simpler fallbacks rather than raising.
    """

    # Default output sizes (width, height) for the supported aspect ratios.
    _DEFAULT_DIMENSIONS = {
        "1:1": (640, 640),   # Square
        "16:9": (854, 480),  # Landscape
        "9:16": (480, 854),  # Portrait (mobile)
    }
    # Used when the configured aspect ratio is not one of the keys above.
    _FALLBACK_DIMENSIONS = (640, 640)

    def __init__(self):
        """Create the default output directory and initialise settings."""
        # Ensure the default output directory exists
        os.makedirs("outputs", exist_ok=True)
        self.video_cache = {}            # cache key -> path of a rendered video
        self.aspect_ratio = "1:1"        # Default aspect ratio
        self.max_segment_duration = 5.0  # Maximum duration of any segment, in seconds

    def set_aspect_ratio(self, aspect_ratio):
        """Set the aspect ratio ("1:1", "16:9" or "9:16") for video creation."""
        self.aspect_ratio = aspect_ratio

    def set_max_segment_duration(self, duration):
        """Set the maximum duration for any segment, in seconds."""
        self.max_segment_duration = duration

    @staticmethod
    def _round_up_to_even(value):
        """Return ``value`` unchanged if even, otherwise ``value + 1``."""
        return value if value % 2 == 0 else value + 1

    def get_video_dimensions(self, base_size=None):
        """Return the ``(width, height)`` to render at for the current aspect ratio.

        Args:
            base_size: Optional ``(width, height)`` whose pixel count should be
                approximately preserved.  When omitted, a fixed default size
                for the aspect ratio is returned.

        Returns:
            A ``(width, height)`` tuple.  Computed sizes are rounded up to even
            numbers for video compatibility.  For an unrecognised aspect ratio
            the result is ``base_size`` itself, or ``(640, 640)`` when no base
            size was given.
        """
        if base_size is None:
            return self._DEFAULT_DIMENSIONS.get(self.aspect_ratio, self._FALLBACK_DIMENSIONS)

        base_pixels = base_size[0] * base_size[1]

        if self.aspect_ratio == "1:1":
            side = self._round_up_to_even(int(np.sqrt(base_pixels)))
            return (side, side)

        if self.aspect_ratio in ("16:9", "9:16"):
            # The long side is derived from the pixel budget; the short side is
            # derived from the *unrounded* long side, then both are made even.
            long_side = int(np.sqrt(base_pixels * 16 / 9))
            short_side = int(long_side * 9 / 16)
            long_side = self._round_up_to_even(long_side)
            short_side = self._round_up_to_even(short_side)
            if self.aspect_ratio == "16:9":
                return (long_side, short_side)
            return (short_side, long_side)

        # Unknown aspect ratio: keep the original size
        return base_size

    def _blank_clip(self, duration):
        """Return a black ``ColorClip`` of the target dimensions lasting ``duration`` seconds."""
        from moviepy.editor import ColorClip
        return ColorClip(self.get_video_dimensions(), color=(0, 0, 0), duration=duration)

    def _add_text_overlay(self, segment_clip, segment_text):
        """Return ``segment_clip`` with ``segment_text`` captioned over it.

        If the caption cannot be rendered, a warning is shown and the clip is
        returned unchanged.
        """
        try:
            # Adjust text size and position based on aspect ratio
            fontsize = 24
            position = ('center', 'bottom')
            relative = False
            if self.aspect_ratio == "9:16":
                # For portrait, make text smaller and position it lower
                fontsize = 20
                position = ('center', 0.9)  # 90% from top
                relative = True
            elif self.aspect_ratio == "16:9":
                # For landscape, position text at bottom
                position = ('center', 0.95)  # 95% from top
                relative = True

            txt_clip = TextClip(
                segment_text,
                fontsize=fontsize,
                color='white',
                bg_color='rgba(0,0,0,0.5)',
                size=(segment_clip.w, None),
                method='caption'
            ).set_duration(segment_clip.duration)
            # Fractional coordinates are only treated as a fraction of the
            # frame size when relative=True; otherwise 0.9 means 0.9 pixels.
            txt_clip = txt_clip.set_position(position, relative=relative)
            return CompositeVideoClip([segment_clip, txt_clip])
        except Exception as e:
            # If TextClip fails, continue without text overlay
            st.warning(f"Could not add text overlay: {str(e)}")
            return segment_clip

    def _fallback_segment_clip(self, frames, segment_duration):
        """Build a single-image clip after normal segment creation has failed.

        Tries, in order: the first frame held for the whole segment, a black
        frame of the target dimensions, and finally a plain ``ColorClip``.
        """
        try:
            has_frames = frames is not None and len(frames) > 0
            first_frame = frames[0] if has_frames else None
            if isinstance(first_frame, str):
                # Path-based frame: only usable if the file is really there
                usable = os.path.exists(first_frame)
            else:
                # Non-path frames (e.g. arrays) have no meaningful truth value,
                # so only check for presence.
                usable = first_frame is not None
            if usable:
                return ImageSequenceClip([first_frame], durations=[segment_duration])

            # No usable frame: use an in-memory black frame (no temp file needed)
            width, height = self.get_video_dimensions()
            blank_frame = np.zeros((height, width, 3), dtype=np.uint8)
            return ImageSequenceClip([blank_frame], durations=[segment_duration])
        except Exception as inner_e:
            st.error(f"Critical error in fallback clip creation: {str(inner_e)}")
            # Last resort: Create an extremely simple clip
            return self._blank_clip(segment_duration)

    def create_segment_clip(self, frames, segment_duration, segment_text=None):
        """Create a video clip from frames with an optional text overlay.

        Args:
            frames: Sequence of frames accepted by ``ImageSequenceClip``
                (presumably image paths or arrays - confirm against callers).
            segment_duration: Requested duration in seconds; clamped to
                ``self.max_segment_duration``.
            segment_text: Optional caption drawn over the clip.

        Returns:
            A MoviePy clip.  On any error a warning is shown and a simpler
            fallback clip is returned instead of raising.
        """
        try:
            # Limit segment duration to max_segment_duration
            segment_duration = min(segment_duration, self.max_segment_duration)
            if len(frames) == 0:
                raise ValueError("no frames supplied for this segment")
            # Spread the segment duration evenly over the frames
            frame_duration = segment_duration / len(frames)
            segment_clip = ImageSequenceClip(frames, durations=[frame_duration] * len(frames))
            if segment_text:
                segment_clip = self._add_text_overlay(segment_clip, segment_text)
            return segment_clip
        except Exception as e:
            st.warning(f"Error creating segment clip: {str(e)}. Using fallback method.")
            return self._fallback_segment_clip(frames, segment_duration)

    def _segment_durations(self, total_duration, segment_count, timestamps=None):
        """Return the per-segment durations, each capped at ``max_segment_duration``.

        With ``timestamps`` (an iterable of ``(start, end)`` pairs) one duration
        per pair is returned; otherwise ``total_duration`` is split evenly over
        ``segment_count`` segments.
        """
        cap = self.max_segment_duration
        if timestamps:
            return [min(end - start, cap) for start, end in timestamps]
        return [min(total_duration / segment_count, cap)] * segment_count

    def _build_segment_clips(self, animated_frames, segment_durations, segments,
                             parallel, max_workers):
        """Create one clip per entry of ``animated_frames``.

        If anything goes wrong, a warning is shown and every segment is
        replaced by a black clip of the corresponding duration.
        """
        def duration_for(index):
            # Reuse the last known duration if there are more segments than durations
            return segment_durations[min(index, len(segment_durations) - 1)]

        try:
            jobs = [
                (frames, duration_for(i), segments[i] if segments and i < len(segments) else None)
                for i, frames in enumerate(animated_frames)
            ]
            if parallel and len(animated_frames) > 1:
                # Process segments in parallel; map() preserves input order
                with ThreadPoolExecutor(max_workers=max_workers) as executor:
                    return list(executor.map(lambda job: self.create_segment_clip(*job), jobs))
            # Process segments sequentially
            return [self.create_segment_clip(*job) for job in jobs]
        except Exception as e:
            st.warning(f"Error processing video segments: {str(e)}. Using fallback method.")
            # Fallback: Create a simple clip for each segment
            return [
                self._blank_clip(min(duration_for(i), self.max_segment_duration))
                for i, _ in enumerate(animated_frames)
            ]

    @staticmethod
    def _write_error_report(output_dir, message):
        """Write ``message`` to a timestamped text file in ``output_dir`` and return its path."""
        error_path = f"{output_dir}/error_video_{int(time.time())}.txt"
        with open(error_path, 'w', encoding='utf-8') as f:
            f.write(message)
        return error_path

    @staticmethod
    def _close_quietly(clip):
        """Close a MoviePy clip, ignoring errors; ``None`` is accepted."""
        if clip is None:
            return
        try:
            clip.close()
        except Exception:
            # Best-effort cleanup: a failed close must not mask the real result
            pass

    def create_video_from_frames(self, animated_frames, audio_file, segments=None, timestamps=None,
                                 output_dir="outputs", parallel=False, max_workers=4):
        """Create a video from animated frames synchronized with audio.

        Args:
            animated_frames: Sequence with one frame list per segment.
            audio_file: Object exposing ``getvalue()`` that returns the audio
                bytes (e.g. a Streamlit upload).
            segments: Optional caption text per segment.
            timestamps: Optional ``(start, end)`` pairs giving segment timing.
            output_dir: Directory for the result; created if missing.
            parallel: Build segment clips in a thread pool when True.
            max_workers: Thread count for the pool and for encoding.

        Returns:
            Path to the rendered ``.mp4``.  If rendering fails completely, the
            path of a ``.txt`` file describing the error is returned instead.
        """
        import hashlib

        audio_bytes = audio_file.getvalue()

        # Cache key covers everything that influences the rendered output, so a
        # change of captions, timing or destination is not served a stale file.
        options_digest = hashlib.md5(
            repr((segments, timestamps, output_dir)).encode("utf-8")
        ).hexdigest()
        cache_key = (
            f"{hashlib.md5(audio_bytes).hexdigest()}_{len(animated_frames)}_"
            f"{self.aspect_ratio}_{self.max_segment_duration}_{options_digest}"
        )

        cached_path = self.video_cache.get(cache_key)
        if cached_path is not None:
            if os.path.exists(cached_path):
                return cached_path
            # The cached file was removed from disk; render it again
            del self.video_cache[cache_key]

        os.makedirs(output_dir, exist_ok=True)

        # Save the uploaded audio to a temporary file
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
            tmp_file.write(audio_bytes)
            audio_path = tmp_file.name

        source_audio = None
        final_clip = None
        try:
            # Load the audio to get its duration
            source_audio = AudioFileClip(audio_path)
            audio_clip = source_audio
            total_duration = audio_clip.duration

            segment_durations = self._segment_durations(
                total_duration, len(animated_frames), timestamps
            )
            video_clips = self._build_segment_clips(
                animated_frames, segment_durations, segments, parallel, max_workers
            )

            # Concatenate all clips
            try:
                final_clip = concatenate_videoclips(video_clips)
                # If the video is shorter than the audio due to max_segment_duration,
                # trim the audio to match the video duration
                audio_clip = audio_clip.subclip(0, min(final_clip.duration, audio_clip.duration))
                final_clip = final_clip.set_audio(audio_clip)
                # Resize the final clip to match the target dimensions
                final_clip = final_clip.resize(self.get_video_dimensions())
            except Exception as e:
                st.warning(f"Error creating final video: {str(e)}. Using fallback method.")
                # Fallback: Create a simple video with the audio
                final_clip = self._blank_clip(total_duration)
                final_clip = final_clip.set_audio(audio_clip)

            # Write the result to a file
            output_path = (
                f"{output_dir}/output_video_{self.aspect_ratio.replace(':', '_')}_"
                f"{int(time.time())}.mp4"
            )
            try:
                # Use lower resolution and bitrate for faster processing
                final_clip.write_videofile(
                    output_path,
                    fps=24,
                    codec='libx264',
                    audio_codec='aac',
                    preset='ultrafast',   # Faster encoding
                    threads=max_workers,  # Use multiple threads for encoding
                    bitrate='1000k'       # Lower bitrate
                )
            except Exception as e:
                st.warning(f"Error writing video file: {str(e)}. Trying with simpler settings.")
                # Try with even simpler settings
                try:
                    final_clip.write_videofile(
                        output_path,
                        fps=15,           # Lower fps
                        codec='libx264',
                        audio_codec='aac',
                        preset='ultrafast',
                        threads=2,        # Fewer threads
                        bitrate='800k'    # Lower bitrate
                    )
                except Exception as inner_e:
                    st.error(f"Critical error writing video: {str(inner_e)}")
                    # Create a text file explaining the error
                    return self._write_error_report(
                        output_dir,
                        f"Error creating video: {str(e)}\nSecondary error: {str(inner_e)}"
                    )

            # Cache the result
            self.video_cache[cache_key] = output_path
            return output_path
        except Exception as e:
            st.error(f"Critical error in video creation: {str(e)}")
            # Create a text file explaining the error
            return self._write_error_report(output_dir, f"Error creating video: {str(e)}")
        finally:
            # Release readers first so the temporary audio file can be removed
            self._close_quietly(final_clip)
            self._close_quietly(source_audio)
            try:
                os.unlink(audio_path)
            except OSError:
                # Best effort: the file may already be gone or still be locked
                pass

    def optimize_video(self, video_path, target_size=None, bitrate='1000k', threads=2):
        """Re-encode a video at the target size and bitrate for web delivery.

        Args:
            video_path: Path of the video to optimize.
            target_size: ``(width, height)``; defaults to the aspect-ratio size.
            bitrate: Target bitrate passed to the encoder.
            threads: Number of encoder threads.

        Returns:
            Path of the optimized copy, or ``video_path`` unchanged when the
            file is missing, is an error report (``.txt``), or optimization fails.
        """
        if not os.path.exists(video_path) or video_path.endswith('.txt'):
            return video_path  # Return as is if it's an error file or doesn't exist

        clip = None
        clip_resized = None
        try:
            from moviepy.editor import VideoFileClip

            clip = VideoFileClip(video_path)

            # If target_size is not provided, use aspect ratio-based dimensions
            if target_size is None:
                target_size = self.get_video_dimensions()
            clip_resized = clip.resize(target_size)

            # Always derive a distinct output name so the source file is never
            # overwritten while it is being read.
            root, ext = os.path.splitext(video_path)
            optimized_path = f"{root}_optimized_{int(time.time())}{ext or '.mp4'}"
            try:
                clip_resized.write_videofile(
                    optimized_path,
                    codec='libx264',
                    audio_codec='aac',
                    preset='ultrafast',
                    threads=threads,
                    bitrate=bitrate
                )
            except Exception as e:
                st.warning(f"Error optimizing video: {str(e)}. Using original video.")
                # Do not leave a partially written file behind
                try:
                    os.unlink(optimized_path)
                except OSError:
                    pass
                optimized_path = video_path
            return optimized_path
        except Exception as e:
            st.warning(f"Error in video optimization: {str(e)}. Using original video.")
            return video_path
        finally:
            # Close clips to free memory and file handles, on success and on failure
            self._close_quietly(clip_resized)
            self._close_quietly(clip)

    def clear_cache(self):
        """Clear the video cache and return True."""
        self.video_cache = {}
        return True
|