File size: 3,610 Bytes
1c77735
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import pytest
import io
import asyncio
import numpy as np
from PIL import Image

from app.preprocessing.base import PreprocessingContext
from app.preprocessing.steps.s01_validate import ValidateStep
from app.preprocessing.steps.s02_decode import DecodeStep
from app.preprocessing.steps.s03_resize import ResizeStep
from app.preprocessing.steps.s04_color_convert import ColorConvertStep
from app.preprocessing.steps.s05_normalize import NormalizeStep
from app.preprocessing.steps.s06_patch import PatchStep
from app.preprocessing.steps.s08_tensorize import TensorizeStep


def make_jpeg_bytes(width=100, height=100) -> bytes:
    """Return JPEG-encoded bytes of a solid-colour RGB test image.

    Args:
        width: Image width in pixels.
        height: Image height in pixels.

    Returns:
        The encoded JPEG file contents, produced entirely in memory.
    """
    with io.BytesIO() as stream:
        # Every pixel is filled with the same (128, 64, 32) colour.
        Image.new("RGB", (width, height), color=(128, 64, 32)).save(stream, format="JPEG")
        return stream.getvalue()


@pytest.mark.asyncio
async def test_validate_step_valid():
    """ValidateStep flags a generated JPEG as valid and reports its format."""
    options = {"max_file_size_mb": 20, "allowed_formats": ["jpeg", "png"]}
    validator = ValidateStep()
    context = PreprocessingContext(raw_bytes=make_jpeg_bytes())

    context = await validator.process(context, options)

    # The step publishes its findings under the "validate" key.
    report = context.step_outputs["validate"]
    assert report["valid"] is True
    assert report["format"] == "jpeg"


@pytest.mark.asyncio
async def test_decode_step():
    """DecodeStep, run after validation, sets the image and reports 50x50."""
    decoder = DecodeStep()
    context = PreprocessingContext(raw_bytes=make_jpeg_bytes(50, 50))
    validator = ValidateStep()

    # Validation runs first, then decoding with an empty config.
    context = await validator.process(
        context,
        {"max_file_size_mb": 20, "allowed_formats": ["jpeg"]},
    )
    context = await decoder.process(context, {})

    assert context.image is not None
    decoded = context.step_outputs["decode"]
    assert decoded["width"] == 50
    assert decoded["height"] == 50


@pytest.mark.asyncio
async def test_resize_step():
    """Validate, decode and resize a 200x100 JPEG; expect a 224x224 image."""
    validate_cfg = {"max_file_size_mb": 20, "allowed_formats": ["jpeg"]}
    resize_cfg = {
        "target_width": 224,
        "target_height": 224,
        "method": "bilinear",
        "keep_aspect_ratio": True,
    }
    # (step, config) pairs in the order they are applied.
    stages = (
        (ValidateStep(), validate_cfg),
        (DecodeStep(), {}),
        (ResizeStep(), resize_cfg),
    )

    context = PreprocessingContext(raw_bytes=make_jpeg_bytes(200, 100))
    for stage, cfg in stages:
        context = await stage.process(context, cfg)

    assert context.image.size == (224, 224)


@pytest.mark.asyncio
async def test_normalize_step():
    """Min-max normalisation yields an array whose maximum is at most ~1.0."""
    # (step, config) pairs in the order they are applied.
    stages = (
        (ValidateStep(), {"max_file_size_mb": 20, "allowed_formats": ["jpeg"]}),
        (DecodeStep(), {}),
        (NormalizeStep(), {"method": "minmax"}),
    )

    context = PreprocessingContext(raw_bytes=make_jpeg_bytes(32, 32))
    for stage, cfg in stages:
        context = await stage.process(context, cfg)

    assert context.image_array is not None
    # Small tolerance above 1.0 to absorb floating-point rounding.
    upper_bound = 1.0 + 1e-6
    assert context.image_array.max() <= upper_bound


@pytest.mark.asyncio
async def test_patch_step():
    """PatchStep tiles a 1280x1280 image into 640x640 tiles with offsets."""
    from PIL import Image as PILImage

    patch_cfg = {"patch_size": 640, "overlap": 128, "pad_incomplete": True}

    # The step is fed a ready-made image, so raw_bytes is only a placeholder.
    context = PreprocessingContext(raw_bytes=b"x")
    context.image = PILImage.new("RGB", (1280, 1280), (128, 128, 128))
    patcher = PatchStep()
    context = await patcher.process(context, patch_cfg)

    # NOTE(review): patch 640 with overlap 128 presumably means stride 512, but
    # the exact tile count depends on how PatchStep handles the trailing edge.
    # Only a non-zero count is asserted here - confirm and pin the real count.
    tile_count = context.metadata["num_tiles"]
    assert tile_count > 0
    assert len(context.tiles) == tile_count
    assert len(context.tile_offsets) == tile_count
    assert all(tile.size == (640, 640) for tile in context.tiles)


@pytest.mark.asyncio
async def test_tensorize_from_patches():
    """TensorizeStep stacks 196 patches of length 768 into a (1, 196, 768) tensor."""
    patch_count, patch_dim = 196, 768
    tensor_cfg = {"dtype": "float32", "add_batch_dim": True, "channel_first": True}

    context = PreprocessingContext(raw_bytes=b"x")
    # Zero-filled float32 vectors stand in for real patches.
    context.patches = [
        np.zeros(patch_dim, dtype=np.float32) for _ in range(patch_count)
    ]
    tensorizer = TensorizeStep()
    context = await tensorizer.process(context, tensor_cfg)

    assert context.tensor is not None
    assert context.tensor.shape == (1, patch_count, patch_dim)