|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import gc |
|
|
import random |
|
|
import unittest |
|
|
|
|
|
import numpy as np |
|
|
import torch |
|
|
from parameterized import parameterized |
|
|
from transformers import CLIPTextConfig, CLIPTextModel, CLIPTextModelWithProjection, CLIPTokenizer |
|
|
|
|
|
import diffusers |
|
|
from diffusers import ( |
|
|
AutoencoderKL, |
|
|
EulerDiscreteScheduler, |
|
|
LCMScheduler, |
|
|
MultiAdapter, |
|
|
StableDiffusionXLAdapterPipeline, |
|
|
T2IAdapter, |
|
|
UNet2DConditionModel, |
|
|
) |
|
|
from diffusers.utils import load_image, logging |
|
|
from diffusers.utils.testing_utils import ( |
|
|
enable_full_determinism, |
|
|
floats_tensor, |
|
|
numpy_cosine_similarity_distance, |
|
|
require_torch_gpu, |
|
|
slow, |
|
|
torch_device, |
|
|
) |
|
|
|
|
|
from ..pipeline_params import TEXT_GUIDED_IMAGE_VARIATION_BATCH_PARAMS, TEXT_GUIDED_IMAGE_VARIATION_PARAMS |
|
|
from ..test_pipelines_common import ( |
|
|
IPAdapterTesterMixin, |
|
|
PipelineTesterMixin, |
|
|
SDXLOptionalComponentsTesterMixin, |
|
|
assert_mean_pixel_difference, |
|
|
) |
|
|
|
|
|
|
|
|
# Runs once at import time, before any of the test classes below are defined.
# NOTE(review): presumably puts torch into deterministic mode so the hard-coded
# expected pixel slices in these tests are reproducible — confirm against
# diffusers.utils.testing_utils.enable_full_determinism.
enable_full_determinism() |
|
|
|
|
|
|
|
|
class StableDiffusionXLAdapterPipelineFastTests(
    IPAdapterTesterMixin, PipelineTesterMixin, SDXLOptionalComponentsTesterMixin, unittest.TestCase
):
    """Fast tests for `StableDiffusionXLAdapterPipeline` using tiny, seeded dummy components.

    Subclasses may override `get_dummy_components`, `get_dummy_components_with_full_downscaling`
    and `get_dummy_inputs` to exercise a different adapter flavour with the same test bodies.
    """

    pipeline_class = StableDiffusionXLAdapterPipeline
    params = TEXT_GUIDED_IMAGE_VARIATION_PARAMS
    batch_params = TEXT_GUIDED_IMAGE_VARIATION_BATCH_PARAMS

    # ------------------------------------------------------------------
    # Private builders shared by both `get_dummy_components*` methods.
    # The order in which the public methods call them (and re-seed torch)
    # determines the random weights, and therefore the expected slices
    # asserted in the tests below — do not reorder.
    # ------------------------------------------------------------------

    @staticmethod
    def _build_scheduler():
        """Return the Euler scheduler configuration used by every dummy pipeline."""
        return EulerDiscreteScheduler(
            beta_start=0.00085,
            beta_end=0.012,
            steps_offset=1,
            beta_schedule="scaled_linear",
            timestep_spacing="leading",
        )

    @staticmethod
    def _build_text_encoders():
        """Seed torch with 0 and return `(text_encoder, tokenizer, text_encoder_2, tokenizer_2)`."""
        torch.manual_seed(0)
        text_encoder_config = CLIPTextConfig(
            bos_token_id=0,
            eos_token_id=2,
            hidden_size=32,
            intermediate_size=37,
            layer_norm_eps=1e-05,
            num_attention_heads=4,
            num_hidden_layers=5,
            pad_token_id=1,
            vocab_size=1000,
            hidden_act="gelu",
            projection_dim=32,
        )
        text_encoder = CLIPTextModel(text_encoder_config)
        tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")

        text_encoder_2 = CLIPTextModelWithProjection(text_encoder_config)
        tokenizer_2 = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
        return text_encoder, tokenizer, text_encoder_2, tokenizer_2

    @staticmethod
    def _build_adapter(adapter_type, channels, downscale_factor):
        """Build the adapter component.

        Args:
            adapter_type: `"full_adapter_xl"` for a single `T2IAdapter`, or `"multi_adapter"`
                for a `MultiAdapter` wrapping two identically configured `T2IAdapter`s.
            channels: Channel sizes passed to each `T2IAdapter`.
            downscale_factor: `downscale_factor` passed to each `T2IAdapter`.

        Raises:
            ValueError: If `adapter_type` is neither of the two supported values.
        """

        def make_full_adapter_xl():
            # A fresh list per adapter, as in the original per-call literals.
            return T2IAdapter(
                in_channels=3,
                channels=list(channels),
                num_res_blocks=2,
                downscale_factor=downscale_factor,
                adapter_type="full_adapter_xl",
            )

        if adapter_type == "full_adapter_xl":
            return make_full_adapter_xl()
        if adapter_type == "multi_adapter":
            return MultiAdapter([make_full_adapter_xl(), make_full_adapter_xl()])
        raise ValueError(
            f"Unknown adapter type: {adapter_type}, must be one of 'full_adapter_xl' or 'multi_adapter'"
        )

    @staticmethod
    def _pack_components(adapter, unet, scheduler, vae, text_encoder, tokenizer, text_encoder_2, tokenizer_2):
        """Return the keyword dict used to instantiate the pipeline (IP-Adapter parts left as None)."""
        return {
            "adapter": adapter,
            "unet": unet,
            "scheduler": scheduler,
            "vae": vae,
            "text_encoder": text_encoder,
            "tokenizer": tokenizer,
            "text_encoder_2": text_encoder_2,
            "tokenizer_2": tokenizer_2,
            "feature_extractor": None,
            "image_encoder": None,
        }

    def get_dummy_components(self, adapter_type="full_adapter_xl", time_cond_proj_dim=None):
        """Build tiny pipeline components with a 2-block UNet and a 2-block VAE.

        Args:
            adapter_type: `"full_adapter_xl"` or `"multi_adapter"`.
            time_cond_proj_dim: Forwarded to `UNet2DConditionModel` (the LCM tests pass 256).

        Returns:
            dict of keyword arguments for `StableDiffusionXLAdapterPipeline`.

        Raises:
            ValueError: If `adapter_type` is not supported.
        """
        torch.manual_seed(0)
        unet = UNet2DConditionModel(
            block_out_channels=(32, 64),
            layers_per_block=2,
            sample_size=32,
            in_channels=4,
            out_channels=4,
            down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"),
            up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"),
            attention_head_dim=(2, 4),
            use_linear_projection=True,
            addition_embed_type="text_time",
            addition_time_embed_dim=8,
            transformer_layers_per_block=(1, 2),
            projection_class_embeddings_input_dim=80,
            cross_attention_dim=64,
            time_cond_proj_dim=time_cond_proj_dim,
        )
        scheduler = self._build_scheduler()
        torch.manual_seed(0)
        vae = AutoencoderKL(
            block_out_channels=[32, 64],
            in_channels=3,
            out_channels=3,
            down_block_types=["DownEncoderBlock2D", "DownEncoderBlock2D"],
            up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"],
            latent_channels=4,
            sample_size=128,
        )
        text_encoder, tokenizer, text_encoder_2, tokenizer_2 = self._build_text_encoders()
        # The adapter is created last, without re-seeding, exactly as before.
        adapter = self._build_adapter(adapter_type, channels=[32, 64], downscale_factor=4)

        return self._pack_components(
            adapter, unet, scheduler, vae, text_encoder, tokenizer, text_encoder_2, tokenizer_2
        )

    def get_dummy_components_with_full_downscaling(self, adapter_type="full_adapter_xl"):
        """Get dummy components with x8 VAE downscaling and 3 UNet down blocks.

        These dummy components are intended to fully-exercise the T2I-Adapter
        downscaling behavior.

        Args:
            adapter_type: `"full_adapter_xl"` or `"multi_adapter"`.

        Returns:
            dict of keyword arguments for `StableDiffusionXLAdapterPipeline`.

        Raises:
            ValueError: If `adapter_type` is not supported.
        """
        torch.manual_seed(0)
        unet = UNet2DConditionModel(
            block_out_channels=(32, 32, 64),
            layers_per_block=2,
            sample_size=32,
            in_channels=4,
            out_channels=4,
            down_block_types=("DownBlock2D", "CrossAttnDownBlock2D", "CrossAttnDownBlock2D"),
            up_block_types=("CrossAttnUpBlock2D", "CrossAttnUpBlock2D", "UpBlock2D"),
            attention_head_dim=2,
            use_linear_projection=True,
            addition_embed_type="text_time",
            addition_time_embed_dim=8,
            transformer_layers_per_block=1,
            projection_class_embeddings_input_dim=80,
            cross_attention_dim=64,
        )
        scheduler = self._build_scheduler()
        torch.manual_seed(0)
        vae = AutoencoderKL(
            block_out_channels=[32, 32, 32, 64],
            in_channels=3,
            out_channels=3,
            down_block_types=["DownEncoderBlock2D", "DownEncoderBlock2D", "DownEncoderBlock2D", "DownEncoderBlock2D"],
            up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D", "UpDecoderBlock2D", "UpDecoderBlock2D"],
            latent_channels=4,
            sample_size=128,
        )
        text_encoder, tokenizer, text_encoder_2, tokenizer_2 = self._build_text_encoders()
        # The adapter is created last, without re-seeding, exactly as before.
        adapter = self._build_adapter(adapter_type, channels=[32, 32, 64], downscale_factor=16)

        return self._pack_components(
            adapter, unet, scheduler, vae, text_encoder, tokenizer, text_encoder_2, tokenizer_2
        )

    def get_dummy_inputs(self, device, seed=0, height=64, width=64, num_images=1):
        """Build the keyword arguments for a pipeline call.

        Args:
            device: Device for the conditioning image(s) and the generator.
            seed: Seed for both the image RNG and the torch generator.
            height: Conditioning image height.
            width: Conditioning image width.
            num_images: 1 yields a single tensor; any other value yields a list of
                `num_images` tensors (all drawn from an RNG with the same seed).

        Returns:
            dict with `prompt`, `image`, `generator`, `num_inference_steps`,
            `guidance_scale` and `output_type`.
        """
        if num_images == 1:
            image = floats_tensor((1, 3, height, width), rng=random.Random(seed)).to(device)
        else:
            image = [
                floats_tensor((1, 3, height, width), rng=random.Random(seed)).to(device) for _ in range(num_images)
            ]

        # On "mps" devices the global (CPU) generator is used instead of a device generator.
        if str(device).startswith("mps"):
            generator = torch.manual_seed(seed)
        else:
            generator = torch.Generator(device=device).manual_seed(seed)
        inputs = {
            "prompt": "A painting of a squirrel eating a burger",
            "image": image,
            "generator": generator,
            "num_inference_steps": 2,
            "guidance_scale": 5.0,
            "output_type": "np",
        }
        return inputs

    def test_ip_adapter_single(self, from_multi=False, expected_pipe_slice=None):
        """Run the mixin's IP-Adapter test with this pipeline's expected CPU slice.

        Args:
            from_multi: When True, the caller's `expected_pipe_slice` is forwarded unchanged;
                otherwise it is replaced by this class's own value (set only on CPU).
            expected_pipe_slice: Expected output slice, used only when `from_multi` is True.
        """
        if not from_multi:
            expected_pipe_slice = None
            if torch_device == "cpu":
                expected_pipe_slice = np.array(
                    [0.5753, 0.6022, 0.4728, 0.4986, 0.5708, 0.4645, 0.5194, 0.5134, 0.4730]
                )
        return super().test_ip_adapter_single(expected_pipe_slice=expected_pipe_slice)

    def test_stable_diffusion_adapter_default_case(self):
        """Check output shape and a 3x3 corner slice of the default pipeline run on CPU."""
        device = "cpu"
        components = self.get_dummy_components()
        sd_pipe = StableDiffusionXLAdapterPipeline(**components)
        sd_pipe = sd_pipe.to(device)
        sd_pipe.set_progress_bar_config(disable=None)

        inputs = self.get_dummy_inputs(device)
        image = sd_pipe(**inputs).images
        image_slice = image[0, -3:, -3:, -1]

        assert image.shape == (1, 64, 64, 3)
        expected_slice = np.array(
            [0.5752919, 0.6022097, 0.4728038, 0.49861962, 0.57084894, 0.4644975, 0.5193715, 0.5133664, 0.4729858]
        )
        assert np.abs(image_slice.flatten() - expected_slice).max() < 5e-3

    @parameterized.expand(
        [
            # dim = 144 = 9 * 16
            (((4 * 2 + 1) * 16),),
            # dim = 160 = 5 * 32
            (((4 * 1 + 1) * 32),),
        ]
    )
    def test_multiple_image_dimensions(self, dim):
        """Test that the T2I-Adapter pipeline supports any input dimension that
        is divisible by the adapter's `downscale_factor`. This test was added in
        response to an issue where the T2I Adapter's downscaling padding
        behavior did not match the UNet's behavior.

        Note that we have selected `dim` values to produce odd resolutions at
        each downscaling level.
        """
        components = self.get_dummy_components_with_full_downscaling()
        sd_pipe = StableDiffusionXLAdapterPipeline(**components)
        sd_pipe = sd_pipe.to(torch_device)
        sd_pipe.set_progress_bar_config(disable=None)

        inputs = self.get_dummy_inputs(torch_device, height=dim, width=dim)
        image = sd_pipe(**inputs).images

        assert image.shape == (1, dim, dim, 3)

    @parameterized.expand(["full_adapter", "full_adapter_xl", "light_adapter"])
    def test_total_downscale_factor(self, adapter_type):
        """Test that the T2IAdapter correctly reports its total_downscale_factor."""
        batch_size = 1
        in_channels = 3
        out_channels = [320, 640, 1280, 1280]
        in_image_size = 512

        adapter = T2IAdapter(
            in_channels=in_channels,
            channels=out_channels,
            num_res_blocks=2,
            downscale_factor=8,
            adapter_type=adapter_type,
        )
        adapter.to(torch_device)

        in_image = floats_tensor((batch_size, in_channels, in_image_size, in_image_size)).to(torch_device)

        adapter_state = adapter(in_image)

        # The last entry of `adapter_state` is compared against `total_downscale_factor`;
        # this assumes it is the most-downsampled feature map.
        expected_out_image_size = in_image_size // adapter.total_downscale_factor
        assert adapter_state[-1].shape == (
            batch_size,
            out_channels[-1],
            expected_out_image_size,
            expected_out_image_size,
        )

    def test_save_load_optional_components(self):
        """Delegate to the SDXL mixin's optional-component save/load check."""
        return self._test_save_load_optional_components()

    def _run_lcm_pipeline(self, timesteps=None):
        """Run the dummy pipeline with an `LCMScheduler` and return the output images.

        Args:
            timesteps: When given, replaces `num_inference_steps` in the call inputs with
                this explicit timestep list.
        """
        # Inputs (including the torch.Generator) are built on CPU, while the pipeline
        # itself is moved to `torch_device`.
        device = "cpu"

        components = self.get_dummy_components(time_cond_proj_dim=256)
        sd_pipe = StableDiffusionXLAdapterPipeline(**components)
        sd_pipe.scheduler = LCMScheduler.from_config(sd_pipe.scheduler.config)
        sd_pipe = sd_pipe.to(torch_device)
        sd_pipe.set_progress_bar_config(disable=None)

        inputs = self.get_dummy_inputs(device)
        if timesteps is not None:
            del inputs["num_inference_steps"]
            inputs["timesteps"] = timesteps
        output = sd_pipe(**inputs)
        return output.images

    def test_adapter_sdxl_lcm(self):
        """Check shape and corner slice when the scheduler is swapped for `LCMScheduler`."""
        image = self._run_lcm_pipeline()

        image_slice = image[0, -3:, -3:, -1]

        assert image.shape == (1, 64, 64, 3)
        expected_slice = np.array([0.5425, 0.5385, 0.4964, 0.5045, 0.6149, 0.4974, 0.5469, 0.5332, 0.5426])

        assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2

    def test_adapter_sdxl_lcm_custom_timesteps(self):
        """Same as `test_adapter_sdxl_lcm`, but passing `timesteps=[999, 499]` explicitly."""
        image = self._run_lcm_pipeline(timesteps=[999, 499])

        image_slice = image[0, -3:, -3:, -1]

        assert image.shape == (1, 64, 64, 3)
        expected_slice = np.array([0.5425, 0.5385, 0.4964, 0.5045, 0.6149, 0.4974, 0.5469, 0.5332, 0.5426])

        assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2
|
|
|
|
|
|
|
|
class StableDiffusionXLMultiAdapterPipelineFastTests(
    StableDiffusionXLAdapterPipelineFastTests, PipelineTesterMixin, unittest.TestCase
):
    """Re-runs the single-adapter fast tests with a `"multi_adapter"` component set.

    The dummy inputs carry two conditioning images (`num_images=2`) and two
    conditioning scales, so the batching tests are overridden here to repeat each
    image entry separately instead of repeating the whole `image` list.
    """

    def get_dummy_components(self, time_cond_proj_dim=None):
        """Return the parent's dummy components built with `adapter_type="multi_adapter"`."""
        return super().get_dummy_components("multi_adapter", time_cond_proj_dim=time_cond_proj_dim)

    def get_dummy_components_with_full_downscaling(self):
        """Return the parent's full-downscaling components built with `adapter_type="multi_adapter"`."""
        return super().get_dummy_components_with_full_downscaling("multi_adapter")

    def get_dummy_inputs(self, device, seed=0, height=64, width=64):
        """Return the parent's inputs with two conditioning images and two conditioning scales."""
        inputs = super().get_dummy_inputs(device, seed, height, width, num_images=2)
        inputs["adapter_conditioning_scale"] = [0.5, 0.5]
        return inputs

    def _make_batched_inputs(
        self, inputs, batch_size, additional_params_copy_to_batched_inputs, per_sample_generators=False
    ):
        """Expand single-sample `inputs` into a batch of `batch_size` samples.

        Args:
            inputs: Output of `get_dummy_inputs`.
            batch_size: Number of samples in the batch.
            additional_params_copy_to_batched_inputs: Input names copied over unbatched at the end.
            per_sample_generators: When True, `generator` is replaced by a list of
                `batch_size` generators obtained from `self.get_generator(i)`.

        Returns:
            A new dict; `inputs` itself is not modified.
        """
        batched_inputs = {}
        for name, value in inputs.items():
            if name in self.batch_params:
                if name == "prompt":
                    # Prompts of decreasing length, with a very long one in the last slot.
                    len_prompt = len(value)
                    batched_inputs[name] = [value[: len_prompt // i] for i in range(1, batch_size + 1)]
                    batched_inputs[name][-1] = 100 * "very long"
                elif name == "image":
                    # `value` is a list of images; each entry is repeated `batch_size` times.
                    batched_inputs[name] = [batch_size * [image] for image in value]
                else:
                    batched_inputs[name] = batch_size * [value]
            elif name == "batch_size":
                batched_inputs[name] = batch_size
            elif per_sample_generators and name == "generator":
                batched_inputs[name] = [self.get_generator(i) for i in range(batch_size)]
            else:
                batched_inputs[name] = value

        for arg in additional_params_copy_to_batched_inputs:
            batched_inputs[arg] = inputs[arg]

        return batched_inputs

    def test_stable_diffusion_adapter_default_case(self):
        """Check output shape and a 3x3 corner slice of the multi-adapter pipeline on CPU."""
        device = "cpu"
        components = self.get_dummy_components()
        sd_pipe = StableDiffusionXLAdapterPipeline(**components)
        sd_pipe = sd_pipe.to(device)
        sd_pipe.set_progress_bar_config(disable=None)

        inputs = self.get_dummy_inputs(device)
        image = sd_pipe(**inputs).images
        image_slice = image[0, -3:, -3:, -1]

        assert image.shape == (1, 64, 64, 3)
        expected_slice = np.array(
            [0.5813032, 0.60995954, 0.47563356, 0.5056669, 0.57199144, 0.4631841, 0.5176794, 0.51252556, 0.47183886]
        )
        assert np.abs(image_slice.flatten() - expected_slice).max() < 5e-3

    def test_ip_adapter_single(self):
        """Run the parent's IP-Adapter test with the multi-adapter expected CPU slice."""
        expected_pipe_slice = None
        if torch_device == "cpu":
            expected_pipe_slice = np.array([0.5813, 0.6100, 0.4756, 0.5057, 0.5720, 0.4632, 0.5177, 0.5125, 0.4718])
        return super().test_ip_adapter_single(from_multi=True, expected_pipe_slice=expected_pipe_slice)

    def test_inference_batch_consistent(
        self, batch_sizes=[2, 4, 13], additional_params_copy_to_batched_inputs=["num_inference_steps"]
    ):
        """Check that the pipeline returns exactly `batch_size` images for each batch size.

        The list defaults are kept for signature compatibility; they are only read, never mutated.
        """
        components = self.get_dummy_components()
        pipe = self.pipeline_class(**components)
        pipe.to(torch_device)
        pipe.set_progress_bar_config(disable=None)

        inputs = self.get_dummy_inputs(torch_device)

        # Silence the pipeline's logger while running (one prompt is deliberately very long);
        # the level is restored even if a run or assertion fails.
        logger = logging.get_logger(pipe.__module__)
        logger.setLevel(level=diffusers.logging.FATAL)
        try:
            for batch_size in batch_sizes:
                batched_inputs = self._make_batched_inputs(
                    inputs, batch_size, additional_params_copy_to_batched_inputs
                )
                batched_inputs["output_type"] = "np"

                # One run is enough: the original ran the identical call twice.
                output = pipe(**batched_inputs)
                images = output[0]

                assert len(images) == batch_size
                assert images.shape[0] == batch_size
        finally:
            logger.setLevel(level=diffusers.logging.WARNING)

    def test_num_images_per_prompt(self):
        """Check the output count equals `batch_size * num_images_per_prompt`."""
        components = self.get_dummy_components()
        pipe = self.pipeline_class(**components)
        pipe = pipe.to(torch_device)
        pipe.set_progress_bar_config(disable=None)

        batch_sizes = [1, 2]
        num_images_per_prompts = [1, 2]

        for batch_size in batch_sizes:
            for num_images_per_prompt in num_images_per_prompts:
                inputs = self.get_dummy_inputs(torch_device)

                # Only values are replaced (no keys added/removed), so iterating is safe.
                for key in inputs.keys():
                    if key in self.batch_params:
                        if key == "image":
                            # Each entry of the image list is repeated `batch_size` times.
                            inputs[key] = [batch_size * [image] for image in inputs[key]]
                        else:
                            inputs[key] = batch_size * [inputs[key]]

                images = pipe(**inputs, num_images_per_prompt=num_images_per_prompt)[0]

                assert images.shape[0] == batch_size * num_images_per_prompt

    def test_inference_batch_single_identical(
        self,
        batch_size=3,
        test_max_difference=None,
        test_mean_pixel_difference=None,
        relax_max_difference=False,
        expected_max_diff=2e-3,
        additional_params_copy_to_batched_inputs=["num_inference_steps"],
    ):
        """Check the first image of a batched run matches an unbatched run with the same seed.

        Args:
            batch_size: Number of samples in the batched run.
            test_max_difference: Whether to assert on the max pixel difference
                (defaults to True except on "mps").
            test_mean_pixel_difference: Whether to call `assert_mean_pixel_difference`
                (defaults to True except on "mps").
            relax_max_difference: Use the median of the 5 largest differences instead of the max.
            expected_max_diff: Upper bound for the (possibly relaxed) max difference.
            additional_params_copy_to_batched_inputs: Input names copied over unbatched.
        """
        if test_max_difference is None:
            test_max_difference = torch_device != "mps"

        if test_mean_pixel_difference is None:
            test_mean_pixel_difference = torch_device != "mps"

        components = self.get_dummy_components()
        pipe = self.pipeline_class(**components)
        pipe.to(torch_device)
        pipe.set_progress_bar_config(disable=None)

        inputs = self.get_dummy_inputs(torch_device)

        # Silence the pipeline's logger for both runs; always restore the level afterwards.
        logger = logging.get_logger(pipe.__module__)
        logger.setLevel(level=diffusers.logging.FATAL)
        try:
            batched_inputs = self._make_batched_inputs(
                inputs, batch_size, additional_params_copy_to_batched_inputs, per_sample_generators=True
            )

            output_batch = pipe(**batched_inputs)
            assert output_batch[0].shape[0] == batch_size

            # Same generator index as sample 0 of the batched run.
            inputs["generator"] = self.get_generator(0)

            output = pipe(**inputs)
        finally:
            logger.setLevel(level=diffusers.logging.WARNING)

        if test_max_difference:
            if relax_max_difference:
                # Median of the 5 largest differences rather than the single largest one.
                diff = np.abs(output_batch[0][0] - output[0][0])
                diff = diff.flatten()
                diff.sort()
                max_diff = np.median(diff[-5:])
            else:
                max_diff = np.abs(output_batch[0][0] - output[0][0]).max()
            assert max_diff < expected_max_diff

        if test_mean_pixel_difference:
            assert_mean_pixel_difference(output_batch[0][0], output[0][0])

    def test_adapter_sdxl_lcm(self):
        """Check shape and corner slice when the scheduler is swapped for `LCMScheduler`."""
        # Inputs (including the torch.Generator) are built on CPU, while the pipeline
        # itself is moved to `torch_device`.
        device = "cpu"

        components = self.get_dummy_components(time_cond_proj_dim=256)
        sd_pipe = StableDiffusionXLAdapterPipeline(**components)
        sd_pipe.scheduler = LCMScheduler.from_config(sd_pipe.scheduler.config)
        sd_pipe = sd_pipe.to(torch_device)
        sd_pipe.set_progress_bar_config(disable=None)

        inputs = self.get_dummy_inputs(device)
        output = sd_pipe(**inputs)
        image = output.images

        image_slice = image[0, -3:, -3:, -1]

        assert image.shape == (1, 64, 64, 3)
        expected_slice = np.array([0.5313, 0.5375, 0.4942, 0.5021, 0.6142, 0.4968, 0.5434, 0.5311, 0.5448])

        assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2

    def test_adapter_sdxl_lcm_custom_timesteps(self):
        """Same as `test_adapter_sdxl_lcm`, but passing `timesteps=[999, 499]` explicitly."""
        # Inputs (including the torch.Generator) are built on CPU, while the pipeline
        # itself is moved to `torch_device`.
        device = "cpu"

        components = self.get_dummy_components(time_cond_proj_dim=256)
        sd_pipe = StableDiffusionXLAdapterPipeline(**components)
        sd_pipe.scheduler = LCMScheduler.from_config(sd_pipe.scheduler.config)
        sd_pipe = sd_pipe.to(torch_device)
        sd_pipe.set_progress_bar_config(disable=None)

        inputs = self.get_dummy_inputs(device)
        del inputs["num_inference_steps"]
        inputs["timesteps"] = [999, 499]
        output = sd_pipe(**inputs)
        image = output.images

        image_slice = image[0, -3:, -3:, -1]

        assert image.shape == (1, 64, 64, 3)
        expected_slice = np.array([0.5313, 0.5375, 0.4942, 0.5021, 0.6142, 0.4968, 0.5434, 0.5311, 0.5448])

        assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2
|
|
|
|
|
|
|
|
@slow
@require_torch_gpu
class AdapterSDXLPipelineSlowTests(unittest.TestCase):
    """Slow GPU test comparing single-file and `from_pretrained` loading of the SDXL adapter pipeline."""

    @staticmethod
    def _release_memory():
        """Run the garbage collector and empty the CUDA cache."""
        gc.collect()
        torch.cuda.empty_cache()

    def setUp(self):
        """Release memory before each test."""
        super().setUp()
        self._release_memory()

    def tearDown(self):
        """Release memory after each test."""
        super().tearDown()
        self._release_memory()

    def test_download_ckpt_diff_format_is_same(self):
        """Both loading paths must give 768x512 images that are close in cosine distance."""
        ckpt_path = (
            "https://huggingface.co/stabilityai/stable-diffusion-xl-base-1.0/blob/main/sd_xl_base_1.0.safetensors"
        )
        half = torch.float16
        adapter = T2IAdapter.from_pretrained("TencentARC/t2i-adapter-lineart-sdxl-1.0", torch_dtype=half)
        prompt = "toy"
        image = load_image(
            "https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/t2i_adapter/toy_canny.png"
        )

        def generate(pipeline):
            # Every run gets its own CPU generator seeded with 0.
            seeded = torch.Generator(device="cpu").manual_seed(0)
            result = pipeline(prompt, image=image, generator=seeded, output_type="np", num_inference_steps=3)
            return result.images

        # Path 1: single .safetensors checkpoint.
        pipe_single_file = StableDiffusionXLAdapterPipeline.from_single_file(
            ckpt_path,
            adapter=adapter,
            torch_dtype=half,
        )
        pipe_single_file.enable_model_cpu_offload()
        pipe_single_file.set_progress_bar_config(disable=None)
        images_single_file = generate(pipe_single_file)

        # Path 2: regular hub repository.
        pipe = StableDiffusionXLAdapterPipeline.from_pretrained(
            "stabilityai/stable-diffusion-xl-base-1.0",
            adapter=adapter,
            torch_dtype=half,
        )
        pipe.enable_model_cpu_offload()
        images = generate(pipe)

        expected_shape = (768, 512, 3)
        assert images_single_file[0].shape == expected_shape
        assert images[0].shape == expected_shape

        max_diff = numpy_cosine_similarity_distance(images[0].flatten(), images_single_file[0].flatten())
        assert max_diff < 5e-3
|
|
|