liorbenhorin-nv's picture
update header
489d48a verified
"""IsaacLab Arena EnvHub Environment.
For more information, visit https://huggingface.co/docs/lerobot/envhub_isaaclab_arena
"""
from __future__ import annotations
import argparse
import importlib
import importlib.util
import logging
import os
import sys
from pathlib import Path
import yaml
import gymnasium as gym
from huggingface_hub import hf_hub_download
from lerobot.envs.configs import EnvConfig
# Hub constants for downloading additional files
HUB_REPO_ID = "nvidia/isaaclab-arena-envs"
EXAMPLE_ENVS = "example_envs.yaml"
def _download_hub_file(filename: str):
return hf_hub_download(repo_id=HUB_REPO_ID, filename=filename)
def _download_and_import(filename: str):
"""Download file from Hub and import as module."""
local_path = _download_hub_file(filename)
module_dir = os.path.dirname(local_path)
# Add directory to sys.path so modules can import each other
if module_dir not in sys.path:
sys.path.insert(0, module_dir)
module_name = filename.replace(".py", "")
# Read file content and replace relative imports with absolute imports
with open(local_path) as f:
content = f.read()
# Replace relative imports (e.g., "from .errors" -> "from errors")
content = content.replace("from .errors import", "from errors import")
content = content.replace("from .isaaclab_env_wrapper import", "from isaaclab_env_wrapper import")
# Create module and execute modified content
module = importlib.util.module_from_spec(importlib.util.spec_from_file_location(module_name, local_path))
sys.modules[module_name] = module
# Compile and execute the modified code
code = compile(content, local_path, "exec")
exec(code, module.__dict__) # noqa: S102
return module
try:
from .errors import IsaacLabArenaCameraKeyError, IsaacLabArenaStateKeyError
from .isaaclab_env_wrapper import IsaacLabEnvWrapper, cleanup_isaaclab
except ImportError:
_errors = _download_and_import("errors.py")
_isaaclab_wrapper = _download_and_import("isaaclab_env_wrapper.py")
IsaacLabEnvWrapper = _isaaclab_wrapper.IsaacLabEnvWrapper
cleanup_isaaclab = _isaaclab_wrapper.cleanup_isaaclab
IsaacLabArenaCameraKeyError = _errors.IsaacLabArenaCameraKeyError
IsaacLabArenaStateKeyError = _errors.IsaacLabArenaStateKeyError
def validate_config(
env,
state_keys: tuple[str, ...],
camera_keys: tuple[str, ...],
cfg_state_dim: int,
cfg_action_dim: int,
) -> None:
"""Validate observation keys and dimensions against IsaacLab managers."""
obs_manager = env.observation_manager
active_terms = obs_manager.active_terms
policy_terms = set(active_terms.get("policy", []))
camera_terms = set(active_terms.get("camera_obs", []))
# Validate keys exist
missing_state = [k for k in state_keys if k not in policy_terms]
if missing_state:
raise IsaacLabArenaStateKeyError(missing_state, policy_terms)
missing_cam = [k for k in camera_keys if k not in camera_terms]
if missing_cam:
raise IsaacLabArenaCameraKeyError(missing_cam, camera_terms)
# Validate dimensions
env_action_dim = env.action_space.shape[-1]
if cfg_action_dim != env_action_dim:
raise ValueError(f"action_dim mismatch: config={cfg_action_dim}, env={env_action_dim}")
# Compute expected state dimension
policy_dims = obs_manager.group_obs_dim.get("policy", [])
policy_names = active_terms.get("policy", [])
term_dims = dict(zip(policy_names, policy_dims, strict=False))
expected_state_dim = 0
for key in state_keys:
if key in term_dims:
shape = term_dims[key]
dim = 1
for s in shape if isinstance(shape, (tuple, list)) else [shape]:
dim *= s
expected_state_dim += dim
if cfg_state_dim != expected_state_dim:
raise ValueError(
f"state_dim mismatch: config={cfg_state_dim}, "
f"computed={expected_state_dim}. "
f"Term dims: {term_dims}"
)
logging.info(f"Validated: state_keys={state_keys}, camera_keys={camera_keys}")
def resolve_environment_alias(environment: str) -> str:
envs_mapping = _download_hub_file(EXAMPLE_ENVS)
with open(envs_mapping, "r") as f:
envs_mapping = yaml.safe_load(f)
module = (
f"{envs_mapping['repo']['base_module']}.{envs_mapping['repo']['envs'][environment]}"
)
return module
def _create_isaaclab_env(config: dict, n_envs: int) -> dict[str, dict[int, gym.vector.VectorEnv]]:
"""Create IsaacLab Arena environment from configuration.
Args:
config: Configuration dictionary.
n_envs: Number of parallel environments.
Returns:
Dict mapping environment name to {task_id: VectorEnv}.
"""
from isaaclab.app import AppLauncher
if config.get("enable_pinocchio", True):
import pinocchio # noqa: F401
# Override num_envs
config["num_envs"] = n_envs
# Create argparse namespace for IsaacLab
as_isaaclab_argparse = argparse.Namespace(**config)
logging.info("Launching IsaacLab simulation app...")
app_launcher = AppLauncher(as_isaaclab_argparse)
from isaaclab_arena.environments.arena_env_builder import ArenaEnvBuilder
environment = config.get("environment")
if environment is None:
raise ValueError("cfg.environment must be specified")
# Resolve alias and create environment
environment_path = resolve_environment_alias(environment)
logging.info(f"Creating environment: {environment_path}")
module_path, class_name = environment_path.rsplit(".", 1)
environment_module = importlib.import_module(module_path)
environment_class = getattr(environment_module, class_name)()
env_builder = ArenaEnvBuilder(environment_class.get_env(as_isaaclab_argparse), as_isaaclab_argparse)
# Determine render_mode
render_mode = "rgb_array" if config.get("enable_cameras", False) else None
raw_env = env_builder.make_registered()
# Set render_mode on underlying env
if render_mode and hasattr(raw_env, "render_mode"):
raw_env.render_mode = render_mode
logging.info(f"Set render_mode={render_mode} on underlying IsaacLab env")
# Validate config
state_keys = tuple(k.strip() for k in (config.get("state_keys") or "").split(",") if k.strip())
camera_keys = tuple(k.strip() for k in (config.get("camera_keys") or "").split(",") if k.strip())
try:
validate_config(
raw_env,
state_keys,
camera_keys,
config.get("state_dim", 54),
config.get("action_dim", 36),
)
except (IsaacLabArenaCameraKeyError, IsaacLabArenaStateKeyError, ValueError) as e:
logging.error(f"Validation failed: {e}")
cleanup_isaaclab(raw_env, app_launcher)
raise
except Exception as e:
logging.error(f"Validation failed with unexpected error: {type(e).__name__}: {e}")
cleanup_isaaclab(raw_env, app_launcher)
raise
# Get task description
task = config.get("task")
if task is None:
task = f"Complete the {environment.replace('_', ' ')} task."
# Wrap and return
wrapped_env = IsaacLabEnvWrapper(
raw_env,
episode_length=config.get("episode_length", 300),
task=task,
render_mode=render_mode,
simulation_app=app_launcher,
)
logging.info(f"Created: {environment} with {wrapped_env.num_envs} envs, render_mode={render_mode}")
return {environment: {0: wrapped_env}}
def make_env(
n_envs: int = 1,
use_async_envs: bool = False, # noqa: ARG001
cfg: EnvConfig | None = None,
) -> dict[str, dict[int, gym.vector.VectorEnv]]:
"""Create IsaacLab Arena environments (EnvHub-compatible API).
This function provides the standard EnvHub interface for loading
environments from the Hugging Face Hub. Configuration is passed
directly via kwargs or loaded from configs/config.yaml.
Args:
n_envs: Number of parallel environments (default: 1).
use_async_envs: Ignored for IsaacLab (GPU-based batched execution).
cfg: Configuration object with environment parameters. Required keys:
- environment: Environment name or alias (e.g., "gr1_microwave")
- headless: Whether to run headless (default: True)
- enable_cameras: Enable camera rendering (default: False)
- episode_length: Max steps per episode (default: 300)
- state_keys: Comma-separated observation keys
- camera_keys: Comma-separated camera observation keys
- state_dim: Expected state dimension
- action_dim: Expected action dimension
Returns:
Dict mapping environment name to {task_id: VectorEnv}.
Format: {suite_name: {0: wrapped_vector_env}}
Note:
IsaacLab environments use GPU-based batched execution, so
`use_async_envs` is ignored. The returned wrapper provides
VectorEnv compatibility.
"""
if n_envs < 1:
raise ValueError("`n_envs` must be at least 1")
if not hasattr(cfg, "environment") or cfg.environment is None:
raise ValueError(
"No 'environment' specified. Pass it via kwargs or create "
"configs/config.yaml with environment settings."
)
# Build config from cfg attributes directly (hub path) or YAML (local dev)
# if cfg is not None and hasattr(cfg, 'environment'):
# # Extract config directly from EnvConfig attributes
config = {
"environment": cfg.environment,
"embodiment": cfg.embodiment,
"object": cfg.object,
"mimic": cfg.mimic,
"teleop_device": cfg.teleop_device,
"seed": cfg.seed,
"device": cfg.device,
"disable_fabric": cfg.disable_fabric,
"enable_cameras": cfg.enable_cameras,
"headless": cfg.headless,
"enable_pinocchio": cfg.enable_pinocchio,
"episode_length": cfg.episode_length,
"state_dim": cfg.state_dim,
"action_dim": cfg.action_dim,
"camera_height": cfg.camera_height,
"camera_width": cfg.camera_width,
"video": cfg.video,
"video_length": cfg.video_length,
"video_interval": cfg.video_interval,
"state_keys": cfg.state_keys,
"camera_keys": cfg.camera_keys or "", # Pass empty string for no cameras
"task": cfg.task,
}
logging.info(f"EnvHub make_env: environment={config.get('environment')}, n_envs={n_envs}")
logging.info(f"Config: headless={config.get('headless')}, enable_cameras={config.get('enable_cameras')}")
logging.info(f"EnvHub Config: {config}")
return _create_isaaclab_env(config, n_envs)