RoboMME / scripts /evaluation.py
HongzeFu's picture
HF Space: code-only (no binary assets)
06c11b0
import torch
import numpy
import random
import numpy as np
import cv2
import imageio
from pathlib import Path
from robomme.env_record_wrapper import BenchmarkEnvBuilder
class VideoRecorder:
BORDER_COLOR = (255, 0, 0)
BORDER_THICKNESS = 10
def __init__(self, fps: int = 30):
self.fps = fps
self.frames: list[np.ndarray] = []
@staticmethod
def _to_numpy(t) -> np.ndarray:
return t.cpu().numpy() if isinstance(t, torch.Tensor) else np.asarray(t)
@classmethod
def _make_frame(
cls,
front: np.ndarray | torch.Tensor,
wrist: np.ndarray | torch.Tensor,
is_video_demo: bool = False,
) -> np.ndarray:
frame = np.hstack([cls._to_numpy(front), cls._to_numpy(wrist)]).astype(np.uint8)
if is_video_demo:
h, w = frame.shape[:2]
cv2.rectangle(frame, (0, 0), (w, h), cls.BORDER_COLOR, cls.BORDER_THICKNESS)
return frame
def add_initial_obs(self, obs: dict):
rgb_list = obs["front_rgb_list"]
for i, (front, wrist) in enumerate(zip(rgb_list, obs["wrist_rgb_list"])):
self.frames.append(self._make_frame(front, wrist, is_video_demo=i < len(rgb_list) - 1))
def add_step_obs(self, obs: dict):
self.frames.append(self._make_frame(
obs["front_rgb_list"][-1], obs["wrist_rgb_list"][-1],
))
def save(self, file_path: str):
dir_path = Path(file_path).parent
dir_path.mkdir(parents=True, exist_ok=True)
imageio.mimsave(file_path, self.frames, fps=self.fps)
self.frames = []
class DummyModel:
def __init__(self, seed: int):
self.base_action = np.array(
[0.0, 0.0, 0.0, -np.pi / 2, 0.0, np.pi / 2, np.pi / 4, 1.0],
dtype=np.float32,
)
self.set_model_seed(seed)
def set_model_seed(self, seed: int):
# set model seed will not affect the env seed
# env seed is fixed internally
torch.manual_seed(seed)
numpy.random.seed(seed)
random.seed(seed)
self.seed = seed
def predict(self, *args, **kwargs):
noise = np.random.normal(0, 0.01, self.base_action.shape)
noise[..., -1:] = 0.0 # Preserve gripper action
return self.base_action + noise
TASKS = BenchmarkEnvBuilder.get_task_list()
MODEL_SEED = 7 # 7, 42, 0
dummy_model = DummyModel(seed=MODEL_SEED)
total_success = []
for task in TASKS:
env_builder = BenchmarkEnvBuilder(
env_id=task,
dataset="test",
action_space="joint_angle", # change this to your model's action space
max_steps=1300, # we set 1300 in MME-VLA experiments. The longest length of training data is @daiyp
)
episode_count = env_builder.get_episode_num()
for episode in range(2):
env = env_builder.make_env_for_episode(
episode,
include_maniskill_obs=True,
include_front_depth=True,
include_wrist_depth=True,
include_front_camera_extrinsic=True,
include_wrist_camera_extrinsic=True,
include_available_multi_choices=True,
include_front_camera_intrinsic=True,
include_wrist_camera_intrinsic=True,
)
obs, info = env.reset()
task_goal = info["task_goal"]
if isinstance(task_goal, list):
task_goal = task_goal[0] # you can take alternative task goals if you want
print(f"\nTask goal: {task_goal}")
recorder = VideoRecorder()
recorder.add_initial_obs(obs)
current_front_rgb = obs["front_rgb_list"][-1]
current_wrist_rgb = obs["wrist_rgb_list"][-1]
while True:
dummy_action = dummy_model.predict(current_front_rgb, current_wrist_rgb, task_goal)
obs, reward, terminated, truncated, info = env.step(dummy_action)
if info is not None and info.get("status") == "error":
print(f"Error: {info.get('error_message')}")
total_success.append(False)
break
if terminated or truncated:
outcome = info.get("status", "unknown")
print(f"Outcome of episode {episode} of task {task}: {outcome}")
total_success.append(outcome == "success")
break
current_front_rgb = obs["front_rgb_list"][-1]
current_wrist_rgb = obs["wrist_rgb_list"][-1]
recorder.add_step_obs(obs)
env.close()
recorder.save(file_path=f"rollout_videos/{task}_ep_{episode}_{outcome}_{task_goal}.mp4")
print(f"Evaluation completed.")
print(f"Success rate: {sum(total_success) / len(total_success)}")