# ============================================================
# Unit 4: Policy Gradient (REINFORCE) for Pixelcopter-PLE-v0
# Deep Reinforcement Learning Course - Hugging Face
# Version that supports resuming a previous training run
# ============================================================
import os
import pickle
from collections import deque
from datetime import datetime

import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from gymnasium import spaces

# ===== PLE environment imports and Gymnasium wrapper =====
from ple import PLE
from ple.games.pixelcopter import Pixelcopter


class PLEWrapper(gym.Env):
    """Expose the PLE Pixelcopter game through the Gymnasium ``Env`` API.

    Observations are the values of ``PLE.getGameState()`` packed into a
    float32 vector; actions are indices into ``PLE.getActionSet()``.
    """

    def __init__(self):
        super().__init__()
        self.game = Pixelcopter()
        # Only the fps argument is passed; no display argument is used.
        self.env = PLE(self.game, fps=30)
        self.env.init()

        # Define the observation and action spaces from the live game.
        state_dim = len(self.env.getGameState())
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(state_dim,), dtype=np.float32
        )
        self.actions = self.env.getActionSet()
        self.action_space = spaces.Discrete(len(self.actions))

    def _observation(self):
        """Return the current game state as a float32 vector."""
        return np.array(list(self.env.getGameState().values()), dtype=np.float32)

    def reset(self, seed=None, options=None):
        """Restart the game and return ``(observation, info)``.

        Args:
            seed: Forwarded to ``gym.Env.reset`` so the wrapper's own
                ``np_random`` is seeded. NOTE(review): the underlying PLE
                game is not re-seeded here - confirm whether that matters.
            options: Accepted for Gymnasium API compatibility; unused.
        """
        super().reset(seed=seed)
        self.env.reset_game()
        return self._observation(), {}

    def step(self, action):
        """Apply the action with the given index.

        Returns:
            ``(observation, reward, terminated, truncated, info)`` where
            ``truncated`` is always ``False`` and ``info`` is empty.
        """
        reward = self.env.act(self.actions[action])
        state = self._observation()
        terminated = self.env.game_over()
        return state, reward, terminated, False, {}


# ============================================================
# Device configuration
# ============================================================
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# ============================================================
# Environment configuration
# ============================================================
env = PLEWrapper()
eval_env = PLEWrapper()

s_size = env.observation_space.shape[0]
a_size = env.action_space.n

print(f"Environment: Pixelcopter-PLE")
print(f"Observation Space: {s_size}, Action Space: {a_size}")


# ============================================================
# Policy network
# ============================================================
class Policy(nn.Module):
    """Policy network: maps a state to a probability distribution over actions."""

    def __init__(self, s_size, a_size, h_size=128):
        """Build the three-layer MLP.

        Args:
            s_size: Dimension of the state vector.
            a_size: Number of discrete actions.
            h_size: Width of the first hidden layer (the second is twice as wide).
        """
        super(Policy, self).__init__()
        self.fc1 = nn.Linear(s_size, h_size)
        self.fc2 = nn.Linear(h_size, h_size * 2)
        self.fc3 = nn.Linear(h_size * 2, a_size)
        # Light dropout, applied after each hidden layer.
        self.dropout = nn.Dropout(0.1)

    def forward(self, x):
        """Return action probabilities for the given state tensor.

        Args:
            x: State tensor whose last dimension has size ``s_size``.

        Returns:
            Tensor of the same leading shape with ``a_size`` probabilities
            along the last dimension.
        """
        x = self.dropout(F.relu(self.fc1(x)))
        x = self.dropout(F.relu(self.fc2(x)))
        # Softmax over the last (action) axis; identical to dim=1 for the
        # (batch, features) input used throughout this file.
        return F.softmax(self.fc3(x), dim=-1)

    def act(self, state):
        """Sample an action from the current policy.

        Args:
            state: NumPy array holding a single state.

        Returns:
            ``(action, log_prob)``: the sampled action index as an int and
            the log-probability tensor of that action (kept attached to the
            graph for the gradient computation).
        """
        # Convert to a batched tensor on the configured device.
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        probs = self.forward(state)
        # Sample from the categorical distribution (stochastic, not greedy).
        m = torch.distributions.Categorical(probs)
        action = m.sample()
        return action.item(), m.log_prob(action)


# ============================================================
# Learning-rate scheduler
# ============================================================
class LearningRateScheduler:
    """Step-wise exponential decay driven by the absolute episode number."""

    def __init__(self, optimizer, initial_lr, decay_rate=0.95, decay_episodes=5000):
        """
        Args:
            optimizer: Optimizer whose parameter groups are updated.
            initial_lr: Learning rate before any decay.
            decay_rate: Multiplicative factor applied per decay step.
            decay_episodes: Number of episodes between decay steps.
        """
        self.optimizer = optimizer
        self.initial_lr = initial_lr
        self.decay_rate = decay_rate
        self.decay_episodes = decay_episodes

    def step(self, episode):
        """Update the learning rate when ``episode`` is a positive multiple of
        ``decay_episodes``; otherwise do nothing."""
        if episode > 0 and episode % self.decay_episodes == 0:
            new_lr = self.initial_lr * (self.decay_rate ** (episode // self.decay_episodes))
            for param_group in self.optimizer.param_groups:
                param_group['lr'] = new_lr
            print(f"📉 Learning rate decayed to: {new_lr:.2e}")


# ============================================================
# REINFORCE with support for continued training
# ============================================================
def _ensure_parent_dir(path):
    """Create the directory that will contain ``path`` if it is missing."""
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)


def _suffixed_path(model_path, suffix):
    """Insert ``suffix`` between the file stem and its extension."""
    root, ext = os.path.splitext(model_path)
    return f"{root}{suffix}{ext}"


def _save_checkpoint(path, policy, optimizer, scores, episode, **extra):
    """Write a training checkpoint (weights, optimizer state, history) to ``path``."""
    payload = {
        'policy_state_dict': policy.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        's_size': s_size,
        'a_size': a_size,
        'hidden_size': policy.fc1.out_features,
        'scores': scores,
        'episode': episode,
    }
    payload.update(extra)
    payload['timestamp'] = datetime.now().isoformat()
    _ensure_parent_dir(path)
    torch.save(payload, path)


def _discounted_returns(rewards, gamma):
    """Return G_t = r_t + gamma * G_{t+1} for every step, in time order."""
    returns = []
    running = 0.0
    for reward in reversed(rewards):
        running = reward + gamma * running
        returns.append(running)
    returns.reverse()
    return returns


def _best_rolling_average(history, window=100):
    """Return the highest mean over any trailing window of up to ``window`` scores."""
    if not history:
        return -float('inf')
    return max(
        np.mean(history[max(0, i - window + 1):i + 1])
        for i in range(len(history))
    )


def reinforce_continued(policy, optimizer, n_training_episodes, max_t, gamma, print_every,
                        previous_scores=None, model_path=None, lr_scheduler=None):
    """Run REINFORCE, optionally continuing from an earlier score history.

    Uses the module-level ``env`` for rollouts.

    Args:
        policy: Policy network providing ``act(state)``.
        optimizer: Optimizer for the policy parameters.
        n_training_episodes: Number of additional episodes to train.
        max_t: Maximum number of steps per episode.
        gamma: Discount factor.
        print_every: Progress is reported every this many episodes; the
            best-model check happens at the same cadence and periodic
            checkpoints every ``2 * print_every`` episodes.
        previous_scores: Episode scores from earlier training, or ``None``.
        model_path: Base path for checkpoints; nothing is saved when ``None``.
        lr_scheduler: Optional object with ``step(episode)``.

    Returns:
        List of all episode scores, previous ones included.
    """
    # A fresh list per call: never share or mutate the caller's history.
    previous_scores = list(previous_scores) if previous_scores else []
    scores = list(previous_scores)
    scores_deque = deque(maxlen=100)  # scores of the most recent 100 episodes

    # Seed the rolling window with the tail of the earlier history.
    if previous_scores:
        scores_deque.extend(previous_scores[-100:])
        print(f"📈 Resuming with recent average score: {np.mean(scores_deque):.2f}")
        print(f"📊 Previous best score: {max(previous_scores):.2f}")

    start_episode = len(previous_scores) + 1
    best_avg_score = _best_rolling_average(previous_scores)

    print(f"🚀 Starting continued training from episode {start_episode}")
    print(f"🎯 Target: Beat previous best average score of {best_avg_score:.2f}")
    print()

    for i_episode in range(start_episode, start_episode + n_training_episodes):
        saved_log_probs = []  # log-probability of each chosen action
        rewards = []          # reward received at each step
        state, _ = env.reset()

        # --- 1. Collect one trajectory ---
        for _ in range(max_t):
            action, log_prob = policy.act(state)
            saved_log_probs.append(log_prob)

            state, reward, terminated, truncated, _ = env.step(action)
            rewards.append(reward)

            if terminated or truncated:
                break

        episode_score = sum(rewards)
        scores_deque.append(episode_score)
        scores.append(episode_score)

        # A zero-step episode (max_t == 0) has nothing to learn from, and
        # torch.cat would raise on the empty list.
        if saved_log_probs:
            # --- 2. Discounted returns ---
            returns = torch.tensor(
                _discounted_returns(rewards, gamma), dtype=torch.float32, device=device
            )
            # Standardize returns for stability; skipped for a single step,
            # where the sample standard deviation is undefined.
            if len(returns) > 1:
                returns = (returns - returns.mean()) / (returns.std() + 1e-8)

            # --- 3. Policy-gradient loss ---
            # L = -sum(log_prob * return); negated because we maximize return.
            policy_loss = -(torch.cat(saved_log_probs) * returns).sum()

            # --- 4. Backpropagate and update ---
            optimizer.zero_grad()
            policy_loss.backward()
            # Gradient clipping for training stability.
            torch.nn.utils.clip_grad_norm_(policy.parameters(), max_norm=1.0)
            optimizer.step()

        if lr_scheduler is not None:
            lr_scheduler.step(i_episode)

        # Progress report
        if i_episode % print_every == 0:
            current_avg = np.mean(scores_deque)
            current_lr = optimizer.param_groups[0]['lr']
            print(f'Episode {i_episode:6d} | Avg Score: {current_avg:7.2f} | Last Score: {episode_score:7.2f} | Steps: {len(rewards):4d} | LR: {current_lr:.2e}')

            # New record for the rolling average?
            if current_avg > best_avg_score:
                best_avg_score = current_avg
                print(f"🎉 New best average score: {best_avg_score:.2f}")

                if model_path:
                    best_model_path = _suffixed_path(model_path, '_best')
                    _save_checkpoint(
                        best_model_path, policy, optimizer, scores, i_episode,
                        best_avg_score=best_avg_score,
                    )
                    print(f"💾 Best model saved: {best_model_path}")

            # Periodic checkpoint
            if model_path and i_episode % (print_every * 2) == 0:
                checkpoint_path = _suffixed_path(model_path, f'_checkpoint_{i_episode}')
                _save_checkpoint(checkpoint_path, policy, optimizer, scores, i_episode)
                print(f"💾 Checkpoint saved: {checkpoint_path}")

            print()

    return scores


# ============================================================
# Evaluation
# ============================================================
def evaluate_policy(policy, eval_env, n_eval_episodes=10):
    """Evaluate the policy greedily and print a summary.

    Args:
        policy: Trained policy network.
        eval_env: Environment used for evaluation.
        n_eval_episodes: Number of evaluation episodes.

    Returns:
        ``(episode_rewards, mean_reward, std_reward)``.
    """
    episode_rewards = []

    # Switch to eval mode (disables dropout) and restore the caller's mode
    # afterwards, even if an episode raises.
    was_training = policy.training
    policy.eval()
    try:
        for i in range(n_eval_episodes):
            state, _ = eval_env.reset()
            episode_reward = 0
            done = False
            steps = 0

            while not done and steps < 10000:  # hard cap on episode length
                # Deterministic policy: pick the most probable action.
                with torch.no_grad():
                    state_tensor = torch.from_numpy(state).float().unsqueeze(0).to(device)
                    probs = policy.forward(state_tensor)
                    action = torch.argmax(probs, dim=1).item()

                state, reward, terminated, truncated, _ = eval_env.step(action)
                episode_reward += reward
                done = terminated or truncated
                steps += 1

            episode_rewards.append(episode_reward)
            print(f"Eval Episode {i+1:2d}: Reward = {episode_reward:7.2f} | Steps = {steps:4d}")
    finally:
        policy.train(was_training)

    mean_reward = np.mean(episode_rewards)
    std_reward = np.std(episode_rewards)

    print(f"\n{'='*50}")
    print(f"Evaluation Results:")
    print(f"Mean Reward: {mean_reward:.2f}")
    print(f"Std Reward: {std_reward:.2f}")
    print(f"Score (mean - std): {mean_reward - std_reward:.2f}")
    print(f"Required for Pixelcopter-PLE-v0: 5.0")
    print(f"{'='*50}")

    return episode_rewards, mean_reward, std_reward


# ============================================================
# Model loading
# ============================================================
def load_model(model_path, policy, optimizer):
    """Load a saved checkpoint into ``policy`` and ``optimizer`` if it exists.

    Args:
        model_path: Path of the checkpoint file.
        policy: Policy network to receive the saved weights.
        optimizer: Optimizer to receive the saved state.

    Returns:
        ``(previous_scores, episode, best_avg_score)``; ``([], 0, -inf)``
        when no file exists at ``model_path``.
    """
    if not os.path.exists(model_path):
        print("🆕 No existing model found, starting fresh training")
        return [], 0, -float('inf')

    print(f"🔄 Loading existing model from {model_path}")
    # NOTE(review): weights_only=False performs full pickle deserialization,
    # which can execute arbitrary code. Only load checkpoints you created.
    checkpoint = torch.load(model_path, map_location=device, weights_only=False)

    # Restore model and optimizer state.
    policy.load_state_dict(checkpoint['policy_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

    # Restore training history; older checkpoints may lack some keys.
    previous_scores = checkpoint.get('scores', [])
    episode = checkpoint.get('episode', 0)
    best_avg_score = checkpoint.get('best_avg_score', -float('inf'))

    print(f"✅ Model loaded successfully!")
    print(f"📊 Loaded {len(previous_scores)} previous training episodes")
    if previous_scores:
        print(f"🎯 Previous best score: {max(previous_scores):.2f}")
        print(f"🏆 Previous best average score: {best_avg_score:.2f}")

    return previous_scores, episode, best_avg_score


# ============================================================
# Main training flow
# ============================================================
if __name__ == "__main__":
    # Hyperparameters - tuned for continued training
    HIDDEN_SIZE = 256
    INITIAL_LEARNING_RATE = 2e-5  # smaller learning rate when resuming
    N_TRAINING_EPISODES = 20000   # number of additional episodes
    MAX_T = 10000
    GAMMA = 0.995                 # slightly higher discount factor
    PRINT_EVERY = 1000

    # Model path
    MODEL_PATH = "/home/eason/Workspace/Result_DRL/reinforce_pixelcopter.pth"

    print("="*60)
    print("REINFORCE Continued Training for Pixelcopter-PLE-v0")
    print("="*60)
    print(f"📅 Training started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print()

    # Fail fast on an unwritable output location instead of after training.
    _ensure_parent_dir(MODEL_PATH)

    # Policy network and optimizer
    policy = Policy(s_size, a_size, HIDDEN_SIZE).to(device)
    optimizer = optim.Adam(policy.parameters(), lr=INITIAL_LEARNING_RATE)

    # Learning-rate scheduler
    lr_scheduler = LearningRateScheduler(optimizer, INITIAL_LEARNING_RATE, decay_rate=0.95, decay_episodes=5000)

    print(f"🧠 Policy Network: {policy}")
    print(f"⚙️ Optimizer: Adam (initial_lr={INITIAL_LEARNING_RATE:.2e})")
    print(f"📈 Training Episodes: {N_TRAINING_EPISODES}")
    print(f"⏱️ Max Steps per Episode: {MAX_T}")
    print(f"💰 Discount Factor: {GAMMA}")
    print()

    # Try to load an existing model
    previous_scores, last_episode, best_avg_score = load_model(MODEL_PATH, policy, optimizer)

    # Loading the optimizer state restores its saved learning rate; override
    # it so the currently configured value is the one actually used.
    for param_group in optimizer.param_groups:
        param_group['lr'] = INITIAL_LEARNING_RATE

    print(f"📚 Current learning rate: {INITIAL_LEARNING_RATE:.2e}")
    print()

    # Train
    print("🚀 Starting training...")
    print("-" * 80)

    scores = reinforce_continued(
        policy=policy,
        optimizer=optimizer,
        n_training_episodes=N_TRAINING_EPISODES,
        max_t=MAX_T,
        gamma=GAMMA,
        print_every=PRINT_EVERY,
        previous_scores=previous_scores,
        model_path=MODEL_PATH,
        lr_scheduler=lr_scheduler
    )

    print("\n" + "="*60)
    print("Training Completed!")
    print("="*60)

    # Slicing already handles histories shorter than 100 episodes.
    final_avg_score = np.mean(scores[-100:])

    # Save the final model
    final_model_data = {
        'policy_state_dict': policy.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        's_size': s_size,
        'a_size': a_size,
        'hidden_size': HIDDEN_SIZE,
        'scores': scores,
        # Same key the intermediate checkpoints use, so load_model reports
        # the right episode count on the next resume.
        'episode': len(scores),
        'total_episodes': len(scores),
        'final_avg_score': final_avg_score,
        'training_completed_at': datetime.now().isoformat(),
        'hyperparameters': {
            'learning_rate': INITIAL_LEARNING_RATE,
            'gamma': GAMMA,
            'hidden_size': HIDDEN_SIZE,
            'max_t': MAX_T
        }
    }

    torch.save(final_model_data, MODEL_PATH)
    print(f"✅ Final model saved to {MODEL_PATH}")

    # Evaluate the trained model
    print("\n" + "="*60)
    print("Evaluating Final Policy")
    print("="*60)

    episode_rewards, mean_reward, std_reward = evaluate_policy(policy, eval_env, n_eval_episodes=10)

    # Summary of results
    print(f"\n🎉 Final Training Results:")
    print(f" Total Episodes Trained: {len(scores)}")
    print(f" Final Average Score (last 100): {final_avg_score:.2f}")
    print(f" Best Single Episode Score: {max(scores):.2f}")
    print(f" Evaluation Mean Reward: {mean_reward:.2f}")
    print(f" Evaluation Std Reward: {std_reward:.2f}")
    print(f" Final Score (mean - std): {mean_reward - std_reward:.2f}")
    print(f" Required for Pixelcopter-PLE-v0: 5.0")

    if mean_reward - std_reward >= 5.0:
        print(f" Status: ✅ PASSED! Congratulations!")
    else:
        needed_improvement = 5.0 - (mean_reward - std_reward)
        print(f" Status: ❌ Need {needed_improvement:.2f} more points")
        print(f" Suggestion: Continue training with lower learning rate or adjust network architecture")

    print(f"\n📅 Training completed at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")