# sozo-api / video_gen.py
# rairo's picture
# Update video_gen.py
# cfd4972 verified
# raw
# history blame
# 7.58 kB
# -----------------------
# Video Creation Functions
# -----------------------
import os
import time
import tempfile
import requests
import json
import io
import base64
import numpy as np
import cv2
import logging
import uuid
import subprocess
from pathlib import Path
import urllib.parse
import pandas as pd
from PIL import ImageFont, ImageDraw, Image
import seaborn as sns
# Configure the root logger at INFO level when this module is imported, and
# create the module-level logger used by the FFmpeg helper functions below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def create_silent_video(images, durations, output_path, logo_path="sozo_logo2.png", font_path="lazy_dog.ttf"):
    """Render still images into a silent 1280x720, 24 fps MP4 (``mp4v`` codec).

    Every image is converted to RGB, stretched to the frame size (aspect ratio
    is not preserved) and held on screen for its duration. When the font
    loads, a two-line "Made With" / "Sozo Dream Lab" watermark is drawn near
    the bottom-right corner of each frame. When the logo loads, it is used as
    the replacement frame for any image that cannot be processed and is shown
    full-screen for 3 seconds at the end of the video.

    Args:
        images: Sized sequence of PIL images.
        durations: Seconds to show each image, paired positionally with
            ``images``. Pairing uses ``zip``, so surplus entries on either
            side are ignored.
        output_path: Destination path of the video file.
        logo_path: Image file used as fallback frame and closing card. A falsy
            value disables the logo.
        font_path: TrueType font file used for the watermark.

    Returns:
        ``output_path`` on success; ``None`` if the writer cannot be opened or
        an unexpected error occurs.
    """
    height, width = 720, 1280
    fps = 24
    video = None
    try:
        print("Initializing video creation...")
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        video = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not video.isOpened():
            print("❌ ERROR: Failed to create video file.")
            return None

        # Load font
        try:
            font_size = 45
            font = ImageFont.truetype(font_path, font_size)
            print("✅ Font loaded successfully.")
        except Exception as e:
            print(f"⚠️ Font load error: {e}")
            font = None

        # Load logo
        logo = None
        if logo_path:
            try:
                logo = cv2.imread(logo_path)
                if logo is not None:
                    logo = cv2.resize(logo, (width, height))
                    print("✅ Logo loaded successfully.")
                else:
                    print(f"⚠️ Warning: Failed to load logo from {logo_path}.")
            except Exception as e:
                print(f"⚠️ Error loading logo: {e}")
                # Never keep a logo that could not be brought to frame size:
                # writing a wrongly sized frame would corrupt the output.
                logo = None

        # The watermark layout depends only on the font, so compute it once
        # instead of once per image.
        text1 = "Made With"
        text2 = "Sozo Dream Lab"
        text_fill = (81, 34, 97, 255)
        if font is not None:
            scratch_draw = ImageDraw.Draw(Image.new("RGB", (1, 1)))
            bbox = scratch_draw.textbbox((0, 0), text1, font=font)
            text1_height = bbox[3] - bbox[1]
            text_position1 = (width - 270, height - 120)
            text_position2 = (width - 330, height - 120 + text1_height + 5)

        print(f"Processing {len(images)} images...")
        for idx, (img, duration) in enumerate(zip(images, durations)):
            try:
                print(f"➡️ Processing image {idx + 1}/{len(images)}...")
                if img.mode != "RGB":
                    img = img.convert("RGB")
                img_resized = img.resize((width, height))
                # Convert to OpenCV format (BGR channel order)
                frame_cv = cv2.cvtColor(np.array(img_resized), cv2.COLOR_RGB2BGR)
            except Exception as e:
                print(f"❌ ERROR: Invalid image detected: {e}")
                # Fall back to the logo, or a black frame if there is none.
                if logo is not None:
                    frame_cv = logo
                else:
                    frame_cv = np.zeros((height, width, 3), dtype=np.uint8)

            if font is not None:
                # Round-trip through PIL only when there is text to draw; the
                # conversion creates a copy, so the shared logo stays clean.
                pil_img = Image.fromarray(cv2.cvtColor(frame_cv, cv2.COLOR_BGR2RGB))
                draw = ImageDraw.Draw(pil_img)
                draw.text(text_position1, text1, font=font, fill=text_fill)
                draw.text(text_position2, text2, font=font, fill=text_fill)
                frame_cv = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)

            # Hold the frame for the requested duration.
            for _ in range(int(duration * fps)):
                video.write(frame_cv)

        # Add full-screen logo at the end
        if logo is not None:
            for _ in range(int(3 * fps)):
                video.write(logo)

        video.release()
        video = None  # already finalised; nothing left for the finally clause
        print("✅ Video creation completed successfully!")
        return output_path
    except Exception as e:
        print(f"❌ ERROR in video generation: {e}")
        return None
    finally:
        # Release the writer on every exit path so a failure midway does not
        # leak the handle or leave the file open.
        if video is not None:
            video.release()
def combine_video_audio(video_path, audio_files, output_path=None):
    """Mux a video with one or more audio tracks using FFmpeg.

    Falsy entries in ``audio_files`` (``None``, ``""``) are ignored. Several
    tracks are first joined back-to-back with FFmpeg's concat demuxer; the
    result (or the single track) is then muxed with the video, re-encoding
    video to H.264/yuv420p and audio to AAC, and stopping at the shorter
    stream (``-shortest``).

    Args:
        video_path: Path of the (silent) input video.
        audio_files: Iterable of audio file paths, or ``None``.
        output_path: Destination path. Defaults to a unique
            ``final_video_<uuid>.mp4`` in the current directory.

    Returns:
        ``output_path`` when FFmpeg succeeds. ``video_path`` unchanged when
        there is no usable audio, when FFmpeg exits with a non-zero status, or
        when any other error occurs.
    """
    concat_list_file = None
    merged_audio_file = None
    try:
        if output_path is None:
            output_path = f"final_video_{uuid.uuid4()}.mp4"

        tracks = [af for af in (audio_files or []) if af]
        if not tracks:
            return video_path  # Return silent video if no audio available

        # Combine multiple audio files if needed
        if len(tracks) > 1:
            with tempfile.NamedTemporaryFile(
                mode="w", suffix=".txt", delete=False, encoding="utf-8"
            ) as listing:
                concat_list_file = listing.name
                for track in tracks:
                    # The concat demuxer resolves relative paths against the
                    # list file's directory (the temp dir), so pin existing
                    # local files to absolute paths; leave anything else as is.
                    entry = os.path.abspath(track) if os.path.exists(track) else str(track)
                    # Inside single quotes, a literal quote is written as '\''.
                    entry = entry.replace("'", "'\\''")
                    listing.write(f"file '{entry}'\n")

            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as merged:
                merged_audio_file = merged.name

            concat_cmd = ['ffmpeg', '-y', '-f', 'concat', '-safe', '0',
                          '-i', concat_list_file, '-c', 'copy', merged_audio_file]
            concat_result = subprocess.run(concat_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            if concat_result.returncode != 0:
                logger.error(
                    "FFmpeg audio concat failed (exit code %s): %s",
                    concat_result.returncode,
                    concat_result.stderr.decode("utf-8", errors="replace"),
                )
                return video_path
            combined_audio = merged_audio_file
        else:
            combined_audio = tracks[0]

        # Combine video and audio with compatibility flags
        combine_cmd = [
            'ffmpeg', '-y',
            '-i', video_path,
            '-i', combined_audio,
            '-map', '0:v',
            '-map', '1:a',
            '-c:v', 'libx264',
            '-pix_fmt', 'yuv420p',
            '-movflags', '+faststart',
            '-crf', '23',
            '-c:a', 'aac',
            '-shortest',
            output_path
        ]
        result = subprocess.run(combine_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # errors="replace": FFmpeg output is not guaranteed to be valid UTF-8
        # and a decode error must not discard an otherwise successful run.
        logger.info("FFmpeg stdout: %s", result.stdout.decode("utf-8", errors="replace"))
        logger.info("FFmpeg stderr: %s", result.stderr.decode("utf-8", errors="replace"))
        if result.returncode != 0:
            logger.error("FFmpeg exited with code %s while combining video and audio", result.returncode)
            return video_path
        return output_path
    except Exception as e:
        logger.error(f"Error combining video and audio: {e}")
        return video_path
    finally:
        # Remove the temporary files on every exit path.
        for leftover in (concat_list_file, merged_audio_file):
            if leftover is None:
                continue
            try:
                os.unlink(leftover)
            except FileNotFoundError:
                pass
            except OSError as cleanup_error:
                logger.warning("Could not remove temporary file %s: %s", leftover, cleanup_error)
def get_audio_duration(audio_path):
    """Return the duration of an audio file in seconds, as reported by ffprobe.

    Falls back to 5.0 seconds when ffprobe prints nothing on stdout, and also
    (after logging a warning) when running ffprobe or parsing its output
    raises.
    """
    fallback_seconds = 5.0
    probe_cmd = [
        'ffprobe', '-i', audio_path,
        '-show_entries', 'format=duration',
        '-v', 'quiet',
        '-of', 'csv=p=0',
    ]
    try:
        probe = subprocess.run(probe_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        raw_output = probe.stdout
        if not raw_output:
            return fallback_seconds
        # float() accepts the ASCII bytes ffprobe prints once whitespace is stripped.
        return float(raw_output.strip())
    except Exception as e:
        logger.warning(f"Failed to get audio duration for {audio_path}: {e}")
        return fallback_seconds
def create_video(images, audio_files, output_path=None):
    """Build a narrated video from still images and per-image audio files.

    Each image is shown for the duration of the audio file at the same
    position (5.0 seconds when that entry is falsy or there is no entry). A
    silent video is rendered first and then combined with the audio.

    Args:
        images: Sized sequence of images passed to ``create_silent_video``.
        audio_files: Audio file paths, positionally matched to ``images``;
            entries may be falsy, and ``None`` is treated as no audio at all.
        output_path: Destination path. Defaults to a unique
            ``output_video_<uuid>.mp4`` in the current directory.

    Returns:
        The path returned by ``combine_video_audio`` (which may be the silent
        video's path when audio could not be added), or ``None`` when FFmpeg
        is missing, the silent video cannot be rendered, or an unexpected
        error occurs.
    """
    silent_video_path = None
    final_video = None
    try:
        # Check if FFmpeg is installed
        try:
            subprocess.run(['ffmpeg', '-version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except FileNotFoundError:
            logger.error("FFmpeg not installed.")
            return None

        if output_path is None:
            output_path = f"output_video_{uuid.uuid4()}.mp4"
        silent_video_path = f"silent_{uuid.uuid4()}.mp4"

        # Treat a missing audio list as "no narration" instead of failing.
        audio_files = list(audio_files or [])

        # Get durations for each image
        durations = [get_audio_duration(af) if af else 5.0 for af in audio_files]
        missing = len(images) - len(durations)
        if missing > 0:
            durations.extend([5.0] * missing)

        # Create silent video
        silent_video = create_silent_video(images, durations, silent_video_path)
        if not silent_video:
            return None

        # Combine silent video with audio
        final_video = combine_video_audio(silent_video, audio_files, output_path)
        return final_video
    except Exception as e:
        logger.error(f"Error creating video: {e}")
        return None
    finally:
        # Clean up the intermediate file, including a partial one left by a
        # failed render, but keep it when it is the file being returned to the
        # caller (combine_video_audio falls back to the silent video's path).
        if silent_video_path is not None and final_video != silent_video_path:
            try:
                os.unlink(silent_video_path)
            except FileNotFoundError:
                pass
            except OSError as cleanup_error:
                logger.warning("Could not remove temporary video %s: %s", silent_video_path, cleanup_error)