File size: 6,557 Bytes
0ab5288 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
"""
Veo3 API客户端
"""
import json
import time
import os
import uuid
import base64
import requests
from typing import List, Tuple, Optional
from ..config import API_CONFIG, MIME_TYPES
class Veo3Client:
"""Veo3 API客户端类"""
def __init__(self, api_key: str):
self.api_key = api_key
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
def upload_file(self, file_path: str) -> Tuple[bool, str]:
"""
上传文件到 KIE AI 的文件存储服务(使用Base64上传)
返回文件的公开访问URL
"""
url = API_CONFIG["FILE_UPLOAD_URL"]
headers = self.headers.copy()
# 获取文件名和扩展名
file_name = os.path.basename(file_path)
file_ext = os.path.splitext(file_name)[1].lower()
if file_ext is None or file_ext == "":
file_ext = ".jpg"
file_name = uuid.uuid4().hex + file_ext
# 根据文件扩展名确定MIME类型
mime_type = MIME_TYPES.get(file_ext, 'image/jpeg')
try:
print("hellow")
with open(file_path, 'rb') as file_handle:
# 读取文件并转换为Base64
file_data = file_handle.read()
base64_data = base64.b64encode(file_data).decode('utf-8')
# 构建data URL格式
data_url = f"data:{mime_type};base64,{base64_data}"
# 准备请求数据
payload = {
"base64Data": data_url,
"uploadPath": "images/veo3",
"fileName": file_name
}
response = requests.post(
url,
headers=headers,
json=payload,
timeout=API_CONFIG["TIMEOUT"]
)
print(response.status_code)
if response.status_code == 200:
resp_json = response.json()
print(resp_json)
if resp_json.get("success") and resp_json.get("data"):
# 返回下载URL
return True, resp_json["data"]["downloadUrl"]
else:
return False, resp_json.get("msg", "Upload failed")
else:
return False, f"Upload error, please try again later"
except Exception as e:
return False, f"Upload error: {str(e)}"
def create_task(
self,
prompt: str,
image_urls: List[str] = None,
aspect_ratio: str = "16:9",
watermark: str = "",
seeds: Optional[int] = None,
enable_fallback: bool = False
) -> Tuple[int, str]:
"""
创建Veo3视频生成任务
"""
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}"
}
url = API_CONFIG["VEO3_GENERATE_URL"]
# 构建Veo3 API请求参数
payload = {
"prompt": prompt,
"model": "veo3",
"aspectRatio": aspect_ratio,
"enableFallback": enable_fallback
}
# 添加可选参数
if image_urls:
payload["imageUrls"] = image_urls
if watermark:
payload["watermark"] = watermark
if seeds is not None and seeds > 0:
payload["seeds"] = seeds
print(json.dumps(payload), headers)
try:
response = requests.post(
url,
headers=headers,
data=json.dumps(payload),
timeout=30
)
if response.status_code == 200:
resp_json = response.json()
print(resp_json)
if resp_json.get("code") == 200:
return 200, resp_json["data"]["taskId"]
else:
return 500, resp_json.get("msg", "API request failed")
else:
return response.status_code, f"HTTP {response.status_code}: {response.text}"
except Exception as e:
return 500, f"Request failed: {str(e)}"
def get_task_result(self, task_id: str) -> Tuple[int, str]:
"""
获取任务结果
"""
start_time = time.time()
url = API_CONFIG["VEO3_DETAILS_URL"]
params = {"taskId": task_id}
headers = {"Authorization": f"Bearer {self.api_key}"}
while time.time() - start_time < API_CONFIG["TASK_TIMEOUT"]:
try:
response = requests.get(
url,
headers=headers,
params=params,
timeout=API_CONFIG["TIMEOUT"]
)
if response.status_code == 200:
resp_json = response.json()
print(resp_json)
if resp_json.get("code") == 200:
data = resp_json.get('data', {})
success_flag = data.get('successFlag')
if success_flag == 1: # 成功
# 获取视频URL
response_data = data.get('response', {})
result_urls = response_data.get('resultUrls', [])
if result_urls:
video_url = result_urls[0] # 取第一个视频URL
return 200, video_url
else:
return 500, "Video URL not found in response"
elif success_flag == 2 or success_flag == 3: # 失败
error_msg = data.get('errorMessage', 'Task failed')
return 500, error_msg
else: # success_flag == 0 或 None,任务仍在处理中
time.sleep(API_CONFIG["POLL_INTERVAL"])
continue
else:
return 500, resp_json.get("msg", "API request failed")
else:
return response.status_code, f"HTTP {response.status_code}: {response.text}"
except Exception as e:
pass
time.sleep(API_CONFIG["POLL_INTERVAL"])
return 500, "Task timeout, please try again later"
|