xinjie.wang
update
7d1d5eb
# Project EmbodiedGen
#
# Copyright (c) 2025 Horizon Robotics. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
import logging
import multiprocessing as mp
import os
import coacd
import numpy as np
import trimesh
logger = logging.getLogger(__name__)
__all__ = [
"decompose_convex_coacd",
"decompose_convex_mesh",
"decompose_convex_mp",
]
def decompose_convex_coacd(
filename: str,
outfile: str,
params: dict,
verbose: bool = False,
auto_scale: bool = True,
scale_factor: float = 1.0,
) -> None:
"""Decomposes a mesh using CoACD and saves the result.
This function loads a mesh from a file, runs the CoACD algorithm with the
given parameters, optionally scales the resulting convex hulls to match the
original mesh's bounding box, and exports the combined result to a file.
Args:
filename: Path to the input mesh file.
outfile: Path to save the decomposed output mesh.
params: A dictionary of parameters for the CoACD algorithm.
verbose: If True, sets the CoACD log level to 'info'.
auto_scale: If True, automatically computes a scale factor to match the
decomposed mesh's bounding box to the visual mesh's bounding box.
scale_factor: An additional scaling factor applied to the vertices of
the decomposed mesh parts.
"""
coacd.set_log_level("info" if verbose else "warn")
mesh = trimesh.load(filename, force="mesh")
mesh = coacd.Mesh(mesh.vertices, mesh.faces)
result = coacd.run_coacd(mesh, **params)
meshes = []
for v, f in result:
meshes.append(trimesh.Trimesh(v, f))
# Compute collision_scale because convex decomposition usually makes the mesh larger.
if auto_scale:
all_mesh = sum([trimesh.Trimesh(*m) for m in result])
convex_mesh_shape = np.ptp(all_mesh.vertices, axis=0)
visual_mesh_shape = np.ptp(mesh.vertices, axis=0)
scale_factor *= visual_mesh_shape / convex_mesh_shape
combined = trimesh.Scene()
for mesh_part in meshes:
mesh_part.vertices *= scale_factor
combined.add_geometry(mesh_part)
combined.export(outfile)
def decompose_convex_mesh(
filename: str,
outfile: str,
threshold: float = 0.05,
max_convex_hull: int = -1,
preprocess_mode: str = "auto",
preprocess_resolution: int = 30,
resolution: int = 2000,
mcts_nodes: int = 20,
mcts_iterations: int = 150,
mcts_max_depth: int = 3,
pca: bool = False,
merge: bool = True,
seed: int = 0,
auto_scale: bool = True,
scale_factor: float = 1.005,
verbose: bool = False,
) -> str:
"""Decomposes a mesh into convex parts with retry logic.
This function serves as a wrapper for `decompose_convex_coacd`, providing
explicit parameters for the CoACD algorithm and implementing a retry
mechanism. If the initial decomposition fails, it attempts again with
`preprocess_mode` set to 'on'.
Args:
filename: Path to the input mesh file.
outfile: Path to save the decomposed output mesh.
threshold: CoACD parameter. See CoACD documentation for details.
max_convex_hull: CoACD parameter. See CoACD documentation for details.
preprocess_mode: CoACD parameter. See CoACD documentation for details.
preprocess_resolution: CoACD parameter. See CoACD documentation for details.
resolution: CoACD parameter. See CoACD documentation for details.
mcts_nodes: CoACD parameter. See CoACD documentation for details.
mcts_iterations: CoACD parameter. See CoACD documentation for details.
mcts_max_depth: CoACD parameter. See CoACD documentation for details.
pca: CoACD parameter. See CoACD documentation for details.
merge: CoACD parameter. See CoACD documentation for details.
seed: CoACD parameter. See CoACD documentation for details.
auto_scale: If True, automatically scale the output to match the input
bounding box.
scale_factor: Additional scaling factor to apply.
verbose: If True, enables detailed logging.
Returns:
The path to the output file if decomposition is successful.
Raises:
RuntimeError: If convex decomposition fails after all attempts.
"""
coacd.set_log_level("info" if verbose else "warn")
if os.path.exists(outfile):
logger.warning(f"Output file {outfile} already exists, removing it.")
os.remove(outfile)
params = dict(
threshold=threshold,
max_convex_hull=max_convex_hull,
preprocess_mode=preprocess_mode,
preprocess_resolution=preprocess_resolution,
resolution=resolution,
mcts_nodes=mcts_nodes,
mcts_iterations=mcts_iterations,
mcts_max_depth=mcts_max_depth,
pca=pca,
merge=merge,
seed=seed,
)
try:
decompose_convex_coacd(
filename, outfile, params, verbose, auto_scale, scale_factor
)
if os.path.exists(outfile):
return outfile
except Exception as e:
if verbose:
print(f"Decompose convex first attempt failed: {e}.")
if preprocess_mode != "on":
try:
params["preprocess_mode"] = "on"
decompose_convex_coacd(
filename, outfile, params, verbose, auto_scale, scale_factor
)
if os.path.exists(outfile):
return outfile
except Exception as e:
if verbose:
print(
f"Decompose convex second attempt with preprocess_mode='on' failed: {e}"
)
raise RuntimeError(f"Convex decomposition failed on {filename}")
def decompose_convex_mp(
filename: str,
outfile: str,
threshold: float = 0.05,
max_convex_hull: int = -1,
preprocess_mode: str = "auto",
preprocess_resolution: int = 30,
resolution: int = 2000,
mcts_nodes: int = 20,
mcts_iterations: int = 150,
mcts_max_depth: int = 3,
pca: bool = False,
merge: bool = True,
seed: int = 0,
verbose: bool = False,
auto_scale: bool = True,
) -> str:
"""Decomposes a mesh into convex parts in a separate process.
This function uses the `multiprocessing` module to run the CoACD algorithm
in a spawned subprocess. This is useful for isolating the decomposition
process to prevent potential memory leaks or crashes in the main process.
It includes a retry mechanism similar to `decompose_convex_mesh`.
See https://simulately.wiki/docs/toolkits/ConvexDecomp for details.
Args:
filename: Path to the input mesh file.
outfile: Path to save the decomposed output mesh.
threshold: CoACD parameter.
max_convex_hull: CoACD parameter.
preprocess_mode: CoACD parameter.
preprocess_resolution: CoACD parameter.
resolution: CoACD parameter.
mcts_nodes: CoACD parameter.
mcts_iterations: CoACD parameter.
mcts_max_depth: CoACD parameter.
pca: CoACD parameter.
merge: CoACD parameter.
seed: CoACD parameter.
verbose: If True, enables detailed logging in the subprocess.
auto_scale: If True, automatically scale the output.
Returns:
The path to the output file if decomposition is successful.
Raises:
RuntimeError: If convex decomposition fails after all attempts.
"""
params = dict(
threshold=threshold,
max_convex_hull=max_convex_hull,
preprocess_mode=preprocess_mode,
preprocess_resolution=preprocess_resolution,
resolution=resolution,
mcts_nodes=mcts_nodes,
mcts_iterations=mcts_iterations,
mcts_max_depth=mcts_max_depth,
pca=pca,
merge=merge,
seed=seed,
)
ctx = mp.get_context("spawn")
p = ctx.Process(
target=decompose_convex_coacd,
args=(filename, outfile, params, verbose, auto_scale),
)
p.start()
p.join()
if p.exitcode == 0 and os.path.exists(outfile):
return outfile
if preprocess_mode != "on":
params["preprocess_mode"] = "on"
p = ctx.Process(
target=decompose_convex_coacd,
args=(filename, outfile, params, verbose, auto_scale),
)
p.start()
p.join()
if p.exitcode == 0 and os.path.exists(outfile):
return outfile
raise RuntimeError(f"Convex decomposition failed on {filename}")