|
|
import gradio as gr |
|
|
import os |
|
|
import json |
|
|
import time |
|
|
import hmac |
|
|
import hashlib |
|
|
import requests |
|
|
from datetime import datetime |
|
|
from pathlib import Path |
|
|
import sqlite3 |
|
|
import zipfile |
|
|
import io |
|
|
import tempfile |
|
|
|
|
|
|
|
|
ENDPOINT = 'hunyuan.intl.tencentcloudapi.com' |
|
|
SERVICE = 'hunyuan' |
|
|
VERSION = '2023-09-01' |
|
|
REGION = 'ap-singapore' |
|
|
|
|
|
|
|
|
SECRET_ID = os.environ.get('TENCENT_SECRET_ID', '') |
|
|
SECRET_KEY = os.environ.get('TENCENT_SECRET_KEY', '') |
|
|
APP_PASSWORD = os.environ.get('APP_PASSWORD', 'hunyuan3d') |
|
|
|
|
|
|
|
|
DB_PATH = 'generations.db' |
|
|
MODELS_DIR = 'saved_models' |
|
|
|
|
|
|
|
|
Path(MODELS_DIR).mkdir(exist_ok=True) |
|
|
|
|
|
def init_database(): |
|
|
"""Initialize SQLite database for storing generations""" |
|
|
conn = sqlite3.connect(DB_PATH) |
|
|
cursor = conn.cursor() |
|
|
cursor.execute(''' |
|
|
CREATE TABLE IF NOT EXISTS generations ( |
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
|
job_id TEXT UNIQUE, |
|
|
prompt TEXT, |
|
|
status TEXT, |
|
|
created_at TIMESTAMP, |
|
|
completed_at TIMESTAMP, |
|
|
result_files TEXT, |
|
|
preview_image TEXT, |
|
|
settings TEXT, |
|
|
local_files TEXT |
|
|
) |
|
|
''') |
|
|
conn.commit() |
|
|
conn.close() |
|
|
|
|
|
init_database() |
|
|
|
|
|
def sha256(message): |
|
|
"""Calculate SHA256 hash""" |
|
|
return hashlib.sha256(message.encode('utf-8')).hexdigest() |
|
|
|
|
|
def hmac_sha256(key, message): |
|
|
"""Calculate HMAC-SHA256""" |
|
|
if isinstance(key, str): |
|
|
key = key.encode('utf-8') |
|
|
return hmac.new(key, message.encode('utf-8'), hashlib.sha256).digest() |
|
|
|
|
|
def hmac_sha256_hex(key, message): |
|
|
"""Calculate HMAC-SHA256 and return hex""" |
|
|
if isinstance(key, str): |
|
|
key = key.encode('utf-8') |
|
|
return hmac.new(key, message.encode('utf-8'), hashlib.sha256).hexdigest() |
|
|
|
|
|
def make_request(action, payload): |
|
|
"""Make authenticated request to Tencent Cloud API""" |
|
|
timestamp = int(time.time()) |
|
|
date = datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d') |
|
|
payload_str = json.dumps(payload) |
|
|
|
|
|
|
|
|
http_request_method = 'POST' |
|
|
canonical_uri = '/' |
|
|
canonical_query_string = '' |
|
|
canonical_headers = f'content-type:application/json; charset=utf-8\nhost:{ENDPOINT}\nx-tc-action:{action.lower()}\n' |
|
|
signed_headers = 'content-type;host;x-tc-action' |
|
|
hashed_request_payload = sha256(payload_str) |
|
|
canonical_request = f'{http_request_method}\n{canonical_uri}\n{canonical_query_string}\n{canonical_headers}\n{signed_headers}\n{hashed_request_payload}' |
|
|
|
|
|
|
|
|
algorithm = 'TC3-HMAC-SHA256' |
|
|
hashed_canonical_request = sha256(canonical_request) |
|
|
credential_scope = f'{date}/{SERVICE}/tc3_request' |
|
|
string_to_sign = f'{algorithm}\n{timestamp}\n{credential_scope}\n{hashed_canonical_request}' |
|
|
|
|
|
|
|
|
k_date = hmac_sha256(f'TC3{SECRET_KEY}', date) |
|
|
k_service = hmac_sha256(k_date, SERVICE) |
|
|
k_signing = hmac_sha256(k_service, 'tc3_request') |
|
|
signature = hmac_sha256_hex(k_signing, string_to_sign) |
|
|
|
|
|
|
|
|
authorization = f'{algorithm} Credential={SECRET_ID}/{credential_scope}, SignedHeaders={signed_headers}, Signature={signature}' |
|
|
|
|
|
|
|
|
headers = { |
|
|
'Content-Type': 'application/json; charset=utf-8', |
|
|
'Host': ENDPOINT, |
|
|
'X-TC-Action': action, |
|
|
'X-TC-Version': VERSION, |
|
|
'X-TC-Timestamp': str(timestamp), |
|
|
'X-TC-Region': REGION, |
|
|
'Authorization': authorization, |
|
|
} |
|
|
|
|
|
response = requests.post(f'https://{ENDPOINT}', headers=headers, data=payload_str) |
|
|
return response.json() |
|
|
|
|
|
def save_generation(job_id, prompt, settings): |
|
|
"""Save generation to database""" |
|
|
conn = sqlite3.connect(DB_PATH) |
|
|
cursor = conn.cursor() |
|
|
cursor.execute(''' |
|
|
INSERT OR REPLACE INTO generations (job_id, prompt, status, created_at, settings) |
|
|
VALUES (?, ?, ?, ?, ?) |
|
|
''', (job_id, prompt, 'WAIT', datetime.now(), json.dumps(settings))) |
|
|
conn.commit() |
|
|
conn.close() |
|
|
|
|
|
def update_generation(job_id, status, result_files=None, preview_image=None): |
|
|
"""Update generation status in database and download files""" |
|
|
conn = sqlite3.connect(DB_PATH) |
|
|
cursor = conn.cursor() |
|
|
|
|
|
local_files = None |
|
|
|
|
|
if status == 'DONE' and result_files: |
|
|
|
|
|
local_files = download_and_save_models(job_id, result_files) |
|
|
|
|
|
cursor.execute(''' |
|
|
UPDATE generations |
|
|
SET status = ?, completed_at = ?, result_files = ?, preview_image = ?, local_files = ? |
|
|
WHERE job_id = ? |
|
|
''', (status, datetime.now(), json.dumps(result_files), preview_image, json.dumps(local_files) if local_files else None, job_id)) |
|
|
else: |
|
|
cursor.execute(''' |
|
|
UPDATE generations |
|
|
SET status = ? |
|
|
WHERE job_id = ? |
|
|
''', (status, job_id)) |
|
|
|
|
|
conn.commit() |
|
|
conn.close() |
|
|
|
|
|
def get_all_generations(): |
|
|
"""Get all generations from database""" |
|
|
conn = sqlite3.connect(DB_PATH) |
|
|
cursor = conn.cursor() |
|
|
cursor.execute(''' |
|
|
SELECT job_id, prompt, status, created_at, completed_at, result_files, preview_image, local_files |
|
|
FROM generations |
|
|
ORDER BY created_at DESC |
|
|
''') |
|
|
rows = cursor.fetchall() |
|
|
conn.close() |
|
|
return rows |
|
|
|
|
|
def submit_generation( |
|
|
prompt, |
|
|
image, |
|
|
image_url, |
|
|
multi_left, |
|
|
multi_right, |
|
|
multi_back, |
|
|
generate_type, |
|
|
face_count, |
|
|
polygon_type, |
|
|
enable_pbr |
|
|
): |
|
|
"""Submit 3D generation job""" |
|
|
if not SECRET_ID or not SECRET_KEY: |
|
|
return "β Error: API credentials not configured. Please set TENCENT_SECRET_ID and TENCENT_SECRET_KEY environment variables." |
|
|
|
|
|
import base64 |
|
|
from io import BytesIO |
|
|
|
|
|
|
|
|
payload = {} |
|
|
|
|
|
|
|
|
if prompt and prompt.strip(): |
|
|
payload['Prompt'] = prompt.strip() |
|
|
|
|
|
if image is not None: |
|
|
|
|
|
buffered = BytesIO() |
|
|
image.save(buffered, format="PNG") |
|
|
img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8') |
|
|
payload['ImageBase64'] = img_base64 |
|
|
elif image_url and image_url.strip(): |
|
|
payload['ImageUrl'] = image_url.strip() |
|
|
|
|
|
|
|
|
multi_view_array = [] |
|
|
if multi_left is not None: |
|
|
buffered = BytesIO() |
|
|
multi_left.save(buffered, format="PNG") |
|
|
img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8') |
|
|
multi_view_array.append({'ViewType': 'left', 'ViewImageBase64': img_base64}) |
|
|
if multi_right is not None: |
|
|
buffered = BytesIO() |
|
|
multi_right.save(buffered, format="PNG") |
|
|
img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8') |
|
|
multi_view_array.append({'ViewType': 'right', 'ViewImageBase64': img_base64}) |
|
|
if multi_back is not None: |
|
|
buffered = BytesIO() |
|
|
multi_back.save(buffered, format="PNG") |
|
|
img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8') |
|
|
multi_view_array.append({'ViewType': 'back', 'ViewImageBase64': img_base64}) |
|
|
|
|
|
if multi_view_array: |
|
|
payload['MultiViewImages'] = multi_view_array |
|
|
|
|
|
|
|
|
payload['GenerateType'] = generate_type |
|
|
payload['FaceCount'] = face_count |
|
|
if generate_type == 'LowPoly': |
|
|
payload['PolygonType'] = polygon_type |
|
|
payload['EnablePBR'] = enable_pbr |
|
|
|
|
|
|
|
|
if not payload.get('Prompt') and not payload.get('ImageUrl') and not payload.get('ImageBase64'): |
|
|
return "β Error: Please provide either a text prompt or an image." |
|
|
|
|
|
try: |
|
|
|
|
|
response = make_request('SubmitHunyuanTo3DProJob', payload) |
|
|
|
|
|
if 'Response' in response: |
|
|
if 'Error' in response['Response']: |
|
|
return f"β API Error: {response['Response']['Error']['Message']}" |
|
|
|
|
|
job_id = response['Response'].get('JobId') |
|
|
if job_id: |
|
|
|
|
|
settings = { |
|
|
'generate_type': generate_type, |
|
|
'face_count': face_count, |
|
|
'polygon_type': polygon_type, |
|
|
'enable_pbr': enable_pbr |
|
|
} |
|
|
save_generation(job_id, prompt or 'Image-to-3D', settings) |
|
|
|
|
|
return f"β
Job submitted successfully!\n\n**Job ID:** {job_id}\n\nYour generation has been queued. Check the 'View Generations' tab to monitor progress." |
|
|
else: |
|
|
return "β Error: No Job ID returned from API" |
|
|
else: |
|
|
return f"β Error: Unexpected response format: {response}" |
|
|
|
|
|
except Exception as e: |
|
|
return f"β Error: {str(e)}" |
|
|
|
|
|
def check_job_status(job_id): |
|
|
"""Check the status of a job""" |
|
|
if not job_id or not job_id.strip(): |
|
|
return "β οΈ Please enter a Job ID" |
|
|
|
|
|
try: |
|
|
payload = {'JobId': job_id.strip()} |
|
|
response = make_request('QueryHunyuanTo3DProJob', payload) |
|
|
|
|
|
if 'Response' in response: |
|
|
if 'Error' in response['Response']: |
|
|
return f"β API Error: {response['Response']['Error']['Message']}" |
|
|
|
|
|
status = response['Response'].get('Status', 'UNKNOWN') |
|
|
|
|
|
|
|
|
if status == 'DONE': |
|
|
result_files = response['Response'].get('ResultFile3Ds', []) |
|
|
preview_image = result_files[0].get('PreviewImageUrl') if result_files else None |
|
|
update_generation(job_id, status, result_files, preview_image) |
|
|
elif status in ['FAIL', 'WAIT', 'RUN']: |
|
|
update_generation(job_id, status) |
|
|
|
|
|
|
|
|
result = f"**Job ID:** {job_id}\n**Status:** {status}\n\n" |
|
|
|
|
|
if status == 'DONE': |
|
|
result += "β
**Generation Complete!**\n\n" |
|
|
result_files = response['Response'].get('ResultFile3Ds', []) |
|
|
for i, file in enumerate(result_files): |
|
|
result += f"**File {i+1}:**\n" |
|
|
result += f"- Type: {file.get('Type', 'N/A')}\n" |
|
|
result += f"- URL: {file.get('Url', 'N/A')}\n" |
|
|
if file.get('PreviewImageUrl'): |
|
|
result += f"- Preview: {file.get('PreviewImageUrl')}\n" |
|
|
result += "\n" |
|
|
elif status == 'FAIL': |
|
|
error_msg = response['Response'].get('ErrorMessage', 'Unknown error') |
|
|
result += f"β **Generation Failed**\nError: {error_msg}" |
|
|
elif status == 'WAIT': |
|
|
result += "β³ **Waiting in queue...**" |
|
|
elif status == 'RUN': |
|
|
result += "π¨ **Generating your 3D model...**" |
|
|
|
|
|
return result |
|
|
else: |
|
|
return f"β Error: Unexpected response format" |
|
|
|
|
|
except Exception as e: |
|
|
return f"β Error: {str(e)}" |
|
|
|
|
|
def get_generations_list(): |
|
|
"""Get list of generations for dropdown""" |
|
|
generations = get_all_generations() |
|
|
if not generations: |
|
|
return [] |
|
|
|
|
|
choices = [] |
|
|
for gen in generations: |
|
|
job_id, prompt, status, created_at, completed_at, result_files, preview_image, local_files = gen |
|
|
status_emoji = { |
|
|
'DONE': 'β
', |
|
|
'FAIL': 'β', |
|
|
'WAIT': 'β³', |
|
|
'RUN': 'π¨' |
|
|
}.get(status, 'β') |
|
|
label = f"{status_emoji} {job_id[:20]}... - {prompt[:30]}... ({status})" |
|
|
choices.append((label, job_id)) |
|
|
|
|
|
return choices |
|
|
|
|
|
def download_and_save_models(job_id, result_files): |
|
|
"""Download and save model files locally""" |
|
|
saved_files = [] |
|
|
|
|
|
try: |
|
|
|
|
|
job_dir = Path(MODELS_DIR) / job_id |
|
|
job_dir.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
for file_info in result_files: |
|
|
file_url = file_info.get('Url') |
|
|
file_type = file_info.get('Type', 'model') |
|
|
|
|
|
if not file_url: |
|
|
continue |
|
|
|
|
|
|
|
|
response = requests.get(file_url, timeout=60) |
|
|
response.raise_for_status() |
|
|
|
|
|
|
|
|
zip_path = job_dir / f"{file_type}.zip" |
|
|
with open(zip_path, 'wb') as f: |
|
|
f.write(response.content) |
|
|
|
|
|
|
|
|
extract_dir = job_dir / file_type |
|
|
extract_dir.mkdir(exist_ok=True) |
|
|
|
|
|
with zipfile.ZipFile(zip_path, 'r') as zip_ref: |
|
|
zip_ref.extractall(extract_dir) |
|
|
|
|
|
|
|
|
extracted_files = [] |
|
|
for root, dirs, files in os.walk(extract_dir): |
|
|
for file in files: |
|
|
rel_path = os.path.relpath(os.path.join(root, file), MODELS_DIR) |
|
|
extracted_files.append(rel_path) |
|
|
|
|
|
saved_files.append({ |
|
|
'type': file_type, |
|
|
'extracted_files': extracted_files, |
|
|
'download_url': file_url, |
|
|
'zip_path': str(zip_path.relative_to(MODELS_DIR)) |
|
|
}) |
|
|
|
|
|
return saved_files |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error downloading/saving models: {e}") |
|
|
return None |
|
|
|
|
|
def load_generation_details(job_id): |
|
|
"""Load details for a specific generation""" |
|
|
if not job_id: |
|
|
return "Select a generation to view details", None |
|
|
|
|
|
conn = sqlite3.connect(DB_PATH) |
|
|
cursor = conn.cursor() |
|
|
cursor.execute(''' |
|
|
SELECT job_id, prompt, status, created_at, completed_at, result_files, preview_image, local_files |
|
|
FROM generations |
|
|
WHERE job_id = ? |
|
|
''', (job_id,)) |
|
|
gen = cursor.fetchone() |
|
|
conn.close() |
|
|
|
|
|
if not gen: |
|
|
return "Generation not found", None |
|
|
|
|
|
job_id, prompt, status, created_at, completed_at, result_files, preview_image, local_files = gen |
|
|
|
|
|
status_emoji = { |
|
|
'DONE': 'β
', |
|
|
'FAIL': 'β', |
|
|
'WAIT': 'β³', |
|
|
'RUN': 'π¨' |
|
|
}.get(status, 'β') |
|
|
|
|
|
|
|
|
info = f"## {status_emoji} Generation Details\n\n" |
|
|
info += f"**Job ID:** {job_id}\n" |
|
|
info += f"**Prompt:** {prompt}\n" |
|
|
info += f"**Status:** {status}\n" |
|
|
info += f"**Created:** {created_at}\n" |
|
|
|
|
|
if completed_at: |
|
|
info += f"**Completed:** {completed_at}\n" |
|
|
|
|
|
preview_url = None |
|
|
|
|
|
if status == 'DONE' and result_files: |
|
|
files = json.loads(result_files) |
|
|
info += "\n### π Download Links:\n" |
|
|
for file in files: |
|
|
file_type = file.get('Type', 'N/A') |
|
|
file_url = file.get('Url', 'N/A') |
|
|
info += f"- **{file_type}**: [Download]({file_url})\n" |
|
|
|
|
|
|
|
|
if file.get('PreviewImageUrl') and not preview_url: |
|
|
preview_url = file.get('PreviewImageUrl') |
|
|
|
|
|
|
|
|
if local_files: |
|
|
local_files_data = json.loads(local_files) |
|
|
info += "\n### πΎ Saved Files:\n" |
|
|
for saved in local_files_data: |
|
|
info += f"\n**{saved['type']}**:\n" |
|
|
info += f"- ZIP: `{saved['zip_path']}`\n" |
|
|
info += f"- Extracted {len(saved['extracted_files'])} files\n" |
|
|
|
|
|
return info, preview_url |
|
|
|
|
|
def refresh_generations_dropdown(): |
|
|
"""Refresh the generations dropdown""" |
|
|
choices = get_generations_list() |
|
|
if choices: |
|
|
return gr.Dropdown(choices=choices, value=choices[0][1]) |
|
|
return gr.Dropdown(choices=[], value=None) |
|
|
|
|
|
|
|
|
demo = gr.Blocks(title="Hunyuan 3D Studio") |
|
|
|
|
|
with demo: |
|
|
|
|
|
gr.HTML(""" |
|
|
<style> |
|
|
.gradio-container { |
|
|
max-width: 1200px !important; |
|
|
} |
|
|
.status-box { |
|
|
border-radius: 8px; |
|
|
padding: 1rem; |
|
|
margin: 1rem 0; |
|
|
} |
|
|
</style> |
|
|
""") |
|
|
gr.Markdown(""" |
|
|
# π¨ THE GUYBERS 3D STUDIO |
|
|
### Transform your imagination into stunning 3D models |
|
|
Powered by Tencent Cloud Hunyuan 3D API |
|
|
""") |
|
|
|
|
|
with gr.Tabs(): |
|
|
|
|
|
with gr.Tab("π Generate"): |
|
|
with gr.Row(): |
|
|
with gr.Column(scale=2): |
|
|
prompt = gr.Textbox( |
|
|
label="Text Prompt", |
|
|
placeholder="Describe your 3D model... (e.g., A futuristic sci-fi spaceship with sleek curves and neon lights)", |
|
|
lines=3 |
|
|
) |
|
|
|
|
|
with gr.Accordion("πΈ Image Input", open=False): |
|
|
image = gr.Image(label="Upload Image", type="pil") |
|
|
image_url = gr.Textbox(label="Or Image URL", placeholder="https://example.com/image.jpg") |
|
|
gr.Markdown("*Note: Either Prompt OR Image is required. Cannot use both together (except in Sketch mode).*") |
|
|
|
|
|
with gr.Accordion("π Multi-View Images (Optional)", open=False): |
|
|
multi_left = gr.Image(label="Left View", type="pil") |
|
|
multi_right = gr.Image(label="Right View", type="pil") |
|
|
multi_back = gr.Image(label="Back View", type="pil") |
|
|
|
|
|
with gr.Column(scale=1): |
|
|
gr.Markdown("### βοΈ Settings") |
|
|
generate_type = gr.Dropdown( |
|
|
choices=["Normal", "LowPoly", "Geometry", "Sketch"], |
|
|
value="Normal", |
|
|
label="Generate Type" |
|
|
) |
|
|
face_count = gr.Slider( |
|
|
minimum=40000, |
|
|
maximum=1500000, |
|
|
value=500000, |
|
|
step=10000, |
|
|
label="Face Count" |
|
|
) |
|
|
polygon_type = gr.Dropdown( |
|
|
choices=["triangle", "quadrilateral"], |
|
|
value="triangle", |
|
|
label="Polygon Type (LowPoly only)" |
|
|
) |
|
|
enable_pbr = gr.Checkbox(label="Enable PBR Materials", value=False) |
|
|
|
|
|
submit_btn = gr.Button("β¨ Generate 3D Model", variant="primary", size="lg") |
|
|
output = gr.Markdown(label="Result") |
|
|
|
|
|
submit_btn.click( |
|
|
fn=submit_generation, |
|
|
inputs=[ |
|
|
prompt, image, image_url, |
|
|
multi_left, multi_right, multi_back, |
|
|
generate_type, face_count, polygon_type, enable_pbr |
|
|
], |
|
|
outputs=output |
|
|
) |
|
|
|
|
|
|
|
|
with gr.Tab("π Check Status"): |
|
|
gr.Markdown("### Check the status of a specific job") |
|
|
with gr.Row(): |
|
|
job_id_input = gr.Textbox(label="Job ID", placeholder="Enter Job ID to check status") |
|
|
check_btn = gr.Button("Check Status", variant="secondary") |
|
|
status_output = gr.Markdown(label="Status") |
|
|
|
|
|
check_btn.click( |
|
|
fn=check_job_status, |
|
|
inputs=job_id_input, |
|
|
outputs=status_output |
|
|
) |
|
|
|
|
|
|
|
|
with gr.Tab("π¦ View Generations"): |
|
|
gr.Markdown("### All your generations") |
|
|
|
|
|
with gr.Row(): |
|
|
refresh_btn = gr.Button("π Refresh List", variant="secondary") |
|
|
|
|
|
generations_dropdown = gr.Dropdown( |
|
|
label="Select Generation", |
|
|
choices=get_generations_list(), |
|
|
interactive=True |
|
|
) |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(scale=1): |
|
|
generation_info = gr.Markdown("Select a generation to view details") |
|
|
|
|
|
with gr.Column(scale=1): |
|
|
preview_image = gr.Image(label="Preview Image") |
|
|
|
|
|
|
|
|
refresh_btn.click( |
|
|
fn=refresh_generations_dropdown, |
|
|
outputs=generations_dropdown |
|
|
) |
|
|
|
|
|
generations_dropdown.change( |
|
|
fn=load_generation_details, |
|
|
inputs=generations_dropdown, |
|
|
outputs=[generation_info, preview_image] |
|
|
) |
|
|
|
|
|
gr.Markdown(""" |
|
|
--- |
|
|
### π Notes: |
|
|
- Generations typically take 2-5 minutes |
|
|
- All generations are stored and can be viewed in the 'View Generations' tab |
|
|
- You can run multiple generations simultaneously |
|
|
- Download links are available once generation is complete |
|
|
""") |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
demo.queue() |
|
|
demo.launch( |
|
|
auth=("user", APP_PASSWORD), |
|
|
auth_message="Enter password to access Hunyuan 3D Studio", |
|
|
share=False |
|
|
) |
|
|
|