Spaces:
Running on Zero
Running on Zero
| import asyncio | |
| import builtins | |
| import contextlib | |
| import gc | |
| import io | |
| import os | |
| import random | |
| import runpy | |
| import shutil | |
| import subprocess | |
| import sys | |
| import time | |
| import traceback | |
| import uuid | |
| import warnings | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Tuple | |
| def _patch_asyncio_invalid_fd_warning() -> None: | |
| """ | |
| Suppress a known CPython/asyncio destructor noise: | |
| ValueError: Invalid file descriptor: -1 | |
| """ | |
| if getattr(asyncio.BaseEventLoop, "_gamemaster_fd_patch", False): | |
| return | |
| original_del = asyncio.BaseEventLoop.__del__ | |
| def _safe_del(self): | |
| try: | |
| original_del(self) | |
| except ValueError as exc: | |
| if "Invalid file descriptor" in str(exc): | |
| return | |
| raise | |
| asyncio.BaseEventLoop.__del__ = _safe_del | |
| asyncio.BaseEventLoop._gamemaster_fd_patch = True | |
| _patch_asyncio_invalid_fd_warning() | |
| import gradio as gr | |
| import numpy as np | |
| import spaces | |
| import torch | |
| import trimesh | |
| from huggingface_hub import hf_hub_download | |
| ROOT = Path(__file__).resolve().parent | |
| TMP_ROOT = ROOT / "tmp_jobs" | |
| TMP_ROOT.mkdir(parents=True, exist_ok=True) | |
| SUPPORTED_EXTS = {".glb", ".gltf", ".obj", ".ply", ".stl"} | |
| DEFAULT_SIMPLIFY_FACES = int(os.environ.get("DEFAULT_SIMPLIFY_FACES", "12000")) | |
| MAX_SIMPLIFY_FACES = int(os.environ.get("MAX_SIMPLIFY_FACES", "50000")) | |
| STEP_TIMEOUT_SEC = int(os.environ.get("STEP_TIMEOUT_SEC", "3600")) | |
| ZERO_GPU_SKELETON_SEC = max(30, min(120, int(os.environ.get("ZERO_GPU_SKELETON_SEC", "90")))) | |
| ZERO_GPU_SKINNING_SEC = max(30, min(120, int(os.environ.get("ZERO_GPU_SKINNING_SEC", "120")))) | |
| CHECKPOINTS = { | |
| "michelangelo_shape_vae": ( | |
| "Maikou/Michelangelo", | |
| "checkpoints/aligned_shape_latents/shapevae-256.ckpt", | |
| ROOT / "skeleton/third_partys/Michelangelo/checkpoints/aligned_shape_latents/shapevae-256.ckpt", | |
| ), | |
| "skeleton_main": ( | |
| "Seed3D/Puppeteer", | |
| "skeleton_ckpts/puppeteer_skeleton_w_diverse_pose.pth", | |
| ROOT / "skeleton/skeleton_ckpts/puppeteer_skeleton_w_diverse_pose.pth", | |
| ), | |
| "skinning_main": ( | |
| "Seed3D/Puppeteer", | |
| "skinning_ckpts/puppeteer_skin_w_diverse_pose_depth1.pth", | |
| ROOT / "skinning/skinning_ckpts/puppeteer_skin_w_diverse_pose_depth1.pth", | |
| ), | |
| "partfield": ( | |
| "mikaelaangel/partfield-ckpt", | |
| "model_objaverse.ckpt", | |
| ROOT / "skinning/third_partys/PartField/ckpt/model_objaverse.ckpt", | |
| ), | |
| } | |
| _NON_FATAL_LOG_PATTERNS = ( | |
| "could not get a list of mounted file-systems", | |
| "Error: Not freed memory blocks:", | |
| "FutureWarning:", | |
| ) | |
| _AXIS_TO_INDEX = {"x": 0, "y": 1, "z": 2} | |
| _INDEX_TO_AXIS = {0: "x", 1: "y", 2: "z"} | |
| STANDARD_HUMANOID_BONES = [ | |
| "Hips", | |
| "Spine", | |
| "Chest", | |
| "Neck", | |
| "Head", | |
| "LeftUpperArm", | |
| "LeftLowerArm", | |
| "LeftHand", | |
| "RightUpperArm", | |
| "RightLowerArm", | |
| "RightHand", | |
| "LeftUpperLeg", | |
| "LeftLowerLeg", | |
| "LeftFoot", | |
| "RightUpperLeg", | |
| "RightLowerLeg", | |
| "RightFoot", | |
| ] | |
| STANDARD_HUMANOID_PARENTS = { | |
| "Spine": "Hips", | |
| "Chest": "Spine", | |
| "Neck": "Chest", | |
| "Head": "Neck", | |
| "LeftUpperArm": "Chest", | |
| "LeftLowerArm": "LeftUpperArm", | |
| "LeftHand": "LeftLowerArm", | |
| "RightUpperArm": "Chest", | |
| "RightLowerArm": "RightUpperArm", | |
| "RightHand": "RightLowerArm", | |
| "LeftUpperLeg": "Hips", | |
| "LeftLowerLeg": "LeftUpperLeg", | |
| "LeftFoot": "LeftLowerLeg", | |
| "RightUpperLeg": "Hips", | |
| "RightLowerLeg": "RightUpperLeg", | |
| "RightFoot": "RightLowerLeg", | |
| } | |
| def _normalize_input_path(input_mesh: Any) -> str: | |
| if isinstance(input_mesh, str): | |
| return input_mesh | |
| if isinstance(input_mesh, dict): | |
| path = input_mesh.get("path") | |
| if path: | |
| return str(path) | |
| path = getattr(input_mesh, "path", None) | |
| if path: | |
| return str(path) | |
| return "" | |
| def _is_non_fatal_log_line(line: str) -> bool: | |
| stripped = line.strip() | |
| if not stripped: | |
| return True | |
| return any(token in stripped for token in _NON_FATAL_LOG_PATTERNS) | |
| def _run_command(cmd: List[str], cwd: Path, logs: List[str], timeout_sec: int = STEP_TIMEOUT_SEC) -> None: | |
| proc = subprocess.run( | |
| cmd, | |
| cwd=str(cwd), | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.STDOUT, | |
| text=True, | |
| timeout=int(timeout_sec), | |
| check=False, | |
| ) | |
| out = (proc.stdout or "").strip() | |
| if proc.returncode != 0: | |
| raise RuntimeError(f"Command failed ({' '.join(cmd)}).\n{out[-6000:]}") | |
| lines = [line.strip() for line in out.splitlines() if line.strip()] | |
| keep = [line for line in lines if not _is_non_fatal_log_line(line)] | |
| label = Path(cmd[1]).name if len(cmd) > 1 and cmd[0] == sys.executable else Path(cmd[0]).name | |
| if keep: | |
| logs.append(f"{label}: {keep[-1]}") | |
| else: | |
| logs.append(f"{label}: completed") | |
| def _run_script_inprocess(script_path: Path, argv: List[str], cwd: Path) -> str: | |
| """ | |
| Execute a Python script in the current process so ZeroGPU CUDA context remains visible. | |
| """ | |
| old_argv = sys.argv[:] | |
| old_sys_path = sys.path[:] | |
| old_cwd = Path.cwd() | |
| old_print = builtins.print | |
| buf = io.StringIO() | |
| try: | |
| os.chdir(cwd) | |
| script_dir = str(script_path.parent.resolve()) | |
| cwd_dir = str(cwd.resolve()) | |
| for entry in reversed([script_dir, cwd_dir]): | |
| if entry not in sys.path: | |
| sys.path.insert(0, entry) | |
| sys.argv = [str(script_path), *argv] | |
| with contextlib.redirect_stdout(buf), contextlib.redirect_stderr(buf): | |
| runpy.run_path(str(script_path), run_name="__main__") | |
| except SystemExit as exc: | |
| code = exc.code if isinstance(exc.code, int) else 0 | |
| if code not in (0, None): | |
| raise RuntimeError(f"Script exited with code {code}.\n{buf.getvalue()[-6000:]}") | |
| except Exception: | |
| trace = traceback.format_exc() | |
| out = buf.getvalue().strip() | |
| combined = f"{out}\n{trace}".strip() | |
| raise RuntimeError(combined[-6000:]) | |
| finally: | |
| sys.argv = old_argv | |
| sys.path = old_sys_path | |
| builtins.print = old_print | |
| os.chdir(old_cwd) | |
| return buf.getvalue() | |
| def _safe_mesh(mesh: trimesh.Trimesh) -> trimesh.Trimesh: | |
| m = mesh.copy() | |
| with warnings.catch_warnings(): | |
| warnings.filterwarnings("ignore", category=RuntimeWarning, module=r"trimesh\..*") | |
| m.remove_infinite_values() | |
| if len(m.faces) > 0 and len(m.vertices) > 0: | |
| fv = m.vertices[m.faces] | |
| finite_faces = np.isfinite(fv).all(axis=(1, 2)) | |
| edge_a = fv[:, 1] - fv[:, 0] | |
| edge_b = fv[:, 2] - fv[:, 0] | |
| area2 = np.linalg.norm(np.cross(edge_a, edge_b), axis=1) | |
| valid_faces = finite_faces & np.isfinite(area2) & (area2 > 1e-12) | |
| if not np.all(valid_faces): | |
| m.update_faces(valid_faces) | |
| if len(m.faces) > 0: | |
| unique_faces = getattr(m, "unique_faces", None) | |
| if callable(unique_faces): | |
| m.update_faces(unique_faces()) | |
| elif hasattr(m, "remove_duplicate_faces"): | |
| m.remove_duplicate_faces() | |
| m.remove_unreferenced_vertices() | |
| return m | |
| def _geometry_only_mesh(mesh: trimesh.Trimesh) -> trimesh.Trimesh: | |
| m = mesh.copy(include_cache=False) | |
| m.visual = trimesh.visual.ColorVisuals(mesh=m) | |
| return m | |
| def _collect_components(input_path: Path) -> List[trimesh.Trimesh]: | |
| loaded = trimesh.load(str(input_path), force="scene", process=False) | |
| meshes: List[trimesh.Trimesh] = [] | |
| if isinstance(loaded, trimesh.Trimesh): | |
| meshes = [loaded] | |
| elif isinstance(loaded, trimesh.Scene): | |
| for geom in loaded.geometry.values(): | |
| if isinstance(geom, trimesh.Trimesh) and len(geom.faces) > 0 and len(geom.vertices) > 0: | |
| meshes.append(geom) | |
| if not meshes: | |
| raise RuntimeError("Could not extract mesh geometry from input.") | |
| components: List[trimesh.Trimesh] = [] | |
| for mesh in meshes: | |
| safe = _safe_mesh(_geometry_only_mesh(mesh)) | |
| try: | |
| parts = safe.split(only_watertight=False) | |
| except Exception: | |
| parts = [safe] | |
| if len(parts) == 0: | |
| parts = [safe] | |
| for part in parts: | |
| if len(part.faces) > 0 and len(part.vertices) > 0: | |
| components.append(part) | |
| return components | |
| def _compute_floor_axis_score( | |
| components: List[trimesh.Trimesh], | |
| all_vertices: np.ndarray, | |
| bounds_min: np.ndarray, | |
| bounds_max: np.ndarray, | |
| diag: float, | |
| axis_idx: int, | |
| floor_percentile: float, | |
| floor_thickness_ratio: float, | |
| min_component_faces: int, | |
| ) -> float: | |
| horiz_axes = [i for i in (0, 1, 2) if i != axis_idx] | |
| extents = np.maximum(bounds_max - bounds_min, 1e-6) | |
| full_height = float(extents[axis_idx]) | |
| full_footprint = float(np.prod(np.maximum(extents[horiz_axes], 1e-6))) | |
| floor_cut = float(np.percentile(all_vertices[:, axis_idx], floor_percentile)) | |
| floor_tol = float(max(full_height * floor_thickness_ratio, 1e-4)) | |
| score = 0.0 | |
| for comp in components: | |
| if len(comp.faces) < max(16, int(min_component_faces)): | |
| continue | |
| cmin = comp.vertices.min(axis=0) | |
| cmax = comp.vertices.max(axis=0) | |
| cdiag = float(np.linalg.norm(cmax - cmin)) | |
| if cdiag < max(diag * 0.0125, 5e-4): | |
| continue | |
| cheight = float(cmax[axis_idx] - cmin[axis_idx]) | |
| ctop = float(cmax[axis_idx]) | |
| cfoot = float(np.prod(np.maximum(cmax[horiz_axes] - cmin[horiz_axes], 1e-6))) | |
| foot_ratio = cfoot / max(full_footprint, 1e-6) | |
| if ctop > (floor_cut + floor_tol): | |
| continue | |
| if cheight > (full_height * 0.22): | |
| continue | |
| if foot_ratio < 0.05: | |
| continue | |
| band_proximity = 1.0 - min(1.0, max(0.0, ctop - floor_cut) / max(floor_tol, 1e-6)) | |
| thinness = 1.0 - min(1.0, cheight / max(full_height * 0.22, 1e-6)) | |
| score += foot_ratio * (0.65 * band_proximity + 0.35 * thinness) | |
| return float(score) | |
| def _auto_detect_up_axis( | |
| components: List[trimesh.Trimesh], | |
| all_vertices: np.ndarray, | |
| bounds_min: np.ndarray, | |
| bounds_max: np.ndarray, | |
| diag: float, | |
| floor_percentile: float, | |
| floor_thickness_ratio: float, | |
| min_component_faces: int, | |
| ) -> Tuple[str, Dict[str, float]]: | |
| axis_scores: Dict[str, float] = {} | |
| for axis_name, axis_idx in _AXIS_TO_INDEX.items(): | |
| axis_scores[axis_name] = _compute_floor_axis_score( | |
| components=components, | |
| all_vertices=all_vertices, | |
| bounds_min=bounds_min, | |
| bounds_max=bounds_max, | |
| diag=diag, | |
| axis_idx=axis_idx, | |
| floor_percentile=floor_percentile, | |
| floor_thickness_ratio=floor_thickness_ratio, | |
| min_component_faces=min_component_faces, | |
| ) | |
| best_axis = max(axis_scores.items(), key=lambda kv: kv[1])[0] | |
| if axis_scores[best_axis] <= 0.0: | |
| extents = bounds_max - bounds_min | |
| middle_idx = int(np.argsort(extents)[1]) | |
| best_axis = _INDEX_TO_AXIS[middle_idx] | |
| return best_axis, axis_scores | |
| def _preprocess_for_trellis( | |
| input_mesh_path: Path, | |
| cleaned_out_path: Path, | |
| remove_floor: bool, | |
| floor_percentile: float, | |
| floor_thickness_ratio: float, | |
| min_component_faces: int, | |
| ) -> Tuple[dict, str, Dict[str, float]]: | |
| components = _collect_components(input_mesh_path) | |
| if not components: | |
| raise RuntimeError("Input mesh has no valid components.") | |
| all_vertices = np.concatenate([c.vertices for c in components], axis=0) | |
| bounds_min = all_vertices.min(axis=0) | |
| bounds_max = all_vertices.max(axis=0) | |
| extents = np.maximum(bounds_max - bounds_min, 1e-6) | |
| diag = float(np.linalg.norm(extents)) | |
| resolved_up_axis, axis_scores = _auto_detect_up_axis( | |
| components=components, | |
| all_vertices=all_vertices, | |
| bounds_min=bounds_min, | |
| bounds_max=bounds_max, | |
| diag=diag, | |
| floor_percentile=float(floor_percentile), | |
| floor_thickness_ratio=float(floor_thickness_ratio), | |
| min_component_faces=int(min_component_faces), | |
| ) | |
| up_idx = _AXIS_TO_INDEX[resolved_up_axis] | |
| horiz_axes = [i for i in (0, 1, 2) if i != up_idx] | |
| full_height = float(extents[up_idx]) | |
| full_footprint = float(np.prod(np.maximum(extents[horiz_axes], 1e-6))) | |
| floor_cut = float(np.percentile(all_vertices[:, up_idx], floor_percentile)) | |
| floor_tol = float(max(full_height * floor_thickness_ratio, 1e-4)) | |
| kept: List[trimesh.Trimesh] = [] | |
| removed_floor = 0 | |
| removed_tiny = 0 | |
| for comp in components: | |
| cmin = comp.vertices.min(axis=0) | |
| cmax = comp.vertices.max(axis=0) | |
| cdiag = float(np.linalg.norm(cmax - cmin)) | |
| cfaces = len(comp.faces) | |
| if cfaces < int(min_component_faces) or cdiag < max(diag * 0.0125, 5e-4): | |
| removed_tiny += 1 | |
| continue | |
| cheight = float(cmax[up_idx] - cmin[up_idx]) | |
| ctop = float(cmax[up_idx]) | |
| cfoot = float(np.prod(np.maximum(cmax[horiz_axes] - cmin[horiz_axes], 1e-6))) | |
| foot_ratio = cfoot / max(full_footprint, 1e-6) | |
| floor_like = ( | |
| remove_floor | |
| and ctop <= (floor_cut + floor_tol) | |
| and cheight <= (full_height * 0.22) | |
| and foot_ratio >= 0.05 | |
| ) | |
| if floor_like: | |
| removed_floor += 1 | |
| continue | |
| kept.append(comp) | |
| if len(kept) == 0: | |
| kept = [max(components, key=lambda x: len(x.faces))] | |
| cleaned_out_path.parent.mkdir(parents=True, exist_ok=True) | |
| if ( | |
| len(components) == 1 | |
| and len(kept) == 1 | |
| and removed_floor == 0 | |
| and removed_tiny == 0 | |
| ): | |
| shutil.copy2(input_mesh_path, cleaned_out_path) | |
| else: | |
| merged = trimesh.util.concatenate([k.copy() for k in kept]) | |
| merged.export(str(cleaned_out_path), file_type="glb") | |
| stats = { | |
| "before_meshes": int(len(components)), | |
| "after_meshes": int(len(kept)), | |
| "before_faces": int(sum(len(m.faces) for m in components)), | |
| "after_faces": int(sum(len(m.faces) for m in kept)), | |
| "removed_floor_components": int(removed_floor), | |
| "removed_tiny_components": int(removed_tiny), | |
| } | |
| return stats, resolved_up_axis, axis_scores | |
| def _load_single_mesh(input_path: Path) -> trimesh.Trimesh: | |
| loaded = trimesh.load(str(input_path), force="scene", process=False) | |
| if isinstance(loaded, trimesh.Trimesh): | |
| mesh = loaded | |
| elif isinstance(loaded, trimesh.Scene): | |
| try: | |
| mesh = loaded.to_mesh() | |
| except Exception: | |
| geoms = [g for g in loaded.geometry.values() if isinstance(g, trimesh.Trimesh)] | |
| if not geoms: | |
| raise RuntimeError("Scene contains no mesh geometry.") | |
| mesh = trimesh.util.concatenate([g.copy() for g in geoms]) | |
| else: | |
| raise RuntimeError("Unsupported geometry format in uploaded file.") | |
| mesh = _safe_mesh(_geometry_only_mesh(mesh)) | |
| if len(mesh.faces) == 0 or len(mesh.vertices) == 0: | |
| raise RuntimeError("Mesh has no usable geometry after cleanup.") | |
| return mesh | |
| def _convert_to_obj(input_path: Path, out_obj_path: Path) -> Path: | |
| out_obj_path.parent.mkdir(parents=True, exist_ok=True) | |
| if input_path.suffix.lower() == ".obj": | |
| shutil.copy2(input_path, out_obj_path) | |
| return out_obj_path | |
| mesh = _load_single_mesh(input_path) | |
| mesh.export(str(out_obj_path), file_type="obj") | |
| return out_obj_path | |
| def _simplify_obj_mesh(input_obj_path: Path, target_faces: int, output_obj_path: Path, logs: List[str]) -> Path: | |
| if target_faces <= 0: | |
| shutil.copy2(input_obj_path, output_obj_path) | |
| return output_obj_path | |
| mesh = _load_single_mesh(input_obj_path) | |
| original_faces = int(len(mesh.faces)) | |
| if original_faces <= target_faces: | |
| shutil.copy2(input_obj_path, output_obj_path) | |
| return output_obj_path | |
| simplified = mesh | |
| try: | |
| simplified = mesh.simplify_quadric_decimation(face_count=int(target_faces)) | |
| simplified = _safe_mesh(simplified) | |
| if len(simplified.faces) == 0: | |
| simplified = mesh | |
| except Exception: | |
| simplified = mesh | |
| output_obj_path.parent.mkdir(parents=True, exist_ok=True) | |
| simplified.export(str(output_obj_path), file_type="obj") | |
| logs.append(f"Simplified mesh: faces {original_faces}->{len(simplified.faces)}") | |
| return output_obj_path | |
| def _scene_has_texture(input_path: Path) -> bool: | |
| try: | |
| loaded = trimesh.load(str(input_path), force="scene", process=False) | |
| except Exception: | |
| return False | |
| geoms = [loaded] if isinstance(loaded, trimesh.Trimesh) else list(getattr(loaded, "geometry", {}).values()) | |
| for geom in geoms: | |
| visual = getattr(geom, "visual", None) | |
| material = getattr(visual, "material", None) | |
| if getattr(visual, "kind", None) == "texture" and material is not None: | |
| for attr in ("baseColorTexture", "image", "metallicRoughnessTexture", "normalTexture"): | |
| if getattr(material, attr, None) is not None: | |
| return True | |
| return False | |
| def _export_flattened_visual_glb(input_path: Path, output_path: Path) -> Path: | |
| loaded = trimesh.load(str(input_path), force="scene", process=False) | |
| flat_scene = trimesh.Scene() | |
| if isinstance(loaded, trimesh.Trimesh): | |
| flat_scene.add_geometry(loaded.copy(), geom_name="geometry_0", node_name="geometry_0") | |
| elif isinstance(loaded, trimesh.Scene): | |
| index = 0 | |
| for node_name in loaded.graph.nodes_geometry: | |
| transform, geom_name = loaded.graph[node_name] | |
| geom = loaded.geometry.get(geom_name) | |
| if not isinstance(geom, trimesh.Trimesh) or len(geom.vertices) == 0: | |
| continue | |
| geom_copy = geom.copy() | |
| geom_copy.apply_transform(transform) | |
| flat_scene.add_geometry( | |
| geom_copy, | |
| geom_name=f"{geom_name}_{index}", | |
| node_name=f"{node_name}_{index}", | |
| ) | |
| index += 1 | |
| else: | |
| raise RuntimeError("Could not load a textured visual scene for rigged GLB export.") | |
| if not flat_scene.geometry: | |
| raise RuntimeError("Visual scene contains no mesh geometry.") | |
| output_path.parent.mkdir(parents=True, exist_ok=True) | |
| flat_scene.export(str(output_path), file_type="glb") | |
| return output_path | |
| def _read_obj_vertices(obj_path: Path) -> np.ndarray: | |
| vertices: List[List[float]] = [] | |
| with open(obj_path, "r", encoding="utf-8", errors="ignore") as handle: | |
| for line in handle: | |
| if line.startswith("v "): | |
| parts = line.split() | |
| if len(parts) >= 4: | |
| vertices.append([float(parts[1]), float(parts[2]), float(parts[3])]) | |
| if not vertices: | |
| mesh = _load_single_mesh(obj_path) | |
| return np.asarray(mesh.vertices, dtype=np.float32) | |
| return np.asarray(vertices, dtype=np.float32) | |
| def _parse_rig_with_skin(rig_path: Path) -> Tuple[List[str], np.ndarray, Dict[str, str], str, Dict[int, List[Tuple[str, float]]]]: | |
| joint_names: List[str] = [] | |
| joint_pos: Dict[str, List[float]] = {} | |
| parents: Dict[str, str] = {} | |
| root_name = "" | |
| skin: Dict[int, List[Tuple[str, float]]] = {} | |
| with open(rig_path, "r", encoding="utf-8", errors="ignore") as handle: | |
| for line in handle: | |
| word = line.split() | |
| if not word: | |
| continue | |
| if word[0] == "joints" and len(word) >= 5: | |
| name = word[1] | |
| joint_names.append(name) | |
| joint_pos[name] = [float(word[2]), float(word[3]), float(word[4])] | |
| elif word[0] == "root" and len(word) >= 2: | |
| root_name = word[1] | |
| elif word[0] == "hier" and len(word) >= 3: | |
| parents[word[2]] = word[1] | |
| elif word[0] == "skin" and len(word) >= 4: | |
| vertex_index = int(word[1]) | |
| influences: List[Tuple[str, float]] = [] | |
| for i in range(2, len(word) - 1, 2): | |
| try: | |
| influences.append((word[i], float(word[i + 1]))) | |
| except ValueError: | |
| continue | |
| skin[vertex_index] = influences | |
| if not joint_names: | |
| raise RuntimeError("Rig file contains no joints.") | |
| if not root_name or root_name not in joint_pos: | |
| root_name = joint_names[0] | |
| positions = np.asarray([joint_pos[name] for name in joint_names], dtype=np.float32) | |
| return joint_names, positions, parents, root_name, skin | |
| def _unique_bone_name(base: str, used: set[str]) -> str: | |
| if base not in used: | |
| used.add(base) | |
| return base | |
| i = 1 | |
| while f"{base}{i}" in used: | |
| i += 1 | |
| name = f"{base}{i}" | |
| used.add(name) | |
| return name | |
| def _parents_to_indices(joint_names: List[str], parents: Dict[str, str], root_name: str) -> Tuple[np.ndarray, int]: | |
| lookup = {name: i for i, name in enumerate(joint_names)} | |
| parent_indices = np.full(len(joint_names), -1, dtype=np.int32) | |
| for child_name, parent_name in parents.items(): | |
| child_i = lookup.get(child_name) | |
| parent_i = lookup.get(parent_name) | |
| if child_i is not None and parent_i is not None and child_i != parent_i: | |
| parent_indices[child_i] = parent_i | |
| root_idx = lookup.get(root_name) | |
| if root_idx is None: | |
| roots = np.where(parent_indices == -1)[0] | |
| root_idx = int(roots[0]) if len(roots) else 0 | |
| parent_indices[root_idx] = -1 | |
| return parent_indices, int(root_idx) | |
| def _children_from_parents(parent_indices: np.ndarray) -> List[List[int]]: | |
| children: List[List[int]] = [[] for _ in range(len(parent_indices))] | |
| for child, parent in enumerate(parent_indices): | |
| parent_i = int(parent) | |
| if 0 <= parent_i < len(parent_indices): | |
| children[parent_i].append(child) | |
| return children | |
| def _path_to_root(index: int, parent_indices: np.ndarray) -> List[int]: | |
| path = [] | |
| seen = set() | |
| cur = int(index) | |
| while 0 <= cur < len(parent_indices) and cur not in seen: | |
| path.append(cur) | |
| seen.add(cur) | |
| cur = int(parent_indices[cur]) | |
| path.reverse() | |
| return path | |
| def _infer_skeleton_axes(joints: np.ndarray, root_idx: int) -> Tuple[int, int, int, int]: | |
| extents = np.maximum(np.ptp(joints, axis=0), 1e-6) | |
| root = joints[root_idx] | |
| best_axis = int(np.argmax(extents)) | |
| best_score = -1.0 | |
| for axis in range(3): | |
| other = [i for i in range(3) if i != axis] | |
| other_norm = np.sqrt(np.sum(((joints[:, other] - root[other]) / extents[other]) ** 2, axis=1)) | |
| central = np.argsort(other_norm)[: max(3, int(np.ceil(len(joints) * 0.35)))] | |
| score = float(np.max(np.abs(joints[central, axis] - root[axis])) / extents[axis]) | |
| if score > best_score: | |
| best_score = score | |
| best_axis = axis | |
| other = [i for i in range(3) if i != best_axis] | |
| other_norm = np.sqrt(np.sum(((joints[:, other] - root[other]) / extents[other]) ** 2, axis=1)) | |
| central = np.argsort(other_norm)[: max(3, int(np.ceil(len(joints) * 0.35)))] | |
| delta = joints[central, best_axis] - root[best_axis] | |
| up_sign = 1 if float(np.max(delta)) >= abs(float(np.min(delta))) else -1 | |
| remaining = [i for i in range(3) if i != best_axis] | |
| side_axis = max(remaining, key=lambda i: float(extents[i])) | |
| left_side_sign = 1 | |
| return best_axis, up_sign, side_axis, left_side_sign | |
| def _assign_chain_names( | |
| assigned: Dict[int, str], | |
| used: set[str], | |
| chain: List[int], | |
| labels: List[str], | |
| ) -> None: | |
| for index, label in zip(chain, labels): | |
| if index in assigned: | |
| continue | |
| assigned[index] = _unique_bone_name(label, used) | |
| def _pick_nearest_unassigned( | |
| candidates: List[int], | |
| values: np.ndarray, | |
| target: float, | |
| assigned: Dict[int, str], | |
| ) -> int | None: | |
| available = [i for i in candidates if i not in assigned] | |
| if not available: | |
| return None | |
| return min(available, key=lambda i: abs(float(values[i]) - target)) | |
| def _smart_humanoid_name_map( | |
| joint_names: List[str], | |
| joint_positions: np.ndarray, | |
| parents: Dict[str, str], | |
| root_name: str, | |
| ) -> Dict[str, str]: | |
| if len(joint_names) == 0: | |
| return {} | |
| parent_indices, root_idx = _parents_to_indices(joint_names, parents, root_name) | |
| joints = np.asarray(joint_positions, dtype=np.float32) | |
| up_axis, up_sign, side_axis, left_side_sign = _infer_skeleton_axes(joints, root_idx) | |
| up = up_sign * joints[:, up_axis] | |
| side = joints[:, side_axis] - joints[root_idx, side_axis] | |
| height = max(float(np.ptp(up)), 1e-6) | |
| side_extent = max(float(np.ptp(side)), 1e-6) | |
| root_up = float(up[root_idx]) | |
| center_threshold = max(side_extent * 0.22, 1e-5) | |
| children = _children_from_parents(parent_indices) | |
| assigned: Dict[int, str] = {} | |
| used: set[str] = set() | |
| assigned[root_idx] = _unique_bone_name("Hips", used) | |
| center_candidates = [ | |
| i | |
| for i in range(len(joint_names)) | |
| if i != root_idx and abs(float(side[i])) <= center_threshold | |
| ] | |
| above_center = [i for i in center_candidates if float(up[i]) > root_up + height * 0.04] | |
| if above_center: | |
| center_leaves = [i for i in above_center if len(children[i]) == 0] | |
| head_tip = max(center_leaves or above_center, key=lambda i: float(up[i])) | |
| torso_chain = [ | |
| i | |
| for i in _path_to_root(head_tip, parent_indices) | |
| if i != root_idx | |
| and abs(float(side[i])) <= center_threshold * 1.35 | |
| and float(up[i]) > root_up + height * 0.02 | |
| ] | |
| torso_labels = ["Spine", "Chest", "Neck", "Head"] | |
| if len(torso_chain) >= len(torso_labels): | |
| positions = np.linspace(0, len(torso_chain) - 1, num=len(torso_labels)) | |
| torso_indices = [torso_chain[int(round(pos))] for pos in positions] | |
| for index, label in zip(torso_indices, torso_labels): | |
| if index not in assigned: | |
| assigned[index] = _unique_bone_name(label, used) | |
| else: | |
| top = max(float(up[i]) for i in above_center) | |
| torso_span = max(top - root_up, height * 0.25) | |
| for label, frac in [ | |
| ("Spine", 0.25), | |
| ("Chest", 0.50), | |
| ("Neck", 0.78), | |
| ("Head", 1.00), | |
| ]: | |
| picked = _pick_nearest_unassigned(above_center, up, root_up + torso_span * frac, assigned) | |
| if picked is not None: | |
| assigned[picked] = _unique_bone_name(label, used) | |
| def side_indices(sign: int) -> List[int]: | |
| return [ | |
| i | |
| for i in range(len(joint_names)) | |
| if i not in assigned and float(side[i]) * sign > side_extent * 0.06 | |
| ] | |
| def outermost_upper_leaf(sign: int) -> int | None: | |
| candidates = [ | |
| i | |
| for i in range(len(joint_names)) | |
| if float(side[i]) * sign > side_extent * 0.08 | |
| and float(up[i]) > root_up + height * 0.08 | |
| ] | |
| if not candidates: | |
| return None | |
| leaves = [i for i in candidates if len(children[i]) == 0] | |
| pool = leaves or candidates | |
| return max(pool, key=lambda i: (float(side[i]) * sign, float(up[i]) - root_up)) | |
| def lowest_lower_leaf(sign: int) -> int | None: | |
| candidates = [ | |
| i | |
| for i in range(len(joint_names)) | |
| if float(side[i]) * sign > side_extent * 0.04 | |
| and float(up[i]) < root_up + height * 0.10 | |
| ] | |
| if not candidates: | |
| return None | |
| leaves = [i for i in candidates if len(children[i]) == 0] | |
| pool = leaves or candidates | |
| return min(pool, key=lambda i: (float(up[i]), -float(side[i]) * sign)) | |
| for side_name, sign in [("Left", left_side_sign), ("Right", -left_side_sign)]: | |
| arm_chain: List[int] = [] | |
| leaf = outermost_upper_leaf(sign) | |
| if leaf is not None: | |
| arm_chain = [ | |
| i | |
| for i in _path_to_root(leaf, parent_indices) | |
| if i not in assigned | |
| and float(side[i]) * sign > side_extent * 0.04 | |
| and float(up[i]) > root_up - height * 0.04 | |
| ][:3] | |
| if len(arm_chain) < 3: | |
| fallback = sorted( | |
| [ | |
| i | |
| for i in side_indices(sign) | |
| if float(up[i]) > root_up + height * 0.04 | |
| ], | |
| key=lambda i: float(side[i]) * sign, | |
| ) | |
| for index in fallback: | |
| if index not in arm_chain: | |
| arm_chain.append(index) | |
| if len(arm_chain) == 3: | |
| break | |
| _assign_chain_names( | |
| assigned, | |
| used, | |
| arm_chain, | |
| [f"{side_name}UpperArm", f"{side_name}LowerArm", f"{side_name}Hand"], | |
| ) | |
| leg_chain: List[int] = [] | |
| leaf = lowest_lower_leaf(sign) | |
| if leaf is not None: | |
| leg_chain = [ | |
| i | |
| for i in _path_to_root(leaf, parent_indices) | |
| if i not in assigned | |
| and float(side[i]) * sign > side_extent * 0.025 | |
| and float(up[i]) < root_up + height * 0.18 | |
| ][:3] | |
| if len(leg_chain) < 3: | |
| fallback = sorted( | |
| [ | |
| i | |
| for i in side_indices(sign) | |
| if float(up[i]) < root_up + height * 0.18 | |
| ], | |
| key=lambda i: -float(up[i]), | |
| ) | |
| for index in fallback: | |
| if index not in leg_chain: | |
| leg_chain.append(index) | |
| if len(leg_chain) == 3: | |
| break | |
| _assign_chain_names( | |
| assigned, | |
| used, | |
| leg_chain, | |
| [f"{side_name}UpperLeg", f"{side_name}LowerLeg", f"{side_name}Foot"], | |
| ) | |
| extra_counts: Dict[str, int] = {} | |
| for index in range(len(joint_names)): | |
| if index in assigned: | |
| continue | |
| parent_name = assigned.get(int(parent_indices[index])) | |
| side_name = "Left" if float(side[index]) * left_side_sign >= 0 else "Right" | |
| if parent_name in STANDARD_HUMANOID_BONES: | |
| if parent_name.endswith("Hand"): | |
| base = f"{side_name}Finger" | |
| elif parent_name.endswith("Foot"): | |
| base = f"{side_name}Toe" | |
| elif parent_name in STANDARD_HUMANOID_BONES: | |
| base = f"{parent_name}Extra" | |
| elif abs(float(side[index])) <= center_threshold: | |
| if float(up[index]) > root_up + height * 0.70: | |
| base = "HeadExtra" | |
| elif float(up[index]) > root_up + height * 0.45: | |
| base = "ChestExtra" | |
| elif float(up[index]) > root_up + height * 0.12: | |
| base = "SpineExtra" | |
| else: | |
| base = "HipsExtra" | |
| elif float(up[index]) > root_up + height * 0.05: | |
| base = f"{side_name}ArmExtra" | |
| elif float(up[index]) < root_up - height * 0.35: | |
| base = f"{side_name}Toe" | |
| else: | |
| base = f"{side_name}LegExtra" | |
| extra_counts[base] = extra_counts.get(base, 0) + 1 | |
| assigned[index] = _unique_bone_name(f"{base}{extra_counts[base]}", used) | |
| result: Dict[str, str] = {} | |
| for i, old_name in enumerate(joint_names): | |
| result[old_name] = assigned[i] if i in assigned else _unique_bone_name(f"Bone{i}", used) | |
| return result | |
| def _nearest_present_bone( | |
| target_pos: np.ndarray, | |
| candidates: List[str], | |
| name_to_pos: Dict[str, np.ndarray], | |
| ) -> str | None: | |
| present = [name for name in candidates if name in name_to_pos] | |
| if not present: | |
| return None | |
| return min(present, key=lambda name: float(np.linalg.norm(name_to_pos[name] - target_pos))) | |
| def _standard_parent_for(name: str, present: set[str]) -> str | None: | |
| parent = STANDARD_HUMANOID_PARENTS.get(name) | |
| while parent is not None: | |
| if parent in present: | |
| return parent | |
| parent = STANDARD_HUMANOID_PARENTS.get(parent) | |
| return None | |
| def _extra_parent_for( | |
| name: str, | |
| pos: np.ndarray, | |
| name_to_pos: Dict[str, np.ndarray], | |
| root_name: str, | |
| ) -> str | None: | |
| present = set(name_to_pos) | |
| for standard_name in STANDARD_HUMANOID_BONES: | |
| if name.startswith(f"{standard_name}Extra") and standard_name in present: | |
| return standard_name | |
| direct_groups = [ | |
| ("LeftFinger", ["LeftHand", "LeftLowerArm", "LeftUpperArm", "Chest"]), | |
| ("RightFinger", ["RightHand", "RightLowerArm", "RightUpperArm", "Chest"]), | |
| ("LeftToe", ["LeftFoot", "LeftLowerLeg", "LeftUpperLeg", "Hips"]), | |
| ("RightToe", ["RightFoot", "RightLowerLeg", "RightUpperLeg", "Hips"]), | |
| ("LeftArmExtra", ["LeftUpperArm", "LeftLowerArm", "LeftHand", "Chest"]), | |
| ("RightArmExtra", ["RightUpperArm", "RightLowerArm", "RightHand", "Chest"]), | |
| ("LeftLegExtra", ["LeftUpperLeg", "LeftLowerLeg", "LeftFoot", "Hips"]), | |
| ("RightLegExtra", ["RightUpperLeg", "RightLowerLeg", "RightFoot", "Hips"]), | |
| ("HeadExtra", ["Head", "Neck", "Chest"]), | |
| ("NeckExtra", ["Neck", "Chest", "Spine"]), | |
| ("ChestExtra", ["Chest", "Spine", "Hips"]), | |
| ("SpineExtra", ["Spine", "Chest", "Hips"]), | |
| ("HipsExtra", ["Hips", "Spine"]), | |
| ] | |
| for prefix, candidates in direct_groups: | |
| if name.startswith(prefix): | |
| parent = _nearest_present_bone(pos, candidates, name_to_pos) | |
| if parent is not None: | |
| return parent | |
| nearest = _nearest_present_bone(pos, STANDARD_HUMANOID_BONES, name_to_pos) | |
| if nearest is not None: | |
| return nearest | |
| return root_name if root_name in present and name != root_name else None | |
| def _build_clean_humanoid_parents( | |
| joint_names: List[str], | |
| joint_positions: np.ndarray, | |
| root_name: str, | |
| ) -> Dict[str, str]: | |
| present = set(joint_names) | |
| resolved_root = "Hips" if "Hips" in present else root_name | |
| name_to_pos = { | |
| name: np.asarray(pos, dtype=np.float32) | |
| for name, pos in zip(joint_names, joint_positions) | |
| } | |
| clean_parents: Dict[str, str] = {} | |
| for name in joint_names: | |
| if name == resolved_root: | |
| continue | |
| if name in STANDARD_HUMANOID_BONES: | |
| parent = _standard_parent_for(name, present) | |
| if parent is None and resolved_root in present and name != resolved_root: | |
| parent = resolved_root | |
| else: | |
| parent = _extra_parent_for(name, name_to_pos[name], name_to_pos, resolved_root) | |
| if parent is not None and parent != name: | |
| clean_parents[name] = parent | |
| return clean_parents | |
| def _rename_rig_data_for_humanoid( | |
| joint_names: List[str], | |
| joint_positions: np.ndarray, | |
| parents: Dict[str, str], | |
| root_name: str, | |
| skin_map: Dict[int, List[Tuple[str, float]]], | |
| logs: List[str], | |
| ) -> Tuple[List[str], np.ndarray, Dict[str, str], str, Dict[int, List[Tuple[str, float]]]]: | |
| name_map = _smart_humanoid_name_map(joint_names, joint_positions, parents, root_name) | |
| renamed_joint_names = [name_map[name] for name in joint_names] | |
| renamed_root = name_map.get(root_name, renamed_joint_names[0] if renamed_joint_names else "Hips") | |
| if "Hips" in renamed_joint_names: | |
| renamed_root = "Hips" | |
| renamed_parents = _build_clean_humanoid_parents( | |
| joint_names=renamed_joint_names, | |
| joint_positions=joint_positions, | |
| root_name=renamed_root, | |
| ) | |
| renamed_skin: Dict[int, List[Tuple[str, float]]] = {} | |
| for vertex_index, influences in skin_map.items(): | |
| merged: Dict[str, float] = {} | |
| for joint_name, weight in influences: | |
| new_name = name_map.get(joint_name) | |
| if new_name is None: | |
| continue | |
| merged[new_name] = merged.get(new_name, 0.0) + float(weight) | |
| renamed_skin[vertex_index] = list(merged.items()) | |
| present_standard = [name for name in STANDARD_HUMANOID_BONES if name in renamed_joint_names] | |
| logs.append( | |
| "Bone names mapped: " | |
| f"{len(present_standard)}/{len(STANDARD_HUMANOID_BONES)} standard humanoid names, " | |
| f"{len(renamed_joint_names)} total bones." | |
| ) | |
| return renamed_joint_names, joint_positions, renamed_parents, renamed_root, renamed_skin | |
| def _write_rig_with_skin( | |
| rig_path: Path, | |
| joint_names: List[str], | |
| joint_positions: np.ndarray, | |
| parents: Dict[str, str], | |
| root_name: str, | |
| skin_map: Dict[int, List[Tuple[str, float]]], | |
| ) -> None: | |
| with open(rig_path, "w", encoding="utf-8") as handle: | |
| for name, pos in zip(joint_names, joint_positions): | |
| handle.write(f"joints {name} {float(pos[0]):.8f} {float(pos[1]):.8f} {float(pos[2]):.8f}\n") | |
| handle.write(f"root {root_name}\n") | |
| for child in joint_names: | |
| parent = parents.get(child) | |
| if parent: | |
| handle.write(f"hier {parent} {child}\n") | |
| for vertex_index in sorted(skin_map): | |
| influences = skin_map[vertex_index] | |
| if not influences: | |
| continue | |
| total = max(sum(max(0.0, float(weight)) for _, weight in influences), 1e-8) | |
| parts = [f"skin {vertex_index}"] | |
| for name, weight in influences: | |
| value = max(0.0, float(weight)) / total | |
| if value > 1e-6: | |
| parts.append(f"{name} {value:.6f}") | |
| handle.write(" ".join(parts) + "\n") | |
| def _source_skin_matrix( | |
| source_vertices: np.ndarray, | |
| joint_names: List[str], | |
| joint_positions: np.ndarray, | |
| skin_map: Dict[int, List[Tuple[str, float]]], | |
| ) -> np.ndarray: | |
| from scipy.spatial import cKDTree | |
| joint_index = {name: i for i, name in enumerate(joint_names)} | |
| weights = np.zeros((len(source_vertices), len(joint_names)), dtype=np.float32) | |
| for vertex_index, influences in skin_map.items(): | |
| if vertex_index < 0 or vertex_index >= len(weights): | |
| continue | |
| for joint_name, value in influences: | |
| idx = joint_index.get(joint_name) | |
| if idx is not None: | |
| weights[vertex_index, idx] += max(0.0, float(value)) | |
| row_sums = weights.sum(axis=1) | |
| missing = np.where(row_sums <= 1e-8)[0] | |
| if len(missing) > 0: | |
| _, nearest_joint = cKDTree(joint_positions).query(source_vertices[missing]) | |
| weights[missing, nearest_joint] = 1.0 | |
| row_sums = np.maximum(weights.sum(axis=1, keepdims=True), 1e-8) | |
| return weights / row_sums | |
| def _top4_joint_weights(weights: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: | |
| if weights.shape[1] <= 4: | |
| joint_ids = np.zeros((weights.shape[0], 4), dtype=np.uint16) | |
| joint_weights = np.zeros((weights.shape[0], 4), dtype=np.float32) | |
| joint_ids[:, : weights.shape[1]] = np.arange(weights.shape[1], dtype=np.uint16) | |
| joint_weights[:, : weights.shape[1]] = weights | |
| else: | |
| top = np.argpartition(-weights, kth=3, axis=1)[:, :4] | |
| top_values = np.take_along_axis(weights, top, axis=1) | |
| order = np.argsort(-top_values, axis=1) | |
| joint_ids = np.take_along_axis(top, order, axis=1).astype(np.uint16) | |
| joint_weights = np.take_along_axis(top_values, order, axis=1).astype(np.float32) | |
| sums = np.maximum(joint_weights.sum(axis=1, keepdims=True), 1e-8) | |
| joint_weights = joint_weights / sums | |
| return joint_ids, joint_weights.astype(np.float32) | |
| def _gltf_accessor_array(gltf: Any, accessor_index: int) -> np.ndarray: | |
| accessor = gltf.accessors[accessor_index] | |
| buffer_view = gltf.bufferViews[accessor.bufferView] | |
| blob = gltf.binary_blob() | |
| if blob is None: | |
| raise RuntimeError("GLB has no binary buffer.") | |
| component_dtypes = { | |
| 5120: np.int8, | |
| 5121: np.uint8, | |
| 5122: np.dtype("<i2"), | |
| 5123: np.dtype("<u2"), | |
| 5125: np.dtype("<u4"), | |
| 5126: np.dtype("<f4"), | |
| } | |
| type_counts = {"SCALAR": 1, "VEC2": 2, "VEC3": 3, "VEC4": 4, "MAT4": 16} | |
| dtype = component_dtypes[accessor.componentType] | |
| count = type_counts[accessor.type] | |
| offset = (buffer_view.byteOffset or 0) + (accessor.byteOffset or 0) | |
| itemsize = np.dtype(dtype).itemsize | |
| stride = buffer_view.byteStride or (itemsize * count) | |
| if stride == itemsize * count: | |
| arr = np.frombuffer(blob, dtype=dtype, count=accessor.count * count, offset=offset) | |
| return arr.reshape((accessor.count, count)).copy() | |
| return np.ndarray( | |
| shape=(accessor.count, count), | |
| dtype=dtype, | |
| buffer=blob, | |
| offset=offset, | |
| strides=(stride, itemsize), | |
| ).copy() | |
| def _append_gltf_accessor( | |
| gltf: Any, | |
| blob: bytes, | |
| array: np.ndarray, | |
| component_type: int, | |
| accessor_type: str, | |
| target: int | None = None, | |
| ) -> Tuple[int, bytes]: | |
| from pygltflib import Accessor, Buffer, BufferView | |
| if gltf.buffers is None: | |
| gltf.buffers = [] | |
| if not gltf.buffers: | |
| gltf.buffers.append(Buffer(byteLength=0)) | |
| if gltf.bufferViews is None: | |
| gltf.bufferViews = [] | |
| if gltf.accessors is None: | |
| gltf.accessors = [] | |
| payload_array = np.ascontiguousarray(array) | |
| payload = payload_array.tobytes() | |
| padding = (-len(blob)) % 4 | |
| if padding: | |
| blob += b"\x00" * padding | |
| byte_offset = len(blob) | |
| blob += payload | |
| buffer_view_index = len(gltf.bufferViews) | |
| gltf.bufferViews.append( | |
| BufferView( | |
| buffer=0, | |
| byteOffset=byte_offset, | |
| byteLength=len(payload), | |
| target=target, | |
| ) | |
| ) | |
| count = int(payload_array.shape[0]) | |
| accessor = Accessor( | |
| bufferView=buffer_view_index, | |
| byteOffset=0, | |
| componentType=component_type, | |
| count=count, | |
| type=accessor_type, | |
| ) | |
| accessor_index = len(gltf.accessors) | |
| gltf.accessors.append(accessor) | |
| gltf.buffers[0].byteLength = len(blob) | |
| return accessor_index, blob | |
| def _inject_skin_into_glb( | |
| base_glb_path: Path, | |
| output_glb_path: Path, | |
| source_vertices: np.ndarray, | |
| source_weights: np.ndarray, | |
| joint_names: List[str], | |
| joint_positions: np.ndarray, | |
| parents: Dict[str, str], | |
| root_name: str, | |
| ) -> Path: | |
| from pygltflib import GLTF2, Node, Skin | |
| from scipy.spatial import cKDTree | |
| gltf = GLTF2().load_binary(str(base_glb_path)) | |
| blob = gltf.binary_blob() or b"" | |
| if gltf.nodes is None: | |
| gltf.nodes = [] | |
| if gltf.skins is None: | |
| gltf.skins = [] | |
| source_tree = cKDTree(source_vertices) | |
| for mesh in gltf.meshes or []: | |
| for primitive in mesh.primitives or []: | |
| position_accessor = getattr(primitive.attributes, "POSITION", None) | |
| if position_accessor is None: | |
| continue | |
| target_positions = _gltf_accessor_array(gltf, position_accessor).astype(np.float32) | |
| _, nearest = source_tree.query(target_positions) | |
| mapped_weights = source_weights[np.asarray(nearest, dtype=np.int64)] | |
| joints_0, weights_0 = _top4_joint_weights(mapped_weights) | |
| joints_accessor, blob = _append_gltf_accessor( | |
| gltf, | |
| blob, | |
| joints_0.astype(np.uint16), | |
| component_type=5123, | |
| accessor_type="VEC4", | |
| target=34962, | |
| ) | |
| weights_accessor, blob = _append_gltf_accessor( | |
| gltf, | |
| blob, | |
| weights_0.astype(np.float32), | |
| component_type=5126, | |
| accessor_type="VEC4", | |
| target=34962, | |
| ) | |
| primitive.attributes.JOINTS_0 = joints_accessor | |
| primitive.attributes.WEIGHTS_0 = weights_accessor | |
| joint_lookup = {name: i for i, name in enumerate(joint_names)} | |
| joint_node_indices: List[int] = [] | |
| first_joint_node = len(gltf.nodes) | |
| for i, name in enumerate(joint_names): | |
| parent_name = parents.get(name) | |
| local_pos = joint_positions[i].copy() | |
| if parent_name in joint_lookup: | |
| local_pos = local_pos - joint_positions[joint_lookup[parent_name]] | |
| joint_node_indices.append(first_joint_node + i) | |
| gltf.nodes.append( | |
| Node( | |
| name=name, | |
| translation=[float(v) for v in local_pos], | |
| children=[], | |
| ) | |
| ) | |
| root_nodes: List[int] = [] | |
| for name, node_index in zip(joint_names, joint_node_indices): | |
| parent_name = parents.get(name) | |
| if parent_name in joint_lookup: | |
| parent_node = gltf.nodes[joint_node_indices[joint_lookup[parent_name]]] | |
| if parent_node.children is None: | |
| parent_node.children = [] | |
| parent_node.children.append(node_index) | |
| else: | |
| root_nodes.append(node_index) | |
| inverse_bind = [] | |
| for pos in joint_positions: | |
| mat = np.eye(4, dtype=np.float32) | |
| mat[:3, 3] = -pos | |
| inverse_bind.append(mat.T.reshape(16)) | |
| ibm_accessor, blob = _append_gltf_accessor( | |
| gltf, | |
| blob, | |
| np.asarray(inverse_bind, dtype=np.float32), | |
| component_type=5126, | |
| accessor_type="MAT4", | |
| target=None, | |
| ) | |
| skin_index = len(gltf.skins) | |
| skeleton_root = joint_node_indices[joint_lookup.get(root_name, 0)] | |
| gltf.skins.append( | |
| Skin( | |
| inverseBindMatrices=ibm_accessor, | |
| joints=joint_node_indices, | |
| skeleton=skeleton_root, | |
| name="PuppeteerRig", | |
| ) | |
| ) | |
| for node in gltf.nodes: | |
| if node.mesh is not None: | |
| node.skin = skin_index | |
| if gltf.scenes: | |
| scene_index = gltf.scene if gltf.scene is not None else 0 | |
| if gltf.scenes[scene_index].nodes is None: | |
| gltf.scenes[scene_index].nodes = [] | |
| for root_node in root_nodes: | |
| if root_node not in gltf.scenes[scene_index].nodes: | |
| gltf.scenes[scene_index].nodes.append(root_node) | |
| gltf.set_binary_blob(blob) | |
| gltf.buffers[0].byteLength = len(blob) | |
| output_glb_path.parent.mkdir(parents=True, exist_ok=True) | |
| gltf.save_binary(str(output_glb_path)) | |
| return output_glb_path | |
| def _build_textured_rigged_glb( | |
| visual_mesh_path: Path, | |
| source_obj_path: Path, | |
| rig_with_skin_path: Path, | |
| output_glb_path: Path, | |
| logs: List[str], | |
| ) -> Path: | |
| flat_visual_glb = output_glb_path.with_name("visual_textured_flat.glb") | |
| _export_flattened_visual_glb(visual_mesh_path, flat_visual_glb) | |
| joint_names, joint_positions, parents, root_name, skin_map = _parse_rig_with_skin(rig_with_skin_path) | |
| joint_names, joint_positions, parents, root_name, skin_map = _rename_rig_data_for_humanoid( | |
| joint_names=joint_names, | |
| joint_positions=joint_positions, | |
| parents=parents, | |
| root_name=root_name, | |
| skin_map=skin_map, | |
| logs=logs, | |
| ) | |
| _write_rig_with_skin( | |
| rig_path=rig_with_skin_path, | |
| joint_names=joint_names, | |
| joint_positions=joint_positions, | |
| parents=parents, | |
| root_name=root_name, | |
| skin_map=skin_map, | |
| ) | |
| source_vertices = _read_obj_vertices(source_obj_path) | |
| source_weights = _source_skin_matrix(source_vertices, joint_names, joint_positions, skin_map) | |
| _inject_skin_into_glb( | |
| base_glb_path=flat_visual_glb, | |
| output_glb_path=output_glb_path, | |
| source_vertices=source_vertices, | |
| source_weights=source_weights, | |
| joint_names=joint_names, | |
| joint_positions=joint_positions, | |
| parents=parents, | |
| root_name=root_name, | |
| ) | |
| logs.append(f"Built textured skinned GLB: {output_glb_path.name}") | |
| return output_glb_path | |
| def _ensure_checkpoint(repo_id: str, filename: str, local_path: Path, logs: List[str]) -> Path: | |
| if local_path.exists(): | |
| return local_path | |
| local_path.parent.mkdir(parents=True, exist_ok=True) | |
| logs.append(f"Downloading checkpoint: {filename}") | |
| downloaded = hf_hub_download( | |
| repo_id=repo_id, | |
| filename=filename, | |
| local_dir=str(local_path.parent), | |
| ) | |
| downloaded_path = Path(downloaded) | |
| if downloaded_path != local_path and downloaded_path.exists() and not local_path.exists(): | |
| shutil.copy2(downloaded_path, local_path) | |
| if not local_path.exists(): | |
| return downloaded_path | |
| return local_path | |
| def _ensure_checkpoints(logs: List[str]) -> Dict[str, Path]: | |
| resolved: Dict[str, Path] = {} | |
| for key, (repo_id, filename, local_path) in CHECKPOINTS.items(): | |
| resolved[key] = _ensure_checkpoint(repo_id, filename, local_path, logs) | |
| return resolved | |
| def _ensure_skinning_michelangelo_link(logs: List[str]) -> None: | |
| src = ROOT / "skeleton/third_partys/Michelangelo" | |
| dst = ROOT / "skinning/third_partys/Michelangelo" | |
| if dst.exists(): | |
| return | |
| if not src.exists(): | |
| raise RuntimeError("Missing skeleton Michelangelo directory.") | |
| dst.parent.mkdir(parents=True, exist_ok=True) | |
| try: | |
| dst.symlink_to(src, target_is_directory=True) | |
| logs.append("Linked Michelangelo into skinning/third_partys.") | |
| except Exception: | |
| shutil.copytree(src, dst) | |
| logs.append("Copied Michelangelo into skinning/third_partys.") | |
| def _zero_gpu_skin_duration( | |
| input_obj_path: str, | |
| input_skel_folder: str, | |
| save_folder: str, | |
| skin_ckpt_path: str, | |
| target_faces: int, | |
| ) -> int: | |
| if int(target_faces) <= 12000: | |
| return min(90, ZERO_GPU_SKINNING_SEC) | |
| if int(target_faces) <= 24000: | |
| return min(110, ZERO_GPU_SKINNING_SEC) | |
| return ZERO_GPU_SKINNING_SEC | |
| def _run_skeleton_inference_gpu( | |
| input_obj_path: str, | |
| output_root: str, | |
| skeleton_ckpt_path: str, | |
| timeout_sec: int = STEP_TIMEOUT_SEC, | |
| ) -> str: | |
| out = _run_script_inprocess( | |
| script_path=ROOT / "skeleton" / "demo.py", | |
| cwd=ROOT / "skeleton", | |
| argv=[ | |
| "--input_path", | |
| str(input_obj_path), | |
| "--pretrained_weights", | |
| str(skeleton_ckpt_path), | |
| "--output_dir", | |
| str(output_root), | |
| "--save_name", | |
| "skel_results", | |
| "--input_pc_num", | |
| "8192", | |
| "--apply_marching_cubes", | |
| "--joint_token", | |
| "--seq_shuffle", | |
| ], | |
| ) | |
| out_lower = out.lower() | |
| if ( | |
| "no nvidia driver" in out_lower | |
| or "torch not compiled with cuda" in out_lower | |
| or "cuda is not available" in out_lower | |
| or "no cuda gpus are available" in out_lower | |
| ): | |
| raise gr.Error( | |
| "ZeroGPU did not attach a CUDA device for skeleton inference. " | |
| "Please retry in a new run." | |
| ) | |
| return "Skeleton prediction completed." | |
| def _run_skinning_inference_gpu( | |
| input_obj_path: str, | |
| input_skel_folder: str, | |
| save_folder: str, | |
| skin_ckpt_path: str, | |
| target_faces: int, | |
| timeout_sec: int = STEP_TIMEOUT_SEC, | |
| ) -> str: | |
| out = _run_script_inprocess( | |
| script_path=ROOT / "skinning" / "main.py", | |
| cwd=ROOT / "skinning", | |
| argv=[ | |
| "--num_workers", | |
| "0", | |
| "--batch_size", | |
| "1", | |
| "--generate", | |
| "--save_skin_npy", | |
| "--pretrained_weights", | |
| str(skin_ckpt_path), | |
| "--input_skel_folder", | |
| str(input_skel_folder), | |
| "--mesh_folder", | |
| str(Path(input_obj_path).parent), | |
| "--post_filter", | |
| "--depth", | |
| "1", | |
| "--save_folder", | |
| str(save_folder), | |
| ], | |
| ) | |
| out_lower = out.lower() | |
| if ( | |
| "no nvidia driver" in out_lower | |
| or "torch not compiled with cuda" in out_lower | |
| or "cuda is not available" in out_lower | |
| or "no cuda gpus are available" in out_lower | |
| ): | |
| raise gr.Error( | |
| "ZeroGPU did not attach a CUDA device for skinning inference. " | |
| "Please retry in a new run." | |
| ) | |
| return "Skinning prediction completed." | |
| def _pipeline( | |
| input_mesh_path: str, | |
| simplify_target_faces: int, | |
| trellis_cleanup: bool, | |
| remove_floor: bool, | |
| floor_percentile: float, | |
| floor_thickness_ratio: float, | |
| min_component_faces: int, | |
| progress: gr.Progress | None = None, | |
| ) -> Tuple[str, str, List[str], str]: | |
| if not input_mesh_path: | |
| raise gr.Error("Please upload a mesh first.") | |
| in_path = Path(input_mesh_path) | |
| if not in_path.exists(): | |
| raise gr.Error("Uploaded mesh path is unavailable.") | |
| if in_path.suffix.lower() not in SUPPORTED_EXTS: | |
| raise gr.Error(f"Unsupported file type: {in_path.suffix}. Use .glb, .gltf, .obj, .ply or .stl.") | |
| logs: List[str] = [] | |
| try: | |
| if progress is not None: | |
| progress(0.02, desc="Preparing input") | |
| job_dir = TMP_ROOT / f"{int(time.time())}_{uuid.uuid4().hex[:8]}" | |
| job_dir.mkdir(parents=True, exist_ok=True) | |
| staged_input = job_dir / f"input{in_path.suffix.lower()}" | |
| shutil.copy2(in_path, staged_input) | |
| logs.append(f"Input staged: {staged_input.name}") | |
| run_mesh = staged_input | |
| if staged_input.suffix.lower() in {".glb", ".gltf"} and trellis_cleanup: | |
| if progress is not None: | |
| progress(0.08, desc="TRELLIS cleanup (CPU)") | |
| cleaned = job_dir / "input_trellis_clean.glb" | |
| stats, up_axis, axis_scores = _preprocess_for_trellis( | |
| input_mesh_path=staged_input, | |
| cleaned_out_path=cleaned, | |
| remove_floor=bool(remove_floor), | |
| floor_percentile=float(floor_percentile), | |
| floor_thickness_ratio=float(floor_thickness_ratio), | |
| min_component_faces=int(min_component_faces), | |
| ) | |
| run_mesh = cleaned | |
| logs.append( | |
| f"TRELLIS cleanup: up={up_axis} (x={axis_scores['x']:.4f}, y={axis_scores['y']:.4f}, z={axis_scores['z']:.4f}), " | |
| f"meshes {stats['before_meshes']}->{stats['after_meshes']}, " | |
| f"faces {stats['before_faces']}->{stats['after_faces']}, " | |
| f"floor_removed={stats['removed_floor_components']}, tiny_removed={stats['removed_tiny_components']}" | |
| ) | |
| gc.collect() | |
| if progress is not None: | |
| progress(0.16, desc="Converting mesh to OBJ") | |
| mesh_dir = job_dir / "mesh" | |
| mesh_dir.mkdir(parents=True, exist_ok=True) | |
| obj_input = mesh_dir / "input.obj" | |
| _convert_to_obj(run_mesh, obj_input) | |
| if progress is not None: | |
| progress(0.24, desc="Simplifying mesh") | |
| simplified_obj = mesh_dir / "input_simplified.obj" | |
| rig_input_obj = _simplify_obj_mesh(obj_input, int(simplify_target_faces), simplified_obj, logs) | |
| if progress is not None: | |
| progress(0.34, desc="Preparing checkpoints") | |
| ckpts = _ensure_checkpoints(logs) | |
| _ensure_skinning_michelangelo_link(logs) | |
| results_root = job_dir / "results" | |
| results_root.mkdir(parents=True, exist_ok=True) | |
| if progress is not None: | |
| progress(0.46, desc="Skeleton prediction (ZeroGPU)") | |
| logs.append( | |
| _run_skeleton_inference_gpu( | |
| input_obj_path=str(rig_input_obj), | |
| output_root=str(results_root), | |
| skeleton_ckpt_path=str(ckpts["skeleton_main"]), | |
| ) | |
| ) | |
| skel_results = results_root / "skel_results" | |
| pred_rig = skel_results / "input_simplified_pred.txt" | |
| if not pred_rig.exists(): | |
| # fallback in case simplify step skipped and name differs | |
| pred_rig = skel_results / "input_pred.txt" | |
| if not pred_rig.exists(): | |
| raise RuntimeError("Skeleton output rig file not found.") | |
| skeletons_dir = results_root / "skeletons" | |
| skeletons_dir.mkdir(parents=True, exist_ok=True) | |
| skel_for_skin = skeletons_dir / "input_simplified.txt" | |
| if pred_rig.name == "input_pred.txt": | |
| skel_for_skin = skeletons_dir / "input.txt" | |
| shutil.copy2(pred_rig, skel_for_skin) | |
| if progress is not None: | |
| progress(0.66, desc="Skinning prediction (ZeroGPU)") | |
| logs.append( | |
| _run_skinning_inference_gpu( | |
| input_obj_path=str(rig_input_obj), | |
| input_skel_folder=str(skeletons_dir), | |
| save_folder=str(results_root / "skin_results"), | |
| skin_ckpt_path=str(ckpts["skinning_main"]), | |
| target_faces=int(simplify_target_faces), | |
| ) | |
| ) | |
| generated_dir = results_root / "skin_results" / "generate" | |
| rig_with_skin = generated_dir / "input_simplified_skin.txt" | |
| skin_npy = generated_dir / "input_simplified_skin.npy" | |
| if not rig_with_skin.exists(): | |
| rig_with_skin = generated_dir / "input_skin.txt" | |
| skin_npy = generated_dir / "input_skin.npy" | |
| if not rig_with_skin.exists(): | |
| raise RuntimeError("Final rig file with skin weights not found.") | |
| final_rig_dir = results_root / "final_rigging" | |
| final_rig_dir.mkdir(parents=True, exist_ok=True) | |
| final_rig_txt = final_rig_dir / "input.txt" | |
| shutil.copy2(rig_with_skin, final_rig_txt) | |
| if progress is not None: | |
| progress(0.86, desc="Building textured rigged GLB") | |
| visual_source = run_mesh | |
| if staged_input.suffix.lower() in {".glb", ".gltf"} and not _scene_has_texture(run_mesh): | |
| visual_source = staged_input | |
| final_rigged_glb = final_rig_dir / "input_puppeteer_rigged_textured.glb" | |
| _build_textured_rigged_glb( | |
| visual_mesh_path=visual_source, | |
| source_obj_path=rig_input_obj, | |
| rig_with_skin_path=final_rig_txt, | |
| output_glb_path=final_rigged_glb, | |
| logs=logs, | |
| ) | |
| skel_obj = skel_results / "input_simplified_skel.obj" | |
| if not skel_obj.exists(): | |
| skel_obj = skel_results / "input_skel.obj" | |
| artifacts = [ | |
| str(p) | |
| for p in [ | |
| final_rigged_glb, | |
| run_mesh, | |
| obj_input, | |
| rig_input_obj, | |
| skel_obj if skel_obj.exists() else None, | |
| pred_rig, | |
| rig_with_skin, | |
| skin_npy if skin_npy.exists() else None, | |
| final_rig_txt, | |
| ] | |
| if p is not None and Path(p).exists() | |
| ] | |
| preview_model = str(final_rigged_glb) | |
| logs.append("Pipeline complete.") | |
| if progress is not None: | |
| progress(1.0, desc="Done") | |
| return preview_model, str(final_rigged_glb), artifacts, "\n".join(logs) | |
| except gr.Error: | |
| raise | |
| except Exception as exc: | |
| msg = str(exc) | |
| low = msg.lower() | |
| if "quota exceeded" in low or "exceeded your pro gpu quota" in low: | |
| raise gr.Error( | |
| "ZeroGPU quota is exhausted for this account/session. " | |
| "Retry after reset or use an account with available quota." | |
| ) from exc | |
| if "illegal duration" in low or "maximum allowed" in low: | |
| raise gr.Error( | |
| "ZeroGPU rejected the requested GPU runtime duration. " | |
| "The Space uses a capped duration; please refresh and retry." | |
| ) from exc | |
| raise gr.Error(f"Puppeteer rigging failed: {exc}") from exc | |
| def run_pipeline_ui( | |
| input_file: Any, | |
| simplify_target_faces: int, | |
| trellis_cleanup: bool, | |
| remove_floor: bool, | |
| floor_percentile: float, | |
| floor_thickness_ratio: float, | |
| min_component_faces: int, | |
| progress=gr.Progress(track_tqdm=True), | |
| ): | |
| normalized_path = _normalize_input_path(input_file) | |
| return _pipeline( | |
| input_mesh_path=normalized_path, | |
| simplify_target_faces=int(simplify_target_faces), | |
| trellis_cleanup=bool(trellis_cleanup), | |
| remove_floor=bool(remove_floor), | |
| floor_percentile=float(floor_percentile), | |
| floor_thickness_ratio=float(floor_thickness_ratio), | |
| min_component_faces=int(min_component_faces), | |
| progress=progress, | |
| ) | |
| def _build_demo() -> gr.Blocks: | |
| with gr.Blocks(title="GameMaster Puppeteer Rigging") as demo: | |
| gr.Markdown( | |
| "## GameMaster Puppeteer Rigging\n" | |
| "Auto-rig uploaded 3D character meshes using `Seed3D/Puppeteer` (skeleton + skinning)." | |
| ) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| input_file = gr.Model3D( | |
| label="Input Mesh (.glb/.gltf/.obj/.ply/.stl)", | |
| clear_color=[1.0, 1.0, 1.0, 1.0], | |
| height=520, | |
| ) | |
| simplify_target_faces = gr.Slider( | |
| minimum=4096, | |
| maximum=MAX_SIMPLIFY_FACES, | |
| value=DEFAULT_SIMPLIFY_FACES, | |
| step=512, | |
| label="Simplify Faces (recommended for ZeroGPU)", | |
| ) | |
| trellis_cleanup = gr.Checkbox(value=True, label="TRELLIS Cleanup (component pruning)") | |
| remove_floor = gr.Checkbox(value=True, label="Remove Floor-Like Components") | |
| floor_percentile = gr.Slider(0.1, 5.0, value=1.0, step=0.1, label="Floor Percentile Cut") | |
| floor_thickness_ratio = gr.Slider(0.01, 0.25, value=0.06, step=0.01, label="Floor Thickness Ratio") | |
| min_component_faces = gr.Slider(16, 4096, value=128, step=16, label="Minimum Faces per Component") | |
| run_btn = gr.Button("Run Puppeteer Rigging", variant="primary") | |
| with gr.Column(scale=1): | |
| output_preview = gr.Model3D( | |
| label="Rigged Textured Preview", | |
| clear_color=[1.0, 1.0, 1.0, 1.0], | |
| height=520, | |
| ) | |
| ready_model = gr.File(label="Ready Rigged Textured GLB") | |
| artifacts = gr.File(label="Artifacts", file_count="multiple") | |
| run_logs = gr.Textbox(label="Run Logs", lines=20, max_lines=30) | |
| run_btn.click( | |
| fn=run_pipeline_ui, | |
| inputs=[ | |
| input_file, | |
| simplify_target_faces, | |
| trellis_cleanup, | |
| remove_floor, | |
| floor_percentile, | |
| floor_thickness_ratio, | |
| min_component_faces, | |
| ], | |
| outputs=[output_preview, ready_model, artifacts, run_logs], | |
| api_name="run_pipeline_ui", | |
| ) | |
| return demo | |
| demo = _build_demo() | |
| if __name__ == "__main__": | |
| demo.queue(default_concurrency_limit=1).launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| ssr_mode=False, | |
| theme=gr.themes.Soft(), | |
| allowed_paths=[str(TMP_ROOT)], | |
| ) | |