# Scraped from the Hugging Face file viewer (author: 0xZohar).
# Commit a27d916 (verified): "Add @spaces.GPU decorators and lazy loading for ZeroGPU"
# File size: 20.8 kB
import gradio as gr
import os
import uuid
import shutil
import functools
from PIL import Image, ImageDraw, ImageFont
import numpy as np
import torch
# ZeroGPU support - CRITICAL for HuggingFace Spaces.
# When the `spaces` package cannot be imported, a no-op stand-in keeps the
# `@spaces.GPU` decorators used further down importable.
try:
    import spaces
    ZEROGPU_AVAILABLE = True
    print("✅ ZeroGPU support enabled")
except ImportError:
    print("⚠️ ZeroGPU not available - running in standard mode")
    ZEROGPU_AVAILABLE = False

    # Dummy decorator namespace for local development
    class spaces:
        """Stand-in exposing a no-op ``GPU`` decorator."""

        @staticmethod
        def GPU(func=None, duration=60):
            """Return the decorated function unchanged.

            Works both as a bare decorator (``@spaces.GPU``) and as a
            decorator factory (``@spaces.GPU(duration=120)``).

            Args:
                func: The function being decorated when used bare. Any
                    non-callable value (e.g. a positional duration) is
                    treated as factory usage.
                duration: Accepted for signature compatibility; ignored.

            Returns:
                ``func`` itself when used bare, otherwise a decorator that
                returns its argument unchanged.
            """
            if callable(func):
                return func

            def decorator(wrapped):
                return wrapped

            return decorator
#from cube3d.render.render_bricks import render_bricks
from cube3d.render.render_bricks_safe import render_bricks_safe
from cube3d.training.engine import Engine, EngineFast
from cube3d.training.bert_infer import generate_tokens
from cube3d.training.utils import normalize_bboxs
from cube3d.training.process_single_ldr import process_ldr_data, process_ldr_flatten, logits2botldrpr
from cube3d.config import HF_CACHE_DIR
# Optional text-to-LEGO support: the feature is switched off (not fatal)
# when the `clip_retrieval` module cannot be imported.
try:
    from clip_retrieval import get_retriever
except ImportError:
    print("⚠️ Text-to-design module not available. Text input feature will be disabled.")
    CLIP_AVAILABLE = False
else:
    CLIP_AVAILABLE = True

# Lazy loading for GPU models (ZeroGPU requirement).
# NOTE(review): these two placeholders are not read anywhere in the visible
# code; the cached accessor functions hold the instances instead.
_retriever = None
_gpt_engine = None
@functools.lru_cache(maxsize=1)
def get_clip_retriever_cached():
    """Create the CLIP retriever on first call and reuse it afterwards.

    ``lru_cache`` memoizes the result of this zero-argument function, so the
    setup below (and its log lines) runs once per process as long as it
    succeeds.

    Returns:
        The object returned by ``get_retriever`` for the bundled design set.
    """
    print("🔧 Initializing CLIP retriever (one-time setup)...")
    design_root = "data/1313个筛选车结构和对照渲染图"
    clip_retriever = get_retriever(data_root=design_root)
    design_count = clip_retriever.features.shape[0]
    print(f"✅ CLIP retriever loaded ({design_count} designs)")
    return clip_retriever
@functools.lru_cache(maxsize=1)
def get_gpt_engine_cached():
    """Create the GPT inference engine on first call and reuse it afterwards.

    The checkpoint location depends on the environment:

    * On HuggingFace Spaces (``SPACE_ID`` is set) the weights are resolved
      through ``hf_hub_download`` with ``local_files_only=True``, i.e. they
      are looked up in ``HF_CACHE_DIR`` rather than downloaded here.
    * Otherwise they are read from the local ``model_weights/`` folder.

    Returns:
        An ``EngineFast`` instance built in ``'test'`` mode on ``cuda``.
    """
    print("🔧 Initializing GPT engine (one-time setup)...")

    engine_config = 'cube3d/configs/open_model_v0.5.yaml'
    unused_gpt_ckpt = None  # test mode doesn't use this

    if os.getenv("SPACE_ID") is None:
        # Local run: the same file serves as both shape and GPT checkpoint.
        shape_ckpt_path = 'model_weights/save_shape_cars_whole_p_rot_scratch_4mask_randp.safetensors'
        save_gpt_ckpt_path = shape_ckpt_path
    else:
        # HuggingFace Spaces: import lazily so local runs don't need the hub client.
        from huggingface_hub import hf_hub_download

        print("Loading GPT model from HuggingFace Model Hub...")
        shape_ckpt_path = hf_hub_download(
            repo_id="0xZohar/object-assembler-models",
            filename="save_shape_cars_whole_p_rot_scratch_4mask_randp.safetensors",
            cache_dir=HF_CACHE_DIR,
            local_files_only=True,
        )
        save_gpt_ckpt_path = shape_ckpt_path
        print(f"✅ GPT model loaded from cache: {shape_ckpt_path}")

    # ZeroGPU: fixed device='cuda'; GPU allocation happens in @spaces.GPU functions.
    gpt_engine = EngineFast(
        engine_config,
        unused_gpt_ckpt,
        shape_ckpt_path,
        save_gpt_ckpt_path,
        device=torch.device('cuda'),
        mode='test',
    )
    print("✅ GPT engine initialized")
    return gpt_engine
# Make sure the scratch directory exists (path on the remote server, relative
# to the working directory). Generated .ldr and .png files are written here.
TMP_DIR = "./tmp/ldr_processor_demo"
os.makedirs(TMP_DIR, exist_ok=True)
class MockFileStorage:
    """Minimal stand-in for an uploaded file: it only carries ``.name``."""

    def __init__(self, file_path):
        # Key point: expose the path as `.name`, the same attribute the
        # Gradio file object provides and process_ldr_file reads.
        self.name = file_path
# Model prediction function (original logic preserved)
def model_predict(ldr_content):
    """Turn a list of part names into a fixed-layout LDR text.

    Every non-blank line of ``ldr_content`` is treated as a part name. Each
    part becomes one line starting with ``1``, using colour code 115, a
    position taken round-robin from four hard-coded coordinates, and a fixed
    orientation. Consecutive parts are separated by a ``0 STEP`` line.

    Args:
        ldr_content: Text with one part name per line.

    Returns:
        The assembled LDR text; an empty string when there are no parts.
    """
    slots = [(120.0, 0, 180.0), (90.0, 0, 210.0), (90.0, 0, 180.0), (70.0, 0, 170.0)]
    color_code = 115

    names = [raw.strip() for raw in ldr_content.splitlines() if raw.strip()]

    placed = []
    for idx, name in enumerate(names):
        x, y, z = slots[idx % len(slots)]
        placed.append(f"1 {color_code} {x} {y} {z} 0 0 1 0 1 0 -1 0 0 {name}")

    # Joining with the step marker puts "0 STEP" between parts, never after the last.
    return "\n0 STEP\n".join(placed)
# Folder searched for per-part preview PNGs (relative to the working
# directory). It is created at import time if missing.
DEFAULT_PART_RENDER_PATH = "../data/car_1k/demos/example/part_ldr_1k_render/"
os.makedirs(DEFAULT_PART_RENDER_PATH, exist_ok=True)
def get_part_renderings(part_names):
    """Pair each part name with the path of its preview image.

    The image is looked up in ``DEFAULT_PART_RENDER_PATH`` under the part
    name with every ``.dat`` occurrence removed and ``.png`` appended (for
    example ``3001.dat`` -> ``3001.png``). No case conversion happens here.
    When that file does not exist, the path of ``unknown_part.png`` in the
    same folder is used instead (its existence is not checked).

    Args:
        part_names: Iterable of part file names.

    Returns:
        A list of ``(image_path, part_name)`` tuples in input order.
    """
    def image_for(name):
        stem = name.replace(".dat", "")
        candidate = os.path.join(DEFAULT_PART_RENDER_PATH, f"{stem}.png")
        if os.path.exists(candidate):
            return candidate
        # Missing render: fall back to the shared "unknown part" picture.
        return os.path.join(DEFAULT_PART_RENDER_PATH, "unknown_part.png")

    return [(image_for(name), name) for name in part_names]
def process_data(data):
    """Pad a 2-D part array to the fixed model length of 410 rows.

    Padding rows are filled with -1, except that their last column is set to
    1 (flag label) and their second-to-last column is set to 0. Rows that
    come from ``data`` are left untouched.

    Args:
        data: 2-D numpy array with one row per part and at least two columns.

    Returns:
        A single-element list holding the padded array of shape
        ``(410, data.shape[1])``.

    Raises:
        ValueError: If ``data`` has more than 410 rows.
    """
    max_num_tokens = 410

    def padding(rows, max_len=300):
        n_rows = rows.shape[0]
        if n_rows > max_len:
            raise ValueError(
                f"Input has {n_rows} rows but at most {max_len} are supported"
            )
        padded = np.pad(
            rows, ((0, max_len - n_rows), (0, 0)), 'constant', constant_values=-1
        )
        # Address the tail from n_rows forward. The negative offset
        # n_rows - max_len is 0 for an already-full array and would then
        # overwrite every real row.
        padded[n_rows:, -1] = 1  # flag label
        padded[n_rows:, -2] = 0
        return padded

    return [padding(data, max_num_tokens)]
# Handle an uploaded LDR file (original logic preserved)
def process_ldr_file(file, process_for_model=True):
    """Read an LDR file, list its parts and optionally build the model input.

    A line counts as a part line when its first whitespace-separated token is
    ``1`` and it has at least 12 tokens; the part name is the last token,
    lower-cased.

    Args:
        file: Object whose ``.name`` attribute is the path of the LDR file.
        process_for_model: When true, the file is also converted with
            ``process_ldr_flatten``, sorted and padded for the ML model.
            When false, only the part list is extracted.

    Returns:
        ``(renderings, part_list, status, model_input, None)``. For a falsy
        ``file`` the tuple is ``(None, None, <message>, None, None)``.

    No exceptions are caught here; I/O and parsing errors propagate.
    """
    if not file:
        return None, None, "Please upload an LDR file", None, None

    source_path = file.name

    # Pass 1: plain-text scan for part names (always needed for the preview).
    with open(source_path, 'r') as handle:
        raw_text = handle.read()

    part_names = []
    for raw_line in raw_text.splitlines():
        tokens = raw_line.strip().split()
        # Blank lines yield no tokens and are rejected by the length test.
        if len(tokens) < 12 or tokens[0] != '1':
            continue
        part_names.append(tokens[-1].lower())

    renderings = get_part_renderings(part_names)
    part_list = "\n".join(part_names)

    # Pass 2 (optional): numerical conversion for the model.
    model_input = None
    if process_for_model:
        with open(source_path, 'r') as handle:
            file_lines = handle.readlines()
        flattened, _ = process_ldr_flatten(file_lines)
        # np.lexsort uses the LAST key as primary: sort by column -4, then
        # -5, then -3.
        order = np.lexsort((flattened[:, -3], flattened[:, -5], flattened[:, -4]))
        model_input = process_data(flattened[order])

    status = f"File loaded, {len(part_names)} valid parts identified"
    return renderings, part_list, status, model_input, None
# Process LDR from file system path (for text-generated designs)
def process_ldr_from_path(ldr_path, process_for_model=False):
    """Run ``process_ldr_file`` on a file given by path instead of an upload.

    Args:
        ldr_path: Path of the LDR file on disk.
        process_for_model: Forwarded to ``process_ldr_file``; when false
            (the default) the numerical conversion is skipped.

    Returns:
        The 5-tuple produced by ``process_ldr_file``, or
        ``(None, None, "LDR file not found: ...", None, None)`` when the
        path does not exist.
    """
    if os.path.exists(ldr_path):
        # process_ldr_file only reads `.name`, so a bare path holder is enough.
        class _PathHolder:
            def __init__(self, name):
                self.name = name

        return process_ldr_file(
            _PathHolder(ldr_path), process_for_model=process_for_model
        )

    return None, None, f"LDR file not found: {ldr_path}", None, None
# Unified input handler: supports both file upload and text query
def unified_input_handler(file, text_query):
    """Dispatch the "Load Input" action to the file or the text pipeline.

    Priority:
        1. An uploaded file is processed with ``process_ldr_file``.
        2. Otherwise a non-blank text query is matched to a reference design
           via the CLIP retriever, a new LDR is generated from it with the
           GPT model, and that LDR is rendered.
        3. Otherwise a prompt asking for input is returned.

    Args:
        file: Uploaded file object or ``None``.
        text_query: Free-text description; may be ``None`` or empty.

    Returns:
        A 5-tuple ``(renderings, part_list, status, model_input, fifth)``.
        On every error path the tuple is ``(None, None, message, None, None)``.

    NOTE(review): on the text path the fifth element is the value returned
    by ``render_bricks_safe``, while the UI wires this slot to the
    ``predicted_ldr`` Textbox. Confirm that is intended.
    """
    # Case 1: file upload (original flow) always wins over the text box.
    if file is not None:
        return process_ldr_file(file)

    query = text_query.strip() if text_query else ""

    # Case 3: no usable input at all.
    if not query:
        return None, None, "⚠️ Please upload an LDR file OR enter a text description", None, None

    # Case 2: text query (neural generation).
    if not CLIP_AVAILABLE:
        return None, None, "❌ Text-to-LEGO feature is not available (generation module not loaded)", None, None

    try:
        print(f"🎨 Generating design from: {query}")

        # Lazy load CLIP retriever (cached)
        retriever = get_clip_retriever_cached()
        match = retriever.get_best_match(query)
        if match is None or not match.get("ldr_exists", True):
            return None, None, f"❌ Could not generate design for '{query}'", None, None

        ldr_path = match["ldr_path"]
        confidence = match["similarity"]
        car_id = match["car_id"]
        print(f"✅ Found reference design: car_{car_id} (confidence: {confidence:.3f})")

        # The reference design must be converted to numbers for the GPT model.
        renderings, part_list, status, model_input, _ = process_ldr_from_path(
            ldr_path,
            process_for_model=True,
        )
        if model_input is None:
            return None, None, "❌ Failed to convert LDR to model format (missing label mappings)", None, None

        # Generate a new LDR with the GPT model (GPU-accelerated); the file
        # is written to new_ldr_path and the returned lines are not used here.
        new_ldr_path = os.path.join(TMP_DIR, f"generated_{uuid.uuid4()}.ldr")
        generate_ldr_gpu(model_input, new_ldr_path)

        print("🎨 Rendering GPT-generated LEGO design...")
        rendered_image = render_bricks_safe(new_ldr_path)

        enhanced_status = (
            f"✨ Generated from car_{car_id} (confidence: {confidence*100:.1f}%)\n"
            "🤖 GPT model created new assembly sequence\n"
            f"{status}"
        )
        return renderings, part_list, enhanced_status, model_input, rendered_image
    except Exception as e:
        # UI boundary: report the failure in the status box instead of raising.
        import traceback
        error_msg = f"❌ Design generation failed: {str(e)}\n{traceback.format_exc()}"
        print(error_msg)
        return None, None, error_msg, None, None
import traceback # 导入traceback,用于打印完整堆栈
@spaces.GPU(duration=120)  # GPT generation can take up to 120 seconds
def generate_ldr_gpu(ldr_content, ldr_path):
    """Generate an LDR file with the GPT model (GPU-accelerated).

    Decorated with ``@spaces.GPU`` so a GPU is allocated on HuggingFace
    ZeroGPU Spaces. The engine is loaded lazily and cached.

    The model is run twice through ``generate_tokens``:

    1. On a copy of the input, to obtain the first set of logits.
    2. On a second copy whose column ``-7`` was overwritten with the argmax
       over the first ``rot_num + 1`` logit channels taken at every
       ``stride``-th position (presumably the rotation class; confirm).

    The second pass's logits then get those same channels copied back from
    the first pass before decoding with ``logits2botldrpr``.

    Args:
        ldr_content: Sequence whose first element is the numerical LDR data
            as a numpy array.
        ldr_path: Output path handed to ``logits2botldrpr`` as ``output_file``.

    Returns:
        The value returned by ``logits2botldrpr`` (the predicted LDR parts).
    """
    print("🤖 Running GPT model to generate new assembly sequence...")
    print(" Using CUDA graphs (this will take some time to warmup)")

    stride, rot_num, bert_shift, shift = 5, 24, 1, 0

    # Lazy load GPT engine (cached, initialized only once)
    engine = get_gpt_engine_cached()

    # ZeroGPU: device is always 'cuda' inside @spaces.GPU decorated functions
    device = 'cuda'

    def run_pass(pass_targets, pass_flag):
        # Both passes share every argument except the targets and one flag.
        return generate_tokens(
            engine, '', pass_targets, None, None, False, 0.9, None, pass_flag, 'test'
        )

    print(" Graph compiled, starting generation...")
    targets_source = torch.from_numpy(ldr_content[0]).to(device).unsqueeze(0)

    # Pass 1 works on its own clone so targets_source stays pristine.
    first_logits, _, _, _, _ = run_pass(targets_source.clone(), 1)

    # Feed the pass-1 prediction back into a fresh copy of the input.
    targets = targets_source.clone()
    targets[:, shift:, -7] = first_logits[:, 1:-3:stride, :rot_num+1].permute(0, 2, 1).argmax(dim=1)

    second_logits, inputs_ids, _, _, _ = run_pass(targets, 0)

    # Keep the pass-1 values for these channels in the final logits.
    second_logits[:, 1+bert_shift:-3:stride, :rot_num+1] = first_logits[:, 1+bert_shift:-3:stride, :rot_num+1]

    predict_ldr = logits2botldrpr(
        second_logits[0].cpu().detach().numpy(),
        inputs_ids[0].cpu().detach().numpy(),
        stride,
        0,
        output_file=ldr_path,
    )
    print(f"✅ GPT generated {len(predict_ldr)} parts")
    return predict_ldr
# CPU wrapper function for predict_and_render (non-GPU operations)
def predict_and_render(ldr_content):
    """Generate a new LDR from the loaded data and render it to a PNG.

    File naming and rendering happen here; the model call is delegated to
    the GPU-decorated ``generate_ldr_gpu``. Both output files get fresh
    UUID-based names inside ``TMP_DIR``.

    Args:
        ldr_content: Model input produced when the design was loaded.

    Returns:
        ``(predicted_ldr, ldr_path, render_path)``, or
        ``("Please upload an LDR file first", None, None)`` when
        ``ldr_content`` is falsy.

    Exceptions from generation or rendering are not caught here.
    """
    if not ldr_content:
        return "Please upload an LDR file first", None, None

    ldr_path = os.path.join(TMP_DIR, f"{uuid.uuid4()}.ldr")

    # Call GPU-accelerated function
    predicted_ldr = generate_ldr_gpu(ldr_content, ldr_path)

    # Render the new LDR
    render_path = os.path.join(TMP_DIR, f"{uuid.uuid4()}.png")
    render_bricks_safe(ldr_path, render_path)

    return predicted_ldr, ldr_path, render_path
# Remove temporary files (original logic preserved)
def clean_temp_files():
    """Delete ``TMP_DIR`` with its contents and recreate it empty.

    Returns:
        A status string for the UI: a success message, or a failure message
        containing the exception text. Exceptions are never raised.
    """
    try:
        shutil.rmtree(TMP_DIR)
        os.makedirs(TMP_DIR, exist_ok=True)
    except Exception as exc:
        return f"清理失败: {exc}"
    return "临时文件已清理"
#gr.Blocks.set_language("en")
# Markdown blurb rendered right below the page title; it lists the two
# supported input modes.
_DESCRIPTION = '''
* **Option 1**: Upload an LDR file with part names
* **Option 2**: Describe your desired LEGO design in text (e.g., "red sports car")
* Generate a 3D assembly plan in LDR format
'''
with gr.Blocks(
    title="ObjectAssembler: Assemble Your Object with Diverse Components",
) as demo:
    gr.Markdown("ObjectAssembler: Assemble Your Object with Diverse Components")
    gr.Markdown(_DESCRIPTION)

    # State slot that carries the model input from "Load Input" to "Generate".
    original_ldr = gr.State("")

    with gr.Row():
        # Left column: inputs, action buttons, status and the parsed part list.
        with gr.Column(scale=1):
            gr.Markdown("### Input Method")
            ldr_file = gr.File(
                label="Upload LDR File",
                file_types=[".ldr"],
            )
            gr.Markdown("**— OR —**")
            text_input = gr.Textbox(
                label="Describe Your Design",
                placeholder="e.g., red sports car, blue police car, yellow construction vehicle...",
                lines=2,
            )
            upload_btn = gr.Button("Load Input", variant="secondary")
            predict_btn = gr.Button("Generate New LDR & Render", variant="primary")
            clean_btn = gr.Button("Clean Temporary Files", variant="stop")
            status_msg = gr.Textbox(label="Status Info", interactive=False)
            gr.Markdown("### Original Part List")
            part_list = gr.Textbox(lines=6, label="Part Names", interactive=False)

        # Right column: part gallery, generated LDR text, render and download.
        with gr.Column(scale=2):
            gr.Markdown("### Part Preview")
            part_renderings = gr.Gallery(
                label="Part List Visualization",
                columns=[6],
                rows=[2],
                object_fit="contain",
                height="auto",
            )
            gr.Markdown("### Generated LDR Content")
            predicted_ldr = gr.Textbox(lines=8, label="New LDR Format", interactive=False)
            gr.Markdown("### Rendering Result")
            render_result = gr.Image(label="Part Assembly Visualization", height=300)
            ldr_download = gr.File(label="Download New LDR File")

    # Event bindings (arguments are fn, inputs, outputs)
    upload_btn.click(
        unified_input_handler,
        [ldr_file, text_input],
        [part_renderings, part_list, status_msg, original_ldr, predicted_ldr],
    )
    predict_btn.click(
        predict_and_render,
        [original_ldr],
        [predicted_ldr, ldr_download, render_result],
    )
    clean_btn.click(
        clean_temp_files,
        [],
        [status_msg],
    )
# Server launch configuration (compatible with Hugging Face Spaces)
if __name__ == "__main__":
    # Detect whether we are running inside a Hugging Face Space
    is_hf_space = os.getenv("SPACE_ID") is not None

    print("\n" + "="*50)
    print("🚀 LEGO 3D建模序列生成系统启动中...")
    print("="*50)

    # ZeroGPU: Models are loaded lazily (on first use) to avoid CUDA initialization at startup
    if CLIP_AVAILABLE:
        print("✅ CLIP text-to-design feature enabled (lazy loading)")
        print(" Models will be initialized on first use")
    else:
        print("⚠️ CLIP module not available - text-to-LEGO disabled")

    if ZEROGPU_AVAILABLE:
        print("✅ ZeroGPU support enabled - GPU allocation on demand")
    else:
        print("⚠️ Running in standard mode (no ZeroGPU)")

    if is_hf_space:
        print("🌐 运行环境: Hugging Face Spaces")
        # Hugging Face Spaces handles the port and public access itself
        demo.queue()
        demo.launch(
            show_error=True,
            allowed_paths=[os.path.abspath(DEFAULT_PART_RENDER_PATH)]
        )
    else:
        import threading
        import time

        print("💻 运行环境: 本地服务器")

        # Launch from a background thread so the main thread is not blocked
        def launch_gradio():
            try:
                demo.queue()  # enable the request queue
                demo.launch(
                    server_name="0.0.0.0",  # listen on all interfaces
                    server_port=8080,  # port 8080 to avoid conflicts
                    share=False,  # no temporary public link
                    quiet=False,  # keep log output for debugging
                    show_error=True,  # surface errors for debugging
                    debug=False,  # debug mode off
                    inbrowser=False,  # do not open a browser automatically
                    prevent_thread_lock=True,  # return instead of blocking the thread
                    allowed_paths=[
                        os.path.abspath(DEFAULT_PART_RENDER_PATH)  # converted to an absolute path
                    ]
                )
            except Exception as e:
                # Best-effort launch: failures are reported, not raised.
                print(f"启动时出现警告(可忽略): {e}")
                print("服务器已在 http://0.0.0.0:8080 上运行")

        # Start Gradio
        thread = threading.Thread(target=launch_gradio, daemon=False)
        thread.start()

        # Keep the main thread alive.
        # NOTE(review): the Blender and model-weight lines below are fixed
        # text; nothing here checks them, and the models load lazily.
        print(f"📍 访问地址: http://localhost:8080")
        print(f"🔧 Blender: 已安装 (3.6.18)")
        print(f"🤖 模型权重: 已加载 (1.6GB)")
        print(f"📁 示例文件: examples/ldr_file/")
        print("="*50)
        print("\n按 Ctrl+C 停止服务器\n")

        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\n正在关闭服务器...")
            # `exit()` is a `site` helper meant for the interactive shell and
            # may be absent (e.g. `python -S`); raise SystemExit directly.
            raise SystemExit(0)
# test_ldr_path = "../data/car_1k/demos/example/ldr_filter_truck_abnormal_rot_expand_trans_mid_final/modified_car_1_rot.ldr"
# mock_file = MockFileStorage(test_ldr_path)
# renderings, part_list, _, ldr_content, _ = process_ldr_file(mock_file)
# # if result:
# # print(f"调试结果:{result}")
# # else:
# # print("调试失败")
# predict_and_render(ldr_content)