|
|
import numpy as np |
|
|
import time |
|
|
import sys |
|
|
import os |
|
|
|
|
|
import meshcat |
|
|
import meshcat.geometry as g |
|
|
import logging |
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") |
|
|
|
|
|
import pinocchio as pin |
|
|
from pinocchio.visualize import MeshcatVisualizer |
|
|
|
|
|
|
|
|
catp_path = os.path.expanduser("~/gitlab/catp") |
|
|
if os.path.exists(catp_path): |
|
|
sys.path.insert(0, catp_path) |
|
|
|
|
|
try: |
|
|
from example_robot_data import load |
|
|
from pynocchio import RobotWrapper |
|
|
PANDA_AVAILABLE = True |
|
|
except ImportError: |
|
|
PANDA_AVAILABLE = False |
|
|
logging.warning("Panda robot libraries not available") |
|
|
|
|
|
|
|
|
class TrajectoryPlanner: |
|
|
""" |
|
|
Manages Panda robot with dual trajectory planning visualization. |
|
|
""" |
|
|
def __init__(self): |
|
|
if not PANDA_AVAILABLE: |
|
|
raise ImportError("Panda robot libraries not available") |
|
|
|
|
|
self.vis = meshcat.Visualizer(zmq_url="tcp://127.0.0.1:6001") |
|
|
logging.info(f"Zmq URL: {self.vis.window.zmq_url}, web URL: {self.vis.window.web_url}") |
|
|
|
|
|
|
|
|
self.panda_pin = load('panda') |
|
|
self.panda_pin.data = self.panda_pin.model.createData() |
|
|
|
|
|
self.panda_tip_pin = "panda_hand_tcp" |
|
|
|
|
|
|
|
|
from planning_utils import find_random_poses_with_distance_pinocchio |
|
|
from planning_utils import compute_capacity_aware_trajectory, caclulate_toppra_trajectory, simulate_toppra |
|
|
from meshcat_shapes import display_frame as display_frame_shapes |
|
|
from meshcat_shapes import display, display_frame |
|
|
|
|
|
self.find_random_poses = find_random_poses_with_distance_pinocchio |
|
|
self.display = display |
|
|
self.display_frame = display_frame |
|
|
self.compute_capacity_aware_trajectory = compute_capacity_aware_trajectory |
|
|
self.caclulate_toppra_trajectory = caclulate_toppra_trajectory |
|
|
self.simulate_toppra = simulate_toppra |
|
|
|
|
|
try: |
|
|
self.panda_pyn = RobotWrapper( |
|
|
robot_wrapper=self.panda_pin, |
|
|
tip=self.panda_tip_pin, |
|
|
open_viewer=False, |
|
|
start_visualisation=True, |
|
|
viewer=self.vis, |
|
|
fix_joints=[ |
|
|
self.panda_pin.model.getJointId("panda_finger_joint1"), |
|
|
self.panda_pin.model.getJointId("panda_finger_joint2") |
|
|
] |
|
|
) |
|
|
self.viz = self.panda_pyn.viz |
|
|
self.viz.display_collisions = False |
|
|
except Exception as e: |
|
|
logging.error(f"Error initializing robot: {e}") |
|
|
raise |
|
|
|
|
|
|
|
|
self.vis["/Background"].set_property("top_color", [1, 1, 1]) |
|
|
self.vis["/Background"].set_property("bottom_color", [1, 1, 1]) |
|
|
self.vis["/Axes"].set_property("visible", False) |
|
|
self.vis['/Grid'].set_property("visible", True) |
|
|
|
|
|
|
|
|
q_min, q_max = self.panda_pyn.model.lowerPositionLimit, self.panda_pyn.model.upperPositionLimit |
|
|
dq_max = self.panda_pyn.model.velocityLimit |
|
|
ddq_max = np.array([15, 7.5, 10, 12.5, 15, 20, 20]) |
|
|
dddq_max = np.array([7500, 3750, 5000, 6250, 7500, 10000, 10000]) |
|
|
t_max = np.array([87, 87, 87, 87, 20, 20, 20]) |
|
|
|
|
|
dddx_max = np.array([6500.0, 6500.0, 6500.0]) |
|
|
ddx_max = np.array([13.0, 13, 13]) |
|
|
dx_max = np.array([1.7, 1.7, 1.7]) |
|
|
|
|
|
self.q0 = (self.panda_pyn.model.upperPositionLimit + self.panda_pyn.model.lowerPositionLimit) / 2 |
|
|
|
|
|
self.limits = { |
|
|
'q_min': q_min, 'q_max': q_max, |
|
|
'dq_max': dq_max, 'ddq_max': ddq_max, |
|
|
'dddq_max': dddq_max, 't_max': t_max, |
|
|
'dx_max': dx_max, 'ddx_max': ddx_max, |
|
|
'dddx_max': dddx_max |
|
|
} |
|
|
|
|
|
self.data = None |
|
|
self.data_top = None |
|
|
self.ts_sample = None |
|
|
self.qs_sample = None |
|
|
self.X_init = None |
|
|
self.X_final = None |
|
|
self.T_shift = None |
|
|
|
|
|
|
|
|
self._setup_dual_robots() |
|
|
|
|
|
def _setup_dual_robots(self): |
|
|
"""Setup two robots side by side for comparison""" |
|
|
self.panda_ours = self.panda_pyn.robot |
|
|
self.panda_toppra = self.panda_pyn.robot |
|
|
|
|
|
self.panda_toppra.data = self.panda_toppra.model.createData() |
|
|
|
|
|
self.viz_l = self.panda_pyn.viz |
|
|
self.viz_r = MeshcatVisualizer( |
|
|
self.panda_toppra.model, |
|
|
self.panda_toppra.collision_model, |
|
|
self.panda_toppra.visual_model |
|
|
) |
|
|
|
|
|
self.viz_r.initViewer(open=False, viewer=self.vis) |
|
|
self.viz_r.loadViewerModel("toppra", color=[0.0, 0.0, 0.0, 0.5]) |
|
|
|
|
|
|
|
|
self.T_shift = np.eye(4) |
|
|
self.T_shift[1, 3] = 0.5 |
|
|
self.vis["toppra"].set_transform(self.T_shift) |
|
|
|
|
|
|
|
|
self.display(self.viz_l, self.panda_ours, self.panda_tip_pin, |
|
|
"end_effector_ruc", self.q0) |
|
|
self.display(self.viz_r, self.panda_toppra, self.panda_tip_pin, |
|
|
"end_effector_toppra", self.q0, self.T_shift) |
|
|
|
|
|
def generate_trajectory(self, traj_length=0.8, scale=0.5, progress=None): |
|
|
"""Generate a random trajectory and compute both algorithms""" |
|
|
logging.info(f"Generating trajectory: length={traj_length}, scale={scale}") |
|
|
|
|
|
n_waypoints = int(traj_length / 0.05) |
|
|
q0 = (self.panda_pyn.model.upperPositionLimit + self.panda_pyn.model.lowerPositionLimit) / 2 |
|
|
|
|
|
if progress is not None: |
|
|
progress(0.1, desc="Finding random trajectory...") |
|
|
|
|
|
|
|
|
self.X_init, self.X_final, q_line = self.find_random_poses( |
|
|
robot=self.panda_pyn, |
|
|
distance=traj_length, |
|
|
q0=q0, |
|
|
verify_line=True, |
|
|
n_waypoints=n_waypoints, |
|
|
angle=np.pi/2 |
|
|
) |
|
|
|
|
|
|
|
|
self.display(self.viz_l, self.panda_ours, self.panda_tip_pin, |
|
|
"end_effector_ruc", q_line[0]) |
|
|
self.display(self.viz_r, self.panda_toppra, self.panda_tip_pin, |
|
|
"end_effector_toppra", q_line[0]) |
|
|
self.display_frame(self.viz_l, "start", self.X_init.np) |
|
|
self.display_frame(self.viz_l, "end", self.X_final.np) |
|
|
self.display_frame(self.viz_r, "start1", self.T_shift@self.X_init.np) |
|
|
self.display_frame(self.viz_r, "end1", self.T_shift@self.X_final.np) |
|
|
|
|
|
|
|
|
line_start = self.X_init.np[:3, 3] |
|
|
line_end = self.X_final.np[:3, 3] |
|
|
line_vertices = np.column_stack([line_start, line_end]) |
|
|
|
|
|
self.vis["trajectory_line"].set_object( |
|
|
g.Line(g.PointsGeometry(line_vertices), |
|
|
g.MeshBasicMaterial(color=0x0000ff, linewidth=2)) |
|
|
) |
|
|
|
|
|
|
|
|
line_start_shifted = (self.T_shift @ self.X_init.np)[:3, 3] |
|
|
line_end_shifted = (self.T_shift @ self.X_final.np)[:3, 3] |
|
|
line_vertices_shifted = np.column_stack([line_start_shifted, line_end_shifted]) |
|
|
|
|
|
self.vis["trajectory_line_toppra"].set_object( |
|
|
g.Line(g.PointsGeometry(line_vertices_shifted), |
|
|
g.MeshBasicMaterial(color=0x0000ff, linewidth=2)) |
|
|
) |
|
|
|
|
|
if progress is not None: |
|
|
progress(0.3, desc="Calculating capacity aware trajectory") |
|
|
|
|
|
|
|
|
options = { |
|
|
'Kp': 600, 'Kd': 150, 'Ki': 0.0, 'Tf': 0.01, |
|
|
'uptate_current_position': True, 'clamp_velocity': True, |
|
|
'clamp_min_accel': True, 'scaled_qp_limits': True, |
|
|
'override_acceleration': True, 'scale_limits': True, |
|
|
'calculate_limits': True, 'downsampling_ratio': 1, |
|
|
'use_manip_grad': False, 'manip_grad_w': 5000.0, |
|
|
'dt': 0.001, 'qp_form': 'acceleration' |
|
|
} |
|
|
|
|
|
|
|
|
logging.info("Computing capacity-aware trajectory...") |
|
|
self.data = self.compute_capacity_aware_trajectory( |
|
|
self.X_init, self.X_final, |
|
|
robot=None, robot_pyn=self.panda_pyn, |
|
|
lims=self.limits, scale=scale, |
|
|
options=options, q0=q0 |
|
|
) |
|
|
|
|
|
if progress is not None: |
|
|
progress(0.6, desc="Calculating TOPPRA trajectory") |
|
|
|
|
|
|
|
|
logging.info("Computing TOPPRA trajectory...") |
|
|
self.ts_sample, self.qs_sample, qds_sample, qdds_sample = self.caclulate_toppra_trajectory( |
|
|
self.X_init, self.X_final, |
|
|
robot_pyn=self.panda_pyn, q0=q0, |
|
|
d_waypoint=0.05, lims=self.limits, scale=scale |
|
|
) |
|
|
|
|
|
self.data_top = self.simulate_toppra( |
|
|
self.X_init, self.X_final, |
|
|
self.ts_sample, self.qs_sample, qds_sample, qdds_sample, |
|
|
q0=q0, robot_pyn=self.panda_pyn, |
|
|
lims=self.limits, scale=scale, options=options |
|
|
) |
|
|
|
|
|
|
|
|
self.display(self.viz_l, self.panda_ours, self.panda_tip_pin, |
|
|
"end_effector_ruc", self.data.qr_list[0]) |
|
|
self.display(self.viz_r, self.panda_toppra, self.panda_tip_pin, |
|
|
"end_effector_toppra", self.data_top.qr_list[0]) |
|
|
|
|
|
self.display_frame(self.viz_l, "start", self.X_init.np) |
|
|
self.display_frame(self.viz_l, "end", self.X_final.np) |
|
|
self.display_frame(self.viz_r, "start1", self.T_shift @ self.X_init) |
|
|
self.display_frame(self.viz_r, "end1", self.T_shift @ self.X_final) |
|
|
|
|
|
return { |
|
|
'toppra_duration': float(self.ts_sample[-1]), |
|
|
'ours_duration': float(self.data.t_ruckig[-1]), |
|
|
'waypoints': n_waypoints |
|
|
} |
|
|
|
|
|
def get_animation_data(self): |
|
|
"""Get data needed for animation""" |
|
|
if self.data is None or self.data_top is None: |
|
|
return None |
|
|
|
|
|
return { |
|
|
't_max': max(self.data.t_ruckig[-1], self.data_top.t_toppra[-1]), |
|
|
't_ruckig': self.data.t_ruckig, |
|
|
't_toppra': self.data_top.t_toppra, |
|
|
'qr_list': self.data.qr_list, |
|
|
'qs_sample': self.qs_sample, |
|
|
} |
|
|
|
|
|
def update_animation(self, t_current): |
|
|
"""Update robot positions for given time""" |
|
|
if self.data is None or self.data_top is None: |
|
|
return |
|
|
|
|
|
|
|
|
ind_t = 0 |
|
|
while ind_t < len(self.data_top.t_toppra) - 1 and self.data_top.t_toppra[ind_t] <= t_current: |
|
|
ind_t += 1 |
|
|
|
|
|
ind_r = 0 |
|
|
while ind_r < len(self.data.t_ruckig) - 1 and self.data.t_ruckig[ind_r] <= t_current: |
|
|
ind_r += 1 |
|
|
|
|
|
|
|
|
self.display(self.viz_l, self.panda_ours, self.panda_tip_pin, |
|
|
"end_effector_ruc", self.data.qr_list[ind_r]) |
|
|
self.display(self.viz_r, self.panda_toppra, self.panda_tip_pin, |
|
|
"end_effector_toppra", self.qs_sample[ind_t], self.T_shift) |
|
|
|
|
|
def get_plot_data(self): |
|
|
"""Get data for plotting""" |
|
|
if self.data is None or self.data_top is None: |
|
|
return None |
|
|
|
|
|
return { |
|
|
'ours': { |
|
|
't': self.data.t_ruckig[1:], |
|
|
'x': np.array(self.data.x_list[1:]).flatten().tolist(), |
|
|
'dx': self.data.dx_q_list[1:], |
|
|
'ddx': self.data.ddx_q_list[1:], |
|
|
'dddx': self.data.dddx_q_list[1:], |
|
|
'dx_max': self.data.dx_max_list[1:], |
|
|
'ddx_max': self.data.ddx_max_list[1:], |
|
|
'ddx_min': self.data.ddx_min_list[1:], |
|
|
'dddx_max': self.data.dddx_max_list[1:], |
|
|
'dddx_min': self.data.dddx_min_list[1:], |
|
|
'e_pos': (np.array(self.data.e_pos_list_ruckig) * 1e3), |
|
|
'e_rot': (np.rad2deg(np.array(self.data.e_rot_list_ruckig))), |
|
|
}, |
|
|
'toppra': { |
|
|
't': self.data_top.t_toppra, |
|
|
'x': self.data_top.x_top, |
|
|
'dx': self.data_top.dx_top, |
|
|
'ddx': self.data_top.ddx_top, |
|
|
'dddx': self.data_top.dddx_top, |
|
|
'ds_max': self.data_top.ds_max_list, |
|
|
'dds_min': self.data_top.dds_min_list, |
|
|
'dds_max': self.data_top.dds_max_list, |
|
|
'ddds_max': self.data_top.ddds_max_list, |
|
|
'ddds_min': self.data_top.ddds_min_list, |
|
|
'e_pos': (np.array(self.data_top.e_pos_list) * 1e3), |
|
|
'e_rot': (np.rad2deg(np.array(self.data_top.e_rot_list))), |
|
|
} |
|
|
} |
|
|
|
|
|
def iframe(self, width="100%", height=640): |
|
|
return f'<iframe src="/static/" style="width:{width};height:{height}px;border:0"></iframe>' |
|
|
|
|
|
|