veo3-api / src /api /veo3_client.py
dmmmmm's picture
Upload 28 files
0ab5288 verified
"""
Veo3 API客户端
"""
import json
import time
import os
import uuid
import base64
import requests
from typing import List, Tuple, Optional
from ..config import API_CONFIG, MIME_TYPES
class Veo3Client:
"""Veo3 API客户端类"""
def __init__(self, api_key: str):
self.api_key = api_key
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
def upload_file(self, file_path: str) -> Tuple[bool, str]:
"""
上传文件到 KIE AI 的文件存储服务(使用Base64上传)
返回文件的公开访问URL
"""
url = API_CONFIG["FILE_UPLOAD_URL"]
headers = self.headers.copy()
# 获取文件名和扩展名
file_name = os.path.basename(file_path)
file_ext = os.path.splitext(file_name)[1].lower()
if file_ext is None or file_ext == "":
file_ext = ".jpg"
file_name = uuid.uuid4().hex + file_ext
# 根据文件扩展名确定MIME类型
mime_type = MIME_TYPES.get(file_ext, 'image/jpeg')
try:
print("hellow")
with open(file_path, 'rb') as file_handle:
# 读取文件并转换为Base64
file_data = file_handle.read()
base64_data = base64.b64encode(file_data).decode('utf-8')
# 构建data URL格式
data_url = f"data:{mime_type};base64,{base64_data}"
# 准备请求数据
payload = {
"base64Data": data_url,
"uploadPath": "images/veo3",
"fileName": file_name
}
response = requests.post(
url,
headers=headers,
json=payload,
timeout=API_CONFIG["TIMEOUT"]
)
print(response.status_code)
if response.status_code == 200:
resp_json = response.json()
print(resp_json)
if resp_json.get("success") and resp_json.get("data"):
# 返回下载URL
return True, resp_json["data"]["downloadUrl"]
else:
return False, resp_json.get("msg", "Upload failed")
else:
return False, f"Upload error, please try again later"
except Exception as e:
return False, f"Upload error: {str(e)}"
def create_task(
self,
prompt: str,
image_urls: List[str] = None,
aspect_ratio: str = "16:9",
watermark: str = "",
seeds: Optional[int] = None,
enable_fallback: bool = False
) -> Tuple[int, str]:
"""
创建Veo3视频生成任务
"""
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}"
}
url = API_CONFIG["VEO3_GENERATE_URL"]
# 构建Veo3 API请求参数
payload = {
"prompt": prompt,
"model": "veo3",
"aspectRatio": aspect_ratio,
"enableFallback": enable_fallback
}
# 添加可选参数
if image_urls:
payload["imageUrls"] = image_urls
if watermark:
payload["watermark"] = watermark
if seeds is not None and seeds > 0:
payload["seeds"] = seeds
print(json.dumps(payload), headers)
try:
response = requests.post(
url,
headers=headers,
data=json.dumps(payload),
timeout=30
)
if response.status_code == 200:
resp_json = response.json()
print(resp_json)
if resp_json.get("code") == 200:
return 200, resp_json["data"]["taskId"]
else:
return 500, resp_json.get("msg", "API request failed")
else:
return response.status_code, f"HTTP {response.status_code}: {response.text}"
except Exception as e:
return 500, f"Request failed: {str(e)}"
def get_task_result(self, task_id: str) -> Tuple[int, str]:
"""
获取任务结果
"""
start_time = time.time()
url = API_CONFIG["VEO3_DETAILS_URL"]
params = {"taskId": task_id}
headers = {"Authorization": f"Bearer {self.api_key}"}
while time.time() - start_time < API_CONFIG["TASK_TIMEOUT"]:
try:
response = requests.get(
url,
headers=headers,
params=params,
timeout=API_CONFIG["TIMEOUT"]
)
if response.status_code == 200:
resp_json = response.json()
print(resp_json)
if resp_json.get("code") == 200:
data = resp_json.get('data', {})
success_flag = data.get('successFlag')
if success_flag == 1: # 成功
# 获取视频URL
response_data = data.get('response', {})
result_urls = response_data.get('resultUrls', [])
if result_urls:
video_url = result_urls[0] # 取第一个视频URL
return 200, video_url
else:
return 500, "Video URL not found in response"
elif success_flag == 2 or success_flag == 3: # 失败
error_msg = data.get('errorMessage', 'Task failed')
return 500, error_msg
else: # success_flag == 0 或 None,任务仍在处理中
time.sleep(API_CONFIG["POLL_INTERVAL"])
continue
else:
return 500, resp_json.get("msg", "API request failed")
else:
return response.status_code, f"HTTP {response.status_code}: {response.text}"
except Exception as e:
pass
time.sleep(API_CONFIG["POLL_INTERVAL"])
return 500, "Task timeout, please try again later"