# Copyright (c) 2022 NVIDIA CORPORATION. All rights reserved. # NVIDIA CORPORATION and its licensors retain all intellectual property # and proprietary rights in and to this software, related documentation # and any modifications thereto. Any use, reproduction, disclosure or # distribution of this software and related documentation without an express # license agreement from NVIDIA CORPORATION is strictly prohibited. import numpy as np import re import warp as wp def parse_usd( source, builder, default_density=1.0e3, only_load_enabled_rigid_bodies=False, only_load_enabled_joints=True, default_ke=1e5, default_kd=250.0, default_kf=500.0, default_mu=0.6, default_restitution=0.0, default_thickness=0.0, joint_limit_ke=100.0, joint_limit_kd=10.0, invert_rotations=False, verbose=False, ignore_paths=[], ): """ Parses a Universal Scene Description (USD) stage containing UsdPhysics schema definitions for rigid-body articulations and adds the bodies, shapes and joints to the given ModelBuilder. The USD description has to be either a path (file name or URL), or an existing USD stage instance that implements the `UsdStage `_ interface. Args: source (str | pxr.UsdStage): The file path to the USD file, or an existing USD stage instance. builder (ModelBuilder): The :class:`ModelBuilder` to add the bodies and joints to. default_density (float): The default density to use for bodies without a density attribute. only_load_enabled_rigid_bodies (bool): If True, only rigid bodies which do not have `physics:rigidBodyEnabled` set to False are loaded. only_load_enabled_joints (bool): If True, only joints which do not have `physics:jointEnabled` set to False are loaded. default_ke (float): The default contact stiffness to use, only considered by SemiImplicitIntegrator. default_kd (float): The default contact damping to use, only considered by SemiImplicitIntegrator. default_kf (float): The default friction stiffness to use, only considered by SemiImplicitIntegrator. default_mu (float): The default friction coefficient to use if a shape has not friction coefficient defined. default_restitution (float): The default coefficient of restitution to use if a shape has not coefficient of restitution defined. default_thickness (float): The thickness to add to the shape geometry. joint_limit_ke (float): The default stiffness to use for joint limits, only considered by SemiImplicitIntegrator. joint_limit_kd (float): The default damping to use for joint limits, only considered by SemiImplicitIntegrator. invert_rotations (bool): If True, inverts any rotations defined in the shape transforms. verbose (bool): If True, print additional information about the parsed USD file. ignore_paths (List[str]): A list of regular expressions matching prim paths to ignore. Returns: dict: Dictionary with the following entries: .. list-table:: :widths: 25 75 * - "fps" - USD stage frames per second * - "duration" - Difference between end time code and start time code of the USD stage * - "up_axis" - Upper-case string of the stage's up axis ("X", "Y", or "Z") * - "path_shape_map" - Mapping from prim path (str) of the UsdGeom to the respective shape index in :class:`ModelBuilder` * - "path_body_map" - Mapping from prim path (str) of a rigid body prim (e.g. that implements the PhysicsRigidBodyAPI) to the respective body index in :class:`ModelBuilder` * - "path_shape_scale" - Mapping from prim path (str) of the UsdGeom to its respective 3D world scale * - "mass_unit" - The stage's Kilograms Per Unit (KGPU) definition (1.0 by default) * - "linear_unit" - The stage's Meters Per Unit (MPU) definition (1.0 by default) Note: This importer is experimental and only supports a subset of the USD Physics schema. Please report any issues you encounter. """ try: from pxr import Usd, UsdGeom, UsdPhysics except ImportError: raise ImportError("Failed to import pxr. Please install USD (e.g. via `pip install usd-core`).") def get_attribute(prim, name): if "*" in name: regex = name.replace("*", ".*") for attr in prim.GetAttributes(): if re.match(regex, attr.GetName()): return attr else: return prim.GetAttribute(name) def has_attribute(prim, name): attr = get_attribute(prim, name) return attr.IsValid() and attr.HasAuthoredValue() def parse_float(prim, name, default=None): attr = get_attribute(prim, name) if not attr or not attr.HasAuthoredValue(): return default val = attr.Get() if np.isfinite(val): return val return default def parse_quat(prim, name, default=None): attr = get_attribute(prim, name) if not attr or not attr.HasAuthoredValue(): return default val = attr.Get() if invert_rotations: quat = wp.quat(*val.imaginary, -val.real) else: quat = wp.quat(*val.imaginary, val.real) l = wp.length(quat) if np.isfinite(l) and l > 0.0: return quat return default def parse_vec(prim, name, default=None): attr = get_attribute(prim, name) if not attr or not attr.HasAuthoredValue(): return default val = attr.Get() if np.isfinite(val).all(): return np.array(val, dtype=np.float32) return default def parse_generic(prim, name, default=None): attr = get_attribute(prim, name) if not attr or not attr.HasAuthoredValue(): return default return attr.Get() def str2axis(s: str) -> np.ndarray: axis = np.zeros(3, dtype=np.float32) axis["XYZ".index(s.upper())] = 1.0 return axis if isinstance(source, str): stage = Usd.Stage.Open(source, Usd.Stage.LoadAll) else: stage = source mass_unit = 1.0 try: if UsdPhysics.StageHasAuthoredKilogramsPerUnit(stage): mass_unit = UsdPhysics.GetStageKilogramsPerUnit(stage) except: pass linear_unit = 1.0 try: if UsdGeom.StageHasAuthoredMetersPerUnit(stage): linear_unit = UsdGeom.GetStageMetersPerUnit(stage) except: pass def parse_xform(prim): xform = UsdGeom.Xform(prim) mat = np.array(xform.GetLocalTransformation(), dtype=np.float32) if invert_rotations: rot = wp.quat_from_matrix(wp.mat33(mat[:3, :3].T.flatten())) else: rot = wp.quat_from_matrix(wp.mat33(mat[:3, :3].flatten())) pos = mat[3, :3] * linear_unit scale = np.ones(3, dtype=np.float32) for op in xform.GetOrderedXformOps(): if op.GetOpType() == UsdGeom.XformOp.TypeScale: scale = np.array(op.Get(), dtype=np.float32) return wp.transform(pos, rot), scale def parse_axis(prim, type, joint_data, is_angular, axis=None): # parse joint axis data schemas = prim.GetAppliedSchemas() schemas_str = "".join(schemas) if f"DriveAPI:{type}" not in schemas_str and f"PhysicsLimitAPI:{type}" not in schemas_str: return drive_type = parse_generic(prim, f"drive:{type}:physics:type", "force") if drive_type != "force": print(f"Warning: only force drive type is supported, ignoring drive:{type} for joint {path}") return stiffness = parse_float(prim, f"drive:{type}:physics:stiffness", 0.0) damping = parse_float(prim, f"drive:{type}:physics:damping", 0.0) low = parse_float(prim, f"limit:{type}:physics:low") high = parse_float(prim, f"limit:{type}:physics:high") target_pos = parse_float(prim, f"drive:{type}:physics:targetPosition") target_vel = parse_float(prim, f"drive:{type}:physics:targetVelocity") if is_angular: stiffness *= mass_unit * linear_unit**2 stiffness = np.deg2rad(stiffness) damping *= mass_unit * linear_unit**2 damping = np.deg2rad(damping) if target_pos is not None: target_pos = np.deg2rad(target_pos) if target_vel is not None: target_vel = np.deg2rad(target_vel) if low is None: low = joint_data["lowerLimit"] else: low = np.deg2rad(low) if high is None: high = joint_data["upperLimit"] else: high = np.deg2rad(high) else: stiffness *= mass_unit damping *= mass_unit if target_pos is not None: target_pos *= linear_unit if target_vel is not None: target_vel *= linear_unit if low is None: low = joint_data["lowerLimit"] else: low *= linear_unit if high is None: high = joint_data["upperLimit"] else: high *= linear_unit mode = wp.sim.JOINT_MODE_LIMIT if f"DriveAPI:{type}" in schemas_str: if target_vel is not None and target_vel != 0.0: mode = wp.sim.JOINT_MODE_TARGET_VELOCITY else: mode = wp.sim.JOINT_MODE_TARGET_POSITION if low > high: low = (low + high) / 2 high = low axis = wp.sim.JointAxis( axis=(axis or joint_data["axis"]), limit_lower=low, limit_upper=high, target=(target_pos or target_vel or (low + high) / 2), target_ke=stiffness, target_kd=damping, mode=mode, limit_ke=joint_limit_ke, limit_kd=joint_limit_kd, ) if is_angular: joint_data["angular_axes"].append(axis) else: joint_data["linear_axes"].append(axis) axis_str = "Y" try: axis_str = UsdGeom.GetStageUpAxis(stage) except: pass upaxis = str2axis(axis_str) shape_types = {"Cube", "Sphere", "Mesh", "Capsule", "Plane", "Cylinder", "Cone"} path_body_map = {} path_shape_map = {} path_shape_scale = {} # maps prim path name to its world transform path_world_poses = {} # transform from body frame to where the actual joint child frame is # so that the link's children will use the right parent tf for the joint prim_joint_xforms = {} path_collision_filters = set() no_collision_shapes = set() body_density = {} # mapping from body ID to defined density # first find all joints and materials joint_data = {} # mapping from path of child link to joint USD settings materials = {} # mapping from material path to material USD settings joint_parents = set() # paths of joint parents for prim in stage.Traverse(): type_name = str(prim.GetTypeName()) path = str(prim.GetPath()) # if verbose: # print(path, type_name) if type_name.endswith("Joint"): # the type name can sometimes be "DistancePhysicsJoint" or "PhysicsDistanceJoint" ... type_name = type_name.replace("Physics", "").replace("Joint", "") child = str(prim.GetRelationship("physics:body1").GetTargets()[0]) pos0 = parse_vec(prim, "physics:localPos0", np.zeros(3, dtype=np.float32)) * linear_unit pos1 = parse_vec(prim, "physics:localPos1", np.zeros(3, dtype=np.float32)) * linear_unit rot0 = parse_quat(prim, "physics:localRot0", wp.quat_identity()) rot1 = parse_quat(prim, "physics:localRot1", wp.quat_identity()) joint_data[child] = { "type": type_name, "name": str(prim.GetName()), "parent_tf": wp.transform(pos0, rot0), "child_tf": wp.transform(pos1, rot1), "enabled": parse_generic(prim, "physics:jointEnabled", True), "collisionEnabled": parse_generic(prim, "physics:collisionEnabled", False), "excludeFromArticulation": parse_generic(prim, "physics:excludeFromArticulation", False), "axis": str2axis(parse_generic(prim, "physics:axis", "X")), "breakForce": parse_float(prim, "physics:breakForce", np.inf), "breakTorque": parse_float(prim, "physics:breakTorque", np.inf), "linear_axes": [], "angular_axes": [], } if only_load_enabled_joints and not joint_data[child]["enabled"]: print("Skipping disabled joint", path) continue # parse joint limits lower = parse_float(prim, "physics:lowerLimit", -np.inf) upper = parse_float(prim, "physics:upperLimit", np.inf) if type_name == "Distance": # if distance is negative the joint is not limited joint_data[child]["lowerLimit"] = parse_float(prim, "physics:minDistance", -1.0) * linear_unit joint_data[child]["upperLimit"] = parse_float(prim, "physics:maxDistance", -1.0) * linear_unit elif type_name == "Prismatic": joint_data[child]["lowerLimit"] = lower * linear_unit joint_data[child]["upperLimit"] = upper * linear_unit else: joint_data[child]["lowerLimit"] = np.deg2rad(lower) if np.isfinite(lower) else lower joint_data[child]["upperLimit"] = np.deg2rad(upper) if np.isfinite(upper) else upper if joint_data[child]["lowerLimit"] > joint_data[child]["upperLimit"]: joint_data[child]["lowerLimit"] = ( joint_data[child]["lowerLimit"] + joint_data[child]["upperLimit"] ) / 2 joint_data[child]["upperLimit"] = joint_data[child]["lowerLimit"] parents = prim.GetRelationship("physics:body0").GetTargets() if len(parents) > 0: parent_path = str(parents[0]) joint_data[child]["parent"] = parent_path joint_parents.add(parent_path) else: joint_data[child]["parent"] = None # parse joint drive parse_axis(prim, "angular", joint_data[child], is_angular=True) parse_axis(prim, "rotX", joint_data[child], is_angular=True, axis=(1.0, 0.0, 0.0)) parse_axis(prim, "rotY", joint_data[child], is_angular=True, axis=(0.0, 1.0, 0.0)) parse_axis(prim, "rotZ", joint_data[child], is_angular=True, axis=(0.0, 0.0, 1.0)) parse_axis(prim, "linear", joint_data[child], is_angular=False) parse_axis(prim, "transX", joint_data[child], is_angular=False, axis=(1.0, 0.0, 0.0)) parse_axis(prim, "transY", joint_data[child], is_angular=False, axis=(0.0, 1.0, 0.0)) parse_axis(prim, "transZ", joint_data[child], is_angular=False, axis=(0.0, 0.0, 1.0)) elif type_name == "Material": material = {} if has_attribute(prim, "physics:density"): material["density"] = parse_float(prim, "physics:density") * mass_unit # / (linear_unit**3) if has_attribute(prim, "physics:restitution"): material["restitution"] = parse_float(prim, "physics:restitution", default_restitution) if has_attribute(prim, "physics:staticFriction"): material["staticFriction"] = parse_float(prim, "physics:staticFriction", default_mu) if has_attribute(prim, "physics:dynamicFriction"): material["dynamicFriction"] = parse_float(prim, "physics:dynamicFriction", default_mu) materials[path] = material elif type_name == "PhysicsScene": try: scene = UsdPhysics.Scene(prim) g_vec = scene.GetGravityDirectionAttr() g_mag = scene.GetGravityMagnitudeAttr() if g_mag.HasAuthoredValue() and np.isfinite(g_mag.Get()): builder.gravity = -np.abs(g_mag.Get() * linear_unit) if g_vec.HasAuthoredValue() and np.linalg.norm(g_vec.Get()) > 0.0: builder.up_vector = np.array(g_vec.Get(), dtype=np.float32) if np.any(builder.up_vector < 0.0): builder.up_vector = -builder.up_vector else: builder.up_vector = upaxis except: pass def parse_prim(prim, incoming_xform, incoming_scale, parent_body: int = -1): nonlocal builder nonlocal joint_data nonlocal path_body_map nonlocal path_shape_map nonlocal path_shape_scale nonlocal path_world_poses nonlocal prim_joint_xforms nonlocal path_collision_filters nonlocal no_collision_shapes nonlocal body_density path = str(prim.GetPath()) for pattern in ignore_paths: if re.match(pattern, path): return type_name = str(prim.GetTypeName()) if type_name.endswith("Joint") or type_name.endswith("Light") or type_name.endswith("Material"): return if verbose: print(f"parse_prim {prim.GetPath()} ({type_name})") if type_name == "PhysicsScene": # in case the PhysicsScene has bodies as children... for child in prim.GetChildren(): parse_prim(child, incoming_xform, incoming_scale, parent_body) schemas = set(prim.GetAppliedSchemas()) children_refs = prim.GetChildren() prim_joint_xforms[path] = wp.transform() local_xform, scale = parse_xform(prim) scale = incoming_scale * scale xform = wp.mul(incoming_xform, local_xform) path_world_poses[path] = xform geo_tf = local_xform body_id = parent_body is_rigid_body = "PhysicsRigidBodyAPI" in schemas and parent_body == -1 create_rigid_body = is_rigid_body or path in joint_parents if create_rigid_body: body_id = builder.add_body( origin=xform, name=prim.GetName(), ) path_body_map[path] = body_id body_density[body_id] = 0.0 parent_body = body_id geo_tf = wp.transform() # set up joints between rigid bodies after the children have been added if path in joint_data: joint = joint_data[path] joint_params = dict( child=body_id, linear_axes=joint["linear_axes"], angular_axes=joint["angular_axes"], name=joint["name"], enabled=joint["enabled"], parent_xform=joint["parent_tf"], child_xform=joint["child_tf"], ) parent_path = joint["parent"] if parent_path is None: joint_params["parent"] = -1 parent_tf = wp.transform() else: joint_params["parent"] = path_body_map[parent_path] parent_tf = path_world_poses[parent_path] # the joint to which we are connected will transform this body already geo_tf = wp.transform() if verbose: print(f"Adding joint {joint['name']} between {joint['parent']} and {path}") print(" parent_xform", joint["parent_tf"]) print(" child_xform ", joint["child_tf"]) print(" parent_tf ", parent_tf) print(f" geo_tf at {path} = {geo_tf} (xform was {xform})") if joint["type"] == "Revolute": joint_params["joint_type"] = wp.sim.JOINT_REVOLUTE if len(joint_params["angular_axes"]) == 0: joint_params["angular_axes"].append( wp.sim.JointAxis( joint["axis"], limit_lower=joint["lowerLimit"], limit_upper=joint["upperLimit"], limit_ke=joint_limit_ke, limit_kd=joint_limit_kd, ) ) elif joint["type"] == "Prismatic": joint_params["joint_type"] = wp.sim.JOINT_PRISMATIC if len(joint_params["linear_axes"]) == 0: joint_params["linear_axes"].append( wp.sim.JointAxis( joint["axis"], limit_lower=joint["lowerLimit"], limit_upper=joint["upperLimit"], limit_ke=joint_limit_ke, limit_kd=joint_limit_kd, ) ) elif joint["type"] == "Spherical": joint_params["joint_type"] = wp.sim.JOINT_BALL elif joint["type"] == "Fixed": joint_params["joint_type"] = wp.sim.JOINT_FIXED elif joint["type"] == "Distance": joint_params["joint_type"] = wp.sim.JOINT_DISTANCE # we have to add a dummy linear X axis to define the joint limits joint_params["linear_axes"].append( wp.sim.JointAxis( (1.0, 0.0, 0.0), limit_lower=joint["lowerLimit"], limit_upper=joint["upperLimit"], limit_ke=joint_limit_ke, limit_kd=joint_limit_kd, ) ) elif joint["type"] == "": joint_params["joint_type"] = wp.sim.JOINT_D6 else: print(f"Warning: unsupported joint type {joint['type']} for {path}") builder.add_joint(**joint_params) elif is_rigid_body: builder.add_joint_free(child=body_id) # free joint; we set joint_q/qd, not body_q/qd since eval_fk is used after model creation builder.joint_q[-4:] = xform.q builder.joint_q[-7:-4] = xform.p linear_vel = parse_vec(prim, "physics:velocity", np.zeros(3, dtype=np.float32)) * linear_unit angular_vel = parse_vec(prim, "physics:angularVelocity", np.zeros(3, dtype=np.float32)) * linear_unit builder.joint_qd[-6:-3] = angular_vel builder.joint_qd[-3:] = linear_vel if verbose: print(f"added {type_name} body {body_id} ({path}) at {xform}") density = None material = None if prim.HasRelationship("material:binding:physics"): other_paths = prim.GetRelationship("material:binding:physics").GetTargets() if len(other_paths) > 0: material = materials[str(other_paths[0])] if material is not None: if "density" in material: density = material["density"] if has_attribute(prim, "physics:density"): d = parse_float(prim, "physics:density") density = d * mass_unit # / (linear_unit**3) # assert prim.GetAttribute('orientation').Get() == "rightHanded", "Only right-handed orientations are supported." enabled = parse_generic(prim, "physics:rigidBodyEnabled", True) if only_load_enabled_rigid_bodies and not enabled: if verbose: print("Skipping disabled rigid body", path) return mass = parse_float(prim, "physics:mass") if is_rigid_body: if density is None: density = default_density body_density[body_id] = density elif density is None: if body_id >= 0: density = body_density[body_id] else: density = 0.0 com = parse_vec(prim, "physics:centerOfMass", np.zeros(3, dtype=np.float32)) i_diag = parse_vec(prim, "physics:diagonalInertia", np.zeros(3, dtype=np.float32)) i_rot = parse_quat(prim, "physics:principalAxes", wp.quat_identity()) # parse children if type_name == "Xform": if prim.IsInstance(): proto = prim.GetPrototype() for child in proto.GetChildren(): parse_prim(child, xform, scale, parent_body) else: for child in children_refs: parse_prim(child, xform, scale, parent_body) elif type_name == "Scope": for child in children_refs: parse_prim(child, incoming_xform, incoming_scale, parent_body) elif type_name in shape_types: # parse shapes shape_params = dict( ke=default_ke, kd=default_kd, kf=default_kf, mu=default_mu, restitution=default_restitution ) if material is not None: if "restitution" in material: shape_params["restitution"] = material["restitution"] if "dynamicFriction" in material: shape_params["mu"] = material["dynamicFriction"] if has_attribute(prim, "doubleSided") and not prim.GetAttribute("doubleSided").Get(): print(f"Warning: treating {path} as double-sided because single-sided collisions are not supported.") if type_name == "Cube": size = parse_float(prim, "size", 2.0) if has_attribute(prim, "extents"): extents = parse_vec(prim, "extents") * scale # TODO position geom at extents center? geo_pos = 0.5 * (extents[0] + extents[1]) extents = extents[1] - extents[0] else: extents = scale * size shape_id = builder.add_shape_box( body_id, geo_tf.p, geo_tf.q, hx=extents[0] / 2, hy=extents[1] / 2, hz=extents[2] / 2, density=density, thickness=default_thickness, **shape_params, ) elif type_name == "Sphere": if not (scale[0] == scale[1] == scale[2]): print("Warning: Non-uniform scaling of spheres is not supported.") if has_attribute(prim, "extents"): extents = parse_vec(prim, "extents") * scale # TODO position geom at extents center? geo_pos = 0.5 * (extents[0] + extents[1]) extents = extents[1] - extents[0] if not (extents[0] == extents[1] == extents[2]): print("Warning: Non-uniform extents of spheres are not supported.") radius = extents[0] else: radius = parse_float(prim, "radius", 1.0) * scale[0] shape_id = builder.add_shape_sphere( body_id, geo_tf.p, geo_tf.q, radius, density=density, **shape_params ) elif type_name == "Plane": normal_str = parse_generic(prim, "axis", "Z").upper() geo_rot = geo_tf.q if normal_str != "Y": normal = str2axis(normal_str) c = np.cross(normal, (0.0, 1.0, 0.0)) angle = np.arcsin(np.linalg.norm(c)) axis = c / np.linalg.norm(c) geo_rot = wp.mul(geo_rot, wp.quat_from_axis_angle(axis, angle)) width = parse_float(prim, "width", 0.0) * scale[0] length = parse_float(prim, "length", 0.0) * scale[1] shape_id = builder.add_shape_plane( body=body_id, pos=geo_tf.p, rot=geo_rot, width=width, length=length, thickness=default_thickness, **shape_params, ) elif type_name == "Capsule": axis_str = parse_generic(prim, "axis", "Z").upper() radius = parse_float(prim, "radius", 0.5) * scale[0] half_height = parse_float(prim, "height", 2.0) / 2 * scale[1] assert not has_attribute(prim, "extents"), "Capsule extents are not supported." shape_id = builder.add_shape_capsule( body_id, geo_tf.p, geo_tf.q, radius, half_height, density=density, up_axis="XYZ".index(axis_str), **shape_params, ) elif type_name == "Cylinder": axis_str = parse_generic(prim, "axis", "Z").upper() radius = parse_float(prim, "radius", 0.5) * scale[0] half_height = parse_float(prim, "height", 2.0) / 2 * scale[1] assert not has_attribute(prim, "extents"), "Cylinder extents are not supported." shape_id = builder.add_shape_cylinder( body_id, geo_tf.p, geo_tf.q, radius, half_height, density=density, up_axis="XYZ".index(axis_str), **shape_params, ) elif type_name == "Cone": axis_str = parse_generic(prim, "axis", "Z").upper() radius = parse_float(prim, "radius", 0.5) * scale[0] half_height = parse_float(prim, "height", 2.0) / 2 * scale[1] assert not has_attribute(prim, "extents"), "Cone extents are not supported." shape_id = builder.add_shape_cone( body_id, geo_tf.p, geo_tf.q, radius, half_height, density=density, up_axis="XYZ".index(axis_str), **shape_params, ) elif type_name == "Mesh": mesh = UsdGeom.Mesh(prim) points = np.array(mesh.GetPointsAttr().Get(), dtype=np.float32) indices = np.array(mesh.GetFaceVertexIndicesAttr().Get(), dtype=np.float32) counts = mesh.GetFaceVertexCountsAttr().Get() faces = [] face_id = 0 for count in counts: if count == 3: faces.append(indices[face_id : face_id + 3]) elif count == 4: faces.append(indices[face_id : face_id + 3]) faces.append(indices[[face_id, face_id + 2, face_id + 3]]) else: # assert False, f"Error while parsing USD mesh {path}: encountered polygon with {count} vertices, but only triangles and quads are supported." continue face_id += count m = wp.sim.Mesh(points, np.array(faces, dtype=np.int32).flatten()) shape_id = builder.add_shape_mesh( body_id, geo_tf.p, geo_tf.q, scale=scale, mesh=m, density=density, thickness=default_thickness, **shape_params, ) else: print(f"Warning: Unsupported geometry type {type_name} at {path}.") return path_body_map[path] = body_id path_shape_map[path] = shape_id path_shape_scale[path] = scale if prim.HasRelationship("physics:filteredPairs"): other_paths = prim.GetRelationship("physics:filteredPairs").GetTargets() for other_path in other_paths: path_collision_filters.add((path, str(other_path))) if "PhysicsCollisionAPI" not in schemas or not parse_generic(prim, "physics:collisionEnabled", True): no_collision_shapes.add(shape_id) else: print(f"Warning: encountered unsupported prim type {type_name}") # update mass properties of rigid bodies in cases where properties are defined with higher precedence if body_id >= 0: com = parse_vec(prim, "physics:centerOfMass") if com is not None: # overwrite COM builder.body_com[body_id] = com * scale if mass is not None and not (is_rigid_body and mass == 0.0): mass_ratio = mass / builder.body_mass[body_id] # mass has precedence over density, so we overwrite the mass computed from density builder.body_mass[body_id] = mass * mass_unit if mass > 0.0: builder.body_inv_mass[body_id] = 1.0 / builder.body_mass[body_id] else: builder.body_inv_mass[body_id] = 0.0 # update inertia builder.body_inertia[body_id] *= mass_ratio if builder.body_inertia[body_id].any(): builder.body_inv_inertia[body_id] = np.linalg.inv(builder.body_inertia[body_id]) else: builder.body_inv_inertia[body_id] = np.zeros((3, 3), dtype=np.float32) if np.linalg.norm(i_diag) > 0.0: rot = np.array(wp.quat_to_matrix(i_rot), dtype=np.float32).reshape(3, 3) inertia = rot @ np.diag(i_diag) @ rot.T builder.body_inertia[body_id] = inertia if inertia.any(): builder.body_inv_inertia[body_id] = np.linalg.inv(inertia) else: builder.body_inv_inertia[body_id] = np.zeros((3, 3), dtype=np.float32) parse_prim( stage.GetDefaultPrim(), incoming_xform=wp.transform(), incoming_scale=np.ones(3, dtype=np.float32) * linear_unit ) shape_count = len(builder.shape_geo_type) # apply collision filters now that we have added all shapes for path1, path2 in path_collision_filters: shape1 = path_shape_map[path1] shape2 = path_shape_map[path2] builder.shape_collision_filter_pairs.add((shape1, shape2)) # apply collision filters to all shapes that have no collision for shape_id in no_collision_shapes: for other_shape_id in range(shape_count): if other_shape_id != shape_id: builder.shape_collision_filter_pairs.add((shape_id, other_shape_id)) # return stage parameters return { "fps": stage.GetFramesPerSecond(), "duration": stage.GetEndTimeCode() - stage.GetStartTimeCode(), "up_axis": UsdGeom.GetStageUpAxis(stage).upper(), "path_shape_map": path_shape_map, "path_body_map": path_body_map, "path_shape_scale": path_shape_scale, "mass_unit": mass_unit, "linear_unit": linear_unit, } def resolve_usd_from_url(url: str, target_folder_name: str = None, export_usda: bool = False): """ Downloads a USD file from a URL and resolves all references to other USD files to be downloaded to the given target folder. Args: url (str): URL to the USD file. target_folder_name (str): Target folder name. If None, a timestamped folder will be created in the current directory. export_usda (bool): If True, converts each downloaded USD file to USDA and saves the additional USDA file in the target folder with the same base name as the original USD file. Returns: str: File path to the downloaded USD file. """ import requests import datetime import os try: from pxr import Usd except ImportError: raise ImportError("Failed to import pxr. Please install USD (e.g. via `pip install usd-core`).") response = requests.get(url, allow_redirects=True) if response.status_code != 200: raise RuntimeError(f"Failed to download USD file. Status code: {response.status_code}") file = response.content dot = os.path.extsep base = os.path.basename(url) url_folder = os.path.dirname(url) base_name = dot.join(base.split(dot)[:-1]) if target_folder_name is None: timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") target_folder_name = os.path.join(".usd_cache", f"{base_name}_{timestamp}") os.makedirs(target_folder_name, exist_ok=True) target_filename = os.path.join(target_folder_name, base) with open(target_filename, "wb") as f: f.write(file) stage = Usd.Stage.Open(target_filename, Usd.Stage.LoadNone) stage_str = stage.GetRootLayer().ExportToString() print(f"Downloaded USD file to {target_filename}.") if export_usda: usda_filename = os.path.join(target_folder_name, base_name + ".usda") with open(usda_filename, "w") as f: f.write(stage_str) print(f"Exported USDA file to {usda_filename}.") # parse referenced USD files like `references = @./franka_collisions.usd@` downloaded = set() for match in re.finditer(r"references.=.@(.*?)@", stage_str): refname = match.group(1) if refname.startswith("./"): refname = refname[2:] if refname in downloaded: continue try: response = requests.get(f"{url_folder}/{refname}", allow_redirects=True) if response.status_code != 200: print(f"Failed to download reference {refname}. Status code: {response.status_code}") continue file = response.content refdir = os.path.dirname(refname) if refdir: os.makedirs(os.path.join(target_folder_name, refdir), exist_ok=True) ref_filename = os.path.join(target_folder_name, refname) if not os.path.exists(ref_filename): with open(ref_filename, "wb") as f: f.write(file) downloaded.add(refname) print(f"Downloaded USD reference {refname} to {ref_filename}.") if export_usda: ref_stage = Usd.Stage.Open(ref_filename, Usd.Stage.LoadNone) ref_stage_str = ref_stage.GetRootLayer().ExportToString() base = os.path.basename(ref_filename) base_name = dot.join(base.split(dot)[:-1]) usda_filename = os.path.join(target_folder_name, base_name + ".usda") with open(usda_filename, "w") as f: f.write(ref_stage_str) print(f"Exported USDA file to {usda_filename}.") except Exception: print(f"Failed to download {refname}.") return target_filename