# NOTE: the following header was stray Hugging Face page text pasted into the file
# (uploader: xfu314, commit 96da58e — "Add phantom project with submodules and
# dependencies"); kept here as a comment so the module remains valid Python.
"""
Phantom Video Processor - Hugging Face Space Demo
将人类手部视频转换为机器人演示数据
"""
import gradio as gr
import spaces
import subprocess
import sys
import os
import shutil
import tempfile
from pathlib import Path
# ========== Path configuration ==========
# Root of the vendored Phantom checkout inside the Space container.
PHANTOM_DIR = Path("/home/user/app/phantom")
# Raw input demos (uploaded or sample videos) and processed pipeline outputs.
DATA_RAW_DIR = PHANTOM_DIR / "data" / "raw"
DATA_PROCESSED_DIR = PHANTOM_DIR / "data" / "processed"
# Where the user-supplied MANO hand-model pickles must be placed
# (path layout expected by the phantom-hamer submodule).
MANO_DIR = PHANTOM_DIR / "submodules" / "phantom-hamer" / "_DATA" / "data" / "mano"
# Make the Phantom packages importable from this app.
if PHANTOM_DIR.exists():
    sys.path.insert(0, str(PHANTOM_DIR))
    sys.path.insert(0, str(PHANTOM_DIR / "phantom"))
# ========== Environment checks ==========
def check_environment():
    """Probe the Space for readiness of the Phantom pipeline.

    Returns:
        dict with keys:
            phantom_installed (bool): setup marker file ``/tmp/.phantom_ready`` exists.
            mano_ready (bool): both MANO pickle files are present under MANO_DIR.
            sample_data (bool): the bundled ``pick_and_place`` demo directory exists.
            cuda_available (bool): torch reports a usable CUDA device.
            gpu_name (str | None): name of GPU 0 when CUDA is available.
    """
    status = {
        "phantom_installed": Path("/tmp/.phantom_ready").exists(),
        "mano_ready": (MANO_DIR / "MANO_LEFT.pkl").exists() and (MANO_DIR / "MANO_RIGHT.pkl").exists(),
        "sample_data": (DATA_RAW_DIR / "pick_and_place").exists(),
        "cuda_available": False,
        "gpu_name": None,
    }
    try:
        # Imported lazily: torch may not be installed before setup.sh has run.
        import torch
        status["cuda_available"] = torch.cuda.is_available()
        if status["cuda_available"]:
            status["gpu_name"] = torch.cuda.get_device_name(0)
    except Exception:
        # Was a bare ``except:`` which would also swallow SystemExit /
        # KeyboardInterrupt; a missing/broken torch is the only expected failure.
        pass
    return status
def get_status_text():
    """Render the current environment status as a human-readable banner."""
    status = check_environment()
    rule = "=" * 40
    cuda_part = (
        "✅ " + (status["gpu_name"] or "")
        if status["cuda_available"]
        else "⏳ GPU 将在处理时分配"
    )
    banner = [
        rule,
        "环境状态",
        rule,
        f"Phantom 安装: {'✅' if status['phantom_installed'] else '❌ 首次运行需初始化'}",
        f"MANO 模型: {'✅' if status['mano_ready'] else '❌ 请上传 MANO 模型文件'}",
        f"示例数据: {'✅' if status['sample_data'] else '⏳ 将自动下载'}",
        f"CUDA: {cuda_part}",
        rule,
    ]
    return "\n".join(banner)
# ========== MANO model upload ==========
def upload_mano_files(left_file, right_file):
    """Copy uploaded MANO model pickles into the directory the pipeline expects.

    Args:
        left_file: gradio File object for MANO_LEFT.pkl, or None if not chosen.
        right_file: gradio File object for MANO_RIGHT.pkl, or None if not chosen.

    Returns:
        Status message string; on success it is followed by a refreshed
        environment status banner.
    """
    MANO_DIR.mkdir(parents=True, exist_ok=True)
    messages = []
    # Same handling for both hands: skip missing uploads, copy the rest.
    for upload, target_name in ((left_file, "MANO_LEFT.pkl"), (right_file, "MANO_RIGHT.pkl")):
        if upload is not None:
            shutil.copy(upload.name, MANO_DIR / target_name)
            messages.append(f"✅ {target_name} 已保存")
    if not messages:
        return "⚠️ 请选择文件上传"
    return "\n".join(messages) + "\n\n" + get_status_text()
# ========== Environment initialization ==========
def initialize_environment(progress=gr.Progress()):
    """Run setup.sh to install Phantom dependencies and download models.

    Idempotent: returns immediately when the ``/tmp/.phantom_ready`` marker
    already exists. Streams the script's combined stdout/stderr, keeping only
    a rolling tail so memory stays bounded.

    Args:
        progress: gradio progress reporter (injected by the UI).

    Returns:
        Human-readable status text including the tail of the install log.
    """
    if Path("/tmp/.phantom_ready").exists():
        return "✅ 环境已就绪\n\n" + get_status_text()

    progress(0, desc="开始初始化...")
    setup_script = Path("/home/user/app/setup.sh")
    if not setup_script.exists():
        return "❌ setup.sh 不存在"

    try:
        progress(0.1, desc="运行安装脚本...")
        proc = subprocess.Popen(
            ["bash", str(setup_script)],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,  # line-buffered so the tail tracks the script live
        )
        tail = []
        for raw_line in iter(proc.stdout.readline, ''):
            tail.append(raw_line.strip())
            if len(tail) > 50:
                del tail[:-50]  # keep only the most recent 50 lines
        proc.wait()

        if proc.returncode == 0:
            progress(1.0, desc="完成!")
            return "✅ 初始化完成!\n\n" + "\n".join(tail[-20:]) + "\n\n" + get_status_text()
        return f"❌ 初始化失败 (返回码: {proc.returncode})\n\n" + "\n".join(tail[-30:])
    except Exception as e:
        return f"❌ 初始化错误: {str(e)}"
# ========== Video processing ==========
@spaces.GPU(duration=300)
def process_video(
    video_file,
    robot_type,
    target_hand,
    processing_mode,
    use_sample_data,
    progress=gr.Progress()
):
    """Convert a human-hand video into robot demonstration data.

    Runs Phantom's ``process_data.py`` as a subprocess on either the bundled
    sample demo or a user upload, streams its log to drive the progress bar,
    then collects the generated overlay video and training-data archive.

    Args:
        video_file: path of the uploaded video (ignored when use_sample_data).
        robot_type: robot model name, e.g. "Panda".
        target_hand: "left" or "right".
        processing_mode: pipeline stage to run (bbox, hand2d, ..., all).
        use_sample_data: when True, process the bundled pick_and_place demo.
        progress: gradio progress reporter (injected by the UI).

    Returns:
        Tuple of (output_video_path or None, training_data_path or None,
        multi-line status text).
    """
    # Imported here so the module loads even before torch is installed;
    # the GPU is only attached inside @spaces.GPU-decorated calls.
    import torch

    status_lines = []

    # GPU check — the pipeline requires CUDA; bail out with a status message.
    if torch.cuda.is_available():
        gpu = torch.cuda.get_device_name(0)
        status_lines.append(f"✅ GPU: {gpu}")
        status_lines.append(f" VRAM: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
    else:
        status_lines.append("❌ GPU 不可用")
        return None, None, "\n".join(status_lines)

    # The setup step must have completed at least once.
    if not Path("/tmp/.phantom_ready").exists():
        status_lines.append("❌ 请先点击「初始化环境」按钮")
        return None, None, "\n".join(status_lines)

    # MANO hand models are required by the pose-estimation stage.
    if not (MANO_DIR / "MANO_LEFT.pkl").exists():
        status_lines.append("❌ 请先上传 MANO 模型文件")
        return None, None, "\n".join(status_lines)

    progress(0.1, desc="准备处理...")

    # Resolve the input demo: bundled sample, or stage the user upload into
    # the directory layout process_data.py expects (<root>/<demo>/0/video.mkv).
    if use_sample_data:
        demo_name = "pick_and_place"
        data_root = str(DATA_RAW_DIR)
        status_lines.append(f"📂 使用示例数据: {demo_name}")
    else:
        if video_file is None:
            status_lines.append("❌ 请上传视频或选择使用示例数据")
            return None, None, "\n".join(status_lines)
        demo_name = "user_upload"
        user_data_dir = DATA_RAW_DIR / demo_name / "0"
        user_data_dir.mkdir(parents=True, exist_ok=True)
        video_dest = user_data_dir / "video.mkv"
        shutil.copy(video_file, video_dest)
        data_root = str(DATA_RAW_DIR)
        status_lines.append(f"📂 处理上传视频: {video_file}")

    status_lines.append(f"🤖 机器人类型: {robot_type}")
    status_lines.append(f"✋ 目标手部: {target_hand}")
    status_lines.append(f"⚙️ 处理模式: {processing_mode}")
    status_lines.append("-" * 40)
    progress(0.2, desc="开始处理...")

    # Hydra-style overrides for Phantom's pipeline entry point.
    cmd = [
        sys.executable,
        str(PHANTOM_DIR / "phantom" / "process_data.py"),
        f"demo_name={demo_name}",
        f"data_root_dir={data_root}",
        f"processed_data_root_dir={str(DATA_PROCESSED_DIR)}",
        f"mode={processing_mode}",
        f"robot={robot_type}",
        f"target_hand={target_hand}",
        "bimanual_setup=single_arm",
        "demo_num=0",  # only process the first demo
    ]
    status_lines.append(f"命令: {' '.join(cmd)}")

    try:
        progress(0.3, desc="处理中...")
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            cwd=str(PHANTOM_DIR / "phantom"),
            env={**os.environ, "PYTHONPATH": str(PHANTOM_DIR)}
        )
        output_lines = []
        # Stream the pipeline log; stage markers in the output drive the
        # progress bar through the known pipeline phases.
        for line in iter(process.stdout.readline, ''):
            line = line.strip()
            if line:
                output_lines.append(line)
                if "BBOX" in line:
                    progress(0.4, desc="检测边界框...")
                elif "HAND2D" in line:
                    progress(0.5, desc="提取2D手部姿态...")
                elif "SEGMENTATION" in line:
                    progress(0.6, desc="分割手臂...")
                elif "ACTION" in line:
                    progress(0.7, desc="提取动作...")
                elif "INPAINT" in line:
                    progress(0.8, desc="视频修复...")
                elif "ROBOT" in line:
                    progress(0.9, desc="叠加机器人...")
        process.wait()
        progress(1.0, desc="完成!")

        status_lines.append("-" * 40)
        status_lines.append("处理日志 (最后 20 行):")
        status_lines.extend(output_lines[-20:])

        # Collect outputs from the processed-data directory.
        output_video = None
        output_data = None
        processed_dir = DATA_PROCESSED_DIR / demo_name / "0"

        # Prefer the canonical overlay filename for the chosen robot
        # (previously this pattern was computed but never used); fall back to
        # any .mkv whose name mentions the robot type.
        video_pattern = f"video_overlay_{robot_type}_single_arm.mkv"
        for f in processed_dir.glob(f"**/{video_pattern}"):
            output_video = str(f)
            break
        if output_video is None:
            for f in processed_dir.glob("**/*.mkv"):
                if robot_type.lower() in f.name.lower():
                    output_video = str(f)
                    break

        # Training-data archive (NPZ) produced by the pipeline.
        for f in processed_dir.glob("**/training_data*.npz"):
            output_data = str(f)
            break

        if output_video:
            status_lines.append(f"\n✅ 输出视频: {output_video}")
        if output_data:
            status_lines.append(f"✅ 训练数据: {output_data}")

        if process.returncode == 0:
            status_lines.insert(0, "✅ 处理完成!")
        else:
            status_lines.insert(0, f"⚠️ 处理完成但有警告 (返回码: {process.returncode})")

        return output_video, output_data, "\n".join(status_lines)

    except Exception as e:
        import traceback
        status_lines.append(f"\n❌ 处理错误: {str(e)}")
        status_lines.append(traceback.format_exc())
        return None, None, "\n".join(status_lines)
# ========== Gradio interface ==========
# Three tabs: 1) environment setup, 2) video processing, 3) usage notes.
with gr.Blocks(
    title="Phantom - 机器人视频生成器",
    theme=gr.themes.Soft()
) as demo:
    # Page header: project title and paper link.
    gr.Markdown("""
# 🤖 Phantom - 将人类视频转换为机器人演示
**论文**: [Phantom: Training Robots Without Robots Using Only Human Videos](https://phantom-human-videos.github.io/)
将人类手部操作视频自动转换为机器人演示数据,用于训练机器人策略。
""")
    with gr.Tabs():
        # ========== Environment setup tab ==========
        with gr.TabItem("1️⃣ 环境设置"):
            gr.Markdown("""
### 首次使用需要完成以下步骤:
1. **初始化环境** - 安装依赖和下载模型 (首次约 5-10 分钟)
2. **上传 MANO 模型** - 需要从官网注册下载
""")
            with gr.Row():
                with gr.Column():
                    init_btn = gr.Button("🔧 初始化环境", variant="primary", size="lg")
                    init_output = gr.Textbox(
                        label="初始化状态",
                        lines=15,
                        value=get_status_text()
                    )
                with gr.Column():
                    gr.Markdown("""
### MANO 模型下载
1. 访问 [MANO 官网](https://mano.is.tue.mpg.de/)
2. 注册账号并下载模型
3. 上传 `MANO_LEFT.pkl` 和 `MANO_RIGHT.pkl`
""")
                    mano_left = gr.File(label="MANO_LEFT.pkl", file_types=[".pkl"])
                    mano_right = gr.File(label="MANO_RIGHT.pkl", file_types=[".pkl"])
                    upload_btn = gr.Button("📤 上传 MANO 模型")
                    upload_output = gr.Textbox(label="上传状态", lines=5)
            # Wire setup-tab buttons to their handlers.
            init_btn.click(fn=initialize_environment, outputs=init_output)
            upload_btn.click(fn=upload_mano_files, inputs=[mano_left, mano_right], outputs=upload_output)
        # ========== Video processing tab ==========
        with gr.TabItem("2️⃣ 视频处理"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### 输入设置")
                    use_sample = gr.Checkbox(
                        label="使用示例数据 (pick_and_place)",
                        value=True,
                        info="推荐首次使用时勾选,使用预置的示例视频"
                    )
                    video_input = gr.Video(
                        label="或上传自己的视频",
                        interactive=True
                    )
                    robot_type = gr.Dropdown(
                        choices=["Panda", "Kinova3", "UR5e", "IIWA", "Jaco"],
                        value="Panda",
                        label="机器人类型"
                    )
                    target_hand = gr.Radio(
                        choices=["left", "right"],
                        value="left",
                        label="目标手部"
                    )
                    processing_mode = gr.Dropdown(
                        choices=[
                            "bbox",
                            "hand2d",
                            "arm_segmentation",
                            "hand_inpaint",
                            "robot_inpaint",
                            "all"
                        ],
                        value="bbox",
                        label="处理模式",
                        info="建议逐步运行: bbox -> hand2d -> arm_segmentation -> hand_inpaint -> robot_inpaint"
                    )
                    process_btn = gr.Button("🚀 开始处理", variant="primary", size="lg")
                with gr.Column():
                    gr.Markdown("### 输出结果")
                    video_output = gr.Video(label="生成的机器人视频")
                    data_output = gr.File(label="训练数据 (NPZ)")
                    status_output = gr.Textbox(label="处理状态", lines=20)
            process_btn.click(
                fn=process_video,
                inputs=[video_input, robot_type, target_hand, processing_mode, use_sample],
                outputs=[video_output, data_output, status_output]
            )
        # ========== Help / documentation tab ==========
        with gr.TabItem("📖 说明"):
            gr.Markdown("""
## 处理流程
Phantom 将人类手部视频转换为机器人演示数据,处理步骤:
| 步骤 | 模式 | 描述 |
|------|------|------|
| 1 | `bbox` | 检测手部边界框 |
| 2 | `hand2d` | 提取 2D 手部姿态 |
| 3 | `arm_segmentation` | 分割人类手臂 |
| 4 | `hand_inpaint` | 移除手臂并修复背景 |
| 5 | `robot_inpaint` | 叠加虚拟机器人 |
## 输入要求
- **视频格式**: MKV, MP4 等常见格式
- **分辨率**: 推荐 1080p
- **内容**: 单手操作视频,手部需清晰可见
## GPU Zero 限制
- 单次处理时间限制: 300 秒
- 建议逐步运行各处理模式
- 复杂视频可能需要多次处理
## 参考资料
- [Phantom 论文](https://arxiv.org/abs/2503.00779)
- [GitHub 仓库](https://github.com/MarionLepert/phantom)
- [MANO 手部模型](https://mano.is.tue.mpg.de/)
""")
# Entry point: enable request queueing (needed for @spaces.GPU jobs) and launch.
if __name__ == "__main__":
    demo.queue().launch()