import gymnasium as gym
import numpy as np
import tensorflow as tf

import config
from reward import shape_reward_vectorized


class PPOTrainer:
    """Collect rollouts from vectorized environments and run PPO updates.

    The wrapped ``agent`` must provide:

    * ``get_vector_actions(states)`` returning ``(actions, log_probs, values)``
      for all environments at once, and
    * ``train_step(states, actions, log_probs, returns, advantages)`` returning
      two tensors exposing ``.numpy()`` (presumably actor and critic loss --
      confirm against the agent implementation).
    """

    def __init__(self, agent):
        """Create the vectorized environments and reset them.

        Args:
            agent: Policy/value wrapper used for acting and training.
        """
        self.agent = agent

        # config.NUM_ENVS asynchronous environments running in parallel processes.
        self.envs = gym.vector.AsyncVectorEnv([
            lambda: gym.make(config.ENV_NAME) for _ in range(config.NUM_ENVS)
        ])
        self.states, _ = self.envs.reset()

    @staticmethod
    def _compute_returns_and_advantages(rewards, values, masks, bootstrap_value, gamma):
        """Compute discounted returns and one-step TD advantages.

        Args:
            rewards: Array ``(steps, envs)`` of per-step rewards.
            values: Array ``(steps, envs)`` of critic estimates for the state
                each step started from.
            masks: Array ``(steps, envs)``; 0.0 where the episode ended on that
                step, 1.0 otherwise.
            bootstrap_value: Array ``(envs,)`` with the critic estimate for the
                state following the final step.
            gamma: Discount factor.

        Returns:
            Tuple ``(returns, advantages)``, both shaped like ``rewards``.
        """
        returns = np.zeros_like(rewards)
        advantages = np.zeros_like(rewards)

        # Both recursions start from the value of the state after the last
        # step. The mask zeroes that term wherever the episode actually ended.
        running_return = np.array(bootstrap_value, dtype=np.float64)
        next_value = np.array(bootstrap_value, dtype=np.float64)

        for t in reversed(range(rewards.shape[0])):
            running_return = rewards[t] + gamma * running_return * masks[t]
            returns[t] = running_return

            # One-step TD error: r_t + gamma * V(s_{t+1}) - V(s_t).
            advantages[t] = rewards[t] + gamma * next_value * masks[t] - values[t]
            next_value = values[t]

        return returns, advantages

    def collect_rollouts(self):
        """Collect trajectories across all parallel environments.

        Steps every environment ``config.ROLLOUT_STEPS`` times, storing shaped
        rewards for training and accumulating raw rewards for episode scores.

        Returns:
            Tuple ``(states, actions, log_probs, returns, advantages,
            finished_ep_scores)``. The first five are flattened over the
            (step, env) axes; ``finished_ep_scores`` lists the raw-reward
            totals of episodes that finished during this rollout.
        """
        num_steps = config.ROLLOUT_STEPS
        num_envs = config.NUM_ENVS

        # NOTE(review): the trailing sizes (2 for states, 1 for actions and
        # log-probs) are hard-coded and presumably match config.ENV_NAME --
        # confirm before switching environments.
        mb_states = np.zeros((num_steps, num_envs, 2), dtype=np.float32)
        mb_actions = np.zeros((num_steps, num_envs, 1), dtype=np.float32)
        mb_log_probs = np.zeros((num_steps, num_envs, 1), dtype=np.float32)
        mb_rewards = np.zeros((num_steps, num_envs), dtype=np.float32)
        mb_values = np.zeros((num_steps, num_envs), dtype=np.float32)
        mb_masks = np.zeros((num_steps, num_envs), dtype=np.float32)

        # Raw reward accumulated by the episode currently running in each env.
        # NOTE(review): this restarts at zero on every call, so an episode that
        # spans two rollouts is scored only from the start of the later one.
        ep_raw_rewards = np.zeros(num_envs)
        finished_ep_scores = []

        for step in range(num_steps):
            mb_states[step] = self.states

            # Request actions for all environments at once.
            actions, log_probs, values = self.agent.get_vector_actions(self.states)
            mb_actions[step] = actions
            mb_log_probs[step] = log_probs
            mb_values[step] = values

            next_states, rewards_raw, terminated, truncated, _ = self.envs.step(actions)
            dones = terminated | truncated

            mb_rewards[step] = shape_reward_vectorized(next_states, rewards_raw)
            mb_masks[step] = 1.0 - dones.astype(np.float32)

            # Record the score of every episode that just ended, then restart
            # its running total.
            ep_raw_rewards += rewards_raw
            finished_ep_scores.extend(ep_raw_rewards[dones])
            ep_raw_rewards[dones] = 0.0

            self.states = next_states

        # Bootstrap from the critic's estimate of the state reached after the
        # last step; otherwise unfinished episodes are treated as worth zero
        # from here on. This costs one extra policy query per rollout.
        _, _, last_values = self.agent.get_vector_actions(self.states)
        bootstrap_value = np.zeros(num_envs)
        bootstrap_value[...] = last_values

        mb_returns, mb_advantages = self._compute_returns_and_advantages(
            mb_rewards, mb_values, mb_masks, bootstrap_value, config.GAMMA
        )

        # Flatten the (step, env) axes for minibatch training.
        return (
            mb_states.reshape(-1, 2),
            mb_actions.reshape(-1, 1),
            mb_log_probs.reshape(-1, 1),
            mb_returns.flatten(),
            mb_advantages.flatten(),
            finished_ep_scores,
        )

    def train_epoch(self, states, actions, log_probs, returns, advantages):
        """Run ``config.TRAIN_EPOCHS`` shuffled minibatch passes over one rollout.

        Args:
            states: Flattened rollout states.
            actions: Flattened rollout actions.
            log_probs: Log-probabilities recorded when the actions were taken.
            returns: Discounted return targets.
            advantages: Advantage estimates; standardized here before use.

        Returns:
            Tuple with the mean of each of the two values returned by
            ``agent.train_step`` over all minibatches, or ``(0.0, 0.0)`` when
            no minibatch was processed.
        """
        # Standardize advantages; the epsilon guards against zero variance.
        advantages = (advantages - np.mean(advantages)) / (np.std(advantages) + 1e-8)

        dataset = tf.data.Dataset.from_tensor_slices(
            (states, actions, log_probs, returns, advantages)
        )
        dataset = dataset.shuffle(buffer_size=len(states)).batch(config.BATCH_SIZE)

        total_al, total_cl = 0, 0
        steps = 0

        for _ in range(config.TRAIN_EPOCHS):
            for batch in dataset:
                b_states, b_actions, b_log_probs, b_returns, b_advantages = batch
                al, cl = self.agent.train_step(
                    b_states, b_actions, b_log_probs, b_returns, b_advantages
                )
                total_al += al.numpy()
                total_cl += cl.numpy()
                steps += 1

        # Avoid ZeroDivisionError when nothing was trained (e.g. TRAIN_EPOCHS == 0).
        if steps == 0:
            return 0.0, 0.0

        return total_al / steps, total_cl / steps

    def close(self):
        """Close the vectorized environments."""
        self.envs.close()