| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| import torch |
|
|
| from ...models import QwenImageMultiControlNetModel |
| from ..modular_pipeline import ModularPipelineBlocks, PipelineState |
| from ..modular_pipeline_utils import ComponentSpec, InputParam, OutputParam |
| from .modular_pipeline import QwenImageLayeredPachifier, QwenImageModularPipeline, QwenImagePachifier |
|
|
|
|
def repeat_tensor_to_batch_size(
    input_name: str,
    input_tensor: torch.Tensor,
    batch_size: int,
    num_images_per_prompt: int = 1,
) -> torch.Tensor:
    """Repeat tensor elements to match the final batch size.

    Expands the batch dimension (dim 0) of `input_tensor` to `batch_size * num_images_per_prompt` by repeating each
    element consecutively (`repeat_interleave`), so all copies of one element stay adjacent.

    The input tensor must have batch size 1 or `batch_size`:
        - If batch size is 1: each element is repeated `batch_size * num_images_per_prompt` times
        - If batch size equals `batch_size`: each element is repeated `num_images_per_prompt` times

    Args:
        input_name (str): Name of the input tensor (used for error messages)
        input_tensor (torch.Tensor): The tensor to repeat. Must have batch size 1 or batch_size.
        batch_size (int): The base batch size (number of prompts)
        num_images_per_prompt (int, optional): Number of images to generate per prompt. Defaults to 1.

    Returns:
        torch.Tensor: The repeated tensor with final batch size (batch_size * num_images_per_prompt)

    Raises:
        ValueError: If input_tensor is not a torch.Tensor, has no batch dimension, or has an invalid batch size

    Examples:
        >>> tensor = torch.tensor([[1, 2, 3]])  # shape: [1, 3]
        >>> repeat_tensor_to_batch_size("image", tensor, batch_size=2, num_images_per_prompt=2)
        tensor([[1, 2, 3],
                [1, 2, 3],
                [1, 2, 3],
                [1, 2, 3]])

        >>> tensor = torch.tensor([[1, 2, 3], [4, 5, 6]])  # shape: [2, 3]
        >>> repeat_tensor_to_batch_size("image", tensor, batch_size=2, num_images_per_prompt=2)
        tensor([[1, 2, 3],
                [1, 2, 3],
                [4, 5, 6],
                [4, 5, 6]])
    """
    if not isinstance(input_tensor, torch.Tensor):
        raise ValueError(f"`{input_name}` must be a tensor")

    # A 0-dim tensor has no batch axis; report it as a ValueError instead of leaking an IndexError from `shape[0]`.
    if input_tensor.ndim == 0:
        raise ValueError(f"`{input_name}` must have a batch dimension, but got a 0-dim tensor")

    current_batch_size = input_tensor.shape[0]
    if current_batch_size == 1:
        # A single element is shared by every prompt and every image per prompt.
        repeat_by = batch_size * num_images_per_prompt
    elif current_batch_size == batch_size:
        # One element per prompt already; only the per-prompt copies are missing.
        repeat_by = num_images_per_prompt
    else:
        raise ValueError(f"`{input_name}` must have batch size 1 or {batch_size}, but got {current_batch_size}")

    return input_tensor.repeat_interleave(repeat_by, dim=0)
|
|
|
|
def calculate_dimension_from_latents(latents: torch.Tensor, vae_scale_factor: int) -> tuple[int, int]:
    """Derive the pixel-space (height, width) that corresponds to a latent tensor.

    The two trailing axes of `latents` are taken as latent height and latent width, and each is multiplied by
    `vae_scale_factor`.

    Args:
        latents (torch.Tensor): Unpacked latents with 4 or 5 dimensions, e.g.
            [batch, channels, height, width] or [batch, channels, frames, height, width].
        vae_scale_factor (int): Factor by which the VAE shrinks each spatial dimension.

    Returns:
        tuple[int, int]: Image dimensions as (height, width).

    Raises:
        ValueError: If `latents` does not have 4 or 5 dimensions.
    """
    if latents.ndim not in (4, 5):
        raise ValueError(f"unpacked latents must have 4 or 5 dimensions, but got {latents.ndim}")

    # Only the last two axes matter, whatever the rank is.
    *_, latent_height, latent_width = latents.shape
    return latent_height * vae_scale_factor, latent_width * vae_scale_factor
|
|
|
|
| |
class QwenImageTextInputsStep(ModularPipelineBlocks):
    """
    Text input processing step that standardizes text embeddings for the pipeline.
    This step:
      1. Determines `batch_size` and `dtype` based on `prompt_embeds`
      2. Ensures all text embeddings have consistent batch sizes (batch_size * num_images_per_prompt)

    Embeddings and masks are all expanded along dim 0 with `repeat_interleave`, so the copies of each prompt are
    adjacent and row `k` of a mask always belongs to row `k` of its embeddings.

    This block should be placed after all encoder steps to process the text embeddings before they are used in
    subsequent pipeline steps.

      Inputs:
          num_images_per_prompt (`int`, *optional*, defaults to 1):
              The number of images to generate per prompt.
          prompt_embeds (`Tensor`):
              text embeddings used to guide the image generation. Can be generated from text_encoder step.
          prompt_embeds_mask (`Tensor`):
              mask for the text embeddings. Can be generated from text_encoder step.
          negative_prompt_embeds (`Tensor`, *optional*):
              negative text embeddings used to guide the image generation. Can be generated from text_encoder step.
          negative_prompt_embeds_mask (`Tensor`, *optional*):
              mask for the negative text embeddings. Can be generated from text_encoder step.

      Outputs:
          batch_size (`int`):
              The batch size of the prompt embeddings
          dtype (`dtype`):
              The data type of the prompt embeddings
          prompt_embeds (`Tensor`):
              The prompt embeddings. (batch-expanded)
          prompt_embeds_mask (`Tensor`):
              The encoder attention mask. (batch-expanded)
          negative_prompt_embeds (`Tensor`):
              The negative prompt embeddings. (batch-expanded)
          negative_prompt_embeds_mask (`Tensor`):
              The negative prompt embeddings mask. (batch-expanded)
    """

    model_name = "qwenimage"

    @property
    def description(self) -> str:
        summary_section = (
            "Text input processing step that standardizes text embeddings for the pipeline.\n"
            "This step:\n"
            " 1. Determines `batch_size` and `dtype` based on `prompt_embeds`\n"
            " 2. Ensures all text embeddings have consistent batch sizes (batch_size * num_images_per_prompt)"
        )

        placement_section = "\n\nThis block should be placed after all encoder steps to process the text embeddings before they are used in subsequent pipeline steps."

        return summary_section + placement_section

    @property
    def inputs(self) -> list[InputParam]:
        return [
            InputParam.template("num_images_per_prompt"),
            InputParam.template("prompt_embeds"),
            InputParam.template("prompt_embeds_mask"),
            InputParam.template("negative_prompt_embeds"),
            InputParam.template("negative_prompt_embeds_mask"),
        ]

    @property
    def intermediate_outputs(self) -> list[OutputParam]:
        return [
            OutputParam(name="batch_size", type_hint=int, description="The batch size of the prompt embeddings"),
            OutputParam(name="dtype", type_hint=torch.dtype, description="The data type of the prompt embeddings"),
            OutputParam.template("prompt_embeds", note="batch-expanded"),
            OutputParam.template("prompt_embeds_mask", note="batch-expanded"),
            OutputParam.template("negative_prompt_embeds", note="batch-expanded"),
            OutputParam.template("negative_prompt_embeds_mask", note="batch-expanded"),
        ]

    @staticmethod
    def check_inputs(
        prompt_embeds,
        prompt_embeds_mask,
        negative_prompt_embeds,
        negative_prompt_embeds_mask,
    ):
        """Validate that the text embeddings and their masks are mutually consistent.

        Raises:
            ValueError: If only one of `negative_prompt_embeds` / `negative_prompt_embeds_mask` is given, or if any
                mask or negative embedding has a batch size (dim 0) different from `prompt_embeds`.
        """
        # Negative embeddings and their mask must be passed together or not at all.
        if negative_prompt_embeds is not None and negative_prompt_embeds_mask is None:
            raise ValueError("`negative_prompt_embeds_mask` is required when `negative_prompt_embeds` is not None")

        if negative_prompt_embeds is None and negative_prompt_embeds_mask is not None:
            raise ValueError("cannot pass `negative_prompt_embeds_mask` without `negative_prompt_embeds`")

        # Everything is compared against the batch size of `prompt_embeds`.
        expected_batch_size = prompt_embeds.shape[0]

        if prompt_embeds_mask.shape[0] != expected_batch_size:
            raise ValueError("`prompt_embeds_mask` must have the same batch size as `prompt_embeds`")

        if negative_prompt_embeds is not None and negative_prompt_embeds.shape[0] != expected_batch_size:
            raise ValueError("`negative_prompt_embeds` must have the same batch size as `prompt_embeds`")

        if negative_prompt_embeds_mask is not None and negative_prompt_embeds_mask.shape[0] != expected_batch_size:
            raise ValueError("`negative_prompt_embeds_mask` must have the same batch size as `prompt_embeds`")

    def __call__(self, components: QwenImageModularPipeline, state: PipelineState) -> PipelineState:
        """Record `batch_size`/`dtype` and expand all text embeddings and masks to the final batch size.

        Returns the `(components, state)` pair after writing the updated block state back into `state`.
        """
        block_state = self.get_block_state(state)

        self.check_inputs(
            prompt_embeds=block_state.prompt_embeds,
            prompt_embeds_mask=block_state.prompt_embeds_mask,
            negative_prompt_embeds=block_state.negative_prompt_embeds,
            negative_prompt_embeds_mask=block_state.negative_prompt_embeds_mask,
        )

        block_state.batch_size = block_state.prompt_embeds.shape[0]
        block_state.dtype = block_state.prompt_embeds.dtype

        num_images_per_prompt = block_state.num_images_per_prompt

        # Expand every tensor the same way: `repeat_interleave` on dim 0 yields p0, p0, ..., p1, p1, ...
        # For the 3D embeddings this equals the previous `repeat(1, n, 1).view(B * n, seq_len, -1)`.
        # For the 2D masks the previous `repeat(1, n, 1).view(B * n, seq_len)` tiled the batch instead
        # (m0, m1, ..., m0, m1, ...), so with batch_size > 1 and num_images_per_prompt > 1 a mask could end up
        # paired with another prompt's embeddings.
        block_state.prompt_embeds = block_state.prompt_embeds.repeat_interleave(num_images_per_prompt, dim=0)
        block_state.prompt_embeds_mask = block_state.prompt_embeds_mask.repeat_interleave(
            num_images_per_prompt, dim=0
        )

        if block_state.negative_prompt_embeds is not None:
            # `check_inputs` guarantees the negative mask is present whenever the negative embeddings are.
            block_state.negative_prompt_embeds = block_state.negative_prompt_embeds.repeat_interleave(
                num_images_per_prompt, dim=0
            )
            block_state.negative_prompt_embeds_mask = block_state.negative_prompt_embeds_mask.repeat_interleave(
                num_images_per_prompt, dim=0
            )

        self.set_block_state(state, block_state)

        return components, state
|
|
|
|
| |
class QwenImageAdditionalInputsStep(ModularPipelineBlocks):
    """
    Input processing step that:
      1. For image latent inputs: Updates height/width if None, patchifies, and expands batch size
      2. For additional batch inputs: Expands batch dimensions to match final batch size

    Which state entries are treated as image latents and which as plain batch inputs is configured through the
    constructor; by default only `image_latents` is processed.

    This block should be placed after the encoder steps and the text input step.

      Components:
          pachifier (`QwenImagePachifier`)

      Inputs:
          num_images_per_prompt (`int`, *optional*, defaults to 1):
              The number of images to generate per prompt.
          batch_size (`int`, *optional*, defaults to 1):
              Number of prompts, the final batch size of model inputs should be batch_size * num_images_per_prompt. Can
              be generated in input step.
          height (`int`, *optional*):
              The height in pixels of the generated image.
          width (`int`, *optional*):
              The width in pixels of the generated image.
          image_latents (`Tensor`):
              image latents used to guide the image generation. Can be generated from vae_encoder step.

      Outputs:
          image_height (`int`):
              The image height calculated from the image latents dimension
          image_width (`int`):
              The image width calculated from the image latents dimension
          height (`int`):
              if not provided, updated to image height
          width (`int`):
              if not provided, updated to image width
          image_latents (`Tensor`):
              image latents used to guide the image generation. Can be generated from vae_encoder step. (patchified and
              batch-expanded)
    """

    model_name = "qwenimage"

    def __init__(
        self,
        image_latent_inputs: list[InputParam] | None = None,
        additional_batch_inputs: list[InputParam] | None = None,
    ):
        """Configure which inputs are processed.

        Args:
            image_latent_inputs: Inputs handled as image latents. Defaults to the `image_latents` template.
            additional_batch_inputs: Inputs that only need batch expansion. Defaults to an empty list.

        Raises:
            ValueError: If either argument is not a list of `InputParam`.
        """
        # Fresh defaults per instance; `None` is the "use the default" marker.
        image_latent_inputs = (
            [InputParam.template("image_latents")] if image_latent_inputs is None else image_latent_inputs
        )
        additional_batch_inputs = [] if additional_batch_inputs is None else additional_batch_inputs

        if not isinstance(image_latent_inputs, list):
            raise ValueError(f"image_latent_inputs must be a list, but got {type(image_latent_inputs)}")
        for input_param in image_latent_inputs:
            if not isinstance(input_param, InputParam):
                raise ValueError(f"image_latent_inputs must be a list of InputParam, but got {type(input_param)}")

        if not isinstance(additional_batch_inputs, list):
            raise ValueError(f"additional_batch_inputs must be a list, but got {type(additional_batch_inputs)}")
        for input_param in additional_batch_inputs:
            if not isinstance(input_param, InputParam):
                raise ValueError(
                    f"additional_batch_inputs must be a list of InputParam, but got {type(input_param)}"
                )

        # Stored before `super().__init__()`, matching the original statement order.
        self._image_latent_inputs = image_latent_inputs
        self._additional_batch_inputs = additional_batch_inputs
        super().__init__()

    @property
    def description(self) -> str:
        """Human-readable summary, including the configured input names."""
        pieces = [
            "Input processing step that:\n"
            " 1. For image latent inputs: Updates height/width if None, patchifies, and expands batch size\n"
            " 2. For additional batch inputs: Expands batch dimensions to match final batch size"
        ]

        if self._image_latent_inputs or self._additional_batch_inputs:
            pieces.append("\n\nConfigured inputs:")
        if self._image_latent_inputs:
            pieces.append(f"\n - Image latent inputs: {[p.name for p in self._image_latent_inputs]}")
        if self._additional_batch_inputs:
            pieces.append(f"\n - Additional batch inputs: {[p.name for p in self._additional_batch_inputs]}")

        pieces.append("\n\nThis block should be placed after the encoder steps and the text input step.")
        return "".join(pieces)

    @property
    def expected_components(self) -> list[ComponentSpec]:
        pachifier_spec = ComponentSpec("pachifier", QwenImagePachifier, default_creation_method="from_config")
        return [pachifier_spec]

    @property
    def inputs(self) -> list[InputParam]:
        """Fixed inputs followed by the configured image latent and additional batch inputs."""
        fixed_inputs = [
            InputParam.template("num_images_per_prompt"),
            InputParam.template("batch_size"),
            InputParam.template("height"),
            InputParam.template("width"),
        ]
        return fixed_inputs + self._image_latent_inputs + self._additional_batch_inputs

    @property
    def intermediate_outputs(self) -> list[OutputParam]:
        """Outputs: image size, (optionally) height/width, and one entry per configured input."""
        outputs = [
            OutputParam(
                name="image_height",
                type_hint=int,
                description="The image height calculated from the image latents dimension",
            ),
            OutputParam(
                name="image_width",
                type_hint=int,
                description="The image width calculated from the image latents dimension",
            ),
        ]

        # height/width are only reported when there is at least one image latent input to derive them from.
        if self._image_latent_inputs:
            outputs += [
                OutputParam(name="height", type_hint=int, description="if not provided, updated to image height"),
                OutputParam(name="width", type_hint=int, description="if not provided, updated to image width"),
            ]

        outputs.extend(
            OutputParam(
                name=input_param.name,
                type_hint=input_param.type_hint,
                description=input_param.description + " (patchified and batch-expanded)",
            )
            for input_param in self._image_latent_inputs
        )

        outputs.extend(
            OutputParam(
                name=input_param.name,
                type_hint=input_param.type_hint,
                description=input_param.description + " (batch-expanded)",
            )
            for input_param in self._additional_batch_inputs
        )

        return outputs

    def __call__(self, components: QwenImageModularPipeline, state: PipelineState) -> PipelineState:
        """Patchify and batch-expand image latents, batch-expand the additional inputs, and update the state.

        Inputs whose value is `None` are skipped. Returns the `(components, state)` pair.
        """
        block_state = self.get_block_state(state)
        num_images_per_prompt = block_state.num_images_per_prompt
        batch_size = block_state.batch_size

        for latent_param in self._image_latent_inputs:
            latent_name = latent_param.name
            latents = getattr(block_state, latent_name)
            if latents is None:
                continue

            # Image size implied by the (still unpacked) latents; fills in height/width when they are unset.
            height, width = calculate_dimension_from_latents(latents, components.vae_scale_factor)
            block_state.height = block_state.height or height
            block_state.width = block_state.width or width

            # Only set when the attribute does not exist yet on the block state
            # (presumably so the first processed latent input wins - confirm against BlockState semantics).
            for attr_name, attr_value in (("image_height", height), ("image_width", width)):
                if not hasattr(block_state, attr_name):
                    setattr(block_state, attr_name, attr_value)

            # Pack first, then expand to the final batch size.
            packed_latents = components.pachifier.pack_latents(latents)
            expanded_latents = repeat_tensor_to_batch_size(
                input_name=latent_name,
                input_tensor=packed_latents,
                batch_size=batch_size,
                num_images_per_prompt=num_images_per_prompt,
            )
            setattr(block_state, latent_name, expanded_latents)

        for batch_param in self._additional_batch_inputs:
            batch_input_name = batch_param.name
            batch_input = getattr(block_state, batch_input_name)
            if batch_input is None:
                continue

            expanded_input = repeat_tensor_to_batch_size(
                input_name=batch_input_name,
                input_tensor=batch_input,
                batch_size=batch_size,
                num_images_per_prompt=num_images_per_prompt,
            )
            setattr(block_state, batch_input_name, expanded_input)

        self.set_block_state(state, block_state)
        return components, state
|
|
|
|
| |
class QwenImageEditPlusAdditionalInputsStep(ModularPipelineBlocks):
    """
    Input processing step for Edit Plus that:
      1. For image latent inputs (list): Collects heights/widths, patchifies each, concatenates, expands batch
      2. For additional batch inputs: Expands batch dimensions to match final batch size
      Height/width defaults to last image in the list.

    An image latent input may be a single tensor or a list of tensors; a single tensor is handled as a one-element
    list. The packed tensors are concatenated along dim 1.

    This block should be placed after the encoder steps and the text input step.

      Components:
          pachifier (`QwenImagePachifier`)

      Inputs:
          num_images_per_prompt (`int`, *optional*, defaults to 1):
              The number of images to generate per prompt.
          batch_size (`int`, *optional*, defaults to 1):
              Number of prompts, the final batch size of model inputs should be batch_size * num_images_per_prompt. Can
              be generated in input step.
          height (`int`, *optional*):
              The height in pixels of the generated image.
          width (`int`, *optional*):
              The width in pixels of the generated image.
          image_latents (`Tensor`):
              image latents used to guide the image generation. Can be generated from vae_encoder step.

      Outputs:
          image_height (`list`):
              The image heights calculated from the image latents dimension
          image_width (`list`):
              The image widths calculated from the image latents dimension
          height (`int`):
              if not provided, updated to image height
          width (`int`):
              if not provided, updated to image width
          image_latents (`Tensor`):
              image latents used to guide the image generation. Can be generated from vae_encoder step. (patchified,
              concatenated, and batch-expanded)
    """

    model_name = "qwenimage-edit-plus"

    def __init__(
        self,
        image_latent_inputs: list[InputParam] | None = None,
        additional_batch_inputs: list[InputParam] | None = None,
    ):
        """Configure which inputs are processed.

        Args:
            image_latent_inputs: Inputs handled as image latents. Defaults to the `image_latents` template.
            additional_batch_inputs: Inputs that only need batch expansion. Defaults to an empty list.

        Raises:
            ValueError: If either argument is not a list of `InputParam`.
        """
        # `None` means "use the default"; a new list is built for each instance.
        image_latent_inputs = (
            [InputParam.template("image_latents")] if image_latent_inputs is None else image_latent_inputs
        )
        additional_batch_inputs = [] if additional_batch_inputs is None else additional_batch_inputs

        if not isinstance(image_latent_inputs, list):
            raise ValueError(f"image_latent_inputs must be a list, but got {type(image_latent_inputs)}")
        for input_param in image_latent_inputs:
            if not isinstance(input_param, InputParam):
                raise ValueError(f"image_latent_inputs must be a list of InputParam, but got {type(input_param)}")

        if not isinstance(additional_batch_inputs, list):
            raise ValueError(f"additional_batch_inputs must be a list, but got {type(additional_batch_inputs)}")
        for input_param in additional_batch_inputs:
            if not isinstance(input_param, InputParam):
                raise ValueError(
                    f"additional_batch_inputs must be a list of InputParam, but got {type(input_param)}"
                )

        # Stored before `super().__init__()`, matching the original statement order.
        self._image_latent_inputs = image_latent_inputs
        self._additional_batch_inputs = additional_batch_inputs
        super().__init__()

    @property
    def description(self) -> str:
        """Human-readable summary, including the configured input names."""
        pieces = [
            "Input processing step for Edit Plus that:\n"
            " 1. For image latent inputs (list): Collects heights/widths, patchifies each, concatenates, expands batch\n"
            " 2. For additional batch inputs: Expands batch dimensions to match final batch size\n"
            " Height/width defaults to last image in the list."
        ]

        if self._image_latent_inputs or self._additional_batch_inputs:
            pieces.append("\n\nConfigured inputs:")
        if self._image_latent_inputs:
            pieces.append(f"\n - Image latent inputs: {[p.name for p in self._image_latent_inputs]}")
        if self._additional_batch_inputs:
            pieces.append(f"\n - Additional batch inputs: {[p.name for p in self._additional_batch_inputs]}")

        pieces.append("\n\nThis block should be placed after the encoder steps and the text input step.")
        return "".join(pieces)

    @property
    def expected_components(self) -> list[ComponentSpec]:
        pachifier_spec = ComponentSpec("pachifier", QwenImagePachifier, default_creation_method="from_config")
        return [pachifier_spec]

    @property
    def inputs(self) -> list[InputParam]:
        """Fixed inputs followed by the configured image latent and additional batch inputs."""
        fixed_inputs = [
            InputParam.template("num_images_per_prompt"),
            InputParam.template("batch_size"),
            InputParam.template("height"),
            InputParam.template("width"),
        ]
        return fixed_inputs + self._image_latent_inputs + self._additional_batch_inputs

    @property
    def intermediate_outputs(self) -> list[OutputParam]:
        """Outputs: per-image sizes, (optionally) height/width, and one entry per configured input."""
        outputs = [
            OutputParam(
                name="image_height",
                type_hint=list[int],
                description="The image heights calculated from the image latents dimension",
            ),
            OutputParam(
                name="image_width",
                type_hint=list[int],
                description="The image widths calculated from the image latents dimension",
            ),
        ]

        # height/width are only reported when there is at least one image latent input to derive them from.
        if self._image_latent_inputs:
            outputs += [
                OutputParam(name="height", type_hint=int, description="if not provided, updated to image height"),
                OutputParam(name="width", type_hint=int, description="if not provided, updated to image width"),
            ]

        outputs.extend(
            OutputParam(
                name=input_param.name,
                type_hint=input_param.type_hint,
                description=input_param.description + " (patchified, concatenated, and batch-expanded)",
            )
            for input_param in self._image_latent_inputs
        )

        outputs.extend(
            OutputParam(
                name=input_param.name,
                type_hint=input_param.type_hint,
                description=input_param.description + " (batch-expanded)",
            )
            for input_param in self._additional_batch_inputs
        )

        return outputs

    def __call__(self, components: QwenImageModularPipeline, state: PipelineState) -> PipelineState:
        """Pack, batch-expand and concatenate image latents; batch-expand additional inputs; update the state.

        Inputs whose value is `None` are skipped. Returns the `(components, state)` pair.
        """
        block_state = self.get_block_state(state)
        num_images_per_prompt = block_state.num_images_per_prompt
        batch_size = block_state.batch_size

        for input_param in self._image_latent_inputs:
            image_latent_input_name = input_param.name
            latents_value = getattr(block_state, image_latent_input_name)
            if latents_value is None:
                continue

            # Normalize to a list so one tensor and many tensors share a single code path.
            latents_list = latents_value if isinstance(latents_value, list) else [latents_value]

            image_heights = []
            image_widths = []
            packed_parts = []

            for i, single_latents in enumerate(latents_list):
                # Image size implied by this (still unpacked) latent tensor.
                height, width = calculate_dimension_from_latents(single_latents, components.vae_scale_factor)
                image_heights.append(height)
                image_widths.append(width)

                # Pack, then expand to the final batch size so all parts share dim 0 before concatenation.
                packed = components.pachifier.pack_latents(single_latents)
                packed_parts.append(
                    repeat_tensor_to_batch_size(
                        input_name=f"{image_latent_input_name}[{i}]",
                        input_tensor=packed,
                        batch_size=batch_size,
                        num_images_per_prompt=num_images_per_prompt,
                    )
                )

            # Join all images along dim 1 of the packed tensors.
            concatenated_latents = torch.cat(packed_parts, dim=1)

            block_state.image_height = image_heights
            block_state.image_width = image_widths

            # Unset height/width fall back to the size of the last image in the list.
            block_state.height = block_state.height or image_heights[-1]
            block_state.width = block_state.width or image_widths[-1]

            setattr(block_state, image_latent_input_name, concatenated_latents)

        for batch_param in self._additional_batch_inputs:
            batch_input_name = batch_param.name
            batch_input = getattr(block_state, batch_input_name)
            if batch_input is None:
                continue

            expanded_input = repeat_tensor_to_batch_size(
                input_name=batch_input_name,
                input_tensor=batch_input,
                batch_size=batch_size,
                num_images_per_prompt=num_images_per_prompt,
            )
            setattr(block_state, batch_input_name, expanded_input)

        self.set_block_state(state, block_state)
        return components, state
|
|
|
|
| |
|
|
|
|
| |
class QwenImageLayeredAdditionalInputsStep(ModularPipelineBlocks):
    """
    Input processing step for Layered that:
      1. For image latent inputs: Updates height/width if None, patchifies with layered pachifier, and expands batch
         size
      2. For additional batch inputs: Expands batch dimensions to match final batch size

    Note: `height` and `width` are not inputs of this block; whenever an image latent input is present they are set
    from axes 3 and 4 of the latent tensor, multiplied by the VAE scale factor.

    This block should be placed after the encoder steps and the text input step.

      Components:
          pachifier (`QwenImageLayeredPachifier`)

      Inputs:
          num_images_per_prompt (`int`, *optional*, defaults to 1):
              The number of images to generate per prompt.
          batch_size (`int`, *optional*, defaults to 1):
              Number of prompts, the final batch size of model inputs should be batch_size * num_images_per_prompt. Can
              be generated in input step.
          image_latents (`Tensor`):
              image latents used to guide the image generation. Can be generated from vae_encoder step.

      Outputs:
          image_height (`int`):
              The image height calculated from the image latents dimension
          image_width (`int`):
              The image width calculated from the image latents dimension
          height (`int`):
              if not provided, updated to image height
          width (`int`):
              if not provided, updated to image width
          image_latents (`Tensor`):
              image latents used to guide the image generation. Can be generated from vae_encoder step. (patchified
              with layered pachifier and batch-expanded)
    """

    model_name = "qwenimage-layered"

    def __init__(
        self,
        image_latent_inputs: list[InputParam] | None = None,
        additional_batch_inputs: list[InputParam] | None = None,
    ):
        """Configure which inputs are processed.

        Args:
            image_latent_inputs: Inputs handled as image latents. Defaults to the `image_latents` template.
            additional_batch_inputs: Inputs that only need batch expansion. Defaults to an empty list.

        Raises:
            ValueError: If either argument is not a list of `InputParam`.
        """
        # `None` means "use the default"; a new list is built for each instance.
        image_latent_inputs = (
            [InputParam.template("image_latents")] if image_latent_inputs is None else image_latent_inputs
        )
        additional_batch_inputs = [] if additional_batch_inputs is None else additional_batch_inputs

        if not isinstance(image_latent_inputs, list):
            raise ValueError(f"image_latent_inputs must be a list, but got {type(image_latent_inputs)}")
        for input_param in image_latent_inputs:
            if not isinstance(input_param, InputParam):
                raise ValueError(f"image_latent_inputs must be a list of InputParam, but got {type(input_param)}")

        if not isinstance(additional_batch_inputs, list):
            raise ValueError(f"additional_batch_inputs must be a list, but got {type(additional_batch_inputs)}")
        for input_param in additional_batch_inputs:
            if not isinstance(input_param, InputParam):
                raise ValueError(
                    f"additional_batch_inputs must be a list of InputParam, but got {type(input_param)}"
                )

        # Stored before `super().__init__()`, matching the original statement order.
        self._image_latent_inputs = image_latent_inputs
        self._additional_batch_inputs = additional_batch_inputs
        super().__init__()

    @property
    def description(self) -> str:
        """Human-readable summary, including the configured input names."""
        pieces = [
            "Input processing step for Layered that:\n"
            " 1. For image latent inputs: Updates height/width if None, patchifies with layered pachifier, and expands batch size\n"
            " 2. For additional batch inputs: Expands batch dimensions to match final batch size"
        ]

        if self._image_latent_inputs or self._additional_batch_inputs:
            pieces.append("\n\nConfigured inputs:")
        if self._image_latent_inputs:
            pieces.append(f"\n - Image latent inputs: {[p.name for p in self._image_latent_inputs]}")
        if self._additional_batch_inputs:
            pieces.append(f"\n - Additional batch inputs: {[p.name for p in self._additional_batch_inputs]}")

        pieces.append("\n\nThis block should be placed after the encoder steps and the text input step.")
        return "".join(pieces)

    @property
    def expected_components(self) -> list[ComponentSpec]:
        pachifier_spec = ComponentSpec("pachifier", QwenImageLayeredPachifier, default_creation_method="from_config")
        return [pachifier_spec]

    @property
    def inputs(self) -> list[InputParam]:
        """Fixed inputs followed by the configured image latent and additional batch inputs."""
        fixed_inputs = [
            InputParam.template("num_images_per_prompt"),
            InputParam.template("batch_size"),
        ]
        return fixed_inputs + self._image_latent_inputs + self._additional_batch_inputs

    @property
    def intermediate_outputs(self) -> list[OutputParam]:
        """Outputs: image size, (optionally) height/width, and one entry per configured input."""
        outputs = [
            OutputParam(
                name="image_height",
                type_hint=int,
                description="The image height calculated from the image latents dimension",
            ),
            OutputParam(
                name="image_width",
                type_hint=int,
                description="The image width calculated from the image latents dimension",
            ),
        ]

        # height/width are only reported when there is at least one image latent input to derive them from.
        if self._image_latent_inputs:
            outputs += [
                OutputParam(name="height", type_hint=int, description="if not provided, updated to image height"),
                OutputParam(name="width", type_hint=int, description="if not provided, updated to image width"),
            ]

        outputs.extend(
            OutputParam(
                name=input_param.name,
                type_hint=input_param.type_hint,
                description=input_param.description + " (patchified with layered pachifier and batch-expanded)",
            )
            for input_param in self._image_latent_inputs
        )

        outputs.extend(
            OutputParam(
                name=input_param.name,
                type_hint=input_param.type_hint,
                description=input_param.description + " (batch-expanded)",
            )
            for input_param in self._additional_batch_inputs
        )

        return outputs

    def __call__(self, components: QwenImageModularPipeline, state: PipelineState) -> PipelineState:
        """Pack and batch-expand image latents, batch-expand the additional inputs, and update the state.

        Inputs whose value is `None` are skipped. Returns the `(components, state)` pair.
        """
        block_state = self.get_block_state(state)
        num_images_per_prompt = block_state.num_images_per_prompt
        batch_size = block_state.batch_size

        for latent_param in self._image_latent_inputs:
            latent_name = latent_param.name
            latents = getattr(block_state, latent_name)
            if latents is None:
                continue

            # Height/width are read from axes 3 and 4, so the latents need at least 5 dimensions
            # (presumably [batch, layers, channels, height, width] - confirm against the layered VAE encoder).
            latent_shape = latents.shape
            height = latent_shape[3] * components.vae_scale_factor
            width = latent_shape[4] * components.vae_scale_factor

            # Unlike the non-layered step, height/width are always overwritten here.
            block_state.height = height
            block_state.width = width

            # Only set when the attribute does not exist yet on the block state
            # (presumably so the first processed latent input wins - confirm against BlockState semantics).
            for attr_name, attr_value in (("image_height", height), ("image_width", width)):
                if not hasattr(block_state, attr_name):
                    setattr(block_state, attr_name, attr_value)

            # Pack with the layered pachifier first, then expand to the final batch size.
            packed_latents = components.pachifier.pack_latents(latents)
            expanded_latents = repeat_tensor_to_batch_size(
                input_name=latent_name,
                input_tensor=packed_latents,
                batch_size=batch_size,
                num_images_per_prompt=num_images_per_prompt,
            )
            setattr(block_state, latent_name, expanded_latents)

        for batch_param in self._additional_batch_inputs:
            batch_input_name = batch_param.name
            batch_input = getattr(block_state, batch_input_name)
            if batch_input is None:
                continue

            expanded_input = repeat_tensor_to_batch_size(
                input_name=batch_input_name,
                input_tensor=batch_input,
                batch_size=batch_size,
                num_images_per_prompt=num_images_per_prompt,
            )
            setattr(block_state, batch_input_name, expanded_input)

        self.set_block_state(state, block_state)
        return components, state
|
|
|
|
| |
class QwenImageControlNetInputsStep(ModularPipelineBlocks):
    """
    prepare the `control_image_latents` for controlnet. Insert after all the other inputs steps.

      Inputs:
          control_image_latents (`Tensor`):
              The control image latents to use for the denoising process. Can be generated in controlnet vae encoder
              step.
          batch_size (`int`, *optional*, defaults to 1):
              Number of prompts, the final batch size of model inputs should be batch_size * num_images_per_prompt. Can
              be generated in input step.
          num_images_per_prompt (`int`, *optional*, defaults to 1):
              The number of images to generate per prompt.
          height (`int`, *optional*):
              The height in pixels of the generated image.
          width (`int`, *optional*):
              The width in pixels of the generated image.

      Outputs:
          control_image_latents (`Tensor`):
              The control image latents (patchified and batch-expanded).
          height (`int`):
              if not provided, updated to control image height
          width (`int`):
              if not provided, updated to control image width
    """

    model_name = "qwenimage"

    @property
    def description(self) -> str:
        return "prepare the `control_image_latents` for controlnet. Insert after all the other inputs steps."

    @property
    def inputs(self) -> list[InputParam]:
        return [
            InputParam(
                name="control_image_latents",
                required=True,
                type_hint=torch.Tensor,
                description="The control image latents to use for the denoising process. Can be generated in controlnet vae encoder step.",
            ),
            InputParam.template("batch_size"),
            InputParam.template("num_images_per_prompt"),
            InputParam.template("height"),
            InputParam.template("width"),
        ]

    @property
    def intermediate_outputs(self) -> list[OutputParam]:
        return [
            OutputParam(
                name="control_image_latents",
                type_hint=torch.Tensor,
                description="The control image latents (patchified and batch-expanded).",
            ),
            OutputParam(name="height", type_hint=int, description="if not provided, updated to control image height"),
            OutputParam(name="width", type_hint=int, description="if not provided, updated to control image width"),
        ]

    @staticmethod
    def _prepare_control_latents(components, block_state, latents, input_name):
        """Fill in missing height/width from `latents`, then pack and batch-expand them.

        Args:
            components: Pipeline components; `vae_scale_factor` and `pachifier` are read.
            block_state: Block state; `height`/`width` are set only when currently falsy, and
                `num_images_per_prompt`/`batch_size` are read for the batch expansion.
            latents: One control image latent tensor.
            input_name (str): Name passed to `repeat_tensor_to_batch_size` for its error messages.

        Returns:
            The packed latents repeated to the final batch size.
        """
        height, width = calculate_dimension_from_latents(latents, components.vae_scale_factor)
        # `or` keeps any height/width that is already set, so with several control images
        # the first one to supply a value wins.
        block_state.height = block_state.height or height
        block_state.width = block_state.width or width

        packed = components.pachifier.pack_latents(latents)
        return repeat_tensor_to_batch_size(
            input_name=input_name,
            input_tensor=packed,
            num_images_per_prompt=block_state.num_images_per_prompt,
            batch_size=block_state.batch_size,
        )

    @torch.no_grad()
    def __call__(self, components: QwenImageModularPipeline, state: PipelineState) -> PipelineState:
        """Pack and batch-expand `control_image_latents`, updating height/width when unset.

        When `components.controlnet` is a `QwenImageMultiControlNetModel`, `control_image_latents`
        is iterated and each entry is prepared separately, producing a list; otherwise it is
        prepared as a single tensor.

        Returns:
            The `(components, state)` pair, after the block state has been written back to `state`.
        """
        block_state = self.get_block_state(state)

        if isinstance(components.controlnet, QwenImageMultiControlNetModel):
            # One entry per control image; the index only labels error messages.
            block_state.control_image_latents = [
                self._prepare_control_latents(components, block_state, latents, f"control_image_latents[{i}]")
                for i, latents in enumerate(block_state.control_image_latents)
            ]
        else:
            block_state.control_image_latents = self._prepare_control_latents(
                components, block_state, block_state.control_image_latents, "control_image_latents"
            )

        self.set_block_state(state, block_state)

        return components, state
|
|