""" Useful classes for supporting DeepMind MuJoCo binding. """ import gc import os from tempfile import TemporaryDirectory # DIRTY HACK copied from mujoco-py - a global lock on rendering from threading import Lock import mujoco import numpy as np _MjSim_render_lock = Lock() import ctypes import ctypes.util import os import platform import subprocess import robosuite.macros as macros _SYSTEM = platform.system() if _SYSTEM == "Windows": ctypes.WinDLL(os.path.join(os.path.dirname(__file__), "mujoco.dll")) CUDA_VISIBLE_DEVICES = os.environ.get("CUDA_VISIBLE_DEVICES", "") if CUDA_VISIBLE_DEVICES != "": MUJOCO_EGL_DEVICE_ID = os.environ.get("MUJOCO_EGL_DEVICE_ID", None) if MUJOCO_EGL_DEVICE_ID is not None: assert MUJOCO_EGL_DEVICE_ID.isdigit() and ( MUJOCO_EGL_DEVICE_ID in CUDA_VISIBLE_DEVICES ), "MUJOCO_EGL_DEVICE_ID needs to be set to one of the device id specified in CUDA_VISIBLE_DEVICES" if macros.MUJOCO_GPU_RENDERING and os.environ.get("MUJOCO_GL", None) not in ["osmesa", "glx"]: # If gpu rendering is specified in macros, then we enforce gpu # option for rendering if _SYSTEM == "Darwin": os.environ["MUJOCO_GL"] = "cgl" else: os.environ["MUJOCO_GL"] = "egl" _MUJOCO_GL = os.environ.get("MUJOCO_GL", "").lower().strip() if _MUJOCO_GL not in ("disable", "disabled", "off", "false", "0"): _VALID_MUJOCO_GL = ("enable", "enabled", "on", "true", "1", "glfw", "") if _SYSTEM == "Linux": _VALID_MUJOCO_GL += ("glx", "egl", "osmesa") elif _SYSTEM == "Windows": _VALID_MUJOCO_GL += ("wgl",) elif _SYSTEM == "Darwin": _VALID_MUJOCO_GL += ("cgl",) if _MUJOCO_GL not in _VALID_MUJOCO_GL: raise RuntimeError(f"invalid value for environment variable MUJOCO_GL: {_MUJOCO_GL}") if _SYSTEM == "Linux" and _MUJOCO_GL == "osmesa": from robosuite.renderers.context.osmesa_context import OSMesaGLContext as GLContext elif _SYSTEM == "Linux" and _MUJOCO_GL == "egl": from robosuite.renderers.context.egl_context import EGLGLContext as GLContext else: from robosuite.renderers.context.glfw_context import GLFWGLContext as GLContext class MjRenderContext: """ Class that encapsulates rendering functionality for a MuJoCo simulation. See https://github.com/openai/mujoco-py/blob/4830435a169c1f3e3b5f9b58a7c3d9c39bdf4acb/mujoco_py/mjrendercontext.pyx """ def __init__(self, sim, offscreen=True, device_id=-1, max_width=640, max_height=480): assert offscreen, "only offscreen supported for now" self.sim = sim self.offscreen = offscreen self.device_id = device_id # setup GL context with defaults for now self.gl_ctx = GLContext(max_width=max_width, max_height=max_height, device_id=self.device_id) self.gl_ctx.make_current() # Ensure the model data has been updated so that there # is something to render sim.forward() # make sure sim has this context sim.add_render_context(self) self.model = sim.model self.data = sim.data # create default scene self.scn = mujoco.MjvScene(sim.model._model, maxgeom=1000) # camera self.cam = mujoco.MjvCamera() self.cam.fixedcamid = 0 self.cam.type = mujoco.mjtCamera.mjCAMERA_FIXED # options for visual / collision mesh can be set externally, e.g. vopt.geomgroup[0], vopt.geomgroup[1] self.vopt = mujoco.MjvOption() self.pert = mujoco.MjvPerturb() self.pert.active = 0 self.pert.select = 0 self.pert.skinselect = -1 # self._markers = [] # self._overlay = {} self._set_mujoco_context_and_buffers() def _set_mujoco_context_and_buffers(self): self.con = mujoco.MjrContext(self.model._model, mujoco.mjtFontScale.mjFONTSCALE_150) mujoco.mjr_setBuffer(mujoco.mjtFramebuffer.mjFB_OFFSCREEN, self.con) def update_offscreen_size(self, width, height): if (width != self.con.offWidth) or (height != self.con.offHeight): self.model.vis.global_.offwidth = width self.model.vis.global_.offheight = height self.con.free() del self.con self._set_mujoco_context_and_buffers() def upload_texture(self, tex_id): """Uploads given texture to the GPU""" self.gl_ctx.make_current() mujoco.mjr_uploadTexture(self.model, self.con, tex_id) def render(self, width, height, camera_id=None, segmentation=False): viewport = mujoco.MjrRect(0, 0, width, height) # if self.sim.render_callback is not None: # self.sim.render_callback(self.sim, self) # update width and height of rendering context if necessary if width > self.con.offWidth or height > self.con.offHeight: new_width = max(width, self.model.vis.global_.offwidth) new_height = max(height, self.model.vis.global_.offheight) self.update_offscreen_size(new_width, new_height) if camera_id is not None: if camera_id == -1: self.cam.type = mujoco.mjtCamera.mjCAMERA_FREE else: self.cam.type = mujoco.mjtCamera.mjCAMERA_FIXED self.cam.fixedcamid = camera_id mujoco.mjv_updateScene( self.model._model, self.data._data, self.vopt, self.pert, self.cam, mujoco.mjtCatBit.mjCAT_ALL, self.scn ) if segmentation: self.scn.flags[mujoco.mjtRndFlag.mjRND_SEGMENT] = 1 self.scn.flags[mujoco.mjtRndFlag.mjRND_IDCOLOR] = 1 # for marker_params in self._markers: # self._add_marker_to_scene(marker_params) mujoco.mjr_render(viewport=viewport, scn=self.scn, con=self.con) # for gridpos, (text1, text2) in self._overlay.items(): # mjr_overlay(const.FONTSCALE_150, gridpos, rect, text1.encode(), text2.encode(), &self._con) if segmentation: self.scn.flags[mujoco.mjtRndFlag.mjRND_SEGMENT] = 0 self.scn.flags[mujoco.mjtRndFlag.mjRND_IDCOLOR] = 0 def read_pixels(self, width, height, depth=False, segmentation=False): viewport = mujoco.MjrRect(0, 0, width, height) rgb_img = np.empty((height, width, 3), dtype=np.uint8) depth_img = np.empty((height, width), dtype=np.float32) if depth else None mujoco.mjr_readPixels(rgb=rgb_img, depth=depth_img, viewport=viewport, con=self.con) ret_img = rgb_img if segmentation: seg_img = rgb_img[:, :, 0] + rgb_img[:, :, 1] * (2**8) + rgb_img[:, :, 2] * (2**16) seg_img[seg_img >= (self.scn.ngeom + 1)] = 0 seg_ids = np.full((self.scn.ngeom + 1, 2), fill_value=-1, dtype=np.int32) for i in range(self.scn.ngeom): geom = self.scn.geoms[i] if geom.segid != -1: seg_ids[geom.segid + 1, 0] = geom.objtype seg_ids[geom.segid + 1, 1] = geom.objid ret_img = seg_ids[seg_img] if depth: return (ret_img, depth_img) else: return ret_img def upload_texture(self, tex_id): """Uploads given texture to the GPU.""" self.gl_ctx.make_current() mujoco.mjr_uploadTexture(self.model, self.con, tex_id) def __del__(self): # free mujoco rendering context and GL rendering context self.con.free() self.gl_ctx.free() del self.con del self.gl_ctx del self.scn del self.cam del self.vopt del self.pert class MjRenderContextOffscreen(MjRenderContext): def __init__(self, sim, device_id, max_width=640, max_height=480): super().__init__(sim, offscreen=True, device_id=device_id, max_width=max_width, max_height=max_height) class MjSimState: """ A mujoco simulation state. """ def __init__(self, time, qpos, qvel): self.time = time self.qpos = qpos self.qvel = qvel @classmethod def from_flattened(cls, array, sim): """ Takes flat mjstate array and MjSim instance and returns MjSimState. """ idx_time = 0 idx_qpos = idx_time + 1 idx_qvel = idx_qpos + sim.model.nq time = array[idx_time] qpos = array[idx_qpos : idx_qpos + sim.model.nq] qvel = array[idx_qvel : idx_qvel + sim.model.nv] assert sim.model.na == 0 return cls(time=time, qpos=qpos, qvel=qvel) def flatten(self): return np.concatenate([[self.time], self.qpos, self.qvel], axis=0) class _MjModelMeta(type): """ Metaclass which allows MjModel below to delegate to mujoco.MjModel. Taken from dm_control: https://github.com/deepmind/dm_control/blob/main/dm_control/mujoco/wrapper/core.py#L244 """ def __new__(cls, name, bases, dct): for attr in dir(mujoco.MjModel): if not attr.startswith("_"): if attr not in dct: # pylint: disable=protected-access fget = lambda self, attr=attr: getattr(self._model, attr) fset = lambda self, value, attr=attr: setattr(self._model, attr, value) # pylint: enable=protected-access dct[attr] = property(fget, fset) return super().__new__(cls, name, bases, dct) class MjModel(metaclass=_MjModelMeta): """Wrapper class for a MuJoCo 'mjModel' instance. MjModel encapsulates features of the model that are expected to remain constant. It also contains simulation and visualization options which may be changed occasionally, although this is done explicitly by the user. """ _HAS_DYNAMIC_ATTRIBUTES = True def __init__(self, model_ptr): """Creates a new MjModel instance from a mujoco.MjModel.""" self._model = model_ptr # make useful mappings such as _body_name2id and _body_id2name self.make_mappings() @classmethod def from_xml_path(cls, xml_path): """Creates an MjModel instance from a path to a model XML file.""" model_ptr = _get_model_ptr_from_xml(xml_path=xml_path) return cls(model_ptr) def __del__(self): # free mujoco model del self._model """ Some methods supported by sim.model in mujoco-py. Copied from https://github.com/openai/mujoco-py/blob/ab86d331c9a77ae412079c6e58b8771fe63747fc/mujoco_py/generated/wrappers.pxi#L2611 """ def _extract_mj_names(self, name_adr, num_obj, obj_type): """ See https://github.com/openai/mujoco-py/blob/ab86d331c9a77ae412079c6e58b8771fe63747fc/mujoco_py/generated/wrappers.pxi#L1127 """ ### TODO: fix this to use @name_adr like mujoco-py - more robust than assuming IDs are continuous ### # objects don't need to be named in the XML, so name might be None id2name = {i: None for i in range(num_obj)} name2id = {} for i in range(num_obj): name = mujoco.mj_id2name(self._model, obj_type, i) name2id[name] = i id2name[i] = name # # objects don't need to be named in the XML, so name might be None # id2name = { i: None for i in range(num_obj) } # name2id = {} # for i in range(num_obj): # name = self.model.names[name_adr[i]] # decoded_name = name.decode() # if decoded_name: # obj_id = mujoco.mj_name2id(self.model, obj_type, name) # assert (0 <= obj_id < num_obj) and (id2name[obj_id] is None) # name2id[decoded_name] = obj_id # id2name[obj_id] = decoded_name # sort names by increasing id to keep order deterministic return tuple(id2name[nid] for nid in sorted(name2id.values())), name2id, id2name def make_mappings(self): """ Make some useful internal mappings that mujoco-py supported. """ p = self self.body_names, self._body_name2id, self._body_id2name = self._extract_mj_names( p.name_bodyadr, p.nbody, mujoco.mjtObj.mjOBJ_BODY ) self.joint_names, self._joint_name2id, self._joint_id2name = self._extract_mj_names( p.name_jntadr, p.njnt, mujoco.mjtObj.mjOBJ_JOINT ) self.geom_names, self._geom_name2id, self._geom_id2name = self._extract_mj_names( p.name_geomadr, p.ngeom, mujoco.mjtObj.mjOBJ_GEOM ) self.site_names, self._site_name2id, self._site_id2name = self._extract_mj_names( p.name_siteadr, p.nsite, mujoco.mjtObj.mjOBJ_SITE ) self.light_names, self._light_name2id, self._light_id2name = self._extract_mj_names( p.name_lightadr, p.nlight, mujoco.mjtObj.mjOBJ_LIGHT ) self.camera_names, self._camera_name2id, self._camera_id2name = self._extract_mj_names( p.name_camadr, p.ncam, mujoco.mjtObj.mjOBJ_CAMERA ) self.actuator_names, self._actuator_name2id, self._actuator_id2name = self._extract_mj_names( p.name_actuatoradr, p.nu, mujoco.mjtObj.mjOBJ_ACTUATOR ) self.sensor_names, self._sensor_name2id, self._sensor_id2name = self._extract_mj_names( p.name_sensoradr, p.nsensor, mujoco.mjtObj.mjOBJ_SENSOR ) self.tendon_names, self._tendon_name2id, self._tendon_id2name = self._extract_mj_names( p.name_tendonadr, p.ntendon, mujoco.mjtObj.mjOBJ_TENDON ) self.mesh_names, self._mesh_name2id, self._mesh_id2name = self._extract_mj_names( p.name_meshadr, p.nmesh, mujoco.mjtObj.mjOBJ_MESH ) def body_id2name(self, id): """Get body name from mujoco body id.""" if id not in self._body_id2name: raise ValueError("No body with id %d exists." % id) return self._body_id2name[id] def body_name2id(self, name): """Get body id from mujoco body name.""" if name not in self._body_name2id: raise ValueError('No "body" with name %s exists. Available "body" names = %s.' % (name, self.body_names)) return self._body_name2id[name] def joint_id2name(self, id): """Get joint name from mujoco joint id.""" if id not in self._joint_id2name: raise ValueError("No joint with id %d exists." % id) return self._joint_id2name[id] def joint_name2id(self, name): """Get joint id from joint name.""" if name not in self._joint_name2id: raise ValueError('No "joint" with name %s exists. Available "joint" names = %s.' % (name, self.joint_names)) return self._joint_name2id[name] def geom_id2name(self, id): """Get geom name from geom id.""" if id not in self._geom_id2name: raise ValueError("No geom with id %d exists." % id) return self._geom_id2name[id] def geom_name2id(self, name): """Get geom id from geom name.""" if name not in self._geom_name2id: raise ValueError('No "geom" with name %s exists. Available "geom" names = %s.' % (name, self.geom_names)) return self._geom_name2id[name] def site_id2name(self, id): """Get site name from site id.""" if id not in self._site_id2name: raise ValueError("No site with id %d exists." % id) return self._site_id2name[id] def site_name2id(self, name): """Get site id from site name.""" if name not in self._site_name2id: raise ValueError('No "site" with name %s exists. Available "site" names = %s.' % (name, self.site_names)) return self._site_name2id[name] def light_id2name(self, id): """Get light name from light id.""" if id not in self._light_id2name: raise ValueError("No light with id %d exists." % id) return self._light_id2name[id] def light_name2id(self, name): """Get light id from light name.""" if name not in self._light_name2id: raise ValueError('No "light" with name %s exists. Available "light" names = %s.' % (name, self.light_names)) return self._light_name2id[name] def camera_id2name(self, id): """Get camera name from camera id.""" if id not in self._camera_id2name: raise ValueError("No camera with id %d exists." % id) return self._camera_id2name[id] def camera_name2id(self, name): """Get camera id from camera name.""" if name not in self._camera_name2id: raise ValueError( 'No "camera" with name %s exists. Available "camera" names = %s.' % (name, self.camera_names) ) return self._camera_name2id[name] def actuator_id2name(self, id): """Get actuator name from actuator id.""" if id not in self._actuator_id2name: raise ValueError("No actuator with id %d exists." % id) return self._actuator_id2name[id] def actuator_name2id(self, name): """Get actuator id from actuator name.""" if name not in self._actuator_name2id: raise ValueError( 'No "actuator" with name %s exists. Available "actuator" names = %s.' % (name, self.actuator_names) ) return self._actuator_name2id[name] def sensor_id2name(self, id): """Get sensor name from sensor id.""" if id not in self._sensor_id2name: raise ValueError("No sensor with id %d exists." % id) return self._sensor_id2name[id] def sensor_name2id(self, name): """Get sensor id from sensor name.""" if name not in self._sensor_name2id: raise ValueError( 'No "sensor" with name %s exists. Available "sensor" names = %s.' % (name, self.sensor_names) ) return self._sensor_name2id[name] def tendon_id2name(self, id): """Get tendon name from tendon id.""" if id not in self._tendon_id2name: raise ValueError("No tendon with id %d exists." % id) return self._tendon_id2name[id] def tendon_name2id(self, name): """Get tendon id from tendon name.""" if name not in self._tendon_name2id: raise ValueError( 'No "tendon" with name %s exists. Available "tendon" names = %s.' % (name, self.tendon_names) ) return self._tendon_name2id[name] def mesh_id2name(self, id): """Get mesh name from mesh id.""" if id not in self._mesh_id2name: raise ValueError("No mesh with id %d exists." % id) return self._mesh_id2name[id] def mesh_name2id(self, name): """Get mesh id from mesh name.""" if name not in self._mesh_name2id: raise ValueError('No "mesh" with name %s exists. Available "mesh" names = %s.' % (name, self.mesh_names)) return self._mesh_name2id[name] # def userdata_id2name(self, id): # if id not in self._userdata_id2name: # raise ValueError("No userdata with id %d exists." % id) # return self._userdata_id2name[id] # def userdata_name2id(self, name): # if name not in self._userdata_name2id: # raise ValueError("No \"userdata\" with name %s exists. Available \"userdata\" names = %s." % (name, self.userdata_names)) # return self._userdata_name2id[name] def get_xml(self): with TemporaryDirectory() as td: filename = os.path.join(td, "model.xml") ret = mujoco.mj_saveLastXML(filename.encode(), self._model) return open(filename).read() def get_joint_qpos_addr(self, name): """ See https://github.com/openai/mujoco-py/blob/ab86d331c9a77ae412079c6e58b8771fe63747fc/mujoco_py/generated/wrappers.pxi#L1178 Returns the qpos address for given joint. Returns: - address (int, tuple): returns int address if 1-dim joint, otherwise returns the a (start, end) tuple for pos[start:end] access. """ joint_id = self.joint_name2id(name) joint_type = self.jnt_type[joint_id] joint_addr = self.jnt_qposadr[joint_id] if joint_type == mujoco.mjtJoint.mjJNT_FREE: ndim = 7 elif joint_type == mujoco.mjtJoint.mjJNT_BALL: ndim = 4 else: assert joint_type in (mujoco.mjtJoint.mjJNT_HINGE, mujoco.mjtJoint.mjJNT_SLIDE) ndim = 1 if ndim == 1: return joint_addr else: return (joint_addr, joint_addr + ndim) def get_joint_qvel_addr(self, name): """ See https://github.com/openai/mujoco-py/blob/ab86d331c9a77ae412079c6e58b8771fe63747fc/mujoco_py/generated/wrappers.pxi#L1202 Returns the qvel address for given joint. Returns: - address (int, tuple): returns int address if 1-dim joint, otherwise returns the a (start, end) tuple for vel[start:end] access. """ joint_id = self.joint_name2id(name) joint_type = self.jnt_type[joint_id] joint_addr = self.jnt_dofadr[joint_id] if joint_type == mujoco.mjtJoint.mjJNT_FREE: ndim = 6 elif joint_type == mujoco.mjtJoint.mjJNT_BALL: ndim = 3 else: assert joint_type in (mujoco.mjtJoint.mjJNT_HINGE, mujoco.mjtJoint.mjJNT_SLIDE) ndim = 1 if ndim == 1: return joint_addr else: return (joint_addr, joint_addr + ndim) class _MjDataMeta(type): """ Metaclass which allows MjData below to delegate to mujoco.MjData. Taken from dm_control. """ def __new__(cls, name, bases, dct): for attr in dir(mujoco.MjData): if not attr.startswith("_"): if attr not in dct: # pylint: disable=protected-access fget = lambda self, attr=attr: getattr(self._data, attr) fset = lambda self, value, attr=attr: setattr(self._data, attr, value) # pylint: enable=protected-access dct[attr] = property(fget, fset) return super().__new__(cls, name, bases, dct) class MjData(metaclass=_MjDataMeta): """Wrapper class for a MuJoCo 'mjData' instance. MjData contains all of the dynamic variables and intermediate results produced by the simulation. These are expected to change on each simulation timestep. The properties without docstrings are defined in mujoco source code from https://github.com/deepmind/mujoco/blob/062cb53a4a14b2a7a900453613a7ce498728f9d8/include/mujoco/mjdata.h#L126. """ def __init__(self, model): """Construct a new MjData instance. Args: model: An MjModel instance. """ self._model = model self._data = mujoco.MjData(model._model) @property def model(self): """The parent MjModel for this MjData instance.""" return self._model def __del__(self): # free mujoco data del self._data """ Some methods supported by sim.data in mujoco-py. Copied from https://github.com/openai/mujoco-py/blob/ab86d331c9a77ae412079c6e58b8771fe63747fc/mujoco_py/generated/wrappers.pxi#L2611 """ @property def body_xpos(self): """ Note: mujoco-py used to support sim.data.body_xpos but DM mujoco bindings requires sim.data.xpos, so we explicitly expose this as a property """ return self._data.xpos @property def body_xquat(self): """ Note: mujoco-py used to support sim.data.body_xquat but DM mujoco bindings requires sim.data.xquat, so we explicitly expose this as a property """ return self._data.xquat @property def body_xmat(self): """ Note: mujoco-py used to support sim.data.body_xmat but DM mujoco bindings requires sim.data.xmax, so we explicitly expose this as a property """ return self._data.xmat def get_body_xpos(self, name): """ Query cartesian position of a mujoco body using a name string. Args: name (str): The name of a mujoco body Returns: xpos (np.ndarray): The xpos value of the mujoco body """ bid = self.model.body_name2id(name) return self.xpos[bid] def get_body_xquat(self, name): """ Query the rotation of a mujoco body in quaternion (in wxyz convention) using a name string. Args: name (str): The name of a mujoco body Returns: xquat (np.ndarray): The xquat value of the mujoco body """ bid = self.model.body_name2id(name) return self.xquat[bid] def get_body_xmat(self, name): """ Query the rotation of a mujoco body in a rotation matrix using a name string. Args: name (str): The name of a mujoco body Returns: xmat (np.ndarray): The xmat value of the mujoco body """ bid = self.model.body_name2id(name) return self.xmat[bid].reshape((3, 3)) def get_body_jacp(self, name): """ Query the position jacobian of a mujoco body using a name string. Args: name (str): The name of a mujoco body Returns: jacp (np.ndarray): The jacp value of the mujoco body """ bid = self.model.body_name2id(name) jacp = np.zeros((3, self.model.nv)) mujoco.mj_jacBody(self.model._model, self._data, jacp, None, bid) return jacp def get_body_jacr(self, name): """ Query the rotation jacobian of a mujoco body using a name string. Args: name (str): The name of a mujoco body Returns: jacr (np.ndarray): The jacr value of the mujoco body """ bid = self.model.body_name2id(name) jacr = np.zeros((3, self.model.nv)) mujoco.mj_jacBody(self.model._model, self._data, None, jacr, bid) return jacr def get_body_xvelp(self, name): """ Query the translational velocity of a mujoco body using a name string. Args: name (str): The name of a mujoco body Returns: xvelp (np.ndarray): The translational velocity of the mujoco body. """ jacp = self.get_body_jacp(name) xvelp = np.dot(jacp, self.qvel) return xvelp def get_body_xvelr(self, name): """ Query the rotational velocity of a mujoco body using a name string. Args: name (str): The name of a mujoco body Returns: xvelr (np.ndarray): The rotational velocity of the mujoco body. """ jacr = self.get_body_jacr(name) xvelr = np.dot(jacr, self.qvel) return xvelr def get_geom_xpos(self, name): """ Query the cartesian position of a mujoco geom using a name string. Args: name (str): The name of a mujoco geom Returns: geom_xpos (np.ndarray): The cartesian position of the mujoco body. """ gid = self.model.geom_name2id(name) return self.geom_xpos[gid] def get_geom_xmat(self, name): """ Query the rotation of a mujoco geom in a rotation matrix using a name string. Args: name (str): The name of a mujoco geom Returns: geom_xmat (np.ndarray): The 3x3 rotation matrix of the mujoco geom. """ gid = self.model.geom_name2id(name) return self.geom_xmat[gid].reshape((3, 3)) def get_geom_jacp(self, name): """ Query the position jacobian of a mujoco geom using a name string. Args: name (str): The name of a mujoco geom Returns: jacp (np.ndarray): The jacp value of the mujoco geom """ gid = self.model.geom_name2id(name) jacp = np.zeros((3, self.model.nv)) mujoco.mj_jacGeom(self.model._model, self._data, jacp, None, gid) return jacp def get_geom_jacr(self, name): """ Query the rotation jacobian of a mujoco geom using a name string. Args: name (str): The name of a mujoco geom Returns: jacr (np.ndarray): The jacr value of the mujoco geom """ gid = self.model.geom_name2id(name) jacv = np.zeros((3, self.model.nv)) mujoco.mj_jacGeom(self.model._model, self._data, None, jacv, gid) return jacr def get_geom_xvelp(self, name): """ Query the translational velocity of a mujoco geom using a name string. Args: name (str): The name of a mujoco geom Returns: xvelp (np.ndarray): The translational velocity of the mujoco geom """ jacp = self.get_geom_jacp(name) xvelp = np.dot(jacp, self.qvel) return xvelp def get_geom_xvelr(self, name): """ Query the rotational velocity of a mujoco geom using a name string. Args: name (str): The name of a mujoco geom Returns: xvelr (np.ndarray): The rotational velocity of the mujoco geom """ jacr = self.get_geom_jacr(name) xvelr = np.dot(jacr, self.qvel) return xvelr def get_site_xpos(self, name): """ Query the cartesian position of a mujoco site using a name string. Args: name (str): The name of a mujoco site Returns: site_xpos (np.ndarray): The carteisan position of the mujoco site """ sid = self.model.site_name2id(name) return self.site_xpos[sid] def get_site_xmat(self, name): """ Query the rotation of a mujoco site in a rotation matrix using a name string. Args: name (str): The name of a mujoco site Returns: site_xmat (np.ndarray): The 3x3 rotation matrix of the mujoco site. """ sid = self.model.site_name2id(name) return self.site_xmat[sid].reshape((3, 3)) def get_site_jacp(self, name): """ Query the position jacobian of a mujoco site using a name string. Args: name (str): The name of a mujoco site Returns: jacp (np.ndarray): The jacp value of the mujoco site """ sid = self.model.site_name2id(name) jacp = np.zeros((3, self.model.nv)) mujoco.mj_jacSite(self.model._model, self._data, jacp, None, sid) return jacp def get_site_jacr(self, name): """ Query the rotation jacobian of a mujoco site using a name string. Args: name (str): The name of a mujoco site Returns: jacr (np.ndarray): The jacr value of the mujoco site """ sid = self.model.site_name2id(name) jacr = np.zeros((3, self.model.nv)) mujoco.mj_jacSite(self.model._model, self._data, None, jacr, sid) return jacr def get_site_xvelp(self, name): """ Query the translational velocity of a mujoco site using a name string. Args: name (str): The name of a mujoco site Returns: xvelp (np.ndarray): The translational velocity of the mujoco site """ jacp = self.get_site_jacp(name) xvelp = np.dot(jacp, self.qvel) return xvelp def get_site_xvelr(self, name): """ Query the rotational velocity of a mujoco site using a name string. Args: name (str): The name of a mujoco site Returns: xvelr (np.ndarray): The rotational velocity of the mujoco site """ jacr = self.get_site_jacr(name) xvelr = np.dot(jacr, self.qvel) return xvelr def get_camera_xpos(self, name): """ Get the cartesian position of a camera using name Args: name (str): The name of a camera Returns: cam_xpos (np.ndarray): The cartesian position of a camera """ cid = self.model.camera_name2id(name) return self.cam_xpos[cid] def get_camera_xmat(self, name): """ Get the rotation of a camera in a rotation matrix using name Args: name (str): The name of a camera Returns: cam_xmat (np.ndarray): The 3x3 rotation matrix of a camera """ cid = self.model.camera_name2id(name) return self.cam_xmat[cid].reshape((3, 3)) def get_light_xpos(self, name): """ Get cartesian position of a light source Args: name (str): The name of a lighting source Returns: light_xpos (np.ndarray): The cartesian position of the light source """ lid = self.model.light_name2id(name) return self.light_xpos[lid] def get_light_xdir(self, name): """ Get the direction of a light source using name Args: name (str): The name of a light Returns: light_xdir (np.ndarray): The direction vector of the lightsource """ lid = self.model.light_name2id(name) return self.light_xdir[lid] def get_sensor(self, name): """ Get the data of a sensor using name Args: name (str): The name of a sensor Returns: sensordata (np.ndarray): The sensor data vector """ sid = self.model.sensor_name2id(name) return self.sensordata[sid] def get_mocap_pos(self, name): """ Get the position of a mocap body using name. Args: name (str): The name of a joint Returns: mocap_pos (np.ndarray): The current position of a mocap body. """ body_id = self.model.body_name2id(name) mocap_id = self.model.body_mocapid[body_id] return self.mocap_pos[mocap_id] def set_mocap_pos(self, name, value): """ Set the quaternion of a mocap body using name. Args: name (str): The name of a joint value (float): The desired joint position of a mocap body. """ body_id = self.model.body_name2id(name) mocap_id = self.model.body_mocapid[body_id] self.mocap_pos[mocap_id] = value def get_mocap_quat(self, name): """ Get the quaternion of a mocap body using name. Args: name (str): The name of a joint Returns: mocap_quat (np.ndarray): The current quaternion of a mocap body. """ body_id = self.model.body_name2id(name) mocap_id = self.model.body_mocapid[body_id] return self.mocap_quat[mocap_id] def set_mocap_quat(self, name, value): """ Set the quaternion of a mocap body using name. Args: name (str): The name of a joint value (float): The desired joint quaternion of a mocap body. """ body_id = self.model.body_name2id(name) mocap_id = self.model.body_mocapid[body_id] self.mocap_quat[mocap_id] = value def get_joint_qpos(self, name): """ Get the position of a joint using name. Args: name (str): The name of a joint Returns: qpos (np.ndarray): The current position of a joint. """ addr = self.model.get_joint_qpos_addr(name) if isinstance(addr, (int, np.int32, np.int64)): return self.qpos[addr] else: start_i, end_i = addr return self.qpos[start_i:end_i] def set_joint_qpos(self, name, value): """ Set the velocities of a joint using name. Args: name (str): The name of a joint value (float): The desired joint velocity of a joint. """ addr = self.model.get_joint_qpos_addr(name) if isinstance(addr, (int, np.int32, np.int64)): self.qpos[addr] = value else: start_i, end_i = addr value = np.array(value) assert value.shape == (end_i - start_i,), "Value has incorrect shape %s: %s" % (name, value) self.qpos[start_i:end_i] = value def get_joint_qvel(self, name): """ Get the velocity of a joint using name. Args: name (str): The name of a joint Returns: qvel (np.ndarray): The current velocity of a joint. """ addr = self.model.get_joint_qvel_addr(name) if isinstance(addr, (int, np.int32, np.int64)): return self.qvel[addr] else: start_i, end_i = addr return self.qvel[start_i:end_i] def set_joint_qvel(self, name, value): """ Set the velocities of a mjo using name. Args: name (str): The name of a joint value (float): The desired joint velocity of a joint. """ addr = self.model.get_joint_qvel_addr(name) if isinstance(addr, (int, np.int32, np.int64)): self.qvel[addr] = value else: start_i, end_i = addr value = np.array(value) assert value.shape == (end_i - start_i,), "Value has incorrect shape %s: %s" % (name, value) self.qvel[start_i:end_i] = value class MjSim: """ Meant to somewhat replicate functionality in mujoco-py's MjSim object (see https://github.com/openai/mujoco-py/blob/master/mujoco_py/mjsim.pyx). """ def __init__(self, model): """ Args: model: should be an MjModel instance created via a factory function such as mujoco.MjModel.from_xml_string(xml) """ self.model = MjModel(model) self.data = MjData(self.model) # offscreen render context object self._render_context_offscreen = None @classmethod def from_xml_string(cls, xml): model = mujoco.MjModel.from_xml_string(xml) return cls(model) @classmethod def from_xml_file(cls, xml_file): f = open(xml_file, "r") xml = f.read() f.close() return cls.from_xml_string(xml) def reset(self): """Reset simulation.""" mujoco.mj_resetData(self.model._model, self.data._data) def forward(self): """Forward call to synchronize derived quantities.""" mujoco.mj_forward(self.model._model, self.data._data) def step(self, with_udd=True): """Step simulation.""" mujoco.mj_step(self.model._model, self.data._data) def render( self, width=None, height=None, *, camera_name=None, depth=False, mode="offscreen", device_id=-1, segmentation=False, ): """ Renders view from a camera and returns image as an `numpy.ndarray`. Args: - width (int): desired image width. - height (int): desired image height. - camera_name (str): name of camera in model. If None, the free camera will be used. - depth (bool): if True, also return depth buffer - device (int): device to use for rendering (only for GPU-backed rendering). Returns: - rgb (uint8 array): image buffer from camera - depth (float array): depth buffer from camera (only returned if depth=True) """ if camera_name is None: camera_id = None else: camera_id = self.model.camera_name2id(camera_name) assert mode == "offscreen", "only offscreen supported for now" assert self._render_context_offscreen is not None with _MjSim_render_lock: self._render_context_offscreen.render( width=width, height=height, camera_id=camera_id, segmentation=segmentation ) return self._render_context_offscreen.read_pixels(width, height, depth=depth, segmentation=segmentation) def add_render_context(self, render_context): assert render_context.offscreen if self._render_context_offscreen is not None: # free context del self._render_context_offscreen self._render_context_offscreen = render_context def get_state(self): """Return MjSimState instance for current state.""" return MjSimState( time=self.data.time, qpos=np.copy(self.data.qpos), qvel=np.copy(self.data.qvel), ) def set_state(self, value): """ Set internal state from MjSimState instance. Should call @forward afterwards to synchronize derived quantities. """ self.data.time = value.time self.data.qpos[:] = np.copy(value.qpos) self.data.qvel[:] = np.copy(value.qvel) def set_state_from_flattened(self, value): """ Set internal mujoco state using flat mjstate array. Should call @forward afterwards to synchronize derived quantities. See https://github.com/openai/mujoco-py/blob/4830435a169c1f3e3b5f9b58a7c3d9c39bdf4acb/mujoco_py/mjsimstate.pyx#L54 """ state = MjSimState.from_flattened(value, self) # do this instead of @set_state to avoid extra copy of qpos and qvel self.data.time = state.time self.data.qpos[:] = state.qpos self.data.qvel[:] = state.qvel def free(self): # clean up here to prevent memory leaks del self._render_context_offscreen del self.data del self.model del self gc.collect()