"""Sandbox 真实数据微验证脚本。
由于 NVIDIA Cosmos-Drive-Dreams 数据集的 ``cosmos_synthetic`` 是一份切成 17
个分卷(共 ~700 GB)的 ``split`` 二进制,单独下载某一分卷无法解压出 mp4。
因此本脚本采用混合方案:
1. 用官方 ``download.py --file_types lidar --limit 1`` 拉下 1 个 clip 的
全部真实标签(所有 common 文件夹 + lidar_raw),约 50-200 MB;
2. 把每个 ``.tar`` 解压到 ``labels/{clip_id}/{folder}/`` 结构,匹配
``wjad.data.cosmos_dataset`` 期待的布局;
3. 用 ``imageio`` 合成一个随机噪声 mp4 占位真实合成视频
(文件名 ``{clip_id}_{chunk_id}_Sunny.mp4``,121 帧,分辨率 1024×768);
4. 调用 ``wjad.train.runner_local --tiny --max_steps 4`` 跑 4 步真实标签 +
伪造视觉的训练。
这样能验证:
- 数据集索引(``build_clip_index``)
- 标签解析(``all_object_info`` JSON、SE(3) pose、f-theta 内参)
- LIDAR 加载与遮挡过滤
- Hungarian 匹配 + DETR loss
- 端到端 forward / GradNorm / PCGrad / 反传
但不会验证 DINOv3 在真实图像上的语义提取(视觉是噪声,不会收敛)。
"""
from __future__ import annotations
import os
import shutil
import subprocess
import sys
import tarfile
import urllib.request
from pathlib import Path
# Repository root: this file sits one directory below it (e.g. scripts/).
ROOT = Path(__file__).resolve().parent.parent
# Make the in-repo ``src`` package importable without installing it.
sys.path.insert(0, str(ROOT / "src"))
# Where downloads and extracted labels land; override via $WJAD_DATA_ROOT.
DATA_ROOT = Path(os.environ.get("WJAD_DATA_ROOT", ROOT / "data" / "cosmos"))
# NVIDIA's official downloader script, fetched at runtime by step 1.
NV_DOWNLOAD_URL = (
    "https://raw.githubusercontent.com/nv-tlabs/Cosmos-Drive-Dreams/main/scripts/download.py"
)
def _print_section(title: str) -> None:
bar = "=" * 60
print(f"\n{bar}\n{title}\n{bar}", flush=True)
def step1_download_labels() -> None:
    """Download one clip's labels + lidar via NVIDIA's official script.

    Fetches ``download.py`` from the Cosmos-Drive-Dreams repo on first run
    and caches it under DATA_ROOT, then invokes it as a subprocess. Exits
    the whole process on a non-zero return code.
    """
    _print_section("STEP 1 下载真实标签(1 个 clip)")
    DATA_ROOT.mkdir(parents=True, exist_ok=True)
    nv_script = DATA_ROOT / ".nvidia_download.py"
    if not nv_script.exists():
        print(f"[download] 取 NVIDIA download.py -> {nv_script}", flush=True)
        with urllib.request.urlopen(NV_DOWNLOAD_URL) as resp:
            nv_script.write_bytes(resp.read())
    # Pull lidar + hdmap together: the ``hdmap`` type triggers download of
    # the nine 3d_* folders, which combined with the common folders covers
    # both dynamic and structured label categories.
    cmd = [
        sys.executable,
        str(nv_script),
        "--odir", str(DATA_ROOT),
        "--file_types", "lidar,hdmap",
        "--workers", "4",
        "--limit", "1",
    ]
    print(f"$ {' '.join(cmd)}", flush=True)
    exit_code = subprocess.call(cmd)
    if exit_code != 0:
        sys.exit(f"download.py 失败 rc={exit_code}")
def _hoist_single_subdir(out_dir: Path) -> None:
"""若解压结果仅为「单个子目录、顶层无文件」,把子目录内容抬到 out_dir(常见 tar 布局)。"""
if not out_dir.is_dir():
return
subs = [p for p in out_dir.iterdir() if p.is_dir()]
files = [p for p in out_dir.iterdir() if p.is_file()]
if len(subs) == 1 and not files:
child = subs[0]
for item in child.iterdir():
dest = out_dir / item.name
if dest.exists():
continue
shutil.move(str(item), str(dest))
try:
child.rmdir()
except OSError:
pass
def step2_reorganize_labels() -> str:
    """Extract each downloaded ``.tar`` into ``labels/{clip_id}/{folder}/``.

    Returns the chosen ``clip_id`` (the tar stem with its ``_{start}_{end}``
    suffix stripped), which later steps use for the fake-video filename.
    Exits the process if no label tar was downloaded at all.
    """
    _print_section("STEP 2 解压标签到 labels/<clip_id>/<folder> 布局")
    common_folders = [
        "all_object_info",
        "captions",
        "car_mask_coarse",
        "ftheta_intrinsic",
        "pinhole_intrinsic",
        "pose",
        "vehicle_pose",
        "lidar_raw",
        # the nine HDMap categories
        "3d_lanes",
        "3d_lanelines",
        "3d_road_boundaries",
        "3d_wait_lines",
        "3d_crosswalks",
        "3d_road_markings",
        "3d_poles",
        "3d_traffic_lights",
        "3d_traffic_signs",
    ]
    clip_id_full: str | None = None  # {clip_id}_{start}_{end}
    clip_id: str | None = None
    for folder in common_folders:
        src = DATA_ROOT / folder
        if not src.exists():
            print(f" - skip {folder} (not downloaded)", flush=True)
            continue
        tars = sorted(src.glob("*.tar"))
        if not tars:
            print(f" - skip {folder} (no .tar)", flush=True)
            continue
        if clip_id_full is None:
            # Lock onto the first clip seen; every other folder must match it.
            clip_id_full = tars[0].stem
            clip_id = clip_id_full.rsplit("_", 2)[0]
            print(f" -> chosen clip_id_full = {clip_id_full}", flush=True)
            print(f" -> video / symlink clip_id = {clip_id}", flush=True)
        use_tars = [t for t in tars if t.stem == clip_id_full]
        if not use_tars:
            print(f" - skip {folder}: 无与 {clip_id_full} 同名的 tar(避免解压错 clip)", flush=True)
            continue
        tar_path = use_tars[0]
        # Target directory for this modality.
        out_dir = DATA_ROOT / "labels" / clip_id_full / folder
        out_dir.mkdir(parents=True, exist_ok=True)
        with tarfile.open(tar_path, "r") as tf:
            # The archive is downloaded (hence untrusted): ``filter="data"``
            # rejects path-traversal and special members. Fall back for
            # Pythons that predate the ``filter`` argument.
            try:
                tf.extractall(out_dir, filter="data")
            except TypeError:
                tf.extractall(out_dir)
        _hoist_single_subdir(out_dir)
        # Run twice in case the tar nests another level with the modality
        # name (e.g. ftheta_intrinsic/ftheta_intrinsic/...).
        _hoist_single_subdir(out_dir)
        # Show a few sample paths for eyeballing the layout.
        members = sorted(out_dir.rglob("*"))[:3]
        for m in members:
            print(f" {m.relative_to(DATA_ROOT)}", flush=True)
        print(f" - {folder}: {len(list(out_dir.rglob('*')))} files", flush=True)
    if clip_id_full is None:
        sys.exit("没有下到任何标签 tar,确认 HF_TOKEN 是否能访问 NVIDIA 数据集")
    # cosmos_dataset.py reads from labels/{clip_id}/, but the download uses
    # {clip_id}_{start}_{end} as the directory name. Symlink (or copy, when
    # symlinks are unavailable, e.g. on Windows without privileges) a
    # directory under the plain clip_id.
    short_dir = DATA_ROOT / "labels" / clip_id  # type: ignore[arg-type]
    if not short_dir.exists():
        try:
            short_dir.symlink_to(DATA_ROOT / "labels" / clip_id_full, target_is_directory=True)
        except OSError:
            shutil.copytree(DATA_ROOT / "labels" / clip_id_full, short_dir)
    return clip_id  # type: ignore[return-value]
def step3_make_fake_video(clip_id: str) -> None:
    """Write a 121-frame random-noise mp4 standing in for ``cosmos_synthetic``."""
    _print_section("STEP 3 合成占位视频(随机噪声 mp4)")
    import numpy as np
    import cv2
    syn_dir = DATA_ROOT / "synthetic" / "single_view" / "generation"
    syn_dir.mkdir(parents=True, exist_ok=True)
    out_path = syn_dir / f"{clip_id}_0_Sunny.mp4"
    height, width, n_frames = 768, 1024, 121  # 768 raw; 384 after top crop
    rng = np.random.default_rng(0)
    writer = cv2.VideoWriter(
        str(out_path), cv2.VideoWriter_fourcc(*"mp4v"), 30.0, (width, height)
    )
    if not writer.isOpened():
        sys.exit(f"无法打开 mp4 写入器(缺 codec?): {out_path}")
    for _ in range(n_frames):
        writer.write(rng.integers(0, 256, size=(height, width, 3), dtype=np.uint8))
    writer.release()
    print(f" 写入 {out_path} ({out_path.stat().st_size / 1024**2:.1f} MB)", flush=True)
def step4_run_trainer(clip_id: str) -> None:
    """Launch ``runner_local --tiny --max_steps 4`` against the prepared data."""
    _print_section("STEP 4 跑 trainer(真实标签 + 伪造视觉)")
    device = "cuda" if _has_cuda() else "cpu"
    cmd = [
        sys.executable,
        "-m",
        "wjad.train.runner_local",
        "--config", str(ROOT / "configs" / "default.yaml"),
        "--data_root", str(DATA_ROOT),
        "--dinov3_path", str(ROOT / "dinov3-vitb16-pretrain-lvd1689m"),
        "--device", device,
        "--tiny",
        "--max_steps", "4",
    ]
    # Make src/ importable in the child process without installing the package.
    env = dict(os.environ)
    env["PYTHONPATH"] = str(ROOT / "src") + os.pathsep + env.get("PYTHONPATH", "")
    print(f"$ {' '.join(cmd)}", flush=True)
    rc = subprocess.call(cmd, env=env)
    if rc != 0:
        sys.exit(f"trainer 失败 rc={rc}")
def _has_cuda() -> bool:
try:
import torch
return torch.cuda.is_available()
except Exception:
return False
def main() -> None:
    """Run the full sandbox pipeline: download → reorganize → fake video → train."""
    _print_section("WJAD Sandbox Real-Data Tiny Test")
    print(f"DATA_ROOT = {DATA_ROOT}", flush=True)
    step1_download_labels()
    chosen_clip = step2_reorganize_labels()
    step3_make_fake_video(chosen_clip)
    step4_run_trainer(chosen_clip)
    _print_section("DONE")
if __name__ == "__main__":
main()