| from pathlib import Path |
| import subprocess |
| import sys |
| import gymnasium as gym |
| from gymnasium import spaces |
| import numpy as np |
| from huggingface_hub import snapshot_download |
| import os |
| import signal |
# Import-time side effect: fetch the "lerobot/unitree-g1-mujoco" repository
# through huggingface_hub; the value returned by the call is discarded.
# NOTE(review): presumably this makes run_sim.py and the simulator assets
# available for make_env() below -- confirm against how this file is loaded.
snapshot_download("lerobot/unitree-g1-mujoco")
|
|
|
|
def make_env(n_envs=1, use_async_envs=False, **kwargs):
    """Launch ``run_sim.py`` as a subprocess and return a placeholder env.

    The script is looked up in the directory containing this file and is
    started with the current Python interpreter (``sys.executable``). The
    returned environment does not talk to the simulator; it only keeps a
    handle on the subprocess so the caller can stop it with ``kill_sim()``.

    Args:
        n_envs: Accepted for interface compatibility; not used here.
        use_async_envs: Accepted for interface compatibility; not used here.
        **kwargs: Accepted and ignored.

    Returns:
        A ``gym.Env`` instance (``DummyEnv``) whose ``process`` attribute is
        the ``subprocess.Popen`` handle of the launched simulator.
    """
    repo_dir = Path(__file__).parent
    run_sim = repo_dir / "run_sim.py"

    print("=" * 60)
    print("launching run_sim.py as subprocess")
    print("path:", run_sim)
    print("=" * 60)

    # Use the same interpreter as the caller to run the simulator script.
    proc = subprocess.Popen([sys.executable, str(run_sim)])
    print(f"simulator started as pid={proc.pid}")

    class DummyEnv(gym.Env):
        """Placeholder env with 1-D spaces that owns the simulator process."""

        metadata = {"render_modes": []}

        def __init__(self, process):
            """Store the simulator handle and declare 1-D float32 Box spaces."""
            super().__init__()
            self.process = process
            self.action_space = spaces.Box(-1, 1, shape=(1,), dtype=np.float32)
            self.observation_space = spaces.Box(-1, 1, shape=(1,), dtype=np.float32)

        def reset(self, seed=None, options=None):
            """Seed the base env and return a zero observation with empty info."""
            super().reset(seed=seed, options=options)
            obs = np.zeros(1, dtype=np.float32)
            info = {}
            return obs, info

        def step(self, action):
            """Ignore ``action``; return a zero observation and zero reward.

            The episode is never reported as terminated or truncated.
            """
            return np.zeros(1, dtype=np.float32), 0.0, False, False, {}

        def close(self):
            """Do nothing; the simulator keeps running until ``kill_sim()``."""
            pass

        def kill_sim(self):
            """Stop the simulator subprocess if it is still running.

            Sends a terminate request first, waits up to 2 seconds, and
            falls back to a hard kill if the process has not exited by then.
            """
            if self.process.poll() is None:
                print("killing simulator subprocess...")
                self.process.terminate()
                try:
                    self.process.wait(timeout=2)
                except subprocess.TimeoutExpired:
                    print("force killing simulator subprocess...")
                    self.process.kill()
                    # Reap the killed child so it does not linger as a zombie
                    # and so ``returncode`` is populated for later checks.
                    self.process.wait()

    return DummyEnv(proc)
|
|