|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import unittest |
|
|
|
|
|
import numpy as np |
|
|
import torch |
|
|
from transformers import Qwen2_5_VLConfig, Qwen2_5_VLForConditionalGeneration, Qwen2Tokenizer |
|
|
|
|
|
from diffusers import ( |
|
|
AutoencoderKLQwenImage, |
|
|
FlowMatchEulerDiscreteScheduler, |
|
|
QwenImagePipeline, |
|
|
QwenImageTransformer2DModel, |
|
|
) |
|
|
|
|
|
from ...testing_utils import enable_full_determinism, torch_device |
|
|
from ..pipeline_params import TEXT_TO_IMAGE_BATCH_PARAMS, TEXT_TO_IMAGE_IMAGE_PARAMS, TEXT_TO_IMAGE_PARAMS |
|
|
from ..test_pipelines_common import PipelineTesterMixin, to_np |
|
|
|
|
|
|
|
|
# Invoked once at import time, before the test class below is defined.
# NOTE(review): the helper comes from `testing_utils` and is not visible in this file —
# presumably it pins RNG seeds / deterministic kernels so the hard-coded expected slice
# in `test_inference` stays reproducible; confirm against its definition.
enable_full_determinism() |
|
|
|
|
|
|
|
|
class QwenImagePipelineFastTests(PipelineTesterMixin, unittest.TestCase):
    """Fast tests for ``QwenImagePipeline`` built from tiny, seeded components.

    Every component is constructed with very small dimensions after calling
    ``torch.manual_seed(0)``, so the pipeline output is reproducible and can be
    compared against a hard-coded reference slice.
    """

    pipeline_class = QwenImagePipeline
    # The shared text-to-image parameter set minus "cross_attention_kwargs".
    params = TEXT_TO_IMAGE_PARAMS - {"cross_attention_kwargs"}
    batch_params = TEXT_TO_IMAGE_BATCH_PARAMS
    image_params = TEXT_TO_IMAGE_IMAGE_PARAMS
    image_latents_params = TEXT_TO_IMAGE_IMAGE_PARAMS
    required_optional_params = frozenset(
        [
            "num_inference_steps",
            "generator",
            "latents",
            "return_dict",
            "callback_on_step_end",
            "callback_on_step_end_tensor_inputs",
        ]
    )
    # Feature switches. NOTE(review): presumably consumed by PipelineTesterMixin to
    # enable/skip the corresponding shared tests — the mixin is not visible here.
    supports_dduf = False
    test_xformers_attention = False
    test_layerwise_casting = True
    test_group_offloading = True

    def get_dummy_components(self):
        """Build the tiny pipeline components.

        The RNG is re-seeded with 0 before each component is created so that the
        weights of one component do not depend on how many random numbers the
        previous one consumed.

        Returns:
            dict: keyword arguments for ``pipeline_class`` with the keys
            ``transformer``, ``vae``, ``scheduler``, ``text_encoder`` and
            ``tokenizer``.
        """
        torch.manual_seed(0)
        transformer = QwenImageTransformer2DModel(
            patch_size=2,
            in_channels=16,
            out_channels=4,
            num_layers=2,
            attention_head_dim=16,
            num_attention_heads=3,
            joint_attention_dim=16,
            guidance_embeds=False,
            axes_dims_rope=(8, 4, 4),
        )

        torch.manual_seed(0)
        z_dim = 4
        vae = AutoencoderKLQwenImage(
            base_dim=z_dim * 6,
            z_dim=z_dim,
            dim_mult=[1, 2, 4],
            num_res_blocks=1,
            temperal_downsample=[False, True],
            # One entry per latent channel; tied to z_dim so the lengths cannot drift apart.
            latents_mean=[0.0] * z_dim,
            latents_std=[1.0] * z_dim,
        )

        torch.manual_seed(0)
        scheduler = FlowMatchEulerDiscreteScheduler()

        torch.manual_seed(0)
        config = Qwen2_5_VLConfig(
            text_config={
                "hidden_size": 16,
                "intermediate_size": 16,
                "num_hidden_layers": 2,
                "num_attention_heads": 2,
                "num_key_value_heads": 2,
                "rope_scaling": {
                    "mrope_section": [1, 1, 2],
                    "rope_type": "default",
                    "type": "default",
                },
                "rope_theta": 1000000.0,
            },
            vision_config={
                "depth": 2,
                "hidden_size": 16,
                "intermediate_size": 16,
                "num_heads": 2,
                "out_hidden_size": 16,
            },
            hidden_size=16,
            vocab_size=152064,
            vision_end_token_id=151653,
            vision_start_token_id=151652,
            vision_token_id=151654,
        )
        text_encoder = Qwen2_5_VLForConditionalGeneration(config)
        # Unlike the other components, the tokenizer is loaded with `from_pretrained`
        # from a named test checkpoint rather than constructed in place.
        tokenizer = Qwen2Tokenizer.from_pretrained("hf-internal-testing/tiny-random-Qwen2VLForConditionalGeneration")

        components = {
            "transformer": transformer,
            "vae": vae,
            "scheduler": scheduler,
            "text_encoder": text_encoder,
            "tokenizer": tokenizer,
        }
        return components

    def get_dummy_inputs(self, device, seed=0):
        """Return the call arguments for a small, seeded 32x32 pipeline run.

        Args:
            device: device on which the random generator is created.
            seed: seed for the generator (default 0).

        Returns:
            dict: keyword arguments to pass to the pipeline's ``__call__``.
        """
        if str(device).startswith("mps"):
            # For "mps" devices the global generator is seeded instead of creating a
            # device-bound `torch.Generator`.
            generator = torch.manual_seed(seed)
        else:
            generator = torch.Generator(device=device).manual_seed(seed)

        inputs = {
            "prompt": "dance monkey",
            "negative_prompt": "bad quality",
            "generator": generator,
            "num_inference_steps": 2,
            "guidance_scale": 3.0,
            "true_cfg_scale": 1.0,
            "height": 32,
            "width": 32,
            "max_sequence_length": 16,
            "output_type": "pt",
        }

        return inputs

    def test_inference(self):
        """Run the pipeline on CPU and compare the output with a stored reference slice."""
        device = "cpu"

        components = self.get_dummy_components()
        pipe = self.pipeline_class(**components)
        pipe.to(device)
        pipe.set_progress_bar_config(disable=None)

        inputs = self.get_dummy_inputs(device)
        image = pipe(**inputs).images
        generated_image = image[0]
        self.assertEqual(generated_image.shape, (3, 32, 32))

        # fmt: off
        expected_slice = torch.tensor([0.56331, 0.63677, 0.6015, 0.56369, 0.58166, 0.55277, 0.57176, 0.63261, 0.41466, 0.35561, 0.56229, 0.48334, 0.49714, 0.52622, 0.40872, 0.50208])
        # fmt: on

        # The reference holds the first 8 and the last 8 values of the flattened image.
        generated_slice = generated_image.flatten()
        generated_slice = torch.cat([generated_slice[:8], generated_slice[-8:]])
        self.assertTrue(
            torch.allclose(generated_slice, expected_slice, atol=1e-3),
            f"Generated slice {generated_slice.tolist()} deviates from the reference by more than 1e-3",
        )

    def test_inference_batch_single_identical(self):
        """Delegate to the shared batch-vs-single check with batch size 3 and tolerance 1e-1."""
        self._test_inference_batch_single_identical(batch_size=3, expected_max_diff=1e-1)

    def test_attention_slicing_forward_pass(
        self, test_max_difference=True, test_mean_pixel_difference=True, expected_max_diff=1e-3
    ):
        """Check that attention slicing leaves the pipeline output unchanged.

        The pipeline is run once without slicing, then with ``slice_size=1`` and
        ``slice_size=2``; each sliced output is compared with the unsliced one.

        Args:
            test_max_difference: when true, assert on the maximum absolute difference.
            test_mean_pixel_difference: accepted but not used by this implementation.
            expected_max_diff: upper bound (exclusive) for the maximum absolute difference.
        """
        if not self.test_attention_slicing:
            return

        components = self.get_dummy_components()
        pipe = self.pipeline_class(**components)
        for component in pipe.components.values():
            if hasattr(component, "set_default_attn_processor"):
                component.set_default_attn_processor()
        pipe.to(torch_device)
        pipe.set_progress_bar_config(disable=None)

        # Fresh inputs (and therefore a freshly seeded generator) are built for each run.
        generator_device = "cpu"
        inputs = self.get_dummy_inputs(generator_device)
        output_without_slicing = pipe(**inputs)[0]

        pipe.enable_attention_slicing(slice_size=1)
        inputs = self.get_dummy_inputs(generator_device)
        output_with_slicing1 = pipe(**inputs)[0]

        pipe.enable_attention_slicing(slice_size=2)
        inputs = self.get_dummy_inputs(generator_device)
        output_with_slicing2 = pipe(**inputs)[0]

        if test_max_difference:
            max_diff1 = np.abs(to_np(output_with_slicing1) - to_np(output_without_slicing)).max()
            max_diff2 = np.abs(to_np(output_with_slicing2) - to_np(output_without_slicing)).max()
            self.assertLess(
                max(max_diff1, max_diff2),
                expected_max_diff,
                "Attention slicing should not affect the inference results",
            )

    def test_vae_tiling(self, expected_diff_max: float = 0.2):
        """Check that VAE tiling changes a 128x128 result by less than ``expected_diff_max``.

        Args:
            expected_diff_max: upper bound (exclusive) for the maximum absolute
                difference between the tiled and the untiled output.
        """
        generator_device = "cpu"
        components = self.get_dummy_components()

        pipe = self.pipeline_class(**components)
        pipe.to("cpu")
        pipe.set_progress_bar_config(disable=None)

        # Without tiling
        inputs = self.get_dummy_inputs(generator_device)
        inputs["height"] = inputs["width"] = 128
        output_without_tiling = pipe(**inputs)[0]

        # With tiling: the 96-pixel minimum tile size is below the 128-pixel image size.
        pipe.vae.enable_tiling(
            tile_sample_min_height=96,
            tile_sample_min_width=96,
            tile_sample_stride_height=64,
            tile_sample_stride_width=64,
        )
        inputs = self.get_dummy_inputs(generator_device)
        inputs["height"] = inputs["width"] = 128
        output_with_tiling = pipe(**inputs)[0]

        # Compare absolute differences: a signed maximum would let an arbitrarily large
        # negative deviation (tiled output brighter than untiled) pass unnoticed.
        max_diff = np.abs(to_np(output_without_tiling) - to_np(output_with_tiling)).max()
        self.assertLess(
            max_diff,
            expected_diff_max,
            "VAE tiling should not affect the inference results",
        )
|
|
|