|
|
import streamlit as st |
|
|
import os |
|
|
import tempfile |
|
|
from moviepy.editor import ImageSequenceClip, AudioFileClip, concatenate_videoclips, TextClip, CompositeVideoClip |
|
|
import numpy as np |
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
import time |
|
|
|
|
|
class VideoCreator: |
|
|
def __init__(self): |
|
|
|
|
|
os.makedirs("outputs", exist_ok=True) |
|
|
self.video_cache = {} |
|
|
self.aspect_ratio = "1:1" |
|
|
self.max_segment_duration = 5.0 |
|
|
|
|
|
def set_aspect_ratio(self, aspect_ratio): |
|
|
"""Set the aspect ratio for video creation""" |
|
|
self.aspect_ratio = aspect_ratio |
|
|
|
|
|
def set_max_segment_duration(self, duration): |
|
|
"""Set the maximum duration for any segment in seconds""" |
|
|
self.max_segment_duration = duration |
|
|
|
|
|
def get_video_dimensions(self, base_size=None): |
|
|
"""Get video dimensions based on aspect ratio""" |
|
|
if base_size is None: |
|
|
|
|
|
if self.aspect_ratio == "1:1": |
|
|
return (640, 640) |
|
|
elif self.aspect_ratio == "16:9": |
|
|
return (854, 480) |
|
|
elif self.aspect_ratio == "9:16": |
|
|
return (480, 854) |
|
|
else: |
|
|
return (640, 640) |
|
|
|
|
|
|
|
|
base_pixels = base_size[0] * base_size[1] |
|
|
|
|
|
if self.aspect_ratio == "1:1": |
|
|
|
|
|
side = int(np.sqrt(base_pixels)) |
|
|
|
|
|
side = side if side % 2 == 0 else side + 1 |
|
|
return (side, side) |
|
|
elif self.aspect_ratio == "16:9": |
|
|
|
|
|
width = int(np.sqrt(base_pixels * 16 / 9)) |
|
|
height = int(width * 9 / 16) |
|
|
|
|
|
width = width if width % 2 == 0 else width + 1 |
|
|
height = height if height % 2 == 0 else height + 1 |
|
|
return (width, height) |
|
|
elif self.aspect_ratio == "9:16": |
|
|
|
|
|
height = int(np.sqrt(base_pixels * 16 / 9)) |
|
|
width = int(height * 9 / 16) |
|
|
|
|
|
width = width if width % 2 == 0 else width + 1 |
|
|
height = height if height % 2 == 0 else height + 1 |
|
|
return (width, height) |
|
|
else: |
|
|
|
|
|
return base_size |
|
|
|
|
|
def create_segment_clip(self, frames, segment_duration, segment_text=None): |
|
|
"""Create a video clip from frames with optional text overlay""" |
|
|
try: |
|
|
|
|
|
segment_duration = min(segment_duration, self.max_segment_duration) |
|
|
|
|
|
|
|
|
frame_duration = segment_duration / len(frames) |
|
|
|
|
|
|
|
|
segment_clip = ImageSequenceClip(frames, durations=[frame_duration] * len(frames)) |
|
|
|
|
|
|
|
|
if segment_text: |
|
|
try: |
|
|
|
|
|
fontsize = 24 |
|
|
position = ('center', 'bottom') |
|
|
|
|
|
if self.aspect_ratio == "9:16": |
|
|
|
|
|
fontsize = 20 |
|
|
position = ('center', 0.9) |
|
|
elif self.aspect_ratio == "16:9": |
|
|
|
|
|
position = ('center', 0.95) |
|
|
|
|
|
txt_clip = TextClip( |
|
|
segment_text, |
|
|
fontsize=fontsize, |
|
|
color='white', |
|
|
bg_color='rgba(0,0,0,0.5)', |
|
|
size=(segment_clip.w, None), |
|
|
method='caption' |
|
|
).set_duration(segment_clip.duration) |
|
|
|
|
|
txt_clip = txt_clip.set_position(position) |
|
|
segment_clip = CompositeVideoClip([segment_clip, txt_clip]) |
|
|
except Exception as e: |
|
|
|
|
|
st.warning(f"Could not add text overlay: {str(e)}") |
|
|
|
|
|
return segment_clip |
|
|
except Exception as e: |
|
|
st.warning(f"Error creating segment clip: {str(e)}. Using fallback method.") |
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
first_frame = frames[0] if frames else None |
|
|
if first_frame and os.path.exists(first_frame): |
|
|
segment_clip = ImageSequenceClip([first_frame], durations=[segment_duration]) |
|
|
return segment_clip |
|
|
else: |
|
|
|
|
|
from PIL import Image |
|
|
blank_img = Image.new('RGB', self.get_video_dimensions(), color=(0, 0, 0)) |
|
|
blank_path = tempfile.mktemp(suffix='.png') |
|
|
blank_img.save(blank_path) |
|
|
segment_clip = ImageSequenceClip([blank_path], durations=[segment_duration]) |
|
|
return segment_clip |
|
|
except Exception as inner_e: |
|
|
st.error(f"Critical error in fallback clip creation: {str(inner_e)}") |
|
|
|
|
|
from moviepy.editor import ColorClip |
|
|
return ColorClip(self.get_video_dimensions(), color=(0, 0, 0), duration=segment_duration) |
|
|
|
|
|
def create_video_from_frames(self, animated_frames, audio_file, segments=None, timestamps=None, |
|
|
output_dir="outputs", parallel=False, max_workers=4): |
|
|
"""Create a video from animated frames synchronized with audio using parallel processing""" |
|
|
|
|
|
import hashlib |
|
|
cache_key = f"{hashlib.md5(audio_file.getvalue()).hexdigest()}_{len(animated_frames)}_{self.aspect_ratio}_{self.max_segment_duration}" |
|
|
|
|
|
|
|
|
if cache_key in self.video_cache: |
|
|
return self.video_cache[cache_key] |
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file: |
|
|
tmp_file.write(audio_file.getvalue()) |
|
|
audio_path = tmp_file.name |
|
|
|
|
|
try: |
|
|
|
|
|
audio_clip = AudioFileClip(audio_path) |
|
|
total_duration = audio_clip.duration |
|
|
|
|
|
|
|
|
if timestamps: |
|
|
|
|
|
segment_durations = [min(end - start, self.max_segment_duration) for start, end in timestamps] |
|
|
else: |
|
|
|
|
|
segment_durations = [min(total_duration / len(animated_frames), self.max_segment_duration)] * len(animated_frames) |
|
|
|
|
|
|
|
|
video_clips = [] |
|
|
|
|
|
try: |
|
|
if parallel and len(animated_frames) > 1: |
|
|
|
|
|
with ThreadPoolExecutor(max_workers=max_workers) as executor: |
|
|
|
|
|
args = [] |
|
|
for i, frames in enumerate(animated_frames): |
|
|
segment_duration = segment_durations[min(i, len(segment_durations)-1)] |
|
|
segment_text = segments[i] if segments and i < len(segments) else None |
|
|
args.append((frames, segment_duration, segment_text)) |
|
|
|
|
|
|
|
|
video_clips = list(executor.map(lambda x: self.create_segment_clip(*x), args)) |
|
|
else: |
|
|
|
|
|
for i, frames in enumerate(animated_frames): |
|
|
segment_duration = segment_durations[min(i, len(segment_durations)-1)] |
|
|
segment_text = segments[i] if segments and i < len(segments) else None |
|
|
|
|
|
segment_clip = self.create_segment_clip(frames, segment_duration, segment_text) |
|
|
video_clips.append(segment_clip) |
|
|
except Exception as e: |
|
|
st.warning(f"Error processing video segments: {str(e)}. Using fallback method.") |
|
|
|
|
|
|
|
|
video_clips = [] |
|
|
for i, _ in enumerate(animated_frames): |
|
|
segment_duration = min(segment_durations[min(i, len(segment_durations)-1)], self.max_segment_duration) |
|
|
from moviepy.editor import ColorClip |
|
|
clip = ColorClip(self.get_video_dimensions(), color=(0, 0, 0), duration=segment_duration) |
|
|
video_clips.append(clip) |
|
|
|
|
|
|
|
|
try: |
|
|
final_clip = concatenate_videoclips(video_clips) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
audio_clip = audio_clip.subclip(0, min(final_clip.duration, audio_clip.duration)) |
|
|
final_clip = final_clip.set_audio(audio_clip) |
|
|
|
|
|
|
|
|
target_dimensions = self.get_video_dimensions() |
|
|
|
|
|
|
|
|
final_clip = final_clip.resize(target_dimensions) |
|
|
except Exception as e: |
|
|
st.warning(f"Error creating final video: {str(e)}. Using fallback method.") |
|
|
|
|
|
|
|
|
from moviepy.editor import ColorClip |
|
|
final_clip = ColorClip(self.get_video_dimensions(), color=(0, 0, 0), duration=total_duration) |
|
|
final_clip = final_clip.set_audio(audio_clip) |
|
|
|
|
|
|
|
|
output_path = f"{output_dir}/output_video_{self.aspect_ratio.replace(':', '_')}_{int(time.time())}.mp4" |
|
|
|
|
|
try: |
|
|
|
|
|
final_clip.write_videofile( |
|
|
output_path, |
|
|
fps=24, |
|
|
codec='libx264', |
|
|
audio_codec='aac', |
|
|
preset='ultrafast', |
|
|
threads=max_workers, |
|
|
bitrate='1000k' |
|
|
) |
|
|
except Exception as e: |
|
|
st.warning(f"Error writing video file: {str(e)}. Trying with simpler settings.") |
|
|
|
|
|
|
|
|
try: |
|
|
final_clip.write_videofile( |
|
|
output_path, |
|
|
fps=15, |
|
|
codec='libx264', |
|
|
audio_codec='aac', |
|
|
preset='ultrafast', |
|
|
threads=2, |
|
|
bitrate='800k' |
|
|
) |
|
|
except Exception as inner_e: |
|
|
st.error(f"Critical error writing video: {str(inner_e)}") |
|
|
|
|
|
error_path = f"{output_dir}/error_video_{int(time.time())}.txt" |
|
|
with open(error_path, 'w') as f: |
|
|
f.write(f"Error creating video: {str(e)}\nSecondary error: {str(inner_e)}") |
|
|
return error_path |
|
|
|
|
|
|
|
|
self.video_cache[cache_key] = output_path |
|
|
|
|
|
return output_path |
|
|
|
|
|
except Exception as e: |
|
|
st.error(f"Critical error in video creation: {str(e)}") |
|
|
|
|
|
error_path = f"{output_dir}/error_video_{int(time.time())}.txt" |
|
|
with open(error_path, 'w') as f: |
|
|
f.write(f"Error creating video: {str(e)}") |
|
|
return error_path |
|
|
finally: |
|
|
|
|
|
if os.path.exists(audio_path): |
|
|
try: |
|
|
os.unlink(audio_path) |
|
|
except: |
|
|
pass |
|
|
|
|
|
def optimize_video(self, video_path, target_size=None, bitrate='1000k', threads=2): |
|
|
"""Optimize video size and quality for web delivery""" |
|
|
if not os.path.exists(video_path) or video_path.endswith('.txt'): |
|
|
return video_path |
|
|
|
|
|
try: |
|
|
from moviepy.editor import VideoFileClip |
|
|
|
|
|
|
|
|
clip = VideoFileClip(video_path) |
|
|
|
|
|
|
|
|
if target_size is None: |
|
|
target_size = self.get_video_dimensions() |
|
|
|
|
|
|
|
|
clip_resized = clip.resize(target_size) |
|
|
|
|
|
|
|
|
optimized_path = video_path.replace('.mp4', f'_optimized_{int(time.time())}.mp4') |
|
|
|
|
|
try: |
|
|
clip_resized.write_videofile( |
|
|
optimized_path, |
|
|
codec='libx264', |
|
|
audio_codec='aac', |
|
|
preset='ultrafast', |
|
|
threads=threads, |
|
|
bitrate=bitrate |
|
|
) |
|
|
except Exception as e: |
|
|
st.warning(f"Error optimizing video: {str(e)}. Using original video.") |
|
|
optimized_path = video_path |
|
|
|
|
|
|
|
|
clip.close() |
|
|
clip_resized.close() |
|
|
|
|
|
return optimized_path |
|
|
except Exception as e: |
|
|
st.warning(f"Error in video optimization: {str(e)}. Using original video.") |
|
|
return video_path |
|
|
|
|
|
def clear_cache(self): |
|
|
"""Clear the video cache""" |
|
|
self.video_cache = {} |
|
|
return True |
|
|
|