"""Video processing module."""

import gradio as gr
from typing import List, Tuple, Optional

from .veo3_client import Veo3Client
from ..utils.file_utils import cleanup_temp_file


class VideoProcessor:
    """Run Veo3 video generation requests through a ``Veo3Client``."""

    def __init__(self):
        # No client exists until set_api_key() is called.
        self.client = None

    def set_api_key(self, api_key: str):
        """Create a ``Veo3Client`` for ``api_key`` and store it on ``self.client``."""
        self.client = Veo3Client(api_key)

    @staticmethod
    def _report(progress, fraction: float, desc: str) -> None:
        """Forward a progress update to ``progress`` unless it is ``None``."""
        # Compare against None explicitly instead of relying on the
        # truthiness of the progress object.
        if progress is not None:
            progress(fraction, desc=desc)

    def process_veo3_video(
        self,
        prompt: str,
        uploaded_files: List[str],
        api_key: str,
        aspect_ratio: str = "16:9",
        watermark: str = "",
        seeds: Optional[int] = None,
        enable_fallback: bool = False,
        progress: Optional[gr.Progress] = None
    ) -> Tuple[Optional[str], str]:
        """Generate a video with Veo3 and return the result with a status message.

        Args:
            prompt: Text prompt; must be non-empty after stripping.
            uploaded_files: Optional local image path(s) for image-to-video.
                A single non-list value is wrapped in a list. At most one
                image is accepted.
            api_key: API key used to build the client; must be non-empty
                after stripping.
            aspect_ratio: Forwarded unchanged to ``create_task``.
            watermark: Forwarded unchanged to ``create_task``.
            seeds: Forwarded unchanged to ``create_task``.
            enable_fallback: Forwarded unchanged to ``create_task``.
            progress: Optional callable invoked as
                ``progress(fraction, desc=...)`` to report progress.

        Returns:
            ``(result, message)``. On success ``result`` is the truthy value
            returned by ``get_task_result`` (used here as the video URL). On
            any failure ``result`` is ``None`` and ``message`` describes the
            problem. Exceptions raised during processing are caught and
            reported through ``message`` rather than propagated.
        """
        # Validate required inputs.
        if not api_key or not api_key.strip():
            return None, "❌ Please enter API Key"
        if not prompt or not prompt.strip():
            return None, "❌ Please enter a prompt"

        # Build a client for this request.
        self.set_api_key(api_key.strip())

        # Images are optional; they are only used for image-to-video.
        image_urls = []
        file_paths = []
        print(f"DEBUG: uploaded_files = {uploaded_files}, type = {type(uploaded_files)}")
        if uploaded_files:
            # Normalise a single value to a list.
            file_paths = uploaded_files if isinstance(uploaded_files, list) else [uploaded_files]
            # NOTE(review): this early return happens before the try block,
            # so the cleanup in `finally` does not run for rejected uploads.
            # Confirm whether those files should be removed as well.
            if len(file_paths) > 1:
                return None, "❌ Maximum 1 image allowed for video generation"

        try:
            self._report(progress, 0.1, "📤 Processing input...")

            # Upload any image and collect the URL returned by the client.
            total = len(file_paths)
            for i, file_path in enumerate(file_paths):
                self._report(
                    progress,
                    0.1 + (0.2 * i / total),
                    f"📤 Uploading image {i + 1}/{total}...",
                )
                success, result = self.client.upload_file(file_path)
                if not success:
                    return None, f"❌ Failed to upload image {i + 1}: {result}"
                image_urls.append(result)
                # Remove the local temp file as soon as it has been uploaded.
                cleanup_temp_file(file_path)

            self._report(progress, 0.3, "🚀 Creating video generation task...")

            # Create the Veo3 task.
            status_code, result = self.client.create_task(
                prompt.strip(),
                image_urls,
                aspect_ratio=aspect_ratio,
                watermark=watermark,
                seeds=seeds,
                enable_fallback=enable_fallback
            )
            if status_code != 200:
                return None, f"❌ Failed to create task: {result}"

            # On a 200 status the result is used as the task id.
            task_id = result
            self._report(progress, 0.4, f"📋 Task ID: {task_id}")
            self._report(progress, 0.5, "⏳ Generating video, please wait...")

            # Fetch the task result (the original comment describes this
            # call as polling).
            status_code, result = self.client.get_task_result(task_id)
            if status_code != 200:
                return None, f"❌ Video generation failed: {result}"

            self._report(progress, 0.9, "📥 Processing video...")

            # A 200 status with an empty result is still treated as a failure.
            if not result:
                return None, "❌ No video URL received"

            self._report(progress, 1.0, "✅ Complete!")
            return result, f"✅ Successfully generated video! Task ID: {task_id}"

        except Exception as e:
            return None, f"❌ Error occurred during processing: {str(e)}"

        finally:
            # Final best-effort cleanup of any remaining temp files. Each
            # file is handled separately so one failure does not skip the
            # rest, and only Exception is suppressed so KeyboardInterrupt
            # and SystemExit still propagate.
            for file_path in file_paths:
                try:
                    cleanup_temp_file(file_path)
                except Exception:
                    pass