| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import copy |
| from dataclasses import asdict, replace |
|
|
| import numpy as np |
| import pytest |
| from diffusers import StableDiffusionPipeline |
|
|
| from peft import ( |
| BOFTConfig, |
| HRAConfig, |
| LoHaConfig, |
| LoKrConfig, |
| LoraConfig, |
| OFTConfig, |
| get_peft_model, |
| get_peft_model_state_dict, |
| inject_adapter_in_model, |
| set_peft_model_state_dict, |
| ) |
| from peft.tuners.tuners_utils import BaseTunerLayer |
|
|
| from .testing_common import PeftCommonTester |
| from .testing_utils import set_init_weights_false, temp_seed |
|
|
|
|
PEFT_DIFFUSERS_SD_MODELS_TO_TEST = ["hf-internal-testing/tiny-sd-pipe"]

# Target-module lists shared by every PEFT config below. Factored out because the
# identical lists were previously repeated once per config entry: all linear
# projections of the CLIP text encoder, and the attention/feed-forward projections
# of the UNet transformer blocks in the tiny SD test pipeline.
TEXT_ENCODER_TARGET_MODULES = ["k_proj", "q_proj", "v_proj", "out_proj", "fc1", "fc2"]
UNET_TARGET_MODULES = [
    "proj_in",
    "proj_out",
    "to_k",
    "to_q",
    "to_v",
    "to_out.0",
    "ff.net.0.proj",
    "ff.net.2",
]

# Each entry is (config class, {"text_encoder": kwargs, "unet": kwargs}); the two
# sub-dicts parameterize the adapter injected into the respective sub-model.
DIFFUSERS_CONFIGS = [
    (
        LoraConfig,
        {
            "text_encoder": {
                "r": 8,
                "lora_alpha": 32,
                "target_modules": TEXT_ENCODER_TARGET_MODULES,
                "lora_dropout": 0.0,
                "bias": "none",
                "init_lora_weights": False,
            },
            "unet": {
                "r": 8,
                "lora_alpha": 32,
                "target_modules": UNET_TARGET_MODULES,
                "lora_dropout": 0.0,
                "bias": "none",
                "init_lora_weights": False,
            },
        },
    ),
    (
        LoHaConfig,
        {
            "text_encoder": {
                "r": 8,
                "alpha": 32,
                "target_modules": TEXT_ENCODER_TARGET_MODULES,
                "rank_dropout": 0.0,
                "module_dropout": 0.0,
                "init_weights": False,
            },
            "unet": {
                "r": 8,
                "alpha": 32,
                "target_modules": UNET_TARGET_MODULES,
                "rank_dropout": 0.0,
                "module_dropout": 0.0,
                "init_weights": False,
            },
        },
    ),
    (
        LoKrConfig,
        {
            "text_encoder": {
                "r": 8,
                "alpha": 32,
                "target_modules": TEXT_ENCODER_TARGET_MODULES,
                "rank_dropout": 0.0,
                "module_dropout": 0.0,
                "init_weights": False,
            },
            "unet": {
                "r": 8,
                "alpha": 32,
                "target_modules": UNET_TARGET_MODULES,
                "rank_dropout": 0.0,
                "module_dropout": 0.0,
                "init_weights": False,
            },
        },
    ),
    (
        OFTConfig,
        {
            "text_encoder": {
                "r": 1,
                "oft_block_size": 0,
                "target_modules": TEXT_ENCODER_TARGET_MODULES,
                "module_dropout": 0.0,
                "init_weights": False,
                "use_cayley_neumann": False,
            },
            "unet": {
                "r": 1,
                "oft_block_size": 0,
                "target_modules": UNET_TARGET_MODULES,
                "module_dropout": 0.0,
                "init_weights": False,
                "use_cayley_neumann": False,
            },
        },
    ),
    (
        BOFTConfig,
        {
            "text_encoder": {
                "boft_block_num": 1,
                "boft_block_size": 0,
                "target_modules": TEXT_ENCODER_TARGET_MODULES,
                "boft_dropout": 0.0,
                "init_weights": False,
            },
            "unet": {
                "boft_block_num": 1,
                "boft_block_size": 0,
                "target_modules": UNET_TARGET_MODULES,
                "boft_dropout": 0.0,
                "init_weights": False,
            },
        },
    ),
    (
        HRAConfig,
        {
            "text_encoder": {
                "r": 8,
                "target_modules": TEXT_ENCODER_TARGET_MODULES,
                "init_weights": False,
            },
            "unet": {
                "r": 8,
                "target_modules": UNET_TARGET_MODULES,
                "init_weights": False,
            },
        },
    ),
]
|
|
|
|
def skip_if_not_lora(config_cls):
    """Skip the running test unless it is parametrized with ``LoraConfig``."""
    if config_cls is not LoraConfig:
        pytest.skip("Skipping test because it is only applicable to LoraConfig")
|
|
|
|
class TestStableDiffusionModel(PeftCommonTester):
    r"""
    Tests that diffusers StableDiffusion model works with PEFT as expected.
    """

    transformers_class = StableDiffusionPipeline
    # Loaded once at class-definition time; tests deep-copy this pipeline instead
    # of re-downloading/re-instantiating it from the hub for every test.
    sd_model = StableDiffusionPipeline.from_pretrained("hf-internal-testing/tiny-sd-pipe")

    def instantiate_sd_peft(self, model_id, config_cls, config_kwargs):
        """Return an SD pipeline whose text encoder and unet are wrapped with PEFT.

        Args:
            model_id: Hub id of the pipeline to load.
            config_cls: The PEFT config class (e.g. ``LoraConfig``).
            config_kwargs: Dict containing the sub-dicts ``"text_encoder"`` and
                ``"unet"``; any remaining top-level entries are applied to both
                sub-configs.
        """
        if model_id == "hf-internal-testing/tiny-sd-pipe":
            # Reuse the cached pipeline to avoid reloading it for every test.
            model = copy.deepcopy(self.sd_model)
        else:
            model = self.transformers_class.from_pretrained(model_id)

        config_kwargs = config_kwargs.copy()
        # Also copy the nested dicts: they come from the shared module-level
        # DIFFUSERS_CONFIGS fixture; writing into them below would otherwise leak
        # state across parametrized tests.
        text_encoder_kwargs = config_kwargs.pop("text_encoder").copy()
        unet_kwargs = config_kwargs.pop("unet").copy()
        # Propagate any remaining shared kwargs (e.g. those added by
        # set_init_weights_false) to both sub-configs.
        for key, val in config_kwargs.items():
            text_encoder_kwargs[key] = val
            unet_kwargs[key] = val

        # Wrap the text encoder with the requested PEFT method.
        config_text_encoder = config_cls(**text_encoder_kwargs)
        model.text_encoder = get_peft_model(model.text_encoder, config_text_encoder)

        # Wrap the unet with the requested PEFT method.
        config_unet = config_cls(**unet_kwargs)
        model.unet = get_peft_model(model.unet, config_unet)

        model = model.to(self.torch_device)
        return model

    def prepare_inputs_for_testing(self):
        # Very few inference steps keep the parametrized tests fast.
        return {
            "prompt": "a high quality digital photo of a cute corgi",
            "num_inference_steps": 3,
        }

    @pytest.mark.parametrize("model_id", PEFT_DIFFUSERS_SD_MODELS_TO_TEST)
    @pytest.mark.parametrize("config_cls,config_kwargs", DIFFUSERS_CONFIGS)
    def test_merge_layers(self, model_id, config_cls, config_kwargs):
        """Merging the adapters into the base weights must not change the output."""
        if (config_cls == LoKrConfig) and (self.torch_device not in ["cuda", "xpu"]):
            pytest.skip("Merging test with LoKr fails without GPU")

        # Random adapter weights are required; identity-initialized adapters would
        # make the unmerged/merged comparison below trivially true.
        config_kwargs = set_init_weights_false(config_cls, config_kwargs)
        model = self.instantiate_sd_peft(model_id, config_cls, config_kwargs)

        # Generate an image with the (unmerged) adapters active.
        dummy_input = self.prepare_inputs_for_testing()
        with temp_seed(seed=42):
            peft_output = np.array(model(**dummy_input).images[0]).astype(np.float32)

        # Merge layers for the config classes that support merge_and_unload here.
        if config_cls not in [LoHaConfig, OFTConfig, HRAConfig]:
            model.text_encoder = model.text_encoder.merge_and_unload()
            model.unet = model.unet.merge_and_unload()

        # Generate an image again after merging, under the same seed.
        with temp_seed(seed=42):
            merged_output = np.array(model(**dummy_input).images[0]).astype(np.float32)

        # Images are 8-bit derived, hence the generous tolerance of 1.0.
        assert np.allclose(peft_output, merged_output, atol=1.0)

    @pytest.mark.parametrize("model_id", PEFT_DIFFUSERS_SD_MODELS_TO_TEST)
    @pytest.mark.parametrize("config_cls,config_kwargs", DIFFUSERS_CONFIGS)
    def test_merge_layers_safe_merge(self, model_id, config_cls, config_kwargs):
        """Same as test_merge_layers, but merging with safe_merge=True."""
        if (config_cls == LoKrConfig) and (self.torch_device not in ["cuda", "xpu"]):
            pytest.skip("Merging test with LoKr fails without GPU")

        # Use random adapter weights, consistent with test_merge_layers; an
        # identity adapter would make the comparison below trivially true.
        config_kwargs = set_init_weights_false(config_cls, config_kwargs)
        model = self.instantiate_sd_peft(model_id, config_cls, config_kwargs)

        # Generate an image with the (unmerged) adapters active.
        dummy_input = self.prepare_inputs_for_testing()
        with temp_seed(seed=42):
            peft_output = np.array(model(**dummy_input).images[0]).astype(np.float32)

        # Merge layers for the config classes that support merge_and_unload here.
        if config_cls not in [LoHaConfig, OFTConfig, HRAConfig]:
            model.text_encoder = model.text_encoder.merge_and_unload(safe_merge=True)
            model.unet = model.unet.merge_and_unload(safe_merge=True)

        # Generate an image again after merging, under the same seed.
        with temp_seed(seed=42):
            merged_output = np.array(model(**dummy_input).images[0]).astype(np.float32)

        # Images are 8-bit derived, hence the generous tolerance of 1.0.
        assert np.allclose(peft_output, merged_output, atol=1.0)

    @pytest.mark.parametrize("model_id", PEFT_DIFFUSERS_SD_MODELS_TO_TEST)
    @pytest.mark.parametrize("config_cls,config_kwargs", DIFFUSERS_CONFIGS)
    def test_add_weighted_adapter_base_unchanged(self, model_id, config_cls, config_kwargs):
        """add_weighted_adapter must not modify the configs of the source adapters."""
        skip_if_not_lora(config_cls)

        config_kwargs = set_init_weights_false(config_cls, config_kwargs)
        model = self.instantiate_sd_peft(model_id, config_cls, config_kwargs)

        # Snapshot the existing adapter configs (replace() with no changes returns
        # a copy) before creating the weighted adapters.
        text_encoder_adapter_name = next(iter(model.text_encoder.peft_config.keys()))
        unet_adapter_name = next(iter(model.unet.peft_config.keys()))
        text_encoder_adapter_config = replace(model.text_encoder.peft_config[text_encoder_adapter_name])
        unet_adapter_config = replace(model.unet.peft_config[unet_adapter_name])

        # Create a weighted adapter on each sub-model from its OWN existing
        # adapter. (Previously the text encoder was passed the unet's adapter
        # name, which only worked by accident when both were named "default".)
        model.text_encoder.add_weighted_adapter([text_encoder_adapter_name], [0.5], "weighted_adapter_test")
        model.unet.add_weighted_adapter([unet_adapter_name], [0.5], "weighted_adapter_test")

        # The source adapters' configs must be unchanged.
        assert asdict(text_encoder_adapter_config) == asdict(model.text_encoder.peft_config[text_encoder_adapter_name])
        assert asdict(unet_adapter_config) == asdict(model.unet.peft_config[unet_adapter_name])

    @pytest.mark.parametrize("model_id", PEFT_DIFFUSERS_SD_MODELS_TO_TEST)
    @pytest.mark.parametrize("config_cls,config_kwargs", DIFFUSERS_CONFIGS)
    def test_disable_adapter(self, model_id, config_cls, config_kwargs):
        # Delegates to the shared PeftCommonTester implementation.
        config_kwargs = set_init_weights_false(config_cls, config_kwargs)
        self._test_disable_adapter(model_id, config_cls, config_kwargs)

    @pytest.mark.parametrize("model_id", PEFT_DIFFUSERS_SD_MODELS_TO_TEST)
    @pytest.mark.parametrize("config_cls,config_kwargs", DIFFUSERS_CONFIGS)
    def test_load_model_low_cpu_mem_usage(self, model_id, config_cls, config_kwargs):
        """Injecting with low_cpu_mem_usage=True puts adapter params on the meta
        device until a state dict is loaded, after which no meta params remain."""
        # First pipeline: only used to obtain reference adapter state dicts.
        pipe = self.instantiate_sd_peft(model_id, config_cls, config_kwargs)

        te_state_dict = get_peft_model_state_dict(pipe.text_encoder)
        unet_state_dict = get_peft_model_state_dict(pipe.unet)

        del pipe
        pipe = self.instantiate_sd_peft(model_id, config_cls, config_kwargs)

        config_kwargs = config_kwargs.copy()
        # Copy the nested dicts so the shared DIFFUSERS_CONFIGS fixture is not
        # mutated by the merge loop below.
        text_encoder_kwargs = config_kwargs.pop("text_encoder").copy()
        unet_kwargs = config_kwargs.pop("unet").copy()
        # Apply remaining top-level kwargs to both sub-configs.
        for key, val in config_kwargs.items():
            text_encoder_kwargs[key] = val
            unet_kwargs[key] = val

        config_text_encoder = config_cls(**text_encoder_kwargs)
        config_unet = config_cls(**unet_kwargs)

        # Inject adapter into the text encoder with low_cpu_mem_usage=True.
        inject_adapter_in_model(config_text_encoder, pipe.text_encoder, low_cpu_mem_usage=True)
        # Sanity check that the adapter was applied.
        assert any(isinstance(module, BaseTunerLayer) for module in pipe.text_encoder.modules())

        # Before loading the state dict, some params live on the meta device.
        assert "meta" in {p.device.type for p in pipe.text_encoder.parameters()}
        set_peft_model_state_dict(pipe.text_encoder, te_state_dict, low_cpu_mem_usage=True)
        assert "meta" not in {p.device.type for p in pipe.text_encoder.parameters()}

        # Inject adapter into the unet with low_cpu_mem_usage=True.
        inject_adapter_in_model(config_unet, pipe.unet, low_cpu_mem_usage=True)
        # Sanity check that the adapter was applied.
        assert any(isinstance(module, BaseTunerLayer) for module in pipe.unet.modules())

        # Before loading the state dict, some params live on the meta device.
        assert "meta" in {p.device.type for p in pipe.unet.parameters()}
        set_peft_model_state_dict(pipe.unet, unet_state_dict, low_cpu_mem_usage=True)
        assert "meta" not in {p.device.type for p in pipe.unet.parameters()}
|
|