# ============================================================
# Unit_6_upload.py - Smart upload (prefers the best model)
# ============================================================
import gymnasium as gym
import panda_gym
import numpy as np
import os
import shutil
from stable_baselines3 import A2C
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import VecNormalize, VecVideoRecorder
from stable_baselines3.common.evaluation import evaluate_policy
from huggingface_hub import HfApi, create_repo
# ============================================================
# Configuration (⚠️ edit these)
# ============================================================
USERNAME = "ImaghT"  # Your HF username
MODEL_NAME = "a2c-PandaReachDense-v3"
ENV_ID = "PandaReachDense-v3"
N_EVAL_EPISODES = 20  # more eval episodes -> more reliable statistics
repo_id = f"{USERNAME}/{MODEL_NAME}"
# ============================================================
# 1. Smart file detection (prefer the best model)
# ============================================================
print("="*60)
print("🔍 检测可用模型文件...")
print("="*60)
# File paths
BEST_MODEL_PATH = "/home/eason/Workspace/RL/Unit_6/logs/best_model"
BEST_VEC_NORMALIZE_PATH = "/home/eason/Workspace/RL/Unit_6/logs/best_model_vecnormalize.pkl"
FINAL_MODEL_PATH = "a2c-PandaReachDense-v3"
FINAL_VEC_NORMALIZE_PATH = "vec_normalize.pkl"
# 🎯 Priority check: best model > final model
if os.path.exists(f"{BEST_MODEL_PATH}.zip") and os.path.exists(BEST_VEC_NORMALIZE_PATH):
    print("✅ Found the best model saved during training (recommended)")
    MODEL_PATH = BEST_MODEL_PATH
    VEC_NORMALIZE_PATH = BEST_VEC_NORMALIZE_PATH
    model_source = "best_model"
elif os.path.exists(f"{FINAL_MODEL_PATH}.zip") and os.path.exists(FINAL_VEC_NORMALIZE_PATH):
    print("✅ Found the final training model")
    MODEL_PATH = FINAL_MODEL_PATH
    VEC_NORMALIZE_PATH = FINAL_VEC_NORMALIZE_PATH
    model_source = "final_model"
else:
    print("❌ Error: no usable model files found!")
    print("\nMake sure one of the following exists:")
    print(f"  Option 1: {BEST_MODEL_PATH}.zip + {BEST_VEC_NORMALIZE_PATH}")
    print(f"  Option 2: {FINAL_MODEL_PATH}.zip + {FINAL_VEC_NORMALIZE_PATH}")
    print("\nRun the Unit 6.py training script first.")
    exit(1)
print(f"📁 Using model: {MODEL_PATH}")
print(f"📁 Using normalization: {VEC_NORMALIZE_PATH}")
print(f"📊 Model source: {model_source}\n")
# ============================================================
# 2. Load the model
# ============================================================
print("加载模型...")
eval_env = make_vec_env(ENV_ID, n_envs=1)
eval_env = VecNormalize.load(VEC_NORMALIZE_PATH, eval_env)
eval_env.training = False
eval_env.norm_reward = False
model = A2C.load(MODEL_PATH, env=eval_env)
print("✅ 模型加载成功\n")
# ============================================================
# 3. Evaluate the model
# ============================================================
print("="*60)
print(f"🧪 开始评估 ({N_EVAL_EPISODES} episodes)...")
print("="*60)
mean_reward, std_reward = evaluate_policy(
    model,
    eval_env,
    n_eval_episodes=N_EVAL_EPISODES,
    deterministic=True,
)
score = mean_reward - std_reward
print("\n" + "="*60)
print("📊 评估结果:")
print(f" Mean Reward: {mean_reward:.2f}")
print(f" Std Reward: {std_reward:.2f}")
print(f" Score (mean-std): {score:.2f}")
print(f" 通过基准线: -3.5")
if score >= -3.5:
print(f" ✅ 状态: PASSED")
status_emoji = "✅"
else:
print(f" ❌ 状态: NOT PASSED (还差 {-3.5 - score:.2f} 分)")
status_emoji = "❌"
print("="*60 + "\n")
# ============================================================
# 4. Record a demo video
# ============================================================
print("🎬 生成演示视频...")
video_folder = "/home/eason/Workspace/RL/Unit_6/video_upload"
os.makedirs(video_folder, exist_ok=True)
video_env = make_vec_env(ENV_ID, n_envs=1)
video_env = VecNormalize.load(VEC_NORMALIZE_PATH, video_env)
video_env.training = False
video_env.norm_reward = False
video_env = VecVideoRecorder(
    video_env,
    video_folder,
    record_video_trigger=lambda x: x == 0,
    video_length=500,
    name_prefix="panda-reach-agent",
)
obs = video_env.reset()
for _ in range(500):
    action, _ = model.predict(obs, deterministic=True)
    obs, _, _, _ = video_env.step(action)
video_env.close()
print(f"✅ 视频已生成\n")
# ============================================================
# 5. Check the training logs (optional info)
# ============================================================
training_info = ""
if os.path.exists("/home/eason/Workspace/RL/Unit_6/logs/evaluations.npz"):
    try:
        evaluations = np.load("/home/eason/Workspace/RL/Unit_6/logs/evaluations.npz")
        timesteps = evaluations['timesteps']
        results = evaluations['results']
        # Summarize the training run; pre-format the values so the f-string
        # below never applies a numeric format spec to the "Unknown" fallback
        total_evals = len(timesteps)
        final_timestep = f"{timesteps[-1]:,}" if len(timesteps) > 0 else "Unknown"
        best_eval_reward = f"{np.max(results.mean(axis=1)):.2f}" if len(results) > 0 else "Unknown"
        training_info = f"""
## Training Monitoring
This model was trained with comprehensive monitoring:
- **Total Evaluations**: {total_evals} (every 500,000 steps)
- **Final Training Step**: {final_timestep}
- **Best Evaluation Reward**: {best_eval_reward}
- **Model Source**: {"Best model from training" if model_source == "best_model" else "Final training model"}
- **Callbacks Used**: EvalCallback, CheckpointCallback
- **TensorBoard Logging**: Enabled
"""
        print(f"📈 Found training log: {total_evals} evaluation records")
    except Exception as e:
        print(f"⚠️ Failed to read training log: {e}")
        training_info = "\n## Training Monitoring\n\nModel trained with monitoring callbacks.\n"
else:
    training_info = "\n## Training Configuration\n\nStandard training without detailed monitoring.\n"
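# Optional (a sketch, assuming matplotlib is available): render the EvalCallback
# learning curve from evaluations.npz; "learning_curve.png" is a name chosen
# here, not part of the original script.
if os.path.exists("/home/eason/Workspace/RL/Unit_6/logs/evaluations.npz"):
    try:
        import matplotlib
        matplotlib.use("Agg")  # headless backend, no display required
        import matplotlib.pyplot as plt
        curve = np.load("/home/eason/Workspace/RL/Unit_6/logs/evaluations.npz")
        plt.plot(curve["timesteps"], curve["results"].mean(axis=1))
        plt.xlabel("Timesteps")
        plt.ylabel("Mean evaluation reward")
        plt.savefig("learning_curve.png")
        print("📈 Saved learning_curve.png")
    except ImportError:
        pass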
# ============================================================
# 6. Build the enhanced README.md
# ============================================================
readme_content = f"""---
library_name: stable-baselines3
tags:
- PandaReachDense-v3
- deep-reinforcement-learning
- reinforcement-learning
- stable-baselines3
- robotics
- panda-gym
model-index:
- name: A2C
  results:
  - task:
      type: reinforcement-learning
      name: reinforcement-learning
    dataset:
      name: PandaReachDense-v3
      type: PandaReachDense-v3
    metrics:
    - type: mean_reward
      value: {mean_reward:.2f} +/- {std_reward:.2f}
      name: mean_reward
      verified: false
---
# {status_emoji} **A2C** Agent playing **PandaReachDense-v3**
This is a trained model of an **A2C** agent playing **PandaReachDense-v3**
using the [stable-baselines3 library](https://github.com/DLR-RM/stable-baselines3)
and the [Deep Reinforcement Learning Course](https://huggingface.co/deep-rl-course/unit6).
This environment is part of the [Panda-Gym](https://github.com/qgallouedec/panda-gym) suite of robotic manipulation tasks; here the robot arm must reach a target position.
## 🏆 Evaluation Results
| Metric | Value |
|--------|-------|
| Mean Reward | {mean_reward:.2f} |
| Std Reward | {std_reward:.2f} |
| **Score (mean - std)** | **{score:.2f}** |
| Baseline Required | -3.5 |
| Evaluation Episodes | {N_EVAL_EPISODES} |
| Status | {status_emoji} {"**PASSED**" if score >= -3.5 else "**NOT PASSED**"} |
| Model Source | {model_source.replace('_', ' ').title()} |
{training_info}
## 🚀 Usage
```python
import gymnasium as gym
import panda_gym
from stable_baselines3 import A2C
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import VecNormalize
# Load environment and normalization
env = make_vec_env("PandaReachDense-v3", n_envs=1)
env = VecNormalize.load("vec_normalize.pkl", env)
# ⚠️ CRITICAL: disable training mode and reward normalization at test time
env.training = False
env.norm_reward = False
# Load model
model = A2C.load("a2c-PandaReachDense-v3", env=env)
# Run inference
obs = env.reset()
for _ in range(1000):
    action, _states = model.predict(obs, deterministic=True)
    obs, reward, done, info = env.step(action)
    if done:
        obs = env.reset()
```
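Note that this snippet uses the SB3 `VecEnv` API: `reset()` returns only the observation and `step()` returns four values; SB3 vectorized envs also auto-reset when an episode ends, so the explicit reset is just a safety net.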
## 🔧 Training Configuration
- **Algorithm**: A2C (Advantage Actor-Critic)
- **Policy**: MultiInputPolicy (for Dict observation spaces)
- **Environment**: PandaReachDense-v3
- **Total Timesteps**: 2,000,000
- **Number of Parallel Envs**: 64
- **Normalization**: VecNormalize (observation + reward)
- **Observation Clipping**: 10.0
- **Evaluation Frequency**: Every 500,000 steps
- **Checkpoint Frequency**: Every 500,000 steps
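A training setup consistent with the settings above might look like the sketch below; anything not listed (e.g. learning rate) is left at SB3 defaults and should be treated as an assumption:
```python
import panda_gym  # registers the Panda environments
from stable_baselines3 import A2C
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import VecNormalize

# 64 parallel envs with observation + reward normalization, clip_obs=10.0
env = make_vec_env("PandaReachDense-v3", n_envs=64)
env = VecNormalize(env, norm_obs=True, norm_reward=True, clip_obs=10.0)

model = A2C("MultiInputPolicy", env, verbose=1)
model.learn(total_timesteps=2_000_000)

model.save("a2c-PandaReachDense-v3")
env.save("vec_normalize.pkl")  # the normalization stats must ship with the model
```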
## 🤖 Model Architecture
The agent uses a **MultiInputPolicy** because the observation space is a dictionary containing:
- `observation`: Robot joint positions, velocities, and gripper state
- `desired_goal`: Target position coordinates (x, y, z)
- `achieved_goal`: Current end-effector position coordinates (x, y, z)
The goal is to minimize the distance between `achieved_goal` and `desired_goal`.
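For reference, a minimal way to inspect this Dict observation (a sketch; assumes `panda_gym` is installed):
```python
import gymnasium as gym
import panda_gym

env = gym.make("PandaReachDense-v3")
obs, info = env.reset()
print(obs["observation"].shape)  # robot state (positions, velocities)
print(obs["desired_goal"])       # target position (x, y, z)
print(obs["achieved_goal"])      # current end-effector position (x, y, z)
```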
## 📈 Performance Notes
- **Reward Range**: Typically from -50 (far from target) to 0 (at target)
- **Success Criteria**: Score (mean reward - std) consistently above the -3.5 baseline
- **Episode Length**: Usually 50 steps per episode
- **Convergence**: Expect improvement after 200k-500k steps
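Assuming the panda-gym dense-reward convention (per-step reward = negative Euclidean distance to the goal, in metres), an episode return translates into an average per-step distance; a quick back-of-the-envelope check:
```python
# Rough conversion, assuming reward = -distance at every step
episode_return = -3.5    # the passing baseline
steps_per_episode = 50
avg_distance = -episode_return / steps_per_episode
print(avg_distance)      # 0.07 -> roughly 7 cm average distance to the target
```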
## 🎯 Tips for Reproduction
1. **Normalization is Critical**: Always use VecNormalize for robotic tasks
2. **MultiInputPolicy Required**: Dict observation spaces need special handling
3. **Sufficient Training**: 1M+ timesteps recommended for stable performance
4. **Evaluation**: Use deterministic=True for consistent evaluation results
"""
# ============================================================
# 7. Stage the upload files
# ============================================================
print("📦 Staging upload files...")
upload_folder = "/home/eason/Workspace/RL/Unit_6/upload_temp"
os.makedirs(upload_folder, exist_ok=True)
# Save the README
readme_path = os.path.join(upload_folder, "README.md")
with open(readme_path, "w", encoding="utf-8") as f:
    f.write(readme_content)
print("✅ Created README.md")
# Copy the model file (renamed to the standard name)
model_dest = os.path.join(upload_folder, f"{MODEL_NAME}.zip")
shutil.copy(f"{MODEL_PATH}.zip", model_dest)
print(f"✅ Copied model file: {MODEL_PATH}.zip -> {MODEL_NAME}.zip")
# Copy the normalization stats (renamed to the standard name)
vec_norm_dest = os.path.join(upload_folder, "vec_normalize.pkl")
shutil.copy(VEC_NORMALIZE_PATH, vec_norm_dest)
print(f"✅ Copied normalization stats: {VEC_NORMALIZE_PATH} -> vec_normalize.pkl")
# Copy the video file
video_files = [f for f in os.listdir(video_folder) if f.endswith(".mp4")]
if video_files:
    video_src = os.path.join(video_folder, video_files[0])
    video_dest = os.path.join(upload_folder, "replay.mp4")
    shutil.copy(video_src, video_dest)
    print("✅ Copied video file")
else:
    print("⚠️ No video file found (optional)")
# Optional: copy the training log
if os.path.exists("/home/eason/Workspace/RL/Unit_6/logs/evaluations.npz"):
    eval_dest = os.path.join(upload_folder, "training_evaluations.npz")
    shutil.copy("/home/eason/Workspace/RL/Unit_6/logs/evaluations.npz", eval_dest)
    print("✅ Copied training evaluation log")
# ============================================================
# 8. Upload to the Hugging Face Hub
# ============================================================
print(f"\n🚀 上传到 {repo_id}...")
api = HfApi()
try:
    # Create the repo (skipped if it already exists)
    create_repo(repo_id, repo_type="model", exist_ok=True)
    print("✅ Repo created/verified")
except Exception as e:
    print(f"⚠️ Repo warning: {e}")
try:
    # Upload the whole folder
    commit_message = f"A2C PandaReach ({model_source}) - Mean: {mean_reward:.2f}, Std: {std_reward:.2f}, Score: {score:.2f}"
    api.upload_folder(
        folder_path=upload_folder,
        repo_id=repo_id,
        repo_type="model",
        commit_message=commit_message,
    )
    print(f"\n{'='*60}")
    print("🎉 Upload succeeded!")
    print(f"{'='*60}")
    print(f"🔗 Model page: https://huggingface.co/{repo_id}")
    print("🏆 Check progress: https://huggingface.co/spaces/ThomasSimonini/Check-my-progress-Deep-RL-Course")
    print(f"📊 Model source: {model_source.replace('_', ' ').title()}")
    print(f"🎯 Evaluation score: {score:.2f} ({'PASSED' if score >= -3.5 else 'NOT PASSED'})")
    print(f"{'='*60}\n")
except Exception as e:
    print(f"\n❌ Upload failed: {e}")
    print("  Please check that:")
    print("  1. You have run 'huggingface-cli login'")
    print("  2. Your network connection is working")
    print("  3. The username is correct\n")
finally:
    # Clean up temporary files
    shutil.rmtree(upload_folder)
    print("🧹 Cleaned up temporary files")
print("✨ 完成!")
# ============================================================
# 9. Extra info output
# ============================================================
print("\n" + "="*60)
print("📋 上传总结")
print("="*60)
print(f"📁 上传的文件:")
print(f" - {MODEL_NAME}.zip (模型)")
print(f" - vec_normalize.pkl (归一化参数)")
print(f" - README.md (文档)")
print(f" - replay.mp4 (演示视频)")
if os.path.exists("/home/eason/Workspace/RL/Unit_6/logs/evaluations.npz"):
print(f" - training_evaluations.npz (训练日志)")
print(f"\n🎯 关键信息:")
print(f" - 使用了 {'最佳' if model_source == 'best_model' else '最终'} 模型")
print(f" - 评估分数: {score:.2f}")
print(f" - 状态: {'✅ 通过' if score >= -3.5 else '❌ 未通过'}")
print("="*60)