Spaces:
Sleeping
Sleeping
| # Copyright (c) 2022 NVIDIA CORPORATION. All rights reserved. | |
| # NVIDIA CORPORATION and its licensors retain all intellectual property | |
| # and proprietary rights in and to this software, related documentation | |
| # and any modifications thereto. Any use, reproduction, disclosure or | |
| # distribution of this software and related documentation without an express | |
| # license agreement from NVIDIA CORPORATION is strictly prohibited. | |
| import argparse | |
| import os | |
| from enum import Enum | |
| from typing import Tuple | |
| import numpy as np | |
| import warp as wp | |
| import warp.sim | |
| import warp.sim.render | |
| wp.init() | |
| class RenderMode(Enum): | |
| NONE = "none" | |
| OPENGL = "opengl" | |
| USD = "usd" | |
| def __str__(self): | |
| return self.value | |
| class IntegratorType(Enum): | |
| EULER = "euler" | |
| XPBD = "xpbd" | |
| def __str__(self): | |
| return self.value | |
| def compute_env_offsets(num_envs, env_offset=(5.0, 0.0, 5.0), up_axis="Y"): | |
| # compute positional offsets per environment | |
| env_offset = np.array(env_offset) | |
| nonzeros = np.nonzero(env_offset)[0] | |
| num_dim = nonzeros.shape[0] | |
| if num_dim > 0: | |
| side_length = int(np.ceil(num_envs ** (1.0 / num_dim))) | |
| env_offsets = [] | |
| else: | |
| env_offsets = np.zeros((num_envs, 3)) | |
| if num_dim == 1: | |
| for i in range(num_envs): | |
| env_offsets.append(i * env_offset) | |
| elif num_dim == 2: | |
| for i in range(num_envs): | |
| d0 = i // side_length | |
| d1 = i % side_length | |
| offset = np.zeros(3) | |
| offset[nonzeros[0]] = d0 * env_offset[nonzeros[0]] | |
| offset[nonzeros[1]] = d1 * env_offset[nonzeros[1]] | |
| env_offsets.append(offset) | |
| elif num_dim == 3: | |
| for i in range(num_envs): | |
| d0 = i // (side_length * side_length) | |
| d1 = (i // side_length) % side_length | |
| d2 = i % side_length | |
| offset = np.zeros(3) | |
| offset[0] = d0 * env_offset[0] | |
| offset[1] = d1 * env_offset[1] | |
| offset[2] = d2 * env_offset[2] | |
| env_offsets.append(offset) | |
| env_offsets = np.array(env_offsets) | |
| min_offsets = np.min(env_offsets, axis=0) | |
| correction = min_offsets + (np.max(env_offsets, axis=0) - min_offsets) / 2.0 | |
| if isinstance(up_axis, str): | |
| up_axis = "XYZ".index(up_axis.upper()) | |
| correction[up_axis] = 0.0 # ensure the envs are not shifted below the ground plane | |
| env_offsets -= correction | |
| return env_offsets | |
| class Environment: | |
| sim_name: str = "Environment" | |
| frame_dt = 1.0 / 60.0 | |
| episode_duration = 5.0 # seconds | |
| # whether to play the simulation indefinitely when using the OpenGL renderer | |
| continuous_opengl_render: bool = True | |
| sim_substeps_euler: int = 16 | |
| sim_substeps_xpbd: int = 5 | |
| euler_settings = dict() | |
| xpbd_settings = dict() | |
| render_mode: RenderMode = RenderMode.OPENGL | |
| opengl_render_settings = dict() | |
| usd_render_settings = dict(scaling=10.0) | |
| show_rigid_contact_points = False | |
| contact_points_radius = 1e-3 | |
| show_joints = False | |
| # whether OpenGLRenderer should render each environment in a separate tile | |
| use_tiled_rendering = False | |
| # whether to apply model.joint_q, joint_qd to bodies before simulating | |
| eval_fk: bool = True | |
| profile: bool = False | |
| use_graph_capture: bool = wp.get_preferred_device().is_cuda | |
| num_envs: int = 100 | |
| activate_ground_plane: bool = True | |
| integrator_type: IntegratorType = IntegratorType.XPBD | |
| up_axis: str = "Y" | |
| gravity: float = -9.81 | |
| env_offset: Tuple[float, float, float] = (1.0, 0.0, 1.0) | |
| # stiffness and damping for joint attachment dynamics used by Euler | |
| joint_attach_ke: float = 32000.0 | |
| joint_attach_kd: float = 50.0 | |
| # distance threshold at which contacts are generated | |
| rigid_contact_margin: float = 0.05 | |
| # whether each environment should have its own collision group | |
| # to avoid collisions between environments | |
| separate_collision_group_per_env: bool = True | |
| plot_body_coords: bool = False | |
| plot_joint_coords: bool = False | |
| requires_grad: bool = False | |
| # control-related definitions, to be updated by derived classes | |
| control_dim: int = 0 | |
| def __init__(self): | |
| self.parser = argparse.ArgumentParser() | |
| self.parser.add_argument( | |
| "--integrator", | |
| help="Type of integrator", | |
| type=IntegratorType, | |
| choices=list(IntegratorType), | |
| default=self.integrator_type.value, | |
| ) | |
| self.parser.add_argument( | |
| "--visualizer", | |
| help="Type of renderer", | |
| type=RenderMode, | |
| choices=list(RenderMode), | |
| default=self.render_mode.value, | |
| ) | |
| self.parser.add_argument( | |
| "--num_envs", help="Number of environments to simulate", type=int, default=self.num_envs | |
| ) | |
| self.parser.add_argument("--profile", help="Enable profiling", type=bool, default=self.profile) | |
| def parse_args(self): | |
| args = self.parser.parse_args() | |
| self.integrator_type = args.integrator | |
| self.render_mode = args.visualizer | |
| self.num_envs = args.num_envs | |
| self.profile = args.profile | |
| def init(self): | |
| if self.integrator_type == IntegratorType.EULER: | |
| self.sim_substeps = self.sim_substeps_euler | |
| elif self.integrator_type == IntegratorType.XPBD: | |
| self.sim_substeps = self.sim_substeps_xpbd | |
| self.episode_frames = int(self.episode_duration / self.frame_dt) | |
| self.sim_dt = self.frame_dt / self.sim_substeps | |
| self.sim_steps = int(self.episode_duration / self.sim_dt) | |
| if self.use_tiled_rendering and self.render_mode == RenderMode.OPENGL: | |
| # no environment offset when using tiled rendering | |
| self.env_offset = (0.0, 0.0, 0.0) | |
| builder = wp.sim.ModelBuilder() | |
| builder.rigid_contact_margin = self.rigid_contact_margin | |
| try: | |
| articulation_builder = wp.sim.ModelBuilder() | |
| self.create_articulation(articulation_builder) | |
| env_offsets = compute_env_offsets(self.num_envs, self.env_offset, self.up_axis) | |
| for i in range(self.num_envs): | |
| xform = wp.transform(env_offsets[i], wp.quat_identity()) | |
| builder.add_builder( | |
| articulation_builder, xform, separate_collision_group=self.separate_collision_group_per_env | |
| ) | |
| self.bodies_per_env = len(articulation_builder.body_q) | |
| except NotImplementedError: | |
| # custom simulation setup where something other than an articulation is used | |
| self.setup(builder) | |
| self.bodies_per_env = len(builder.body_q) | |
| self.model = builder.finalize() | |
| self.device = self.model.device | |
| if not self.device.is_cuda: | |
| self.use_graph_capture = False | |
| self.model.ground = self.activate_ground_plane | |
| self.model.joint_attach_ke = self.joint_attach_ke | |
| self.model.joint_attach_kd = self.joint_attach_kd | |
| # set up current and next state to be used by the integrator | |
| self.state_0 = None | |
| self.state_1 = None | |
| if self.integrator_type == IntegratorType.EULER: | |
| self.integrator = wp.sim.SemiImplicitIntegrator(**self.euler_settings) | |
| elif self.integrator_type == IntegratorType.XPBD: | |
| self.integrator = wp.sim.XPBDIntegrator(**self.xpbd_settings) | |
| self.renderer = None | |
| if self.profile: | |
| self.render_mode = RenderMode.NONE | |
| if self.render_mode == RenderMode.OPENGL: | |
| self.renderer = wp.sim.render.SimRendererOpenGL( | |
| self.model, | |
| self.sim_name, | |
| up_axis=self.up_axis, | |
| show_rigid_contact_points=self.show_rigid_contact_points, | |
| contact_points_radius=self.contact_points_radius, | |
| show_joints=self.show_joints, | |
| **self.opengl_render_settings, | |
| ) | |
| if self.use_tiled_rendering and self.num_envs > 1: | |
| floor_id = self.model.shape_count - 1 | |
| # all shapes except the floor | |
| instance_ids = np.arange(floor_id, dtype=np.int32).tolist() | |
| shapes_per_env = floor_id // self.num_envs | |
| additional_instances = [] | |
| if self.activate_ground_plane: | |
| additional_instances.append(floor_id) | |
| self.renderer.setup_tiled_rendering( | |
| instances=[ | |
| instance_ids[i * shapes_per_env : (i + 1) * shapes_per_env] + additional_instances | |
| for i in range(self.num_envs) | |
| ] | |
| ) | |
| elif self.render_mode == RenderMode.USD: | |
| filename = os.path.join(os.path.dirname(__file__), "..", "outputs", self.sim_name + ".usd") | |
| self.renderer = wp.sim.render.SimRendererUsd( | |
| self.model, | |
| filename, | |
| up_axis=self.up_axis, | |
| show_rigid_contact_points=self.show_rigid_contact_points, | |
| **self.usd_render_settings, | |
| ) | |
| def create_articulation(self, builder): | |
| raise NotImplementedError | |
| def setup(self, builder): | |
| pass | |
| def customize_model(self, model): | |
| pass | |
| def before_simulate(self): | |
| pass | |
| def after_simulate(self): | |
| pass | |
| def custom_update(self): | |
| pass | |
| def state(self): | |
| # shortcut to current state | |
| return self.state_0 | |
| def update(self): | |
| for i in range(self.sim_substeps): | |
| self.state_0.clear_forces() | |
| self.custom_update() | |
| wp.sim.collide(self.model, self.state_0) | |
| self.integrator.simulate(self.model, self.state_0, self.state_1, self.sim_dt) | |
| self.state_0, self.state_1 = self.state_1, self.state_0 | |
| def render(self, state=None): | |
| if self.renderer is not None: | |
| with wp.ScopedTimer("render", False): | |
| self.render_time += self.frame_dt | |
| self.renderer.begin_frame(self.render_time) | |
| # render state 1 (swapped with state 0 just before) | |
| self.renderer.render(state or self.state_1) | |
| self.renderer.end_frame() | |
| def run(self): | |
| # --------------- | |
| # run simulation | |
| self.sim_time = 0.0 | |
| self.render_time = 0.0 | |
| self.state_0 = self.model.state() | |
| self.state_1 = self.model.state() | |
| if self.eval_fk: | |
| wp.sim.eval_fk(self.model, self.model.joint_q, self.model.joint_qd, None, self.state_0) | |
| self.before_simulate() | |
| if self.renderer is not None: | |
| self.render(self.state_0) | |
| if self.render_mode == RenderMode.OPENGL: | |
| self.renderer.paused = True | |
| profiler = {} | |
| if self.use_graph_capture: | |
| # create update graph | |
| wp.capture_begin() | |
| self.update() | |
| graph = wp.capture_end() | |
| if self.plot_body_coords: | |
| q_history = [] | |
| q_history.append(self.state_0.body_q.numpy().copy()) | |
| qd_history = [] | |
| qd_history.append(self.state_0.body_qd.numpy().copy()) | |
| delta_history = [] | |
| delta_history.append(self.state_0.body_deltas.numpy().copy()) | |
| num_con_history = [] | |
| num_con_history.append(self.model.rigid_contact_inv_weight.numpy().copy()) | |
| if self.plot_joint_coords: | |
| joint_q_history = [] | |
| joint_q = wp.zeros_like(self.model.joint_q) | |
| joint_qd = wp.zeros_like(self.model.joint_qd) | |
| # simulate | |
| with wp.ScopedTimer("simulate", detailed=False, print=False, active=True, dict=profiler): | |
| running = True | |
| while running: | |
| for f in range(self.episode_frames): | |
| if self.use_graph_capture: | |
| wp.capture_launch(graph) | |
| self.sim_time += self.frame_dt | |
| else: | |
| self.update() | |
| self.sim_time += self.frame_dt | |
| if not self.profile: | |
| if self.plot_body_coords: | |
| q_history.append(self.state_0.body_q.numpy().copy()) | |
| qd_history.append(self.state_0.body_qd.numpy().copy()) | |
| delta_history.append(self.state_0.body_deltas.numpy().copy()) | |
| num_con_history.append(self.model.rigid_contact_inv_weight.numpy().copy()) | |
| if self.plot_joint_coords: | |
| wp.sim.eval_ik(self.model, self.state_0, joint_q, joint_qd) | |
| joint_q_history.append(joint_q.numpy().copy()) | |
| self.render() | |
| if self.render_mode == RenderMode.OPENGL and self.renderer.has_exit: | |
| running = False | |
| break | |
| if not self.continuous_opengl_render or self.render_mode != RenderMode.OPENGL: | |
| break | |
| wp.synchronize() | |
| self.after_simulate() | |
| avg_time = np.array(profiler["simulate"]).mean() / self.episode_frames | |
| avg_steps_second = 1000.0 * float(self.num_envs) / avg_time | |
| print(f"envs: {self.num_envs} steps/second {avg_steps_second} avg_time {avg_time}") | |
| if self.renderer is not None: | |
| self.renderer.save() | |
| if self.plot_body_coords: | |
| import matplotlib.pyplot as plt | |
| q_history = np.array(q_history) | |
| qd_history = np.array(qd_history) | |
| delta_history = np.array(delta_history) | |
| num_con_history = np.array(num_con_history) | |
| # find bodies with non-zero mass | |
| body_indices = np.where(self.model.body_mass.numpy() > 0)[0] | |
| body_indices = body_indices[:5] # limit number of bodies to plot | |
| fig, ax = plt.subplots(len(body_indices), 7, figsize=(10, 10), squeeze=False) | |
| fig.subplots_adjust(hspace=0.2, wspace=0.2) | |
| for i, j in enumerate(body_indices): | |
| ax[i, 0].set_title(f"Body {j} Position") | |
| ax[i, 0].grid() | |
| ax[i, 1].set_title(f"Body {j} Orientation") | |
| ax[i, 1].grid() | |
| ax[i, 2].set_title(f"Body {j} Linear Velocity") | |
| ax[i, 2].grid() | |
| ax[i, 3].set_title(f"Body {j} Angular Velocity") | |
| ax[i, 3].grid() | |
| ax[i, 4].set_title(f"Body {j} Linear Delta") | |
| ax[i, 4].grid() | |
| ax[i, 5].set_title(f"Body {j} Angular Delta") | |
| ax[i, 5].grid() | |
| ax[i, 6].set_title(f"Body {j} Num Contacts") | |
| ax[i, 6].grid() | |
| ax[i, 0].plot(q_history[:, j, :3]) | |
| ax[i, 1].plot(q_history[:, j, 3:]) | |
| ax[i, 2].plot(qd_history[:, j, 3:]) | |
| ax[i, 3].plot(qd_history[:, j, :3]) | |
| ax[i, 4].plot(delta_history[:, j, 3:]) | |
| ax[i, 5].plot(delta_history[:, j, :3]) | |
| ax[i, 6].plot(num_con_history[:, j]) | |
| ax[i, 0].set_xlim(0, self.sim_steps) | |
| ax[i, 1].set_xlim(0, self.sim_steps) | |
| ax[i, 2].set_xlim(0, self.sim_steps) | |
| ax[i, 3].set_xlim(0, self.sim_steps) | |
| ax[i, 4].set_xlim(0, self.sim_steps) | |
| ax[i, 5].set_xlim(0, self.sim_steps) | |
| ax[i, 6].set_xlim(0, self.sim_steps) | |
| ax[i, 6].yaxis.get_major_locator().set_params(integer=True) | |
| plt.show() | |
| if self.plot_joint_coords: | |
| import matplotlib.pyplot as plt | |
| joint_q_history = np.array(joint_q_history) | |
| dof_q = joint_q_history.shape[1] | |
| ncols = int(np.ceil(np.sqrt(dof_q))) | |
| nrows = int(np.ceil(dof_q / float(ncols))) | |
| fig, axes = plt.subplots( | |
| ncols=ncols, | |
| nrows=nrows, | |
| constrained_layout=True, | |
| figsize=(ncols * 3.5, nrows * 3.5), | |
| squeeze=False, | |
| sharex=True, | |
| ) | |
| joint_id = 0 | |
| joint_type_names = { | |
| wp.sim.JOINT_BALL: "ball", | |
| wp.sim.JOINT_REVOLUTE: "hinge", | |
| wp.sim.JOINT_PRISMATIC: "slide", | |
| wp.sim.JOINT_UNIVERSAL: "universal", | |
| wp.sim.JOINT_COMPOUND: "compound", | |
| wp.sim.JOINT_FREE: "free", | |
| wp.sim.JOINT_FIXED: "fixed", | |
| wp.sim.JOINT_DISTANCE: "distance", | |
| wp.sim.JOINT_D6: "D6", | |
| } | |
| joint_lower = self.model.joint_limit_lower.numpy() | |
| joint_upper = self.model.joint_limit_upper.numpy() | |
| joint_type = self.model.joint_type.numpy() | |
| while joint_id < len(joint_type) - 1 and joint_type[joint_id] == wp.sim.JOINT_FIXED: | |
| # skip fixed joints | |
| joint_id += 1 | |
| q_start = self.model.joint_q_start.numpy() | |
| qd_start = self.model.joint_qd_start.numpy() | |
| qd_i = qd_start[joint_id] | |
| for dim in range(ncols * nrows): | |
| ax = axes[dim // ncols, dim % ncols] | |
| if dim >= dof_q: | |
| ax.axis("off") | |
| continue | |
| ax.grid() | |
| ax.plot(joint_q_history[:, dim]) | |
| if joint_type[joint_id] != wp.sim.JOINT_FREE: | |
| lower = joint_lower[qd_i] | |
| if abs(lower) < 2 * np.pi: | |
| ax.axhline(lower, color="red") | |
| upper = joint_upper[qd_i] | |
| if abs(upper) < 2 * np.pi: | |
| ax.axhline(upper, color="red") | |
| joint_name = joint_type_names[joint_type[joint_id]] | |
| ax.set_title(f"$\\mathbf{{q_{{{dim}}}}}$ ({self.model.joint_name[joint_id]} / {joint_name} {joint_id})") | |
| if joint_id < self.model.joint_count - 1 and q_start[joint_id + 1] == dim + 1: | |
| joint_id += 1 | |
| qd_i = qd_start[joint_id] | |
| else: | |
| qd_i += 1 | |
| plt.tight_layout() | |
| plt.show() | |
| return 1000.0 * float(self.num_envs) / avg_time | |
| def run_env(Demo): | |
| demo = Demo() | |
| demo.parse_args() | |
| if demo.profile: | |
| import matplotlib.pyplot as plt | |
| env_count = 2 | |
| env_times = [] | |
| env_size = [] | |
| for i in range(15): | |
| demo.num_envs = env_count | |
| demo.init() | |
| steps_per_second = demo.run() | |
| env_size.append(env_count) | |
| env_times.append(steps_per_second) | |
| env_count *= 2 | |
| # dump times | |
| for i in range(len(env_times)): | |
| print(f"envs: {env_size[i]} steps/second: {env_times[i]}") | |
| # plot | |
| plt.figure(1) | |
| plt.plot(env_size, env_times) | |
| plt.xscale("log") | |
| plt.xlabel("Number of Envs") | |
| plt.yscale("log") | |
| plt.ylabel("Steps/Second") | |
| plt.show() | |
| else: | |
| demo.init() | |
| return demo.run() | |