import numpy as np
from gym.spaces import Box

from rlkit.envs.proxy_env import ProxyEnv


class NormalizedBoxEnv(ProxyEnv):
    """
    Normalize action to in [-1, 1].

    Optionally normalize observations and scale reward.

    The wrapper exposes an action space of ``Box(-1, 1)`` with the same shape
    as the wrapped environment's action space. In ``step`` the incoming
    action is mapped affinely from [-1, 1] onto the wrapped environment's
    ``[low, high]`` bounds and clipped to those bounds.

    NOTE(review): only ``step`` normalizes observations in this class;
    ``reset`` is not overridden here, so observations it returns are
    presumably left unnormalized -- confirm against ``ProxyEnv``.
    """

    def __init__(
            self,
            env,
            reward_scale=1.,
            obs_mean=None,
            obs_std=None,
    ):
        """
        :param env: Environment to wrap. Its ``action_space`` must provide
            ``shape``, ``low`` and ``high``; ``observation_space.low`` is
            read when only one of ``obs_mean`` / ``obs_std`` is given.
        :param reward_scale: Factor the reward is multiplied by in ``step``.
        :param obs_mean: Value subtracted from observations. If only
            ``obs_std`` is given, defaults to zeros.
        :param obs_std: Value observations are divided by. If only
            ``obs_mean`` is given, defaults to ones.

        When both ``obs_mean`` and ``obs_std`` are None, observations are
        passed through unchanged until ``estimate_obs_stats`` is called.
        """
        ProxyEnv.__init__(self, env)
        self._should_normalize = not (obs_mean is None and obs_std is None)
        if self._should_normalize:
            # Fill in whichever statistic was omitted with the identity
            # value (zero shift / unit scale) shaped like the observations.
            if obs_mean is None:
                obs_mean = np.zeros_like(env.observation_space.low)
            else:
                obs_mean = np.array(obs_mean)
            if obs_std is None:
                obs_std = np.ones_like(env.observation_space.low)
            else:
                obs_std = np.array(obs_std)
        self._reward_scale = reward_scale
        self._obs_mean = obs_mean
        self._obs_std = obs_std
        ub = np.ones(self._wrapped_env.action_space.shape)
        self.action_space = Box(-1 * ub, ub)

    def estimate_obs_stats(self, obs_batch, override_values=False):
        """
        Set the observation mean and std from a batch of observations.

        The statistics are computed along axis 0 of ``obs_batch`` and
        observation normalization is switched on, so subsequent ``step``
        calls use them.

        :param obs_batch: Batch of observations, stacked along axis 0.
        :param override_values: Allow replacing statistics that are
            already set.
        :raises Exception: If a mean is already set and ``override_values``
            is False.
        """
        if self._obs_mean is not None and not override_values:
            raise Exception("Observation mean and std already set. To "
                            "override, set override_values to True.")
        self._obs_mean = np.mean(obs_batch, axis=0)
        self._obs_std = np.std(obs_batch, axis=0)
        # Without this, an env constructed without obs_mean/obs_std would
        # store the estimated statistics but never apply them in step().
        self._should_normalize = True

    def _apply_normalize_obs(self, obs):
        """Return ``obs`` shifted by the stored mean and scaled by the std."""
        # The small epsilon keeps the division finite when a std entry is 0.
        return (obs - self._obs_mean) / (self._obs_std + 1e-8)

    def step(self, action):
        """
        Step the wrapped environment with an action given in [-1, 1].

        :param action: Action in the normalized [-1, 1] range.
        :return: ``(next_obs, reward, terminated, truncated, info)`` from the
            wrapped environment, with the reward multiplied by the reward
            scale and the observation normalized when normalization is on.
        """
        lb = self._wrapped_env.action_space.low
        ub = self._wrapped_env.action_space.high
        # Affine map [-1, 1] -> [lb, ub]; clip in case the incoming action
        # lies outside [-1, 1].
        scaled_action = lb + (action + 1.) * 0.5 * (ub - lb)
        scaled_action = np.clip(scaled_action, lb, ub)

        wrapped_step = self._wrapped_env.step(scaled_action)
        next_obs, reward, terminated, truncated, info = wrapped_step
        if self._should_normalize:
            next_obs = self._apply_normalize_obs(next_obs)
        scaled_reward = reward * self._reward_scale
        return next_obs, scaled_reward, terminated, truncated, info

    def __str__(self):
        """Return the wrapped environment's string form with a prefix."""
        return "Normalized: %s" % self._wrapped_env