# iLOVE2D's picture
# Upload 2846 files
# 5374a2d verified
from typing import Dict, Optional, List
from ...tool import Tool
from ...storage_handler import FileStorageHandler, LocalStorageHandler
from .openai_utils import create_openai_client
class OpenAIImageEditTool(Tool):
    """Tool that edits one or more images through OpenAI's ``gpt-image-1`` model.

    Input images are read through the configured storage handler, converted
    to RGBA PNG when their PIL mode is not one of ``RGBA``/``LA``/``L``, sent
    to ``client.images.edit`` and the returned base64 images are written back
    through the same storage handler.
    """

    name: str = "openai_image_edit"
    description: str = "Edit images using OpenAI gpt-image-1 (direct, minimal validation)."
    inputs: Dict[str, Dict[str, str]] = {
        "prompt": {"type": "string", "description": "Edit instruction. Required."},
        "images": {"type": "array", "description": "Image path(s) png/webp/jpg <50MB. Required. Single string accepted and normalized to array."},
        "mask_path": {"type": "string", "description": "Optional PNG mask path (same size as first image)."},
        "size": {"type": "string", "description": "1024x1024 | 1536x1024 | 1024x1536 | auto"},
        "n": {"type": "integer", "description": "1-10"},
        "background": {"type": "string", "description": "transparent | opaque | auto"},
        "input_fidelity": {"type": "string", "description": "high | low"},
        "output_compression": {"type": "integer", "description": "0-100 for jpeg/webp"},
        "output_format": {"type": "string", "description": "png | jpeg | webp (default png)"},
        "partial_images": {"type": "integer", "description": "0-3 partial streaming"},
        "quality": {"type": "string", "description": "auto | high | medium | low"},
        "stream": {"type": "boolean", "description": "streaming mode"},
        "image_name": {"type": "string", "description": "Optional output base name"},
    }
    required: Optional[List[str]] = ["prompt", "images"]

    def __init__(
        self,
        api_key: str,
        organization_id: Optional[str] = None,
        save_path: str = "./edited_images",
        storage_handler: Optional[FileStorageHandler] = None,
    ):
        """Store credentials and choose where edited images are written.

        Args:
            api_key: OpenAI API key passed to ``create_openai_client``.
            organization_id: Optional organization id passed to the client factory.
            save_path: Base path used for the default ``LocalStorageHandler``.
            storage_handler: Handler used to read inputs and save outputs.
                When omitted, a ``LocalStorageHandler`` rooted at ``save_path``
                is created.
        """
        super().__init__()
        self.api_key = api_key
        self.organization_id = organization_id
        self.save_path = save_path
        self.storage_handler = storage_handler or LocalStorageHandler(base_path=save_path)

    def __call__(
        self,
        prompt: str,
        images: list,
        mask_path: str = None,
        size: str = None,
        n: int = None,
        background: str = None,
        input_fidelity: str = None,
        output_compression: int = None,
        output_format: str = None,
        partial_images: int = None,
        quality: str = None,
        stream: bool = None,
        image_name: str = None,
    ):
        """Edit ``images`` according to ``prompt`` and save the results.

        Args:
            prompt: Edit instruction sent to the model.
            images: One path or a sequence of paths; a single string is
                normalized to a one-element list.
            mask_path: Optional path of a mask file, opened in binary mode
                and sent as ``mask``.
            size, n, background, input_fidelity, output_compression,
            output_format, partial_images, quality, stream: Forwarded to
                ``client.images.edit`` only when not ``None``.
            image_name: Optional base name for the saved files; a trailing
                extension is dropped and ``_<index>.<ext>`` is appended.

        Returns:
            ``{"results": [...], "count": int}`` where each entry is either
            the storage handler's translated path of a saved image or an
            ``"Error saving image <i>: ..."`` string, or
            ``{"error": "gpt-image-1 editing failed: ..."}`` when the request
            as a whole fails. No exception is propagated to the caller.
        """
        import base64
        import os
        import time
        from contextlib import ExitStack, suppress

        try:
            client = create_openai_client(self.api_key, self.organization_id)

            # Accept either a sequence of paths or a single string at runtime.
            image_paths = [images] if isinstance(images, str) else list(images or [])
            if not image_paths:
                # Explicit message instead of an opaque IndexError further down.
                return {"error": "gpt-image-1 editing failed: at least one image path is required"}

            temp_paths = []
            try:
                # ExitStack closes every opened file handle, including the
                # mask, whether or not the request succeeds.
                with ExitStack() as stack:
                    opened_images = []
                    for path in image_paths:
                        use_path, tmp = self._ensure_image_edit_compatible(path)
                        if tmp:
                            temp_paths.append(tmp)
                        opened_images.append(stack.enter_context(open(use_path, "rb")))

                    api_kwargs = {
                        "model": "gpt-image-1",
                        "prompt": prompt,
                        "image": opened_images if len(opened_images) > 1 else opened_images[0],
                    }
                    optional_kwargs = {
                        "size": size,
                        "n": n,
                        "background": background,
                        "input_fidelity": input_fidelity,
                        "output_compression": output_compression,
                        "output_format": output_format,
                        "partial_images": partial_images,
                        "quality": quality,
                        "stream": stream,
                    }
                    # Only forward the options the caller actually supplied.
                    api_kwargs.update(
                        {key: value for key, value in optional_kwargs.items() if value is not None}
                    )
                    if mask_path:
                        api_kwargs["mask"] = stack.enter_context(open(mask_path, "rb"))

                    response = client.images.edit(**api_kwargs)
            finally:
                # Best-effort removal of the temporary RGBA conversions.
                for temp_path in temp_paths:
                    with suppress(OSError):
                        os.remove(temp_path)

            # NOTE(review): with stream enabled the SDK presumably returns an
            # event stream rather than an object with `.data`; that case is not
            # handled here and ends up in the error dict below - confirm.
            extension = self._output_extension(output_format)
            timestamp = int(time.time())
            results = []
            for index, img in enumerate(response.data, start=1):
                try:
                    img_bytes = base64.b64decode(img.b64_json)
                    filename = self._build_output_filename(image_name, index, timestamp, extension)
                    result = self.storage_handler.save(filename, img_bytes)
                    if result["success"]:
                        # Return the translated path that was actually used for saving.
                        results.append(self.storage_handler.translate_in(filename))
                    else:
                        results.append(
                            f"Error saving image {index}: {result.get('error', 'Unknown error')}"
                        )
                except Exception as e:
                    # One failed save must not discard the other images.
                    results.append(f"Error saving image {index}: {e}")
            return {"results": results, "count": len(results)}
        except Exception as e:
            return {"error": f"gpt-image-1 editing failed: {e}"}

    @staticmethod
    def _output_extension(output_format):
        """Return the file extension matching ``output_format`` (default ``png``)."""
        fmt = output_format.strip().lower() if isinstance(output_format, str) else "png"
        # Unknown values fall back to png, the default named in the inputs schema.
        return {"png": "png", "jpeg": "jpg", "webp": "webp"}.get(fmt, "png")

    @staticmethod
    def _build_output_filename(image_name, index, timestamp, extension):
        """Build the file name for the ``index``-th (1-based) edited image."""
        import os

        if image_name:
            # splitext only strips a real trailing extension, so names such as
            # "./out/result" or "dir.v1/result" keep their full base.
            base, _ = os.path.splitext(image_name)
            return f"{base}_{index}.{extension}"
        return f"image_edit_{timestamp}_{index}.{extension}"

    def _ensure_image_edit_compatible(self, image_path: str) -> tuple[str, Optional[str]]:
        """Return a path to an image whose mode is accepted for editing.

        The image is read through the storage handler. If its PIL mode is
        ``RGBA``, ``LA`` or ``L`` the translated original path is returned.
        Otherwise it is converted to RGBA, saved as a uniquely named PNG and
        that path is returned.

        Args:
            image_path: Path of the source image as understood by the
                storage handler.

        Returns:
            ``(usable_path, temp_path)``. ``temp_path`` is ``None`` when no
            temporary file was created; otherwise the caller may delete it
            after the request completes. On any error the translated original
            path is returned with ``None`` so the caller can still try it.
        """
        try:
            import os
            import uuid
            from io import BytesIO

            from PIL import Image

            # Use the storage handler to read the image.
            result = self.storage_handler.read(image_path)
            if not result["success"]:
                raise FileNotFoundError(
                    f"Could not read image {image_path}: {result.get('error', 'Unknown error')}"
                )

            raw = result["content"]
            if isinstance(raw, (bytes, bytearray)):
                content = bytes(raw)
            else:
                # Non-bytes content is encoded as text; if that is not a valid
                # image, Image.open raises and the except branch below applies.
                content = str(raw).encode("utf-8")

            with Image.open(BytesIO(content)) as img:
                if img.mode in ("RGBA", "LA", "L"):
                    # Already compatible: hand back the translated original path.
                    return self.storage_handler.translate_in(image_path), None
                rgba_img = img.convert("RGBA")

            buffer = BytesIO()
            rgba_img.save(buffer, format="PNG")

            # A random name keeps several inputs (or concurrent calls) from
            # overwriting each other's temporary file.
            temp_filename = f"temp_rgba_{uuid.uuid4().hex}.png"
            result = self.storage_handler.save(temp_filename, buffer.getvalue())
            if result["success"]:
                temp_path = self.storage_handler.translate_in(temp_filename)
                return temp_path, temp_path

            # Fallback to direct file I/O if the storage handler fails.
            temp_path = os.path.join("workplace", "images", temp_filename)
            os.makedirs(os.path.dirname(temp_path), exist_ok=True)
            rgba_img.save(temp_path)
            return temp_path, temp_path
        except Exception:
            # On error, return the translated path and let the caller decide.
            return self.storage_handler.translate_in(image_path), None