import numpy as np
from app.preprocessing.base import PreprocessingStep, PreprocessingContext


class AugmentStep(PreprocessingStep):
    name = "augment"
    description = "Optional augmentations (flip, rotate, brightness, contrast) — disabled by default for inference"
    version = "1.0.0"
    order = 7
    enabled = False
    required = False

    async def process(self, ctx: PreprocessingContext, params: dict) -> PreprocessingContext:
        applied = []
        skipped = []

        if ctx.image_array is None:
            ctx.step_outputs["augment"] = {"applied": [], "skipped": ["all"], "reason": "no image_array"}
            return ctx

        arr = ctx.image_array

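        # np.fliplr reverses axis 1 and np.flipud reverses axis 0, so the flips
        # below assume a height-first layout such as (H, W) or (H, W, C).
        if params.get("horizontal_flip", False):
            arr = np.fliplr(arr)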
            applied.append("horizontal_flip")
        else:
            skipped.append("horizontal_flip")

        if params.get("vertical_flip", False):
            arr = np.flipud(arr)
            applied.append("vertical_flip")
        else:
            skipped.append("vertical_flip")

        rotation = params.get("rotation_degrees", 0)
        if rotation != 0 and ctx.image is not None:
            # Rotate via PIL for simplicity. The array is rebuilt from
            # ctx.image, so flips applied to `arr` above are not carried over.
            ctx.image = ctx.image.rotate(rotation, expand=False)
            arr = np.array(ctx.image, dtype=np.float32)
            applied.append(f"rotation_{rotation}deg")
        else:
            # Either no rotation was requested or there is no PIL image to
            # rotate; report the step as skipped rather than applied.
            skipped.append("rotation")

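        brightness_range = params.get("brightness_range", [1.0, 1.0])
        if brightness_range[0] != 1.0 or brightness_range[1] != 1.0:
            # Draw one multiplicative factor for the whole image.
            factor = np.random.uniform(brightness_range[0], brightness_range[1])
            # Clip to the input's own min/max so the value range is preserved
            # whether or not the array has already been normalized.
            arr = np.clip(arr * factor, arr.min(), arr.max())
            applied.append(f"brightness_{factor:.2f}")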
        else:
            skipped.append("brightness")

        contrast_range = params.get("contrast_range", [1.0, 1.0])
        if contrast_range[0] != 1.0 or contrast_range[1] != 1.0:
            factor = np.random.uniform(contrast_range[0], contrast_range[1])
            # Scale each pixel's deviation from the global mean, then clip
            # back to the input's original value range.
            mean = arr.mean()
            arr = np.clip((arr - mean) * factor + mean, arr.min(), arr.max())
            applied.append(f"contrast_{factor:.2f}")
        else:
            skipped.append("contrast")

        ctx.image_array = arr
        ctx.step_outputs["augment"] = {"applied": applied, "skipped": skipped}

        return ctx
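

# Illustrative sketch, not part of the original step: process() indexes
# brightness_range[0] and [1] (and the contrast equivalents) without checking
# the shape of the value, so a malformed entry in `params` would raise there.
# Assuming callers may pass unvalidated configuration, a small helper like the
# one below could normalize the range first. The name `normalize_range` and
# its fallback behavior are hypothetical choices, not an existing API.
def normalize_range(value, default=(1.0, 1.0)):
    """Return a (low, high) tuple of floats, or `default` if value is unusable."""
    if value is None:
        return default
    try:
        # Unpacking enforces exactly two items; float() rejects non-numbers.
        low, high = (float(v) for v in value)
    except (TypeError, ValueError):
        return default
    # Swap reversed bounds so low <= high always holds for the caller.
    return (low, high) if low <= high else (high, low)


# Possible usage inside process(), under the same assumptions:
#     low, high = normalize_range(params.get("brightness_range"))
#     if (low, high) != (1.0, 1.0):
#         factor = np.random.uniform(low, high)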