|
|
import json |
|
|
from pathlib import Path |
|
|
import pytest |
|
|
|
|
|
from ylff.utils.capture_bundle import CaptureBundle, CaptureBundleError |
|
|
|
|
|
|
|
|
def test_capture_bundle_load_fails_on_missing_manifest(tmp_path: Path):
    """Loading a directory into which no manifest was written raises CaptureBundleError."""
    # Nothing is written into tmp_path before the load, so no manifest.json is present.
    empty_bundle_root = tmp_path
    with pytest.raises(CaptureBundleError):
        CaptureBundle.load(empty_bundle_root)
|
|
|
|
|
|
|
|
def test_capture_bundle_load_fails_on_missing_referenced_paths(tmp_path: Path):
    """A manifest whose referenced files are absent must fail with a "Missing path" error."""
    # Only the device directory is created; the video, intrinsics and
    # timestamps files named in the manifest are deliberately never written.
    device_dir = tmp_path / "devices" / "iphone_a"
    device_dir.mkdir(parents=True)

    device_entry = {
        "device_id": "iphone_a",
        "device_type": "iphone",
        "video_path": "devices/iphone_a/video.mov",
        "intrinsics_path": "devices/iphone_a/intrinsics.json",
        "timestamps_path": "devices/iphone_a/timestamps.json",
    }
    manifest_text = json.dumps(
        {
            "schema_version": "1.0",
            "capture_id": "cap_001",
            "devices": [device_entry],
        }
    )
    (tmp_path / "manifest.json").write_text(manifest_text)

    with pytest.raises(CaptureBundleError) as excinfo:
        CaptureBundle.load(tmp_path)

    # The error text is inspected only after the context manager has captured it.
    assert "Missing path" in str(excinfo.value)
|
|
|
|
|
|
|
|
def test_capture_bundle_intrinsics_schema_variants(tmp_path: Path):
    """The intrinsics matrix is read from either a ``"K"`` or an ``"intrinsics"`` key.

    The same bundle directory is loaded twice; only the content of
    ``intrinsics.json`` changes between the two loads.
    """
    device_dir = tmp_path / "devices" / "iphone_a"
    intrinsics_file = device_dir / "intrinsics.json"

    device_dir.mkdir(parents=True)
    (device_dir / "video.mov").write_bytes(b"x")
    (device_dir / "timestamps.json").write_text(json.dumps({"t": []}))

    device_entry = {
        "device_id": "iphone_a",
        "device_type": "iphone",
        "video_path": "devices/iphone_a/video.mov",
        "intrinsics_path": "devices/iphone_a/intrinsics.json",
        "timestamps_path": "devices/iphone_a/timestamps.json",
    }
    manifest = {
        "schema_version": "1.0",
        "capture_id": "cap_001",
        "devices": [device_entry],
    }

    # Variant 1: the matrix is stored under the "K" key.
    intrinsics_file.write_text(json.dumps({"K": [[1, 0, 0], [0, 2, 0], [0, 0, 1]]}))
    (tmp_path / "manifest.json").write_text(json.dumps(manifest))

    first_bundle = CaptureBundle.load(tmp_path)
    first_matrix = first_bundle.load_intrinsics_matrix("iphone_a")
    assert first_matrix[1, 1] == 2.0

    # Variant 2: the file is overwritten with the matrix under "intrinsics";
    # the manifest is left untouched and the bundle is loaded afresh.
    intrinsics_file.write_text(
        json.dumps({"intrinsics": [[3, 0, 0], [0, 4, 0], [0, 0, 1]]})
    )

    second_bundle = CaptureBundle.load(tmp_path)
    second_matrix = second_bundle.load_intrinsics_matrix("iphone_a")
    assert second_matrix[0, 0] == 3.0
|
|
|
|
|
|
|
|
def test_capture_bundle_depth_stream_dir_fallback_and_sensor_paths(tmp_path: Path):
    """Depth, IMU and barometer locations declared under ``streams`` resolve to existing paths.

    Every file and directory named in the manifest is created on disk first,
    then the bundle accessors are checked to return paths that exist.
    """
    device_dir = tmp_path / "devices" / "iphone_a"
    device_dir.mkdir(parents=True)

    # Core per-device files referenced by the manifest.
    (device_dir / "video.mov").write_bytes(b"x")
    (device_dir / "timestamps.json").write_text(json.dumps({"t": []}))
    (device_dir / "intrinsics.json").write_text(
        json.dumps({"K": [[1, 0, 0], [0, 1, 0], [0, 0, 1]]})
    )

    # Depth is declared as a directory rather than as individual files.
    (device_dir / "depth").mkdir(parents=True)

    # Sensor payloads: one-byte binary blobs plus empty JSON indexes.
    for blob_name in ("imu_stream.bin", "imu_frames.bin", "barometer_stream.bin"):
        (device_dir / blob_name).write_bytes(b"\x00")
    (device_dir / "imu_index.json").write_text(json.dumps({}))
    (device_dir / "barometer").mkdir(parents=True)
    (device_dir / "barometer" / "index.json").write_text(json.dumps({}))

    streams = {
        "depth": {"directory": "devices/iphone_a/depth"},
        "imu": {
            "stream": "devices/iphone_a/imu_stream.bin",
            "frames": "devices/iphone_a/imu_frames.bin",
            "index": "devices/iphone_a/imu_index.json",
        },
        "barometer": {
            "stream": "devices/iphone_a/barometer_stream.bin",
            "index": "devices/iphone_a/barometer/index.json",
        },
    }
    device_entry = {
        "device_id": "iphone_a",
        "device_type": "iphone",
        "video_path": "devices/iphone_a/video.mov",
        "intrinsics_path": "devices/iphone_a/intrinsics.json",
        "timestamps_path": "devices/iphone_a/timestamps.json",
        "streams": streams,
    }
    manifest_text = json.dumps(
        {
            "schema_version": "1.0",
            "capture_id": "cap_001",
            "devices": [device_entry],
        }
    )
    (tmp_path / "manifest.json").write_text(manifest_text)

    bundle = CaptureBundle.load(tmp_path)

    resolved_depth = bundle.device_depth_dir_best_effort("iphone_a")
    assert resolved_depth is not None
    assert resolved_depth.name == "depth"
    assert resolved_depth.exists()

    # Unpacking is kept so that the number of returned paths is still enforced.
    imu_stream, imu_frames, imu_index = bundle.device_imu_paths("iphone_a")
    for imu_path in (imu_stream, imu_frames, imu_index):
        assert imu_path and imu_path.exists()

    bar_stream, bar_index = bundle.device_barometer_paths("iphone_a")
    for bar_path in (bar_stream, bar_index):
        assert bar_path and bar_path.exists()
|
|
|