File size: 6,613 Bytes
da23dfe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
"""
Utility Functions
=================

Helper functions for image processing and file operations.
"""

import re
import logging
from pathlib import Path
from typing import Optional, Union
from datetime import datetime
from PIL import Image


logger = logging.getLogger(__name__)


def ensure_pil_image(
    obj: Union[Image.Image, str, Path, None],
    context: str = ""
) -> Image.Image:
    """
    Ensure object is a PIL Image.

    An object that is already an ``Image.Image`` is returned unchanged (the
    same instance, not a copy). A ``str`` or ``Path`` is opened with
    ``Image.open``.

    Args:
        obj: Image, path, or None
        context: Label placed in square brackets at the start of every
            error message, so callers can tell which input failed

    Returns:
        PIL Image

    Raises:
        ValueError: If ``obj`` is None, cannot be opened from the given
            path, or is of an unsupported type. When opening fails, the
            underlying exception is attached as ``__cause__``.
    """
    if obj is None:
        raise ValueError(f"[{context}] Image is None")

    if isinstance(obj, Image.Image):
        return obj

    if isinstance(obj, (str, Path)):
        try:
            # NOTE: per the Pillow docs Image.open is lazy - pixel data is
            # only read on first use, so decode errors may surface later.
            return Image.open(obj)
        except Exception as e:
            # Chain explicitly so the original failure (missing file,
            # unidentified format, ...) stays attached as the cause.
            raise ValueError(
                f"[{context}] Failed to load image from path: {e}"
            ) from e

    raise ValueError(f"[{context}] Unsupported image type: {type(obj)}")


def sanitize_filename(name: str) -> str:
    """
    Sanitize string for use as filename.

    The characters ``< > : " / \\ | ? *`` and ASCII control characters
    (0x00-0x1F, e.g. NUL, newline, tab) are each replaced with ``_``.
    Leading/trailing spaces and dots are removed, and the result is limited
    to 100 characters.

    Args:
        name: Original name

    Returns:
        Safe filename string, or ``"unnamed"`` if nothing usable remains
    """
    # Replace problematic characters, including control characters, which
    # are just as invalid in filenames as the reserved punctuation.
    safe_name = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', name)
    # Remove leading/trailing spaces and dots
    safe_name = safe_name.strip('. ')
    # Limit length, then strip the tail again: the cut can land right after
    # an interior space or dot, which would otherwise become trailing.
    safe_name = safe_name[:100].rstrip('. ')
    return safe_name or "unnamed"


def save_image(
    image: Image.Image,
    directory: Path,
    base_name: str,
    format: str = "PNG"
) -> Path:
    """
    Save image to directory.

    The file is named ``<sanitized base_name>_<YYYYmmdd_HHMMSS>.<ext>``,
    where ``<ext>`` is ``format`` lower-cased. The directory (and any missing
    parents) is created if needed. If a file with that name already exists
    (e.g. two saves of the same base name within one second), a numeric
    suffix ``_1``, ``_2``, ... is added after the timestamp instead of
    overwriting the earlier file.

    Args:
        image: PIL Image to save
        directory: Output directory
        base_name: Base filename (without extension)
        format: Image format, passed to ``image.save``

    Returns:
        Path to saved file
    """
    directory = Path(directory)
    directory.mkdir(parents=True, exist_ok=True)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_name = sanitize_filename(base_name)
    ext = format.lower()

    stem = f"{safe_name}_{timestamp}"
    filepath = directory / f"{stem}.{ext}"

    # The timestamp only has one-second resolution, so pick a free name
    # rather than silently clobbering an image saved moments ago.
    # (Best effort: not atomic against concurrent writers.)
    counter = 1
    while filepath.exists():
        filepath = directory / f"{stem}_{counter}.{ext}"
        counter += 1

    image.save(filepath, format=format)
    logger.info("Saved: %s", filepath)

    return filepath


def resize_for_display(
    image: Image.Image,
    max_size: int = 1024
) -> Image.Image:
    """
    Resize image for display while maintaining aspect ratio.

    If both dimensions already fit within ``max_size`` the input image is
    returned as-is (same object, no copy). Otherwise the longer side is
    scaled to ``max_size`` and the shorter side proportionally (truncated to
    an integer, but never below 1 pixel).

    Args:
        image: PIL Image
        max_size: Maximum dimension

    Returns:
        Resized image
    """
    width, height = image.size

    if width <= max_size and height <= max_size:
        return image

    # Clamp the scaled side to 1: for extreme aspect ratios (e.g. 10000x1)
    # the truncated value would be 0, which Image.resize rejects.
    if width > height:
        new_width = max_size
        new_height = max(1, int(height * max_size / width))
    else:
        new_height = max_size
        new_width = max(1, int(width * max_size / height))

    return image.resize((new_width, new_height), Image.Resampling.LANCZOS)


def get_image_info(image: Image.Image) -> str:
    """Return a short ``"<width>x<height> <mode>"`` description of the image."""
    dims = image.size
    return "{}x{} {}".format(dims[0], dims[1], image.mode)


def preprocess_input_image(
    image: Image.Image,
    max_size: int = 1024,
    target_size: Optional[tuple] = None,
    ensure_rgb: bool = True
) -> Image.Image:
    """
    Preprocess input image for model consumption.

    Steps, in order:
      1. Copy the input so the caller's image is not modified.
      2. Convert any mode other than RGB/RGBA to RGB, or to RGBA when the
         source carries transparency (``LA``, ``PA``, or a palette image
         with a ``transparency`` entry), so the alpha is not lost.
      3. Round-trip through an in-memory PNG to normalize the image
         (intended to handle sources such as JFIF, TIFF, WebP).
      4. If ``ensure_rgb``, flatten RGBA onto a white background.
      5. Resize to ``target_size`` if given; otherwise shrink so neither
         side exceeds ``max_size``, preserving aspect ratio.

    Args:
        image: PIL Image to preprocess
        max_size: Maximum dimension (used if target_size not specified)
        target_size: Specific (width, height) to resize to; aspect ratio is
            not preserved in this case
        ensure_rgb: Convert to RGB mode

    Returns:
        Preprocessed PIL Image. RGB when ``ensure_rgb`` is True; otherwise
        RGB or RGBA depending on whether the input carried transparency.
    """
    import io

    # Ensure we have a copy to avoid modifying original
    img = image.copy()

    # Normalize the mode first. Alpha-carrying modes go to RGBA (not RGB) so
    # that transparent areas are later composited on white rather than
    # turning into whatever color happens to sit under the alpha channel.
    if img.mode not in ('RGB', 'RGBA'):
        carries_alpha = img.mode in ('LA', 'PA') or (
            img.mode == 'P' and 'transparency' in img.info
        )
        img = img.convert('RGBA' if carries_alpha else 'RGB')

    # Save as PNG to buffer and reload - this normalizes the format
    buf = io.BytesIO()
    img.save(buf, format='PNG')
    buf.seek(0)
    img = Image.open(buf)
    img.load()  # Force load into memory so the buffer is no longer needed

    # Convert to RGB if needed (handle RGBA)
    if ensure_rgb and img.mode != 'RGB':
        if img.mode == 'RGBA':
            # Handle transparency by compositing on white background
            background = Image.new('RGB', img.size, (255, 255, 255))
            background.paste(img, mask=img.split()[3])
            img = background
        else:
            img = img.convert('RGB')

    # Resize to target size or max_size
    if target_size:
        img = img.resize(target_size, Image.Resampling.LANCZOS)
    else:
        width, height = img.size
        if width > max_size or height > max_size:
            # Clamp the scaled side to 1 px: extreme aspect ratios would
            # otherwise truncate to 0, which Image.resize rejects.
            if width > height:
                new_width = max_size
                new_height = max(1, int(height * max_size / width))
            else:
                new_height = max_size
                new_width = max(1, int(width * max_size / height))
            img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)

    return img


def preprocess_images_for_backend(
    images: list,
    backend_type: str,
    aspect_ratio: str = "1:1"
) -> list:
    """
    Run ``preprocess_input_image`` over a list of images with a
    backend-specific size limit.

    Args:
        images: List of PIL Images; ``None`` entries are passed through
        backend_type: Backend type string (e.g., 'flux_klein', 'qwen_comfyui');
            unknown values fall back to a limit of 1024
        aspect_ratio: Accepted for interface compatibility; not used by
            this function

    Returns:
        List of preprocessed PIL Images, same length and order as the
        input. A falsy ``images`` argument is returned unchanged.
    """
    if not images:
        return images

    # Largest allowed side per backend. Original author's note: FLUX models
    # work best with smaller input images (512-768px).
    size_limits = {
        'flux_klein': 768,           # 4B - faster with smaller inputs
        'flux_klein_9b_fp8': 768,    # 9B - quality comes from model, not input size
        'qwen_image_edit': 1024,
        'qwen_comfyui': 1024,
        'zimage_turbo': 768,
        'zimage_base': 768,
        'longcat_edit': 768,
        'gemini_flash': 1024,        # Gemini handles larger but 1024 is fine
        'gemini_pro': 1024,
    }
    limit = size_limits.get(backend_type, 1024)

    return [
        None if entry is None else preprocess_input_image(entry, max_size=limit)
        for entry in images
    ]