# ============================================================
# Unit 6: A2C PandaReachDense-v3 - with full training monitoring
# ============================================================
import gymnasium as gym
import panda_gym
import numpy as np
import os
from stable_baselines3 import A2C
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import VecNormalize, VecVideoRecorder
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback, CallbackList
# ============================================================
# Configuration
# ============================================================
env_id = "PandaReachDense-v3"
total_timesteps = 2000000
n_envs = 64
# Logging and save paths
log_dir = "/home/eason/Workspace/RL/Unit_6/logs/"
checkpoint_dir = "/home/eason/Workspace/RL/Unit_6/checkpoints/"
tensorboard_log = "/home/eason/Workspace/RL/Unit_6/tensorboard_logs/"
os.makedirs(log_dir, exist_ok=True)
os.makedirs(checkpoint_dir, exist_ok=True)
os.makedirs(tensorboard_log, exist_ok=True)
# ============================================================
# Step 1: Create the training environment
# ============================================================
print("="*60)
print("创建训练环境")
print("="*60)
env = make_vec_env(env_id, n_envs=n_envs)
env = VecNormalize(env, norm_obs=True, norm_reward=True, clip_obs=10.)
# ============================================================
# Step 2: Create a separate evaluation environment
# ============================================================
print("\n创建评估环境(用于训练中监控)...")
eval_env = make_vec_env(env_id, n_envs=1)
# Evaluate on raw rewards (norm_reward=False) and freeze the running
# statistics (training=False) so evaluation rollouts do not distort them
eval_env = VecNormalize(eval_env, norm_obs=True, norm_reward=False, clip_obs=10., training=False)
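# Note: EvalCallback copies the normalization statistics from the training
# env into eval_env before each evaluation. For a standalone evaluation
# outside the callback, the same sync can be done manually; a minimal
# sketch:
#
#   from stable_baselines3.common.vec_env import sync_envs_normalization
#   sync_envs_normalization(env, eval_env)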
# ============================================================
# Step 3: Set up callbacks (the core of the monitoring)
# ============================================================
print("\n配置训练监控回调...")
# 1️⃣ Evaluation callback: evaluate every 500k total timesteps
eval_callback = EvalCallback(
    eval_env,
    best_model_save_path=log_dir,  # save the best model here
    log_path=log_dir,              # save evaluation logs here
    # eval_freq counts per-env steps, so divide by n_envs to get an
    # evaluation every 500k *total* timesteps
    eval_freq=max(500000 // n_envs, 1),
    n_eval_episodes=5,             # episodes per evaluation
    deterministic=True,
    render=False,
    verbose=1
)
# 2️⃣ Checkpoint callback: save the model every 500k total timesteps
checkpoint_callback = CheckpointCallback(
    save_freq=max(500000 // n_envs, 1),  # save_freq also counts per-env steps
    save_path=checkpoint_dir,
    name_prefix="a2c_panda",
    save_replay_buffer=False,  # A2C is on-policy and has no replay buffer
    save_vecnormalize=True     # 🔥 important: also save the normalization stats
)
# 3️⃣ Combine all callbacks
callbacks = CallbackList([eval_callback, checkpoint_callback])
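# The CallbackList can be extended with custom callbacks. A minimal sketch
# (the class name and logged key are illustrative, not part of this script):
# panda_gym reports an "is_success" flag in each info dict, which could be
# logged to TensorBoard like this:
#
#   from stable_baselines3.common.callbacks import BaseCallback
#
#   class SuccessRateLogger(BaseCallback):
#       def _on_step(self) -> bool:
#           for info in self.locals["infos"]:
#               if "is_success" in info:
#                   self.logger.record("custom/is_success", float(info["is_success"]))
#           return True  # returning False would abort training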
# ============================================================
# Step 4: Create the model (with TensorBoard enabled)
# ============================================================
print("\n创建模型...")
model = A2C(
    policy="MultiInputPolicy",  # dict observations from the goal-conditioned env
    env=env,
    n_steps=4,           # 🔥 key: lower n_steps to keep the update batch reasonable
    learning_rate=4e-4,  # may need a slight increase
    ent_coef=0.01,       # keep some exploration
    vf_coef=0.5,
    max_grad_norm=0.5,
    verbose=1,
    tensorboard_log=tensorboard_log  # 🔥 enable TensorBoard
)
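# With these settings each update uses n_steps * n_envs = 4 * 64 = 256
# transitions, so 2,000,000 total timesteps amount to roughly 7,800
# gradient updates.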
# ============================================================
# Step 5: Train (with monitoring)
# ============================================================
print("\n" + "="*60)
print("🚀 开始训练(带实时监控)")
print("="*60)
print(f"📊 查看实时训练曲线: tensorboard --logdir {tensorboard_log}")
print(f"📈 评估频率: 每 500,000 步")
print(f"💾 检查点频率: 每 500,000 步")
print("="*60 + "\n")
model.learn(
    total_timesteps=total_timesteps,
    callback=callbacks,
    progress_bar=True  # show a progress bar
)
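# If training is interrupted, it can be resumed from one of the checkpoints
# saved above. A minimal sketch (the checkpoint filename and remaining_steps
# are illustrative; CheckpointCallback names files
# "<name_prefix>_<num_timesteps>_steps.zip"):
#
#   model = A2C.load(os.path.join(checkpoint_dir, "a2c_panda_1000000_steps"), env=env)
#   model.learn(total_timesteps=remaining_steps, callback=callbacks,
#               reset_num_timesteps=False)  # keep the original step counter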
# ============================================================
# Step 6: Save the final model
# ============================================================
model.save("a2c-PandaReachDense-v3")
env.save("vec_normalize.pkl")
print("\n✅ 训练完成!模型已保存。\n")
# ============================================================
# Step 7: Final evaluation
# ============================================================
print("="*60)
print("最终评估")
print("="*60)
# Load the best model (if the eval callback saved one)
best_model_path = os.path.join(log_dir, "best_model")
if os.path.exists(best_model_path + ".zip"):
    print("Loading the best model from training...")
    eval_env_final = make_vec_env(env_id, n_envs=1)
    # EvalCallback saves only best_model.zip, not the VecNormalize
    # statistics, so reuse the final training statistics saved above
    eval_env_final = VecNormalize.load("vec_normalize.pkl", eval_env_final)
    eval_env_final.training = False
    eval_env_final.norm_reward = False
    model_final = A2C.load(best_model_path, env=eval_env_final)
else:
    print("Using the final trained model...")
    eval_env_final = make_vec_env(env_id, n_envs=1)
    eval_env_final = VecNormalize.load("vec_normalize.pkl", eval_env_final)
    eval_env_final.training = False
    eval_env_final.norm_reward = False
    model_final = model
mean_reward, std_reward = evaluate_policy(
    model_final,
    eval_env_final,
    n_eval_episodes=20,  # use more episodes for the final evaluation
    deterministic=True
)
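# evaluate_policy runs the policy for n_eval_episodes full episodes and
# returns the mean and standard deviation of the episodic returns; the
# conservative score below (mean minus std) is what gets compared against
# the pass threshold.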
score = mean_reward - std_reward
print(f"\n最终评估结果:")
print(f" Mean Reward: {mean_reward:.2f}")
print(f" Std Reward: {std_reward:.2f}")
print(f" Score (mean-std): {score:.2f}")
print(f" 通过基准线: -3.5")
if score >= -3.5:
print(f" ✅ 状态: PASSED")
else:
print(f" ❌ 状态: NOT PASSED (还差 {-3.5 - score:.2f} 分)")
print("="*60 + "\n")
# ============================================================
# Step 8: Record a demo video
# ============================================================
print("="*60)
print("生成演示视频")
print("="*60)
video_folder = "/home/eason/Workspace/RL/Unit_6/videos/"
os.makedirs(video_folder, exist_ok=True)
# render_mode="rgb_array" is required so VecVideoRecorder can grab frames
video_env = make_vec_env(env_id, n_envs=1, env_kwargs={"render_mode": "rgb_array"})
video_env = VecNormalize.load("vec_normalize.pkl", video_env)
video_env.training = False
video_env.norm_reward = False
video_env = VecVideoRecorder(
    video_env,
    video_folder,
    record_video_trigger=lambda x: x == 0,  # start recording at step 0
    video_length=500,
    name_prefix="panda-reach-agent"
)
obs = video_env.reset()
for _ in range(500):
    action, _ = model_final.predict(obs, deterministic=True)
    obs, _, _, _ = video_env.step(action)
video_env.close()
print(f"✅ 视频已保存到: {video_folder}")
print("="*60 + "\n")
print("🎉 全部完成!")
print("\n📊 查看训练曲线:")
print(f" tensorboard --logdir {tensorboard_log}")
print(f"\n📈 查看评估日志:")
print(f" cat {log_dir}/evaluations.npz # 使用 numpy 加载")