| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| import queue |
| import time |
| from multiprocessing import queues |
| from unittest.mock import MagicMock, patch |
|
|
| import numpy as np |
| import pytest |
| from PIL import Image |
|
|
| from lerobot.common.datasets.image_writer import ( |
| AsyncImageWriter, |
| image_array_to_pil_image, |
| safe_stop_image_writer, |
| write_image, |
| ) |
| from tests.fixtures.constants import DUMMY_HWC |
|
|
| DUMMY_IMAGE = "test_image.png" |
|
|
|
|
| def test_init_threading(): |
| writer = AsyncImageWriter(num_processes=0, num_threads=2) |
| try: |
| assert writer.num_processes == 0 |
| assert writer.num_threads == 2 |
| assert isinstance(writer.queue, queue.Queue) |
| assert len(writer.threads) == 2 |
| assert len(writer.processes) == 0 |
| assert all(t.is_alive() for t in writer.threads) |
| finally: |
| writer.stop() |
|
|
|
|
| def test_init_multiprocessing(): |
| writer = AsyncImageWriter(num_processes=2, num_threads=2) |
| try: |
| assert writer.num_processes == 2 |
| assert writer.num_threads == 2 |
| assert isinstance(writer.queue, queues.JoinableQueue) |
| assert len(writer.threads) == 0 |
| assert len(writer.processes) == 2 |
| assert all(p.is_alive() for p in writer.processes) |
| finally: |
| writer.stop() |
|
|
|
|
| def test_zero_threads(): |
| with pytest.raises(ValueError): |
| AsyncImageWriter(num_processes=0, num_threads=0) |
|
|
|
|
| def test_image_array_to_pil_image_float_array_wrong_range_0_255(): |
| image = np.random.rand(*DUMMY_HWC) * 255 |
| with pytest.raises(ValueError): |
| image_array_to_pil_image(image) |
|
|
|
|
| def test_image_array_to_pil_image_float_array_wrong_range_neg_1_1(): |
| image = np.random.rand(*DUMMY_HWC) * 2 - 1 |
| with pytest.raises(ValueError): |
| image_array_to_pil_image(image) |
|
|
|
|
| def test_image_array_to_pil_image_rgb(img_array_factory): |
| img_array = img_array_factory(100, 100) |
| result_image = image_array_to_pil_image(img_array) |
| assert isinstance(result_image, Image.Image) |
| assert result_image.size == (100, 100) |
| assert result_image.mode == "RGB" |
|
|
|
|
| def test_image_array_to_pil_image_pytorch_format(img_array_factory): |
| img_array = img_array_factory(100, 100).transpose(2, 0, 1) |
| result_image = image_array_to_pil_image(img_array) |
| assert isinstance(result_image, Image.Image) |
| assert result_image.size == (100, 100) |
| assert result_image.mode == "RGB" |
|
|
|
|
| def test_image_array_to_pil_image_single_channel(img_array_factory): |
| img_array = img_array_factory(channels=1) |
| with pytest.raises(NotImplementedError): |
| image_array_to_pil_image(img_array) |
|
|
|
|
| def test_image_array_to_pil_image_4_channels(img_array_factory): |
| img_array = img_array_factory(channels=4) |
| with pytest.raises(NotImplementedError): |
| image_array_to_pil_image(img_array) |
|
|
|
|
| def test_image_array_to_pil_image_float_array(img_array_factory): |
| img_array = img_array_factory(dtype=np.float32) |
| result_image = image_array_to_pil_image(img_array) |
| assert isinstance(result_image, Image.Image) |
| assert result_image.size == (100, 100) |
| assert result_image.mode == "RGB" |
| assert np.array(result_image).dtype == np.uint8 |
|
|
|
|
| def test_image_array_to_pil_image_uint8_array(img_array_factory): |
| img_array = img_array_factory(dtype=np.float32) |
| result_image = image_array_to_pil_image(img_array) |
| assert isinstance(result_image, Image.Image) |
| assert result_image.size == (100, 100) |
| assert result_image.mode == "RGB" |
| assert np.array(result_image).dtype == np.uint8 |
|
|
|
|
| def test_write_image_numpy(tmp_path, img_array_factory): |
| image_array = img_array_factory() |
| fpath = tmp_path / DUMMY_IMAGE |
| write_image(image_array, fpath) |
| assert fpath.exists() |
| saved_image = np.array(Image.open(fpath)) |
| assert np.array_equal(image_array, saved_image) |
|
|
|
|
| def test_write_image_image(tmp_path, img_factory): |
| image_pil = img_factory() |
| fpath = tmp_path / DUMMY_IMAGE |
| write_image(image_pil, fpath) |
| assert fpath.exists() |
| saved_image = Image.open(fpath) |
| assert list(saved_image.getdata()) == list(image_pil.getdata()) |
| assert np.array_equal(image_pil, saved_image) |
|
|
|
|
| def test_write_image_exception(tmp_path): |
| image_array = "invalid data" |
| fpath = tmp_path / DUMMY_IMAGE |
| with patch("builtins.print") as mock_print: |
| write_image(image_array, fpath) |
| mock_print.assert_called() |
| assert not fpath.exists() |
|
|
|
|
| def test_save_image_numpy(tmp_path, img_array_factory): |
| writer = AsyncImageWriter() |
| try: |
| image_array = img_array_factory() |
| fpath = tmp_path / DUMMY_IMAGE |
| fpath.parent.mkdir(parents=True, exist_ok=True) |
| writer.save_image(image_array, fpath) |
| writer.wait_until_done() |
| assert fpath.exists() |
| saved_image = np.array(Image.open(fpath)) |
| assert np.array_equal(image_array, saved_image) |
| finally: |
| writer.stop() |
|
|
|
|
| def test_save_image_numpy_multiprocessing(tmp_path, img_array_factory): |
| writer = AsyncImageWriter(num_processes=2, num_threads=2) |
| try: |
| image_array = img_array_factory() |
| fpath = tmp_path / DUMMY_IMAGE |
| writer.save_image(image_array, fpath) |
| writer.wait_until_done() |
| assert fpath.exists() |
| saved_image = np.array(Image.open(fpath)) |
| assert np.array_equal(image_array, saved_image) |
| finally: |
| writer.stop() |
|
|
|
|
| def test_save_image_torch(tmp_path, img_tensor_factory): |
| writer = AsyncImageWriter() |
| try: |
| image_tensor = img_tensor_factory() |
| fpath = tmp_path / DUMMY_IMAGE |
| fpath.parent.mkdir(parents=True, exist_ok=True) |
| writer.save_image(image_tensor, fpath) |
| writer.wait_until_done() |
| assert fpath.exists() |
| saved_image = np.array(Image.open(fpath)) |
| expected_image = (image_tensor.permute(1, 2, 0).cpu().numpy() * 255).astype(np.uint8) |
| assert np.array_equal(expected_image, saved_image) |
| finally: |
| writer.stop() |
|
|
|
|
| def test_save_image_torch_multiprocessing(tmp_path, img_tensor_factory): |
| writer = AsyncImageWriter(num_processes=2, num_threads=2) |
| try: |
| image_tensor = img_tensor_factory() |
| fpath = tmp_path / DUMMY_IMAGE |
| writer.save_image(image_tensor, fpath) |
| writer.wait_until_done() |
| assert fpath.exists() |
| saved_image = np.array(Image.open(fpath)) |
| expected_image = (image_tensor.permute(1, 2, 0).cpu().numpy() * 255).astype(np.uint8) |
| assert np.array_equal(expected_image, saved_image) |
| finally: |
| writer.stop() |
|
|
|
|
| def test_save_image_pil(tmp_path, img_factory): |
| writer = AsyncImageWriter() |
| try: |
| image_pil = img_factory() |
| fpath = tmp_path / DUMMY_IMAGE |
| fpath.parent.mkdir(parents=True, exist_ok=True) |
| writer.save_image(image_pil, fpath) |
| writer.wait_until_done() |
| assert fpath.exists() |
| saved_image = Image.open(fpath) |
| assert list(saved_image.getdata()) == list(image_pil.getdata()) |
| finally: |
| writer.stop() |
|
|
|
|
| def test_save_image_pil_multiprocessing(tmp_path, img_factory): |
| writer = AsyncImageWriter(num_processes=2, num_threads=2) |
| try: |
| image_pil = img_factory() |
| fpath = tmp_path / DUMMY_IMAGE |
| writer.save_image(image_pil, fpath) |
| writer.wait_until_done() |
| assert fpath.exists() |
| saved_image = Image.open(fpath) |
| assert list(saved_image.getdata()) == list(image_pil.getdata()) |
| finally: |
| writer.stop() |
|
|
|
|
| def test_save_image_invalid_data(tmp_path): |
| writer = AsyncImageWriter() |
| try: |
| image_array = "invalid data" |
| fpath = tmp_path / DUMMY_IMAGE |
| fpath.parent.mkdir(parents=True, exist_ok=True) |
| with patch("builtins.print") as mock_print: |
| writer.save_image(image_array, fpath) |
| writer.wait_until_done() |
| mock_print.assert_called() |
| assert not fpath.exists() |
| finally: |
| writer.stop() |
|
|
|
|
| def test_save_image_after_stop(tmp_path, img_array_factory): |
| writer = AsyncImageWriter() |
| writer.stop() |
| image_array = img_array_factory() |
| fpath = tmp_path / DUMMY_IMAGE |
| writer.save_image(image_array, fpath) |
| time.sleep(1) |
| assert not fpath.exists() |
|
|
|
|
| def test_stop(): |
| writer = AsyncImageWriter(num_processes=0, num_threads=2) |
| writer.stop() |
| assert not any(t.is_alive() for t in writer.threads) |
|
|
|
|
| def test_stop_multiprocessing(): |
| writer = AsyncImageWriter(num_processes=2, num_threads=2) |
| writer.stop() |
| assert not any(p.is_alive() for p in writer.processes) |
|
|
|
|
| def test_multiple_stops(): |
| writer = AsyncImageWriter() |
| writer.stop() |
| writer.stop() |
| assert not any(t.is_alive() for t in writer.threads) |
|
|
|
|
| def test_multiple_stops_multiprocessing(): |
| writer = AsyncImageWriter(num_processes=2, num_threads=2) |
| writer.stop() |
| writer.stop() |
| assert not any(t.is_alive() for t in writer.threads) |
|
|
|
|
| def test_wait_until_done(tmp_path, img_array_factory): |
| writer = AsyncImageWriter(num_processes=0, num_threads=4) |
| try: |
| num_images = 100 |
| image_arrays = [img_array_factory(height=500, width=500) for _ in range(num_images)] |
| fpaths = [tmp_path / f"frame_{i:06d}.png" for i in range(num_images)] |
| for image_array, fpath in zip(image_arrays, fpaths, strict=True): |
| fpath.parent.mkdir(parents=True, exist_ok=True) |
| writer.save_image(image_array, fpath) |
| writer.wait_until_done() |
| for i, fpath in enumerate(fpaths): |
| assert fpath.exists() |
| saved_image = np.array(Image.open(fpath)) |
| assert np.array_equal(saved_image, image_arrays[i]) |
| finally: |
| writer.stop() |
|
|
|
|
| def test_wait_until_done_multiprocessing(tmp_path, img_array_factory): |
| writer = AsyncImageWriter(num_processes=2, num_threads=2) |
| try: |
| num_images = 100 |
| image_arrays = [img_array_factory() for _ in range(num_images)] |
| fpaths = [tmp_path / f"frame_{i:06d}.png" for i in range(num_images)] |
| for image_array, fpath in zip(image_arrays, fpaths, strict=True): |
| fpath.parent.mkdir(parents=True, exist_ok=True) |
| writer.save_image(image_array, fpath) |
| writer.wait_until_done() |
| for i, fpath in enumerate(fpaths): |
| assert fpath.exists() |
| saved_image = np.array(Image.open(fpath)) |
| assert np.array_equal(saved_image, image_arrays[i]) |
| finally: |
| writer.stop() |
|
|
|
|
| def test_exception_handling(tmp_path, img_array_factory): |
| writer = AsyncImageWriter() |
| try: |
| image_array = img_array_factory() |
| with ( |
| patch.object(writer.queue, "put", side_effect=queue.Full("Queue is full")), |
| pytest.raises(queue.Full) as exc_info, |
| ): |
| writer.save_image(image_array, tmp_path / "test.png") |
| assert str(exc_info.value) == "Queue is full" |
| finally: |
| writer.stop() |
|
|
|
|
| def test_with_different_image_formats(tmp_path, img_array_factory): |
| writer = AsyncImageWriter() |
| try: |
| image_array = img_array_factory() |
| formats = ["png", "jpeg", "bmp"] |
| for fmt in formats: |
| fpath = tmp_path / f"test_image.{fmt}" |
| write_image(image_array, fpath) |
| assert fpath.exists() |
| finally: |
| writer.stop() |
|
|
|
|
| def test_safe_stop_image_writer_decorator(): |
| class MockDataset: |
| def __init__(self): |
| self.image_writer = MagicMock(spec=AsyncImageWriter) |
|
|
| @safe_stop_image_writer |
| def function_that_raises_exception(dataset=None): |
| raise Exception("Test exception") |
|
|
| dataset = MockDataset() |
|
|
| with pytest.raises(Exception) as exc_info: |
| function_that_raises_exception(dataset=dataset) |
|
|
| assert str(exc_info.value) == "Test exception" |
| dataset.image_writer.stop.assert_called_once() |
|
|
|
|
| def test_main_process_time(tmp_path, img_tensor_factory): |
| writer = AsyncImageWriter() |
| try: |
| image_tensor = img_tensor_factory() |
| fpath = tmp_path / DUMMY_IMAGE |
| start_time = time.perf_counter() |
| writer.save_image(image_tensor, fpath) |
| end_time = time.perf_counter() |
| time_spent = end_time - start_time |
| |
| assert time_spent < 0.01, f"Main process time exceeded threshold: {time_spent}s" |
| writer.wait_until_done() |
| assert fpath.exists() |
| finally: |
| writer.stop() |
|
|