Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Standalone Video Creator | |
| Automated video generation using extend video flow with one-time input. | |
| No frontend required - everything runs from this single script. | |
| """ | |
| import os | |
| import sys | |
| import time | |
| import json | |
| import asyncio | |
| import httpx | |
| from pathlib import Path | |
| from typing import List, Dict, Any, Optional | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
| # Import utilities from the existing codebase | |
| from utils.prompt_generator import ( | |
| VeoInputs, | |
| generate_segments_payload, | |
| split_script_into_segments | |
| ) | |
| # Try importing replicate (optional) | |
| try: | |
| import replicate | |
| REPLICATE_AVAILABLE = True | |
| except ImportError: | |
| REPLICATE_AVAILABLE = False | |
| # Load environment variables | |
| load_dotenv('.env.local') | |
| # Configuration | |
| KIE_API_BASE = "https://api.kie.ai" | |
| BACKEND_BASE = f"http://localhost:{os.getenv('SERVER_PORT', 4000)}" | |
| MAX_RETRIES = 3 | |
| POLLING_INTERVAL = 10 # seconds | |
| MAX_WAIT_TIME = 600 # 10 minutes per video | |
class Colors:
    """ANSI escape sequences used for colored terminal output."""
    HEADER = '\033[95m'  # bright magenta - section banners
    BLUE = '\033[94m'
    CYAN = '\033[96m'    # default color for status messages
    GREEN = '\033[92m'   # success messages
    YELLOW = '\033[93m'
    RED = '\033[91m'     # error messages
    END = '\033[0m'      # reset all attributes
    BOLD = '\033[1m'
def print_status(message: str, color: str = Colors.CYAN):
    """Write *message* to stdout wrapped in the given ANSI color code."""
    colored_text = f"{color}{message}{Colors.END}"
    print(colored_text)
def print_success(message: str):
    """Emit a green check-marked success line."""
    print(f"{Colors.GREEN}✅ {message}{Colors.END}")
def print_error(message: str):
    """Emit a red cross-marked error line."""
    print(f"{Colors.RED}❌ {message}{Colors.END}")
def print_header(message: str):
    """Print *message* framed by bold, colored '=' rules."""
    rule = f"{Colors.BOLD}{Colors.HEADER}{'='*60}{Colors.END}"
    print(f"\n{rule}")
    print(f"{Colors.BOLD}{Colors.HEADER}{message}{Colors.END}")
    print(f"{rule}\n")
def get_api_key() -> str:
    """Return the KIE API key from the environment, or exit with guidance."""
    key = os.getenv('KIE_API_KEY')
    if key:
        return key
    # Missing key is unrecoverable for this script: explain and bail out.
    print_error("KIE_API_KEY not found in environment!")
    print("Please add KIE_API_KEY to .env.local file")
    print("Get your API key at: https://kie.ai/api-key")
    sys.exit(1)
def get_openai_api_key() -> str:
    """Return the OpenAI API key from the environment, or exit."""
    key = os.getenv('OPENAI_API_KEY')
    if key:
        return key
    # No key means prompt generation cannot run at all.
    print_error("OPENAI_API_KEY not found in environment!")
    print("Please add OPENAI_API_KEY to .env.local file")
    sys.exit(1)
def get_replicate_api_key() -> str:
    """Return the Replicate API token from the environment, or exit with guidance."""
    token = os.getenv('REPLICATE_API_TOKEN')
    if token:
        return token
    print_error("REPLICATE_API_TOKEN not found in environment!")
    print("Please add REPLICATE_API_TOKEN to .env.local file")
    print("Get your API token at: https://replicate.com/account/api-tokens")
    sys.exit(1)
def collect_user_inputs() -> Dict[str, Any]:
    """
    Interactively collect the one-time configuration for a video run.

    Prompts on stdin for the script text, a reference image path, style
    options, the generation provider, and an optional seed.

    Returns:
        Dict with keys: script, image_path, style, voice_type, model,
        aspect_ratio, camera_style, seed, provider.

    Exits:
        Via sys.exit(1) on an empty script, a missing image file, or when
        the Replicate provider is chosen without the package installed.
    """
    print_header("VIDEO CREATION SETUP")
    print("This script will generate a video using AI. You'll need to provide:")
    print("1. A script/text for the video")
    print("2. A reference image (character/scene)")
    print("3. Video style preferences\n")

    # Script input: read lines until the user presses Enter twice in a row.
    print_status("Enter your video script (press Enter twice when done):")
    script_lines = []
    while True:
        line = input()
        if line == "" and script_lines and script_lines[-1] == "":
            script_lines.pop()  # Drop the trailing blank line
            break
        script_lines.append(line)
    script = "\n".join(script_lines).strip()
    if not script:
        print_error("Script cannot be empty!")
        sys.exit(1)

    # Reference image must exist on disk before we continue.
    print_status("\nEnter path to reference image:")
    image_path = input().strip()
    if not os.path.exists(image_path):
        print_error(f"Image file not found: {image_path}")
        sys.exit(1)

    # Style preferences - blank input falls back to sensible defaults.
    print_status("\nEnter video style (default: 'clean, lifestyle UGC'):")
    style = input().strip() or "clean, lifestyle UGC"

    print_status("\nEnter voice type (Deep/Warm/Crisp/None, default: None):")
    voice_type = input().strip() or "None"

    print_status("\nEnter video model (default: 'veo3_fast'):")
    model = input().strip() or "veo3_fast"

    print_status("\nEnter aspect ratio (16:9 or 9:16, default: '9:16'):")
    aspect_ratio = input().strip() or "9:16"

    print_status("\nEnter camera style (default: 'handheld steadicam'):")
    camera_style = input().strip() or "handheld steadicam"

    # Provider selection: anything other than "2" defaults to KIE.
    print_status("\nSelect video generation provider:")
    print("  1. KIE API (supports extend video flow)")
    print("  2. Replicate (google/veo-3)")
    provider_choice = input().strip() or "1"
    if provider_choice == "2":
        if not REPLICATE_AVAILABLE:
            print_error("Replicate package not installed!")
            print("Please install it: pip install replicate")
            sys.exit(1)
        provider = "replicate"
    else:
        provider = "kie"

    # Optional seed for consistent lighting across segments.
    print_status("\nEnter seed for consistent lighting (optional, press Enter to skip):")
    seed_input = input().strip()
    seed: Optional[int] = None
    if seed_input:
        try:
            seed = int(seed_input)
        except ValueError:
            # BUGFIX: a non-numeric seed previously crashed with an unhandled
            # ValueError; warn and proceed without a seed instead.
            print_error(f"Invalid seed '{seed_input}' - must be an integer; continuing without a seed")

    print_success("\nConfiguration complete!")
    return {
        'script': script,
        'image_path': image_path,
        'style': style,
        'voice_type': voice_type,
        'model': model,
        'aspect_ratio': aspect_ratio,
        'camera_style': camera_style,
        'seed': seed,
        'provider': provider
    }
async def generate_initial_video(
    prompt: Dict[str, Any],
    image_path: str,
    api_key: str,
    model: str = "veo3_fast",
    aspect_ratio: str = "9:16",
    voice_type: str = "None",
    seed: Optional[int] = None
) -> str:
    """
    Generate the first video segment via the KIE Veo API.

    The reference image is uploaded to the local backend first so KIE can
    fetch it from a public URL; completion events arrive at the backend's
    callback endpoint.

    Args:
        prompt: Structured JSON prompt for the segment.
        image_path: Local path to the reference image.
        api_key: KIE API key (sent as a Bearer token).
        model: KIE video model name.
        aspect_ratio: "16:9" or "9:16".
        voice_type: Voice preset, or "None" to disable.
        seed: Optional seed for consistent lighting across segments.

    Returns:
        The KIE task id for the started generation.

    Raises:
        Exception: If the image upload or the generate request fails.
    """
    print_status("🎬 Generating initial video segment...")

    # Read image file bytes for the multipart upload.
    # NOTE: redundant local `from pathlib import Path` removed - Path is
    # already imported at module level.
    with open(image_path, 'rb') as f:
        image_data = f.read()

    # Detect image format for upload (unknown extensions default to JPEG).
    image_ext = Path(image_path).suffix.lower()
    mime_type = {
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.webp': 'image/webp'
    }.get(image_ext, 'image/jpeg')

    # Callbacks go to the publicly reachable backend URL.
    public_url = os.getenv('PUBLIC_URL', BACKEND_BASE)
    callback_url = f"{public_url}/api/veo/callback"

    # Upload image to backend so it's available to the /api/images/{id} endpoint.
    print_status("📷 Uploading image to backend for hosting...")
    async with httpx.AsyncClient(timeout=30.0) as client:
        upload_response = await client.post(
            f"{BACKEND_BASE}/api/upload-image",
            files={"file": ("reference_image" + image_ext, image_data, mime_type)},
        )
        if upload_response.status_code != 200:
            raise Exception(f"Image upload failed: HTTP {upload_response.status_code} - {upload_response.text}")
        upload_json = upload_response.json()
        hosted_image_url = upload_json.get("url")
        if not hosted_image_url:
            raise Exception(f"Image upload response missing URL: {upload_json}")
        print_success(f"Image hosted at: {hosted_image_url}")

    async with httpx.AsyncClient(timeout=30.0) as client:
        payload = {
            "prompt": prompt,
            "imageUrls": [hosted_image_url],  # Use hosted URL
            "model": model,
            "aspectRatio": aspect_ratio,
            "generationType": "FIRST_AND_LAST_FRAMES_2_VIDEO",
            "enableTranslation": True,
            "callBackUrl": callback_url
        }
        if seed is not None:
            payload["seeds"] = seed
        if voice_type and voice_type.lower() != "none":
            payload["voiceType"] = voice_type

        # Debug: print request payload
        try:
            print_status("📦 Initial generate payload:")
            print(json.dumps(payload, indent=2, ensure_ascii=False))
        except Exception:
            # Fallback in case something isn't JSON serializable
            print_status(f"📦 Initial generate payload (raw): {payload}")

        response = await client.post(
            f"{KIE_API_BASE}/api/v1/veo/generate",
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json"
            },
            json=payload
        )
        # BUGFIX: surface HTTP-level failures before parsing JSON - a 5xx
        # with a non-JSON body previously raised an opaque decode error.
        if response.status_code != 200:
            raise Exception(f"Video generation request failed: HTTP {response.status_code} - {response.text}")
        result = response.json()

    if result.get('code') != 200:
        raise Exception(f"Video generation failed: {result.get('msg')}")
    task_id = result['data']['taskId']
    print_success(f"Initial video generation started: {task_id}")
    return task_id
async def extend_video(
    task_id: str,
    prompt: Dict[str, Any],
    api_key: str,
    voice_type: str = "None",
    seed: Optional[int] = None
) -> str:
    """
    Request a continuation of an existing KIE video task.

    Completion events for the new task are delivered to the backend
    server's callback endpoint.
    """
    print_status(f"🎬 Extending video from task: {task_id}")

    # Callbacks go to the publicly reachable backend URL.
    base_url = os.getenv('PUBLIC_URL', BACKEND_BASE)
    payload: Dict[str, Any] = {
        "taskId": task_id,
        "prompt": prompt,
        "callBackUrl": f"{base_url}/api/veo/callback",
    }
    if seed is not None:
        payload["seeds"] = seed
    if voice_type and voice_type.lower() != "none":
        payload["voiceType"] = voice_type

    # Debug: print request payload
    try:
        print_status("📦 Extend payload:")
        print(json.dumps(payload, indent=2, ensure_ascii=False))
    except Exception:
        print_status(f"📦 Extend payload (raw): {payload}")

    request_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(
            f"{KIE_API_BASE}/api/v1/veo/extend",
            headers=request_headers,
            json=payload,
        )
        result = response.json()

    if result.get('code') != 200:
        raise Exception(f"Video extension failed: {result.get('msg')}")

    new_task_id = result['data']['taskId']
    print_success(f"Video extension started: {new_task_id}")
    return new_task_id
async def wait_for_callback_result(task_id: str) -> str:
    """
    Wait for a task's completion by listening to the backend's SSE stream.

    Falls back to polling only on genuine connection problems; definitive
    outcomes (timeout or a reported generation failure) are propagated.

    Args:
        task_id: The KIE task id to wait for.

    Returns:
        The URL of the completed video.

    Raises:
        Exception: On generation failure or after MAX_WAIT_TIME seconds.
    """
    print_status(f"⏳ Listening for video completion via SSE: {task_id}")
    start_time = time.time()
    try:
        async with httpx.AsyncClient(timeout=httpx.Timeout(MAX_WAIT_TIME, connect=10.0)) as client:
            async with client.stream(
                'GET',
                f"{BACKEND_BASE}/api/veo/events/{task_id}"
            ) as response:
                if response.status_code != 200:
                    raise Exception(f"Failed to connect to SSE: HTTP {response.status_code}")
                print_status("🔌 Connected to SSE stream")
                async for line in response.aiter_lines():
                    # Enforce the overall deadline even while the stream is alive.
                    if time.time() - start_time > MAX_WAIT_TIME:
                        raise Exception(f"Video generation timed out after {MAX_WAIT_TIME}s")
                    # SSE payload lines carry a "data: " prefix.
                    if line.startswith('data: '):
                        try:
                            data = json.loads(line[6:])
                        except json.JSONDecodeError:
                            continue  # Skip invalid JSON (keep-alives etc.)
                        status = data.get('status')
                        if status == 'succeeded':
                            video_url = data.get('url')
                            if video_url:
                                elapsed = int(time.time() - start_time)
                                print_success(f"Video completed in {elapsed}s: {task_id}")
                                return video_url
                        elif status == 'failed':
                            error = data.get('error', 'Unknown error')
                            raise Exception(f"Video generation failed: {error}")
    except httpx.TimeoutException:
        raise Exception(f"Video generation timed out after {MAX_WAIT_TIME}s")
    except Exception as e:
        message = str(e).lower()
        # BUGFIX: a 'failed' SSE event previously fell into this handler,
        # was mislabelled as an SSE connection error, and triggered a
        # pointless polling fallback. Definitive outcomes now propagate.
        if "timed out" in message or "generation failed" in message:
            raise
        print_error(f"SSE connection error, falling back to polling: {str(e)}")
        # Fallback to simple polling if SSE genuinely failed to connect/stream.
        return await poll_fallback(task_id)
async def poll_fallback(task_id: str) -> str:
    """Poll the backend status endpoint until the task finishes (SSE fallback)."""
    print_status(f"⏳ Polling for video completion: {task_id}")
    start_time = time.time()
    async with httpx.AsyncClient(timeout=40.0) as client:
        # Keep polling until the overall deadline elapses.
        while time.time() - start_time < MAX_WAIT_TIME:
            try:
                response = await client.get(
                    f"{BACKEND_BASE}/api/veo/status/{task_id}"
                )
                if response.status_code == 200:
                    body = response.json()
                    state = body.get('status')
                    if state == 'succeeded':
                        video_url = body.get('url')
                        if video_url:
                            print_success(f"Video completed: {task_id}")
                            return video_url
                    elif state == 'failed':
                        failure = body.get('error', 'Unknown error')
                        raise Exception(f"Video generation failed: {failure}")
                await asyncio.sleep(POLLING_INTERVAL)
                print(f" Still processing... ({int(time.time() - start_time)}s)")
            except httpx.HTTPError as e:
                # Transient network errors: report and retry after a pause.
                print_error(f"Error checking status: {str(e)}")
                await asyncio.sleep(POLLING_INTERVAL)
    raise Exception(f"Video generation timed out after {MAX_WAIT_TIME}s")
def convert_segment_to_text_prompt(segment: Dict[str, Any]) -> str:
    """
    Flatten a structured JSON segment into a plain-text prompt for Replicate.

    Replicate's Veo-3 expects a text prompt, so the dialogue, character,
    scene, camera, and timed-action fields are joined into one sentence
    sequence, followed by fixed no-captions / sharp-first-frame directives.
    """
    timeline = segment.get('action_timeline', {})
    char_info = segment.get('character_description', {})
    scene_info = segment.get('scene_continuity', {})

    parts: List[str] = []

    # Dialogue leads the prompt, quoted, when present.
    spoken = timeline.get('dialogue', '')
    if spoken:
        parts.append(f'"{spoken}"')

    # Labelled descriptive fields, emitted only when non-empty,
    # in the same order the structured segment defines them.
    labelled_fields = [
        ("Character", char_info.get('physical', '')),
        ("Wearing", char_info.get('clothing', '')),
        ("Current state", char_info.get('current_state', '')),
        ("Scene", scene_info.get('environment', '')),
        ("Lighting", scene_info.get('lighting_state', '')),
        ("Camera", scene_info.get('camera_position', '')),
        ("Camera movement", scene_info.get('camera_movement', '')),
    ]
    for label, value in labelled_fields:
        if value:
            parts.append(f"{label}: {value}")

    # Timed actions: keys like "f0_02" become "0-02"; others pass through.
    synced = timeline.get('synchronized_actions', {})
    if synced:
        entries = []
        for raw_key, action in synced.items():
            if action:
                readable = raw_key.replace('f', '').replace('_', '-') if raw_key.startswith('f') else raw_key
                entries.append(f"{readable}: {action}")
        if entries:
            parts.append(f"Actions: {'; '.join(entries)}")

    # Fixed directives appended to every prompt.
    parts.append("Do not include any captions, subtitles, or text overlays in the video")
    parts.append("The video must start immediately at 0:00 with a sharp, clear, in-focus frame. No fade-in, no blur transition, no gradual focus effect at the start. The subject must be fully visible and sharp from the very first frame.")

    return ". ".join(parts).strip()
async def generate_video_replicate(
    prompt: Dict[str, Any],
    image_path: Optional[str] = None,
    seed: Optional[int] = None,
    aspect_ratio: str = "9:16"
) -> str:
    """
    Generate video using Replicate's Veo-3 model.

    The structured segment is sent first as stringified JSON; if Replicate
    rejects it, the call is retried with alternative aspect-ratio parameter
    names or a natural-language text prompt as appropriate.

    Args:
        prompt: Structured JSON segment (dict) - will be converted to text
        image_path: Optional path to reference image
        seed: Optional seed for consistency
        aspect_ratio: Aspect ratio for video (e.g., "9:16", "16:9")

    Returns:
        Path to downloaded video file

    Raises:
        Exception: If the replicate package is missing, generation fails,
            or the resulting video cannot be downloaded.
    """
    print_status("🎬 Generating video with Replicate (google/veo-3)...")
    if not REPLICATE_AVAILABLE:
        raise Exception("Replicate package not installed. Run: pip install replicate")

    # The replicate client reads the token from the environment.
    replicate_token = get_replicate_api_key()
    os.environ['REPLICATE_API_TOKEN'] = replicate_token

    # Stringify the JSON object and send as string.
    # This preserves the structured data while meeting Replicate's string requirement.
    prompt_string = json.dumps(prompt, ensure_ascii=False, indent=None)
    input_data = {
        "prompt": prompt_string  # Send JSON as stringified string
    }
    input_data["aspect_ratio"] = aspect_ratio
    if seed is not None:
        input_data["seed"] = seed

    # Replicate expects an open file object for the reference image.
    image_file = None
    if image_path and os.path.exists(image_path):
        image_file = open(image_path, 'rb')
        input_data["image"] = image_file

    # Debug: print request payload (file handle redacted, long prompt truncated).
    try:
        print_status("📦 Replicate input (stringified JSON):")
        debug_input = {k: v for k, v in input_data.items() if k != 'image'}
        if 'image' in input_data:
            debug_input['image'] = f"<file: {image_path}>"
        if 'prompt' in debug_input and isinstance(debug_input['prompt'], str):
            prompt_preview = debug_input['prompt'][:500] + "..." if len(debug_input['prompt']) > 500 else debug_input['prompt']
            debug_input['prompt'] = prompt_preview
        print(json.dumps(debug_input, indent=2, ensure_ascii=False))
        print_status(f"📝 Full prompt length: {len(prompt_string)} characters (stringified JSON)")
        print_status(f"📐 Aspect ratio: {aspect_ratio}")
    except Exception:
        print_status(f"📦 Replicate input (raw): {input_data}")

    # Run Replicate model with a retry ladder for parameter-name and
    # prompt-format rejections.
    print_status("⏳ Waiting for Replicate to generate video...")
    try:
        try:
            output = replicate.run(
                "google/veo-3",
                input=input_data
            )
        except Exception as e:
            error_str = str(e).lower()
            if "aspect" in error_str and ("invalid" in error_str or "unknown" in error_str):
                print_status("⚠️ aspect_ratio parameter not recognized, trying alternative names...")
                # Try camelCase version first.
                if "aspect_ratio" in input_data:
                    input_data["aspectRatio"] = input_data.pop("aspect_ratio")
                try:
                    output = replicate.run("google/veo-3", input=input_data)
                except Exception:
                    # BUGFIX: was a bare `except:` which would also swallow
                    # KeyboardInterrupt/SystemExit. Fall back to "ratio".
                    if "aspectRatio" in input_data:
                        input_data["ratio"] = input_data.pop("aspectRatio")
                    output = replicate.run("google/veo-3", input=input_data)
            elif "invalid" in error_str or "expected" in error_str or "422" in error_str or "validation" in error_str:
                # If stringified JSON fails, convert to natural language and retry.
                print_status("⚠️ Stringified JSON rejected, converting to natural language text prompt and retrying...")
                prompt_text = convert_segment_to_text_prompt(prompt)
                input_data["prompt"] = prompt_text
                try:
                    print_status("📦 Replicate input (natural language text format):")
                    debug_input = {k: v for k, v in input_data.items() if k != 'image'}
                    if 'image' in input_data:
                        debug_input['image'] = f"<file: {image_path}>"
                    if 'prompt' in debug_input and isinstance(debug_input['prompt'], str) and len(debug_input['prompt']) > 500:
                        debug_input['prompt'] = debug_input['prompt'][:500] + "... (truncated)"
                    print(json.dumps(debug_input, indent=2, ensure_ascii=False))
                    print_status(f"📝 Full prompt length: {len(prompt_text)} characters")
                except Exception:
                    print_status(f"📦 Replicate input (text format, raw): {input_data}")
                output = replicate.run(
                    "google/veo-3",
                    input=input_data
                )
            else:
                # Re-raise if it's a different error
                raise
    finally:
        # Close image file if opened
        if image_file:
            image_file.close()

    # Persist the output to a local temp .mp4 regardless of output type.
    import tempfile
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
    temp_path = temp_file.name
    temp_file.close()

    async def _download_to_temp(video_url: str) -> None:
        # Shared download path for URL-style outputs (was triplicated inline).
        print_success(f"Video generated: {video_url}")
        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.get(video_url)
            if response.status_code != 200:
                raise Exception(f"Failed to download video: HTTP {response.status_code}")
            with open(temp_path, 'wb') as f:
                f.write(response.content)

    # Replicate output can be a URL string, file-like object, or an object
    # exposing url()/read() methods.
    if hasattr(output, 'read'):
        print_status("Reading video from file-like object...")
        with open(temp_path, 'wb') as f:
            f.write(output.read())
    elif hasattr(output, 'url'):
        await _download_to_temp(output.url())
    elif isinstance(output, str):
        await _download_to_temp(output)
    else:
        # Unknown output type: best effort - stringify and treat as a URL.
        await _download_to_temp(str(output))

    print_success(f"Video saved to: {temp_path}")
    return temp_path
async def download_video(url: str, output_path: str):
    """Fetch the video at *url* and write its bytes to *output_path*."""
    print_status(f"📥 Downloading video to {output_path}...")
    async with httpx.AsyncClient(timeout=120.0) as http_client:
        reply = await http_client.get(url)
        if reply.status_code != 200:
            raise Exception(f"Failed to download video: HTTP {reply.status_code}")
        with open(output_path, 'wb') as out:
            out.write(reply.content)
    print_success(f"Video downloaded: {output_path}")
async def merge_videos(
    video_paths: List[str],
    output_path: str,
    segments: Optional[List[Dict[str, Any]]] = None,
    use_whisper: bool = True,
    fallback_overlap: float = 0.7
):
    """
    Concatenate multiple videos into one with FFmpeg's concat demuxer.

    Segments are joined as-is: end trimming via Whisper happens earlier,
    during video generation, so no start trimming is applied here.

    Args:
        video_paths: List of video file paths to merge, in order.
        output_path: Output file path for the merged video.
        segments: Optional segment dicts with dialogue info (kept for
            interface compatibility; unused by the concat step).
        use_whisper: Kept for interface compatibility; trimming is done
            during generation, not here.
        fallback_overlap: Kept for interface compatibility.

    Raises:
        Exception: If FFmpeg exits with a non-zero status.
    """
    import subprocess
    import tempfile

    print_status(f"🎥 Merging {len(video_paths)} videos...")

    # Report whether Whisper-based trimming support exists in this install.
    try:
        from utils.whisper_trim import find_last_word_timestamp, is_whisper_available
        whisper_ok = is_whisper_available()
    except ImportError:
        whisper_ok = False
        print_status("⚠️ Whisper not available, using fallback trimming")

    # Clips are used unmodified (the old per-clip loop was a no-op).
    adjusted_paths = list(video_paths)

    # BUGFIX: write the concat manifest to a unique temp file instead of a
    # fixed "video_list.txt" in the CWD, which could clobber concurrent runs.
    with tempfile.NamedTemporaryFile('w', suffix='.txt', delete=False) as manifest:
        for path in adjusted_paths:
            manifest.write(f"file '{path}'\n")
        list_file = manifest.name

    try:
        # Stream-copy concat: no re-encode, so inputs must share codecs.
        cmd = [
            'ffmpeg',
            '-f', 'concat',
            '-safe', '0',
            '-i', list_file,
            '-c', 'copy',
            '-y',
            output_path
        ]
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=300
        )
        if result.returncode != 0:
            print_error(f"FFmpeg concat error: {result.stderr}")
            raise Exception("Video merging failed")
        print_success(f"Videos merged: {output_path}")
    finally:
        # Clean up the manifest even on failure.
        if os.path.exists(list_file):
            os.remove(list_file)
| async def main(): | |
| """Main execution flow""" | |
| try: | |
| # Banner | |
| print_header("🎥 STANDALONE VIDEO CREATOR") | |
| print("Automated video generation using extend video flow\n") | |
| # Collect user inputs first (to know which provider to use) | |
| config = collect_user_inputs() | |
| # Check API keys based on provider | |
| openai_api_key = get_openai_api_key() | |
| if config['provider'] == 'kie': | |
| kie_api_key = get_api_key() | |
| elif config['provider'] == 'replicate': | |
| # Replicate key will be checked when needed | |
| pass | |
| # Generate structured prompts using GPT-4o | |
| print_header("GENERATING VIDEO PROMPTS") | |
| print_status("🤖 Using GPT-4o to generate structured prompts...") | |
| # Read reference image | |
| with open(config['image_path'], 'rb') as f: | |
| image_bytes = f.read() | |
| # Create VeoInputs | |
| veo_inputs = VeoInputs( | |
| script=config['script'], | |
| style=config['style'], | |
| jsonFormat="standard", | |
| continuationMode=True, | |
| voiceType=config['voice_type'] if config['voice_type'] != "None" else None, | |
| cameraStyle=config['camera_style'], | |
| settingMode="single" | |
| ) | |
| # Generate prompts | |
| payload = generate_segments_payload( | |
| inputs=veo_inputs, | |
| image_bytes=image_bytes, | |
| model="gpt-4o", | |
| api_key=openai_api_key | |
| ) | |
| # Debug: print GPT-generated segments payload | |
| try: | |
| print_status("🧾 Segments payload from GPT-4o:") | |
| print(json.dumps(payload, indent=2, ensure_ascii=False)) | |
| except Exception: | |
| print_status(f"🧾 Segments payload from GPT-4o (raw): {payload}") | |
| segments = payload.get('segments', []) | |
| print_success(f"Generated {len(segments)} video segments") | |
| if not segments: | |
| print_error("No segments generated!") | |
| sys.exit(1) | |
| # Generate videos | |
| print_header("GENERATING VIDEOS") | |
| video_paths = [] | |
| if config['provider'] == 'replicate': | |
| # Replicate: Generate each segment independently with frame continuity | |
| print_status("Using Replicate (google/veo-3) for video generation") | |
| print_status("Note: Each segment uses last frame from previous trimmed segment for continuity\n") | |
| output_dir = Path("output_videos") | |
| output_dir.mkdir(exist_ok=True) | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| # Track current reference image (starts with original) | |
| current_image_path = config['image_path'] | |
| temp_frame_paths = [] # Track temp frame files for cleanup | |
| for i, segment in enumerate(segments, start=1): | |
| print_status(f"\n📹 Processing segment {i}/{len(segments)}") | |
| # Generate video with current reference image | |
| print_status(f" Using reference image: {current_image_path if i == 1 else 'last frame from previous segment'}") | |
| video_path = await generate_video_replicate( | |
| prompt=segment, # Send JSON segment directly | |
| image_path=current_image_path, | |
| seed=config['seed'], | |
| aspect_ratio=config['aspect_ratio'] | |
| ) | |
| # Save generated video to output directory with proper naming | |
| segment_output = output_dir / f"segment_{i}_untrimmed_{timestamp}.mp4" | |
| import shutil | |
| if os.path.exists(video_path): | |
| shutil.move(video_path, str(segment_output)) | |
| video_path = str(segment_output) | |
| # Trim video with Whisper to get optimal cut point | |
| # For all segments except the last, we also extract the last frame for next segment | |
| # For the last segment, we still trim it to avoid extra length | |
| should_extract_frame = (i < len(segments)) # Only extract frame if not last segment | |
| if should_extract_frame: | |
| print_status(f" Trimming segment {i} to extract last frame for next segment...") | |
| else: | |
| print_status(f" Trimming segment {i} (last segment) to optimal length...") | |
| # Get dialogue from segment for Whisper | |
| action_timeline = segment.get('action_timeline', {}) | |
| dialogue = action_timeline.get('dialogue', '') | |
| if dialogue: | |
| try: | |
| from utils.whisper_trim import find_last_word_timestamp | |
| from utils.video_processor import extract_frame | |
| # Find optimal trim point | |
| last_word_time = find_last_word_timestamp( | |
| video_path=video_path, | |
| script=dialogue, | |
| model_size="base" | |
| ) | |
| if last_word_time and last_word_time > 0: | |
| trim_point = last_word_time + 0.3 # 0.3s padding | |
| # Trim the video - rename to indicate it's trimmed | |
| trimmed_path = str(segment_output).replace("_untrimmed_", "_trimmed_") | |
| import subprocess | |
| cmd_trim = [ | |
| 'ffmpeg', | |
| '-y', | |
| '-ss', '0', | |
| '-i', video_path, | |
| '-t', str(trim_point), | |
| '-c', 'copy', | |
| trimmed_path | |
| ] | |
| result = subprocess.run( | |
| cmd_trim, | |
| capture_output=True, | |
| text=True, | |
| timeout=300 | |
| ) | |
| if result.returncode == 0: | |
| # Get video duration to extract last frame | |
| from utils.video_processor import get_video_info | |
| info = get_video_info(trimmed_path) | |
| duration = float(info['format']['duration']) | |
| # Extract last frame (0.1s before end to ensure we get a frame) | |
| frame_timestamp = max(0, duration - 0.1) | |
| frame_path = output_dir / f"frame_segment_{i}_{timestamp}.jpg" | |
| # Extract frame only if not the last segment | |
| if should_extract_frame: | |
| try: | |
| extract_frame( | |
| video_path=trimmed_path, | |
| timestamp=frame_timestamp, | |
| output_path=str(frame_path) | |
| ) | |
| # Update current_image_path for next segment | |
| current_image_path = str(frame_path) | |
| temp_frame_paths.append(str(frame_path)) | |
| print_success(f" ✅ Trimmed to {trim_point:.2f}s, extracted last frame for next segment") | |
| except Exception as frame_error: | |
| print_error(f" ⚠️ Frame extraction failed: {str(frame_error)}") | |
| # Still use trimmed video, but extract frame from it as fallback | |
| try: | |
| # Try extracting from trimmed video anyway | |
| extract_frame( | |
| video_path=trimmed_path, | |
| timestamp=duration - 0.5, # Try earlier timestamp | |
| output_path=str(frame_path) | |
| ) | |
| current_image_path = str(frame_path) | |
| temp_frame_paths.append(str(frame_path)) | |
| print_success(f" ✅ Trimmed to {trim_point:.2f}s (fallback frame extraction)") | |
| except: | |
| print_error(f" ⚠️ Frame extraction failed completely") | |
| else: | |
| print_success(f" ✅ Trimmed last segment to {trim_point:.2f}s") | |
| # Use trimmed version for merging (keep untrimmed version) | |
| # Both files are kept: _untrimmed_ and _trimmed_ | |
| video_path = trimmed_path | |
| print_status(f" 📁 Kept untrimmed: {Path(segment_output).name}") | |
| print_status(f" 📁 Created trimmed: {Path(trimmed_path).name}") | |
| else: | |
| print_error(f" ⚠️ Trimming failed: {result.stderr}") | |
| print_status(f" Using full video") | |
| # Extract frame from full video if needed | |
| if should_extract_frame: | |
| try: | |
| from utils.video_processor import get_video_info, extract_frame | |
| info = get_video_info(video_path) | |
| duration = float(info['format']['duration']) | |
| frame_timestamp = max(0, duration - 0.1) | |
| frame_path = output_dir / f"frame_segment_{i}_{timestamp}.jpg" | |
| extract_frame( | |
| video_path=video_path, | |
| timestamp=frame_timestamp, | |
| output_path=str(frame_path) | |
| ) | |
| current_image_path = str(frame_path) | |
| temp_frame_paths.append(str(frame_path)) | |
| print_success(f" ✅ Extracted last frame from full video") | |
| except Exception as frame_error: | |
| print_error(f" ⚠️ Frame extraction failed: {str(frame_error)}") | |
| else: | |
| print_status(f" ⚠️ Could not find trim point, using full video") | |
| # Extract frame from full video if needed | |
| if should_extract_frame: | |
| try: | |
| from utils.video_processor import get_video_info, extract_frame | |
| info = get_video_info(video_path) | |
| duration = float(info['format']['duration']) | |
| frame_timestamp = max(0, duration - 0.1) | |
| frame_path = output_dir / f"frame_segment_{i}_{timestamp}.jpg" | |
| extract_frame( | |
| video_path=video_path, | |
| timestamp=frame_timestamp, | |
| output_path=str(frame_path) | |
| ) | |
| current_image_path = str(frame_path) | |
| temp_frame_paths.append(str(frame_path)) | |
| print_success(f" ✅ Extracted last frame from full video") | |
| except Exception as frame_error: | |
| print_error(f" ⚠️ Frame extraction failed: {str(frame_error)}") | |
| except Exception as e: | |
| print_error(f" ⚠️ Whisper trimming failed: {str(e)}") | |
| print_status(f" Using full video, will extract last frame") | |
| # Fallback: extract last frame from full video | |
| try: | |
| from utils.video_processor import get_video_info, extract_frame | |
| info = get_video_info(video_path) | |
| duration = float(info['format']['duration']) | |
| frame_timestamp = max(0, duration - 0.1) | |
| frame_path = output_dir / f"frame_segment_{i}_{timestamp}.jpg" | |
| extract_frame( | |
| video_path=video_path, | |
| timestamp=frame_timestamp, | |
| output_path=str(frame_path) | |
| ) | |
| current_image_path = str(frame_path) | |
| temp_frame_paths.append(str(frame_path)) | |
| print_status(f" ✅ Extracted last frame from full video") | |
| except Exception as e: | |
| print_error(f" ⚠️ Frame extraction failed: {str(e)}") | |
| print_error(f" ⚠️ Next segment will use previous frame or original image") | |
| else: | |
| # No dialogue - still try to trim if we can, or just extract frame | |
| if should_extract_frame: | |
| print_status(f" No dialogue in segment {i}, extracting last frame from full video...") | |
| try: | |
| from utils.video_processor import get_video_info, extract_frame | |
| info = get_video_info(video_path) | |
| duration = float(info['format']['duration']) | |
| frame_timestamp = max(0, duration - 0.1) | |
| frame_path = output_dir / f"frame_segment_{i}_{timestamp}.jpg" | |
| extract_frame( | |
| video_path=video_path, | |
| timestamp=frame_timestamp, | |
| output_path=str(frame_path) | |
| ) | |
| current_image_path = str(frame_path) | |
| temp_frame_paths.append(str(frame_path)) | |
| print_status(f" ✅ Extracted last frame (no dialogue in segment)") | |
| except Exception as e: | |
| print_error(f" ⚠️ Frame extraction failed: {str(e)}") | |
| print_error(f" ⚠️ Next segment will use previous frame or original image") | |
| else: | |
| print_status(f" No dialogue in last segment, using full video") | |
| video_paths.append(video_path) | |
| else: | |
| # KIE API: Use extend video flow | |
| print_status("Using KIE API for video generation with extend flow\n") | |
| video_urls = [] | |
| task_ids = [] | |
| # Generate first video | |
| first_prompt = segments[0] | |
| task_id = await generate_initial_video( | |
| prompt=first_prompt, | |
| image_path=config['image_path'], | |
| api_key=kie_api_key, | |
| model=config['model'], | |
| aspect_ratio=config['aspect_ratio'], | |
| voice_type=config['voice_type'], | |
| seed=config['seed'] | |
| ) | |
| task_ids.append(task_id) | |
| # Wait for callback result | |
| video_url = await wait_for_callback_result(task_id) | |
| video_urls.append(video_url) | |
| # Extend video for remaining segments | |
| for i, segment in enumerate(segments[1:], start=2): | |
| print_status(f"\n📹 Processing segment {i}/{len(segments)}") | |
| # Extend from previous task | |
| task_id = await extend_video( | |
| task_id=task_ids[-1], # Use the last task ID | |
| prompt=segment, | |
| api_key=kie_api_key, | |
| voice_type=config['voice_type'], | |
| seed=config['seed'] | |
| ) | |
| task_ids.append(task_id) | |
| # Wait for callback result | |
| video_url = await wait_for_callback_result(task_id) | |
| video_urls.append(video_url) | |
| # Download all videos from URLs | |
| print_header("DOWNLOADING VIDEOS") | |
| output_dir = Path("output_videos") | |
| output_dir.mkdir(exist_ok=True) | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| for i, url in enumerate(video_urls, start=1): | |
| output_path = output_dir / f"segment_{i}_{timestamp}.mp4" | |
| await download_video(url, str(output_path)) | |
| video_paths.append(str(output_path)) | |
| # For Replicate, videos are already in output directory with proper naming | |
| # Files are named: segment_{i}_untrimmed_{timestamp}.mp4 and segment_{i}_trimmed_{timestamp}.mp4 | |
| # Both versions are kept - untrimmed and trimmed | |
| # video_paths contains the trimmed versions which will be used for merging | |
| # No need to rename - they're already properly named | |
| # Merge videos | |
| print_header("MERGING VIDEOS") | |
| output_dir = Path("output_videos") | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| final_output = output_dir / f"final_video_{timestamp}.mp4" | |
| # Pass segments to merge_videos for Whisper-based trimming | |
| # For Replicate, videos are already trimmed during generation, so we skip trimming | |
| # For KIE, we need to trim during merge | |
| skip_trimming = (config['provider'] == 'replicate') | |
| await merge_videos( | |
| video_paths, | |
| str(final_output), | |
| segments=segments, # Pass segments for Whisper to find optimal trim points | |
| use_whisper=not skip_trimming, # Skip Whisper trimming for Replicate (already done) | |
| fallback_overlap=0.7 if not skip_trimming else 0 # No trimming for Replicate | |
| ) | |
| # Success | |
| print_header("✨ VIDEO CREATION COMPLETE!") | |
| print_success(f"Final video saved to: {final_output}") | |
| print(f"\nGenerated {len(segments)} segments") | |
| print(f"Total processing time: {time.strftime('%M:%S', time.gmtime(time.time()))}") | |
| except KeyboardInterrupt: | |
| print_error("\n\nVideo creation cancelled by user") | |
| sys.exit(1) | |
| except Exception as e: | |
| print_error(f"\nVideo creation failed: {str(e)}") | |
| import traceback | |
| traceback.print_exc() | |
| sys.exit(1) | |
# Script entry point: hand the async workflow to the event loop and
# block until it finishes (or exits via sys.exit inside main()).
if __name__ == "__main__":
    asyncio.run(main())