|
|
"""IsaacLab Arena EnvHub Environment. |
|
|
|
|
|
For more information, visit https://huggingface.co/docs/lerobot/envhub_isaaclab_arena |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import argparse |
|
|
import importlib |
|
|
import importlib.util |
|
|
import logging |
|
|
import os |
|
|
import sys |
|
|
from pathlib import Path |
|
|
import yaml |
|
|
|
|
|
import gymnasium as gym |
|
|
|
|
|
from huggingface_hub import hf_hub_download |
|
|
from lerobot.envs.configs import EnvConfig |
|
|
|
|
|
|
|
|
HUB_REPO_ID = "nvidia/isaaclab-arena-envs" |
|
|
EXAMPLE_ENVS = "example_envs.yaml" |
|
|
|
|
|
def _download_hub_file(filename: str): |
|
|
return hf_hub_download(repo_id=HUB_REPO_ID, filename=filename) |
|
|
|
|
|
def _download_and_import(filename: str): |
|
|
"""Download file from Hub and import as module.""" |
|
|
local_path = _download_hub_file(filename) |
|
|
module_dir = os.path.dirname(local_path) |
|
|
|
|
|
|
|
|
if module_dir not in sys.path: |
|
|
sys.path.insert(0, module_dir) |
|
|
|
|
|
module_name = filename.replace(".py", "") |
|
|
|
|
|
|
|
|
with open(local_path) as f: |
|
|
content = f.read() |
|
|
|
|
|
|
|
|
content = content.replace("from .errors import", "from errors import") |
|
|
content = content.replace("from .isaaclab_env_wrapper import", "from isaaclab_env_wrapper import") |
|
|
|
|
|
|
|
|
module = importlib.util.module_from_spec(importlib.util.spec_from_file_location(module_name, local_path)) |
|
|
sys.modules[module_name] = module |
|
|
|
|
|
|
|
|
code = compile(content, local_path, "exec") |
|
|
exec(code, module.__dict__) |
|
|
|
|
|
return module |
|
|
|
|
|
|
|
|
try: |
|
|
from .errors import IsaacLabArenaCameraKeyError, IsaacLabArenaStateKeyError |
|
|
from .isaaclab_env_wrapper import IsaacLabEnvWrapper, cleanup_isaaclab |
|
|
except ImportError: |
|
|
_errors = _download_and_import("errors.py") |
|
|
_isaaclab_wrapper = _download_and_import("isaaclab_env_wrapper.py") |
|
|
IsaacLabEnvWrapper = _isaaclab_wrapper.IsaacLabEnvWrapper |
|
|
cleanup_isaaclab = _isaaclab_wrapper.cleanup_isaaclab |
|
|
IsaacLabArenaCameraKeyError = _errors.IsaacLabArenaCameraKeyError |
|
|
IsaacLabArenaStateKeyError = _errors.IsaacLabArenaStateKeyError |
|
|
|
|
|
|
|
|
def validate_config( |
|
|
env, |
|
|
state_keys: tuple[str, ...], |
|
|
camera_keys: tuple[str, ...], |
|
|
cfg_state_dim: int, |
|
|
cfg_action_dim: int, |
|
|
) -> None: |
|
|
"""Validate observation keys and dimensions against IsaacLab managers.""" |
|
|
obs_manager = env.observation_manager |
|
|
active_terms = obs_manager.active_terms |
|
|
policy_terms = set(active_terms.get("policy", [])) |
|
|
camera_terms = set(active_terms.get("camera_obs", [])) |
|
|
|
|
|
|
|
|
missing_state = [k for k in state_keys if k not in policy_terms] |
|
|
if missing_state: |
|
|
raise IsaacLabArenaStateKeyError(missing_state, policy_terms) |
|
|
|
|
|
missing_cam = [k for k in camera_keys if k not in camera_terms] |
|
|
if missing_cam: |
|
|
raise IsaacLabArenaCameraKeyError(missing_cam, camera_terms) |
|
|
|
|
|
|
|
|
env_action_dim = env.action_space.shape[-1] |
|
|
if cfg_action_dim != env_action_dim: |
|
|
raise ValueError(f"action_dim mismatch: config={cfg_action_dim}, env={env_action_dim}") |
|
|
|
|
|
|
|
|
policy_dims = obs_manager.group_obs_dim.get("policy", []) |
|
|
policy_names = active_terms.get("policy", []) |
|
|
term_dims = dict(zip(policy_names, policy_dims, strict=False)) |
|
|
|
|
|
expected_state_dim = 0 |
|
|
for key in state_keys: |
|
|
if key in term_dims: |
|
|
shape = term_dims[key] |
|
|
dim = 1 |
|
|
for s in shape if isinstance(shape, (tuple, list)) else [shape]: |
|
|
dim *= s |
|
|
expected_state_dim += dim |
|
|
|
|
|
if cfg_state_dim != expected_state_dim: |
|
|
raise ValueError( |
|
|
f"state_dim mismatch: config={cfg_state_dim}, " |
|
|
f"computed={expected_state_dim}. " |
|
|
f"Term dims: {term_dims}" |
|
|
) |
|
|
|
|
|
logging.info(f"Validated: state_keys={state_keys}, camera_keys={camera_keys}") |
|
|
|
|
|
|
|
|
def resolve_environment_alias(environment: str) -> str: |
|
|
envs_mapping = _download_hub_file(EXAMPLE_ENVS) |
|
|
with open(envs_mapping, "r") as f: |
|
|
envs_mapping = yaml.safe_load(f) |
|
|
|
|
|
module = ( |
|
|
f"{envs_mapping['repo']['base_module']}.{envs_mapping['repo']['envs'][environment]}" |
|
|
) |
|
|
return module |
|
|
|
|
|
def _create_isaaclab_env(config: dict, n_envs: int) -> dict[str, dict[int, gym.vector.VectorEnv]]: |
|
|
"""Create IsaacLab Arena environment from configuration. |
|
|
|
|
|
Args: |
|
|
config: Configuration dictionary. |
|
|
n_envs: Number of parallel environments. |
|
|
|
|
|
Returns: |
|
|
Dict mapping environment name to {task_id: VectorEnv}. |
|
|
""" |
|
|
from isaaclab.app import AppLauncher |
|
|
|
|
|
if config.get("enable_pinocchio", True): |
|
|
import pinocchio |
|
|
|
|
|
|
|
|
config["num_envs"] = n_envs |
|
|
|
|
|
|
|
|
as_isaaclab_argparse = argparse.Namespace(**config) |
|
|
|
|
|
logging.info("Launching IsaacLab simulation app...") |
|
|
app_launcher = AppLauncher(as_isaaclab_argparse) |
|
|
|
|
|
from isaaclab_arena.environments.arena_env_builder import ArenaEnvBuilder |
|
|
|
|
|
environment = config.get("environment") |
|
|
if environment is None: |
|
|
raise ValueError("cfg.environment must be specified") |
|
|
|
|
|
|
|
|
environment_path = resolve_environment_alias(environment) |
|
|
logging.info(f"Creating environment: {environment_path}") |
|
|
|
|
|
module_path, class_name = environment_path.rsplit(".", 1) |
|
|
environment_module = importlib.import_module(module_path) |
|
|
environment_class = getattr(environment_module, class_name)() |
|
|
|
|
|
env_builder = ArenaEnvBuilder(environment_class.get_env(as_isaaclab_argparse), as_isaaclab_argparse) |
|
|
|
|
|
|
|
|
render_mode = "rgb_array" if config.get("enable_cameras", False) else None |
|
|
|
|
|
raw_env = env_builder.make_registered() |
|
|
|
|
|
|
|
|
if render_mode and hasattr(raw_env, "render_mode"): |
|
|
raw_env.render_mode = render_mode |
|
|
logging.info(f"Set render_mode={render_mode} on underlying IsaacLab env") |
|
|
|
|
|
|
|
|
state_keys = tuple(k.strip() for k in (config.get("state_keys") or "").split(",") if k.strip()) |
|
|
camera_keys = tuple(k.strip() for k in (config.get("camera_keys") or "").split(",") if k.strip()) |
|
|
try: |
|
|
validate_config( |
|
|
raw_env, |
|
|
state_keys, |
|
|
camera_keys, |
|
|
config.get("state_dim", 54), |
|
|
config.get("action_dim", 36), |
|
|
) |
|
|
except (IsaacLabArenaCameraKeyError, IsaacLabArenaStateKeyError, ValueError) as e: |
|
|
logging.error(f"Validation failed: {e}") |
|
|
cleanup_isaaclab(raw_env, app_launcher) |
|
|
raise |
|
|
except Exception as e: |
|
|
logging.error(f"Validation failed with unexpected error: {type(e).__name__}: {e}") |
|
|
cleanup_isaaclab(raw_env, app_launcher) |
|
|
raise |
|
|
|
|
|
|
|
|
task = config.get("task") |
|
|
if task is None: |
|
|
task = f"Complete the {environment.replace('_', ' ')} task." |
|
|
|
|
|
|
|
|
wrapped_env = IsaacLabEnvWrapper( |
|
|
raw_env, |
|
|
episode_length=config.get("episode_length", 300), |
|
|
task=task, |
|
|
render_mode=render_mode, |
|
|
simulation_app=app_launcher, |
|
|
) |
|
|
logging.info(f"Created: {environment} with {wrapped_env.num_envs} envs, render_mode={render_mode}") |
|
|
|
|
|
return {environment: {0: wrapped_env}} |
|
|
|
|
|
|
|
|
def make_env( |
|
|
n_envs: int = 1, |
|
|
use_async_envs: bool = False, |
|
|
cfg: EnvConfig | None = None, |
|
|
) -> dict[str, dict[int, gym.vector.VectorEnv]]: |
|
|
"""Create IsaacLab Arena environments (EnvHub-compatible API). |
|
|
|
|
|
This function provides the standard EnvHub interface for loading |
|
|
environments from the Hugging Face Hub. Configuration is passed |
|
|
directly via kwargs or loaded from configs/config.yaml. |
|
|
|
|
|
Args: |
|
|
n_envs: Number of parallel environments (default: 1). |
|
|
use_async_envs: Ignored for IsaacLab (GPU-based batched execution). |
|
|
cfg: Configuration object with environment parameters. Required keys: |
|
|
- environment: Environment name or alias (e.g., "gr1_microwave") |
|
|
- headless: Whether to run headless (default: True) |
|
|
- enable_cameras: Enable camera rendering (default: False) |
|
|
- episode_length: Max steps per episode (default: 300) |
|
|
- state_keys: Comma-separated observation keys |
|
|
- camera_keys: Comma-separated camera observation keys |
|
|
- state_dim: Expected state dimension |
|
|
- action_dim: Expected action dimension |
|
|
|
|
|
Returns: |
|
|
Dict mapping environment name to {task_id: VectorEnv}. |
|
|
Format: {suite_name: {0: wrapped_vector_env}} |
|
|
|
|
|
Note: |
|
|
IsaacLab environments use GPU-based batched execution, so |
|
|
`use_async_envs` is ignored. The returned wrapper provides |
|
|
VectorEnv compatibility. |
|
|
""" |
|
|
if n_envs < 1: |
|
|
raise ValueError("`n_envs` must be at least 1") |
|
|
|
|
|
if not hasattr(cfg, "environment") or cfg.environment is None: |
|
|
raise ValueError( |
|
|
"No 'environment' specified. Pass it via kwargs or create " |
|
|
"configs/config.yaml with environment settings." |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
config = { |
|
|
"environment": cfg.environment, |
|
|
"embodiment": cfg.embodiment, |
|
|
"object": cfg.object, |
|
|
"mimic": cfg.mimic, |
|
|
"teleop_device": cfg.teleop_device, |
|
|
"seed": cfg.seed, |
|
|
"device": cfg.device, |
|
|
"disable_fabric": cfg.disable_fabric, |
|
|
"enable_cameras": cfg.enable_cameras, |
|
|
"headless": cfg.headless, |
|
|
"enable_pinocchio": cfg.enable_pinocchio, |
|
|
"episode_length": cfg.episode_length, |
|
|
"state_dim": cfg.state_dim, |
|
|
"action_dim": cfg.action_dim, |
|
|
"camera_height": cfg.camera_height, |
|
|
"camera_width": cfg.camera_width, |
|
|
"video": cfg.video, |
|
|
"video_length": cfg.video_length, |
|
|
"video_interval": cfg.video_interval, |
|
|
"state_keys": cfg.state_keys, |
|
|
"camera_keys": cfg.camera_keys or "", |
|
|
"task": cfg.task, |
|
|
} |
|
|
|
|
|
logging.info(f"EnvHub make_env: environment={config.get('environment')}, n_envs={n_envs}") |
|
|
logging.info(f"Config: headless={config.get('headless')}, enable_cameras={config.get('enable_cameras')}") |
|
|
logging.info(f"EnvHub Config: {config}") |
|
|
|
|
|
return _create_isaaclab_env(config, n_envs) |
|
|
|