from collections import defaultdict
from typing import Deque, Dict, DefaultDict, List

import numpy as np

from mlagents_envs.logging_util import get_logger
from mlagents_envs.base_env import BehaviorSpec
from mlagents.trainers.policy import Policy

from mlagents.trainers.trainer import Trainer
from mlagents.trainers.optimizer.torch_optimizer import TorchOptimizer
from mlagents.trainers.trajectory import Trajectory
from mlagents.trainers.agent_processor import AgentManagerQueue
from mlagents.trainers.stats import StatsPropertyType
from mlagents.trainers.behavior_id_utils import (
    BehaviorIdentifiers,
    create_name_behavior_id,
)
from mlagents.trainers.training_status import GlobalTrainingStatus, StatusType

logger = get_logger(__name__)


class GhostTrainer(Trainer):
    """
    The GhostTrainer trains agents in adversarial games (there are teams in opposition) using a self-play mechanism.
    In adversarial settings with self-play, at any time, there is only a single learning team. The other team(s) are
    "ghosted", which means that their agents are executing fixed policies and not learning. The GhostTrainer wraps
    a standard RL trainer which trains the learning team and ensures that only the trajectories collected
    by the learning team are used for training. The GhostTrainer also maintains past policy snapshots to be used
    as the fixed policies when a team is not learning. The GhostTrainer is 1:1 with brain_names, as are the other
    trainers, and is responsible for one or more teams. Note, a GhostTrainer can have only one team in
    asymmetric games where there is only one team with a particular behavior, e.g. Hide and Seek.
    The GhostController manages high-level coordination between multiple ghost trainers. The learning team id
    is cycled throughout a training run.
    """

    def __init__(
        self,
        trainer,
        brain_name,
        controller,
        reward_buff_cap,
        trainer_settings,
        training,
        artifact_path,
    ):
        """
        Creates a GhostTrainer.
        :param trainer: The trainer of the policy/policies being trained with self_play
        :param brain_name: The name of the brain associated with trainer config
        :param controller: GhostController that coordinates all ghost trainers and calculates ELO
        :param reward_buff_cap: Max reward history to track in the reward buffer
        :param trainer_settings: The parameters for the trainer.
        :param training: Whether the trainer is set for training.
        :param artifact_path: Path to store artifacts from this trainer.
        """
        super().__init__(
            brain_name, trainer_settings, training, artifact_path, reward_buff_cap
        )

        self.trainer = trainer
        self.controller = controller

        self._internal_trajectory_queues: Dict[str, AgentManagerQueue[Trajectory]] = {}
        self._internal_policy_queues: Dict[str, AgentManagerQueue[Policy]] = {}

        self._team_to_name_to_policy_queue: DefaultDict[
            int, Dict[str, AgentManagerQueue[Policy]]
        ] = defaultdict(dict)

        self._name_to_parsed_behavior_id: Dict[str, BehaviorIdentifiers] = {}

        # Assign this trainer's stats collection to the wrapped trainer's reporter
        self._stats_reporter = self.trainer.stats_reporter
        # Flag the stats reporter so that self-play stats (e.g. ELO) are logged
        self._stats_reporter.add_property(StatsPropertyType.SELF_PLAY, True)

        self_play_parameters = trainer_settings.self_play
        self.window = self_play_parameters.window
        self.play_against_latest_model_ratio = (
            self_play_parameters.play_against_latest_model_ratio
        )
        if (
            self.play_against_latest_model_ratio > 1.0
            or self.play_against_latest_model_ratio < 0.0
        ):
            logger.warning(
                "The play_against_latest_model_ratio is not between 0 and 1."
            )

        self.steps_between_save = self_play_parameters.save_steps
        self.steps_between_swap = self_play_parameters.swap_steps
        self.steps_to_train_team = self_play_parameters.team_change
        if self.steps_to_train_team > self.get_max_steps:
            logger.warning(
                "The max steps of the GhostTrainer for behavior name {} is less than team change. "
                "This team will not face opposition that has been trained if the opposition is "
                "managed by a different GhostTrainer as in an asymmetric game.".format(
                    self.brain_name
                )
            )
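        # Counts steps taken by the ghosted (fixed) policies. Snapshot swapping is
        # timed by this counter, whereas snapshot saving and team switching are
        # timed by the wrapped trainer's step count, so that all teams train for
        # the same number of trainer steps.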
        self.ghost_step: int = 0

        # A list of dicts from brain name to a single past snapshot of that brain's weights
        self.policy_snapshots: List[Dict[str, List[float]]] = []

        # A dict from brain name to the current snapshot of that brain's weights
        self.current_policy_snapshot: Dict[str, List[float]] = {}

        self.snapshot_counter: int = 0

        # _learning_team and wrapped_trainer_team are kept separate because new
        # agents may be created or destroyed after the learning team has switched;
        # those agents must still be associated with the team the wrapped trainer owns.
        self._learning_team: int = None
        self.wrapped_trainer_team: int = None
        self.last_save: int = 0
        self.last_swap: int = 0
        self.last_team_change: int = 0

        self.initial_elo = GlobalTrainingStatus.get_parameter_state(
            self.brain_name, StatusType.ELO
        )
        if self.initial_elo is None:
            self.initial_elo = self_play_parameters.initial_elo
        # One ELO entry per snapshot in the window, plus one for the current policy
        self.policy_elos: List[float] = [self.initial_elo] * (self.window + 1)
        self.current_opponent: int = 0

    @property
    def get_step(self) -> int:
        """
        Returns the number of steps the wrapped trainer has performed
        :return: the step count of the wrapped trainer
        """
        return self.trainer.get_step

    @property
    def reward_buffer(self) -> Deque[float]:
        """
        Returns the reward buffer. The reward buffer contains the cumulative
        rewards of the most recent episodes completed by agents using this
        trainer.
        :return: the reward buffer.
        """
        return self.trainer.reward_buffer

    @property
    def current_elo(self) -> float:
        """
        Gets ELO of current policy, which is always last in the list
        :return: ELO of current policy
        """
        return self.policy_elos[-1]

    def change_current_elo(self, change: float) -> None:
        """
        Changes ELO of current policy, which is always last in the list
        :param change: Amount to change current ELO by
        """
        self.policy_elos[-1] += change

    def get_opponent_elo(self) -> float:
        """
        Gets ELO of current opponent policy
        :return: ELO of current opponent policy
        """
        return self.policy_elos[self.current_opponent]

    def change_opponent_elo(self, change: float) -> None:
        """
        Changes ELO of current opponent policy
        :param change: Amount to change current opponent ELO by
        """
        self.policy_elos[self.current_opponent] -= change
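
    # For intuition: compute_elo_rating_changes (implemented by the GhostController)
    # is expected to apply a standard Elo update. A minimal sketch, assuming a
    # K-factor `k` and ratings `r_a` (this policy) and `r_b` (the opponent):
    #
    #     expected_a = 1.0 / (1.0 + 10 ** ((r_b - r_a) / 400.0))
    #     change = k * (result - expected_a)
    #
    # The learner's rating moves by +change and the opponent's by -change, which
    # is why change_opponent_elo subtracts.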

    def _process_trajectory(self, trajectory: Trajectory) -> None:
        """
        Determines the final result of an episode and asks the GhostController
        to calculate the ELO change. The GhostController changes the ELO
        of the opponent policy since this may be in a different GhostTrainer,
        i.e. in asymmetric games. We assume the last reward determines the winner.
        :param trajectory: Trajectory.
        """
        if (
            trajectory.done_reached
            and trajectory.all_group_dones_reached
            and not trajectory.interrupted
        ):
            # Assume the last reward in the trajectory determines win/loss/draw
            final_reward = (
                trajectory.steps[-1].reward + trajectory.steps[-1].group_reward
            )
            result = 0.5
            if final_reward > 0:
                result = 1.0
            elif final_reward < 0:
                result = 0.0

            change = self.controller.compute_elo_rating_changes(
                self.current_elo, result
            )
            self.change_current_elo(change)
            self._stats_reporter.add_stat("Self-play/ELO", self.current_elo)

    def advance(self) -> None:
        """
        Steps the trainer, passing trajectories to the wrapped trainer and calling its advance.
        """
        for trajectory_queue in self.trajectory_queues:
            parsed_behavior_id = self._name_to_parsed_behavior_id[
                trajectory_queue.behavior_id
            ]
            if parsed_behavior_id.team_id == self._learning_team:
                # Trajectories from the learning team are forwarded to the wrapped trainer
                internal_trajectory_queue = self._internal_trajectory_queues[
                    parsed_behavior_id.brain_name
                ]
                try:
                    # Grab at most the current length of the queue. This ensures that
                    # even if the queue is being filled faster than it is emptied,
                    # the trajectories pulled this iteration stay on-policy.
                    for _ in range(trajectory_queue.qsize()):
                        t = trajectory_queue.get_nowait()
                        # Add to the wrapped trainer's queue for training
                        internal_trajectory_queue.put(t)
                        self._process_trajectory(t)
                except AgentManagerQueue.Empty:
                    pass
            else:
                # Discard trajectories from the non-learning (ghosted) teams,
                # counting their steps so swaps stay on schedule
                try:
                    for _ in range(trajectory_queue.qsize()):
                        t = trajectory_queue.get_nowait()
                        self.ghost_step += len(t.steps)
                except AgentManagerQueue.Empty:
                    pass

        # Keep the summary cadence in sync with the wrapped trainer
        self._next_summary_step = self.trainer._next_summary_step
        self.trainer.advance()
        if self.get_step - self.last_team_change > self.steps_to_train_team:
            self.controller.change_training_team(self.get_step)
            self.last_team_change = self.get_step

        next_learning_team = self.controller.get_learning_team
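
        # Pull any policy updated by the wrapped trainer off the internal policy
        # queues and cache its weights as the current snapshot. If the learning
        # team is unchanged and is managed by this GhostTrainer, also push the
        # updated policy out to that team's environment-facing policy queue.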
        for brain_name in self._internal_policy_queues:
            internal_policy_queue = self._internal_policy_queues[brain_name]
            try:
                policy = internal_policy_queue.get_nowait()
                self.current_policy_snapshot[brain_name] = policy.get_weights()
            except AgentManagerQueue.Empty:
                continue
            if (
                self._learning_team == next_learning_team
                and next_learning_team in self._team_to_name_to_policy_queue
            ):
                name_to_policy_queue = self._team_to_name_to_policy_queue[
                    next_learning_team
                ]
                if brain_name in name_to_policy_queue:
                    behavior_id = create_name_behavior_id(
                        brain_name, next_learning_team
                    )
                    policy = self.get_policy(behavior_id)
                    policy.load_weights(self.current_policy_snapshot[brain_name])
                    name_to_policy_queue[brain_name].put(policy)
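        # If the learning team is about to change to a team managed by this
        # GhostTrainer, load the current snapshot into that team's policies and
        # push them out, so the incoming learning team resumes from the latest
        # weights. Non-learning teams keep receiving fixed snapshots via
        # _swap_snapshots below.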
        if (
            self._learning_team != next_learning_team
            and next_learning_team in self._team_to_name_to_policy_queue
        ):
            name_to_policy_queue = self._team_to_name_to_policy_queue[
                next_learning_team
            ]
            for brain_name in name_to_policy_queue:
                behavior_id = create_name_behavior_id(brain_name, next_learning_team)
                policy = self.get_policy(behavior_id)
                policy.load_weights(self.current_policy_snapshot[brain_name])
                name_to_policy_queue[brain_name].put(policy)

        # Note: save and swap are intentionally on different step counters. Saving
        # uses the wrapped trainer's step count so snapshots are only taken while
        # this trainer's team is learning; swapping uses ghost_step.
        if self.get_step - self.last_save > self.steps_between_save:
            self._save_snapshot()
            self.last_save = self.get_step

        if (
            self._learning_team != next_learning_team
            or self.ghost_step - self.last_swap > self.steps_between_swap
        ):
            self._learning_team = next_learning_team
            self._swap_snapshots()
            self.last_swap = self.ghost_step

    def end_episode(self):
        """
        Forwarding call to the wrapped trainer's end_episode.
        """
        self.trainer.end_episode()

    def save_model(self) -> None:
        """
        Saves the current ELO to the training status, then forwards the call
        to the wrapped trainer's save_model.
        """
        GlobalTrainingStatus.set_parameter_state(
            self.brain_name, StatusType.ELO, self.current_elo
        )
        self.trainer.save_model()

    def create_policy(
        self,
        parsed_behavior_id: BehaviorIdentifiers,
        behavior_spec: BehaviorSpec,
    ) -> Policy:
        """
        Creates a policy with the wrapped trainer's create_policy function.
        The first policy encountered sets the wrapped trainer team. This is to
        ensure that all agents from the same multi-agent team are grouped.
        All policies associated with this team are added to the wrapped trainer
        to be trained.
        """
        policy = self.trainer.create_policy(parsed_behavior_id, behavior_spec)
        team_id = parsed_behavior_id.team_id
        self.controller.subscribe_team_id(team_id, self)

        # First policy seen, or a new agent on the team the wrapped trainer owns
        if self.wrapped_trainer_team is None or team_id == self.wrapped_trainer_team:
            internal_trainer_policy = self.trainer.create_policy(
                parsed_behavior_id, behavior_spec
            )
            self.trainer.add_policy(parsed_behavior_id, internal_trainer_policy)
            self.current_policy_snapshot[
                parsed_behavior_id.brain_name
            ] = internal_trainer_policy.get_weights()

            policy.load_weights(internal_trainer_policy.get_weights())
            # Take an initial snapshot once the trainer has initialized the policy
            self._save_snapshot()
            self._learning_team = self.controller.get_learning_team
            self.wrapped_trainer_team = team_id
        else:
            # Load the weights of the ghost policy from the wrapped one
            policy.load_weights(
                self.trainer.get_policy(parsed_behavior_id).get_weights()
            )
        return policy

    def create_optimizer(self) -> TorchOptimizer:
        # The wrapped trainer owns the optimizer; the GhostTrainer has nothing to create
        pass

    def add_policy(
        self, parsed_behavior_id: BehaviorIdentifiers, policy: Policy
    ) -> None:
        """
        Adds policy to GhostTrainer.
        :param parsed_behavior_id: Behavior ID that the policy should belong to.
        :param policy: Policy to associate with name_behavior_id.
        """
        name_behavior_id = parsed_behavior_id.behavior_id
        self._name_to_parsed_behavior_id[name_behavior_id] = parsed_behavior_id
        self.policies[name_behavior_id] = policy
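
    # Note on indexing: policy_elos holds window + 1 entries. Indices
    # 0..window - 1 track the ELO of saved snapshots (a ring buffer driven by
    # snapshot_counter), and the final entry tracks the current learning policy.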

    def _save_snapshot(self) -> None:
        """
        Saves a snapshot of the current weights of the policy and maintains the policy_snapshots
        according to the window size.
        """
        for brain_name in self.current_policy_snapshot:
            current_snapshot_for_brain_name = self.current_policy_snapshot[brain_name]

            try:
                self.policy_snapshots[self.snapshot_counter][
                    brain_name
                ] = current_snapshot_for_brain_name
            except IndexError:
                self.policy_snapshots.append(
                    {brain_name: current_snapshot_for_brain_name}
                )
        self.policy_elos[self.snapshot_counter] = self.current_elo
        # Once the window is full, the oldest snapshot is overwritten
        self.snapshot_counter = (self.snapshot_counter + 1) % self.window

    def _swap_snapshots(self) -> None:
        """
        Loads either a past snapshot or the current policy weights into each
        non-learning team's policies and pushes them to the respective policy queues.
        """
        for team_id in self._team_to_name_to_policy_queue:
            if team_id == self._learning_team:
                continue
            elif np.random.uniform() < (1 - self.play_against_latest_model_ratio):
                # Sample a past snapshot uniformly at random from the window
                x = np.random.randint(len(self.policy_snapshots))
                snapshot = self.policy_snapshots[x]
            else:
                snapshot = self.current_policy_snapshot
                x = "current"

            self.current_opponent = -1 if x == "current" else x
            name_to_policy_queue = self._team_to_name_to_policy_queue[team_id]
            for brain_name in name_to_policy_queue:
                behavior_id = create_name_behavior_id(brain_name, team_id)
                policy = self.get_policy(behavior_id)
                policy.load_weights(snapshot[brain_name])
                name_to_policy_queue[brain_name].put(policy)
                logger.debug(
                    "Step {}: Swapping snapshot {} to id {} with team {} learning".format(
                        self.ghost_step, x, behavior_id, self._learning_team
                    )
                )

    def publish_policy_queue(self, policy_queue: AgentManagerQueue[Policy]) -> None:
        """
        Adds a policy queue for every member of the team to the list of queues to publish to when this Trainer
        makes a policy update. Creates an internal policy queue for the wrapped
        trainer to push to. The GhostTrainer pushes all policies to the env.
        :param policy_queue: Policy queue to publish to.
        """
        super().publish_policy_queue(policy_queue)
        parsed_behavior_id = self._name_to_parsed_behavior_id[policy_queue.behavior_id]
        self._team_to_name_to_policy_queue[parsed_behavior_id.team_id][
            parsed_behavior_id.brain_name
        ] = policy_queue
        if parsed_behavior_id.team_id == self.wrapped_trainer_team:
            # The wrapped trainer publishes updates for its own team to an internal
            # queue, which advance() drains to refresh the current snapshot
            internal_policy_queue: AgentManagerQueue[Policy] = AgentManagerQueue(
                parsed_behavior_id.brain_name
            )
            self._internal_policy_queues[
                parsed_behavior_id.brain_name
            ] = internal_policy_queue
            self.trainer.publish_policy_queue(internal_policy_queue)

    def subscribe_trajectory_queue(
        self, trajectory_queue: AgentManagerQueue[Trajectory]
    ) -> None:
        """
        Adds a trajectory queue for every member of the team to the list of queues for the trainer
        to ingest Trajectories from. Creates an internal trajectory queue to push trajectories from
        the learning team. The wrapped trainer subscribes to this queue.
        :param trajectory_queue: Trajectory queue to subscribe to.
        """
        super().subscribe_trajectory_queue(trajectory_queue)
        parsed_behavior_id = self._name_to_parsed_behavior_id[
            trajectory_queue.behavior_id
        ]
        if parsed_behavior_id.team_id == self.wrapped_trainer_team:
            # Learning-team trajectories are forwarded to the wrapped trainer
            # through an internal queue, keyed by brain name
            internal_trajectory_queue: AgentManagerQueue[
                Trajectory
            ] = AgentManagerQueue(parsed_behavior_id.brain_name)
            self._internal_trajectory_queues[
                parsed_behavior_id.brain_name
            ] = internal_trajectory_queue
            self.trainer.subscribe_trajectory_queue(internal_trajectory_queue)