import logging
from typing import Optional

import cv2
import numpy as np
import torch
from PIL import Image

from transformers import AutoImageProcessor, AutoModelForDepthEstimation
from transformers import DPTImageProcessor, DPTForDepthEstimation

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class ControlImageProcessor:
    """
    Generates control images for ControlNet conditioning.

    Supports Canny edge detection and depth map estimation with
    mask-aware processing for selective structure preservation.

    Attributes:
        device: Computation device (cuda/mps/cpu)
        canny_low_threshold: Low threshold for Canny edge detection
        canny_high_threshold: High threshold for Canny edge detection

    Example:
        >>> processor = ControlImageProcessor(device="cuda")
        >>> canny_image = processor.generate_canny_edges(image)
        >>> depth_map = processor.generate_depth_map(image)
    """

    # Depth model identifiers
    DEPTH_MODEL_PRIMARY = "LiheYoung/depth-anything-small-hf"
    DEPTH_MODEL_FALLBACK = "Intel/dpt-hybrid-midas"

    def __init__(
        self,
        device: str = "cuda",
        canny_low_threshold: int = 100,
        canny_high_threshold: int = 200
    ):
        """
        Initialize the ControlImageProcessor.

        Parameters
        ----------
        device : str
            Computation device
        canny_low_threshold : int
            Low threshold for Canny edge detection
        canny_high_threshold : int
            High threshold for Canny edge detection
        """
        self.device = device
        self.canny_low_threshold = canny_low_threshold
        self.canny_high_threshold = canny_high_threshold

        # Depth estimation models (lazy loaded)
        self._depth_estimator = None
        self._depth_processor = None
        self._depth_model_loaded = False

        logger.info(f"ControlImageProcessor initialized on {device}")

    def generate_canny_edges(self, image: np.ndarray) -> Image.Image:
        """
        Generate Canny edge detection image.

        Parameters
        ----------
        image : np.ndarray
            Input image as numpy array (RGB)

        Returns
        -------
        PIL.Image
            Canny edge map, replicated to 3 channels (RGB) for ControlNet
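
        Examples
        --------
        A minimal sketch on a synthetic array; a white square on black
        yields edges along the square's border:

        >>> proc = ControlImageProcessor(device="cpu")
        >>> rgb = np.zeros((64, 64, 3), dtype=np.uint8)
        >>> rgb[16:48, 16:48] = 255
        >>> proc.generate_canny_edges(rgb).mode
        'RGB'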
        """
        # Convert to grayscale
        if len(image.shape) == 3:
            gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
        else:
            gray = image

        # Apply Gaussian blur to reduce noise
        blurred = cv2.GaussianBlur(gray, (5, 5), 1.4)

        # Canny edge detection
        edges = cv2.Canny(
            blurred,
            self.canny_low_threshold,
            self.canny_high_threshold
        )

        # Convert to 3-channel for ControlNet
        edges_3ch = cv2.cvtColor(edges, cv2.COLOR_GRAY2RGB)

        return Image.fromarray(edges_3ch)

    def load_depth_estimator(self) -> bool:
        """
        Load depth estimation model.

        Returns
        -------
        bool
            True if loaded successfully
        """
        if self._depth_model_loaded:
            return True

        logger.info("Loading depth estimation model...")

        try:
            # Try primary model first (Depth Anything)
            self._depth_processor = AutoImageProcessor.from_pretrained(
                self.DEPTH_MODEL_PRIMARY
            )
            self._depth_estimator = AutoModelForDepthEstimation.from_pretrained(
                self.DEPTH_MODEL_PRIMARY,
                torch_dtype=torch.float16 if self.device == "cuda" else torch.float32
            )
            self._depth_estimator = self._depth_estimator.to(self.device)
            self._depth_estimator.eval()
            self._depth_model_loaded = True
            logger.info(f"Loaded depth model: {self.DEPTH_MODEL_PRIMARY}")
            return True

        except Exception as e:
            logger.warning(f"Primary depth model failed: {e}, trying fallback...")

            try:
                # Fallback to DPT
                self._depth_processor = DPTImageProcessor.from_pretrained(
                    self.DEPTH_MODEL_FALLBACK
                )
                self._depth_estimator = DPTForDepthEstimation.from_pretrained(
                    self.DEPTH_MODEL_FALLBACK,
                    torch_dtype=torch.float16 if self.device == "cuda" else torch.float32
                )
                self._depth_estimator = self._depth_estimator.to(self.device)
                self._depth_estimator.eval()
                self._depth_model_loaded = True
                logger.info(f"Loaded fallback depth model: {self.DEPTH_MODEL_FALLBACK}")
                return True

            except Exception as e2:
                logger.error(f"All depth models failed: {e2}")
                return False

    def generate_depth_map(self, image: Image.Image) -> Image.Image:
        """
        Generate depth map using depth estimation model.

        Parameters
        ----------
        image : PIL.Image
            Input image

        Returns
        -------
        PIL.Image
            Normalized depth map, replicated to 3 channels (RGB) for ControlNet
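
        Examples
        --------
        Illustrative only; downloads a depth model on first use, and
        ``room.jpg`` is a hypothetical input file:

        >>> proc = ControlImageProcessor(device="cpu")
        >>> img = Image.open("room.jpg")
        >>> depth = proc.generate_depth_map(img)
        >>> depth.size == img.size
        True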
        """
        if not self._depth_model_loaded:
            if not self.load_depth_estimator():
                # Fallback to simple gradient
                logger.warning("Using fallback gradient depth")
                return self._generate_fallback_depth(image)

        try:
            # Prepare image for model
            inputs = self._depth_processor(
                images=image,
                return_tensors="pt"
            )
            # Match the model's dtype (fp16 on CUDA) so inputs and weights agree
            model_dtype = next(self._depth_estimator.parameters()).dtype
            inputs = {
                k: v.to(self.device, dtype=model_dtype)
                if torch.is_floating_point(v)
                else v.to(self.device)
                for k, v in inputs.items()
            }

            # Run inference
            with torch.no_grad():
                outputs = self._depth_estimator(**inputs)
                predicted_depth = outputs.predicted_depth

            # Normalize depth map
            depth = predicted_depth.squeeze().cpu().numpy()
            depth = (depth - depth.min()) / (depth.max() - depth.min() + 1e-8)
            depth = (depth * 255).astype(np.uint8)

            # Resize to match input
            depth_image = Image.fromarray(depth)
            depth_image = depth_image.resize(image.size, Image.Resampling.BILINEAR)

            # Convert to 3-channel for ControlNet
            depth_3ch = np.stack([np.array(depth_image)] * 3, axis=-1)

            return Image.fromarray(depth_3ch)

        except Exception as e:
            logger.error(f"Depth estimation failed: {e}")
            return self._generate_fallback_depth(image)

    def _generate_fallback_depth(self, image: Image.Image) -> Image.Image:
        """
        Generate a simple fallback depth map using gradient.

        Parameters
        ----------
        image : PIL.Image
            Input image

        Returns
        -------
        PIL.Image
            Simple gradient depth map
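
        Examples
        --------
        A minimal sketch; the gradient runs dark (top/far) to light
        (bottom/near) regardless of image content:

        >>> proc = ControlImageProcessor(device="cpu")
        >>> depth = proc._generate_fallback_depth(Image.new("RGB", (8, 8)))
        >>> depth.size
        (8, 8)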
        """
        w, h = image.size
        # Create vertical gradient (top = far, bottom = near)
        gradient = np.linspace(50, 200, h).reshape(-1, 1)
        gradient = np.tile(gradient, (1, w))
        gradient = gradient.astype(np.uint8)

        # Stack to 3 channels
        depth_3ch = np.stack([gradient] * 3, axis=-1)
        return Image.fromarray(depth_3ch)

    def prepare_control_image(
        self,
        image: Image.Image,
        mode: str = "canny",
        mask: Optional[Image.Image] = None,
        preserve_structure: bool = False,
        edge_guidance_mode: str = "boundary"
    ) -> Image.Image:
        """
        Generate ControlNet conditioning image.

        Parameters
        ----------
        image : PIL.Image
            Input image
        mode : str
            Conditioning mode: "canny" or "depth"
        mask : PIL.Image, optional
            If provided, the control signal inside the masked region is
            modified according to preserve_structure and edge_guidance_mode
        preserve_structure : bool
            If True, keep all edges in the masked region (for color-change tasks).
            If False, use edge_guidance_mode to determine edge handling
        edge_guidance_mode : str
            How to handle edges when preserve_structure=False:
            - "none": completely remove edges in the masked region (removal tasks)
            - "mask_outline": clear edges inside the mask and draw its outline
              to guide the new object's position and size (replacement tasks)
            - "boundary": keep only edges in a band around the mask boundary
              (replacement tasks; the default)
            - "soft": gradually fade edges from the boundary toward the mask
              center for smoother blending

        Returns
        -------
        PIL.Image
            Generated control image
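
        Examples
        --------
        A usage sketch; ``img`` and ``obj_mask`` are hypothetical PIL images
        where white mask pixels cover the region to edit:

        >>> ctrl = processor.prepare_control_image(
        ...     img, mode="canny", mask=obj_mask,
        ...     edge_guidance_mode="boundary",
        ... )
        >>> # Color-change task: keep all edges inside the mask
        >>> ctrl = processor.prepare_control_image(
        ...     img, mode="canny", mask=obj_mask, preserve_structure=True
        ... )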
        """
        logger.info(f"Preparing control image: mode={mode}, preserve_structure={preserve_structure}, edge_guidance={edge_guidance_mode}")

        # Convert to RGB if needed
        if image.mode != 'RGB':
            image = image.convert('RGB')

        img_array = np.array(image)

        if mode == "canny":
            control_image = self.generate_canny_edges(img_array)

            if mask is not None:
                control_array = np.array(control_image)
                mask_array = np.array(mask.convert('L'))

                if preserve_structure:
                    # Keep all edges - no modification needed
                    logger.info("Preserving all edges in masked region for color change")

                elif edge_guidance_mode == "none":
                    # Completely suppress edges in masked region (for removal)
                    mask_region = mask_array > 128
                    control_array[mask_region] = 0
                    logger.info("Suppressed all edges in masked region for removal")

                elif edge_guidance_mode == "mask_outline":
                    # For object replacement: clear inside edges, draw clear mask outline
                    # Outline guides WHERE and WHAT SIZE the new object should be
                    mask_binary = (mask_array > 128).astype(np.uint8) * 255

                    # Step 1: Clear all edges inside the mask
                    mask_region = mask_array > 128
                    control_array[mask_region] = 0

                    # Step 2: Draw clear mask outline for position/size guidance
                    contours, _ = cv2.findContours(
                        mask_binary,
                        cv2.RETR_EXTERNAL,
                        cv2.CHAIN_APPROX_SIMPLE
                    )

                    if contours:
                        # Draw visible white outline (thickness=2) for clear guidance
                        cv2.drawContours(control_array, contours, -1, (255, 255, 255), thickness=2)
                        logger.info(f"Drew {len(contours)} mask outline(s) for placement guidance")

                elif edge_guidance_mode == "boundary":
                    # Keep boundary edges to guide object placement and size
                    # This helps ControlNet understand WHERE to place the new object
                    mask_binary = (mask_array > 128).astype(np.uint8) * 255

                    # Create boundary mask using morphological operations
                    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (15, 15))
                    dilated = cv2.dilate(mask_binary, kernel, iterations=1)
                    eroded = cv2.erode(mask_binary, kernel, iterations=1)
                    boundary = dilated - eroded

                    # Inner region (not boundary) - suppress edges
                    inner_region = (mask_array > 128) & (boundary == 0)
                    control_array[inner_region] = 0

                    # Keep boundary edges intact for object placement guidance
                    logger.info("Keeping boundary edges for object replacement guidance")

                elif edge_guidance_mode == "soft":
                    # Soft fade: gradually reduce edges from boundary to center
                    mask_binary = (mask_array > 128).astype(np.uint8) * 255

                    # Calculate distance from boundary
                    dist_transform = cv2.distanceTransform(mask_binary, cv2.DIST_L2, 5)
                    max_dist = dist_transform.max()
                    if max_dist > 0:
                        # Normalize and invert: 1 at boundary, 0 at center
                        fade_factor = 1 - (dist_transform / max_dist)
                        fade_factor = np.clip(fade_factor, 0, 1)

                        # Apply fade to masked region only
                        mask_region = mask_array > 128
                        for c in range(3):
                            control_array[:, :, c][mask_region] = (
                                control_array[:, :, c][mask_region] * fade_factor[mask_region]
                            ).astype(np.uint8)

                    logger.info("Applied soft edge fading in masked region")

                control_image = Image.fromarray(control_array)

            return control_image

        elif mode == "depth":
            control_image = self.generate_depth_map(image)

            # For depth mode with replacement, we want to keep depth info for context
            # but allow flexibility in the masked region
            if mask is not None and not preserve_structure:
                control_array = np.array(control_image)
                mask_array = np.array(mask.convert('L'))

                # Smooth the depth in masked region using surrounding context
                if edge_guidance_mode in ["boundary", "soft"]:
                    mask_binary = (mask_array > 128).astype(np.uint8)

                    # Inpaint the depth map in masked region using surrounding values
                    # Take a contiguous single-channel copy before handing it to OpenCV
                    depth_gray = np.ascontiguousarray(control_array[:, :, 0])
                    inpainted_depth = cv2.inpaint(
                        depth_gray,
                        mask_binary,
                        inpaintRadius=10,
                        flags=cv2.INPAINT_TELEA
                    )
                    control_array = np.stack([inpainted_depth] * 3, axis=-1)
                    logger.info("Inpainted depth map in masked region")

                control_image = Image.fromarray(control_array)

            return control_image

        else:
            raise ValueError(f"Unknown control mode: {mode}")

    def unload_depth_model(self) -> None:
        """Unload depth estimation model to free memory."""
        if self._depth_estimator is not None:
            del self._depth_estimator
            self._depth_estimator = None

        if self._depth_processor is not None:
            del self._depth_processor
            self._depth_processor = None

        self._depth_model_loaded = False

        # Release cached GPU memory so other models can reclaim it
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        logger.info("Depth model unloaded")
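

if __name__ == "__main__":
    # Smoke-test sketch, not part of the library API. Assumes an RGB image
    # named "input.jpg" exists in the working directory (hypothetical path).
    logging.basicConfig(level=logging.INFO)

    processor = ControlImageProcessor(
        device="cuda" if torch.cuda.is_available() else "cpu"
    )
    source = Image.open("input.jpg").convert("RGB")

    # Canny control image without a mask: all edges are kept
    processor.prepare_control_image(source, mode="canny").save("control_canny.png")

    # Depth control image; falls back to a vertical gradient if no depth
    # model can be loaded
    processor.prepare_control_image(source, mode="depth").save("control_depth.png")

    processor.unload_depth_model()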