| import gymnasium as gym |
| import numpy as np |
| import tensorflow as tf |
| import config |
| from reward import shape_reward_vectorized |
|
|
| class PPOTrainer: |
| def __init__(self, agent): |
| self.agent = agent |
| |
| self.envs = gym.vector.AsyncVectorEnv([ |
| lambda: gym.make(config.ENV_NAME) for _ in range(config.NUM_ENVS) |
| ]) |
| self.states, _ = self.envs.reset() |
|
|
| def collect_rollouts(self): |
| """Collects trajectories across all parallel environments.""" |
| mb_states = np.zeros((config.ROLLOUT_STEPS, config.NUM_ENVS, 2), dtype=np.float32) |
| mb_actions = np.zeros((config.ROLLOUT_STEPS, config.NUM_ENVS, 1), dtype=np.float32) |
| mb_log_probs = np.zeros((config.ROLLOUT_STEPS, config.NUM_ENVS, 1), dtype=np.float32) |
| mb_rewards = np.zeros((config.ROLLOUT_STEPS, config.NUM_ENVS), dtype=np.float32) |
| mb_values = np.zeros((config.ROLLOUT_STEPS, config.NUM_ENVS), dtype=np.float32) |
| mb_masks = np.zeros((config.ROLLOUT_STEPS, config.NUM_ENVS), dtype=np.float32) |
| |
| ep_raw_rewards = np.zeros(config.NUM_ENVS) |
| finished_ep_scores = [] |
|
|
| for step in range(config.ROLLOUT_STEPS): |
| mb_states[step] = self.states |
| |
| |
| actions, log_probs, values = self.agent.get_vector_actions(self.states) |
| |
| mb_actions[step] = actions |
| mb_log_probs[step] = log_probs |
| mb_values[step] = values |
| |
| next_states, rewards_raw, terminated, truncated, _ = self.envs.step(actions) |
| dones = terminated | truncated |
| |
| mb_rewards[step] = shape_reward_vectorized(next_states, rewards_raw) |
| mb_masks[step] = 1.0 - dones.astype(np.float32) |
| |
| ep_raw_rewards += rewards_raw |
| for idx, done in enumerate(dones): |
| if done: |
| finished_ep_scores.append(ep_raw_rewards[idx]) |
| ep_raw_rewards[idx] = 0.0 |
| |
| self.states = next_states |
|
|
| |
| mb_returns = np.zeros_like(mb_rewards) |
| mb_advantages = np.zeros_like(mb_rewards) |
| running_return = np.zeros(config.NUM_ENVS) |
| previous_value = np.zeros(config.NUM_ENVS) |
|
|
| for t in reversed(range(config.ROLLOUT_STEPS)): |
| running_return = mb_rewards[t] + config.GAMMA * running_return * mb_masks[t] |
| mb_returns[t] = running_return |
| |
| td_error = mb_rewards[t] + config.GAMMA * previous_value * mb_masks[t] - mb_values[t] |
| mb_advantages[t] = td_error |
| previous_value = mb_values[t] |
|
|
| |
| return ( |
| mb_states.reshape(-1, 2), |
| mb_actions.reshape(-1, 1), |
| mb_log_probs.reshape(-1, 1), |
| mb_returns.flatten(), |
| mb_advantages.flatten(), |
| finished_ep_scores |
| ) |
|
|
| def train_epoch(self, states, actions, log_probs, returns, advantages): |
|
|
| advantages = (advantages - np.mean(advantages)) / (np.std(advantages) + 1e-8) |
| |
| dataset = tf.data.Dataset.from_tensor_slices((states, actions, log_probs, returns, advantages)) |
| dataset = dataset.shuffle(buffer_size=len(states)).batch(config.BATCH_SIZE) |
| |
| total_al, total_cl = 0, 0 |
| steps = 0 |
| for _ in range(config.TRAIN_EPOCHS): |
| for batch in dataset: |
| b_states, b_actions, b_log_probs, b_returns, b_advantages = batch |
| al, cl = self.agent.train_step(b_states, b_actions, b_log_probs, b_returns, b_advantages) |
| total_al += al.numpy() |
| total_cl += cl.numpy() |
| steps += 1 |
| |
| return total_al / steps, total_cl / steps |
|
|
| def close(self): |
| self.envs.close() |