import gradio as gr import os import json import time import hmac import hashlib import requests from datetime import datetime from pathlib import Path import sqlite3 import zipfile import io import tempfile # Configuration ENDPOINT = 'hunyuan.intl.tencentcloudapi.com' SERVICE = 'hunyuan' VERSION = '2023-09-01' REGION = 'ap-singapore' # Get credentials from environment variables SECRET_ID = os.environ.get('TENCENT_SECRET_ID', '') SECRET_KEY = os.environ.get('TENCENT_SECRET_KEY', '') APP_PASSWORD = os.environ.get('APP_PASSWORD', 'hunyuan3d') # Database setup DB_PATH = 'generations.db' MODELS_DIR = 'saved_models' # Create models directory if it doesn't exist Path(MODELS_DIR).mkdir(exist_ok=True) def init_database(): """Initialize SQLite database for storing generations""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS generations ( id INTEGER PRIMARY KEY AUTOINCREMENT, job_id TEXT UNIQUE, prompt TEXT, status TEXT, created_at TIMESTAMP, completed_at TIMESTAMP, result_files TEXT, preview_image TEXT, settings TEXT, local_files TEXT ) ''') conn.commit() conn.close() init_database() def sha256(message): """Calculate SHA256 hash""" return hashlib.sha256(message.encode('utf-8')).hexdigest() def hmac_sha256(key, message): """Calculate HMAC-SHA256""" if isinstance(key, str): key = key.encode('utf-8') return hmac.new(key, message.encode('utf-8'), hashlib.sha256).digest() def hmac_sha256_hex(key, message): """Calculate HMAC-SHA256 and return hex""" if isinstance(key, str): key = key.encode('utf-8') return hmac.new(key, message.encode('utf-8'), hashlib.sha256).hexdigest() def make_request(action, payload): """Make authenticated request to Tencent Cloud API""" timestamp = int(time.time()) date = datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d') payload_str = json.dumps(payload) # Build canonical request http_request_method = 'POST' canonical_uri = '/' canonical_query_string = '' canonical_headers = f'content-type:application/json; charset=utf-8\nhost:{ENDPOINT}\nx-tc-action:{action.lower()}\n' signed_headers = 'content-type;host;x-tc-action' hashed_request_payload = sha256(payload_str) canonical_request = f'{http_request_method}\n{canonical_uri}\n{canonical_query_string}\n{canonical_headers}\n{signed_headers}\n{hashed_request_payload}' # Build string to sign algorithm = 'TC3-HMAC-SHA256' hashed_canonical_request = sha256(canonical_request) credential_scope = f'{date}/{SERVICE}/tc3_request' string_to_sign = f'{algorithm}\n{timestamp}\n{credential_scope}\n{hashed_canonical_request}' # Calculate signature k_date = hmac_sha256(f'TC3{SECRET_KEY}', date) k_service = hmac_sha256(k_date, SERVICE) k_signing = hmac_sha256(k_service, 'tc3_request') signature = hmac_sha256_hex(k_signing, string_to_sign) # Build authorization header authorization = f'{algorithm} Credential={SECRET_ID}/{credential_scope}, SignedHeaders={signed_headers}, Signature={signature}' # Make request headers = { 'Content-Type': 'application/json; charset=utf-8', 'Host': ENDPOINT, 'X-TC-Action': action, 'X-TC-Version': VERSION, 'X-TC-Timestamp': str(timestamp), 'X-TC-Region': REGION, 'Authorization': authorization, } response = requests.post(f'https://{ENDPOINT}', headers=headers, data=payload_str) return response.json() def save_generation(job_id, prompt, settings): """Save generation to database""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(''' INSERT OR REPLACE INTO generations (job_id, prompt, status, created_at, settings) VALUES (?, ?, ?, ?, ?) ''', (job_id, prompt, 'WAIT', datetime.now(), json.dumps(settings))) conn.commit() conn.close() def update_generation(job_id, status, result_files=None, preview_image=None): """Update generation status in database and download files""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() local_files = None if status == 'DONE' and result_files: # Download and save files locally local_files = download_and_save_models(job_id, result_files) cursor.execute(''' UPDATE generations SET status = ?, completed_at = ?, result_files = ?, preview_image = ?, local_files = ? WHERE job_id = ? ''', (status, datetime.now(), json.dumps(result_files), preview_image, json.dumps(local_files) if local_files else None, job_id)) else: cursor.execute(''' UPDATE generations SET status = ? WHERE job_id = ? ''', (status, job_id)) conn.commit() conn.close() def get_all_generations(): """Get all generations from database""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(''' SELECT job_id, prompt, status, created_at, completed_at, result_files, preview_image, local_files FROM generations ORDER BY created_at DESC ''') rows = cursor.fetchall() conn.close() return rows def submit_generation( prompt, image, image_url, multi_left, multi_right, multi_back, generate_type, face_count, polygon_type, enable_pbr ): """Submit 3D generation job""" if not SECRET_ID or not SECRET_KEY: return "❌ Error: API credentials not configured. Please set TENCENT_SECRET_ID and TENCENT_SECRET_KEY environment variables." import base64 from io import BytesIO # Build payload payload = {} # Text prompt or image if prompt and prompt.strip(): payload['Prompt'] = prompt.strip() if image is not None: # Convert PIL image to base64 buffered = BytesIO() image.save(buffered, format="PNG") img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8') payload['ImageBase64'] = img_base64 elif image_url and image_url.strip(): payload['ImageUrl'] = image_url.strip() # Multi-view images (convert uploaded images to base64) multi_view_array = [] if multi_left is not None: buffered = BytesIO() multi_left.save(buffered, format="PNG") img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8') multi_view_array.append({'ViewType': 'left', 'ViewImageBase64': img_base64}) if multi_right is not None: buffered = BytesIO() multi_right.save(buffered, format="PNG") img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8') multi_view_array.append({'ViewType': 'right', 'ViewImageBase64': img_base64}) if multi_back is not None: buffered = BytesIO() multi_back.save(buffered, format="PNG") img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8') multi_view_array.append({'ViewType': 'back', 'ViewImageBase64': img_base64}) if multi_view_array: payload['MultiViewImages'] = multi_view_array # Settings payload['GenerateType'] = generate_type payload['FaceCount'] = face_count if generate_type == 'LowPoly': payload['PolygonType'] = polygon_type payload['EnablePBR'] = enable_pbr # Validate if not payload.get('Prompt') and not payload.get('ImageUrl') and not payload.get('ImageBase64'): return "❌ Error: Please provide either a text prompt or an image." try: # Submit job response = make_request('SubmitHunyuanTo3DProJob', payload) if 'Response' in response: if 'Error' in response['Response']: return f"❌ API Error: {response['Response']['Error']['Message']}" job_id = response['Response'].get('JobId') if job_id: # Save to database settings = { 'generate_type': generate_type, 'face_count': face_count, 'polygon_type': polygon_type, 'enable_pbr': enable_pbr } save_generation(job_id, prompt or 'Image-to-3D', settings) return f"✅ Job submitted successfully!\n\n**Job ID:** {job_id}\n\nYour generation has been queued. Check the 'View Generations' tab to monitor progress." else: return "❌ Error: No Job ID returned from API" else: return f"❌ Error: Unexpected response format: {response}" except Exception as e: return f"❌ Error: {str(e)}" def check_job_status(job_id): """Check the status of a job""" if not job_id or not job_id.strip(): return "⚠️ Please enter a Job ID" try: payload = {'JobId': job_id.strip()} response = make_request('QueryHunyuanTo3DProJob', payload) if 'Response' in response: if 'Error' in response['Response']: return f"❌ API Error: {response['Response']['Error']['Message']}" status = response['Response'].get('Status', 'UNKNOWN') # Update database if status == 'DONE': result_files = response['Response'].get('ResultFile3Ds', []) preview_image = result_files[0].get('PreviewImageUrl') if result_files else None update_generation(job_id, status, result_files, preview_image) elif status in ['FAIL', 'WAIT', 'RUN']: update_generation(job_id, status) # Format response result = f"**Job ID:** {job_id}\n**Status:** {status}\n\n" if status == 'DONE': result += "✅ **Generation Complete!**\n\n" result_files = response['Response'].get('ResultFile3Ds', []) for i, file in enumerate(result_files): result += f"**File {i+1}:**\n" result += f"- Type: {file.get('Type', 'N/A')}\n" result += f"- URL: {file.get('Url', 'N/A')}\n" if file.get('PreviewImageUrl'): result += f"- Preview: {file.get('PreviewImageUrl')}\n" result += "\n" elif status == 'FAIL': error_msg = response['Response'].get('ErrorMessage', 'Unknown error') result += f"❌ **Generation Failed**\nError: {error_msg}" elif status == 'WAIT': result += "⏳ **Waiting in queue...**" elif status == 'RUN': result += "🎨 **Generating your 3D model...**" return result else: return f"❌ Error: Unexpected response format" except Exception as e: return f"❌ Error: {str(e)}" def get_generations_list(): """Get list of generations for dropdown""" generations = get_all_generations() if not generations: return [] choices = [] for gen in generations: job_id, prompt, status, created_at, completed_at, result_files, preview_image, local_files = gen status_emoji = { 'DONE': '✅', 'FAIL': '❌', 'WAIT': '⏳', 'RUN': '🎨' }.get(status, '❓') label = f"{status_emoji} {job_id[:20]}... - {prompt[:30]}... ({status})" choices.append((label, job_id)) return choices def download_and_save_models(job_id, result_files): """Download and save model files locally""" saved_files = [] try: # Create job directory job_dir = Path(MODELS_DIR) / job_id job_dir.mkdir(parents=True, exist_ok=True) for file_info in result_files: file_url = file_info.get('Url') file_type = file_info.get('Type', 'model') if not file_url: continue # Download the ZIP file response = requests.get(file_url, timeout=60) response.raise_for_status() # Save ZIP file zip_path = job_dir / f"{file_type}.zip" with open(zip_path, 'wb') as f: f.write(response.content) # Extract ZIP extract_dir = job_dir / file_type extract_dir.mkdir(exist_ok=True) with zipfile.ZipFile(zip_path, 'r') as zip_ref: zip_ref.extractall(extract_dir) # Find all extracted files extracted_files = [] for root, dirs, files in os.walk(extract_dir): for file in files: rel_path = os.path.relpath(os.path.join(root, file), MODELS_DIR) extracted_files.append(rel_path) saved_files.append({ 'type': file_type, 'extracted_files': extracted_files, 'download_url': file_url, 'zip_path': str(zip_path.relative_to(MODELS_DIR)) }) return saved_files except Exception as e: print(f"Error downloading/saving models: {e}") return None def load_generation_details(job_id): """Load details for a specific generation""" if not job_id: return "Select a generation to view details", None conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(''' SELECT job_id, prompt, status, created_at, completed_at, result_files, preview_image, local_files FROM generations WHERE job_id = ? ''', (job_id,)) gen = cursor.fetchone() conn.close() if not gen: return "Generation not found", None job_id, prompt, status, created_at, completed_at, result_files, preview_image, local_files = gen status_emoji = { 'DONE': '✅', 'FAIL': '❌', 'WAIT': '⏳', 'RUN': '🎨' }.get(status, '❓') # Build info text info = f"## {status_emoji} Generation Details\n\n" info += f"**Job ID:** {job_id}\n" info += f"**Prompt:** {prompt}\n" info += f"**Status:** {status}\n" info += f"**Created:** {created_at}\n" if completed_at: info += f"**Completed:** {completed_at}\n" preview_url = None if status == 'DONE' and result_files: files = json.loads(result_files) info += "\n### 📁 Download Links:\n" for file in files: file_type = file.get('Type', 'N/A') file_url = file.get('Url', 'N/A') info += f"- **{file_type}**: [Download]({file_url})\n" # Get preview image if file.get('PreviewImageUrl') and not preview_url: preview_url = file.get('PreviewImageUrl') # Show local files if available if local_files: local_files_data = json.loads(local_files) info += "\n### 💾 Saved Files:\n" for saved in local_files_data: info += f"\n**{saved['type']}**:\n" info += f"- ZIP: `{saved['zip_path']}`\n" info += f"- Extracted {len(saved['extracted_files'])} files\n" return info, preview_url def refresh_generations_dropdown(): """Refresh the generations dropdown""" choices = get_generations_list() if choices: return gr.Dropdown(choices=choices, value=choices[0][1]) return gr.Dropdown(choices=[], value=None) # Build Gradio Interface demo = gr.Blocks(title="Hunyuan 3D Studio") with demo: # Add custom CSS gr.HTML(""" """) gr.Markdown(""" # 🎨 THE GUYBERS 3D STUDIO ### Transform your imagination into stunning 3D models Powered by Tencent Cloud Hunyuan 3D API """) with gr.Tabs(): # Tab 1: Generate with gr.Tab("🚀 Generate"): with gr.Row(): with gr.Column(scale=2): prompt = gr.Textbox( label="Text Prompt", placeholder="Describe your 3D model... (e.g., A futuristic sci-fi spaceship with sleek curves and neon lights)", lines=3 ) with gr.Accordion("📸 Image Input", open=False): image = gr.Image(label="Upload Image", type="pil") image_url = gr.Textbox(label="Or Image URL", placeholder="https://example.com/image.jpg") gr.Markdown("*Note: Either Prompt OR Image is required. Cannot use both together (except in Sketch mode).*") with gr.Accordion("🎭 Multi-View Images (Optional)", open=False): multi_left = gr.Image(label="Left View", type="pil") multi_right = gr.Image(label="Right View", type="pil") multi_back = gr.Image(label="Back View", type="pil") with gr.Column(scale=1): gr.Markdown("### ⚙️ Settings") generate_type = gr.Dropdown( choices=["Normal", "LowPoly", "Geometry", "Sketch"], value="Normal", label="Generate Type" ) face_count = gr.Slider( minimum=40000, maximum=1500000, value=500000, step=10000, label="Face Count" ) polygon_type = gr.Dropdown( choices=["triangle", "quadrilateral"], value="triangle", label="Polygon Type (LowPoly only)" ) enable_pbr = gr.Checkbox(label="Enable PBR Materials", value=False) submit_btn = gr.Button("✨ Generate 3D Model", variant="primary", size="lg") output = gr.Markdown(label="Result") submit_btn.click( fn=submit_generation, inputs=[ prompt, image, image_url, multi_left, multi_right, multi_back, generate_type, face_count, polygon_type, enable_pbr ], outputs=output ) # Tab 2: Check Status with gr.Tab("🔍 Check Status"): gr.Markdown("### Check the status of a specific job") with gr.Row(): job_id_input = gr.Textbox(label="Job ID", placeholder="Enter Job ID to check status") check_btn = gr.Button("Check Status", variant="secondary") status_output = gr.Markdown(label="Status") check_btn.click( fn=check_job_status, inputs=job_id_input, outputs=status_output ) # Tab 3: View Generations with gr.Tab("📦 View Generations"): gr.Markdown("### All your generations") with gr.Row(): refresh_btn = gr.Button("🔄 Refresh List", variant="secondary") generations_dropdown = gr.Dropdown( label="Select Generation", choices=get_generations_list(), interactive=True ) with gr.Row(): with gr.Column(scale=1): generation_info = gr.Markdown("Select a generation to view details") with gr.Column(scale=1): preview_image = gr.Image(label="Preview Image") # Event handlers refresh_btn.click( fn=refresh_generations_dropdown, outputs=generations_dropdown ) generations_dropdown.change( fn=load_generation_details, inputs=generations_dropdown, outputs=[generation_info, preview_image] ) gr.Markdown(""" --- ### 📝 Notes: - Generations typically take 2-5 minutes - All generations are stored and can be viewed in the 'View Generations' tab - You can run multiple generations simultaneously - Download links are available once generation is complete """) # Launch with authentication if __name__ == "__main__": demo.queue() demo.launch( auth=("user", APP_PASSWORD), auth_message="Enter password to access Hunyuan 3D Studio", share=False )