Spaces:
Running
on
Zero
Running
on
Zero
| from __future__ import annotations | |
| import logging | |
| import os | |
| import xml.etree.ElementTree as ET | |
| from abc import ABC, abstractmethod | |
| from glob import glob | |
| from shutil import copy, copytree, rmtree | |
| import trimesh | |
| from scipy.spatial.transform import Rotation | |
| from embodied_gen.utils.enum import AssetType | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| __all__ = [ | |
| "AssetConverterFactory", | |
| "MeshtoMJCFConverter", | |
| "MeshtoUSDConverter", | |
| "URDFtoUSDConverter", | |
| "cvt_embodiedgen_asset_to_anysim", | |
| "PhysicsUSDAdder", | |
| ] | |
| def cvt_embodiedgen_asset_to_anysim( | |
| urdf_files: list[str], | |
| target_dirs: list[str], | |
| target_type: AssetType, | |
| source_type: AssetType, | |
| overwrite: bool = False, | |
| **kwargs, | |
| ) -> dict[str, str]: | |
| """Convert URDF files generated by EmbodiedGen into formats required by simulators. | |
| Supported simulators include SAPIEN, Isaac Sim, MuJoCo, Isaac Gym, Genesis, and Pybullet. | |
| Converting to the `USD` format requires `isaacsim` to be installed. | |
| Example: | |
| ```py | |
| from embodied_gen.data.asset_converter import cvt_embodiedgen_asset_to_anysim | |
| from embodied_gen.utils.enum import AssetType | |
| dst_asset_path = cvt_embodiedgen_asset_to_anysim( | |
| urdf_files=[ | |
| "path1_to_embodiedgen_asset/asset.urdf", | |
| "path2_to_embodiedgen_asset/asset.urdf", | |
| ], | |
| target_dirs=[ | |
| "path1_to_target_dir/asset.usd", | |
| "path2_to_target_dir/asset.usd", | |
| ], | |
| target_type=AssetType.USD, | |
| source_type=AssetType.MESH, | |
| ) | |
| ``` | |
| Args: | |
| urdf_files (list[str]): List of URDF file paths. | |
| target_dirs (list[str]): List of target directories. | |
| target_type (AssetType): Target asset type. | |
| source_type (AssetType): Source asset type. | |
| overwrite (bool, optional): Overwrite existing files. | |
| **kwargs: Additional converter arguments. | |
| Returns: | |
| dict[str, str]: Mapping from URDF file to converted asset file. | |
| """ | |
| if isinstance(urdf_files, str): | |
| urdf_files = [urdf_files] | |
| if isinstance(target_dirs, str): | |
| urdf_files = [target_dirs] | |
| # If the target type is URDF, no conversion is needed. | |
| if target_type == AssetType.URDF: | |
| return {key: key for key in urdf_files} | |
| asset_converter = AssetConverterFactory.create( | |
| target_type=target_type, | |
| source_type=source_type, | |
| **kwargs, | |
| ) | |
| asset_paths = dict() | |
| with asset_converter: | |
| for urdf_file, target_dir in zip(urdf_files, target_dirs): | |
| filename = os.path.basename(urdf_file).replace(".urdf", "") | |
| if target_type == AssetType.MJCF: | |
| target_file = f"{target_dir}/{filename}.xml" | |
| elif target_type == AssetType.USD: | |
| target_file = f"{target_dir}/{filename}.usd" | |
| else: | |
| raise NotImplementedError( | |
| f"Target type {target_type} not supported." | |
| ) | |
| if not os.path.exists(target_file) or overwrite: | |
| asset_converter.convert(urdf_file, target_file) | |
| asset_paths[urdf_file] = target_file | |
| return asset_paths | |
| class AssetConverterBase(ABC): | |
| """Abstract base class for asset converters. | |
| Provides context management and mesh transformation utilities. | |
| """ | |
| def convert(self, urdf_path: str, output_path: str, **kwargs) -> str: | |
| """Convert an asset file. | |
| Args: | |
| urdf_path (str): Path to input URDF file. | |
| output_path (str): Path to output file. | |
| **kwargs: Additional arguments. | |
| Returns: | |
| str: Path to converted asset. | |
| """ | |
| pass | |
| def transform_mesh( | |
| self, input_mesh: str, output_mesh: str, mesh_origin: ET.Element | |
| ) -> None: | |
| """Apply transform to mesh based on URDF origin element. | |
| Args: | |
| input_mesh (str): Path to input mesh. | |
| output_mesh (str): Path to output mesh. | |
| mesh_origin (ET.Element): Origin element from URDF. | |
| """ | |
| mesh = trimesh.load(input_mesh, group_material=False) | |
| rpy = list(map(float, mesh_origin.get("rpy").split(" "))) | |
| rotation = Rotation.from_euler("xyz", rpy, degrees=False) | |
| offset = list(map(float, mesh_origin.get("xyz").split(" "))) | |
| os.makedirs(os.path.dirname(output_mesh), exist_ok=True) | |
| if isinstance(mesh, trimesh.Scene): | |
| combined = trimesh.Scene() | |
| for mesh_part in mesh.geometry.values(): | |
| mesh_part.vertices = ( | |
| mesh_part.vertices @ rotation.as_matrix().T | |
| ) + offset | |
| combined.add_geometry(mesh_part) | |
| _ = combined.export(output_mesh) | |
| else: | |
| mesh.vertices = (mesh.vertices @ rotation.as_matrix().T) + offset | |
| _ = mesh.export(output_mesh) | |
| return | |
| def __enter__(self): | |
| """Context manager entry.""" | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| """Context manager exit.""" | |
| return False | |
| class MeshtoMJCFConverter(AssetConverterBase): | |
| """Converts mesh-based URDF files to MJCF format. | |
| Handles geometry, materials, and asset copying. | |
| """ | |
| def __init__( | |
| self, | |
| **kwargs, | |
| ) -> None: | |
| self.kwargs = kwargs | |
| def _copy_asset_file(self, src: str, dst: str) -> None: | |
| """Copies asset file if not already present. | |
| Args: | |
| src (str): Source file path. | |
| dst (str): Destination file path. | |
| """ | |
| if os.path.exists(dst): | |
| return | |
| os.makedirs(os.path.dirname(dst), exist_ok=True) | |
| copy(src, dst) | |
| def add_geometry( | |
| self, | |
| mujoco_element: ET.Element, | |
| link: ET.Element, | |
| body: ET.Element, | |
| tag: str, | |
| input_dir: str, | |
| output_dir: str, | |
| mesh_name: str, | |
| material: ET.Element | None = None, | |
| is_collision: bool = False, | |
| ) -> None: | |
| """Adds geometry to MJCF body from URDF link. | |
| Args: | |
| mujoco_element (ET.Element): MJCF asset element. | |
| link (ET.Element): URDF link element. | |
| body (ET.Element): MJCF body element. | |
| tag (str): Tag name ("visual" or "collision"). | |
| input_dir (str): Input directory. | |
| output_dir (str): Output directory. | |
| mesh_name (str): Mesh name. | |
| material (ET.Element, optional): Material element. | |
| is_collision (bool, optional): If True, treat as collision geometry. | |
| """ | |
| element = link.find(tag) | |
| geometry = element.find("geometry") | |
| mesh = geometry.find("mesh") | |
| filename = mesh.get("filename") | |
| scale = mesh.get("scale", "1.0 1.0 1.0") | |
| input_mesh = f"{input_dir}/{filename}" | |
| output_mesh = f"{output_dir}/{filename}" | |
| self._copy_asset_file(input_mesh, output_mesh) | |
| mesh_origin = element.find("origin") | |
| if mesh_origin is not None: | |
| self.transform_mesh(input_mesh, output_mesh, mesh_origin) | |
| if is_collision: | |
| mesh_parts = trimesh.load( | |
| output_mesh, group_material=False, force="scene" | |
| ) | |
| mesh_parts = mesh_parts.geometry.values() | |
| else: | |
| mesh_parts = [trimesh.load(output_mesh, force="mesh")] | |
| for idx, mesh_part in enumerate(mesh_parts): | |
| if is_collision: | |
| idx_mesh_name = f"{mesh_name}_{idx}" | |
| base, ext = os.path.splitext(filename) | |
| idx_filename = f"{base}_{idx}{ext}" | |
| base_outdir = os.path.dirname(output_mesh) | |
| mesh_part.export(os.path.join(base_outdir, '..', idx_filename)) | |
| geom_attrs = { | |
| "contype": "1", | |
| "conaffinity": "1", | |
| "rgba": "1 1 1 0", | |
| } | |
| else: | |
| idx_mesh_name, idx_filename = mesh_name, filename | |
| geom_attrs = {"contype": "0", "conaffinity": "0"} | |
| ET.SubElement( | |
| mujoco_element, | |
| "mesh", | |
| name=idx_mesh_name, | |
| file=idx_filename, | |
| scale=scale, | |
| ) | |
| geom = ET.SubElement(body, "geom", type="mesh", mesh=idx_mesh_name) | |
| geom.attrib.update(geom_attrs) | |
| if material is not None: | |
| geom.set("material", material.get("name")) | |
| def add_materials( | |
| self, | |
| mujoco_element: ET.Element, | |
| link: ET.Element, | |
| tag: str, | |
| input_dir: str, | |
| output_dir: str, | |
| name: str, | |
| reflectance: float = 0.2, | |
| ) -> ET.Element: | |
| """Adds materials to MJCF asset from URDF link. | |
| Args: | |
| mujoco_element (ET.Element): MJCF asset element. | |
| link (ET.Element): URDF link element. | |
| tag (str): Tag name. | |
| input_dir (str): Input directory. | |
| output_dir (str): Output directory. | |
| name (str): Material name. | |
| reflectance (float, optional): Reflectance value. | |
| Returns: | |
| ET.Element: Material element. | |
| """ | |
| element = link.find(tag) | |
| geometry = element.find("geometry") | |
| mesh = geometry.find("mesh") | |
| filename = mesh.get("filename") | |
| dirname = os.path.dirname(filename) | |
| material = None | |
| for path in glob(f"{input_dir}/{dirname}/*.png"): | |
| file_name = os.path.basename(path) | |
| if "keep_materials" in self.kwargs: | |
| find_flag = False | |
| for keep_key in self.kwargs["keep_materials"]: | |
| if keep_key in file_name.lower(): | |
| find_flag = True | |
| if find_flag is False: | |
| continue | |
| self._copy_asset_file( | |
| path, | |
| f"{output_dir}/{dirname}/{file_name}", | |
| ) | |
| texture_name = f"texture_{name}_{os.path.splitext(file_name)[0]}" | |
| material = ET.SubElement( | |
| mujoco_element, | |
| "material", | |
| name=f"material_{name}", | |
| texture=texture_name, | |
| reflectance=str(reflectance), | |
| ) | |
| ET.SubElement( | |
| mujoco_element, | |
| "texture", | |
| name=texture_name, | |
| type="2d", | |
| file=f"{dirname}/{file_name}", | |
| ) | |
| return material | |
| def convert(self, urdf_path: str, mjcf_path: str): | |
| """Converts a URDF file to MJCF format. | |
| Args: | |
| urdf_path (str): Path to URDF file. | |
| mjcf_path (str): Path to output MJCF file. | |
| """ | |
| tree = ET.parse(urdf_path) | |
| root = tree.getroot() | |
| mujoco_struct = ET.Element("mujoco") | |
| mujoco_struct.set("model", root.get("name")) | |
| mujoco_asset = ET.SubElement(mujoco_struct, "asset") | |
| mujoco_worldbody = ET.SubElement(mujoco_struct, "worldbody") | |
| input_dir = os.path.dirname(urdf_path) | |
| output_dir = os.path.dirname(mjcf_path) | |
| os.makedirs(output_dir, exist_ok=True) | |
| for idx, link in enumerate(root.findall("link")): | |
| link_name = link.get("name", "unnamed_link") | |
| body = ET.SubElement(mujoco_worldbody, "body", name=link_name) | |
| material = self.add_materials( | |
| mujoco_asset, | |
| link, | |
| "visual", | |
| input_dir, | |
| output_dir, | |
| name=str(idx), | |
| ) | |
| joint = ET.SubElement(body, "joint", attrib={"type": "free"}) | |
| self.add_geometry( | |
| mujoco_asset, | |
| link, | |
| body, | |
| "visual", | |
| input_dir, | |
| output_dir, | |
| f"visual_mesh_{idx}", | |
| material, | |
| ) | |
| self.add_geometry( | |
| mujoco_asset, | |
| link, | |
| body, | |
| "collision", | |
| input_dir, | |
| output_dir, | |
| f"collision_mesh_{idx}", | |
| is_collision=True, | |
| ) | |
| tree = ET.ElementTree(mujoco_struct) | |
| ET.indent(tree, space=" ", level=0) | |
| tree.write(mjcf_path, encoding="utf-8", xml_declaration=True) | |
| logger.info(f"Successfully converted {urdf_path} → {mjcf_path}") | |
| class URDFtoMJCFConverter(MeshtoMJCFConverter): | |
| """Converts URDF files with joints to MJCF format, handling joint transformations. | |
| Handles fixed joints and hierarchical body structure. | |
| """ | |
| def convert(self, urdf_path: str, mjcf_path: str, **kwargs) -> str: | |
| """Converts a URDF file with joints to MJCF format. | |
| Args: | |
| urdf_path (str): Path to URDF file. | |
| mjcf_path (str): Path to output MJCF file. | |
| **kwargs: Additional arguments. | |
| Returns: | |
| str: Path to converted MJCF file. | |
| """ | |
| tree = ET.parse(urdf_path) | |
| root = tree.getroot() | |
| mujoco_struct = ET.Element("mujoco") | |
| mujoco_struct.set("model", root.get("name")) | |
| mujoco_asset = ET.SubElement(mujoco_struct, "asset") | |
| mujoco_worldbody = ET.SubElement(mujoco_struct, "worldbody") | |
| input_dir = os.path.dirname(urdf_path) | |
| output_dir = os.path.dirname(mjcf_path) | |
| os.makedirs(output_dir, exist_ok=True) | |
| body_dict = {} | |
| for idx, link in enumerate(root.findall("link")): | |
| link_name = link.get("name", f"unnamed_link_{idx}") | |
| body = ET.SubElement(mujoco_worldbody, "body", name=link_name) | |
| body_dict[link_name] = body | |
| if link.find("visual") is not None: | |
| material = self.add_materials( | |
| mujoco_asset, | |
| link, | |
| "visual", | |
| input_dir, | |
| output_dir, | |
| name=str(idx), | |
| ) | |
| self.add_geometry( | |
| mujoco_asset, | |
| link, | |
| body, | |
| "visual", | |
| input_dir, | |
| output_dir, | |
| f"visual_mesh_{idx}", | |
| material, | |
| ) | |
| if link.find("collision") is not None: | |
| self.add_geometry( | |
| mujoco_asset, | |
| link, | |
| body, | |
| "collision", | |
| input_dir, | |
| output_dir, | |
| f"collision_mesh_{idx}", | |
| is_collision=True, | |
| ) | |
| # Process joints to set transformations and hierarchy | |
| for joint in root.findall("joint"): | |
| joint_type = joint.get("type") | |
| if joint_type != "fixed": | |
| logger.warning("Only support fixed joints in conversion now.") | |
| continue | |
| parent_link = joint.find("parent").get("link") | |
| child_link = joint.find("child").get("link") | |
| origin = joint.find("origin") | |
| if parent_link not in body_dict or child_link not in body_dict: | |
| logger.warning( | |
| f"Parent or child link not found for joint: {joint.get('name')}" | |
| ) | |
| continue | |
| child_body = body_dict[child_link] | |
| mujoco_worldbody.remove(child_body) | |
| parent_body = body_dict[parent_link] | |
| parent_body.append(child_body) | |
| if origin is not None: | |
| xyz = origin.get("xyz", "0 0 0") | |
| rpy = origin.get("rpy", "0 0 0") | |
| child_body.set("pos", xyz) | |
| child_body.set("euler", rpy) | |
| tree = ET.ElementTree(mujoco_struct) | |
| ET.indent(tree, space=" ", level=0) | |
| tree.write(mjcf_path, encoding="utf-8", xml_declaration=True) | |
| logger.info(f"Successfully converted {urdf_path} → {mjcf_path}") | |
| return mjcf_path | |
| class MeshtoUSDConverter(AssetConverterBase): | |
| """Converts mesh-based URDF files to USD format. | |
| Adds physics APIs and post-processes collision meshes. | |
| """ | |
| DEFAULT_BIND_APIS = [ | |
| "MaterialBindingAPI", | |
| "PhysicsMeshCollisionAPI", | |
| "PhysxConvexDecompositionCollisionAPI", | |
| "PhysicsCollisionAPI", | |
| "PhysxCollisionAPI", | |
| "PhysicsMassAPI", | |
| "PhysicsRigidBodyAPI", | |
| "PhysxRigidBodyAPI", | |
| ] | |
| def __init__( | |
| self, | |
| force_usd_conversion: bool = True, | |
| make_instanceable: bool = False, | |
| simulation_app=None, | |
| **kwargs, | |
| ): | |
| """Initializes the converter. | |
| Args: | |
| force_usd_conversion (bool, optional): Force USD conversion. | |
| make_instanceable (bool, optional): Make prims instanceable. | |
| simulation_app (optional): Simulation app instance. | |
| **kwargs: Additional arguments. | |
| """ | |
| if simulation_app is not None: | |
| self.simulation_app = simulation_app | |
| self.exit_close = kwargs.pop("exit_close", True) | |
| self.physx_max_convex_hulls = kwargs.pop("physx_max_convex_hulls", 32) | |
| self.physx_max_vertices = kwargs.pop("physx_max_vertices", 16) | |
| self.physx_max_voxel_res = kwargs.pop("physx_max_voxel_res", 10000) | |
| self.usd_parms = dict( | |
| force_usd_conversion=force_usd_conversion, | |
| make_instanceable=make_instanceable, | |
| **kwargs, | |
| ) | |
| def __enter__(self): | |
| """Context manager entry, launches simulation app if needed.""" | |
| from isaaclab.app import AppLauncher | |
| if not hasattr(self, "simulation_app"): | |
| if "launch_args" not in self.usd_parms: | |
| launch_args = dict( | |
| headless=True, | |
| no_splash=True, | |
| fast_shutdown=True, | |
| disable_gpu=True, | |
| ) | |
| else: | |
| launch_args = self.usd_parms.pop("launch_args") | |
| self.app_launcher = AppLauncher(launch_args) | |
| self.simulation_app = self.app_launcher.app | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| """Context manager exit, closes simulation app if created.""" | |
| # Close the simulation app if it was created here | |
| if exc_val is not None: | |
| logger.error(f"Exception occurred: {exc_val}.") | |
| if hasattr(self, "app_launcher") and self.exit_close: | |
| self.simulation_app.close() | |
| return False | |
| def convert(self, urdf_path: str, output_file: str): | |
| """Converts a URDF file to USD and post-processes collision meshes. | |
| Args: | |
| urdf_path (str): Path to URDF file. | |
| output_file (str): Path to output USD file. | |
| """ | |
| from isaaclab.sim.converters import MeshConverter, MeshConverterCfg | |
| from pxr import PhysxSchema, Sdf, Usd, UsdShade | |
| tree = ET.parse(urdf_path) | |
| root = tree.getroot() | |
| mesh_file = root.find("link/visual/geometry/mesh").get("filename") | |
| input_mesh = os.path.join(os.path.dirname(urdf_path), mesh_file) | |
| output_dir = os.path.abspath(os.path.dirname(output_file)) | |
| output_mesh = f"{output_dir}/mesh/{os.path.basename(mesh_file)}" | |
| mesh_origin = root.find("link/visual/origin") | |
| if mesh_origin is not None: | |
| self.transform_mesh(input_mesh, output_mesh, mesh_origin) | |
| cfg = MeshConverterCfg( | |
| asset_path=output_mesh, | |
| usd_dir=output_dir, | |
| usd_file_name=os.path.basename(output_file), | |
| **self.usd_parms, | |
| ) | |
| urdf_converter = MeshConverter(cfg) | |
| usd_path = urdf_converter.usd_path | |
| rmtree(os.path.dirname(output_mesh)) | |
| stage = Usd.Stage.Open(usd_path) | |
| layer = stage.GetRootLayer() | |
| with Usd.EditContext(stage, layer): | |
| base_prim = stage.GetPseudoRoot().GetChildren()[0] | |
| base_prim.SetMetadata("kind", "component") | |
| for prim in stage.Traverse(): | |
| # Change texture path to relative path. | |
| if prim.GetName() == "material_0": | |
| shader = UsdShade.Shader(prim).GetInput("diffuse_texture") | |
| if shader.Get() is not None: | |
| relative_path = shader.Get().path.replace( | |
| f"{output_dir}/", "" | |
| ) | |
| shader.Set(Sdf.AssetPath(relative_path)) | |
| # Add convex decomposition collision and set ShrinkWrap. | |
| elif prim.GetName() == "mesh": | |
| approx_attr = prim.CreateAttribute( | |
| "physics:approximation", Sdf.ValueTypeNames.Token | |
| ) | |
| approx_attr.Set("convexDecomposition") | |
| physx_conv_api = ( | |
| PhysxSchema.PhysxConvexDecompositionCollisionAPI.Apply( | |
| prim | |
| ) | |
| ) | |
| physx_conv_api.GetMaxConvexHullsAttr().Set( | |
| self.physx_max_convex_hulls | |
| ) | |
| physx_conv_api.GetHullVertexLimitAttr().Set( | |
| self.physx_max_vertices | |
| ) | |
| physx_conv_api.GetVoxelResolutionAttr().Set( | |
| self.physx_max_voxel_res | |
| ) | |
| physx_conv_api.GetShrinkWrapAttr().Set(True) | |
| api_schemas = prim.GetMetadata("apiSchemas") | |
| if api_schemas is None: | |
| api_schemas = Sdf.TokenListOp() | |
| api_list = list(api_schemas.GetAddedOrExplicitItems()) | |
| for api in self.DEFAULT_BIND_APIS: | |
| if api not in api_list: | |
| api_list.append(api) | |
| api_schemas.appendedItems = api_list | |
| prim.SetMetadata("apiSchemas", api_schemas) | |
| layer.Save() | |
| logger.info(f"Successfully converted {urdf_path} → {usd_path}") | |
| class PhysicsUSDAdder(MeshtoUSDConverter): | |
| """Adds physics APIs and collision properties to USD assets. | |
| Useful for post-processing USD files for simulation. | |
| """ | |
| DEFAULT_BIND_APIS = [ | |
| "MaterialBindingAPI", | |
| "PhysicsMeshCollisionAPI", | |
| "PhysxConvexDecompositionCollisionAPI", | |
| "PhysicsCollisionAPI", | |
| "PhysxCollisionAPI", | |
| "PhysicsRigidBodyAPI", | |
| ] | |
| def convert(self, usd_path: str, output_file: str = None): | |
| """Adds physics APIs and collision properties to a USD file. | |
| Args: | |
| usd_path (str): Path to input USD file. | |
| output_file (str, optional): Path to output USD file. | |
| """ | |
| from pxr import PhysxSchema, Sdf, Usd, UsdGeom, UsdPhysics | |
| if output_file is None: | |
| output_file = usd_path | |
| else: | |
| dst_dir = os.path.dirname(output_file) | |
| src_dir = os.path.dirname(usd_path) | |
| copytree(src_dir, dst_dir, dirs_exist_ok=True) | |
| stage = Usd.Stage.Open(output_file) | |
| layer = stage.GetRootLayer() | |
| with Usd.EditContext(stage, layer): | |
| for prim in stage.Traverse(): | |
| if prim.IsA(UsdGeom.Xform): | |
| for child in prim.GetChildren(): | |
| if not child.IsA(UsdGeom.Mesh): | |
| continue | |
| # Skip the lightfactory in Infinigen | |
| if "lightfactory" in prim.GetName().lower(): | |
| continue | |
| approx_attr = prim.CreateAttribute( | |
| "physics:approximation", Sdf.ValueTypeNames.Token | |
| ) | |
| approx_attr.Set("convexDecomposition") | |
| physx_conv_api = PhysxSchema.PhysxConvexDecompositionCollisionAPI.Apply( | |
| prim | |
| ) | |
| physx_conv_api.GetMaxConvexHullsAttr().Set( | |
| self.physx_max_convex_hulls | |
| ) | |
| physx_conv_api.GetHullVertexLimitAttr().Set( | |
| self.physx_max_vertices | |
| ) | |
| physx_conv_api.GetVoxelResolutionAttr().Set( | |
| self.physx_max_voxel_res | |
| ) | |
| physx_conv_api.GetShrinkWrapAttr().Set(True) | |
| rigid_body_api = UsdPhysics.RigidBodyAPI.Apply(prim) | |
| rigid_body_api.CreateKinematicEnabledAttr().Set(True) | |
| if prim.GetAttribute("physics:mass"): | |
| prim.RemoveProperty("physics:mass") | |
| if prim.GetAttribute("physics:velocity"): | |
| prim.RemoveProperty("physics:velocity") | |
| api_schemas = prim.GetMetadata("apiSchemas") | |
| if api_schemas is None: | |
| api_schemas = Sdf.TokenListOp() | |
| api_list = list(api_schemas.GetAddedOrExplicitItems()) | |
| for api in self.DEFAULT_BIND_APIS: | |
| if api not in api_list: | |
| api_list.append(api) | |
| api_schemas.appendedItems = api_list | |
| prim.SetMetadata("apiSchemas", api_schemas) | |
| layer.Save() | |
| logger.info(f"Successfully converted {usd_path} to {output_file}") | |
| class URDFtoUSDConverter(MeshtoUSDConverter): | |
| """Converts URDF files to USD format. | |
| Args: | |
| fix_base (bool, optional): Fix the base link. | |
| merge_fixed_joints (bool, optional): Merge fixed joints. | |
| make_instanceable (bool, optional): Make prims instanceable. | |
| force_usd_conversion (bool, optional): Force conversion to USD. | |
| collision_from_visuals (bool, optional): Generate collisions from visuals. | |
| joint_drive (optional): Joint drive configuration. | |
| rotate_wxyz (tuple[float], optional): Quaternion for rotation. | |
| simulation_app (optional): Simulation app instance. | |
| **kwargs: Additional arguments. | |
| """ | |
| def __init__( | |
| self, | |
| fix_base: bool = False, | |
| merge_fixed_joints: bool = False, | |
| make_instanceable: bool = True, | |
| force_usd_conversion: bool = True, | |
| collision_from_visuals: bool = True, | |
| joint_drive=None, | |
| rotate_wxyz: tuple[float] | None = None, | |
| simulation_app=None, | |
| **kwargs, | |
| ): | |
| """Initializes the converter. | |
| Args: | |
| fix_base (bool, optional): Fix the base link. | |
| merge_fixed_joints (bool, optional): Merge fixed joints. | |
| make_instanceable (bool, optional): Make prims instanceable. | |
| force_usd_conversion (bool, optional): Force conversion to USD. | |
| collision_from_visuals (bool, optional): Generate collisions from visuals. | |
| joint_drive (optional): Joint drive configuration. | |
| rotate_wxyz (tuple[float], optional): Quaternion for rotation. | |
| simulation_app (optional): Simulation app instance. | |
| **kwargs: Additional arguments. | |
| """ | |
| self.usd_parms = dict( | |
| fix_base=fix_base, | |
| merge_fixed_joints=merge_fixed_joints, | |
| make_instanceable=make_instanceable, | |
| force_usd_conversion=force_usd_conversion, | |
| collision_from_visuals=collision_from_visuals, | |
| joint_drive=joint_drive, | |
| **kwargs, | |
| ) | |
| self.rotate_wxyz = rotate_wxyz | |
| if simulation_app is not None: | |
| self.simulation_app = simulation_app | |
| def convert(self, urdf_path: str, output_file: str): | |
| """Converts a URDF file to USD and post-processes collision meshes. | |
| Args: | |
| urdf_path (str): Path to URDF file. | |
| output_file (str): Path to output USD file. | |
| """ | |
| from isaaclab.sim.converters import UrdfConverter, UrdfConverterCfg | |
| from pxr import Gf, PhysxSchema, Sdf, Usd, UsdGeom | |
| cfg = UrdfConverterCfg( | |
| asset_path=urdf_path, | |
| usd_dir=os.path.abspath(os.path.dirname(output_file)), | |
| usd_file_name=os.path.basename(output_file), | |
| **self.usd_parms, | |
| ) | |
| urdf_converter = UrdfConverter(cfg) | |
| usd_path = urdf_converter.usd_path | |
| stage = Usd.Stage.Open(usd_path) | |
| layer = stage.GetRootLayer() | |
| with Usd.EditContext(stage, layer): | |
| for prim in stage.Traverse(): | |
| if prim.GetName() == "collisions": | |
| approx_attr = prim.CreateAttribute( | |
| "physics:approximation", Sdf.ValueTypeNames.Token | |
| ) | |
| approx_attr.Set("convexDecomposition") | |
| physx_conv_api = ( | |
| PhysxSchema.PhysxConvexDecompositionCollisionAPI.Apply( | |
| prim | |
| ) | |
| ) | |
| physx_conv_api.GetMaxConvexHullsAttr().Set(32) | |
| physx_conv_api.GetHullVertexLimitAttr().Set(16) | |
| physx_conv_api.GetVoxelResolutionAttr().Set(10000) | |
| physx_conv_api.GetShrinkWrapAttr().Set(True) | |
| api_schemas = prim.GetMetadata("apiSchemas") | |
| if api_schemas is None: | |
| api_schemas = Sdf.TokenListOp() | |
| api_list = list(api_schemas.GetAddedOrExplicitItems()) | |
| for api in self.DEFAULT_BIND_APIS: | |
| if api not in api_list: | |
| api_list.append(api) | |
| api_schemas.appendedItems = api_list | |
| prim.SetMetadata("apiSchemas", api_schemas) | |
| if self.rotate_wxyz is not None: | |
| inner_prim = next( | |
| p | |
| for p in stage.GetDefaultPrim().GetChildren() | |
| if p.IsA(UsdGeom.Xform) | |
| ) | |
| xformable = UsdGeom.Xformable(inner_prim) | |
| xformable.ClearXformOpOrder() | |
| orient_op = xformable.AddOrientOp(UsdGeom.XformOp.PrecisionDouble) | |
| orient_op.Set(Gf.Quatd(*self.rotate_wxyz)) | |
| layer.Save() | |
| logger.info(f"Successfully converted {urdf_path} → {usd_path}") | |
| class AssetConverterFactory: | |
| """Factory for creating asset converters based on target and source types. | |
| Example: | |
| ```py | |
| from embodied_gen.data.asset_converter import AssetConverterFactory | |
| from embodied_gen.utils.enum import AssetType | |
| converter = AssetConverterFactory.create( | |
| target_type=AssetType.USD, source_type=AssetType.MESH | |
| ) | |
| with converter: | |
| for urdf_path, output_file in zip(urdf_paths, output_files): | |
| converter.convert(urdf_path, output_file) | |
| ``` | |
| """ | |
| def create( | |
| target_type: AssetType, source_type: AssetType = "urdf", **kwargs | |
| ) -> AssetConverterBase: | |
| """Creates an asset converter instance. | |
| Args: | |
| target_type (AssetType): Target asset type. | |
| source_type (AssetType, optional): Source asset type. | |
| **kwargs: Additional arguments. | |
| Returns: | |
| AssetConverterBase: Converter instance. | |
| """ | |
| if target_type == AssetType.MJCF and source_type == AssetType.MESH: | |
| converter = MeshtoMJCFConverter(**kwargs) | |
| elif target_type == AssetType.MJCF and source_type == AssetType.URDF: | |
| converter = URDFtoMJCFConverter(**kwargs) | |
| elif target_type == AssetType.USD and source_type == AssetType.MESH: | |
| converter = MeshtoUSDConverter(**kwargs) | |
| elif target_type == AssetType.USD and source_type == AssetType.URDF: | |
| converter = URDFtoUSDConverter(**kwargs) | |
| else: | |
| raise ValueError( | |
| f"Unsupported converter type: {source_type} -> {target_type}." | |
| ) | |
| return converter | |
| if __name__ == "__main__": | |
| target_asset_type = AssetType.MJCF | |
| # target_asset_type = AssetType.USD | |
| urdf_paths = [ | |
| 'outputs/EmbodiedGenData/demo_assets/banana/result/banana.urdf', | |
| 'outputs/EmbodiedGenData/demo_assets/book/result/book.urdf', | |
| 'outputs/EmbodiedGenData/demo_assets/lamp/result/lamp.urdf', | |
| 'outputs/EmbodiedGenData/demo_assets/mug/result/mug.urdf', | |
| 'outputs/EmbodiedGenData/demo_assets/remote_control/result/remote_control.urdf', | |
| "outputs/EmbodiedGenData/demo_assets/rubik's_cube/result/rubik's_cube.urdf", | |
| 'outputs/EmbodiedGenData/demo_assets/table/result/table.urdf', | |
| 'outputs/EmbodiedGenData/demo_assets/vase/result/vase.urdf', | |
| ] | |
| if target_asset_type == AssetType.MJCF: | |
| output_files = [ | |
| "outputs/embodiedgen_assets/demo_assets/demo_assets/remote_control/mjcf/remote_control.xml", | |
| ] | |
| asset_converter = AssetConverterFactory.create( | |
| target_type=AssetType.MJCF, | |
| source_type=AssetType.MESH, | |
| ) | |
| elif target_asset_type == AssetType.USD: | |
| output_files = [ | |
| 'outputs/EmbodiedGenData/demo_assets/banana/usd/banana.usd', | |
| 'outputs/EmbodiedGenData/demo_assets/book/usd/book.usd', | |
| 'outputs/EmbodiedGenData/demo_assets/lamp/usd/lamp.usd', | |
| 'outputs/EmbodiedGenData/demo_assets/mug/usd/mug.usd', | |
| 'outputs/EmbodiedGenData/demo_assets/remote_control/usd/remote_control.usd', | |
| "outputs/EmbodiedGenData/demo_assets/rubik's_cube/usd/rubik's_cube.usd", | |
| 'outputs/EmbodiedGenData/demo_assets/table/usd/table.usd', | |
| 'outputs/EmbodiedGenData/demo_assets/vase/usd/vase.usd', | |
| ] | |
| asset_converter = AssetConverterFactory.create( | |
| target_type=AssetType.USD, | |
| source_type=AssetType.MESH, | |
| ) | |
| with asset_converter: | |
| for urdf_path, output_file in zip(urdf_paths, output_files): | |
| asset_converter.convert(urdf_path, output_file) | |
| # urdf_path = "outputs/embodiedgen_assets/demo_assets/remote_control/result/remote_control.urdf" | |
| # output_file = "outputs/embodiedgen_assets/demo_assets/remote_control/usd/remote_control.usd" | |
| # asset_converter = AssetConverterFactory.create( | |
| # target_type=AssetType.USD, | |
| # source_type=AssetType.URDF, | |
| # rotate_wxyz=(0.7071, 0.7071, 0, 0), # rotate 90 deg around the X-axis | |
| # ) | |
| # with asset_converter: | |
| # asset_converter.convert(urdf_path, output_file) | |
| # # Convert infinigen urdf to mjcf | |
| # urdf_path = "/home/users/xinjie.wang/xinjie/infinigen/outputs/exports/kitchen_i_urdf/export_scene/scene.urdf" | |
| # output_file = "/home/users/xinjie.wang/xinjie/infinigen/outputs/exports/kitchen_i_urdf/mjcf/scene.xml" | |
| # asset_converter = AssetConverterFactory.create( | |
| # target_type=AssetType.MJCF, | |
| # source_type=AssetType.URDF, | |
| # keep_materials=["diffuse"], | |
| # ) | |
| # with asset_converter: | |
| # asset_converter.convert(urdf_path, output_file) | |
| # # Convert infinigen usdc to physics usdc | |
| # converter = PhysicsUSDAdder() | |
| # with converter: | |
| # converter.convert( | |
| # usd_path="/home/users/xinjie.wang/xinjie/infinigen/outputs/usdc/export_scene/export_scene.usdc", | |
| # output_file="/home/users/xinjie.wang/xinjie/infinigen/outputs/usdc_p3/export_scene/export_scene.usdc", | |
| # ) | |