|
|
""" |
|
|
Capture bundle reader/validator (SPECIFICATIONS.md Appendix C). |
|
|
|
|
|
This module provides a single entry point (`CaptureBundle`) that: |
|
|
- loads a bundle manifest, |
|
|
- validates referenced files exist, |
|
|
- offers typed accessors to common assets. |
|
|
""" |
|
|
|
|
|
from __future__ import annotations

import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np

from ..models.capture_models import CaptureManifest, DetailedAnnotation
from .dataset_layout import CaptureBundleLayout, validate_paths_exist
|
|
|
|
|
|
|
|
class CaptureBundleError(ValueError):
    """Error raised when a capture bundle cannot be loaded, parsed or validated.

    Subclasses ``ValueError`` so callers that already catch ``ValueError``
    keep working.
    """
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
class _CalibrationCompat:
    """Immutable holder for the calibration file pointers of a manifest.

    Both attributes are bundle-relative path strings, or ``None`` when the
    corresponding file is not referenced.
    """

    # Relative path of the rig extrinsics JSON, if any.
    rig_extrinsics_path: Optional[str] = None
    # Relative path of the per-device sync offsets JSON, if any.
    sync_offsets_path: Optional[str] = None
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
class _AnnotationsCompat:
    """Immutable holder for the annotation file pointers of a manifest.

    Both attributes are bundle-relative path strings, or ``None`` when the
    corresponding file is not referenced.
    """

    # Relative path of the quick annotation JSON, if any.
    quick_annotation_path: Optional[str] = None
    # Relative path of the detailed annotation JSON, if any.
    detailed_annotation_path: Optional[str] = None
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
class _DeviceCompat:
    """Immutable per-device record exposing the attribute names the bundle reader uses.

    Every ``*_path`` / ``*_dir`` attribute is a path string (normally relative
    to the bundle root) or ``None`` when the asset is not available.
    """

    # Identifier of the device within the capture.
    device_id: str
    # Path of the device's video file, if known.
    video_path: Optional[str] = None
    # Path of the device's intrinsics JSON, if known.
    intrinsics_path: Optional[str] = None
    # Path of the device's timestamps JSON, if known.
    timestamps_path: Optional[str] = None
    # Path of the device's ARKit poses JSON, if known.
    arkit_poses_path: Optional[str] = None
    # Path of the device's LiDAR depth directory, if known.
    lidar_depth_dir: Optional[str] = None

    # Free-form extra data; the reader looks for a "streams" mapping in here.
    # Defaults to a fresh empty dict per instance (it used to default to None
    # despite the Dict annotation). Excluded from the generated hash because
    # dicts are unhashable.
    model_extra: Dict[str, Any] = field(default_factory=dict, hash=False)
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
class _ManifestCompat:
    """Immutable manifest stand-in built for bundles that lack a validated manifest model.

    It carries the same attribute names the bundle reader looks up on a
    manifest: ``devices``, ``calibration``, ``annotations`` and
    ``teacher_outputs``.
    """

    # Schema version string taken from the manifest.
    schema_version: str
    # Capture identifier taken from the manifest.
    capture_id: str
    # Per-device compat records.
    devices: List[_DeviceCompat]
    # Calibration file pointers, if any.
    calibration: Optional[_CalibrationCompat] = None
    # Annotation file pointers, if any.
    annotations: Optional[_AnnotationsCompat] = None
    # Teacher output pointers, if any; the reader only reads attributes off it.
    teacher_outputs: Optional[Any] = None
    # Free-form metadata. Defaults to a fresh empty dict per instance (it used
    # to default to None despite the Dict annotation).
    metadata: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
class CaptureBundle:
    """Read-only view of a capture bundle directory and its manifest.

    Build instances with :meth:`load`, which parses ``manifest.json``,
    normalises v1 and v2 manifests to a common attribute surface and checks
    that every referenced path exists.
    """

    # Bundle root directory; relative manifest paths are joined onto it.
    root: Path
    # Validated v1 manifest model, or the compat record synthesised for v2.
    manifest: Union[CaptureManifest, _ManifestCompat]
    # Raw decoded content of manifest.json.
    manifest_obj: Dict[str, Any]
    # Schema version string of the manifest.
    schema_version: str
    # (device_id, stream kind) -> stream.json path relative to root.
    # `load` passes an empty dict for v1 bundles.
    _v2_stream_index: Dict[Tuple[str, str], str]

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------

    @classmethod
    def load(cls, root: Path) -> CaptureBundle:
        """Load and validate the capture bundle rooted at ``root``.

        Schema versions starting with "1" are validated through
        ``CaptureManifest``; versions starting with "2" are converted into a
        ``_ManifestCompat`` record. A missing or empty ``schema_version`` is
        treated as "1.0".

        Args:
            root: Bundle directory containing the manifest.

        Returns:
            The loaded bundle, already validated with ``repairable_ok=False``.

        Raises:
            CaptureBundleError: If the root or manifest is missing, the
                manifest cannot be parsed or is not a JSON object, the schema
                version is unsupported, no usable device entry exists, or a
                referenced path is missing.
        """
        root = Path(root)
        layout = CaptureBundleLayout(root=root)
        if not root.exists():
            raise CaptureBundleError(f"Capture bundle root does not exist: {root}")
        if not layout.manifest_path.exists():
            raise CaptureBundleError(f"Missing manifest.json at: {layout.manifest_path}")

        try:
            manifest_obj = cls._read_json(layout.manifest_path)
        except Exception as e:  # boundary: any read/decode failure becomes a bundle error
            raise CaptureBundleError(f"Failed to parse manifest.json: {e}") from e
        if not isinstance(manifest_obj, dict):
            # Without this guard the `.get` calls below fail with AttributeError.
            raise CaptureBundleError(
                "Failed to parse manifest.json: expected a JSON object, "
                f"got {type(manifest_obj).__name__}"
            )

        schema_version = str(manifest_obj.get("schema_version", "1.0") or "1.0")

        if schema_version.startswith("1"):
            try:
                manifest = CaptureManifest.model_validate(manifest_obj)
            except Exception as e:
                raise CaptureBundleError(f"Invalid manifest schema: {e}") from e
            stream_index: Dict[Tuple[str, str], str] = {}
        elif schema_version.startswith("2"):
            stream_index = cls._index_v2_streams(manifest_obj)
            manifest = cls._build_v2_manifest(root, manifest_obj, schema_version, stream_index)
        else:
            raise CaptureBundleError(f"Unsupported manifest schema_version: {schema_version}")

        bundle = cls(
            root=root,
            manifest=manifest,
            manifest_obj=manifest_obj,
            schema_version=schema_version,
            _v2_stream_index=stream_index,
        )
        bundle.validate(repairable_ok=False)
        return bundle

    @staticmethod
    def _read_json(path: Path) -> Any:
        """Decode the JSON document at ``path``, always reading it as UTF-8."""
        return json.loads(path.read_text(encoding="utf-8"))

    @staticmethod
    def _index_v2_streams(manifest_obj: Dict[str, Any]) -> Dict[Tuple[str, str], str]:
        """Map ``(device_id, kind)`` to the stream path for each well-formed ``streams[]`` entry."""
        streams = manifest_obj.get("streams")
        if not isinstance(streams, list):
            return {}
        index: Dict[Tuple[str, str], str] = {}
        for entry in streams:
            if not isinstance(entry, dict):
                continue
            did, kind, path = entry.get("device_id"), entry.get("kind"), entry.get("path")
            # Only non-empty string triples are indexed; later duplicates win.
            if all(isinstance(v, str) and v for v in (did, kind, path)):
                index[(did, kind)] = path
        return index

    @classmethod
    def _resolve_stream_data_rel(
        cls,
        root: Path,
        stream_index: Dict[Tuple[str, str], str],
        did: str,
        kind: str,
    ) -> Optional[str]:
        """Return the data file path of a v2 stream, relative to ``root`` when possible.

        Reads the stream's JSON file and follows its ``data.path`` entry, which
        is interpreted relative to the stream JSON's directory. Returns ``None``
        when the stream is not indexed, its JSON is missing or unreadable, or it
        has no usable ``data.path``.
        """
        stream_rel = stream_index.get((did, kind))
        if not stream_rel:
            return None
        stream_json_path = (root / stream_rel).resolve()
        if not stream_json_path.exists():
            return None
        try:
            stream_obj = cls._read_json(stream_json_path)
        except Exception:  # best effort: an unreadable stream.json means "no data path"
            return None
        data = stream_obj.get("data") if isinstance(stream_obj, dict) else None
        if not (isinstance(data, dict) and isinstance(data.get("path"), str) and data["path"]):
            return None
        data_path = (stream_json_path.parent / data["path"]).resolve()
        try:
            # Compare against the *resolved* root: `data_path` is resolved, so a
            # relative or symlinked root would otherwise never match.
            return str(data_path.relative_to(root.resolve()))
        except ValueError:
            # Data lives outside the bundle root; keep the absolute path.
            return str(data_path)

    @classmethod
    def _compat_device(
        cls,
        root: Path,
        stream_index: Dict[Tuple[str, str], str],
        did: str,
    ) -> _DeviceCompat:
        """Build the compat record for one v2 device by probing the files under ``devices/<did>``."""
        device_dir = root / "devices" / did

        def rel_if_exists(path: Path) -> Optional[str]:
            return str(path.relative_to(root)) if path.exists() else None

        # Prefer the calibration/ location for intrinsics, else the device directory itself.
        intrinsics_rel = rel_if_exists(
            device_dir / "calibration" / "intrinsics.json"
        ) or rel_if_exists(device_dir / "intrinsics.json")

        # The depth directory is published through model_extra["streams"];
        # lidar_depth_dir stays None for v2 devices.
        extra_streams: Dict[str, Any] = {}
        depth_dir_rel = rel_if_exists(device_dir / "depth")
        if depth_dir_rel:
            extra_streams["depth"] = {"directory": depth_dir_rel}

        return _DeviceCompat(
            device_id=did,
            video_path=cls._resolve_stream_data_rel(root, stream_index, did, "video.rgb"),
            intrinsics_path=intrinsics_rel,
            timestamps_path=rel_if_exists(device_dir / "timestamps.json"),
            arkit_poses_path=rel_if_exists(device_dir / "arkit_poses.json"),
            lidar_depth_dir=None,
            model_extra={"streams": extra_streams} if extra_streams else {},
        )

    @staticmethod
    def _listed_files(manifest_obj: Dict[str, Any], section: str) -> List[str]:
        """Return the string entries of ``manifest_obj[section]["files"]``, or ``[]``."""
        section_obj = manifest_obj.get(section)
        files = section_obj.get("files") if isinstance(section_obj, dict) else None
        if not isinstance(files, list):
            return []
        return [f for f in files if isinstance(f, str)]

    @classmethod
    def _build_v2_manifest(
        cls,
        root: Path,
        manifest_obj: Dict[str, Any],
        schema_version: str,
        stream_index: Dict[Tuple[str, str], str],
    ) -> _ManifestCompat:
        """Convert a v2 manifest object into a ``_ManifestCompat`` record.

        Raises:
            CaptureBundleError: If ``devices`` is missing/empty or contains no
                entry with a non-empty string ``device_id`` and ``path``.
        """
        devices = manifest_obj.get("devices", []) or []
        if not isinstance(devices, list) or not devices:
            raise CaptureBundleError("v2 manifest has no devices[] pointers")

        compat_devices: List[_DeviceCompat] = []
        for dref in devices:
            if not isinstance(dref, dict):
                continue
            did = dref.get("device_id")
            device_path = dref.get("path")
            if not (isinstance(did, str) and did and isinstance(device_path, str) and device_path):
                continue
            compat_devices.append(cls._compat_device(root, stream_index, did))
        if not compat_devices:
            raise CaptureBundleError("v2 manifest has no usable device entries")

        def if_listed(rel: str, files: List[str]) -> Optional[str]:
            return rel if rel in files else None

        calib_files = cls._listed_files(manifest_obj, "calibration")
        ann_files = cls._listed_files(manifest_obj, "annotations")
        return _ManifestCompat(
            schema_version=schema_version,
            capture_id=str(manifest_obj.get("capture_id") or ""),
            devices=compat_devices,
            calibration=_CalibrationCompat(
                rig_extrinsics_path=if_listed("calibration/rig_extrinsics.json", calib_files),
                sync_offsets_path=if_listed("calibration/sync_offsets.json", calib_files),
            ),
            annotations=_AnnotationsCompat(
                quick_annotation_path=if_listed("annotations/quick_annotation.json", ann_files),
                detailed_annotation_path=if_listed(
                    "annotations/detailed_annotation.json", ann_files
                ),
            ),
            metadata={},
        )

    # ------------------------------------------------------------------
    # Validation and device lookup
    # ------------------------------------------------------------------

    @property
    def layout(self) -> CaptureBundleLayout:
        """Layout helper for this bundle's root directory."""
        return CaptureBundleLayout(root=self.root)

    def validate(self, repairable_ok: bool = True) -> None:
        """Check that the paths referenced by the manifest exist.

        Collects device, calibration, annotation and teacher-output paths
        (``None`` entries included) and hands them to ``validate_paths_exist``.

        Args:
            repairable_ok: When True, reported errors are tolerated and nothing
                is raised; when False, any reported error raises.

        Raises:
            CaptureBundleError: If errors were reported and ``repairable_ok``
                is False.
        """
        device_attrs = (
            "video_path",
            "intrinsics_path",
            "timestamps_path",
            "arkit_poses_path",
            "lidar_depth_dir",
        )
        section_attrs = (
            ("calibration", ("rig_extrinsics_path", "sync_offsets_path")),
            ("annotations", ("quick_annotation_path", "detailed_annotation_path")),
            ("teacher_outputs", ("depth_dir", "uncertainty_dir", "reconstruction_path")),
        )

        rels: List[Optional[str]] = []
        for device in getattr(self.manifest, "devices", []) or []:
            rels.extend(getattr(device, attr, None) for attr in device_attrs)
            # Depth may also be published as a stream directory in model_extra.
            depth = self._streams_of(device).get("depth")
            if isinstance(depth, dict):
                rel = depth.get("directory")
                if isinstance(rel, str) and rel:
                    rels.append(rel)

        for section, attrs in section_attrs:
            section_obj = getattr(self.manifest, section, None)
            if section_obj:
                rels.extend(getattr(section_obj, attr, None) for attr in attrs)

        errors = validate_paths_exist(self.root, rels)
        if errors and not repairable_ok:
            raise CaptureBundleError("Capture bundle validation failed:\n- " + "\n- ".join(errors))

    def list_devices(self) -> List[str]:
        """Return the ``device_id`` of every device in the manifest, in manifest order."""
        devices = getattr(self.manifest, "devices", []) or []
        return [str(getattr(d, "device_id")) for d in devices]

    def get_device(self, device_id: str) -> Any:
        """Return the first manifest device whose ``device_id`` equals ``device_id``.

        Raises:
            CaptureBundleError: If no device matches.
        """
        for device in getattr(self.manifest, "devices", []) or []:
            if str(getattr(device, "device_id")) == device_id:
                return device
        raise CaptureBundleError(f"Unknown device_id: {device_id}")

    # ------------------------------------------------------------------
    # Per-device paths
    # ------------------------------------------------------------------

    def _device_path(self, device_id: str, attr: str) -> Optional[Path]:
        """Resolve the device attribute ``attr`` against the root; ``None`` if unset or empty."""
        rel = getattr(self.get_device(device_id), attr, None)
        return (self.root / str(rel)).resolve() if rel else None

    def _root_path(self, rel: Any) -> Optional[Path]:
        """Resolve ``rel`` against the root when it is a non-empty string, else ``None``."""
        if isinstance(rel, str) and rel:
            return (self.root / rel).resolve()
        return None

    def device_video_path(self, device_id: str) -> Path:
        """Return the resolved video path of a device.

        Raises:
            CaptureBundleError: If the device is unknown or has no ``video_path``.
        """
        path = self._device_path(device_id, "video_path")
        if path is None:
            raise CaptureBundleError(
                f"No video_path for device_id={device_id} (schema_version={self.schema_version})"
            )
        return path

    def device_intrinsics_path(self, device_id: str) -> Path:
        """Return the resolved intrinsics JSON path of a device.

        Raises:
            CaptureBundleError: If the device is unknown or has no ``intrinsics_path``.
        """
        path = self._device_path(device_id, "intrinsics_path")
        if path is None:
            raise CaptureBundleError(
                f"No intrinsics_path for device_id={device_id} "
                f"(schema_version={self.schema_version})"
            )
        return path

    def device_timestamps_path(self, device_id: str) -> Path:
        """Return the resolved timestamps JSON path of a device.

        Raises:
            CaptureBundleError: If the device is unknown or has no ``timestamps_path``.
        """
        path = self._device_path(device_id, "timestamps_path")
        if path is None:
            raise CaptureBundleError(
                f"No timestamps_path for device_id={device_id} "
                f"(schema_version={self.schema_version}). "
                "Use the timeline.frames stream (v2) instead."
            )
        return path

    def device_arkit_poses_path(self, device_id: str) -> Optional[Path]:
        """Return the resolved ARKit poses path of a device, or ``None`` if it has none."""
        return self._device_path(device_id, "arkit_poses_path")

    def device_lidar_depth_dir(self, device_id: str) -> Optional[Path]:
        """Return the resolved ``lidar_depth_dir`` of a device, or ``None`` if it has none."""
        return self._device_path(device_id, "lidar_depth_dir")

    def device_depth_dir_best_effort(self, device_id: str) -> Optional[Path]:
        """Return a depth directory for the device, trying two sources.

        Prefers the canonical ``lidar_depth_dir``; otherwise falls back to the
        directory stored under ``model_extra["streams"]["depth"]["directory"]``.
        Returns ``None`` when neither is present.
        """
        device = self.get_device(device_id)
        canonical = getattr(device, "lidar_depth_dir", None)
        if canonical:
            return (self.root / str(canonical)).resolve()
        depth = self._streams_of(device).get("depth")
        if not isinstance(depth, dict):
            return None
        return self._root_path(depth.get("directory"))

    # ------------------------------------------------------------------
    # v2 stream access
    # ------------------------------------------------------------------

    def v2_has_streams(self) -> bool:
        """Return True when this is a v2 bundle with at least one indexed stream."""
        return bool(self.schema_version.startswith("2") and self._v2_stream_index)

    def v2_stream_ref(self, *, device_id: str, kind: str) -> Optional[Path]:
        """Return the resolved path to ``stream.json`` for ``(device_id, kind)``.

        Returns ``None`` for non-v2 bundles or when the stream is not indexed.
        """
        if not self.schema_version.startswith("2"):
            return None
        rel = self._v2_stream_index.get((device_id, kind))
        return (self.root / rel).resolve() if rel else None

    def v2_stream_json(self, *, device_id: str, kind: str) -> Optional[Dict[str, Any]]:
        """Return the decoded stream JSON object for ``(device_id, kind)``.

        Returns ``None`` when the stream is unknown, the file is missing or
        unreadable, or its top level is not a JSON object.
        """
        path = self.v2_stream_ref(device_id=device_id, kind=kind)
        if not path or not path.exists():
            return None
        try:
            obj = self._read_json(path)
        except Exception:  # best effort: an unreadable stream.json counts as absent
            return None
        return obj if isinstance(obj, dict) else None

    def v2_stream_data_path(self, *, device_id: str, kind: str) -> Optional[Path]:
        """Return the resolved data file path named by a stream's ``data.path``.

        ``data.path`` is interpreted relative to the directory of the stream
        JSON file. Returns ``None`` when the stream JSON or the entry is missing.
        """
        stream_obj = self.v2_stream_json(device_id=device_id, kind=kind)
        if not stream_obj:
            return None
        stream_path = self.v2_stream_ref(device_id=device_id, kind=kind)
        if not stream_path:
            return None
        data = stream_obj.get("data")
        if not isinstance(data, dict):
            return None
        rel = data.get("path")
        if not isinstance(rel, str) or not rel:
            return None
        return (stream_path.parent / rel).resolve()

    # ------------------------------------------------------------------
    # Extra (model_extra) streams
    # ------------------------------------------------------------------

    @staticmethod
    def _streams_of(device: Any) -> Dict[str, Any]:
        """Return ``device.model_extra["streams"]`` when it is a dict, else ``{}``."""
        extra = getattr(device, "model_extra", None) or {}
        streams = extra.get("streams") if isinstance(extra, dict) else None
        return streams if isinstance(streams, dict) else {}

    def _device_streams_extra(self, device_id: str) -> Dict[str, Any]:
        """Return the extra ``streams`` mapping of a device, or ``{}`` when absent."""
        return self._streams_of(self.get_device(device_id))

    def device_imu_paths(
        self, device_id: str
    ) -> tuple[Optional[Path], Optional[Path], Optional[Path]]:
        """Return the IMU ``(stream, frames, index)`` paths from ``device.streams.imu``.

        Each element is ``None`` when its entry is absent or not a non-empty
        string; all three are ``None`` when there is no ``imu`` mapping.
        """
        imu = self._device_streams_extra(device_id).get("imu")
        if not isinstance(imu, dict):
            return (None, None, None)
        return (
            self._root_path(imu.get("stream")),
            self._root_path(imu.get("frames")),
            self._root_path(imu.get("index")),
        )

    def device_barometer_paths(self, device_id: str) -> tuple[Optional[Path], Optional[Path]]:
        """Return the barometer ``(stream, index)`` paths from ``device.streams.barometer``.

        Each element is ``None`` when its entry is absent or not a non-empty
        string; both are ``None`` when there is no ``barometer`` mapping.
        """
        barometer = self._device_streams_extra(device_id).get("barometer")
        if not isinstance(barometer, dict):
            return (None, None)
        return (
            self._root_path(barometer.get("stream")),
            self._root_path(barometer.get("index")),
        )

    # ------------------------------------------------------------------
    # Asset loaders
    # ------------------------------------------------------------------

    def load_detailed_annotation(self) -> Optional[DetailedAnnotation]:
        """Load the detailed annotation, or return ``None`` when not referenced or missing.

        Raises:
            CaptureBundleError: If the file exists but cannot be parsed or validated.
        """
        annotations = self.manifest.annotations
        if not annotations or not annotations.detailed_annotation_path:
            return None
        p = (self.root / annotations.detailed_annotation_path).resolve()
        if not p.exists():
            return None
        try:
            return DetailedAnnotation.model_validate(self._read_json(p))
        except Exception as e:
            raise CaptureBundleError(f"Failed to parse detailed annotation: {p}: {e}") from e

    @staticmethod
    def _intrinsics_matrix_from(obj: Any, p: Path) -> np.ndarray:
        """Build the (3, 3) float32 intrinsics matrix from a decoded intrinsics JSON.

        ``p`` is only used in error messages.
        """
        if not isinstance(obj, dict):
            # Strings/numbers would otherwise fail (or mis-match) on the `in` tests below.
            raise CaptureBundleError(f"Unsupported intrinsics JSON schema: {p}")
        if "K" in obj:
            K = np.asarray(obj["K"], dtype=np.float32)
        elif "intrinsics" in obj:
            K = np.asarray(obj["intrinsics"], dtype=np.float32)
        elif all(k in obj for k in ("fx", "fy", "cx", "cy")):
            fx, fy, cx, cy = (float(obj[k]) for k in ("fx", "fy", "cx", "cy"))
            K = np.array([[fx, 0.0, cx], [0.0, fy, cy], [0.0, 0.0, 1.0]], dtype=np.float32)
        else:
            raise CaptureBundleError(f"Unsupported intrinsics JSON schema: {p}")
        if K.shape != (3, 3):
            raise CaptureBundleError(f"Intrinsics must be 3x3, got {K.shape} in {p}")
        return K

    def load_intrinsics_matrix(self, device_id: str) -> np.ndarray:
        """Load a device's intrinsics from JSON as a (3, 3) float32 matrix.

        Accepted JSON shapes, checked in this order:
          - {"K": [[...],[...],[...]]}
          - {"intrinsics": [[...],[...],[...]]}
          - {"fx":..., "fy":..., "cx":..., "cy":...}

        Raises:
            CaptureBundleError: If the device has no intrinsics path, the JSON
                matches none of the shapes, or the matrix is not 3x3.
        """
        p = self.device_intrinsics_path(device_id)
        return self._intrinsics_matrix_from(self._read_json(p), p)

    def load_intrinsics_and_distortion(
        self, device_id: str
    ) -> tuple[np.ndarray, Optional[np.ndarray]]:
        """Load the intrinsics matrix K and, when present, distortion coefficients.

        Distortion is best-effort. The first of these keys holding a list is
        used: "distortion", "dist", "dist_coeffs", "k". It is returned as a
        flat float64 array, or ``None`` when no key matches or the values
        cannot be converted.

        Returns:
            ``(K, dist)`` with K a (3, 3) float32 matrix.

        Raises:
            CaptureBundleError: Under the same conditions as
                :meth:`load_intrinsics_matrix`.
        """
        p = self.device_intrinsics_path(device_id)
        obj = self._read_json(p)  # read once; K and distortion share this document
        K = self._intrinsics_matrix_from(obj, p)

        dist: Optional[np.ndarray] = None
        for key in ("distortion", "dist", "dist_coeffs", "k"):
            if isinstance(obj.get(key), list):
                try:
                    dist = np.asarray(obj[key], dtype=np.float64).reshape(-1)
                except Exception:  # best effort: unusable coefficients mean "no distortion"
                    dist = None
                break
        return K.astype(np.float32, copy=False), dist

    def load_sync_offsets(self) -> Optional[Dict[str, float]]:
        """Load per-device sync offsets (seconds) if available.

        Returns ``None`` when the manifest references no sync offsets file or
        the file does not exist.

        Raises:
            CaptureBundleError: If the file's top level is not a JSON object.
        """
        calibration = self.manifest.calibration
        if not calibration or not calibration.sync_offsets_path:
            return None
        p = (self.root / calibration.sync_offsets_path).resolve()
        if not p.exists():
            return None
        obj = self._read_json(p)
        if not isinstance(obj, dict):
            raise CaptureBundleError(f"Sync offsets must be a JSON object: {p}")
        return {str(k): float(v) for k, v in obj.items()}

    def load_rig_extrinsics(self) -> Optional[Dict[str, Any]]:
        """Load rig extrinsics if available, returned as the raw decoded JSON.

        Returns ``None`` when the manifest references no rig extrinsics file or
        the file does not exist.
        """
        calibration = self.manifest.calibration
        if not calibration or not calibration.rig_extrinsics_path:
            return None
        p = (self.root / calibration.rig_extrinsics_path).resolve()
        if not p.exists():
            return None
        return self._read_json(p)
|
|
|