"""数据集标签目录布局解析。
README 中的 keys 是相对每个 modality 的 ``.tar`` 根目录的扁平路径;
实际解压后常多一层子目录或 clip stem 前缀。解析失败时在 ``FileNotFoundError``
里附带目录列表,便于与 Hugging Face 数据集页面中的说明对照。
"""
from __future__ import annotations
from pathlib import Path
def _norm_name(s: str) -> str:
    """Return *s* lower-cased with every non-alphanumeric character removed."""
    lowered = s.lower()
    return "".join(filter(str.isalnum, lowered))
def _diagnose_labels(labels_dir: Path, folder: str, max_list: int = 50) -> str:
    """Describe what is actually on disk for a clip, for use in error messages.

    The report has two parts: a sample (at most ``max_list`` entries, sorted)
    of the files found recursively under ``labels_dir / folder``, followed by
    the names of the immediate sub-directories of ``labels_dir``.

    Returns:
        The report lines joined with newlines.
    """
    target = labels_dir / folder
    report: list[str] = []

    if not target.is_dir():
        report.append(f"[{folder}/] 不存在:{target}")
    else:
        found = [entry for entry in target.rglob("*") if entry.is_file()]
        found.sort()
        report.append(f"[{folder}/] 下共 {len(found)} 个文件(最多列出 {max_list} 条相对路径):")
        for entry in found[:max_list]:
            try:
                shown = entry.relative_to(labels_dir).as_posix()
            except ValueError:
                # Not expressible relative to labels_dir; show the path as-is.
                shown = str(entry)
            report.append(f" {shown}")
        hidden = len(found) - max_list
        if hidden > 0:
            report.append(f" ... 另有 {hidden} 个文件未列出")

    try:
        subdirs = [child.name for child in labels_dir.iterdir() if child.is_dir()]
        subdirs.sort()
    except OSError as err:
        report.append(f"[labels/<clip>/] 无法列举:{err}")
    else:
        report.append(f"[labels/<clip>/] 一级子目录:{subdirs}")

    return "\n".join(report)
def _scan_npy_json_npz(
    labels_dir: Path,
    folder: str,
    fname: str,
    *,
    exts: tuple[str, ...] = (".npy",),
    tokens_norm: list[str],
    name_must_contain: str | None = None,
) -> list[Path]:
    """Collect candidate files by extension and normalized-name tokens.

    A file is a candidate when all of the following hold:

    * its suffix, compared case-insensitively, is one of ``exts``;
    * if ``name_must_contain`` is non-empty, its name contains that string
      (case-insensitive);
    * its name, normalized with ``_norm_name``, contains every non-empty
      entry of ``tokens_norm``.

    ``labels_dir / folder`` is scanned recursively first when it is a
    directory. If that yields nothing, or the folder does not exist, the
    whole of ``labels_dir`` is scanned instead.

    Args:
        labels_dir: Root directory of one clip's labels.
        folder: Preferred sub-folder to scan first.
        fname: Unused by this function; kept so existing call sites keep
            working.
        exts: Accepted file suffixes, including the leading dot.
        tokens_norm: Tokens (already normalized) that must all occur in the
            normalized file name; empty tokens are ignored.
        name_must_contain: Optional substring required in the raw file name.

    Returns:
        Matching paths in directory-walk order (possibly empty).
    """
    # Hoist everything that does not depend on the visited path.
    wanted_exts = {ext.lower() for ext in exts}
    required = name_must_contain.lower() if name_must_contain else None
    tokens = [tok for tok in tokens_norm if tok]

    def _is_candidate(p: Path) -> bool:
        # Cheap string checks first; the stat() behind is_file() comes last.
        if p.suffix.lower() not in wanted_exts:
            return False
        if required and required not in p.name.lower():
            return False
        if not p.is_file():
            return False
        normalized = _norm_name(p.name)
        return all(tok in normalized for tok in tokens)

    root_hint = labels_dir / folder
    if root_hint.is_dir():
        hits = [p for p in root_hint.rglob("*") if _is_candidate(p)]
        if hits:
            return hits
    # Either the hinted folder is missing or it held no match: widen the scan.
    return [p for p in labels_dir.rglob("*") if _is_candidate(p)]
def resolve_clip_file(labels_dir: Path, *parts: str) -> Path:
    """Resolve a relative label key to an existing file under one clip's labels.

    ``parts`` are the components of a path relative to ``labels_dir``; the
    first component is treated as the top-level sub-folder and the last as
    the file name. Strategies are tried in order and the first hit wins:

    1. The literal path ``labels_dir / parts``.
    2. ``labels_dir / folder / "{clip_stem}.{fname}"`` where ``clip_stem`` is
       the name of the resolved ``labels_dir`` (only with two or more parts).
    3. A doubled folder level, ``labels_dir / folder / folder / rest``.
    4. A search by file name: exact name below ``folder``, then exact name
       anywhere below ``labels_dir``, then a case-insensitive name match.
       The name is compared literally (never as a glob pattern) and ties are
       broken deterministically by shallowest path, then path string.
    5. Folder-specific fuzzy matching on normalized names for
       ``ftheta_intrinsic`` / ``pinhole_intrinsic``, ``pose``,
       ``vehicle_pose``, ``all_object_info`` and ``lidar_raw``.

    Args:
        labels_dir: Root directory holding the labels of a single clip.
        *parts: Path components of the wanted file, relative to ``labels_dir``.

    Returns:
        Path of the file that was found.

    Raises:
        ValueError: If ``parts`` is empty.
        FileNotFoundError: If ``labels_dir`` is not a directory, or no
            strategy finds a file. In the latter case the message includes a
            listing of what is on disk.
    """
    if not parts:
        raise ValueError("parts 不能为空")
    if not labels_dir.is_dir():
        raise FileNotFoundError(f"clip 标签根目录不存在: {labels_dir}")

    folder, fname = parts[0], parts[-1]

    direct = labels_dir.joinpath(*parts)
    if direct.is_file():
        return direct

    if len(parts) >= 2:
        # NVIDIA on-disk naming is ``{clip_stem}.{README_key}``: the clip
        # directory name (including ``uuid_t0_t1``) is prepended, while the
        # README key itself carries no such prefix.
        clip_stem = labels_dir.resolve().name
        if not fname.startswith(f"{clip_stem}."):
            stemmed = labels_dir / folder / f"{clip_stem}.{fname}"
            if stemmed.is_file():
                return stemmed

        # Extraction sometimes adds a second ``folder/`` level.
        doubled = (labels_dir / folder / folder).joinpath(*parts[1:])
        if doubled.is_file():
            return doubled

    found = _find_by_name(labels_dir, folder, fname)
    if found is None:
        found = _match_by_modality(labels_dir, folder, fname)
    if found is not None:
        return found

    detail = _diagnose_labels(labels_dir, folder)
    raise FileNotFoundError(
        f"在 {labels_dir} 下未找到 {'/'.join(parts)}(已尝试 README 扁平路径、双嵌套、"
        f"rglob、按帧索引+相机的扫描匹配)。\n{detail}"
    )


def _depth_then_path(p: Path) -> tuple[int, str]:
    """Sort key: fewest path components first, then the path string."""
    return (len(p.parts), str(p))


def _depth_longname_path(p: Path) -> tuple[int, int, str]:
    """Sort key: shallowest first, then longest file name, then path string."""
    return (len(p.parts), -len(p.name), str(p))


def _find_by_name(labels_dir: Path, folder: str, fname: str) -> Path | None:
    """Find a file called ``fname`` anywhere below ``labels_dir``.

    Preference order: exact name below ``labels_dir / folder``, exact name
    elsewhere, then a case-insensitive name match. The tree is walked once
    and names are compared as plain strings, so characters such as ``[`` or
    ``*`` in ``fname`` are not interpreted as glob syntax.
    """
    sub = labels_dir / folder
    wanted_lower = fname.lower()
    in_folder: list[Path] = []
    elsewhere: list[Path] = []
    loose: list[Path] = []

    for p in labels_dir.rglob("*"):
        name = p.name
        if name.lower() != wanted_lower or not p.is_file():
            continue
        if name != fname:
            loose.append(p)
        elif sub in p.parents:
            in_folder.append(p)
        else:
            elsewhere.append(p)

    for tier in (in_folder, elsewhere, loose):
        if tier:
            return min(tier, key=_depth_then_path)
    return None


def _match_by_modality(labels_dir: Path, folder: str, fname: str) -> Path | None:
    """Fuzzy-match ``fname`` using per-folder naming rules on normalized names.

    Returns the best candidate, or ``None`` when ``folder`` has no rule, the
    file name does not fit the rule, or nothing on disk matches.
    """
    hits: list[Path] = []
    sort_key = _depth_then_path

    if folder in ("ftheta_intrinsic", "pinhole_intrinsic") and fname.endswith(".npy"):
        # Key shape: ``<folder>.<camera>.npy``; match any .npy whose
        # normalized name has both the model word and the camera name.
        prefix = folder + "."
        if fname.lower().startswith(prefix.lower()):
            cam_n = _norm_name(fname[len(prefix) : -len(".npy")])
            model = "ftheta" if folder == "ftheta_intrinsic" else "pinhole"
            if cam_n:
                for p in labels_dir.rglob("*.npy"):
                    pn = _norm_name(p.name)
                    if model in pn and cam_n in pn and p.is_file():
                        hits.append(p)
    elif folder == "pose" and fname.endswith(".npy"):
        # Key shape: ``{idx:06d}.pose.{camera}.npy``.
        base = fname[: -len(".npy")]
        if ".pose." in base:
            idx_part, _, cam_part = base.partition(".pose.")
            hits = _scan_npy_json_npz(
                labels_dir,
                folder,
                fname,
                exts=(".npy",),
                tokens_norm=[_norm_name(idx_part), _norm_name(cam_part)],
                name_must_contain="pose",
            )
            # At equal depth the longer file name wins for pose files.
            sort_key = _depth_longname_path
    elif folder == "vehicle_pose" and fname.endswith(".npy"):
        # Key shape: ``{idx:06d}.vehicle_pose.npy``.
        hits = _scan_npy_json_npz(
            labels_dir,
            folder,
            fname,
            exts=(".npy",),
            tokens_norm=[_norm_name(fname.split(".")[0]), "vehiclepose"],
            name_must_contain="vehicle",
        )
    elif folder == "all_object_info" and fname.endswith(".json"):
        hits = _scan_npy_json_npz(
            labels_dir,
            folder,
            fname,
            exts=(".json",),
            tokens_norm=[_norm_name(fname.split(".")[0]), "allobjectinfo"],
        )
    elif folder == "lidar_raw" and fname.endswith(".npz"):
        hits = _scan_npy_json_npz(
            labels_dir,
            folder,
            fname,
            exts=(".npz",),
            tokens_norm=[_norm_name(fname[: -len(".npz")]), "lidar", "raw"],
        )

    return min(hits, key=sort_key) if hits else None
|