diff --git "a/isaac_world_interface.py" "b/isaac_world_interface.py" deleted file mode 100644--- "a/isaac_world_interface.py" +++ /dev/null @@ -1,2355 +0,0 @@ -# Copyright (C) 2023 Andrea Patrizi (AndrePatri, andreapatrizi1b6e6@gmail.com) -# -# This file is part of AugMPCEnvs and distributed under the General Public License version 2 license. -# -# AugMPCEnvs is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 2 of the License, or -# (at your option) any later version. -# -# AugMPCEnvs is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with AugMPCEnvs. If not, see . -# -from isaacsim import SimulationApp - -import carb - -import os -import math - -import torch -import numpy as np - -from typing import Dict, List -from typing_extensions import override - -from EigenIPC.PyEigenIPC import VLevel -from EigenIPC.PyEigenIPC import LogType -from EigenIPC.PyEigenIPC import Journal - -from aug_mpc_envs.utils.math_utils import quat_to_omega -from aug_mpc_envs.utils.height_grid_visualizer import HeightGridVisualizer -from aug_mpc_envs.utils.height_sensor import HeightGridSensor - -from aug_mpc.world_interfaces.world_interface_base import AugMPCWorldInterfaceBase -from mpc_hive.utilities.math_utils_torch import world2base_frame,world2base_frame3D - -class IsaacSimEnv(AugMPCWorldInterfaceBase): - - def __init__(self, - robot_names: List[str], - robot_urdf_paths: List[str], - robot_srdf_paths: List[str], - jnt_imp_config_paths: List[str], - n_contacts: List[int], - cluster_dt: List[float], - use_remote_stepping: List[bool], - name: str = "IsaacSimEnv", - num_envs: int = 1, - debug = False, - verbose: bool = False, - vlevel: VLevel = VLevel.V1, - n_init_step: int = 0, - timeout_ms: int = 60000, - env_opts: Dict = None, - use_gpu: bool = True, - dtype: torch.dtype = torch.float32, - override_low_lev_controller: bool = False): - - self._render_step_counter = 0 - - super().__init__(name=name, - robot_names=robot_names, - robot_urdf_paths=robot_urdf_paths, - robot_srdf_paths=robot_srdf_paths, - jnt_imp_config_paths=jnt_imp_config_paths, - n_contacts=n_contacts, - cluster_dt=cluster_dt, - use_remote_stepping=use_remote_stepping, - num_envs=num_envs, - debug=debug, - verbose=verbose, - vlevel=vlevel, - n_init_step=n_init_step, - timeout_ms=timeout_ms, - env_opts=env_opts, - use_gpu=use_gpu, - dtype=dtype, - override_low_lev_controller=override_low_lev_controller) - - def is_running(self): - return self._simulation_app.is_running() - - def _pre_setup(self): - self._backend="torch" - enable_livestream = self._env_opts["enable_livestream"] - enable_viewport = self._env_opts["enable_viewport"] - base_isaac_exp = f'{os.environ["EXP_PATH"]}/omni.isaac.sim.python.aug_mpc_envs.kit' - base_isaac_exp_headless = f'{os.environ["EXP_PATH"]}/omni.isaac.sim.python.aug_mpc_envs.headless.kit' - - experience=base_isaac_exp - if self._env_opts["headless"]: - info = f"Will run in headless mode." - Journal.log(self.__class__.__name__, - "__init__", - info, - LogType.STAT, - throw_when_excep = True) - if enable_livestream: - experience = "" - elif enable_viewport: - exception = f"Using viewport is not supported yet." - Journal.log(self.__class__.__name__, - "__init__", - exception, - LogType.EXCEP, - throw_when_excep = True) - else: - experience=base_isaac_exp_headless - - self._simulation_app = SimulationApp({"headless": self._env_opts["headless"]}, - experience=experience) - # all imports depending on isaac sim kits have to be done after simulationapp - # from omni.isaac.core.tasks.base_task import BaseTask - self._import_isaac_pkgs() - info = "Using IsaacSim experience file @ " + experience - Journal.log(self.__class__.__name__, - "__init__", - info, - LogType.STAT, - throw_when_excep = True) - # carb.settings.get_settings().set("/persistent/omnihydra/useSceneGraphInstancing", True) - - if enable_livestream: - info = "Livestream enabled" - Journal.log(self.__class__.__name__, - "__init__", - info, - LogType.STAT, - throw_when_excep = True) - - self._simulation_app.set_setting("/app/livestream/enabled", True) - self._simulation_app.set_setting("/app/window/drawMouse", True) - self._simulation_app.set_setting("/app/livestream/proto", "ws") - self._simulation_app.set_setting("/app/livestream/websocket/framerate_limit", 120) - self._simulation_app.set_setting("/ngx/enabled", False) - enable_extension("omni.kit.livestream.native") - enable_extension("omni.services.streaming.manager") - self._render = ((not self._env_opts["headless"]) or self._env_opts["render_to_file"]) or enable_livestream or enable_viewport - - self._record = False - self._world = None - self._physics_context = None - self._scene = None - self._task = None - self._metadata = None - - self._robots_art_views = {} - self._blink_rigid_prim_views = {} - self._robots_articulations = {} - self._robots_geom_prim_views = {} - self.omni_contact_sensors = {} - - self._solver_position_iteration_count=self._env_opts["solver_position_iteration_count"] - self._solver_velocity_iteration_count=self._env_opts["solver_velocity_iteration_count"] - self._solver_stabilization_thresh=self._env_opts["stabilization_threshold"] - self._solver_position_iteration_counts={} - self._solver_velocity_iteration_counts={} - self._solver_stabilization_threshs={} - self._robot_bodynames={} - self._robot_n_links={} - self._robot_n_dofs={} - self._robot_dof_names={} - self._distr_offset={} # decribed how robots within each env are distributed - self._spawning_radius=self._env_opts["spawning_radius"] # [m] -> default distance between roots of robots in a single - self._height_sensors={} - self._height_imgs={} - self._height_vis={} - self._height_vis_step={} - self._height_vis_step={} - self._height_vis={} - self._terrain_hit_margin = 1.0 - self._terrain_hit_log_period = 1000 - self._terrain_hit_active = {} - self._terrain_hit_counts = {} - self._terrain_hit_counts_last_logged = {} - for robot_name in self._robot_names: - self._terrain_hit_active[robot_name] = torch.zeros((self._num_envs,), device=self._device, dtype=torch.bool) - self._terrain_hit_counts[robot_name] = torch.zeros((self._num_envs,), device=self._device, dtype=torch.int32) - self._terrain_hit_counts_last_logged[robot_name] = None - - def _import_isaac_pkgs(self): - # we use global, so that we can create the simulation app inside (and so - # access Isaac's kit) and also expose to all methods the imports - global World, omni_kit, get_context, UsdLux, Sdf, Gf, UsdPhysics, PhysicsSchemaTools, UsdShade, Vt - global enable_extension, set_camera_view, _urdf, move_prim, GridCloner, prim_utils - global get_current_stage, Scene, ArticulationView, RigidPrimView, rep - global OmniContactSensors, RlTerrains,OmniJntImpCntrl - global PhysxSchema, UsdGeom - global _sensor, _dynamic_control - global get_prim_at_path - - from pxr import PhysxSchema, UsdGeom, UsdShade, Vt - - from omni.isaac.core.world import World - from omni.usd import get_context - from pxr import UsdLux, Sdf, Gf, UsdPhysics, PhysicsSchemaTools - from omni.isaac.core.utils.extensions import enable_extension - from omni.isaac.core.utils.viewports import set_camera_view - import omni.kit as omni_kit - from omni.importer.urdf import _urdf - from omni.isaac.core.utils.prims import move_prim - from omni.isaac.cloner import GridCloner - import omni.isaac.core.utils.prims as prim_utils - from omni.isaac.core.utils.stage import get_current_stage - from omni.isaac.core.scenes.scene import Scene - from omni.isaac.core.articulations import ArticulationView - from omni.isaac.core.prims import RigidPrimView - - import omni.replicator.core as rep - - from omni.isaac.core.utils.prims import get_prim_at_path - - from omni.isaac.sensor import _sensor - - from omni.isaac.dynamic_control import _dynamic_control - - from aug_mpc_envs.utils.contact_sensor import OmniContactSensors - from aug_mpc_envs.utils.omni_jnt_imp_cntrl import OmniJntImpCntrl - from aug_mpc_envs.utils.terrains import RlTerrains - - def _parse_env_opts(self): - isaac_opts={} - isaac_opts["envs_ns"]="/World/envs" - isaac_opts["template_env_ns"]=isaac_opts["envs_ns"] + "/env_0" - isaac_opts["base_linkname"]="base_link" - isaac_opts["deduce_base_link"]=False - isaac_opts["ground_plane_prim_path"]="/World/terrain" - isaac_opts["physics_prim_path"]="/physicsScene" - isaac_opts["use_gpu"]=True - isaac_opts["use_gpu_pipeline"]=True - isaac_opts["device"]="cuda" - isaac_opts["is_fixed_base"]=False - isaac_opts["merge_fixed_jnts"]=True - isaac_opts["self_collide"]=True - isaac_opts["sim_device"]="cuda" if isaac_opts["use_gpu"] else "cpu" - isaac_opts["physics_dt"]=1e-3 - isaac_opts["gravity"] = np.array([0.0, 0.0, -9.81]) - isaac_opts["use_fabric"]=True# Enable/disable reading of physics buffers directly. Default is True. - isaac_opts["replicate_physics"]=True - # isaac_opts["worker_thread_count"]=4 - isaac_opts["solver_type"]=1 # 0: PGS, 1:TGS, defaults to TGS. PGS faster but TGS more stable - isaac_opts["enable_stabilization"]=True - # isaac_opts["bounce_threshold_velocity"] = 0.2 - # isaac_opts["friction_offset_threshold"] = 0.04 - # isaac_opts["friction_correlation_distance"] = 0.025 - # isaac_opts["enable_sleeping"] = True - # Per-actor settings ( can override in actor_options ) - isaac_opts["solver_position_iteration_count"] = 4 # defaults to 4 - isaac_opts["solver_velocity_iteration_count"] = 3 # defaults to 1 - isaac_opts["sleep_threshold"] = 1e-5 # Mass-normalized kinetic energy threshold below which an actor may go to sleep. - # Allowed range [0, max_float). - isaac_opts["stabilization_threshold"] = 1e-4 - # Per-body settings ( can override in actor_options ) - # isaac_opts["enable_gyroscopic_forces"] = True - # isaac_opts["density"] = 1000 # density to be used for bodies that do not specify mass or density - # isaac_opts["max_depenetration_velocity"] = 100.0 - # isaac_opts["solver_velocity_iteration_count"] = 1 - # GPU buffers settings - isaac_opts["gpu_max_rigid_contact_count"] = 512 * 2048 - isaac_opts["gpu_max_rigid_patch_count"] = 80 * 2048 - isaac_opts["gpu_found_lost_pairs_capacity"] = 102400 - isaac_opts["gpu_found_lost_aggregate_pairs_capacity"] = 102400 - isaac_opts["gpu_total_aggregate_pairs_capacity"] = 102400 - # isaac_opts["gpu_max_soft_body_contacts"] = 1024 * 1024 - # isaac_opts["gpu_max_particle_contacts"] = 1024 * 1024 - # isaac_opts["gpu_heap_capacity"] = 64 * 1024 * 1024 - # isaac_opts["gpu_temp_buffer_capacity"] = 16 * 1024 * 1024 - # isaac_opts["gpu_max_num_partitions"] = 8 - - isaac_opts["env_spacing"]=15.0 - isaac_opts["spawning_height"]=1.0 - isaac_opts["spawning_radius"]=1.0 - isaac_opts["spawn_height_check_half_extent"]=0.2 - isaac_opts["spawn_height_cushion"]=0.03 - - isaac_opts["enable_height_vis"]=False - isaac_opts["height_vis_radius"]=0.03 - isaac_opts["height_vis_update_period"]=1 - isaac_opts["collision_refinement_level"]=3 # increase cylinder tesselation for smoother wheel contacts - - # rendering helpers - isaac_opts["render_to_file"]=False - isaac_opts["use_follow_camera"]=False # if True, follow robot during rendering in human mode - isaac_opts["render_follow_env_idx"]=0 - isaac_opts["render_follow_robot_idx"]=0 - isaac_opts["render_follow_offset"]=[-0.2, 3.0, 0.1] - isaac_opts["render_follow_target_offset"]=[-0.2, -1.0, 0.0] - isaac_opts["rendering_dt"]=15*isaac_opts["physics_dt"] - isaac_opts["camera_prim_path"]="/OmniverseKit_Persp" - isaac_opts["render_resolution"]=[1280, 720] # [1024, 576] - - isaac_opts["render_panoramic_cam"]=True - isaac_opts["render_panoramic_cam_height"]=2.0 - isaac_opts["render_panoramic_cam_target_xy"]=[10.0, 14.] - isaac_opts["render_panoramic_cam_target_z"]=1.2 - - # ground opts - isaac_opts["use_flat_ground"]=True - isaac_opts["static_friction"]=0.5 - isaac_opts["dynamic_friction"]=0.5 - isaac_opts["restitution"]=0.1 - isaac_opts["ground_type"]="random" - isaac_opts["ground_size"]=800 - isaac_opts["terrain_border"]=isaac_opts["ground_size"]/2 - isaac_opts["dh_ground"]=0.03 - isaac_opts["step_height_lb"]=0.08 - isaac_opts["step_height_ub"]=0.12 - isaac_opts["step_width_lb"]=1.0 - isaac_opts["step_width_ub"]= 1.5 - isaac_opts["contact_prims"] = [] - isaac_opts["sensor_radii"] = 0.1 - isaac_opts["contact_offsets"] = {} - - isaac_opts["enable_livestream"] = False - isaac_opts["enable_viewport"] = False - - isaac_opts["use_diff_vels"] = False - - # random perturbations (impulse, durations, directions are sampled uniformly, force/torque computed accordinly) - isaac_opts["use_random_pertub"]=False - isaac_opts["pert_planar_only"]=True # if True, linear pushes only in xy plane and no torques - - isaac_opts["pert_wrenches_rate"]=30.0 # on average 1 pert every pert_wrenches_rate seconds - isaac_opts["pert_wrenches_min_duration"]=0.6 - isaac_opts["pert_wrenches_max_duration"]=3.5 # [s] - isaac_opts["pert_force_max_weight_scale"]=0.4 # clip force norm to scale*weight - isaac_opts["pert_force_min_weight_scale"]=0.1 # optional min force norm as scale*weight - isaac_opts["pert_torque_max_weight_scale"]=1.0 # clip torque norm to scale*weight*max_ang_impulse_lever - - isaac_opts["pert_target_delta_v"]=2.0 # [m/s] desired max impulse = m*delta_v - isaac_opts["det_pert_rate"]=True - - # max impulse (unitless scale multiplied by weight to get N*s): delta_v/g - isaac_opts["max_lin_impulse_norm"]=isaac_opts["pert_target_delta_v"]/9.81 - isaac_opts["lin_impulse_mag_min"]=1.0 # [0, 1] -> min fraction of max_lin_impulse_norm when sampling - isaac_opts["lin_impulse_mag_max"]=1.0 # [0, 1] -> max fraction of max_lin_impulse_norm when sampling - - isaac_opts["max_ang_impulse_lever"]=0.2 # [m] - isaac_opts["max_ang_impulse_norm"]=isaac_opts["max_lin_impulse_norm"]*isaac_opts["max_ang_impulse_lever"] - isaac_opts["terrain_hit_log_period"]=1000 - - # opts definition end - - isaac_opts.update(self._env_opts) # update defaults with provided opts - isaac_opts["rendering_freq"]=int(isaac_opts["rendering_dt"]/isaac_opts["physics_dt"]) - # isaac_opts["rendering_dt"]=isaac_opts["physics_dt"] # forcing rendering_dt==physics_dt - # for some mystic reason simulation is infuenced by the rendering dt (why ??????) - - # modify things - isaac_opts["cloning_offset"] = np.array([[0.0, 0.0, isaac_opts["spawning_height"]]]*self._num_envs) - if not isaac_opts["use_gpu"]: # don't use GPU at all - isaac_opts["use_gpu_pipeline"]=False - isaac_opts["device"]="cpu" - isaac_opts["sim_device"]="cpu" - else: # use GPU - if isaac_opts["use_gpu_pipeline"]: - isaac_opts["device"]="cuda" - isaac_opts["sim_device"]="cuda" - else: # cpu pipeline - isaac_opts["device"]="cpu" - isaac_opts["sim_device"]="cpu" - isaac_opts["use_gpu"]=False - # isaac_opts["sim_device"]=isaac_opts["device"] - # overwrite env opts in case some sim params were missing - self._env_opts=isaac_opts - - # update device flag based on sim opts - self._device=isaac_opts["device"] - self._use_gpu=isaac_opts["use_gpu"] - - def _calc_robot_distrib(self): - - import math - # we distribute robots in a single env. along the - # circumference of a circle of given radius - n_robots = len(self._robot_names) - offset_baseangle = 2 * math.pi / n_robots - for i in range(n_robots): - offset_angle = offset_baseangle * (i + 1) - robot_offset_wrt_center = torch.tensor([self._spawning_radius * math.cos(offset_angle), - self._spawning_radius * math.sin(offset_angle), 0], - device=self._device, - dtype=self._dtype) - # list with n references to the original tensor - tensor_list = [robot_offset_wrt_center] * self._num_envs - self._distr_offset[self._robot_names[i]] = torch.stack(tensor_list, dim=0) - - def _init_world(self): - - self._cloner = GridCloner(spacing=self._env_opts["env_spacing"]) - self._cloner.define_base_env(self._env_opts["envs_ns"]) - prim_utils.define_prim(self._env_opts["template_env_ns"]) - self._envs_prim_paths = self._cloner.generate_paths(self._env_opts["envs_ns"] + "/env", - self._num_envs) - - # parse device based on sim_param settings - - info = "Using sim device: " + str(self._env_opts["sim_device"]) - Journal.log(self.__class__.__name__, - "__init__", - info, - LogType.STAT, - throw_when_excep = True) - - self._world = World( - physics_dt=self._env_opts["physics_dt"], - rendering_dt=self._env_opts["physics_dt"], # == physics dt (rendering is actually done manually by this env) - backend=self._backend, - device=str(self._env_opts["sim_device"]), - physics_prim_path=self._env_opts["physics_prim_path"], - set_defaults = False, # set to True to use the defaults settings [physics_dt = 1.0/ 60.0, - # stage units in meters = 0.01 (i.e in cms), rendering_dt = 1.0 / 60.0, gravity = -9.81 m / s - # ccd_enabled, stabilization_enabled, gpu dynamics turned off, - # broadcast type is MBP, solver type is TGS] - sim_params=self._env_opts - ) - - big_info = "[World] Creating Isaac simulation " + self._name + "\n" + \ - "use_gpu_pipeline: " + str(self._env_opts["use_gpu_pipeline"]) + "\n" + \ - "device: " + str(self._env_opts["sim_device"]) + "\n" +\ - "backend: " + str(self._backend) + "\n" +\ - "integration_dt: " + str(self.physics_dt()) + "\n" + \ - "rendering_dt: " + str(self.rendering_dt()) + "\n" - Journal.log(self.__class__.__name__, - "_init_world", - big_info, - LogType.STAT, - throw_when_excep = True) - - # we get the physics context to expose additional low-level ## - # settings of the simulation - self._physics_context = self._world.get_physics_context() - self._physics_scene_path = self._physics_context.prim_path - # self._physics_context.enable_gpu_dynamics(True) - self._physics_context.enable_stablization(True) - self._physics_scene_prim = self._physics_context.get_current_physics_scene_prim() - self._solver_type = self._physics_context.get_solver_type() - - if "gpu_max_rigid_contact_count" in self._env_opts: - self._physics_context.set_gpu_max_rigid_contact_count(self._env_opts["gpu_max_rigid_contact_count"]) - if "gpu_max_rigid_patch_count" in self._env_opts: - self._physics_context.set_gpu_max_rigid_patch_count(self._env_opts["gpu_max_rigid_patch_count"]) - if "gpu_found_lost_pairs_capacity" in self._env_opts: - self._physics_context.set_gpu_found_lost_pairs_capacity(self._env_opts["gpu_found_lost_pairs_capacity"]) - if "gpu_found_lost_aggregate_pairs_capacity" in self._env_opts: - self._physics_context.set_gpu_found_lost_aggregate_pairs_capacity(self._env_opts["gpu_found_lost_aggregate_pairs_capacity"]) - if "gpu_total_aggregate_pairs_capacity" in self._env_opts: - self._physics_context.set_gpu_total_aggregate_pairs_capacity(self._env_opts["gpu_total_aggregate_pairs_capacity"]) - if "gpu_max_soft_body_contacts" in self._env_opts: - self._physics_context.set_gpu_max_soft_body_contacts(self._env_opts["gpu_max_soft_body_contacts"]) - if "gpu_max_particle_contacts" in self._env_opts: - self._physics_context.set_gpu_max_particle_contacts(self._env_opts["gpu_max_particle_contacts"]) - if "gpu_heap_capacity" in self._env_opts: - self._physics_context.set_gpu_heap_capacity(self._env_opts["gpu_heap_capacity"]) - if "gpu_temp_buffer_capacity" in self._env_opts: - self._physics_context.set_gpu_temp_buffer_capacity(self._env_opts["gpu_temp_buffer_capacity"]) - if "gpu_max_num_partitions" in self._env_opts: - self._physics_context.set_gpu_max_num_partitions(self._env_opts["gpu_max_num_partitions"]) - - # overwriting defaults - # self._physics_context.set_gpu_max_rigid_contact_count(2 * self._physics_context.get_gpu_max_rigid_contact_count()) - # self._physics_context.set_gpu_max_rigid_patch_count(2 * self._physics_context.get_gpu_max_rigid_patch_count()) - # self._physics_context.set_gpu_found_lost_pairs_capacity(2 * self._physics_context.get_gpu_found_lost_pairs_capacity()) - # self._physics_context.set_gpu_found_lost_aggregate_pairs_capacity(20 * self._physics_context.get_gpu_found_lost_aggregate_pairs_capacity()) - # self._physics_context.set_gpu_total_aggregate_pairs_capacity(20 * self._physics_context.get_gpu_total_aggregate_pairs_capacity()) - # self._physics_context.set_gpu_heap_capacity(2 * self._physics_context.get_gpu_heap_capacity()) - # self._physics_context.set_gpu_temp_buffer_capacity(20 * self._physics_context.get_gpu_heap_capacity()) - # self._physics_context.set_gpu_max_num_partitions(20 * self._physics_context.get_gpu_temp_buffer_capacity()) - - # GPU buffers - self._gpu_max_rigid_contact_count = self._physics_context.get_gpu_max_rigid_contact_count() - self._gpu_max_rigid_patch_count = self._physics_context.get_gpu_max_rigid_patch_count() - self._gpu_found_lost_pairs_capacity = self._physics_context.get_gpu_found_lost_pairs_capacity() - self._gpu_found_lost_aggregate_pairs_capacity = self._physics_context.get_gpu_found_lost_aggregate_pairs_capacity() - self._gpu_total_aggregate_pairs_capacity = self._physics_context.get_gpu_total_aggregate_pairs_capacity() - self._gpu_max_soft_body_contacts = self._physics_context.get_gpu_max_soft_body_contacts() - self._gpu_max_particle_contacts = self._physics_context.get_gpu_max_particle_contacts() - self._gpu_heap_capacity = self._physics_context.get_gpu_heap_capacity() - self._gpu_temp_buffer_capacity = self._physics_context.get_gpu_temp_buffer_capacity() - # self._gpu_max_num_partitions = physics_context.get_gpu_max_num_partitions() # BROKEN->method does not exist - - big_info2 = "[physics context]:" + "\n" + \ - "gpu_max_rigid_contact_count: " + str(self._gpu_max_rigid_contact_count) + "\n" + \ - "gpu_max_rigid_patch_count: " + str(self._gpu_max_rigid_patch_count) + "\n" + \ - "gpu_found_lost_pairs_capacity: " + str(self._gpu_found_lost_pairs_capacity) + "\n" + \ - "gpu_found_lost_aggregate_pairs_capacity: " + str(self._gpu_found_lost_aggregate_pairs_capacity) + "\n" + \ - "gpu_total_aggregate_pairs_capacity: " + str(self._gpu_total_aggregate_pairs_capacity) + "\n" + \ - "gpu_max_soft_body_contacts: " + str(self._gpu_max_soft_body_contacts) + "\n" + \ - "gpu_max_particle_contacts: " + str(self._gpu_max_particle_contacts) + "\n" + \ - "gpu_heap_capacity: " + str(self._gpu_heap_capacity) + "\n" + \ - "gpu_temp_buffer_capacity: " + str(self._gpu_temp_buffer_capacity) + "\n" + \ - "use_gpu_sim: " + str(self._world.get_physics_context().use_gpu_sim) + "\n" + \ - "use_gpu_pipeline: " + str(self._world.get_physics_context().use_gpu_pipeline) + "\n" + \ - "use_fabric: " + str(self._world.get_physics_context().use_fabric) + "\n" + \ - "world device: " + str(self._world.get_physics_context().device) + "\n" + \ - "physics context device: " + str(self._world.get_physics_context().device) + "\n" - - Journal.log(self.__class__.__name__, - "set_task", - big_info2, - LogType.STAT, - throw_when_excep = True) - - self._scene = self._world.scene - self._physics_context = self._world.get_physics_context() - - self._stage = get_context().get_stage() - - # strong, uniform lighting: bright sun + dome fill to cover the whole terrain - prim_utils.define_prim("/World/Lighting", "Xform") - sun_path = "/World/Lighting/SunLight" - dome_path = "/World/Lighting/AmbientDome" - - distantLight = UsdLux.DistantLight.Define(self._stage, Sdf.Path(sun_path)) - distantLight.CreateIntensityAttr(450.0) - distantLight.CreateAngleAttr(0.5) # soften shadows a bit - distantLight.CreateColorAttr(Gf.Vec3f(1.0, 1.0, 1.0)) - # Shadow attr naming differs across versions; set the underlying USD attribute directly. - distantLight.GetPrim().CreateAttribute("shadow:enable", Sdf.ValueTypeNames.Bool).Set(True) - - domeLight = UsdLux.DomeLight.Define(self._stage, Sdf.Path(dome_path)) - domeLight.CreateIntensityAttr(200.0) - domeLight.CreateExposureAttr(1.0) - domeLight.CreateColorAttr(Gf.Vec3f(1.0, 1.0, 1.0)) - - self._configure_scene() - - # if "enable_viewport" in sim_params: - # self._render = sim_params["enable_viewport"] - - def _get_baselink_candidate(self, - robot_name: str): - - stage=get_current_stage() - all_prims = [prim.GetPath().pathString for prim in stage.Traverse()] - filtered_prims = [prim for prim in all_prims if f"/{robot_name}/" in prim] - - matching=min(filtered_prims, key=len) if filtered_prims else None - - return matching.split('/')[-1] - - def _configure_scene(self): - - # environment - self._fix_base = [self._env_opts["is_fixed_base"]] * len(self._robot_names) - self._self_collide = [self._env_opts["self_collide"]] * len(self._robot_names) - self._merge_fixed = [self._env_opts["merge_fixed_jnts"]] * len(self._robot_names) - - Journal.log(self.__class__.__name__, - "_configure_scene", - "cloning environments...", - LogType.STAT, - throw_when_excep = True) - - self._ground_plane_prim_paths=[] - self._ground_plane=None - self.terrain_generator = None - if not self._env_opts["use_flat_ground"]: - # ensure terrain is large enough to contain all env clones - spacing = float(self._env_opts["env_spacing"]) - num_envs = self._num_envs - num_per_row = max(1, int(math.sqrt(num_envs))) - num_rows = int(math.ceil(num_envs / num_per_row)) - num_cols = int(math.ceil(num_envs / num_rows)) - row_offset = 0.5 * spacing * (num_rows - 1) - col_offset = 0.5 * spacing * (num_cols - 1) - margin = spacing # leave a full spacing as border cushion - required_size = 2.0 * (max(row_offset, col_offset) + margin) - - if required_size > self._env_opts["ground_size"]: - old_size = self._env_opts["ground_size"] - self._env_opts["ground_size"] = required_size - self._env_opts["terrain_border"] = self._env_opts["ground_size"] / 2.0 - Journal.log(self.__class__.__name__, - "_configure_scene", - f"Ground size increased from {old_size} to {required_size} to fit {num_envs} envs (spacing {spacing}).", - LogType.WARN, - throw_when_excep = True) - - min_height=-self._env_opts["dh_ground"] - max_height=self._env_opts["dh_ground"] - step=max_height-min_height - if self._env_opts["ground_type"]=="random": - terrain_prim_path=self._env_opts["ground_plane_prim_path"]+"_random_unif" - self._ground_plane_prim_paths.append(terrain_prim_path) - self.terrain_generator = RlTerrains(get_current_stage(), prim_path=terrain_prim_path) - self._ground_plane=self.terrain_generator.create_random_uniform_terrain(terrain_size=self._env_opts["ground_size"], - min_height=min_height, - max_height=max_height, - step=step, - position=np.array([0.0, 0.0,0.0]), - static_friction=self._env_opts["static_friction"], - dynamic_friction=self._env_opts["dynamic_friction"], - restitution=self._env_opts["restitution"]) - elif self._env_opts["ground_type"]=="random_patches": - terrain_prim_path=self._env_opts["ground_plane_prim_path"]+"_random_unif_patches" - self._ground_plane_prim_paths.append(terrain_prim_path) - self.terrain_generator = RlTerrains(get_current_stage(), prim_path=terrain_prim_path) - self._ground_plane=self.terrain_generator.create_random_patched_terrain(terrain_size=self._env_opts["ground_size"], - min_height=min_height, - max_height=max_height, - step=step, - position=np.array([0.0, 0.0,0.0]), - static_friction=self._env_opts["static_friction"], - dynamic_friction=self._env_opts["dynamic_friction"], - restitution=self._env_opts["restitution"], - patch_ratio=0.8, - patch_size=10 - ) - elif self._env_opts["ground_type"]=="slopes": - terrain_prim_path=self._env_opts["ground_plane_prim_path"]+"_slopes" - self._ground_plane_prim_paths.append(terrain_prim_path) - self.terrain_generator = RlTerrains(get_current_stage(), prim_path=terrain_prim_path) - self._ground_plane=self.terrain_generator.create_sloped_terrain(terrain_size=self._env_opts["ground_size"], - slope=-0.5, - position=np.array([0.0, 0.0,0.0]), - static_friction=self._env_opts["static_friction"], - dynamic_friction=self._env_opts["dynamic_friction"], - restitution=self._env_opts["restitution"] - ) - elif self._env_opts["ground_type"]=="stairs": - terrain_prim_path=self._env_opts["ground_plane_prim_path"]+"_stairs" - self._ground_plane_prim_paths.append(terrain_prim_path) - self.terrain_generator = RlTerrains(get_current_stage(), prim_path=terrain_prim_path) - self._ground_plane=self.terrain_generator.create_stairs_terrain(terrain_size=self._env_opts["ground_size"], - position=np.array([0.0, 0.0,0.0]), - static_friction=self._env_opts["static_friction"], - dynamic_friction=self._env_opts["dynamic_friction"], - restitution=self._env_opts["restitution"], - ) - elif self._env_opts["ground_type"]=="stepup": - terrain_prim_path=self._env_opts["ground_plane_prim_path"]+"_stepup" - self._ground_plane_prim_paths.append(terrain_prim_path) - self.terrain_generator = RlTerrains(get_current_stage(), prim_path=terrain_prim_path) - self._ground_plane=self.terrain_generator.create_stepup_terrain( - terrain_size=self._env_opts["ground_size"], - stairs_ratio=0.3, - min_steps=1, - max_steps=1, - pyramid_platform_size=15.0, - position=np.array([0.0, 0.0, 0.0]), - static_friction=self._env_opts["static_friction"], - dynamic_friction=self._env_opts["dynamic_friction"], - restitution=self._env_opts["restitution"], - step_height=0.15 - ) - elif self._env_opts["ground_type"]=="stepup_prim": - terrain_prim_path=self._env_opts["ground_plane_prim_path"]+"_stepup_prim" - self._ground_plane_prim_paths.append(terrain_prim_path) - self.terrain_generator = RlTerrains(get_current_stage(), prim_path=terrain_prim_path) - self._ground_plane=self.terrain_generator.create_stepup_prim_terrain( - terrain_size=self._env_opts["ground_size"], - stairs_ratio=0.9, - platform_size=50.0, - step_height_lb=self._env_opts["step_height_lb"], - step_height_ub=self._env_opts["step_height_ub"], - min_step_width=self._env_opts.get("step_width_lb", None), - max_step_width=self._env_opts.get("step_width_ub", None), - position=np.array([0.0, 0.0, 0.0]), - static_friction=self._env_opts["static_friction"], - dynamic_friction=self._env_opts["dynamic_friction"], - restitution=self._env_opts["restitution"], - n_steps=25, - area_factor=0.7, - random_n_steps=False - ) - # apply a custom checker material to the terrain primitives - mat_path = self._ensure_lightblue_checker_material() - self._apply_checker_material_to_terrain(terrain_root_path=terrain_prim_path, material_path=mat_path) - self._add_checker_overlay_plane(terrain_root_path=terrain_prim_path, material_path=mat_path) - self._add_checker_overlays_on_tiles(terrain_root_path=terrain_prim_path, material_path=mat_path) - else: - ground_type=self._env_opts["ground_type"] - Journal.log(self.__class__.__name__, - "_configure_scene", - f"Terrain type {ground_type} not supported. Will default to flat ground.", - LogType.EXCEP, - throw_when_excep = True) - - # add offsets to intial height depending on the terrain heightmap - if self.terrain_generator is not None: - stage = get_current_stage() - up_axis = UsdGeom.GetStageUpAxis(stage) - - spacing = self._env_opts["env_spacing"] - num_envs = self._num_envs - - num_per_row = max(1, int(np.sqrt(num_envs))) - num_rows = int(np.ceil(num_envs / num_per_row)) - num_cols = int(np.ceil(num_envs / num_rows)) - - row_offset = 0.5 * spacing * (num_rows - 1) - col_offset = 0.5 * spacing * (num_cols - 1) - - offsets = np.array(self._env_opts["cloning_offset"], dtype=float) - - for env_idx in range(num_envs): - row = env_idx // num_cols - col = env_idx % num_cols - x = row_offset - row * spacing - y = col * spacing - col_offset - - half_extent = self._env_opts["spawn_height_check_half_extent"] - if up_axis == UsdGeom.Tokens.z: - height = self.terrain_generator.get_max_height_in_rect(x, y, half_extent=half_extent) - else: - height = self.terrain_generator.get_max_height_in_rect(x, y, half_extent=half_extent) - - offsets[env_idx][2] += height + self._env_opts["spawn_height_cushion"] - - self._env_opts["cloning_offset"] = offsets - - else: - defaul_prim_path=self._env_opts["ground_plane_prim_path"]+"_default" - self._ground_plane_prim_paths.append(defaul_prim_path) - self._ground_plane=self._scene.add_default_ground_plane(z_position=0, - name="terrain", - prim_path=defaul_prim_path, - static_friction=self._env_opts["static_friction"], - dynamic_friction=self._env_opts["dynamic_friction"], - restitution=self._env_opts["restitution"]) - self._terrain_collision=None - - physics_material=UsdPhysics.MaterialAPI.Apply(self._ground_plane.prim) - physics_material.CreateDynamicFrictionAttr(self._env_opts["dynamic_friction"]) - physics_material.CreateStaticFrictionAttr(self._env_opts["static_friction"]) - physics_material.CreateRestitutionAttr(self._env_opts["restitution"]) - # self._ground_plane.apply_physics_material(physics_material) - - self._terrain_collision=PhysxSchema.PhysxCollisionAPI.Get(get_current_stage(), self._ground_plane.prim_path) - self._terrain_material=UsdPhysics.MaterialAPI.Get(get_current_stage(), self._ground_plane.prim_path) - self._terrain_physix_material=PhysxSchema.PhysxMaterialAPI.Get(get_current_stage(), self._ground_plane.prim_path) - - for i in range(len(self._robot_names)): - robot_name = self._robot_names[i] - - urdf_path = self._robot_urdf_paths[robot_name] - srdf_path = self._robot_srdf_paths[robot_name] - fix_base = self._fix_base[i] - self_collide = self._self_collide[i] - merge_fixed = self._merge_fixed[i] - - self._generate_rob_descriptions(robot_name=robot_name, - urdf_path=urdf_path, - srdf_path=srdf_path) - self._import_urdf(robot_name, - fix_base=fix_base, - self_collide=self_collide, - merge_fixed=merge_fixed) - - self._cloner.clone( - source_prim_path=self._env_opts["template_env_ns"], - prim_paths=self._envs_prim_paths, - replicate_physics=self._env_opts["replicate_physics"], - position_offsets=self._env_opts["cloning_offset"] - ) # we can clone the environment in which all the robos are - - base_link_name=self._env_opts["base_linkname"] - if self._env_opts["deduce_base_link"]: - base_link_name=self._get_baselink_candidate(robot_name=robot_name) - - self._robots_art_views[robot_name] = ArticulationView(name = robot_name + "ArtView", - prim_paths_expr = self._env_opts["envs_ns"] + "/env_.*"+ "/" + robot_name + "/" + base_link_name, - reset_xform_properties=False) - self._robots_articulations[robot_name] = self._scene.add(self._robots_art_views[robot_name]) - - # height grid sensor (terrain may be None if using flat ground) - self._height_sensors[robot_name] = HeightGridSensor( - terrain_utils=self.terrain_generator if not self._env_opts["use_flat_ground"] else None, - grid_size=int(self._env_opts["height_sensor_pixels"]), - resolution=float(self._env_opts["height_sensor_resolution"]), - forward_offset=float(self._env_opts["height_sensor_forward_offset"]), - lateral_offset=float(self._env_opts["height_sensor_lateral_offset"]), - n_envs=self._num_envs, - device=self._device, - dtype=self._dtype) - # ensure shared-data flags are set if a height sensor is active - self._env_opts["height_sensor_pixels"] = int(self._env_opts["height_sensor_pixels"]) - self._env_opts["height_sensor_resolution"] = float(self._env_opts["height_sensor_resolution"]) - self._enable_height_shared = True - self._height_sensor_pixels = self._env_opts["height_sensor_pixels"] - self._height_sensor_resolution = self._env_opts["height_sensor_resolution"] - self._height_vis_step[robot_name] = 0 - if self._env_opts["enable_height_vis"]: - self._height_vis[robot_name] = HeightGridVisualizer( - robot_name=robot_name, - num_envs=self._num_envs, - grid_size=int(self._env_opts["height_sensor_pixels"]), - resolution=float(self._env_opts["height_sensor_resolution"]), - marker_radius=float(self._env_opts["height_vis_radius"]), - forward_offset=float(self._env_opts["height_sensor_forward_offset"]), - lateral_offset=float(self._env_opts["height_sensor_lateral_offset"]), - device=self._device, - dtype=self._dtype) - - self._blink_rigid_prim_views[robot_name] = RigidPrimView(prim_paths_expr=self._env_opts["envs_ns"] + "/env_.*"+ "/" + robot_name + "/" + base_link_name, - name = robot_name + "RigidPrimView") # base link prim views - self._scene.add(self._blink_rigid_prim_views[robot_name]) # need to add so it is properly initialized when resetting world - - # self._robots_geom_prim_views[robot_name] = GeometryPrimView(name = robot_name + "GeomView", - # prim_paths_expr = self._env_ns + "/env*"+ "/" + robot_name, - # # prepare_contact_sensors = True - # ) - # self._robots_geom_prim_views[robot_name].apply_collision_apis() # to be able to apply contact sensors - - # init contact sensors - self._init_contact_sensors(robot_name=robot_name) # IMPORTANT: this has to be called - # after calling the clone() method and initializing articulation views!! - - # self._set_contact_links_material(prim_path="/World/envs/env_0/kyon_no_wheels/lower_leg_1/collisions/mesh_1") - # self._set_contact_links_material(prim_path="/World/envs/env_0/kyon_no_wheels/lower_leg_2/collisions/mesh_1") - # self._set_contact_links_material(prim_path="/World/envs/env_0/kyon_no_wheels/lower_leg_3/collisions/mesh_1") - # self._set_contact_links_material(prim_path="/World/envs/env_0/kyon_no_wheels/lower_leg_4/collisions/mesh_1") - - # filter collisions between default ground plane and custom terrains - # self._cloner.filter_collisions(physicsscene_path = self._physics_context.prim_path, - # collision_root_path = "/World/terrain_collisions", - # prim_paths=[self._ground_plane_prim_paths[1]], - # global_paths=[self._ground_plane_prim_paths[0]] - # ) - - # delete_prim(self._env_opts["ground_plane_prim_path"] + "/SphereLight") # we remove the default spherical light - - # set default camera viewport position and target - camera_position=[4.2, 4.2, 1.5] - camera_target=[0, 0, 0] - # use a single camera prim (configurable) for both viewport and render products - camera_prim = self._env_opts["camera_prim_path"] - self._set_initial_camera_params(camera_position=camera_position, - camera_target=camera_target, - camera_prim_path=camera_prim) - - if self._env_opts["render_to_file"]: - # base output dir - from datetime import datetime - timestamp = datetime.now().strftime("h%H_m%M_s%S_%d_%m_%Y") - self._render_output_dir = f"/tmp/IsaacRenderings/{timestamp}" - res = tuple(int(x) for x in self._env_opts["render_resolution"]) - - # create render product from chosen camera prim - self._render_product = rep.create.render_product(camera_prim, - res, name='rendering_camera') - self._render_writer = rep.WriterRegistry.get("BasicWriter") - self._render_writer.initialize(output_dir=self._render_output_dir, - rgb=True) - self._render_writer.attach([self._render_product]) - # optional top-down capture - if self._env_opts["render_panoramic_cam"]: - td_height = float(self._env_opts["render_panoramic_cam_height"]) - td_dir = self._render_output_dir + "/panoramic_cam" - td_offset = self._env_opts["render_panoramic_cam_target_xy"] - td_target_z = float(self._env_opts["render_panoramic_cam_target_z"]) - pos = [8.0, 11.0, td_height] - self._panoramic_cam_camera = rep.create.camera(focal_length=12, - name='rendering_camera_panoramic_cam', - clipping_range = (1, 200), - position = pos, - look_at = [td_offset[0], td_offset[1], td_target_z]) - self._panoramic_cam_render_product = rep.create.render_product(self._panoramic_cam_camera, - res, name='rendering_camera_panoramic_cam') - self._panoramic_cam_writer = rep.WriterRegistry.get("BasicWriter") - self._panoramic_cam_writer.initialize(output_dir=td_dir, rgb=True) - self._panoramic_cam_writer.attach([self._panoramic_cam_render_product]) - - self.apply_collision_filters(self._physics_context.prim_path, - "/World/collisions") - - self._reset_sim() - - self._fill_robot_info_from_world() - # initializes robot state data - - # update solver options - self._update_art_solver_options() - self._get_solver_info() # get again solver option before printing everything - self._print_envs_info() # debug print - - # for n in range(self._n_init_steps): # run some initialization steps - # self._step_sim() - - self._init_robots_state() - - self.scene_setup_completed = True - - Journal.log(self.__class__.__name__, - "set_up_scene", - "finished scene setup...", - LogType.STAT, - throw_when_excep = True) - - self._is = _sensor.acquire_imu_sensor_interface() - self._dyn_control=_dynamic_control.acquire_dynamic_control_interface() - - def _set_contact_links_material(self, prim_path: str): - prim=get_prim_at_path(prim_path) - physics_material=UsdPhysics.MaterialAPI.Apply(prim) - physics_material.CreateDynamicFrictionAttr(0) - physics_material.CreateStaticFrictionAttr(0) - physics_material.CreateRestitutionAttr(1.0) - physxMaterialAPI=PhysxSchema.PhysxMaterialAPI.Apply(prim) - physxMaterialAPI.CreateFrictionCombineModeAttr().Set("multiply") # average, min, multiply, max - physxMaterialAPI.CreateRestitutionCombineModeAttr().Set("multiply") - - def _get_lightblue_checker_texture_path(self) -> str: - tex_rel_path = os.path.join(os.path.dirname(__file__), "..", "assets", "textures", "ibrido_terrain_texture.png") - return os.path.abspath(tex_rel_path) - - def _ensure_lightblue_checker_material(self): - """Create (or reuse) a light-blue checker material for primitive terrains.""" - stage = get_current_stage() - mat_path = "/World/Looks/IbridoCheckerMaterial" - mat_prim = stage.GetPrimAtPath(mat_path) - if mat_prim.IsValid(): - return mat_path - - texture_path = self._get_lightblue_checker_texture_path() - - material = UsdShade.Material.Define(stage, mat_path) - - st_reader = UsdShade.Shader.Define(stage, f"{mat_path}/PrimvarReader_st") - st_reader.CreateIdAttr("UsdPrimvarReader_float2") - st_reader.CreateInput("varname", Sdf.ValueTypeNames.Token).Set("st") - st_reader.CreateOutput("result", Sdf.ValueTypeNames.Float2) - - uv_transform = UsdShade.Shader.Define(stage, f"{mat_path}/UVTransform") - uv_transform.CreateIdAttr("UsdTransform2d") - # keep UV scale at 1 here; tiling is controlled via mesh UVs - uv_transform.CreateInput("in", Sdf.ValueTypeNames.Float2).ConnectToSource(st_reader.GetOutput("result")) - uv_transform.CreateInput("scale", Sdf.ValueTypeNames.Float2).Set(Gf.Vec2f(1.0, 1.0)) - uv_transform.CreateOutput("result", Sdf.ValueTypeNames.Float2) - - tex = UsdShade.Shader.Define(stage, f"{mat_path}/CheckerTex") - tex.CreateIdAttr("UsdUVTexture") - tex.CreateInput("file", Sdf.ValueTypeNames.Asset).Set(Sdf.AssetPath(texture_path)) - tex.CreateInput("st", Sdf.ValueTypeNames.Float2).ConnectToSource(uv_transform.GetOutput("result")) - tex.CreateInput("wrapS", Sdf.ValueTypeNames.Token).Set("repeat") - tex.CreateInput("wrapT", Sdf.ValueTypeNames.Token).Set("repeat") - tex.CreateInput("minFilter", Sdf.ValueTypeNames.Token).Set("nearest") - tex.CreateInput("magFilter", Sdf.ValueTypeNames.Token).Set("nearest") - # disable mipmaps to avoid blurring sharp edges - tex.CreateInput("mipFilter", Sdf.ValueTypeNames.Token).Set("nearest") - tex.CreateInput("enableMipMap", Sdf.ValueTypeNames.Bool).Set(False) - tex.CreateInput("fallback", Sdf.ValueTypeNames.Color4f).Set(Gf.Vec4f(0.69, 0.85, 1.0, 1.0)) - tex.CreateOutput("rgb", Sdf.ValueTypeNames.Float3) - tex.CreateOutput("a", Sdf.ValueTypeNames.Float) - - pbr = UsdShade.Shader.Define(stage, f"{mat_path}/PBRShader") - pbr.CreateIdAttr("UsdPreviewSurface") - pbr.CreateInput("diffuseColor", Sdf.ValueTypeNames.Color3f).ConnectToSource(tex.GetOutput("rgb")) - pbr.CreateInput("roughness", Sdf.ValueTypeNames.Float).Set(0.45) - pbr.CreateInput("metallic", Sdf.ValueTypeNames.Float).Set(0.0) - pbr.CreateOutput("surface", Sdf.ValueTypeNames.Token) - - material.CreateSurfaceOutput().ConnectToSource(pbr.GetOutput("surface")) - - return mat_path - - def _ensure_groundplane_material(self): - """Guarantee a ground-plane material exists (default checker) and return its path.""" - stage = get_current_stage() - mat_path = "/World/Looks/groundPlaneMaterial" - mat_prim = stage.GetPrimAtPath(mat_path) - if mat_prim.IsValid(): - return mat_path - - # create a temporary default ground plane to spawn the checker material, then delete the geom - tmp_gp_path = "/World/tmp_ground_for_material" - self._scene.add_default_ground_plane(z_position=-1000.0, - name="tmp_ground_for_material", - prim_path=tmp_gp_path, - static_friction=self._env_opts["static_friction"], - dynamic_friction=self._env_opts["dynamic_friction"], - restitution=self._env_opts["restitution"]) - - mat_prim = stage.GetPrimAtPath(mat_path) - prim_utils.delete_prim(tmp_gp_path) - - return mat_path if mat_prim.IsValid() else None - - def _apply_checker_material_to_terrain(self, terrain_root_path: str, material_path: str): - """Bind the checker material to all terrain prims (visual only).""" - - stage = get_current_stage() - mat_prim = stage.GetPrimAtPath(material_path) - if not mat_prim.IsValid(): - return - material = UsdShade.Material(mat_prim) - for prim in stage.Traverse(): - path = prim.GetPath().pathString - if not path.startswith(terrain_root_path): - continue - binding = UsdShade.MaterialBindingAPI.Apply(prim) - binding.Bind(material, UsdShade.Tokens.strongerThanDescendants) - - def _add_checker_overlay_plane(self, terrain_root_path: str, material_path: str): - """Create a thin visual-only mesh with UVs so the checker pattern shows up even on cube prims.""" - stage = get_current_stage() - plane_path = f"{terrain_root_path}/visual_checker" - plane_prim = stage.GetPrimAtPath(plane_path) - if plane_prim.IsValid(): - # if already exists, just (re)bind material - mat_prim = stage.GetPrimAtPath(material_path) - if mat_prim.IsValid(): - UsdShade.MaterialBindingAPI.Apply(plane_prim).Bind(UsdShade.Material(mat_prim), UsdShade.Tokens.strongerThanDescendants) - return - - # try to read base slab dimensions/position to size the overlay - slab_path = terrain_root_path + "_slab" - slab_prim = stage.GetPrimAtPath(slab_path) - center = Gf.Vec3f(0.0, 0.0, 0.0) - width = float(self._env_opts.get("ground_size", 50.0)) - length = width - thickness = 0.1 - if slab_prim.IsValid(): - xformable = UsdGeom.Xformable(slab_prim) - for op in xformable.GetOrderedXformOps(): - if op.GetOpType() == UsdGeom.XformOp.TypeTranslate: - center = Gf.Vec3f(op.Get()) - elif op.GetOpType() == UsdGeom.XformOp.TypeScale: - scale = op.Get() - width = float(scale[0]) - length = float(scale[1]) - thickness = float(scale[2]) - - half_w = 0.5 * width - half_l = 0.5 * length - z = center[2] + 0.5 * thickness + 1e-3 # slightly above the slab to avoid z-fighting - - plane = UsdGeom.Mesh.Define(stage, plane_path) - plane.CreatePointsAttr([ - Gf.Vec3f(center[0] - half_w, center[1] - half_l, z), - Gf.Vec3f(center[0] + half_w, center[1] - half_l, z), - Gf.Vec3f(center[0] + half_w, center[1] + half_l, z), - Gf.Vec3f(center[0] - half_w, center[1] + half_l, z), - ]) - plane.CreateFaceVertexCountsAttr([4]) - plane.CreateFaceVertexIndicesAttr([0, 1, 2, 3]) - plane.CreateDoubleSidedAttr(True) - - # increase tiling density; adjustable via env opt - uv_repeats = max(1, int((width / 10.0) * 3.0)) - primvars = UsdGeom.PrimvarsAPI(plane) - st = primvars.CreatePrimvar("st", Sdf.ValueTypeNames.TexCoord2fArray, UsdGeom.Tokens.vertex) - st.Set(Vt.Vec2fArray([ - Gf.Vec2f(0.0, 0.0), - Gf.Vec2f(uv_repeats, 0.0), - Gf.Vec2f(uv_repeats, uv_repeats), - Gf.Vec2f(0.0, uv_repeats), - ])) - st.SetInterpolation(UsdGeom.Tokens.vertex) - - mat_prim = stage.GetPrimAtPath(material_path) - if mat_prim.IsValid(): - material = UsdShade.Material(mat_prim) - UsdShade.MaterialBindingAPI.Apply(plane.GetPrim()).Bind(material, UsdShade.Tokens.strongerThanDescendants) - - def _add_checker_overlays_on_tiles(self, terrain_root_path: str, material_path: str): - """Add visual quads on top of each tile cube so the checker texture appears on raised steps.""" - stage = get_current_stage() - mat_prim = stage.GetPrimAtPath(material_path) - if not mat_prim.IsValid(): - return - material = UsdShade.Material(mat_prim) - for prim in stage.Traverse(): - path = prim.GetPath().pathString - if not path.startswith(terrain_root_path): - continue - if prim.GetTypeName() != "Cube": - continue - name = path.split("/")[-1] - if "wall" in name or name.endswith("_slab"): - continue - xformable = UsdGeom.Xformable(prim) - center = Gf.Vec3f(0.0, 0.0, 0.0) - scale = Gf.Vec3f(1.0, 1.0, 1.0) - for op in xformable.GetOrderedXformOps(): - if op.GetOpType() == UsdGeom.XformOp.TypeTranslate: - center = Gf.Vec3f(op.Get()) - elif op.GetOpType() == UsdGeom.XformOp.TypeScale: - scale = Gf.Vec3f(op.Get()) - width, length, height = float(scale[0]), float(scale[1]), float(scale[2]) - half_w = 0.5 * width - half_l = 0.5 * length - z = center[2] + 0.5 * height + 1e-3 - overlay_path = f"{path}_checker" - if stage.GetPrimAtPath(overlay_path).IsValid(): - UsdShade.MaterialBindingAPI.Apply(stage.GetPrimAtPath(overlay_path)).Bind(material, UsdShade.Tokens.strongerThanDescendants) - continue - mesh = UsdGeom.Mesh.Define(stage, overlay_path) - mesh.CreatePointsAttr([ - Gf.Vec3f(center[0] - half_w, center[1] - half_l, z), - Gf.Vec3f(center[0] + half_w, center[1] - half_l, z), - Gf.Vec3f(center[0] + half_w, center[1] + half_l, z), - Gf.Vec3f(center[0] - half_w, center[1] + half_l, z), - ]) - mesh.CreateFaceVertexCountsAttr([4]) - mesh.CreateFaceVertexIndicesAttr([0, 1, 2, 3]) - mesh.CreateDoubleSidedAttr(True) - uv_repeats = max(1, int((width / 10.0) * float(self._env_opts.get("checker_uv_density", 3.0)))) - primvars = UsdGeom.PrimvarsAPI(mesh) - st = primvars.CreatePrimvar("st", Sdf.ValueTypeNames.TexCoord2fArray, UsdGeom.Tokens.vertex) - st.Set(Vt.Vec2fArray([ - Gf.Vec2f(0.0, 0.0), - Gf.Vec2f(uv_repeats, 0.0), - Gf.Vec2f(uv_repeats, uv_repeats), - Gf.Vec2f(0.0, uv_repeats), - ])) - st.SetInterpolation(UsdGeom.Tokens.vertex) - UsdShade.MaterialBindingAPI.Apply(mesh.GetPrim()).Bind(material, UsdShade.Tokens.strongerThanDescendants) - - def _is_link(self, prim): - return prim.GetTypeName() == 'Xform' - - def _is_joint(self, prim): - return prim.GetTypeName() == 'PhysicsRevoluteJoint' - - def _create_collision_group(self, group_path, link_paths): - """ - Create a collision group under the given group_path that contains the links. - Args: - group_path (str): Path to create the collision group. - link_paths (List[str]): List of link paths to include in this group. - """ - collision_group = Sdf.PrimSpec( - self._stage.GetRootLayer().GetPrimAtPath(group_path), - group_path.split("/")[-1], - Sdf.SpecifierDef, - "PhysicsCollisionGroup" - ) - # Add the links to the collision group - for link_path in link_paths: - includes_rel = Sdf.RelationshipSpec(collision_group, "collection:colliders:includes", False) - includes_rel.targetPathList.Append(link_path) - - def _add_collision_filter(self, group_path, link1, link2): - """ - Filters collision between two successive links. - - Args: - group_path (str): Path of the collision group. - link1 (str): Path of the first link. - link2 (str): Path of the second link. - """ - # Create a relationship to filter collisions between the two links - filtered_groups = Sdf.RelationshipSpec( - self._stage.GetPrimAtPath(group_path), - "physics:filteredGroups", - False - ) - filtered_groups.targetPathList.Append(link1) - filtered_groups.targetPathList.Append(link2) - - def _render_sim(self, mode="human"): - - if mode == "human": - # follow requested robot/env - if self._env_opts["use_follow_camera"]: - ridx = int(self._env_opts["render_follow_robot_idx"]) - eidx = int(self._env_opts["render_follow_env_idx"]) - if ridx < len(self._robot_names) and eidx < self._num_envs: - rname = self._robot_names[ridx] - pos = self._root_p.get(rname, None) - if pos is not None and pos.shape[0] > eidx: - base = pos[eidx].detach().cpu() - offset = torch.as_tensor(self._env_opts["render_follow_offset"], - device=base.device, dtype=base.dtype) - target_offset = torch.as_tensor(self._env_opts["render_follow_target_offset"], - device=base.device, dtype=base.dtype) - quat = self._root_q.get(rname, None) - if quat is not None and quat.shape[0] > eidx: - q = quat[eidx].detach().cpu() - w, x, y, z = q.unbind(-1) - yaw = torch.atan2(2 * (w * z + x * y), 1 - 2 * (y * y + z * z)) - cy, sy = torch.cos(yaw), torch.sin(yaw) - rot = torch.tensor([[cy, -sy], [sy, cy]], device=base.device, dtype=base.dtype) - offset_xy = torch.matmul(rot, offset[:2]) - target_offset_xy = torch.matmul(rot, target_offset[:2]) - offset = torch.stack((offset_xy[0], offset_xy[1], offset[2])) - target_offset = torch.stack((target_offset_xy[0], target_offset_xy[1], target_offset[2])) - eye = (base + offset).tolist() - target = (base + target_offset).tolist() - set_camera_view(eye=eye, target=target, camera_prim_path=self._env_opts["camera_prim_path"]) - - self._world.render() - # optional height grid visualization - if self._env_opts["enable_height_vis"]: - for robot_name, vis in self._height_vis.items(): - # use latest stored states - if robot_name not in self._height_imgs or robot_name not in self._height_sensors: - continue - heights = self._height_imgs.get(robot_name, None) - if heights is None or heights.numel() == 0: - continue - pos_src = self._root_p.get(robot_name, None) - quat_src = self._root_q.get(robot_name, None) - if pos_src is None or quat_src is None: - continue - step = self._height_vis_step.get(robot_name, 0) - period = max(1, int(self._env_opts["height_vis_update_period"])) - if step % period == 0: - try: - vis.update( - base_positions=pos_src, - base_quats=quat_src, - heights=heights) - except Exception as exc: - print(f"[height_vis] update failed for {robot_name}: {exc}") - self._height_vis_step[robot_name] = step + 1 - return None - elif mode == "rgb_array": - # check if viewport is enabled -- if not, then complain because we won't get any data - if not self._render or not self._record: - exception = f"Cannot render '{mode}' when rendering is not enabled. Please check the provided" + \ - "arguments to the environment class at initialization." - Journal.log(self.__class__.__name__, - "__init__", - exception, - LogType.EXCEP, - throw_when_excep = True) - # obtain the rgb data - rgb_data = self._rgb_annotator.get_data() - # convert to numpy array - rgb_data = np.frombuffer(rgb_data, dtype=np.uint8).reshape(*rgb_data.shape) - # return the rgb data - return rgb_data[:, :, :3] - else: - return None - - def _create_viewport_render_product(self, resolution=None): - """Create a render product of the viewport for rendering.""" - - try: - - # create render product - camera_prim = self._env_opts["camera_prim_path"] - res = resolution - if res is None: - res = tuple(int(x) for x in self._env_opts["render_resolution"]) - self._render_product = rep.create.render_product(camera_prim, res) - # create rgb annotator -- used to read data from the render product - self._rgb_annotator = rep.AnnotatorRegistry.get_annotator("rgb", device="cpu") - self._rgb_annotator.attach([self._render_product]) - self._record = True - except Exception as e: - carb.log_info("omni.replicator.core could not be imported. Skipping creation of render product.") - carb.log_info(str(e)) - - def _close(self): - if self._simulation_app.is_running(): - self._simulation_app.close() - - def _step_world(self): - self._world.step(render=False, step_sim=True) - - if (self._render) and (self._render_step_counter%self._env_opts["rendering_freq"]==0): - # if self._env_opts["render_to_file"]: - # rep.orchestrator.step() - self._render_sim() # manually trigger rendering (World.step(render=True) for some reason - # will step the simulation for a dt==rendering_dt) - self._render_step_counter += 1 - - def _generate_jnt_imp_control(self, robot_name: str): - - jnt_imp_controller = OmniJntImpCntrl(articulation=self._robots_articulations[robot_name], - device=self._device, - dtype=self._dtype, - enable_safety=True, - urdf_path=self._urdf_dump_paths[robot_name], - config_path=self._jnt_imp_config_paths[robot_name], - enable_profiling=False, - debug_checks=self._debug, - override_art_controller=self._override_low_lev_controller) - - return jnt_imp_controller - - def _reset_sim(self): - self._world.reset(soft=False) - - def _reset_state(self, - robot_name: str, - env_indxs: torch.Tensor = None, - randomize: bool = False): - - if env_indxs is not None: - if self._debug: - if self._use_gpu: - if not env_indxs.device.type == "cuda": - error = "Provided env_indxs should be on GPU!" - Journal.log(self.__class__.__name__, - "_step_jnt_imp_control", - error, - LogType.EXCEP, - True) - else: - if not env_indxs.device.type == "cpu": - error = "Provided env_indxs should be on CPU!" - Journal.log(self.__class__.__name__, - "_step_jnt_imp_control", - error, - LogType.EXCEP, - True) - if randomize: - self._randomize_yaw(robot_name=robot_name,env_indxs=env_indxs) - - # root q - self._robots_art_views[robot_name].set_world_poses(positions = self._root_p_default[robot_name][env_indxs, :], - orientations=self._root_q_default[robot_name][env_indxs, :], - indices = env_indxs) - # jnts q - self._robots_art_views[robot_name].set_joint_positions(positions = self._jnts_q_default[robot_name][env_indxs, :], - indices = env_indxs) - # root v and omega - self._robots_art_views[robot_name].set_joint_velocities(velocities = self._jnts_v_default[robot_name][env_indxs, :], - indices = env_indxs) - # jnts v - concatenated_vel = torch.cat((self._root_v_default[robot_name][env_indxs, :], - self._root_omega_default[robot_name][env_indxs, :]), dim=1) - self._robots_art_views[robot_name].set_velocities(velocities = concatenated_vel, - indices = env_indxs) - # jnts eff - self._robots_art_views[robot_name].set_joint_efforts(efforts = self._jnts_eff_default[robot_name][env_indxs, :], - indices = env_indxs) - self._reset_perturbations(robot_name=robot_name, env_indxs=env_indxs) - else: - - if randomize: - self._randomize_yaw(robot_name=robot_name,env_indxs=None) - - # root q - self._robots_art_views[robot_name].set_world_poses(positions = self._root_p_default[robot_name][:, :], - orientations=self._root_q_default[robot_name][:, :], - indices = None) - # jnts q - self._robots_art_views[robot_name].set_joint_positions(positions = self._jnts_q_default[robot_name][:, :], - indices = None) - # root v and omega - self._robots_art_views[robot_name].set_joint_velocities(velocities = self._jnts_v_default[robot_name][:, :], - indices = None) - # jnts v - concatenated_vel = torch.cat((self._root_v_default[robot_name][:, :], - self._root_omega_default[robot_name][:, :]), dim=1) - self._robots_art_views[robot_name].set_velocities(velocities = concatenated_vel, - indices = None) - # jnts eff - self._robots_art_views[robot_name].set_joint_efforts(efforts = self._jnts_eff_default[robot_name][:, :], - indices = None) - self._reset_perturbations(robot_name=robot_name, env_indxs=None) - - # we update the robots state - self._read_root_state_from_robot(env_indxs=env_indxs, - robot_name=robot_name) - self._read_jnts_state_from_robot(env_indxs=env_indxs, - robot_name=robot_name) - - def _reset_perturbations(self, robot_name: str, env_indxs: torch.Tensor = None): - """Clear perturbation state and wrenches for selected envs.""" - if robot_name not in self._pert_active: - return - if env_indxs is None: - self._pert_active[robot_name].zero_() - self._pert_steps_remaining[robot_name].zero_() - self._pert_forces_world[robot_name].zero_() - self._pert_torques_world[robot_name].zero_() - self._pert_det_counter[robot_name].zero_() - else: - self._pert_active[robot_name][env_indxs] = False - self._pert_steps_remaining[robot_name][env_indxs] = 0 - self._pert_forces_world[robot_name][env_indxs, :] = 0 - self._pert_torques_world[robot_name][env_indxs, :] = 0 - self._pert_det_counter[robot_name][env_indxs] = 0 - - def _process_perturbations(self): - - # Iterate over each robot view - for i in range(len(self._robot_names)): - robot_name = self._robot_names[i] - - # Pre-fetch views for code clarity (references, not copies) - active = self._pert_active[robot_name] - steps_rem = self._pert_steps_remaining[robot_name] - forces_world = self._pert_forces_world[robot_name] - torques_world = self._pert_torques_world[robot_name] - planar_only = self._env_opts["pert_planar_only"] if "pert_planar_only" in self._env_opts else False - - # --- 1. Update Active Counters (In-Place) --- - if active.any(): - # In-place subtraction - steps_rem[active] -= 1 - - # --- 2. Reset Finished Perturbations (In-Place) --- - # Logic: Active AND (Steps <= 0) - # Note: Creating 'newly_ended' boolean mask is a tiny unavoidable allocation - newly_ended = active & (steps_rem <= 0) - - if newly_ended.any(): - # Use masked_fill_ for in-place zeroing - active.masked_fill_(newly_ended, False) - forces_world[newly_ended, :]=0 - torques_world[newly_ended, :]=0 - steps_rem.masked_fill_(newly_ended, 0) - - # --- 3. Trigger New Perturbations --- - - det_rate = self._env_opts["det_pert_rate"] - if det_rate: - # deterministic spacing: count physics steps and trigger when threshold reached (if not already active) - det_counter = self._pert_det_counter[robot_name] - det_counter += 1 - trigger_mask = (det_counter >= self._pert_det_steps) & (~active) - else: - # Reuse scratch buffer for probability check - # Assumes self._pert_scratch is (num_envs, 1) pre-allocated - self._pert_scratch[robot_name].uniform_(0.0, 1.0) # used for triggering new perturbations - # Check probs against threshold - # Flatten scratch to (N,) to match 'active' mask - trigger_mask = (self._pert_scratch[robot_name].flatten() < self._pert_wrenches_prob) & (~active) - - if trigger_mask.any(): - - # Cache weights (references) - weight = self._weights[robot_name] - - # we now treat the configured "max_*_impulse_*" as the maximum **impulse** (N·s or N·m·s), - # and convert impulse -> force/torque by dividing by the sampled duration (seconds). - # Use per-robot weight scaling as before. - lin_impulse_max = self._env_opts["max_lin_impulse_norm"] * weight - ang_impulse_max = self._env_opts["max_ang_impulse_norm"] * weight - - # --- Force (Impulse) Direction Generation (Reuse _pert_lindir buffer) --- - lindir = self._pert_lindir[robot_name] # (N, 3) - - if planar_only: - # planar push direction from random yaw - yaw_angles = self._pert_scratch[robot_name].uniform_(0.0, 2*math.pi).flatten() - lindir[:, 0] = torch.cos(yaw_angles) - lindir[:, 1] = torch.sin(yaw_angles) - lindir[:, 2] = 0.0 - else: - # 1. Fill with Standard Normal noise in-place - lindir.normal_() - # 2. Normalize in-place - norms = torch.norm(lindir, dim=1, keepdim=True).clamp_min_(1e-6) - lindir.div_(norms) - - # 3. Sample linear impulse magnitudes (reuse scratch) - # scratch has shape (N,1) - uniform [0,1] - self._pert_scratch[robot_name].uniform_(self._env_opts["lin_impulse_mag_min"], self._env_opts["lin_impulse_mag_max"]) - # impulse vectors = unit_dir * (rand * lin_impulse_max) - - lindir.mul_(self._pert_scratch[robot_name] * lin_impulse_max) # now contains linear impulses (N,3) - - # --- Angular (Impulse) Direction Generation (Reuse _pert_angdir buffer) --- - angdir = self._pert_angdir[robot_name] # (N, 3) - - if planar_only: - angdir.zero_() # no torque when planar-only is requested - else: - # 1. Fill with Standard Normal noise - angdir.normal_() - # 2. Normalize - norms = torch.norm(angdir, dim=1, keepdim=True).clamp_min_(1e-6) - angdir.div_(norms) - - # 3. Sample angular impulse magnitudes (reuse scratch) - self._pert_scratch[robot_name].uniform_(0.0, 1.0) - angdir.mul_(self._pert_scratch[robot_name] * ang_impulse_max) # now contains angular impulses (N,3) - - # --- Duration Generation (Reuse _pert_durations) --- - # Keep integer steps sampling (same shape/device/dtype) - self._pert_durations[robot_name] = torch.randint_like( - self._pert_durations[robot_name], - low=self._pert_min_steps, - high=self._pert_max_steps + 1 - ) - - # --- convert to float - duration_steps = self._pert_durations[robot_name].to(dtype=lindir.dtype, device=lindir.device) - # duration in seconds (shape (N,)) - duration_seconds = duration_steps * self.physics_dt() - # avoid divide-by-zero - duration_seconds = duration_seconds.clamp_min_(1e-6) - - # compute per-step forces/torques = impulse / duration_seconds - # lindir currently holds linear impulses, angdir holds angular impulses - forces_to_apply = lindir / duration_seconds - torques_to_apply = angdir / duration_seconds - - # Optional clipping based on robot weight (min/max) - f_norm = torch.norm(forces_to_apply, dim=1, keepdim=True).clamp_min_(1e-9) - target_norm = f_norm - - f_clip_scale_max = self._env_opts["pert_force_max_weight_scale"] - if f_clip_scale_max > 0.0: - force_max = self._weights[robot_name] * f_clip_scale_max # (N,1) - target_norm = torch.minimum(target_norm, force_max) - - f_clip_scale_min = self._env_opts["pert_force_min_weight_scale"] - if f_clip_scale_min > 0.0: - force_min = self._weights[robot_name] * f_clip_scale_min - target_norm = torch.maximum(target_norm, force_min) - - forces_to_apply = forces_to_apply * (target_norm / f_norm) - - t_clip_scale = self._env_opts["pert_torque_max_weight_scale"] - if t_clip_scale > 0.0: - torque_max = self._weights[robot_name] * self._env_opts["max_ang_impulse_lever"] * t_clip_scale - t_norm = torch.norm(torques_to_apply, dim=1, keepdim=True).clamp_min_(1e-9) - t_scale = torch.minimum(torch.ones_like(t_norm), torque_max / t_norm) - torques_to_apply = torques_to_apply * t_scale - - # --- Update State Buffers --- - # Use boolean indexing to scatter only triggered values - active[trigger_mask] = True - steps_rem[trigger_mask] = self._pert_durations[robot_name][trigger_mask, :].flatten() - forces_world[trigger_mask, :] = forces_to_apply[trigger_mask, :] - torques_world[trigger_mask, :] = torques_to_apply[trigger_mask, :] - if det_rate: - det_counter[trigger_mask] = 0 - - # --- 4. Apply Wrenches (Vectorized) --- - # Only call API if there are active perturbations to minimize overhead - - forces_world[~active, :]=0 - torques_world[~active, :]=0 - - self._blink_rigid_prim_views[robot_name].apply_forces_and_torques_at_pos( - forces=forces_world, - torques=torques_world, - positions=None, # body frame origin - is_global=True - ) - - @override - def _pre_step(self): - - if self._env_opts["use_random_pertub"]: - self._process_perturbations() - - success=super()._pre_step() - - return success - - @override - def _pre_step_db(self): - - if self._env_opts["use_random_pertub"]: - self._process_perturbations() - - success=super()._pre_step_db() - - return success - - @override - def _update_contact_state(self, - robot_name: str, - env_indxs: torch.Tensor = None): - - super()._update_contact_state(robot_name, env_indxs) - - if self._env_opts["use_random_pertub"]: - # write APPLIED perturbations to root wrench (mainly for debug) - self.cluster_servers[robot_name].get_state().contact_wrenches_root.set(data=self._pert_forces_world[robot_name][env_indxs, :], - data_type="f", - contact_name="root", - robot_idxs = env_indxs, - gpu=self._use_gpu) - self.cluster_servers[robot_name].get_state().contact_wrenches_root.set(data=self._pert_torques_world[robot_name][env_indxs, :], - data_type="t", - contact_name="root", - robot_idxs = env_indxs, - gpu=self._use_gpu) - - def _post_warmup_validation(self, robot_name: str): - """Validate warmup state: base height, tilt, and (if available) contacts.""" - envs = torch.arange(self._num_envs, device=self._device) - - # terrain height query - def _terrain_height(xy: torch.Tensor): - if self._env_opts["use_flat_ground"] or self.terrain_generator is None: - return torch.zeros((xy.shape[0],), device=xy.device, dtype=self._dtype) - heights = [] - half_extent = self._env_opts["spawn_height_check_half_extent"] - for k in range(xy.shape[0]): - h = self.terrain_generator.get_max_height_in_rect( - float(xy[k, 0]), float(xy[k, 1]), half_extent=half_extent) - heights.append(h) - return torch.as_tensor(heights, device=xy.device, dtype=self._dtype) - - # base height check - base_xy = self._root_p[robot_name][:, 0:2] - base_z = self._root_p[robot_name][:, 2] - ground_z = _terrain_height(base_xy) - margin = float(self._env_opts["spawn_height_cushion"]) - bad_z = base_z < (ground_z + margin) - - # tilt check (angle between base up and world up) - q = self._root_q[robot_name] - # quaternion to up vector - w, x, y, z = q[:, 0], q[:, 1], q[:, 2], q[:, 3] - up = torch.stack([ - 2 * (x * z - w * y), - 2 * (y * z + w * x), - 1 - 2 * (x * x + y * y) - ], dim=1) - cos_tilt = up[:, 2].clamp(-1.0, 1.0) - tilt_thresh_deg = 35.0 - bad_tilt = cos_tilt < math.cos(math.radians(tilt_thresh_deg)) - - # contact check (only if sensors exist) - # bad_contact = torch.zeros_like(base_z, dtype=torch.bool) - # if robot_name in self.omni_contact_sensors and self.omni_contact_sensors[robot_name] is not None: - # counts = torch.zeros((self._num_envs,), dtype=torch.int32, device=self._device) - # for link in self._contact_names.get(robot_name, []): - # f_contact = self._get_contact_f(robot_name=robot_name, - # contact_link=link, - # env_indxs=None) - # if f_contact is None: - # continue - # # use normal component (assume z-up); ignore tangential forces - # active = f_contact[:, 2] > 1e-3 - # counts += active.int() - # bad_contact = counts < 3 - - failing = torch.nonzero(bad_z | bad_tilt, as_tuple=False).flatten() - if failing.numel() > 0: - # remediate: lift to terrain+margin, upright (preserve yaw), zero root velocities - # yaw = torch.atan2(2 * (w * z + x * y), 1 - 2 * (y * y + z * z)) - # safe_z = (ground_z + margin)[failing] - # self._root_p[robot_name][failing, 2] = safe_z - # cos_h = torch.cos(yaw[failing] / 2) - # sin_h = torch.sin(yaw[failing] / 2) - # upright = torch.zeros((failing.shape[0], 4), device=self._device, dtype=self._dtype) - # upright[:, 0] = cos_h - # upright[:, 3] = sin_h - # self._root_q[robot_name][failing, :] = upright - # self._root_v[robot_name][failing, :] = 0.0 - # self._root_omega[robot_name][failing, :] = 0.0 - - msgs = [] # print db message - if bad_z.any(): - msgs.append(f"low_z envs {torch.nonzero(bad_z, as_tuple=False).flatten().tolist()}") - if bad_tilt.any(): - msgs.append(f"tilt envs {torch.nonzero(bad_tilt, as_tuple=False).flatten().tolist()}") - Journal.log(self.__class__.__name__, - "_post_warmup_validation", - f"Warmup validation failures for {robot_name}: " + "; ".join(msgs), - LogType.WARN, - throw_when_excep=False) - return failing - - def _import_urdf(self, - robot_name: str, - fix_base = False, - self_collide = False, - merge_fixed = True): - - import_config=_urdf.ImportConfig() - # status,import_config=omni_kit.commands.execute("URDFCreateImportConfig") - - Journal.log(self.__class__.__name__, - "update_root_offsets", - "importing robot URDF", - LogType.STAT, - throw_when_excep = True) - _urdf.acquire_urdf_interface() - # we overwrite some settings which are bound to be fixed - import_config.merge_fixed_joints = merge_fixed # makes sim more stable - # in case of fixed joints with light objects - import_config.import_inertia_tensor = True - # import_config.convex_decomp = False - import_config.fix_base = fix_base - import_config.self_collision = self_collide - # import_config.distance_scale = 1 - # import_config.make_default_prim = True - # import_config.create_physics_scene = True - # import_config.default_drive_strength = 1047.19751 - # import_config.default_position_drive_damping = 52.35988 - # import_config.default_drive_type = _urdf.UrdfJointTargetType.JOINT_DRIVE_POSITION - # import URDF - success, robot_prim_path_default = omni_kit.commands.execute( - "URDFParseAndImportFile", - urdf_path=self._urdf_dump_paths[robot_name], - import_config=import_config, - # get_articulation_root=True, - ) - - robot_base_prim_path = self._env_opts["template_env_ns"] + "/" + robot_name - - if success: - Journal.log(self.__class__.__name__, - "_import_urdf", - "Successfully importedf URDF into IsaacSim", - LogType.STAT) - else: - Journal.log(self.__class__.__name__, - "_import_urdf", - "Failed to import URDF into IsaacSim", - LogType.EXCEP, - throw_when_excep = True) - - # moving default prim to base prim path for cloning - move_prim(robot_prim_path_default, # from - robot_base_prim_path) # to - - robot_base_prim = prim_utils.get_prim_at_path(robot_base_prim_path) - children = prim_utils.get_prim_children(robot_base_prim) - # log imported prim children to the journal (print was getting truncated in logs) - Journal.log(self.__class__.__name__, - "_import_urdf", - f"Imported robot URDF children: {children}", - LogType.STAT) - - # improve collision tesselation for cylinders (e.g., wheels) if requested - # self._apply_collision_refinement(robot_base_prim_path, - # self._env_opts["collision_refinement_level"]) - - return success - - def _apply_collision_refinement(self, robot_base_prim_path: str, refinement_level: int): - """Set refinement level on collision cylinders to avoid coarse faceting.""" - if refinement_level is None: - return - stage = get_current_stage() - coll_prefix = robot_base_prim_path + "/collisions" - count = 0 - for prim in stage.Traverse(): - if not prim.IsValid(): - continue - path_str = prim.GetPath().pathString - if not path_str.startswith(coll_prefix): - continue - if prim.GetTypeName() == "Cylinder": - attr = prim.GetAttribute("refinementLevel") - if not attr.IsValid(): - attr = prim.CreateAttribute("refinementLevel", Sdf.ValueTypeNames.Int) - attr.Set(int(refinement_level)) - count += 1 - Journal.log(self.__class__.__name__, - "_apply_collision_refinement", - f"Applied refinement level {refinement_level} to {count} cylinder collision prims under {coll_prefix}", - LogType.STAT) - - def apply_collision_filters(self, - physicscene_path: str, - coll_root_path: str): - - self._cloner.filter_collisions(physicsscene_path = physicscene_path, - collision_root_path = coll_root_path, - prim_paths=self._envs_prim_paths, - global_paths=self._ground_plane_prim_paths # can collide with these prims - ) - - def _read_root_state_from_robot(self, - robot_name: str, - env_indxs: torch.Tensor = None): - - self._get_root_state(numerical_diff=self._env_opts["use_diff_vels"], - env_indxs=env_indxs, - robot_name=robot_name) - - # height grid sensor readout - if robot_name in self._height_sensors: - pos_src = self._root_p[robot_name] if env_indxs is None else self._root_p[robot_name][env_indxs] - quat_src = self._root_q[robot_name] if env_indxs is None else self._root_q[robot_name][env_indxs] - heights = self._height_sensors[robot_name].read(pos_src, quat_src) - if self._env_opts["use_flat_ground"]: - heights.zero_() - if env_indxs is None: - self._height_imgs[robot_name] = heights - else: - # clone to avoid overlapping write/read views - self._height_imgs[robot_name][env_indxs] = heights.clone() - - # print("height image") - # print(self._height_imgs[robot_name][0, :, : ]) - - def _read_jnts_state_from_robot(self, - robot_name: str, - env_indxs: torch.Tensor = None): - - self._get_robots_jnt_state( - numerical_diff=self._env_opts["use_diff_vels"], - env_indxs=env_indxs, - robot_name=robot_name) - - def _get_root_state(self, - robot_name: str, - env_indxs: torch.Tensor = None, - numerical_diff: bool = False, - base_loc: bool = True): - - # reading = self._is.get_sensor_reading("/World/Cube/Imu_Sensor", - # use_latest_data = True) - - dt=self._cluster_dt[robot_name] # getting diff state always at cluster rate - - # measurements from simulator are in world frame - if env_indxs is not None: - - pose = self._robots_art_views[robot_name].get_world_poses( - clone = True, - indices=env_indxs) # tuple: (pos, quat), quat is [w, i, j, k] in Isaac4.2 - - self._root_p[robot_name][env_indxs, :] = pose[0] - - going_to_fly=self._root_p[robot_name][env_indxs, 0:2]>(self._env_opts["terrain_border"]-0.1) - if going_to_fly.any(): - flying=going_to_fly.sum().item() - warn = f"N. {flying} robots ({robot_name}) are about to go out of the terrain!!" - Journal.log(self.__class__.__name__, - "_get_root_state", - warn, - LogType.WARN, - throw_when_excep = True) - self._root_q[robot_name][env_indxs, :] = pose[1] # root orientation - if not numerical_diff: - # we get velocities from the simulation. This is not good since - # these can actually represent artifacts which do not have physical meaning. - # It's better to obtain them by differentiation to avoid issues with controllers, etc... - self._root_v[robot_name][env_indxs, :] = self._robots_art_views[robot_name].get_linear_velocities( - clone = True, - indices=env_indxs) # root lin. velocity - self._root_omega[robot_name][env_indxs, :] = self._robots_art_views[robot_name].get_angular_velocities( - clone = True, - indices=env_indxs) # root ang. velocity - - # for now obtain root a numerically - self._root_a[robot_name][env_indxs, :] = (self._root_v[robot_name][env_indxs, :] - \ - self._root_v_prev[robot_name][env_indxs, :]) / dt - self._root_alpha[robot_name][env_indxs, :] = (self._root_omega[robot_name][env_indxs, :] - \ - self._root_omega_prev[robot_name][env_indxs, :]) / dt - - self._root_v_prev[robot_name][env_indxs, :] = self._root_v[robot_name][env_indxs, :] - self._root_omega_prev[robot_name][env_indxs, :] = self._root_omega[robot_name][env_indxs, :] - else: - # differentiate numerically - self._root_v[robot_name][env_indxs, :] = (self._root_p[robot_name][env_indxs, :] - \ - self._root_p_prev[robot_name][env_indxs, :]) / dt - self._root_omega[robot_name][env_indxs, :] = quat_to_omega( - self._root_q_prev[robot_name][env_indxs, :], - self._root_q[robot_name][env_indxs, :], - dt) - - self._root_a[robot_name][env_indxs, :] = (self._root_v[robot_name][env_indxs, :] - \ - self._root_v_prev[robot_name][env_indxs, :]) / dt - self._root_alpha[robot_name][env_indxs, :] = (self._root_omega[robot_name][env_indxs, :] - \ - self._root_omega_prev[robot_name][env_indxs, :]) / dt - - # update "previous" data for numerical differentiation - self._root_p_prev[robot_name][env_indxs, :] = self._root_p[robot_name][env_indxs, :] - self._root_q_prev[robot_name][env_indxs, :] = self._root_q[robot_name][env_indxs, :] - self._root_v_prev[robot_name][env_indxs, :] = self._root_v[robot_name][env_indxs, :] - self._root_omega_prev[robot_name][env_indxs, :] = self._root_omega[robot_name][env_indxs, :] - self._track_terrain_hits(robot_name=robot_name, env_indxs=env_indxs) - - else: - # updating data for all environments - pose = self._robots_art_views[robot_name].get_world_poses( - clone = True) # tuple: (pos, quat) - self._root_p[robot_name][:, :] = pose[0] - self._root_q[robot_name][:, :] = pose[1] # root orientation - - going_to_fly=self._root_p[robot_name][:, 0:2]>(self._env_opts["terrain_border"]-0.1) - if going_to_fly.any(): - flying=going_to_fly.sum().item() - warn = f"N. {flying} robots ({robot_name}) are about to go out of the terrain!!" - Journal.log(self.__class__.__name__, - "_get_root_state", - warn, - LogType.WARN, - throw_when_excep = True) - - if not numerical_diff: - # we get velocities from the simulation. This is not good since - # these can actually represent artifacts which do not have physical meaning. - # It's better to obtain them by differentiation to avoid issues with controllers, etc... - self._root_v[robot_name][:, :] = self._robots_art_views[robot_name].get_linear_velocities( - clone = True) # root lin. velocity - self._root_omega[robot_name][:, :] = self._robots_art_views[robot_name].get_angular_velocities( - clone = True) # root ang. velocity - - self._root_a[robot_name][:, :] = (self._root_v[robot_name][:, :] - \ - self._root_v_prev[robot_name][:, :]) / dt - self._root_alpha[robot_name][:, :] = (self._root_omega[robot_name][:, :] - \ - self._root_omega_prev[robot_name][:, :]) / dt - - self._root_v_prev[robot_name][:, :] = self._root_v[robot_name][:, :] - self._root_omega_prev[robot_name][:, :] = self._root_omega[robot_name][:, :] - else: - # differentiate numerically - self._root_v[robot_name][:, :] = (self._root_p[robot_name][:, :] - \ - self._root_p_prev[robot_name][:, :]) / dt - self._root_omega[robot_name][:, :] = quat_to_omega(self._root_q_prev[robot_name][:, :], - self._root_q[robot_name][:, :], - dt) - - self._root_a[robot_name][:, :] = (self._root_v[robot_name][:, :] - \ - self._root_v_prev[robot_name][:, :]) / dt - self._root_alpha[robot_name][:, :] = (self._root_omega[robot_name][:, :] - \ - self._root_omega_prev[robot_name][:, :]) / dt - - # update "previous" data for numerical differentiation - self._root_p_prev[robot_name][:, :] = self._root_p[robot_name][:, :] - self._root_q_prev[robot_name][:, :] = self._root_q[robot_name][:, :] - self._root_v_prev[robot_name][:, :] = self._root_v[robot_name][:, :] - self._root_omega_prev[robot_name][:, :] = self._root_omega[robot_name][:, :] - self._track_terrain_hits(robot_name=robot_name, env_indxs=None) - - if base_loc: - # rotate robot twist in base local - twist_w=torch.cat((self._root_v[robot_name], - self._root_omega[robot_name]), - dim=1) - twist_base_loc=torch.cat((self._root_v_base_loc[robot_name], - self._root_omega_base_loc[robot_name]), - dim=1) - world2base_frame(t_w=twist_w,q_b=self._root_q[robot_name],t_out=twist_base_loc) - self._root_v_base_loc[robot_name]=twist_base_loc[:, 0:3] - self._root_omega_base_loc[robot_name]=twist_base_loc[:, 3:6] - - # rotate robot a in base local - a_w=torch.cat((self._root_a[robot_name], - self._root_alpha[robot_name]), - dim=1) - a_base_loc=torch.cat((self._root_a_base_loc[robot_name], - self._root_alpha_base_loc[robot_name]), - dim=1) - world2base_frame(t_w=a_w,q_b=self._root_q[robot_name],t_out=a_base_loc) - self._root_a_base_loc[robot_name]=a_base_loc[:, 0:3] - self._root_alpha_base_loc[robot_name]=a_base_loc[:, 3:6] - - # rotate gravity in base local - world2base_frame3D(v_w=self._gravity_normalized[robot_name],q_b=self._root_q[robot_name], - v_out=self._gravity_normalized_base_loc[robot_name]) - - def _get_robots_jnt_state(self, - robot_name: str, - env_indxs: torch.Tensor = None, - numerical_diff: bool = False): - - dt= self.physics_dt() if self._override_low_lev_controller else self._cluster_dt[robot_name] - - # measurements from simulator are in world frame - if env_indxs is not None: - - self._jnts_q[robot_name][env_indxs, :] = self._robots_art_views[robot_name].get_joint_positions( - clone = True, - indices=env_indxs) # joint positions - if not numerical_diff: - self._jnts_v[robot_name][env_indxs, :] = self._robots_art_views[robot_name].get_joint_velocities( - clone = True, - indices=env_indxs) # joint velocities - else: - # differentiate numerically - self._jnts_v[robot_name][env_indxs, :] = (self._jnts_q[robot_name][env_indxs, :] - \ - self._jnts_q_prev[robot_name][env_indxs, :]) / dt - # update "previous" data for numerical differentiation - self._jnts_q_prev[robot_name][env_indxs, :] = self._jnts_q[robot_name][env_indxs, :] - - self._jnts_eff[robot_name][env_indxs, :] = self._robots_art_views[robot_name].get_measured_joint_efforts( - clone = True, - joint_indices=None, - indices=env_indxs) # measured joint efforts (computed by joint force solver) - - else: - self._jnts_q[robot_name][:, :] = self._robots_art_views[robot_name].get_joint_positions( - clone = True) # joint positions - if not numerical_diff: - self._jnts_v[robot_name][:, :] = self._robots_art_views[robot_name].get_joint_velocities( - clone = True) # joint velocities - else: - self._jnts_v[robot_name][:, :] = (self._jnts_q[robot_name][:, :] - \ - self._jnts_q_prev[robot_name][:, :]) / dt - - self._jnts_q_prev[robot_name][:, :] = self._jnts_q[robot_name][:, :] - - self._jnts_eff[robot_name][env_indxs, :] = self._robots_art_views[robot_name].get_measured_joint_efforts( - clone = True) # measured joint efforts (computed by joint force solver) - - def _get_contact_f(self, - robot_name: str, - contact_link: str, - env_indxs: torch.Tensor) -> torch.Tensor: - - if self.omni_contact_sensors[robot_name] is not None: - return self.omni_contact_sensors[robot_name].get(dt=self.physics_dt(), - contact_link=contact_link, - env_indxs=env_indxs, - clone=False) - - def _set_jnts_to_homing(self, robot_name: str): - self._robots_art_views[robot_name].set_joints_default_state(positions=self._homing, - velocities = torch.zeros((self._homing.shape[0], self._homing.shape[1]), \ - dtype=self._dtype, device=self._device), - efforts = torch.zeros((self._homing.shape[0], self._homing.shape[1]), \ - dtype=self._dtype, device=self._device)) - - def _set_root_to_defconfig(self, robot_name: str): - self._robots_art_views[robot_name].set_default_state(positions=self._root_p_default[robot_name], - orientations=self._root_q_default[robot_name]) - - def _alter_twist_warmup(self, robot_name: str, env_indxs: torch.Tensor = None): - """Zero angular velocities and joint velocities for the given robot/envs.""" - if env_indxs is None: - twist = self._robots_art_views[robot_name].get_velocities(clone=True) - twist[:, 0] = 0.0 - twist[:, 1] = 0.0 - twist[:, 3:] = 0.0 # zero angular part, preserve current linear - self._robots_art_views[robot_name].set_velocities(velocities=twist, indices=None) - - # jnt_vel=self._robots_art_views[robot_name].get_joint_velocities( - # indices = None, clone=True) - # jnt_vel[:, :] = 0.0 - - # self._robots_art_views[robot_name].set_joint_velocities( - # velocities = jnt_vel, - # indices = None) - - else: - twist = self._robots_art_views[robot_name].get_velocities(clone=True, indices=env_indxs) - # twist[:, 0:2] = 0.0 - twist[:, 0] = 0.0 - twist[:, 1] = 0.0 - twist[:, 3:] = 0.0 - self._robots_art_views[robot_name].set_velocities(velocities=twist, indices=env_indxs) - - # jnt_vel=self._robots_art_views[robot_name].get_joint_velocities( - # indices = env_indxs, clone=True) - # jnt_vel[:, :] = 0.0 - - # self._robots_art_views[robot_name].set_joint_velocities( - # velocities = jnt_vel, - # indices = env_indxs) - - def _get_solver_info(self): - for i in range(0, len(self._robot_names)): - robot_name = self._robot_names[i] - self._solver_position_iteration_counts[robot_name] = self._robots_art_views[robot_name].get_solver_position_iteration_counts() - self._solver_velocity_iteration_counts[robot_name] = self._robots_art_views[robot_name].get_solver_velocity_iteration_counts() - self._solver_stabilization_threshs[robot_name] = self._robots_art_views[robot_name].get_stabilization_thresholds() - - def _update_art_solver_options(self): - - # sets new solver iteration options for specifc articulations - self._get_solver_info() # gets current solver info for the articulations of the - # environments, so that dictionaries are filled properly - - for i in range(0, len(self._robot_names)): - robot_name = self._robot_names[i] - # increase by a factor - self._solver_position_iteration_counts[robot_name] = torch.full((self._num_envs,), self._solver_position_iteration_count) - self._solver_velocity_iteration_counts[robot_name] = torch.full((self._num_envs,), self._solver_velocity_iteration_count) - self._solver_stabilization_threshs[robot_name] = torch.full((self._num_envs,), self._solver_stabilization_thresh) - self._robots_art_views[robot_name].set_solver_position_iteration_counts(self._solver_position_iteration_counts[robot_name]) - self._robots_art_views[robot_name].set_solver_velocity_iteration_counts(self._solver_velocity_iteration_counts[robot_name]) - self._robots_art_views[robot_name].set_stabilization_thresholds(self._solver_stabilization_threshs[robot_name]) - self._get_solver_info() # gets again solver info for articulation, so that it's possible to debug if - # the operation was successful - - def _print_envs_info(self): - - ground_info = f"[Ground info]" + "\n" + \ - "static friction coeff.: " + str(self._terrain_material.GetStaticFrictionAttr().Get()) + "\n" + \ - "dynamics friction coeff.: " + str(self._terrain_material.GetDynamicFrictionAttr().Get()) + "\n" + \ - "restitution coeff.: " + str(self._terrain_material.GetRestitutionAttr().Get()) + "\n" +\ - "friction comb. mode: " + str(self._terrain_physix_material.GetFrictionCombineModeAttr().Get()) + "\n" + \ - "damping comb. mode: " + str(self._terrain_physix_material.GetDampingCombineModeAttr().Get()) + "\n" + \ - "restitution comb. mode: " + str(self._terrain_physix_material.GetRestitutionCombineModeAttr().Get()) + "\n" - - Journal.log(self.__class__.__name__, - "_print_envs_info", - ground_info, - LogType.STAT, - throw_when_excep = True) - - for i in range(0, len(self._robot_names)): - robot_name = self._robot_names[i] - task_info = f"[{robot_name}]" + "\n" + \ - "bodies: " + str(self._robots_art_views[robot_name].body_names) + "\n" + \ - "n. prims: " + str(self._robots_art_views[robot_name].count) + "\n" + \ - "prims names: " + str(self._robots_art_views[robot_name].prim_paths) + "\n" + \ - "n. bodies: " + str(self._robots_art_views[robot_name].num_bodies) + "\n" + \ - "n. dofs: " + str(self._robots_art_views[robot_name].num_dof) + "\n" + \ - "dof names: " + str(self._robots_art_views[robot_name].dof_names) + "\n" + \ - "solver_position_iteration_counts: " + str(self._solver_position_iteration_counts[robot_name]) + "\n" + \ - "solver_velocity_iteration_counts: " + str(self._solver_velocity_iteration_counts[robot_name]) + "\n" + \ - "stabiliz. thresholds: " + str(self._solver_stabilization_threshs[robot_name]) - # print("dof limits: " + str(self._robots_art_views[robot_name].get_dof_limits())) - # print("effort modes: " + str(self._robots_art_views[robot_name].get_effort_modes())) - # print("dof gains: " + str(self._robots_art_views[robot_name].get_gains())) - # print("dof max efforts: " + str(self._robots_art_views[robot_name].get_max_efforts())) - # print("dof gains: " + str(self._robots_art_views[robot_name].get_gains())) - # print("physics handle valid: " + str(self._robots_art_views[robot_name].is_physics_handle_valid()) - Journal.log(self.__class__.__name__, - "_print_envs_info", - task_info, - LogType.STAT, - throw_when_excep = True) - - def _fill_robot_info_from_world(self): - for i in range(0, len(self._robot_names)): - robot_name = self._robot_names[i] - self._robot_bodynames[robot_name] = self._robots_art_views[robot_name].body_names - self._robot_n_links[robot_name] = self._robots_art_views[robot_name].num_bodies - self._robot_n_dofs[robot_name] = self._robots_art_views[robot_name].num_dof - self._robot_dof_names[robot_name] = self._robots_art_views[robot_name].dof_names - - def _set_initial_camera_params(self, - camera_position=[10, 10, 3], - camera_target=[0, 0, 0], - camera_prim_path="/OmniverseKit_Persp"): - set_camera_view(eye=camera_position, - target=camera_target, - camera_prim_path=camera_prim_path) - - def _init_contact_sensors(self, robot_name: str): - self.omni_contact_sensors[robot_name]=None - sensor_radii={} - contact_offsets={} - self._contact_names[robot_name]=self._env_opts["contact_prims"] - for contact_prim in self._env_opts["contact_prims"]: - sensor_radii[contact_prim]=self._env_opts["sensor_radii"] - contact_offsets[contact_prim]=np.array([0.0, 0.0, 0.0]) - if not (len(self._env_opts["contact_prims"])==0): - self.omni_contact_sensors[robot_name]=OmniContactSensors( - name=robot_name, - n_envs=self._num_envs, - contact_prims=self._env_opts["contact_prims"], - contact_offsets=contact_offsets, - sensor_radii=sensor_radii, - device=self._device, - dtype=self._dtype, - enable_debug=self._debug, - filter_paths=self._ground_plane_prim_paths) - self.omni_contact_sensors[robot_name].create_contact_sensors( - self._world, - envs_namespace=self._env_opts["envs_ns"]) - - def _init_robots_state(self): - - self._masses = {} - self._weights = {} - - self._pert_active = {} # bool mask: (num_envs,) - self._pert_steps_remaining = {} # int steps: (num_envs,) - self._pert_forces_world = {} # (num_envs,3) - self._pert_torques_world = {} # (num_envs,3) - self._pert_force_local = {} # (num_envs,3) (if needed) - self._pert_torque_local = {} # (num_envs,3) - self._pert_lindir = {} - self._pert_angdir = {} - self._pert_durations = {} - self._pert_scratch = {} - self._pert_det_counter = {} - - # convert durations in seconds to integer physics steps (min 1 step) - self._pert_min_steps = max(1, int(math.ceil(self._env_opts["pert_wrenches_min_duration"] / self.physics_dt()))) - self._pert_max_steps = max(self._pert_min_steps, int(math.ceil(self._env_opts["pert_wrenches_max_duration"] / self.physics_dt()))) - - pert_wrenches_step_rate=self._env_opts["pert_wrenches_rate"]/self.physics_dt() # 1 pert every n physics steps - self._pert_det_steps = max(1, int(round(pert_wrenches_step_rate))) - self._pert_wrenches_prob=min(1.0, 1.0/pert_wrenches_step_rate) # sampling prob to be used when not deterministic - - self._calc_robot_distrib() - - for i in range(0, len(self._robot_names)): - - robot_name = self._robot_names[i] - - pose = self._robots_art_views[robot_name].get_world_poses( - clone = True) # tuple: (pos, quat) - - # root p (measured, previous, default) - self._root_p[robot_name] = pose[0] - self._root_p_prev[robot_name] = torch.clone(pose[0]) - # print(self._root_p_default[robot_name].device) - self._root_p_default[robot_name] = torch.clone(pose[0]) + self._distr_offset[robot_name] - # root q (measured, previous, default) - self._root_q[robot_name] = pose[1] # root orientation - self._root_q_prev[robot_name] = torch.clone(pose[1]) - self._root_q_default[robot_name] = torch.clone(pose[1]) - # jnt q (measured, previous, default) - self._jnts_q[robot_name] = self._robots_art_views[robot_name].get_joint_positions( - clone = True) # joint positions - self._jnts_q_prev[robot_name] = self._robots_art_views[robot_name].get_joint_positions( - clone = True) - self._jnts_q_default[robot_name] = torch.full((self._jnts_q[robot_name].shape[0], - self._jnts_q[robot_name].shape[1]), - 0.0, - dtype=self._dtype, - device=self._device) - - # root v (measured, default) - self._root_v[robot_name] = self._robots_art_views[robot_name].get_linear_velocities( - clone = True) # root lin. velocityù - self._root_v_base_loc[robot_name] = torch.full_like(self._root_v[robot_name], fill_value=0.0) - self._root_v_prev[robot_name] = torch.full_like(self._root_v[robot_name], fill_value=0.0) - self._root_v_default[robot_name] = torch.full_like(self._root_v[robot_name], fill_value=0.0) - # root omega (measured, default) - self._root_omega[robot_name] = self._robots_art_views[robot_name].get_angular_velocities( - clone = True) # root ang. velocity - self._root_omega_prev[robot_name] = torch.full_like(self._root_omega[robot_name], fill_value=0.0) - - self._root_omega_base_loc[robot_name] = torch.full_like(self._root_omega[robot_name], fill_value=0.0) - self._root_omega_default[robot_name] = torch.full((self._root_omega[robot_name].shape[0], self._root_omega[robot_name].shape[1]), - 0.0, - dtype=self._dtype, - device=self._device) - # root a (measured,) - self._root_a[robot_name] = torch.full_like(self._root_v[robot_name], fill_value=0.0) - self._root_a_base_loc[robot_name] = torch.full_like(self._root_a[robot_name], fill_value=0.0) - self._root_alpha[robot_name] = torch.full_like(self._root_v[robot_name], fill_value=0.0) - self._root_alpha_base_loc[robot_name] = torch.full_like(self._root_alpha[robot_name], fill_value=0.0) - - # height grid sensor storage - grid_sz = int(self._env_opts["height_sensor_pixels"]) - self._height_imgs[robot_name] = torch.zeros((self._num_envs, grid_sz, grid_sz), - dtype=self._dtype, - device=self._device) - - # joints v (measured, default) - self._jnts_v[robot_name] = self._robots_art_views[robot_name].get_joint_velocities( - clone = True) # joint velocities - self._jnts_v_default[robot_name] = torch.full((self._jnts_v[robot_name].shape[0], self._jnts_v[robot_name].shape[1]), - 0.0, - dtype=self._dtype, - device=self._device) - - # joints efforts (measured, default) - self._jnts_eff[robot_name] = torch.full((self._jnts_v[robot_name].shape[0], self._jnts_v[robot_name].shape[1]), - 0.0, - dtype=self._dtype, - device=self._device) - self._jnts_eff_default[robot_name] = torch.full((self._jnts_v[robot_name].shape[0], self._jnts_v[robot_name].shape[1]), - 0.0, - dtype=self._dtype, - device=self._device) - self._root_pos_offsets[robot_name] = torch.zeros((self._num_envs, 3), - device=self._device) # reference position offses - - self._root_q_offsets[robot_name] = torch.zeros((self._num_envs, 4), - device=self._device) - self._root_q_offsets[robot_name][:, 0] = 1.0 # init to valid identity quaternion - - # boolean active flag per env - self._pert_active[robot_name] = torch.zeros((self._num_envs,), dtype=torch.bool, device=self._device) - # remaining steps as integer tensor - self._pert_steps_remaining[robot_name] = torch.zeros((self._num_envs,), dtype=torch.int32, device=self._device) - # world force & torque (N and N*m) stored as floats - self._pert_forces_world[robot_name] = torch.zeros((self._num_envs, 3), dtype=self._dtype, device=self._device) - self._pert_torques_world[robot_name] = torch.zeros((self._num_envs, 3), dtype=self._dtype, device=self._device) - # local frame copies (if you want to store local-frame versions) - self._pert_force_local[robot_name] = torch.zeros((self._num_envs, 3), dtype=self._dtype, device=self._device) - self._pert_torque_local[robot_name] = torch.zeros((self._num_envs, 3), dtype=self._dtype, device=self._device) - - self._pert_lindir[robot_name] = torch.zeros((self._num_envs, 3), dtype=self._dtype, device=self._device) - self._pert_angdir[robot_name] = torch.zeros((self._num_envs, 3), dtype=self._dtype, device=self._device) - - self._pert_durations[robot_name] = torch.zeros((self._num_envs, 1), dtype=torch.int32, device=self._device) - - self._pert_scratch[robot_name] = torch.zeros((self._num_envs, 1), dtype=self._dtype, device=self._device) - self._pert_det_counter[robot_name] = torch.zeros((self._num_envs,), dtype=torch.int32, device=self._device) - - self._masses[robot_name] = torch.sum(self._robots_art_views[robot_name].get_body_masses(clone=True), dim=1).to(dtype=self._dtype, device=self._device) - - self._weights[robot_name] = (self._masses[robot_name] * abs(self._env_opts["gravity"][2].item())).reshape((self._num_envs, 1)) - - def _track_terrain_hits(self, robot_name: str, env_indxs: torch.Tensor = None): - """Track transitions into the terrain boundary margin (1 m) and count hits per env.""" - if self._env_opts["use_flat_ground"]: - return - border = float(self._env_opts["terrain_border"]) - threshold = max(0.0, border - self._terrain_hit_margin) - state = self._terrain_hit_active[robot_name] - counts = self._terrain_hit_counts[robot_name] - if env_indxs is None: - pos_xy = self._root_p[robot_name][:, 0:2] - hitting = torch.any(torch.abs(pos_xy) > threshold, dim=1) - new_hits = (~state) & hitting - if new_hits.any(): - counts[new_hits] += 1 - state.copy_(hitting) - else: - pos_xy = self._root_p[robot_name][env_indxs, 0:2] - hitting = torch.any(torch.abs(pos_xy) > threshold, dim=1) - prev_state = state[env_indxs] - new_hits = (~prev_state) & hitting - if new_hits.any(): - counts[env_indxs[new_hits]] += 1 - state[env_indxs] = hitting - - def _maybe_log_terrain_hits(self): - """Log boundary hits at low frequency only when counters change.""" - if self._env_opts["use_flat_ground"]: - return - period = int(self._env_opts["terrain_hit_log_period"]) - if period <= 0 or (self.step_counter % period != 0): - return - for robot_name in self._robot_names: - active = self._terrain_hit_active.get(robot_name, None) - if active is None: - continue - active_now = int(active.sum().item()) - if active_now == 0: - continue - counts = self._terrain_hit_counts[robot_name] - last = self._terrain_hit_counts_last_logged.get(robot_name, None) - if last is not None and torch.equal(counts, last): - continue - total_hits = int(counts.sum().item()) - msg = f"{active_now} {robot_name} robots within {self._terrain_hit_margin}m of terrain border. Total hits: {total_hits}." - Journal.log(self.__class__.__name__, - "_terrain_hits", - msg, - LogType.WARN, - throw_when_excep = True) - self._terrain_hit_counts_last_logged[robot_name] = counts.clone() - - @override - def _post_world_step(self) -> bool: - res = super()._post_world_step() - self._maybe_log_terrain_hits() - return res - - @override - def _post_world_step_db(self) -> bool: - res = super()._post_world_step_db() - self._maybe_log_terrain_hits() - return res - - def current_tstep(self): - self._world.current_time_step_index - - def world_time(self, robot_name: str): - return self._world.current_time - - def physics_dt(self): - return self._world.get_physics_dt() - - def rendering_dt(self): - return self._env_opts["rendering_dt"] - - def set_physics_dt(self, physics_dt:float): - self._world.set_simulation_dt(physics_dt=physics_dt,rendering_dt=None) - - def set_rendering_dt(self, rendering_dt:float): - self._world.set_simulation_dt(physics_dt=None,rendering_dt=rendering_dt) - - def _robot_jnt_names(self, robot_name: str): - return self._robots_art_views[robot_name].dof_names