File size: 4,702 Bytes
49bcfeb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
from pathlib import Path
from typing import Any

import cv2
import gymnasium as gym
import mujoco
import numpy as np


class SO101Env(gym.Env):
    """Minimal SO101 reach environment.

    The reward is the negative Euclidean distance between the robot's moving
    jaw and a cube in the scene; observations are rendered RGB images of the
    scene from a fixed free camera.
    """

    metadata = {"render_modes": ["rgb_array"], "render_fps": 4}

    def __init__(
        self,
        xml_pth: Path = Path("assets/SO-ARM100/Simulation/SO101/scene_with_cube.xml"),
        obs_h: int = 480,
        obs_w: int = 640,
        n_max_epi_steps: int = 1_000,
        cam_dis: float = 1.0,
        cam_azi: int = 100,
        n_sim_steps: int = 10,
    ) -> None:
        """Build the MuJoCo model, renderer, and Gym spaces.

        Args:
            xml_pth (Path, optional): Path to the scene .xml file containing the
                robot and the cube it needs to pick up. Defaults to
                Path("assets/SO-ARM100/Simulation/SO101/scene_with_cube.xml").
            obs_h (int, optional): Height in pixels of the rendered observation. Defaults to 480.
            obs_w (int, optional): Width in pixels of the rendered observation. Defaults to 640.
            n_max_epi_steps (int, optional): Episode length before truncation. Defaults to 1_000.
            cam_dis (float, optional): Distance of the render camera to the robot. Defaults to 1.0.
            cam_azi (int, optional): Azimuth of the render camera in degrees. Defaults to 100.
            n_sim_steps (int, optional): Number of MuJoCo physics sub-steps per env step. Defaults to 10.
        """
        self.mj_model = mujoco.MjModel.from_xml_path(str(xml_pth))
        self.mj_data = mujoco.MjData(self.mj_model)
        self.obs_h = obs_h
        self.obs_w = obs_w
        self.n_sim_steps = n_sim_steps
        self.mj_renderer = mujoco.Renderer(
            self.mj_model, height=self.obs_h, width=self.obs_w
        )

        self.n_max_epi_steps = n_max_epi_steps
        self.current_step = 0
        # Per-actuator control limits (radians); shape must match mj_model.nu.
        self.action_space = gym.spaces.Box(
            low=np.array([-1.91986, -1.74533, -1.69, -1.65806, -2.74385, -0.17453]),
            high=np.array([1.91986, 1.74533, 1.69, 1.65806, 2.84121, 1.74533]),
            shape=(self.mj_model.nu,),
            dtype=np.float32,
        )

        self.observation_space = gym.spaces.Box(
            low=0, high=255, shape=(self.obs_h, self.obs_w, 3), dtype=np.uint8
        )
        # BUG FIX: the moving jaw is a *body*, so its id lives in the body
        # namespace and must index mj_data.xpos — the old code indexed
        # geom_xpos with a body id, reading the position of an unrelated geom.
        self.gripper_body_id = mujoco.mj_name2id(
            self.mj_model, mujoco.mjtObj.mjOBJ_BODY, "moving_jaw_so101_v1"
        )
        self.cube_geom_id = mujoco.mj_name2id(
            self.mj_model, mujoco.mjtObj.mjOBJ_GEOM, "cube_geom"
        )
        self.cam_dis = cam_dis
        self.cam_azi = cam_azi

    def step(
        self, action: np.ndarray
    ) -> tuple[np.ndarray, float, bool, bool, dict[str, Any]]:
        """Apply one control action and advance the simulation.

        Args:
            action (np.ndarray): Target control vector, shape (nu,).

        Returns:
            tuple: (observation, reward, terminated, truncated, info).
        """
        # Apply the action and advance the physics n_sim_steps sub-steps.
        self.mj_data.ctrl[:] = action
        for _ in range(self.n_sim_steps):
            mujoco.mj_step(self.mj_model, self.mj_data)

        # _get_obs updates the render scene itself (with the proper camera),
        # so no separate update_scene call is needed here.
        obs = self._get_obs()
        reward = self._compute_reward()

        self.current_step += 1
        # Hitting the step limit is a *time limit*: per the Gymnasium API it
        # sets `truncated`, not `terminated` (which is reserved for reaching
        # a terminal MDP state). Getting this wrong breaks value bootstrapping.
        terminated = False
        truncated = self.current_step >= self.n_max_epi_steps
        info: dict[str, Any] = {}

        return obs, reward, terminated, truncated, info

    def reset(
        self, seed: int | None = None, options: dict[str, Any] | None = None
    ) -> tuple[np.ndarray, dict[str, Any]]:
        """Reset the environment to the initial state.

        Args:
            seed (int | None, optional): RNG seed forwarded to gym.Env.reset.
            options (dict | None, optional): Unused; accepted for API compatibility.

        Returns:
            np.ndarray: Initial observation.
            dict: Additional information (empty).
        """
        # Seed self.np_random as required by the Gymnasium reset contract.
        super().reset(seed=seed)
        mujoco.mj_resetData(self.mj_model, self.mj_data)
        # Run the forward pass so kinematic quantities (xpos/geom_xpos) are
        # valid before rendering; mj_resetData alone leaves them stale.
        mujoco.mj_forward(self.mj_model, self.mj_data)
        self.current_step = 0
        obs = self._get_obs()

        return obs, {}

    def _compute_reward(self) -> float:
        """Compute the reward as the negative Euclidean distance between the gripper body and the cube geom."""
        gripper_pos = self.mj_data.xpos[self.gripper_body_id]
        cube_pos = self.mj_data.geom_xpos[self.cube_geom_id]

        # Cast so callers get a plain Python float, not np.floating.
        return float(-np.linalg.norm(gripper_pos - cube_pos))

    def _get_obs(self) -> np.ndarray:
        """Render the observation image.

        Returns:
            np.ndarray: Observation, a rendered (obs_h, obs_w, 3) uint8 image.
        """
        mj_cam = mujoco.MjvCamera()
        mj_cam.distance = self.cam_dis
        mj_cam.azimuth = self.cam_azi
        self.mj_renderer.update_scene(self.mj_data, camera=mj_cam)
        # Copy so the returned array is not invalidated by the next render.
        return self.mj_renderer.render().copy()

    def close(self) -> None:
        """Release the renderer's GL context and drop MuJoCo handles."""
        # Explicitly close the renderer instead of relying on GC; leaking the
        # GL context was the previous behavior (close was commented out).
        self.mj_renderer.close()
        del self.mj_data
        del self.mj_model