# File size: 5,590 Bytes
# 7a87926
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import json
from pathlib import Path
import pytest

from ylff.utils.capture_bundle import CaptureBundle, CaptureBundleError


def test_capture_bundle_load_fails_on_missing_manifest(tmp_path: Path):
    """Loading an empty directory must raise ``CaptureBundleError``.

    ``tmp_path`` is a fresh directory with nothing written into it, so no
    manifest file is present when ``CaptureBundle.load`` is called.
    """
    empty_bundle_root = tmp_path
    with pytest.raises(CaptureBundleError):
        CaptureBundle.load(empty_bundle_root)


def test_capture_bundle_load_fails_on_missing_referenced_paths(tmp_path: Path):
    """Loading must fail when the manifest points at files that do not exist.

    Only the device directory is created; the video, intrinsics and timestamps
    files named in the manifest are deliberately never written. The raised
    ``CaptureBundleError`` is expected to contain the text ``"Missing path"``.
    """
    device_dir = tmp_path / "devices" / "iphone_a"
    device_dir.mkdir(parents=True)

    # All three referenced files are absent from disk on purpose.
    device_entry = {
        "device_id": "iphone_a",
        "device_type": "iphone",
        "video_path": "devices/iphone_a/video.mov",
        "intrinsics_path": "devices/iphone_a/intrinsics.json",
        "timestamps_path": "devices/iphone_a/timestamps.json",
    }
    manifest_text = json.dumps(
        {
            "schema_version": "1.0",
            "capture_id": "cap_001",
            "devices": [device_entry],
        }
    )
    (tmp_path / "manifest.json").write_text(manifest_text)

    with pytest.raises(CaptureBundleError) as excinfo:
        CaptureBundle.load(tmp_path)

    assert "Missing path" in str(excinfo.value)


def test_capture_bundle_intrinsics_schema_variants(tmp_path: Path):
    """Check ``load_intrinsics_matrix`` against two intrinsics file layouts.

    The same bundle directory is loaded twice: first with the 3x3 matrix
    stored under the ``"K"`` key, then with it stored under the
    ``"intrinsics"`` key. One distinctive entry of the returned matrix is
    checked in each case.
    """
    device_dir = tmp_path / "devices" / "iphone_a"
    device_dir.mkdir(parents=True)
    (device_dir / "video.mov").write_bytes(b"x")
    (device_dir / "timestamps.json").write_text(json.dumps({"t": []}))
    intrinsics_file = device_dir / "intrinsics.json"

    manifest = {
        "schema_version": "1.0",
        "capture_id": "cap_001",
        "devices": [
            {
                "device_id": "iphone_a",
                "device_type": "iphone",
                "video_path": "devices/iphone_a/video.mov",
                "intrinsics_path": "devices/iphone_a/intrinsics.json",
                "timestamps_path": "devices/iphone_a/timestamps.json",
            }
        ],
    }

    # Variant 1: matrix stored under the "K" key.
    k_payload = {"K": [[1, 0, 0], [0, 2, 0], [0, 0, 1]]}
    intrinsics_file.write_text(json.dumps(k_payload))
    (tmp_path / "manifest.json").write_text(json.dumps(manifest))
    matrix_from_k = CaptureBundle.load(tmp_path).load_intrinsics_matrix("iphone_a")
    assert matrix_from_k[1, 1] == 2.0

    # Variant 2: matrix stored under the "intrinsics" key. The manifest on
    # disk is unchanged; only the intrinsics file is overwritten.
    intrinsics_payload = {"intrinsics": [[3, 0, 0], [0, 4, 0], [0, 0, 1]]}
    intrinsics_file.write_text(json.dumps(intrinsics_payload))
    matrix_from_key = CaptureBundle.load(tmp_path).load_intrinsics_matrix("iphone_a")
    assert matrix_from_key[0, 0] == 3.0


def test_capture_bundle_depth_stream_dir_fallback_and_sensor_paths(tmp_path: Path):
    """Check depth-directory fallback and IMU/barometer path lookup.

    The manifest omits ``lidar_depth_dir`` and instead declares the depth
    directory, IMU files and barometer files under the device's ``streams``
    entry. After loading, the bundle must return an existing ``depth``
    directory and existing paths for every IMU and barometer file.
    """
    device_dir = tmp_path / "devices" / "iphone_a"
    device_dir.mkdir(parents=True)
    (device_dir / "video.mov").write_bytes(b"x")
    (device_dir / "timestamps.json").write_text(json.dumps({"t": []}))
    identity_k = {"K": [[1, 0, 0], [0, 1, 0], [0, 0, 1]]}
    (device_dir / "intrinsics.json").write_text(json.dumps(identity_k))

    # WaveformMobile packed depth stream directory (manifest extra field).
    (device_dir / "depth").mkdir(parents=True)

    # Optional IMU + barometer streams (manifest extra field).
    for imu_binary in ("imu_stream.bin", "imu_frames.bin"):
        (device_dir / imu_binary).write_bytes(b"\x00")
    (device_dir / "imu_index.json").write_text(json.dumps({}))
    (device_dir / "barometer_stream.bin").write_bytes(b"\x00")
    barometer_dir = device_dir / "barometer"
    barometer_dir.mkdir(parents=True)
    (barometer_dir / "index.json").write_text(json.dumps({}))

    # lidar_depth_dir is intentionally left out to exercise the fallback.
    stream_entries = {
        "depth": {"directory": "devices/iphone_a/depth"},
        "imu": {
            "stream": "devices/iphone_a/imu_stream.bin",
            "frames": "devices/iphone_a/imu_frames.bin",
            "index": "devices/iphone_a/imu_index.json",
        },
        "barometer": {
            "stream": "devices/iphone_a/barometer_stream.bin",
            "index": "devices/iphone_a/barometer/index.json",
        },
    }
    device_entry = {
        "device_id": "iphone_a",
        "device_type": "iphone",
        "video_path": "devices/iphone_a/video.mov",
        "intrinsics_path": "devices/iphone_a/intrinsics.json",
        "timestamps_path": "devices/iphone_a/timestamps.json",
        "streams": stream_entries,
    }
    manifest_text = json.dumps(
        {
            "schema_version": "1.0",
            "capture_id": "cap_001",
            "devices": [device_entry],
        }
    )
    (tmp_path / "manifest.json").write_text(manifest_text)

    bundle = CaptureBundle.load(tmp_path)

    resolved_depth = bundle.device_depth_dir_best_effort("iphone_a")
    assert resolved_depth is not None
    assert resolved_depth.name == "depth"
    assert resolved_depth.exists()

    # Unpack explicitly so a wrong tuple arity still fails the test.
    imu_stream, imu_frames, imu_index = bundle.device_imu_paths("iphone_a")
    for imu_path in (imu_stream, imu_frames, imu_index):
        assert imu_path and imu_path.exists()

    bar_stream, bar_index = bundle.device_barometer_paths("iphone_a")
    for barometer_path in (bar_stream, bar_index):
        assert barometer_path and barometer_path.exists()