|
|
""" |
|
|
视频处理模块 |
|
|
""" |
|
|
|
|
|
import logging
from typing import List, Tuple, Optional

import gradio as gr

from .veo3_client import Veo3Client
from ..utils.file_utils import cleanup_temp_file
|
|
|
|
|
|
|
|
class VideoProcessor:
    """Coordinate a Veo3 video generation request from UI inputs to result.

    The processor owns a ``Veo3Client`` (created from the API key supplied on
    each request) and reports every outcome as a ``(video, message)`` tuple
    instead of raising, so the caller can display the message directly.
    """

    _logger = logging.getLogger(__name__)

    def __init__(self):
        # The client is created lazily, once an API key is available.
        self.client = None

    def set_api_key(self, api_key: str):
        """Create a new Veo3 client bound to ``api_key``."""
        self.client = Veo3Client(api_key)

    @staticmethod
    def _notify(progress, fraction, desc):
        """Forward a progress update when a progress callback was supplied."""
        # Identity check: do not rely on the truthiness of the tracker object.
        if progress is not None:
            progress(fraction, desc=desc)

    def _cleanup_files(self, paths):
        """Best-effort removal of temporary files.

        Each file is handled independently so that one failure neither stops
        the remaining files from being cleaned up nor masks the result of the
        generation request.
        """
        for path in paths:
            try:
                cleanup_temp_file(path)
            except Exception:
                self._logger.warning(
                    "Failed to clean up temp file %r", path, exc_info=True
                )

    def process_veo3_video(
        self,
        prompt: str,
        uploaded_files: List[str],
        api_key: str,
        aspect_ratio: str = "16:9",
        watermark: str = "",
        seeds: Optional[int] = None,
        enable_fallback: bool = False,
        progress: Optional["gr.Progress"] = None,
    ) -> Tuple[Optional[str], str]:
        """Run one Veo3 video generation request end to end.

        Steps: validate the inputs, upload the optional reference image,
        create the generation task, fetch the task result, and clean up the
        uploaded temp file.

        Args:
            prompt: Text prompt; must contain non-whitespace characters.
            uploaded_files: Path(s) of the reference image. A list, a tuple,
                a single path, or an empty/None value are accepted; at most
                one file is allowed.
            api_key: API key used to build the client; must be non-blank.
            aspect_ratio: Forwarded unchanged to ``create_task``.
            watermark: Forwarded unchanged to ``create_task``.
            seeds: Forwarded unchanged to ``create_task``.
            enable_fallback: Forwarded unchanged to ``create_task``.
            progress: Optional callable invoked as
                ``progress(fraction, desc=...)`` to report progress.

        Returns:
            ``(result, message)``: ``result`` is whatever the client returned
            for the finished task on success and ``None`` on any failure;
            ``message`` is a user-facing status string.
        """
        if not api_key or not api_key.strip():
            return None, "❌ Please enter API Key"

        if not prompt or not prompt.strip():
            return None, "❌ Please enter a prompt"

        self._logger.debug(
            "uploaded_files = %r, type = %s", uploaded_files, type(uploaded_files)
        )

        # Normalise the upload argument to a private list of paths.
        if not uploaded_files:
            file_paths = []
        elif isinstance(uploaded_files, (list, tuple)):
            file_paths = list(uploaded_files)
        else:
            file_paths = [uploaded_files]

        # Reject invalid input before any client is constructed.
        if len(file_paths) > 1:
            return None, "❌ Maximum 1 image allowed for video generation"

        self.set_api_key(api_key.strip())

        # Files that still need cleanup; entries are dropped once handled so
        # that no file is cleaned up twice.
        pending_cleanup = list(file_paths)

        try:
            self._notify(progress, 0.1, "📤 Processing input...")

            image_urls = []
            total = len(file_paths)
            for index, file_path in enumerate(file_paths):
                self._notify(
                    progress,
                    0.1 + (0.2 * index / total),
                    f"📤 Uploading image {index + 1}/{total}...",
                )

                success, result = self.client.upload_file(file_path)
                if not success:
                    return None, f"❌ Failed to upload image {index + 1}: {result}"

                image_urls.append(result)
                # The local copy is no longer needed once it is uploaded.
                pending_cleanup.remove(file_path)
                self._cleanup_files([file_path])

            self._notify(progress, 0.3, "🚀 Creating video generation task...")

            status_code, result = self.client.create_task(
                prompt.strip(),
                image_urls,
                aspect_ratio=aspect_ratio,
                watermark=watermark,
                seeds=seeds,
                enable_fallback=enable_fallback,
            )
            if status_code != 200:
                return None, f"❌ Failed to create task: {result}"

            task_id = result
            self._notify(progress, 0.4, f"📋 Task ID: {task_id}")
            self._notify(progress, 0.5, "⏳ Generating video, please wait...")

            status_code, result = self.client.get_task_result(task_id)
            if status_code != 200:
                return None, f"❌ Video generation failed: {result}"

            self._notify(progress, 0.9, "📥 Processing video...")

            if not result:
                return None, "❌ No video URL received"

            self._notify(progress, 1.0, "✅ Complete!")
            return result, f"✅ Successfully generated video! Task ID: {task_id}"

        except Exception as e:
            # Boundary handler: the caller expects a message, not a traceback.
            self._logger.exception("Veo3 video processing failed")
            return None, f"❌ Error occurred during processing: {str(e)}"
        finally:
            self._cleanup_files(pending_cleanup)
|
|
|