| from typing import Any, Dict, List, Optional, Type |
|
|
| import gym |
| import torch as th |
| from torch import nn |
|
|
| from stable_baselines3.common.policies import BasePolicy, register_policy |
| from stable_baselines3.common.torch_layers import BaseFeaturesExtractor, FlattenExtractor, NatureCNN, create_mlp |
| from stable_baselines3.common.type_aliases import Schedule |
|
|
|
|
| class QNetwork(BasePolicy): |
| """ |
| Action-Value (Q-Value) network for DQN |
| |
| :param observation_space: Observation space |
| :param action_space: Action space |
| :param net_arch: The specification of the policy and value networks. |
| :param activation_fn: Activation function |
| :param normalize_images: Whether to normalize images or not, |
| dividing by 255.0 (True by default) |
| """ |
|
|
| def __init__( |
| self, |
| observation_space: gym.spaces.Space, |
| action_space: gym.spaces.Space, |
| features_extractor: nn.Module, |
| features_dim: int, |
| net_arch: Optional[List[int]] = None, |
| activation_fn: Type[nn.Module] = nn.ReLU, |
| normalize_images: bool = True, |
| ): |
| super(QNetwork, self).__init__( |
| observation_space, |
| action_space, |
| features_extractor=features_extractor, |
| normalize_images=normalize_images, |
| ) |
|
|
| if net_arch is None: |
| net_arch = [64, 64] |
|
|
| self.net_arch = net_arch |
| self.activation_fn = activation_fn |
| self.features_extractor = features_extractor |
| self.features_dim = features_dim |
| self.normalize_images = normalize_images |
| action_dim = self.action_space.n |
| q_net = create_mlp(self.features_dim, action_dim, self.net_arch, self.activation_fn) |
| self.q_net = nn.Sequential(*q_net) |
|
|
| def forward(self, obs: th.Tensor) -> th.Tensor: |
| """ |
| Predict the q-values. |
| |
| :param obs: Observation |
| :return: The estimated Q-Value for each action. |
| """ |
| return self.q_net(self.extract_features(obs)) |
|
|
| def _predict(self, observation: th.Tensor, deterministic: bool = True) -> th.Tensor: |
| q_values = self.forward(observation) |
| |
| action = q_values.argmax(dim=1).reshape(-1) |
| return action |
|
|
| def _get_constructor_parameters(self) -> Dict[str, Any]: |
| data = super()._get_constructor_parameters() |
|
|
| data.update( |
| dict( |
| net_arch=self.net_arch, |
| features_dim=self.features_dim, |
| activation_fn=self.activation_fn, |
| features_extractor=self.features_extractor, |
| ) |
| ) |
| return data |
|
|
|
|
| class DQNPolicy(BasePolicy): |
| """ |
| Policy class with Q-Value Net and target net for DQN |
| |
| :param observation_space: Observation space |
| :param action_space: Action space |
| :param lr_schedule: Learning rate schedule (could be constant) |
| :param net_arch: The specification of the policy and value networks. |
| :param activation_fn: Activation function |
| :param features_extractor_class: Features extractor to use. |
| :param features_extractor_kwargs: Keyword arguments |
| to pass to the features extractor. |
| :param normalize_images: Whether to normalize images or not, |
| dividing by 255.0 (True by default) |
| :param optimizer_class: The optimizer to use, |
| ``th.optim.Adam`` by default |
| :param optimizer_kwargs: Additional keyword arguments, |
| excluding the learning rate, to pass to the optimizer |
| """ |
|
|
| def __init__( |
| self, |
| observation_space: gym.spaces.Space, |
| action_space: gym.spaces.Space, |
| lr_schedule: Schedule, |
| net_arch: Optional[List[int]] = None, |
| activation_fn: Type[nn.Module] = nn.ReLU, |
| features_extractor_class: Type[BaseFeaturesExtractor] = FlattenExtractor, |
| features_extractor_kwargs: Optional[Dict[str, Any]] = None, |
| normalize_images: bool = True, |
| optimizer_class: Type[th.optim.Optimizer] = th.optim.Adam, |
| optimizer_kwargs: Optional[Dict[str, Any]] = None, |
| ): |
| super(DQNPolicy, self).__init__( |
| observation_space, |
| action_space, |
| features_extractor_class, |
| features_extractor_kwargs, |
| optimizer_class=optimizer_class, |
| optimizer_kwargs=optimizer_kwargs, |
| ) |
|
|
| if net_arch is None: |
| if features_extractor_class == FlattenExtractor: |
| net_arch = [64, 64] |
| else: |
| net_arch = [] |
|
|
| self.net_arch = net_arch |
| self.activation_fn = activation_fn |
| self.normalize_images = normalize_images |
|
|
| self.net_args = { |
| "observation_space": self.observation_space, |
| "action_space": self.action_space, |
| "net_arch": self.net_arch, |
| "activation_fn": self.activation_fn, |
| "normalize_images": normalize_images, |
| } |
|
|
| self.q_net, self.q_net_target = None, None |
| self._build(lr_schedule) |
|
|
| def _build(self, lr_schedule: Schedule) -> None: |
| """ |
| Create the network and the optimizer. |
| |
| :param lr_schedule: Learning rate schedule |
| lr_schedule(1) is the initial learning rate |
| """ |
|
|
| self.q_net = self.make_q_net() |
| self.q_net_target = self.make_q_net() |
| self.q_net_target.load_state_dict(self.q_net.state_dict()) |
|
|
| |
| self.optimizer = self.optimizer_class(self.parameters(), lr=lr_schedule(1), **self.optimizer_kwargs) |
|
|
| def make_q_net(self) -> QNetwork: |
| |
| net_args = self._update_features_extractor(self.net_args, features_extractor=None) |
| return QNetwork(**net_args).to(self.device) |
|
|
| def forward(self, obs: th.Tensor, deterministic: bool = True) -> th.Tensor: |
| return self._predict(obs, deterministic=deterministic) |
|
|
| def _predict(self, obs: th.Tensor, deterministic: bool = True) -> th.Tensor: |
| return self.q_net._predict(obs, deterministic=deterministic) |
|
|
| def _get_constructor_parameters(self) -> Dict[str, Any]: |
| data = super()._get_constructor_parameters() |
|
|
| data.update( |
| dict( |
| net_arch=self.net_args["net_arch"], |
| activation_fn=self.net_args["activation_fn"], |
| lr_schedule=self._dummy_schedule, |
| optimizer_class=self.optimizer_class, |
| optimizer_kwargs=self.optimizer_kwargs, |
| features_extractor_class=self.features_extractor_class, |
| features_extractor_kwargs=self.features_extractor_kwargs, |
| ) |
| ) |
| return data |
|
|
|
|
| MlpPolicy = DQNPolicy |
|
|
|
|
| class CnnPolicy(DQNPolicy): |
| """ |
| Policy class for DQN when using images as input. |
| |
| :param observation_space: Observation space |
| :param action_space: Action space |
| :param lr_schedule: Learning rate schedule (could be constant) |
| :param net_arch: The specification of the policy and value networks. |
| :param activation_fn: Activation function |
| :param features_extractor_class: Features extractor to use. |
| :param normalize_images: Whether to normalize images or not, |
| dividing by 255.0 (True by default) |
| :param optimizer_class: The optimizer to use, |
| ``th.optim.Adam`` by default |
| :param optimizer_kwargs: Additional keyword arguments, |
| excluding the learning rate, to pass to the optimizer |
| """ |
|
|
| def __init__( |
| self, |
| observation_space: gym.spaces.Space, |
| action_space: gym.spaces.Space, |
| lr_schedule: Schedule, |
| net_arch: Optional[List[int]] = None, |
| activation_fn: Type[nn.Module] = nn.ReLU, |
| features_extractor_class: Type[BaseFeaturesExtractor] = NatureCNN, |
| features_extractor_kwargs: Optional[Dict[str, Any]] = None, |
| normalize_images: bool = True, |
| optimizer_class: Type[th.optim.Optimizer] = th.optim.Adam, |
| optimizer_kwargs: Optional[Dict[str, Any]] = None, |
| ): |
| super(CnnPolicy, self).__init__( |
| observation_space, |
| action_space, |
| lr_schedule, |
| net_arch, |
| activation_fn, |
| features_extractor_class, |
| features_extractor_kwargs, |
| normalize_images, |
| optimizer_class, |
| optimizer_kwargs, |
| ) |
|
|
|
|
| register_policy("MlpPolicy", MlpPolicy) |
| register_policy("CnnPolicy", CnnPolicy) |
|
|