| from __future__ import annotations |
|
|
| import argparse |
| import json |
| from pathlib import Path |
|
|
| import cv2 |
| import numpy as np |
| import pandas as pd |
| from PIL import Image |
| from scipy.optimize import least_squares |
|
|
| from scene_tools import create_background_inpaint, kinova_fk_points_and_tool_pose, load_scene_calibration, render_scene |
|
|
|
|
| def euler_xyz_deg_to_matrix(theta_x_deg: float, theta_y_deg: float, theta_z_deg: float) -> np.ndarray: |
| rx, ry, rz = np.deg2rad([theta_x_deg, theta_y_deg, theta_z_deg]) |
| cx, sx = np.cos(rx), np.sin(rx) |
| cy, sy = np.cos(ry), np.sin(ry) |
| cz, sz = np.cos(rz), np.sin(rz) |
| rot_x = np.array([[1, 0, 0], [0, cx, -sx], [0, sx, cx]], dtype=np.float64) |
| rot_y = np.array([[cy, 0, sy], [0, 1, 0], [-sy, 0, cy]], dtype=np.float64) |
| rot_z = np.array([[cz, -sz, 0], [sz, cz, 0], [0, 0, 1]], dtype=np.float64) |
| return rot_z @ rot_y @ rot_x |
|
|
|
|
| def solve_ik(target_pose: np.ndarray, q_init_deg: np.ndarray) -> np.ndarray: |
| target_pos = target_pose[:3].astype(np.float64) |
| target_rot = euler_xyz_deg_to_matrix(*target_pose[3:6]) |
| q_init = np.asarray(q_init_deg, dtype=np.float64) |
|
|
| def residual(q_deg: np.ndarray) -> np.ndarray: |
| _, tool = kinova_fk_points_and_tool_pose(q_deg) |
| pos_err = tool[:3, 3] - target_pos |
| rot_err_mat = tool[:3, :3].T @ target_rot |
| rot_err, _ = cv2.Rodrigues(rot_err_mat) |
| regularize = 0.01 * (q_deg - q_init) |
| return np.concatenate([12.0 * pos_err, 0.75 * rot_err.reshape(-1), regularize]) |
|
|
| result = least_squares(residual, q_init, method="lm", max_nfev=120) |
| return result.x.astype(np.float64) |
|
|
|
|
| def load_target_poses(args: argparse.Namespace) -> tuple[np.ndarray, np.ndarray]: |
| if args.poses_json: |
| payload = json.loads(Path(args.poses_json).read_text()) |
| if isinstance(payload, dict) and "actions" in payload: |
| poses = np.asarray(payload["actions"], dtype=np.float64) |
| else: |
| poses = np.asarray(payload, dtype=np.float64) |
| start_q = np.asarray(args.q_init_deg, dtype=np.float64) |
| return poses, start_q |
|
|
| sync = pd.read_csv(Path(args.session_root) / "sync_index.csv") |
| rows = sync.iloc[args.sync_start : args.sync_stop : args.sync_step] |
| poses = rows[ |
| ["tool_x_m", "tool_y_m", "tool_z_m", "tool_theta_x_deg", "tool_theta_y_deg", "tool_theta_z_deg", "gripper_pos"] |
| ].to_numpy(np.float64) |
| start_q = rows.iloc[0][[f"q{i}_deg" for i in range(1, 8)]].to_numpy(np.float64) |
| return poses, start_q |
|
|
|
|
| def main() -> None: |
| parser = argparse.ArgumentParser() |
| parser.add_argument("--session-root", default="/workspace/data/teddybear_raw/session_20260327_165944_bear") |
| parser.add_argument("--output", default="/workspace/kinova_scene_sim/outputs/rollout.gif") |
| parser.add_argument("--poses-json", default=None, help="JSON file containing a list of 7D tool poses.") |
| parser.add_argument( |
| "--q-init-deg", |
| nargs=7, |
| type=float, |
| default=[0.0008167303, 340.0004272461, 180.0014038086, 213.9954223633, 359.9988708496, 309.9989318848, 90.0014266968], |
| help="Initial joint seed for IK when using --poses-json.", |
| ) |
| parser.add_argument("--sync-start", type=int, default=0) |
| parser.add_argument("--sync-stop", type=int, default=130) |
| parser.add_argument("--sync-step", type=int, default=8) |
| parser.add_argument("--fps", type=int, default=5) |
| args = parser.parse_args() |
|
|
| calibration = load_scene_calibration(args.session_root, sync_row_index=0) |
| background = create_background_inpaint(calibration.rgb) |
| poses, q_seed = load_target_poses(args) |
|
|
| frames: list[Image.Image] = [] |
| q_current = q_seed.copy() |
| for idx, pose in enumerate(poses): |
| q_current = solve_ik(pose, q_current) |
| frame = render_scene(calibration, q_current, background=background) |
| |
| points_world, _ = kinova_fk_points_and_tool_pose(q_current) |
| _ = points_world |
| frames.append(Image.fromarray(frame)) |
|
|
| output = Path(args.output) |
| output.parent.mkdir(parents=True, exist_ok=True) |
| frames[0].save( |
| output, |
| save_all=True, |
| append_images=frames[1:], |
| loop=0, |
| duration=int(1000 / max(args.fps, 1)), |
| ) |
| print(f"saved rollout gif to {output}") |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|