Spaces:
Running
Running
| from typing import Any, Union, List | |
| from collections import namedtuple | |
| from easydict import EasyDict | |
| import gym | |
| import copy | |
| import numpy as np | |
| from overcooked_ai_py.mdp.actions import Action, Direction | |
| from overcooked_ai_py.mdp.overcooked_mdp import PlayerState, OvercookedGridworld, OvercookedState, ObjectState, \ | |
| SoupState, Recipe | |
| from overcooked_ai_py.mdp.overcooked_env import OvercookedEnv, DEFAULT_ENV_PARAMS | |
| from ding.envs import BaseEnv | |
| from ding.utils import ENV_REGISTRY, deep_merge_dicts | |
# Container for one environment transition: (obs, reward, done, info).
OvercookEnvTimestep = namedtuple('OvercookEnvTimestep', 'obs reward done info')
| # n, s = Direction.NORTH, Direction.SOUTH | |
| # e, w = Direction.EAST, Direction.WEST | |
| # stay, interact = Action.STAY, Action.INTERACT | |
| # Action.ALL_ACTIONS: [n, s, e, w, stay, interact] | |
class OvercookEnv(BaseEnv):
    """DI-engine wrapper around the two-player Overcooked environment.

    Observations are the lossless grid encodings of ``overcooked_ai_py``,
    permuted to channel-first layout.  The two agents' observations are
    stacked on a leading axis, or concatenated along the channel axis when
    ``concat_obs`` is set.  When ``action_mask`` is set, the observation is a
    dict ``{'agent_state': ..., 'action_mask': ...}``.
    """

    config = EasyDict(
        dict(
            env_name="cramped_room",
            horizon=400,
            concat_obs=False,
            action_mask=True,
            shape_reward=True,
        )
    )

    def __init__(self, cfg) -> None:
        self._cfg = deep_merge_dicts(self.config, cfg)
        self._env_name = self._cfg.env_name
        self._horizon = self._cfg.horizon
        self._concat_obs = self._cfg.concat_obs
        self._action_mask = self._cfg.action_mask
        self._shape_reward = self._cfg.shape_reward
        self.mdp = OvercookedGridworld.from_layout_name(self._env_name)
        self.base_env = OvercookedEnv.from_mdp(self.mdp, horizon=self._horizon, info_level=0)
        # Right now the Overcooked state encoding only supports a 2-agent game.
        self.agent_num = 2
        self.action_dim = len(Action.ALL_ACTIONS)
        self.action_space = gym.spaces.Discrete(len(Action.ALL_ACTIONS))
        # Derive the observation shape from a dummy start state.
        self.featurize_fn = self._featurize
        dummy_mdp = self.base_env.mdp
        dummy_state = dummy_mdp.get_standard_start_state()
        obs_shape = self.featurize_fn(dummy_mdp, dummy_state)[0].shape  # e.g. (5, 4, 26)
        obs_shape = (obs_shape[-1], *obs_shape[:-1])  # permute channel first
        if self._concat_obs:
            # Both agents' encodings merged along the channel axis.
            obs_shape = (obs_shape[0] * 2, *obs_shape[1:])
        else:
            # Both agents' encodings stacked on a new leading axis.
            obs_shape = (2, ) + obs_shape
        self.observation_space = gym.spaces.Box(low=0, high=1, shape=obs_shape, dtype=np.int64)
        if self._action_mask:
            self.observation_space = gym.spaces.Dict(
                {
                    'agent_state': self.observation_space,
                    'action_mask': gym.spaces.Box(
                        low=0, high=1, shape=(self.agent_num, self.action_dim), dtype=np.int64
                    )
                }
            )
        self.reward_space = gym.spaces.Box(low=0, high=100, shape=(1, ), dtype=np.float32)

    @staticmethod
    def _featurize(mdp, state):
        """Return the per-player lossless grid encodings of ``state`` (HWC layout)."""
        return mdp.lossless_state_encoding(state)

    def seed(self, seed: int, dynamic_seed: bool = True) -> None:
        """Seed the global numpy RNG used for agent-index sampling."""
        self._seed = seed
        self._dynamic_seed = dynamic_seed
        np.random.seed(self._seed)

    def close(self) -> None:
        # Note: the real env instance only has an empty close method, only pass.
        pass

    def random_action(self):
        """Sample one random action per agent."""
        return [self.action_space.sample() for _ in range(self.agent_num)]

    def step(self, action):
        """Advance one timestep with the joint ``action`` (policy agent first).

        Returns an ``OvercookEnvTimestep(obs, reward, done, info)``; ``reward``
        is a shape-(1,) float array, optionally augmented by shaped rewards.
        """
        assert all(self.action_space.contains(a) for a in action), "%r (%s) invalid" % (action, type(action))
        agent_action, other_agent_action = [Action.INDEX_TO_ACTION[a] for a in action]
        # Map (policy agent, other agent) onto the environment's player order.
        if self.agent_idx == 0:
            joint_action = (agent_action, other_agent_action)
        else:
            joint_action = (other_agent_action, agent_action)
        next_state, reward, done, env_info = self.base_env.step(joint_action)
        reward = np.array([float(reward)])
        # Rebind (never in-place add): earlier steps' info dicts hold a
        # reference to the previous accumulator array, and an in-place update
        # would silently rewrite the values they already reported.
        self._eval_episode_return = self._eval_episode_return + reward
        if self._shape_reward:
            shaped = sum(env_info['shaped_r_by_agent'])
            self._eval_episode_return = self._eval_episode_return + shaped
            reward = reward + shaped
        obs = self._wrap_obs(self._build_both_agents_ob(next_state))
        env_info["policy_agent_idx"] = self.agent_idx
        env_info["eval_episode_return"] = self._eval_episode_return
        env_info["other_agent_env_idx"] = 1 - self.agent_idx
        return OvercookEnvTimestep(obs, reward, done, env_info)

    def obs_preprocess(self, obs):
        """Permute a single HWC encoding to channel-first (CHW)."""
        obs = obs.transpose(2, 0, 1)
        return obs

    def _build_both_agents_ob(self, state):
        """Encode ``state`` for both players, ordered with the policy agent first."""
        ob_p0, ob_p1 = self.featurize_fn(self.mdp, state)
        ob_p0, ob_p1 = self.obs_preprocess(ob_p0), self.obs_preprocess(ob_p1)
        both_agents_ob = [ob_p0, ob_p1] if self.agent_idx == 0 else [ob_p1, ob_p0]
        if self._concat_obs:
            return np.concatenate(both_agents_ob)
        return np.stack(both_agents_ob)

    def _wrap_obs(self, both_agents_ob):
        """Attach the action mask when configured, else pass the array through."""
        if self._action_mask:
            return {"agent_state": both_agents_ob, "action_mask": self.get_action_mask()}
        return both_agents_ob

    def reset(self):
        """Reset the underlying env and return the initial observation."""
        self.base_env.reset()
        self._eval_episode_return = 0
        self.mdp = self.base_env.mdp
        # Randomly choose which physical player the policy controls.
        self.agent_idx = np.random.choice([0, 1])
        return self._wrap_obs(self._build_both_agents_ob(self.base_env.state))

    def get_available_actions(self):
        """Per-player lists of legal actions in the current state."""
        return self.mdp.get_actions(self.base_env.state)

    def get_action_mask(self):
        """Binary (agent_num, action_dim) mask of currently legal actions."""
        available_actions = self.get_available_actions()
        action_masks = np.zeros((self.agent_num, self.action_dim)).astype(np.int64)
        for i in range(self.action_dim):
            if Action.INDEX_TO_ACTION[i] in available_actions[0]:
                action_masks[0][i] = 1
            if Action.INDEX_TO_ACTION[i] in available_actions[1]:
                action_masks[1][i] = 1
        return action_masks

    def __repr__(self):
        return "DI-engine Overcooked Env"
class OvercookGameEnv(BaseEnv):
    """DI-engine wrapper around the two-player Overcooked environment (game variant).

    Identical to ``OvercookEnv`` except that the action mask defaults to off
    and the policy agent index is fixed to player 0 on every reset instead of
    being sampled.
    """

    config = EasyDict(
        dict(
            env_name="cramped_room",
            horizon=400,
            concat_obs=False,
            action_mask=False,
            shape_reward=True,
        )
    )

    def __init__(self, cfg) -> None:
        self._cfg = deep_merge_dicts(self.config, cfg)
        self._env_name = self._cfg.env_name
        self._horizon = self._cfg.horizon
        self._concat_obs = self._cfg.concat_obs
        self._action_mask = self._cfg.action_mask
        self._shape_reward = self._cfg.shape_reward
        self.mdp = OvercookedGridworld.from_layout_name(self._env_name)
        self.base_env = OvercookedEnv.from_mdp(self.mdp, horizon=self._horizon, info_level=0)
        # Right now the Overcooked state encoding only supports a 2-agent game.
        self.agent_num = 2
        self.action_dim = len(Action.ALL_ACTIONS)
        self.action_space = gym.spaces.Discrete(len(Action.ALL_ACTIONS))
        # Derive the observation shape from a dummy start state.
        self.featurize_fn = self._featurize
        dummy_mdp = self.base_env.mdp
        dummy_state = dummy_mdp.get_standard_start_state()
        obs_shape = self.featurize_fn(dummy_mdp, dummy_state)[0].shape  # e.g. (5, 4, 26)
        obs_shape = (obs_shape[-1], *obs_shape[:-1])  # permute channel first
        if self._concat_obs:
            # Both agents' encodings merged along the channel axis.
            obs_shape = (obs_shape[0] * 2, *obs_shape[1:])
        else:
            # Both agents' encodings stacked on a new leading axis.
            obs_shape = (2, ) + obs_shape
        self.observation_space = gym.spaces.Box(low=0, high=1, shape=obs_shape, dtype=np.int64)
        if self._action_mask:
            self.observation_space = gym.spaces.Dict(
                {
                    'agent_state': self.observation_space,
                    'action_mask': gym.spaces.Box(
                        low=0, high=1, shape=(self.agent_num, self.action_dim), dtype=np.int64
                    )
                }
            )
        self.reward_space = gym.spaces.Box(low=0, high=100, shape=(1, ), dtype=np.float32)

    @staticmethod
    def _featurize(mdp, state):
        """Return the per-player lossless grid encodings of ``state`` (HWC layout)."""
        return mdp.lossless_state_encoding(state)

    def seed(self, seed: int, dynamic_seed: bool = True) -> None:
        """Seed the global numpy RNG."""
        self._seed = seed
        self._dynamic_seed = dynamic_seed
        np.random.seed(self._seed)

    def close(self) -> None:
        # Note: the real env instance only has an empty close method, only pass.
        pass

    def random_action(self):
        """Sample one random action per agent."""
        return [self.action_space.sample() for _ in range(self.agent_num)]

    def step(self, action):
        """Advance one timestep with the joint ``action`` (policy agent first).

        Returns an ``OvercookEnvTimestep(obs, reward, done, info)``; ``reward``
        is a shape-(1,) float array, optionally augmented by shaped rewards.
        """
        assert all(self.action_space.contains(a) for a in action), "%r (%s) invalid" % (action, type(action))
        agent_action, other_agent_action = [Action.INDEX_TO_ACTION[a] for a in action]
        # Map (policy agent, other agent) onto the environment's player order.
        if self.agent_idx == 0:
            joint_action = (agent_action, other_agent_action)
        else:
            joint_action = (other_agent_action, agent_action)
        next_state, reward, done, env_info = self.base_env.step(joint_action)
        reward = np.array([float(reward)])
        # Rebind (never in-place add): earlier steps' info dicts hold a
        # reference to the previous accumulator array, and an in-place update
        # would silently rewrite the values they already reported.
        self._eval_episode_return = self._eval_episode_return + reward
        if self._shape_reward:
            shaped = sum(env_info['shaped_r_by_agent'])
            self._eval_episode_return = self._eval_episode_return + shaped
            reward = reward + shaped
        obs = self._wrap_obs(self._build_both_agents_ob(next_state))
        env_info["policy_agent_idx"] = self.agent_idx
        env_info["eval_episode_return"] = self._eval_episode_return
        env_info["other_agent_env_idx"] = 1 - self.agent_idx
        return OvercookEnvTimestep(obs, reward, done, env_info)

    def obs_preprocess(self, obs):
        """Permute a single HWC encoding to channel-first (CHW)."""
        obs = obs.transpose(2, 0, 1)
        return obs

    def _build_both_agents_ob(self, state):
        """Encode ``state`` for both players, ordered with the policy agent first."""
        ob_p0, ob_p1 = self.featurize_fn(self.mdp, state)
        ob_p0, ob_p1 = self.obs_preprocess(ob_p0), self.obs_preprocess(ob_p1)
        both_agents_ob = [ob_p0, ob_p1] if self.agent_idx == 0 else [ob_p1, ob_p0]
        if self._concat_obs:
            return np.concatenate(both_agents_ob)
        return np.stack(both_agents_ob)

    def _wrap_obs(self, both_agents_ob):
        """Attach the action mask when configured, else pass the array through."""
        if self._action_mask:
            return {"agent_state": both_agents_ob, "action_mask": self.get_action_mask()}
        return both_agents_ob

    def reset(self):
        """Reset the underlying env and return the initial observation."""
        self.base_env.reset()
        self._eval_episode_return = 0
        self.mdp = self.base_env.mdp
        # The policy always controls player 0 in the game env.  (A dead
        # ``np.random.choice([0, 1])`` that was immediately overwritten by
        # this fixed assignment has been removed.)
        self.agent_idx = 0
        return self._wrap_obs(self._build_both_agents_ob(self.base_env.state))

    def get_available_actions(self):
        """Per-player lists of legal actions in the current state."""
        return self.mdp.get_actions(self.base_env.state)

    def get_action_mask(self):
        """Binary (agent_num, action_dim) mask of currently legal actions."""
        available_actions = self.get_available_actions()
        action_masks = np.zeros((self.agent_num, self.action_dim)).astype(np.int64)
        for i in range(self.action_dim):
            if Action.INDEX_TO_ACTION[i] in available_actions[0]:
                action_masks[0][i] = 1
            if Action.INDEX_TO_ACTION[i] in available_actions[1]:
                action_masks[1][i] = 1
        return action_masks

    def __repr__(self):
        return "DI-engine Overcooked GameEnv"