Spaces:
Sleeping
Sleeping
| """ | |
| Phantom Video Processor - Hugging Face Space Demo | |
| 将人类手部视频转换为机器人演示数据 | |
| """ | |
| import gradio as gr | |
| import spaces | |
| import subprocess | |
| import sys | |
| import os | |
| import shutil | |
| import tempfile | |
| from pathlib import Path | |
# ========== Path configuration ==========
# All locations are derived from the Phantom checkout inside the app directory.
PHANTOM_DIR = Path("/home/user/app/phantom")
DATA_RAW_DIR = PHANTOM_DIR.joinpath("data", "raw")
DATA_PROCESSED_DIR = PHANTOM_DIR.joinpath("data", "processed")
MANO_DIR = PHANTOM_DIR.joinpath(
    "submodules", "phantom-hamer", "_DATA", "data", "mano"
)

# Make Phantom importable, but only when the checkout is actually present.
# The inner "phantom" package directory ends up ahead of the repository root
# on sys.path, and both precede every pre-existing entry.
if PHANTOM_DIR.exists():
    sys.path[:0] = [str(PHANTOM_DIR / "phantom"), str(PHANTOM_DIR)]
# ========== Environment detection ==========
def check_environment():
    """Collect a snapshot of the runtime prerequisites.

    Returns:
        dict with the keys:
        - "phantom_installed": True when the marker file /tmp/.phantom_ready
          exists.
        - "mano_ready": True when both MANO_LEFT.pkl and MANO_RIGHT.pkl exist
          in MANO_DIR.
        - "sample_data": True when DATA_RAW_DIR / "pick_and_place" exists.
        - "cuda_available": result of torch.cuda.is_available(); stays False
          when torch cannot be imported or queried.
        - "gpu_name": name of CUDA device 0 when CUDA is available, else None.
    """
    status = {
        "phantom_installed": Path("/tmp/.phantom_ready").exists(),
        "mano_ready": all(
            (MANO_DIR / model_file).exists()
            for model_file in ("MANO_LEFT.pkl", "MANO_RIGHT.pkl")
        ),
        "sample_data": (DATA_RAW_DIR / "pick_and_place").exists(),
        "cuda_available": False,
        "gpu_name": None,
    }
    try:
        import torch

        status["cuda_available"] = torch.cuda.is_available()
        if status["cuda_available"]:
            status["gpu_name"] = torch.cuda.get_device_name(0)
    except Exception:
        # Best-effort probe: a missing or broken torch install must not break
        # the status report. Catching Exception (instead of a bare except)
        # still lets KeyboardInterrupt and SystemExit propagate.
        pass
    return status
def get_status_text():
    """Render the result of check_environment() as a multi-line report.

    Returns:
        A newline-joined string: a header framed by 40-character "=" rules,
        one line per prerequisite (Phantom install, MANO models, sample data,
        CUDA), and a closing rule.
    """
    env = check_environment()
    rule = "=" * 40

    phantom_mark = "✅" if env["phantom_installed"] else "❌ 首次运行需初始化"
    mano_mark = "✅" if env["mano_ready"] else "❌ 请上传 MANO 模型文件"
    sample_mark = "✅" if env["sample_data"] else "⏳ 将自动下载"
    if env["cuda_available"]:
        # The GPU name may be None even when CUDA is reported as available.
        cuda_mark = "✅ " + (env["gpu_name"] or "")
    else:
        cuda_mark = "⏳ GPU 将在处理时分配"

    report = [
        rule,
        "环境状态",
        rule,
        f"Phantom 安装: {phantom_mark}",
        f"MANO 模型: {mano_mark}",
        f"示例数据: {sample_mark}",
        f"CUDA: {cuda_mark}",
        rule,
    ]
    return "\n".join(report)
# ========== MANO model upload ==========
def upload_mano_files(left_file, right_file):
    """Copy uploaded MANO model files into MANO_DIR under their fixed names.

    Args:
        left_file: Upload for the left-hand model, or None when nothing was
            selected. Either a filesystem path (str / os.PathLike) or an
            object exposing the path as its ``name`` attribute.
        right_file: Same as ``left_file``, for the right-hand model.

    Returns:
        A warning string when neither file was provided; otherwise one
        confirmation line per saved file followed by the current status
        report from get_status_text().
    """
    MANO_DIR.mkdir(parents=True, exist_ok=True)

    uploads = (
        ("MANO_LEFT.pkl", left_file),
        ("MANO_RIGHT.pkl", right_file),
    )
    messages = []
    for target_name, upload in uploads:
        if upload is None:
            continue
        # NOTE(review): the exact object type handed over by gr.File depends on
        # the Gradio version/configuration — accept a plain path as well as a
        # temp-file wrapper so `.name` is only read when it is a wrapper.
        if isinstance(upload, (str, os.PathLike)):
            source_path = upload
        else:
            source_path = upload.name
        shutil.copy(source_path, MANO_DIR / target_name)
        messages.append(f"✅ {target_name} 已保存")

    if not messages:
        return "⚠️ 请选择文件上传"
    return "\n".join(messages) + "\n\n" + get_status_text()
# ========== Environment initialization ==========
def initialize_environment(progress=gr.Progress()):
    """Run /home/user/app/setup.sh and report the outcome.

    The script is skipped when the marker file /tmp/.phantom_ready already
    exists (presumably created by setup.sh itself — not visible here).

    Args:
        progress: Gradio progress tracker used to report coarse progress.

    Returns:
        A status string: an "already ready" message, a "setup.sh missing"
        message, a success message with the last 20 log lines plus the status
        report, a failure message with the return code and the last 30 log
        lines, or an error message when launching/reading the script raised.
    """
    from collections import deque

    if Path("/tmp/.phantom_ready").exists():
        return "✅ 环境已就绪\n\n" + get_status_text()

    progress(0, desc="开始初始化...")
    setup_script = Path("/home/user/app/setup.sh")
    if not setup_script.exists():
        return "❌ setup.sh 不存在"

    # Only the tail of the log is ever shown, so keep at most 50 lines.
    output_lines = deque(maxlen=50)
    try:
        # Run setup.sh
        progress(0.1, desc="运行安装脚本...")
        # The context manager closes the stdout pipe and waits for the child,
        # even if reading the output raises.
        with subprocess.Popen(
            ["bash", str(setup_script)],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,
        ) as process:
            for line in process.stdout:
                output_lines.append(line.strip())

        tail = list(output_lines)
        if process.returncode == 0:
            progress(1.0, desc="完成!")
            return (
                "✅ 初始化完成!\n\n"
                + "\n".join(tail[-20:])
                + "\n\n"
                + get_status_text()
            )
        return (
            f"❌ 初始化失败 (返回码: {process.returncode})\n\n"
            + "\n".join(tail[-30:])
        )
    except Exception as e:
        return f"❌ 初始化错误: {str(e)}"
# ========== Video processing ==========
# NOTE(review): `spaces` is imported at module level but no `@spaces.GPU`
# decorator is applied to this function. On ZeroGPU hardware the CUDA check
# below presumably fails without it — confirm the intended hardware/decorator.
def process_video(
    video_file,
    robot_type,
    target_hand,
    processing_mode,
    use_sample_data,
    progress=gr.Progress()
):
    """Run Phantom's process_data.py on the sample demo or an uploaded video.

    Args:
        video_file: Path of the uploaded video, or None. Ignored when
            ``use_sample_data`` is truthy.
        robot_type: Robot name, forwarded as ``robot=...`` and used
            (case-insensitively) to pick the output video.
        target_hand: Forwarded as ``target_hand=...``.
        processing_mode: Forwarded as ``mode=...``.
        use_sample_data: When truthy, process the "pick_and_place" demo
            instead of ``video_file``.
        progress: Gradio progress tracker.

    Returns:
        A tuple ``(output_video, output_data, status_text)``. The first two
        are path strings or None: the first ``.mkv`` below the processed demo
        directory whose file name contains ``robot_type``, and the first
        ``training_data*.npz`` found there. On any precondition failure or
        exception both are None and ``status_text`` explains why.
    """
    from collections import deque

    import torch

    # Status information shown to the user
    status_lines = []

    # GPU check
    if not torch.cuda.is_available():
        status_lines.append("❌ GPU 不可用")
        return None, None, "\n".join(status_lines)
    gpu = torch.cuda.get_device_name(0)
    status_lines.append(f"✅ GPU: {gpu}")
    status_lines.append(
        f" VRAM: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB"
    )

    # Environment check
    if not Path("/tmp/.phantom_ready").exists():
        status_lines.append("❌ 请先点击「初始化环境」按钮")
        return None, None, "\n".join(status_lines)

    # MANO check: require both hand models, matching what check_environment()
    # reports as "mano_ready".
    mano_files = ("MANO_LEFT.pkl", "MANO_RIGHT.pkl")
    if not all((MANO_DIR / name).exists() for name in mano_files):
        status_lines.append("❌ 请先上传 MANO 模型文件")
        return None, None, "\n".join(status_lines)

    progress(0.1, desc="准备处理...")

    # Decide which input data to use
    data_root = str(DATA_RAW_DIR)
    if use_sample_data:
        demo_name = "pick_and_place"
        status_lines.append(f"📂 使用示例数据: {demo_name}")
    else:
        if video_file is None:
            status_lines.append("❌ 请上传视频或选择使用示例数据")
            return None, None, "\n".join(status_lines)
        # Stage the upload as demo "user_upload", episode "0"
        demo_name = "user_upload"
        user_data_dir = DATA_RAW_DIR / demo_name / "0"
        user_data_dir.mkdir(parents=True, exist_ok=True)
        # The file is stored as video.mkv regardless of the uploaded container
        # — presumably the name process_data.py expects; confirm.
        video_dest = user_data_dir / "video.mkv"
        shutil.copy(video_file, video_dest)
        status_lines.append(f"📂 处理上传视频: {video_file}")

    status_lines.append(f"🤖 机器人类型: {robot_type}")
    status_lines.append(f"✋ 目标手部: {target_hand}")
    status_lines.append(f"⚙️ 处理模式: {processing_mode}")
    status_lines.append("-" * 40)

    progress(0.2, desc="开始处理...")

    # Build the processing command
    cmd = [
        sys.executable,
        str(PHANTOM_DIR / "phantom" / "process_data.py"),
        f"demo_name={demo_name}",
        f"data_root_dir={data_root}",
        f"processed_data_root_dir={DATA_PROCESSED_DIR}",
        f"mode={processing_mode}",
        f"robot={robot_type}",
        f"target_hand={target_hand}",
        "bimanual_setup=single_arm",
        "demo_num=0",  # only process the first demo
    ]
    status_lines.append(f"命令: {' '.join(cmd)}")

    # Log keyword -> (progress fraction, description); the first keyword found
    # in a line wins, in this order.
    stage_progress = (
        ("BBOX", 0.4, "检测边界框..."),
        ("HAND2D", 0.5, "提取2D手部姿态..."),
        ("SEGMENTATION", 0.6, "分割手臂..."),
        ("ACTION", 0.7, "提取动作..."),
        ("INPAINT", 0.8, "视频修复..."),
        ("ROBOT", 0.9, "叠加机器人..."),
    )

    # Only the last 20 non-empty log lines are reported.
    output_lines = deque(maxlen=20)
    try:
        # Run the pipeline
        progress(0.3, desc="处理中...")
        # The context manager closes the stdout pipe and waits for the child,
        # even if reading the output raises.
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            cwd=str(PHANTOM_DIR / "phantom"),
            env={**os.environ, "PYTHONPATH": str(PHANTOM_DIR)},
        ) as process:
            for raw_line in process.stdout:
                line = raw_line.strip()
                if not line:
                    continue
                output_lines.append(line)
                # Update progress from stage keywords in the log
                for keyword, fraction, description in stage_progress:
                    if keyword in line:
                        progress(fraction, desc=description)
                        break

        progress(1.0, desc="完成!")

        # Append the process output
        status_lines.append("-" * 40)
        status_lines.append("处理日志 (最后 20 行):")
        status_lines.extend(output_lines)

        # Locate the output files
        processed_dir = DATA_PROCESSED_DIR / demo_name / "0"
        robot_key = robot_type.lower()
        # Generated video: first .mkv whose name mentions the robot type
        output_video = next(
            (
                str(candidate)
                for candidate in processed_dir.glob("**/*.mkv")
                if robot_key in candidate.name.lower()
            ),
            None,
        )
        # Training data: first training_data*.npz
        output_data = next(
            (str(candidate) for candidate in processed_dir.glob("**/training_data*.npz")),
            None,
        )

        if output_video:
            status_lines.append(f"\n✅ 输出视频: {output_video}")
        if output_data:
            status_lines.append(f"✅ 训练数据: {output_data}")

        if process.returncode == 0:
            status_lines.insert(0, "✅ 处理完成!")
        else:
            status_lines.insert(
                0, f"⚠️ 处理完成但有警告 (返回码: {process.returncode})"
            )
        return output_video, output_data, "\n".join(status_lines)
    except Exception as e:
        import traceback

        status_lines.append(f"\n❌ 处理错误: {str(e)}")
        status_lines.append(traceback.format_exc())
        return None, None, "\n".join(status_lines)
# ========== Gradio UI ==========
# Three tabs: environment setup, video processing, and a help page.
# NOTE(review): the nesting of widgets inside Row/Column containers below is
# assumed from the order of the statements — confirm against the deployed app.
with gr.Blocks(
    title="Phantom - 机器人视频生成器",
    theme=gr.themes.Soft()
) as demo:
    gr.Markdown("""
    # 🤖 Phantom - 将人类视频转换为机器人演示
    **论文**: [Phantom: Training Robots Without Robots Using Only Human Videos](https://phantom-human-videos.github.io/)
    将人类手部操作视频自动转换为机器人演示数据,用于训练机器人策略。
    """)
    with gr.Tabs():
        # ========== Environment setup tab ==========
        with gr.TabItem("1️⃣ 环境设置"):
            gr.Markdown("""
            ### 首次使用需要完成以下步骤:
            1. **初始化环境** - 安装依赖和下载模型 (首次约 5-10 分钟)
            2. **上传 MANO 模型** - 需要从官网注册下载
            """)
            with gr.Row():
                with gr.Column():
                    init_btn = gr.Button("🔧 初始化环境", variant="primary", size="lg")
                    # Pre-filled with the status report computed once, when
                    # the UI is built.
                    init_output = gr.Textbox(
                        label="初始化状态",
                        lines=15,
                        value=get_status_text()
                    )
                with gr.Column():
                    gr.Markdown("""
                    ### MANO 模型下载
                    1. 访问 [MANO 官网](https://mano.is.tue.mpg.de/)
                    2. 注册账号并下载模型
                    3. 上传 `MANO_LEFT.pkl` 和 `MANO_RIGHT.pkl`
                    """)
                    mano_left = gr.File(label="MANO_LEFT.pkl", file_types=[".pkl"])
                    mano_right = gr.File(label="MANO_RIGHT.pkl", file_types=[".pkl"])
                    upload_btn = gr.Button("📤 上传 MANO 模型")
                    upload_output = gr.Textbox(label="上传状态", lines=5)
            # Wire the setup buttons to their handlers.
            init_btn.click(fn=initialize_environment, outputs=init_output)
            upload_btn.click(fn=upload_mano_files, inputs=[mano_left, mano_right], outputs=upload_output)
        # ========== Video processing tab ==========
        with gr.TabItem("2️⃣ 视频处理"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### 输入设置")
                    use_sample = gr.Checkbox(
                        label="使用示例数据 (pick_and_place)",
                        value=True,
                        info="推荐首次使用时勾选,使用预置的示例视频"
                    )
                    video_input = gr.Video(
                        label="或上传自己的视频",
                        interactive=True
                    )
                    robot_type = gr.Dropdown(
                        choices=["Panda", "Kinova3", "UR5e", "IIWA", "Jaco"],
                        value="Panda",
                        label="机器人类型"
                    )
                    target_hand = gr.Radio(
                        choices=["left", "right"],
                        value="left",
                        label="目标手部"
                    )
                    # The selected value is passed to process_data.py as
                    # `mode=...` by process_video.
                    processing_mode = gr.Dropdown(
                        choices=[
                            "bbox",
                            "hand2d",
                            "arm_segmentation",
                            "hand_inpaint",
                            "robot_inpaint",
                            "all"
                        ],
                        value="bbox",
                        label="处理模式",
                        info="建议逐步运行: bbox -> hand2d -> arm_segmentation -> hand_inpaint -> robot_inpaint"
                    )
                    process_btn = gr.Button("🚀 开始处理", variant="primary", size="lg")
                with gr.Column():
                    gr.Markdown("### 输出结果")
                    video_output = gr.Video(label="生成的机器人视频")
                    data_output = gr.File(label="训练数据 (NPZ)")
                    status_output = gr.Textbox(label="处理状态", lines=20)
            # Input order must match process_video's positional parameters.
            process_btn.click(
                fn=process_video,
                inputs=[video_input, robot_type, target_hand, processing_mode, use_sample],
                outputs=[video_output, data_output, status_output]
            )
        # ========== Help tab ==========
        with gr.TabItem("📖 说明"):
            gr.Markdown("""
            ## 处理流程
            Phantom 将人类手部视频转换为机器人演示数据,处理步骤:
            | 步骤 | 模式 | 描述 |
            |------|------|------|
            | 1 | `bbox` | 检测手部边界框 |
            | 2 | `hand2d` | 提取 2D 手部姿态 |
            | 3 | `arm_segmentation` | 分割人类手臂 |
            | 4 | `hand_inpaint` | 移除手臂并修复背景 |
            | 5 | `robot_inpaint` | 叠加虚拟机器人 |
            ## 输入要求
            - **视频格式**: MKV, MP4 等常见格式
            - **分辨率**: 推荐 1080p
            - **内容**: 单手操作视频,手部需清晰可见
            ## GPU Zero 限制
            - 单次处理时间限制: 300 秒
            - 建议逐步运行各处理模式
            - 复杂视频可能需要多次处理
            ## 参考资料
            - [Phantom 论文](https://arxiv.org/abs/2503.00779)
            - [GitHub 仓库](https://github.com/MarionLepert/phantom)
            - [MANO 手部模型](https://mano.is.tue.mpg.de/)
            """)
# Launch the Gradio app (through its queue) when executed as a script.
if __name__ == "__main__":
    demo.queue().launch()