xinjie.wang
update
7d1d5eb
from __future__ import annotations
import logging
import os
import xml.etree.ElementTree as ET
from abc import ABC, abstractmethod
from glob import glob
from shutil import copy, copytree, rmtree
import trimesh
from scipy.spatial.transform import Rotation
from embodied_gen.utils.enum import AssetType
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
__all__ = [
"AssetConverterFactory",
"MeshtoMJCFConverter",
"MeshtoUSDConverter",
"URDFtoUSDConverter",
"cvt_embodiedgen_asset_to_anysim",
"PhysicsUSDAdder",
]
def cvt_embodiedgen_asset_to_anysim(
urdf_files: list[str],
target_dirs: list[str],
target_type: AssetType,
source_type: AssetType,
overwrite: bool = False,
**kwargs,
) -> dict[str, str]:
"""Convert URDF files generated by EmbodiedGen into formats required by simulators.
Supported simulators include SAPIEN, Isaac Sim, MuJoCo, Isaac Gym, Genesis, and Pybullet.
Converting to the `USD` format requires `isaacsim` to be installed.
Example:
```py
from embodied_gen.data.asset_converter import cvt_embodiedgen_asset_to_anysim
from embodied_gen.utils.enum import AssetType
dst_asset_path = cvt_embodiedgen_asset_to_anysim(
urdf_files=[
"path1_to_embodiedgen_asset/asset.urdf",
"path2_to_embodiedgen_asset/asset.urdf",
],
target_dirs=[
"path1_to_target_dir/asset.usd",
"path2_to_target_dir/asset.usd",
],
target_type=AssetType.USD,
source_type=AssetType.MESH,
)
```
Args:
urdf_files (list[str]): List of URDF file paths.
target_dirs (list[str]): List of target directories.
target_type (AssetType): Target asset type.
source_type (AssetType): Source asset type.
overwrite (bool, optional): Overwrite existing files.
**kwargs: Additional converter arguments.
Returns:
dict[str, str]: Mapping from URDF file to converted asset file.
"""
if isinstance(urdf_files, str):
urdf_files = [urdf_files]
if isinstance(target_dirs, str):
urdf_files = [target_dirs]
# If the target type is URDF, no conversion is needed.
if target_type == AssetType.URDF:
return {key: key for key in urdf_files}
asset_converter = AssetConverterFactory.create(
target_type=target_type,
source_type=source_type,
**kwargs,
)
asset_paths = dict()
with asset_converter:
for urdf_file, target_dir in zip(urdf_files, target_dirs):
filename = os.path.basename(urdf_file).replace(".urdf", "")
if target_type == AssetType.MJCF:
target_file = f"{target_dir}/{filename}.xml"
elif target_type == AssetType.USD:
target_file = f"{target_dir}/{filename}.usd"
else:
raise NotImplementedError(
f"Target type {target_type} not supported."
)
if not os.path.exists(target_file) or overwrite:
asset_converter.convert(urdf_file, target_file)
asset_paths[urdf_file] = target_file
return asset_paths
class AssetConverterBase(ABC):
"""Abstract base class for asset converters.
Provides context management and mesh transformation utilities.
"""
@abstractmethod
def convert(self, urdf_path: str, output_path: str, **kwargs) -> str:
"""Convert an asset file.
Args:
urdf_path (str): Path to input URDF file.
output_path (str): Path to output file.
**kwargs: Additional arguments.
Returns:
str: Path to converted asset.
"""
pass
def transform_mesh(
self, input_mesh: str, output_mesh: str, mesh_origin: ET.Element
) -> None:
"""Apply transform to mesh based on URDF origin element.
Args:
input_mesh (str): Path to input mesh.
output_mesh (str): Path to output mesh.
mesh_origin (ET.Element): Origin element from URDF.
"""
mesh = trimesh.load(input_mesh, group_material=False)
rpy = list(map(float, mesh_origin.get("rpy").split(" ")))
rotation = Rotation.from_euler("xyz", rpy, degrees=False)
offset = list(map(float, mesh_origin.get("xyz").split(" ")))
os.makedirs(os.path.dirname(output_mesh), exist_ok=True)
if isinstance(mesh, trimesh.Scene):
combined = trimesh.Scene()
for mesh_part in mesh.geometry.values():
mesh_part.vertices = (
mesh_part.vertices @ rotation.as_matrix().T
) + offset
combined.add_geometry(mesh_part)
_ = combined.export(output_mesh)
else:
mesh.vertices = (mesh.vertices @ rotation.as_matrix().T) + offset
_ = mesh.export(output_mesh)
return
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
return False
class MeshtoMJCFConverter(AssetConverterBase):
"""Converts mesh-based URDF files to MJCF format.
Handles geometry, materials, and asset copying.
"""
def __init__(
self,
**kwargs,
) -> None:
self.kwargs = kwargs
def _copy_asset_file(self, src: str, dst: str) -> None:
"""Copies asset file if not already present.
Args:
src (str): Source file path.
dst (str): Destination file path.
"""
if os.path.exists(dst):
return
os.makedirs(os.path.dirname(dst), exist_ok=True)
copy(src, dst)
def add_geometry(
self,
mujoco_element: ET.Element,
link: ET.Element,
body: ET.Element,
tag: str,
input_dir: str,
output_dir: str,
mesh_name: str,
material: ET.Element | None = None,
is_collision: bool = False,
) -> None:
"""Adds geometry to MJCF body from URDF link.
Args:
mujoco_element (ET.Element): MJCF asset element.
link (ET.Element): URDF link element.
body (ET.Element): MJCF body element.
tag (str): Tag name ("visual" or "collision").
input_dir (str): Input directory.
output_dir (str): Output directory.
mesh_name (str): Mesh name.
material (ET.Element, optional): Material element.
is_collision (bool, optional): If True, treat as collision geometry.
"""
element = link.find(tag)
geometry = element.find("geometry")
mesh = geometry.find("mesh")
filename = mesh.get("filename")
scale = mesh.get("scale", "1.0 1.0 1.0")
input_mesh = f"{input_dir}/{filename}"
output_mesh = f"{output_dir}/{filename}"
self._copy_asset_file(input_mesh, output_mesh)
mesh_origin = element.find("origin")
if mesh_origin is not None:
self.transform_mesh(input_mesh, output_mesh, mesh_origin)
if is_collision:
mesh_parts = trimesh.load(
output_mesh, group_material=False, force="scene"
)
mesh_parts = mesh_parts.geometry.values()
else:
mesh_parts = [trimesh.load(output_mesh, force="mesh")]
for idx, mesh_part in enumerate(mesh_parts):
if is_collision:
idx_mesh_name = f"{mesh_name}_{idx}"
base, ext = os.path.splitext(filename)
idx_filename = f"{base}_{idx}{ext}"
base_outdir = os.path.dirname(output_mesh)
mesh_part.export(os.path.join(base_outdir, '..', idx_filename))
geom_attrs = {
"contype": "1",
"conaffinity": "1",
"rgba": "1 1 1 0",
}
else:
idx_mesh_name, idx_filename = mesh_name, filename
geom_attrs = {"contype": "0", "conaffinity": "0"}
ET.SubElement(
mujoco_element,
"mesh",
name=idx_mesh_name,
file=idx_filename,
scale=scale,
)
geom = ET.SubElement(body, "geom", type="mesh", mesh=idx_mesh_name)
geom.attrib.update(geom_attrs)
if material is not None:
geom.set("material", material.get("name"))
def add_materials(
self,
mujoco_element: ET.Element,
link: ET.Element,
tag: str,
input_dir: str,
output_dir: str,
name: str,
reflectance: float = 0.2,
) -> ET.Element:
"""Adds materials to MJCF asset from URDF link.
Args:
mujoco_element (ET.Element): MJCF asset element.
link (ET.Element): URDF link element.
tag (str): Tag name.
input_dir (str): Input directory.
output_dir (str): Output directory.
name (str): Material name.
reflectance (float, optional): Reflectance value.
Returns:
ET.Element: Material element.
"""
element = link.find(tag)
geometry = element.find("geometry")
mesh = geometry.find("mesh")
filename = mesh.get("filename")
dirname = os.path.dirname(filename)
material = None
for path in glob(f"{input_dir}/{dirname}/*.png"):
file_name = os.path.basename(path)
if "keep_materials" in self.kwargs:
find_flag = False
for keep_key in self.kwargs["keep_materials"]:
if keep_key in file_name.lower():
find_flag = True
if find_flag is False:
continue
self._copy_asset_file(
path,
f"{output_dir}/{dirname}/{file_name}",
)
texture_name = f"texture_{name}_{os.path.splitext(file_name)[0]}"
material = ET.SubElement(
mujoco_element,
"material",
name=f"material_{name}",
texture=texture_name,
reflectance=str(reflectance),
)
ET.SubElement(
mujoco_element,
"texture",
name=texture_name,
type="2d",
file=f"{dirname}/{file_name}",
)
return material
def convert(self, urdf_path: str, mjcf_path: str):
"""Converts a URDF file to MJCF format.
Args:
urdf_path (str): Path to URDF file.
mjcf_path (str): Path to output MJCF file.
"""
tree = ET.parse(urdf_path)
root = tree.getroot()
mujoco_struct = ET.Element("mujoco")
mujoco_struct.set("model", root.get("name"))
mujoco_asset = ET.SubElement(mujoco_struct, "asset")
mujoco_worldbody = ET.SubElement(mujoco_struct, "worldbody")
input_dir = os.path.dirname(urdf_path)
output_dir = os.path.dirname(mjcf_path)
os.makedirs(output_dir, exist_ok=True)
for idx, link in enumerate(root.findall("link")):
link_name = link.get("name", "unnamed_link")
body = ET.SubElement(mujoco_worldbody, "body", name=link_name)
material = self.add_materials(
mujoco_asset,
link,
"visual",
input_dir,
output_dir,
name=str(idx),
)
joint = ET.SubElement(body, "joint", attrib={"type": "free"})
self.add_geometry(
mujoco_asset,
link,
body,
"visual",
input_dir,
output_dir,
f"visual_mesh_{idx}",
material,
)
self.add_geometry(
mujoco_asset,
link,
body,
"collision",
input_dir,
output_dir,
f"collision_mesh_{idx}",
is_collision=True,
)
tree = ET.ElementTree(mujoco_struct)
ET.indent(tree, space=" ", level=0)
tree.write(mjcf_path, encoding="utf-8", xml_declaration=True)
logger.info(f"Successfully converted {urdf_path}{mjcf_path}")
class URDFtoMJCFConverter(MeshtoMJCFConverter):
"""Converts URDF files with joints to MJCF format, handling joint transformations.
Handles fixed joints and hierarchical body structure.
"""
def convert(self, urdf_path: str, mjcf_path: str, **kwargs) -> str:
"""Converts a URDF file with joints to MJCF format.
Args:
urdf_path (str): Path to URDF file.
mjcf_path (str): Path to output MJCF file.
**kwargs: Additional arguments.
Returns:
str: Path to converted MJCF file.
"""
tree = ET.parse(urdf_path)
root = tree.getroot()
mujoco_struct = ET.Element("mujoco")
mujoco_struct.set("model", root.get("name"))
mujoco_asset = ET.SubElement(mujoco_struct, "asset")
mujoco_worldbody = ET.SubElement(mujoco_struct, "worldbody")
input_dir = os.path.dirname(urdf_path)
output_dir = os.path.dirname(mjcf_path)
os.makedirs(output_dir, exist_ok=True)
body_dict = {}
for idx, link in enumerate(root.findall("link")):
link_name = link.get("name", f"unnamed_link_{idx}")
body = ET.SubElement(mujoco_worldbody, "body", name=link_name)
body_dict[link_name] = body
if link.find("visual") is not None:
material = self.add_materials(
mujoco_asset,
link,
"visual",
input_dir,
output_dir,
name=str(idx),
)
self.add_geometry(
mujoco_asset,
link,
body,
"visual",
input_dir,
output_dir,
f"visual_mesh_{idx}",
material,
)
if link.find("collision") is not None:
self.add_geometry(
mujoco_asset,
link,
body,
"collision",
input_dir,
output_dir,
f"collision_mesh_{idx}",
is_collision=True,
)
# Process joints to set transformations and hierarchy
for joint in root.findall("joint"):
joint_type = joint.get("type")
if joint_type != "fixed":
logger.warning("Only support fixed joints in conversion now.")
continue
parent_link = joint.find("parent").get("link")
child_link = joint.find("child").get("link")
origin = joint.find("origin")
if parent_link not in body_dict or child_link not in body_dict:
logger.warning(
f"Parent or child link not found for joint: {joint.get('name')}"
)
continue
child_body = body_dict[child_link]
mujoco_worldbody.remove(child_body)
parent_body = body_dict[parent_link]
parent_body.append(child_body)
if origin is not None:
xyz = origin.get("xyz", "0 0 0")
rpy = origin.get("rpy", "0 0 0")
child_body.set("pos", xyz)
child_body.set("euler", rpy)
tree = ET.ElementTree(mujoco_struct)
ET.indent(tree, space=" ", level=0)
tree.write(mjcf_path, encoding="utf-8", xml_declaration=True)
logger.info(f"Successfully converted {urdf_path}{mjcf_path}")
return mjcf_path
class MeshtoUSDConverter(AssetConverterBase):
"""Converts mesh-based URDF files to USD format.
Adds physics APIs and post-processes collision meshes.
"""
DEFAULT_BIND_APIS = [
"MaterialBindingAPI",
"PhysicsMeshCollisionAPI",
"PhysxConvexDecompositionCollisionAPI",
"PhysicsCollisionAPI",
"PhysxCollisionAPI",
"PhysicsMassAPI",
"PhysicsRigidBodyAPI",
"PhysxRigidBodyAPI",
]
def __init__(
self,
force_usd_conversion: bool = True,
make_instanceable: bool = False,
simulation_app=None,
**kwargs,
):
"""Initializes the converter.
Args:
force_usd_conversion (bool, optional): Force USD conversion.
make_instanceable (bool, optional): Make prims instanceable.
simulation_app (optional): Simulation app instance.
**kwargs: Additional arguments.
"""
if simulation_app is not None:
self.simulation_app = simulation_app
self.exit_close = kwargs.pop("exit_close", True)
self.physx_max_convex_hulls = kwargs.pop("physx_max_convex_hulls", 32)
self.physx_max_vertices = kwargs.pop("physx_max_vertices", 16)
self.physx_max_voxel_res = kwargs.pop("physx_max_voxel_res", 10000)
self.usd_parms = dict(
force_usd_conversion=force_usd_conversion,
make_instanceable=make_instanceable,
**kwargs,
)
def __enter__(self):
"""Context manager entry, launches simulation app if needed."""
from isaaclab.app import AppLauncher
if not hasattr(self, "simulation_app"):
if "launch_args" not in self.usd_parms:
launch_args = dict(
headless=True,
no_splash=True,
fast_shutdown=True,
disable_gpu=True,
)
else:
launch_args = self.usd_parms.pop("launch_args")
self.app_launcher = AppLauncher(launch_args)
self.simulation_app = self.app_launcher.app
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit, closes simulation app if created."""
# Close the simulation app if it was created here
if exc_val is not None:
logger.error(f"Exception occurred: {exc_val}.")
if hasattr(self, "app_launcher") and self.exit_close:
self.simulation_app.close()
return False
def convert(self, urdf_path: str, output_file: str):
"""Converts a URDF file to USD and post-processes collision meshes.
Args:
urdf_path (str): Path to URDF file.
output_file (str): Path to output USD file.
"""
from isaaclab.sim.converters import MeshConverter, MeshConverterCfg
from pxr import PhysxSchema, Sdf, Usd, UsdShade
tree = ET.parse(urdf_path)
root = tree.getroot()
mesh_file = root.find("link/visual/geometry/mesh").get("filename")
input_mesh = os.path.join(os.path.dirname(urdf_path), mesh_file)
output_dir = os.path.abspath(os.path.dirname(output_file))
output_mesh = f"{output_dir}/mesh/{os.path.basename(mesh_file)}"
mesh_origin = root.find("link/visual/origin")
if mesh_origin is not None:
self.transform_mesh(input_mesh, output_mesh, mesh_origin)
cfg = MeshConverterCfg(
asset_path=output_mesh,
usd_dir=output_dir,
usd_file_name=os.path.basename(output_file),
**self.usd_parms,
)
urdf_converter = MeshConverter(cfg)
usd_path = urdf_converter.usd_path
rmtree(os.path.dirname(output_mesh))
stage = Usd.Stage.Open(usd_path)
layer = stage.GetRootLayer()
with Usd.EditContext(stage, layer):
base_prim = stage.GetPseudoRoot().GetChildren()[0]
base_prim.SetMetadata("kind", "component")
for prim in stage.Traverse():
# Change texture path to relative path.
if prim.GetName() == "material_0":
shader = UsdShade.Shader(prim).GetInput("diffuse_texture")
if shader.Get() is not None:
relative_path = shader.Get().path.replace(
f"{output_dir}/", ""
)
shader.Set(Sdf.AssetPath(relative_path))
# Add convex decomposition collision and set ShrinkWrap.
elif prim.GetName() == "mesh":
approx_attr = prim.CreateAttribute(
"physics:approximation", Sdf.ValueTypeNames.Token
)
approx_attr.Set("convexDecomposition")
physx_conv_api = (
PhysxSchema.PhysxConvexDecompositionCollisionAPI.Apply(
prim
)
)
physx_conv_api.GetMaxConvexHullsAttr().Set(
self.physx_max_convex_hulls
)
physx_conv_api.GetHullVertexLimitAttr().Set(
self.physx_max_vertices
)
physx_conv_api.GetVoxelResolutionAttr().Set(
self.physx_max_voxel_res
)
physx_conv_api.GetShrinkWrapAttr().Set(True)
api_schemas = prim.GetMetadata("apiSchemas")
if api_schemas is None:
api_schemas = Sdf.TokenListOp()
api_list = list(api_schemas.GetAddedOrExplicitItems())
for api in self.DEFAULT_BIND_APIS:
if api not in api_list:
api_list.append(api)
api_schemas.appendedItems = api_list
prim.SetMetadata("apiSchemas", api_schemas)
layer.Save()
logger.info(f"Successfully converted {urdf_path}{usd_path}")
class PhysicsUSDAdder(MeshtoUSDConverter):
"""Adds physics APIs and collision properties to USD assets.
Useful for post-processing USD files for simulation.
"""
DEFAULT_BIND_APIS = [
"MaterialBindingAPI",
"PhysicsMeshCollisionAPI",
"PhysxConvexDecompositionCollisionAPI",
"PhysicsCollisionAPI",
"PhysxCollisionAPI",
"PhysicsRigidBodyAPI",
]
def convert(self, usd_path: str, output_file: str = None):
"""Adds physics APIs and collision properties to a USD file.
Args:
usd_path (str): Path to input USD file.
output_file (str, optional): Path to output USD file.
"""
from pxr import PhysxSchema, Sdf, Usd, UsdGeom, UsdPhysics
if output_file is None:
output_file = usd_path
else:
dst_dir = os.path.dirname(output_file)
src_dir = os.path.dirname(usd_path)
copytree(src_dir, dst_dir, dirs_exist_ok=True)
stage = Usd.Stage.Open(output_file)
layer = stage.GetRootLayer()
with Usd.EditContext(stage, layer):
for prim in stage.Traverse():
if prim.IsA(UsdGeom.Xform):
for child in prim.GetChildren():
if not child.IsA(UsdGeom.Mesh):
continue
# Skip the lightfactory in Infinigen
if "lightfactory" in prim.GetName().lower():
continue
approx_attr = prim.CreateAttribute(
"physics:approximation", Sdf.ValueTypeNames.Token
)
approx_attr.Set("convexDecomposition")
physx_conv_api = PhysxSchema.PhysxConvexDecompositionCollisionAPI.Apply(
prim
)
physx_conv_api.GetMaxConvexHullsAttr().Set(
self.physx_max_convex_hulls
)
physx_conv_api.GetHullVertexLimitAttr().Set(
self.physx_max_vertices
)
physx_conv_api.GetVoxelResolutionAttr().Set(
self.physx_max_voxel_res
)
physx_conv_api.GetShrinkWrapAttr().Set(True)
rigid_body_api = UsdPhysics.RigidBodyAPI.Apply(prim)
rigid_body_api.CreateKinematicEnabledAttr().Set(True)
if prim.GetAttribute("physics:mass"):
prim.RemoveProperty("physics:mass")
if prim.GetAttribute("physics:velocity"):
prim.RemoveProperty("physics:velocity")
api_schemas = prim.GetMetadata("apiSchemas")
if api_schemas is None:
api_schemas = Sdf.TokenListOp()
api_list = list(api_schemas.GetAddedOrExplicitItems())
for api in self.DEFAULT_BIND_APIS:
if api not in api_list:
api_list.append(api)
api_schemas.appendedItems = api_list
prim.SetMetadata("apiSchemas", api_schemas)
layer.Save()
logger.info(f"Successfully converted {usd_path} to {output_file}")
class URDFtoUSDConverter(MeshtoUSDConverter):
"""Converts URDF files to USD format.
Args:
fix_base (bool, optional): Fix the base link.
merge_fixed_joints (bool, optional): Merge fixed joints.
make_instanceable (bool, optional): Make prims instanceable.
force_usd_conversion (bool, optional): Force conversion to USD.
collision_from_visuals (bool, optional): Generate collisions from visuals.
joint_drive (optional): Joint drive configuration.
rotate_wxyz (tuple[float], optional): Quaternion for rotation.
simulation_app (optional): Simulation app instance.
**kwargs: Additional arguments.
"""
def __init__(
self,
fix_base: bool = False,
merge_fixed_joints: bool = False,
make_instanceable: bool = True,
force_usd_conversion: bool = True,
collision_from_visuals: bool = True,
joint_drive=None,
rotate_wxyz: tuple[float] | None = None,
simulation_app=None,
**kwargs,
):
"""Initializes the converter.
Args:
fix_base (bool, optional): Fix the base link.
merge_fixed_joints (bool, optional): Merge fixed joints.
make_instanceable (bool, optional): Make prims instanceable.
force_usd_conversion (bool, optional): Force conversion to USD.
collision_from_visuals (bool, optional): Generate collisions from visuals.
joint_drive (optional): Joint drive configuration.
rotate_wxyz (tuple[float], optional): Quaternion for rotation.
simulation_app (optional): Simulation app instance.
**kwargs: Additional arguments.
"""
self.usd_parms = dict(
fix_base=fix_base,
merge_fixed_joints=merge_fixed_joints,
make_instanceable=make_instanceable,
force_usd_conversion=force_usd_conversion,
collision_from_visuals=collision_from_visuals,
joint_drive=joint_drive,
**kwargs,
)
self.rotate_wxyz = rotate_wxyz
if simulation_app is not None:
self.simulation_app = simulation_app
def convert(self, urdf_path: str, output_file: str):
"""Converts a URDF file to USD and post-processes collision meshes.
Args:
urdf_path (str): Path to URDF file.
output_file (str): Path to output USD file.
"""
from isaaclab.sim.converters import UrdfConverter, UrdfConverterCfg
from pxr import Gf, PhysxSchema, Sdf, Usd, UsdGeom
cfg = UrdfConverterCfg(
asset_path=urdf_path,
usd_dir=os.path.abspath(os.path.dirname(output_file)),
usd_file_name=os.path.basename(output_file),
**self.usd_parms,
)
urdf_converter = UrdfConverter(cfg)
usd_path = urdf_converter.usd_path
stage = Usd.Stage.Open(usd_path)
layer = stage.GetRootLayer()
with Usd.EditContext(stage, layer):
for prim in stage.Traverse():
if prim.GetName() == "collisions":
approx_attr = prim.CreateAttribute(
"physics:approximation", Sdf.ValueTypeNames.Token
)
approx_attr.Set("convexDecomposition")
physx_conv_api = (
PhysxSchema.PhysxConvexDecompositionCollisionAPI.Apply(
prim
)
)
physx_conv_api.GetMaxConvexHullsAttr().Set(32)
physx_conv_api.GetHullVertexLimitAttr().Set(16)
physx_conv_api.GetVoxelResolutionAttr().Set(10000)
physx_conv_api.GetShrinkWrapAttr().Set(True)
api_schemas = prim.GetMetadata("apiSchemas")
if api_schemas is None:
api_schemas = Sdf.TokenListOp()
api_list = list(api_schemas.GetAddedOrExplicitItems())
for api in self.DEFAULT_BIND_APIS:
if api not in api_list:
api_list.append(api)
api_schemas.appendedItems = api_list
prim.SetMetadata("apiSchemas", api_schemas)
if self.rotate_wxyz is not None:
inner_prim = next(
p
for p in stage.GetDefaultPrim().GetChildren()
if p.IsA(UsdGeom.Xform)
)
xformable = UsdGeom.Xformable(inner_prim)
xformable.ClearXformOpOrder()
orient_op = xformable.AddOrientOp(UsdGeom.XformOp.PrecisionDouble)
orient_op.Set(Gf.Quatd(*self.rotate_wxyz))
layer.Save()
logger.info(f"Successfully converted {urdf_path}{usd_path}")
class AssetConverterFactory:
"""Factory for creating asset converters based on target and source types.
Example:
```py
from embodied_gen.data.asset_converter import AssetConverterFactory
from embodied_gen.utils.enum import AssetType
converter = AssetConverterFactory.create(
target_type=AssetType.USD, source_type=AssetType.MESH
)
with converter:
for urdf_path, output_file in zip(urdf_paths, output_files):
converter.convert(urdf_path, output_file)
```
"""
@staticmethod
def create(
target_type: AssetType, source_type: AssetType = "urdf", **kwargs
) -> AssetConverterBase:
"""Creates an asset converter instance.
Args:
target_type (AssetType): Target asset type.
source_type (AssetType, optional): Source asset type.
**kwargs: Additional arguments.
Returns:
AssetConverterBase: Converter instance.
"""
if target_type == AssetType.MJCF and source_type == AssetType.MESH:
converter = MeshtoMJCFConverter(**kwargs)
elif target_type == AssetType.MJCF and source_type == AssetType.URDF:
converter = URDFtoMJCFConverter(**kwargs)
elif target_type == AssetType.USD and source_type == AssetType.MESH:
converter = MeshtoUSDConverter(**kwargs)
elif target_type == AssetType.USD and source_type == AssetType.URDF:
converter = URDFtoUSDConverter(**kwargs)
else:
raise ValueError(
f"Unsupported converter type: {source_type} -> {target_type}."
)
return converter
if __name__ == "__main__":
target_asset_type = AssetType.MJCF
# target_asset_type = AssetType.USD
urdf_paths = [
'outputs/EmbodiedGenData/demo_assets/banana/result/banana.urdf',
'outputs/EmbodiedGenData/demo_assets/book/result/book.urdf',
'outputs/EmbodiedGenData/demo_assets/lamp/result/lamp.urdf',
'outputs/EmbodiedGenData/demo_assets/mug/result/mug.urdf',
'outputs/EmbodiedGenData/demo_assets/remote_control/result/remote_control.urdf',
"outputs/EmbodiedGenData/demo_assets/rubik's_cube/result/rubik's_cube.urdf",
'outputs/EmbodiedGenData/demo_assets/table/result/table.urdf',
'outputs/EmbodiedGenData/demo_assets/vase/result/vase.urdf',
]
if target_asset_type == AssetType.MJCF:
output_files = [
"outputs/embodiedgen_assets/demo_assets/demo_assets/remote_control/mjcf/remote_control.xml",
]
asset_converter = AssetConverterFactory.create(
target_type=AssetType.MJCF,
source_type=AssetType.MESH,
)
elif target_asset_type == AssetType.USD:
output_files = [
'outputs/EmbodiedGenData/demo_assets/banana/usd/banana.usd',
'outputs/EmbodiedGenData/demo_assets/book/usd/book.usd',
'outputs/EmbodiedGenData/demo_assets/lamp/usd/lamp.usd',
'outputs/EmbodiedGenData/demo_assets/mug/usd/mug.usd',
'outputs/EmbodiedGenData/demo_assets/remote_control/usd/remote_control.usd',
"outputs/EmbodiedGenData/demo_assets/rubik's_cube/usd/rubik's_cube.usd",
'outputs/EmbodiedGenData/demo_assets/table/usd/table.usd',
'outputs/EmbodiedGenData/demo_assets/vase/usd/vase.usd',
]
asset_converter = AssetConverterFactory.create(
target_type=AssetType.USD,
source_type=AssetType.MESH,
)
with asset_converter:
for urdf_path, output_file in zip(urdf_paths, output_files):
asset_converter.convert(urdf_path, output_file)
# urdf_path = "outputs/embodiedgen_assets/demo_assets/remote_control/result/remote_control.urdf"
# output_file = "outputs/embodiedgen_assets/demo_assets/remote_control/usd/remote_control.usd"
# asset_converter = AssetConverterFactory.create(
# target_type=AssetType.USD,
# source_type=AssetType.URDF,
# rotate_wxyz=(0.7071, 0.7071, 0, 0), # rotate 90 deg around the X-axis
# )
# with asset_converter:
# asset_converter.convert(urdf_path, output_file)
# # Convert infinigen urdf to mjcf
# urdf_path = "/home/users/xinjie.wang/xinjie/infinigen/outputs/exports/kitchen_i_urdf/export_scene/scene.urdf"
# output_file = "/home/users/xinjie.wang/xinjie/infinigen/outputs/exports/kitchen_i_urdf/mjcf/scene.xml"
# asset_converter = AssetConverterFactory.create(
# target_type=AssetType.MJCF,
# source_type=AssetType.URDF,
# keep_materials=["diffuse"],
# )
# with asset_converter:
# asset_converter.convert(urdf_path, output_file)
# # Convert infinigen usdc to physics usdc
# converter = PhysicsUSDAdder()
# with converter:
# converter.convert(
# usd_path="/home/users/xinjie.wang/xinjie/infinigen/outputs/usdc/export_scene/export_scene.usdc",
# output_file="/home/users/xinjie.wang/xinjie/infinigen/outputs/usdc_p3/export_scene/export_scene.usdc",
# )