# WJAD / scripts / sandbox_real_data.py
# fuzirui's picture
# Sync WJAD codebase
# 0cfefd2 verified
"""Sandbox 真实数据微验证脚本。
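
The ``cosmos_synthetic`` part of the NVIDIA Cosmos-Drive-Dreams dataset is a
single ``split`` binary cut into 17 volumes (~700 GB in total); downloading
one volume on its own cannot be unpacked into an mp4.  This script therefore
uses a hybrid approach:

1. Run the official ``download.py --file_types lidar,hdmap --limit 1`` to fetch
   all real labels of one clip (every common folder, lidar_raw and the HD-map
   folders); about 50-200 MB by the original estimate;
2. Unpack every ``.tar`` into ``labels/{clip_id}_{start}_{end}/{folder}/`` and
   link ``labels/{clip_id}`` to it, matching the layout expected by
   ``wjad.data.cosmos_dataset``;
3. Use OpenCV (``cv2.VideoWriter``) to synthesize a random-noise mp4 as a
   placeholder for the real synthetic video
   (file name ``{clip_id}_{chunk_id}_Sunny.mp4``, 121 frames, resolution 1024×768);
4. Invoke ``wjad.train.runner_local --tiny --max_steps 4`` to run 4 training
   steps on real labels + fake visuals.

This validates:

- dataset indexing (``build_clip_index``)
- label parsing (``all_object_info`` JSON, SE(3) poses, f-theta intrinsics)
- LIDAR loading and occlusion filtering
- Hungarian matching + DETR loss
- end-to-end forward / GradNorm / PCGrad / backward

It does not validate DINOv3 semantic extraction on real images (the visuals
are noise and will not converge).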
"""
from __future__ import annotations
import os
import shutil
import subprocess
import sys
import tarfile
import urllib.request
from pathlib import Path
# Repository root: the parent of the directory that contains this script.
ROOT = Path(__file__).resolve().parent.parent
# Put <ROOT>/src at the front of this process's import path.
sys.path.insert(0, str(ROOT / "src"))
# Dataset root; the WJAD_DATA_ROOT environment variable overrides the
# default <ROOT>/data/cosmos.
DATA_ROOT = Path(os.environ.get("WJAD_DATA_ROOT", ROOT / "data" / "cosmos"))
# Raw URL of scripts/download.py in the nv-tlabs/Cosmos-Drive-Dreams GitHub repo.
NV_DOWNLOAD_URL = (
"https://raw.githubusercontent.com/nv-tlabs/Cosmos-Drive-Dreams/main/scripts/download.py"
)
def _print_section(title: str) -> None:
    """Print ``title`` framed above and below by a 60-character ``=`` rule.

    The banner starts with a blank line and stdout is flushed immediately so
    it stays ordered relative to output written by child processes.
    """
    rule = "=" * 60
    banner = "\n".join(("", rule, title, rule))
    print(banner, flush=True)
def step1_download_labels() -> None:
    """Download the labels + lidar of one clip with NVIDIA's official script.

    The helper script is fetched from ``NV_DOWNLOAD_URL`` into
    ``DATA_ROOT/.nvidia_download.py`` the first time only, then executed with
    the current interpreter.

    Raises:
        SystemExit: if the download script exits with a non-zero return code.
        OSError / urllib.error.URLError: if fetching the helper script fails.
    """
    _print_section("STEP 1 下载真实标签(1 个 clip)")
    DATA_ROOT.mkdir(parents=True, exist_ok=True)
    nv_script = DATA_ROOT / ".nvidia_download.py"
    if not nv_script.exists():
        print(f"[download] 取 NVIDIA download.py -> {nv_script}", flush=True)
        # Download to a sibling ".part" file and move it into place only once
        # it is complete: an interrupted download must not leave a truncated
        # script behind that the exists() check above would reuse forever.
        partial = nv_script.with_name(nv_script.name + ".part")
        try:
            # The timeout keeps a stalled connection from hanging the script.
            with urllib.request.urlopen(NV_DOWNLOAD_URL, timeout=60) as r, open(partial, "wb") as f:
                f.write(r.read())
            os.replace(partial, nv_script)
        finally:
            # Only still present when something above failed.
            if partial.exists():
                partial.unlink()
    # Fetch lidar + hdmap together: per the original author's note, the
    # ``hdmap`` category triggers the download of the nine 3d_* folders and,
    # combined with the common folders, covers both dynamic and structured
    # labels.
    cmd = [
        sys.executable,
        str(nv_script),
        "--odir", str(DATA_ROOT),
        "--file_types", "lidar,hdmap",
        "--workers", "4",
        "--limit", "1",
    ]
    print(f"$ {' '.join(cmd)}", flush=True)
    rc = subprocess.call(cmd)
    if rc != 0:
        sys.exit(f"download.py 失败 rc={rc}")
def _hoist_single_subdir(out_dir: Path) -> None:
    """Flatten a lone wrapper directory left behind by extracting a tar.

    If ``out_dir`` contains exactly one sub-directory and no regular files,
    the children of that sub-directory are moved up into ``out_dir`` and the
    emptied wrapper is removed.  In every other case (``out_dir`` is not a
    directory, several sub-directories, files at the top level) nothing is
    changed.

    The wrapper is first renamed to a scratch name, so a child that carries
    the same name as the wrapper (``x/x/...``) can be moved up too; without
    that step the child's destination path is the wrapper itself and the
    hoist silently does nothing.

    Args:
        out_dir: Directory whose single wrapper sub-directory should be
            flattened.
    """
    if not out_dir.is_dir():
        return
    entries = list(out_dir.iterdir())
    subs = [p for p in entries if p.is_dir()]
    files = [p for p in entries if p.is_file()]
    if len(subs) != 1 or files:
        return
    child = subs[0]

    # Choose a scratch name that clashes neither with anything already in
    # out_dir nor with any entry that is about to be moved up.
    reserved = {p.name for p in entries} | {p.name for p in child.iterdir()}
    scratch_name = f".hoist_{child.name}"
    suffix = 0
    while scratch_name in reserved:
        suffix += 1
        scratch_name = f".hoist_{child.name}_{suffix}"
    staging = out_dir / scratch_name
    try:
        child.rename(staging)
    except OSError:
        # Renaming is only needed for the same-name case; fall back to
        # moving straight out of the wrapper.
        staging = child

    for item in list(staging.iterdir()):
        dest = out_dir / item.name
        if dest.exists():
            # Never overwrite; the entry stays inside the wrapper.
            continue
        shutil.move(str(item), str(dest))

    try:
        staging.rmdir()
    except OSError:
        # Best effort: the wrapper still holds something that could not be
        # moved.  Give it its original name back when that name is free.
        if staging != child and not child.exists():
            try:
                staging.rename(child)
            except OSError:
                pass
def step2_reorganize_labels() -> str:
    """Unpack the downloaded label tars into ``labels/<clip_id_full>/<folder>/``.

    Folders are scanned in a fixed order.  The first ``.tar`` found (sorted by
    name) selects the clip; in every folder only the tar whose stem equals
    that clip is unpacked, so labels of different clips are never mixed.
    Afterwards ``labels/<clip_id>`` is made to refer to
    ``labels/<clip_id_full>`` through a symlink, or a copy when symlinking is
    not possible.

    Returns:
        The selected ``clip_id``: the tar stem with its trailing
        ``_{start}_{end}`` fields removed.

    Raises:
        SystemExit: if no label tar was found in any folder.
    """
    _print_section("STEP 2 解压标签到 labels/<clip_id>/<folder> 布局")
    common_folders = [
        "all_object_info",
        "captions",
        "car_mask_coarse",
        "ftheta_intrinsic",
        "pinhole_intrinsic",
        "pose",
        "vehicle_pose",
        "lidar_raw",
        # The nine HD-map categories
        "3d_lanes",
        "3d_lanelines",
        "3d_road_boundaries",
        "3d_wait_lines",
        "3d_crosswalks",
        "3d_road_markings",
        "3d_poles",
        "3d_traffic_lights",
        "3d_traffic_signs",
    ]
    clip_id_full: str | None = None  # {clip_id}_{start}_{end}
    clip_id: str | None = None
    for folder in common_folders:
        src = DATA_ROOT / folder
        if not src.exists():
            print(f" - skip {folder} (not downloaded)", flush=True)
            continue
        tars = sorted(src.glob("*.tar"))
        if not tars:
            print(f" - skip {folder} (no .tar)", flush=True)
            continue
        if clip_id_full is None:
            clip_id_full = tars[0].stem
            clip_id = clip_id_full.rsplit("_", 2)[0]
            print(f" -> chosen clip_id_full = {clip_id_full}", flush=True)
            print(f" -> video / symlink clip_id = {clip_id}", flush=True)
        use_tars = [t for t in tars if t.stem == clip_id_full]
        if not use_tars:
            print(f" - skip {folder}: 无与 {clip_id_full} 同名的 tar(避免解压错 clip)", flush=True)
            continue
        tar_path = use_tars[0]
        # Destination directory
        out_dir = DATA_ROOT / "labels" / clip_id_full / folder
        out_dir.mkdir(parents=True, exist_ok=True)
        with tarfile.open(tar_path, "r") as tf:
            # The archive comes from the network: where the interpreter
            # supports extraction filters, reject members that would land
            # outside out_dir (absolute paths, "..", escaping links).
            if hasattr(tarfile, "data_filter"):
                tf.extractall(out_dir, filter="data")
            else:
                tf.extractall(out_dir)
        _hoist_single_subdir(out_dir)
        # Called again in case one more level named after the modality is
        # still nested (ftheta_intrinsic/ftheta_intrinsic/...).
        _hoist_single_subdir(out_dir)
        # Walk the tree once; show a few samples and count regular files only
        # (the message says "files", so directories are not counted).
        entries = sorted(out_dir.rglob("*"))
        for m in entries[:3]:
            print(f" {m.relative_to(DATA_ROOT)}", flush=True)
        n_files = sum(1 for p in entries if p.is_file())
        print(f" - {folder}: {n_files} files", flush=True)
    if clip_id_full is None or clip_id is None:
        sys.exit("没有下到任何标签 tar,确认 HF_TOKEN 是否能访问 NVIDIA 数据集")
    # Compatibility shim (per the original author's note): cosmos_dataset.py
    # reads from labels/{clip_id}/, while the download names the directory
    # {clip_id}_{start}_{end}.  Expose the same content under the short name.
    labels_root = DATA_ROOT / "labels"
    full_dir = labels_root / clip_id_full
    short_dir = labels_root / clip_id
    if short_dir.is_symlink() and not short_dir.exists():
        # A dangling link from an earlier run would make both symlink_to()
        # and copytree() below fail with FileExistsError.
        short_dir.unlink()
    if not short_dir.exists():
        try:
            short_dir.symlink_to(full_dir, target_is_directory=True)
        except OSError:
            # Symlinks unavailable (e.g. missing privilege): copy instead.
            shutil.copytree(full_dir, short_dir)
    return clip_id
def step3_make_fake_video(clip_id: str) -> None:
    """Write a 121-frame random-noise mp4 standing in for a ``cosmos_synthetic`` video.

    The file is written to
    ``DATA_ROOT/synthetic/single_view/generation/{clip_id}_0_Sunny.mp4`` at
    1024x768, 30 fps, using OpenCV's ``mp4v`` codec and a fixed RNG seed.

    Args:
        clip_id: Clip identifier used as the file-name prefix.

    Raises:
        SystemExit: if the video writer cannot be opened.
    """
    _print_section("STEP 3 合成占位视频(随机噪声 mp4)")
    import numpy as np
    import cv2

    syn_dir = DATA_ROOT / "synthetic" / "single_view" / "generation"
    syn_dir.mkdir(parents=True, exist_ok=True)
    out_path = syn_dir / f"{clip_id}_0_Sunny.mp4"
    # Original note: 768 is the raw height; 384 after the top crop.
    H, W, T = 768, 1024, 121
    rng = np.random.default_rng(0)
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(str(out_path), fourcc, 30.0, (W, H))
    if not writer.isOpened():
        sys.exit(f"无法打开 mp4 写入器(缺 codec?): {out_path}")
    try:
        for _ in range(T):
            frame = rng.integers(0, 256, size=(H, W, 3), dtype=np.uint8)
            writer.write(frame)
    finally:
        # Always finalize the container and free the writer, even when frame
        # generation or writing raises.
        writer.release()
    print(f" 写入 {out_path} ({out_path.stat().st_size / 1024**2:.1f} MB)", flush=True)
def step4_run_trainer(clip_id: str) -> None:
    """Run ``wjad.train.runner_local --tiny --max_steps 4`` in a subprocess.

    The trainer is started with the current interpreter, ``<ROOT>/src``
    prepended to ``PYTHONPATH``, and ``cuda`` as device when available
    (otherwise ``cpu``).

    Args:
        clip_id: Accepted for symmetry with the other steps; not used here.

    Raises:
        SystemExit: if the trainer exits with a non-zero return code.
    """
    _print_section("STEP 4 跑 trainer(真实标签 + 伪造视觉)")
    cmd = [
        sys.executable,
        "-m",
        "wjad.train.runner_local",
        "--config", str(ROOT / "configs" / "default.yaml"),
        "--data_root", str(DATA_ROOT),
        "--dinov3_path", str(ROOT / "dinov3-vitb16-pretrain-lvd1689m"),
        "--device", "cuda" if _has_cuda() else "cpu",
        "--tiny",
        "--max_steps", "4",
    ]
    env = os.environ.copy()
    # Append the inherited PYTHONPATH only when it is non-empty: a trailing
    # separator would add an empty entry, i.e. the current working directory,
    # to the child's import path.
    search_path = [str(ROOT / "src")]
    inherited = env.get("PYTHONPATH")
    if inherited:
        search_path.append(inherited)
    env["PYTHONPATH"] = os.pathsep.join(search_path)
    print(f"$ {' '.join(cmd)}", flush=True)
    rc = subprocess.call(cmd, env=env)
    if rc != 0:
        sys.exit(f"trainer 失败 rc={rc}")
def _has_cuda() -> bool:
    """Report whether PyTorch can see a CUDA device.

    Any failure while importing or querying ``torch`` is treated as
    "no CUDA", so the caller can fall back to the CPU.
    """
    try:
        import torch

        available = torch.cuda.is_available()
    except Exception:
        available = False
    return available
def main() -> None:
    """Run the four sandbox steps in order: download, unpack, fake video, train."""
    _print_section("WJAD Sandbox Real-Data Tiny Test")
    print(f"DATA_ROOT = {DATA_ROOT}", flush=True)
    step1_download_labels()
    chosen_clip = step2_reorganize_labels()
    # The remaining steps both take the clip chosen while unpacking.
    for stage in (step3_make_fake_video, step4_run_trainer):
        stage(chosen_clip)
    _print_section("DONE")


if __name__ == "__main__":
    main()