Spaces:
Running
Running
| import torch | |
| import numpy as np | |
| import pytest | |
| from itertools import product | |
| from ding.model.template import ContinuousQAC, DiscreteMAQAC, DiscreteQAC | |
| from ding.torch_utils import is_differentiable | |
| from ding.utils import squeeze | |
# Shared sizes for the ContinuousQAC vector-observation tests that follow.
B = 4  # batch size of the random input tensors
T = 6  # not referenced by any test in the visible part of this file
embedding_size = 32  # used as both critic and actor head hidden size
# Action shapes under test: a 6-dim tuple shape and a single-element list shape.
action_shape_args = [(6, ), [1]]
# Full grid of (action_shape, twin, action_space) cases: 2 * 2 * 2 = 8 tuples.
args = list(product(action_shape_args, [True, False], ['regression', 'reparameterization']))
# NOTE: the decorator reads the module-level ``args`` at class-definition time;
# ``args`` is rebound further down the file for the other test classes.
@pytest.mark.parametrize('action_shape, twin, action_space', args)
class TestContinuousQAC:
    """Smoke tests for ``ContinuousQAC`` with flat (vector) observations.

    Each case is one ``(action_shape, twin, action_space)`` tuple from the
    module-level ``args`` grid. Without the parametrization pytest has no
    way to supply the test method's arguments.
    """

    def test_fcqac(self, action_shape, twin, action_space):
        """Run critic and actor forward passes and check shapes and gradients.

        Arguments:
            - action_shape: Action shape passed to the model; ``squeeze(action_shape)`` \
                is used as the size of the action tensor's last dimension.
            - twin: Forwarded as ``twin_critic``; when true the critic output is \
                indexed as two separate Q-values.
            - action_space: Either ``'regression'`` or ``'reparameterization'``; \
                selects which actor output is checked.
        """
        N = 32
        inputs = {'obs': torch.randn(B, N), 'action': torch.randn(B, squeeze(action_shape))}
        model = ContinuousQAC(
            obs_shape=(N, ),
            action_shape=action_shape,
            action_space=action_space,
            critic_head_hidden_size=embedding_size,
            actor_head_hidden_size=embedding_size,
            twin_critic=twin,
        )

        # compute_q
        q = model(inputs, mode='compute_critic')['q_value']
        if twin:
            # Each Q-value is checked against its own sub-module of the critic.
            is_differentiable(q[0].sum(), model.critic[1][0])
            is_differentiable(q[1].sum(), model.critic[1][1])
        else:
            is_differentiable(q.sum(), model.critic)

        # compute_action
        print(model)
        if action_space == 'regression':
            action = model(inputs['obs'], mode='compute_actor')['action']
            if squeeze(action_shape) == 1:
                # A single-dim action is expected without the trailing axis.
                assert action.shape == (B, )
            else:
                assert action.shape == (B, squeeze(action_shape))
            # Clamping to [-1, 1] must be a no-op, i.e. every action is already in range.
            assert action.eq(action.clamp(-1, 1)).all()
            is_differentiable(action.sum(), model.actor)
        elif action_space == 'reparameterization':
            (mu, sigma) = model(inputs['obs'], mode='compute_actor')['logit']
            assert mu.shape == (B, *action_shape)
            assert sigma.shape == (B, *action_shape)
            is_differentiable(mu.sum() + sigma.sum(), model.actor)
# Grid of (twin, obs_shape) cases for the DiscreteQAC tests: a flat 13-dim
# observation and a [4, 84, 84] image-like observation, each with and without
# a twin critic. Rebinds the module-level ``args``.
args = list(product([True, False], [(13, ), [4, 84, 84]]))
# NOTE: the decorator reads the module-level ``args`` at class-definition time;
# ``args`` is rebound again further down the file.
@pytest.mark.parametrize('twin, obs_shape', args)
class TestDiscreteQAC:
    """Smoke tests for ``DiscreteQAC`` over flat and multi-dimensional observations.

    Each case is one ``(twin, obs_shape)`` tuple from the module-level ``args``
    grid. Without the parametrization pytest has no way to supply the test
    method's arguments.
    """

    def test_discreteqac(self, twin, obs_shape):
        """Run critic and actor forward passes and check shapes and gradients.

        Arguments:
            - twin: Forwarded as ``twin_critic``; when true the critic output is \
                indexed as two separate Q-values.
            - obs_shape: Observation shape; an explicit encoder hidden size list is \
                passed only when it has more than one dimension.
        """
        action_shape = 6
        inputs = torch.randn(B, *obs_shape)
        model = DiscreteQAC(
            obs_shape=obs_shape,
            action_shape=action_shape,
            twin_critic=twin,
            encoder_hidden_size_list=[32, 32, 64] if len(obs_shape) > 1 else None,
        )

        # compute_critic
        q = model(inputs, mode='compute_critic')['q_value']
        if twin:
            is_differentiable(q[0].sum(), model.critic[1][0])
            # The second head (``model.critic[1][1]``) is deliberately not checked
            # for gradients: the original author noted it would "backward encoder twice".
            assert q[0].shape == (B, action_shape)
            assert q[1].shape == (B, action_shape)
        else:
            is_differentiable(q.sum(), model.critic[1])
            assert q.shape == (B, action_shape)

        # compute_actor
        print(model)
        logit = model(inputs, mode='compute_actor')['logit']
        assert logit.shape == (B, action_shape)
        is_differentiable(logit.sum(), model.actor)
# Sizes for the pixel-observation ContinuousQAC tests; these rebind the
# module-level names used by the earlier test classes.
B = 4  # batch size of the random input tensors
embedding_size = 64  # used as both critic and actor head hidden size
# Action shapes under test: a 6-dim tuple shape and a bare int.
action_shape_args = [(6, ), 1]
# Full grid of (action_shape, twin, share_encoder) cases: 2 * 2 * 2 = 8 tuples.
args = list(product(action_shape_args, [True, False], [True, False]))
# NOTE: the decorator reads the module-level ``args`` at class-definition time.
@pytest.mark.parametrize('action_shape, twin, share_encoder', args)
class TestContinuousQACPixel:
    """Smoke tests for ``ContinuousQAC`` on (3, 84, 84) image observations.

    Each case is one ``(action_shape, twin, share_encoder)`` tuple from the
    module-level ``args`` grid. Without the parametrization pytest has no way
    to supply the test method's arguments.
    """

    def test_qacpixel(self, action_shape, twin, share_encoder):
        """Run critic and actor forward passes and check shapes and gradients.

        Arguments:
            - action_shape: Action shape passed to the model; ``squeeze(action_shape)`` \
                is used as the size of the action tensor's last dimension.
            - twin: Forwarded as ``twin_critic``; when true the two Q-values are \
                reduced with an element-wise minimum before the gradient check.
            - share_encoder: Forwarded to the model; decides which actor module \
                the gradient check is run against.
        """
        inputs = {'obs': torch.randn(B, 3, 84, 84), 'action': torch.randn(B, squeeze(action_shape))}
        model = ContinuousQAC(
            obs_shape=(3, 84, 84),
            action_shape=action_shape,
            action_space='reparameterization',
            critic_head_hidden_size=embedding_size,
            actor_head_hidden_size=embedding_size,
            twin_critic=twin,
            share_encoder=share_encoder,
            encoder_hidden_size_list=[32, 32, 64],
        )

        # compute_q
        q = model(inputs, mode='compute_critic')['q_value']
        if twin:
            # Combine both Q-values so a single backward pass covers the whole critic.
            q = torch.min(q[0], q[1])
        is_differentiable(q.sum(), model.critic)

        # compute_action
        print(model)
        (mu, sigma) = model(inputs['obs'], mode='compute_actor')['logit']
        # Separate local instead of rebinding the ``action_shape`` parameter.
        action_dim = squeeze(action_shape)
        assert mu.shape == (B, action_dim)
        assert sigma.shape == (B, action_dim)
        actor_output_sum = mu.sum() + sigma.sum()
        if share_encoder:
            # Only the actor head is checked here. Original note: "if share_encoder,
            # actor_encoder's grad is not None" (presumably left over from the
            # critic backward pass above; confirm against the model).
            is_differentiable(actor_output_sum, model.actor_head)
        else:
            is_differentiable(actor_output_sum, model.actor)