| import os |
| import sys |
| import numpy as np |
| import matplotlib |
| matplotlib.use("Agg") |
| import matplotlib.pyplot as plt |
|
|
| from xml.etree import ElementTree as ET |
| from libero.libero import benchmark, get_libero_path |
| import h5py |
| import imageio |
| from PIL import Image |
| from arrowline import * |
|
|
|
|
| def _figure_to_rgb(fig): |
| fig.canvas.draw() |
| canvas_width, canvas_height = fig.canvas.get_width_height() |
| if hasattr(fig.canvas, "tostring_rgb"): |
| buffer = fig.canvas.tostring_rgb() |
| rgb = np.frombuffer(buffer, dtype=np.uint8).reshape(canvas_height, canvas_width, 3) |
| else: |
| buffer = fig.canvas.buffer_rgba() |
| rgba = np.frombuffer(buffer, dtype=np.uint8).reshape(canvas_height, canvas_width, 4) |
| rgb = rgba[..., :3] |
| return rgb |
|
|
|
|
| def _load_demo_data(demo_file, demo_file_for_xml, include_trajectory): |
| with h5py.File(demo_file_for_xml, "r") as f: |
| model_xml = f["data"]["demo_0"].attrs.get("model_file") |
| if isinstance(model_xml, bytes): |
| model_xml = model_xml.decode("utf-8") |
| with h5py.File(demo_file, "r") as f: |
| images = f["data"]["demo_0"]["obs"]["agentview_rgb"][()] |
| eef_positions = None |
| if include_trajectory: |
| obs_group = f["data"]["demo_0"]["obs"] |
| if "ee_pos" in obs_group: |
| eef_positions = obs_group["ee_pos"][()] |
| else: |
| print("Dataset missing 'ee_pos' trajectory; skipping overlay.") |
| return images, eef_positions, model_xml |
|
|
|
|
| def _quat_to_rotation_matrix(quat): |
| q = np.asarray(quat, dtype=np.float64) |
| if q.shape != (4,): |
| raise ValueError("Quaternion must have four components.") |
| norm = np.linalg.norm(q) |
| if norm == 0: |
| raise ValueError("Quaternion norm must be positive.") |
| q = q / norm |
| w, x, y, z = q |
| return np.array( |
| [ |
| [1 - 2 * (y * y + z * z), 2 * (x * y - w * z), 2 * (x * z + w * y)], |
| [2 * (x * y + w * z), 1 - 2 * (x * x + z * z), 2 * (y * z - w * x)], |
| [2 * (x * z - w * y), 2 * (y * z + w * x), 1 - 2 * (x * x + y * y)], |
| ], |
| dtype=np.float64, |
| ) |
|
|
|
|
| def _get_camera_parameters(model_xml, camera_name, image_shape): |
| if model_xml is None: |
| raise ValueError("Model XML is not available in the demonstration file.") |
| root = ET.fromstring(model_xml) |
| camera_elem = root.find(f".//camera[@name='{camera_name}']") |
| if camera_elem is None: |
| raise KeyError(f"Camera '{camera_name}' not found in the MuJoCo model.") |
| pos = np.fromstring(camera_elem.attrib.get("pos", "0 0 0"), sep=" ", dtype=np.float64) |
| if pos.size != 3: |
| raise ValueError(f"Camera '{camera_name}' position must have three components.") |
| quat_attr = camera_elem.attrib.get("quat", "1 0 0 0") |
| quat = np.fromstring(quat_attr, sep=" ", dtype=np.float64) |
| if quat.size != 4: |
| raise ValueError(f"Camera '{camera_name}' quaternion must have four components.") |
| rotation = _quat_to_rotation_matrix(quat) |
| fovy_deg = float(camera_elem.attrib.get("fovy", 45.0)) |
| height, width = image_shape |
| fovy_rad = np.deg2rad(fovy_deg) |
| fy = 0.5 * height / np.tan(0.5 * fovy_rad) |
| fx = fy |
| cx = 0.5 * (width - 1) |
| cy = 0.5 * (height - 1) |
| return { |
| "position": pos, |
| "rotation": rotation, |
| "fx": fx, |
| "fy": fy, |
| "cx": cx, |
| "cy": cy, |
| "width": width, |
| "height": height, |
| } |
|
|
|
|
| def _project_points_to_image(points, camera_params): |
| if points is None or len(points) == 0: |
| return np.empty((0, 2), dtype=np.float64) |
| points = np.asarray(points, dtype=np.float64) |
| rel = points - camera_params["position"] |
| cam_coords = rel @ camera_params["rotation"] |
| depth = -cam_coords[:, 2] |
| with np.errstate(divide="ignore", invalid="ignore"): |
| u = camera_params["fx"] * (cam_coords[:, 0] / depth) + camera_params["cx"] |
| v = camera_params["fy"] * (-cam_coords[:, 1] / depth) + camera_params["cy"] |
| v = camera_params["height"] - 1 - v |
| pixels = np.stack([u, v], axis=1) |
| valid = depth > 1e-6 |
| valid &= np.isfinite(u) & np.isfinite(v) |
| valid &= (u >= 0) & (u <= camera_params["width"] - 1) |
| valid &= (v >= 0) & (v <= camera_params["height"] - 1) |
| pixels[~valid] = np.nan |
| return pixels |
|
|
|
|
| def _create_frame_with_trajectory( |
| image, |
| trajectory_pixels, |
| frame_idx, |
| total_traj_frames, |
| figsize, |
| ): |
| fig, ax = plt.subplots(figsize=figsize) |
| rgb_image = image[..., ::-1] |
| ax.imshow(rgb_image) |
| height, width = rgb_image.shape[:2] |
| ax.set_xlim([0, width - 1]) |
| ax.set_ylim([height - 1, 0]) |
| ax.set_axis_off() |
| |
| if trajectory_pixels is not None and trajectory_pixels.size > 0: |
| executed_range = min(frame_idx + 1, total_traj_frames) |
| ax.plot( |
| trajectory_pixels[:, 0], |
| trajectory_pixels[:, 1], |
| color="#b0bec5", |
| linestyle="--", |
| linewidth=1.5, |
| label="Full path", |
| ) |
| if executed_range > 0: |
| executed_points = trajectory_pixels[:executed_range].copy() |
| executed_valid = np.isfinite(executed_points[:, 0]) & np.isfinite(executed_points[:, 1]) |
| executed_points[~executed_valid] = np.nan |
| ax.plot( |
| executed_points[:, 0], |
| executed_points[:, 1], |
| color="#1976d2", |
| linewidth=2, |
| label="Executed", |
| ) |
| valid = np.isfinite(trajectory_pixels[:, 0]) & np.isfinite(trajectory_pixels[:, 1]) |
| valid_indices = np.flatnonzero(valid) |
| if valid_indices.size > 0: |
| start_point = trajectory_pixels[valid_indices[0]] |
| goal_point = trajectory_pixels[valid_indices[-1]] |
| ax.scatter(start_point[0], start_point[1], color="#2e7d32", s=40, label="Start") |
| ax.scatter(goal_point[0], goal_point[1], color="#c62828", s=40, label="Goal") |
| if total_traj_frames > 0: |
| current_idx = min(frame_idx, total_traj_frames - 1) |
| current_point = trajectory_pixels[current_idx] |
| if np.all(np.isfinite(current_point)): |
| ax.scatter( |
| current_point[0], |
| current_point[1], |
| color="#ffb300", |
| s=60, |
| label="Current", |
| edgecolors="black", |
| linewidths=0.5, |
| ) |
| fig.tight_layout(pad=0.2) |
| frame = _figure_to_rgb(fig) |
| plt.close(fig) |
| return frame |
|
|
|
|
| def _create_frame_with_arrow_traj( |
| image, |
| trajectory_pixels, |
| figsize |
| ): |
| fig, ax = plt.subplots(figsize=figsize) |
| rgb_image = image[..., ::-1] |
| ax.imshow(rgb_image) |
| height, width = rgb_image.shape[:2] |
| ax.set_xlim([0, width - 1]) |
| ax.set_ylim([height - 1, 0]) |
| ax.set_axis_off() |
| |
| |
| |
| |
| |
| |
| |
| arrowline(ax, trajectory_pixels[:, 0], trajectory_pixels[:, 1], |
| style='equal_d', interval=16, arrow_size=1.5, color='b') |
| fig.tight_layout(pad=0.02) |
| frame = _figure_to_rgb(fig) |
| plt.close(fig) |
| return frame |
|
|
|
|
| def generate_task_video( |
| task_index=0, |
| benchmark_name="libero_10", |
| output_video="task_demo.mp4", |
| include_trajectory=True, |
| camera_name="agentview", |
| fps=60, |
| figsize=(4, 4), |
| only_image=False, |
| ): |
| """ |
| Generate a demo video for the specified task, overlaying the end-effector trajectory in image space. |
| |
| Args: |
| task_index: Index of the task within the benchmark. |
| benchmark_name: Name of the benchmark whose dataset should be used. |
| output_video: Output video filename (MP4). |
| include_trajectory: If True, render the end-effector trajectory onto the video. |
| camera_name: Name of the MuJoCo camera that matches the rendered RGB frames. |
| fps: Frames per second for the output video. |
| figsize: Matplotlib figure size used when drawing frames with the trajectory overlay. |
| |
| Returns: |
| HTML widget embedding the generated video for display in notebooks. |
| """ |
| datasets_path = "/home/zechen/Data/Robo/LIBERO_Regen" |
| benchmark_dict = benchmark.get_benchmark_dict() |
| if benchmark_name not in benchmark_dict: |
| raise KeyError( |
| f"Unknown benchmark '{benchmark_name}'. Available keys: {list(benchmark_dict.keys())}" |
| ) |
|
|
| benchmark_instance = benchmark_dict[benchmark_name]() |
| num_tasks = benchmark_instance.get_num_tasks() |
| if task_index >= num_tasks: |
| raise ValueError( |
| f"Task index {task_index} out of range. Benchmark has {num_tasks} tasks." |
| ) |
|
|
| demo_file = os.path.join( |
| datasets_path, |
| benchmark_instance.get_task_demonstration(task_index), |
| ) |
| print("Task name: ", benchmark_instance.get_task_demonstration(task_index)) |
| if not os.path.exists(demo_file): |
| raise FileNotFoundError(f"Demo file not found: {demo_file}") |
|
|
| demo_file_for_xml = os.path.join( |
| "/home/zechen/Data/Robo/OriginalLIBERO", |
| benchmark_instance.get_task_demonstration(task_index), |
| ) |
| if not os.path.exists(demo_file_for_xml): |
| raise FileNotFoundError(f"Demo file for xml not found: {demo_file_for_xml}") |
|
|
| print(f"Using demo file: {demo_file}") |
|
|
| images, eef_positions, model_xml = _load_demo_data(demo_file, demo_file_for_xml, include_trajectory) |
|
|
| has_traj_data = include_trajectory and eef_positions is not None |
| trajectory_pixels = None |
| if has_traj_data: |
| if model_xml is None: |
| print("Model XML not found in dataset; skipping trajectory overlay.") |
| else: |
| try: |
| image_height, image_width = images.shape[1:3] |
| camera_params = _get_camera_parameters(model_xml, camera_name, (image_height, image_width)) |
| trajectory_pixels = _project_points_to_image(eef_positions, camera_params) |
| if trajectory_pixels.size == 0: |
| trajectory_pixels = None |
| except Exception as exc: |
| print(f"Failed to project trajectory for camera '{camera_name}': {exc}") |
| trajectory_pixels = None |
|
|
| has_traj_overlay = trajectory_pixels is not None |
| traj_frame_count = trajectory_pixels.shape[0] if has_traj_overlay else 0 |
|
|
| print("Length of trajectory_pixels: ", traj_frame_count) |
|
|
| if only_image: |
| frame = _create_frame_with_arrow_traj( |
| images[0], |
| trajectory_pixels[:8], |
| figsize, |
| ) |
| frame = frame[..., ::-1] |
| Image.fromarray(frame).transpose(Image.FLIP_TOP_BOTTOM).save(output_video.replace(".mp4", ".png")) |
| else: |
| os.makedirs(os.path.dirname(output_video) or ".", exist_ok=True) |
| with imageio.get_writer(output_video, fps=fps) as video_writer: |
| for frame_idx, image in enumerate(images): |
| if has_traj_overlay: |
| frame = _create_frame_with_trajectory( |
| image, |
| trajectory_pixels, |
| frame_idx, |
| traj_frame_count, |
| figsize, |
| ) |
| else: |
| frame = image[..., ::-1] |
| frame = np.asarray(Image.fromarray(frame).transpose(Image.FLIP_TOP_BOTTOM)) |
| video_writer.append_data(frame) |
| print(f"Video saved as: {output_video}") |
|
|
|
|
| if __name__ == "__main__": |
| |
| generate_task_video( |
| task_index=1, |
| benchmark_name="libero_object", |
| output_video="task_1_demo_with_traj.mp4", |
| include_trajectory=True, |
| camera_name="agentview", |
| only_image=True |
| ) |
|
|
|
|