"""Sandbox 真实数据微验证脚本。 由于 NVIDIA Cosmos-Drive-Dreams 数据集的 ``cosmos_synthetic`` 是一份切成 17 个分卷(共 ~700 GB)的 ``split`` 二进制,单独下载某一分卷无法解压出 mp4。 因此本脚本采用混合方案: 1. 用官方 ``download.py --file_types lidar --limit 1`` 拉下 1 个 clip 的 全部真实标签(所有 common 文件夹 + lidar_raw),约 50-200 MB; 2. 把每个 ``.tar`` 解压到 ``labels/{clip_id}/{folder}/`` 结构,匹配 ``wjad.data.cosmos_dataset`` 期待的布局; 3. 用 ``imageio`` 合成一个随机噪声 mp4 占位真实合成视频 (文件名 ``{clip_id}_{chunk_id}_Sunny.mp4``,121 帧,分辨率 1024×768); 4. 调用 ``wjad.train.runner_local --tiny --max_steps 4`` 跑 4 步真实标签 + 伪造视觉的训练。 这样能验证: - 数据集索引(``build_clip_index``) - 标签解析(``all_object_info`` JSON、SE(3) pose、f-theta 内参) - LIDAR 加载与遮挡过滤 - Hungarian 匹配 + DETR loss - 端到端 forward / GradNorm / PCGrad / 反传 但不会验证 DINOv3 在真实图像上的语义提取(视觉是噪声,不会收敛)。 """ from __future__ import annotations import os import shutil import subprocess import sys import tarfile import urllib.request from pathlib import Path ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(ROOT / "src")) DATA_ROOT = Path(os.environ.get("WJAD_DATA_ROOT", ROOT / "data" / "cosmos")) NV_DOWNLOAD_URL = ( "https://raw.githubusercontent.com/nv-tlabs/Cosmos-Drive-Dreams/main/scripts/download.py" ) def _print_section(title: str) -> None: bar = "=" * 60 print(f"\n{bar}\n{title}\n{bar}", flush=True) def step1_download_labels() -> None: """用 NVIDIA 官方脚本下载 1 个 clip 的标签 + lidar。""" _print_section("STEP 1 下载真实标签(1 个 clip)") DATA_ROOT.mkdir(parents=True, exist_ok=True) nv_script = DATA_ROOT / ".nvidia_download.py" if not nv_script.exists(): print(f"[download] 取 NVIDIA download.py -> {nv_script}", flush=True) with urllib.request.urlopen(NV_DOWNLOAD_URL) as r, open(nv_script, "wb") as f: f.write(r.read()) # 同时拉 lidar + hdmap:``hdmap`` 类别会触发 9 个 3d_* 文件夹下载, # 配合 common 文件夹一起拿,覆盖动态 + 结构化两类标签。 cmd = [ sys.executable, str(nv_script), "--odir", str(DATA_ROOT), "--file_types", "lidar,hdmap", "--workers", "4", "--limit", "1", ] print(f"$ {' '.join(cmd)}", flush=True) rc = subprocess.call(cmd) if rc != 0: sys.exit(f"download.py 失败 rc={rc}") def _hoist_single_subdir(out_dir: Path) -> None: """若解压结果仅为「单个子目录、顶层无文件」,把子目录内容抬到 out_dir(常见 tar 布局)。""" if not out_dir.is_dir(): return subs = [p for p in out_dir.iterdir() if p.is_dir()] files = [p for p in out_dir.iterdir() if p.is_file()] if len(subs) == 1 and not files: child = subs[0] for item in child.iterdir(): dest = out_dir / item.name if dest.exists(): continue shutil.move(str(item), str(dest)) try: child.rmdir() except OSError: pass def step2_reorganize_labels() -> str: """把每个 common 文件夹的 .tar 解压到 ``labels/{clip_id}/{folder}/``。 返回挑选出的 ``clip_id``(去掉 ``_{start}_{end}`` 后缀)。 """ _print_section("STEP 2 解压标签到 labels// 布局") common_folders = [ "all_object_info", "captions", "car_mask_coarse", "ftheta_intrinsic", "pinhole_intrinsic", "pose", "vehicle_pose", "lidar_raw", # HDMap 9 类 "3d_lanes", "3d_lanelines", "3d_road_boundaries", "3d_wait_lines", "3d_crosswalks", "3d_road_markings", "3d_poles", "3d_traffic_lights", "3d_traffic_signs", ] clip_id_full: str | None = None # {clip_id}_{start}_{end} clip_id: str | None = None for folder in common_folders: src = DATA_ROOT / folder if not src.exists(): print(f" - skip {folder} (not downloaded)", flush=True) continue tars = sorted(src.glob("*.tar")) if not tars: print(f" - skip {folder} (no .tar)", flush=True) continue if clip_id_full is None: clip_id_full = tars[0].stem clip_id = clip_id_full.rsplit("_", 2)[0] print(f" -> chosen clip_id_full = {clip_id_full}", flush=True) print(f" -> video / symlink clip_id = {clip_id}", flush=True) use_tars = [t for t in tars if t.stem == clip_id_full] if not use_tars: print(f" - skip {folder}: 无与 {clip_id_full} 同名的 tar(避免解压错 clip)", flush=True) continue tar_path = use_tars[0] # 目标目录 out_dir = DATA_ROOT / "labels" / clip_id_full / folder out_dir.mkdir(parents=True, exist_ok=True) with tarfile.open(tar_path, "r") as tf: tf.extractall(out_dir) _hoist_single_subdir(out_dir) # 若仍嵌套一层 modality 名(ftheta_intrinsic/ftheta_intrinsic/...) _hoist_single_subdir(out_dir) # 列几个样例 members = sorted(out_dir.rglob("*"))[:3] for m in members: print(f" {m.relative_to(DATA_ROOT)}", flush=True) print(f" - {folder}: {len(list(out_dir.rglob('*')))} files", flush=True) if clip_id_full is None: sys.exit("没有下到任何标签 tar,确认 HF_TOKEN 是否能访问 NVIDIA 数据集") # 兼容 cosmos_dataset.py:它从 labels/{clip_id}/ 读,但实际下载用的是 # {clip_id}_{start}_{end} 作为目录名。这里软链一份名为纯 clip_id 的目录。 short_dir = DATA_ROOT / "labels" / clip_id # type: ignore[arg-type] if not short_dir.exists(): try: short_dir.symlink_to(DATA_ROOT / "labels" / clip_id_full, target_is_directory=True) except OSError: shutil.copytree(DATA_ROOT / "labels" / clip_id_full, short_dir) return clip_id # type: ignore[return-value] def step3_make_fake_video(clip_id: str) -> None: """合成 121 帧随机 mp4 模拟 ``cosmos_synthetic`` 视频。""" _print_section("STEP 3 合成占位视频(随机噪声 mp4)") import numpy as np import cv2 syn_dir = DATA_ROOT / "synthetic" / "single_view" / "generation" syn_dir.mkdir(parents=True, exist_ok=True) out_path = syn_dir / f"{clip_id}_0_Sunny.mp4" H, W, T = 768, 1024, 121 # 顶部裁剪后 384,原始 768 rng = np.random.default_rng(0) fourcc = cv2.VideoWriter_fourcc(*"mp4v") writer = cv2.VideoWriter(str(out_path), fourcc, 30.0, (W, H)) if not writer.isOpened(): sys.exit(f"无法打开 mp4 写入器(缺 codec?): {out_path}") for _ in range(T): frame = rng.integers(0, 256, size=(H, W, 3), dtype=np.uint8) writer.write(frame) writer.release() print(f" 写入 {out_path} ({out_path.stat().st_size / 1024**2:.1f} MB)", flush=True) def step4_run_trainer(clip_id: str) -> None: """跑 runner_local --tiny --max_steps 4。""" _print_section("STEP 4 跑 trainer(真实标签 + 伪造视觉)") cmd = [ sys.executable, "-m", "wjad.train.runner_local", "--config", str(ROOT / "configs" / "default.yaml"), "--data_root", str(DATA_ROOT), "--dinov3_path", str(ROOT / "dinov3-vitb16-pretrain-lvd1689m"), "--device", "cuda" if _has_cuda() else "cpu", "--tiny", "--max_steps", "4", ] env = os.environ.copy() env["PYTHONPATH"] = str(ROOT / "src") + os.pathsep + env.get("PYTHONPATH", "") print(f"$ {' '.join(cmd)}", flush=True) rc = subprocess.call(cmd, env=env) if rc != 0: sys.exit(f"trainer 失败 rc={rc}") def _has_cuda() -> bool: try: import torch return torch.cuda.is_available() except Exception: return False def main() -> None: _print_section("WJAD Sandbox Real-Data Tiny Test") print(f"DATA_ROOT = {DATA_ROOT}", flush=True) step1_download_labels() clip_id = step2_reorganize_labels() step3_make_fake_video(clip_id) step4_run_trainer(clip_id) _print_section("DONE") if __name__ == "__main__": main()