File size: 5,590 Bytes
7a87926 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
import json
from pathlib import Path
import pytest
from ylff.utils.capture_bundle import CaptureBundle, CaptureBundleError
def test_capture_bundle_load_fails_on_missing_manifest(tmp_path: Path):
with pytest.raises(CaptureBundleError):
CaptureBundle.load(tmp_path)
def test_capture_bundle_load_fails_on_missing_referenced_paths(tmp_path: Path):
(tmp_path / "devices" / "iphone_a").mkdir(parents=True)
manifest = {
"schema_version": "1.0",
"capture_id": "cap_001",
"devices": [
{
"device_id": "iphone_a",
"device_type": "iphone",
"video_path": "devices/iphone_a/video.mov", # missing
"intrinsics_path": "devices/iphone_a/intrinsics.json", # missing
"timestamps_path": "devices/iphone_a/timestamps.json", # missing
}
],
}
(tmp_path / "manifest.json").write_text(json.dumps(manifest))
with pytest.raises(CaptureBundleError) as e:
CaptureBundle.load(tmp_path)
msg = str(e.value)
assert "Missing path" in msg
def test_capture_bundle_intrinsics_schema_variants(tmp_path: Path):
(tmp_path / "devices" / "iphone_a").mkdir(parents=True)
(tmp_path / "devices" / "iphone_a" / "video.mov").write_bytes(b"x")
(tmp_path / "devices" / "iphone_a" / "timestamps.json").write_text(json.dumps({"t": []}))
def write_manifest():
(tmp_path / "manifest.json").write_text(
json.dumps(
{
"schema_version": "1.0",
"capture_id": "cap_001",
"devices": [
{
"device_id": "iphone_a",
"device_type": "iphone",
"video_path": "devices/iphone_a/video.mov",
"intrinsics_path": "devices/iphone_a/intrinsics.json",
"timestamps_path": "devices/iphone_a/timestamps.json",
}
],
}
)
)
# K matrix
(tmp_path / "devices" / "iphone_a" / "intrinsics.json").write_text(
json.dumps({"K": [[1, 0, 0], [0, 2, 0], [0, 0, 1]]})
)
write_manifest()
bundle = CaptureBundle.load(tmp_path)
K = bundle.load_intrinsics_matrix("iphone_a")
assert K[1, 1] == 2.0
# intrinsics matrix key
(tmp_path / "devices" / "iphone_a" / "intrinsics.json").write_text(
json.dumps({"intrinsics": [[3, 0, 0], [0, 4, 0], [0, 0, 1]]})
)
bundle = CaptureBundle.load(tmp_path)
K = bundle.load_intrinsics_matrix("iphone_a")
assert K[0, 0] == 3.0
def test_capture_bundle_depth_stream_dir_fallback_and_sensor_paths(tmp_path: Path):
(tmp_path / "devices" / "iphone_a").mkdir(parents=True)
(tmp_path / "devices" / "iphone_a" / "video.mov").write_bytes(b"x")
(tmp_path / "devices" / "iphone_a" / "timestamps.json").write_text(json.dumps({"t": []}))
(tmp_path / "devices" / "iphone_a" / "intrinsics.json").write_text(
json.dumps({"K": [[1, 0, 0], [0, 1, 0], [0, 0, 1]]})
)
# WaveformMobile packed depth stream directory (manifest extra field).
(tmp_path / "devices" / "iphone_a" / "depth").mkdir(parents=True)
# Optional IMU + barometer streams (manifest extra field).
(tmp_path / "devices" / "iphone_a" / "imu_stream.bin").write_bytes(b"\x00")
(tmp_path / "devices" / "iphone_a" / "imu_frames.bin").write_bytes(b"\x00")
(tmp_path / "devices" / "iphone_a" / "imu_index.json").write_text(json.dumps({}))
(tmp_path / "devices" / "iphone_a" / "barometer_stream.bin").write_bytes(b"\x00")
(tmp_path / "devices" / "iphone_a" / "barometer").mkdir(parents=True)
(tmp_path / "devices" / "iphone_a" / "barometer" / "index.json").write_text(json.dumps({}))
manifest = {
"schema_version": "1.0",
"capture_id": "cap_001",
"devices": [
{
"device_id": "iphone_a",
"device_type": "iphone",
"video_path": "devices/iphone_a/video.mov",
"intrinsics_path": "devices/iphone_a/intrinsics.json",
"timestamps_path": "devices/iphone_a/timestamps.json",
# Intentionally omit lidar_depth_dir to exercise the fallback.
"streams": {
"depth": {"directory": "devices/iphone_a/depth"},
"imu": {
"stream": "devices/iphone_a/imu_stream.bin",
"frames": "devices/iphone_a/imu_frames.bin",
"index": "devices/iphone_a/imu_index.json",
},
"barometer": {
"stream": "devices/iphone_a/barometer_stream.bin",
"index": "devices/iphone_a/barometer/index.json",
},
},
}
],
}
(tmp_path / "manifest.json").write_text(json.dumps(manifest))
bundle = CaptureBundle.load(tmp_path)
depth_dir = bundle.device_depth_dir_best_effort("iphone_a")
assert depth_dir is not None
assert depth_dir.name == "depth"
assert depth_dir.exists()
imu_stream, imu_frames, imu_index = bundle.device_imu_paths("iphone_a")
assert imu_stream and imu_stream.exists()
assert imu_frames and imu_frames.exists()
assert imu_index and imu_index.exists()
bar_stream, bar_index = bundle.device_barometer_paths("iphone_a")
assert bar_stream and bar_stream.exists()
assert bar_index and bar_index.exists()
|