# -----------------------
# Video Creation Functions
# -----------------------
import base64
import io
import json
import logging
import os
import shutil
import subprocess
import tempfile
import time
import urllib.parse
import uuid
from pathlib import Path

import cv2
import numpy as np
import pandas as pd
import requests
import seaborn as sns
from PIL import ImageFont, ImageDraw, Image

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _overlay_watermark(frame_cv, font, width, height):
    """Return a copy of the BGR frame with the two-line watermark drawn on it."""
    # PIL is used for text because it can render the TrueType font.
    pil_img = Image.fromarray(cv2.cvtColor(frame_cv, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(pil_img)

    # Add "Sozo Dream Lab" text
    text1 = "Made With"
    text2 = "Sozo Dream Lab"
    bbox = draw.textbbox((0, 0), text1, font=font)
    text1_height = bbox[3] - bbox[1]
    text_position1 = (width - 270, height - 120)
    # Second line sits 5 px below the measured height of the first line.
    text_position2 = (width - 330, height - 120 + text1_height + 5)
    draw.text(text_position1, text1, font=font, fill=(81, 34, 97, 255))
    draw.text(text_position2, text2, font=font, fill=(81, 34, 97, 255))

    # Convert back to OpenCV format
    return cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)


def create_silent_video(images, durations, output_path, logo_path="sozo_logo2.png", font_path="lazy_dog.ttf"):
    """Render a 1280x720, 24 fps slideshow (no audio track) to ``output_path``.

    Args:
        images: Sequence of PIL images; each is converted to RGB and resized
            to the frame size. An image that cannot be processed is replaced
            by the logo frame, or by a black frame when no logo is loaded.
        durations: Seconds each image stays on screen, paired with ``images``
            via ``zip`` (extra entries on either side are ignored).
        output_path: Destination file, written with the ``mp4v`` codec.
        logo_path: Image resized to full frame and appended for 3 seconds at
            the end; a falsy value disables the logo.
        font_path: TrueType font for the watermark. If it cannot be loaded
            the watermark is skipped.

    Returns:
        ``output_path`` on success, ``None`` on any failure.
    """
    video = None
    try:
        print("Initializing video creation...")
        height, width = 720, 1280
        fps = 24
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        video = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not video.isOpened():
            print("❌ ERROR: Failed to create video file.")
            return None

        # Load font
        try:
            font_size = 45
            font = ImageFont.truetype(font_path, font_size)
            print("✅ Font loaded successfully.")
        except Exception as e:
            print(f"⚠️ Font load error: {e}")
            font = None

        # Load logo
        logo = None
        if logo_path:
            try:
                logo = cv2.imread(logo_path)
                if logo is not None:
                    logo = cv2.resize(logo, (width, height))
                    print("✅ Logo loaded successfully.")
                else:
                    print(f"⚠️ Warning: Failed to load logo from {logo_path}.")
            except Exception as e:
                print(f"⚠️ Error loading logo: {e}")

        print(f"Processing {len(images)} images...")
        for idx, (img, duration) in enumerate(zip(images, durations)):
            try:
                print(f"➡️ Processing image {idx + 1}/{len(images)}...")
                if img.mode != "RGB":
                    img = img.convert("RGB")
                img_resized = img.resize((width, height))
                frame = np.array(img_resized)
                # Convert to OpenCV format
                frame_cv = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            except Exception as e:
                print(f"❌ ERROR: Invalid image detected: {e}")
                if logo is not None:
                    frame_cv = logo
                else:
                    frame_cv = np.zeros((height, width, 3), dtype=np.uint8)

            # Without a font there is nothing to draw, so the frame is
            # written unchanged.
            if font:
                frame_cv = _overlay_watermark(frame_cv, font, width, height)

            # Write frame multiple times
            for _ in range(int(duration * fps)):
                video.write(frame_cv)

        # Add full-screen logo at the end
        if logo is not None:
            for _ in range(int(3 * fps)):
                video.write(logo)

        print("✅ Video creation completed successfully!")
        return output_path
    except Exception as e:
        print(f"❌ ERROR in video generation: {e}")
        return None
    finally:
        # Release on every exit path so the writer is never left open.
        if video is not None:
            video.release()


def _concat_list_entry(path):
    """Format one ``file '...'`` line for an FFmpeg concat list file."""
    path = str(path)
    # FFmpeg resolves relative entries against the list file's directory
    # (a temp dir here), so local paths are made absolute. URLs are left
    # untouched.
    if "://" not in path:
        path = os.path.abspath(path)
    # Inside single quotes, a literal quote is written as '\''.
    escaped = path.replace("'", "'\\''")
    return f"file '{escaped}'\n"


def combine_video_audio(video_path, audio_files, output_path=None):
    """Mux ``video_path`` with the given audio file(s) using FFmpeg.

    Multiple audio files are first concatenated (stream copy) into a
    temporary ``.mp3``. The video is re-encoded to H.264/yuv420p with AAC
    audio, and ``-shortest`` ends the output with the shorter stream.

    Args:
        video_path: Path of the input video.
        audio_files: Sequence of audio file paths. Falsy entries (e.g.
            ``None``) are dropped before concatenation.
            NOTE(review): dropping entries shifts later clips earlier
            relative to the slides - confirm this is acceptable for callers.
        output_path: Destination path; a unique ``final_video_<uuid>.mp4``
            name is generated when omitted.

    Returns:
        ``output_path`` on success. ``video_path`` when there is no usable
        audio or when any step fails.
    """
    temp_paths = []  # temp files removed in ``finally``
    try:
        if output_path is None:
            output_path = f"final_video_{uuid.uuid4()}.mp4"

        tracks = [af for af in (audio_files or []) if af]
        if not tracks:
            return video_path  # Return silent video if no audio available

        # Combine multiple audio files if needed
        if len(tracks) > 1:
            with tempfile.NamedTemporaryFile(
                "w", delete=False, suffix=".txt", encoding="utf-8"
            ) as list_file:
                temp_paths.append(list_file.name)
                list_file.writelines(_concat_list_entry(af) for af in tracks)

            temp_audio_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
            temp_audio_file.close()
            temp_paths.append(temp_audio_file.name)

            concat_cmd = [
                'ffmpeg', '-y', '-f', 'concat', '-safe', '0',
                '-i', list_file.name, '-c', 'copy', temp_audio_file.name
            ]
            concat_result = subprocess.run(
                concat_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
            if concat_result.returncode != 0:
                logger.error(
                    "FFmpeg audio concat failed: %s",
                    concat_result.stderr.decode("utf-8", errors="replace"),
                )
                return video_path
            combined_audio = temp_audio_file.name
        else:
            combined_audio = tracks[0]

        # Combine video and audio with compatibility flags
        combine_cmd = [
            'ffmpeg', '-y', '-i', video_path, '-i', combined_audio,
            '-map', '0:v', '-map', '1:a',
            '-c:v', 'libx264', '-pix_fmt', 'yuv420p',
            '-movflags', '+faststart', '-crf', '23',
            '-c:a', 'aac', '-shortest', output_path
        ]
        result = subprocess.run(combine_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # errors="replace": undecodable bytes in FFmpeg's output must not
        # raise and turn a successful mux into the failure path.
        logger.info("FFmpeg stdout: %s", result.stdout.decode("utf-8", errors="replace"))
        logger.info("FFmpeg stderr: %s", result.stderr.decode("utf-8", errors="replace"))
        if result.returncode != 0:
            logger.error("FFmpeg exited with code %s while muxing.", result.returncode)
            return video_path
        return output_path
    except Exception as e:
        logger.error("Error combining video and audio: %s", e)
        return video_path
    finally:
        for temp_path in temp_paths:
            try:
                os.unlink(temp_path)
            except OSError:
                pass


def get_audio_duration(audio_path):
    """Return the duration of an audio file in seconds, as reported by ffprobe.

    Falls back to ``5.0`` when ffprobe produces no output, when its output
    cannot be parsed as a float, or when running ffprobe raises.
    """
    try:
        result = subprocess.run(
            ['ffprobe', '-i', audio_path, '-show_entries', 'format=duration',
             '-v', 'quiet', '-of', 'csv=p=0'],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        return float(result.stdout.strip()) if result.stdout else 5.0
    except Exception as e:
        logger.warning("Failed to get audio duration for %s: %s", audio_path, e)
        return 5.0


def create_video(images, audio_files, output_path=None):
    """Build a narrated slideshow: one image per audio clip, muxed by FFmpeg.

    Each image is shown for the duration of the audio file at the same index
    (5 seconds when that entry is falsy or there are fewer audio files than
    images).

    Args:
        images: Sequence of PIL images, passed to ``create_silent_video``.
        audio_files: Sequence of audio file paths (entries may be falsy);
            ``None`` is treated as an empty list.
        output_path: Destination path; a unique ``output_video_<uuid>.mp4``
            name is generated when omitted.

    Returns:
        Path of the final video, or ``None`` on failure. If the audio cannot
        be muxed, the silent video is moved to ``output_path`` and that path
        is returned.
    """
    try:
        # Check if FFmpeg is installed
        try:
            subprocess.run(['ffmpeg', '-version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except FileNotFoundError:
            logger.error("FFmpeg not installed.")
            return None

        if output_path is None:
            output_path = f"output_video_{uuid.uuid4()}.mp4"
        silent_video_path = f"silent_{uuid.uuid4()}.mp4"
        audio_files = audio_files or []

        # Get durations for each image
        durations = [get_audio_duration(af) if af else 5.0 for af in audio_files]
        if len(durations) < len(images):
            durations.extend([5.0] * (len(images) - len(durations)))

        # Create silent video
        silent_video = create_silent_video(images, durations, silent_video_path)
        if not silent_video:
            # Do not leave a partially written file behind.
            try:
                os.unlink(silent_video_path)
            except OSError:
                pass
            return None

        # Combine silent video with audio
        final_video = combine_video_audio(silent_video, audio_files, output_path)

        if final_video == silent_video:
            # combine_video_audio fell back to the silent video. Deleting it
            # would hand the caller a path to a missing file, so move it to
            # the requested output path instead.
            shutil.move(silent_video_path, output_path)
            return output_path

        # Clean up temporary files
        try:
            os.unlink(silent_video_path)
        except OSError:
            pass
        return final_video
    except Exception as e:
        logger.error("Error creating video: %s", e)
        return None