| | import torch |
| | import numpy |
| | import random |
| | import numpy as np |
| | import cv2 |
| | import imageio |
| |
|
| | from pathlib import Path |
| | from robomme.env_record_wrapper import BenchmarkEnvBuilder |
| |
|
| | class VideoRecorder: |
| | BORDER_COLOR = (255, 0, 0) |
| | BORDER_THICKNESS = 10 |
| |
|
| | def __init__(self, fps: int = 30): |
| | self.fps = fps |
| | self.frames: list[np.ndarray] = [] |
| |
|
| | @staticmethod |
| | def _to_numpy(t) -> np.ndarray: |
| | return t.cpu().numpy() if isinstance(t, torch.Tensor) else np.asarray(t) |
| |
|
| | @classmethod |
| | def _make_frame( |
| | cls, |
| | front: np.ndarray | torch.Tensor, |
| | wrist: np.ndarray | torch.Tensor, |
| | is_video_demo: bool = False, |
| | ) -> np.ndarray: |
| | frame = np.hstack([cls._to_numpy(front), cls._to_numpy(wrist)]).astype(np.uint8) |
| | if is_video_demo: |
| | h, w = frame.shape[:2] |
| | cv2.rectangle(frame, (0, 0), (w, h), cls.BORDER_COLOR, cls.BORDER_THICKNESS) |
| | return frame |
| |
|
| | def add_initial_obs(self, obs: dict): |
| | rgb_list = obs["front_rgb_list"] |
| | for i, (front, wrist) in enumerate(zip(rgb_list, obs["wrist_rgb_list"])): |
| | self.frames.append(self._make_frame(front, wrist, is_video_demo=i < len(rgb_list) - 1)) |
| |
|
| | def add_step_obs(self, obs: dict): |
| | self.frames.append(self._make_frame( |
| | obs["front_rgb_list"][-1], obs["wrist_rgb_list"][-1], |
| | )) |
| |
|
| | def save(self, file_path: str): |
| | dir_path = Path(file_path).parent |
| | dir_path.mkdir(parents=True, exist_ok=True) |
| | imageio.mimsave(file_path, self.frames, fps=self.fps) |
| | self.frames = [] |
| |
|
| | class DummyModel: |
| | def __init__(self, seed: int): |
| | self.base_action = np.array( |
| | [0.0, 0.0, 0.0, -np.pi / 2, 0.0, np.pi / 2, np.pi / 4, 1.0], |
| | dtype=np.float32, |
| | ) |
| | self.set_model_seed(seed) |
| | |
| | def set_model_seed(self, seed: int): |
| | |
| | |
| | torch.manual_seed(seed) |
| | numpy.random.seed(seed) |
| | random.seed(seed) |
| | self.seed = seed |
| | |
| | def predict(self, *args, **kwargs): |
| | noise = np.random.normal(0, 0.01, self.base_action.shape) |
| | noise[..., -1:] = 0.0 |
| | return self.base_action + noise |
| |
|
| |
|
| | TASKS = BenchmarkEnvBuilder.get_task_list() |
| | MODEL_SEED = 7 |
| | dummy_model = DummyModel(seed=MODEL_SEED) |
| |
|
| | total_success = [] |
| | for task in TASKS: |
| | env_builder = BenchmarkEnvBuilder( |
| | env_id=task, |
| | dataset="test", |
| | action_space="joint_angle", |
| | max_steps=1300, |
| | ) |
| | episode_count = env_builder.get_episode_num() |
| | for episode in range(2): |
| | env = env_builder.make_env_for_episode( |
| | episode, |
| | include_maniskill_obs=True, |
| | include_front_depth=True, |
| | include_wrist_depth=True, |
| | include_front_camera_extrinsic=True, |
| | include_wrist_camera_extrinsic=True, |
| | include_available_multi_choices=True, |
| | include_front_camera_intrinsic=True, |
| | include_wrist_camera_intrinsic=True, |
| | ) |
| | obs, info = env.reset() |
| | task_goal = info["task_goal"] |
| | if isinstance(task_goal, list): |
| | task_goal = task_goal[0] |
| | print(f"\nTask goal: {task_goal}") |
| | |
| | recorder = VideoRecorder() |
| | recorder.add_initial_obs(obs) |
| |
|
| | current_front_rgb = obs["front_rgb_list"][-1] |
| | current_wrist_rgb = obs["wrist_rgb_list"][-1] |
| |
|
| | while True: |
| | dummy_action = dummy_model.predict(current_front_rgb, current_wrist_rgb, task_goal) |
| | obs, reward, terminated, truncated, info = env.step(dummy_action) |
| | if info is not None and info.get("status") == "error": |
| | print(f"Error: {info.get('error_message')}") |
| | total_success.append(False) |
| | break |
| | if terminated or truncated: |
| | outcome = info.get("status", "unknown") |
| | print(f"Outcome of episode {episode} of task {task}: {outcome}") |
| | total_success.append(outcome == "success") |
| | break |
| | current_front_rgb = obs["front_rgb_list"][-1] |
| | current_wrist_rgb = obs["wrist_rgb_list"][-1] |
| | recorder.add_step_obs(obs) |
| | |
| | env.close() |
| | recorder.save(file_path=f"rollout_videos/{task}_ep_{episode}_{outcome}_{task_goal}.mp4") |
| | |
| | print(f"Evaluation completed.") |
| | print(f"Success rate: {sum(total_success) / len(total_success)}") |