#!/usr/bin/env python3
"""
Standalone Video Creator
Automated video generation using the extend-video flow with one-time input.
No frontend required - everything runs from this single script.
"""
import os
import sys
import time
import json
import asyncio
import httpx
from pathlib import Path
from typing import List, Dict, Any, Optional
from datetime import datetime
from dotenv import load_dotenv
# Import utilities from the existing codebase
from utils.prompt_generator import (
VeoInputs,
generate_segments_payload,
split_script_into_segments
)
# Try importing replicate (optional)
try:
import replicate
REPLICATE_AVAILABLE = True
except ImportError:
REPLICATE_AVAILABLE = False
# Load environment variables
load_dotenv('.env.local')
# Configuration
KIE_API_BASE = "https://api.kie.ai"
BACKEND_BASE = f"http://localhost:{os.getenv('SERVER_PORT', 4000)}"
MAX_RETRIES = 3
POLLING_INTERVAL = 10 # seconds
MAX_WAIT_TIME = 600 # 10 minutes per video
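# The tunables above can optionally be overridden from the environment.
# A minimal sketch - the env var names POLLING_INTERVAL and MAX_WAIT_TIME are
# assumptions for illustration, not part of the original configuration:
POLLING_INTERVAL = int(os.getenv('POLLING_INTERVAL', POLLING_INTERVAL))
MAX_WAIT_TIME = int(os.getenv('MAX_WAIT_TIME', MAX_WAIT_TIME))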
class Colors:
"""Terminal colors for better output"""
HEADER = '\033[95m'
BLUE = '\033[94m'
CYAN = '\033[96m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
END = '\033[0m'
BOLD = '\033[1m'
def print_status(message: str, color: str = Colors.CYAN):
"""Print colored status message"""
print(f"{color}{message}{Colors.END}")
def print_success(message: str):
"""Print success message"""
print(f"{Colors.GREEN}{message}{Colors.END}")
def print_error(message: str):
"""Print error message"""
print(f"{Colors.RED}{message}{Colors.END}")
def print_header(message: str):
"""Print header message"""
print(f"\n{Colors.BOLD}{Colors.HEADER}{'='*60}{Colors.END}")
print(f"{Colors.BOLD}{Colors.HEADER}{message}{Colors.END}")
print(f"{Colors.BOLD}{Colors.HEADER}{'='*60}{Colors.END}\n")
def get_api_key() -> str:
"""Get KIE API key from environment"""
api_key = os.getenv('KIE_API_KEY')
if not api_key:
print_error("KIE_API_KEY not found in environment!")
print("Please add KIE_API_KEY to .env.local file")
print("Get your API key at: https://kie.ai/api-key")
sys.exit(1)
return api_key
def get_openai_api_key() -> str:
"""Get OpenAI API key from environment"""
api_key = os.getenv('OPENAI_API_KEY')
if not api_key:
print_error("OPENAI_API_KEY not found in environment!")
print("Please add OPENAI_API_KEY to .env.local file")
sys.exit(1)
return api_key
def get_replicate_api_key() -> str:
"""Get Replicate API key from environment"""
api_key = os.getenv('REPLICATE_API_TOKEN')
if not api_key:
print_error("REPLICATE_API_TOKEN not found in environment!")
print("Please add REPLICATE_API_TOKEN to .env.local file")
print("Get your API token at: https://replicate.com/account/api-tokens")
sys.exit(1)
return api_key
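# The three getters above share one shape; a deduplicated sketch (require_env
# is a hypothetical helper shown for illustration, not used by the flow below):
def require_env(var_name: str, hint: str = "") -> str:
    """Return the named environment variable, or exit with a readable error."""
    value = os.getenv(var_name)
    if not value:
        print_error(f"{var_name} not found in environment!")
        print(f"Please add {var_name} to .env.local file")
        if hint:
            print(hint)
        sys.exit(1)
    return value
# Usage example:
#   api_key = require_env('KIE_API_KEY', 'Get your API key at: https://kie.ai/api-key')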
def collect_user_inputs() -> Dict[str, Any]:
"""Collect one-time inputs from user"""
print_header("VIDEO CREATION SETUP")
print("This script will generate a video using AI. You'll need to provide:")
print("1. A script/text for the video")
print("2. A reference image (character/scene)")
print("3. Video style preferences\n")
# Script input
print_status("Enter your video script (press Enter twice when done):")
script_lines = []
while True:
line = input()
if line == "" and script_lines and script_lines[-1] == "":
script_lines.pop() # Remove last empty line
break
script_lines.append(line)
script = "\n".join(script_lines).strip()
if not script:
print_error("Script cannot be empty!")
sys.exit(1)
# Image path
print_status("\nEnter path to reference image:")
image_path = input().strip()
if not os.path.exists(image_path):
print_error(f"Image file not found: {image_path}")
sys.exit(1)
# Style
print_status("\nEnter video style (default: 'clean, lifestyle UGC'):")
style = input().strip() or "clean, lifestyle UGC"
# Voice type
print_status("\nEnter voice type (Deep/Warm/Crisp/None, default: None):")
voice_type = input().strip() or "None"
# Model
print_status("\nEnter video model (default: 'veo3_fast'):")
model = input().strip() or "veo3_fast"
# Aspect ratio
print_status("\nEnter aspect ratio (16:9 or 9:16, default: '9:16'):")
aspect_ratio = input().strip() or "9:16"
# Camera style
print_status("\nEnter camera style (default: 'handheld steadicam'):")
camera_style = input().strip() or "handheld steadicam"
# Provider selection
print_status("\nSelect video generation provider:")
print(" 1. KIE API (supports extend video flow)")
print(" 2. Replicate (google/veo-3)")
provider_choice = input().strip() or "1"
if provider_choice == "2":
if not REPLICATE_AVAILABLE:
print_error("Replicate package not installed!")
print("Please install it: pip install replicate")
sys.exit(1)
provider = "replicate"
else:
provider = "kie"
# Seed for consistent lighting
print_status("\nEnter seed for consistent lighting (optional, press Enter to skip):")
seed_input = input().strip()
    try:
        seed = int(seed_input) if seed_input else None
    except ValueError:
        print_error(f"Invalid seed '{seed_input}' - seeds must be integers; continuing without one")
        seed = None
print_success("\nConfiguration complete!")
return {
'script': script,
'image_path': image_path,
'style': style,
'voice_type': voice_type,
'model': model,
'aspect_ratio': aspect_ratio,
'camera_style': camera_style,
'seed': seed,
'provider': provider
}
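def load_inputs_from_file(config_path: str) -> Dict[str, Any]:
    """
    Non-interactive alternative to collect_user_inputs() for unattended runs.
    A minimal sketch, assuming a JSON file with the same keys that
    collect_user_inputs() returns; illustrative only, not wired into main().
    """
    with open(config_path, 'r') as f:
        config = json.load(f)
    # Apply the same defaults the interactive prompts use
    config.setdefault('style', 'clean, lifestyle UGC')
    config.setdefault('voice_type', 'None')
    config.setdefault('model', 'veo3_fast')
    config.setdefault('aspect_ratio', '9:16')
    config.setdefault('camera_style', 'handheld steadicam')
    config.setdefault('seed', None)
    config.setdefault('provider', 'kie')
    if not (config.get('script') or '').strip():
        print_error("Config file must include a non-empty 'script'!")
        sys.exit(1)
    if not os.path.exists(config.get('image_path', '')):
        print_error(f"Image file not found: {config.get('image_path')}")
        sys.exit(1)
    return config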
async def generate_initial_video(
prompt: Dict[str, Any],
image_path: str,
api_key: str,
model: str = "veo3_fast",
aspect_ratio: str = "9:16",
voice_type: str = "None",
seed: Optional[int] = None
) -> str:
"""
Generate the first video segment.
Uses the backend server's callback endpoint for status updates.
"""
print_status(f"🎬 Generating initial video segment...")
# Read image file
from pathlib import Path
with open(image_path, 'rb') as f:
image_data = f.read()
# Detect image format for upload
image_ext = Path(image_path).suffix.lower()
mime_type = {
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg',
'.png': 'image/png',
'.gif': 'image/gif',
'.webp': 'image/webp'
}.get(image_ext, 'image/jpeg')
# Get public URL and callback URL
public_url = os.getenv('PUBLIC_URL', BACKEND_BASE)
callback_url = f"{public_url}/api/veo/callback"
# Upload image to backend so it's available to the /api/images/{id} endpoint
print_status(f"📷 Uploading image to backend for hosting...")
async with httpx.AsyncClient(timeout=30.0) as client:
upload_response = await client.post(
f"{BACKEND_BASE}/api/upload-image",
files={"file": ("reference_image" + image_ext, image_data, mime_type)},
)
if upload_response.status_code != 200:
raise Exception(f"Image upload failed: HTTP {upload_response.status_code} - {upload_response.text}")
upload_json = upload_response.json()
hosted_image_url = upload_json.get("url")
if not hosted_image_url:
raise Exception(f"Image upload response missing URL: {upload_json}")
print_success(f"Image hosted at: {hosted_image_url}")
async with httpx.AsyncClient(timeout=30.0) as client:
payload = {
"prompt": prompt,
"imageUrls": [hosted_image_url], # Use hosted URL
"model": model,
"aspectRatio": aspect_ratio,
"generationType": "FIRST_AND_LAST_FRAMES_2_VIDEO",
"enableTranslation": True,
"callBackUrl": callback_url
}
if seed is not None:
payload["seeds"] = seed
if voice_type and voice_type.lower() != "none":
payload["voiceType"] = voice_type
# Debug: print request payload
try:
print_status("📦 Initial generate payload:")
print(json.dumps(payload, indent=2, ensure_ascii=False))
except Exception:
# Fallback in case something isn't JSON serializable
print_status(f"📦 Initial generate payload (raw): {payload}")
response = await client.post(
f"{KIE_API_BASE}/api/v1/veo/generate",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
json=payload
)
result = response.json()
if result.get('code') != 200:
raise Exception(f"Video generation failed: {result.get('msg')}")
task_id = result['data']['taskId']
print_success(f"Initial video generation started: {task_id}")
return task_id
async def extend_video(
task_id: str,
prompt: Dict[str, Any],
api_key: str,
voice_type: str = "None",
seed: Optional[int] = None
) -> str:
"""
Extend an existing video with new prompt.
Uses the backend server's callback endpoint for status updates.
"""
print_status(f"🎬 Extending video from task: {task_id}")
# Get public URL for callback
public_url = os.getenv('PUBLIC_URL', BACKEND_BASE)
callback_url = f"{public_url}/api/veo/callback"
async with httpx.AsyncClient(timeout=30.0) as client:
payload = {
"taskId": task_id,
"prompt": prompt,
"callBackUrl": callback_url
}
if seed is not None:
payload["seeds"] = seed
if voice_type and voice_type.lower() != "none":
payload["voiceType"] = voice_type
# Debug: print request payload
try:
print_status("📦 Extend payload:")
print(json.dumps(payload, indent=2, ensure_ascii=False))
except Exception:
print_status(f"📦 Extend payload (raw): {payload}")
response = await client.post(
f"{KIE_API_BASE}/api/v1/veo/extend",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
json=payload
)
result = response.json()
if result.get('code') != 200:
raise Exception(f"Video extension failed: {result.get('msg')}")
new_task_id = result['data']['taskId']
print_success(f"Video extension started: {new_task_id}")
return new_task_id
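async def with_retries(coro_factory, label: str):
    """
    Retry transient API failures with exponential backoff. A minimal sketch
    that puts the MAX_RETRIES constant (defined above but otherwise unused)
    to work; coro_factory is a zero-argument callable returning a fresh
    coroutine per attempt. Illustrative, not wired into the flow below.
    """
    last_error: Optional[Exception] = None
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            return await coro_factory()
        except Exception as e:
            last_error = e
            print_error(f"{label} failed (attempt {attempt}/{MAX_RETRIES}): {e}")
            if attempt < MAX_RETRIES:
                await asyncio.sleep(2 ** attempt)  # 2s, 4s, ... backoff
    raise Exception(f"{label} failed after {MAX_RETRIES} attempts: {last_error}")
# Usage example (hypothetical):
#   task_id = await with_retries(lambda: extend_video(tid, prompt, key), "Video extension")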
async def wait_for_callback_result(task_id: str) -> str:
"""
Wait for callback result from backend server via SSE.
Connects to the SSE endpoint and listens for real-time callback events.
"""
print_status(f"⏳ Listening for video completion via SSE: {task_id}")
start_time = time.time()
try:
async with httpx.AsyncClient(timeout=httpx.Timeout(MAX_WAIT_TIME, connect=10.0)) as client:
async with client.stream(
'GET',
f"{BACKEND_BASE}/api/veo/events/{task_id}"
) as response:
if response.status_code != 200:
raise Exception(f"Failed to connect to SSE: HTTP {response.status_code}")
print_status(f"🔌 Connected to SSE stream")
async for line in response.aiter_lines():
# Check timeout
if time.time() - start_time > MAX_WAIT_TIME:
raise Exception(f"Video generation timed out after {MAX_WAIT_TIME}s")
# Parse SSE data
if line.startswith('data: '):
data_str = line[6:] # Remove "data: " prefix
try:
data = json.loads(data_str)
status = data.get('status')
if status == 'succeeded':
video_url = data.get('url')
if video_url:
elapsed = int(time.time() - start_time)
print_success(f"Video completed in {elapsed}s: {task_id}")
return video_url
elif status == 'failed':
error = data.get('error', 'Unknown error')
raise Exception(f"Video generation failed: {error}")
except json.JSONDecodeError:
continue # Skip invalid JSON
except httpx.TimeoutException:
raise Exception(f"Video generation timed out after {MAX_WAIT_TIME}s")
    except Exception as e:
        # Re-raise terminal errors; only fall back to polling on transport issues
        error_msg = str(e).lower()
        if "timed out" in error_msg or "generation failed" in error_msg:
            raise
        print_error(f"SSE connection error, falling back to polling: {str(e)}")
        # Fall back to simple polling if the SSE stream is unavailable
        return await poll_fallback(task_id)
async def poll_fallback(task_id: str) -> str:
"""Fallback polling method if SSE fails"""
print_status(f"⏳ Polling for video completion: {task_id}")
start_time = time.time()
async with httpx.AsyncClient(timeout=40.0) as client:
while time.time() - start_time < MAX_WAIT_TIME:
try:
response = await client.get(
f"{BACKEND_BASE}/api/veo/status/{task_id}"
)
if response.status_code == 200:
result = response.json()
status = result.get('status')
if status == 'succeeded':
video_url = result.get('url')
if video_url:
print_success(f"Video completed: {task_id}")
return video_url
elif status == 'failed':
error = result.get('error', 'Unknown error')
raise Exception(f"Video generation failed: {error}")
await asyncio.sleep(POLLING_INTERVAL)
print(f" Still processing... ({int(time.time() - start_time)}s)")
except httpx.HTTPError as e:
print_error(f"Error checking status: {str(e)}")
await asyncio.sleep(POLLING_INTERVAL)
raise Exception(f"Video generation timed out after {MAX_WAIT_TIME}s")
def convert_segment_to_text_prompt(segment: Dict[str, Any]) -> str:
"""
Convert structured JSON segment to a comprehensive text prompt for Replicate.
Replicate's Veo-3 expects a plain text prompt, not structured JSON.
"""
# Extract key information from structured segment
action_timeline = segment.get('action_timeline', {})
dialogue = action_timeline.get('dialogue', '')
character = segment.get('character_description', {})
physical = character.get('physical', '')
clothing = character.get('clothing', '')
current_state = character.get('current_state', '')
scene = segment.get('scene_continuity', {})
environment = scene.get('environment', '')
camera_position = scene.get('camera_position', '')
camera_movement = scene.get('camera_movement', '')
lighting_state = scene.get('lighting_state', '')
# Build comprehensive text prompt
prompt_parts = []
# Start with dialogue if available
if dialogue:
prompt_parts.append(f'"{dialogue}"')
# Add character description
if physical:
prompt_parts.append(f"Character: {physical}")
if clothing:
prompt_parts.append(f"Wearing: {clothing}")
if current_state:
prompt_parts.append(f"Current state: {current_state}")
# Add scene description
if environment:
prompt_parts.append(f"Scene: {environment}")
if lighting_state:
prompt_parts.append(f"Lighting: {lighting_state}")
# Add camera details
if camera_position:
prompt_parts.append(f"Camera: {camera_position}")
if camera_movement:
prompt_parts.append(f"Camera movement: {camera_movement}")
# Add synchronized actions if available
synced_actions = action_timeline.get('synchronized_actions', {})
if synced_actions:
actions_list = []
for key, value in synced_actions.items():
if value:
                # Normalize timeline keys like "f0_00" to "0-00" (strip the "f" prefix, underscores to dashes)
                time_key = key.replace('f', '').replace('_', '-') if key.startswith('f') else key
actions_list.append(f"{time_key}: {value}")
if actions_list:
prompt_parts.append(f"Actions: {'; '.join(actions_list)}")
# Add instruction to not include captions/subtitles
prompt_parts.append("Do not include any captions, subtitles, or text overlays in the video")
# Add critical instruction to avoid blur transitions at start
prompt_parts.append("The video must start immediately at 0:00 with a sharp, clear, in-focus frame. No fade-in, no blur transition, no gradual focus effect at the start. The subject must be fully visible and sharp from the very first frame.")
# Join all parts with periods
text_prompt = ". ".join(prompt_parts)
return text_prompt.strip()
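# Worked example (illustrative input, not real pipeline output):
#   convert_segment_to_text_prompt({
#       'action_timeline': {'dialogue': 'Try this at home!'},
#       'character_description': {'physical': 'woman in her 30s'},
#       'scene_continuity': {'environment': 'bright kitchen'},
#   })
# returns roughly:
#   '"Try this at home!". Character: woman in her 30s. Scene: bright kitchen.
#    Do not include any captions, ... sharp from the very first frame.'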
async def generate_video_replicate(
prompt: Dict[str, Any],
image_path: Optional[str] = None,
seed: Optional[int] = None,
aspect_ratio: str = "9:16"
) -> str:
"""
Generate video using Replicate's Veo-3 model.
Args:
prompt: Structured JSON segment (dict) - will be converted to text
image_path: Optional path to reference image
seed: Optional seed for consistency
aspect_ratio: Aspect ratio for video (e.g., "9:16", "16:9")
Returns:
Path to downloaded video file
"""
print_status(f"🎬 Generating video with Replicate (google/veo-3)...")
if not REPLICATE_AVAILABLE:
raise Exception("Replicate package not installed. Run: pip install replicate")
# Set up Replicate client
replicate_token = get_replicate_api_key()
os.environ['REPLICATE_API_TOKEN'] = replicate_token
# Stringify the JSON object and send as string
# This preserves the structured data while meeting Replicate's string requirement
prompt_string = json.dumps(prompt, ensure_ascii=False, indent=None)
# Prepare input - send stringified JSON
input_data = {
"prompt": prompt_string # Send JSON as stringified string
}
# Add aspect ratio
input_data["aspect_ratio"] = aspect_ratio
# Add seed if provided
if seed is not None:
input_data["seed"] = seed
# Add image if provided (Replicate expects a file object)
image_file = None
if image_path and os.path.exists(image_path):
image_file = open(image_path, 'rb')
input_data["image"] = image_file
# Debug: print request payload
try:
print_status("📦 Replicate input (stringified JSON):")
debug_input = {k: v for k, v in input_data.items() if k != 'image'}
if 'image' in input_data:
debug_input['image'] = f"<file: {image_path}>"
# Show first 500 chars of stringified JSON
if 'prompt' in debug_input and isinstance(debug_input['prompt'], str):
prompt_preview = debug_input['prompt'][:500] + "..." if len(debug_input['prompt']) > 500 else debug_input['prompt']
debug_input['prompt'] = prompt_preview
print(json.dumps(debug_input, indent=2, ensure_ascii=False))
print_status(f"📝 Full prompt length: {len(prompt_string)} characters (stringified JSON)")
print_status(f"📐 Aspect ratio: {aspect_ratio}")
except Exception:
print_status(f"📦 Replicate input (raw): {input_data}")
# Run Replicate model
print_status("⏳ Waiting for Replicate to generate video...")
try:
try:
output = replicate.run(
"google/veo-3",
input=input_data
)
except Exception as e:
# If aspect_ratio parameter is invalid, try alternative names
error_str = str(e).lower()
if "aspect" in error_str and ("invalid" in error_str or "unknown" in error_str):
print_status("⚠️ aspect_ratio parameter not recognized, trying alternative names...")
# Try camelCase version
if "aspect_ratio" in input_data:
input_data["aspectRatio"] = input_data.pop("aspect_ratio")
try:
output = replicate.run("google/veo-3", input=input_data)
                except Exception:
# If that fails, try "ratio"
if "aspectRatio" in input_data:
input_data["ratio"] = input_data.pop("aspectRatio")
output = replicate.run("google/veo-3", input=input_data)
# If stringified JSON fails, try converting to natural language text and retry
elif "invalid" in error_str or "expected" in error_str or "422" in error_str or "validation" in error_str:
print_status("⚠️ Stringified JSON rejected, converting to natural language text prompt and retrying...")
# Convert to text prompt
prompt_text = convert_segment_to_text_prompt(prompt)
input_data["prompt"] = prompt_text
# Debug: print text prompt
try:
print_status("📦 Replicate input (natural language text format):")
debug_input = {k: v for k, v in input_data.items() if k != 'image'}
if 'image' in input_data:
debug_input['image'] = f"<file: {image_path}>"
# Truncate prompt if too long for display
if 'prompt' in debug_input and isinstance(debug_input['prompt'], str) and len(debug_input['prompt']) > 500:
debug_input['prompt'] = debug_input['prompt'][:500] + "... (truncated)"
print(json.dumps(debug_input, indent=2, ensure_ascii=False))
print_status(f"📝 Full prompt length: {len(prompt_text)} characters")
except Exception:
print_status(f"📦 Replicate input (text format, raw): {input_data}")
# Retry with text prompt
output = replicate.run(
"google/veo-3",
input=input_data
)
else:
# Re-raise if it's a different error
raise
finally:
# Close image file if opened
if image_file:
image_file.close()
# Replicate output can be a URL string, file-like object, or object with url()/read() methods
import tempfile
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
temp_path = temp_file.name
temp_file.close()
    # Handle different output types without triplicating the download logic
    async def _download_to_temp(video_url: str) -> None:
        print_success(f"Video generated: {video_url}")
        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.get(video_url)
            if response.status_code != 200:
                raise Exception(f"Failed to download video: HTTP {response.status_code}")
            with open(temp_path, 'wb') as f:
                f.write(response.content)
    if hasattr(output, 'read'):
        # File-like object - read directly
        print_status("Reading video from file-like object...")
        with open(temp_path, 'wb') as f:
            f.write(output.read())
    elif hasattr(output, 'url'):
        # URL exposed as an attribute or a method, depending on client version
        url_attr = output.url
        await _download_to_temp(url_attr() if callable(url_attr) else url_attr)
    else:
        # URL string, or something stringifiable into one
        await _download_to_temp(str(output))
print_success(f"Video saved to: {temp_path}")
return temp_path
async def download_video(url: str, output_path: str):
"""Download video from URL"""
print_status(f"📥 Downloading video to {output_path}...")
async with httpx.AsyncClient(timeout=120.0) as client:
response = await client.get(url)
if response.status_code != 200:
raise Exception(f"Failed to download video: HTTP {response.status_code}")
with open(output_path, 'wb') as f:
f.write(response.content)
print_success(f"Video downloaded: {output_path}")
async def merge_videos(
video_paths: List[str],
output_path: str,
segments: Optional[List[Dict[str, Any]]] = None,
use_whisper: bool = True,
fallback_overlap: float = 0.7
):
"""
Merge multiple videos into one using FFmpeg with Whisper-based precise trimming.
Args:
video_paths: List of video file paths to merge
output_path: Output file path
segments: Optional list of segment dicts with dialogue info for Whisper trimming
use_whisper: If True, use Whisper to find optimal trim points at speech boundaries
fallback_overlap: Fallback trim duration if Whisper fails (seconds)
"""
print_status(f"🎥 Merging {len(video_paths)} videos...")
# Try to import Whisper utilities
try:
from utils.whisper_trim import find_last_word_timestamp, is_whisper_available
WHISPER_AVAILABLE = is_whisper_available()
except ImportError:
WHISPER_AVAILABLE = False
print_status("⚠️ Whisper not available, using fallback trimming")
# Optionally trim overlap from all but the first clip
adjusted_paths = []
temp_trimmed_paths: List[str] = []
import subprocess
for idx, path in enumerate(video_paths):
if idx == 0:
adjusted_paths.append(path)
continue
# No start trimming - keep full video from beginning
# Only end trimming via Whisper is used (handled during video generation)
# Skip start trimming for all segments - use full video
adjusted_paths.append(path)
# Create a temporary file list for FFmpeg concat
list_file = "video_list.txt"
with open(list_file, 'w') as f:
for path in adjusted_paths:
f.write(f"file '{path}'\n")
try:
# Use FFmpeg to concatenate videos
cmd = [
'ffmpeg',
'-f', 'concat',
'-safe', '0',
'-i', list_file,
'-c', 'copy',
'-y',
output_path
]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=300
)
if result.returncode != 0:
print_error(f"FFmpeg concat error: {result.stderr}")
raise Exception("Video merging failed")
print_success(f"Videos merged: {output_path}")
finally:
# Clean up temp files
if os.path.exists(list_file):
os.remove(list_file)
for tmp in temp_trimmed_paths:
if os.path.exists(tmp):
os.remove(tmp)
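def merge_videos_reencode(video_paths: List[str], output_path: str):
    """
    Fallback merge that re-encodes instead of stream-copying. A minimal sketch
    for when `-c copy` concat fails because the clips' codec parameters differ;
    the encoder settings below are assumptions - tune as needed.
    """
    import subprocess
    list_file = "video_list_reencode.txt"
    with open(list_file, 'w') as f:
        for path in video_paths:
            f.write(f"file '{path}'\n")
    try:
        cmd = [
            'ffmpeg', '-f', 'concat', '-safe', '0', '-i', list_file,
            '-c:v', 'libx264', '-preset', 'fast', '-crf', '20',
            '-c:a', 'aac',
            '-y', output_path
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
        if result.returncode != 0:
            print_error(f"FFmpeg re-encode error: {result.stderr}")
            raise Exception("Video merging (re-encode) failed")
        print_success(f"Videos merged (re-encoded): {output_path}")
    finally:
        if os.path.exists(list_file):
            os.remove(list_file)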
async def main():
"""Main execution flow"""
    start_time = time.time()  # Record start for the total-time summary below
    try:
# Banner
print_header("🎥 STANDALONE VIDEO CREATOR")
print("Automated video generation using extend video flow\n")
# Collect user inputs first (to know which provider to use)
config = collect_user_inputs()
# Check API keys based on provider
openai_api_key = get_openai_api_key()
if config['provider'] == 'kie':
kie_api_key = get_api_key()
elif config['provider'] == 'replicate':
# Replicate key will be checked when needed
pass
# Generate structured prompts using GPT-4o
print_header("GENERATING VIDEO PROMPTS")
print_status("🤖 Using GPT-4o to generate structured prompts...")
# Read reference image
with open(config['image_path'], 'rb') as f:
image_bytes = f.read()
# Create VeoInputs
veo_inputs = VeoInputs(
script=config['script'],
style=config['style'],
jsonFormat="standard",
continuationMode=True,
voiceType=config['voice_type'] if config['voice_type'] != "None" else None,
cameraStyle=config['camera_style'],
settingMode="single"
)
# Generate prompts
payload = generate_segments_payload(
inputs=veo_inputs,
image_bytes=image_bytes,
model="gpt-4o",
api_key=openai_api_key
)
# Debug: print GPT-generated segments payload
try:
print_status("🧾 Segments payload from GPT-4o:")
print(json.dumps(payload, indent=2, ensure_ascii=False))
except Exception:
print_status(f"🧾 Segments payload from GPT-4o (raw): {payload}")
segments = payload.get('segments', [])
print_success(f"Generated {len(segments)} video segments")
if not segments:
print_error("No segments generated!")
sys.exit(1)
# Generate videos
print_header("GENERATING VIDEOS")
video_paths = []
if config['provider'] == 'replicate':
# Replicate: Generate each segment independently with frame continuity
print_status("Using Replicate (google/veo-3) for video generation")
print_status("Note: Each segment uses last frame from previous trimmed segment for continuity\n")
output_dir = Path("output_videos")
output_dir.mkdir(exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Track current reference image (starts with original)
current_image_path = config['image_path']
temp_frame_paths = [] # Track temp frame files for cleanup
for i, segment in enumerate(segments, start=1):
print_status(f"\n📹 Processing segment {i}/{len(segments)}")
# Generate video with current reference image
print_status(f" Using reference image: {current_image_path if i == 1 else 'last frame from previous segment'}")
video_path = await generate_video_replicate(
prompt=segment, # Send JSON segment directly
image_path=current_image_path,
seed=config['seed'],
aspect_ratio=config['aspect_ratio']
)
# Save generated video to output directory with proper naming
segment_output = output_dir / f"segment_{i}_untrimmed_{timestamp}.mp4"
import shutil
if os.path.exists(video_path):
shutil.move(video_path, str(segment_output))
video_path = str(segment_output)
# Trim video with Whisper to get optimal cut point
# For all segments except the last, we also extract the last frame for next segment
# For the last segment, we still trim it to avoid extra length
should_extract_frame = (i < len(segments)) # Only extract frame if not last segment
if should_extract_frame:
print_status(f" Trimming segment {i} to extract last frame for next segment...")
else:
print_status(f" Trimming segment {i} (last segment) to optimal length...")
# Get dialogue from segment for Whisper
action_timeline = segment.get('action_timeline', {})
dialogue = action_timeline.get('dialogue', '')
if dialogue:
try:
from utils.whisper_trim import find_last_word_timestamp
from utils.video_processor import extract_frame
# Find optimal trim point
last_word_time = find_last_word_timestamp(
video_path=video_path,
script=dialogue,
model_size="base"
)
if last_word_time and last_word_time > 0:
trim_point = last_word_time + 0.3 # 0.3s padding
# Trim the video - rename to indicate it's trimmed
trimmed_path = str(segment_output).replace("_untrimmed_", "_trimmed_")
import subprocess
cmd_trim = [
'ffmpeg',
'-y',
'-ss', '0',
'-i', video_path,
'-t', str(trim_point),
'-c', 'copy',
trimmed_path
]
result = subprocess.run(
cmd_trim,
capture_output=True,
text=True,
timeout=300
)
if result.returncode == 0:
# Get video duration to extract last frame
from utils.video_processor import get_video_info
info = get_video_info(trimmed_path)
duration = float(info['format']['duration'])
# Extract last frame (0.1s before end to ensure we get a frame)
frame_timestamp = max(0, duration - 0.1)
frame_path = output_dir / f"frame_segment_{i}_{timestamp}.jpg"
# Extract frame only if not the last segment
if should_extract_frame:
try:
extract_frame(
video_path=trimmed_path,
timestamp=frame_timestamp,
output_path=str(frame_path)
)
# Update current_image_path for next segment
current_image_path = str(frame_path)
temp_frame_paths.append(str(frame_path))
print_success(f" ✅ Trimmed to {trim_point:.2f}s, extracted last frame for next segment")
except Exception as frame_error:
print_error(f" ⚠️ Frame extraction failed: {str(frame_error)}")
# Still use trimmed video, but extract frame from it as fallback
try:
# Try extracting from trimmed video anyway
extract_frame(
video_path=trimmed_path,
                                            timestamp=max(0, duration - 0.5),  # Try an earlier timestamp
output_path=str(frame_path)
)
current_image_path = str(frame_path)
temp_frame_paths.append(str(frame_path))
print_success(f" ✅ Trimmed to {trim_point:.2f}s (fallback frame extraction)")
                                        except Exception:
print_error(f" ⚠️ Frame extraction failed completely")
else:
print_success(f" ✅ Trimmed last segment to {trim_point:.2f}s")
# Use trimmed version for merging (keep untrimmed version)
# Both files are kept: _untrimmed_ and _trimmed_
video_path = trimmed_path
print_status(f" 📁 Kept untrimmed: {Path(segment_output).name}")
print_status(f" 📁 Created trimmed: {Path(trimmed_path).name}")
else:
print_error(f" ⚠️ Trimming failed: {result.stderr}")
print_status(f" Using full video")
# Extract frame from full video if needed
if should_extract_frame:
try:
from utils.video_processor import get_video_info, extract_frame
info = get_video_info(video_path)
duration = float(info['format']['duration'])
frame_timestamp = max(0, duration - 0.1)
frame_path = output_dir / f"frame_segment_{i}_{timestamp}.jpg"
extract_frame(
video_path=video_path,
timestamp=frame_timestamp,
output_path=str(frame_path)
)
current_image_path = str(frame_path)
temp_frame_paths.append(str(frame_path))
print_success(f" ✅ Extracted last frame from full video")
except Exception as frame_error:
print_error(f" ⚠️ Frame extraction failed: {str(frame_error)}")
else:
print_status(f" ⚠️ Could not find trim point, using full video")
# Extract frame from full video if needed
if should_extract_frame:
try:
from utils.video_processor import get_video_info, extract_frame
info = get_video_info(video_path)
duration = float(info['format']['duration'])
frame_timestamp = max(0, duration - 0.1)
frame_path = output_dir / f"frame_segment_{i}_{timestamp}.jpg"
extract_frame(
video_path=video_path,
timestamp=frame_timestamp,
output_path=str(frame_path)
)
current_image_path = str(frame_path)
temp_frame_paths.append(str(frame_path))
print_success(f" ✅ Extracted last frame from full video")
except Exception as frame_error:
print_error(f" ⚠️ Frame extraction failed: {str(frame_error)}")
except Exception as e:
print_error(f" ⚠️ Whisper trimming failed: {str(e)}")
print_status(f" Using full video, will extract last frame")
# Fallback: extract last frame from full video
try:
from utils.video_processor import get_video_info, extract_frame
info = get_video_info(video_path)
duration = float(info['format']['duration'])
frame_timestamp = max(0, duration - 0.1)
frame_path = output_dir / f"frame_segment_{i}_{timestamp}.jpg"
extract_frame(
video_path=video_path,
timestamp=frame_timestamp,
output_path=str(frame_path)
)
current_image_path = str(frame_path)
temp_frame_paths.append(str(frame_path))
print_status(f" ✅ Extracted last frame from full video")
except Exception as e:
print_error(f" ⚠️ Frame extraction failed: {str(e)}")
print_error(f" ⚠️ Next segment will use previous frame or original image")
else:
# No dialogue - still try to trim if we can, or just extract frame
if should_extract_frame:
print_status(f" No dialogue in segment {i}, extracting last frame from full video...")
try:
from utils.video_processor import get_video_info, extract_frame
info = get_video_info(video_path)
duration = float(info['format']['duration'])
frame_timestamp = max(0, duration - 0.1)
frame_path = output_dir / f"frame_segment_{i}_{timestamp}.jpg"
extract_frame(
video_path=video_path,
timestamp=frame_timestamp,
output_path=str(frame_path)
)
current_image_path = str(frame_path)
temp_frame_paths.append(str(frame_path))
print_status(f" ✅ Extracted last frame (no dialogue in segment)")
except Exception as e:
print_error(f" ⚠️ Frame extraction failed: {str(e)}")
print_error(f" ⚠️ Next segment will use previous frame or original image")
else:
print_status(f" No dialogue in last segment, using full video")
video_paths.append(video_path)
else:
# KIE API: Use extend video flow
print_status("Using KIE API for video generation with extend flow\n")
video_urls = []
task_ids = []
# Generate first video
first_prompt = segments[0]
task_id = await generate_initial_video(
prompt=first_prompt,
image_path=config['image_path'],
api_key=kie_api_key,
model=config['model'],
aspect_ratio=config['aspect_ratio'],
voice_type=config['voice_type'],
seed=config['seed']
)
task_ids.append(task_id)
# Wait for callback result
video_url = await wait_for_callback_result(task_id)
video_urls.append(video_url)
# Extend video for remaining segments
for i, segment in enumerate(segments[1:], start=2):
print_status(f"\n📹 Processing segment {i}/{len(segments)}")
# Extend from previous task
task_id = await extend_video(
task_id=task_ids[-1], # Use the last task ID
prompt=segment,
api_key=kie_api_key,
voice_type=config['voice_type'],
seed=config['seed']
)
task_ids.append(task_id)
# Wait for callback result
video_url = await wait_for_callback_result(task_id)
video_urls.append(video_url)
# Download all videos from URLs
print_header("DOWNLOADING VIDEOS")
output_dir = Path("output_videos")
output_dir.mkdir(exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
for i, url in enumerate(video_urls, start=1):
output_path = output_dir / f"segment_{i}_{timestamp}.mp4"
await download_video(url, str(output_path))
video_paths.append(str(output_path))
# For Replicate, videos are already in output directory with proper naming
# Files are named: segment_{i}_untrimmed_{timestamp}.mp4 and segment_{i}_trimmed_{timestamp}.mp4
# Both versions are kept - untrimmed and trimmed
# video_paths contains the trimmed versions which will be used for merging
# No need to rename - they're already properly named
# Merge videos
print_header("MERGING VIDEOS")
output_dir = Path("output_videos")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
final_output = output_dir / f"final_video_{timestamp}.mp4"
        # Note: merge_videos currently concatenates clips as-is. On the Replicate
        # path, Whisper-based end trimming already happened during generation; on
        # the KIE path, clips are merged untrimmed.
        skip_trimming = (config['provider'] == 'replicate')
        await merge_videos(
            video_paths,
            str(final_output),
            segments=segments,  # Reserved for future trim-point detection at merge time
            use_whisper=not skip_trimming,  # Reserved; no trimming occurs during merge
            fallback_overlap=0.7 if not skip_trimming else 0  # Reserved
        )
# Success
print_header("✨ VIDEO CREATION COMPLETE!")
print_success(f"Final video saved to: {final_output}")
print(f"\nGenerated {len(segments)} segments")
print(f"Total processing time: {time.strftime('%M:%S', time.gmtime(time.time()))}")
except KeyboardInterrupt:
print_error("\n\nVideo creation cancelled by user")
sys.exit(1)
except Exception as e:
print_error(f"\nVideo creation failed: {str(e)}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
# Run the async main function
asyncio.run(main())