|
|
from pathlib import Path |
|
|
import sys |
|
|
import gymnasium as gym |
|
|
from gymnasium import spaces |
|
|
import numpy as np |
|
|
from huggingface_hub import snapshot_download |
|
|
import yaml |
|
|
snapshot_download("lerobot/unitree-g1-mujoco") |
|
|
|
|
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent)) |
|
|
|
|
|
from sim.simulator_factory import SimulatorFactory, init_channel |
|
|
|
|
|
|
|
|
def make_env(n_envs=1, use_async_envs=False, **kwargs): |
|
|
"""Create a UnitreeG1 simulation environment. |
|
|
|
|
|
Args: |
|
|
n_envs: Number of environments (currently only 1 supported) |
|
|
use_async_envs: Whether to use async envs (not implemented) |
|
|
**kwargs: Additional arguments passed to simulator |
|
|
- publish_images: bool, whether to publish camera images via ZMQ |
|
|
- camera_port: int, ZMQ port for camera images |
|
|
- cameras: list of camera names |
|
|
""" |
|
|
repo_dir = Path(__file__).parent |
|
|
|
|
|
|
|
|
config_path = repo_dir / "config.yaml" |
|
|
with open(config_path) as f: |
|
|
config = yaml.safe_load(f) |
|
|
|
|
|
|
|
|
publish_images = kwargs.get("publish_images", True) |
|
|
camera_port = kwargs.get("camera_port", 5555) |
|
|
cameras = kwargs.get("cameras", ["head_camera"]) |
|
|
|
|
|
enable_offscreen = publish_images or config.get("ENABLE_OFFSCREEN", False) |
|
|
camera_configs = {} |
|
|
if enable_offscreen: |
|
|
for cam_name in cameras: |
|
|
camera_configs[cam_name] = {"height": 480, "width": 640} |
|
|
|
|
|
|
|
|
init_channel(config=config) |
|
|
|
|
|
|
|
|
simulator = SimulatorFactory.create_simulator( |
|
|
config=config, |
|
|
env_name="default", |
|
|
onscreen=config.get("ENABLE_ONSCREEN", True), |
|
|
offscreen=enable_offscreen, |
|
|
camera_configs=camera_configs, |
|
|
) |
|
|
|
|
|
class UnitreeG1Env(gym.Env): |
|
|
"""Gymnasium environment wrapper for Unitree G1 MuJoCo simulation.""" |
|
|
|
|
|
metadata = {"render_modes": ["human"]} |
|
|
|
|
|
def __init__(self, sim, cam_configs, pub_images, cam_port): |
|
|
super().__init__() |
|
|
self.simulator = sim |
|
|
self.sim_env = sim.sim_env |
|
|
self.step_count = 0 |
|
|
self.camera_configs = cam_configs |
|
|
|
|
|
|
|
|
self.sim_dt = config["SIMULATE_DT"] |
|
|
self.viewer_dt = config.get("VIEWER_DT", 0.02) |
|
|
self.image_dt = config.get("IMAGE_DT", 0.033333) |
|
|
|
|
|
|
|
|
if pub_images and len(cam_configs) > 0: |
|
|
sim.start_image_publish_subprocess( |
|
|
start_method=config.get("MP_START_METHOD", "spawn"), |
|
|
camera_port=cam_port |
|
|
) |
|
|
print(f"Camera images publishing on tcp://localhost:{cam_port}") |
|
|
|
|
|
|
|
|
num_joints = config.get("NUM_MOTORS", 29) |
|
|
self.action_space = spaces.Box(-np.pi, np.pi, shape=(num_joints,), dtype=np.float32) |
|
|
self.observation_space = spaces.Box(-np.inf, np.inf, shape=(num_joints * 3 + 10,), dtype=np.float32) |
|
|
|
|
|
def reset(self, seed=None, options=None): |
|
|
"""Reset the simulation.""" |
|
|
super().reset(seed=seed, options=options) |
|
|
self.simulator.reset() |
|
|
self.step_count = 0 |
|
|
obs = self._get_obs() |
|
|
return obs, {} |
|
|
|
|
|
def step(self, action=None): |
|
|
"""Execute one simulation step. Caller handles timing.""" |
|
|
|
|
|
self.sim_env.sim_step() |
|
|
|
|
|
|
|
|
if self.step_count % int(self.viewer_dt / self.sim_dt) == 0: |
|
|
self.sim_env.update_viewer() |
|
|
|
|
|
|
|
|
if self.step_count % int(self.image_dt / self.sim_dt) == 0: |
|
|
self.sim_env.update_render_caches() |
|
|
|
|
|
self.step_count += 1 |
|
|
|
|
|
obs = self._get_obs() |
|
|
reward = 0.0 |
|
|
terminated = False |
|
|
truncated = False |
|
|
info = {} |
|
|
return obs, reward, terminated, truncated, info |
|
|
|
|
|
def _get_obs(self): |
|
|
"""Get current observation from simulation.""" |
|
|
obs_dict = self.sim_env.prepare_obs() |
|
|
obs = np.concatenate([ |
|
|
obs_dict.get("body_q", np.zeros(29)), |
|
|
obs_dict.get("body_dq", np.zeros(29)), |
|
|
obs_dict.get("body_tau_est", np.zeros(29)), |
|
|
obs_dict.get("floating_base_pose", np.zeros(7))[:4], |
|
|
obs_dict.get("floating_base_vel", np.zeros(6))[:3], |
|
|
obs_dict.get("floating_base_acc", np.zeros(3)), |
|
|
]).astype(np.float32) |
|
|
return obs |
|
|
|
|
|
def close(self): |
|
|
"""Close the simulation.""" |
|
|
print("++++closing simulator ++++") |
|
|
self.simulator.close() |
|
|
|
|
|
return UnitreeG1Env(simulator, camera_configs, publish_images, camera_port) |
|
|
|