Spaces:
Sleeping
Sleeping
| # Copyright (c) 2022 NVIDIA CORPORATION. All rights reserved. | |
| # NVIDIA CORPORATION and its licensors retain all intellectual property | |
| # and proprietary rights in and to this software, related documentation | |
| # and any modifications thereto. Any use, reproduction, disclosure or | |
| # distribution of this software and related documentation without an express | |
| # license agreement from NVIDIA CORPORATION is strictly prohibited. | |
| ########################################################################### | |
| # Example Trajectory Optimization | |
| # | |
| # Shows how to optimize torque trajectories for a simple planar environment | |
| # using Warp's provided Adam optimizer. | |
| # | |
| ########################################################################### | |
| import os | |
| import numpy as np | |
| import warp as wp | |
| import warp.sim | |
| import warp.sim.render | |
| from warp.optim import Adam | |
| wp.init() | |
| def loss_l2( | |
| states: wp.array2d(dtype=wp.float32), targets: wp.array2d(dtype=wp.float32), loss: wp.array(dtype=wp.float32) | |
| ): | |
| i, j = wp.tid() | |
| diff = states[i, j] - targets[i, j] | |
| l = diff * diff | |
| wp.atomic_add(loss, 0, l) | |
| def apply_torque(torques: wp.array(dtype=wp.float32), start_index: int, body_f: wp.array(dtype=wp.spatial_vector)): | |
| fx = torques[start_index + 0] | |
| fz = torques[start_index + 1] | |
| body_f[0] = wp.spatial_vector(0.0, 0.0, 0.0, fx, 0.0, fz) | |
| def save_state(body_q: wp.array(dtype=wp.transform), write_index: int, states: wp.array2d(dtype=wp.float32)): | |
| pos = wp.transform_get_translation(body_q[0]) | |
| states[write_index, 0] = pos[0] | |
| states[write_index, 1] = pos[2] | |
| class Example: | |
| def __init__(self, stage, device=None, verbose=False): | |
| self.verbose = verbose | |
| self.frame_dt = 1.0 / 60.0 | |
| self.episode_frames = 100 | |
| self.sim_substeps = 1 | |
| self.sim_dt = self.frame_dt / self.sim_substeps | |
| self.render_time = 0.0 | |
| self.iter = 0 | |
| builder = wp.sim.ModelBuilder() | |
| self.device = device | |
| # add planar joints | |
| builder = wp.sim.ModelBuilder(gravity=0.0) | |
| builder.add_articulation() | |
| b = builder.add_body(origin=wp.transform()) | |
| builder.add_shape_box(pos=wp.vec3(0.0, 0.0, 0.0), hx=0.5, hy=0.5, hz=0.5, density=100.0, body=b) | |
| # compute reference trajectory | |
| rad = np.linspace(0.0, np.pi * 2, self.episode_frames) | |
| self.ref_traj = np.stack([np.cos(rad), np.sin(rad)], axis=1) | |
| # set initial joint configuration to first reference state | |
| builder.body_q[0] = wp.transform(p=[self.ref_traj[0][0], 0.0, self.ref_traj[0][1]]) | |
| self.ref_traj = wp.array(self.ref_traj, dtype=wp.float32, device=self.device, requires_grad=True) | |
| self.last_traj = wp.empty_like(self.ref_traj) | |
| # finalize model | |
| self.model = builder.finalize(device, requires_grad=True) | |
| self.builder = builder | |
| self.model.ground = False | |
| self.dof_q = self.model.joint_coord_count | |
| self.dof_qd = self.model.joint_dof_count | |
| self.num_bodies = self.model.body_count | |
| self.action_dim = 2 | |
| self.state_dim = 2 | |
| assert self.ref_traj.shape == (self.episode_frames, self.state_dim) | |
| self.integrator = wp.sim.SemiImplicitIntegrator() | |
| # initial guess | |
| self.actions = wp.array( | |
| np.zeros(self.episode_frames * self.action_dim) * 100.0, | |
| dtype=wp.float32, | |
| device=self.device, | |
| requires_grad=True, | |
| ) | |
| self.optimizer = Adam([self.actions], lr=1e2) | |
| self.loss = wp.zeros(1, dtype=wp.float32, device=self.device, requires_grad=True) | |
| self.renderer = wp.sim.render.SimRenderer(self.model, stage, scaling=100.0) | |
| # allocate sim states for trajectory | |
| self.states = [] | |
| for _ in range(self.episode_frames + 1): | |
| self.states.append(self.model.state(requires_grad=True)) | |
| def compute_loss(self): | |
| """ | |
| Advances the system dynamics given the rigid-body state in maximal coordinates and generalized joint torques | |
| [body_q, body_qd, tau]. | |
| """ | |
| self.last_traj.zero_() | |
| for i in range(self.episode_frames): | |
| state = self.states[i] | |
| for _ in range(self.sim_substeps): | |
| next_state = self.model.state(requires_grad=True) | |
| wp.sim.collide(self.model, state) | |
| # apply generalized torques to rigid body here, instead of planar joints | |
| wp.launch( | |
| apply_torque, | |
| 1, | |
| inputs=[self.actions, i * self.action_dim], | |
| outputs=[state.body_f], | |
| device=self.device, | |
| ) | |
| state = self.integrator.simulate(self.model, state, next_state, self.sim_dt, requires_grad=True) | |
| self.states[i + 1] = state | |
| # save state | |
| wp.launch( | |
| save_state, | |
| dim=1, | |
| inputs=[self.states[i + 1].body_q, i], | |
| outputs=[self.last_traj], | |
| device=self.device, | |
| ) | |
| # compute loss | |
| wp.launch( | |
| loss_l2, | |
| dim=self.last_traj.shape, | |
| inputs=[self.last_traj, self.ref_traj], | |
| outputs=[self.loss], | |
| device=self.device, | |
| ) | |
| def update(self): | |
| """Runs a single optimizer iteration""" | |
| self.loss.zero_() | |
| tape = wp.Tape() | |
| with tape: | |
| self.compute_loss() | |
| if self.verbose and (self.iter + 1) % 10 == 0: | |
| print(f"Iter {self.iter+1} Loss: {self.loss.numpy()[0]:.3f}") | |
| tape.backward(loss=self.loss) | |
| # print("action grad", self.actions.grad.numpy()) | |
| assert not np.isnan(self.actions.grad.numpy()).any(), "NaN in gradient" | |
| self.optimizer.step([self.actions.grad]) | |
| tape.zero() | |
| self.iter = self.iter + 1 | |
| def render(self): | |
| for i in range(self.episode_frames): | |
| self.renderer.begin_frame(self.render_time) | |
| self.renderer.render(self.states[i + 1]) | |
| self.renderer.end_frame() | |
| self.render_time += self.frame_dt | |
| if __name__ == "__main__": | |
| import matplotlib.pyplot as plt | |
| stage_path = os.path.join(os.path.dirname(__file__), "outputs/example_sim_trajopt.usd") | |
| example = Example(stage_path, device=wp.get_preferred_device(), verbose=True) | |
| # Optimize | |
| num_iter = 250 | |
| for i in range(num_iter): | |
| example.update() | |
| # Render every 25 iters | |
| if i % 25 == 0: | |
| example.render() | |
| example.renderer.save() | |
| np_states = example.last_traj.numpy() | |
| np_ref = example.ref_traj.numpy() | |
| plt.plot(np_ref[:, 0], np_ref[:, 1], label="Reference Trajectory") | |
| plt.plot(np_states[:, 0], np_states[:, 1], label="Optimized Trajectory") | |
| plt.grid() | |
| plt.legend() | |
| plt.axis("equal") | |
| plt.show() | |
| # | |