| """ |
| Example reward-model wrappers for LIBERO environments: a Gym-to-Gymnasium |
| adapter plus single-env and vectorized wrappers that relabel environment |
| rewards with predictions from a learned reward model. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from collections import deque |
| from dataclasses import dataclass |
| from typing import Any, Deque, Dict, List, Optional, Sequence, Tuple, Union |
| import os |
| import argparse |
| import sys |
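| # Make a sibling LIBERO checkout importable when this file is run directly. |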
| sys.path.append(os.path.join(os.path.dirname(__file__), "..", "LIBERO")) |
|
|
| import numpy as np |
| import gymnasium as gym |
| import gymnasium.vector as gym_vector |
|
|
| from robometer.evals.eval_utils import raw_dict_to_sample, extract_rewards_from_output, extract_success_probs_from_output |
| from robometer.evals.eval_server import process_batch_helper |
| from robometer.utils.setup_utils import setup_batch_collator |
| from robometer.utils.tensor_utils import t2n |
| from robometer.utils.save import load_model_from_hf |
|
|
| class GymToGymnasiumWrapper(gym.Env): |
| """ |
| A wrapper that adapts a classic Gym environment to the Gymnasium interface: |
| it converts the `reset()` and `step()` signatures, splits `done` into |
| `terminated`/`truncated`, forwards seeding, and enforces an optional time limit. |
| """ |
|
|
| def __init__(self, env, time_limit: Optional[int] = None): |
| super().__init__() |
| self.env = env |
| |
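| # Mirror the wrapped env's spaces, reward range, and metadata when available. |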
| if hasattr(self.env, "action_space"): |
| self.action_space = self.env.action_space |
| if hasattr(self.env, "observation_space"): |
| self.observation_space = self.env.observation_space |
| self.reward_range = getattr(env, "reward_range", None) |
| self.metadata = getattr(env, "metadata", {}) |
| self.time_limit = time_limit |
| self.current_step = 0 |
|
|
| def reset(self, *, seed=None, options=None): |
| |
| self.current_step = 0 |
| |
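| # Older Gym APIs may not accept a seed in reset(); fall back to seed() + reset(). |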
| if seed is not None: |
| try: |
| obs = self.env.reset(seed=seed) |
| except TypeError: |
| self.env.seed(seed) |
| obs = self.env.reset() |
| else: |
| obs = self.env.reset() |
| info = {} |
| if isinstance(obs, tuple) and len(obs) == 2: |
| obs, info = obs |
| return obs, info |
|
|
| def step(self, action): |
| result = self.env.step(action) |
| self.current_step += 1 |
| if len(result) == 4: |
| obs, reward, done, info = result |
| terminated = done |
| |
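| # Classic 4-tuple step: derive truncation from the wrapper's time limit or the TimeLimit info flag. |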
| if self.time_limit is not None and self.current_step >= self.time_limit: |
| truncated = True |
| else: |
| truncated = info.get("TimeLimit.truncated", False) |
| return obs, reward, terminated, truncated, info |
| elif len(result) == 5: |
| |
| return result |
| else: |
| raise ValueError("Unexpected number of outputs from env.step") |
|
|
| def render(self, *args, **kwargs): |
| return self.env.render(*args, **kwargs) |
|
|
| def close(self): |
| return self.env.close() |
|
|
| def __getattr__(self, name): |
| |
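| # Delegate any other attribute access (e.g. language_instruction) to the wrapped env. |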
| return getattr(self.env, name) |
|
|
|
|
|
|
| @dataclass |
| class RewardModelStepOutput: |
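| """Per-step reward-model outputs: aggregate values plus per-observation-key breakdowns.""" |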
| |
| reward: float |
| success_prob: float |
| per_key_rewards: Dict[str, float] |
| per_key_success_probs: Dict[str, float] |
|
|
|
|
| class _RewardModelInferenceMixin: |
| """ |
| Shared reward-model inference logic backed by a locally loaded model: |
| weights, processor/tokenizer, and the evaluation batch collator. |
| """ |
|
|
| def __init__( |
| self, |
| model_path: str, |
| device: str, |
| max_frames: Optional[int] = None, |
| ): |
| if model_path is not None: |
| reward_model_config, tokenizer, processor, reward_model = load_model_from_hf( |
| model_path=model_path, |
| device=device, |
| ) |
| reward_model.eval() |
| else: |
| raise ValueError("model_path must be provided to load the reward model") |
|
|
| self.reward_model = reward_model |
| self.reward_model_config = reward_model_config |
|
|
| if self.reward_model is None: |
| raise ValueError("load_model_from_hf did not return a reward model") |
|
|
| |
| if max_frames is not None: |
| self.max_frames = int(max_frames) |
| elif self.reward_model_config is not None: |
| self.max_frames = int(getattr(getattr(self.reward_model_config, "data", None), "max_frames", 16)) |
| else: |
| self.max_frames = 16 |
|
|
| |
| self.processor = None |
| self.tokenizer = None |
| self.batch_collator = None |
| self._model_device = None |
| self._model_type = None |
| if self.reward_model is not None: |
| self.processor = getattr(self.reward_model, "processor", None) |
| self.tokenizer = getattr(self.reward_model, "tokenizer", None) |
| if self.processor is None or self.tokenizer is None: |
| raise ValueError( |
| "processor and tokenizer must be available on reward_model " |
| "(reward_model.processor / reward_model.tokenizer)" |
| ) |
|
|
| |
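| # Reward inference consumes frame sequences, so force multi-image mode on. |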
| if self.reward_model_config is not None: |
| data_cfg = getattr(self.reward_model_config, "data", None) |
| if data_cfg is not None and hasattr(data_cfg, "use_multi_image") and not data_cfg.use_multi_image: |
| data_cfg.use_multi_image = True |
|
|
| |
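| # Resolve the model type and device that process_batch_helper expects. |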
| self._model_type = getattr(getattr(self.reward_model_config, "model", None), "model_type", None) |
| if self._model_type is None: |
| raise ValueError("reward_model_config.model.model_type is required for local reward inference") |
| self._model_device = getattr(self.reward_model, "device", None) |
| if self._model_device is None: |
| try: |
| import torch |
|
|
| self._model_device = next(self.reward_model.parameters()).device |
| if isinstance(self._model_device, torch.device): |
| self._model_device = str(self._model_device) |
| except Exception: |
| self._model_device = None |
|
|
| self.batch_collator = setup_batch_collator( |
| self.processor, self.tokenizer, self.reward_model_config, is_eval=True |
| ) |
|
|
| def _compute_rewards_batch( |
| self, batch_raw: List[Dict[str, Any]] |
| ) -> Tuple[List[float], List[float]]: |
| """ |
| Run the reward model on a batch of raw episode dicts and return two |
| parallel lists, (progress_rewards, success_probs), one entry per input. |
| """ |
| if len(batch_raw) == 0: |
| return [], [] |
|
|
| samples = [ |
| raw_dict_to_sample(raw_data=raw, max_frames=self.max_frames, sample_type="progress") |
| for raw in batch_raw |
| ] |
|
|
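| # Discrete progress heads additionally need the bin count from the loss config. |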
| is_discrete_mode = ( |
| self.reward_model_config is not None |
| and getattr(getattr(self.reward_model_config, "loss", None), "progress_loss_type", None) == "discrete" |
| ) |
| num_bins = ( |
| getattr(getattr(self.reward_model_config, "loss", None), "progress_discrete_bins", None) |
| if self.reward_model_config is not None |
| else None |
| ) |
| outputs = process_batch_helper( |
| model_type=self._model_type, |
| model=self.reward_model, |
| tokenizer=self.tokenizer, |
| batch_collator=self.batch_collator, |
| device=self._model_device, |
| batch_data=[s.model_dump() for s in samples], |
| job_id=0, |
| is_discrete_mode=bool(is_discrete_mode), |
| num_bins=num_bins, |
| ) |
| rewards = extract_rewards_from_output(outputs) |
| success_probs = extract_success_probs_from_output(outputs) |
| return rewards.tolist(), success_probs.tolist() |
|
|
|
|
| class LiberoRobometerRewardWrapper(gym.Wrapper, _RewardModelInferenceMixin): |
| """ |
| Non-vector LIBERO wrapper that replaces rewards with reward-model predictions. |
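|
| Minimal usage sketch (mirrors ``main()`` below, which defines ``task_bddl_file`` |
| and a random ``action``; assumes a LIBERO ``OffScreenRenderEnv`` and a |
| HuggingFace reward-model checkpoint): |
|
| env = OffScreenRenderEnv(bddl_file_name=task_bddl_file, camera_heights=256, camera_widths=256) |
| wrapped = LiberoRobometerRewardWrapper( |
| env, |
| model_path="aliangdw/Robometer-4B-LIBERO", |
| device="cuda", |
| reward_relabeling_keys=["agentview_image"], |
| ) |
| obs, info = wrapped.reset() |
| obs, reward, terminated, truncated, info = wrapped.step(action) |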
| """ |
|
|
| def __init__( |
| self, |
| env, |
| model_path: str, |
| device: str, |
| reward_relabeling_keys: Sequence[str], |
| *, |
| use_relative_rewards: bool = False, |
| add_estimated_reward: bool = False, |
| use_success_detection: bool = False, |
| success_detection_duration: int = 2, |
| success_detection_threshold: float = 0.65, |
| max_frames: Optional[int] = None, |
| ): |
| self.env = GymToGymnasiumWrapper(env, time_limit=400) |
| gym.Wrapper.__init__(self, self.env) |
| _RewardModelInferenceMixin.__init__( |
| self, |
| model_path=model_path, |
| device=device, |
| max_frames=max_frames, |
| ) |
|
|
| self.reward_relabeling_keys = list(reward_relabeling_keys) |
| if len(self.reward_relabeling_keys) == 0: |
| raise ValueError("reward_relabeling_keys must be non-empty") |
| |
| |
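| # LIBERO envs may not expose an action_space; fall back to the 7-DoF continuous space. |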
| if not hasattr(self.env, "action_space"): |
| self.action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(7,), dtype=np.float32) |
| else: |
| self.action_space = self.env.action_space |
|
|
| self.use_relative_rewards = bool(use_relative_rewards) |
| self.add_estimated_reward = bool(add_estimated_reward) |
| self.use_success_detection = bool(use_success_detection) |
| self.success_detection_duration = int(success_detection_duration) |
| self.success_detection_threshold = float(success_detection_threshold) |
|
|
| self._frames: Dict[str, List[np.ndarray]] = {} |
| self.language_instruction = self.env.language_instruction |
| self.episode_id = 0 |
| self._step_in_episode: int = 0 |
| self._prev_reward: float = 0.0 |
| self._success_window: Deque[float] = deque(maxlen=self.success_detection_duration) |
|
|
| def _get_language_instruction(self, obs: Dict[str, Any], info: Dict[str, Any]) -> Optional[str]: |
| if isinstance(info, dict) and "language_instruction" in info: |
| return info.get("language_instruction") |
| if isinstance(obs, dict) and isinstance(obs.get("prompt"), str): |
| return obs.get("prompt") |
| return self.language_instruction |
|
|
| def reset(self, **kwargs): |
| obs, info = self.env.reset(**kwargs) |
| self.language_instruction = self.env.language_instruction |
| self.episode_id += 1 |
|
|
| self._frames = {k: [] for k in self.reward_relabeling_keys} |
| self._step_in_episode = 0 |
| self._prev_reward = 0.0 |
| self._success_window = deque(maxlen=self.success_detection_duration) |
|
|
| if isinstance(obs, dict): |
| for k in self.reward_relabeling_keys: |
| if k in obs: |
| self._frames[k].append(t2n(obs[k])) |
| return obs, info |
|
|
| def step(self, action): |
| obs, env_reward, terminated, truncated, info = self.env.step(action) |
|
|
| if not isinstance(info, dict): |
| info = {} if info is None else dict(info) |
| |
| |
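| # LIBERO terminates exactly at task success with a sparse reward of 1.0; record success and shift the env reward by -1. |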
| if "success" not in info: |
| info["success"] = terminated |
| if terminated: |
| assert env_reward == 1.0, "Reward should be 1.0 when task succeeds" |
| |
| env_reward -= 1 |
|
|
| if isinstance(obs, dict): |
| for k in self.reward_relabeling_keys: |
| if k in obs: |
| self._frames[k].append(t2n(obs[k])) |
|
|
| |
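| # Query the reward model separately for each observation key on the frames collected so far. |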
| per_key_rewards: Dict[str, float] = {} |
| per_key_success: Dict[str, float] = {} |
|
|
| for key in self.reward_relabeling_keys: |
| frames = np.stack(list(self._frames[key]), axis=0) if len(self._frames[key]) > 0 else np.array([]) |
| raw = dict( |
| frames=frames, |
| task=self.language_instruction, |
| id=self.episode_id, |
| metadata=dict( |
| subsequence_length=len(self._frames[key]) if self._frames[key] is not None else 0, |
| ), |
| video_embeddings=None, |
| text_embedding=None, |
| ) |
| rewards, success_probs = self._compute_rewards_batch([raw]) |
| per_key_rewards[key] = rewards[0] |
| per_key_success[key] = success_probs[0] |
|
|
| pred_reward = np.mean(list(per_key_rewards.values())) if per_key_rewards else 0.0 |
| success_prob = np.mean(list(per_key_success.values())) if per_key_success else 0.0 |
|
|
| |
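| # Optionally emit the change in predicted progress instead of its absolute value. |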
| if self.use_relative_rewards: |
| current = pred_reward |
| pred_reward = pred_reward - self._prev_reward |
| self._prev_reward = current |
| if terminated or truncated: |
| self._prev_reward = 0.0 |
|
|
| |
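| # Success detection: terminate early when a majority of the recent window exceeds the probability threshold. |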
| if self.use_success_detection: |
| self._success_window.append(success_prob) |
| if len(self._success_window) == self.success_detection_duration: |
| votes = sum(1 for p in self._success_window if p >= self.success_detection_threshold) |
| if votes > (self.success_detection_duration / 2): |
| terminated = True |
| info["success"] = True |
| info["success_from_reward_model"] = True |
|
|
| |
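| # Either add the prediction to the env reward or use the prediction alone. |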
| if self.add_estimated_reward: |
| out_reward = env_reward + pred_reward |
| else: |
| out_reward = pred_reward |
|
|
| info["env_reward"] = env_reward |
| info["predicted_reward"] = pred_reward |
| info["success_prob"] = success_prob |
| info["predicted_rewards_by_key"] = per_key_rewards |
| info["success_probs_by_key"] = per_key_success |
| info["step_in_episode"] = int(self._step_in_episode) |
|
|
| self._step_in_episode += 1 |
|
|
| |
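| # Episode ended: clear frame buffers and per-episode state. |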
| if terminated or truncated: |
| self._frames = {k: [] for k in self.reward_relabeling_keys} |
| self.language_instruction = self.env.language_instruction |
| self._step_in_episode = 0 |
| self._success_window = deque(maxlen=self.success_detection_duration) |
|
|
| return obs, out_reward, terminated, truncated, info |
|
|
|
|
| class VectorLiberoRobometerRewardWrapper(gym_vector.VectorWrapper, _RewardModelInferenceMixin): |
| """ |
| Vectorized LIBERO wrapper that replaces rewards with reward-model predictions per env. |
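| Typically wraps a ``gym.vector.SyncVectorEnv`` of ``GymToGymnasiumWrapper``-adapted |
| LIBERO envs; see ``main()`` for a runnable sketch. |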
| """ |
|
|
| def __init__( |
| self, |
| env: gym_vector.VectorEnv, |
| model_path: str, |
| device: str, |
| reward_relabeling_keys: Sequence[str], |
| *, |
| use_relative_rewards: bool = False, |
| add_estimated_reward: bool = False, |
| replace_reward: bool = True, |
| use_success_detection: bool = False, |
| success_detection_duration: int = 2, |
| success_detection_threshold: float = 0.65, |
| max_frames: Optional[int] = None, |
| ): |
| gym_vector.VectorWrapper.__init__(self, env) |
| _RewardModelInferenceMixin.__init__( |
| self, |
| model_path=model_path, |
| device=device, |
| max_frames=max_frames, |
| ) |
|
|
| self.reward_relabeling_keys = list(reward_relabeling_keys) |
| if len(self.reward_relabeling_keys) == 0: |
| raise ValueError("reward_relabeling_keys must be non-empty") |
|
|
| self.use_relative_rewards = bool(use_relative_rewards) |
| self.add_estimated_reward = bool(add_estimated_reward) |
| self.replace_reward = bool(replace_reward) |
| self.use_success_detection = bool(use_success_detection) |
| self.success_detection_duration = int(success_detection_duration) |
| self.success_detection_threshold = float(success_detection_threshold) |
|
|
| self._n = int(getattr(self.env, "num_envs", 1)) |
| self._frames: List[Dict[str, Deque[np.ndarray]]] = [] |
| self._language_instructions: List[Optional[str]] = [] |
| self._episode_ids: List[int] = [] |
| self._step_in_episode: List[int] = [] |
| self._prev_rewards: List[float] = [] |
| self._success_windows: List[Deque[float]] = [] |
|
|
| self._init_state() |
|
|
| def _init_state(self): |
| self._n = int(getattr(self.env, "num_envs", self._n)) |
| self._frames = [ |
| {k: deque(maxlen=self.max_frames) for k in self.reward_relabeling_keys} for _ in range(self._n) |
| ] |
| self._language_instructions = [None for _ in range(self._n)] |
| self._episode_ids = [0 for _ in range(self._n)] |
| self._step_in_episode = [0 for _ in range(self._n)] |
| self._prev_rewards = [0.0 for _ in range(self._n)] |
| self._success_windows = [deque(maxlen=self.success_detection_duration) for _ in range(self._n)] |
|
|
| def _get_language_instruction_vec(self, obs: Dict[str, Any], info: Any) -> List[Optional[str]]: |
| getter = getattr(self.env, "get_language_instruction", None) |
| if callable(getter): |
| try: |
| instr = getter() |
| if isinstance(instr, str): |
| return [instr] * self._n |
| except Exception: |
| pass |
|
|
| |
| if isinstance(obs, dict) and "prompt" in obs: |
| p = obs["prompt"] |
| if isinstance(p, list) and len(p) == self._n: |
| return [str(x) for x in p] |
| if isinstance(p, np.ndarray) and p.shape[0] == self._n: |
| return [str(x) for x in p.tolist()] |
| |
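| # Last resort: a single instruction shared across all envs. |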
| shared = getattr(self.env, "language_instruction", None) |
| return [shared] * self._n |
|
|
| def reset(self, **kwargs): |
| obs, info = self.env.reset(**kwargs) |
| self._init_state() |
|
|
| if isinstance(obs, dict): |
| instrs = self._get_language_instruction_vec(obs, info) |
| for i in range(self._n): |
| self._language_instructions[i] = instrs[i] |
| self._episode_ids[i] += 1 |
|
|
| for k in self.reward_relabeling_keys: |
| if k in obs: |
| arr = t2n(obs[k]) |
| if arr is not None and arr.shape[0] == self._n: |
| for i in range(self._n): |
| self._frames[i][k].append(arr[i]) |
|
|
| return obs, info |
|
|
| def step(self, actions): |
| obs, env_rewards, terminateds, truncateds, info = self.env.step(actions) |
|
|
| |
| env_rewards_np = t2n(env_rewards) |
| terminateds_np = t2n(terminateds).astype(bool) |
| truncateds_np = t2n(truncateds).astype(bool) |
|
|
| if env_rewards_np is None: |
| env_rewards_np = np.zeros((self._n,), dtype=np.float64) |
|
|
| |
| |
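| # LIBERO's sparse reward is exactly 1.0 at success; shift all rewards by -1 so non-success steps read -1. |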
| for i in range(self._n): |
| if bool(terminateds_np[i]): |
| assert float(env_rewards_np[i]) == 1.0, "Reward should be 1.0 when task succeeds" |
| env_rewards_shifted = env_rewards_np.astype(np.float64) - 1.0 |
|
|
| |
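| # Autoresetting vector envs report the terminal observation under info["final_observation"]. |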
| final_obs = None |
| if isinstance(info, dict) and "final_observation" in info: |
| final_obs = info.get("final_observation") |
|
|
| reset_instrs = self._get_language_instruction_vec(obs, info) if isinstance(obs, dict) else [None] * self._n |
| task_for_model: List[Optional[str]] = [ |
| (self._language_instructions[i] if self._language_instructions[i] is not None else reset_instrs[i]) |
| for i in range(self._n) |
| ] |
|
|
| |
| |
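| # Append the newest frame per env/key, preferring the terminal observation over the post-reset one for finished envs. |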
| if isinstance(obs, dict): |
| for k in self.reward_relabeling_keys: |
| if k not in obs: |
| continue |
| arr_reset = t2n(obs[k]) |
| if arr_reset is None or arr_reset.shape[0] != self._n: |
| continue |
| for i in range(self._n): |
| frame_i = arr_reset[i] |
| if final_obs is not None and i < len(final_obs) and final_obs[i] is not None: |
| fo_i = final_obs[i] |
| if isinstance(fo_i, dict) and k in fo_i: |
| frame_i = t2n(fo_i[k]) |
| self._frames[i][k].append(frame_i) |
|
|
| |
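| # Run the reward model once per observation key over the whole batch of envs. |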
| per_env_per_key_reward: Dict[str, List[float]] = {k: [0.0] * self._n for k in self.reward_relabeling_keys} |
| per_env_per_key_success: Dict[str, List[float]] = {k: [0.0] * self._n for k in self.reward_relabeling_keys} |
|
|
| for key in self.reward_relabeling_keys: |
| batch_raw: List[Dict[str, Any]] = [] |
| for i in range(self._n): |
| frames = np.stack(list(self._frames[i][key]), axis=0) if len(self._frames[i][key]) > 0 else np.array([]) |
| batch_raw.append( |
| dict( |
| frames=frames, |
| task=task_for_model[i], |
| id=int(self._episode_ids[i]), |
| metadata=dict(subsequence_length=len(self._frames[i][key])), |
| video_embeddings=None, |
| text_embedding=None, |
| ) |
| ) |
|
|
| rewards_k, success_k = self._compute_rewards_batch(batch_raw) |
| for i in range(self._n): |
| per_env_per_key_reward[key][i] = rewards_k[i] if i < len(rewards_k) else 0.0 |
| per_env_per_key_success[key][i] = success_k[i] if i < len(success_k) else 0.0 |
|
|
| |
| pred_rewards_abs = np.zeros((self._n,), dtype=np.float64) |
| success_probs = np.zeros((self._n,), dtype=np.float64) |
| for i in range(self._n): |
| r_vals = [per_env_per_key_reward[k][i] for k in self.reward_relabeling_keys] |
| s_vals = [per_env_per_key_success[k][i] for k in self.reward_relabeling_keys] |
| pred_rewards_abs[i] = np.mean(r_vals) if len(r_vals) else 0.0 |
| success_probs[i] = np.mean(s_vals) if len(s_vals) else 0.0 |
|
|
| pred_rewards_out = pred_rewards_abs.copy() |
| if self.use_relative_rewards: |
| for i in range(self._n): |
| cur = float(pred_rewards_abs[i]) |
| pred_rewards_out[i] = cur - self._prev_rewards[i] |
| self._prev_rewards[i] = cur |
| if terminateds_np[i] or truncateds_np[i]: |
| self._prev_rewards[i] = 0.0 |
|
|
| |
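| # Majority vote over the recent success-probability window triggers early termination. |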
| if self.use_success_detection: |
| for i in range(self._n): |
| self._success_windows[i].append(float(success_probs[i])) |
| if len(self._success_windows[i]) == self.success_detection_duration: |
| votes = sum(1 for p in self._success_windows[i] if p >= self.success_detection_threshold) |
| if votes > (self.success_detection_duration / 2): |
| terminateds_np[i] = True |
|
|
| |
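| # Combine env and predicted rewards according to the configured mode. |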
| if self.add_estimated_reward: |
| out_rewards = env_rewards_shifted + pred_rewards_out |
| else: |
| out_rewards = env_rewards_shifted if not self.replace_reward else pred_rewards_out |
|
|
| |
| |
| if info is None: |
| info = {} |
| if isinstance(info, dict): |
| info = dict(info) |
| if "success" not in info: |
| info["success"] = terminateds_np.copy() |
| info["env_reward"] = env_rewards_shifted.astype(np.float64) |
| info["predicted_reward"] = pred_rewards_out.astype(np.float64) |
| info["predicted_reward_abs"] = pred_rewards_abs.astype(np.float64) |
| info["success_prob"] = success_probs.astype(np.float64) |
| info["step_in_episode"] = np.asarray(self._step_in_episode, dtype=np.int32) |
| |
| if isinstance(info, dict): |
| for k in self.reward_relabeling_keys: |
| info[f"predicted_reward/{k}"] = np.asarray(per_env_per_key_reward[k], dtype=np.float64) |
| info[f"success_prob/{k}"] = np.asarray(per_env_per_key_success[k], dtype=np.float64) |
|
|
| |
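| # Per-env bookkeeping: advance step counters and reset buffers for envs that just finished. |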
| for i in range(self._n): |
| self._step_in_episode[i] += 1 |
| if terminateds_np[i] or truncateds_np[i]: |
| self._frames[i] = {k: deque(maxlen=self.max_frames) for k in self.reward_relabeling_keys} |
| self._language_instructions[i] = reset_instrs[i] |
| self._step_in_episode[i] = 0 |
| self._success_windows[i] = deque(maxlen=self.success_detection_duration) |
| self._episode_ids[i] += 1 |
|
|
| |
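| # Seed the fresh buffer with the post-reset frame (the append above used the terminal frame for this env). |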
| if isinstance(obs, dict) and final_obs is not None and i < len(final_obs) and final_obs[i] is not None: |
| for k in self.reward_relabeling_keys: |
| if k not in obs: |
| continue |
| arr_reset = t2n(obs[k]) |
| if arr_reset is not None and arr_reset.shape[0] == self._n: |
| self._frames[i][k].append(arr_reset[i]) |
|
|
| return obs, out_rewards.astype(np.float64), terminateds_np, truncateds_np, info |
|
|
| def main(): |
| try: |
| from libero.libero.envs import OffScreenRenderEnv |
| from libero.libero import benchmark, get_libero_path |
| except ImportError: |
| print("LIBERO not found. Please install LIBERO.") |
| sys.exit(1) |
|
|
| parser = argparse.ArgumentParser( |
| description="Smoke-test the LIBERO reward-model wrappers: load a reward model from HuggingFace, wrap a LIBERO env, and print relabeled rewards for a few random-action steps.", |
| ) |
| parser.add_argument("--model-path", default="aliangdw/Robometer-4B-LIBERO", help="HuggingFace model id or local checkpoint path") |
| parser.add_argument("--task-suite-name", default="libero_90", help="LIBERO task suite name") |
| parser.add_argument("--task-id", default=28, type=int, help="LIBERO task id") |
| parser.add_argument("--vectorized", action="store_true", help="Run in vectorized mode") |
| parser.add_argument("--num-envs", default=2, type=int, help="Number of environments to run in parallel") |
| args = parser.parse_args() |
|
|
| if not args.vectorized: |
| print("Testing Single LIBERO Robometer Reward Wrapper") |
| seed = np.random.randint(0, 1000000) |
| |
| benchmark_dict = benchmark.get_benchmark_dict() |
| task_suite = benchmark_dict[args.task_suite_name]() |
| task = task_suite.get_task(args.task_id) |
|
|
| task_bddl_file = os.path.join(get_libero_path("bddl_files"), task.problem_folder, task.bddl_file) |
|
|
| env_args = {"bddl_file_name": task_bddl_file, "camera_heights": 256, "camera_widths": 256} |
| base_env = OffScreenRenderEnv(**env_args) |
| base_env.seed(seed) |
|
|
| robometer_libero_env = LiberoRobometerRewardWrapper(base_env, |
| model_path=args.model_path, |
| device="cuda", |
| reward_relabeling_keys=["agentview_image"], |
| add_estimated_reward=True, |
| ) |
| obs, info = robometer_libero_env.reset() |
| for i in range(10): |
| action = np.random.uniform(-1, 1, 7) |
| obs, reward, terminated, truncated, info = robometer_libero_env.step(action) |
| print(f"Reward at step {i}: {reward}") |
| |
| robometer_libero_env.close() |
|
|
| else: |
| print("Testing Vectorized LIBERO Robometer Reward Wrapper") |
| def make_env(): |
| seed = np.random.randint(0, 1000000) |
| |
| benchmark_dict = benchmark.get_benchmark_dict() |
| task_suite = benchmark_dict[args.task_suite_name]() |
| task = task_suite.get_task(args.task_id) |
|
|
| task_bddl_file = os.path.join(get_libero_path("bddl_files"), task.problem_folder, task.bddl_file) |
|
|
| env_args = {"bddl_file_name": task_bddl_file, "camera_heights": 256, "camera_widths": 256} |
| base_env = OffScreenRenderEnv(**env_args) |
| base_env.seed(seed) |
| sample_obs = base_env.reset() |
| env = GymToGymnasiumWrapper(base_env, time_limit=400) |
| |
| if not hasattr(env, "action_space"): |
| env.action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(7,), dtype=np.float32) |
| if not hasattr(env, "observation_space"): |
| |
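| # Infer Box/Text spaces from a sample observation returned by reset(). |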
| obs_space_dict = {} |
| for k, v in sample_obs.items(): |
| |
| if isinstance(v, (str, bytes, bytearray)) or (isinstance(v, np.ndarray) and v.dtype.kind in {"U", "S"}): |
| obs_space_dict[k] = gym.spaces.Text(max_length=2048) |
| continue |
| v_arr = np.asarray(v) |
| dt = v_arr.dtype |
| if np.issubdtype(dt, np.uint8): |
| |
| obs_space_dict[k] = gym.spaces.Box( |
| low=np.zeros(v_arr.shape, dtype=np.uint8), |
| high=np.full(v_arr.shape, 255, dtype=np.uint8), |
| shape=v_arr.shape, |
| dtype=np.uint8, |
| ) |
| elif np.issubdtype(dt, np.integer): |
| ii = np.iinfo(dt) |
| obs_space_dict[k] = gym.spaces.Box( |
| low=np.full(v_arr.shape, ii.min, dtype=dt), |
| high=np.full(v_arr.shape, ii.max, dtype=dt), |
| shape=v_arr.shape, |
| dtype=dt, |
| ) |
| else: |
| |
| obs_space_dict[k] = gym.spaces.Box( |
| low=np.full(v_arr.shape, -np.inf, dtype=np.float32), |
| high=np.full(v_arr.shape, np.inf, dtype=np.float32), |
| shape=v_arr.shape, |
| dtype=np.float32, |
| ) |
| env.observation_space = gym.spaces.Dict(obs_space_dict) |
|
|
| return env |
|
|
| env_fns = [make_env for _ in range(args.num_envs)] |
| env = gym.vector.SyncVectorEnv(env_fns) |
| robometer_libero_env = VectorLiberoRobometerRewardWrapper(env, |
| model_path=args.model_path, |
| device="cuda", |
| reward_relabeling_keys=["agentview_image"], |
| add_estimated_reward=True, |
| ) |
| obs, info = robometer_libero_env.reset() |
| for i in range(10): |
| actions = np.random.uniform(-1, 1, (args.num_envs, 7)) |
| obs, rewards, terminateds, truncateds, infos = robometer_libero_env.step(actions) |
| print(f"Rewards at step {i}: {rewards}") |
| |
| robometer_libero_env.close() |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|