File size: 7,371 Bytes
4b828b4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
#!/usr/bin/env python3
"""
自动化脚本:为3D手办功能生成新的示例结果图片
"""

import asyncio
import base64
import httpx
import json
import os
import time
from PIL import Image
from io import BytesIO

# 配置
API_BASE_URL = "https://1251722089-fabl8rzpod.ap-singapore.tencentscf.com"
INPUT_IMAGE_PATH = "examples/figure_3d_input_example.jpeg"
OUTPUT_DIR = "examples/results"
TIMEOUT_SECONDS = 600  # 10分钟

# 认证配置
AUTH_HEADERS = {
    "Content-Type": "application/json",
    "X-API-Auth": "local_development_token_12345678"
}

# 3D手办风格配置
STYLES = [
    ("professional_lighting", "💡 专业灯光场景"),
    ("collector_shelf", "📚 爱好者收藏架场景"),
    ("desktop_display", "💻 电脑桌展示")
]

def load_and_encode_image(image_path):
    """加载图片并转换为Base64编码"""
    try:
        with open(image_path, 'rb') as f:
            image_data = f.read()
        
        # 转换为Base64
        base64_data = base64.b64encode(image_data).decode('utf-8')
        print(f"✅ 成功加载图片: {image_path}")
        print(f"📏 图片大小: {len(image_data)} bytes")
        return base64_data
    except Exception as e:
        print(f"❌ 加载图片失败: {e}")
        return None

async def submit_task(style_key, image_data):
    """提交3D手办生成任务"""
    url = f"{API_BASE_URL}/api/v1/tasks/figure-3d-generation"
    
    payload = {
        "image_data": image_data,
        "figure_style": style_key,
        "resolution": "square - 1024x1024 (1:1)"
    }
    
    headers = AUTH_HEADERS
    
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            print(f"🚀 提交任务: {style_key}")
            response = await client.post(url, json=payload, headers=headers)
            
            if response.status_code in [200, 201]:
                result = response.json()
                task_id = result.get("task_id")
                print(f"✅ 任务提交成功: {task_id}")
                return task_id
            else:
                print(f"❌ 任务提交失败: {response.status_code} - {response.text}")
                return None
                
    except Exception as e:
        print(f"❌ 提交任务异常: {e}")
        return None

async def wait_for_completion(task_id):
    """等待任务完成"""
    url = f"{API_BASE_URL}/api/v1/tasks/{task_id}"
    
    start_time = time.time()
    
    async with httpx.AsyncClient(timeout=30.0) as client:
        while True:
            try:
                response = await client.get(url, headers=AUTH_HEADERS)
                
                if response.status_code == 200:
                    result = response.json()
                    status = result.get("status")
                    
                    print(f"📊 任务状态: {status}")
                    
                    if status == "completed":
                        print("✅ 任务完成!")
                        return result
                    elif status == "failed":
                        print(f"❌ 任务失败: {result.get('error', 'Unknown error')}")
                        return None
                    elif status in ["pending", "running", "processing"]:
                        # 检查超时
                        elapsed = time.time() - start_time
                        if elapsed > TIMEOUT_SECONDS:
                            print(f"⏰ 任务超时 ({TIMEOUT_SECONDS}秒)")
                            return None

                        print(f"⏳ 等待中... (已等待 {elapsed:.0f}秒)")
                        await asyncio.sleep(10)  # 每10秒检查一次
                    else:
                        print(f"❓ 未知状态: {status}")
                        await asyncio.sleep(5)
                        
                else:
                    print(f"❌ 状态查询失败: {response.status_code}")
                    await asyncio.sleep(5)
                    
            except Exception as e:
                print(f"❌ 状态查询异常: {e}")
                await asyncio.sleep(5)

async def download_result(task_id, style_key):
    """下载任务结果"""
    url = f"{API_BASE_URL}/api/v1/tasks/{task_id}/result"
    
    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.get(url, headers=AUTH_HEADERS)
            
            if response.status_code == 200:
                result = response.json()
                result_url = result.get("result_url")
                
                if result_url:
                    # 下载图片 (图片URL通常不需要认证,但为了保险起见也加上)
                    img_response = await client.get(result_url)
                    if img_response.status_code == 200:
                        # 保存图片
                        output_path = os.path.join(OUTPUT_DIR, f"figure_3d_{style_key}.png")
                        
                        # 确保输出目录存在
                        os.makedirs(OUTPUT_DIR, exist_ok=True)
                        
                        with open(output_path, 'wb') as f:
                            f.write(img_response.content)
                        
                        print(f"✅ 结果已保存: {output_path}")
                        return output_path
                    else:
                        print(f"❌ 下载图片失败: {img_response.status_code}")
                else:
                    print("❌ 结果中没有图片URL")
            else:
                print(f"❌ 获取结果失败: {response.status_code}")
                
    except Exception as e:
        print(f"❌ 下载结果异常: {e}")
    
    return None

async def generate_example(style_key, style_name, image_data):
    """生成单个示例"""
    print(f"\n{'='*50}")
    print(f"🎨 开始生成示例: {style_name}")
    print(f"{'='*50}")
    
    # 1. 提交任务
    task_id = await submit_task(style_key, image_data)
    if not task_id:
        return False
    
    # 2. 等待完成
    result = await wait_for_completion(task_id)
    if not result:
        return False
    
    # 3. 下载结果
    output_path = await download_result(task_id, style_key)
    if output_path:
        print(f"🎉 示例生成成功: {style_name}")
        return True
    else:
        print(f"💥 示例生成失败: {style_name}")
        return False

async def main():
    """主函数"""
    print("🚀 开始生成3D手办示例...")
    
    # 加载输入图片
    image_data = load_and_encode_image(INPUT_IMAGE_PATH)
    if not image_data:
        print("❌ 无法加载输入图片,退出")
        return
    
    success_count = 0
    
    # 逐个生成示例(每次只运行一个)
    for style_key, style_name in STYLES:
        success = await generate_example(style_key, style_name, image_data)
        if success:
            success_count += 1
        
        # 在任务之间稍作休息
        if style_key != STYLES[-1][0]:  # 不是最后一个
            print("\n⏸️  休息5秒后继续下一个...")
            await asyncio.sleep(5)
    
    print(f"\n{'='*50}")
    print(f"🏁 生成完成! 成功: {success_count}/{len(STYLES)}")
    print(f"{'='*50}")

if __name__ == "__main__":
    asyncio.run(main())