| """数据集标签目录布局解析。 |
| |
| README 中的 keys 是相对每个 modality 的 ``.tar`` 根目录的扁平路径; |
| 实际解压后常多一层子目录或 clip stem 前缀。解析失败时在 ``FileNotFoundError`` |
| 里附带目录列表,便于与 Hugging Face 数据集页面中的说明对照。 |
| """ |
|
|
| from __future__ import annotations |
|
|
| from pathlib import Path |
|
|
|
|
def _norm_name(s: str) -> str:
    """Return ``s`` lower-cased with every non-alphanumeric character dropped.

    The result keeps only characters for which ``str.isalnum`` is true, so
    dots, underscores, dashes and whitespace all disappear.
    """
    lowered = s.lower()
    return "".join(filter(str.isalnum, lowered))
|
|
|
|
def _diagnose_labels(labels_dir: Path, folder: str, max_list: int = 50) -> str:
    """Build a multi-line diagnostic listing for a clip's label directory.

    The report contains a sample of the files found recursively under
    ``labels_dir / folder`` followed by the names of the immediate
    sub-directories of ``labels_dir``.

    Args:
        labels_dir: Root directory of one clip's labels.
        folder: Name of the first-level sub-folder that was being searched.
        max_list: Maximum number of file paths to include in the sample.

    Returns:
        The report lines joined with newlines.
    """

    def _relative(p: Path) -> str:
        # Fall back to the full path when ``p`` is not located under labels_dir.
        try:
            return p.relative_to(labels_dir).as_posix()
        except ValueError:
            return str(p)

    lines: list[str]
    sub = labels_dir / folder
    if not sub.is_dir():
        lines = [f"[{folder}/] 不存在:{sub}"]
    else:
        files = [p for p in sub.rglob("*") if p.is_file()]
        files.sort()
        lines = [f"[{folder}/] 下共 {len(files)} 个文件(最多列出 {max_list} 条相对路径):"]
        for rel in map(_relative, files[:max_list]):
            lines.append(f" {rel}")
        if max_list < len(files):
            lines.append(f" ... 另有 {len(files) - max_list} 个文件未列出")

    # Listing the clip root may fail (e.g. permissions); report that instead
    # of raising, since this text is only used for diagnostics.
    try:
        top = sorted(d.name for d in labels_dir.iterdir() if d.is_dir())
    except OSError as e:
        lines.append(f"[labels/<clip>/] 无法列举:{e}")
    else:
        lines.append(f"[labels/<clip>/] 一级子目录:{top}")
    return "\n".join(lines)
|
|
|
|
def _scan_npy_json_npz(
    labels_dir: Path,
    folder: str,
    fname: str,
    *,
    exts: tuple[str, ...] = (".npy",),
    tokens_norm: list[str],
    name_must_contain: str | None = None,
) -> list[Path]:
    """Collect candidate files whose extension and normalized name match.

    The search first walks ``labels_dir / folder`` recursively when that
    directory exists (otherwise ``labels_dir`` itself). If the folder exists
    but yields nothing, the whole ``labels_dir`` tree is walked as a fallback.

    Args:
        labels_dir: Root directory of one clip's labels.
        folder: First-level sub-folder to try before the whole tree.
        fname: Requested file name. Currently unused; kept so existing
            callers keep working.
        exts: Accepted file suffixes, compared case-insensitively.
        tokens_norm: Tokens that must each occur as a substring of the
            file name after ``_norm_name`` normalization. Empty tokens are
            ignored.
        name_must_contain: Optional substring that the raw file name must
            contain (case-insensitive). Falsy values disable the check.

    Returns:
        Matching files in directory-walk order; possibly empty.
    """
    # Hoisted out of the per-file loop: these do not change between files.
    wanted_exts = frozenset(e.lower() for e in exts)
    needle = name_must_contain.lower() if name_must_contain else None
    tokens = [tok for tok in tokens_norm if tok]

    def _matches(p: Path) -> bool:
        if not p.is_file():
            return False
        if p.suffix.lower() not in wanted_exts:
            return False
        if needle is not None and needle not in p.name.lower():
            return False
        pn = _norm_name(p.name)
        return all(tok in pn for tok in tokens)

    root_hint = labels_dir / folder
    hint_exists = root_hint.is_dir()
    first_root = root_hint if hint_exists else labels_dir

    hits = [p for p in first_root.rglob("*") if _matches(p)]
    if not hits and hint_exists:
        # Nothing under the hinted folder: widen the search to the whole clip.
        hits = [p for p in labels_dir.rglob("*") if _matches(p)]
    return hits
|
|
|
|
def _best_hit(hits: list[Path], *, prefer_long_name: bool = False) -> Path | None:
    """Pick the preferred path among scan candidates, or ``None`` if empty.

    Candidates are ranked by fewest path components, then (only when
    ``prefer_long_name`` is set) by longest file name, then by string form.
    ``min`` returns the first minimal item, i.e. the same element a stable
    sort on that key would place first.
    """
    if not hits:
        return None
    if prefer_long_name:
        return min(hits, key=lambda x: (len(x.parts), -len(x.name), str(x)))
    return min(hits, key=lambda x: (len(x.parts), str(x)))


def _resolve_by_convention(labels_dir: Path, folder: str, fname: str) -> Path | None:
    """Last-resort lookup based on per-folder file naming conventions.

    Each known folder derives tokens from ``fname`` and accepts any file
    whose normalized name contains all of them (substring matching).
    Returns ``None`` when the folder is not handled or nothing matches.
    """
    if folder in ("ftheta_intrinsic", "pinhole_intrinsic") and fname.endswith(".npy"):
        prefix = folder + "."
        if not fname.lower().startswith(prefix.lower()):
            return None
        # The part between "<folder>." and ".npy" is taken as the camera name.
        cam_n = _norm_name(fname[len(prefix) : -len(".npy")])
        if not cam_n:
            return None
        model = "ftheta" if folder == "ftheta_intrinsic" else "pinhole"
        hits: list[Path] = []
        for p in labels_dir.rglob("*.npy"):
            if not p.is_file():
                continue
            pn = _norm_name(p.name)
            if model in pn and cam_n in pn:
                hits.append(p)
        return _best_hit(hits)

    if folder == "pose" and fname.endswith(".npy"):
        base = fname[: -len(".npy")]
        if ".pose." not in base:
            return None
        idx_part, _, cam_part = base.partition(".pose.")
        hits = _scan_npy_json_npz(
            labels_dir,
            folder,
            fname,
            exts=(".npy",),
            tokens_norm=[_norm_name(idx_part), _norm_name(cam_part)],
            name_must_contain="pose",
        )
        return _best_hit(hits, prefer_long_name=True)

    if folder == "vehicle_pose" and fname.endswith(".npy"):
        idx_part = fname.split(".")[0]
        hits = _scan_npy_json_npz(
            labels_dir,
            folder,
            fname,
            exts=(".npy",),
            tokens_norm=[_norm_name(idx_part), "vehiclepose"],
            name_must_contain="vehicle",
        )
        return _best_hit(hits)

    if folder == "all_object_info" and fname.endswith(".json"):
        idx_part = fname.split(".")[0]
        hits = _scan_npy_json_npz(
            labels_dir,
            folder,
            fname,
            exts=(".json",),
            tokens_norm=[_norm_name(idx_part), "allobjectinfo"],
        )
        return _best_hit(hits)

    if folder == "lidar_raw" and fname.endswith(".npz"):
        stem = fname[: -len(".npz")]
        hits = _scan_npy_json_npz(
            labels_dir,
            folder,
            fname,
            exts=(".npz",),
            tokens_norm=[_norm_name(stem), "lidar", "raw"],
        )
        return _best_hit(hits)

    return None


def resolve_clip_file(labels_dir: Path, *parts: str) -> Path:
    """Resolve a relative label path inside one clip's label directory.

    ``parts`` are the components of the requested path; the first one is the
    first-level sub-folder and the last one is the file name. The following
    layouts are tried in order and the first existing file is returned:

    1. ``labels_dir / parts...`` exactly as given.
    2. ``labels_dir / folder / "<clip_stem>.<fname>"`` where ``clip_stem`` is
       the name of the resolved ``labels_dir`` (two or more parts only).
    3. ``labels_dir / folder / folder / rest...`` (two or more parts only).
    4. A recursive search for ``fname`` under ``labels_dir / folder``, then
       under all of ``labels_dir``.
    5. A recursive case-insensitive file-name match under ``labels_dir``.
    6. Folder-specific token matching (see ``_resolve_by_convention``).

    Args:
        labels_dir: Root directory of one clip's labels.
        *parts: Path components relative to ``labels_dir``.

    Returns:
        Path of the first file found.

    Raises:
        ValueError: If ``parts`` is empty.
        FileNotFoundError: If ``labels_dir`` is not a directory, or no
            candidate file exists; the message then includes a directory
            listing produced by ``_diagnose_labels``.
    """
    if not parts:
        raise ValueError("parts 不能为空")
    if not labels_dir.is_dir():
        raise FileNotFoundError(f"clip 标签根目录不存在: {labels_dir}")

    folder, fname = parts[0], parts[-1]

    direct = labels_dir.joinpath(*parts)
    if direct.is_file():
        return direct

    if len(parts) >= 2:
        clip_stem = labels_dir.resolve().name
        if not fname.startswith(f"{clip_stem}."):
            stemmed = labels_dir / folder / f"{clip_stem}.{fname}"
            if stemmed.is_file():
                return stemmed
        doubled = (labels_dir / folder / folder).joinpath(*parts[1:])
        if doubled.is_file():
            return doubled

    # NOTE(review): rglob interprets ``fname`` as a glob pattern, so a name
    # containing ``*``, ``?`` or ``[`` is matched as a pattern here.
    sub = labels_dir / folder
    search_roots = [sub, labels_dir] if sub.is_dir() else [labels_dir]
    for root in search_roots:
        found = next((p for p in root.rglob(fname) if p.is_file()), None)
        if found is not None:
            return found

    fl = fname.lower()
    found = next(
        (p for p in labels_dir.rglob("*") if p.is_file() and p.name.lower() == fl),
        None,
    )
    if found is not None:
        return found

    found = _resolve_by_convention(labels_dir, folder, fname)
    if found is not None:
        return found

    detail = _diagnose_labels(labels_dir, folder)
    raise FileNotFoundError(
        f"在 {labels_dir} 下未找到 {'/'.join(parts)}(已尝试 README 扁平路径、双嵌套、"
        f"rglob、按帧索引+相机的扫描匹配)。\n{detail}"
    )
|
|