File size: 2,812 Bytes
8446e1f
 
 
 
 
f355fb3
8446e1f
f355fb3
 
 
 
8446e1f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c3efe87
8446e1f
c3efe87
8446e1f
c3efe87
8446e1f
c3efe87
8446e1f
 
 
c3efe87
8446e1f
 
 
 
c3efe87
8446e1f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import gymnasium as gym
import mujoco
import numpy as np
import os

# --- 1. SO-101 ์ปค์Šคํ…€ Gym ํ™˜๊ฒฝ (์šฐ๋ฆฌ๊ฐ€ ๋งŒ๋“ค์—ˆ๋˜ ๊ฒƒ) ---

# ์ด ํŒŒ์ผ(env.py) ๊ธฐ์ค€ ์ƒ๋Œ€ ๊ฒฝ๋กœ
MODEL_PATH = os.path.join(
    os.path.dirname(__file__), "SO-ARM100", "Simulation", "SO101", "scene.xml"
)


class SO101SimEnv(gym.Env):
    """SO-101 MuJoCo ์‹œ๋ฎฌ๋ ˆ์ด์…˜์„ ์œ„ํ•œ ์ปค์Šคํ…€ Gym ํ™˜๊ฒฝ"""

    metadata = {"render_modes": ["human"], "render_fps": 30}

    def __init__(self, render_mode=None):
        if not os.path.exists(MODEL_PATH):
            raise FileNotFoundError(
                f"SO-101 ๋ชจ๋ธ ํŒŒ์ผ({MODEL_PATH})์„ ์ฐพ์„ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค. "
                f"so100-arm ์ €์žฅ์†Œ๊ฐ€ ์˜ฌ๋ฐ”๋ฅด๊ฒŒ ๋ณต์ œ๋˜์—ˆ๋Š”์ง€ ํ™•์ธํ•˜์„ธ์š”."
            )
        self.model = mujoco.MjModel.from_xml_path(MODEL_PATH)
        self.data = mujoco.MjData(self.model)
        self.render_mode = render_mode
        self.viewer = None

        # 6๊ฐœ ๋ชจํ„ฐ
        self.action_space = gym.spaces.Box(
            low=-1.0, high=1.0, shape=(6,), dtype=np.float32
        )
        # 12๊ฐœ ์ƒํƒœ (6๊ฐœ ๊ด€์ ˆ ๊ฐ๋„ + 6๊ฐœ ๊ด€์ ˆ ์†๋„)
        self.observation_space = gym.spaces.Box(
            low=-np.inf, high=np.inf, shape=(12,), dtype=np.float32
        )

    def _get_obs(self):
        return np.concatenate([self.data.qpos[:6], self.data.qvel[:6]]).astype(
            np.float32
        )

    def step(self, action):
        self.data.ctrl[:6] = action
        mujoco.mj_step(self.model, self.data)
        if self.render_mode == "human":
            self.render()

        obs = self._get_obs()
        return obs, 0.0, False, False, {}  # (obs, reward, terminated, truncated, info)

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        mujoco.mj_resetData(self.model, self.data)
        return self._get_obs(), {}

    def render(self):
        if self.viewer is None:
            os.environ["MUJOCO_GL"] = "egl"  # WSL ๋ Œ๋”๋ง ํ˜ธํ™˜์„ฑ
            from mujoco.viewer import launch_passive

            self.viewer = launch_passive(self.model, self.data)
        self.viewer.sync()

    def close(self):
        if self.viewer:
            self.viewer.close()


# --- 2. EnvHub๋ฅผ ์œ„ํ•œ make_env ํ•จ์ˆ˜ (์ƒˆ๋กœ์šด ๋ถ€๋ถ„) ---


def make_env(n_envs: int = 1, use_async_envs: bool = False):
    """
    LeRobot EnvHub๊ฐ€ ํ˜ธ์ถœํ•  SO-101 ํ™˜๊ฒฝ ์ƒ์„ฑ ํŒฉํ† ๋ฆฌ ํ•จ์ˆ˜
    """

    def _make_single_env():
        # 1๋ฒˆ์—์„œ ๋งŒ๋“  ์šฐ๋ฆฌ ์ปค์Šคํ…€ ํ™˜๊ฒฝ์„ ๋ฆฌํ„ด
        return SO101SimEnv(render_mode="human")

    # LeRobot ๋ฌธ์„œ์˜ ํ‘œ์ค€ ๋ฐฉ์‹ (๋ฒกํ„ฐ ํ™˜๊ฒฝ์œผ๋กœ ๋ž˜ํ•‘)
    env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv
    vec_env = env_cls([_make_single_env for _ in range(n_envs)])

    return vec_env