WJAD / src /wjad /data /label_paths.py
fuzirui's picture
Sync WJAD codebase
0cfefd2 verified
"""数据集标签目录布局解析。
README 中的 keys 是相对每个 modality 的 ``.tar`` 根目录的扁平路径;
实际解压后常多一层子目录或 clip stem 前缀。解析失败时在 ``FileNotFoundError``
里附带目录列表,便于与 Hugging Face 数据集页面中的说明对照。
"""
from __future__ import annotations
from pathlib import Path
def _norm_name(s: str) -> str:
return "".join(c for c in s.lower() if c.isalnum())
def _diagnose_labels(labels_dir: Path, folder: str, max_list: int = 50) -> str:
"""列出 ``labels/<clip>/<folder>/`` 下文件采样 + clip 根下一级子目录。"""
lines: list[str] = []
sub = labels_dir / folder
if sub.is_dir():
files = sorted(p for p in sub.rglob("*") if p.is_file())
lines.append(f"[{folder}/] 下共 {len(files)} 个文件(最多列出 {max_list} 条相对路径):")
for p in files[:max_list]:
try:
rel = p.relative_to(labels_dir).as_posix()
except ValueError:
rel = str(p)
lines.append(f" {rel}")
if len(files) > max_list:
lines.append(f" ... 另有 {len(files) - max_list} 个文件未列出")
else:
lines.append(f"[{folder}/] 不存在:{sub}")
try:
top = sorted(d.name for d in labels_dir.iterdir() if d.is_dir())
lines.append(f"[labels/<clip>/] 一级子目录:{top}")
except OSError as e:
lines.append(f"[labels/<clip>/] 无法列举:{e}")
return "\n".join(lines)
def _scan_npy_json_npz(
labels_dir: Path,
folder: str,
fname: str,
*,
exts: tuple[str, ...] = (".npy",),
tokens_norm: list[str],
name_must_contain: str | None = None,
) -> list[Path]:
"""在整棵 labels/<clip>/ 下找候选文件:扩展名 + 归一化名须含各 token。"""
root_hint = labels_dir / folder
search_roots = [root_hint] if root_hint.is_dir() else []
if not search_roots:
search_roots = [labels_dir]
hits: list[Path] = []
for root in search_roots:
for p in root.rglob("*"):
if not p.is_file():
continue
if not p.suffix.lower() in [e.lower() for e in exts]:
continue
if name_must_contain and name_must_contain.lower() not in p.name.lower():
continue
pn = _norm_name(p.name)
if all(tok in pn for tok in tokens_norm if tok):
hits.append(p)
if not hits and root_hint.is_dir():
for p in labels_dir.rglob("*"):
if not p.is_file() or p.suffix.lower() not in [e.lower() for e in exts]:
continue
if name_must_contain and name_must_contain.lower() not in p.name.lower():
continue
pn = _norm_name(p.name)
if all(tok in pn for tok in tokens_norm if tok):
hits.append(p)
return hits
def _closest_hit(hits: list[Path], *, prefer_long_name: bool = False) -> Path | None:
    """Return the shallowest candidate, or ``None`` when ``hits`` is empty.

    Ties are broken by the path string so the choice is deterministic. With
    ``prefer_long_name`` the longer file name wins before the string tie-break.
    """
    if not hits:
        return None
    if prefer_long_name:
        return min(hits, key=lambda x: (len(x.parts), -len(x.name), str(x)))
    return min(hits, key=lambda x: (len(x.parts), str(x)))


def resolve_clip_file(labels_dir: Path, *parts: str) -> Path:
    """Resolve the relative path ``parts`` below a clip's label directory.

    ``parts[0]`` is treated as the first-level subfolder and ``parts[-1]`` as
    the file name. The lookup tries, in order:

    1. the path exactly as given;
    2. ``<folder>/<clip_stem>.<fname>``, where ``clip_stem`` is the resolved
       directory name of ``labels_dir``;
    3. the doubly nested layout ``<folder>/<folder>/<rest...>``;
    4. a recursive search for the literal file name, first below
       ``<folder>``, then below the whole clip directory;
    5. a recursive case-insensitive file-name match;
    6. folder-specific token matching for ``ftheta_intrinsic``,
       ``pinhole_intrinsic``, ``pose``, ``vehicle_pose``, ``all_object_info``
       and ``lidar_raw``.

    Args:
        labels_dir: Root directory of one clip's labels.
        *parts: Path components relative to ``labels_dir``.

    Returns:
        Path of the first file found by the steps above.

    Raises:
        ValueError: If ``parts`` is empty.
        FileNotFoundError: If ``labels_dir`` is not a directory, or no step
            finds a file (the message then includes a directory listing).
    """
    # Function-scope import so this block does not depend on file-level edits.
    from glob import escape as glob_escape

    if not parts:
        raise ValueError("parts 不能为空")
    if not labels_dir.is_dir():
        raise FileNotFoundError(f"clip 标签根目录不存在: {labels_dir}")

    direct = labels_dir.joinpath(*parts)
    if direct.is_file():
        return direct

    folder, fname = parts[0], parts[-1]

    if len(parts) >= 2:
        # NVIDIA on-disk naming: ``{clip_stem}.{README_key}``, where clip_stem
        # is the resolved directory name of ``labels/<clip>/`` (it contains
        # ``uuid_t0_t1``); the README key itself does not carry this prefix.
        clip_stem = labels_dir.resolve().name
        if not fname.startswith(f"{clip_stem}."):
            stemmed = labels_dir / folder / f"{clip_stem}.{fname}"
            if stemmed.is_file():
                return stemmed

        # Extraction sometimes adds a second ``<folder>/`` level.
        doubled = (labels_dir / folder / folder).joinpath(*parts[1:])
        if doubled.is_file():
            return doubled

    # rglob() treats its argument as a glob pattern. Escape it so a name
    # containing ``[``, ``*`` or ``?`` only matches itself and cannot select
    # a different file.
    literal_pattern = glob_escape(fname)
    sub = labels_dir / folder
    search_roots = [sub, labels_dir] if sub.is_dir() else [labels_dir]
    for root in search_roots:
        for p in root.rglob(literal_pattern):
            if p.is_file():
                return p

    wanted_lower = fname.lower()
    for p in labels_dir.rglob("*"):
        if p.is_file() and p.name.lower() == wanted_lower:
            return p

    # ftheta / pinhole intrinsics: match on the model name plus the camera name.
    if folder in ("ftheta_intrinsic", "pinhole_intrinsic") and fname.endswith(".npy"):
        prefix = folder + "."
        if fname.lower().startswith(prefix):
            cam_n = _norm_name(fname[len(prefix) : -len(".npy")])
            model = "ftheta" if folder == "ftheta_intrinsic" else "pinhole"
            hits: list[Path] = []
            if cam_n:  # an empty camera name can never match anything
                for p in labels_dir.rglob("*.npy"):
                    if not p.is_file():
                        continue
                    pn = _norm_name(p.name)
                    if model in pn and cam_n in pn:
                        hits.append(p)
            best = _closest_hit(hits)
            if best is not None:
                return best

    # pose: ``{idx:06d}.pose.{camera}.npy``
    if folder == "pose" and fname.endswith(".npy"):
        base = fname[: -len(".npy")]
        if ".pose." in base:
            idx_part, _, cam_part = base.partition(".pose.")
            best = _closest_hit(
                _scan_npy_json_npz(
                    labels_dir,
                    folder,
                    fname,
                    exts=(".npy",),
                    tokens_norm=[_norm_name(idx_part), _norm_name(cam_part)],
                    name_must_contain="pose",
                ),
                prefer_long_name=True,
            )
            if best is not None:
                return best

    # vehicle_pose: ``{idx:06d}.vehicle_pose.npy``
    if folder == "vehicle_pose" and fname.endswith(".npy"):
        idx_part = fname.split(".")[0]
        best = _closest_hit(
            _scan_npy_json_npz(
                labels_dir,
                folder,
                fname,
                exts=(".npy",),
                tokens_norm=[_norm_name(idx_part), "vehiclepose"],
                name_must_contain="vehicle",
            )
        )
        if best is not None:
            return best

    # all_object_info
    if folder == "all_object_info" and fname.endswith(".json"):
        idx_part = fname.split(".")[0]
        best = _closest_hit(
            _scan_npy_json_npz(
                labels_dir,
                folder,
                fname,
                exts=(".json",),
                tokens_norm=[_norm_name(idx_part), "allobjectinfo"],
            )
        )
        if best is not None:
            return best

    # lidar_raw
    if folder == "lidar_raw" and fname.endswith(".npz"):
        stem = fname[: -len(".npz")]
        best = _closest_hit(
            _scan_npy_json_npz(
                labels_dir,
                folder,
                fname,
                exts=(".npz",),
                tokens_norm=[_norm_name(stem), "lidar", "raw"],
            )
        )
        if best is not None:
            return best

    detail = _diagnose_labels(labels_dir, folder)
    raise FileNotFoundError(
        f"在 {labels_dir} 下未找到 {'/'.join(parts)}(已尝试 README 扁平路径、双嵌套、"
        f"rglob、按帧索引+相机的扫描匹配)。\n{detail}"
    )