| import json | |
| from pathlib import Path | |
| from ylff.services.ingest_pipeline import IngestConfig, ingest_capture_bundle | |
def test_ingest_capture_bundle_single_device(tmp_path: Path):
    """Ingest a raw export holding a single device folder and check the outputs.

    A ``raw_export/iphone_a`` directory is created under ``tmp_path`` with a
    zero-byte ``video.mov``, an ``intrinsics.json`` and a ``timestamps.json``.
    ``ingest_capture_bundle`` is then run with quality gates switched off, and
    the test asserts that the reported bundle directory, its ``manifest.json``
    and the two per-device JSON files exist.
    """
    device_name = "iphone_a"
    export_root = tmp_path / "raw_export"
    device_dir = export_root / device_name
    device_dir.mkdir(parents=True)

    # Minimal required assets.
    intrinsics_payload = json.dumps(
        {"fx": 1000.0, "fy": 1000.0, "cx": 512.0, "cy": 384.0}
    )
    timestamps_payload = json.dumps({"t": [0.0, 0.033, 0.066]})

    # Zero-byte placeholder: no decoding happens in this test.
    device_dir.joinpath("video.mov").write_bytes(b"")
    device_dir.joinpath("intrinsics.json").write_text(intrinsics_payload)
    device_dir.joinpath("timestamps.json").write_text(timestamps_payload)

    destination = tmp_path / "out"
    ingest_config = IngestConfig(
        capture_id="001",
        overwrite=False,
        run_quality_gates=False,
    )
    result = ingest_capture_bundle(
        export_root,
        output_root=destination,
        config=ingest_config,
    )

    # The returned mapping reports where the bundle was written.
    bundle_root = Path(result["bundle_dir"])
    device_out = bundle_root / "devices" / device_name

    assert bundle_root.exists()
    assert bundle_root.joinpath("manifest.json").exists()
    assert device_out.joinpath("intrinsics.json").exists()
    assert device_out.joinpath("timestamps.json").exists()