File size: 6,557 Bytes
0ab5288
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
"""
Veo3 API客户端
"""

import json
import time
import os
import uuid
import base64
import requests
from typing import List, Tuple, Optional

from ..config import API_CONFIG, MIME_TYPES


class Veo3Client:
    """Veo3 API客户端类"""
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
    
    def upload_file(self, file_path: str) -> Tuple[bool, str]:
        """
        上传文件到 KIE AI 的文件存储服务(使用Base64上传)
        返回文件的公开访问URL
        """
        url = API_CONFIG["FILE_UPLOAD_URL"]
        headers = self.headers.copy()

        # 获取文件名和扩展名
        file_name = os.path.basename(file_path)
        file_ext = os.path.splitext(file_name)[1].lower()
        if file_ext is None or file_ext == "":
            file_ext = ".jpg"
        file_name = uuid.uuid4().hex + file_ext

        # 根据文件扩展名确定MIME类型
        mime_type = MIME_TYPES.get(file_ext, 'image/jpeg')

        try:
            print("hellow")
            with open(file_path, 'rb') as file_handle:
                # 读取文件并转换为Base64
                file_data = file_handle.read()
                base64_data = base64.b64encode(file_data).decode('utf-8')
                
                # 构建data URL格式
                data_url = f"data:{mime_type};base64,{base64_data}"
                
                # 准备请求数据
                payload = {
                    "base64Data": data_url,
                    "uploadPath": "images/veo3",
                    "fileName": file_name
                }

                response = requests.post(
                    url, 
                    headers=headers, 
                    json=payload,
                    timeout=API_CONFIG["TIMEOUT"]
                )
            print(response.status_code)
            if response.status_code == 200:
                resp_json = response.json()
                print(resp_json)
                if resp_json.get("success") and resp_json.get("data"):
                    # 返回下载URL
                    return True, resp_json["data"]["downloadUrl"]
                else:
                    return False, resp_json.get("msg", "Upload failed")
            else:
                return False, f"Upload error, please try again later"
        except Exception as e:
            return False, f"Upload error: {str(e)}"

    def create_task(
        self, 
        prompt: str, 
        image_urls: List[str] = None,
        aspect_ratio: str = "16:9", 
        watermark: str = "", 
        seeds: Optional[int] = None, 
        enable_fallback: bool = False
    ) -> Tuple[int, str]:
        """
        创建Veo3视频生成任务
        """
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }
        url = API_CONFIG["VEO3_GENERATE_URL"]
        
        # 构建Veo3 API请求参数
        payload = {
            "prompt": prompt,
            "model": "veo3",
            "aspectRatio": aspect_ratio,
            "enableFallback": enable_fallback
        }
        
        # 添加可选参数
        if image_urls:
            payload["imageUrls"] = image_urls
        if watermark:
            payload["watermark"] = watermark
        if seeds is not None and seeds > 0:
            payload["seeds"] = seeds
        print(json.dumps(payload), headers)
        try:
            response = requests.post(
                url, 
                headers=headers, 
                data=json.dumps(payload), 
                timeout=30
            )
            if response.status_code == 200:
                resp_json = response.json()
                print(resp_json)
                if resp_json.get("code") == 200:
                    return 200, resp_json["data"]["taskId"]
                else:
                    return 500, resp_json.get("msg", "API request failed")
            else:
                return response.status_code, f"HTTP {response.status_code}: {response.text}"
        except Exception as e:
            return 500, f"Request failed: {str(e)}"

    def get_task_result(self, task_id: str) -> Tuple[int, str]:
        """
        获取任务结果
        """
        start_time = time.time()
        url = API_CONFIG["VEO3_DETAILS_URL"]
        params = {"taskId": task_id}
        headers = {"Authorization": f"Bearer {self.api_key}"}
        
        while time.time() - start_time < API_CONFIG["TASK_TIMEOUT"]:
            try:
                response = requests.get(
                    url, 
                    headers=headers, 
                    params=params, 
                    timeout=API_CONFIG["TIMEOUT"]
                )
                if response.status_code == 200:
                    resp_json = response.json()
                    print(resp_json)
                    if resp_json.get("code") == 200:
                        data = resp_json.get('data', {})
                        success_flag = data.get('successFlag')
                        
                        if success_flag == 1:  # 成功
                            # 获取视频URL
                            response_data = data.get('response', {})
                            result_urls = response_data.get('resultUrls', [])
                            if result_urls:
                                video_url = result_urls[0]  # 取第一个视频URL
                                return 200, video_url
                            else:
                                return 500, "Video URL not found in response"
                        elif success_flag == 2 or success_flag == 3:  # 失败
                            error_msg = data.get('errorMessage', 'Task failed')
                            return 500, error_msg
                        else:  # success_flag == 0 或 None,任务仍在处理中
                            time.sleep(API_CONFIG["POLL_INTERVAL"])
                            continue
                    else:
                        return 500, resp_json.get("msg", "API request failed")
                else:
                    return response.status_code, f"HTTP {response.status_code}: {response.text}"
            except Exception as e:
                pass
            time.sleep(API_CONFIG["POLL_INTERVAL"])
        
        return 500, "Task timeout, please try again later"