# ============================================================
# Unit 6_upload.py - Smart upload (prefers the best saved model)
# ============================================================
import gymnasium as gym
import panda_gym
import numpy as np
import os
import shutil
import sys

from stable_baselines3 import A2C
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import VecNormalize, VecVideoRecorder
from stable_baselines3.common.evaluation import evaluate_policy
from huggingface_hub import HfApi, create_repo

# ============================================================
# Configuration (⚠️ edit these values)
# ============================================================
USERNAME = "ImaghT"  # Your HF username
MODEL_NAME = "a2c-PandaReachDense-v3"
ENV_ID = "PandaReachDense-v3"
N_EVAL_EPISODES = 20  # More evaluation episodes give a more reliable estimate
PASSING_SCORE = -3.5  # Course baseline: score (mean - std) must reach this

repo_id = f"{USERNAME}/{MODEL_NAME}"

# ============================================================
# 1. Smart file detection (prefer the best model)
# ============================================================
print("=" * 60)
print("🔍 Looking for available model files...")
print("=" * 60)

# File paths
BEST_MODEL_PATH = "/home/eason/Workspace/RL/Unit_6/logs/best_model"
BEST_VEC_NORMALIZE_PATH = "/home/eason/Workspace/RL/Unit_6/logs/best_model_vecnormalize.pkl"
FINAL_MODEL_PATH = "a2c-PandaReachDense-v3"
FINAL_VEC_NORMALIZE_PATH = "vec_normalize.pkl"

# 🎯 Priority: best model saved during training > final model
if os.path.exists(f"{BEST_MODEL_PATH}.zip") and os.path.exists(BEST_VEC_NORMALIZE_PATH):
    print("✅ Found the best model saved during training (recommended)")
    MODEL_PATH = BEST_MODEL_PATH
    VEC_NORMALIZE_PATH = BEST_VEC_NORMALIZE_PATH
    model_source = "best_model"
elif os.path.exists(f"{FINAL_MODEL_PATH}.zip") and os.path.exists(FINAL_VEC_NORMALIZE_PATH):
    print("✅ Found the final trained model")
    MODEL_PATH = FINAL_MODEL_PATH
    VEC_NORMALIZE_PATH = FINAL_VEC_NORMALIZE_PATH
    model_source = "final_model"
else:
    print("❌ Error: no usable model files found!")
    print("\nMake sure one of the following pairs exists:")
    print(f"  Option 1: {BEST_MODEL_PATH}.zip + {BEST_VEC_NORMALIZE_PATH}")
    print(f"  Option 2: {FINAL_MODEL_PATH}.zip + {FINAL_VEC_NORMALIZE_PATH}")
    print("\nRun the Unit 6.py training script first.")
    sys.exit(1)

print(f"📁 Using model: {MODEL_PATH}")
print(f"📁 Using normalization stats: {VEC_NORMALIZE_PATH}")
print(f"📊 Model source: {model_source}\n")

# ============================================================
# 2. Load the model
# ============================================================
print("Loading model...")
eval_env = make_vec_env(ENV_ID, n_envs=1)
eval_env = VecNormalize.load(VEC_NORMALIZE_PATH, eval_env)
# At evaluation time: freeze the running statistics and report raw rewards
eval_env.training = False
eval_env.norm_reward = False

model = A2C.load(MODEL_PATH, env=eval_env)
print("✅ Model loaded\n")

# ============================================================
# 3. Evaluate the model
# ============================================================
print("=" * 60)
print(f"🧪 Evaluating ({N_EVAL_EPISODES} episodes)...")
print("=" * 60)

mean_reward, std_reward = evaluate_policy(
    model,
    eval_env,
    n_eval_episodes=N_EVAL_EPISODES,
    deterministic=True,
)

score = mean_reward - std_reward

print("\n" + "=" * 60)
print("📊 Evaluation results:")
print(f"   Mean Reward: {mean_reward:.2f}")
print(f"   Std Reward: {std_reward:.2f}")
print(f"   Score (mean - std): {score:.2f}")
print(f"   Passing baseline: {PASSING_SCORE}")
if score >= PASSING_SCORE:
    print("   ✅ Status: PASSED")
    status_emoji = "✅"
else:
    print(f"   ❌ Status: NOT PASSED (short by {PASSING_SCORE - score:.2f} points)")
    status_emoji = "❌"
print("=" * 60 + "\n")
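# ------------------------------------------------------------
# For reference, a minimal sketch of what evaluate_policy does above,
# written against a single-env VecEnv (n_envs=1). This is NOT the
# library's implementation; `manual_evaluate` is a hypothetical helper
# and is not called anywhere in this script.
# ------------------------------------------------------------
def manual_evaluate(policy, env, n_episodes):
    """Return (mean, std) of per-episode rewards, mirroring evaluate_policy."""
    episode_rewards = []
    for _ in range(n_episodes):
        obs = env.reset()
        done, episode_reward = False, 0.0
        while not done:
            action, _ = policy.predict(obs, deterministic=True)
            obs, rewards, dones, _ = env.step(action)
            episode_reward += float(rewards[0])  # raw reward (norm_reward=False)
            done = bool(dones[0])
        episode_rewards.append(episode_reward)
    return float(np.mean(episode_rewards)), float(np.std(episode_rewards))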
# ============================================================
# 4. Generate a replay video
# ============================================================
print("🎬 Generating replay video...")

video_folder = "/home/eason/Workspace/RL/Unit_6/video_upload"
os.makedirs(video_folder, exist_ok=True)

video_env = make_vec_env(ENV_ID, n_envs=1)
video_env = VecNormalize.load(VEC_NORMALIZE_PATH, video_env)
video_env.training = False
video_env.norm_reward = False

video_env = VecVideoRecorder(
    video_env,
    video_folder,
    record_video_trigger=lambda x: x == 0,  # record from the first step
    video_length=500,
    name_prefix="panda-reach-agent",
)

obs = video_env.reset()
for _ in range(500):
    action, _ = model.predict(obs, deterministic=True)
    obs, _, _, _ = video_env.step(action)
video_env.close()
print("✅ Video generated\n")

# ============================================================
# 5. Inspect training logs (optional info)
# ============================================================
training_info = ""
eval_log_path = "/home/eason/Workspace/RL/Unit_6/logs/evaluations.npz"
if os.path.exists(eval_log_path):
    try:
        evaluations = np.load(eval_log_path)
        timesteps = evaluations["timesteps"]
        results = evaluations["results"]

        # Summarize the training run (pre-format values so the f-string
        # below also works when a field falls back to "Unknown")
        total_evals = len(timesteps)
        final_timestep = f"{timesteps[-1]:,}" if len(timesteps) > 0 else "Unknown"
        best_eval_reward = (
            f"{np.max(results.mean(axis=1)):.2f}" if len(results) > 0 else "Unknown"
        )

        training_info = f"""
## Training Monitoring

This model was trained with comprehensive monitoring:
- **Total Evaluations**: {total_evals} (every 500,000 steps)
- **Final Training Step**: {final_timestep}
- **Best Evaluation Reward**: {best_eval_reward}
- **Model Source**: {"Best model from training" if model_source == "best_model" else "Final training model"}
- **Callbacks Used**: EvalCallback, CheckpointCallback
- **TensorBoard Logging**: Enabled
"""
        print(f"📈 Found training logs: {total_evals} evaluation records")
    except Exception as e:
        print(f"⚠️ Failed to read training logs: {e}")
        training_info = "\n## Training Monitoring\n\nModel trained with monitoring callbacks.\n"
else:
    training_info = "\n## Training Configuration\n\nStandard training without detailed monitoring.\n"

# ============================================================
# 6. Create the enhanced README.md
# ============================================================
readme_content = f"""---
library_name: stable-baselines3
tags:
- PandaReachDense-v3
- deep-reinforcement-learning
- reinforcement-learning
- stable-baselines3
- robotics
- panda-gym
model-index:
- name: A2C
  results:
  - task:
      type: reinforcement-learning
      name: reinforcement-learning
    dataset:
      name: PandaReachDense-v3
      type: PandaReachDense-v3
    metrics:
    - type: mean_reward
      value: {mean_reward:.2f} +/- {std_reward:.2f}
      name: mean_reward
      verified: false
---

# {status_emoji} **A2C** Agent playing **PandaReachDense-v3**

This is a trained model of an **A2C** agent playing **PandaReachDense-v3**
using the [stable-baselines3 library](https://github.com/DLR-RM/stable-baselines3)
and the [Deep Reinforcement Learning Course](https://huggingface.co/deep-rl-course/unit6).

This environment is part of the [Panda-Gym](https://github.com/qgallouedec/panda-gym)
environments and includes robotic manipulation tasks where the robot arm needs
to reach a target position.
## 🏆 Evaluation Results

| Metric | Value |
|--------|-------|
| Mean Reward | {mean_reward:.2f} |
| Std Reward | {std_reward:.2f} |
| **Score (mean - std)** | **{score:.2f}** |
| Baseline Required | {PASSING_SCORE} |
| Evaluation Episodes | {N_EVAL_EPISODES} |
| Status | {status_emoji} {"**PASSED**" if score >= PASSING_SCORE else "**NOT PASSED**"} |
| Model Source | {model_source.replace('_', ' ').title()} |
{training_info}
## 🚀 Usage

```python
import gymnasium as gym
import panda_gym

from stable_baselines3 import A2C
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import VecNormalize

# Load the environment and the normalization statistics
env = make_vec_env("PandaReachDense-v3", n_envs=1)
env = VecNormalize.load("vec_normalize.pkl", env)

# ⚠️ CRITICAL: disable training mode and reward normalization at test time
env.training = False
env.norm_reward = False

# Load the model
model = A2C.load("a2c-PandaReachDense-v3", env=env)

# Run inference
obs = env.reset()
for _ in range(1000):
    action, _states = model.predict(obs, deterministic=True)
    obs, reward, done, info = env.step(action)
    if done[0]:
        obs = env.reset()
```

## 🔧 Training Configuration

- **Algorithm**: A2C (Advantage Actor-Critic)
- **Policy**: MultiInputPolicy (for Dict observation spaces)
- **Environment**: PandaReachDense-v3
- **Total Timesteps**: 2,000,000
- **Number of Parallel Envs**: 64
- **Normalization**: VecNormalize (observation + reward)
- **Observation Clipping**: 10.0
- **Evaluation Frequency**: Every 500,000 steps
- **Checkpoint Frequency**: Every 500,000 steps

## 🤖 Model Architecture

The agent uses a **MultiInputPolicy** because the observation space is a
dictionary containing:
- `observation`: Robot joint positions, velocities, and gripper state
- `desired_goal`: Target position coordinates (x, y, z)
- `achieved_goal`: Current end-effector position coordinates (x, y, z)

The goal is to minimize the distance between `achieved_goal` and `desired_goal`.

## 📈 Performance Notes

- **Reward Range**: Typically from -50 (far from target) to 0 (at target)
- **Success Criteria**: A score (mean - std) of at least {PASSING_SCORE}
- **Episode Length**: Usually 50 steps per episode
- **Convergence**: Expect improvement after 200k-500k steps

## 🎯 Tips for Reproduction

1. **Normalization is Critical**: Always use VecNormalize for robotic tasks
2. **MultiInputPolicy Required**: Dict observation spaces need special handling
3. **Sufficient Training**: 1M+ timesteps recommended for stable performance
4. **Evaluation**: Use deterministic=True for consistent evaluation results
"""
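# ------------------------------------------------------------
# Optional guard (a sketch, not part of the original flow): ask for
# confirmation before uploading a model whose score is below the
# baseline, so failing checkpoints don't reach the Hub by accident.
# Delete this block if the script must run unattended.
# ------------------------------------------------------------
if score < PASSING_SCORE:
    answer = input(f"Score {score:.2f} is below the {PASSING_SCORE} baseline. Upload anyway? [y/N] ")
    if answer.strip().lower() != "y":
        print("🛑 Upload cancelled.")
        sys.exit(0)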
# ============================================================
# 7. Prepare files for upload
# ============================================================
print("📦 Preparing files for upload...")

upload_folder = "/home/eason/Workspace/RL/Unit_6/upload_temp"
os.makedirs(upload_folder, exist_ok=True)

# Save the README
readme_path = os.path.join(upload_folder, "README.md")
with open(readme_path, "w", encoding="utf-8") as f:
    f.write(readme_content)
print("✅ Created README.md")

# Copy the model file (renamed to the standard name)
model_dest = os.path.join(upload_folder, f"{MODEL_NAME}.zip")
shutil.copy(f"{MODEL_PATH}.zip", model_dest)
print(f"✅ Copied model file: {MODEL_PATH}.zip -> {MODEL_NAME}.zip")

# Copy the normalization statistics (renamed to the standard name)
vec_norm_dest = os.path.join(upload_folder, "vec_normalize.pkl")
shutil.copy(VEC_NORMALIZE_PATH, vec_norm_dest)
print(f"✅ Copied normalization file: {VEC_NORMALIZE_PATH} -> vec_normalize.pkl")

# Copy the replay video
video_files = [f for f in os.listdir(video_folder) if f.endswith(".mp4")]
if video_files:
    video_src = os.path.join(video_folder, video_files[0])
    video_dest = os.path.join(upload_folder, "replay.mp4")
    shutil.copy(video_src, video_dest)
    print("✅ Copied replay video")
else:
    print("⚠️ No video file found (optional)")

# Optional: copy the training logs
if os.path.exists(eval_log_path):
    eval_dest = os.path.join(upload_folder, "training_evaluations.npz")
    shutil.copy(eval_log_path, eval_dest)
    print("✅ Copied training evaluation logs")

# ============================================================
# 8. Upload to the Hugging Face Hub
# ============================================================
print(f"\n🚀 Uploading to {repo_id}...")

api = HfApi()

try:
    # Create the repo (skipped if it already exists)
    create_repo(repo_id, repo_type="model", exist_ok=True)
    print("✅ Repository created/verified")
except Exception as e:
    print(f"⚠️ Repository warning: {e}")

try:
    # Upload the whole folder in a single commit
    commit_message = (
        f"A2C PandaReach ({model_source}) - "
        f"Mean: {mean_reward:.2f}, Std: {std_reward:.2f}, Score: {score:.2f}"
    )
    api.upload_folder(
        folder_path=upload_folder,
        repo_id=repo_id,
        repo_type="model",
        commit_message=commit_message,
    )
    print(f"\n{'=' * 60}")
    print("🎉 Upload succeeded!")
    print(f"{'=' * 60}")
    print(f"🔗 Model page: https://huggingface.co/{repo_id}")
    print("🏆 Check your progress: https://huggingface.co/spaces/ThomasSimonini/Check-my-progress-Deep-RL-Course")
    print(f"📊 Model source: {model_source.replace('_', ' ').title()}")
    print(f"🎯 Evaluation score: {score:.2f} ({'passed' if score >= PASSING_SCORE else 'not passed'})")
    print(f"{'=' * 60}\n")
except Exception as e:
    print(f"\n❌ Upload failed: {e}")
    print("   Please check:")
    print("   1. That you have run 'huggingface-cli login'")
    print("   2. That your network connection works")
    print("   3. That the username is correct\n")
finally:
    # Clean up the temporary folder
    shutil.rmtree(upload_folder)
    print("🧹 Removed temporary files")

print("✨ Done!")

# ============================================================
# 9. Upload summary
# ============================================================
print("\n" + "=" * 60)
print("📋 Upload summary")
print("=" * 60)
print("📁 Uploaded files:")
print(f"   - {MODEL_NAME}.zip (model)")
print("   - vec_normalize.pkl (normalization statistics)")
print("   - README.md (documentation)")
print("   - replay.mp4 (replay video)")
if os.path.exists(eval_log_path):
    print("   - training_evaluations.npz (training logs)")
print("\n🎯 Key information:")
print(f"   - Used the {'best' if model_source == 'best_model' else 'final'} model")
print(f"   - Evaluation score: {score:.2f}")
print(f"   - Status: {'✅ passed' if score >= PASSING_SCORE else '❌ not passed'}")
print("=" * 60)
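# ------------------------------------------------------------
# Optional post-upload check (a sketch, not part of the original flow):
# pull the artifacts back from the Hub and reload them, to confirm they
# round-trip. Uses hf_hub_download from huggingface_hub; requires
# network access and a completed upload, so failures are only reported
# as a warning.
# ------------------------------------------------------------
try:
    from huggingface_hub import hf_hub_download

    model_file = hf_hub_download(repo_id=repo_id, filename=f"{MODEL_NAME}.zip")
    stats_file = hf_hub_download(repo_id=repo_id, filename="vec_normalize.pkl")

    check_env = make_vec_env(ENV_ID, n_envs=1)
    check_env = VecNormalize.load(stats_file, check_env)
    check_env.training = False
    check_env.norm_reward = False
    A2C.load(model_file, env=check_env)
    print("✅ Round-trip load from the Hub succeeded")
except Exception as e:
    print(f"⚠️ Round-trip check skipped/failed: {e}")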