Spaces:
Sleeping
Sleeping
| import time | |
| import os | |
| import tempfile | |
| import zipfile | |
| import platform | |
| import subprocess | |
| import logging | |
| from pathlib import Path | |
| from moviepy.editor import (AudioFileClip, CompositeVideoClip, CompositeAudioClip, ImageClip, | |
| TextClip, VideoFileClip) | |
| from moviepy.audio.fx.audio_loop import audio_loop | |
| from moviepy.audio.fx.audio_normalize import audio_normalize | |
| import requests | |
| from requests.adapters import HTTPAdapter | |
| from urllib3.util.retry import Retry | |
| logger = logging.getLogger(__name__) | |
def download_file(url, filename, max_retries=3, timeout=30):
    """Download *url* to *filename*, retrying transient HTTP failures.

    The response is streamed to disk in chunks. If anything goes wrong, any
    partially written file is removed before the error is re-raised.

    Args:
        url (str): URL to download from
        filename (str): Path to save file to
        max_retries (int): Number of retries on failure
        timeout (int): Timeout in seconds per request

    Raises:
        Exception: On timeout, connection error, HTTP error status, or when
            the downloaded file is empty. The original error is attached as
            ``__cause__``.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }

    # Retry GET requests on rate limiting and transient server errors.
    retry_strategy = Retry(
        total=max_retries,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET"],
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)

    # Context managers make sure the session and the streamed response are
    # closed on every path instead of leaking pooled connections.
    with requests.Session() as session:
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        try:
            with session.get(url, headers=headers, timeout=timeout, stream=True) as response:
                response.raise_for_status()
                total_size = int(response.headers.get('content-length', 0))
                downloaded = 0
                with open(filename, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if not chunk:  # filter out keep-alive chunks
                            continue
                        f.write(chunk)
                        downloaded += len(chunk)
                        if total_size:
                            logger.debug(
                                "Downloaded %d/%d bytes (%.1f%%)",
                                downloaded,
                                total_size,
                                (downloaded / total_size) * 100,
                            )

            # Verify something was actually written.
            if os.path.getsize(filename) == 0:
                raise Exception("Downloaded file is empty")
            logger.info(f"Successfully downloaded {filename}")
        except Exception as e:
            # Never leave a truncated file behind for the caller to pick up.
            if os.path.exists(filename):
                os.remove(filename)
            # Timeout is tested first: a connect timeout is both a Timeout and
            # a ConnectionError, and it should be reported as a timeout.
            if isinstance(e, requests.exceptions.Timeout):
                message = f"Download timeout: {url}"
            elif isinstance(e, requests.exceptions.ConnectionError):
                message = f"Connection error downloading video: {str(e)}"
            else:
                message = f"Failed to download {url}: {str(e)}"
            raise Exception(message) from e
def search_program(program_name):
    """Return the path of *program_name* as found on ``PATH``, or ``None``.

    Uses ``shutil.which`` rather than spawning ``which``/``where``. That
    avoids an uncaught ``FileNotFoundError`` when the helper binary itself is
    not installed, avoids getting several newline-separated matches back from
    ``where`` on Windows, and keeps lookup failures off stderr.

    Args:
        program_name (str): Executable name to look up.

    Returns:
        str | None: Path to the first matching executable, or ``None`` when
        no match is found.
    """
    # Imported locally so this function does not depend on a file-level import.
    import shutil

    return shutil.which(program_name)
def get_program_path(program_name):
    """Look up *program_name* with ``search_program`` and return its result unchanged."""
    return search_program(program_name)
def get_output_media(audio_file_path, timed_captions, background_video_data, video_server):
    """Render the final video from narration audio, captions and background clips.

    Background videos are downloaded to temporary files, trimmed to their
    segment, overlaid with caption text and combined with the normalised
    narration into ``rendered_video.mp4`` (relative to the working directory).
    Temporary downloads and opened media readers are released on every exit
    path, including failures.

    Args:
        audio_file_path (str): Path to audio file
        timed_captions (list): ``((start, end), text)`` entries, in seconds
        background_video_data (list): ``((start, end), url)`` entries, in
            seconds; entries with an empty URL or non-positive duration are
            skipped
        video_server (str): Video server URL. Not used by this function; kept
            so existing callers keep working.

    Returns:
        str: Path to output video file

    Raises:
        Exception: If setup, video processing, audio processing or rendering
            fails. The original error is attached as ``__cause__``.
    """
    OUTPUT_FILE_NAME = "rendered_video.mp4"
    from utility.conf import IMAGEMAGICK_BINARY
    from moviepy.config import change_settings

    try:
        # Validate input files
        if not Path(audio_file_path).exists():
            raise FileNotFoundError(f"Audio file not found at {audio_file_path}")
        try:
            change_settings({"IMAGEMAGICK_BINARY": IMAGEMAGICK_BINARY})
            logger.info(f"Using ImageMagick from: {IMAGEMAGICK_BINARY}")
        except Exception as e:
            logger.error(f"Error configuring ImageMagick: {str(e)}")
            raise Exception(f"ImageMagick configuration failed: {str(e)}") from e
    except Exception as e:
        logger.error(f"Error in initial setup: {str(e)}")
        raise Exception(f"Initial setup failed: {str(e)}") from e

    opened_clips = []      # clips created straight from files; closed in ``finally``
    downloaded_files = []  # temporary downloads; deleted in ``finally``
    try:
        background_clips = _load_background_clips(
            background_video_data, opened_clips, downloaded_files
        )
        narration = _load_narration(audio_file_path, opened_clips)
        caption_clips = _build_caption_clips(timed_captions)
        return _render_composite(
            background_clips, caption_clips, narration, OUTPUT_FILE_NAME
        )
    finally:
        _release_media_resources(opened_clips, downloaded_files)


def _load_background_clips(background_video_data, opened_clips, temp_files):
    """Download each background video and return the clips trimmed to their segments."""
    clips = []
    for (t1, t2), video_url in background_video_data:
        if not video_url:
            logger.warning(f"Skipping empty video URL for segment {t1}-{t2}s")
            continue
        if t2 <= t1:
            logger.warning(f"Skipping non-positive duration segment {t1}-{t2}s")
            continue
        try:
            # Close the handle right away; only the reserved path is needed.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp:
                video_filename = tmp.name
            # Register before downloading so the file is removed on any outcome.
            temp_files.append(video_filename)

            logger.info(f"Downloading video from {video_url}")
            try:
                download_file(video_url, video_filename)
            except Exception as download_err:
                logger.warning(f"Failed to download video from {video_url}: {str(download_err)}")
                logger.info("Skipping this video and trying next alternative...")
                continue

            if not Path(video_filename).exists():
                logger.warning(f"Video file not created for {video_url}, trying next...")
                continue

            source_clip = VideoFileClip(video_filename)
            opened_clips.append(source_clip)

            # Never play longer than the segment or the source material.
            clip_duration = min(source_clip.duration, t2 - t1)
            if clip_duration <= 0:
                logger.warning(f"Video clip duration invalid for {video_url}, trying next...")
                continue

            segment_clip = (
                source_clip.subclip(0, clip_duration)
                .set_start(t1)
                .set_end(t1 + clip_duration)
            )
            clips.append(segment_clip)
            logger.info(f"Added video clip from {video_url} ({t1}-{t1 + clip_duration}s)")
        except Exception as e:
            logger.error(f"Error processing video {video_url}: {str(e)}")
            raise Exception(f"Failed to process video {video_url}: {str(e)}") from e
    return clips


def _load_narration(audio_file_path, opened_clips):
    """Open the narration audio, normalise its volume and return the resulting clip."""
    try:
        # Re-checked here because the downloads above may have taken a while.
        if not os.path.exists(audio_file_path):
            raise FileNotFoundError(f"Audio file not found: {audio_file_path}")
        source_audio = AudioFileClip(audio_file_path)
        opened_clips.append(source_audio)

        # Normalize audio volume
        narration = audio_normalize(source_audio)
        if narration.duration <= 0:
            raise ValueError("Audio file has zero or negative duration")
        logger.info(f"Added audio clip from {audio_file_path} (duration: {narration.duration:.2f}s)")
        return narration
    except Exception as e:
        logger.error(f"Error processing audio: {str(e)}")
        raise Exception(f"Failed to process audio: {str(e)}") from e


def _build_caption_clips(timed_captions):
    """Build one positioned text clip per caption, skipping captions that fail to render."""
    clips = []
    for (t1, t2), text in timed_captions:
        try:
            text_clip = TextClip(
                txt=text,
                fontsize=60,
                color="white",
                method="label",
                font="DejaVu-Sans-Bold",
            )
            text_clip = text_clip.set_start(t1).set_end(t2).set_position(('center', 'bottom'))
        except Exception as caption_err:
            # Captions are best-effort: a rendering failure must not block the video.
            logger.warning(f"PIL text rendering failed, skipping captions: {str(caption_err)}")
            continue
        clips.append(text_clip)
        logger.info(f"Added text clip: {text} ({t1}-{t2}s)")
    return clips


def _render_composite(background_clips, caption_clips, narration, output_path):
    """Composite the clips, attach the narration and write the result to *output_path*."""
    try:
        if not background_clips:
            raise ValueError("No background videos available for rendering")

        audio = CompositeAudioClip([narration])
        # Background clips come first so captions are drawn on top of them.
        video = CompositeVideoClip(background_clips + caption_clips)

        # Ensure the picture lasts as long as the narration. Extend the
        # background clip that ends last -- not a caption -- so the tail of
        # the video shows footage instead of a lingering caption.
        if video.duration < audio.duration:
            last_index = max(
                range(len(background_clips)),
                key=lambda i: background_clips[i].end,
            )
            background_clips[last_index] = background_clips[last_index].set_end(audio.duration)
            video = CompositeVideoClip(background_clips + caption_clips)

        video = video.set_duration(audio.duration)
        video = video.set_audio(audio)
        logger.info(f"Audio synchronized with video (duration: {video.duration:.2f}s)")

        logger.info(f"Rendering final video to {output_path}")
        video.write_videofile(output_path, codec='libx264', audio_codec='aac', fps=25, preset='veryfast')

        if not Path(output_path).exists():
            raise FileNotFoundError(f"Failed to create output video at {output_path}")
        logger.info(f"Successfully rendered video at {output_path}")
        return output_path
    except Exception as e:
        logger.error(f"Error rendering video: {str(e)}")
        raise Exception(f"Video rendering failed: {str(e)}") from e


def _release_media_resources(opened_clips, temp_files):
    """Close every opened clip and delete every temporary download, logging any failure."""
    # Close readers first: an open reader can prevent the file from being deleted.
    for clip in opened_clips:
        try:
            clip.close()
        except Exception as close_err:
            logger.warning(f"Failed to close media clip: {str(close_err)}")
    for temp_path in temp_files:
        try:
            if Path(temp_path).exists():
                os.remove(temp_path)
                logger.info(f"Cleaned up temporary file: {temp_path}")
        except OSError as remove_err:
            logger.warning(f"Failed to remove temporary file {temp_path}: {str(remove_err)}")