#!/usr/bin/env python3
"""
Standalone Video Creator

Automated video generation using extend video flow with one-time input.
No frontend required - everything runs from this single script.

Flow: collect console input once -> GPT-4o builds structured segment
prompts -> each segment is rendered via KIE (extend flow) or Replicate
(google/veo-3) -> segments are trimmed/downloaded -> FFmpeg concatenates
the final video.
"""
import os
import sys
import time
import json
import asyncio
import httpx
from pathlib import Path
from typing import List, Dict, Any, Optional
from datetime import datetime
from dotenv import load_dotenv

# Import utilities from the existing codebase
from utils.prompt_generator import (
    VeoInputs,
    generate_segments_payload,
    split_script_into_segments
)

# Try importing replicate (optional) — the Replicate provider path is only
# offered when the package is importable.
try:
    import replicate
    REPLICATE_AVAILABLE = True
except ImportError:
    REPLICATE_AVAILABLE = False

# Load environment variables
load_dotenv('.env.local')

# Configuration
KIE_API_BASE = "https://api.kie.ai"
BACKEND_BASE = f"http://localhost:{os.getenv('SERVER_PORT', 4000)}"
MAX_RETRIES = 3  # NOTE(review): declared but never referenced below — confirm intent
POLLING_INTERVAL = 10  # seconds
MAX_WAIT_TIME = 600  # 10 minutes per video


class Colors:
    """Terminal ANSI colors for better output"""
    HEADER = '\033[95m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    END = '\033[0m'
    BOLD = '\033[1m'


def print_status(message: str, color: str = Colors.CYAN) -> None:
    """Print colored status message"""
    print(f"{color}{message}{Colors.END}")


def print_success(message: str) -> None:
    """Print success message"""
    print(f"{Colors.GREEN}✅ {message}{Colors.END}")


def print_error(message: str) -> None:
    """Print error message"""
    print(f"{Colors.RED}❌ {message}{Colors.END}")


def print_header(message: str) -> None:
    """Print header message framed by '=' rules."""
    print(f"\n{Colors.BOLD}{Colors.HEADER}{'='*60}{Colors.END}")
    print(f"{Colors.BOLD}{Colors.HEADER}{message}{Colors.END}")
    print(f"{Colors.BOLD}{Colors.HEADER}{'='*60}{Colors.END}\n")


def get_api_key() -> str:
    """Get KIE API key from environment; exit(1) with guidance if missing."""
    api_key = os.getenv('KIE_API_KEY')
    if not api_key:
        print_error("KIE_API_KEY not found in environment!")
        print("Please add KIE_API_KEY to .env.local file")
        print("Get your API key at: https://kie.ai/api-key")
        sys.exit(1)
    return api_key


def get_openai_api_key() -> str:
    """Get OpenAI API key from environment; exit(1) if missing."""
    api_key = os.getenv('OPENAI_API_KEY')
    if not api_key:
        print_error("OPENAI_API_KEY not found in environment!")
        print("Please add OPENAI_API_KEY to .env.local file")
        sys.exit(1)
    return api_key


def get_replicate_api_key() -> str:
    """Get Replicate API key from environment; exit(1) with guidance if missing."""
    api_key = os.getenv('REPLICATE_API_TOKEN')
    if not api_key:
        print_error("REPLICATE_API_TOKEN not found in environment!")
        print("Please add REPLICATE_API_TOKEN to .env.local file")
        print("Get your API token at: https://replicate.com/account/api-tokens")
        sys.exit(1)
    return api_key


def collect_user_inputs() -> Dict[str, Any]:
    """Collect one-time inputs from user via stdin.

    Returns a config dict with keys: script, image_path, style, voice_type,
    model, aspect_ratio, camera_style, seed (int or None), provider
    ('kie' or 'replicate').
    """
    print_header("VIDEO CREATION SETUP")
    print("This script will generate a video using AI. You'll need to provide:")
    print("1. A script/text for the video")
    print("2. A reference image (character/scene)")
    print("3. Video style preferences\n")
    # Script input — multi-line, terminated by two consecutive empty lines
    print_status("Enter your video script (press Enter twice when done):")
    script_lines = []
    while True:
        line = input()
        if line == "" and script_lines and script_lines[-1] == "":
            script_lines.pop()  # Remove last empty line
            break
        script_lines.append(line)
    script = "\n".join(script_lines).strip()
    if not script:
        print_error("Script cannot be empty!")
        sys.exit(1)
    # Image path — must exist on disk
    print_status("\nEnter path to reference image:")
    image_path = input().strip()
    if not os.path.exists(image_path):
        print_error(f"Image file not found: {image_path}")
        sys.exit(1)
    # Style
    print_status("\nEnter video style (default: 'clean, lifestyle UGC'):")
    style = input().strip() or "clean, lifestyle UGC"
    # Voice type
    print_status("\nEnter voice type (Deep/Warm/Crisp/None, default: None):")
    voice_type = input().strip() or "None"
    # Model
    print_status("\nEnter video model (default: 'veo3_fast'):")
    model = input().strip() or "veo3_fast"
    # Aspect ratio
    print_status("\nEnter aspect ratio (16:9 or 9:16, default: '9:16'):")
    aspect_ratio = input().strip() or "9:16"
    # Camera style
    print_status("\nEnter camera style (default: 'handheld steadicam'):")
    camera_style = input().strip() or "handheld steadicam"
    # Provider selection
    print_status("\nSelect video generation provider:")
    print(" 1. KIE API (supports extend video flow)")
    print(" 2. Replicate (google/veo-3)")
    provider_choice = input().strip() or "1"
    if provider_choice == "2":
        if not REPLICATE_AVAILABLE:
            print_error("Replicate package not installed!")
            print("Please install it: pip install replicate")
            sys.exit(1)
        provider = "replicate"
    else:
        provider = "kie"
    # Seed for consistent lighting
    # NOTE(review): int(seed_input) will raise ValueError on non-numeric
    # input and crash setup — consider validating.
    print_status("\nEnter seed for consistent lighting (optional, press Enter to skip):")
    seed_input = input().strip()
    seed = int(seed_input) if seed_input else None
    print_success("\nConfiguration complete!")
    return {
        'script': script,
        'image_path': image_path,
        'style': style,
        'voice_type': voice_type,
        'model': model,
        'aspect_ratio': aspect_ratio,
        'camera_style': camera_style,
        'seed': seed,
        'provider': provider
    }


async def generate_initial_video(
    prompt: Dict[str, Any],
    image_path: str,
    api_key: str,
    model: str = "veo3_fast",
    aspect_ratio: str = "9:16",
    voice_type: str = "None",
    seed: Optional[int] = None
) -> str:
    """
    Generate the first video segment.
    Uses the backend server's callback endpoint for status updates.

    Uploads the local reference image to the backend first so KIE can fetch
    it by URL, then submits the generate request.

    Returns the KIE task id. Raises Exception on upload or API failure.
    """
    print_status(f"🎬 Generating initial video segment...")
    # Read image file
    from pathlib import Path
    with open(image_path, 'rb') as f:
        image_data = f.read()
    # Detect image format for upload
    image_ext = Path(image_path).suffix.lower()
    mime_type = {
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.webp': 'image/webp'
    }.get(image_ext, 'image/jpeg')
    # Get public URL and callback URL
    public_url = os.getenv('PUBLIC_URL', BACKEND_BASE)
    callback_url = f"{public_url}/api/veo/callback"
    # Upload image to backend so it's available to the /api/images/{id} endpoint
    print_status(f"📷 Uploading image to backend for hosting...")
    async with httpx.AsyncClient(timeout=30.0) as client:
        upload_response = await client.post(
            f"{BACKEND_BASE}/api/upload-image",
            files={"file": ("reference_image" + image_ext, image_data, mime_type)},
        )
        if upload_response.status_code != 200:
            raise Exception(f"Image upload failed: HTTP {upload_response.status_code} - {upload_response.text}")
        upload_json = upload_response.json()
        hosted_image_url = upload_json.get("url")
        if not hosted_image_url:
            raise Exception(f"Image upload response missing URL: {upload_json}")
        print_success(f"Image hosted at: {hosted_image_url}")
    async with httpx.AsyncClient(timeout=30.0) as client:
        payload = {
            "prompt": prompt,
            "imageUrls": [hosted_image_url],  # Use hosted URL
            "model": model,
            "aspectRatio": aspect_ratio,
            "generationType": "FIRST_AND_LAST_FRAMES_2_VIDEO",
            "enableTranslation": True,
            "callBackUrl": callback_url
        }
        if seed is not None:
            payload["seeds"] = seed
        if voice_type and voice_type.lower() != "none":
            payload["voiceType"] = voice_type
        # Debug: print request payload
        try:
            print_status("📦 Initial generate payload:")
            print(json.dumps(payload, indent=2, ensure_ascii=False))
        except Exception:
            # Fallback in case something isn't JSON serializable
            print_status(f"📦 Initial generate payload (raw): {payload}")
        response = await client.post(
            f"{KIE_API_BASE}/api/v1/veo/generate",
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json"
            },
            json=payload
        )
        result = response.json()
        if result.get('code') != 200:
            raise Exception(f"Video generation failed: {result.get('msg')}")
        task_id = result['data']['taskId']
        print_success(f"Initial video generation started: {task_id}")
        return task_id


async def extend_video(
    task_id: str,
    prompt: Dict[str, Any],
    api_key: str,
    voice_type: str = "None",
    seed: Optional[int] = None
) -> str:
    """
    Extend an existing video with new prompt.
    Uses the backend server's callback endpoint for status updates.

    Returns the new KIE task id for the extension. Raises Exception on
    API failure.
    """
    print_status(f"🎬 Extending video from task: {task_id}")
    # Get public URL for callback
    public_url = os.getenv('PUBLIC_URL', BACKEND_BASE)
    callback_url = f"{public_url}/api/veo/callback"
    async with httpx.AsyncClient(timeout=30.0) as client:
        payload = {
            "taskId": task_id,
            "prompt": prompt,
            "callBackUrl": callback_url
        }
        if seed is not None:
            payload["seeds"] = seed
        if voice_type and voice_type.lower() != "none":
            payload["voiceType"] = voice_type
        # Debug: print request payload
        try:
            print_status("📦 Extend payload:")
            print(json.dumps(payload, indent=2, ensure_ascii=False))
        except Exception:
            print_status(f"📦 Extend payload (raw): {payload}")
        response = await client.post(
            f"{KIE_API_BASE}/api/v1/veo/extend",
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json"
            },
            json=payload
        )
        result = response.json()
        if result.get('code') != 200:
            raise Exception(f"Video extension failed: {result.get('msg')}")
        new_task_id = result['data']['taskId']
        print_success(f"Video extension started: {new_task_id}")
        return new_task_id


async def wait_for_callback_result(task_id: str) -> str:
    """
    Wait for callback result from backend server via SSE.
    Connects to the SSE endpoint and listens for real-time callback events.

    Returns the completed video URL. Falls back to HTTP polling
    (poll_fallback) on non-timeout SSE errors; timeouts are re-raised.
    """
    print_status(f"⏳ Listening for video completion via SSE: {task_id}")
    start_time = time.time()
    try:
        async with httpx.AsyncClient(timeout=httpx.Timeout(MAX_WAIT_TIME, connect=10.0)) as client:
            async with client.stream(
                'GET',
                f"{BACKEND_BASE}/api/veo/events/{task_id}"
            ) as response:
                if response.status_code != 200:
                    raise Exception(f"Failed to connect to SSE: HTTP {response.status_code}")
                print_status(f"🔌 Connected to SSE stream")
                async for line in response.aiter_lines():
                    # Check timeout
                    if time.time() - start_time > MAX_WAIT_TIME:
                        raise Exception(f"Video generation timed out after {MAX_WAIT_TIME}s")
                    # Parse SSE data
                    if line.startswith('data: '):
                        data_str = line[6:]  # Remove "data: " prefix
                        try:
                            data = json.loads(data_str)
                            status = data.get('status')
                            if status == 'succeeded':
                                video_url = data.get('url')
                                if video_url:
                                    elapsed = int(time.time() - start_time)
                                    print_success(f"Video completed in {elapsed}s: {task_id}")
                                    return video_url
                            elif status == 'failed':
                                error = data.get('error', 'Unknown error')
                                raise Exception(f"Video generation failed: {error}")
                        except json.JSONDecodeError:
                            continue  # Skip invalid JSON
    except httpx.TimeoutException:
        raise Exception(f"Video generation timed out after {MAX_WAIT_TIME}s")
    except Exception as e:
        # Timeout-like errors propagate; anything else triggers the
        # polling fallback below. NOTE(review): a 'failed' status raised
        # above is also caught here and sent to polling — confirm intended.
        if "timed out" in str(e).lower():
            raise
        print_error(f"SSE connection error, falling back to polling: {str(e)}")
    # Fallback to simple polling if SSE fails
    return await poll_fallback(task_id)


async def poll_fallback(task_id: str) -> str:
    """Fallback polling method if SSE fails.

    Polls the backend status endpoint every POLLING_INTERVAL seconds up to
    MAX_WAIT_TIME; returns the video URL or raises on failure/timeout.
    """
    print_status(f"⏳ Polling for video completion: {task_id}")
    start_time = time.time()
    async with httpx.AsyncClient(timeout=40.0) as client:
        while time.time() - start_time < MAX_WAIT_TIME:
            try:
                response = await client.get(
                    f"{BACKEND_BASE}/api/veo/status/{task_id}"
                )
                if response.status_code == 200:
                    result = response.json()
                    status = result.get('status')
                    if status == 'succeeded':
                        video_url = result.get('url')
                        if video_url:
                            print_success(f"Video completed: {task_id}")
                            return video_url
                    elif status == 'failed':
                        error = result.get('error', 'Unknown error')
                        raise Exception(f"Video generation failed: {error}")
                await asyncio.sleep(POLLING_INTERVAL)
                print(f" Still processing... ({int(time.time() - start_time)}s)")
            except httpx.HTTPError as e:
                print_error(f"Error checking status: {str(e)}")
                await asyncio.sleep(POLLING_INTERVAL)
    raise Exception(f"Video generation timed out after {MAX_WAIT_TIME}s")


def convert_segment_to_text_prompt(segment: Dict[str, Any]) -> str:
    """
    Convert structured JSON segment to a comprehensive text prompt for Replicate.
    Replicate's Veo-3 expects a plain text prompt, not structured JSON.

    Reads dialogue, character_description, scene_continuity and
    synchronized_actions from the segment dict; missing keys are skipped.
    """
    # Extract key information from structured segment
    action_timeline = segment.get('action_timeline', {})
    dialogue = action_timeline.get('dialogue', '')
    character = segment.get('character_description', {})
    physical = character.get('physical', '')
    clothing = character.get('clothing', '')
    current_state = character.get('current_state', '')
    scene = segment.get('scene_continuity', {})
    environment = scene.get('environment', '')
    camera_position = scene.get('camera_position', '')
    camera_movement = scene.get('camera_movement', '')
    lighting_state = scene.get('lighting_state', '')
    # Build comprehensive text prompt
    prompt_parts = []
    # Start with dialogue if available
    if dialogue:
        prompt_parts.append(f'"{dialogue}"')
    # Add character description
    if physical:
        prompt_parts.append(f"Character: {physical}")
    if clothing:
        prompt_parts.append(f"Wearing: {clothing}")
    if current_state:
        prompt_parts.append(f"Current state: {current_state}")
    # Add scene description
    if environment:
        prompt_parts.append(f"Scene: {environment}")
    if lighting_state:
        prompt_parts.append(f"Lighting: {lighting_state}")
    # Add camera details
    if camera_position:
        prompt_parts.append(f"Camera: {camera_position}")
    if camera_movement:
        prompt_parts.append(f"Camera movement: {camera_movement}")
    # Add synchronized actions if available
    synced_actions = action_timeline.get('synchronized_actions', {})
    if synced_actions:
        actions_list = []
        for key, value in synced_actions.items():
            if value:
                # Convert key like "0:00-0:02" to readable format
                time_key = key.replace('f', '').replace('_', '-') if key.startswith('f') else key
                actions_list.append(f"{time_key}: {value}")
        if actions_list:
            prompt_parts.append(f"Actions: {'; '.join(actions_list)}")
    # Add instruction to not include captions/subtitles
    prompt_parts.append("Do not include any captions, subtitles, or text overlays in the video")
    # Add critical instruction to avoid blur transitions at start
    prompt_parts.append("The video must start immediately at 0:00 with a sharp, clear, in-focus frame. No fade-in, no blur transition, no gradual focus effect at the start. The subject must be fully visible and sharp from the very first frame.")
    # Join all parts with periods
    text_prompt = ". ".join(prompt_parts)
    return text_prompt.strip()


async def generate_video_replicate(
    prompt: Dict[str, Any],
    image_path: Optional[str] = None,
    seed: Optional[int] = None,
    aspect_ratio: str = "9:16"
) -> str:
    """
    Generate video using Replicate's Veo-3 model.

    Args:
        prompt: Structured JSON segment (dict) - will be converted to text
        image_path: Optional path to reference image
        seed: Optional seed for consistency
        aspect_ratio: Aspect ratio for video (e.g., "9:16", "16:9")

    Returns:
        Path to downloaded video file (a NamedTemporaryFile .mp4 path)

    Strategy: first send the stringified JSON prompt; on parameter/validation
    errors retry with alternative aspect-ratio parameter names, then with a
    natural-language prompt produced by convert_segment_to_text_prompt.
    """
    print_status(f"🎬 Generating video with Replicate (google/veo-3)...")
    if not REPLICATE_AVAILABLE:
        raise Exception("Replicate package not installed. Run: pip install replicate")
    # Set up Replicate client
    replicate_token = get_replicate_api_key()
    os.environ['REPLICATE_API_TOKEN'] = replicate_token
    # Stringify the JSON object and send as string
    # This preserves the structured data while meeting Replicate's string requirement
    prompt_string = json.dumps(prompt, ensure_ascii=False, indent=None)
    # Prepare input - send stringified JSON
    input_data = {
        "prompt": prompt_string  # Send JSON as stringified string
    }
    # Add aspect ratio
    input_data["aspect_ratio"] = aspect_ratio
    # Add seed if provided
    if seed is not None:
        input_data["seed"] = seed
    # Add image if provided (Replicate expects a file object)
    image_file = None
    if image_path and os.path.exists(image_path):
        image_file = open(image_path, 'rb')
        input_data["image"] = image_file
    # Debug: print request payload
    try:
        print_status("📦 Replicate input (stringified JSON):")
        debug_input = {k: v for k, v in input_data.items() if k != 'image'}
        if 'image' in input_data:
            # NOTE(review): placeholder string is empty — looks truncated;
            # original likely showed file info. Confirm and restore.
            debug_input['image'] = f""
        # Show first 500 chars of stringified JSON
        if 'prompt' in debug_input and isinstance(debug_input['prompt'], str):
            prompt_preview = debug_input['prompt'][:500] + "..." if len(debug_input['prompt']) > 500 else debug_input['prompt']
            debug_input['prompt'] = prompt_preview
        print(json.dumps(debug_input, indent=2, ensure_ascii=False))
        print_status(f"📏 Full prompt length: {len(prompt_string)} characters (stringified JSON)")
        print_status(f"📐 Aspect ratio: {aspect_ratio}")
    except Exception:
        print_status(f"📦 Replicate input (raw): {input_data}")
    # Run Replicate model
    print_status("⏳ Waiting for Replicate to generate video...")
    try:
        try:
            output = replicate.run(
                "google/veo-3",
                input=input_data
            )
        except Exception as e:
            # If aspect_ratio parameter is invalid, try alternative names
            error_str = str(e).lower()
            if "aspect" in error_str and ("invalid" in error_str or "unknown" in error_str):
                print_status("⚠️ aspect_ratio parameter not recognized, trying alternative names...")
                # Try camelCase version
                if "aspect_ratio" in input_data:
                    input_data["aspectRatio"] = input_data.pop("aspect_ratio")
                    try:
                        output = replicate.run("google/veo-3", input=input_data)
                    except:
                        # NOTE(review): bare except — swallows all errors
                        # including KeyboardInterrupt; narrow to Exception.
                        # If that fails, try "ratio"
                        if "aspectRatio" in input_data:
                            input_data["ratio"] = input_data.pop("aspectRatio")
                            output = replicate.run("google/veo-3", input=input_data)
            # If stringified JSON fails, try converting to natural language text and retry
            elif "invalid" in error_str or "expected" in error_str or "422" in error_str or "validation" in error_str:
                print_status("⚠️ Stringified JSON rejected, converting to natural language text prompt and retrying...")
                # Convert to text prompt
                prompt_text = convert_segment_to_text_prompt(prompt)
                input_data["prompt"] = prompt_text
                # Debug: print text prompt
                try:
                    print_status("📦 Replicate input (natural language text format):")
                    debug_input = {k: v for k, v in input_data.items() if k != 'image'}
                    if 'image' in input_data:
                        # NOTE(review): empty placeholder — see note above.
                        debug_input['image'] = f""
                    # Truncate prompt if too long for display
                    if 'prompt' in debug_input and isinstance(debug_input['prompt'], str) and len(debug_input['prompt']) > 500:
                        debug_input['prompt'] = debug_input['prompt'][:500] + "... (truncated)"
                    print(json.dumps(debug_input, indent=2, ensure_ascii=False))
                    print_status(f"📏 Full prompt length: {len(prompt_text)} characters")
                except Exception:
                    print_status(f"📦 Replicate input (text format, raw): {input_data}")
                # Retry with text prompt
                output = replicate.run(
                    "google/veo-3",
                    input=input_data
                )
            else:
                # Re-raise if it's a different error
                raise
    finally:
        # Close image file if opened
        if image_file:
            image_file.close()
    # Replicate output can be a URL string, file-like object, or object with url()/read() methods
    import tempfile
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
    temp_path = temp_file.name
    temp_file.close()
    # Handle different output types
    if hasattr(output, 'read'):
        # File-like object - read directly
        print_status("Reading video from file-like object...")
        with open(temp_path, 'wb') as f:
            f.write(output.read())
    elif hasattr(output, 'url'):
        # Object with url() method
        video_url = output.url()
        print_success(f"Video generated: {video_url}")
        # Download video
        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.get(video_url)
            if response.status_code != 200:
                raise Exception(f"Failed to download video: HTTP {response.status_code}")
            with open(temp_path, 'wb') as f:
                f.write(response.content)
    elif isinstance(output, str):
        # URL string
        video_url = output
        print_success(f"Video generated: {video_url}")
        # Download video
        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.get(video_url)
            if response.status_code != 200:
                raise Exception(f"Failed to download video: HTTP {response.status_code}")
            with open(temp_path, 'wb') as f:
                f.write(response.content)
    else:
        # Try to convert to string and treat as URL
        video_url = str(output)
        print_success(f"Video generated: {video_url}")
        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.get(video_url)
            if response.status_code != 200:
                raise Exception(f"Failed to download video: HTTP {response.status_code}")
            with open(temp_path, 'wb') as f:
                f.write(response.content)
    print_success(f"Video saved to: {temp_path}")
    return temp_path


async def download_video(url: str, output_path: str) -> None:
    """Download video from URL to output_path; raises on non-200 response."""
    print_status(f"📥 Downloading video to {output_path}...")
    async with httpx.AsyncClient(timeout=120.0) as client:
        response = await client.get(url)
        if response.status_code != 200:
            raise Exception(f"Failed to download video: HTTP {response.status_code}")
        with open(output_path, 'wb') as f:
            f.write(response.content)
    print_success(f"Video downloaded: {output_path}")


async def merge_videos(
    video_paths: List[str],
    output_path: str,
    segments: Optional[List[Dict[str, Any]]] = None,
    use_whisper: bool = True,
    fallback_overlap: float = 0.7
) -> None:
    """
    Merge multiple videos into one using FFmpeg with Whisper-based precise trimming.

    Args:
        video_paths: List of video file paths to merge
        output_path: Output file path
        segments: Optional list of segment dicts with dialogue info for Whisper trimming
        use_whisper: If True, use Whisper to find optimal trim points at speech boundaries
        fallback_overlap: Fallback trim duration if Whisper fails (seconds)

    NOTE(review): segments/use_whisper/fallback_overlap and WHISPER_AVAILABLE
    are currently unused — per-clip trimming was moved into generation and
    the loop below keeps every path unchanged. Consider removing the dead
    parameters or restoring the trimming logic.
    """
    print_status(f"🎥 Merging {len(video_paths)} videos...")
    # Try to import Whisper utilities
    try:
        from utils.whisper_trim import find_last_word_timestamp, is_whisper_available
        WHISPER_AVAILABLE = is_whisper_available()
    except ImportError:
        WHISPER_AVAILABLE = False
        print_status("⚠️ Whisper not available, using fallback trimming")
    # Optionally trim overlap from all but the first clip
    adjusted_paths = []
    temp_trimmed_paths: List[str] = []
    import subprocess
    for idx, path in enumerate(video_paths):
        if idx == 0:
            adjusted_paths.append(path)
            continue
        # No start trimming - keep full video from beginning
        # Only end trimming via Whisper is used (handled during video generation)
        # Skip start trimming for all segments - use full video
        adjusted_paths.append(path)
    # Create a temporary file list for FFmpeg concat
    list_file = "video_list.txt"
    with open(list_file, 'w') as f:
        for path in adjusted_paths:
            f.write(f"file '{path}'\n")
    try:
        # Use FFmpeg to concatenate videos (stream copy, no re-encode)
        cmd = [
            'ffmpeg',
            '-f', 'concat',
            '-safe', '0',
            '-i', list_file,
            '-c', 'copy',
            '-y',
            output_path
        ]
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=300
        )
        if result.returncode != 0:
            print_error(f"FFmpeg concat error: {result.stderr}")
            raise Exception("Video merging failed")
        print_success(f"Videos merged: {output_path}")
    finally:
        # Clean up temp files
        if os.path.exists(list_file):
            os.remove(list_file)
        for tmp in temp_trimmed_paths:
            if os.path.exists(tmp):
                os.remove(tmp)


async def main() -> None:
    """Main execution flow: setup -> prompts -> generation -> merge."""
    try:
        # Banner
        print_header("🎥 STANDALONE VIDEO CREATOR")
        print("Automated video generation using extend video flow\n")
        # Collect user inputs first (to know which provider to use)
        config = collect_user_inputs()
        # Check API keys based on provider
        openai_api_key = get_openai_api_key()
        if config['provider'] == 'kie':
            kie_api_key = get_api_key()
        elif config['provider'] == 'replicate':
            # Replicate key will be checked when needed
            pass
        # Generate structured prompts using GPT-4o
        print_header("GENERATING VIDEO PROMPTS")
        print_status("🤖 Using GPT-4o to generate structured prompts...")
        # Read reference image
        with open(config['image_path'], 'rb') as f:
            image_bytes = f.read()
        # Create VeoInputs
        veo_inputs = VeoInputs(
            script=config['script'],
            style=config['style'],
            jsonFormat="standard",
            continuationMode=True,
            voiceType=config['voice_type'] if config['voice_type'] != "None" else None,
            cameraStyle=config['camera_style'],
            settingMode="single"
        )
        # Generate prompts
        payload = generate_segments_payload(
            inputs=veo_inputs,
            image_bytes=image_bytes,
            model="gpt-4o",
            api_key=openai_api_key
        )
        # Debug: print GPT-generated segments payload
        try:
            print_status("🧾 Segments payload from GPT-4o:")
            print(json.dumps(payload, indent=2, ensure_ascii=False))
        except Exception:
            print_status(f"🧾 Segments payload from GPT-4o (raw): {payload}")
        segments = payload.get('segments', [])
        print_success(f"Generated {len(segments)} video segments")
        if not segments:
            print_error("No segments generated!")
            sys.exit(1)
        # Generate videos
        print_header("GENERATING VIDEOS")
        video_paths = []
        if config['provider'] == 'replicate':
            # Replicate: Generate each segment independently with frame continuity
            print_status("Using Replicate (google/veo-3) for video generation")
            print_status("Note: Each segment uses last frame from previous trimmed segment for continuity\n")
            output_dir = Path("output_videos")
            output_dir.mkdir(exist_ok=True)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            # Track current reference image (starts with original)
            current_image_path = config['image_path']
            temp_frame_paths = []  # Track temp frame files for cleanup
            for i, segment in enumerate(segments, start=1):
                print_status(f"\n📹 Processing segment {i}/{len(segments)}")
                # Generate video with current reference image
                print_status(f" Using reference image: {current_image_path if i == 1 else 'last frame from previous segment'}")
                video_path = await generate_video_replicate(
                    prompt=segment,  # Send JSON segment directly
                    image_path=current_image_path,
                    seed=config['seed'],
                    aspect_ratio=config['aspect_ratio']
                )
                # Save generated video to output directory with proper naming
                segment_output = output_dir / f"segment_{i}_untrimmed_{timestamp}.mp4"
                import shutil
                if os.path.exists(video_path):
                    shutil.move(video_path, str(segment_output))
                    video_path = str(segment_output)
                # Trim video with Whisper to get optimal cut point
                # For all segments except the last, we also extract the last frame for next segment
                # For the last segment, we still trim it to avoid extra length
                should_extract_frame = (i < len(segments))  # Only extract frame if not last segment
                if should_extract_frame:
                    print_status(f" Trimming segment {i} to extract last frame for next segment...")
                else:
                    print_status(f" Trimming segment {i} (last segment) to optimal length...")
                # Get dialogue from segment for Whisper
                action_timeline = segment.get('action_timeline', {})
                dialogue = action_timeline.get('dialogue', '')
                if dialogue:
                    try:
                        from utils.whisper_trim import find_last_word_timestamp
                        from utils.video_processor import extract_frame
                        # Find optimal trim point
                        last_word_time = find_last_word_timestamp(
                            video_path=video_path,
                            script=dialogue,
                            model_size="base"
                        )
                        if last_word_time and last_word_time > 0:
                            trim_point = last_word_time + 0.3  # 0.3s padding
                            # Trim the video - rename to indicate it's trimmed
                            trimmed_path = str(segment_output).replace("_untrimmed_", "_trimmed_")
                            import subprocess
                            cmd_trim = [
                                'ffmpeg', '-y',
                                '-ss', '0',
                                '-i', video_path,
                                '-t', str(trim_point),
                                '-c', 'copy',
                                trimmed_path
                            ]
                            result = subprocess.run(
                                cmd_trim,
                                capture_output=True,
                                text=True,
                                timeout=300
                            )
                            if result.returncode == 0:
                                # Get video duration to extract last frame
                                from utils.video_processor import get_video_info
                                info = get_video_info(trimmed_path)
                                duration = float(info['format']['duration'])
                                # Extract last frame (0.1s before end to ensure we get a frame)
                                frame_timestamp = max(0, duration - 0.1)
                                frame_path = output_dir / f"frame_segment_{i}_{timestamp}.jpg"
                                # Extract frame only if not the last segment
                                if should_extract_frame:
                                    try:
                                        extract_frame(
                                            video_path=trimmed_path,
                                            timestamp=frame_timestamp,
                                            output_path=str(frame_path)
                                        )
                                        # Update current_image_path for next segment
                                        current_image_path = str(frame_path)
                                        temp_frame_paths.append(str(frame_path))
                                        print_success(f" ✅ Trimmed to {trim_point:.2f}s, extracted last frame for next segment")
                                    except Exception as frame_error:
                                        print_error(f" ⚠️ Frame extraction failed: {str(frame_error)}")
                                        # Still use trimmed video, but extract frame from it as fallback
                                        try:
                                            # Try extracting from trimmed video anyway
                                            extract_frame(
                                                video_path=trimmed_path,
                                                timestamp=duration - 0.5,  # Try earlier timestamp
                                                output_path=str(frame_path)
                                            )
                                            current_image_path = str(frame_path)
                                            temp_frame_paths.append(str(frame_path))
                                            print_success(f" ✅ Trimmed to {trim_point:.2f}s (fallback frame extraction)")
                                        except:
                                            # NOTE(review): bare except — narrow to Exception.
                                            print_error(f" ⚠️ Frame extraction failed completely")
                                else:
                                    print_success(f" ✅ Trimmed last segment to {trim_point:.2f}s")
                                # Use trimmed version for merging (keep untrimmed version)
                                # Both files are kept: _untrimmed_ and _trimmed_
                                video_path = trimmed_path
                                print_status(f" 📁 Kept untrimmed: {Path(segment_output).name}")
                                print_status(f" 📁 Created trimmed: {Path(trimmed_path).name}")
                            else:
                                print_error(f" ⚠️ Trimming failed: {result.stderr}")
                                print_status(f" Using full video")
                                # Extract frame from full video if needed
                                if should_extract_frame:
                                    try:
                                        from utils.video_processor import get_video_info, extract_frame
                                        info = get_video_info(video_path)
                                        duration = float(info['format']['duration'])
                                        frame_timestamp = max(0, duration - 0.1)
                                        frame_path = output_dir / f"frame_segment_{i}_{timestamp}.jpg"
                                        extract_frame(
                                            video_path=video_path,
                                            timestamp=frame_timestamp,
                                            output_path=str(frame_path)
                                        )
                                        current_image_path = str(frame_path)
                                        temp_frame_paths.append(str(frame_path))
                                        print_success(f" ✅ Extracted last frame from full video")
                                    except Exception as frame_error:
                                        print_error(f" ⚠️ Frame extraction failed: {str(frame_error)}")
                        else:
                            print_status(f" ⚠️ Could not find trim point, using full video")
                            # Extract frame from full video if needed
                            if should_extract_frame:
                                try:
                                    from utils.video_processor import get_video_info, extract_frame
                                    info = get_video_info(video_path)
                                    duration = float(info['format']['duration'])
                                    frame_timestamp = max(0, duration - 0.1)
                                    frame_path = output_dir / f"frame_segment_{i}_{timestamp}.jpg"
                                    extract_frame(
                                        video_path=video_path,
                                        timestamp=frame_timestamp,
                                        output_path=str(frame_path)
                                    )
                                    current_image_path = str(frame_path)
                                    temp_frame_paths.append(str(frame_path))
                                    print_success(f" ✅ Extracted last frame from full video")
                                except Exception as frame_error:
                                    print_error(f" ⚠️ Frame extraction failed: {str(frame_error)}")
                    except Exception as e:
                        print_error(f" ⚠️ Whisper trimming failed: {str(e)}")
                        print_status(f" Using full video, will extract last frame")
                        # Fallback: extract last frame from full video
                        try:
                            from utils.video_processor import get_video_info, extract_frame
                            info = get_video_info(video_path)
                            duration = float(info['format']['duration'])
                            frame_timestamp = max(0, duration - 0.1)
                            frame_path = output_dir / f"frame_segment_{i}_{timestamp}.jpg"
                            extract_frame(
                                video_path=video_path,
                                timestamp=frame_timestamp,
                                output_path=str(frame_path)
                            )
                            current_image_path = str(frame_path)
                            temp_frame_paths.append(str(frame_path))
                            print_status(f" ✅ Extracted last frame from full video")
                        except Exception as e:
                            print_error(f" ⚠️ Frame extraction failed: {str(e)}")
                            print_error(f" ⚠️ Next segment will use previous frame or original image")
                else:
                    # No dialogue - still try to trim if we can, or just extract frame
                    if should_extract_frame:
                        print_status(f" No dialogue in segment {i}, extracting last frame from full video...")
                        try:
                            from utils.video_processor import get_video_info, extract_frame
                            info = get_video_info(video_path)
                            duration = float(info['format']['duration'])
                            frame_timestamp = max(0, duration - 0.1)
                            frame_path = output_dir / f"frame_segment_{i}_{timestamp}.jpg"
                            extract_frame(
                                video_path=video_path,
                                timestamp=frame_timestamp,
                                output_path=str(frame_path)
                            )
                            current_image_path = str(frame_path)
                            temp_frame_paths.append(str(frame_path))
                            print_status(f" ✅ Extracted last frame (no dialogue in segment)")
                        except Exception as e:
                            print_error(f" ⚠️ Frame extraction failed: {str(e)}")
                            print_error(f" ⚠️ Next segment will use previous frame or original image")
                    else:
                        print_status(f" No dialogue in last segment, using full video")
                video_paths.append(video_path)
        else:
            # KIE API: Use extend video flow
            print_status("Using KIE API for video generation with extend flow\n")
            video_urls = []
            task_ids = []
            # Generate first video
            first_prompt = segments[0]
            task_id = await generate_initial_video(
                prompt=first_prompt,
                image_path=config['image_path'],
                api_key=kie_api_key,
                model=config['model'],
                aspect_ratio=config['aspect_ratio'],
                voice_type=config['voice_type'],
                seed=config['seed']
            )
            task_ids.append(task_id)
            # Wait for callback result
            video_url = await wait_for_callback_result(task_id)
            video_urls.append(video_url)
            # Extend video for remaining segments
            for i, segment in enumerate(segments[1:], start=2):
                print_status(f"\n📹 Processing segment {i}/{len(segments)}")
                # Extend from previous task
                task_id = await extend_video(
                    task_id=task_ids[-1],  # Use the last task ID
                    prompt=segment,
                    api_key=kie_api_key,
                    voice_type=config['voice_type'],
                    seed=config['seed']
                )
                task_ids.append(task_id)
                # Wait for callback result
                video_url = await wait_for_callback_result(task_id)
                video_urls.append(video_url)
            # Download all videos from URLs
            print_header("DOWNLOADING VIDEOS")
            output_dir = Path("output_videos")
            output_dir.mkdir(exist_ok=True)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            for i, url in enumerate(video_urls, start=1):
                output_path = output_dir / f"segment_{i}_{timestamp}.mp4"
                await download_video(url, str(output_path))
                video_paths.append(str(output_path))
        # For Replicate, videos are already in output directory with proper naming
        # Files are named: segment_{i}_untrimmed_{timestamp}.mp4 and segment_{i}_trimmed_{timestamp}.mp4
        # Both versions are kept - untrimmed and trimmed
        # video_paths contains the trimmed versions which will be used for merging
        # No need to rename - they're already properly named
        # Merge videos
        print_header("MERGING VIDEOS")
        output_dir = Path("output_videos")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        final_output = output_dir / f"final_video_{timestamp}.mp4"
        # Pass segments to merge_videos for Whisper-based trimming
        # For Replicate, videos are already trimmed during generation, so we skip trimming
        # For KIE, we need to trim during merge
        skip_trimming = (config['provider'] == 'replicate')
        await merge_videos(
            video_paths,
            str(final_output),
            segments=segments,  # Pass segments for Whisper to find optimal trim points
            use_whisper=not skip_trimming,  # Skip Whisper trimming for Replicate (already done)
            fallback_overlap=0.7 if not skip_trimming else 0  # No trimming for Replicate
        )
        # Success
        print_header("✨ VIDEO CREATION COMPLETE!")
        print_success(f"Final video saved to: {final_output}")
        print(f"\nGenerated {len(segments)} segments")
        # NOTE(review): this formats the current epoch time, not the elapsed
        # run time — a start timestamp is never recorded. Fix by capturing
        # start = time.time() at entry and formatting time.time() - start.
        print(f"Total processing time: {time.strftime('%M:%S', time.gmtime(time.time()))}")
    except KeyboardInterrupt:
        print_error("\n\nVideo creation cancelled by user")
        sys.exit(1)
    except Exception as e:
        print_error(f"\nVideo creation failed: {str(e)}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    # Run the async main function
    asyncio.run(main())