Chronicle-Creators / utility /render_engine.py
alamnahin's picture
Make captions optional - skip gracefully if text rendering fails
b217db3
import time
import os
import tempfile
import zipfile
import platform
import subprocess
import logging
from pathlib import Path
from moviepy.editor import (AudioFileClip, CompositeVideoClip, CompositeAudioClip, ImageClip,
TextClip, VideoFileClip)
from moviepy.audio.fx.audio_loop import audio_loop
from moviepy.audio.fx.audio_normalize import audio_normalize
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
logger = logging.getLogger(__name__)
def download_file(url, filename, max_retries=3, timeout=30):
"""Download a file with retry logic and timeout
Args:
url (str): URL to download from
filename (str): Path to save file to
max_retries (int): Number of retries on failure
timeout (int): Timeout in seconds per request
"""
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
# Create session with retry strategy
session = requests.Session()
retry_strategy = Retry(
total=max_retries,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
try:
# Stream response to handle large files better
response = session.get(url, headers=headers, timeout=timeout, stream=True)
response.raise_for_status()
# Write file with proper error handling
with open(filename, 'wb') as f:
total_size = int(response.headers.get('content-length', 0))
downloaded = 0
chunk_size = 8192
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk: # filter out keep-alive new chunks
f.write(chunk)
downloaded += len(chunk)
if total_size:
pct = (downloaded / total_size) * 100
logger.debug(f"Downloaded {downloaded}/{total_size} bytes ({pct:.1f}%)")
# Verify file was written
if os.path.getsize(filename) == 0:
raise Exception("Downloaded file is empty")
logger.info(f"Successfully downloaded {filename}")
except requests.exceptions.Timeout:
if os.path.exists(filename):
os.remove(filename)
raise Exception(f"Download timeout: {url}")
except requests.exceptions.ConnectionError as e:
if os.path.exists(filename):
os.remove(filename)
raise Exception(f"Connection error downloading video: {str(e)}")
except Exception as e:
if os.path.exists(filename):
os.remove(filename)
raise Exception(f"Failed to download {url}: {str(e)}")
def search_program(program_name):
try:
search_cmd = "where" if platform.system() == "Windows" else "which"
return subprocess.check_output([search_cmd, program_name]).decode().strip()
except subprocess.CalledProcessError:
return None
def get_program_path(program_name):
program_path = search_program(program_name)
return program_path
def get_output_media(audio_file_path, timed_captions, background_video_data, video_server):
"""Generate final video with audio and captions
Args:
audio_file_path (str): Path to audio file
timed_captions (list): List of timed captions
background_video_data (list): List of background video data
video_server (str): Video server URL
Returns:
str: Path to output video file
Raises:
Exception: If video rendering fails
"""
OUTPUT_FILE_NAME = "rendered_video.mp4"
from utility.conf import IMAGEMAGICK_BINARY
from moviepy.config import change_settings
try:
# Validate input files
if not Path(audio_file_path).exists():
raise FileNotFoundError(f"Audio file not found at {audio_file_path}")
try:
change_settings({"IMAGEMAGICK_BINARY": IMAGEMAGICK_BINARY})
logger.info(f"Using ImageMagick from: {IMAGEMAGICK_BINARY}")
except Exception as e:
logger.error(f"Error configuring ImageMagick: {str(e)}")
raise Exception(f"ImageMagick configuration failed: {str(e)}")
except Exception as e:
logger.error(f"Error in initial setup: {str(e)}")
raise Exception(f"Initial setup failed: {str(e)}")
visual_clips = []
downloaded_files = []
video_found = False
for (t1, t2), video_url in background_video_data:
if not video_url:
logger.warning(f"Skipping empty video URL for segment {t1}-{t2}s")
continue
if t2 <= t1:
logger.warning(f"Skipping non-positive duration segment {t1}-{t2}s")
continue
try:
video_filename = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
logger.info(f"Downloading video from {video_url}")
try:
download_file(video_url, video_filename)
except Exception as download_err:
logger.warning(f"Failed to download video from {video_url}: {str(download_err)}")
if os.path.exists(video_filename):
os.remove(video_filename)
logger.info(f"Skipping this video and trying next alternative...")
continue
downloaded_files.append(video_filename)
if not Path(video_filename).exists():
logger.warning(f"Video file not created for {video_url}, trying next...")
continue
video_clip = VideoFileClip(video_filename)
if video_clip is None:
logger.warning(f"Failed to create video clip from {video_filename}, trying next...")
continue
clip_duration = min(video_clip.duration, max(0, t2 - t1))
if clip_duration <= 0:
logger.warning(f"Video clip duration invalid for {video_url}, trying next...")
continue
video_clip = video_clip.subclip(0, clip_duration).set_start(t1)
video_clip = video_clip.set_end(t1 + clip_duration)
visual_clips.append(video_clip)
video_found = True
logger.info(f"Added video clip from {video_url} ({t1}-{t1 + clip_duration}s)")
except Exception as e:
logger.error(f"Error processing video {video_url}: {str(e)}")
raise Exception(f"Failed to process video {video_url}: {str(e)}")
audio_clips = []
try:
# Verify audio file exists and is valid
if not os.path.exists(audio_file_path):
raise FileNotFoundError(f"Audio file not found: {audio_file_path}")
audio_file_clip = AudioFileClip(audio_file_path)
if audio_file_clip is None:
raise ValueError(f"Failed to create audio clip from {audio_file_path}")
# Normalize audio volume
audio_file_clip = audio_normalize(audio_file_clip)
# Verify audio duration
if audio_file_clip.duration <= 0:
raise ValueError("Audio file has zero or negative duration")
audio_clips.append(audio_file_clip)
logger.info(f"Added audio clip from {audio_file_path} (duration: {audio_file_clip.duration:.2f}s)")
except Exception as e:
logger.error(f"Error processing audio: {str(e)}")
raise Exception(f"Failed to process audio: {str(e)}")
for (t1, t2), text in timed_captions:
try:
# Try PIL method first to avoid ImageMagick security policy issues
try:
text_clip = TextClip(
txt=text,
fontsize=60,
color="white",
method="label",
font="DejaVu-Sans-Bold"
)
text_clip = text_clip.set_start(t1).set_end(t2).set_position(('center','bottom'))
visual_clips.append(text_clip)
logger.info(f"Added text clip: {text} ({t1}-{t2}s)")
except Exception as pil_err:
logger.warning(f"PIL text rendering failed, skipping captions: {str(pil_err)}")
# Skip captions if PIL fails to avoid blocking video generation
pass
except Exception as e:
logger.warning(f"Skipping text clip due to error: {str(e)}")
try:
if not video_found:
raise ValueError("No background videos available for rendering")
video = CompositeVideoClip(visual_clips)
if audio_clips:
audio = CompositeAudioClip(audio_clips)
# Ensure video duration matches audio and update video with audio properly
if video.duration < audio.duration:
last_clip = visual_clips[-1]
extended_clip = last_clip.set_end(audio.duration)
visual_clips[-1] = extended_clip
video = CompositeVideoClip(visual_clips)
video = video.set_duration(audio.duration)
# Updated audio application using set_audio
video = video.set_audio(audio)
logger.info(f"Audio synchronized with video (duration: {video.duration:.2f}s)")
logger.info(f"Rendering final video to {OUTPUT_FILE_NAME}")
video.write_videofile(OUTPUT_FILE_NAME, codec='libx264', audio_codec='aac', fps=25, preset='veryfast')
# Clean up downloaded files
for video_filename in downloaded_files:
if Path(video_filename).exists():
os.remove(video_filename)
logger.info(f"Cleaned up temporary file: {video_filename}")
if not Path(OUTPUT_FILE_NAME).exists():
raise FileNotFoundError(f"Failed to create output video at {OUTPUT_FILE_NAME}")
logger.info(f"Successfully rendered video at {OUTPUT_FILE_NAME}")
return OUTPUT_FILE_NAME
except Exception as e:
logger.error(f"Error rendering video: {str(e)}")
raise Exception(f"Video rendering failed: {str(e)}")