|
|
""" |
|
|
Veo3 API客户端 |
|
|
""" |
|
|
|
|
|
import base64
import json
import logging
import os
import time
import uuid
from typing import List, Tuple, Optional

import requests

from ..config import API_CONFIG, MIME_TYPES
|
|
|
|
|
|
|
|
class Veo3Client:
    """Client for the Veo3 video-generation HTTP API.

    Wraps three calls: uploading a local file as a Base64 data URL,
    creating a generation task, and polling a task until it finishes.
    Network and API failures are reported through the returned tuple
    rather than by raising.
    """

    # Diagnostics go through logging instead of print() so that callers
    # decide whether (and where) they are shown.
    _logger = logging.getLogger(__name__)

    def __init__(self, api_key: str):
        """Store the API key and build the default JSON request headers.

        Args:
            api_key: Bearer token sent in the ``Authorization`` header.
        """
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }

    def upload_file(self, file_path: str) -> Tuple[bool, str]:
        """Upload a local file through the Base64 file-upload endpoint.

        The file is sent under a random name; only the extension of the
        local file is kept (``.jpg`` when it has none).

        Args:
            file_path: Path of the local file to upload.

        Returns:
            ``(True, download_url)`` on success, otherwise
            ``(False, error_message)``.
        """
        # Read the file first so that a missing or unreadable path is
        # reported before any request is prepared.
        try:
            with open(file_path, "rb") as file_handle:
                base64_data = base64.b64encode(file_handle.read()).decode("utf-8")
        except (OSError, ValueError) as exc:
            return False, f"Upload error: {exc}"

        url = API_CONFIG["FILE_UPLOAD_URL"]

        # splitext() always returns a string, so an empty extension is the
        # only "missing" case to handle.
        file_ext = os.path.splitext(os.path.basename(file_path))[1].lower() or ".jpg"
        file_name = uuid.uuid4().hex + file_ext
        mime_type = MIME_TYPES.get(file_ext, "image/jpeg")

        payload = {
            "base64Data": f"data:{mime_type};base64,{base64_data}",
            "uploadPath": "images/veo3",
            "fileName": file_name,
        }

        try:
            response = requests.post(
                url,
                headers=self.headers,
                json=payload,
                timeout=API_CONFIG["TIMEOUT"],
            )
            self._logger.debug("Upload responded with HTTP %s", response.status_code)
            if response.status_code != 200:
                return False, "Upload error, please try again later"

            resp_json = response.json()
            self._logger.debug("Upload response body: %s", resp_json)
            if resp_json.get("success") and resp_json.get("data"):
                return True, resp_json["data"]["downloadUrl"]
            return False, resp_json.get("msg") or "Upload failed"
        except Exception as exc:
            # Deliberately broad: this method reports every failure
            # (network, bad JSON, unexpected response shape) as a tuple.
            return False, f"Upload error: {exc}"

    def create_task(
        self,
        prompt: str,
        image_urls: Optional[List[str]] = None,
        aspect_ratio: str = "16:9",
        watermark: str = "",
        seeds: Optional[int] = None,
        enable_fallback: bool = False,
    ) -> Tuple[int, str]:
        """Create a Veo3 video-generation task.

        Args:
            prompt: Text prompt for the generation.
            image_urls: Optional image URLs; sent only when non-empty.
            aspect_ratio: Value sent as ``aspectRatio``.
            watermark: Watermark text; sent only when non-empty.
            seeds: Seed value; sent only when it is a positive integer.
            enable_fallback: Value sent as ``enableFallback``.

        Returns:
            ``(200, task_id)`` on success. Otherwise ``(status, message)``
            where ``status`` is the HTTP status code for a non-200 HTTP
            response and ``500`` for every other failure.
        """
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
        }
        url = API_CONFIG["VEO3_GENERATE_URL"]

        payload = {
            "prompt": prompt,
            "model": "veo3",
            "aspectRatio": aspect_ratio,
            "enableFallback": enable_fallback,
        }
        # Optional fields are left out entirely when not supplied.
        if image_urls:
            payload["imageUrls"] = image_urls
        if watermark:
            payload["watermark"] = watermark
        if seeds is not None and seeds > 0:
            payload["seeds"] = seeds

        # Log the payload only: the headers carry the bearer token and
        # must never be written to output.
        self._logger.debug("Creating Veo3 task with payload: %s", payload)

        try:
            response = requests.post(
                url,
                headers=headers,
                json=payload,
                timeout=30,
            )
            if response.status_code != 200:
                return response.status_code, f"HTTP {response.status_code}: {response.text}"

            resp_json = response.json()
            self._logger.debug("Create-task response body: %s", resp_json)
            if resp_json.get("code") == 200:
                return 200, resp_json["data"]["taskId"]
            return 500, resp_json.get("msg") or "API request failed"
        except Exception as exc:
            # Deliberately broad: every failure is reported as a tuple.
            return 500, f"Request failed: {exc}"

    def get_task_result(self, task_id: str) -> Tuple[int, str]:
        """Poll a task until it finishes or the configured timeout elapses.

        The task's ``successFlag`` drives the loop: ``1`` returns the first
        entry of ``resultUrls``, ``2`` or ``3`` returns the task's error
        message, and any other value means "keep polling".

        Args:
            task_id: Identifier returned by :meth:`create_task`.

        Returns:
            ``(200, video_url)`` on success. Otherwise ``(status, message)``
            where ``status`` is the HTTP status code for a non-200 HTTP
            response and ``500`` for every other failure, including timeout.
        """
        url = API_CONFIG["VEO3_DETAILS_URL"]
        params = {"taskId": task_id}
        headers = {"Authorization": f"Bearer {self.api_key}"}

        # A monotonic clock keeps the timeout correct even if the system
        # wall clock is adjusted while polling.
        started = time.monotonic()

        while time.monotonic() - started < API_CONFIG["TASK_TIMEOUT"]:
            try:
                response = requests.get(
                    url,
                    headers=headers,
                    params=params,
                    timeout=API_CONFIG["TIMEOUT"],
                )
                if response.status_code != 200:
                    return response.status_code, f"HTTP {response.status_code}: {response.text}"

                resp_json = response.json()
                self._logger.debug("Task %s status response: %s", task_id, resp_json)
                if resp_json.get("code") != 200:
                    return 500, resp_json.get("msg") or "API request failed"

                # "data" / "response" may be present but null; treat that
                # the same as absent instead of failing on None.get().
                data = resp_json.get("data") or {}
                success_flag = data.get("successFlag")

                if success_flag == 1:
                    result_urls = (data.get("response") or {}).get("resultUrls") or []
                    if result_urls:
                        return 200, result_urls[0]
                    return 500, "Video URL not found in response"
                if success_flag in (2, 3):
                    return 500, data.get("errorMessage") or "Task failed"
                # Any other flag: not finished yet, fall through to the wait.
            except (requests.RequestException, ValueError) as exc:
                # Network hiccups and undecodable bodies are treated as
                # transient: record them and try again until the deadline.
                self._logger.warning(
                    "Polling task %s failed, will retry: %s", task_id, exc
                )
            except Exception as exc:
                # Anything else will not fix itself by retrying; report it
                # instead of silently looping until the timeout.
                return 500, f"Request failed: {exc}"

            time.sleep(API_CONFIG["POLL_INTERVAL"])

        return 500, "Task timeout, please try again later"
|
|
|