import io
from inspect import cleandoc

import numpy as np
import torch
from PIL import Image

from comfy.comfy_types.node_typing import IO, ComfyNodeABC, InputTypeDict
from comfy_api_nodes.apis import (
    OpenAIImageGenerationRequest,
    OpenAIImageEditRequest,
    OpenAIImageGenerationResponse,
)
from comfy_api_nodes.apis.client import (
    ApiEndpoint,
    HttpMethod,
    SynchronousOperation,
)
from comfy_api_nodes.apinode_utils import (
    downscale_image_tensor,
    validate_and_cast_response,
    validate_string,
)


class OpenAIDalle2(ComfyNodeABC):
    """
    Generates images synchronously via OpenAI's DALL·E 2 endpoint.
    """

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls) -> InputTypeDict:
        return {
            "required": {
                "prompt": (
                    IO.STRING,
                    {
                        "multiline": True,
                        "default": "",
                        "tooltip": "Text prompt for DALL·E",
                    },
                ),
            },
            "optional": {
                "seed": (
                    IO.INT,
                    {
                        "default": 0,
                        "min": 0,
                        "max": 2**31 - 1,
                        "step": 1,
                        "display": "number",
                        "control_after_generate": True,
                        "tooltip": "Not yet implemented in the backend.",
                    },
                ),
                "size": (
                    IO.COMBO,
                    {
                        "options": ["256x256", "512x512", "1024x1024"],
                        "default": "1024x1024",
                        "tooltip": "Image size",
                    },
                ),
                "n": (
                    IO.INT,
                    {
                        "default": 1,
                        "min": 1,
                        "max": 8,
                        "step": 1,
                        "display": "number",
                        "tooltip": "How many images to generate",
                    },
                ),
                "image": (
                    IO.IMAGE,
                    {
                        "default": None,
                        "tooltip": "Optional reference image for image editing.",
                    },
                ),
                "mask": (
                    IO.MASK,
                    {
                        "default": None,
                        "tooltip": "Optional mask for inpainting (white areas will be replaced)",
                    },
                ),
            },
            "hidden": {
                "auth_token": "AUTH_TOKEN_COMFY_ORG",
                "comfy_api_key": "API_KEY_COMFY_ORG",
                "unique_id": "UNIQUE_ID",
            },
        }

    RETURN_TYPES = (IO.IMAGE,)
    FUNCTION = "api_call"
    CATEGORY = "api node/image/OpenAI"
    DESCRIPTION = cleandoc(__doc__ or "")
    API_NODE = True

    def api_call(
        self,
        prompt,
        seed=0,
        image=None,
        mask=None,
        n=1,
        size="1024x1024",
        unique_id=None,
        **kwargs
    ):
        validate_string(prompt, strip_whitespace=False)
        model = "dall-e-2"
        path = "/proxy/openai/images/generations"
        content_type = "application/json"
        request_class = OpenAIImageGenerationRequest
        img_binary = None

        if image is not None and mask is not None:
            # An image plus a mask switches the request to the multipart edit endpoint.
            path = "/proxy/openai/images/edits"
            content_type = "multipart/form-data"
            request_class = OpenAIImageEditRequest

            input_tensor = image.squeeze().cpu()
            height, width, channels = input_tensor.shape
            rgba_tensor = torch.ones(height, width, 4, device="cpu")
            rgba_tensor[:, :, :channels] = input_tensor

            if mask.shape[1:] != image.shape[1:-1]:
                raise Exception("Mask and Image must be the same size")
            # ComfyUI masks are 1 where content should be replaced; the edits
            # endpoint treats transparent pixels as editable, so invert the
            # mask into the alpha channel.
            rgba_tensor[:, :, 3] = 1 - mask.squeeze().cpu()

            rgba_tensor = downscale_image_tensor(rgba_tensor.unsqueeze(0)).squeeze()

            # Encode the RGBA tensor as an in-memory PNG for the multipart upload.
            image_np = (rgba_tensor.numpy() * 255).astype(np.uint8)
            img = Image.fromarray(image_np)
            img_byte_arr = io.BytesIO()
            img.save(img_byte_arr, format="PNG")
            img_byte_arr.seek(0)
            img_binary = img_byte_arr
            img_binary.name = "image.png"
        elif image is not None or mask is not None:
            raise Exception("DALL·E 2 image editing requires an image AND a mask")

        operation = SynchronousOperation(
            endpoint=ApiEndpoint(
                path=path,
                method=HttpMethod.POST,
                request_model=request_class,
                response_model=OpenAIImageGenerationResponse,
            ),
            request=request_class(
                model=model,
                prompt=prompt,
                n=n,
                size=size,
                seed=seed,
            ),
            files=(
                {
                    "image": img_binary,
                }
                if img_binary
                else None
            ),
            content_type=content_type,
            auth_kwargs=kwargs,
        )

        response = operation.execute()

        img_tensor = validate_and_cast_response(response, node_id=unique_id)
        return (img_tensor,)

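# A minimal usage sketch (illustrative only, not part of the node API). Inside
# ComfyUI the hidden auth inputs are injected by the frontend; the auth keyword
# below is an assumption for standalone experimentation, forwarded via **kwargs
# to auth_kwargs:
#
#   node = OpenAIDalle2()
#   (images,) = node.api_call(
#       "a watercolor fox in the snow",
#       n=1,
#       size="512x512",
#       auth_token="...",  # hypothetical credential, normally supplied by ComfyUI
#   )
#
# For inpainting, pass both `image` (a [B, H, W, C] float tensor in [0, 1]) and
# `mask` (a [B, H, W] tensor; white/1 areas are replaced).
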
class OpenAIDalle3(ComfyNodeABC):
    """
    Generates images synchronously via OpenAI's DALL·E 3 endpoint.
    """

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls) -> InputTypeDict:
        return {
            "required": {
                "prompt": (
                    IO.STRING,
                    {
                        "multiline": True,
                        "default": "",
                        "tooltip": "Text prompt for DALL·E",
                    },
                ),
            },
            "optional": {
                "seed": (
                    IO.INT,
                    {
                        "default": 0,
                        "min": 0,
                        "max": 2**31 - 1,
                        "step": 1,
                        "display": "number",
                        "control_after_generate": True,
                        "tooltip": "Not yet implemented in the backend.",
                    },
                ),
                "quality": (
                    IO.COMBO,
                    {
                        "options": ["standard", "hd"],
                        "default": "standard",
                        "tooltip": "Image quality",
                    },
                ),
                "style": (
                    IO.COMBO,
                    {
                        "options": ["natural", "vivid"],
                        "default": "natural",
                        "tooltip": "Vivid causes the model to lean towards generating hyper-real and dramatic images. Natural causes the model to produce more natural, less hyper-real looking images.",
                    },
                ),
                "size": (
                    IO.COMBO,
                    {
                        "options": ["1024x1024", "1024x1792", "1792x1024"],
                        "default": "1024x1024",
                        "tooltip": "Image size",
                    },
                ),
            },
            "hidden": {
                "auth_token": "AUTH_TOKEN_COMFY_ORG",
                "comfy_api_key": "API_KEY_COMFY_ORG",
                "unique_id": "UNIQUE_ID",
            },
        }

    RETURN_TYPES = (IO.IMAGE,)
    FUNCTION = "api_call"
    CATEGORY = "api node/image/OpenAI"
    DESCRIPTION = cleandoc(__doc__ or "")
    API_NODE = True

    def api_call(
        self,
        prompt,
        seed=0,
        style="natural",
        quality="standard",
        size="1024x1024",
        unique_id=None,
        **kwargs
    ):
        validate_string(prompt, strip_whitespace=False)
        model = "dall-e-3"

        operation = SynchronousOperation(
            endpoint=ApiEndpoint(
                path="/proxy/openai/images/generations",
                method=HttpMethod.POST,
                request_model=OpenAIImageGenerationRequest,
                response_model=OpenAIImageGenerationResponse,
            ),
            request=OpenAIImageGenerationRequest(
                model=model,
                prompt=prompt,
                quality=quality,
                size=size,
                style=style,
                seed=seed,
            ),
            auth_kwargs=kwargs,
        )

        response = operation.execute()

        img_tensor = validate_and_cast_response(response, node_id=unique_id)
        return (img_tensor,)

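# Usage sketch (illustrative; same auth assumptions as the DALL·E 2 example).
# DALL·E 3 is generation-only, so there is no image/mask input:
#
#   node = OpenAIDalle3()
#   (images,) = node.api_call(
#       "a city skyline at dusk", style="vivid", quality="hd", size="1792x1024"
#   )
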
class OpenAIGPTImage1(ComfyNodeABC):
    """
    Generates images synchronously via OpenAI's GPT Image 1 endpoint.
    """

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls) -> InputTypeDict:
        return {
            "required": {
                "prompt": (
                    IO.STRING,
                    {
                        "multiline": True,
                        "default": "",
                        "tooltip": "Text prompt for GPT Image 1",
                    },
                ),
            },
            "optional": {
                "seed": (
                    IO.INT,
                    {
                        "default": 0,
                        "min": 0,
                        "max": 2**31 - 1,
                        "step": 1,
                        "display": "number",
                        "control_after_generate": True,
                        "tooltip": "Not yet implemented in the backend.",
                    },
                ),
                "quality": (
                    IO.COMBO,
                    {
                        "options": ["low", "medium", "high"],
                        "default": "low",
                        "tooltip": "Image quality, affects cost and generation time.",
                    },
                ),
                "background": (
                    IO.COMBO,
                    {
                        "options": ["opaque", "transparent"],
                        "default": "opaque",
                        "tooltip": "Return image with or without background",
                    },
                ),
                "size": (
                    IO.COMBO,
                    {
                        "options": ["auto", "1024x1024", "1024x1536", "1536x1024"],
                        "default": "auto",
                        "tooltip": "Image size",
                    },
                ),
                "n": (
                    IO.INT,
                    {
                        "default": 1,
                        "min": 1,
                        "max": 8,
                        "step": 1,
                        "display": "number",
                        "tooltip": "How many images to generate",
                    },
                ),
                "image": (
                    IO.IMAGE,
                    {
                        "default": None,
                        "tooltip": "Optional reference image for image editing.",
                    },
                ),
                "mask": (
                    IO.MASK,
                    {
                        "default": None,
                        "tooltip": "Optional mask for inpainting (white areas will be replaced)",
                    },
                ),
            },
            "hidden": {
                "auth_token": "AUTH_TOKEN_COMFY_ORG",
                "comfy_api_key": "API_KEY_COMFY_ORG",
                "unique_id": "UNIQUE_ID",
            },
        }

    RETURN_TYPES = (IO.IMAGE,)
    FUNCTION = "api_call"
    CATEGORY = "api node/image/OpenAI"
    DESCRIPTION = cleandoc(__doc__ or "")
    API_NODE = True

    def api_call(
        self,
        prompt,
        seed=0,
        quality="low",
        background="opaque",
        image=None,
        mask=None,
        n=1,
        size="1024x1024",
        unique_id=None,
        **kwargs
    ):
        validate_string(prompt, strip_whitespace=False)
        model = "gpt-image-1"
        path = "/proxy/openai/images/generations"
        content_type = "application/json"
        request_class = OpenAIImageGenerationRequest
        img_binaries = []
        mask_binary = None
        files = []

        if image is not None:
            # An input image switches the request to the multipart edit endpoint.
            path = "/proxy/openai/images/edits"
            request_class = OpenAIImageEditRequest
            content_type = "multipart/form-data"

            batch_size = image.shape[0]

            for i in range(batch_size):
                single_image = image[i : i + 1]
                scaled_image = downscale_image_tensor(single_image).squeeze()

                image_np = (scaled_image.numpy() * 255).astype(np.uint8)
                img = Image.fromarray(image_np)
                img_byte_arr = io.BytesIO()
                img.save(img_byte_arr, format="PNG")
                img_byte_arr.seek(0)
                img_binary = img_byte_arr
                img_binary.name = f"image_{i}.png"

                img_binaries.append(img_binary)
                # A single image is sent as the "image" field; batches use the
                # array-style "image[]" field name for multiple files.
                if batch_size == 1:
                    files.append(("image", img_binary))
                else:
                    files.append(("image[]", img_binary))

        if mask is not None:
            if image is None:
                raise Exception("Cannot use a mask without an input image")
            if image.shape[0] != 1:
                raise Exception("Cannot use a mask with multiple images")
            if mask.shape[1:] != image.shape[1:-1]:
                raise Exception("Mask and Image must be the same size")
            batch, height, width = mask.shape
            # As with DALL·E 2 editing, invert the mask into the alpha channel:
            # transparent (alpha=0) pixels mark the regions to be replaced.
            rgba_mask = torch.zeros(height, width, 4, device="cpu")
            rgba_mask[:, :, 3] = 1 - mask.squeeze().cpu()

            scaled_mask = downscale_image_tensor(rgba_mask.unsqueeze(0)).squeeze()

            mask_np = (scaled_mask.numpy() * 255).astype(np.uint8)
            mask_img = Image.fromarray(mask_np)
            mask_img_byte_arr = io.BytesIO()
            mask_img.save(mask_img_byte_arr, format="PNG")
            mask_img_byte_arr.seek(0)
            mask_binary = mask_img_byte_arr
            mask_binary.name = "mask.png"
            files.append(("mask", mask_binary))

        operation = SynchronousOperation(
            endpoint=ApiEndpoint(
                path=path,
                method=HttpMethod.POST,
                request_model=request_class,
                response_model=OpenAIImageGenerationResponse,
            ),
            request=request_class(
                model=model,
                prompt=prompt,
                quality=quality,
                background=background,
                n=n,
                seed=seed,
                size=size,
            ),
            files=files if files else None,
            content_type=content_type,
            auth_kwargs=kwargs,
        )

        response = operation.execute()

        img_tensor = validate_and_cast_response(response, node_id=unique_id)
        return (img_tensor,)

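# Usage sketch (illustrative; auth assumptions as above). Editing accepts a
# batch of reference images, but a mask is only valid with a single image:
#
#   node = OpenAIGPTImage1()
#   (images,) = node.api_call(
#       "replace the sky with aurora",
#       image=image_batch,  # hypothetical [B, H, W, C] float tensor in [0, 1]
#       quality="high",
#   )
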
NODE_CLASS_MAPPINGS = {
    "OpenAIDalle2": OpenAIDalle2,
    "OpenAIDalle3": OpenAIDalle3,
    "OpenAIGPTImage1": OpenAIGPTImage1,
}

NODE_DISPLAY_NAME_MAPPINGS = {
    "OpenAIDalle2": "OpenAI DALL·E 2",
    "OpenAIDalle3": "OpenAI DALL·E 3",
    "OpenAIGPTImage1": "OpenAI GPT Image 1",
}