""" Phantom Video Processor - Hugging Face Space Demo 将人类手部视频转换为机器人演示数据 """ import gradio as gr import spaces import subprocess import sys import os import shutil import tempfile from pathlib import Path # ========== 路径配置 ========== PHANTOM_DIR = Path("/home/user/app/phantom") DATA_RAW_DIR = PHANTOM_DIR / "data" / "raw" DATA_PROCESSED_DIR = PHANTOM_DIR / "data" / "processed" MANO_DIR = PHANTOM_DIR / "submodules" / "phantom-hamer" / "_DATA" / "data" / "mano" # 添加 Phantom 到 Python 路径 if PHANTOM_DIR.exists(): sys.path.insert(0, str(PHANTOM_DIR)) sys.path.insert(0, str(PHANTOM_DIR / "phantom")) # ========== 环境检测 ========== def check_environment(): """检查环境状态""" status = { "phantom_installed": Path("/tmp/.phantom_ready").exists(), "mano_ready": (MANO_DIR / "MANO_LEFT.pkl").exists() and (MANO_DIR / "MANO_RIGHT.pkl").exists(), "sample_data": (DATA_RAW_DIR / "pick_and_place").exists(), "cuda_available": False, "gpu_name": None } try: import torch status["cuda_available"] = torch.cuda.is_available() if status["cuda_available"]: status["gpu_name"] = torch.cuda.get_device_name(0) except: pass return status def get_status_text(): """获取状态文本""" status = check_environment() lines = [] lines.append("=" * 40) lines.append("环境状态") lines.append("=" * 40) lines.append(f"Phantom 安装: {'✅' if status['phantom_installed'] else '❌ 首次运行需初始化'}") lines.append(f"MANO 模型: {'✅' if status['mano_ready'] else '❌ 请上传 MANO 模型文件'}") lines.append(f"示例数据: {'✅' if status['sample_data'] else '⏳ 将自动下载'}") lines.append(f"CUDA: {'✅ ' + (status['gpu_name'] or '') if status['cuda_available'] else '⏳ GPU 将在处理时分配'}") lines.append("=" * 40) return "\n".join(lines) # ========== MANO 模型上传 ========== def upload_mano_files(left_file, right_file): """上传 MANO 模型文件""" MANO_DIR.mkdir(parents=True, exist_ok=True) messages = [] if left_file is not None: dest = MANO_DIR / "MANO_LEFT.pkl" shutil.copy(left_file.name, dest) messages.append(f"✅ MANO_LEFT.pkl 已保存") if right_file is not None: dest = MANO_DIR / "MANO_RIGHT.pkl" 
shutil.copy(right_file.name, dest) messages.append(f"✅ MANO_RIGHT.pkl 已保存") if not messages: return "⚠️ 请选择文件上传" return "\n".join(messages) + "\n\n" + get_status_text() # ========== 初始化环境 ========== def initialize_environment(progress=gr.Progress()): """初始化 Phantom 环境""" if Path("/tmp/.phantom_ready").exists(): return "✅ 环境已就绪\n\n" + get_status_text() progress(0, desc="开始初始化...") setup_script = Path("/home/user/app/setup.sh") if not setup_script.exists(): return "❌ setup.sh 不存在" try: # 运行 setup.sh progress(0.1, desc="运行安装脚本...") process = subprocess.Popen( ["bash", str(setup_script)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1 ) output_lines = [] for line in iter(process.stdout.readline, ''): output_lines.append(line.strip()) if len(output_lines) > 50: output_lines = output_lines[-50:] # 保留最后 50 行 process.wait() if process.returncode == 0: progress(1.0, desc="完成!") return "✅ 初始化完成!\n\n" + "\n".join(output_lines[-20:]) + "\n\n" + get_status_text() else: return f"❌ 初始化失败 (返回码: {process.returncode})\n\n" + "\n".join(output_lines[-30:]) except Exception as e: return f"❌ 初始化错误: {str(e)}" # ========== 视频处理 ========== @spaces.GPU(duration=300) def process_video( video_file, robot_type, target_hand, processing_mode, use_sample_data, progress=gr.Progress() ): """ 处理视频 - 将人类手部转换为机器人 """ import torch # 状态信息 status_lines = [] # GPU 检查 if torch.cuda.is_available(): gpu = torch.cuda.get_device_name(0) status_lines.append(f"✅ GPU: {gpu}") status_lines.append(f" VRAM: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB") else: status_lines.append("❌ GPU 不可用") return None, None, "\n".join(status_lines) # 检查环境 if not Path("/tmp/.phantom_ready").exists(): status_lines.append("❌ 请先点击「初始化环境」按钮") return None, None, "\n".join(status_lines) # 检查 MANO if not (MANO_DIR / "MANO_LEFT.pkl").exists(): status_lines.append("❌ 请先上传 MANO 模型文件") return None, None, "\n".join(status_lines) progress(0.1, desc="准备处理...") # 确定输入数据 if use_sample_data: demo_name = 
"pick_and_place" data_root = str(DATA_RAW_DIR) status_lines.append(f"📂 使用示例数据: {demo_name}") else: if video_file is None: status_lines.append("❌ 请上传视频或选择使用示例数据") return None, None, "\n".join(status_lines) # 创建临时目录存放上传的视频 demo_name = "user_upload" user_data_dir = DATA_RAW_DIR / demo_name / "0" user_data_dir.mkdir(parents=True, exist_ok=True) # 复制视频到正确位置 video_dest = user_data_dir / "video.mkv" shutil.copy(video_file, video_dest) data_root = str(DATA_RAW_DIR) status_lines.append(f"📂 处理上传视频: {video_file}") status_lines.append(f"🤖 机器人类型: {robot_type}") status_lines.append(f"✋ 目标手部: {target_hand}") status_lines.append(f"⚙️ 处理模式: {processing_mode}") status_lines.append("-" * 40) progress(0.2, desc="开始处理...") # 构建处理命令 cmd = [ sys.executable, str(PHANTOM_DIR / "phantom" / "process_data.py"), f"demo_name={demo_name}", f"data_root_dir={data_root}", f"processed_data_root_dir={str(DATA_PROCESSED_DIR)}", f"mode={processing_mode}", f"robot={robot_type}", f"target_hand={target_hand}", "bimanual_setup=single_arm", "demo_num=0", # 只处理第一个 demo ] status_lines.append(f"命令: {' '.join(cmd)}") try: # 运行处理 progress(0.3, desc="处理中...") process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, cwd=str(PHANTOM_DIR / "phantom"), env={**os.environ, "PYTHONPATH": str(PHANTOM_DIR)} ) output_lines = [] for line in iter(process.stdout.readline, ''): line = line.strip() if line: output_lines.append(line) # 更新进度 if "BBOX" in line: progress(0.4, desc="检测边界框...") elif "HAND2D" in line: progress(0.5, desc="提取2D手部姿态...") elif "SEGMENTATION" in line: progress(0.6, desc="分割手臂...") elif "ACTION" in line: progress(0.7, desc="提取动作...") elif "INPAINT" in line: progress(0.8, desc="视频修复...") elif "ROBOT" in line: progress(0.9, desc="叠加机器人...") process.wait() progress(1.0, desc="完成!") # 添加处理输出 status_lines.append("-" * 40) status_lines.append("处理日志 (最后 20 行):") status_lines.extend(output_lines[-20:]) # 查找输出文件 output_video = None output_data = None processed_dir = 
DATA_PROCESSED_DIR / demo_name / "0" # 查找生成的视频 video_pattern = f"video_overlay_{robot_type}_single_arm.mkv" for f in processed_dir.glob("**/*.mkv"): if robot_type.lower() in f.name.lower(): output_video = str(f) break # 查找训练数据 for f in processed_dir.glob("**/training_data*.npz"): output_data = str(f) break if output_video: status_lines.append(f"\n✅ 输出视频: {output_video}") if output_data: status_lines.append(f"✅ 训练数据: {output_data}") if process.returncode == 0: status_lines.insert(0, "✅ 处理完成!") else: status_lines.insert(0, f"⚠️ 处理完成但有警告 (返回码: {process.returncode})") return output_video, output_data, "\n".join(status_lines) except Exception as e: import traceback status_lines.append(f"\n❌ 处理错误: {str(e)}") status_lines.append(traceback.format_exc()) return None, None, "\n".join(status_lines) # ========== Gradio 界面 ========== with gr.Blocks( title="Phantom - 机器人视频生成器", theme=gr.themes.Soft() ) as demo: gr.Markdown(""" # 🤖 Phantom - 将人类视频转换为机器人演示 **论文**: [Phantom: Training Robots Without Robots Using Only Human Videos](https://phantom-human-videos.github.io/) 将人类手部操作视频自动转换为机器人演示数据,用于训练机器人策略。 """) with gr.Tabs(): # ========== 环境设置 Tab ========== with gr.TabItem("1️⃣ 环境设置"): gr.Markdown(""" ### 首次使用需要完成以下步骤: 1. **初始化环境** - 安装依赖和下载模型 (首次约 5-10 分钟) 2. **上传 MANO 模型** - 需要从官网注册下载 """) with gr.Row(): with gr.Column(): init_btn = gr.Button("🔧 初始化环境", variant="primary", size="lg") init_output = gr.Textbox( label="初始化状态", lines=15, value=get_status_text() ) with gr.Column(): gr.Markdown(""" ### MANO 模型下载 1. 访问 [MANO 官网](https://mano.is.tue.mpg.de/) 2. 注册账号并下载模型 3. 
上传 `MANO_LEFT.pkl` 和 `MANO_RIGHT.pkl` """) mano_left = gr.File(label="MANO_LEFT.pkl", file_types=[".pkl"]) mano_right = gr.File(label="MANO_RIGHT.pkl", file_types=[".pkl"]) upload_btn = gr.Button("📤 上传 MANO 模型") upload_output = gr.Textbox(label="上传状态", lines=5) init_btn.click(fn=initialize_environment, outputs=init_output) upload_btn.click(fn=upload_mano_files, inputs=[mano_left, mano_right], outputs=upload_output) # ========== 视频处理 Tab ========== with gr.TabItem("2️⃣ 视频处理"): with gr.Row(): with gr.Column(): gr.Markdown("### 输入设置") use_sample = gr.Checkbox( label="使用示例数据 (pick_and_place)", value=True, info="推荐首次使用时勾选,使用预置的示例视频" ) video_input = gr.Video( label="或上传自己的视频", interactive=True ) robot_type = gr.Dropdown( choices=["Panda", "Kinova3", "UR5e", "IIWA", "Jaco"], value="Panda", label="机器人类型" ) target_hand = gr.Radio( choices=["left", "right"], value="left", label="目标手部" ) processing_mode = gr.Dropdown( choices=[ "bbox", "hand2d", "arm_segmentation", "hand_inpaint", "robot_inpaint", "all" ], value="bbox", label="处理模式", info="建议逐步运行: bbox -> hand2d -> arm_segmentation -> hand_inpaint -> robot_inpaint" ) process_btn = gr.Button("🚀 开始处理", variant="primary", size="lg") with gr.Column(): gr.Markdown("### 输出结果") video_output = gr.Video(label="生成的机器人视频") data_output = gr.File(label="训练数据 (NPZ)") status_output = gr.Textbox(label="处理状态", lines=20) process_btn.click( fn=process_video, inputs=[video_input, robot_type, target_hand, processing_mode, use_sample], outputs=[video_output, data_output, status_output] ) # ========== 说明 Tab ========== with gr.TabItem("📖 说明"): gr.Markdown(""" ## 处理流程 Phantom 将人类手部视频转换为机器人演示数据,处理步骤: | 步骤 | 模式 | 描述 | |------|------|------| | 1 | `bbox` | 检测手部边界框 | | 2 | `hand2d` | 提取 2D 手部姿态 | | 3 | `arm_segmentation` | 分割人类手臂 | | 4 | `hand_inpaint` | 移除手臂并修复背景 | | 5 | `robot_inpaint` | 叠加虚拟机器人 | ## 输入要求 - **视频格式**: MKV, MP4 等常见格式 - **分辨率**: 推荐 1080p - **内容**: 单手操作视频,手部需清晰可见 ## GPU Zero 限制 - 单次处理时间限制: 300 秒 - 建议逐步运行各处理模式 - 复杂视频可能需要多次处理 ## 参考资料 - 
[Phantom 论文](https://arxiv.org/abs/2503.00779) - [GitHub 仓库](https://github.com/MarionLepert/phantom) - [MANO 手部模型](https://mano.is.tue.mpg.de/) """) # 启动 if __name__ == "__main__": demo.queue().launch()
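
# ---------------------------------------------------------------------------
# Illustrative sketch only (a hypothetical helper, not wired into the app):
# the "Processing mode" dropdown recommends running the pipeline stages in
# order (bbox -> hand2d -> arm_segmentation -> hand_inpaint -> robot_inpaint).
# This expands a target mode into the ordered list of stages to run, which can
# be handy when driving process_data.py step by step under the per-run GPU
# time limit.
PIPELINE_STAGES = ["bbox", "hand2d", "arm_segmentation", "hand_inpaint", "robot_inpaint"]

def stages_through(mode):
    """Return every stage up to and including `mode`; "all" returns all stages."""
    if mode == "all":
        return list(PIPELINE_STAGES)
    if mode not in PIPELINE_STAGES:
        raise ValueError(f"unknown mode: {mode}")
    return PIPELINE_STAGES[: PIPELINE_STAGES.index(mode) + 1]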