Spaces:
Running on Zero
Running on Zero
| from __future__ import annotations | |
| import os | |
| import shutil | |
| import subprocess | |
| import sys | |
| import tempfile | |
| import zipfile | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Callable, Generator | |
| import numpy as np | |
| import requests | |
| import trimesh | |
| from huggingface_hub import snapshot_download | |
| from viewer import point_cloud_viewer_html, load_points_from_mesh_file | |
# Hugging Face model repositories pulled by this module.
MODEL_TEXT3D = "tencent/Hunyuan3D-1"
MODEL_TEXT2IMAGE = "Tencent-Hunyuan/HunyuanDiT-v1.1-Diffusers-Distilled"
MODEL_OMNI = "tencent/Hunyuan3D-Omni"

# Upstream source repositories, as git remotes.
REPO_TEXT3D = "https://github.com/Tencent-Hunyuan/Hunyuan3D-1.git"
REPO_OMNI = "https://github.com/Tencent-Hunyuan/Hunyuan3D-Omni.git"

# Zip archives of the same repositories (main branch).
REPO_TEXT3D_ZIP = "https://github.com/Tencent-Hunyuan/Hunyuan3D-1/archive/refs/heads/main.zip"
REPO_OMNI_ZIP = "https://github.com/Tencent-Hunyuan/Hunyuan3D-Omni/archive/refs/heads/main.zip"

# Cache root: PB3D_CACHE_ROOT when set, otherwise /data/pb3d_cache if /data
# exists, otherwise a directory relative to the current working directory.
BASE_CACHE = Path(
    os.environ.get(
        "PB3D_CACHE_ROOT",
        "/data/pb3d_cache" if Path("/data").exists() else "./pb3d_cache",
    )
)
REPOS_DIR = BASE_CACHE.joinpath("repos")
MODELS_DIR = BASE_CACHE.joinpath("models")
@dataclass
class AiBlueprintSession:
    """Record of one AI blueprint session, stored as plain strings and ints.

    The ``@dataclass`` decorator generates the keyword-argument constructor
    that the rest of this module relies on when building a session.
    """

    session_dir: str  # working directory of the session
    blueprint_path: str  # point-cloud file written for the blueprint
    raw_ai_mesh_path: str  # mesh file produced by the generation step
    preview_glb_path: str  # normalized GLB used for previewing
    source_model: str  # identifier of the model that produced the mesh
    point_count: int  # number of points in the blueprint
    prompt: str  # text prompt the session was started with

    def to_state(self) -> dict:
        """Return the session as a plain dict keyed by field name."""
        return {
            "session_dir": self.session_dir,
            "blueprint_path": self.blueprint_path,
            "raw_ai_mesh_path": self.raw_ai_mesh_path,
            "preview_glb_path": self.preview_glb_path,
            "source_model": self.source_model,
            "point_count": self.point_count,
            "prompt": self.prompt,
        }
def ensure_cache_home() -> Path:
    """Create the cache directory tree and return its root.

    When ``/data`` exists, ``HF_HOME`` is pointed at ``/data/.huggingface``
    unless the environment already defines it.

    Returns:
        ``BASE_CACHE``, after it and its ``repos`` / ``models`` children exist.
    """
    persistent_volume = Path("/data")
    if persistent_volume.exists():
        # setdefault leaves an HF_HOME supplied by the environment untouched.
        os.environ.setdefault("HF_HOME", "/data/.huggingface")
    for directory in (BASE_CACHE, REPOS_DIR, MODELS_DIR):
        directory.mkdir(parents=True, exist_ok=True)
    return BASE_CACHE
def _download_repo_zip(zip_url: str, dest_root: Path) -> Path:
    """Download a repository zip archive and unpack it as ``dest_root``.

    The archive is streamed to disk instead of being held in memory, and it is
    unpacked into a private scratch directory so that unrelated siblings of
    ``dest_root`` can never be mistaken for the extracted tree. The archive
    and the scratch directory are removed whether or not unpacking succeeds.

    Args:
        zip_url: URL of the zip archive to fetch.
        dest_root: Directory the unpacked repository should end up at. Any
            existing directory at this path is replaced.

    Returns:
        ``dest_root``.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        RuntimeError: If the archive does not contain exactly one top-level
            directory.
    """
    parent = dest_root.parent
    parent.mkdir(parents=True, exist_ok=True)
    archive_path = parent / f"{dest_root.name}.zip"
    scratch = Path(tempfile.mkdtemp(prefix=f".{dest_root.name}-unpack-", dir=parent))
    try:
        with requests.get(zip_url, timeout=120, stream=True) as resp:
            resp.raise_for_status()
            with open(archive_path, "wb") as fh:
                for chunk in resp.iter_content(chunk_size=1 << 20):
                    if chunk:
                        fh.write(chunk)
        with zipfile.ZipFile(archive_path, "r") as zf:
            zf.extractall(scratch)
        # The repository is expected to sit in a single top-level folder.
        top_level = [entry for entry in scratch.iterdir() if entry.is_dir()]
        if len(top_level) != 1:
            raise RuntimeError(f"Could not unpack {zip_url}")
        if dest_root.exists():
            shutil.rmtree(dest_root)
        # scratch lives next to dest_root, so this rename stays on one filesystem.
        top_level[0].rename(dest_root)
    finally:
        shutil.rmtree(scratch, ignore_errors=True)
        archive_path.unlink(missing_ok=True)
    return dest_root
def ensure_repo_checkout(name: str, repo_url: str, zip_url: str) -> Path:
    """Return a local checkout of a repository, fetching it if necessary.

    An existing, non-empty directory under ``REPOS_DIR`` is reused as-is.
    Otherwise a shallow ``git clone`` is attempted, and if git is unavailable
    or the clone fails, the zip archive is downloaded instead.

    Args:
        name: Directory name of the checkout inside ``REPOS_DIR``.
        repo_url: Git remote to clone from.
        zip_url: Zip archive used when cloning is not possible.

    Returns:
        Path of the checkout directory.
    """
    ensure_cache_home()
    dest = REPOS_DIR / name
    # An empty directory is not a usable checkout; only reuse one with content.
    if dest.is_dir() and any(dest.iterdir()):
        return dest
    try:
        subprocess.run(
            ["git", "clone", "--depth", "1", repo_url, str(dest)],
            check=True,
            capture_output=True,
            text=True,
        )
    except (subprocess.SubprocessError, OSError):
        # OSError covers a missing git executable; SubprocessError covers a
        # clone that exited non-zero. Drop any partial clone before falling back.
        shutil.rmtree(dest, ignore_errors=True)
        return _download_repo_zip(zip_url, dest)
    return dest
def ensure_model_snapshot(repo_id: str, local_dir: Path) -> Path:
    """Download a Hugging Face repository snapshot into ``local_dir``.

    Args:
        repo_id: Hub repository identifier to download.
        local_dir: Target directory; created (with parents) when missing.

    Returns:
        ``local_dir``.
    """
    target = str(local_dir)
    local_dir.mkdir(parents=True, exist_ok=True)
    # NOTE(review): recent huggingface_hub releases appear to deprecate
    # `local_dir_use_symlinks` and `resume_download` — confirm against the
    # version pinned for this Space before removing them.
    snapshot_download(
        repo_id=repo_id,
        local_dir=target,
        local_dir_use_symlinks=False,
        resume_download=True,
    )
    return local_dir
def prepare_hunyuan3d1_assets(progress: Callable[[str], None] | None = None) -> Path:
    """Make the Hunyuan3D-1 repository and its weights available locally.

    Args:
        progress: Optional callback invoked with a status message before each
            weight download.

    Returns:
        Root of the Hunyuan3D-1 checkout; weights are placed in its
        ``weights`` subdirectory.
    """
    checkout = ensure_repo_checkout("Hunyuan3D-1", REPO_TEXT3D, REPO_TEXT3D_ZIP)
    weights_dir = checkout / "weights"
    downloads = (
        (
            "Pulling Hunyuan3D-1 weights into the Space cache…",
            MODEL_TEXT3D,
            weights_dir,
        ),
        (
            "Pulling HunyuanDiT text-to-image weights into the Space cache…",
            MODEL_TEXT2IMAGE,
            weights_dir / "hunyuanDiT",
        ),
    )
    for message, repo_id, target in downloads:
        if progress:
            progress(message)
        ensure_model_snapshot(repo_id, target)
    return checkout
def prepare_omni_assets(progress: Callable[[str], None] | None = None) -> Path:
    """Make the Hunyuan3D-Omni repository and its weights available locally.

    Args:
        progress: Optional callback invoked with a status message before the
            weight download starts.

    Returns:
        Root of the Hunyuan3D-Omni checkout. The weights are stored
        separately, under ``MODELS_DIR``.
    """
    checkout = ensure_repo_checkout("Hunyuan3D-Omni", REPO_OMNI, REPO_OMNI_ZIP)
    weights_dir = MODELS_DIR / "tencent--Hunyuan3D-Omni"
    if progress:
        progress("Pulling Hunyuan3D-Omni weights into the Space cache…")
    ensure_model_snapshot(MODEL_OMNI, weights_dir)
    return checkout
| def _find_first_mesh(root: Path) -> Path: | |
| candidates = [] | |
| for ext in ("*.glb", "*.obj", "*.ply", "*.stl", "*.off"): | |
| candidates.extend(root.rglob(ext)) | |
| candidates = sorted(candidates, key=lambda p: (p.suffix != ".glb", len(str(p)))) | |
| if not candidates: | |
| raise FileNotFoundError(f"No mesh artifact found under {root}") | |
| return candidates[0] | |
def _normalize_to_glb(mesh_path: Path, out_path: Path) -> Path:
    """Load a mesh, center and rescale it, and export it to ``out_path``.

    The mesh is translated so its bounding-box centroid sits at the origin and
    scaled so its largest extent becomes 1. If loading yields neither a mesh
    nor a scene containing meshes, a box primitive is exported instead.

    Args:
        mesh_path: Mesh file to read.
        out_path: File to write the normalized mesh to.

    Returns:
        ``out_path``.
    """
    loaded = trimesh.load(mesh_path, force="mesh")
    if isinstance(loaded, trimesh.Trimesh):
        mesh = loaded
    elif isinstance(loaded, trimesh.Scene):
        parts = [
            geom for geom in loaded.geometry.values() if isinstance(geom, trimesh.Trimesh)
        ]
        if parts:
            mesh = trimesh.util.concatenate(parts)
        else:
            mesh = trimesh.creation.box()
    else:
        mesh = trimesh.creation.box()

    mesh.remove_unreferenced_vertices()
    mesh.apply_translation(-mesh.bounding_box.centroid)

    largest_extent = float(max(mesh.extents))
    if not largest_extent:
        # A zero extent would divide by zero below; leave the size unchanged.
        largest_extent = 1.0
    mesh.apply_scale(1.0 / largest_extent)

    mesh.export(out_path)
    return out_path
def _points_to_ply(points: np.ndarray, out_path: Path) -> Path:
    """Export ``points`` as a trimesh point cloud to ``out_path``.

    Args:
        points: Point coordinates handed to ``trimesh.points.PointCloud``.
        out_path: File to write.

    Returns:
        ``out_path``.
    """
    trimesh.points.PointCloud(points).export(out_path)
    return out_path
| def _run_command(cmd: list[str], cwd: Path) -> subprocess.CompletedProcess[str]: | |
| return subprocess.run(cmd, cwd=str(cwd), capture_output=True, text=True) | |
def run_hunyuan3d1_text_to_mesh(
    prompt: str,
    save_dir: Path,
    save_memory: bool = True,
    max_faces_num: int = 90000,
) -> Path:
    """Run the Hunyuan3D-1 ``main.py`` script and return the mesh it produced.

    Args:
        prompt: Text prompt passed as ``--text_prompt``.
        save_dir: Output folder passed as ``--save_folder``; created if missing.
        save_memory: When true, ``--save_memory`` is appended to the command.
        max_faces_num: Value passed as ``--max_faces_num``.

    Returns:
        Path of the mesh file selected from ``save_dir``.

    Raises:
        RuntimeError: If the script exits with a non-zero status; the message
            carries the tail of its output.
        FileNotFoundError: If the script succeeds but leaves no mesh behind.
    """
    repo_root = prepare_hunyuan3d1_assets()
    # The script runs with cwd=repo_root, so a relative save_dir would point
    # somewhere else for the child than for the mesh lookup below.
    save_dir = Path(save_dir).resolve()
    save_dir.mkdir(parents=True, exist_ok=True)
    cmd = [
        sys.executable,
        "main.py",
        "--text_prompt",
        prompt,
        "--save_folder",
        str(save_dir),
        "--max_faces_num",
        str(max_faces_num),
    ]
    if save_memory:
        cmd.append("--save_memory")
    result = _run_command(cmd, cwd=repo_root)
    if result.returncode != 0:
        # Prefer stderr, fall back to stdout; keep only the last 1800 characters.
        tail = (result.stderr or result.stdout or "").strip()[-1800:]
        raise RuntimeError(
            "Hunyuan3D-1 failed. This usually means the Space still needs the repo's heavier CUDA-side dependencies "
            f"or more GPU memory.\n\nLast output:\n{tail}"
        )
    return _find_first_mesh(save_dir)
def iter_hunyuan_blueprint_session(
    prompt: str,
    save_memory: bool = True,
    max_faces_num: int = 70000,
    preview_points: int = 3200,
) -> Generator[dict, None, dict]:
    """Generate an AI mesh for ``prompt`` and stream blueprint previews.

    Each yielded dict carries a ``status`` message and the ``session_dir``;
    preview updates add ``viewer_html`` and a ``summary``, and the last
    update adds the session ``state``, ``blueprint_path`` and
    ``mesh_preview``.

    Args:
        prompt: Text prompt; surrounding whitespace is stripped.
        save_memory: Forwarded to ``run_hunyuan3d1_text_to_mesh``.
        max_faces_num: Forwarded to ``run_hunyuan3d1_text_to_mesh``.
        preview_points: Passed as ``max_points`` when sampling the mesh.

    Returns:
        The session state dict (the generator's return value).

    Raises:
        ValueError: If ``prompt`` is empty after stripping. As this is a
            generator, the error surfaces on the first iteration.
    """
    prompt = (prompt or "").strip()
    if not prompt:
        raise ValueError("Enter a prompt first.")
    session_dir = Path(tempfile.mkdtemp(prefix="pb3d_hunyuan_session_"))
    yield {"status": "Preparing Hugging Face cache and model repos…", "session_dir": str(session_dir)}
    ensure_cache_home()
    yield {"status": f"Queueing {MODEL_TEXT3D} for prompt-driven generation…", "session_dir": str(session_dir)}
    raw_mesh = run_hunyuan3d1_text_to_mesh(
        prompt=prompt,
        save_dir=session_dir / "hunyuan3d1_output",
        save_memory=save_memory,
        max_faces_num=max_faces_num,
    )
    yield {"status": "Sampling the AI mesh into an inspectable particle blueprint…", "session_dir": str(session_dir)}
    # presumably an array of point coordinates — verify against viewer.load_points_from_mesh_file
    points = load_points_from_mesh_file(raw_mesh, max_points=preview_points)
    blueprint_path = _points_to_ply(points, session_dir / "blueprint_from_ai_mesh.ply")
    preview_glb = _normalize_to_glb(raw_mesh, session_dir / "preview_mesh.glb")
    total_points = len(points)
    chunks = [0.22, 0.45, 0.7, 1.0]
    for idx, frac in enumerate(chunks, start=1):
        # Show at least 180 points per stage, but never report more points
        # than the preview slice can actually contain.
        count = min(total_points, max(180, int(total_points * frac)))
        preview = points[:count]
        yield {
            "status": f"Blueprint readying for inspection ({idx}/{len(chunks)})…",
            "viewer_html": point_cloud_viewer_html(preview, status=f"AI blueprint • {count} points"),
            "summary": {
                "prompt": prompt,
                "source_model": MODEL_TEXT3D,
                "point_count": int(count),
                "stage": idx,
                "stage_count": len(chunks),
                "raw_ai_mesh_path": str(raw_mesh),
            },
            "session_dir": str(session_dir),
        }
    state = AiBlueprintSession(
        session_dir=str(session_dir),
        blueprint_path=str(blueprint_path),
        raw_ai_mesh_path=str(raw_mesh),
        preview_glb_path=str(preview_glb),
        source_model=MODEL_TEXT3D,
        point_count=int(total_points),
        prompt=prompt,
    ).to_state()
    yield {
        "status": "Blueprint ready. Rotate it on iPhone, then make the mesh when happy.",
        "viewer_html": point_cloud_viewer_html(points, status=f"AI blueprint • {len(points)} points"),
        "summary": {**state, "mode": "ai_blueprint_from_mesh"},
        "blueprint_path": str(blueprint_path),
        "state": state,
        "mesh_preview": str(preview_glb),
        "session_dir": str(session_dir),
    }
    return state
def finalize_ai_mesh_session(state: dict, prepare_omni: bool = True) -> Generator[dict, None, dict]:
    """Normalize the session's AI mesh into a GLB and stream status updates.

    Args:
        state: Session state dict; ``raw_ai_mesh_path`` and ``session_dir``
            are read from it.
        prepare_omni: When true, the Hunyuan3D-Omni assets are fetched first.
            A failure there is recorded in the summary, not raised.

    Returns:
        The summary dict (the generator's return value): the incoming state
        plus mesh path, mesh source, cache note, vertex/face counts and a note.
    """
    source_mesh = Path(state["raw_ai_mesh_path"])
    workdir = Path(state["session_dir"])

    omni_note = "Skipped."
    if prepare_omni:
        try:
            yield {"status": f"Preloading {MODEL_OMNI} for future controllable refinement…"}
            prepare_omni_assets()
            omni_note = f"{MODEL_OMNI} cached."
        except Exception as exc:
            # Caching Omni is best-effort; the export below still runs.
            omni_note = f"Could not cache {MODEL_OMNI}: {exc}"

    yield {"status": "Centering and converting the AI mesh to exportable GLB…"}
    glb_path = _normalize_to_glb(source_mesh, workdir / "final_mesh.glb")

    # Reload the exported file so the counts describe what was written.
    exported = trimesh.load(glb_path, force="mesh")
    if isinstance(exported, trimesh.Scene):
        parts = [
            geom for geom in exported.geometry.values() if isinstance(geom, trimesh.Trimesh)
        ]
        exported = trimesh.util.concatenate(parts)

    if isinstance(exported, trimesh.Trimesh):
        vertex_count = int(len(exported.vertices))
        face_count = int(len(exported.faces))
    else:
        vertex_count = None
        face_count = None

    exported_path = str(glb_path)
    summary = {
        **state,
        "mesh_path": exported_path,
        "mesh_source": MODEL_TEXT3D,
        "omni_cache_note": omni_note,
        "vertex_count": vertex_count,
        "face_count": face_count,
        "note": "This export is the AI mesh produced during the blueprint stage, normalized for download. Hunyuan3D-Omni is preloaded but not yet driving the second-stage refinement command in this build.",
    }
    yield {
        "status": "Mesh ready.",
        "mesh_path": exported_path,
        "summary": summary,
        "mesh_file": exported_path,
    }
    return summary