"""
Handler for QwenImageLayeredPipeline.

Decomposes an input RGBA image into semantic layers (foreground, background, objects, etc.).
"""
from typing import Dict, List, Any

import base64
import io

import torch
from PIL import Image

try:
    from diffusers import QwenImageLayeredPipeline
except ImportError:
    # Older diffusers releases do not ship QwenImageLayeredPipeline;
    # fall back to auto-loading the pipeline via trust_remote_code.
    from diffusers import DiffusionPipeline

    QwenImageLayeredPipeline = None


class EndpointHandler:
    def __init__(self, path=""):
        model_id = "Qwen/Qwen-Image-Layered"
        print(f"Loading model {model_id}...")

        if QwenImageLayeredPipeline:
            print("Using explicit QwenImageLayeredPipeline class.")
            self.pipeline = QwenImageLayeredPipeline.from_pretrained(
                model_id,
                torch_dtype=torch.bfloat16,
            )
        else:
            print("Falling back to DiffusionPipeline auto-load.")
            self.pipeline = DiffusionPipeline.from_pretrained(
                model_id,
                trust_remote_code=True,
                torch_dtype=torch.bfloat16,
            )
        print(f"Loaded pipeline class: {type(self.pipeline).__name__}")

        if torch.cuda.is_available():
            self.pipeline.to("cuda")

        print("Model ready!")

    def __call__(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Expects:
            inputs.image: base64-encoded RGBA image
            parameters.layers: number of layers to decompose into (default: 4)
            parameters.num_inference_steps: inference steps (default: 50)
            parameters.resolution: output resolution (default: 640)

        Returns:
            List of base64-encoded layer images
        """
        inputs = data.pop("inputs", data)
        parameters = data.pop("parameters", {})

        image_data = inputs.get("image")
        if not image_data:
            raise ValueError("Missing 'image' in inputs. Please provide a base64-encoded RGBA image.")

        try:
            image_bytes = base64.b64decode(image_data)
            image = Image.open(io.BytesIO(image_bytes)).convert("RGBA")
        except Exception as e:
            raise ValueError(f"Failed to decode image: {e}") from e

        layers = parameters.get("layers", 4)
        num_inference_steps = parameters.get("num_inference_steps", 50)
        resolution = parameters.get("resolution", 640)
        prompt = parameters.get("prompt", "")

        print(f"Decomposing image into {layers} layers at resolution {resolution}...")

        # Autocast on the device actually in use; the endpoint may run on CPU
        # when no GPU is available.
        device_type = "cuda" if torch.cuda.is_available() else "cpu"
        with torch.autocast(device_type):
            output = self.pipeline(
                image,
                prompt,
                num_inference_steps=num_inference_steps,
                layers=layers,
                resolution=resolution,
                true_cfg_scale=4.0,
                cfg_normalize=False,
                use_en_prompt=True,
            )

        images_response = []

        if hasattr(output, "images") and output.images:
            # Some pipelines return one list of layers per prompt; unwrap the
            # nested list if so.
            layer_images = output.images[0] if isinstance(output.images[0], list) else output.images

            for i, layer_img in enumerate(layer_images):
                if isinstance(layer_img, Image.Image):
                    buffered = io.BytesIO()
                    layer_img.save(buffered, format="PNG")
                    img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
                    images_response.append({
                        "layer_index": i,
                        "image": img_str,
                    })

        print(f"Returned {len(images_response)} layers.")
        return images_response
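

# Minimal local smoke test: a sketch of how a client payload maps onto this
# handler's request/response schema. Assumes a test image at "input.png"; the
# file name and the saved "layer_*.png" outputs are illustrative only, not
# part of the endpoint contract.
if __name__ == "__main__":
    with open("input.png", "rb") as f:
        payload = {
            "inputs": {"image": base64.b64encode(f.read()).decode("utf-8")},
            "parameters": {"layers": 4, "num_inference_steps": 50, "resolution": 640},
        }
    handler = EndpointHandler()
    for layer in handler(payload):
        # Each entry carries a layer index and a base64-encoded PNG.
        with open(f"layer_{layer['layer_index']}.png", "wb") as out:
            out.write(base64.b64decode(layer["image"]))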