# ============================================================
# Unit 6: A2C PandaReachDense-v3 - with full training monitoring
# ============================================================
import gymnasium as gym
import panda_gym
import numpy as np
import os

from stable_baselines3 import A2C
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import VecNormalize, VecVideoRecorder
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback, CallbackList

# ============================================================
# Configuration
# ============================================================
env_id = "PandaReachDense-v3"
total_timesteps = 2_000_000
n_envs = 64

# Log and save paths
log_dir = "/home/eason/Workspace/RL/Unit_6/logs/"
checkpoint_dir = "/home/eason/Workspace/RL/Unit_6/checkpoints/"
tensorboard_log = "/home/eason/Workspace/RL/Unit_6/tensorboard_logs/"

os.makedirs(log_dir, exist_ok=True)
os.makedirs(checkpoint_dir, exist_ok=True)
os.makedirs(tensorboard_log, exist_ok=True)

# ============================================================
# Step 1: create the training environment
# ============================================================
print("=" * 60)
print("Creating training environment")
print("=" * 60)

env = make_vec_env(env_id, n_envs=n_envs)
env = VecNormalize(env, norm_obs=True, norm_reward=True, clip_obs=10.0)

# ============================================================
# Step 2: create a separate evaluation environment
# ============================================================
print("\nCreating evaluation environment (for monitoring during training)...")
# Don't update normalization stats or normalize rewards at evaluation time;
# EvalCallback syncs the observation statistics from the training env
# before each evaluation.
eval_env = make_vec_env(env_id, n_envs=1)
eval_env = VecNormalize(eval_env, norm_obs=True, norm_reward=False, clip_obs=10.0, training=False)

# ============================================================
# Step 3: set up callbacks (the core part!)
# ============================================================
print("\nConfiguring training-monitoring callbacks...")

# 1️⃣ Evaluation callback: evaluate every 500k timesteps.
# Note: eval_freq counts vec-env steps (each one is n_envs timesteps),
# so divide by n_envs to get the intended frequency in total timesteps.
eval_callback = EvalCallback(
    eval_env,
    best_model_save_path=log_dir,          # save the best model here
    log_path=log_dir,                      # save evaluation logs here
    eval_freq=max(500_000 // n_envs, 1),   # every 500k total timesteps
    n_eval_episodes=5,                     # 5 episodes per evaluation
    deterministic=True,
    render=False,
    verbose=1
)

# 2️⃣ Checkpoint callback: save the model every 500k timesteps.
# save_freq is also counted in vec-env steps, so divide by n_envs too.
checkpoint_callback = CheckpointCallback(
    save_freq=max(500_000 // n_envs, 1),   # every 500k total timesteps
    save_path=checkpoint_dir,
    name_prefix="a2c_panda",
    save_replay_buffer=False,
    save_vecnormalize=True                 # 🔥 important: also save normalization stats
)

# 3️⃣ Combine all callbacks
callbacks = CallbackList([eval_callback, checkpoint_callback])

# ============================================================
# Step 4: create the model (with TensorBoard enabled)
# ============================================================
print("\nCreating model...")
model = A2C(
    policy="MultiInputPolicy",
    env=env,
    n_steps=4,              # 🔥 key: with 64 envs, a small n_steps keeps the rollout batch (n_steps * n_envs = 256) reasonable
    learning_rate=4e-4,     # may need a slight increase
    ent_coef=0.01,          # keep exploring
    vf_coef=0.5,
    max_grad_norm=0.5,
    verbose=1,
    tensorboard_log=tensorboard_log  # 🔥 enable TensorBoard
)

# ============================================================
# Step 5: start training (with monitoring)
# ============================================================
print("\n" + "=" * 60)
print("🚀 Starting training (with live monitoring)")
print("=" * 60)
print(f"📊 Live training curves: tensorboard --logdir {tensorboard_log}")
print(f"📈 Evaluation frequency: every 500,000 timesteps")
print(f"💾 Checkpoint frequency: every 500,000 timesteps")
print("=" * 60 + "\n")

model.learn(
    total_timesteps=total_timesteps,
    callback=callbacks,
    progress_bar=True  # show a progress bar
)

# ============================================================
# Step 6: save the final model
# ============================================================
model.save("a2c-PandaReachDense-v3")
env.save("vec_normalize.pkl")
print("\n✅ Training finished! Model saved.\n")
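# ------------------------------------------------------------
# Optional helper (a sketch, defined but not called here): resume training
# from a checkpoint pair written by CheckpointCallback. With
# save_vecnormalize=True it saves files like "a2c_panda_<N>_steps.zip" plus
# "a2c_panda_vecnormalize_<N>_steps.pkl"; the step count <N> depends on the
# run, so the arguments are assumptions to fill in.
def resume_from_checkpoint(model_zip, vecnormalize_pkl, extra_timesteps):
    """Restore VecNormalize stats and model weights, then continue training."""
    resume_env = make_vec_env(env_id, n_envs=n_envs)
    resume_env = VecNormalize.load(vecnormalize_pkl, resume_env)  # restore running stats
    resume_model = A2C.load(model_zip, env=resume_env)
    # reset_num_timesteps=False keeps the TensorBoard step counter continuous
    resume_model.learn(total_timesteps=extra_timesteps, reset_num_timesteps=False)
    return resume_model
# ------------------------------------------------------------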
# ============================================================
# Step 7: final evaluation
# ============================================================
print("=" * 60)
print("Final evaluation")
print("=" * 60)

# Build the final evaluation env. EvalCallback saves only best_model.zip,
# not the VecNormalize stats, so we reuse the stats saved at the end of
# training (an approximation for the best model).
eval_env_final = make_vec_env(env_id, n_envs=1)
eval_env_final = VecNormalize.load("vec_normalize.pkl", eval_env_final)
eval_env_final.training = False
eval_env_final.norm_reward = False

# Load the best model if the evaluation callback saved one.
best_model_path = os.path.join(log_dir, "best_model")
if os.path.exists(best_model_path + ".zip"):
    print("Loading the best model from training...")
    model_final = A2C.load(best_model_path, env=eval_env_final)
else:
    print("Using the final trained model...")
    model_final = model

mean_reward, std_reward = evaluate_policy(
    model_final,
    eval_env_final,
    n_eval_episodes=20,  # more episodes for the final evaluation
    deterministic=True
)

score = mean_reward - std_reward
print(f"\nFinal evaluation results:")
print(f"  Mean Reward: {mean_reward:.2f}")
print(f"  Std Reward: {std_reward:.2f}")
print(f"  Score (mean-std): {score:.2f}")
print(f"  Passing baseline: -3.5")
if score >= -3.5:
    print(f"  ✅ Status: PASSED")
else:
    print(f"  ❌ Status: NOT PASSED (short by {-3.5 - score:.2f} points)")
print("=" * 60 + "\n")

# ============================================================
# Step 8: record a demo video
# ============================================================
print("=" * 60)
print("Recording demo video")
print("=" * 60)

video_folder = "/home/eason/Workspace/RL/Unit_6/videos/"
os.makedirs(video_folder, exist_ok=True)

# VecVideoRecorder requires envs created with render_mode="rgb_array"
video_env = make_vec_env(env_id, n_envs=1, env_kwargs={"render_mode": "rgb_array"})
video_env = VecNormalize.load("vec_normalize.pkl", video_env)
video_env.training = False
video_env.norm_reward = False

video_env = VecVideoRecorder(
    video_env,
    video_folder,
    record_video_trigger=lambda x: x == 0,
    video_length=500,
    name_prefix="panda-reach-agent"
)

obs = video_env.reset()
for _ in range(500):
    action, _ = model_final.predict(obs, deterministic=True)
    obs, _, _, _ = video_env.step(action)
video_env.close()

print(f"✅ Video saved to: {video_folder}")
print("=" * 60 + "\n")

print("🎉 All done!")
print("\n📊 View training curves:")
print(f"  tensorboard --logdir {tensorboard_log}")
print(f"\n📈 View evaluation logs:")
print(f"  cat {log_dir}/evaluations.npz  # load with numpy")
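# ------------------------------------------------------------
# A minimal sketch of inspecting the evaluation log mentioned above.
# EvalCallback's log_path produces "evaluations.npz" with arrays
# "timesteps", "results" (one row per evaluation, one column per episode)
# and "ep_lengths"; this just prints the per-evaluation mean reward.
data = np.load(os.path.join(log_dir, "evaluations.npz"))
for step, rewards in zip(data["timesteps"], data["results"]):
    print(f"step {step:>9}: mean eval reward {rewards.mean():.2f}")
# ------------------------------------------------------------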