File size: 4,433 Bytes
f81aa8a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 | from __future__ import annotations
import argparse
import json
from pathlib import Path
import cv2
import numpy as np
import pandas as pd
from PIL import Image
from scipy.optimize import least_squares
from scene_tools import create_background_inpaint, kinova_fk_points_and_tool_pose, load_scene_calibration, render_scene
def euler_xyz_deg_to_matrix(theta_x_deg: float, theta_y_deg: float, theta_z_deg: float) -> np.ndarray:
rx, ry, rz = np.deg2rad([theta_x_deg, theta_y_deg, theta_z_deg])
cx, sx = np.cos(rx), np.sin(rx)
cy, sy = np.cos(ry), np.sin(ry)
cz, sz = np.cos(rz), np.sin(rz)
rot_x = np.array([[1, 0, 0], [0, cx, -sx], [0, sx, cx]], dtype=np.float64)
rot_y = np.array([[cy, 0, sy], [0, 1, 0], [-sy, 0, cy]], dtype=np.float64)
rot_z = np.array([[cz, -sz, 0], [sz, cz, 0], [0, 0, 1]], dtype=np.float64)
return rot_z @ rot_y @ rot_x
def solve_ik(target_pose: np.ndarray, q_init_deg: np.ndarray) -> np.ndarray:
target_pos = target_pose[:3].astype(np.float64)
target_rot = euler_xyz_deg_to_matrix(*target_pose[3:6])
q_init = np.asarray(q_init_deg, dtype=np.float64)
def residual(q_deg: np.ndarray) -> np.ndarray:
_, tool = kinova_fk_points_and_tool_pose(q_deg)
pos_err = tool[:3, 3] - target_pos
rot_err_mat = tool[:3, :3].T @ target_rot
rot_err, _ = cv2.Rodrigues(rot_err_mat)
regularize = 0.01 * (q_deg - q_init)
return np.concatenate([12.0 * pos_err, 0.75 * rot_err.reshape(-1), regularize])
result = least_squares(residual, q_init, method="lm", max_nfev=120)
return result.x.astype(np.float64)
def load_target_poses(args: argparse.Namespace) -> tuple[np.ndarray, np.ndarray]:
if args.poses_json:
payload = json.loads(Path(args.poses_json).read_text())
if isinstance(payload, dict) and "actions" in payload:
poses = np.asarray(payload["actions"], dtype=np.float64)
else:
poses = np.asarray(payload, dtype=np.float64)
start_q = np.asarray(args.q_init_deg, dtype=np.float64)
return poses, start_q
sync = pd.read_csv(Path(args.session_root) / "sync_index.csv")
rows = sync.iloc[args.sync_start : args.sync_stop : args.sync_step]
poses = rows[
["tool_x_m", "tool_y_m", "tool_z_m", "tool_theta_x_deg", "tool_theta_y_deg", "tool_theta_z_deg", "gripper_pos"]
].to_numpy(np.float64)
start_q = rows.iloc[0][[f"q{i}_deg" for i in range(1, 8)]].to_numpy(np.float64)
return poses, start_q
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--session-root", default="/workspace/data/teddybear_raw/session_20260327_165944_bear")
parser.add_argument("--output", default="/workspace/kinova_scene_sim/outputs/rollout.gif")
parser.add_argument("--poses-json", default=None, help="JSON file containing a list of 7D tool poses.")
parser.add_argument(
"--q-init-deg",
nargs=7,
type=float,
default=[0.0008167303, 340.0004272461, 180.0014038086, 213.9954223633, 359.9988708496, 309.9989318848, 90.0014266968],
help="Initial joint seed for IK when using --poses-json.",
)
parser.add_argument("--sync-start", type=int, default=0)
parser.add_argument("--sync-stop", type=int, default=130)
parser.add_argument("--sync-step", type=int, default=8)
parser.add_argument("--fps", type=int, default=5)
args = parser.parse_args()
calibration = load_scene_calibration(args.session_root, sync_row_index=0)
background = create_background_inpaint(calibration.rgb)
poses, q_seed = load_target_poses(args)
frames: list[Image.Image] = []
q_current = q_seed.copy()
for idx, pose in enumerate(poses):
q_current = solve_ik(pose, q_current)
frame = render_scene(calibration, q_current, background=background)
# Mark the target tool center on the rendered frame.
points_world, _ = kinova_fk_points_and_tool_pose(q_current)
_ = points_world # quiet lint-like readers; pose solving already uses the FK.
frames.append(Image.fromarray(frame))
output = Path(args.output)
output.parent.mkdir(parents=True, exist_ok=True)
frames[0].save(
output,
save_all=True,
append_images=frames[1:],
loop=0,
duration=int(1000 / max(args.fps, 1)),
)
print(f"saved rollout gif to {output}")
if __name__ == "__main__":
main()
|