qbhf2's picture
added NvidiaWarp and GarmentCode repos
66c9c8a
# Copyright (c) 2022 NVIDIA CORPORATION. All rights reserved.
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
import numpy as np
import re
import warp as wp
def parse_usd(
source,
builder,
default_density=1.0e3,
only_load_enabled_rigid_bodies=False,
only_load_enabled_joints=True,
default_ke=1e5,
default_kd=250.0,
default_kf=500.0,
default_mu=0.6,
default_restitution=0.0,
default_thickness=0.0,
joint_limit_ke=100.0,
joint_limit_kd=10.0,
invert_rotations=False,
verbose=False,
ignore_paths=[],
):
"""
Parses a Universal Scene Description (USD) stage containing UsdPhysics schema definitions for rigid-body articulations and adds the bodies, shapes and joints to the given ModelBuilder.
The USD description has to be either a path (file name or URL), or an existing USD stage instance that implements the `UsdStage <https://openusd.org/dev/api/class_usd_stage.html>`_ interface.
Args:
source (str | pxr.UsdStage): The file path to the USD file, or an existing USD stage instance.
builder (ModelBuilder): The :class:`ModelBuilder` to add the bodies and joints to.
default_density (float): The default density to use for bodies without a density attribute.
only_load_enabled_rigid_bodies (bool): If True, only rigid bodies which do not have `physics:rigidBodyEnabled` set to False are loaded.
only_load_enabled_joints (bool): If True, only joints which do not have `physics:jointEnabled` set to False are loaded.
default_ke (float): The default contact stiffness to use, only considered by SemiImplicitIntegrator.
default_kd (float): The default contact damping to use, only considered by SemiImplicitIntegrator.
default_kf (float): The default friction stiffness to use, only considered by SemiImplicitIntegrator.
default_mu (float): The default friction coefficient to use if a shape has not friction coefficient defined.
default_restitution (float): The default coefficient of restitution to use if a shape has not coefficient of restitution defined.
default_thickness (float): The thickness to add to the shape geometry.
joint_limit_ke (float): The default stiffness to use for joint limits, only considered by SemiImplicitIntegrator.
joint_limit_kd (float): The default damping to use for joint limits, only considered by SemiImplicitIntegrator.
invert_rotations (bool): If True, inverts any rotations defined in the shape transforms.
verbose (bool): If True, print additional information about the parsed USD file.
ignore_paths (List[str]): A list of regular expressions matching prim paths to ignore.
Returns:
dict: Dictionary with the following entries:
.. list-table::
:widths: 25 75
* - "fps"
- USD stage frames per second
* - "duration"
- Difference between end time code and start time code of the USD stage
* - "up_axis"
- Upper-case string of the stage's up axis ("X", "Y", or "Z")
* - "path_shape_map"
- Mapping from prim path (str) of the UsdGeom to the respective shape index in :class:`ModelBuilder`
* - "path_body_map"
- Mapping from prim path (str) of a rigid body prim (e.g. that implements the PhysicsRigidBodyAPI) to the respective body index in :class:`ModelBuilder`
* - "path_shape_scale"
- Mapping from prim path (str) of the UsdGeom to its respective 3D world scale
* - "mass_unit"
- The stage's Kilograms Per Unit (KGPU) definition (1.0 by default)
* - "linear_unit"
- The stage's Meters Per Unit (MPU) definition (1.0 by default)
Note:
This importer is experimental and only supports a subset of the USD Physics schema. Please report any issues you encounter.
"""
try:
from pxr import Usd, UsdGeom, UsdPhysics
except ImportError:
raise ImportError("Failed to import pxr. Please install USD (e.g. via `pip install usd-core`).")
def get_attribute(prim, name):
if "*" in name:
regex = name.replace("*", ".*")
for attr in prim.GetAttributes():
if re.match(regex, attr.GetName()):
return attr
else:
return prim.GetAttribute(name)
def has_attribute(prim, name):
attr = get_attribute(prim, name)
return attr.IsValid() and attr.HasAuthoredValue()
def parse_float(prim, name, default=None):
attr = get_attribute(prim, name)
if not attr or not attr.HasAuthoredValue():
return default
val = attr.Get()
if np.isfinite(val):
return val
return default
def parse_quat(prim, name, default=None):
attr = get_attribute(prim, name)
if not attr or not attr.HasAuthoredValue():
return default
val = attr.Get()
if invert_rotations:
quat = wp.quat(*val.imaginary, -val.real)
else:
quat = wp.quat(*val.imaginary, val.real)
l = wp.length(quat)
if np.isfinite(l) and l > 0.0:
return quat
return default
def parse_vec(prim, name, default=None):
attr = get_attribute(prim, name)
if not attr or not attr.HasAuthoredValue():
return default
val = attr.Get()
if np.isfinite(val).all():
return np.array(val, dtype=np.float32)
return default
def parse_generic(prim, name, default=None):
attr = get_attribute(prim, name)
if not attr or not attr.HasAuthoredValue():
return default
return attr.Get()
def str2axis(s: str) -> np.ndarray:
axis = np.zeros(3, dtype=np.float32)
axis["XYZ".index(s.upper())] = 1.0
return axis
if isinstance(source, str):
stage = Usd.Stage.Open(source, Usd.Stage.LoadAll)
else:
stage = source
mass_unit = 1.0
try:
if UsdPhysics.StageHasAuthoredKilogramsPerUnit(stage):
mass_unit = UsdPhysics.GetStageKilogramsPerUnit(stage)
except:
pass
linear_unit = 1.0
try:
if UsdGeom.StageHasAuthoredMetersPerUnit(stage):
linear_unit = UsdGeom.GetStageMetersPerUnit(stage)
except:
pass
def parse_xform(prim):
xform = UsdGeom.Xform(prim)
mat = np.array(xform.GetLocalTransformation(), dtype=np.float32)
if invert_rotations:
rot = wp.quat_from_matrix(wp.mat33(mat[:3, :3].T.flatten()))
else:
rot = wp.quat_from_matrix(wp.mat33(mat[:3, :3].flatten()))
pos = mat[3, :3] * linear_unit
scale = np.ones(3, dtype=np.float32)
for op in xform.GetOrderedXformOps():
if op.GetOpType() == UsdGeom.XformOp.TypeScale:
scale = np.array(op.Get(), dtype=np.float32)
return wp.transform(pos, rot), scale
def parse_axis(prim, type, joint_data, is_angular, axis=None):
# parse joint axis data
schemas = prim.GetAppliedSchemas()
schemas_str = "".join(schemas)
if f"DriveAPI:{type}" not in schemas_str and f"PhysicsLimitAPI:{type}" not in schemas_str:
return
drive_type = parse_generic(prim, f"drive:{type}:physics:type", "force")
if drive_type != "force":
print(f"Warning: only force drive type is supported, ignoring drive:{type} for joint {path}")
return
stiffness = parse_float(prim, f"drive:{type}:physics:stiffness", 0.0)
damping = parse_float(prim, f"drive:{type}:physics:damping", 0.0)
low = parse_float(prim, f"limit:{type}:physics:low")
high = parse_float(prim, f"limit:{type}:physics:high")
target_pos = parse_float(prim, f"drive:{type}:physics:targetPosition")
target_vel = parse_float(prim, f"drive:{type}:physics:targetVelocity")
if is_angular:
stiffness *= mass_unit * linear_unit**2
stiffness = np.deg2rad(stiffness)
damping *= mass_unit * linear_unit**2
damping = np.deg2rad(damping)
if target_pos is not None:
target_pos = np.deg2rad(target_pos)
if target_vel is not None:
target_vel = np.deg2rad(target_vel)
if low is None:
low = joint_data["lowerLimit"]
else:
low = np.deg2rad(low)
if high is None:
high = joint_data["upperLimit"]
else:
high = np.deg2rad(high)
else:
stiffness *= mass_unit
damping *= mass_unit
if target_pos is not None:
target_pos *= linear_unit
if target_vel is not None:
target_vel *= linear_unit
if low is None:
low = joint_data["lowerLimit"]
else:
low *= linear_unit
if high is None:
high = joint_data["upperLimit"]
else:
high *= linear_unit
mode = wp.sim.JOINT_MODE_LIMIT
if f"DriveAPI:{type}" in schemas_str:
if target_vel is not None and target_vel != 0.0:
mode = wp.sim.JOINT_MODE_TARGET_VELOCITY
else:
mode = wp.sim.JOINT_MODE_TARGET_POSITION
if low > high:
low = (low + high) / 2
high = low
axis = wp.sim.JointAxis(
axis=(axis or joint_data["axis"]),
limit_lower=low,
limit_upper=high,
target=(target_pos or target_vel or (low + high) / 2),
target_ke=stiffness,
target_kd=damping,
mode=mode,
limit_ke=joint_limit_ke,
limit_kd=joint_limit_kd,
)
if is_angular:
joint_data["angular_axes"].append(axis)
else:
joint_data["linear_axes"].append(axis)
axis_str = "Y"
try:
axis_str = UsdGeom.GetStageUpAxis(stage)
except:
pass
upaxis = str2axis(axis_str)
shape_types = {"Cube", "Sphere", "Mesh", "Capsule", "Plane", "Cylinder", "Cone"}
path_body_map = {}
path_shape_map = {}
path_shape_scale = {}
# maps prim path name to its world transform
path_world_poses = {}
# transform from body frame to where the actual joint child frame is
# so that the link's children will use the right parent tf for the joint
prim_joint_xforms = {}
path_collision_filters = set()
no_collision_shapes = set()
body_density = {} # mapping from body ID to defined density
# first find all joints and materials
joint_data = {} # mapping from path of child link to joint USD settings
materials = {} # mapping from material path to material USD settings
joint_parents = set() # paths of joint parents
for prim in stage.Traverse():
type_name = str(prim.GetTypeName())
path = str(prim.GetPath())
# if verbose:
# print(path, type_name)
if type_name.endswith("Joint"):
# the type name can sometimes be "DistancePhysicsJoint" or "PhysicsDistanceJoint" ...
type_name = type_name.replace("Physics", "").replace("Joint", "")
child = str(prim.GetRelationship("physics:body1").GetTargets()[0])
pos0 = parse_vec(prim, "physics:localPos0", np.zeros(3, dtype=np.float32)) * linear_unit
pos1 = parse_vec(prim, "physics:localPos1", np.zeros(3, dtype=np.float32)) * linear_unit
rot0 = parse_quat(prim, "physics:localRot0", wp.quat_identity())
rot1 = parse_quat(prim, "physics:localRot1", wp.quat_identity())
joint_data[child] = {
"type": type_name,
"name": str(prim.GetName()),
"parent_tf": wp.transform(pos0, rot0),
"child_tf": wp.transform(pos1, rot1),
"enabled": parse_generic(prim, "physics:jointEnabled", True),
"collisionEnabled": parse_generic(prim, "physics:collisionEnabled", False),
"excludeFromArticulation": parse_generic(prim, "physics:excludeFromArticulation", False),
"axis": str2axis(parse_generic(prim, "physics:axis", "X")),
"breakForce": parse_float(prim, "physics:breakForce", np.inf),
"breakTorque": parse_float(prim, "physics:breakTorque", np.inf),
"linear_axes": [],
"angular_axes": [],
}
if only_load_enabled_joints and not joint_data[child]["enabled"]:
print("Skipping disabled joint", path)
continue
# parse joint limits
lower = parse_float(prim, "physics:lowerLimit", -np.inf)
upper = parse_float(prim, "physics:upperLimit", np.inf)
if type_name == "Distance":
# if distance is negative the joint is not limited
joint_data[child]["lowerLimit"] = parse_float(prim, "physics:minDistance", -1.0) * linear_unit
joint_data[child]["upperLimit"] = parse_float(prim, "physics:maxDistance", -1.0) * linear_unit
elif type_name == "Prismatic":
joint_data[child]["lowerLimit"] = lower * linear_unit
joint_data[child]["upperLimit"] = upper * linear_unit
else:
joint_data[child]["lowerLimit"] = np.deg2rad(lower) if np.isfinite(lower) else lower
joint_data[child]["upperLimit"] = np.deg2rad(upper) if np.isfinite(upper) else upper
if joint_data[child]["lowerLimit"] > joint_data[child]["upperLimit"]:
joint_data[child]["lowerLimit"] = (
joint_data[child]["lowerLimit"] + joint_data[child]["upperLimit"]
) / 2
joint_data[child]["upperLimit"] = joint_data[child]["lowerLimit"]
parents = prim.GetRelationship("physics:body0").GetTargets()
if len(parents) > 0:
parent_path = str(parents[0])
joint_data[child]["parent"] = parent_path
joint_parents.add(parent_path)
else:
joint_data[child]["parent"] = None
# parse joint drive
parse_axis(prim, "angular", joint_data[child], is_angular=True)
parse_axis(prim, "rotX", joint_data[child], is_angular=True, axis=(1.0, 0.0, 0.0))
parse_axis(prim, "rotY", joint_data[child], is_angular=True, axis=(0.0, 1.0, 0.0))
parse_axis(prim, "rotZ", joint_data[child], is_angular=True, axis=(0.0, 0.0, 1.0))
parse_axis(prim, "linear", joint_data[child], is_angular=False)
parse_axis(prim, "transX", joint_data[child], is_angular=False, axis=(1.0, 0.0, 0.0))
parse_axis(prim, "transY", joint_data[child], is_angular=False, axis=(0.0, 1.0, 0.0))
parse_axis(prim, "transZ", joint_data[child], is_angular=False, axis=(0.0, 0.0, 1.0))
elif type_name == "Material":
material = {}
if has_attribute(prim, "physics:density"):
material["density"] = parse_float(prim, "physics:density") * mass_unit # / (linear_unit**3)
if has_attribute(prim, "physics:restitution"):
material["restitution"] = parse_float(prim, "physics:restitution", default_restitution)
if has_attribute(prim, "physics:staticFriction"):
material["staticFriction"] = parse_float(prim, "physics:staticFriction", default_mu)
if has_attribute(prim, "physics:dynamicFriction"):
material["dynamicFriction"] = parse_float(prim, "physics:dynamicFriction", default_mu)
materials[path] = material
elif type_name == "PhysicsScene":
try:
scene = UsdPhysics.Scene(prim)
g_vec = scene.GetGravityDirectionAttr()
g_mag = scene.GetGravityMagnitudeAttr()
if g_mag.HasAuthoredValue() and np.isfinite(g_mag.Get()):
builder.gravity = -np.abs(g_mag.Get() * linear_unit)
if g_vec.HasAuthoredValue() and np.linalg.norm(g_vec.Get()) > 0.0:
builder.up_vector = np.array(g_vec.Get(), dtype=np.float32)
if np.any(builder.up_vector < 0.0):
builder.up_vector = -builder.up_vector
else:
builder.up_vector = upaxis
except:
pass
def parse_prim(prim, incoming_xform, incoming_scale, parent_body: int = -1):
nonlocal builder
nonlocal joint_data
nonlocal path_body_map
nonlocal path_shape_map
nonlocal path_shape_scale
nonlocal path_world_poses
nonlocal prim_joint_xforms
nonlocal path_collision_filters
nonlocal no_collision_shapes
nonlocal body_density
path = str(prim.GetPath())
for pattern in ignore_paths:
if re.match(pattern, path):
return
type_name = str(prim.GetTypeName())
if type_name.endswith("Joint") or type_name.endswith("Light") or type_name.endswith("Material"):
return
if verbose:
print(f"parse_prim {prim.GetPath()} ({type_name})")
if type_name == "PhysicsScene":
# in case the PhysicsScene has bodies as children...
for child in prim.GetChildren():
parse_prim(child, incoming_xform, incoming_scale, parent_body)
schemas = set(prim.GetAppliedSchemas())
children_refs = prim.GetChildren()
prim_joint_xforms[path] = wp.transform()
local_xform, scale = parse_xform(prim)
scale = incoming_scale * scale
xform = wp.mul(incoming_xform, local_xform)
path_world_poses[path] = xform
geo_tf = local_xform
body_id = parent_body
is_rigid_body = "PhysicsRigidBodyAPI" in schemas and parent_body == -1
create_rigid_body = is_rigid_body or path in joint_parents
if create_rigid_body:
body_id = builder.add_body(
origin=xform,
name=prim.GetName(),
)
path_body_map[path] = body_id
body_density[body_id] = 0.0
parent_body = body_id
geo_tf = wp.transform()
# set up joints between rigid bodies after the children have been added
if path in joint_data:
joint = joint_data[path]
joint_params = dict(
child=body_id,
linear_axes=joint["linear_axes"],
angular_axes=joint["angular_axes"],
name=joint["name"],
enabled=joint["enabled"],
parent_xform=joint["parent_tf"],
child_xform=joint["child_tf"],
)
parent_path = joint["parent"]
if parent_path is None:
joint_params["parent"] = -1
parent_tf = wp.transform()
else:
joint_params["parent"] = path_body_map[parent_path]
parent_tf = path_world_poses[parent_path]
# the joint to which we are connected will transform this body already
geo_tf = wp.transform()
if verbose:
print(f"Adding joint {joint['name']} between {joint['parent']} and {path}")
print(" parent_xform", joint["parent_tf"])
print(" child_xform ", joint["child_tf"])
print(" parent_tf ", parent_tf)
print(f" geo_tf at {path} = {geo_tf} (xform was {xform})")
if joint["type"] == "Revolute":
joint_params["joint_type"] = wp.sim.JOINT_REVOLUTE
if len(joint_params["angular_axes"]) == 0:
joint_params["angular_axes"].append(
wp.sim.JointAxis(
joint["axis"],
limit_lower=joint["lowerLimit"],
limit_upper=joint["upperLimit"],
limit_ke=joint_limit_ke,
limit_kd=joint_limit_kd,
)
)
elif joint["type"] == "Prismatic":
joint_params["joint_type"] = wp.sim.JOINT_PRISMATIC
if len(joint_params["linear_axes"]) == 0:
joint_params["linear_axes"].append(
wp.sim.JointAxis(
joint["axis"],
limit_lower=joint["lowerLimit"],
limit_upper=joint["upperLimit"],
limit_ke=joint_limit_ke,
limit_kd=joint_limit_kd,
)
)
elif joint["type"] == "Spherical":
joint_params["joint_type"] = wp.sim.JOINT_BALL
elif joint["type"] == "Fixed":
joint_params["joint_type"] = wp.sim.JOINT_FIXED
elif joint["type"] == "Distance":
joint_params["joint_type"] = wp.sim.JOINT_DISTANCE
# we have to add a dummy linear X axis to define the joint limits
joint_params["linear_axes"].append(
wp.sim.JointAxis(
(1.0, 0.0, 0.0),
limit_lower=joint["lowerLimit"],
limit_upper=joint["upperLimit"],
limit_ke=joint_limit_ke,
limit_kd=joint_limit_kd,
)
)
elif joint["type"] == "":
joint_params["joint_type"] = wp.sim.JOINT_D6
else:
print(f"Warning: unsupported joint type {joint['type']} for {path}")
builder.add_joint(**joint_params)
elif is_rigid_body:
builder.add_joint_free(child=body_id)
# free joint; we set joint_q/qd, not body_q/qd since eval_fk is used after model creation
builder.joint_q[-4:] = xform.q
builder.joint_q[-7:-4] = xform.p
linear_vel = parse_vec(prim, "physics:velocity", np.zeros(3, dtype=np.float32)) * linear_unit
angular_vel = parse_vec(prim, "physics:angularVelocity", np.zeros(3, dtype=np.float32)) * linear_unit
builder.joint_qd[-6:-3] = angular_vel
builder.joint_qd[-3:] = linear_vel
if verbose:
print(f"added {type_name} body {body_id} ({path}) at {xform}")
density = None
material = None
if prim.HasRelationship("material:binding:physics"):
other_paths = prim.GetRelationship("material:binding:physics").GetTargets()
if len(other_paths) > 0:
material = materials[str(other_paths[0])]
if material is not None:
if "density" in material:
density = material["density"]
if has_attribute(prim, "physics:density"):
d = parse_float(prim, "physics:density")
density = d * mass_unit # / (linear_unit**3)
# assert prim.GetAttribute('orientation').Get() == "rightHanded", "Only right-handed orientations are supported."
enabled = parse_generic(prim, "physics:rigidBodyEnabled", True)
if only_load_enabled_rigid_bodies and not enabled:
if verbose:
print("Skipping disabled rigid body", path)
return
mass = parse_float(prim, "physics:mass")
if is_rigid_body:
if density is None:
density = default_density
body_density[body_id] = density
elif density is None:
if body_id >= 0:
density = body_density[body_id]
else:
density = 0.0
com = parse_vec(prim, "physics:centerOfMass", np.zeros(3, dtype=np.float32))
i_diag = parse_vec(prim, "physics:diagonalInertia", np.zeros(3, dtype=np.float32))
i_rot = parse_quat(prim, "physics:principalAxes", wp.quat_identity())
# parse children
if type_name == "Xform":
if prim.IsInstance():
proto = prim.GetPrototype()
for child in proto.GetChildren():
parse_prim(child, xform, scale, parent_body)
else:
for child in children_refs:
parse_prim(child, xform, scale, parent_body)
elif type_name == "Scope":
for child in children_refs:
parse_prim(child, incoming_xform, incoming_scale, parent_body)
elif type_name in shape_types:
# parse shapes
shape_params = dict(
ke=default_ke, kd=default_kd, kf=default_kf, mu=default_mu, restitution=default_restitution
)
if material is not None:
if "restitution" in material:
shape_params["restitution"] = material["restitution"]
if "dynamicFriction" in material:
shape_params["mu"] = material["dynamicFriction"]
if has_attribute(prim, "doubleSided") and not prim.GetAttribute("doubleSided").Get():
print(f"Warning: treating {path} as double-sided because single-sided collisions are not supported.")
if type_name == "Cube":
size = parse_float(prim, "size", 2.0)
if has_attribute(prim, "extents"):
extents = parse_vec(prim, "extents") * scale
# TODO position geom at extents center?
geo_pos = 0.5 * (extents[0] + extents[1])
extents = extents[1] - extents[0]
else:
extents = scale * size
shape_id = builder.add_shape_box(
body_id,
geo_tf.p,
geo_tf.q,
hx=extents[0] / 2,
hy=extents[1] / 2,
hz=extents[2] / 2,
density=density,
thickness=default_thickness,
**shape_params,
)
elif type_name == "Sphere":
if not (scale[0] == scale[1] == scale[2]):
print("Warning: Non-uniform scaling of spheres is not supported.")
if has_attribute(prim, "extents"):
extents = parse_vec(prim, "extents") * scale
# TODO position geom at extents center?
geo_pos = 0.5 * (extents[0] + extents[1])
extents = extents[1] - extents[0]
if not (extents[0] == extents[1] == extents[2]):
print("Warning: Non-uniform extents of spheres are not supported.")
radius = extents[0]
else:
radius = parse_float(prim, "radius", 1.0) * scale[0]
shape_id = builder.add_shape_sphere(
body_id, geo_tf.p, geo_tf.q, radius, density=density, **shape_params
)
elif type_name == "Plane":
normal_str = parse_generic(prim, "axis", "Z").upper()
geo_rot = geo_tf.q
if normal_str != "Y":
normal = str2axis(normal_str)
c = np.cross(normal, (0.0, 1.0, 0.0))
angle = np.arcsin(np.linalg.norm(c))
axis = c / np.linalg.norm(c)
geo_rot = wp.mul(geo_rot, wp.quat_from_axis_angle(axis, angle))
width = parse_float(prim, "width", 0.0) * scale[0]
length = parse_float(prim, "length", 0.0) * scale[1]
shape_id = builder.add_shape_plane(
body=body_id,
pos=geo_tf.p,
rot=geo_rot,
width=width,
length=length,
thickness=default_thickness,
**shape_params,
)
elif type_name == "Capsule":
axis_str = parse_generic(prim, "axis", "Z").upper()
radius = parse_float(prim, "radius", 0.5) * scale[0]
half_height = parse_float(prim, "height", 2.0) / 2 * scale[1]
assert not has_attribute(prim, "extents"), "Capsule extents are not supported."
shape_id = builder.add_shape_capsule(
body_id,
geo_tf.p,
geo_tf.q,
radius,
half_height,
density=density,
up_axis="XYZ".index(axis_str),
**shape_params,
)
elif type_name == "Cylinder":
axis_str = parse_generic(prim, "axis", "Z").upper()
radius = parse_float(prim, "radius", 0.5) * scale[0]
half_height = parse_float(prim, "height", 2.0) / 2 * scale[1]
assert not has_attribute(prim, "extents"), "Cylinder extents are not supported."
shape_id = builder.add_shape_cylinder(
body_id,
geo_tf.p,
geo_tf.q,
radius,
half_height,
density=density,
up_axis="XYZ".index(axis_str),
**shape_params,
)
elif type_name == "Cone":
axis_str = parse_generic(prim, "axis", "Z").upper()
radius = parse_float(prim, "radius", 0.5) * scale[0]
half_height = parse_float(prim, "height", 2.0) / 2 * scale[1]
assert not has_attribute(prim, "extents"), "Cone extents are not supported."
shape_id = builder.add_shape_cone(
body_id,
geo_tf.p,
geo_tf.q,
radius,
half_height,
density=density,
up_axis="XYZ".index(axis_str),
**shape_params,
)
elif type_name == "Mesh":
mesh = UsdGeom.Mesh(prim)
points = np.array(mesh.GetPointsAttr().Get(), dtype=np.float32)
indices = np.array(mesh.GetFaceVertexIndicesAttr().Get(), dtype=np.float32)
counts = mesh.GetFaceVertexCountsAttr().Get()
faces = []
face_id = 0
for count in counts:
if count == 3:
faces.append(indices[face_id : face_id + 3])
elif count == 4:
faces.append(indices[face_id : face_id + 3])
faces.append(indices[[face_id, face_id + 2, face_id + 3]])
else:
# assert False, f"Error while parsing USD mesh {path}: encountered polygon with {count} vertices, but only triangles and quads are supported."
continue
face_id += count
m = wp.sim.Mesh(points, np.array(faces, dtype=np.int32).flatten())
shape_id = builder.add_shape_mesh(
body_id,
geo_tf.p,
geo_tf.q,
scale=scale,
mesh=m,
density=density,
thickness=default_thickness,
**shape_params,
)
else:
print(f"Warning: Unsupported geometry type {type_name} at {path}.")
return
path_body_map[path] = body_id
path_shape_map[path] = shape_id
path_shape_scale[path] = scale
if prim.HasRelationship("physics:filteredPairs"):
other_paths = prim.GetRelationship("physics:filteredPairs").GetTargets()
for other_path in other_paths:
path_collision_filters.add((path, str(other_path)))
if "PhysicsCollisionAPI" not in schemas or not parse_generic(prim, "physics:collisionEnabled", True):
no_collision_shapes.add(shape_id)
else:
print(f"Warning: encountered unsupported prim type {type_name}")
# update mass properties of rigid bodies in cases where properties are defined with higher precedence
if body_id >= 0:
com = parse_vec(prim, "physics:centerOfMass")
if com is not None:
# overwrite COM
builder.body_com[body_id] = com * scale
if mass is not None and not (is_rigid_body and mass == 0.0):
mass_ratio = mass / builder.body_mass[body_id]
# mass has precedence over density, so we overwrite the mass computed from density
builder.body_mass[body_id] = mass * mass_unit
if mass > 0.0:
builder.body_inv_mass[body_id] = 1.0 / builder.body_mass[body_id]
else:
builder.body_inv_mass[body_id] = 0.0
# update inertia
builder.body_inertia[body_id] *= mass_ratio
if builder.body_inertia[body_id].any():
builder.body_inv_inertia[body_id] = np.linalg.inv(builder.body_inertia[body_id])
else:
builder.body_inv_inertia[body_id] = np.zeros((3, 3), dtype=np.float32)
if np.linalg.norm(i_diag) > 0.0:
rot = np.array(wp.quat_to_matrix(i_rot), dtype=np.float32).reshape(3, 3)
inertia = rot @ np.diag(i_diag) @ rot.T
builder.body_inertia[body_id] = inertia
if inertia.any():
builder.body_inv_inertia[body_id] = np.linalg.inv(inertia)
else:
builder.body_inv_inertia[body_id] = np.zeros((3, 3), dtype=np.float32)
parse_prim(
stage.GetDefaultPrim(), incoming_xform=wp.transform(), incoming_scale=np.ones(3, dtype=np.float32) * linear_unit
)
shape_count = len(builder.shape_geo_type)
# apply collision filters now that we have added all shapes
for path1, path2 in path_collision_filters:
shape1 = path_shape_map[path1]
shape2 = path_shape_map[path2]
builder.shape_collision_filter_pairs.add((shape1, shape2))
# apply collision filters to all shapes that have no collision
for shape_id in no_collision_shapes:
for other_shape_id in range(shape_count):
if other_shape_id != shape_id:
builder.shape_collision_filter_pairs.add((shape_id, other_shape_id))
# return stage parameters
return {
"fps": stage.GetFramesPerSecond(),
"duration": stage.GetEndTimeCode() - stage.GetStartTimeCode(),
"up_axis": UsdGeom.GetStageUpAxis(stage).upper(),
"path_shape_map": path_shape_map,
"path_body_map": path_body_map,
"path_shape_scale": path_shape_scale,
"mass_unit": mass_unit,
"linear_unit": linear_unit,
}
def resolve_usd_from_url(url: str, target_folder_name: str = None, export_usda: bool = False):
"""
Downloads a USD file from a URL and resolves all references to other USD files to be downloaded to the given target folder.
Args:
url (str): URL to the USD file.
target_folder_name (str): Target folder name. If None, a timestamped folder will be created in the current directory.
export_usda (bool): If True, converts each downloaded USD file to USDA and saves the additional USDA file in the target folder with the same base name as the original USD file.
Returns:
str: File path to the downloaded USD file.
"""
import requests
import datetime
import os
try:
from pxr import Usd
except ImportError:
raise ImportError("Failed to import pxr. Please install USD (e.g. via `pip install usd-core`).")
response = requests.get(url, allow_redirects=True)
if response.status_code != 200:
raise RuntimeError(f"Failed to download USD file. Status code: {response.status_code}")
file = response.content
dot = os.path.extsep
base = os.path.basename(url)
url_folder = os.path.dirname(url)
base_name = dot.join(base.split(dot)[:-1])
if target_folder_name is None:
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
target_folder_name = os.path.join(".usd_cache", f"{base_name}_{timestamp}")
os.makedirs(target_folder_name, exist_ok=True)
target_filename = os.path.join(target_folder_name, base)
with open(target_filename, "wb") as f:
f.write(file)
stage = Usd.Stage.Open(target_filename, Usd.Stage.LoadNone)
stage_str = stage.GetRootLayer().ExportToString()
print(f"Downloaded USD file to {target_filename}.")
if export_usda:
usda_filename = os.path.join(target_folder_name, base_name + ".usda")
with open(usda_filename, "w") as f:
f.write(stage_str)
print(f"Exported USDA file to {usda_filename}.")
# parse referenced USD files like `references = @./franka_collisions.usd@`
downloaded = set()
for match in re.finditer(r"references.=.@(.*?)@", stage_str):
refname = match.group(1)
if refname.startswith("./"):
refname = refname[2:]
if refname in downloaded:
continue
try:
response = requests.get(f"{url_folder}/{refname}", allow_redirects=True)
if response.status_code != 200:
print(f"Failed to download reference {refname}. Status code: {response.status_code}")
continue
file = response.content
refdir = os.path.dirname(refname)
if refdir:
os.makedirs(os.path.join(target_folder_name, refdir), exist_ok=True)
ref_filename = os.path.join(target_folder_name, refname)
if not os.path.exists(ref_filename):
with open(ref_filename, "wb") as f:
f.write(file)
downloaded.add(refname)
print(f"Downloaded USD reference {refname} to {ref_filename}.")
if export_usda:
ref_stage = Usd.Stage.Open(ref_filename, Usd.Stage.LoadNone)
ref_stage_str = ref_stage.GetRootLayer().ExportToString()
base = os.path.basename(ref_filename)
base_name = dot.join(base.split(dot)[:-1])
usda_filename = os.path.join(target_folder_name, base_name + ".usda")
with open(usda_filename, "w") as f:
f.write(ref_stage_str)
print(f"Exported USDA file to {usda_filename}.")
except Exception:
print(f"Failed to download {refname}.")
return target_filename