| """Sandbox 真实数据微验证脚本。 |
| |
| 由于 NVIDIA Cosmos-Drive-Dreams 数据集的 ``cosmos_synthetic`` 是一份切成 17 |
| 个分卷(共 ~700 GB)的 ``split`` 二进制,单独下载某一分卷无法解压出 mp4。 |
| 因此本脚本采用混合方案: |
| |
| 1. 用官方 ``download.py --file_types lidar,hdmap --limit 1`` 拉下 1 个 clip 的 |
| 全部真实标签(所有 common 文件夹 + lidar_raw),约 50-200 MB; |
| 2. 把每个 ``.tar`` 解压到 ``labels/{clip_id}/{folder}/`` 结构,匹配 |
| ``wjad.data.cosmos_dataset`` 期待的布局; |
| 3. 用 OpenCV(``cv2.VideoWriter``)合成一个随机噪声 mp4 占位真实合成视频 |
| (文件名 ``{clip_id}_{chunk_id}_Sunny.mp4``,121 帧,分辨率 1024×768); |
| 4. 调用 ``wjad.train.runner_local --tiny --max_steps 4`` 跑 4 步真实标签 + |
| 伪造视觉的训练。 |
| |
| 这样能验证: |
| - 数据集索引(``build_clip_index``) |
| - 标签解析(``all_object_info`` JSON、SE(3) pose、f-theta 内参) |
| - LIDAR 加载与遮挡过滤 |
| - Hungarian 匹配 + DETR loss |
| - 端到端 forward / GradNorm / PCGrad / 反传 |
| |
| 但不会验证 DINOv3 在真实图像上的语义提取(视觉是噪声,不会收敛)。 |
| """ |
|
|
| from __future__ import annotations |
|
|
| import os |
| import shutil |
| import subprocess |
| import sys |
| import tarfile |
| import urllib.request |
| from pathlib import Path |
|
|
# Repository root: the parent of the directory that contains this script.
ROOT = Path(__file__).resolve().parent.parent
# Put the in-repo ``src`` tree at the front of the module search path.
sys.path.insert(0, str(ROOT / "src"))

# Dataset directory; the ``WJAD_DATA_ROOT`` environment variable overrides
# the in-repo default of ``<ROOT>/data/cosmos``.
DATA_ROOT = Path(os.getenv("WJAD_DATA_ROOT", ROOT / "data" / "cosmos"))
# Raw URL of the download helper published in the Cosmos-Drive-Dreams repo.
NV_DOWNLOAD_URL = "https://raw.githubusercontent.com/nv-tlabs/Cosmos-Drive-Dreams/main/scripts/download.py"
|
|
|
|
def _print_section(title: str) -> None:
    """Print ``title`` between two 60-character ``=`` rules, after a blank line."""
    rule = "=" * 60
    # An empty first field plus ``sep="\n"`` yields the leading blank line.
    print("", rule, title, rule, sep="\n", flush=True)
|
|
|
|
def step1_download_labels() -> None:
    """Download the labels and lidar data of one clip with NVIDIA's official script.

    The helper script is fetched from ``NV_DOWNLOAD_URL`` into ``DATA_ROOT``
    the first time it is needed, then executed with
    ``--file_types lidar,hdmap --workers 4 --limit 1``.

    Raises:
        SystemExit: if the download script returns a non-zero exit code.
    """
    _print_section("STEP 1 下载真实标签(1 个 clip)")
    DATA_ROOT.mkdir(parents=True, exist_ok=True)
    nv_script = DATA_ROOT / ".nvidia_download.py"
    if not nv_script.exists():
        print(f"[download] 取 NVIDIA download.py -> {nv_script}", flush=True)
        # Read the whole payload before touching the disk, then publish it with
        # an atomic rename.  An interrupted download must not leave an empty or
        # truncated script behind, because the ``exists()`` check above would
        # accept it on every later run.
        with urllib.request.urlopen(NV_DOWNLOAD_URL, timeout=60) as resp:
            payload = resp.read()
        partial = nv_script.with_name(nv_script.name + ".part")
        partial.write_bytes(payload)
        partial.replace(nv_script)

    cmd = [
        sys.executable,
        str(nv_script),
        "--odir", str(DATA_ROOT),
        "--file_types", "lidar,hdmap",
        "--workers", "4",
        "--limit", "1",
    ]
    print(f"$ {' '.join(cmd)}", flush=True)
    rc = subprocess.call(cmd)
    if rc != 0:
        sys.exit(f"download.py 失败 rc={rc}")
|
|
|
|
def _hoist_single_subdir(out_dir: Path) -> None:
    """Flatten ``out_dir`` when it holds exactly one sub-directory and no files.

    The entries of that lone child are moved up into ``out_dir`` (a common tar
    layout wraps everything in one top-level folder).  Entries whose name is
    already taken in ``out_dir`` are left in place, and the child directory is
    removed only if that succeeds.  Nothing happens when ``out_dir`` is not an
    existing directory.
    """
    if not out_dir.is_dir():
        return

    # One pass over the directory: collect sub-directories, note any file.
    dirs = []
    saw_file = False
    for entry in out_dir.iterdir():
        if entry.is_dir():
            dirs.append(entry)
        elif entry.is_file():
            saw_file = True
    if saw_file or len(dirs) != 1:
        return

    (wrapper,) = dirs
    for entry in wrapper.iterdir():
        target = out_dir / entry.name
        if not target.exists():
            shutil.move(str(entry), str(target))
    try:
        wrapper.rmdir()
    except OSError:
        # Best effort: e.g. the wrapper still holds entries skipped above.
        pass
|
|
|
|
def _extract_tar(tar_path: Path, out_dir: Path) -> None:
    """Extract ``tar_path`` into ``out_dir``, using the ``data`` filter when available."""
    with tarfile.open(tar_path, "r") as tf:
        if hasattr(tarfile, "data_filter"):
            # Extraction filters reject members that would land outside
            # ``out_dir``; older interpreters without them keep the old call.
            tf.extractall(out_dir, filter="data")
        else:
            tf.extractall(out_dir)


def step2_reorganize_labels() -> str:
    """Unpack each label folder's ``.tar`` into ``labels/{clip_id_full}/{folder}/``.

    The first ``.tar`` found (folders in the order listed below, files sorted
    by name) selects the clip; in every other folder only the archive with the
    same stem is extracted, so files of different clips are never mixed.
    Afterwards ``labels/{clip_id}`` is made to point at the extracted tree,
    through a symlink or, if that fails, a copy.

    Returns:
        The chosen clip id with its last two ``_``-separated fields (the
        ``_{start}_{end}`` suffix) removed.

    Raises:
        SystemExit: if no label archive was found in any folder.
    """
    _print_section("STEP 2 解压标签到 labels/<clip_id>/<folder> 布局")

    common_folders = (
        "all_object_info",
        "captions",
        "car_mask_coarse",
        "ftheta_intrinsic",
        "pinhole_intrinsic",
        "pose",
        "vehicle_pose",
        "lidar_raw",
        "3d_lanes",
        "3d_lanelines",
        "3d_road_boundaries",
        "3d_wait_lines",
        "3d_crosswalks",
        "3d_road_markings",
        "3d_poles",
        "3d_traffic_lights",
        "3d_traffic_signs",
    )

    labels_dir = DATA_ROOT / "labels"
    clip_id_full: str | None = None
    clip_id: str | None = None

    for folder in common_folders:
        src = DATA_ROOT / folder
        if not src.exists():
            print(f" - skip {folder} (not downloaded)", flush=True)
            continue
        tars = sorted(src.glob("*.tar"))
        if not tars:
            print(f" - skip {folder} (no .tar)", flush=True)
            continue
        if clip_id_full is None:
            clip_id_full = tars[0].stem
            clip_id = clip_id_full.rsplit("_", 2)[0]
            print(f" -> chosen clip_id_full = {clip_id_full}", flush=True)
            print(f" -> video / symlink clip_id = {clip_id}", flush=True)
        tar_path = next((t for t in tars if t.stem == clip_id_full), None)
        if tar_path is None:
            print(f" - skip {folder}: 无与 {clip_id_full} 同名的 tar(避免解压错 clip)", flush=True)
            continue

        out_dir = labels_dir / clip_id_full / folder
        out_dir.mkdir(parents=True, exist_ok=True)
        _extract_tar(tar_path, out_dir)
        # Two passes, as before: flattens up to two levels of single-directory
        # wrapping inside the archive.
        _hoist_single_subdir(out_dir)
        _hoist_single_subdir(out_dir)

        # Walk the extracted tree once; preview the first three entries and
        # report how many regular files (not directories) were produced.
        entries = sorted(out_dir.rglob("*"))
        for entry in entries[:3]:
            print(f" {entry.relative_to(DATA_ROOT)}", flush=True)
        n_files = sum(1 for entry in entries if entry.is_file())
        print(f" - {folder}: {n_files} files", flush=True)

    if clip_id_full is None or clip_id is None:
        sys.exit("没有下到任何标签 tar,确认 HF_TOKEN 是否能访问 NVIDIA 数据集")

    short_dir = labels_dir / clip_id
    if short_dir.is_symlink() and not short_dir.exists():
        # Dangling link left by an earlier run: neither symlink_to nor
        # copytree could replace it, so drop it first.
        short_dir.unlink()
    if not short_dir.exists():
        try:
            # Link and target are siblings inside ``labels/``, so a bare-name
            # target resolves correctly even when DATA_ROOT is a relative path.
            short_dir.symlink_to(clip_id_full, target_is_directory=True)
        except OSError:
            # Symlinks unavailable here; fall back to a copy.
            shutil.copytree(labels_dir / clip_id_full, short_dir)
    return clip_id
|
|
|
|
def step3_make_fake_video(
    clip_id: str,
    *,
    height: int = 768,
    width: int = 1024,
    num_frames: int = 121,
    fps: float = 30.0,
) -> None:
    """Write a random-noise mp4 that stands in for the ``cosmos_synthetic`` video.

    The file is created at
    ``DATA_ROOT/synthetic/single_view/generation/{clip_id}_0_Sunny.mp4`` with
    OpenCV's ``mp4v`` codec.  Frames come from a generator seeded with 0, so
    the pixel data fed to the encoder is the same on every run.

    Args:
        clip_id: Clip id used as the file-name prefix.
        height: Frame height in pixels.
        width: Frame width in pixels.
        num_frames: Number of frames to write.
        fps: Frame rate passed to the video writer.

    Raises:
        SystemExit: if the video writer cannot be opened.
    """
    _print_section("STEP 3 合成占位视频(随机噪声 mp4)")
    # Imported lazily so the earlier steps do not require numpy / OpenCV.
    import numpy as np
    import cv2

    syn_dir = DATA_ROOT / "synthetic" / "single_view" / "generation"
    syn_dir.mkdir(parents=True, exist_ok=True)
    out_path = syn_dir / f"{clip_id}_0_Sunny.mp4"

    rng = np.random.default_rng(0)
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    # OpenCV takes the frame size as (width, height).
    writer = cv2.VideoWriter(str(out_path), fourcc, fps, (width, height))
    try:
        if not writer.isOpened():
            sys.exit(f"无法打开 mp4 写入器(缺 codec?): {out_path}")
        for _ in range(num_frames):
            frame = rng.integers(0, 256, size=(height, width, 3), dtype=np.uint8)
            writer.write(frame)
    finally:
        # Always release the writer, even if a write fails part-way through.
        writer.release()
    print(f" 写入 {out_path} ({out_path.stat().st_size / 1024**2:.1f} MB)", flush=True)
|
|
|
|
def step4_run_trainer(clip_id: str) -> None:
    """Run ``wjad.train.runner_local --tiny --max_steps 4`` in a subprocess.

    The trainer is started with the current interpreter, the repository's
    ``configs/default.yaml``, ``DATA_ROOT`` as data root, and ``cuda`` or
    ``cpu`` as device depending on ``_has_cuda()``.  ``<ROOT>/src`` is put at
    the front of the child's ``PYTHONPATH``.

    Args:
        clip_id: Currently unused; the trainer is only given ``DATA_ROOT``.

    Raises:
        SystemExit: if the trainer returns a non-zero exit code.
    """
    _print_section("STEP 4 跑 trainer(真实标签 + 伪造视觉)")
    cmd = [
        sys.executable,
        "-m",
        "wjad.train.runner_local",
        "--config", str(ROOT / "configs" / "default.yaml"),
        "--data_root", str(DATA_ROOT),
        "--dinov3_path", str(ROOT / "dinov3-vitb16-pretrain-lvd1689m"),
        "--device", "cuda" if _has_cuda() else "cpu",
        "--tiny",
        "--max_steps", "4",
    ]
    env = os.environ.copy()
    # Join only non-empty parts: a trailing separator would add an empty
    # entry (the current directory) to the child's module search path.
    search_path = [str(ROOT / "src")]
    inherited = env.get("PYTHONPATH")
    if inherited:
        search_path.append(inherited)
    env["PYTHONPATH"] = os.pathsep.join(search_path)
    print(f"$ {' '.join(cmd)}", flush=True)
    rc = subprocess.call(cmd, env=env)
    if rc != 0:
        sys.exit(f"trainer 失败 rc={rc}")
|
|
|
|
def _has_cuda() -> bool:
    """Return ``torch.cuda.is_available()``, or ``False`` if torch cannot be used.

    Any exception raised while importing torch or querying CUDA is treated as
    "no CUDA".
    """
    try:
        import torch

        available = torch.cuda.is_available()
    except Exception:
        available = False
    return available
|
|
|
|
def main() -> None:
    """Run the four sandbox steps in order: download, unpack, fake video, train."""
    _print_section("WJAD Sandbox Real-Data Tiny Test")
    print(f"DATA_ROOT = {DATA_ROOT}", flush=True)

    step1_download_labels()
    chosen_clip = step2_reorganize_labels()
    # Steps 3 and 4 both take the clip id selected in step 2.
    for step in (step3_make_fake_video, step4_run_trainer):
        step(chosen_clip)

    _print_section("DONE")


# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|