Spaces:
Sleeping
Sleeping
| # ----------------------- | |
| # Video Creation Functions | |
| # ----------------------- | |
| import os | |
| import time | |
| import tempfile | |
| import requests | |
| import json | |
| import io | |
| import base64 | |
| import numpy as np | |
| import cv2 | |
| import logging | |
| import uuid | |
| import subprocess | |
| from pathlib import Path | |
| import urllib.parse | |
| import pandas as pd | |
| from PIL import ImageFont, ImageDraw, Image | |
| import seaborn as sns | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| def create_silent_video(images, durations, output_path, logo_path="sozo_logo2.png", font_path="lazy_dog.ttf"): | |
| try: | |
| print("Initializing video creation...") | |
| height, width = 720, 1280 | |
| fps = 24 | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| video = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) | |
| if not video.isOpened(): | |
| print("❌ ERROR: Failed to create video file.") | |
| return None | |
| # Load font | |
| try: | |
| font_size = 45 | |
| font = ImageFont.truetype(font_path, font_size) | |
| print("✅ Font loaded successfully.") | |
| except Exception as e: | |
| print(f"⚠️ Font load error: {e}") | |
| font = None | |
| # Load logo | |
| logo = None | |
| if logo_path: | |
| try: | |
| logo = cv2.imread(logo_path) | |
| if logo is not None: | |
| logo = cv2.resize(logo, (width, height)) | |
| print("✅ Logo loaded successfully.") | |
| else: | |
| print(f"⚠️ Warning: Failed to load logo from {logo_path}.") | |
| except Exception as e: | |
| print(f"⚠️ Error loading logo: {e}") | |
| print(f"Processing {len(images)} images...") | |
| for idx, (img, duration) in enumerate(zip(images, durations)): | |
| try: | |
| print(f"➡️ Processing image {idx + 1}/{len(images)}...") | |
| if img.mode != "RGB": | |
| img = img.convert("RGB") | |
| img_resized = img.resize((width, height)) | |
| frame = np.array(img_resized) | |
| # Convert to OpenCV format | |
| frame_cv = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) | |
| except Exception as e: | |
| print(f"❌ ERROR: Invalid image detected: {e}") | |
| if logo is not None: | |
| frame_cv = logo | |
| else: | |
| frame_cv = np.zeros((height, width, 3), dtype=np.uint8) | |
| # Convert frame to PIL for text overlay | |
| pil_img = Image.fromarray(cv2.cvtColor(frame_cv, cv2.COLOR_BGR2RGB)) | |
| draw = ImageDraw.Draw(pil_img) | |
| # Add "Sozo Dream Lab" text | |
| text1 = "Made With" | |
| text2 = "Sozo Dream Lab" | |
| if font: | |
| bbox = draw.textbbox((0, 0), text1, font=font) | |
| text1_height = bbox[3] - bbox[1] | |
| text_position1 = (width - 270, height - 120) | |
| text_position2 = (width - 330, height - 120 + text1_height + 5) | |
| draw.text(text_position1, text1, font=font, fill=(81, 34, 97, 255)) | |
| draw.text(text_position2, text2, font=font, fill=(81, 34, 97, 255)) | |
| # Convert back to OpenCV format | |
| frame_cv = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR) | |
| # Write frame multiple times | |
| for _ in range(int(duration * fps)): | |
| video.write(frame_cv) | |
| # Add full-screen logo at the end | |
| if logo is not None: | |
| for _ in range(int(3 * fps)): | |
| video.write(logo) | |
| video.release() | |
| print("✅ Video creation completed successfully!") | |
| return output_path | |
| except Exception as e: | |
| print(f"❌ ERROR in video generation: {e}") | |
| return None | |
| def combine_video_audio(video_path, audio_files, output_path=None): | |
| try: | |
| if output_path is None: | |
| output_path = f"final_video_{uuid.uuid4()}.mp4" | |
| temp_audio_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") | |
| temp_audio_file.close() | |
| # Combine multiple audio files if needed | |
| if len(audio_files) > 1: | |
| concat_list_path = tempfile.NamedTemporaryFile(delete=False, suffix=".txt") | |
| with open(concat_list_path.name, 'w') as f: | |
| for af in audio_files: | |
| f.write(f"file '{af}'\n") | |
| concat_cmd = ['ffmpeg', '-y', '-f', 'concat', '-safe', '0', | |
| '-i', concat_list_path.name, '-c', 'copy', temp_audio_file.name] | |
| subprocess.run(concat_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
| os.unlink(concat_list_path.name) | |
| combined_audio = temp_audio_file.name | |
| else: | |
| combined_audio = audio_files[0] if audio_files else None | |
| if not combined_audio: | |
| return video_path # Return silent video if no audio available | |
| # Combine video and audio with compatibility flags | |
| combine_cmd = [ | |
| 'ffmpeg', '-y', | |
| '-i', video_path, | |
| '-i', combined_audio, | |
| '-map', '0:v', | |
| '-map', '1:a', | |
| '-c:v', 'libx264', | |
| '-pix_fmt', 'yuv420p', | |
| '-movflags', '+faststart', | |
| '-crf', '23', | |
| '-c:a', 'aac', | |
| '-shortest', | |
| output_path | |
| ] | |
| result = subprocess.run(combine_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
| logger.info("FFmpeg stdout: %s", result.stdout.decode("utf-8")) | |
| logger.info("FFmpeg stderr: %s", result.stderr.decode("utf-8")) | |
| os.unlink(temp_audio_file.name) | |
| return output_path | |
| except Exception as e: | |
| logger.error(f"Error combining video and audio: {e}") | |
| return video_path | |
| def get_audio_duration(audio_path): | |
| """ Get duration of an audio file using FFmpeg. """ | |
| try: | |
| result = subprocess.run(['ffprobe', '-i', audio_path, '-show_entries', 'format=duration', | |
| '-v', 'quiet', '-of', 'csv=p=0'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
| return float(result.stdout.strip()) if result.stdout else 5.0 | |
| except Exception as e: | |
| logger.warning(f"Failed to get audio duration for {audio_path}: {e}") | |
| return 5.0 | |
| def create_video(images, audio_files, output_path=None): | |
| try: | |
| # Check if FFmpeg is installed | |
| try: | |
| subprocess.run(['ffmpeg', '-version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
| except FileNotFoundError: | |
| logger.error("FFmpeg not installed.") | |
| return None | |
| if output_path is None: | |
| output_path = f"output_video_{uuid.uuid4()}.mp4" | |
| silent_video_path = f"silent_{uuid.uuid4()}.mp4" | |
| # Get durations for each image | |
| durations = [get_audio_duration(af) if af else 5.0 for af in audio_files] | |
| if len(durations) < len(images): | |
| durations.extend([5.0] * (len(images) - len(durations))) | |
| # Create silent video | |
| silent_video = create_silent_video(images, durations, silent_video_path) | |
| if not silent_video: | |
| return None | |
| # Combine silent video with audio | |
| final_video = combine_video_audio(silent_video, audio_files, output_path) | |
| # Clean up temporary files | |
| try: | |
| os.unlink(silent_video_path) | |
| except Exception: | |
| pass | |
| return final_video | |
| except Exception as e: | |
| logger.error(f"Error creating video: {e}") | |
| return None |