File size: 9,185 Bytes
5374a2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
from typing import Dict, Optional, List
from ...tool import Tool
from ...storage_handler import FileStorageHandler, LocalStorageHandler
from .openai_utils import create_openai_client


class OpenAIImageEditTool(Tool):
    """Tool that edits one or more images with OpenAI's ``gpt-image-1`` model.

    Input images are read through the configured storage handler, converted
    to RGBA when their mode is not already RGBA/LA/L, sent to
    ``client.images.edit`` and the returned base64 payloads are written back
    through the same storage handler.
    """

    name: str = "openai_image_edit"
    description: str = "Edit images using OpenAI gpt-image-1 (direct, minimal validation)."

    inputs: Dict[str, Dict[str, str]] = {
        "prompt": {"type": "string", "description": "Edit instruction. Required."},
        "images": {"type": "array", "description": "Image path(s) png/webp/jpg <50MB. Required. Single string accepted and normalized to array."},
        "mask_path": {"type": "string", "description": "Optional PNG mask path (same size as first image)."},
        "size": {"type": "string", "description": "1024x1024 | 1536x1024 | 1024x1536 | auto"},
        "n": {"type": "integer", "description": "1-10"},
        "background": {"type": "string", "description": "transparent | opaque | auto"},
        "input_fidelity": {"type": "string", "description": "high | low"},
        "output_compression": {"type": "integer", "description": "0-100 for jpeg/webp"},
        "output_format": {"type": "string", "description": "png | jpeg | webp (default png)"},
        "partial_images": {"type": "integer", "description": "0-3 partial streaming"},
        "quality": {"type": "string", "description": "auto | high | medium | low"},
        "stream": {"type": "boolean", "description": "streaming mode"},
        "image_name": {"type": "string", "description": "Optional output base name"},
    }
    required: Optional[List[str]] = ["prompt", "images"]

    def __init__(self, api_key: str, organization_id: Optional[str] = None, save_path: str = "./edited_images",
                 storage_handler: Optional[FileStorageHandler] = None):
        """Store credentials and pick the storage backend.

        Args:
            api_key: OpenAI API key handed to ``create_openai_client``.
            organization_id: Optional organization id handed to
                ``create_openai_client``.
            save_path: Base path used for the default ``LocalStorageHandler``
                when no ``storage_handler`` is supplied.
            storage_handler: Storage backend used to read inputs and save
                outputs. Defaults to ``LocalStorageHandler(base_path=save_path)``.
        """
        super().__init__()
        self.api_key = api_key
        self.organization_id = organization_id
        self.save_path = save_path
        self.storage_handler = storage_handler or LocalStorageHandler(base_path=save_path)

    def __call__(
        self,
        prompt: str,
        images: list,
        mask_path: Optional[str] = None,
        size: Optional[str] = None,
        n: Optional[int] = None,
        background: Optional[str] = None,
        input_fidelity: Optional[str] = None,
        output_compression: Optional[int] = None,
        output_format: Optional[str] = None,
        partial_images: Optional[int] = None,
        quality: Optional[str] = None,
        stream: Optional[bool] = None,
        image_name: Optional[str] = None,
    ):
        """Run an image edit request and save the resulting images.

        Args:
            prompt: Edit instruction sent to the model.
            images: One path or a list of paths of the images to edit. A
                single string is normalized to a one-element list.
            mask_path: Optional path of a mask file.
                NOTE(review): the mask is opened directly with ``open`` and is
                not routed through the storage handler like the images are —
                confirm this is intended.
            size, n, background, input_fidelity, output_compression,
            output_format, partial_images, quality, stream: Forwarded
                unchanged to ``client.images.edit`` when not ``None``.
                NOTE(review): the result handling below reads
                ``response.data``; presumably a streamed response does not
                expose it, so ``stream=True`` likely ends in the error
                return — confirm against the OpenAI SDK.
            image_name: Optional base name for the saved files; a trailing
                extension is stripped and ``_<index>.<ext>`` is appended.

        Returns:
            ``{"results": [...], "count": int}`` where each result is either
            the storage-translated path of a saved image or an
            ``"Error saving image ..."`` string, or ``{"error": str}`` when
            the request as a whole failed. No exception escapes this method.
        """
        import base64
        import os
        import time
        from contextlib import ExitStack, suppress

        try:
            # Accept either list[str] or a single string at runtime
            if isinstance(images, str):
                image_paths = [images]
            else:
                image_paths = list(images) if images is not None else []
            if not image_paths:
                # Explicit message instead of the opaque "list index out of range".
                return {"error": "gpt-image-1 editing failed: at least one input image path is required"}

            client = create_openai_client(self.api_key, self.organization_id)

            api_kwargs = {"model": "gpt-image-1", "prompt": prompt}
            optional_kwargs = {
                "size": size,
                "n": n,
                "background": background,
                "input_fidelity": input_fidelity,
                "output_compression": output_compression,
                "output_format": output_format,
                "partial_images": partial_images,
                "quality": quality,
                "stream": stream,
            }
            # Only forward options the caller actually set.
            api_kwargs.update({key: value for key, value in optional_kwargs.items() if value is not None})

            temp_paths = []
            try:
                # ExitStack closes every opened handle, even if a later open fails.
                with ExitStack() as stack:
                    opened_images = []
                    for path in image_paths:
                        use_path, tmp = self._ensure_image_edit_compatible(path)
                        if tmp:
                            temp_paths.append(tmp)
                        opened_images.append(stack.enter_context(open(use_path, "rb")))

                    # A single image is passed bare, several as a list.
                    api_kwargs["image"] = opened_images if len(opened_images) > 1 else opened_images[0]
                    if mask_path:
                        api_kwargs["mask"] = stack.enter_context(open(mask_path, "rb"))

                    response = client.images.edit(**api_kwargs)
            finally:
                # Best-effort cleanup of converted temp files, after handles are closed.
                for tp in temp_paths:
                    with suppress(OSError):
                        os.remove(tp)

            # Name the files after the format that was requested; the API
            # default is png, and unknown values fall back to png as well.
            extension = output_format if output_format in ("png", "jpeg", "webp") else "png"
            if image_name:
                base_name = os.path.splitext(image_name)[0]
            else:
                base_name = f"image_edit_{int(time.time())}"

            # Save base64 images using storage handler
            results = []
            for idx, img in enumerate(response.data, start=1):
                try:
                    filename = f"{base_name}_{idx}.{extension}"
                    img_bytes = base64.b64decode(img.b64_json)
                    save_result = self.storage_handler.save(filename, img_bytes)
                    if save_result["success"]:
                        # Return the translated path that was actually used for saving
                        results.append(self.storage_handler.translate_in(filename))
                    else:
                        results.append(f"Error saving image {idx}: {save_result.get('error', 'Unknown error')}")
                except Exception as e:
                    # One failed image must not discard the others.
                    results.append(f"Error saving image {idx}: {e}")

            return {"results": results, "count": len(results)}
        except Exception as e:
            return {"error": f"gpt-image-1 editing failed: {e}"}

    def _ensure_image_edit_compatible(self, image_path: str) -> tuple[str, Optional[str]]:
        """Return a path to an image whose mode is RGBA, LA or L.

        The image is read through the storage handler. If its mode is already
        one of RGBA/LA/L the storage-translated original path is returned.
        Otherwise it is converted to RGBA and written as PNG under a unique
        temporary name.

        Args:
            image_path: Path of the input image as understood by the storage
                handler.

        Returns:
            ``(usable_path, temp_path)``. ``temp_path`` is ``None`` when no
            conversion happened; otherwise it equals ``usable_path`` and the
            caller may delete it once the request completes. On any failure
            (including a missing Pillow install) the translated original path
            is returned with ``None``.
        """
        try:
            import os
            import uuid
            from io import BytesIO
            from PIL import Image

            # Use storage handler to read the image
            read_result = self.storage_handler.read(image_path)
            if not read_result["success"]:
                raise FileNotFoundError(f"Could not read image {image_path}: {read_result.get('error', 'Unknown error')}")

            content = read_result["content"]
            if not isinstance(content, (bytes, bytearray)):
                # If content is not bytes, convert to bytes
                content = str(content).encode('utf-8')

            with Image.open(BytesIO(content)) as img:
                if img.mode in ("RGBA", "LA", "L"):
                    # Image is already compatible, return the translated path
                    return self.storage_handler.translate_in(image_path), None

                rgba_img = img.convert("RGBA")
                buffer = BytesIO()
                rgba_img.save(buffer, format='PNG')

                # A uuid keeps temp files of different inputs from overwriting
                # each other within one request or across concurrent requests.
                unique = uuid.uuid4().hex
                temp_filename = f"temp_rgba_{unique}.png"
                save_result = self.storage_handler.save(temp_filename, buffer.getvalue())
                if save_result["success"]:
                    temp_path = self.storage_handler.translate_in(temp_filename)
                    return temp_path, temp_path

                # Fallback to direct file I/O if storage handler fails
                temp_path = os.path.join("workplace", "images", f"temp_rgba_{unique}.png")
                os.makedirs(os.path.dirname(temp_path), exist_ok=True)
                rgba_img.save(temp_path)
                return temp_path, temp_path

        except Exception:
            # On error, return the translated path and let the caller decide
            return self.storage_handler.translate_in(image_path), None