| """ |
| TartanAir adapter that auto-discovers RGB/depth pairs irrespective of the |
| extraction depth. |
| |
| The official zip's top-level entries already start with `<scene>/<difficulty>/`, |
| so depending on how the user extracts (`-d extracted/` vs `-d extracted/<scene>/`) |
| the same files end up at different depths. This loader recursively walks |
| `<data_root>` and pairs every `<...>/<P###>/image_left/<frame>_left.png` |
| with `<...>/<P###>/depth_left/<frame>_left_depth.npy`. Skips trajectories |
| whose RGB and depth aren't both present. |
| """ |
| from __future__ import annotations |
|
|
| import glob |
| import os |
| import re |
|
|
| import cv2 |
| import numpy as np |
|
|
| from ppd.data.depth_estimation import Dataset as BaseDataset |
| from ppd.utils.logger import Log |
|
|
|
|
| _FRAME_RE = re.compile(r"^(\d{6})_left\.png$") |
|
|
|
|
| class Dataset(BaseDataset): |
| def build_metas(self): |
| self.dataset_name = "tartanair" |
| root = self.cfg.data_root |
| self.rgb_files: list[str] = [] |
| self.depth_files: list[str] = [] |
|
|
| |
| |
| for img_dir in glob.iglob(os.path.join(root, "**", "image_left"), recursive=True): |
| traj_dir = os.path.dirname(img_dir) |
| dpt_dir = os.path.join(traj_dir, "depth_left") |
| if not os.path.isdir(dpt_dir): |
| continue |
| for fname in os.listdir(img_dir): |
| m = _FRAME_RE.match(fname) |
| if not m: |
| continue |
| rgb = os.path.join(img_dir, fname) |
| dpt = os.path.join(dpt_dir, f"{m.group(1)}_left_depth.npy") |
| if os.path.isfile(rgb) and os.path.isfile(dpt): |
| self.rgb_files.append(rgb) |
| self.depth_files.append(dpt) |
|
|
| assert len(self.rgb_files) == len(self.depth_files) |
|
|
| def read_depth(self, index, depth=None): |
| depth = np.load(self.depth_files[index]).astype(np.float32) |
| min_val, max_val = 0.1, 80.0 |
| sky_mask = depth > 200.0 |
| valid = (depth > min_val) & ~np.isnan(depth) & ~np.isinf(depth) & (depth < max_val) |
| if valid.sum() == 0: |
| Log.warn(f"No valid mask in depth: {self.depth_files[index]}") |
| if valid.sum() != 0 and np.isnan(depth).sum() != 0: |
| depth[np.isnan(depth)] = depth[valid].max() |
| if valid.sum() != 0 and np.isinf(depth).sum() != 0: |
| depth[np.isinf(depth)] = depth[valid].max() |
| depth = np.clip(depth, min_val, max_val) |
| depth[sky_mask] = depth.max() + 1.0 |
| valid = np.logical_or(valid, sky_mask) |
| return depth, valid.astype(np.uint8) |
|
|