| import torch | |
| import numpy as np | |
| import copy | |
class ReplayMemory:
    """Fixed-capacity transition store backed by preallocated NumPy arrays.

    Each transition is a ``(state, action, reward, next_state, done)`` row.
    ``size`` is the number of valid rows; rows at index ``size`` and above
    are uninitialised memory (the buffers are created with ``np.empty``)
    and must never be read.
    """

    def __init__(self, state_dim, action_dim, device='cuda', capacity=5e6):
        """Allocate the buffers.

        Args:
            state_dim: Length of one state vector.
            action_dim: Length of one action vector.
            device: Torch device (string or ``torch.device``) on which
                ``sample`` places the returned tensors.
            capacity: Maximum number of transitions; truncated to ``int``.
        """
        self.capacity = int(capacity)
        self.device = torch.device(device)
        self.size = 0
        self.position = 0

        # np.empty (not zeros) keeps allocation cheap for very large
        # capacities; the price is that unwritten rows hold garbage.
        self.state_buffer = np.empty(shape=(self.capacity, state_dim), dtype=np.float32)
        self.action_buffer = np.empty(shape=(self.capacity, action_dim), dtype=np.float32)
        self.reward_buffer = np.empty(shape=(self.capacity, 1), dtype=np.float32)
        self.next_state_buffer = np.empty(shape=(self.capacity, state_dim), dtype=np.float32)
        self.done_buffer = np.empty(shape=(self.capacity, 1), dtype=np.float32)

    def normalize_states(self, eps=1e-3):
        """Standardise stored states and next states in place.

        The per-dimension mean and standard deviation are computed from the
        first ``size`` rows of ``state_buffer`` only, so uninitialised rows
        cannot skew the statistics. The same mean/std is applied to both
        ``state_buffer`` and ``next_state_buffer``. Rows beyond ``size`` are
        left untouched. Does nothing when the memory is empty.

        Args:
            eps: Added to the standard deviation to avoid division by zero
                for constant state dimensions.
        """
        count = self.size
        if count <= 0:
            return

        # Accumulate in float64 for accuracy; assigning back into the
        # float32 buffers casts the result down again.
        states = self.state_buffer[:count].astype(np.float64)
        next_states = self.next_state_buffer[:count].astype(np.float64)
        mean = states.mean(axis=0)
        std = states.std(axis=0) + eps

        self.state_buffer[:count] = (states - mean) / std
        self.next_state_buffer[:count] = (next_states - mean) / std

    def sample(self, batch_size):
        """Draw a batch of transitions uniformly at random, with replacement.

        Args:
            batch_size: Number of transitions to draw.

        Returns:
            Tuple ``(states, actions, rewards, next_states, dones)`` of
            float32 tensors located on the device given at construction.

        Raises:
            ValueError: If the memory holds no transitions.
        """
        if self.size <= 0:
            raise ValueError("cannot sample from an empty ReplayMemory")

        idxs = np.random.randint(0, self.size, size=batch_size)

        def to_tensor(buffer):
            # Fancy indexing copies the selected rows, so the tensor never
            # aliases the underlying buffer.
            return torch.as_tensor(buffer[idxs], dtype=torch.float32, device=self.device)

        states = to_tensor(self.state_buffer)
        actions = to_tensor(self.action_buffer)
        rewards = to_tensor(self.reward_buffer)
        next_states = to_tensor(self.next_state_buffer)
        dones = to_tensor(self.done_buffer)
        return states, actions, rewards, next_states, dones