File size: 8,652 Bytes
0cfefd2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
"""Sandbox 真实数据微验证脚本。

由于 NVIDIA Cosmos-Drive-Dreams 数据集的 ``cosmos_synthetic`` 是一份切成 17
个分卷(共 ~700 GB)的 ``split`` 二进制,单独下载某一分卷无法解压出 mp4。
因此本脚本采用混合方案:

  1. 用官方 ``download.py --file_types lidar,hdmap --limit 1`` 拉下 1 个 clip 的
     全部真实标签(所有 common 文件夹 + lidar_raw + HDMap 的 3d_* 文件夹),约 50-200 MB;
  2. 把每个 ``.tar`` 解压到 ``labels/{clip_id}/{folder}/`` 结构,匹配
     ``wjad.data.cosmos_dataset`` 期待的布局;
  3. 用 OpenCV(``cv2.VideoWriter``)合成一个随机噪声 mp4 占位真实合成视频
     (文件名 ``{clip_id}_{chunk_id}_Sunny.mp4``,121 帧,分辨率 1024×768);
  4. 调用 ``wjad.train.runner_local --tiny --max_steps 4`` 跑 4 步真实标签 +
     伪造视觉的训练。

这样能验证:
  - 数据集索引(``build_clip_index``)
  - 标签解析(``all_object_info`` JSON、SE(3) pose、f-theta 内参)
  - LIDAR 加载与遮挡过滤
  - Hungarian 匹配 + DETR loss
  - 端到端 forward / GradNorm / PCGrad / 反传

但不会验证 DINOv3 在真实图像上的语义提取(视觉是噪声,不会收敛)。
"""

from __future__ import annotations

import os
import shutil
import subprocess
import sys
import tarfile
import urllib.request
from pathlib import Path

# Repository root: two directory levels above this file's resolved location.
ROOT = Path(__file__).resolve().parent.parent
# Put the in-repo ``src`` directory first on the import path.
sys.path.insert(0, os.fspath(ROOT / "src"))

# Dataset root directory; the WJAD_DATA_ROOT environment variable overrides
# the default of ``<ROOT>/data/cosmos``.
DATA_ROOT = Path(os.getenv("WJAD_DATA_ROOT", ROOT / "data" / "cosmos"))
# Raw URL of NVIDIA's Cosmos-Drive-Dreams download helper script.
NV_DOWNLOAD_URL = (
    "https://raw.githubusercontent.com/"
    "nv-tlabs/Cosmos-Drive-Dreams/main/scripts/download.py"
)


def _print_section(title: str) -> None:
    """Print *title* framed by two 60-character ``=`` rules, after a blank line.

    The output is flushed immediately.
    """
    rule = "=" * 60
    banner = "\n".join(("", rule, title, rule))
    print(banner, flush=True)


def step1_download_labels() -> None:
    """Download the labels and LiDAR data of one clip with NVIDIA's script.

    The helper script is fetched from ``NV_DOWNLOAD_URL`` and cached as
    ``DATA_ROOT/.nvidia_download.py``; an existing copy is reused. It is then
    run in a subprocess with ``--file_types lidar,hdmap --workers 4
    --limit 1`` and ``DATA_ROOT`` as the output directory.

    Raises:
        SystemExit: if the download script returns a non-zero exit code.
        OSError: if fetching or saving the helper script fails
            (``urllib.error.URLError`` is an ``OSError`` subclass).
    """
    _print_section("STEP 1  下载真实标签(1 个 clip)")
    DATA_ROOT.mkdir(parents=True, exist_ok=True)
    nv_script = DATA_ROOT / ".nvidia_download.py"
    if not nv_script.exists():
        print(f"[download] 取 NVIDIA download.py -> {nv_script}", flush=True)
        # NOTE(review): the script comes from an unpinned branch and is then
        # executed; consider pinning a commit hash or verifying a checksum.
        #
        # Download to a sibling ".part" file and rename only on success, so an
        # interrupted transfer never leaves a truncated script that later runs
        # would pick up because it "exists".
        partial = nv_script.with_name(nv_script.name + ".part")
        try:
            with urllib.request.urlopen(NV_DOWNLOAD_URL, timeout=60) as r, open(partial, "wb") as f:
                shutil.copyfileobj(r, f)
            os.replace(partial, nv_script)
        finally:
            # After a successful replace the partial file is already gone.
            if partial.exists():
                partial.unlink()
    # Request lidar + hdmap together. Per the original author's note, the
    # ``hdmap`` category triggers the download of the nine ``3d_*`` folders,
    # which together with the common folders covers both dynamic and
    # structured labels.
    cmd = [
        sys.executable,
        str(nv_script),
        "--odir", str(DATA_ROOT),
        "--file_types", "lidar,hdmap",
        "--workers", "4",
        "--limit", "1",
    ]
    print(f"$ {' '.join(cmd)}", flush=True)
    rc = subprocess.call(cmd)
    if rc != 0:
        sys.exit(f"download.py 失败 rc={rc}")


def _hoist_single_subdir(out_dir: Path) -> None:
    """Flatten ``out_dir/<wrapper>/*`` into ``out_dir/*`` for single-wrapper layouts.

    Nothing happens unless *out_dir* is a directory that holds exactly one
    sub-directory and no regular files at its top level. In that case every
    entry of the sub-directory is moved up into *out_dir* and the emptied
    wrapper is removed.

    A wrapper that contains an entry with its own name (``X/X/...``) is first
    renamed to a temporary hidden name, so that the inner ``X`` can take the
    wrapper's place instead of being skipped as an "existing destination".

    Entries whose destination already exists are left where they are; the
    wrapper (or its temporarily renamed form) then stays behind, because
    removal of a non-empty directory is deliberately ignored.
    """
    if not out_dir.is_dir():
        return
    entries = list(out_dir.iterdir())
    subdirs = [p for p in entries if p.is_dir()]
    if len(subdirs) != 1 or any(p.is_file() for p in entries):
        return

    wrapper = subdirs[0]
    child_names = {p.name for p in wrapper.iterdir()}
    if wrapper.name in child_names:
        # Move the wrapper aside under a name that collides neither with its
        # own children nor with anything already in out_dir.
        staged_name = f".{wrapper.name}.hoist"
        while staged_name in child_names or (out_dir / staged_name).exists():
            staged_name += "_"
        staged = out_dir / staged_name
        wrapper.rename(staged)
        wrapper = staged

    # Materialise the listing first: entries are moved out while we loop.
    for item in list(wrapper.iterdir()):
        dest = out_dir / item.name
        if dest.exists():
            continue
        shutil.move(str(item), str(dest))
    try:
        wrapper.rmdir()
    except OSError:
        # Best effort: the wrapper still holds entries that were skipped.
        pass


def _extract_tar_safely(tar_path: Path, out_dir: Path) -> None:
    """Extract *tar_path* into *out_dir* without letting members escape it."""
    with tarfile.open(tar_path, "r") as tf:
        if hasattr(tarfile, "data_filter"):
            # Interpreters with extraction filters: the "data" filter rejects
            # absolute paths, ".." traversal and links pointing outside.
            tf.extractall(out_dir, filter="data")
            return
        # Older interpreters: check member paths by hand before extracting.
        # (Link members are not inspected on this fallback path.)
        root = out_dir.resolve()
        for member in tf.getmembers():
            target = (root / member.name).resolve()
            if target != root and root not in target.parents:
                sys.exit(f"unsafe tar member {member.name!r} in {tar_path}")
        tf.extractall(out_dir)


def step2_reorganize_labels() -> str:
    """Extract each label folder's ``.tar`` into ``labels/{clip_id_full}/{folder}/``.

    The first ``.tar`` found (folders in list order, archives sorted by name)
    selects the clip; its stem is ``clip_id_full``, and the short ``clip_id``
    is that stem with its last two ``_``-separated fields removed (the
    ``_{start}_{end}`` suffix). For every other folder only the archive with
    the same stem is extracted, so labels of different clips are never mixed.

    Finally ``labels/{clip_id}`` is made available as a symlink to
    ``labels/{clip_id_full}`` (or as a copy when symlinks cannot be created),
    because the dataset loader reads from ``labels/{clip_id}/``.

    Returns:
        The short ``clip_id``.

    Raises:
        SystemExit: if no label archive was found at all, or an archive
            contains a member that would be written outside its target
            directory (fallback path on interpreters without tar filters).
    """
    _print_section("STEP 2  解压标签到 labels/<clip_id>/<folder> 布局")

    common_folders = [
        "all_object_info",
        "captions",
        "car_mask_coarse",
        "ftheta_intrinsic",
        "pinhole_intrinsic",
        "pose",
        "vehicle_pose",
        "lidar_raw",
        # The nine HDMap categories
        "3d_lanes",
        "3d_lanelines",
        "3d_road_boundaries",
        "3d_wait_lines",
        "3d_crosswalks",
        "3d_road_markings",
        "3d_poles",
        "3d_traffic_lights",
        "3d_traffic_signs",
    ]

    labels_root = DATA_ROOT / "labels"
    clip_id_full: str | None = None  # {clip_id}_{start}_{end}
    clip_id: str | None = None

    for folder in common_folders:
        src = DATA_ROOT / folder
        if not src.exists():
            print(f"  - skip {folder} (not downloaded)", flush=True)
            continue
        tars = sorted(src.glob("*.tar"))
        if not tars:
            print(f"  - skip {folder} (no .tar)", flush=True)
            continue
        if clip_id_full is None:
            clip_id_full = tars[0].stem
            clip_id = clip_id_full.rsplit("_", 2)[0]
            print(f"  -> chosen clip_id_full = {clip_id_full}", flush=True)
            print(f"  -> video / symlink clip_id   = {clip_id}", flush=True)
        tar_path = next((t for t in tars if t.stem == clip_id_full), None)
        if tar_path is None:
            print(f"  - skip {folder}: 无与 {clip_id_full} 同名的 tar(避免解压错 clip)", flush=True)
            continue
        # Target directory for this folder's labels.
        out_dir = labels_root / clip_id_full / folder
        out_dir.mkdir(parents=True, exist_ok=True)
        _extract_tar_safely(tar_path, out_dir)
        _hoist_single_subdir(out_dir)
        # A second pass covers archives that nest one more level, e.g. a
        # directory named after the modality (ftheta_intrinsic/ftheta_intrinsic/...).
        _hoist_single_subdir(out_dir)
        # Show a few sample entries, then the number of regular files.
        extracted = sorted(out_dir.rglob("*"))
        for m in extracted[:3]:
            print(f"     {m.relative_to(DATA_ROOT)}", flush=True)
        n_files = sum(1 for p in extracted if p.is_file())
        print(f"  - {folder}: {n_files} files", flush=True)

    if clip_id_full is None or clip_id is None:
        sys.exit("没有下到任何标签 tar,确认 HF_TOKEN 是否能访问 NVIDIA 数据集")

    # Compatibility with the dataset loader: it reads labels/{clip_id}/, while
    # the extracted directory is named {clip_id}_{start}_{end}.
    full_dir = labels_root / clip_id_full
    short_dir = labels_root / clip_id
    if short_dir.is_symlink() and not short_dir.exists():
        # Dangling link from an earlier run; replace it instead of failing.
        short_dir.unlink()
    if not short_dir.exists():
        try:
            # Link to the sibling by bare name: a target built from DATA_ROOT
            # would dangle whenever DATA_ROOT is a relative path, because
            # symlink targets resolve relative to the link's own directory.
            short_dir.symlink_to(clip_id_full, target_is_directory=True)
        except OSError:
            # Symlinks unavailable (e.g. insufficient privileges): copy instead.
            shutil.copytree(full_dir, short_dir)
    return clip_id


def step3_make_fake_video(clip_id: str) -> None:
    """Write a 121-frame random-noise mp4 standing in for a ``cosmos_synthetic`` video.

    The file is created at
    ``DATA_ROOT/synthetic/single_view/generation/{clip_id}_0_Sunny.mp4``
    with 1024x768 frames at 30 fps, using the ``mp4v`` codec. Noise comes from
    a generator seeded with 0, so the frames are reproducible.

    Args:
        clip_id: Short clip id used as the file-name prefix.

    Raises:
        SystemExit: if the video writer cannot be opened.
    """
    _print_section("STEP 3  合成占位视频(随机噪声 mp4)")
    import numpy as np
    import cv2

    syn_dir = DATA_ROOT / "synthetic" / "single_view" / "generation"
    syn_dir.mkdir(parents=True, exist_ok=True)
    out_path = syn_dir / f"{clip_id}_0_Sunny.mp4"

    # Original author's note: 768 is the raw frame height, 384 after the top crop.
    H, W, T = 768, 1024, 121
    rng = np.random.default_rng(0)
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(str(out_path), fourcc, 30.0, (W, H))
    try:
        if not writer.isOpened():
            sys.exit(f"无法打开 mp4 写入器(缺 codec?): {out_path}")
        for _ in range(T):
            frame = rng.integers(0, 256, size=(H, W, 3), dtype=np.uint8)
            writer.write(frame)
    finally:
        # Always release the writer, also when a frame fails to generate/write.
        writer.release()
    print(f"  写入 {out_path}  ({out_path.stat().st_size / 1024**2:.1f} MB)", flush=True)


def step4_run_trainer(clip_id: str) -> None:
    """Run ``wjad.train.runner_local --tiny --max_steps 4`` in a subprocess.

    The trainer is started with the current interpreter, the repository's
    ``configs/default.yaml``, ``DATA_ROOT`` as data root, and ``cuda`` as the
    device when available (otherwise ``cpu``). ``<ROOT>/src`` is prepended to
    the child's ``PYTHONPATH``.

    Args:
        clip_id: Accepted for symmetry with the other steps; not used here.

    Raises:
        SystemExit: if the trainer returns a non-zero exit code.
    """
    _print_section("STEP 4  跑 trainer(真实标签 + 伪造视觉)")
    cmd = [
        sys.executable,
        "-m",
        "wjad.train.runner_local",
        "--config", str(ROOT / "configs" / "default.yaml"),
        "--data_root", str(DATA_ROOT),
        "--dinov3_path", str(ROOT / "dinov3-vitb16-pretrain-lvd1689m"),
        "--device", "cuda" if _has_cuda() else "cpu",
        "--tiny",
        "--max_steps", "4",
    ]
    env = os.environ.copy()
    # Join only non-empty parts: with PYTHONPATH unset, plain concatenation
    # would leave a trailing separator, i.e. an empty entry, which Python
    # resolves to the current working directory.
    search_path = [str(ROOT / "src")]
    inherited = env.get("PYTHONPATH")
    if inherited:
        search_path.append(inherited)
    env["PYTHONPATH"] = os.pathsep.join(search_path)
    print(f"$ {' '.join(cmd)}", flush=True)
    rc = subprocess.call(cmd, env=env)
    if rc != 0:
        sys.exit(f"trainer 失败 rc={rc}")


def _has_cuda() -> bool:
    """Report whether ``torch`` can be imported and sees a CUDA device.

    Any exception raised while importing ``torch`` or querying it is treated
    as "no CUDA".
    """
    available = False
    try:
        import torch as _torch

        available = _torch.cuda.is_available()
    except Exception:
        available = False
    return available


def main() -> None:
    """Run the four sandbox steps in order.

    Prints a banner and the active ``DATA_ROOT``, downloads the labels,
    reorganises them, then hands the chosen clip id to the placeholder-video
    step and the trainer step.
    """
    _print_section("WJAD Sandbox  Real-Data Tiny Test")
    print(f"DATA_ROOT = {DATA_ROOT}", flush=True)
    step1_download_labels()
    chosen_clip = step2_reorganize_labels()
    for stage in (step3_make_fake_video, step4_run_trainer):
        stage(chosen_clip)
    _print_section("DONE")


# Script entry point: run the pipeline only when executed directly, not on import.
if __name__ == "__main__":
    main()