diff --git "a/isaac_world_interface.py" "b/isaac_world_interface.py" new file mode 100644--- /dev/null +++ "b/isaac_world_interface.py" @@ -0,0 +1,2355 @@ +# Copyright (C) 2023 Andrea Patrizi (AndrePatri, andreapatrizi1b6e6@gmail.com) +# +# This file is part of AugMPCEnvs and distributed under the General Public License version 2 license. +# +# AugMPCEnvs is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 2 of the License, or +# (at your option) any later version. +# +# AugMPCEnvs is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with AugMPCEnvs. If not, see . +# +from isaacsim import SimulationApp + +import carb + +import os +import math + +import torch +import numpy as np + +from typing import Dict, List +from typing_extensions import override + +from EigenIPC.PyEigenIPC import VLevel +from EigenIPC.PyEigenIPC import LogType +from EigenIPC.PyEigenIPC import Journal + +from aug_mpc_envs.utils.math_utils import quat_to_omega +from aug_mpc_envs.utils.height_grid_visualizer import HeightGridVisualizer +from aug_mpc_envs.utils.height_sensor import HeightGridSensor + +from aug_mpc.world_interfaces.world_interface_base import AugMPCWorldInterfaceBase +from mpc_hive.utilities.math_utils_torch import world2base_frame,world2base_frame3D + +class IsaacSimEnv(AugMPCWorldInterfaceBase): + + def __init__(self, + robot_names: List[str], + robot_urdf_paths: List[str], + robot_srdf_paths: List[str], + jnt_imp_config_paths: List[str], + n_contacts: List[int], + cluster_dt: List[float], + use_remote_stepping: List[bool], + name: str = "IsaacSimEnv", + num_envs: int = 1, + debug = False, + verbose: bool = False, + vlevel: VLevel = VLevel.V1, + n_init_step: int = 0, + timeout_ms: int = 60000, + env_opts: Dict = None, + use_gpu: bool = True, + dtype: torch.dtype = torch.float32, + override_low_lev_controller: bool = False): + + self._render_step_counter = 0 + + super().__init__(name=name, + robot_names=robot_names, + robot_urdf_paths=robot_urdf_paths, + robot_srdf_paths=robot_srdf_paths, + jnt_imp_config_paths=jnt_imp_config_paths, + n_contacts=n_contacts, + cluster_dt=cluster_dt, + use_remote_stepping=use_remote_stepping, + num_envs=num_envs, + debug=debug, + verbose=verbose, + vlevel=vlevel, + n_init_step=n_init_step, + timeout_ms=timeout_ms, + env_opts=env_opts, + use_gpu=use_gpu, + dtype=dtype, + override_low_lev_controller=override_low_lev_controller) + + def is_running(self): + return self._simulation_app.is_running() + + def _pre_setup(self): + self._backend="torch" + enable_livestream = self._env_opts["enable_livestream"] + enable_viewport = self._env_opts["enable_viewport"] + base_isaac_exp = f'{os.environ["EXP_PATH"]}/omni.isaac.sim.python.aug_mpc_envs.kit' + base_isaac_exp_headless = f'{os.environ["EXP_PATH"]}/omni.isaac.sim.python.aug_mpc_envs.headless.kit' + + experience=base_isaac_exp + if self._env_opts["headless"]: + info = f"Will run in headless mode." + Journal.log(self.__class__.__name__, + "__init__", + info, + LogType.STAT, + throw_when_excep = True) + if enable_livestream: + experience = "" + elif enable_viewport: + exception = f"Using viewport is not supported yet." + Journal.log(self.__class__.__name__, + "__init__", + exception, + LogType.EXCEP, + throw_when_excep = True) + else: + experience=base_isaac_exp_headless + + self._simulation_app = SimulationApp({"headless": self._env_opts["headless"]}, + experience=experience) + # all imports depending on isaac sim kits have to be done after simulationapp + # from omni.isaac.core.tasks.base_task import BaseTask + self._import_isaac_pkgs() + info = "Using IsaacSim experience file @ " + experience + Journal.log(self.__class__.__name__, + "__init__", + info, + LogType.STAT, + throw_when_excep = True) + # carb.settings.get_settings().set("/persistent/omnihydra/useSceneGraphInstancing", True) + + if enable_livestream: + info = "Livestream enabled" + Journal.log(self.__class__.__name__, + "__init__", + info, + LogType.STAT, + throw_when_excep = True) + + self._simulation_app.set_setting("/app/livestream/enabled", True) + self._simulation_app.set_setting("/app/window/drawMouse", True) + self._simulation_app.set_setting("/app/livestream/proto", "ws") + self._simulation_app.set_setting("/app/livestream/websocket/framerate_limit", 120) + self._simulation_app.set_setting("/ngx/enabled", False) + enable_extension("omni.kit.livestream.native") + enable_extension("omni.services.streaming.manager") + self._render = ((not self._env_opts["headless"]) or self._env_opts["render_to_file"]) or enable_livestream or enable_viewport + + self._record = False + self._world = None + self._physics_context = None + self._scene = None + self._task = None + self._metadata = None + + self._robots_art_views = {} + self._blink_rigid_prim_views = {} + self._robots_articulations = {} + self._robots_geom_prim_views = {} + self.omni_contact_sensors = {} + + self._solver_position_iteration_count=self._env_opts["solver_position_iteration_count"] + self._solver_velocity_iteration_count=self._env_opts["solver_velocity_iteration_count"] + self._solver_stabilization_thresh=self._env_opts["stabilization_threshold"] + self._solver_position_iteration_counts={} + self._solver_velocity_iteration_counts={} + self._solver_stabilization_threshs={} + self._robot_bodynames={} + self._robot_n_links={} + self._robot_n_dofs={} + self._robot_dof_names={} + self._distr_offset={} # decribed how robots within each env are distributed + self._spawning_radius=self._env_opts["spawning_radius"] # [m] -> default distance between roots of robots in a single + self._height_sensors={} + self._height_imgs={} + self._height_vis={} + self._height_vis_step={} + self._height_vis_step={} + self._height_vis={} + self._terrain_hit_margin = 1.0 + self._terrain_hit_log_period = 1000 + self._terrain_hit_active = {} + self._terrain_hit_counts = {} + self._terrain_hit_counts_last_logged = {} + for robot_name in self._robot_names: + self._terrain_hit_active[robot_name] = torch.zeros((self._num_envs,), device=self._device, dtype=torch.bool) + self._terrain_hit_counts[robot_name] = torch.zeros((self._num_envs,), device=self._device, dtype=torch.int32) + self._terrain_hit_counts_last_logged[robot_name] = None + + def _import_isaac_pkgs(self): + # we use global, so that we can create the simulation app inside (and so + # access Isaac's kit) and also expose to all methods the imports + global World, omni_kit, get_context, UsdLux, Sdf, Gf, UsdPhysics, PhysicsSchemaTools, UsdShade, Vt + global enable_extension, set_camera_view, _urdf, move_prim, GridCloner, prim_utils + global get_current_stage, Scene, ArticulationView, RigidPrimView, rep + global OmniContactSensors, RlTerrains,OmniJntImpCntrl + global PhysxSchema, UsdGeom + global _sensor, _dynamic_control + global get_prim_at_path + + from pxr import PhysxSchema, UsdGeom, UsdShade, Vt + + from omni.isaac.core.world import World + from omni.usd import get_context + from pxr import UsdLux, Sdf, Gf, UsdPhysics, PhysicsSchemaTools + from omni.isaac.core.utils.extensions import enable_extension + from omni.isaac.core.utils.viewports import set_camera_view + import omni.kit as omni_kit + from omni.importer.urdf import _urdf + from omni.isaac.core.utils.prims import move_prim + from omni.isaac.cloner import GridCloner + import omni.isaac.core.utils.prims as prim_utils + from omni.isaac.core.utils.stage import get_current_stage + from omni.isaac.core.scenes.scene import Scene + from omni.isaac.core.articulations import ArticulationView + from omni.isaac.core.prims import RigidPrimView + + import omni.replicator.core as rep + + from omni.isaac.core.utils.prims import get_prim_at_path + + from omni.isaac.sensor import _sensor + + from omni.isaac.dynamic_control import _dynamic_control + + from aug_mpc_envs.utils.contact_sensor import OmniContactSensors + from aug_mpc_envs.utils.omni_jnt_imp_cntrl import OmniJntImpCntrl + from aug_mpc_envs.utils.terrains import RlTerrains + + def _parse_env_opts(self): + isaac_opts={} + isaac_opts["envs_ns"]="/World/envs" + isaac_opts["template_env_ns"]=isaac_opts["envs_ns"] + "/env_0" + isaac_opts["base_linkname"]="base_link" + isaac_opts["deduce_base_link"]=False + isaac_opts["ground_plane_prim_path"]="/World/terrain" + isaac_opts["physics_prim_path"]="/physicsScene" + isaac_opts["use_gpu"]=True + isaac_opts["use_gpu_pipeline"]=True + isaac_opts["device"]="cuda" + isaac_opts["is_fixed_base"]=False + isaac_opts["merge_fixed_jnts"]=True + isaac_opts["self_collide"]=True + isaac_opts["sim_device"]="cuda" if isaac_opts["use_gpu"] else "cpu" + isaac_opts["physics_dt"]=1e-3 + isaac_opts["gravity"] = np.array([0.0, 0.0, -9.81]) + isaac_opts["use_fabric"]=True# Enable/disable reading of physics buffers directly. Default is True. + isaac_opts["replicate_physics"]=True + # isaac_opts["worker_thread_count"]=4 + isaac_opts["solver_type"]=1 # 0: PGS, 1:TGS, defaults to TGS. PGS faster but TGS more stable + isaac_opts["enable_stabilization"]=True + # isaac_opts["bounce_threshold_velocity"] = 0.2 + # isaac_opts["friction_offset_threshold"] = 0.04 + # isaac_opts["friction_correlation_distance"] = 0.025 + # isaac_opts["enable_sleeping"] = True + # Per-actor settings ( can override in actor_options ) + isaac_opts["solver_position_iteration_count"] = 4 # defaults to 4 + isaac_opts["solver_velocity_iteration_count"] = 3 # defaults to 1 + isaac_opts["sleep_threshold"] = 1e-5 # Mass-normalized kinetic energy threshold below which an actor may go to sleep. + # Allowed range [0, max_float). + isaac_opts["stabilization_threshold"] = 1e-4 + # Per-body settings ( can override in actor_options ) + # isaac_opts["enable_gyroscopic_forces"] = True + # isaac_opts["density"] = 1000 # density to be used for bodies that do not specify mass or density + # isaac_opts["max_depenetration_velocity"] = 100.0 + # isaac_opts["solver_velocity_iteration_count"] = 1 + # GPU buffers settings + isaac_opts["gpu_max_rigid_contact_count"] = 512 * 2048 + isaac_opts["gpu_max_rigid_patch_count"] = 80 * 2048 + isaac_opts["gpu_found_lost_pairs_capacity"] = 102400 + isaac_opts["gpu_found_lost_aggregate_pairs_capacity"] = 102400 + isaac_opts["gpu_total_aggregate_pairs_capacity"] = 102400 + # isaac_opts["gpu_max_soft_body_contacts"] = 1024 * 1024 + # isaac_opts["gpu_max_particle_contacts"] = 1024 * 1024 + # isaac_opts["gpu_heap_capacity"] = 64 * 1024 * 1024 + # isaac_opts["gpu_temp_buffer_capacity"] = 16 * 1024 * 1024 + # isaac_opts["gpu_max_num_partitions"] = 8 + + isaac_opts["env_spacing"]=15.0 + isaac_opts["spawning_height"]=1.0 + isaac_opts["spawning_radius"]=1.0 + isaac_opts["spawn_height_check_half_extent"]=0.2 + isaac_opts["spawn_height_cushion"]=0.03 + + isaac_opts["enable_height_vis"]=False + isaac_opts["height_vis_radius"]=0.03 + isaac_opts["height_vis_update_period"]=1 + isaac_opts["collision_refinement_level"]=3 # increase cylinder tesselation for smoother wheel contacts + + # rendering helpers + isaac_opts["render_to_file"]=False + isaac_opts["use_follow_camera"]=False # if True, follow robot during rendering in human mode + isaac_opts["render_follow_env_idx"]=0 + isaac_opts["render_follow_robot_idx"]=0 + isaac_opts["render_follow_offset"]=[-0.2, 3.0, 0.1] + isaac_opts["render_follow_target_offset"]=[-0.2, -1.0, 0.0] + isaac_opts["rendering_dt"]=15*isaac_opts["physics_dt"] + isaac_opts["camera_prim_path"]="/OmniverseKit_Persp" + isaac_opts["render_resolution"]=[1280, 720] # [1024, 576] + + isaac_opts["render_panoramic_cam"]=True + isaac_opts["render_panoramic_cam_height"]=2.0 + isaac_opts["render_panoramic_cam_target_xy"]=[10.0, 14.] + isaac_opts["render_panoramic_cam_target_z"]=1.2 + + # ground opts + isaac_opts["use_flat_ground"]=True + isaac_opts["static_friction"]=0.5 + isaac_opts["dynamic_friction"]=0.5 + isaac_opts["restitution"]=0.1 + isaac_opts["ground_type"]="random" + isaac_opts["ground_size"]=800 + isaac_opts["terrain_border"]=isaac_opts["ground_size"]/2 + isaac_opts["dh_ground"]=0.03 + isaac_opts["step_height_lb"]=0.08 + isaac_opts["step_height_ub"]=0.12 + isaac_opts["step_width_lb"]=1.0 + isaac_opts["step_width_ub"]= 1.5 + isaac_opts["contact_prims"] = [] + isaac_opts["sensor_radii"] = 0.1 + isaac_opts["contact_offsets"] = {} + + isaac_opts["enable_livestream"] = False + isaac_opts["enable_viewport"] = False + + isaac_opts["use_diff_vels"] = False + + # random perturbations (impulse, durations, directions are sampled uniformly, force/torque computed accordinly) + isaac_opts["use_random_pertub"]=False + isaac_opts["pert_planar_only"]=True # if True, linear pushes only in xy plane and no torques + + isaac_opts["pert_wrenches_rate"]=30.0 # on average 1 pert every pert_wrenches_rate seconds + isaac_opts["pert_wrenches_min_duration"]=0.6 + isaac_opts["pert_wrenches_max_duration"]=3.5 # [s] + isaac_opts["pert_force_max_weight_scale"]=0.4 # clip force norm to scale*weight + isaac_opts["pert_force_min_weight_scale"]=0.1 # optional min force norm as scale*weight + isaac_opts["pert_torque_max_weight_scale"]=1.0 # clip torque norm to scale*weight*max_ang_impulse_lever + + isaac_opts["pert_target_delta_v"]=2.0 # [m/s] desired max impulse = m*delta_v + isaac_opts["det_pert_rate"]=True + + # max impulse (unitless scale multiplied by weight to get N*s): delta_v/g + isaac_opts["max_lin_impulse_norm"]=isaac_opts["pert_target_delta_v"]/9.81 + isaac_opts["lin_impulse_mag_min"]=1.0 # [0, 1] -> min fraction of max_lin_impulse_norm when sampling + isaac_opts["lin_impulse_mag_max"]=1.0 # [0, 1] -> max fraction of max_lin_impulse_norm when sampling + + isaac_opts["max_ang_impulse_lever"]=0.2 # [m] + isaac_opts["max_ang_impulse_norm"]=isaac_opts["max_lin_impulse_norm"]*isaac_opts["max_ang_impulse_lever"] + isaac_opts["terrain_hit_log_period"]=1000 + + # opts definition end + + isaac_opts.update(self._env_opts) # update defaults with provided opts + isaac_opts["rendering_freq"]=int(isaac_opts["rendering_dt"]/isaac_opts["physics_dt"]) + # isaac_opts["rendering_dt"]=isaac_opts["physics_dt"] # forcing rendering_dt==physics_dt + # for some mystic reason simulation is infuenced by the rendering dt (why ??????) + + # modify things + isaac_opts["cloning_offset"] = np.array([[0.0, 0.0, isaac_opts["spawning_height"]]]*self._num_envs) + if not isaac_opts["use_gpu"]: # don't use GPU at all + isaac_opts["use_gpu_pipeline"]=False + isaac_opts["device"]="cpu" + isaac_opts["sim_device"]="cpu" + else: # use GPU + if isaac_opts["use_gpu_pipeline"]: + isaac_opts["device"]="cuda" + isaac_opts["sim_device"]="cuda" + else: # cpu pipeline + isaac_opts["device"]="cpu" + isaac_opts["sim_device"]="cpu" + isaac_opts["use_gpu"]=False + # isaac_opts["sim_device"]=isaac_opts["device"] + # overwrite env opts in case some sim params were missing + self._env_opts=isaac_opts + + # update device flag based on sim opts + self._device=isaac_opts["device"] + self._use_gpu=isaac_opts["use_gpu"] + + def _calc_robot_distrib(self): + + import math + # we distribute robots in a single env. along the + # circumference of a circle of given radius + n_robots = len(self._robot_names) + offset_baseangle = 2 * math.pi / n_robots + for i in range(n_robots): + offset_angle = offset_baseangle * (i + 1) + robot_offset_wrt_center = torch.tensor([self._spawning_radius * math.cos(offset_angle), + self._spawning_radius * math.sin(offset_angle), 0], + device=self._device, + dtype=self._dtype) + # list with n references to the original tensor + tensor_list = [robot_offset_wrt_center] * self._num_envs + self._distr_offset[self._robot_names[i]] = torch.stack(tensor_list, dim=0) + + def _init_world(self): + + self._cloner = GridCloner(spacing=self._env_opts["env_spacing"]) + self._cloner.define_base_env(self._env_opts["envs_ns"]) + prim_utils.define_prim(self._env_opts["template_env_ns"]) + self._envs_prim_paths = self._cloner.generate_paths(self._env_opts["envs_ns"] + "/env", + self._num_envs) + + # parse device based on sim_param settings + + info = "Using sim device: " + str(self._env_opts["sim_device"]) + Journal.log(self.__class__.__name__, + "__init__", + info, + LogType.STAT, + throw_when_excep = True) + + self._world = World( + physics_dt=self._env_opts["physics_dt"], + rendering_dt=self._env_opts["physics_dt"], # == physics dt (rendering is actually done manually by this env) + backend=self._backend, + device=str(self._env_opts["sim_device"]), + physics_prim_path=self._env_opts["physics_prim_path"], + set_defaults = False, # set to True to use the defaults settings [physics_dt = 1.0/ 60.0, + # stage units in meters = 0.01 (i.e in cms), rendering_dt = 1.0 / 60.0, gravity = -9.81 m / s + # ccd_enabled, stabilization_enabled, gpu dynamics turned off, + # broadcast type is MBP, solver type is TGS] + sim_params=self._env_opts + ) + + big_info = "[World] Creating Isaac simulation " + self._name + "\n" + \ + "use_gpu_pipeline: " + str(self._env_opts["use_gpu_pipeline"]) + "\n" + \ + "device: " + str(self._env_opts["sim_device"]) + "\n" +\ + "backend: " + str(self._backend) + "\n" +\ + "integration_dt: " + str(self.physics_dt()) + "\n" + \ + "rendering_dt: " + str(self.rendering_dt()) + "\n" + Journal.log(self.__class__.__name__, + "_init_world", + big_info, + LogType.STAT, + throw_when_excep = True) + + # we get the physics context to expose additional low-level ## + # settings of the simulation + self._physics_context = self._world.get_physics_context() + self._physics_scene_path = self._physics_context.prim_path + # self._physics_context.enable_gpu_dynamics(True) + self._physics_context.enable_stablization(True) + self._physics_scene_prim = self._physics_context.get_current_physics_scene_prim() + self._solver_type = self._physics_context.get_solver_type() + + if "gpu_max_rigid_contact_count" in self._env_opts: + self._physics_context.set_gpu_max_rigid_contact_count(self._env_opts["gpu_max_rigid_contact_count"]) + if "gpu_max_rigid_patch_count" in self._env_opts: + self._physics_context.set_gpu_max_rigid_patch_count(self._env_opts["gpu_max_rigid_patch_count"]) + if "gpu_found_lost_pairs_capacity" in self._env_opts: + self._physics_context.set_gpu_found_lost_pairs_capacity(self._env_opts["gpu_found_lost_pairs_capacity"]) + if "gpu_found_lost_aggregate_pairs_capacity" in self._env_opts: + self._physics_context.set_gpu_found_lost_aggregate_pairs_capacity(self._env_opts["gpu_found_lost_aggregate_pairs_capacity"]) + if "gpu_total_aggregate_pairs_capacity" in self._env_opts: + self._physics_context.set_gpu_total_aggregate_pairs_capacity(self._env_opts["gpu_total_aggregate_pairs_capacity"]) + if "gpu_max_soft_body_contacts" in self._env_opts: + self._physics_context.set_gpu_max_soft_body_contacts(self._env_opts["gpu_max_soft_body_contacts"]) + if "gpu_max_particle_contacts" in self._env_opts: + self._physics_context.set_gpu_max_particle_contacts(self._env_opts["gpu_max_particle_contacts"]) + if "gpu_heap_capacity" in self._env_opts: + self._physics_context.set_gpu_heap_capacity(self._env_opts["gpu_heap_capacity"]) + if "gpu_temp_buffer_capacity" in self._env_opts: + self._physics_context.set_gpu_temp_buffer_capacity(self._env_opts["gpu_temp_buffer_capacity"]) + if "gpu_max_num_partitions" in self._env_opts: + self._physics_context.set_gpu_max_num_partitions(self._env_opts["gpu_max_num_partitions"]) + + # overwriting defaults + # self._physics_context.set_gpu_max_rigid_contact_count(2 * self._physics_context.get_gpu_max_rigid_contact_count()) + # self._physics_context.set_gpu_max_rigid_patch_count(2 * self._physics_context.get_gpu_max_rigid_patch_count()) + # self._physics_context.set_gpu_found_lost_pairs_capacity(2 * self._physics_context.get_gpu_found_lost_pairs_capacity()) + # self._physics_context.set_gpu_found_lost_aggregate_pairs_capacity(20 * self._physics_context.get_gpu_found_lost_aggregate_pairs_capacity()) + # self._physics_context.set_gpu_total_aggregate_pairs_capacity(20 * self._physics_context.get_gpu_total_aggregate_pairs_capacity()) + # self._physics_context.set_gpu_heap_capacity(2 * self._physics_context.get_gpu_heap_capacity()) + # self._physics_context.set_gpu_temp_buffer_capacity(20 * self._physics_context.get_gpu_heap_capacity()) + # self._physics_context.set_gpu_max_num_partitions(20 * self._physics_context.get_gpu_temp_buffer_capacity()) + + # GPU buffers + self._gpu_max_rigid_contact_count = self._physics_context.get_gpu_max_rigid_contact_count() + self._gpu_max_rigid_patch_count = self._physics_context.get_gpu_max_rigid_patch_count() + self._gpu_found_lost_pairs_capacity = self._physics_context.get_gpu_found_lost_pairs_capacity() + self._gpu_found_lost_aggregate_pairs_capacity = self._physics_context.get_gpu_found_lost_aggregate_pairs_capacity() + self._gpu_total_aggregate_pairs_capacity = self._physics_context.get_gpu_total_aggregate_pairs_capacity() + self._gpu_max_soft_body_contacts = self._physics_context.get_gpu_max_soft_body_contacts() + self._gpu_max_particle_contacts = self._physics_context.get_gpu_max_particle_contacts() + self._gpu_heap_capacity = self._physics_context.get_gpu_heap_capacity() + self._gpu_temp_buffer_capacity = self._physics_context.get_gpu_temp_buffer_capacity() + # self._gpu_max_num_partitions = physics_context.get_gpu_max_num_partitions() # BROKEN->method does not exist + + big_info2 = "[physics context]:" + "\n" + \ + "gpu_max_rigid_contact_count: " + str(self._gpu_max_rigid_contact_count) + "\n" + \ + "gpu_max_rigid_patch_count: " + str(self._gpu_max_rigid_patch_count) + "\n" + \ + "gpu_found_lost_pairs_capacity: " + str(self._gpu_found_lost_pairs_capacity) + "\n" + \ + "gpu_found_lost_aggregate_pairs_capacity: " + str(self._gpu_found_lost_aggregate_pairs_capacity) + "\n" + \ + "gpu_total_aggregate_pairs_capacity: " + str(self._gpu_total_aggregate_pairs_capacity) + "\n" + \ + "gpu_max_soft_body_contacts: " + str(self._gpu_max_soft_body_contacts) + "\n" + \ + "gpu_max_particle_contacts: " + str(self._gpu_max_particle_contacts) + "\n" + \ + "gpu_heap_capacity: " + str(self._gpu_heap_capacity) + "\n" + \ + "gpu_temp_buffer_capacity: " + str(self._gpu_temp_buffer_capacity) + "\n" + \ + "use_gpu_sim: " + str(self._world.get_physics_context().use_gpu_sim) + "\n" + \ + "use_gpu_pipeline: " + str(self._world.get_physics_context().use_gpu_pipeline) + "\n" + \ + "use_fabric: " + str(self._world.get_physics_context().use_fabric) + "\n" + \ + "world device: " + str(self._world.get_physics_context().device) + "\n" + \ + "physics context device: " + str(self._world.get_physics_context().device) + "\n" + + Journal.log(self.__class__.__name__, + "set_task", + big_info2, + LogType.STAT, + throw_when_excep = True) + + self._scene = self._world.scene + self._physics_context = self._world.get_physics_context() + + self._stage = get_context().get_stage() + + # strong, uniform lighting: bright sun + dome fill to cover the whole terrain + prim_utils.define_prim("/World/Lighting", "Xform") + sun_path = "/World/Lighting/SunLight" + dome_path = "/World/Lighting/AmbientDome" + + distantLight = UsdLux.DistantLight.Define(self._stage, Sdf.Path(sun_path)) + distantLight.CreateIntensityAttr(450.0) + distantLight.CreateAngleAttr(0.5) # soften shadows a bit + distantLight.CreateColorAttr(Gf.Vec3f(1.0, 1.0, 1.0)) + # Shadow attr naming differs across versions; set the underlying USD attribute directly. + distantLight.GetPrim().CreateAttribute("shadow:enable", Sdf.ValueTypeNames.Bool).Set(True) + + domeLight = UsdLux.DomeLight.Define(self._stage, Sdf.Path(dome_path)) + domeLight.CreateIntensityAttr(200.0) + domeLight.CreateExposureAttr(1.0) + domeLight.CreateColorAttr(Gf.Vec3f(1.0, 1.0, 1.0)) + + self._configure_scene() + + # if "enable_viewport" in sim_params: + # self._render = sim_params["enable_viewport"] + + def _get_baselink_candidate(self, + robot_name: str): + + stage=get_current_stage() + all_prims = [prim.GetPath().pathString for prim in stage.Traverse()] + filtered_prims = [prim for prim in all_prims if f"/{robot_name}/" in prim] + + matching=min(filtered_prims, key=len) if filtered_prims else None + + return matching.split('/')[-1] + + def _configure_scene(self): + + # environment + self._fix_base = [self._env_opts["is_fixed_base"]] * len(self._robot_names) + self._self_collide = [self._env_opts["self_collide"]] * len(self._robot_names) + self._merge_fixed = [self._env_opts["merge_fixed_jnts"]] * len(self._robot_names) + + Journal.log(self.__class__.__name__, + "_configure_scene", + "cloning environments...", + LogType.STAT, + throw_when_excep = True) + + self._ground_plane_prim_paths=[] + self._ground_plane=None + self.terrain_generator = None + if not self._env_opts["use_flat_ground"]: + # ensure terrain is large enough to contain all env clones + spacing = float(self._env_opts["env_spacing"]) + num_envs = self._num_envs + num_per_row = max(1, int(math.sqrt(num_envs))) + num_rows = int(math.ceil(num_envs / num_per_row)) + num_cols = int(math.ceil(num_envs / num_rows)) + row_offset = 0.5 * spacing * (num_rows - 1) + col_offset = 0.5 * spacing * (num_cols - 1) + margin = spacing # leave a full spacing as border cushion + required_size = 2.0 * (max(row_offset, col_offset) + margin) + + if required_size > self._env_opts["ground_size"]: + old_size = self._env_opts["ground_size"] + self._env_opts["ground_size"] = required_size + self._env_opts["terrain_border"] = self._env_opts["ground_size"] / 2.0 + Journal.log(self.__class__.__name__, + "_configure_scene", + f"Ground size increased from {old_size} to {required_size} to fit {num_envs} envs (spacing {spacing}).", + LogType.WARN, + throw_when_excep = True) + + min_height=-self._env_opts["dh_ground"] + max_height=self._env_opts["dh_ground"] + step=max_height-min_height + if self._env_opts["ground_type"]=="random": + terrain_prim_path=self._env_opts["ground_plane_prim_path"]+"_random_unif" + self._ground_plane_prim_paths.append(terrain_prim_path) + self.terrain_generator = RlTerrains(get_current_stage(), prim_path=terrain_prim_path) + self._ground_plane=self.terrain_generator.create_random_uniform_terrain(terrain_size=self._env_opts["ground_size"], + min_height=min_height, + max_height=max_height, + step=step, + position=np.array([0.0, 0.0,0.0]), + static_friction=self._env_opts["static_friction"], + dynamic_friction=self._env_opts["dynamic_friction"], + restitution=self._env_opts["restitution"]) + elif self._env_opts["ground_type"]=="random_patches": + terrain_prim_path=self._env_opts["ground_plane_prim_path"]+"_random_unif_patches" + self._ground_plane_prim_paths.append(terrain_prim_path) + self.terrain_generator = RlTerrains(get_current_stage(), prim_path=terrain_prim_path) + self._ground_plane=self.terrain_generator.create_random_patched_terrain(terrain_size=self._env_opts["ground_size"], + min_height=min_height, + max_height=max_height, + step=step, + position=np.array([0.0, 0.0,0.0]), + static_friction=self._env_opts["static_friction"], + dynamic_friction=self._env_opts["dynamic_friction"], + restitution=self._env_opts["restitution"], + patch_ratio=0.8, + patch_size=10 + ) + elif self._env_opts["ground_type"]=="slopes": + terrain_prim_path=self._env_opts["ground_plane_prim_path"]+"_slopes" + self._ground_plane_prim_paths.append(terrain_prim_path) + self.terrain_generator = RlTerrains(get_current_stage(), prim_path=terrain_prim_path) + self._ground_plane=self.terrain_generator.create_sloped_terrain(terrain_size=self._env_opts["ground_size"], + slope=-0.5, + position=np.array([0.0, 0.0,0.0]), + static_friction=self._env_opts["static_friction"], + dynamic_friction=self._env_opts["dynamic_friction"], + restitution=self._env_opts["restitution"] + ) + elif self._env_opts["ground_type"]=="stairs": + terrain_prim_path=self._env_opts["ground_plane_prim_path"]+"_stairs" + self._ground_plane_prim_paths.append(terrain_prim_path) + self.terrain_generator = RlTerrains(get_current_stage(), prim_path=terrain_prim_path) + self._ground_plane=self.terrain_generator.create_stairs_terrain(terrain_size=self._env_opts["ground_size"], + position=np.array([0.0, 0.0,0.0]), + static_friction=self._env_opts["static_friction"], + dynamic_friction=self._env_opts["dynamic_friction"], + restitution=self._env_opts["restitution"], + ) + elif self._env_opts["ground_type"]=="stepup": + terrain_prim_path=self._env_opts["ground_plane_prim_path"]+"_stepup" + self._ground_plane_prim_paths.append(terrain_prim_path) + self.terrain_generator = RlTerrains(get_current_stage(), prim_path=terrain_prim_path) + self._ground_plane=self.terrain_generator.create_stepup_terrain( + terrain_size=self._env_opts["ground_size"], + stairs_ratio=0.3, + min_steps=1, + max_steps=1, + pyramid_platform_size=15.0, + position=np.array([0.0, 0.0, 0.0]), + static_friction=self._env_opts["static_friction"], + dynamic_friction=self._env_opts["dynamic_friction"], + restitution=self._env_opts["restitution"], + step_height=0.15 + ) + elif self._env_opts["ground_type"]=="stepup_prim": + terrain_prim_path=self._env_opts["ground_plane_prim_path"]+"_stepup_prim" + self._ground_plane_prim_paths.append(terrain_prim_path) + self.terrain_generator = RlTerrains(get_current_stage(), prim_path=terrain_prim_path) + self._ground_plane=self.terrain_generator.create_stepup_prim_terrain( + terrain_size=self._env_opts["ground_size"], + stairs_ratio=0.9, + platform_size=50.0, + step_height_lb=self._env_opts["step_height_lb"], + step_height_ub=self._env_opts["step_height_ub"], + min_step_width=self._env_opts.get("step_width_lb", None), + max_step_width=self._env_opts.get("step_width_ub", None), + position=np.array([0.0, 0.0, 0.0]), + static_friction=self._env_opts["static_friction"], + dynamic_friction=self._env_opts["dynamic_friction"], + restitution=self._env_opts["restitution"], + n_steps=25, + area_factor=0.7, + random_n_steps=False + ) + # apply a custom checker material to the terrain primitives + mat_path = self._ensure_lightblue_checker_material() + self._apply_checker_material_to_terrain(terrain_root_path=terrain_prim_path, material_path=mat_path) + self._add_checker_overlay_plane(terrain_root_path=terrain_prim_path, material_path=mat_path) + self._add_checker_overlays_on_tiles(terrain_root_path=terrain_prim_path, material_path=mat_path) + else: + ground_type=self._env_opts["ground_type"] + Journal.log(self.__class__.__name__, + "_configure_scene", + f"Terrain type {ground_type} not supported. Will default to flat ground.", + LogType.EXCEP, + throw_when_excep = True) + + # add offsets to intial height depending on the terrain heightmap + if self.terrain_generator is not None: + stage = get_current_stage() + up_axis = UsdGeom.GetStageUpAxis(stage) + + spacing = self._env_opts["env_spacing"] + num_envs = self._num_envs + + num_per_row = max(1, int(np.sqrt(num_envs))) + num_rows = int(np.ceil(num_envs / num_per_row)) + num_cols = int(np.ceil(num_envs / num_rows)) + + row_offset = 0.5 * spacing * (num_rows - 1) + col_offset = 0.5 * spacing * (num_cols - 1) + + offsets = np.array(self._env_opts["cloning_offset"], dtype=float) + + for env_idx in range(num_envs): + row = env_idx // num_cols + col = env_idx % num_cols + x = row_offset - row * spacing + y = col * spacing - col_offset + + half_extent = self._env_opts["spawn_height_check_half_extent"] + if up_axis == UsdGeom.Tokens.z: + height = self.terrain_generator.get_max_height_in_rect(x, y, half_extent=half_extent) + else: + height = self.terrain_generator.get_max_height_in_rect(x, y, half_extent=half_extent) + + offsets[env_idx][2] += height + self._env_opts["spawn_height_cushion"] + + self._env_opts["cloning_offset"] = offsets + + else: + defaul_prim_path=self._env_opts["ground_plane_prim_path"]+"_default" + self._ground_plane_prim_paths.append(defaul_prim_path) + self._ground_plane=self._scene.add_default_ground_plane(z_position=0, + name="terrain", + prim_path=defaul_prim_path, + static_friction=self._env_opts["static_friction"], + dynamic_friction=self._env_opts["dynamic_friction"], + restitution=self._env_opts["restitution"]) + self._terrain_collision=None + + physics_material=UsdPhysics.MaterialAPI.Apply(self._ground_plane.prim) + physics_material.CreateDynamicFrictionAttr(self._env_opts["dynamic_friction"]) + physics_material.CreateStaticFrictionAttr(self._env_opts["static_friction"]) + physics_material.CreateRestitutionAttr(self._env_opts["restitution"]) + # self._ground_plane.apply_physics_material(physics_material) + + self._terrain_collision=PhysxSchema.PhysxCollisionAPI.Get(get_current_stage(), self._ground_plane.prim_path) + self._terrain_material=UsdPhysics.MaterialAPI.Get(get_current_stage(), self._ground_plane.prim_path) + self._terrain_physix_material=PhysxSchema.PhysxMaterialAPI.Get(get_current_stage(), self._ground_plane.prim_path) + + for i in range(len(self._robot_names)): + robot_name = self._robot_names[i] + + urdf_path = self._robot_urdf_paths[robot_name] + srdf_path = self._robot_srdf_paths[robot_name] + fix_base = self._fix_base[i] + self_collide = self._self_collide[i] + merge_fixed = self._merge_fixed[i] + + self._generate_rob_descriptions(robot_name=robot_name, + urdf_path=urdf_path, + srdf_path=srdf_path) + self._import_urdf(robot_name, + fix_base=fix_base, + self_collide=self_collide, + merge_fixed=merge_fixed) + + self._cloner.clone( + source_prim_path=self._env_opts["template_env_ns"], + prim_paths=self._envs_prim_paths, + replicate_physics=self._env_opts["replicate_physics"], + position_offsets=self._env_opts["cloning_offset"] + ) # we can clone the environment in which all the robos are + + base_link_name=self._env_opts["base_linkname"] + if self._env_opts["deduce_base_link"]: + base_link_name=self._get_baselink_candidate(robot_name=robot_name) + + self._robots_art_views[robot_name] = ArticulationView(name = robot_name + "ArtView", + prim_paths_expr = self._env_opts["envs_ns"] + "/env_.*"+ "/" + robot_name + "/" + base_link_name, + reset_xform_properties=False) + self._robots_articulations[robot_name] = self._scene.add(self._robots_art_views[robot_name]) + + # height grid sensor (terrain may be None if using flat ground) + self._height_sensors[robot_name] = HeightGridSensor( + terrain_utils=self.terrain_generator if not self._env_opts["use_flat_ground"] else None, + grid_size=int(self._env_opts["height_sensor_pixels"]), + resolution=float(self._env_opts["height_sensor_resolution"]), + forward_offset=float(self._env_opts["height_sensor_forward_offset"]), + lateral_offset=float(self._env_opts["height_sensor_lateral_offset"]), + n_envs=self._num_envs, + device=self._device, + dtype=self._dtype) + # ensure shared-data flags are set if a height sensor is active + self._env_opts["height_sensor_pixels"] = int(self._env_opts["height_sensor_pixels"]) + self._env_opts["height_sensor_resolution"] = float(self._env_opts["height_sensor_resolution"]) + self._enable_height_shared = True + self._height_sensor_pixels = self._env_opts["height_sensor_pixels"] + self._height_sensor_resolution = self._env_opts["height_sensor_resolution"] + self._height_vis_step[robot_name] = 0 + if self._env_opts["enable_height_vis"]: + self._height_vis[robot_name] = HeightGridVisualizer( + robot_name=robot_name, + num_envs=self._num_envs, + grid_size=int(self._env_opts["height_sensor_pixels"]), + resolution=float(self._env_opts["height_sensor_resolution"]), + marker_radius=float(self._env_opts["height_vis_radius"]), + forward_offset=float(self._env_opts["height_sensor_forward_offset"]), + lateral_offset=float(self._env_opts["height_sensor_lateral_offset"]), + device=self._device, + dtype=self._dtype) + + self._blink_rigid_prim_views[robot_name] = RigidPrimView(prim_paths_expr=self._env_opts["envs_ns"] + "/env_.*"+ "/" + robot_name + "/" + base_link_name, + name = robot_name + "RigidPrimView") # base link prim views + self._scene.add(self._blink_rigid_prim_views[robot_name]) # need to add so it is properly initialized when resetting world + + # self._robots_geom_prim_views[robot_name] = GeometryPrimView(name = robot_name + "GeomView", + # prim_paths_expr = self._env_ns + "/env*"+ "/" + robot_name, + # # prepare_contact_sensors = True + # ) + # self._robots_geom_prim_views[robot_name].apply_collision_apis() # to be able to apply contact sensors + + # init contact sensors + self._init_contact_sensors(robot_name=robot_name) # IMPORTANT: this has to be called + # after calling the clone() method and initializing articulation views!! + + # self._set_contact_links_material(prim_path="/World/envs/env_0/kyon_no_wheels/lower_leg_1/collisions/mesh_1") + # self._set_contact_links_material(prim_path="/World/envs/env_0/kyon_no_wheels/lower_leg_2/collisions/mesh_1") + # self._set_contact_links_material(prim_path="/World/envs/env_0/kyon_no_wheels/lower_leg_3/collisions/mesh_1") + # self._set_contact_links_material(prim_path="/World/envs/env_0/kyon_no_wheels/lower_leg_4/collisions/mesh_1") + + # filter collisions between default ground plane and custom terrains + # self._cloner.filter_collisions(physicsscene_path = self._physics_context.prim_path, + # collision_root_path = "/World/terrain_collisions", + # prim_paths=[self._ground_plane_prim_paths[1]], + # global_paths=[self._ground_plane_prim_paths[0]] + # ) + + # delete_prim(self._env_opts["ground_plane_prim_path"] + "/SphereLight") # we remove the default spherical light + + # set default camera viewport position and target + camera_position=[4.2, 4.2, 1.5] + camera_target=[0, 0, 0] + # use a single camera prim (configurable) for both viewport and render products + camera_prim = self._env_opts["camera_prim_path"] + self._set_initial_camera_params(camera_position=camera_position, + camera_target=camera_target, + camera_prim_path=camera_prim) + + if self._env_opts["render_to_file"]: + # base output dir + from datetime import datetime + timestamp = datetime.now().strftime("h%H_m%M_s%S_%d_%m_%Y") + self._render_output_dir = f"/tmp/IsaacRenderings/{timestamp}" + res = tuple(int(x) for x in self._env_opts["render_resolution"]) + + # create render product from chosen camera prim + self._render_product = rep.create.render_product(camera_prim, + res, name='rendering_camera') + self._render_writer = rep.WriterRegistry.get("BasicWriter") + self._render_writer.initialize(output_dir=self._render_output_dir, + rgb=True) + self._render_writer.attach([self._render_product]) + # optional top-down capture + if self._env_opts["render_panoramic_cam"]: + td_height = float(self._env_opts["render_panoramic_cam_height"]) + td_dir = self._render_output_dir + "/panoramic_cam" + td_offset = self._env_opts["render_panoramic_cam_target_xy"] + td_target_z = float(self._env_opts["render_panoramic_cam_target_z"]) + pos = [8.0, 11.0, td_height] + self._panoramic_cam_camera = rep.create.camera(focal_length=12, + name='rendering_camera_panoramic_cam', + clipping_range = (1, 200), + position = pos, + look_at = [td_offset[0], td_offset[1], td_target_z]) + self._panoramic_cam_render_product = rep.create.render_product(self._panoramic_cam_camera, + res, name='rendering_camera_panoramic_cam') + self._panoramic_cam_writer = rep.WriterRegistry.get("BasicWriter") + self._panoramic_cam_writer.initialize(output_dir=td_dir, rgb=True) + self._panoramic_cam_writer.attach([self._panoramic_cam_render_product]) + + self.apply_collision_filters(self._physics_context.prim_path, + "/World/collisions") + + self._reset_sim() + + self._fill_robot_info_from_world() + # initializes robot state data + + # update solver options + self._update_art_solver_options() + self._get_solver_info() # get again solver option before printing everything + self._print_envs_info() # debug print + + # for n in range(self._n_init_steps): # run some initialization steps + # self._step_sim() + + self._init_robots_state() + + self.scene_setup_completed = True + + Journal.log(self.__class__.__name__, + "set_up_scene", + "finished scene setup...", + LogType.STAT, + throw_when_excep = True) + + self._is = _sensor.acquire_imu_sensor_interface() + self._dyn_control=_dynamic_control.acquire_dynamic_control_interface() + + def _set_contact_links_material(self, prim_path: str): + prim=get_prim_at_path(prim_path) + physics_material=UsdPhysics.MaterialAPI.Apply(prim) + physics_material.CreateDynamicFrictionAttr(0) + physics_material.CreateStaticFrictionAttr(0) + physics_material.CreateRestitutionAttr(1.0) + physxMaterialAPI=PhysxSchema.PhysxMaterialAPI.Apply(prim) + physxMaterialAPI.CreateFrictionCombineModeAttr().Set("multiply") # average, min, multiply, max + physxMaterialAPI.CreateRestitutionCombineModeAttr().Set("multiply") + + def _get_lightblue_checker_texture_path(self) -> str: + tex_rel_path = os.path.join(os.path.dirname(__file__), "..", "assets", "textures", "ibrido_terrain_texture.png") + return os.path.abspath(tex_rel_path) + + def _ensure_lightblue_checker_material(self): + """Create (or reuse) a light-blue checker material for primitive terrains.""" + stage = get_current_stage() + mat_path = "/World/Looks/IbridoCheckerMaterial" + mat_prim = stage.GetPrimAtPath(mat_path) + if mat_prim.IsValid(): + return mat_path + + texture_path = self._get_lightblue_checker_texture_path() + + material = UsdShade.Material.Define(stage, mat_path) + + st_reader = UsdShade.Shader.Define(stage, f"{mat_path}/PrimvarReader_st") + st_reader.CreateIdAttr("UsdPrimvarReader_float2") + st_reader.CreateInput("varname", Sdf.ValueTypeNames.Token).Set("st") + st_reader.CreateOutput("result", Sdf.ValueTypeNames.Float2) + + uv_transform = UsdShade.Shader.Define(stage, f"{mat_path}/UVTransform") + uv_transform.CreateIdAttr("UsdTransform2d") + # keep UV scale at 1 here; tiling is controlled via mesh UVs + uv_transform.CreateInput("in", Sdf.ValueTypeNames.Float2).ConnectToSource(st_reader.GetOutput("result")) + uv_transform.CreateInput("scale", Sdf.ValueTypeNames.Float2).Set(Gf.Vec2f(1.0, 1.0)) + uv_transform.CreateOutput("result", Sdf.ValueTypeNames.Float2) + + tex = UsdShade.Shader.Define(stage, f"{mat_path}/CheckerTex") + tex.CreateIdAttr("UsdUVTexture") + tex.CreateInput("file", Sdf.ValueTypeNames.Asset).Set(Sdf.AssetPath(texture_path)) + tex.CreateInput("st", Sdf.ValueTypeNames.Float2).ConnectToSource(uv_transform.GetOutput("result")) + tex.CreateInput("wrapS", Sdf.ValueTypeNames.Token).Set("repeat") + tex.CreateInput("wrapT", Sdf.ValueTypeNames.Token).Set("repeat") + tex.CreateInput("minFilter", Sdf.ValueTypeNames.Token).Set("nearest") + tex.CreateInput("magFilter", Sdf.ValueTypeNames.Token).Set("nearest") + # disable mipmaps to avoid blurring sharp edges + tex.CreateInput("mipFilter", Sdf.ValueTypeNames.Token).Set("nearest") + tex.CreateInput("enableMipMap", Sdf.ValueTypeNames.Bool).Set(False) + tex.CreateInput("fallback", Sdf.ValueTypeNames.Color4f).Set(Gf.Vec4f(0.69, 0.85, 1.0, 1.0)) + tex.CreateOutput("rgb", Sdf.ValueTypeNames.Float3) + tex.CreateOutput("a", Sdf.ValueTypeNames.Float) + + pbr = UsdShade.Shader.Define(stage, f"{mat_path}/PBRShader") + pbr.CreateIdAttr("UsdPreviewSurface") + pbr.CreateInput("diffuseColor", Sdf.ValueTypeNames.Color3f).ConnectToSource(tex.GetOutput("rgb")) + pbr.CreateInput("roughness", Sdf.ValueTypeNames.Float).Set(0.45) + pbr.CreateInput("metallic", Sdf.ValueTypeNames.Float).Set(0.0) + pbr.CreateOutput("surface", Sdf.ValueTypeNames.Token) + + material.CreateSurfaceOutput().ConnectToSource(pbr.GetOutput("surface")) + + return mat_path + + def _ensure_groundplane_material(self): + """Guarantee a ground-plane material exists (default checker) and return its path.""" + stage = get_current_stage() + mat_path = "/World/Looks/groundPlaneMaterial" + mat_prim = stage.GetPrimAtPath(mat_path) + if mat_prim.IsValid(): + return mat_path + + # create a temporary default ground plane to spawn the checker material, then delete the geom + tmp_gp_path = "/World/tmp_ground_for_material" + self._scene.add_default_ground_plane(z_position=-1000.0, + name="tmp_ground_for_material", + prim_path=tmp_gp_path, + static_friction=self._env_opts["static_friction"], + dynamic_friction=self._env_opts["dynamic_friction"], + restitution=self._env_opts["restitution"]) + + mat_prim = stage.GetPrimAtPath(mat_path) + prim_utils.delete_prim(tmp_gp_path) + + return mat_path if mat_prim.IsValid() else None + + def _apply_checker_material_to_terrain(self, terrain_root_path: str, material_path: str): + """Bind the checker material to all terrain prims (visual only).""" + + stage = get_current_stage() + mat_prim = stage.GetPrimAtPath(material_path) + if not mat_prim.IsValid(): + return + material = UsdShade.Material(mat_prim) + for prim in stage.Traverse(): + path = prim.GetPath().pathString + if not path.startswith(terrain_root_path): + continue + binding = UsdShade.MaterialBindingAPI.Apply(prim) + binding.Bind(material, UsdShade.Tokens.strongerThanDescendants) + + def _add_checker_overlay_plane(self, terrain_root_path: str, material_path: str): + """Create a thin visual-only mesh with UVs so the checker pattern shows up even on cube prims.""" + stage = get_current_stage() + plane_path = f"{terrain_root_path}/visual_checker" + plane_prim = stage.GetPrimAtPath(plane_path) + if plane_prim.IsValid(): + # if already exists, just (re)bind material + mat_prim = stage.GetPrimAtPath(material_path) + if mat_prim.IsValid(): + UsdShade.MaterialBindingAPI.Apply(plane_prim).Bind(UsdShade.Material(mat_prim), UsdShade.Tokens.strongerThanDescendants) + return + + # try to read base slab dimensions/position to size the overlay + slab_path = terrain_root_path + "_slab" + slab_prim = stage.GetPrimAtPath(slab_path) + center = Gf.Vec3f(0.0, 0.0, 0.0) + width = float(self._env_opts.get("ground_size", 50.0)) + length = width + thickness = 0.1 + if slab_prim.IsValid(): + xformable = UsdGeom.Xformable(slab_prim) + for op in xformable.GetOrderedXformOps(): + if op.GetOpType() == UsdGeom.XformOp.TypeTranslate: + center = Gf.Vec3f(op.Get()) + elif op.GetOpType() == UsdGeom.XformOp.TypeScale: + scale = op.Get() + width = float(scale[0]) + length = float(scale[1]) + thickness = float(scale[2]) + + half_w = 0.5 * width + half_l = 0.5 * length + z = center[2] + 0.5 * thickness + 1e-3 # slightly above the slab to avoid z-fighting + + plane = UsdGeom.Mesh.Define(stage, plane_path) + plane.CreatePointsAttr([ + Gf.Vec3f(center[0] - half_w, center[1] - half_l, z), + Gf.Vec3f(center[0] + half_w, center[1] - half_l, z), + Gf.Vec3f(center[0] + half_w, center[1] + half_l, z), + Gf.Vec3f(center[0] - half_w, center[1] + half_l, z), + ]) + plane.CreateFaceVertexCountsAttr([4]) + plane.CreateFaceVertexIndicesAttr([0, 1, 2, 3]) + plane.CreateDoubleSidedAttr(True) + + # increase tiling density; adjustable via env opt + uv_repeats = max(1, int((width / 10.0) * 3.0)) + primvars = UsdGeom.PrimvarsAPI(plane) + st = primvars.CreatePrimvar("st", Sdf.ValueTypeNames.TexCoord2fArray, UsdGeom.Tokens.vertex) + st.Set(Vt.Vec2fArray([ + Gf.Vec2f(0.0, 0.0), + Gf.Vec2f(uv_repeats, 0.0), + Gf.Vec2f(uv_repeats, uv_repeats), + Gf.Vec2f(0.0, uv_repeats), + ])) + st.SetInterpolation(UsdGeom.Tokens.vertex) + + mat_prim = stage.GetPrimAtPath(material_path) + if mat_prim.IsValid(): + material = UsdShade.Material(mat_prim) + UsdShade.MaterialBindingAPI.Apply(plane.GetPrim()).Bind(material, UsdShade.Tokens.strongerThanDescendants) + + def _add_checker_overlays_on_tiles(self, terrain_root_path: str, material_path: str): + """Add visual quads on top of each tile cube so the checker texture appears on raised steps.""" + stage = get_current_stage() + mat_prim = stage.GetPrimAtPath(material_path) + if not mat_prim.IsValid(): + return + material = UsdShade.Material(mat_prim) + for prim in stage.Traverse(): + path = prim.GetPath().pathString + if not path.startswith(terrain_root_path): + continue + if prim.GetTypeName() != "Cube": + continue + name = path.split("/")[-1] + if "wall" in name or name.endswith("_slab"): + continue + xformable = UsdGeom.Xformable(prim) + center = Gf.Vec3f(0.0, 0.0, 0.0) + scale = Gf.Vec3f(1.0, 1.0, 1.0) + for op in xformable.GetOrderedXformOps(): + if op.GetOpType() == UsdGeom.XformOp.TypeTranslate: + center = Gf.Vec3f(op.Get()) + elif op.GetOpType() == UsdGeom.XformOp.TypeScale: + scale = Gf.Vec3f(op.Get()) + width, length, height = float(scale[0]), float(scale[1]), float(scale[2]) + half_w = 0.5 * width + half_l = 0.5 * length + z = center[2] + 0.5 * height + 1e-3 + overlay_path = f"{path}_checker" + if stage.GetPrimAtPath(overlay_path).IsValid(): + UsdShade.MaterialBindingAPI.Apply(stage.GetPrimAtPath(overlay_path)).Bind(material, UsdShade.Tokens.strongerThanDescendants) + continue + mesh = UsdGeom.Mesh.Define(stage, overlay_path) + mesh.CreatePointsAttr([ + Gf.Vec3f(center[0] - half_w, center[1] - half_l, z), + Gf.Vec3f(center[0] + half_w, center[1] - half_l, z), + Gf.Vec3f(center[0] + half_w, center[1] + half_l, z), + Gf.Vec3f(center[0] - half_w, center[1] + half_l, z), + ]) + mesh.CreateFaceVertexCountsAttr([4]) + mesh.CreateFaceVertexIndicesAttr([0, 1, 2, 3]) + mesh.CreateDoubleSidedAttr(True) + uv_repeats = max(1, int((width / 10.0) * float(self._env_opts.get("checker_uv_density", 3.0)))) + primvars = UsdGeom.PrimvarsAPI(mesh) + st = primvars.CreatePrimvar("st", Sdf.ValueTypeNames.TexCoord2fArray, UsdGeom.Tokens.vertex) + st.Set(Vt.Vec2fArray([ + Gf.Vec2f(0.0, 0.0), + Gf.Vec2f(uv_repeats, 0.0), + Gf.Vec2f(uv_repeats, uv_repeats), + Gf.Vec2f(0.0, uv_repeats), + ])) + st.SetInterpolation(UsdGeom.Tokens.vertex) + UsdShade.MaterialBindingAPI.Apply(mesh.GetPrim()).Bind(material, UsdShade.Tokens.strongerThanDescendants) + + def _is_link(self, prim): + return prim.GetTypeName() == 'Xform' + + def _is_joint(self, prim): + return prim.GetTypeName() == 'PhysicsRevoluteJoint' + + def _create_collision_group(self, group_path, link_paths): + """ + Create a collision group under the given group_path that contains the links. + Args: + group_path (str): Path to create the collision group. + link_paths (List[str]): List of link paths to include in this group. + """ + collision_group = Sdf.PrimSpec( + self._stage.GetRootLayer().GetPrimAtPath(group_path), + group_path.split("/")[-1], + Sdf.SpecifierDef, + "PhysicsCollisionGroup" + ) + # Add the links to the collision group + for link_path in link_paths: + includes_rel = Sdf.RelationshipSpec(collision_group, "collection:colliders:includes", False) + includes_rel.targetPathList.Append(link_path) + + def _add_collision_filter(self, group_path, link1, link2): + """ + Filters collision between two successive links. + + Args: + group_path (str): Path of the collision group. + link1 (str): Path of the first link. + link2 (str): Path of the second link. + """ + # Create a relationship to filter collisions between the two links + filtered_groups = Sdf.RelationshipSpec( + self._stage.GetPrimAtPath(group_path), + "physics:filteredGroups", + False + ) + filtered_groups.targetPathList.Append(link1) + filtered_groups.targetPathList.Append(link2) + + def _render_sim(self, mode="human"): + + if mode == "human": + # follow requested robot/env + if self._env_opts["use_follow_camera"]: + ridx = int(self._env_opts["render_follow_robot_idx"]) + eidx = int(self._env_opts["render_follow_env_idx"]) + if ridx < len(self._robot_names) and eidx < self._num_envs: + rname = self._robot_names[ridx] + pos = self._root_p.get(rname, None) + if pos is not None and pos.shape[0] > eidx: + base = pos[eidx].detach().cpu() + offset = torch.as_tensor(self._env_opts["render_follow_offset"], + device=base.device, dtype=base.dtype) + target_offset = torch.as_tensor(self._env_opts["render_follow_target_offset"], + device=base.device, dtype=base.dtype) + quat = self._root_q.get(rname, None) + if quat is not None and quat.shape[0] > eidx: + q = quat[eidx].detach().cpu() + w, x, y, z = q.unbind(-1) + yaw = torch.atan2(2 * (w * z + x * y), 1 - 2 * (y * y + z * z)) + cy, sy = torch.cos(yaw), torch.sin(yaw) + rot = torch.tensor([[cy, -sy], [sy, cy]], device=base.device, dtype=base.dtype) + offset_xy = torch.matmul(rot, offset[:2]) + target_offset_xy = torch.matmul(rot, target_offset[:2]) + offset = torch.stack((offset_xy[0], offset_xy[1], offset[2])) + target_offset = torch.stack((target_offset_xy[0], target_offset_xy[1], target_offset[2])) + eye = (base + offset).tolist() + target = (base + target_offset).tolist() + set_camera_view(eye=eye, target=target, camera_prim_path=self._env_opts["camera_prim_path"]) + + self._world.render() + # optional height grid visualization + if self._env_opts["enable_height_vis"]: + for robot_name, vis in self._height_vis.items(): + # use latest stored states + if robot_name not in self._height_imgs or robot_name not in self._height_sensors: + continue + heights = self._height_imgs.get(robot_name, None) + if heights is None or heights.numel() == 0: + continue + pos_src = self._root_p.get(robot_name, None) + quat_src = self._root_q.get(robot_name, None) + if pos_src is None or quat_src is None: + continue + step = self._height_vis_step.get(robot_name, 0) + period = max(1, int(self._env_opts["height_vis_update_period"])) + if step % period == 0: + try: + vis.update( + base_positions=pos_src, + base_quats=quat_src, + heights=heights) + except Exception as exc: + print(f"[height_vis] update failed for {robot_name}: {exc}") + self._height_vis_step[robot_name] = step + 1 + return None + elif mode == "rgb_array": + # check if viewport is enabled -- if not, then complain because we won't get any data + if not self._render or not self._record: + exception = f"Cannot render '{mode}' when rendering is not enabled. Please check the provided" + \ + "arguments to the environment class at initialization." + Journal.log(self.__class__.__name__, + "__init__", + exception, + LogType.EXCEP, + throw_when_excep = True) + # obtain the rgb data + rgb_data = self._rgb_annotator.get_data() + # convert to numpy array + rgb_data = np.frombuffer(rgb_data, dtype=np.uint8).reshape(*rgb_data.shape) + # return the rgb data + return rgb_data[:, :, :3] + else: + return None + + def _create_viewport_render_product(self, resolution=None): + """Create a render product of the viewport for rendering.""" + + try: + + # create render product + camera_prim = self._env_opts["camera_prim_path"] + res = resolution + if res is None: + res = tuple(int(x) for x in self._env_opts["render_resolution"]) + self._render_product = rep.create.render_product(camera_prim, res) + # create rgb annotator -- used to read data from the render product + self._rgb_annotator = rep.AnnotatorRegistry.get_annotator("rgb", device="cpu") + self._rgb_annotator.attach([self._render_product]) + self._record = True + except Exception as e: + carb.log_info("omni.replicator.core could not be imported. Skipping creation of render product.") + carb.log_info(str(e)) + + def _close(self): + if self._simulation_app.is_running(): + self._simulation_app.close() + + def _step_world(self): + self._world.step(render=False, step_sim=True) + + if (self._render) and (self._render_step_counter%self._env_opts["rendering_freq"]==0): + # if self._env_opts["render_to_file"]: + # rep.orchestrator.step() + self._render_sim() # manually trigger rendering (World.step(render=True) for some reason + # will step the simulation for a dt==rendering_dt) + self._render_step_counter += 1 + + def _generate_jnt_imp_control(self, robot_name: str): + + jnt_imp_controller = OmniJntImpCntrl(articulation=self._robots_articulations[robot_name], + device=self._device, + dtype=self._dtype, + enable_safety=True, + urdf_path=self._urdf_dump_paths[robot_name], + config_path=self._jnt_imp_config_paths[robot_name], + enable_profiling=False, + debug_checks=self._debug, + override_art_controller=self._override_low_lev_controller) + + return jnt_imp_controller + + def _reset_sim(self): + self._world.reset(soft=False) + + def _reset_state(self, + robot_name: str, + env_indxs: torch.Tensor = None, + randomize: bool = False): + + if env_indxs is not None: + if self._debug: + if self._use_gpu: + if not env_indxs.device.type == "cuda": + error = "Provided env_indxs should be on GPU!" + Journal.log(self.__class__.__name__, + "_step_jnt_imp_control", + error, + LogType.EXCEP, + True) + else: + if not env_indxs.device.type == "cpu": + error = "Provided env_indxs should be on CPU!" + Journal.log(self.__class__.__name__, + "_step_jnt_imp_control", + error, + LogType.EXCEP, + True) + if randomize: + self._randomize_yaw(robot_name=robot_name,env_indxs=env_indxs) + + # root q + self._robots_art_views[robot_name].set_world_poses(positions = self._root_p_default[robot_name][env_indxs, :], + orientations=self._root_q_default[robot_name][env_indxs, :], + indices = env_indxs) + # jnts q + self._robots_art_views[robot_name].set_joint_positions(positions = self._jnts_q_default[robot_name][env_indxs, :], + indices = env_indxs) + # root v and omega + self._robots_art_views[robot_name].set_joint_velocities(velocities = self._jnts_v_default[robot_name][env_indxs, :], + indices = env_indxs) + # jnts v + concatenated_vel = torch.cat((self._root_v_default[robot_name][env_indxs, :], + self._root_omega_default[robot_name][env_indxs, :]), dim=1) + self._robots_art_views[robot_name].set_velocities(velocities = concatenated_vel, + indices = env_indxs) + # jnts eff + self._robots_art_views[robot_name].set_joint_efforts(efforts = self._jnts_eff_default[robot_name][env_indxs, :], + indices = env_indxs) + self._reset_perturbations(robot_name=robot_name, env_indxs=env_indxs) + else: + + if randomize: + self._randomize_yaw(robot_name=robot_name,env_indxs=None) + + # root q + self._robots_art_views[robot_name].set_world_poses(positions = self._root_p_default[robot_name][:, :], + orientations=self._root_q_default[robot_name][:, :], + indices = None) + # jnts q + self._robots_art_views[robot_name].set_joint_positions(positions = self._jnts_q_default[robot_name][:, :], + indices = None) + # root v and omega + self._robots_art_views[robot_name].set_joint_velocities(velocities = self._jnts_v_default[robot_name][:, :], + indices = None) + # jnts v + concatenated_vel = torch.cat((self._root_v_default[robot_name][:, :], + self._root_omega_default[robot_name][:, :]), dim=1) + self._robots_art_views[robot_name].set_velocities(velocities = concatenated_vel, + indices = None) + # jnts eff + self._robots_art_views[robot_name].set_joint_efforts(efforts = self._jnts_eff_default[robot_name][:, :], + indices = None) + self._reset_perturbations(robot_name=robot_name, env_indxs=None) + + # we update the robots state + self._read_root_state_from_robot(env_indxs=env_indxs, + robot_name=robot_name) + self._read_jnts_state_from_robot(env_indxs=env_indxs, + robot_name=robot_name) + + def _reset_perturbations(self, robot_name: str, env_indxs: torch.Tensor = None): + """Clear perturbation state and wrenches for selected envs.""" + if robot_name not in self._pert_active: + return + if env_indxs is None: + self._pert_active[robot_name].zero_() + self._pert_steps_remaining[robot_name].zero_() + self._pert_forces_world[robot_name].zero_() + self._pert_torques_world[robot_name].zero_() + self._pert_det_counter[robot_name].zero_() + else: + self._pert_active[robot_name][env_indxs] = False + self._pert_steps_remaining[robot_name][env_indxs] = 0 + self._pert_forces_world[robot_name][env_indxs, :] = 0 + self._pert_torques_world[robot_name][env_indxs, :] = 0 + self._pert_det_counter[robot_name][env_indxs] = 0 + + def _process_perturbations(self): + + # Iterate over each robot view + for i in range(len(self._robot_names)): + robot_name = self._robot_names[i] + + # Pre-fetch views for code clarity (references, not copies) + active = self._pert_active[robot_name] + steps_rem = self._pert_steps_remaining[robot_name] + forces_world = self._pert_forces_world[robot_name] + torques_world = self._pert_torques_world[robot_name] + planar_only = self._env_opts["pert_planar_only"] if "pert_planar_only" in self._env_opts else False + + # --- 1. Update Active Counters (In-Place) --- + if active.any(): + # In-place subtraction + steps_rem[active] -= 1 + + # --- 2. Reset Finished Perturbations (In-Place) --- + # Logic: Active AND (Steps <= 0) + # Note: Creating 'newly_ended' boolean mask is a tiny unavoidable allocation + newly_ended = active & (steps_rem <= 0) + + if newly_ended.any(): + # Use masked_fill_ for in-place zeroing + active.masked_fill_(newly_ended, False) + forces_world[newly_ended, :]=0 + torques_world[newly_ended, :]=0 + steps_rem.masked_fill_(newly_ended, 0) + + # --- 3. Trigger New Perturbations --- + + det_rate = self._env_opts["det_pert_rate"] + if det_rate: + # deterministic spacing: count physics steps and trigger when threshold reached (if not already active) + det_counter = self._pert_det_counter[robot_name] + det_counter += 1 + trigger_mask = (det_counter >= self._pert_det_steps) & (~active) + else: + # Reuse scratch buffer for probability check + # Assumes self._pert_scratch is (num_envs, 1) pre-allocated + self._pert_scratch[robot_name].uniform_(0.0, 1.0) # used for triggering new perturbations + # Check probs against threshold + # Flatten scratch to (N,) to match 'active' mask + trigger_mask = (self._pert_scratch[robot_name].flatten() < self._pert_wrenches_prob) & (~active) + + if trigger_mask.any(): + + # Cache weights (references) + weight = self._weights[robot_name] + + # we now treat the configured "max_*_impulse_*" as the maximum **impulse** (N·s or N·m·s), + # and convert impulse -> force/torque by dividing by the sampled duration (seconds). + # Use per-robot weight scaling as before. + lin_impulse_max = self._env_opts["max_lin_impulse_norm"] * weight + ang_impulse_max = self._env_opts["max_ang_impulse_norm"] * weight + + # --- Force (Impulse) Direction Generation (Reuse _pert_lindir buffer) --- + lindir = self._pert_lindir[robot_name] # (N, 3) + + if planar_only: + # planar push direction from random yaw + yaw_angles = self._pert_scratch[robot_name].uniform_(0.0, 2*math.pi).flatten() + lindir[:, 0] = torch.cos(yaw_angles) + lindir[:, 1] = torch.sin(yaw_angles) + lindir[:, 2] = 0.0 + else: + # 1. Fill with Standard Normal noise in-place + lindir.normal_() + # 2. Normalize in-place + norms = torch.norm(lindir, dim=1, keepdim=True).clamp_min_(1e-6) + lindir.div_(norms) + + # 3. Sample linear impulse magnitudes (reuse scratch) + # scratch has shape (N,1) - uniform [0,1] + self._pert_scratch[robot_name].uniform_(self._env_opts["lin_impulse_mag_min"], self._env_opts["lin_impulse_mag_max"]) + # impulse vectors = unit_dir * (rand * lin_impulse_max) + + lindir.mul_(self._pert_scratch[robot_name] * lin_impulse_max) # now contains linear impulses (N,3) + + # --- Angular (Impulse) Direction Generation (Reuse _pert_angdir buffer) --- + angdir = self._pert_angdir[robot_name] # (N, 3) + + if planar_only: + angdir.zero_() # no torque when planar-only is requested + else: + # 1. Fill with Standard Normal noise + angdir.normal_() + # 2. Normalize + norms = torch.norm(angdir, dim=1, keepdim=True).clamp_min_(1e-6) + angdir.div_(norms) + + # 3. Sample angular impulse magnitudes (reuse scratch) + self._pert_scratch[robot_name].uniform_(0.0, 1.0) + angdir.mul_(self._pert_scratch[robot_name] * ang_impulse_max) # now contains angular impulses (N,3) + + # --- Duration Generation (Reuse _pert_durations) --- + # Keep integer steps sampling (same shape/device/dtype) + self._pert_durations[robot_name] = torch.randint_like( + self._pert_durations[robot_name], + low=self._pert_min_steps, + high=self._pert_max_steps + 1 + ) + + # --- convert to float + duration_steps = self._pert_durations[robot_name].to(dtype=lindir.dtype, device=lindir.device) + # duration in seconds (shape (N,)) + duration_seconds = duration_steps * self.physics_dt() + # avoid divide-by-zero + duration_seconds = duration_seconds.clamp_min_(1e-6) + + # compute per-step forces/torques = impulse / duration_seconds + # lindir currently holds linear impulses, angdir holds angular impulses + forces_to_apply = lindir / duration_seconds + torques_to_apply = angdir / duration_seconds + + # Optional clipping based on robot weight (min/max) + f_norm = torch.norm(forces_to_apply, dim=1, keepdim=True).clamp_min_(1e-9) + target_norm = f_norm + + f_clip_scale_max = self._env_opts["pert_force_max_weight_scale"] + if f_clip_scale_max > 0.0: + force_max = self._weights[robot_name] * f_clip_scale_max # (N,1) + target_norm = torch.minimum(target_norm, force_max) + + f_clip_scale_min = self._env_opts["pert_force_min_weight_scale"] + if f_clip_scale_min > 0.0: + force_min = self._weights[robot_name] * f_clip_scale_min + target_norm = torch.maximum(target_norm, force_min) + + forces_to_apply = forces_to_apply * (target_norm / f_norm) + + t_clip_scale = self._env_opts["pert_torque_max_weight_scale"] + if t_clip_scale > 0.0: + torque_max = self._weights[robot_name] * self._env_opts["max_ang_impulse_lever"] * t_clip_scale + t_norm = torch.norm(torques_to_apply, dim=1, keepdim=True).clamp_min_(1e-9) + t_scale = torch.minimum(torch.ones_like(t_norm), torque_max / t_norm) + torques_to_apply = torques_to_apply * t_scale + + # --- Update State Buffers --- + # Use boolean indexing to scatter only triggered values + active[trigger_mask] = True + steps_rem[trigger_mask] = self._pert_durations[robot_name][trigger_mask, :].flatten() + forces_world[trigger_mask, :] = forces_to_apply[trigger_mask, :] + torques_world[trigger_mask, :] = torques_to_apply[trigger_mask, :] + if det_rate: + det_counter[trigger_mask] = 0 + + # --- 4. Apply Wrenches (Vectorized) --- + # Only call API if there are active perturbations to minimize overhead + + forces_world[~active, :]=0 + torques_world[~active, :]=0 + + self._blink_rigid_prim_views[robot_name].apply_forces_and_torques_at_pos( + forces=forces_world, + torques=torques_world, + positions=None, # body frame origin + is_global=True + ) + + @override + def _pre_step(self): + + if self._env_opts["use_random_pertub"]: + self._process_perturbations() + + success=super()._pre_step() + + return success + + @override + def _pre_step_db(self): + + if self._env_opts["use_random_pertub"]: + self._process_perturbations() + + success=super()._pre_step_db() + + return success + + @override + def _update_contact_state(self, + robot_name: str, + env_indxs: torch.Tensor = None): + + super()._update_contact_state(robot_name, env_indxs) + + if self._env_opts["use_random_pertub"]: + # write APPLIED perturbations to root wrench (mainly for debug) + self.cluster_servers[robot_name].get_state().contact_wrenches_root.set(data=self._pert_forces_world[robot_name][env_indxs, :], + data_type="f", + contact_name="root", + robot_idxs = env_indxs, + gpu=self._use_gpu) + self.cluster_servers[robot_name].get_state().contact_wrenches_root.set(data=self._pert_torques_world[robot_name][env_indxs, :], + data_type="t", + contact_name="root", + robot_idxs = env_indxs, + gpu=self._use_gpu) + + def _post_warmup_validation(self, robot_name: str): + """Validate warmup state: base height, tilt, and (if available) contacts.""" + envs = torch.arange(self._num_envs, device=self._device) + + # terrain height query + def _terrain_height(xy: torch.Tensor): + if self._env_opts["use_flat_ground"] or self.terrain_generator is None: + return torch.zeros((xy.shape[0],), device=xy.device, dtype=self._dtype) + heights = [] + half_extent = self._env_opts["spawn_height_check_half_extent"] + for k in range(xy.shape[0]): + h = self.terrain_generator.get_max_height_in_rect( + float(xy[k, 0]), float(xy[k, 1]), half_extent=half_extent) + heights.append(h) + return torch.as_tensor(heights, device=xy.device, dtype=self._dtype) + + # base height check + base_xy = self._root_p[robot_name][:, 0:2] + base_z = self._root_p[robot_name][:, 2] + ground_z = _terrain_height(base_xy) + margin = float(self._env_opts["spawn_height_cushion"]) + bad_z = base_z < (ground_z + margin) + + # tilt check (angle between base up and world up) + q = self._root_q[robot_name] + # quaternion to up vector + w, x, y, z = q[:, 0], q[:, 1], q[:, 2], q[:, 3] + up = torch.stack([ + 2 * (x * z - w * y), + 2 * (y * z + w * x), + 1 - 2 * (x * x + y * y) + ], dim=1) + cos_tilt = up[:, 2].clamp(-1.0, 1.0) + tilt_thresh_deg = 35.0 + bad_tilt = cos_tilt < math.cos(math.radians(tilt_thresh_deg)) + + # contact check (only if sensors exist) + # bad_contact = torch.zeros_like(base_z, dtype=torch.bool) + # if robot_name in self.omni_contact_sensors and self.omni_contact_sensors[robot_name] is not None: + # counts = torch.zeros((self._num_envs,), dtype=torch.int32, device=self._device) + # for link in self._contact_names.get(robot_name, []): + # f_contact = self._get_contact_f(robot_name=robot_name, + # contact_link=link, + # env_indxs=None) + # if f_contact is None: + # continue + # # use normal component (assume z-up); ignore tangential forces + # active = f_contact[:, 2] > 1e-3 + # counts += active.int() + # bad_contact = counts < 3 + + failing = torch.nonzero(bad_z | bad_tilt, as_tuple=False).flatten() + if failing.numel() > 0: + # remediate: lift to terrain+margin, upright (preserve yaw), zero root velocities + # yaw = torch.atan2(2 * (w * z + x * y), 1 - 2 * (y * y + z * z)) + # safe_z = (ground_z + margin)[failing] + # self._root_p[robot_name][failing, 2] = safe_z + # cos_h = torch.cos(yaw[failing] / 2) + # sin_h = torch.sin(yaw[failing] / 2) + # upright = torch.zeros((failing.shape[0], 4), device=self._device, dtype=self._dtype) + # upright[:, 0] = cos_h + # upright[:, 3] = sin_h + # self._root_q[robot_name][failing, :] = upright + # self._root_v[robot_name][failing, :] = 0.0 + # self._root_omega[robot_name][failing, :] = 0.0 + + msgs = [] # print db message + if bad_z.any(): + msgs.append(f"low_z envs {torch.nonzero(bad_z, as_tuple=False).flatten().tolist()}") + if bad_tilt.any(): + msgs.append(f"tilt envs {torch.nonzero(bad_tilt, as_tuple=False).flatten().tolist()}") + Journal.log(self.__class__.__name__, + "_post_warmup_validation", + f"Warmup validation failures for {robot_name}: " + "; ".join(msgs), + LogType.WARN, + throw_when_excep=False) + return failing + + def _import_urdf(self, + robot_name: str, + fix_base = False, + self_collide = False, + merge_fixed = True): + + import_config=_urdf.ImportConfig() + # status,import_config=omni_kit.commands.execute("URDFCreateImportConfig") + + Journal.log(self.__class__.__name__, + "update_root_offsets", + "importing robot URDF", + LogType.STAT, + throw_when_excep = True) + _urdf.acquire_urdf_interface() + # we overwrite some settings which are bound to be fixed + import_config.merge_fixed_joints = merge_fixed # makes sim more stable + # in case of fixed joints with light objects + import_config.import_inertia_tensor = True + # import_config.convex_decomp = False + import_config.fix_base = fix_base + import_config.self_collision = self_collide + # import_config.distance_scale = 1 + # import_config.make_default_prim = True + # import_config.create_physics_scene = True + # import_config.default_drive_strength = 1047.19751 + # import_config.default_position_drive_damping = 52.35988 + # import_config.default_drive_type = _urdf.UrdfJointTargetType.JOINT_DRIVE_POSITION + # import URDF + success, robot_prim_path_default = omni_kit.commands.execute( + "URDFParseAndImportFile", + urdf_path=self._urdf_dump_paths[robot_name], + import_config=import_config, + # get_articulation_root=True, + ) + + robot_base_prim_path = self._env_opts["template_env_ns"] + "/" + robot_name + + if success: + Journal.log(self.__class__.__name__, + "_import_urdf", + "Successfully importedf URDF into IsaacSim", + LogType.STAT) + else: + Journal.log(self.__class__.__name__, + "_import_urdf", + "Failed to import URDF into IsaacSim", + LogType.EXCEP, + throw_when_excep = True) + + # moving default prim to base prim path for cloning + move_prim(robot_prim_path_default, # from + robot_base_prim_path) # to + + robot_base_prim = prim_utils.get_prim_at_path(robot_base_prim_path) + children = prim_utils.get_prim_children(robot_base_prim) + # log imported prim children to the journal (print was getting truncated in logs) + Journal.log(self.__class__.__name__, + "_import_urdf", + f"Imported robot URDF children: {children}", + LogType.STAT) + + # improve collision tesselation for cylinders (e.g., wheels) if requested + # self._apply_collision_refinement(robot_base_prim_path, + # self._env_opts["collision_refinement_level"]) + + return success + + def _apply_collision_refinement(self, robot_base_prim_path: str, refinement_level: int): + """Set refinement level on collision cylinders to avoid coarse faceting.""" + if refinement_level is None: + return + stage = get_current_stage() + coll_prefix = robot_base_prim_path + "/collisions" + count = 0 + for prim in stage.Traverse(): + if not prim.IsValid(): + continue + path_str = prim.GetPath().pathString + if not path_str.startswith(coll_prefix): + continue + if prim.GetTypeName() == "Cylinder": + attr = prim.GetAttribute("refinementLevel") + if not attr.IsValid(): + attr = prim.CreateAttribute("refinementLevel", Sdf.ValueTypeNames.Int) + attr.Set(int(refinement_level)) + count += 1 + Journal.log(self.__class__.__name__, + "_apply_collision_refinement", + f"Applied refinement level {refinement_level} to {count} cylinder collision prims under {coll_prefix}", + LogType.STAT) + + def apply_collision_filters(self, + physicscene_path: str, + coll_root_path: str): + + self._cloner.filter_collisions(physicsscene_path = physicscene_path, + collision_root_path = coll_root_path, + prim_paths=self._envs_prim_paths, + global_paths=self._ground_plane_prim_paths # can collide with these prims + ) + + def _read_root_state_from_robot(self, + robot_name: str, + env_indxs: torch.Tensor = None): + + self._get_root_state(numerical_diff=self._env_opts["use_diff_vels"], + env_indxs=env_indxs, + robot_name=robot_name) + + # height grid sensor readout + if robot_name in self._height_sensors: + pos_src = self._root_p[robot_name] if env_indxs is None else self._root_p[robot_name][env_indxs] + quat_src = self._root_q[robot_name] if env_indxs is None else self._root_q[robot_name][env_indxs] + heights = self._height_sensors[robot_name].read(pos_src, quat_src) + if self._env_opts["use_flat_ground"]: + heights.zero_() + if env_indxs is None: + self._height_imgs[robot_name] = heights + else: + # clone to avoid overlapping write/read views + self._height_imgs[robot_name][env_indxs] = heights.clone() + + # print("height image") + # print(self._height_imgs[robot_name][0, :, : ]) + + def _read_jnts_state_from_robot(self, + robot_name: str, + env_indxs: torch.Tensor = None): + + self._get_robots_jnt_state( + numerical_diff=self._env_opts["use_diff_vels"], + env_indxs=env_indxs, + robot_name=robot_name) + + def _get_root_state(self, + robot_name: str, + env_indxs: torch.Tensor = None, + numerical_diff: bool = False, + base_loc: bool = True): + + # reading = self._is.get_sensor_reading("/World/Cube/Imu_Sensor", + # use_latest_data = True) + + dt=self._cluster_dt[robot_name] # getting diff state always at cluster rate + + # measurements from simulator are in world frame + if env_indxs is not None: + + pose = self._robots_art_views[robot_name].get_world_poses( + clone = True, + indices=env_indxs) # tuple: (pos, quat), quat is [w, i, j, k] in Isaac4.2 + + self._root_p[robot_name][env_indxs, :] = pose[0] + + going_to_fly=self._root_p[robot_name][env_indxs, 0:2]>(self._env_opts["terrain_border"]-0.1) + if going_to_fly.any(): + flying=going_to_fly.sum().item() + warn = f"N. {flying} robots ({robot_name}) are about to go out of the terrain!!" + Journal.log(self.__class__.__name__, + "_get_root_state", + warn, + LogType.WARN, + throw_when_excep = True) + self._root_q[robot_name][env_indxs, :] = pose[1] # root orientation + if not numerical_diff: + # we get velocities from the simulation. This is not good since + # these can actually represent artifacts which do not have physical meaning. + # It's better to obtain them by differentiation to avoid issues with controllers, etc... + self._root_v[robot_name][env_indxs, :] = self._robots_art_views[robot_name].get_linear_velocities( + clone = True, + indices=env_indxs) # root lin. velocity + self._root_omega[robot_name][env_indxs, :] = self._robots_art_views[robot_name].get_angular_velocities( + clone = True, + indices=env_indxs) # root ang. velocity + + # for now obtain root a numerically + self._root_a[robot_name][env_indxs, :] = (self._root_v[robot_name][env_indxs, :] - \ + self._root_v_prev[robot_name][env_indxs, :]) / dt + self._root_alpha[robot_name][env_indxs, :] = (self._root_omega[robot_name][env_indxs, :] - \ + self._root_omega_prev[robot_name][env_indxs, :]) / dt + + self._root_v_prev[robot_name][env_indxs, :] = self._root_v[robot_name][env_indxs, :] + self._root_omega_prev[robot_name][env_indxs, :] = self._root_omega[robot_name][env_indxs, :] + else: + # differentiate numerically + self._root_v[robot_name][env_indxs, :] = (self._root_p[robot_name][env_indxs, :] - \ + self._root_p_prev[robot_name][env_indxs, :]) / dt + self._root_omega[robot_name][env_indxs, :] = quat_to_omega( + self._root_q_prev[robot_name][env_indxs, :], + self._root_q[robot_name][env_indxs, :], + dt) + + self._root_a[robot_name][env_indxs, :] = (self._root_v[robot_name][env_indxs, :] - \ + self._root_v_prev[robot_name][env_indxs, :]) / dt + self._root_alpha[robot_name][env_indxs, :] = (self._root_omega[robot_name][env_indxs, :] - \ + self._root_omega_prev[robot_name][env_indxs, :]) / dt + + # update "previous" data for numerical differentiation + self._root_p_prev[robot_name][env_indxs, :] = self._root_p[robot_name][env_indxs, :] + self._root_q_prev[robot_name][env_indxs, :] = self._root_q[robot_name][env_indxs, :] + self._root_v_prev[robot_name][env_indxs, :] = self._root_v[robot_name][env_indxs, :] + self._root_omega_prev[robot_name][env_indxs, :] = self._root_omega[robot_name][env_indxs, :] + self._track_terrain_hits(robot_name=robot_name, env_indxs=env_indxs) + + else: + # updating data for all environments + pose = self._robots_art_views[robot_name].get_world_poses( + clone = True) # tuple: (pos, quat) + self._root_p[robot_name][:, :] = pose[0] + self._root_q[robot_name][:, :] = pose[1] # root orientation + + going_to_fly=self._root_p[robot_name][:, 0:2]>(self._env_opts["terrain_border"]-0.1) + if going_to_fly.any(): + flying=going_to_fly.sum().item() + warn = f"N. {flying} robots ({robot_name}) are about to go out of the terrain!!" + Journal.log(self.__class__.__name__, + "_get_root_state", + warn, + LogType.WARN, + throw_when_excep = True) + + if not numerical_diff: + # we get velocities from the simulation. This is not good since + # these can actually represent artifacts which do not have physical meaning. + # It's better to obtain them by differentiation to avoid issues with controllers, etc... + self._root_v[robot_name][:, :] = self._robots_art_views[robot_name].get_linear_velocities( + clone = True) # root lin. velocity + self._root_omega[robot_name][:, :] = self._robots_art_views[robot_name].get_angular_velocities( + clone = True) # root ang. velocity + + self._root_a[robot_name][:, :] = (self._root_v[robot_name][:, :] - \ + self._root_v_prev[robot_name][:, :]) / dt + self._root_alpha[robot_name][:, :] = (self._root_omega[robot_name][:, :] - \ + self._root_omega_prev[robot_name][:, :]) / dt + + self._root_v_prev[robot_name][:, :] = self._root_v[robot_name][:, :] + self._root_omega_prev[robot_name][:, :] = self._root_omega[robot_name][:, :] + else: + # differentiate numerically + self._root_v[robot_name][:, :] = (self._root_p[robot_name][:, :] - \ + self._root_p_prev[robot_name][:, :]) / dt + self._root_omega[robot_name][:, :] = quat_to_omega(self._root_q_prev[robot_name][:, :], + self._root_q[robot_name][:, :], + dt) + + self._root_a[robot_name][:, :] = (self._root_v[robot_name][:, :] - \ + self._root_v_prev[robot_name][:, :]) / dt + self._root_alpha[robot_name][:, :] = (self._root_omega[robot_name][:, :] - \ + self._root_omega_prev[robot_name][:, :]) / dt + + # update "previous" data for numerical differentiation + self._root_p_prev[robot_name][:, :] = self._root_p[robot_name][:, :] + self._root_q_prev[robot_name][:, :] = self._root_q[robot_name][:, :] + self._root_v_prev[robot_name][:, :] = self._root_v[robot_name][:, :] + self._root_omega_prev[robot_name][:, :] = self._root_omega[robot_name][:, :] + self._track_terrain_hits(robot_name=robot_name, env_indxs=None) + + if base_loc: + # rotate robot twist in base local + twist_w=torch.cat((self._root_v[robot_name], + self._root_omega[robot_name]), + dim=1) + twist_base_loc=torch.cat((self._root_v_base_loc[robot_name], + self._root_omega_base_loc[robot_name]), + dim=1) + world2base_frame(t_w=twist_w,q_b=self._root_q[robot_name],t_out=twist_base_loc) + self._root_v_base_loc[robot_name]=twist_base_loc[:, 0:3] + self._root_omega_base_loc[robot_name]=twist_base_loc[:, 3:6] + + # rotate robot a in base local + a_w=torch.cat((self._root_a[robot_name], + self._root_alpha[robot_name]), + dim=1) + a_base_loc=torch.cat((self._root_a_base_loc[robot_name], + self._root_alpha_base_loc[robot_name]), + dim=1) + world2base_frame(t_w=a_w,q_b=self._root_q[robot_name],t_out=a_base_loc) + self._root_a_base_loc[robot_name]=a_base_loc[:, 0:3] + self._root_alpha_base_loc[robot_name]=a_base_loc[:, 3:6] + + # rotate gravity in base local + world2base_frame3D(v_w=self._gravity_normalized[robot_name],q_b=self._root_q[robot_name], + v_out=self._gravity_normalized_base_loc[robot_name]) + + def _get_robots_jnt_state(self, + robot_name: str, + env_indxs: torch.Tensor = None, + numerical_diff: bool = False): + + dt= self.physics_dt() if self._override_low_lev_controller else self._cluster_dt[robot_name] + + # measurements from simulator are in world frame + if env_indxs is not None: + + self._jnts_q[robot_name][env_indxs, :] = self._robots_art_views[robot_name].get_joint_positions( + clone = True, + indices=env_indxs) # joint positions + if not numerical_diff: + self._jnts_v[robot_name][env_indxs, :] = self._robots_art_views[robot_name].get_joint_velocities( + clone = True, + indices=env_indxs) # joint velocities + else: + # differentiate numerically + self._jnts_v[robot_name][env_indxs, :] = (self._jnts_q[robot_name][env_indxs, :] - \ + self._jnts_q_prev[robot_name][env_indxs, :]) / dt + # update "previous" data for numerical differentiation + self._jnts_q_prev[robot_name][env_indxs, :] = self._jnts_q[robot_name][env_indxs, :] + + self._jnts_eff[robot_name][env_indxs, :] = self._robots_art_views[robot_name].get_measured_joint_efforts( + clone = True, + joint_indices=None, + indices=env_indxs) # measured joint efforts (computed by joint force solver) + + else: + self._jnts_q[robot_name][:, :] = self._robots_art_views[robot_name].get_joint_positions( + clone = True) # joint positions + if not numerical_diff: + self._jnts_v[robot_name][:, :] = self._robots_art_views[robot_name].get_joint_velocities( + clone = True) # joint velocities + else: + self._jnts_v[robot_name][:, :] = (self._jnts_q[robot_name][:, :] - \ + self._jnts_q_prev[robot_name][:, :]) / dt + + self._jnts_q_prev[robot_name][:, :] = self._jnts_q[robot_name][:, :] + + self._jnts_eff[robot_name][env_indxs, :] = self._robots_art_views[robot_name].get_measured_joint_efforts( + clone = True) # measured joint efforts (computed by joint force solver) + + def _get_contact_f(self, + robot_name: str, + contact_link: str, + env_indxs: torch.Tensor) -> torch.Tensor: + + if self.omni_contact_sensors[robot_name] is not None: + return self.omni_contact_sensors[robot_name].get(dt=self.physics_dt(), + contact_link=contact_link, + env_indxs=env_indxs, + clone=False) + + def _set_jnts_to_homing(self, robot_name: str): + self._robots_art_views[robot_name].set_joints_default_state(positions=self._homing, + velocities = torch.zeros((self._homing.shape[0], self._homing.shape[1]), \ + dtype=self._dtype, device=self._device), + efforts = torch.zeros((self._homing.shape[0], self._homing.shape[1]), \ + dtype=self._dtype, device=self._device)) + + def _set_root_to_defconfig(self, robot_name: str): + self._robots_art_views[robot_name].set_default_state(positions=self._root_p_default[robot_name], + orientations=self._root_q_default[robot_name]) + + def _alter_twist_warmup(self, robot_name: str, env_indxs: torch.Tensor = None): + """Zero angular velocities and joint velocities for the given robot/envs.""" + if env_indxs is None: + twist = self._robots_art_views[robot_name].get_velocities(clone=True) + twist[:, 0] = 0.0 + twist[:, 1] = 0.0 + twist[:, 3:] = 0.0 # zero angular part, preserve current linear + self._robots_art_views[robot_name].set_velocities(velocities=twist, indices=None) + + # jnt_vel=self._robots_art_views[robot_name].get_joint_velocities( + # indices = None, clone=True) + # jnt_vel[:, :] = 0.0 + + # self._robots_art_views[robot_name].set_joint_velocities( + # velocities = jnt_vel, + # indices = None) + + else: + twist = self._robots_art_views[robot_name].get_velocities(clone=True, indices=env_indxs) + # twist[:, 0:2] = 0.0 + twist[:, 0] = 0.0 + twist[:, 1] = 0.0 + twist[:, 3:] = 0.0 + self._robots_art_views[robot_name].set_velocities(velocities=twist, indices=env_indxs) + + # jnt_vel=self._robots_art_views[robot_name].get_joint_velocities( + # indices = env_indxs, clone=True) + # jnt_vel[:, :] = 0.0 + + # self._robots_art_views[robot_name].set_joint_velocities( + # velocities = jnt_vel, + # indices = env_indxs) + + def _get_solver_info(self): + for i in range(0, len(self._robot_names)): + robot_name = self._robot_names[i] + self._solver_position_iteration_counts[robot_name] = self._robots_art_views[robot_name].get_solver_position_iteration_counts() + self._solver_velocity_iteration_counts[robot_name] = self._robots_art_views[robot_name].get_solver_velocity_iteration_counts() + self._solver_stabilization_threshs[robot_name] = self._robots_art_views[robot_name].get_stabilization_thresholds() + + def _update_art_solver_options(self): + + # sets new solver iteration options for specifc articulations + self._get_solver_info() # gets current solver info for the articulations of the + # environments, so that dictionaries are filled properly + + for i in range(0, len(self._robot_names)): + robot_name = self._robot_names[i] + # increase by a factor + self._solver_position_iteration_counts[robot_name] = torch.full((self._num_envs,), self._solver_position_iteration_count) + self._solver_velocity_iteration_counts[robot_name] = torch.full((self._num_envs,), self._solver_velocity_iteration_count) + self._solver_stabilization_threshs[robot_name] = torch.full((self._num_envs,), self._solver_stabilization_thresh) + self._robots_art_views[robot_name].set_solver_position_iteration_counts(self._solver_position_iteration_counts[robot_name]) + self._robots_art_views[robot_name].set_solver_velocity_iteration_counts(self._solver_velocity_iteration_counts[robot_name]) + self._robots_art_views[robot_name].set_stabilization_thresholds(self._solver_stabilization_threshs[robot_name]) + self._get_solver_info() # gets again solver info for articulation, so that it's possible to debug if + # the operation was successful + + def _print_envs_info(self): + + ground_info = f"[Ground info]" + "\n" + \ + "static friction coeff.: " + str(self._terrain_material.GetStaticFrictionAttr().Get()) + "\n" + \ + "dynamics friction coeff.: " + str(self._terrain_material.GetDynamicFrictionAttr().Get()) + "\n" + \ + "restitution coeff.: " + str(self._terrain_material.GetRestitutionAttr().Get()) + "\n" +\ + "friction comb. mode: " + str(self._terrain_physix_material.GetFrictionCombineModeAttr().Get()) + "\n" + \ + "damping comb. mode: " + str(self._terrain_physix_material.GetDampingCombineModeAttr().Get()) + "\n" + \ + "restitution comb. mode: " + str(self._terrain_physix_material.GetRestitutionCombineModeAttr().Get()) + "\n" + + Journal.log(self.__class__.__name__, + "_print_envs_info", + ground_info, + LogType.STAT, + throw_when_excep = True) + + for i in range(0, len(self._robot_names)): + robot_name = self._robot_names[i] + task_info = f"[{robot_name}]" + "\n" + \ + "bodies: " + str(self._robots_art_views[robot_name].body_names) + "\n" + \ + "n. prims: " + str(self._robots_art_views[robot_name].count) + "\n" + \ + "prims names: " + str(self._robots_art_views[robot_name].prim_paths) + "\n" + \ + "n. bodies: " + str(self._robots_art_views[robot_name].num_bodies) + "\n" + \ + "n. dofs: " + str(self._robots_art_views[robot_name].num_dof) + "\n" + \ + "dof names: " + str(self._robots_art_views[robot_name].dof_names) + "\n" + \ + "solver_position_iteration_counts: " + str(self._solver_position_iteration_counts[robot_name]) + "\n" + \ + "solver_velocity_iteration_counts: " + str(self._solver_velocity_iteration_counts[robot_name]) + "\n" + \ + "stabiliz. thresholds: " + str(self._solver_stabilization_threshs[robot_name]) + # print("dof limits: " + str(self._robots_art_views[robot_name].get_dof_limits())) + # print("effort modes: " + str(self._robots_art_views[robot_name].get_effort_modes())) + # print("dof gains: " + str(self._robots_art_views[robot_name].get_gains())) + # print("dof max efforts: " + str(self._robots_art_views[robot_name].get_max_efforts())) + # print("dof gains: " + str(self._robots_art_views[robot_name].get_gains())) + # print("physics handle valid: " + str(self._robots_art_views[robot_name].is_physics_handle_valid()) + Journal.log(self.__class__.__name__, + "_print_envs_info", + task_info, + LogType.STAT, + throw_when_excep = True) + + def _fill_robot_info_from_world(self): + for i in range(0, len(self._robot_names)): + robot_name = self._robot_names[i] + self._robot_bodynames[robot_name] = self._robots_art_views[robot_name].body_names + self._robot_n_links[robot_name] = self._robots_art_views[robot_name].num_bodies + self._robot_n_dofs[robot_name] = self._robots_art_views[robot_name].num_dof + self._robot_dof_names[robot_name] = self._robots_art_views[robot_name].dof_names + + def _set_initial_camera_params(self, + camera_position=[10, 10, 3], + camera_target=[0, 0, 0], + camera_prim_path="/OmniverseKit_Persp"): + set_camera_view(eye=camera_position, + target=camera_target, + camera_prim_path=camera_prim_path) + + def _init_contact_sensors(self, robot_name: str): + self.omni_contact_sensors[robot_name]=None + sensor_radii={} + contact_offsets={} + self._contact_names[robot_name]=self._env_opts["contact_prims"] + for contact_prim in self._env_opts["contact_prims"]: + sensor_radii[contact_prim]=self._env_opts["sensor_radii"] + contact_offsets[contact_prim]=np.array([0.0, 0.0, 0.0]) + if not (len(self._env_opts["contact_prims"])==0): + self.omni_contact_sensors[robot_name]=OmniContactSensors( + name=robot_name, + n_envs=self._num_envs, + contact_prims=self._env_opts["contact_prims"], + contact_offsets=contact_offsets, + sensor_radii=sensor_radii, + device=self._device, + dtype=self._dtype, + enable_debug=self._debug, + filter_paths=self._ground_plane_prim_paths) + self.omni_contact_sensors[robot_name].create_contact_sensors( + self._world, + envs_namespace=self._env_opts["envs_ns"]) + + def _init_robots_state(self): + + self._masses = {} + self._weights = {} + + self._pert_active = {} # bool mask: (num_envs,) + self._pert_steps_remaining = {} # int steps: (num_envs,) + self._pert_forces_world = {} # (num_envs,3) + self._pert_torques_world = {} # (num_envs,3) + self._pert_force_local = {} # (num_envs,3) (if needed) + self._pert_torque_local = {} # (num_envs,3) + self._pert_lindir = {} + self._pert_angdir = {} + self._pert_durations = {} + self._pert_scratch = {} + self._pert_det_counter = {} + + # convert durations in seconds to integer physics steps (min 1 step) + self._pert_min_steps = max(1, int(math.ceil(self._env_opts["pert_wrenches_min_duration"] / self.physics_dt()))) + self._pert_max_steps = max(self._pert_min_steps, int(math.ceil(self._env_opts["pert_wrenches_max_duration"] / self.physics_dt()))) + + pert_wrenches_step_rate=self._env_opts["pert_wrenches_rate"]/self.physics_dt() # 1 pert every n physics steps + self._pert_det_steps = max(1, int(round(pert_wrenches_step_rate))) + self._pert_wrenches_prob=min(1.0, 1.0/pert_wrenches_step_rate) # sampling prob to be used when not deterministic + + self._calc_robot_distrib() + + for i in range(0, len(self._robot_names)): + + robot_name = self._robot_names[i] + + pose = self._robots_art_views[robot_name].get_world_poses( + clone = True) # tuple: (pos, quat) + + # root p (measured, previous, default) + self._root_p[robot_name] = pose[0] + self._root_p_prev[robot_name] = torch.clone(pose[0]) + # print(self._root_p_default[robot_name].device) + self._root_p_default[robot_name] = torch.clone(pose[0]) + self._distr_offset[robot_name] + # root q (measured, previous, default) + self._root_q[robot_name] = pose[1] # root orientation + self._root_q_prev[robot_name] = torch.clone(pose[1]) + self._root_q_default[robot_name] = torch.clone(pose[1]) + # jnt q (measured, previous, default) + self._jnts_q[robot_name] = self._robots_art_views[robot_name].get_joint_positions( + clone = True) # joint positions + self._jnts_q_prev[robot_name] = self._robots_art_views[robot_name].get_joint_positions( + clone = True) + self._jnts_q_default[robot_name] = torch.full((self._jnts_q[robot_name].shape[0], + self._jnts_q[robot_name].shape[1]), + 0.0, + dtype=self._dtype, + device=self._device) + + # root v (measured, default) + self._root_v[robot_name] = self._robots_art_views[robot_name].get_linear_velocities( + clone = True) # root lin. velocityù + self._root_v_base_loc[robot_name] = torch.full_like(self._root_v[robot_name], fill_value=0.0) + self._root_v_prev[robot_name] = torch.full_like(self._root_v[robot_name], fill_value=0.0) + self._root_v_default[robot_name] = torch.full_like(self._root_v[robot_name], fill_value=0.0) + # root omega (measured, default) + self._root_omega[robot_name] = self._robots_art_views[robot_name].get_angular_velocities( + clone = True) # root ang. velocity + self._root_omega_prev[robot_name] = torch.full_like(self._root_omega[robot_name], fill_value=0.0) + + self._root_omega_base_loc[robot_name] = torch.full_like(self._root_omega[robot_name], fill_value=0.0) + self._root_omega_default[robot_name] = torch.full((self._root_omega[robot_name].shape[0], self._root_omega[robot_name].shape[1]), + 0.0, + dtype=self._dtype, + device=self._device) + # root a (measured,) + self._root_a[robot_name] = torch.full_like(self._root_v[robot_name], fill_value=0.0) + self._root_a_base_loc[robot_name] = torch.full_like(self._root_a[robot_name], fill_value=0.0) + self._root_alpha[robot_name] = torch.full_like(self._root_v[robot_name], fill_value=0.0) + self._root_alpha_base_loc[robot_name] = torch.full_like(self._root_alpha[robot_name], fill_value=0.0) + + # height grid sensor storage + grid_sz = int(self._env_opts["height_sensor_pixels"]) + self._height_imgs[robot_name] = torch.zeros((self._num_envs, grid_sz, grid_sz), + dtype=self._dtype, + device=self._device) + + # joints v (measured, default) + self._jnts_v[robot_name] = self._robots_art_views[robot_name].get_joint_velocities( + clone = True) # joint velocities + self._jnts_v_default[robot_name] = torch.full((self._jnts_v[robot_name].shape[0], self._jnts_v[robot_name].shape[1]), + 0.0, + dtype=self._dtype, + device=self._device) + + # joints efforts (measured, default) + self._jnts_eff[robot_name] = torch.full((self._jnts_v[robot_name].shape[0], self._jnts_v[robot_name].shape[1]), + 0.0, + dtype=self._dtype, + device=self._device) + self._jnts_eff_default[robot_name] = torch.full((self._jnts_v[robot_name].shape[0], self._jnts_v[robot_name].shape[1]), + 0.0, + dtype=self._dtype, + device=self._device) + self._root_pos_offsets[robot_name] = torch.zeros((self._num_envs, 3), + device=self._device) # reference position offses + + self._root_q_offsets[robot_name] = torch.zeros((self._num_envs, 4), + device=self._device) + self._root_q_offsets[robot_name][:, 0] = 1.0 # init to valid identity quaternion + + # boolean active flag per env + self._pert_active[robot_name] = torch.zeros((self._num_envs,), dtype=torch.bool, device=self._device) + # remaining steps as integer tensor + self._pert_steps_remaining[robot_name] = torch.zeros((self._num_envs,), dtype=torch.int32, device=self._device) + # world force & torque (N and N*m) stored as floats + self._pert_forces_world[robot_name] = torch.zeros((self._num_envs, 3), dtype=self._dtype, device=self._device) + self._pert_torques_world[robot_name] = torch.zeros((self._num_envs, 3), dtype=self._dtype, device=self._device) + # local frame copies (if you want to store local-frame versions) + self._pert_force_local[robot_name] = torch.zeros((self._num_envs, 3), dtype=self._dtype, device=self._device) + self._pert_torque_local[robot_name] = torch.zeros((self._num_envs, 3), dtype=self._dtype, device=self._device) + + self._pert_lindir[robot_name] = torch.zeros((self._num_envs, 3), dtype=self._dtype, device=self._device) + self._pert_angdir[robot_name] = torch.zeros((self._num_envs, 3), dtype=self._dtype, device=self._device) + + self._pert_durations[robot_name] = torch.zeros((self._num_envs, 1), dtype=torch.int32, device=self._device) + + self._pert_scratch[robot_name] = torch.zeros((self._num_envs, 1), dtype=self._dtype, device=self._device) + self._pert_det_counter[robot_name] = torch.zeros((self._num_envs,), dtype=torch.int32, device=self._device) + + self._masses[robot_name] = torch.sum(self._robots_art_views[robot_name].get_body_masses(clone=True), dim=1).to(dtype=self._dtype, device=self._device) + + self._weights[robot_name] = (self._masses[robot_name] * abs(self._env_opts["gravity"][2].item())).reshape((self._num_envs, 1)) + + def _track_terrain_hits(self, robot_name: str, env_indxs: torch.Tensor = None): + """Track transitions into the terrain boundary margin (1 m) and count hits per env.""" + if self._env_opts["use_flat_ground"]: + return + border = float(self._env_opts["terrain_border"]) + threshold = max(0.0, border - self._terrain_hit_margin) + state = self._terrain_hit_active[robot_name] + counts = self._terrain_hit_counts[robot_name] + if env_indxs is None: + pos_xy = self._root_p[robot_name][:, 0:2] + hitting = torch.any(torch.abs(pos_xy) > threshold, dim=1) + new_hits = (~state) & hitting + if new_hits.any(): + counts[new_hits] += 1 + state.copy_(hitting) + else: + pos_xy = self._root_p[robot_name][env_indxs, 0:2] + hitting = torch.any(torch.abs(pos_xy) > threshold, dim=1) + prev_state = state[env_indxs] + new_hits = (~prev_state) & hitting + if new_hits.any(): + counts[env_indxs[new_hits]] += 1 + state[env_indxs] = hitting + + def _maybe_log_terrain_hits(self): + """Log boundary hits at low frequency only when counters change.""" + if self._env_opts["use_flat_ground"]: + return + period = int(self._env_opts["terrain_hit_log_period"]) + if period <= 0 or (self.step_counter % period != 0): + return + for robot_name in self._robot_names: + active = self._terrain_hit_active.get(robot_name, None) + if active is None: + continue + active_now = int(active.sum().item()) + if active_now == 0: + continue + counts = self._terrain_hit_counts[robot_name] + last = self._terrain_hit_counts_last_logged.get(robot_name, None) + if last is not None and torch.equal(counts, last): + continue + total_hits = int(counts.sum().item()) + msg = f"{active_now} {robot_name} robots within {self._terrain_hit_margin}m of terrain border. Total hits: {total_hits}." + Journal.log(self.__class__.__name__, + "_terrain_hits", + msg, + LogType.WARN, + throw_when_excep = True) + self._terrain_hit_counts_last_logged[robot_name] = counts.clone() + + @override + def _post_world_step(self) -> bool: + res = super()._post_world_step() + self._maybe_log_terrain_hits() + return res + + @override + def _post_world_step_db(self) -> bool: + res = super()._post_world_step_db() + self._maybe_log_terrain_hits() + return res + + def current_tstep(self): + self._world.current_time_step_index + + def world_time(self, robot_name: str): + return self._world.current_time + + def physics_dt(self): + return self._world.get_physics_dt() + + def rendering_dt(self): + return self._env_opts["rendering_dt"] + + def set_physics_dt(self, physics_dt:float): + self._world.set_simulation_dt(physics_dt=physics_dt,rendering_dt=None) + + def set_rendering_dt(self, rendering_dt:float): + self._world.set_simulation_dt(physics_dt=None,rendering_dt=rendering_dt) + + def _robot_jnt_names(self, robot_name: str): + return self._robots_art_views[robot_name].dof_names