import os
from datetime import datetime
import random
import whisper
import shutil
import wave
import base64
from moviepy.editor import (VideoFileClip, AudioFileClip, TextClip,
                            concatenate_videoclips, CompositeVideoClip, CompositeAudioClip, ImageClip)
import moviepy.audio.fx.all as afx
import moviepy.video.fx.all as vfx
import gradio as gr
from PIL import Image, ImageDraw, ImageFont
import numpy as np
from functools import lru_cache
import urllib.request
from google import genai
from google.genai import types
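# NOTE (assumption, not from the original file): this module uses the
# MoviePy 1.x API (moviepy.editor, set_start/set_position, fx helpers),
# which was removed in MoviePy 2.0. A requirements.txt matching the calls
# below would look roughly like:
#   moviepy<2.0
#   openai-whisper
#   google-genai
#   gradio
#   pillow
#   numpy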
# CHANGED: Create local directories instead of Google Drive paths
os.makedirs('video_clips', exist_ok=True)
os.makedirs('background_music', exist_ok=True)
os.makedirs('voice_over', exist_ok=True)
os.makedirs('exports', exist_ok=True)

# CHANGED: Get API key from environment variable (secure method)
GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY', '')
if GOOGLE_API_KEY:
    os.environ['GOOGLE_API_KEY'] = GOOGLE_API_KEY

generation_cancelled = False
current_video_clip = None

AVAILABLE_VOICES = {
    "Puck": {"name": "Puck", "description": "Young adult female (US)"},
    "Charon": {"name": "Charon", "description": "Young adult male (US)"},
    "Kore": {"name": "Kore", "description": "Young adult female (US)"},
    "Fenrir": {"name": "Fenrir", "description": "Young adult male (US)"},
    "Aoede": {"name": "Aoede", "description": "Young adult female (US)"}
}
def wave_file(filename, pcm_data, channels=1, rate=24000, sample_width=2):
    with wave.open(filename, "wb") as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(sample_width)
        wf.setframerate(rate)
        wf.writeframes(pcm_data)
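# wave_file wraps raw PCM in a WAV container; the defaults (mono, 24 kHz,
# 16-bit) are assumed to match the PCM the Gemini TTS preview returns.
# Minimal standalone check (hypothetical data):
#   silence = b"\x00\x00" * 24000          # one second of 16-bit silence
#   wave_file("/tmp/test.wav", silence)    # playable 1 s WAV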
def generate_tts_audio(text_input, voice_name="Puck"):
    global generation_cancelled
    try:
        if generation_cancelled:
            return None, "Generation cancelled"
        client = genai.Client()
        response = client.models.generate_content(
            model="gemini-2.5-flash-preview-tts",
            contents=text_input,
            config=types.GenerateContentConfig(
                response_modalities=["AUDIO"],
                speech_config=types.SpeechConfig(
                    voice_config=types.VoiceConfig(
                        prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name=voice_name)
                    )
                ),
            )
        )
        if generation_cancelled:
            return None, "Generation cancelled"
        audio_data = response.candidates[0].content.parts[0].inline_data.data
        if isinstance(audio_data, str):
            audio_data = base64.b64decode(audio_data)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        temp_audio_path = f'/tmp/tts_audio_{timestamp}.wav'
        wave_file(temp_audio_path, audio_data)
        return temp_audio_path, "TTS generated"
    except Exception as e:
        return None, f"Error: {str(e)}"
def split_text_into_lines(data):
    MaxChars, MaxDuration, MaxGap = 60, 2.5, 1.5
    subtitles, line, line_duration = [], [], 0
    for idx, word_data in enumerate(data):
        line.append(word_data)
        line_duration += word_data["end"] - word_data["start"]
        chars_exceeded = len(" ".join(item["word"] for item in line)) > MaxChars
        duration_exceeded = line_duration > MaxDuration
        sentence_ended = word_data["word"].rstrip().endswith(('.', '!', '?'))
        maxgap_exceeded = idx > 0 and word_data['start'] - data[idx - 1]['end'] > MaxGap
        if chars_exceeded or duration_exceeded or sentence_ended or maxgap_exceeded:
            if line:
                subtitles.append({
                    "word": " ".join(item["word"] for item in line),
                    "start": line[0]["start"],
                    "end": line[-1]["end"],
                    "textcontents": line
                })
                line, line_duration = [], 0
    if line:
        subtitles.append({
            "word": " ".join(item["word"] for item in line),
            "start": line[0]["start"],
            "end": line[-1]["end"],
            "textcontents": line
        })
    return subtitles
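# Worked example of the expected shapes (values illustrative only):
#   words = [{"word": "Hello", "start": 0.0, "end": 0.4},
#            {"word": "world.", "start": 0.5, "end": 0.9}]
#   split_text_into_lines(words)
#   # -> [{"word": "Hello world.", "start": 0.0, "end": 0.9,
#   #      "textcontents": [...]}]
#   # One line: the sentence-ending "." flushes the buffer.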
@lru_cache(maxsize=None)
def get_cached_text_clip(text, font, fontsize, color):
    # Memoize TextClip construction (the lru_cache import was otherwise unused;
    # all arguments are hashable, and MoviePy's set_* methods return copies).
    return TextClip(text, font=font, fontsize=fontsize, color=color)
def create_title_overlay(title_text, framesize, duration=4):
    if not title_text or not title_text.strip():
        return []
    frame_width, frame_height = framesize
    FONT_URL = "https://github.com/google/fonts/raw/main/ofl/poppins/Poppins-Bold.ttf"
    FONT_PATH = "/tmp/Poppins-Bold.ttf"
    if not os.path.exists(FONT_PATH):
        try:
            urllib.request.urlretrieve(FONT_URL, FONT_PATH)
        except Exception:
            FONT_PATH = None
    TOP_MARGIN = int(frame_height * 0.115)
    FONT_SIZE = int(frame_height * 0.042)
    STROKE_WIDTH = max(1, int(frame_height * 0.003))
    LINE_SPACING = max(4, int(frame_height * 0.008))

    def load_font(size):
        try:
            if FONT_PATH and os.path.exists(FONT_PATH):
                return ImageFont.truetype(FONT_PATH, size)
            return ImageFont.truetype("/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf", size)
        except Exception:
            return ImageFont.load_default()

    font_obj = load_font(FONT_SIZE)
    base = Image.new("RGBA", (frame_width, frame_height), (0, 0, 0, 0))
    temp_img = Image.new("RGBA", (frame_width, frame_height), (0, 0, 0, 0))
    temp_draw = ImageDraw.Draw(temp_img)

    def measure_text(text, font):
        try:
            bbox = temp_draw.textbbox((0, 0), text, font=font, stroke_width=STROKE_WIDTH)
            return bbox[2] - bbox[0], bbox[3] - bbox[1]
        except Exception:
            return 100, 50

    def wrap_text(text, font, max_width):
        words = text.upper().split()
        lines, current = [], []
        for word in words:
            test_line = " ".join(current + [word])
            w, _ = measure_text(test_line, font)
            if w <= max_width:
                current.append(word)
            else:
                if current:
                    lines.append(" ".join(current))
                    current = [word]
                else:
                    lines.append(word)
                    current = []
        if current:
            lines.append(" ".join(current))
        return lines[:4]

    lines = wrap_text(title_text, font_obj, frame_width * 0.90)
    line_heights = [measure_text(line, font_obj)[1] for line in lines]
    y_start = TOP_MARGIN
    x_center = frame_width // 2
    draw = ImageDraw.Draw(base)
    y = y_start
    for i, line in enumerate(lines):
        w, h = measure_text(line, font_obj)
        x = x_center - w // 2
        draw.text((x + 2, y + 2), line, font=font_obj, fill=(0, 0, 0, 180))
        draw.text((x, y), line, font=font_obj, fill=(255, 255, 255, 255), stroke_width=STROKE_WIDTH, stroke_fill=(0, 0, 0, 255))
        y += line_heights[i] + LINE_SPACING
    return [ImageClip(np.array(base), duration=duration)]
def create_caption(textJSON, framesize, font="Helvetica-Bold", fontsize=14, color='white'):
    full_duration = textJSON['end'] - textJSON['start']
    word_clips = []
    xy_textclips_positions = []
    frame_width, frame_height = framesize
    max_line_width = frame_width * 0.8
    lines, current_line, current_line_width = [], [], 0
    for wordJSON in textJSON['textcontents']:
        word_upper = wordJSON['word'].upper()
        temp_word = get_cached_text_clip(word_upper, font, fontsize, color)
        temp_space = get_cached_text_clip(" ", font, fontsize, color)
        word_width, word_height = temp_word.size
        space_width, _ = temp_space.size
        if current_line_width + word_width + space_width > max_line_width and current_line:
            lines.append({'words': current_line.copy(), 'width': current_line_width, 'height': word_height})
            current_line = [wordJSON]
            current_line_width = word_width + space_width
        else:
            current_line.append(wordJSON)
            current_line_width += word_width + space_width
    if current_line:
        word_upper = current_line[0]['word'].upper()
        temp_word = get_cached_text_clip(word_upper, font, fontsize, color)
        _, word_height = temp_word.size
        lines.append({'words': current_line, 'width': current_line_width, 'height': word_height})
    total_text_height = sum(line['height'] for line in lines) + (len(lines) - 1) * 3
    subtitle_y_position = int(frame_height * 0.65)
    current_y = subtitle_y_position
    if lines:
        shadow_padding = 25
        shadow_height_extra = 15
        total_subtitle_width = max(line['width'] for line in lines)
        bg_width = int(total_subtitle_width + shadow_padding * 2)
        bg_height = int(total_text_height + shadow_height_extra * 2)
        img = Image.new('RGBA', (bg_width, bg_height), (0, 0, 0, 0))
        draw = ImageDraw.Draw(img)
        draw.rounded_rectangle([(0, 0), (bg_width - 1, bg_height - 1)], radius=15, fill=(0, 0, 0, 128))
        img_array = np.array(img)
        shadow_bg = ImageClip(img_array, duration=full_duration).set_start(textJSON['start'])
        shadow_x = (frame_width - total_subtitle_width) / 2 - shadow_padding
        shadow_y = subtitle_y_position - shadow_height_extra
        shadow_bg = shadow_bg.set_position((shadow_x, shadow_y))
        word_clips.append(shadow_bg)
    for line in lines:
        line_words = line['words']
        word_dimensions = []
        for wordJSON in line_words:
            word_upper = wordJSON['word'].upper()
            temp_word = get_cached_text_clip(word_upper, font, fontsize, color)
            temp_space = get_cached_text_clip(" ", font, fontsize, color)
            word_width, word_height = temp_word.size
            space_width, _ = temp_space.size
            word_dimensions.append({
                'word_data': wordJSON,
                'word_width': word_width,
                'word_height': word_height,
                'space_width': space_width,
                'word_upper': word_upper
            })
        line_start_x = (frame_width - line['width']) / 2
        current_x = line_start_x
        for word_dim in word_dimensions:
            wordJSON = word_dim['word_data']
            word_width = word_dim['word_width']
            word_height = word_dim['word_height']
            space_width = word_dim['space_width']
            word_upper = word_dim['word_upper']
            shadow_text = get_cached_text_clip(word_upper, font, fontsize, 'black')
            shadow_text = shadow_text.set_start(textJSON['start']).set_duration(full_duration)
            shadow_text = shadow_text.set_position((current_x + 1, current_y + 1)).set_opacity(0.3)
            word_clips.append(shadow_text)
            word_clip = get_cached_text_clip(word_upper, font, fontsize, color)
            word_clip = word_clip.set_start(textJSON['start']).set_duration(full_duration)
            word_clip = word_clip.set_position((current_x, current_y))
            space_clip = get_cached_text_clip(" ", font, fontsize, color)
            space_clip = space_clip.set_start(textJSON['start']).set_duration(full_duration)
            space_clip = space_clip.set_position((current_x + word_width, current_y))
            xy_textclips_positions.append({
                "x_pos": current_x,
                "y_pos": current_y,
                "width": word_width,
                "height": word_height,
                "word": word_upper,
                "start": wordJSON['start'],
                "end": wordJSON['end'],
                "duration": wordJSON['end'] - wordJSON['start']
            })
            word_clips.append(word_clip)
            word_clips.append(space_clip)
            current_x += word_width + space_width
        current_y += line['height'] + 3
    for highlight_word in xy_textclips_positions:
        bg_width = int(highlight_word['width'] + 16)
        bg_height = int(highlight_word['height'] + 8)
        img = Image.new('RGBA', (bg_width, bg_height), (0, 0, 0, 0))
        draw = ImageDraw.Draw(img)
        draw.rounded_rectangle([(0, 0), (bg_width - 1, bg_height - 1)], radius=8, fill=(147, 0, 211, 180))
        img_array = np.array(img)
        bg_clip = ImageClip(img_array, duration=highlight_word['duration'])
        bg_clip = bg_clip.set_start(highlight_word['start'])
        bg_x = highlight_word['x_pos'] - 8
        bg_y = highlight_word['y_pos'] - 4
        bg_clip = bg_clip.set_position((bg_x, bg_y))
        shadow_highlight = get_cached_text_clip(highlight_word['word'], font, fontsize, 'black')
        shadow_highlight = shadow_highlight.set_start(highlight_word['start']).set_duration(highlight_word['duration'])
        shadow_highlight = shadow_highlight.set_position((highlight_word['x_pos'] + 1, highlight_word['y_pos'] + 1)).set_opacity(0.4)
        word_clip_highlight = get_cached_text_clip(highlight_word['word'], font, fontsize, 'white')
        word_clip_highlight = word_clip_highlight.set_start(highlight_word['start']).set_duration(highlight_word['duration'])
        word_clip_highlight = word_clip_highlight.set_position((highlight_word['x_pos'], highlight_word['y_pos']))
        word_clips.append(bg_clip)
        word_clips.append(shadow_highlight)
        word_clips.append(word_clip_highlight)
    return word_clips
def get_random_subclip_and_slow(clip):
    subclip_durations = [2, 3, 4]
    subclip_duration = random.choice(subclip_durations)
    if clip.duration < subclip_duration:
        return clip.speedx(0.5)
    start_time = random.uniform(0, clip.duration - subclip_duration)
    subclip = clip.subclip(start_time, start_time + subclip_duration)
    return subclip.speedx(0.5)
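# Note: speedx(0.5) halves playback speed, so a 2-4 s subclip lands in the
# final cut at 4-8 s. The duration bookkeeping in merge_videos_with_subtitles
# therefore counts the slowed duration, not the source duration.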
def ensure_even_dimensions(clip):
    # libx264 with yuv420p needs even frame dimensions, so trim odd pixels.
    width, height = clip.size
    if width % 2 != 0:
        width -= 1
    if height % 2 != 0:
        height -= 1
    # clip.size may be a list, so normalize before comparing to avoid a
    # needless resize when dimensions are already even.
    if (width, height) != tuple(clip.size):
        return clip.resize((width, height))
    return clip
def apply_transition_effect(clip1, clip2, transition_type, duration=0.5):
    if transition_type == "Smooth Blend":
        return clip1.crossfadeout(duration), clip2.crossfadein(duration)
    elif transition_type == "Ken Burns Zoom":
        def zoom_in(t):
            return 1 + (0.15 * min(t / clip1.duration, 1))
        clip1_zoom = clip1.resize(zoom_in)
        clip1_out = clip1_zoom.crossfadeout(duration)
        def zoom_out(t):
            return 1.15 - (0.15 * min(t / duration, 1))
        clip2_zoom = clip2.resize(zoom_out) if clip2.duration >= duration else clip2
        clip2_in = clip2_zoom.crossfadein(duration)
        return clip1_out, clip2_in
    elif transition_type == "Whip Pan":
        return clip1.fadeout(duration * 0.5), clip2.fadein(duration * 0.5)
    elif transition_type == "Dreamy Fade":
        return clip1.crossfadeout(duration * 1.2), clip2.crossfadein(duration * 1.2)
    elif transition_type == "Snap Cut":
        return clip1, clip2
    else:
        return clip1.crossfadeout(duration), clip2.crossfadein(duration)
def process_voiceover_to_subtitles(voice_over_path):
    global generation_cancelled
    try:
        if generation_cancelled:
            return [], ""
        model = whisper.load_model("tiny")
        result = model.transcribe(voice_over_path, word_timestamps=True, fp16=False)
        if generation_cancelled:
            return [], ""
        wordlevel_info = []
        for segment in result['segments']:
            if generation_cancelled:
                return [], ""
            if 'words' in segment:
                for word in segment['words']:
                    wordlevel_info.append({'word': word['word'].strip(), 'start': word['start'], 'end': word['end']})
        return split_text_into_lines(wordlevel_info), result['text']
    except Exception:
        if generation_cancelled:
            return [], ""
        raise
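# Usage sketch: transcription stays CPU-friendly via the "tiny" model and
# fp16=False; "base" or "small" (standard openai-whisper checkpoints) would
# trade load time for better word timings. The path below is hypothetical:
#   lines, transcript = process_voiceover_to_subtitles("voice_over/sample.wav")
#   print(transcript, len(lines), "subtitle lines")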
def cleanup_resources():
    global current_video_clip
    try:
        if current_video_clip:
            current_video_clip.close()
        current_video_clip = None
    except Exception:
        pass

def cancel_generation():
    global generation_cancelled
    generation_cancelled = True
    cleanup_resources()
    return "Generation cancelled", None
def merge_videos_with_subtitles(text_input, voice_selection, audio_input, title_text, duration_minutes, video_quality, transition_type, progress=gr.Progress(track_tqdm=True)):
    global generation_cancelled, current_video_clip
    generation_cancelled = False
    current_video_clip = None
    progress(0, desc="Starting...")
    if generation_cancelled:
        return None, "Generation cancelled"
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # CHANGED: Use local paths instead of Google Drive
    source_path = 'video_clips'
    if not os.path.isdir(source_path):
        return None, "Video clips folder not found"
    output_path = 'exports'
    os.makedirs(output_path, exist_ok=True)
    video_extensions = ('.mp4', '.avi', '.mkv', '.mov')
    all_files = [f for f in os.listdir(source_path) if f.lower().endswith(video_extensions)]
    if not all_files:
        return None, "No video files found"
    random.shuffle(all_files)
    if generation_cancelled:
        return None, "Generation cancelled"
    bg_music_path = None
    # CHANGED: Use local background_music folder
    bg_music_folder_path = 'background_music'
    if os.path.isdir(bg_music_folder_path):
        audio_extensions = ('.mp3', '.wav', '.m4a', '.aac')
        possible_files = [f for f in os.listdir(bg_music_folder_path) if f.lower().endswith(audio_extensions) and not f.startswith('voiceover_')]
        if possible_files:
            bg_music_path = os.path.join(bg_music_folder_path, possible_files[0])
    target_duration_seconds = 0
    voice_over_audio = None
    linelevel_subtitles = None
    voice_over_path = None
    if text_input and text_input.strip():
        progress(0.1, desc="Generating TTS...")
        voice_name = AVAILABLE_VOICES[voice_selection]["name"] if voice_selection in AVAILABLE_VOICES else "Puck"
        tts_path, tts_message = generate_tts_audio(text_input, voice_name)
        if generation_cancelled:
            return None, "Generation cancelled"
        if tts_path:
            # CHANGED: Use local voice_over folder
            voice_over_folder_path = 'voice_over'
            os.makedirs(voice_over_folder_path, exist_ok=True)
            voice_filename = f"tts_voiceover_{timestamp}.wav"
            saved_voice_path = os.path.join(voice_over_folder_path, voice_filename)
            shutil.copy2(tts_path, saved_voice_path)
            voice_over_path = saved_voice_path
        else:
            return None, f"TTS failed: {tts_message}"
    elif audio_input:
        if generation_cancelled:
            return None, "Generation cancelled"
        voice_over_folder_path = 'voice_over'
        os.makedirs(voice_over_folder_path, exist_ok=True)
        voice_filename = f"uploaded_voiceover_{timestamp}.mp3"
        saved_voice_path = os.path.join(voice_over_folder_path, voice_filename)
        shutil.copy2(audio_input, saved_voice_path)
        voice_over_path = saved_voice_path
    if voice_over_path:
        try:
            progress(0.2, desc="Processing audio...")
            if generation_cancelled:
                return None, "Generation cancelled"
            voice_over_audio = AudioFileClip(voice_over_path)
            target_duration_seconds = voice_over_audio.duration
            linelevel_subtitles, _ = process_voiceover_to_subtitles(voice_over_path)
            if generation_cancelled:
                voice_over_audio.close()
                return None, "Generation cancelled"
        except Exception as e:
            return None, f"Audio error: {str(e)}"
    else:
        if not bg_music_path:
            return None, "Need text/audio or background music"
        target_duration_seconds = duration_minutes * 60
    progress(0.3, desc="Preparing audio...")
    if generation_cancelled:
        if voice_over_audio:
            voice_over_audio.close()
        return None, "Generation cancelled"
    audio_tracks = []
    if voice_over_audio:
        audio_tracks.append(voice_over_audio)
    if bg_music_path:
        try:
            background_audio = AudioFileClip(bg_music_path)
            # CHANGED: Increased volume from 0.015 to 0.10 (louder background music)
            background_audio = background_audio.fx(afx.volumex, 0.10)
            background_audio = background_audio.fx(afx.audio_loop, duration=target_duration_seconds)
            audio_tracks.append(background_audio)
        except Exception as e:
            print(f"Background music error: {e}")
    final_audio = CompositeAudioClip(audio_tracks) if len(audio_tracks) > 1 else (audio_tracks[0] if audio_tracks else None)
    progress(0.4, desc="Setting up video...")
    if generation_cancelled:
        cleanup_resources()
        return None, "Generation cancelled"
    if video_quality == "High":
        target_height, bitrate, preset, crf = 1080, "8000k", "veryfast", "20"
    elif video_quality == "Standard":
        target_height, bitrate, preset, crf = 720, "4000k", "veryfast", "24"
    else:
        target_height, bitrate, preset, crf = 480, "1000k", "ultrafast", "28"
    progress(0.5, desc="Processing clips...")
    video_clips = []
    current_duration = 0
    file_index = 0
    safety_counter = 0
    max_iterations = len(all_files) * 3
    while current_duration < target_duration_seconds and safety_counter < max_iterations:
        if generation_cancelled:
            for clip in video_clips:
                try:
                    clip.close()
                except Exception:
                    pass
            cleanup_resources()
            return None, "Generation cancelled"
        if file_index >= len(all_files):
            file_index = 0
            random.shuffle(all_files)
        video_file = all_files[file_index]
        file_index += 1
        safety_counter += 1
        try:
            full_clip = VideoFileClip(os.path.join(source_path, video_file))
            current_video_clip = full_clip
            if generation_cancelled:
                full_clip.close()
                cleanup_resources()
                return None, "Generation cancelled"
            if full_clip.h != target_height:
                aspect_ratio = full_clip.w / full_clip.h
                new_width = int(target_height * aspect_ratio)
                if new_width % 2 != 0:
                    new_width -= 1
                adjusted_height = target_height if target_height % 2 == 0 else target_height - 1
                full_clip = full_clip.resize((new_width, adjusted_height))
            else:
                full_clip = ensure_even_dimensions(full_clip)
            subclip = get_random_subclip_and_slow(full_clip)
            remaining_duration = target_duration_seconds - current_duration
            if subclip.duration > remaining_duration:
                subclip = subclip.subclip(0, remaining_duration)
            video_clips.append(ensure_even_dimensions(subclip))
            current_duration += subclip.duration
            progress(0.5 + (safety_counter * 0.1 / max_iterations), desc=f"Clip {len(video_clips)}")
        except Exception as e:
            print(f"Error: {e}")
            continue
    if generation_cancelled:
        for clip in video_clips:
            try:
                clip.close()
            except Exception:
                pass
        cleanup_resources()
        return None, "Generation cancelled"
    if not video_clips:
        return None, "No clips processed"
    # FIXED: Ensure exact duration match to prevent black screens
    total_video_duration = sum(clip.duration for clip in video_clips)
    duration_diff = total_video_duration - target_duration_seconds
    if abs(duration_diff) > 0.1:
        if duration_diff > 0:
            trim_amount = duration_diff
            new_last_clip = video_clips[-1].subclip(0, video_clips[-1].duration - trim_amount)
            video_clips[-1] = new_last_clip
        else:
            extend_amount = abs(duration_diff)
            new_last_clip = video_clips[-1].fx(vfx.loop, duration=video_clips[-1].duration + extend_amount)
            video_clips[-1] = new_last_clip
    progress(0.6, desc="Applying transitions...")
    transition_duration = {"Snap Cut": 0.1, "Whip Pan": 0.3, "Dreamy Fade": 0.8, "Smooth Blend": 0.5, "Ken Burns Zoom": 0.5}.get(transition_type, 0.5)
    processed_clips = []
    for i in range(len(video_clips)):
        if i == 0:
            if len(video_clips) > 1:
                clip_out, _ = apply_transition_effect(video_clips[i], video_clips[i + 1], transition_type, transition_duration)
                processed_clips.append(clip_out)
            else:
                processed_clips.append(video_clips[i])
        elif i == len(video_clips) - 1:
            _, clip_in = apply_transition_effect(video_clips[i - 1], video_clips[i], transition_type, transition_duration)
            processed_clips.append(clip_in)
        else:
            _, clip_with_transition = apply_transition_effect(video_clips[i - 1], video_clips[i], transition_type, transition_duration)
            processed_clips.append(clip_with_transition)
    progress(0.7, desc="Concatenating...")
    if generation_cancelled:
        for c in processed_clips:
            try:
                c.close()
            except Exception:
                pass
        cleanup_resources()
        return None, "Generation cancelled"
    if transition_type == "Snap Cut":
        final_video_only = concatenate_videoclips(processed_clips, method="compose")
    else:
        final_video_only = concatenate_videoclips(processed_clips, method="compose", padding=-transition_duration)
    final_video_only = ensure_even_dimensions(final_video_only)
    current_video_clip = final_video_only
    # FIXED: Loop video if shorter than audio to prevent black screen
    if final_audio and final_video_only.duration < final_audio.duration:
        final_video_only = final_video_only.fx(vfx.loop, duration=final_audio.duration)
    progress(0.8, desc="Adding overlays...")
    if generation_cancelled:
        try:
            final_video_only.close()
        except Exception:
            pass
        cleanup_resources()
        return None, "Generation cancelled"
    all_subtitle_clips = []
    if linelevel_subtitles:
        for line in linelevel_subtitles:
            if generation_cancelled:
                try:
                    final_video_only.close()
                except Exception:
                    pass
                cleanup_resources()
                return None, "Generation cancelled"
            try:
                subtitle_fontsize = min(42, final_video_only.size[1] // 25)
                all_subtitle_clips.extend(create_caption(line, final_video_only.size, font="Helvetica-Bold", fontsize=subtitle_fontsize, color='white'))
            except Exception as e:
                print(f"Subtitle error: {e}")
                continue
    all_clips = [final_video_only.set_opacity(0.65)]
    if all_subtitle_clips:
        all_clips.extend(all_subtitle_clips)
    if title_text and title_text.strip():
        title_clips = create_title_overlay(title_text, final_video_only.size, duration=4)
        all_clips.extend(title_clips)
    final_video = CompositeVideoClip(all_clips)
    current_video_clip = final_video
    if final_audio:
        final_video = final_video.set_audio(final_audio)
    progress(0.9, desc="Exporting...")
    if generation_cancelled:
        try:
            final_video.close()
        except Exception:
            pass
        cleanup_resources()
        return None, "Generation cancelled"
    output_filename = f'video_{timestamp}.mp4'
    final_output_path = os.path.join(output_path, output_filename)
    try:
        final_video.write_videofile(
            final_output_path,
            codec="libx264",
            audio_codec="aac",
            fps=24,
            preset=preset,
            bitrate=bitrate,
            audio_bitrate="128k",
            threads=8,
            ffmpeg_params=["-crf", crf, "-pix_fmt", "yuv420p", "-movflags", "+faststart", "-tune", "fastdecode"]
        )
    except Exception as e:
        if generation_cancelled:
            return None, "Generation cancelled"
        return None, f"Export error: {str(e)}"
    progress(1.0, desc="Done")
    if generation_cancelled:
        try:
            if os.path.exists(final_output_path):
                os.remove(final_output_path)
        except Exception:
            pass
        cleanup_resources()
        return None, "Generation cancelled"
    try:
        final_video.close()
        if voice_over_audio:
            voice_over_audio.close()
        current_video_clip = None
    except Exception:
        pass
    if text_input and text_input.strip():
        audio_source = f"TTS ({AVAILABLE_VOICES[voice_selection]['name'] if voice_selection in AVAILABLE_VOICES else 'Puck'})"
    elif voice_over_path:
        audio_source = "Uploaded Audio"
    else:
        audio_source = "Background Music"
    summary = f"Complete\n{output_filename}\n{audio_source}\n{transition_type}\n{target_duration_seconds:.1f}s\n{len(linelevel_subtitles) if linelevel_subtitles else 0} subtitles"
    return final_output_path, summary
# CHANGED: Removed share=True and debug=True for production
with gr.Blocks(title="Video Generator", theme=gr.themes.Soft()) as interface:
    gr.Markdown("# 🎬 AI Video Generator")
    gr.Markdown("Upload video clips to `video_clips` folder and optionally background music to `background_music` folder.")
    with gr.Row():
        with gr.Column():
            text_input = gr.Textbox(label="Text for TTS", lines=4, placeholder="Enter text to convert to speech...")
            voice_dropdown = gr.Dropdown(
                choices=[(f"{v['name']} - {v['description']}", k) for k, v in AVAILABLE_VOICES.items()],
                value="Puck",
                label="Voice Selection"
            )
            audio_input = gr.Audio(type="filepath", label="Or Upload Audio File")
            title_input = gr.Textbox(label="Video Title (Optional)", lines=2, placeholder="Enter video title...")
            # step is keyword-only in current Gradio releases, so pass arguments by name
            duration_slider = gr.Slider(minimum=0.5, maximum=10, value=2, step=0.5, label="Duration (minutes) - only used if no audio")
            quality_radio = gr.Radio(["High", "Standard", "Preview"], value="High", label="Video Quality")
            transition_radio = gr.Radio(
                ["Smooth Blend", "Ken Burns Zoom", "Whip Pan", "Dreamy Fade", "Snap Cut"],
                value="Smooth Blend",
                label="Transition Effect"
            )
            with gr.Row():
                submit_btn = gr.Button("🎥 Generate Video", variant="primary", size="lg")
                stop_btn = gr.Button("⏹️ Stop", variant="stop", size="lg")
        with gr.Column():
            video_output = gr.Video(label="Generated Video")
            summary_output = gr.Textbox(label="Status", lines=8)
    submit_btn.click(
        fn=merge_videos_with_subtitles,
        inputs=[text_input, voice_dropdown, audio_input, title_input, duration_slider, quality_radio, transition_radio],
        outputs=[video_output, summary_output]
    )
    stop_btn.click(fn=cancel_generation, outputs=[summary_output, video_output])
# CHANGED: Updated launch settings for Hugging Face
if __name__ == "__main__":
    interface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True
    )