| | from typing import List, NamedTuple |
| | import numpy as np |
| |
|
| | from mlagents.trainers.buffer import ( |
| | AgentBuffer, |
| | ObservationKeyPrefix, |
| | AgentBufferKey, |
| | BufferKey, |
| | ) |
| | from mlagents_envs.base_env import ActionTuple |
| | from mlagents.trainers.torch_entities.action_log_probs import LogProbsTuple |
| |
|
| |
|
class AgentStatus(NamedTuple):
    """
    Stores observation, action, and reward for an agent. Does not have additional
    fields that are present in AgentExperience.
    """

    # List of observation arrays for this agent at this step.
    obs: List[np.ndarray]
    # Scalar reward received by this agent at this step.
    reward: float
    # Continuous and/or discrete actions taken by this agent at this step.
    action: ActionTuple
    # True if this agent's episode terminated at this step.
    done: bool
| |
|
| |
|
class AgentExperience(NamedTuple):
    """
    Stores the full amount of data for an agent in one timestep. Includes
    the status' of group mates and the group reward, as well as the probabilities
    outputted by the policy.
    """

    # List of observation arrays for this agent at this step.
    obs: List[np.ndarray]
    # Scalar reward received at this step.
    reward: float
    # True if the episode terminated at this step.
    done: bool
    # Actions taken at this step.
    action: ActionTuple
    # Log-probabilities of the actions taken, as output by the policy.
    action_probs: LogProbsTuple
    # Mask over discrete actions; may be None (see Trajectory.to_agentbuffer).
    # NOTE(review): presumably marks disallowed actions — confirm against policy code.
    action_mask: np.ndarray
    # Action taken at the previous step.
    prev_action: np.ndarray
    # True if the episode ended early (e.g. max-step), not by a natural Done.
    interrupted: bool
    # Recurrent memory state; may be None — TODO confirm shape/usage with the policy.
    memory: np.ndarray
    # Per-groupmate status (obs, reward, action, done) at this step.
    group_status: List[AgentStatus]
    # Reward shared across the group at this step.
    group_reward: float
| |
|
| |
|
class ObsUtil:
    """Helpers for mapping per-index observations to and from AgentBuffer keys."""

    @staticmethod
    def get_name_at(index: int) -> AgentBufferKey:
        """
        returns the name of the observation given the index of the observation
        :param index: index of the observation.
        :return: the (prefix, index) key for the current observation.
        """
        return ObservationKeyPrefix.OBSERVATION, index

    @staticmethod
    def get_name_at_next(index: int) -> AgentBufferKey:
        """
        returns the name of the next observation given the index of the observation
        :param index: index of the observation.
        :return: the (prefix, index) key for the next observation.
        """
        return ObservationKeyPrefix.NEXT_OBSERVATION, index

    @staticmethod
    def from_buffer(batch: AgentBuffer, num_obs: int) -> List[np.ndarray]:
        """
        Creates the list of observations from an AgentBuffer
        :param batch: buffer to read from.
        :param num_obs: number of observation entries to extract.
        """
        # Fix: annotate with np.ndarray (the array type), not np.array (a function).
        return [batch[ObsUtil.get_name_at(i)] for i in range(num_obs)]

    @staticmethod
    def from_buffer_next(batch: AgentBuffer, num_obs: int) -> List[np.ndarray]:
        """
        Creates the list of next observations from an AgentBuffer
        :param batch: buffer to read from.
        :param num_obs: number of observation entries to extract.
        """
        return [batch[ObsUtil.get_name_at_next(i)] for i in range(num_obs)]
| |
|
| |
|
class GroupObsUtil:
    """Helpers for mapping groupmate observations to and from AgentBuffer keys."""

    @staticmethod
    def get_name_at(index: int) -> AgentBufferKey:
        """
        returns the name of the observation given the index of the observation
        :param index: index of the observation.
        :return: the (prefix, index) key for the current group observation.
        """
        return ObservationKeyPrefix.GROUP_OBSERVATION, index

    @staticmethod
    def get_name_at_next(index: int) -> AgentBufferKey:
        """
        returns the name of the next team observation given the index of the observation
        :param index: index of the observation.
        :return: the (prefix, index) key for the next group observation.
        """
        return ObservationKeyPrefix.NEXT_GROUP_OBSERVATION, index

    @staticmethod
    def _transpose_list_of_lists(
        list_list: List[List[np.ndarray]],
    ) -> List[List[np.ndarray]]:
        """Transposes a list of lists, e.g. [obs_index][step] -> [step][obs_index]."""
        return list(map(list, zip(*list_list)))

    @staticmethod
    def from_buffer(batch: AgentBuffer, num_obs: int) -> List[List[np.ndarray]]:
        """
        Creates the list of group observations from an AgentBuffer.
        Missing groupmate entries are padded with NaN.
        :param batch: buffer to read from.
        :param num_obs: number of observation entries to extract.
        """
        # Fix: the result of the transpose is a list of lists, so annotate
        # List[List[np.ndarray]] (np.ndarray, not the np.array function).
        separated_obs = [
            batch[GroupObsUtil.get_name_at(i)].padded_to_batch(pad_value=np.nan)
            for i in range(num_obs)
        ]
        return GroupObsUtil._transpose_list_of_lists(separated_obs)

    @staticmethod
    def from_buffer_next(batch: AgentBuffer, num_obs: int) -> List[List[np.ndarray]]:
        """
        Creates the list of next group observations from an AgentBuffer.
        Missing groupmate entries are padded with NaN.
        :param batch: buffer to read from.
        :param num_obs: number of observation entries to extract.
        """
        separated_obs = [
            batch[GroupObsUtil.get_name_at_next(i)].padded_to_batch(pad_value=np.nan)
            for i in range(num_obs)
        ]
        return GroupObsUtil._transpose_list_of_lists(separated_obs)
| |
|
| |
|
class Trajectory(NamedTuple):
    # All experiences of one agent, in time order.
    steps: List[AgentExperience]
    # Observation following the final step; used as the "next obs" of the last step.
    next_obs: List[
        np.ndarray
    ]
    # Groupmates' observations following the final step.
    next_group_obs: List[List[np.ndarray]]
    agent_id: str
    behavior_id: str

    def to_agentbuffer(self) -> AgentBuffer:
        """
        Converts a Trajectory to an AgentBuffer
        :param trajectory: A Trajectory
        :returns: AgentBuffer. Note that the length of the AgentBuffer will be one
        less than the trajectory, as the next observation need to be populated from the last
        step of the trajectory.
        """
        agent_buffer_trajectory = AgentBuffer()
        obs = self.steps[0].obs
        for step, exp in enumerate(self.steps):
            is_last_step = step == len(self.steps) - 1
            if not is_last_step:
                next_obs = self.steps[step + 1].obs
            else:
                # The last step has no successor in steps; use the
                # trajectory-level next_obs instead.
                next_obs = self.next_obs

            # Append current and next observations, one buffer entry per obs index.
            num_obs = len(obs)
            for i in range(num_obs):
                agent_buffer_trajectory[ObsUtil.get_name_at(i)].append(obs[i])
                agent_buffer_trajectory[ObsUtil.get_name_at_next(i)].append(next_obs[i])

            # Gather groupmates' rewards and actions for this step.
            teammate_continuous_actions, teammate_discrete_actions, teammate_rewards = (
                [],
                [],
                [],
            )
            for group_status in exp.group_status:
                teammate_rewards.append(group_status.reward)
                teammate_continuous_actions.append(group_status.action.continuous)
                teammate_discrete_actions.append(group_status.action.discrete)

            # Each group entry is a list over groupmates, appended per step.
            agent_buffer_trajectory[BufferKey.GROUP_CONTINUOUS_ACTION].append(
                teammate_continuous_actions
            )
            agent_buffer_trajectory[BufferKey.GROUP_DISCRETE_ACTION].append(
                teammate_discrete_actions
            )
            agent_buffer_trajectory[BufferKey.GROUPMATE_REWARDS].append(
                teammate_rewards
            )
            agent_buffer_trajectory[BufferKey.GROUP_REWARD].append(exp.group_reward)

            # Groupmates' next actions. On the last step there is no next status,
            # so the current step's group actions are reused as a stand-in.
            teammate_cont_next_actions = []
            teammate_disc_next_actions = []
            if not is_last_step:
                next_exp = self.steps[step + 1]
                for group_status in next_exp.group_status:
                    teammate_cont_next_actions.append(group_status.action.continuous)
                    teammate_disc_next_actions.append(group_status.action.discrete)
            else:
                for group_status in exp.group_status:
                    teammate_cont_next_actions.append(group_status.action.continuous)
                    teammate_disc_next_actions.append(group_status.action.discrete)

            agent_buffer_trajectory[BufferKey.GROUP_NEXT_CONT_ACTION].append(
                teammate_cont_next_actions
            )
            agent_buffer_trajectory[BufferKey.GROUP_NEXT_DISC_ACTION].append(
                teammate_disc_next_actions
            )

            # Groupmates' current and next observations, one entry per obs index.
            # NOTE(review): indexing _group_status.obs[i] assumes all groupmates
            # share the same observation layout — confirm upstream.
            for i in range(num_obs):
                ith_group_obs = []
                for _group_status in exp.group_status:
                    ith_group_obs.append(_group_status.obs[i])
                agent_buffer_trajectory[GroupObsUtil.get_name_at(i)].append(
                    ith_group_obs
                )

                ith_group_obs_next = []
                if is_last_step:
                    # No successor step; use the trajectory-level next_group_obs.
                    for _obs in self.next_group_obs:
                        ith_group_obs_next.append(_obs[i])
                else:
                    next_group_status = self.steps[step + 1].group_status
                    for _group_status in next_group_status:
                        ith_group_obs_next.append(_group_status.obs[i])
                agent_buffer_trajectory[GroupObsUtil.get_name_at_next(i)].append(
                    ith_group_obs_next
                )

            if exp.memory is not None:
                agent_buffer_trajectory[BufferKey.MEMORY].append(exp.memory)

            agent_buffer_trajectory[BufferKey.MASKS].append(1.0)
            agent_buffer_trajectory[BufferKey.DONE].append(exp.done)
            agent_buffer_trajectory[BufferKey.GROUP_DONES].append(
                [_status.done for _status in exp.group_status]
            )

            # Continuous and discrete actions are stored separately.
            agent_buffer_trajectory[BufferKey.CONTINUOUS_ACTION].append(
                exp.action.continuous
            )
            agent_buffer_trajectory[BufferKey.DISCRETE_ACTION].append(
                exp.action.discrete
            )

            if not is_last_step:
                next_action = self.steps[step + 1].action
                cont_next_actions = next_action.continuous
                disc_next_actions = next_action.discrete
            else:
                # No next action at the end of the trajectory; use zeros of the
                # same shape as the current action.
                cont_next_actions = np.zeros_like(exp.action.continuous)
                disc_next_actions = np.zeros_like(exp.action.discrete)

            agent_buffer_trajectory[BufferKey.NEXT_CONT_ACTION].append(
                cont_next_actions
            )
            agent_buffer_trajectory[BufferKey.NEXT_DISC_ACTION].append(
                disc_next_actions
            )

            agent_buffer_trajectory[BufferKey.CONTINUOUS_LOG_PROBS].append(
                exp.action_probs.continuous
            )
            agent_buffer_trajectory[BufferKey.DISCRETE_LOG_PROBS].append(
                exp.action_probs.discrete
            )

            # Action mask is stored inverted (1 - mask). NOTE(review):
            # presumably exp.action_mask marks disallowed actions so the stored
            # value marks allowed ones — confirm against the policy.
            if exp.action_mask is not None:
                mask = 1 - np.concatenate(exp.action_mask)
                agent_buffer_trajectory[BufferKey.ACTION_MASK].append(
                    mask, padding_value=1
                )
            else:
                # No mask provided: fall back to an all-ones mask shaped like the
                # discrete action, i.e. everything allowed.
                action_shape = exp.action.discrete.shape
                agent_buffer_trajectory[BufferKey.ACTION_MASK].append(
                    np.ones(action_shape, dtype=np.float32), padding_value=1
                )
            agent_buffer_trajectory[BufferKey.PREV_ACTION].append(exp.prev_action)
            agent_buffer_trajectory[BufferKey.ENVIRONMENT_REWARDS].append(exp.reward)

            # This step's next_obs becomes the next iteration's current obs,
            # avoiding a redundant lookup.
            obs = next_obs
        return agent_buffer_trajectory

    @property
    def done_reached(self) -> bool:
        """
        Returns true if trajectory is terminated with a Done.
        """
        return self.steps[-1].done

    @property
    def all_group_dones_reached(self) -> bool:
        """
        Returns true if all other agents in this trajectory are done at the end of the trajectory.
        Combine with done_reached to check if the whole team is done.
        """
        return all(_status.done for _status in self.steps[-1].group_status)

    @property
    def interrupted(self) -> bool:
        """
        Returns true if trajectory was terminated because max steps was reached.
        """
        return self.steps[-1].interrupted
| |
|