| | """ |
| | Copied from ml-agents/mlagents/trainers/tests/simple_test_envs.py |
| | |
| | Modified the env so that it doesn't automatically reset and respawn agent in order to pass |
| | pettingzoo api tests, since current PZ api test doesn't allow spawning new agents. |
| | """ |
| |
|
import random
from typing import Dict, List, Any, Tuple
import numpy as np

from mlagents_envs.base_env import (
    ActionSpec,
    ObservationSpec,
    ObservationType,
    ActionTuple,
    BaseEnv,
    BehaviorSpec,
    DecisionSteps,
    TerminalSteps,
    BehaviorMapping,
)
from mlagents_envs.side_channel.side_channel_manager import SideChannelManager
from dummy_config import create_observation_specs_with_shapes

OBS_SIZE = 1
VIS_OBS_SIZE = (20, 20, 3)
VAR_LEN_SIZE = (10, 5)
STEP_SIZE = 0.2

TIME_PENALTY = 0.01
MIN_STEPS = int(1.0 / STEP_SIZE) + 1
SUCCESS_REWARD = 1.0 + MIN_STEPS * TIME_PENALTY
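# Worked arithmetic for the defaults above (illustrative comment, not from the
# original file): MIN_STEPS = int(1.0 / 0.2) + 1 = 6 and
# SUCCESS_REWARD = 1.0 + 6 * 0.01 = 1.06, i.e. the terminal reward is padded so
# that the per-step TIME_PENALTY accumulated over a shortest successful episode
# does not pull the total return below roughly +1.0.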
def clamp(x, min_val, max_val):
    return max(min_val, min(x, max_val))


class SimpleEnvironment(BaseEnv):
    """
    Very simple "game" - the agent has a position on [-1, 1], gets a reward of 1 if it
    reaches 1, and a reward of -1 if it reaches -1. The position is incremented by the
    action amount (clamped to [-step_size, step_size]).
    """

    def __init__(
        self,
        brain_names,
        step_size=STEP_SIZE,
        num_visual=0,
        num_vector=1,
        num_var_len=0,
        vis_obs_size=VIS_OBS_SIZE,
        vec_obs_size=OBS_SIZE,
        var_len_obs_size=VAR_LEN_SIZE,
        action_sizes=(1, 0),
        goal_indices=None,
    ):
        super().__init__()
        self.num_visual = num_visual
        self.num_vector = num_vector
        self.num_var_len = num_var_len
        self.vis_obs_size = vis_obs_size
        self.vec_obs_size = vec_obs_size
        self.var_len_obs_size = var_len_obs_size
        self.goal_indices = goal_indices
        continuous_action_size, discrete_action_size = action_sizes
        discrete_tuple = tuple(2 for _ in range(discrete_action_size))
        action_spec = ActionSpec(continuous_action_size, discrete_tuple)
        self.total_action_size = continuous_action_size + discrete_action_size
        self.action_spec = action_spec
        self.behavior_spec = BehaviorSpec(self._make_observation_specs(), action_spec)
        self.names = brain_names
        self.positions: Dict[str, List[float]] = {}
        self.step_count: Dict[str, float] = {}
        self._side_channel_manager = SideChannelManager([])

        # Seed the RNG deterministically from the environment configuration.
        seed = (
            brain_names,
            step_size,
            num_visual,
            num_vector,
            num_var_len,
            vis_obs_size,
            vec_obs_size,
            var_len_obs_size,
            action_sizes,
        )
        self.random = random.Random(str(seed))

        self.goal: Dict[str, int] = {}
        self.action = {}
        self.rewards: Dict[str, float] = {}
        self.final_rewards: Dict[str, List[float]] = {}
        self.step_result: Dict[str, Tuple[DecisionSteps, TerminalSteps]] = {}
        self.agent_id: Dict[str, int] = {}
        self.step_size = step_size
        self.academy_capabilities = None

        for name in self.names:
            self.agent_id[name] = 0
            self.goal[name] = self.random.choice([-1, 1])
            self.rewards[name] = 0
            self.final_rewards[name] = []
            self._reset_agent(name)
            self.action[name] = None
            self.step_result[name] = None

    def _make_observation_specs(self) -> List[ObservationSpec]:
        obs_shape: List[Any] = []
        for _ in range(self.num_vector):
            obs_shape.append((self.vec_obs_size,))
        for _ in range(self.num_visual):
            obs_shape.append(self.vis_obs_size)
        for _ in range(self.num_var_len):
            obs_shape.append(self.var_len_obs_size)
        obs_spec = create_observation_specs_with_shapes(obs_shape)
        if self.goal_indices is not None:
            for i in range(len(obs_spec)):
                if i in self.goal_indices:
                    obs_spec[i] = ObservationSpec(
                        shape=obs_spec[i].shape,
                        dimension_property=obs_spec[i].dimension_property,
                        observation_type=ObservationType.GOAL_SIGNAL,
                        name=obs_spec[i].name,
                    )
        return obs_spec

    def _make_obs(self, value: float) -> List[np.ndarray]:
        obs = []
        for _ in range(self.num_vector):
            obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32) * value)
        for _ in range(self.num_visual):
            obs.append(np.ones((1,) + self.vis_obs_size, dtype=np.float32) * value)
        for _ in range(self.num_var_len):
            obs.append(np.ones((1,) + self.var_len_obs_size, dtype=np.float32) * value)
        return obs

    @property
    def behavior_specs(self):
        behavior_dict = {}
        for n in self.names:
            behavior_dict[n] = self.behavior_spec
        return BehaviorMapping(behavior_dict)

    def set_action_for_agent(self, behavior_name, agent_id, action):
        pass

    def set_actions(self, behavior_name, action):
        self.action[behavior_name] = action

    def get_steps(self, behavior_name):
        return self.step_result[behavior_name]

    def _take_action(self, name: str) -> bool:
        deltas = []
        _act = self.action[name]
        if self.action_spec.continuous_size > 0 and _act is not None:
            for _cont in _act.continuous[0]:
                deltas.append(_cont)
        if self.action_spec.discrete_size > 0 and _act is not None:
            for _disc in _act.discrete[0]:
                deltas.append(1 if _disc else -1)
        for i, _delta in enumerate(deltas):
            _delta = clamp(_delta, -self.step_size, self.step_size)
            self.positions[name][i] += _delta
            self.positions[name][i] = clamp(self.positions[name][i], -1, 1)
        self.step_count[name] += 1
        done = all(pos >= 1.0 or pos <= -1.0 for pos in self.positions[name])
        return done

    def _generate_mask(self):
        action_mask = None
        if self.action_spec.discrete_size > 0:
            ndmask = np.array(2 * self.action_spec.discrete_size * [False], dtype=bool)
            ndmask = np.expand_dims(ndmask, axis=0)
            action_mask = [ndmask]
        return action_mask

    def _compute_reward(self, name: str, done: bool) -> float:
        if done:
            reward = 0.0
            for _pos in self.positions[name]:
                reward += (SUCCESS_REWARD * _pos * self.goal[name]) / len(
                    self.positions[name]
                )
        else:
            reward = -TIME_PENALTY
        return reward

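    # Worked example for _compute_reward above (illustrative comment, not from
    # the original file): with a single continuous action and the default
    # STEP_SIZE, an episode that ends on the goal side yields
    # SUCCESS_REWARD = +1.06 on the terminal step, one that ends on the
    # opposite side yields -1.06, and every non-terminal step yields
    # -TIME_PENALTY = -0.01.
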
    def _reset_agent(self, name):
        self.goal[name] = self.random.choice([-1, 1])
        self.positions[name] = [0.0 for _ in range(self.total_action_size)]
        self.step_count[name] = 0
        self.rewards[name] = 0
        self.agent_id[name] = self.agent_id[name] + 1

    def _make_batched_step(
        self, name: str, done: bool, reward: float, group_reward: float
    ) -> Tuple[DecisionSteps, TerminalSteps]:
        m_vector_obs = self._make_obs(self.goal[name])
        m_reward = np.array([reward], dtype=np.float32)
        m_agent_id = np.array([self.agent_id[name]], dtype=np.int32)
        m_group_id = np.array([0], dtype=np.int32)
        m_group_reward = np.array([group_reward], dtype=np.float32)
        action_mask = self._generate_mask()
        decision_step = DecisionSteps(
            m_vector_obs, m_reward, m_agent_id, action_mask, m_group_id, m_group_reward
        )
        terminal_step = TerminalSteps.empty(self.behavior_spec)
        if done:
            self.final_rewards[name].append(self.rewards[name])
            # Unlike the original env, the agent is not reset and respawned here
            # (see the module docstring), so a finished agent gets an empty
            # DecisionSteps alongside its TerminalSteps.
            decision_step = DecisionSteps([], [], [], [], [], [])
            terminal_step = TerminalSteps(
                m_vector_obs,
                m_reward,
                np.array([False], dtype=bool),
                m_agent_id,
                m_group_id,
                m_group_reward,
            )
        return (decision_step, terminal_step)

    def _construct_reset_step(
        self, name: str
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        new_reward = np.array([0.0], dtype=np.float32)
        new_done = np.array([False], dtype=bool)
        new_agent_id = np.array([self.agent_id[name]], dtype=np.int32)
        new_action_mask = self._generate_mask()
        new_group_id = np.array([0], dtype=np.int32)
        new_group_reward = np.array([0.0], dtype=np.float32)
        return (
            new_reward,
            new_done,
            new_agent_id,
            new_action_mask,
            new_group_id,
            new_group_reward,
        )

    def step(self) -> None:
        assert all(action is not None for action in self.action.values())
        for name in self.names:
            done = self._take_action(name)
            reward = self._compute_reward(name, done)
            self.rewards[name] += reward
            self.step_result[name] = self._make_batched_step(name, done, reward, 0.0)

    def reset(self) -> None:
        for name in self.names:
            self._reset_agent(name)
            self.step_result[name] = self._make_batched_step(name, False, 0.0, 0.0)

    @property
    def reset_parameters(self) -> Dict[str, str]:
        return {}

    def close(self):
        pass

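# Hypothetical usage sketch (not part of the original test file): drives a
# single SimpleEnvironment with a constant continuous action until the episode
# terminates. The brain name "test_brain" and the function name are illustrative
# assumptions; the default action_sizes=(1, 0) is assumed.
def _demo_simple_environment() -> None:
    env = SimpleEnvironment(["test_brain"])
    env.reset()
    behavior_name = list(env.behavior_specs)[0]
    for _ in range(100):
        _decision, terminal = env.get_steps(behavior_name)
        if len(terminal.reward) > 0:
            # Episode ended; the env does not auto-reset (see module docstring).
            break
        # One agent, one continuous action: always push toward +1.
        action = ActionTuple(continuous=np.array([[1.0]], dtype=np.float32))
        env.set_actions(behavior_name, action)
        env.step()
    env.close()
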

class MultiAgentEnvironment(BaseEnv):
    """
    The MultiAgentEnvironment maintains a list of SimpleEnvironments, one for each
    agent. When sending DecisionSteps and TerminalSteps to the trainers, it first
    batches the decision steps from the individual environments. When setting
    actions, it indexes the batched ActionTuple to obtain the ActionTuple for each
    individual agent.
    """

    def __init__(
        self,
        brain_names,
        step_size=STEP_SIZE,
        num_visual=0,
        num_vector=1,
        num_var_len=0,
        vis_obs_size=VIS_OBS_SIZE,
        vec_obs_size=OBS_SIZE,
        var_len_obs_size=VAR_LEN_SIZE,
        action_sizes=(1, 0),
        num_agents=2,
        goal_indices=None,
    ):
        super().__init__()
        self.envs = {}
        self.dones = {}
        self.just_died = set()
        self.names = brain_names
        self.final_rewards: Dict[str, List[float]] = {}
        for name in brain_names:
            self.final_rewards[name] = []
            for i in range(num_agents):
                name_and_num = name + str(i)
                self.envs[name_and_num] = SimpleEnvironment(
                    [name],
                    step_size,
                    num_visual,
                    num_vector,
                    num_var_len,
                    vis_obs_size,
                    vec_obs_size,
                    var_len_obs_size,
                    action_sizes,
                    goal_indices,
                )
                self.dones[name_and_num] = False
                self.envs[name_and_num].reset()
        # Every sub-environment is constructed identically, so they all share
        # the same behavior and action specs.
        self.behavior_spec = self.envs[name_and_num].behavior_spec
        self.action_spec = self.envs[name_and_num].action_spec
        self.num_agents = num_agents
        self._side_channel_manager = SideChannelManager([])

    @property
    def all_done(self):
        return all(self.dones.values())

    @property
    def behavior_specs(self):
        behavior_dict = {}
        for n in self.names:
            behavior_dict[n] = self.behavior_spec
        return BehaviorMapping(behavior_dict)

    def set_action_for_agent(self, behavior_name, agent_id, action):
        pass

    def set_actions(self, behavior_name, action):
        # The incoming ActionTuple is batched over all active agents. Slice out
        # one row per sub-environment and set it there; the index j skips
        # agents that are already done.
        j = 0
        for i in range(self.num_agents):
            _act = ActionTuple()
            name_and_num = behavior_name + str(i)
            env = self.envs[name_and_num]
            if not self.dones[name_and_num]:
                if self.action_spec.continuous_size > 0:
                    _act.add_continuous(action.continuous[j : j + 1])
                if self.action_spec.discrete_size > 0:
                    _disc_list = [action.discrete[j, :]]
                    _act.add_discrete(np.array(_disc_list))
                j += 1
            env.action[behavior_name] = _act

    def get_steps(self, behavior_name):
        # Collect the per-agent DecisionSteps and TerminalSteps from the
        # sub-environments and merge them into one batched step result.
        dec_vec_obs = []
        dec_reward = []
        dec_group_reward = []
        dec_agent_id = []
        dec_group_id = []
        ter_vec_obs = []
        ter_reward = []
        ter_group_reward = []
        ter_agent_id = []
        ter_group_id = []
        interrupted = []

        action_mask = None
        terminal_step = TerminalSteps.empty(self.behavior_spec)
        decision_step = None
        for i in range(self.num_agents):
            name_and_num = behavior_name + str(i)
            env = self.envs[name_and_num]
            _dec, _term = env.step_result[behavior_name]
            if not self.dones[name_and_num]:
                dec_agent_id.append(i)
                dec_group_id.append(1)
                if len(dec_vec_obs) > 0:
                    for j, obs in enumerate(_dec.obs):
                        dec_vec_obs[j] = np.concatenate((dec_vec_obs[j], obs), axis=0)
                else:
                    for obs in _dec.obs:
                        dec_vec_obs.append(obs)
                dec_reward.append(_dec.reward[0])
                dec_group_reward.append(_dec.group_reward[0])
                if _dec.action_mask is not None:
                    if action_mask is None:
                        action_mask = []
                    if len(action_mask) > 0:
                        action_mask[0] = np.concatenate(
                            (action_mask[0], _dec.action_mask[0]), axis=0
                        )
                    else:
                        action_mask.append(_dec.action_mask[0])
            if len(_term.reward) > 0 and name_and_num in self.just_died:
                ter_agent_id.append(i)
                ter_group_id.append(1)
                if len(ter_vec_obs) > 0:
                    for j, obs in enumerate(_term.obs):
                        ter_vec_obs[j] = np.concatenate((ter_vec_obs[j], obs), axis=0)
                else:
                    for obs in _term.obs:
                        ter_vec_obs.append(obs)
                ter_reward.append(_term.reward[0])
                ter_group_reward.append(_term.group_reward[0])
                interrupted.append(False)
                self.just_died.remove(name_and_num)
        decision_step = DecisionSteps(
            dec_vec_obs,
            dec_reward,
            dec_agent_id,
            action_mask,
            dec_group_id,
            dec_group_reward,
        )
        terminal_step = TerminalSteps(
            ter_vec_obs,
            ter_reward,
            interrupted,
            ter_agent_id,
            ter_group_id,
            ter_group_reward,
        )
        if self.all_done:
            decision_step = DecisionSteps([], [], [], [], [], [])
        return (decision_step, terminal_step)

    def step(self) -> None:
        # Mirrors SimpleEnvironment.step(), but per sub-environment, so that
        # dones can be intercepted here.
        for name in self.names:
            for i in range(self.num_agents):
                name_and_num = name + str(i)
                # Do not step a sub-environment whose agent is already done.
                if not self.dones[name_and_num]:
                    env = self.envs[name_and_num]
                    assert all(action is not None for action in env.action.values())
                    done = env._take_action(name)
                    reward = env._compute_reward(name, done)
                    self.dones[name_and_num] = done
                    if done:
                        self.just_died.add(name_and_num)
                    if self.all_done:
                        env.step_result[name] = env._make_batched_step(
                            name, done, 0.0, reward
                        )
                        self.final_rewards[name].append(reward)
                    elif done:
                        # This agent finished while others are still running:
                        # give it only the time penalty if it succeeded, or the
                        # negative env reward if it failed.
                        ceil_reward = min(-TIME_PENALTY, reward)
                        env.step_result[name] = env._make_batched_step(
                            name, done, ceil_reward, 0.0
                        )
                        self.final_rewards[name].append(reward)
                    else:
                        env.step_result[name] = env._make_batched_step(
                            name, done, reward, 0.0
                        )

    def reset(self) -> None:
        # Rebuild the bookkeeping and reset every sub-environment.
        self.dones = {}
        self.just_died = set()
        self.final_rewards = {}
        for name in self.names:
            self.final_rewards[name] = []
            for i in range(self.num_agents):
                name_and_num = name + str(i)
                self.dones[name_and_num] = False
                self.envs[name_and_num].reset()

    @property
    def reset_parameters(self) -> Dict[str, str]:
        return {}

    def close(self):
        pass

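
# Hypothetical smoke test (not part of the original test file): steps a
# two-agent MultiAgentEnvironment with a constant continuous action until every
# agent is done. The brain name "test_brain" and the function name are
# illustrative assumptions; the default action_sizes=(1, 0) is assumed.
def _demo_multi_agent_environment() -> None:
    env = MultiAgentEnvironment(["test_brain"], num_agents=2)
    env.reset()
    behavior_name = list(env.behavior_specs)[0]
    while not env.all_done:
        decision, _terminal = env.get_steps(behavior_name)
        n_active = len(decision.agent_id)
        # One continuous action per still-active agent, batched along axis 0;
        # set_actions() slices this batch back out per sub-environment.
        action = ActionTuple(continuous=np.ones((n_active, 1), dtype=np.float32))
        env.set_actions(behavior_name, action)
        env.step()
    env.close()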