Guyber / app.py
SIGMitch's picture
Update app.py
78c988c verified
import gradio as gr
import os
import json
import time
import hmac
import hashlib
import requests
from datetime import datetime
from pathlib import Path
import sqlite3
import zipfile
import io
import tempfile
# Configuration
ENDPOINT = 'hunyuan.intl.tencentcloudapi.com'
SERVICE = 'hunyuan'
VERSION = '2023-09-01'
REGION = 'ap-singapore'
# Get credentials from environment variables
SECRET_ID = os.environ.get('TENCENT_SECRET_ID', '')
SECRET_KEY = os.environ.get('TENCENT_SECRET_KEY', '')
APP_PASSWORD = os.environ.get('APP_PASSWORD', 'hunyuan3d')
# Database setup
DB_PATH = 'generations.db'
MODELS_DIR = 'saved_models'
# Create models directory if it doesn't exist
Path(MODELS_DIR).mkdir(exist_ok=True)
def init_database():
"""Initialize SQLite database for storing generations"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS generations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id TEXT UNIQUE,
prompt TEXT,
status TEXT,
created_at TIMESTAMP,
completed_at TIMESTAMP,
result_files TEXT,
preview_image TEXT,
settings TEXT,
local_files TEXT
)
''')
conn.commit()
conn.close()
init_database()
def sha256(message):
"""Calculate SHA256 hash"""
return hashlib.sha256(message.encode('utf-8')).hexdigest()
def hmac_sha256(key, message):
"""Calculate HMAC-SHA256"""
if isinstance(key, str):
key = key.encode('utf-8')
return hmac.new(key, message.encode('utf-8'), hashlib.sha256).digest()
def hmac_sha256_hex(key, message):
"""Calculate HMAC-SHA256 and return hex"""
if isinstance(key, str):
key = key.encode('utf-8')
return hmac.new(key, message.encode('utf-8'), hashlib.sha256).hexdigest()
def make_request(action, payload):
"""Make authenticated request to Tencent Cloud API"""
timestamp = int(time.time())
date = datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d')
payload_str = json.dumps(payload)
# Build canonical request
http_request_method = 'POST'
canonical_uri = '/'
canonical_query_string = ''
canonical_headers = f'content-type:application/json; charset=utf-8\nhost:{ENDPOINT}\nx-tc-action:{action.lower()}\n'
signed_headers = 'content-type;host;x-tc-action'
hashed_request_payload = sha256(payload_str)
canonical_request = f'{http_request_method}\n{canonical_uri}\n{canonical_query_string}\n{canonical_headers}\n{signed_headers}\n{hashed_request_payload}'
# Build string to sign
algorithm = 'TC3-HMAC-SHA256'
hashed_canonical_request = sha256(canonical_request)
credential_scope = f'{date}/{SERVICE}/tc3_request'
string_to_sign = f'{algorithm}\n{timestamp}\n{credential_scope}\n{hashed_canonical_request}'
# Calculate signature
k_date = hmac_sha256(f'TC3{SECRET_KEY}', date)
k_service = hmac_sha256(k_date, SERVICE)
k_signing = hmac_sha256(k_service, 'tc3_request')
signature = hmac_sha256_hex(k_signing, string_to_sign)
# Build authorization header
authorization = f'{algorithm} Credential={SECRET_ID}/{credential_scope}, SignedHeaders={signed_headers}, Signature={signature}'
# Make request
headers = {
'Content-Type': 'application/json; charset=utf-8',
'Host': ENDPOINT,
'X-TC-Action': action,
'X-TC-Version': VERSION,
'X-TC-Timestamp': str(timestamp),
'X-TC-Region': REGION,
'Authorization': authorization,
}
response = requests.post(f'https://{ENDPOINT}', headers=headers, data=payload_str)
return response.json()
def save_generation(job_id, prompt, settings):
"""Save generation to database"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO generations (job_id, prompt, status, created_at, settings)
VALUES (?, ?, ?, ?, ?)
''', (job_id, prompt, 'WAIT', datetime.now(), json.dumps(settings)))
conn.commit()
conn.close()
def update_generation(job_id, status, result_files=None, preview_image=None):
"""Update generation status in database and download files"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
local_files = None
if status == 'DONE' and result_files:
# Download and save files locally
local_files = download_and_save_models(job_id, result_files)
cursor.execute('''
UPDATE generations
SET status = ?, completed_at = ?, result_files = ?, preview_image = ?, local_files = ?
WHERE job_id = ?
''', (status, datetime.now(), json.dumps(result_files), preview_image, json.dumps(local_files) if local_files else None, job_id))
else:
cursor.execute('''
UPDATE generations
SET status = ?
WHERE job_id = ?
''', (status, job_id))
conn.commit()
conn.close()
def get_all_generations():
"""Get all generations from database"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
SELECT job_id, prompt, status, created_at, completed_at, result_files, preview_image, local_files
FROM generations
ORDER BY created_at DESC
''')
rows = cursor.fetchall()
conn.close()
return rows
def submit_generation(
prompt,
image,
image_url,
multi_left,
multi_right,
multi_back,
generate_type,
face_count,
polygon_type,
enable_pbr
):
"""Submit 3D generation job"""
if not SECRET_ID or not SECRET_KEY:
return "❌ Error: API credentials not configured. Please set TENCENT_SECRET_ID and TENCENT_SECRET_KEY environment variables."
import base64
from io import BytesIO
# Build payload
payload = {}
# Text prompt or image
if prompt and prompt.strip():
payload['Prompt'] = prompt.strip()
if image is not None:
# Convert PIL image to base64
buffered = BytesIO()
image.save(buffered, format="PNG")
img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')
payload['ImageBase64'] = img_base64
elif image_url and image_url.strip():
payload['ImageUrl'] = image_url.strip()
# Multi-view images (convert uploaded images to base64)
multi_view_array = []
if multi_left is not None:
buffered = BytesIO()
multi_left.save(buffered, format="PNG")
img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')
multi_view_array.append({'ViewType': 'left', 'ViewImageBase64': img_base64})
if multi_right is not None:
buffered = BytesIO()
multi_right.save(buffered, format="PNG")
img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')
multi_view_array.append({'ViewType': 'right', 'ViewImageBase64': img_base64})
if multi_back is not None:
buffered = BytesIO()
multi_back.save(buffered, format="PNG")
img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')
multi_view_array.append({'ViewType': 'back', 'ViewImageBase64': img_base64})
if multi_view_array:
payload['MultiViewImages'] = multi_view_array
# Settings
payload['GenerateType'] = generate_type
payload['FaceCount'] = face_count
if generate_type == 'LowPoly':
payload['PolygonType'] = polygon_type
payload['EnablePBR'] = enable_pbr
# Validate
if not payload.get('Prompt') and not payload.get('ImageUrl') and not payload.get('ImageBase64'):
return "❌ Error: Please provide either a text prompt or an image."
try:
# Submit job
response = make_request('SubmitHunyuanTo3DProJob', payload)
if 'Response' in response:
if 'Error' in response['Response']:
return f"❌ API Error: {response['Response']['Error']['Message']}"
job_id = response['Response'].get('JobId')
if job_id:
# Save to database
settings = {
'generate_type': generate_type,
'face_count': face_count,
'polygon_type': polygon_type,
'enable_pbr': enable_pbr
}
save_generation(job_id, prompt or 'Image-to-3D', settings)
return f"βœ… Job submitted successfully!\n\n**Job ID:** {job_id}\n\nYour generation has been queued. Check the 'View Generations' tab to monitor progress."
else:
return "❌ Error: No Job ID returned from API"
else:
return f"❌ Error: Unexpected response format: {response}"
except Exception as e:
return f"❌ Error: {str(e)}"
def check_job_status(job_id):
"""Check the status of a job"""
if not job_id or not job_id.strip():
return "⚠️ Please enter a Job ID"
try:
payload = {'JobId': job_id.strip()}
response = make_request('QueryHunyuanTo3DProJob', payload)
if 'Response' in response:
if 'Error' in response['Response']:
return f"❌ API Error: {response['Response']['Error']['Message']}"
status = response['Response'].get('Status', 'UNKNOWN')
# Update database
if status == 'DONE':
result_files = response['Response'].get('ResultFile3Ds', [])
preview_image = result_files[0].get('PreviewImageUrl') if result_files else None
update_generation(job_id, status, result_files, preview_image)
elif status in ['FAIL', 'WAIT', 'RUN']:
update_generation(job_id, status)
# Format response
result = f"**Job ID:** {job_id}\n**Status:** {status}\n\n"
if status == 'DONE':
result += "βœ… **Generation Complete!**\n\n"
result_files = response['Response'].get('ResultFile3Ds', [])
for i, file in enumerate(result_files):
result += f"**File {i+1}:**\n"
result += f"- Type: {file.get('Type', 'N/A')}\n"
result += f"- URL: {file.get('Url', 'N/A')}\n"
if file.get('PreviewImageUrl'):
result += f"- Preview: {file.get('PreviewImageUrl')}\n"
result += "\n"
elif status == 'FAIL':
error_msg = response['Response'].get('ErrorMessage', 'Unknown error')
result += f"❌ **Generation Failed**\nError: {error_msg}"
elif status == 'WAIT':
result += "⏳ **Waiting in queue...**"
elif status == 'RUN':
result += "🎨 **Generating your 3D model...**"
return result
else:
return f"❌ Error: Unexpected response format"
except Exception as e:
return f"❌ Error: {str(e)}"
def get_generations_list():
"""Get list of generations for dropdown"""
generations = get_all_generations()
if not generations:
return []
choices = []
for gen in generations:
job_id, prompt, status, created_at, completed_at, result_files, preview_image, local_files = gen
status_emoji = {
'DONE': 'βœ…',
'FAIL': '❌',
'WAIT': '⏳',
'RUN': '🎨'
}.get(status, '❓')
label = f"{status_emoji} {job_id[:20]}... - {prompt[:30]}... ({status})"
choices.append((label, job_id))
return choices
def download_and_save_models(job_id, result_files):
"""Download and save model files locally"""
saved_files = []
try:
# Create job directory
job_dir = Path(MODELS_DIR) / job_id
job_dir.mkdir(parents=True, exist_ok=True)
for file_info in result_files:
file_url = file_info.get('Url')
file_type = file_info.get('Type', 'model')
if not file_url:
continue
# Download the ZIP file
response = requests.get(file_url, timeout=60)
response.raise_for_status()
# Save ZIP file
zip_path = job_dir / f"{file_type}.zip"
with open(zip_path, 'wb') as f:
f.write(response.content)
# Extract ZIP
extract_dir = job_dir / file_type
extract_dir.mkdir(exist_ok=True)
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(extract_dir)
# Find all extracted files
extracted_files = []
for root, dirs, files in os.walk(extract_dir):
for file in files:
rel_path = os.path.relpath(os.path.join(root, file), MODELS_DIR)
extracted_files.append(rel_path)
saved_files.append({
'type': file_type,
'extracted_files': extracted_files,
'download_url': file_url,
'zip_path': str(zip_path.relative_to(MODELS_DIR))
})
return saved_files
except Exception as e:
print(f"Error downloading/saving models: {e}")
return None
def load_generation_details(job_id):
"""Load details for a specific generation"""
if not job_id:
return "Select a generation to view details", None
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
SELECT job_id, prompt, status, created_at, completed_at, result_files, preview_image, local_files
FROM generations
WHERE job_id = ?
''', (job_id,))
gen = cursor.fetchone()
conn.close()
if not gen:
return "Generation not found", None
job_id, prompt, status, created_at, completed_at, result_files, preview_image, local_files = gen
status_emoji = {
'DONE': 'βœ…',
'FAIL': '❌',
'WAIT': '⏳',
'RUN': '🎨'
}.get(status, '❓')
# Build info text
info = f"## {status_emoji} Generation Details\n\n"
info += f"**Job ID:** {job_id}\n"
info += f"**Prompt:** {prompt}\n"
info += f"**Status:** {status}\n"
info += f"**Created:** {created_at}\n"
if completed_at:
info += f"**Completed:** {completed_at}\n"
preview_url = None
if status == 'DONE' and result_files:
files = json.loads(result_files)
info += "\n### πŸ“ Download Links:\n"
for file in files:
file_type = file.get('Type', 'N/A')
file_url = file.get('Url', 'N/A')
info += f"- **{file_type}**: [Download]({file_url})\n"
# Get preview image
if file.get('PreviewImageUrl') and not preview_url:
preview_url = file.get('PreviewImageUrl')
# Show local files if available
if local_files:
local_files_data = json.loads(local_files)
info += "\n### πŸ’Ύ Saved Files:\n"
for saved in local_files_data:
info += f"\n**{saved['type']}**:\n"
info += f"- ZIP: `{saved['zip_path']}`\n"
info += f"- Extracted {len(saved['extracted_files'])} files\n"
return info, preview_url
def refresh_generations_dropdown():
"""Refresh the generations dropdown"""
choices = get_generations_list()
if choices:
return gr.Dropdown(choices=choices, value=choices[0][1])
return gr.Dropdown(choices=[], value=None)
# Build Gradio Interface
demo = gr.Blocks(title="Hunyuan 3D Studio")
with demo:
# Add custom CSS
gr.HTML("""
<style>
.gradio-container {
max-width: 1200px !important;
}
.status-box {
border-radius: 8px;
padding: 1rem;
margin: 1rem 0;
}
</style>
""")
gr.Markdown("""
# 🎨 THE GUYBERS 3D STUDIO
### Transform your imagination into stunning 3D models
Powered by Tencent Cloud Hunyuan 3D API
""")
with gr.Tabs():
# Tab 1: Generate
with gr.Tab("πŸš€ Generate"):
with gr.Row():
with gr.Column(scale=2):
prompt = gr.Textbox(
label="Text Prompt",
placeholder="Describe your 3D model... (e.g., A futuristic sci-fi spaceship with sleek curves and neon lights)",
lines=3
)
with gr.Accordion("πŸ“Έ Image Input", open=False):
image = gr.Image(label="Upload Image", type="pil")
image_url = gr.Textbox(label="Or Image URL", placeholder="https://example.com/image.jpg")
gr.Markdown("*Note: Either Prompt OR Image is required. Cannot use both together (except in Sketch mode).*")
with gr.Accordion("🎭 Multi-View Images (Optional)", open=False):
multi_left = gr.Image(label="Left View", type="pil")
multi_right = gr.Image(label="Right View", type="pil")
multi_back = gr.Image(label="Back View", type="pil")
with gr.Column(scale=1):
gr.Markdown("### βš™οΈ Settings")
generate_type = gr.Dropdown(
choices=["Normal", "LowPoly", "Geometry", "Sketch"],
value="Normal",
label="Generate Type"
)
face_count = gr.Slider(
minimum=40000,
maximum=1500000,
value=500000,
step=10000,
label="Face Count"
)
polygon_type = gr.Dropdown(
choices=["triangle", "quadrilateral"],
value="triangle",
label="Polygon Type (LowPoly only)"
)
enable_pbr = gr.Checkbox(label="Enable PBR Materials", value=False)
submit_btn = gr.Button("✨ Generate 3D Model", variant="primary", size="lg")
output = gr.Markdown(label="Result")
submit_btn.click(
fn=submit_generation,
inputs=[
prompt, image, image_url,
multi_left, multi_right, multi_back,
generate_type, face_count, polygon_type, enable_pbr
],
outputs=output
)
# Tab 2: Check Status
with gr.Tab("πŸ” Check Status"):
gr.Markdown("### Check the status of a specific job")
with gr.Row():
job_id_input = gr.Textbox(label="Job ID", placeholder="Enter Job ID to check status")
check_btn = gr.Button("Check Status", variant="secondary")
status_output = gr.Markdown(label="Status")
check_btn.click(
fn=check_job_status,
inputs=job_id_input,
outputs=status_output
)
# Tab 3: View Generations
with gr.Tab("πŸ“¦ View Generations"):
gr.Markdown("### All your generations")
with gr.Row():
refresh_btn = gr.Button("πŸ”„ Refresh List", variant="secondary")
generations_dropdown = gr.Dropdown(
label="Select Generation",
choices=get_generations_list(),
interactive=True
)
with gr.Row():
with gr.Column(scale=1):
generation_info = gr.Markdown("Select a generation to view details")
with gr.Column(scale=1):
preview_image = gr.Image(label="Preview Image")
# Event handlers
refresh_btn.click(
fn=refresh_generations_dropdown,
outputs=generations_dropdown
)
generations_dropdown.change(
fn=load_generation_details,
inputs=generations_dropdown,
outputs=[generation_info, preview_image]
)
gr.Markdown("""
---
### πŸ“ Notes:
- Generations typically take 2-5 minutes
- All generations are stored and can be viewed in the 'View Generations' tab
- You can run multiple generations simultaneously
- Download links are available once generation is complete
""")
# Launch with authentication
if __name__ == "__main__":
demo.queue()
demo.launch(
auth=("user", APP_PASSWORD),
auth_message="Enter password to access Hunyuan 3D Studio",
share=False
)