# DCR Vehicle Studio — Hugging Face Space application (app.py)
| import gradio as gr | |
| import numpy as np | |
| from PIL import Image | |
| from transformers import pipeline | |
| from gradio_client import Client, handle_file | |
| import tempfile | |
| import os | |
| import json | |
| import torch | |
# --- CONFIGURATION ---
# Remote Spaces used as generation backends (gradio_client endpoints).
SPACE_HUNYUAN = "tencent/Hunyuan3D-2"
SPACE_TRELLIS = "JeffreyXiang/TRELLIS"

# --- LOCAL PIPELINES ---
# hy3dgen is only installed when this Space runs with the local GPU stack;
# when it is missing we fall back to the remote Spaces above.
try:
    from hy3dgen.texgen import Hunyuan3DPaintPipeline
    from hy3dgen.shapegen import Hunyuan3DDiTFlowMatchingPipeline
    LOCAL_PIPELINE_AVAILABLE = True
except ImportError:
    LOCAL_PIPELINE_AVAILABLE = False

# --- LOAD LOCAL MODELS ---
# Background-removal model is loaded eagerly at startup (it is small and
# always needed by the image-processing tab).
print("Loading RMBG-1.4 model...")
rmbg_pipe = pipeline("image-segmentation", model="briaai/RMBG-1.4", trust_remote_code=True)
print("Model loaded!")

# Global pipelines (lazy loaded) — instantiated on first use by
# load_local_pipelines() because they are heavyweight.
dit_pipeline = None
paint_pipeline = None
def load_local_pipelines():
    """Lazily instantiate the local Hunyuan3D pipelines.

    No-op when hy3dgen is not installed. Each pipeline is created at most
    once and cached in the module-level globals.
    """
    global dit_pipeline, paint_pipeline
    if not LOCAL_PIPELINE_AVAILABLE:
        return
    if dit_pipeline is None:
        print("Loading Hunyuan3DDiTFlowMatchingPipeline...")
        dit_pipeline = Hunyuan3DDiTFlowMatchingPipeline.from_pretrained('tencent/Hunyuan3D-2')
    if paint_pipeline is None:
        print("Loading Hunyuan3DPaintPipeline...")
        paint_pipeline = Hunyuan3DPaintPipeline.from_pretrained('tencent/Hunyuan3D-2')
| # --- IMAGE PROCESSING --- | |
def remove_background(image: Image.Image) -> Image.Image:
    """Stage 1: strip the background with the briaai/RMBG-1.4 pipeline.

    The RMBG custom pipeline returns the cutout image directly.
    """
    return rmbg_pipe(image)
def add_studio_background(image_no_bg: Image.Image, bg_type: str) -> Image.Image:
    """Stage 2: composite the cutout onto a flat studio backdrop.

    "transparent" returns the cutout untouched; unknown types fall back to
    white. NOTE(review): "dcr_gradient" currently renders the same flat dark
    color as "dcr_dark" — no actual gradient is drawn.
    """
    if bg_type == "transparent":
        return image_no_bg

    palette = {
        "white": (255, 255, 255),
        "dcr_dark": (10, 10, 10),
        "dcr_gradient": (10, 10, 10),
    }
    rgb = palette.get(bg_type, (255, 255, 255))

    # Compositing requires an alpha channel on the foreground.
    rgba = image_no_bg if image_no_bg.mode == "RGBA" else image_no_bg.convert("RGBA")
    backdrop = Image.new("RGBA", rgba.size, rgb + (255,))
    return Image.alpha_composite(backdrop, rgba).convert("RGB")
def upscale_lanczos(image: Image.Image, scale: float = 2.0) -> Image.Image:
    """Stage 3: Upscale (or downscale) an image with Lanczos resampling.

    The output is capped at 4096 px on the longest side (aspect ratio
    preserved) to bound memory use, and clamped to at least 1x1 so a small
    ``scale`` on a tiny image cannot produce a zero-sized resize, which
    PIL rejects.
    """
    new_width = int(image.width * scale)
    new_height = int(image.height * scale)

    # Cap the output resolution while preserving aspect ratio.
    max_dimension = 4096
    if new_width > max_dimension or new_height > max_dimension:
        ratio = min(max_dimension / new_width, max_dimension / new_height)
        new_width = int(new_width * ratio)
        new_height = int(new_height * ratio)

    # Guard against degenerate (0-pixel) dimensions after int truncation.
    new_width = max(1, new_width)
    new_height = max(1, new_height)
    return image.resize((new_width, new_height), Image.LANCZOS)
def process_vehicle(image: Image.Image, background_type: str = "white", upscale_factor: float = 2.0) -> tuple[Image.Image, Image.Image, Image.Image]:
    """Run the full 2D pipeline: remove BG -> add studio BG -> upscale.

    Returns the intermediate result of every stage so the UI can display
    all three.
    """
    if image is None:
        raise gr.Error("Please upload an image first")
    stage1 = remove_background(image)
    stage2 = add_studio_background(stage1, background_type)
    stage3 = upscale_lanczos(stage2, upscale_factor)
    return stage1, stage2, stage3
| # --- 3D GENERATION --- | |
def generate_with_trellis(image_path: str, logs: list) -> str:
    """Attempt textured 3D generation via the remote TRELLIS Space.

    Calls /image_to_3d (which stores the asset in the client session) and
    then /extract_glb on the same session. Appends progress messages to
    ``logs`` and returns a local path to the downloaded .glb file.

    Raises:
        ValueError: if the Space did not return a usable GLB path.
        (gradio_client errors propagate to the caller, which handles them.)
    """
    logs.append("🔵 ENGINE: TRELLIS (Textured)")
    client = Client(SPACE_TRELLIS)

    logs.append("1. Generating 3D Asset (Video)...")
    result_video = client.predict(
        image=handle_file(image_path),
        multiimages=[],
        seed=0,
        ss_guidance_strength=7.5,
        ss_sampling_steps=12,
        slat_guidance_strength=3.0,
        slat_sampling_steps=12,
        multiimage_algo="stochastic",
        api_name="/image_to_3d"
    )
    logs.append(f"Video generated: {result_video}")

    logs.append("2. Extracting GLB Model...")
    # This call relies on the session state created by step 1.
    result_glb_tuple = client.predict(
        mesh_simplify=0.95,
        texture_size=1024,
        api_name="/extract_glb"
    )
    # Trellis returns (LitModel3D_file, Download_Path); the downloadable
    # .glb path is usually the last element.
    logs.append(f"Extraction result type: {type(result_glb_tuple)}")

    final_path = None
    if isinstance(result_glb_tuple, (list, tuple)):
        # Inspect items and prefer an explicit .glb path.
        for i, item in enumerate(result_glb_tuple):
            logs.append(f"Item {i}: {item}")
            if isinstance(item, str) and item.endswith('.glb'):
                final_path = item
        if not final_path and len(result_glb_tuple) > 0:
            # Fallback: take the LAST item — the usual download-path slot.
            final_path = result_glb_tuple[-1]
    else:
        final_path = result_glb_tuple

    # Some gradio versions wrap file outputs in a {'value': ...} dict.
    if isinstance(final_path, dict) and 'value' in final_path:
        final_path = final_path['value']

    if not final_path:
        raise ValueError("Trellis did not return a valid GLB path.")
    return final_path
def generate_with_hunyuan_remote(image_path: str, logs: list) -> str:
    """Fallback generation via the remote Hunyuan3D-2 Space (geometry only).

    Appends progress messages to ``logs`` and returns the mesh file path
    reported by the Space.
    """
    logs.append("🟠 ENGINE: HUNYUAN REMOTE (Geometry Only)")
    client = Client(SPACE_HUNYUAN)
    logs.append("Calling /shape_generation...")
    result = client.predict(
        caption="A 3D model of a vehicle",
        image=handle_file(image_path),
        mv_image_front=None,
        mv_image_back=None,
        mv_image_left=None,
        mv_image_right=None,
        steps=30,
        guidance_scale=5.0,
        seed=1234,
        octree_resolution=256,
        check_box_rembg=True,
        num_chunks=8000,
        randomize_seed=True,
        api_name="/shape_generation"
    )

    # The Space may return either a bare path or a tuple whose first
    # element is the mesh file.
    final_path = result[0] if isinstance(result, tuple) else result

    # Some gradio versions wrap file outputs in a {'value': ...} dict.
    if isinstance(final_path, dict) and 'value' in final_path:
        final_path = final_path['value']
    return final_path
def generate_3d_model(image: Image.Image, engine_choice: str = "Trellis (Textured)", use_local: bool = False) -> tuple[str, str, str]:
    """Master 3D generation dispatcher.

    Strategy order: local Hunyuan pipelines (if requested and installed),
    then remote Trellis (if selected), then remote Hunyuan as the final
    fallback. Returns (viewer_path, download_path, debug_log_text); the
    paths are None when every engine failed.
    """
    debug_logs = []
    if image is None:
        raise gr.Error("Please upload an image first")

    # Persist the input to a temp PNG for upload / local processing.
    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
        image.save(f.name)
        temp_path = f.name
    debug_logs.append(f"Input image saved to {temp_path}")

    try:
        # --- STRATEGY: LOCAL (GPU) ---
        if use_local and LOCAL_PIPELINE_AVAILABLE:
            try:
                debug_logs.append("🟣 ENGINE: LOCAL HUNYUAN (Textured)")
                load_local_pipelines()
                mesh = dit_pipeline(image=temp_path)[0]
                mesh = paint_pipeline(mesh, image=temp_path)
                # NamedTemporaryFile instead of the deprecated, race-prone
                # tempfile.mktemp: the file is created atomically.
                with tempfile.NamedTemporaryFile(suffix=".glb", delete=False) as out:
                    output_path = out.name
                mesh.export(output_path)
                debug_logs.append(f"Local success: {output_path}")
                return output_path, output_path, "\n".join(debug_logs)
            except Exception as e:
                debug_logs.append(f"❌ Local Failed: {str(e)}")
                # Fall through to remote strategies

        # --- STRATEGY: REMOTE TRELLIS ---
        if "Trellis" in engine_choice:
            try:
                final_model_path = generate_with_trellis(temp_path, debug_logs)
                debug_logs.append(f"✅ Trellis Success: {final_model_path}")
                return final_model_path, final_model_path, "\n".join(debug_logs)
            except Exception as e:
                debug_logs.append(f"❌ Trellis Failed: {str(e)}")
                debug_logs.append("⚠️ Falling back to Hunyuan (Geometry Only)...")
                # Fall through to Hunyuan

        # --- STRATEGY: REMOTE HUNYUAN (fallback for everything) ---
        try:
            final_model_path = generate_with_hunyuan_remote(temp_path, debug_logs)
            debug_logs.append(f"✅ Hunyuan Success: {final_model_path}")
            return final_model_path, final_model_path, "\n".join(debug_logs)
        except Exception as e:
            debug_logs.append(f"❌ Hunyuan Failed: {str(e)}")
            debug_logs.append("💀 All engines failed.")
            return None, None, "\n".join(debug_logs)
    finally:
        # Don't leak the uploaded temp image (only the output GLB survives).
        try:
            os.unlink(temp_path)
        except OSError:
            pass
| # --- GLM-IMAGE EDITING --- | |
| import requests | |
| import io | |
| import traceback | |
| import json | |
| import time | |
def edit_image_with_glm(image: Image.Image, prompt: str, strength: float = 0.8) -> tuple[Image.Image, str]:
    """
    Edit image using zai-org/GLM-Image via Custom HF Router.
    Handles Async API polling.
    Endpoint: https://router.huggingface.co/zai-org/api/paas/v4/async/images/generations

    Returns (edited_image_or_None, debug_log_text). The second element is
    always the accumulated debug log, also shown in the UI.
    """
    import base64

    logs = []

    # --- Input validation (each failure returns early with a log) ---
    if image is None:
        logs.append("Error: No image provided")
        return None, "\n".join(logs)
    if not prompt:
        logs.append("Error: No prompt provided")
        return None, "\n".join(logs)

    hf_token = os.environ.get("HF_TOKEN")
    if not hf_token:
        logs.append("Warning: HF_TOKEN not found in environment")
        return None, "Error: HF_TOKEN secret is missing"

    base_url = "https://router.huggingface.co/zai-org/api/paas/v4"
    create_url = f"{base_url}/async/images/generations"
    headers = {
        "Authorization": f"Bearer {hf_token}",
        "Content-Type": "application/json"
    }

    # Encode the input image as a base64 PNG data URL for the request body.
    buffered = io.BytesIO()
    image.save(buffered, format="PNG")
    img_str = base64.b64encode(buffered.getvalue()).decode()
    payload = {
        "model": "glm-image",
        "prompt": prompt,
        "image_url": {"url": f"data:image/png;base64,{img_str}"},
        "parameters": {
            "strength": strength
        }
    }

    logs.append(f"1. Sending Async Request to {create_url}")
    logs.append(f"Prompt: {prompt}")
    try:
        response = requests.post(create_url, headers=headers, json=payload, timeout=60)
        logs.append(f"Status: {response.status_code}")
        if response.status_code != 200:
            logs.append(f"Error Response: {response.text}")
            return None, "\n".join(logs)

        resp_json = response.json()
        logs.append(f"Create Response: {json.dumps(resp_json, indent=2)}")
        task_id = resp_json.get("id") or resp_json.get("request_id")
        if not task_id:
            logs.append("Error: No Task ID returned")
            return None, "\n".join(logs)

        # --- POLLING LOOP (~3 minutes max: 90 polls x 2 s) ---
        logs.append(f"2. Polling for Task ID: {task_id}")
        max_retries = 90
        for i in range(max_retries):
            time.sleep(2)  # Wait 2s between checks
            result_url = f"{base_url}/async-result/{task_id}"
            poll_resp = requests.get(result_url, headers=headers, timeout=30)
            if poll_resp.status_code != 200:
                logs.append(f"Poll Failed ({poll_resp.status_code}): {poll_resp.text}")
                continue  # transient failure: retry on the next tick
            poll_data = poll_resp.json()
            status = poll_data.get("task_status")
            logs.append(f"Poll {i+1}: {status}")

            if status == "SUCCESS":
                logs.append("Task Completed Successfully!")
                # Response shape varies between API versions:
                #   legacy:   { "image_result": [ { "url": ... } ] }
                #   standard: { "data": [ { "url": ... } ] } or b64 payloads.
                items = poll_data.get('items') or poll_data.get('data') or poll_data.get('choices') or poll_data.get('image_result')
                if items and len(items) > 0:
                    first = items[0]
                    img_url = first.get('url') or first.get('image')
                    b64 = first.get('b64_json')
                    if img_url:
                        logs.append(f"Downloading Result: {img_url}")
                        # Download the full body, then decode from memory:
                        # feeding the raw urllib3 stream to PIL skips
                        # content-encoding handling and can truncate.
                        img_resp = requests.get(img_url, timeout=60)
                        return Image.open(io.BytesIO(img_resp.content)), "\n".join(logs)
                    elif b64:
                        logs.append("Decoding Base64 Result...")
                        return Image.open(io.BytesIO(base64.b64decode(b64))), "\n".join(logs)
                logs.append(f"Success but no image found in keys: {poll_data.keys()}")
                logs.append(f"Full Dump: {json.dumps(poll_data)}")
                return None, "\n".join(logs)
            elif status == "FAIL" or status == "FAILED":
                logs.append(f"Task Failed: {poll_data}")
                return None, "\n".join(logs)
            # Otherwise (PROCESSING) keep polling.

        logs.append("Timeout: Task did not complete in time.")
        return None, "\n".join(logs)
    except Exception as e:
        logs.append(f"Exception: {str(e)}")
        logs.append(traceback.format_exc())
        return None, "\n".join(logs)
# --- UI ---
# Three-tab gradio app: 2D processing, GLM prompt editing, 3D generation.
with gr.Blocks(title="DCR Vehicle Studio", theme=gr.themes.Monochrome()) as demo:
    gr.Markdown("# 🚗 DCR Vehicle Studio")
    gr.Markdown("Remove background, add studio backgrounds, upscale, and generate 3D models")
    with gr.Tabs():
        # --- Tab 1: 2D pipeline (remove BG -> studio BG -> upscale) ---
        with gr.Tab("📸 Image Processing"):
            # ... (Existing Image Tab Code) ...
            with gr.Row():
                with gr.Column():
                    input_image = gr.Image(type="pil", label="Input Vehicle Photo")
                    bg_type = gr.Dropdown(
                        choices=["white", "dcr_dark", "dcr_gradient", "transparent"],
                        value="white",
                        label="Background Type"
                    )
                    upscale_factor = gr.Slider(minimum=1.0, maximum=4.0, value=2.0, step=0.5, label="Upscale Factor")
                    process_btn = gr.Button("🎨 Process Image", variant="primary")
                with gr.Column():
                    # One output slot per pipeline stage.
                    out_no_bg = gr.Image(label="Stage 1: Background Removed")
                    out_with_bg = gr.Image(label="Stage 2: Studio Background")
                    out_upscaled = gr.Image(label="Stage 3: Upscaled")
            process_btn.click(
                fn=process_vehicle,
                inputs=[input_image, bg_type, upscale_factor],
                outputs=[out_no_bg, out_with_bg, out_upscaled]
            )
        # --- Tab 2: GLM-Image prompt-based editing ---
        with gr.Tab("✨ GLM Image Editor"):
            gr.Markdown("### Experimental: Z.AI GLM-Image Editor")
            gr.Markdown("Use `zai-org/GLM-Image` to modify your vehicle photos.")
            with gr.Row():
                with gr.Column():
                    glm_input = gr.Image(type="pil", label="Input Image")
                    glm_prompt = gr.Textbox(label="Editing Prompt", placeholder="e.g. Change the car color to red, add snow on the ground")
                    glm_strength = gr.Slider(minimum=0.1, maximum=1.0, value=0.8, label="Transformation Strength (0.1 = subtle, 1.0 = heavy)")
                    glm_btn = gr.Button("✨ Re-Imagine", variant="primary")
                    glm_debug = gr.Textbox(label="API Debug Log", lines=10, interactive=False)
                with gr.Column():
                    glm_output = gr.Image(label="GLM Result")
            glm_btn.click(
                fn=edit_image_with_glm,
                inputs=[glm_input, glm_prompt, glm_strength],
                outputs=[glm_output, glm_debug]
            )
        # --- Tab 3: image-to-3D generation ---
        with gr.Tab("🎮 3D Generation"):
            gr.Markdown("### Generate 3D Model")
            gr.Markdown("Choose your engine. **Trellis** provides textures but is experimental. **Hunyuan** is geometry only.")
            with gr.Row():
                with gr.Column(scale=1):
                    input_3d = gr.Image(type="pil", label="Vehicle Image")
                    engine_choice = gr.Dropdown(
                        choices=["Trellis (Textured)", "Hunyuan (Geometry Only)"],
                        value="Trellis (Textured)",
                        label="Generation Engine"
                    )
                    use_local_cb = gr.Checkbox(label="Use Local Pipeline (Requires GPU)", value=False)
                    generate_3d_btn = gr.Button("🎮 Generate 3D Model", variant="primary")
                    output_file = gr.File(label="Download GLB")
                    debug_text = gr.Textbox(label="Debug Logs", lines=15, interactive=False)
                with gr.Column(scale=2):
                    output_3d_viewer = gr.Model3D(
                        label="3D Model Viewer",
                        clear_color=[0.1, 0.1, 0.1, 1.0],
                        height=500
                    )
            generate_3d_btn.click(
                fn=generate_3d_model,
                inputs=[input_3d, engine_choice, use_local_cb],
                outputs=[output_3d_viewer, output_file, debug_text]
            )

demo.launch()