""" Veo3 API客户端 """ import json import time import os import uuid import base64 import requests from typing import List, Tuple, Optional from ..config import API_CONFIG, MIME_TYPES class Veo3Client: """Veo3 API客户端类""" def __init__(self, api_key: str): self.api_key = api_key self.headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } def upload_file(self, file_path: str) -> Tuple[bool, str]: """ 上传文件到 KIE AI 的文件存储服务(使用Base64上传) 返回文件的公开访问URL """ url = API_CONFIG["FILE_UPLOAD_URL"] headers = self.headers.copy() # 获取文件名和扩展名 file_name = os.path.basename(file_path) file_ext = os.path.splitext(file_name)[1].lower() if file_ext is None or file_ext == "": file_ext = ".jpg" file_name = uuid.uuid4().hex + file_ext # 根据文件扩展名确定MIME类型 mime_type = MIME_TYPES.get(file_ext, 'image/jpeg') try: print("hellow") with open(file_path, 'rb') as file_handle: # 读取文件并转换为Base64 file_data = file_handle.read() base64_data = base64.b64encode(file_data).decode('utf-8') # 构建data URL格式 data_url = f"data:{mime_type};base64,{base64_data}" # 准备请求数据 payload = { "base64Data": data_url, "uploadPath": "images/veo3", "fileName": file_name } response = requests.post( url, headers=headers, json=payload, timeout=API_CONFIG["TIMEOUT"] ) print(response.status_code) if response.status_code == 200: resp_json = response.json() print(resp_json) if resp_json.get("success") and resp_json.get("data"): # 返回下载URL return True, resp_json["data"]["downloadUrl"] else: return False, resp_json.get("msg", "Upload failed") else: return False, f"Upload error, please try again later" except Exception as e: return False, f"Upload error: {str(e)}" def create_task( self, prompt: str, image_urls: List[str] = None, aspect_ratio: str = "16:9", watermark: str = "", seeds: Optional[int] = None, enable_fallback: bool = False ) -> Tuple[int, str]: """ 创建Veo3视频生成任务 """ headers = { "Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}" } url = API_CONFIG["VEO3_GENERATE_URL"] # 构建Veo3 API请求参数 payload = { "prompt": prompt, "model": "veo3", "aspectRatio": aspect_ratio, "enableFallback": enable_fallback } # 添加可选参数 if image_urls: payload["imageUrls"] = image_urls if watermark: payload["watermark"] = watermark if seeds is not None and seeds > 0: payload["seeds"] = seeds print(json.dumps(payload), headers) try: response = requests.post( url, headers=headers, data=json.dumps(payload), timeout=30 ) if response.status_code == 200: resp_json = response.json() print(resp_json) if resp_json.get("code") == 200: return 200, resp_json["data"]["taskId"] else: return 500, resp_json.get("msg", "API request failed") else: return response.status_code, f"HTTP {response.status_code}: {response.text}" except Exception as e: return 500, f"Request failed: {str(e)}" def get_task_result(self, task_id: str) -> Tuple[int, str]: """ 获取任务结果 """ start_time = time.time() url = API_CONFIG["VEO3_DETAILS_URL"] params = {"taskId": task_id} headers = {"Authorization": f"Bearer {self.api_key}"} while time.time() - start_time < API_CONFIG["TASK_TIMEOUT"]: try: response = requests.get( url, headers=headers, params=params, timeout=API_CONFIG["TIMEOUT"] ) if response.status_code == 200: resp_json = response.json() print(resp_json) if resp_json.get("code") == 200: data = resp_json.get('data', {}) success_flag = data.get('successFlag') if success_flag == 1: # 成功 # 获取视频URL response_data = data.get('response', {}) result_urls = response_data.get('resultUrls', []) if result_urls: video_url = result_urls[0] # 取第一个视频URL return 200, video_url else: return 500, "Video URL not found in response" elif success_flag == 2 or success_flag == 3: # 失败 error_msg = data.get('errorMessage', 'Task failed') return 500, error_msg else: # success_flag == 0 或 None,任务仍在处理中 time.sleep(API_CONFIG["POLL_INTERVAL"]) continue else: return 500, resp_json.get("msg", "API request failed") else: return response.status_code, f"HTTP {response.status_code}: {response.text}" except Exception as e: pass time.sleep(API_CONFIG["POLL_INTERVAL"]) return 500, "Task timeout, please try again later"