"""数据集标签目录布局解析。 README 中的 keys 是相对每个 modality 的 ``.tar`` 根目录的扁平路径; 实际解压后常多一层子目录或 clip stem 前缀。解析失败时在 ``FileNotFoundError`` 里附带目录列表,便于与 Hugging Face 数据集页面中的说明对照。 """ from __future__ import annotations from pathlib import Path def _norm_name(s: str) -> str: return "".join(c for c in s.lower() if c.isalnum()) def _diagnose_labels(labels_dir: Path, folder: str, max_list: int = 50) -> str: """列出 ``labels///`` 下文件采样 + clip 根下一级子目录。""" lines: list[str] = [] sub = labels_dir / folder if sub.is_dir(): files = sorted(p for p in sub.rglob("*") if p.is_file()) lines.append(f"[{folder}/] 下共 {len(files)} 个文件(最多列出 {max_list} 条相对路径):") for p in files[:max_list]: try: rel = p.relative_to(labels_dir).as_posix() except ValueError: rel = str(p) lines.append(f" {rel}") if len(files) > max_list: lines.append(f" ... 另有 {len(files) - max_list} 个文件未列出") else: lines.append(f"[{folder}/] 不存在:{sub}") try: top = sorted(d.name for d in labels_dir.iterdir() if d.is_dir()) lines.append(f"[labels//] 一级子目录:{top}") except OSError as e: lines.append(f"[labels//] 无法列举:{e}") return "\n".join(lines) def _scan_npy_json_npz( labels_dir: Path, folder: str, fname: str, *, exts: tuple[str, ...] = (".npy",), tokens_norm: list[str], name_must_contain: str | None = None, ) -> list[Path]: """在整棵 labels// 下找候选文件:扩展名 + 归一化名须含各 token。""" root_hint = labels_dir / folder search_roots = [root_hint] if root_hint.is_dir() else [] if not search_roots: search_roots = [labels_dir] hits: list[Path] = [] for root in search_roots: for p in root.rglob("*"): if not p.is_file(): continue if not p.suffix.lower() in [e.lower() for e in exts]: continue if name_must_contain and name_must_contain.lower() not in p.name.lower(): continue pn = _norm_name(p.name) if all(tok in pn for tok in tokens_norm if tok): hits.append(p) if not hits and root_hint.is_dir(): for p in labels_dir.rglob("*"): if not p.is_file() or p.suffix.lower() not in [e.lower() for e in exts]: continue if name_must_contain and name_must_contain.lower() not in p.name.lower(): continue pn = _norm_name(p.name) if all(tok in pn for tok in tokens_norm if tok): hits.append(p) return hits def resolve_clip_file(labels_dir: Path, *parts: str) -> Path: """在 ``labels//`` 下解析 ``parts`` 组成的相对路径(首个元素为一级子文件夹)。""" if not parts: raise ValueError("parts 不能为空") if not labels_dir.is_dir(): raise FileNotFoundError(f"clip 标签根目录不存在: {labels_dir}") direct = labels_dir.joinpath(*parts) if direct.is_file(): return direct # NVIDIA 磁盘命名:``{clip_stem}.{README_key}``,clip_stem = ``labels//`` # 解析后的目录名(含 ``uuid_t0_t1``);README 里的 key 本身不含此前缀。 clip_stem = labels_dir.resolve().name if len(parts) >= 2: folder, fname = parts[0], parts[-1] if not fname.startswith(f"{clip_stem}."): stemmed = labels_dir / folder / f"{clip_stem}.{fname}" if stemmed.is_file(): return stemmed if len(parts) >= 2: folder = parts[0] rest = parts[1:] doubled = (labels_dir / folder / folder).joinpath(*rest) if doubled.is_file(): return doubled fname = parts[-1] folder = parts[0] sub = labels_dir / folder if sub.is_dir(): for p in sub.rglob(fname): if p.is_file(): return p for p in labels_dir.rglob(fname): if p.is_file(): return p fl = fname.lower() for p in labels_dir.rglob("*"): if p.is_file() and p.name.lower() == fl: return p # ftheta / pinhole if folder in ("ftheta_intrinsic", "pinhole_intrinsic") and fname.endswith(".npy"): prefix = folder + "." if fname.lower().startswith(prefix.lower()): cam = fname[len(prefix) : -len(".npy")] cam_n = _norm_name(cam) hits = [] for p in labels_dir.rglob("*.npy"): if not p.is_file(): continue pn = _norm_name(p.name) if folder == "ftheta_intrinsic": if "ftheta" not in pn: continue else: if "pinhole" not in pn: continue if cam_n and cam_n in pn: hits.append(p) if len(hits) == 1: return hits[0] if len(hits) > 1: hits.sort(key=lambda x: (len(x.parts), str(x))) return hits[0] # pose: ``{idx:06d}.pose.{camera}.npy`` if folder == "pose" and fname.endswith(".npy"): base = fname[: -len(".npy")] if ".pose." in base: idx_part, _, cam_part = base.partition(".pose.") hits = _scan_npy_json_npz( labels_dir, folder, fname, exts=(".npy",), tokens_norm=[_norm_name(idx_part), _norm_name(cam_part)], name_must_contain="pose", ) if len(hits) == 1: return hits[0] if len(hits) > 1: hits.sort(key=lambda x: (len(x.parts), -len(x.name), str(x))) return hits[0] # vehicle_pose: ``{idx:06d}.vehicle_pose.npy`` if folder == "vehicle_pose" and fname.endswith(".npy"): idx_part = fname.split(".")[0] hits = _scan_npy_json_npz( labels_dir, folder, fname, exts=(".npy",), tokens_norm=[_norm_name(idx_part), "vehiclepose"], name_must_contain="vehicle", ) if len(hits) == 1: return hits[0] if len(hits) > 1: hits.sort(key=lambda x: (len(x.parts), str(x))) return hits[0] # all_object_info if folder == "all_object_info" and fname.endswith(".json"): idx_part = fname.split(".")[0] hits = _scan_npy_json_npz( labels_dir, folder, fname, exts=(".json",), tokens_norm=[_norm_name(idx_part), "allobjectinfo"], ) if len(hits) == 1: return hits[0] if len(hits) > 1: hits.sort(key=lambda x: (len(x.parts), str(x))) return hits[0] # lidar_raw if folder == "lidar_raw" and fname.endswith(".npz"): stem = fname[: -len(".npz")] hits = _scan_npy_json_npz( labels_dir, folder, fname, exts=(".npz",), tokens_norm=[_norm_name(stem), "lidar", "raw"], ) if len(hits) == 1: return hits[0] if len(hits) > 1: hits.sort(key=lambda x: (len(x.parts), str(x))) return hits[0] detail = _diagnose_labels(labels_dir, folder) raise FileNotFoundError( f"在 {labels_dir} 下未找到 {'/'.join(parts)}(已尝试 README 扁平路径、双嵌套、" f"rglob、按帧索引+相机的扫描匹配)。\n{detail}" )