3DModelGen / ai_runtime.py
tomiconic's picture
Upload 11 files
77e37fc verified
from __future__ import annotations
import os
import shutil
import subprocess
import sys
import tempfile
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Generator
import numpy as np
import requests
import trimesh
from huggingface_hub import snapshot_download
from viewer import point_cloud_viewer_html, load_points_from_mesh_file
MODEL_TEXT3D = "tencent/Hunyuan3D-1"
MODEL_TEXT2IMAGE = "Tencent-Hunyuan/HunyuanDiT-v1.1-Diffusers-Distilled"
MODEL_OMNI = "tencent/Hunyuan3D-Omni"
REPO_TEXT3D = "https://github.com/Tencent-Hunyuan/Hunyuan3D-1.git"
REPO_OMNI = "https://github.com/Tencent-Hunyuan/Hunyuan3D-Omni.git"
REPO_TEXT3D_ZIP = "https://github.com/Tencent-Hunyuan/Hunyuan3D-1/archive/refs/heads/main.zip"
REPO_OMNI_ZIP = "https://github.com/Tencent-Hunyuan/Hunyuan3D-Omni/archive/refs/heads/main.zip"
BASE_CACHE = Path(os.getenv("PB3D_CACHE_ROOT", "/data/pb3d_cache" if Path("/data").exists() else "./pb3d_cache"))
REPOS_DIR = BASE_CACHE / "repos"
MODELS_DIR = BASE_CACHE / "models"
@dataclass
class AiBlueprintSession:
session_dir: str
blueprint_path: str
raw_ai_mesh_path: str
preview_glb_path: str
source_model: str
point_count: int
prompt: str
def to_state(self) -> dict:
return {
"session_dir": self.session_dir,
"blueprint_path": self.blueprint_path,
"raw_ai_mesh_path": self.raw_ai_mesh_path,
"preview_glb_path": self.preview_glb_path,
"source_model": self.source_model,
"point_count": self.point_count,
"prompt": self.prompt,
}
def ensure_cache_home() -> Path:
if Path("/data").exists():
os.environ.setdefault("HF_HOME", "/data/.huggingface")
BASE_CACHE.mkdir(parents=True, exist_ok=True)
REPOS_DIR.mkdir(parents=True, exist_ok=True)
MODELS_DIR.mkdir(parents=True, exist_ok=True)
return BASE_CACHE
def _download_repo_zip(zip_url: str, dest_root: Path) -> Path:
dest_root.parent.mkdir(parents=True, exist_ok=True)
archive_path = dest_root.parent / f"{dest_root.name}.zip"
resp = requests.get(zip_url, timeout=120)
resp.raise_for_status()
archive_path.write_bytes(resp.content)
with zipfile.ZipFile(archive_path, "r") as zf:
zf.extractall(dest_root.parent)
extracted = next(dest_root.parent.glob(f"{dest_root.name}-*"), None)
if extracted is None:
raise RuntimeError(f"Could not unpack {zip_url}")
if dest_root.exists():
shutil.rmtree(dest_root)
extracted.rename(dest_root)
return dest_root
def ensure_repo_checkout(name: str, repo_url: str, zip_url: str) -> Path:
ensure_cache_home()
dest = REPOS_DIR / name
if (dest / ".git").exists() or dest.exists():
return dest
try:
subprocess.run(
["git", "clone", "--depth", "1", repo_url, str(dest)],
check=True,
capture_output=True,
text=True,
)
return dest
except Exception:
return _download_repo_zip(zip_url, dest)
def ensure_model_snapshot(repo_id: str, local_dir: Path) -> Path:
local_dir.mkdir(parents=True, exist_ok=True)
snapshot_download(
repo_id=repo_id,
local_dir=str(local_dir),
local_dir_use_symlinks=False,
resume_download=True,
)
return local_dir
def prepare_hunyuan3d1_assets(progress: Callable[[str], None] | None = None) -> Path:
repo_root = ensure_repo_checkout("Hunyuan3D-1", REPO_TEXT3D, REPO_TEXT3D_ZIP)
weights_root = repo_root / "weights"
if progress:
progress("Pulling Hunyuan3D-1 weights into the Space cache…")
ensure_model_snapshot(MODEL_TEXT3D, weights_root)
if progress:
progress("Pulling HunyuanDiT text-to-image weights into the Space cache…")
ensure_model_snapshot(MODEL_TEXT2IMAGE, weights_root / "hunyuanDiT")
return repo_root
def prepare_omni_assets(progress: Callable[[str], None] | None = None) -> Path:
repo_root = ensure_repo_checkout("Hunyuan3D-Omni", REPO_OMNI, REPO_OMNI_ZIP)
if progress:
progress("Pulling Hunyuan3D-Omni weights into the Space cache…")
ensure_model_snapshot(MODEL_OMNI, MODELS_DIR / "tencent--Hunyuan3D-Omni")
return repo_root
def _find_first_mesh(root: Path) -> Path:
candidates = []
for ext in ("*.glb", "*.obj", "*.ply", "*.stl", "*.off"):
candidates.extend(root.rglob(ext))
candidates = sorted(candidates, key=lambda p: (p.suffix != ".glb", len(str(p))))
if not candidates:
raise FileNotFoundError(f"No mesh artifact found under {root}")
return candidates[0]
def _normalize_to_glb(mesh_path: Path, out_path: Path) -> Path:
asset = trimesh.load(mesh_path, force="mesh")
if isinstance(asset, trimesh.Scene):
meshes = [g for g in asset.geometry.values() if isinstance(g, trimesh.Trimesh)]
mesh = trimesh.util.concatenate(meshes) if meshes else trimesh.creation.box()
elif isinstance(asset, trimesh.Trimesh):
mesh = asset
else:
mesh = trimesh.creation.box()
mesh.remove_unreferenced_vertices()
mesh.apply_translation(-mesh.bounding_box.centroid)
scale = float(max(mesh.extents)) or 1.0
mesh.apply_scale(1.0 / scale)
mesh.export(out_path)
return out_path
def _points_to_ply(points: np.ndarray, out_path: Path) -> Path:
cloud = trimesh.points.PointCloud(points)
cloud.export(out_path)
return out_path
def _run_command(cmd: list[str], cwd: Path) -> subprocess.CompletedProcess[str]:
return subprocess.run(cmd, cwd=str(cwd), capture_output=True, text=True)
def run_hunyuan3d1_text_to_mesh(
prompt: str,
save_dir: Path,
save_memory: bool = True,
max_faces_num: int = 90000,
) -> Path:
repo_root = prepare_hunyuan3d1_assets()
save_dir.mkdir(parents=True, exist_ok=True)
cmd = [
sys.executable,
"main.py",
"--text_prompt",
prompt,
"--save_folder",
str(save_dir),
"--max_faces_num",
str(max_faces_num),
]
if save_memory:
cmd.append("--save_memory")
result = _run_command(cmd, cwd=repo_root)
if result.returncode != 0:
tail = (result.stderr or result.stdout or "").strip()[-1800:]
raise RuntimeError(
"Hunyuan3D-1 failed. This usually means the Space still needs the repo's heavier CUDA-side dependencies "
f"or more GPU memory.\n\nLast output:\n{tail}"
)
return _find_first_mesh(save_dir)
def iter_hunyuan_blueprint_session(
prompt: str,
save_memory: bool = True,
max_faces_num: int = 70000,
preview_points: int = 3200,
) -> Generator[dict, None, dict]:
prompt = (prompt or "").strip()
if not prompt:
raise ValueError("Enter a prompt first.")
session_dir = Path(tempfile.mkdtemp(prefix="pb3d_hunyuan_session_"))
yield {"status": "Preparing Hugging Face cache and model repos…", "session_dir": str(session_dir)}
ensure_cache_home()
yield {"status": f"Queueing {MODEL_TEXT3D} for prompt-driven generation…", "session_dir": str(session_dir)}
raw_mesh = run_hunyuan3d1_text_to_mesh(
prompt=prompt,
save_dir=session_dir / "hunyuan3d1_output",
save_memory=save_memory,
max_faces_num=max_faces_num,
)
yield {"status": "Sampling the AI mesh into an inspectable particle blueprint…", "session_dir": str(session_dir)}
points = load_points_from_mesh_file(raw_mesh, max_points=preview_points)
blueprint_path = _points_to_ply(points, session_dir / "blueprint_from_ai_mesh.ply")
preview_glb = _normalize_to_glb(raw_mesh, session_dir / "preview_mesh.glb")
chunks = [0.22, 0.45, 0.7, 1.0]
for idx, frac in enumerate(chunks, start=1):
count = max(180, int(len(points) * frac))
preview = points[:count]
yield {
"status": f"Blueprint readying for inspection ({idx}/{len(chunks)})…",
"viewer_html": point_cloud_viewer_html(preview, status=f"AI blueprint • {count} points"),
"summary": {
"prompt": prompt,
"source_model": MODEL_TEXT3D,
"point_count": int(count),
"stage": idx,
"stage_count": len(chunks),
"raw_ai_mesh_path": str(raw_mesh),
},
"session_dir": str(session_dir),
}
state = AiBlueprintSession(
session_dir=str(session_dir),
blueprint_path=str(blueprint_path),
raw_ai_mesh_path=str(raw_mesh),
preview_glb_path=str(preview_glb),
source_model=MODEL_TEXT3D,
point_count=int(len(points)),
prompt=prompt,
).to_state()
yield {
"status": "Blueprint ready. Rotate it on iPhone, then make the mesh when happy.",
"viewer_html": point_cloud_viewer_html(points, status=f"AI blueprint • {len(points)} points"),
"summary": {**state, "mode": "ai_blueprint_from_mesh"},
"blueprint_path": str(blueprint_path),
"state": state,
"mesh_preview": str(preview_glb),
"session_dir": str(session_dir),
}
return state
def finalize_ai_mesh_session(state: dict, prepare_omni: bool = True) -> Generator[dict, None, dict]:
mesh_path = Path(state["raw_ai_mesh_path"])
session_dir = Path(state["session_dir"])
if prepare_omni:
try:
yield {"status": f"Preloading {MODEL_OMNI} for future controllable refinement…"}
prepare_omni_assets()
omni_note = f"{MODEL_OMNI} cached."
except Exception as exc:
omni_note = f"Could not cache {MODEL_OMNI}: {exc}"
else:
omni_note = "Skipped."
yield {"status": "Centering and converting the AI mesh to exportable GLB…"}
glb_path = _normalize_to_glb(mesh_path, session_dir / "final_mesh.glb")
mesh = trimesh.load(glb_path, force="mesh")
if isinstance(mesh, trimesh.Scene):
mesh = trimesh.util.concatenate([g for g in mesh.geometry.values() if isinstance(g, trimesh.Trimesh)])
summary = {
**state,
"mesh_path": str(glb_path),
"mesh_source": MODEL_TEXT3D,
"omni_cache_note": omni_note,
"vertex_count": int(len(mesh.vertices)) if isinstance(mesh, trimesh.Trimesh) else None,
"face_count": int(len(mesh.faces)) if isinstance(mesh, trimesh.Trimesh) else None,
"note": "This export is the AI mesh produced during the blueprint stage, normalized for download. Hunyuan3D-Omni is preloaded but not yet driving the second-stage refinement command in this build.",
}
yield {
"status": "Mesh ready.",
"mesh_path": str(glb_path),
"summary": summary,
"mesh_file": str(glb_path),
}
return summary