| import copy |
| import math |
| import os |
| import re |
| import shutil |
| import tempfile |
| import unittest |
| from concurrent.futures import ThreadPoolExecutor |
|
|
| import peft |
| import torch |
| from modelscope import Model, Preprocessor |
| from modelscope.models.nlp.structbert import SbertConfig, SbertForSequenceClassification |
| from peft import PeftModel |
| from peft.utils import WEIGHTS_NAME |
| from torch import nn |
|
|
| from swift import AdapterConfig, LoRAConfig, PromptConfig, ResTuningConfig, SideConfig, Swift, SwiftModel |
| from swift.tuners.part import Part, PartConfig |
|
|
|
|
class TestSwift(unittest.TestCase):
    """Integration tests for the Swift tuner wrappers (LoRA, Adapter, Prompt,
    ResTuning, Side and Part) around ModelScope StructBERT models.

    The common pattern: wrap a pretrained model with a tuner, then check that
    activating the adapter changes the forward output, deactivating it restores
    the original output, and (where relevant) that save/load round-trips
    reproduce the tuned weights exactly.
    """

    def setUp(self):
        """Create a scratch directory for checkpoints written by each test."""
        print(('Testing %s.%s' % (type(self).__name__, self._testMethodName)))
        # mkdtemp both creates the directory and leaves its lifetime to
        # tearDown.  The previous pattern, tempfile.TemporaryDirectory().name,
        # discarded the TemporaryDirectory object, whose finalizer could then
        # remove the directory at an arbitrary point mid-test.
        self.tmp_dir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the scratch directory created in setUp."""
        shutil.rmtree(self.tmp_dir)
        super().tearDown()

    @staticmethod
    def _patch_lora_reset(init_a_with_ones=False):
        """Monkey-patch swift's LoRA ``Linear.reset_lora_parameters`` with a
        deterministic initializer.

        The library default zero-initializes ``lora_B``, which makes the LoRA
        delta exactly zero and would leave the tuned output identical to the
        base model; initializing ``lora_B`` with ones makes the effect of the
        adapter observable in the forward pass.

        Args:
            init_a_with_ones: if True, initialize ``lora_A`` with ones instead
                of kaiming-uniform, so two independently patched models produce
                identical outputs (used by the adapter-switching test).
        """
        from swift.tuners.lora import Linear

        def reset_lora_parameters(self, adapter_name, init_lora_weights):
            if init_lora_weights is False:
                return

            if adapter_name in self.lora_A.keys():
                if init_lora_weights is True:
                    if init_a_with_ones:
                        nn.init.ones_(self.lora_A[adapter_name].weight)
                    else:
                        nn.init.kaiming_uniform_(self.lora_A[adapter_name].weight, a=math.sqrt(5))
                elif init_lora_weights.lower() == 'gaussian':
                    nn.init.normal_(self.lora_A[adapter_name].weight, std=1 / self.r[adapter_name])
                else:
                    raise ValueError(f'Unknown initialization {init_lora_weights=}')
                # Ones instead of the default zeros: a zero lora_B would make
                # the adapter a no-op and the inequality assertions vacuous.
                nn.init.ones_(self.lora_B[adapter_name].weight)
            if adapter_name in self.lora_embedding_A.keys():
                nn.init.ones_(self.lora_embedding_A[adapter_name])
                nn.init.normal_(self.lora_embedding_B[adapter_name])

        Linear.reset_lora_parameters = reset_lora_parameters

    def test_swift_lora_forward(self):
        """LoRA: output changes when active and reverts when deactivated."""
        self._patch_lora_reset()

        model = Model.from_pretrained('damo/nlp_structbert_sentence-similarity_chinese-base')
        preprocessor = Preprocessor.from_pretrained('damo/nlp_structbert_sentence-similarity_chinese-base')
        inputs = preprocessor('how are you')
        lora_config = LoRAConfig(target_modules=['query', 'key', 'value'])
        outputs = model(**inputs)
        model = Swift.prepare_model(model, config=lora_config)
        model.eval()
        outputs_lora = model(**inputs)
        model.deactivate_adapter('default')
        outputs_deactivate = model(**inputs)
        model.activate_adapter('default')
        outputs_reactivate = model(**inputs)
        self.assertTrue(torch.allclose(outputs.logits, outputs_deactivate.logits))
        self.assertTrue(not torch.allclose(outputs.logits, outputs_lora.logits))
        self.assertTrue(torch.allclose(outputs_lora.logits, outputs_reactivate.logits))

    def test_swift_adapter_forward(self):
        """Adapter: output changes when active and reverts when deactivated."""
        model = Model.from_pretrained('damo/nlp_structbert_sentence-similarity_chinese-base')
        preprocessor = Preprocessor.from_pretrained('damo/nlp_structbert_sentence-similarity_chinese-base')
        inputs = preprocessor('how are you')
        adapter_config = AdapterConfig(
            dim=model.config.hidden_size,
            target_modules=r'.*layer\.\d+$',
            method_name='feed_forward_chunk',
            hidden_pos=0)
        outputs = model(**inputs)
        model = Swift.prepare_model(model, config=adapter_config)
        outputs_lora = model(**inputs)
        model.deactivate_adapter('default')
        outputs_deactivate = model(**inputs)
        model.activate_adapter('default')
        outputs_reactivate = model(**inputs)
        self.assertTrue(torch.allclose(outputs.logits, outputs_deactivate.logits))
        self.assertTrue(not torch.allclose(outputs.logits, outputs_lora.logits))
        self.assertTrue(torch.allclose(outputs_lora.logits, outputs_reactivate.logits))

    def test_swift_prompt_forward(self):
        """Prompt tuning: output changes when active and reverts when deactivated."""
        model = Model.from_pretrained('damo/nlp_structbert_sentence-similarity_chinese-base')
        preprocessor = Preprocessor.from_pretrained('damo/nlp_structbert_sentence-similarity_chinese-base')
        inputs = preprocessor('how are you')
        prompt_config = PromptConfig(
            dim=model.config.hidden_size, target_modules=r'.*layer\.\d+$', embedding_pos=0, attention_mask_pos=1)
        outputs = model(**inputs)
        model = Swift.prepare_model(model, config=prompt_config)
        outputs_lora = model(**inputs)
        model.deactivate_adapter('default')
        outputs_deactivate = model(**inputs)
        model.activate_adapter('default')
        outputs_reactivate = model(**inputs)
        self.assertTrue(torch.allclose(outputs.logits, outputs_deactivate.logits))
        self.assertTrue(not torch.allclose(outputs.logits, outputs_lora.logits))
        self.assertTrue(torch.allclose(outputs_lora.logits, outputs_reactivate.logits))

    def test_swift_restuner_forward(self):
        """ResTuning: output changes when active and reverts when deactivated."""
        model = Model.from_pretrained('damo/nlp_structbert_sentence-similarity_chinese-base')
        preprocessor = Preprocessor.from_pretrained('damo/nlp_structbert_sentence-similarity_chinese-base')
        inputs = preprocessor('how are you')
        restuner_config = ResTuningConfig(
            dims=model.config.hidden_size,
            root_modules=r'.*layer.0$',
            stem_modules=r'.*layer\.\d+$',
            target_modules=r'.*pooler',
            target_modules_hook='input',
            tuner_cfg='res_adapter',
        )
        outputs = model(**inputs)
        model = Swift.prepare_model(model, config=restuner_config)
        outputs_lora = model(**inputs)
        model.deactivate_adapter('default')
        outputs_deactivate = model(**inputs)
        model.activate_adapter('default')
        outputs_reactivate = model(**inputs)
        self.assertTrue(torch.allclose(outputs.logits, outputs_deactivate.logits))
        self.assertTrue(not torch.allclose(outputs.logits, outputs_lora.logits))
        self.assertTrue(torch.allclose(outputs_lora.logits, outputs_reactivate.logits))

    def lora_injection_with_dtype(self, dtype=torch.float32):
        """Inject LoRA at the given dtype, save the adapter, reload it into a
        fresh base model and verify outputs and state dicts match; optionally
        merge-and-unload (float32 only) and verify outputs are preserved.

        Args:
            dtype: torch dtype to cast both models to before injection.
        """
        self._patch_lora_reset()

        model = Model.from_pretrained('damo/nlp_structbert_sentence-similarity_chinese-base')
        preprocessor = Preprocessor.from_pretrained('damo/nlp_structbert_sentence-similarity_chinese-base')
        # Renamed from `input`, which shadowed the builtin.
        inputs = preprocessor('this is a test')
        model = model.to(dtype)
        model2 = Model.from_pretrained('damo/nlp_structbert_sentence-similarity_chinese-base')
        model2 = model2.to(dtype)
        lora_config = LoRAConfig(target_modules=['query', 'key', 'value'])
        model = Swift.prepare_model(model, config=lora_config)
        self.assertTrue(isinstance(model, SwiftModel))
        output1 = model(**inputs)
        model.save_pretrained(self.tmp_dir)
        self.assertTrue(os.path.exists(os.path.join(self.tmp_dir, 'default')))
        self.assertTrue(os.path.exists(os.path.join(self.tmp_dir, 'default', WEIGHTS_NAME)))

        # Reload under a remapped adapter name and check outputs match.
        model2 = Swift.from_pretrained(model2, self.tmp_dir, adapter_name={'default': 'test'})
        self.assertTrue('test' in model2.adapters)
        output2 = model2(**inputs)
        self.assertTrue(torch.allclose(output1.logits, output2.logits))
        model2 = Swift.from_pretrained(model2, self.tmp_dir)
        state_dict = model.state_dict()
        state_dict2 = model2.state_dict()
        for key in state_dict:
            self.assertTrue(key in state_dict2)
            self.assertTrue(all(torch.isclose(state_dict[key], state_dict2[key]).flatten().detach().cpu()))

        # Merging mutates the wrapped modules in place, so it is only checked
        # at float32 and in single-thread mode (matching the original guard).
        if dtype == torch.float32 and os.environ.get('USE_UNIQUE_THREAD') == '1':
            Swift.merge_and_unload(model2)
            output3 = model2(**inputs)
            self.assertTrue(torch.allclose(output1.logits, output3.logits))

    def test_swift_lora_injection(self):
        """LoRA save/load round-trip at float32."""
        self.lora_injection_with_dtype()

    def test_swift_lora_injection_bf16(self):
        """LoRA save/load round-trip at bfloat16."""
        self.lora_injection_with_dtype(torch.bfloat16)

    def test_save_to_peft_mix(self):
        """Converting a mixed (LoRA + Adapter) checkpoint to peft must fail."""
        model = SbertForSequenceClassification(SbertConfig())
        lora_config = LoRAConfig(target_modules=['query', 'key', 'value'])
        adapter_config = AdapterConfig(
            dim=model.config.hidden_size,
            target_modules=r'.*layer\.\d+$',
            method_name='feed_forward_chunk',
            hidden_pos=0)
        model = Swift.prepare_model(model, config={'lora': lora_config, 'adapter': adapter_config})
        model.save_pretrained(os.path.join(self.tmp_dir, 'original'))
        # NOTE: the previous try/except pattern caught AssertionError, which
        # also swallowed the failing `self.assertTrue(False)` — the test could
        # never fail.  assertRaises asserts the failure explicitly.
        with self.assertRaises(AssertionError):
            Swift.save_to_peft_format(os.path.join(self.tmp_dir, 'original'), os.path.join(self.tmp_dir, 'converted'))

    def test_save_to_peft_param(self):
        """Converting a checkpoint with unsupported LoRA params (lora_dtype)
        to peft must fail."""
        model = SbertForSequenceClassification(SbertConfig())
        lora_config = LoRAConfig(target_modules=['query', 'key', 'value'], lora_dtype='float16')
        model = Swift.prepare_model(model, config={'lora': lora_config})
        model.save_pretrained(os.path.join(self.tmp_dir, 'original'))
        # Same fix as test_save_to_peft_mix: the old try/except swallowed the
        # intended failure signal.
        with self.assertRaises(AssertionError):
            Swift.save_to_peft_format(os.path.join(self.tmp_dir, 'original'), os.path.join(self.tmp_dir, 'converted'))

    def _assert_peft_checkpoint_matches(self, model):
        """Load the converted peft checkpoint (default + 'lora' adapters) into
        a fresh model and assert every lora weight matches ``model``'s."""
        model2 = SbertForSequenceClassification(SbertConfig())
        model2 = PeftModel.from_pretrained(model2, os.path.join(self.tmp_dir, 'converted'))
        model2.load_adapter(os.path.join(os.path.join(self.tmp_dir, 'converted'), 'lora'), 'lora')
        state_dict = model.state_dict()
        # peft prefixes wrapped weights with 'base_model.model.'; strip it so
        # keys line up with the swift-side state dict.
        state_dict2 = {
            key[len('base_model.model.'):]: value
            for key, value in model2.state_dict().items() if 'lora' in key
        }
        for key in state_dict:
            self.assertTrue(key in state_dict2)
            self.assertTrue(all(torch.isclose(state_dict[key], state_dict2[key]).flatten().detach().cpu()))

    def test_save_to_peft_ok(self):
        """A pure-LoRA (DoRA) checkpoint converts to peft format, loads back
        with identical weights, and conversion is idempotent."""
        model = SbertForSequenceClassification(SbertConfig())
        lora_config = LoRAConfig(target_modules=['query', 'key', 'value'], use_dora=True)
        lora2_config = LoRAConfig(target_modules=['query', 'key', 'value'], use_dora=True)
        model = Swift.prepare_model(model, config={'default': lora_config, 'lora': lora2_config})
        model.save_pretrained(os.path.join(self.tmp_dir, 'original'))
        Swift.save_to_peft_format(os.path.join(self.tmp_dir, 'original'), os.path.join(self.tmp_dir, 'converted'))
        # Converting a second time over the same target directory must work.
        Swift.save_to_peft_format(os.path.join(self.tmp_dir, 'original'), os.path.join(self.tmp_dir, 'converted'))
        self._assert_peft_checkpoint_matches(model)

        # Converting an already-converted (peft-format) checkpoint in place
        # must leave an equivalent checkpoint behind.
        Swift.save_to_peft_format(os.path.join(self.tmp_dir, 'converted'), os.path.join(self.tmp_dir, 'converted'))
        self._assert_peft_checkpoint_matches(model)

    def test_swift_multiple_adapters(self):
        """Saving and reloading two adapters (LoRA + Adapter) round-trips the
        full state dict."""
        model = SbertForSequenceClassification(SbertConfig())
        model2 = copy.deepcopy(model)
        lora_config = LoRAConfig(target_modules=['query', 'key', 'value'])
        adapter_config = AdapterConfig(
            dim=model.config.hidden_size,
            target_modules=r'.*layer\.\d+$',
            method_name='feed_forward_chunk',
            hidden_pos=0)
        model = Swift.prepare_model(model, config={'lora': lora_config, 'adapter': adapter_config})
        self.assertTrue(isinstance(model, SwiftModel))
        model.save_pretrained(self.tmp_dir, adapter_name=['lora', 'adapter'])
        with open(os.path.join(self.tmp_dir, 'configuration.json'), 'w') as f:
            f.write('{}')
        self.assertTrue(os.path.exists(os.path.join(self.tmp_dir, 'lora')))
        self.assertTrue(os.path.exists(os.path.join(self.tmp_dir, 'lora', WEIGHTS_NAME)))
        self.assertTrue(os.path.exists(os.path.join(self.tmp_dir, 'adapter')))
        self.assertTrue(os.path.exists(os.path.join(self.tmp_dir, 'adapter', WEIGHTS_NAME)))
        model2 = Swift.from_pretrained(model2, self.tmp_dir, adapter_name=['lora', 'adapter'])
        state_dict = model.state_dict()
        state_dict2 = model2.state_dict()
        for key in state_dict:
            self.assertTrue(key in state_dict2)
            self.assertTrue(all(torch.isclose(state_dict[key], state_dict2[key]).flatten().detach().cpu()))

    def test_part(self):
        """Part tuner: freezes originals, trains '_part_' copies, round-trips
        through save/load, and interacts correctly with activation switching
        and a stacked LoRA adapter."""
        preprocessor = Preprocessor.from_pretrained('damo/nlp_structbert_sentence-similarity_chinese-base')
        inputs = preprocessor('how are you')
        model = SbertForSequenceClassification.from_pretrained('damo/nlp_structbert_sentence-similarity_chinese-base')
        model_origin = copy.deepcopy(model)
        model2 = copy.deepcopy(model)
        targets = r'.*(query|key|value).*'
        part_config = PartConfig(target_modules=targets)
        model = Swift.prepare_model(model, config={'part': part_config})
        self.assertTrue(isinstance(model, SwiftModel))

        # Force one tuned copy away from its original so activation visibly
        # changes the forward output.
        model.base_model.encoder.encoder.layer[0].attention.self.query._part_part.weight.data = torch.ones_like(
            model.base_model.encoder.encoder.layer[0].attention.self.query._part_part.weight.data)

        # Targeted originals must be frozen; their '_part_' copies trainable.
        for name, module in model.named_modules():
            if re.fullmatch(targets, name) and '_part_' not in name:
                self.assertTrue(not module.weight.requires_grad)
                self.assertTrue(model.get_submodule(name + '._part_part').weight.requires_grad)

        model.save_pretrained(self.tmp_dir, adapter_name=['part'])
        with open(os.path.join(self.tmp_dir, 'configuration.json'), 'w') as f:
            f.write('{}')
        self.assertTrue(os.path.exists(os.path.join(self.tmp_dir, 'part')))
        self.assertTrue(os.path.exists(os.path.join(self.tmp_dir, 'part', WEIGHTS_NAME)))
        model2 = Swift.from_pretrained(model2, self.tmp_dir, adapter_name=['part'])
        self.assertTrue(
            all(
                torch.isclose(model.base_model.encoder.encoder.layer[0].attention.self.query._part_part.weight.data,
                              model2.base_model.encoder.encoder.layer[0].attention.self.query._part_part.weight.data).
                flatten().detach().cpu()))

        state_dict = model.model.state_dict()
        state_dict2 = model2.model.state_dict()
        self.assertTrue(str(state_dict) == str(state_dict2))

        # Active: both tuned models agree with each other, differ from origin.
        output = model(**inputs)
        output2 = model2(**inputs)
        output_origin = model_origin(**inputs)
        self.assertTrue(all(torch.isclose(output.logits, output2.logits).flatten().detach().cpu()))
        self.assertTrue(not all(torch.isclose(output_origin.logits, output2.logits).flatten().detach().cpu()))

        # Deactivated: model2 reverts to the original behavior.
        model2.deactivate_adapter('part')
        output = model(**inputs)
        output2 = model2(**inputs)
        output_origin = model_origin(**inputs)
        self.assertTrue(not all(torch.isclose(output.logits, output2.logits).flatten().detach().cpu()))
        self.assertTrue(all(torch.isclose(output_origin.logits, output2.logits).flatten().detach().cpu()))

        # Reactivated: tuned behavior returns.
        model2.activate_adapter('part')
        output = model(**inputs)
        output2 = model2(**inputs)
        output_origin = model_origin(**inputs)
        self.assertTrue(all(torch.isclose(output.logits, output2.logits).flatten().detach().cpu()))
        self.assertTrue(not all(torch.isclose(output_origin.logits, output2.logits).flatten().detach().cpu()))

        # Stack additional part/lora adapters and exercise exclusive switching
        # via set_active_adapters with offloading.
        targets = r'.*(query|key|value).*'
        part_config = PartConfig(target_modules=targets)
        lora_config = LoRAConfig(target_modules=targets)
        model2 = Swift.prepare_model(model2, config={'part2': part_config})
        model2 = Swift.prepare_model(model2, config={'lora': lora_config})
        model2 = Swift.prepare_model(model2, config={'part3': part_config})
        model2.set_active_adapters('part2', offload='meta')
        model2.set_active_adapters('part3', offload='meta')
        model2.set_active_adapters('lora', offload='meta')
        model2.set_active_adapters('part2', offload='meta')
        self.assertTrue(
            not model2.base_model.encoder.encoder.layer[0].attention.self.query.base_layer._part_part.activated)
        self.assertTrue(
            model2.base_model.encoder.encoder.layer[0].attention.self.query.base_layer._part_part2.activated)
        model2.set_active_adapters('part', offload='meta')
        self.assertTrue(
            not model2.base_model.encoder.encoder.layer[0].attention.self.query.base_layer._part_part2.activated)
        self.assertTrue(model2.base_model.encoder.encoder.layer[0].attention.self.query.base_layer._part_part.activated)
        output = model(**inputs)
        output2 = model2(**inputs)
        output_origin = model_origin(**inputs)
        self.assertTrue(all(torch.isclose(output.logits, output2.logits).flatten().detach().cpu()))
        self.assertTrue(not all(torch.isclose(output_origin.logits, output2.logits).flatten().detach().cpu()))

        # Deactivating everything restores the original model's behavior.
        model2.set_active_adapters('part2', offload='meta')
        model2.deactivate_adapter('part2', offload='meta')
        model2.deactivate_adapter('lora', offload='cpu')
        self.assertTrue(
            not model2.base_model.encoder.encoder.layer[0].attention.self.query.base_layer._part_part2.activated)
        self.assertTrue(
            not model2.base_model.encoder.encoder.layer[0].attention.self.query.base_layer._part_part.activated)
        output = model(**inputs)
        output2 = model2(**inputs)
        output_origin = model_origin(**inputs)
        self.assertTrue(not all(torch.isclose(output.logits, output2.logits).flatten().detach().cpu()))
        self.assertTrue(all(torch.isclose(output_origin.logits, output2.logits).flatten().detach().cpu()))
        model2.activate_adapter('lora')
        self.assertTrue(
            not model2.base_model.encoder.encoder.layer[0].attention.self.query.base_layer._part_part2.activated)
        self.assertTrue(
            not model2.base_model.encoder.encoder.layer[0].attention.self.query.base_layer._part_part.activated)
        self.assertTrue(
            not model2.base_model.encoder.encoder.layer[0].attention.self.query.base_layer._part_part3.activated)
        self.assertTrue(model2.base_model.encoder.encoder.layer[0].attention.self.query.active_adapters == ['lora'])

    def test_swift_multiple_adapters_switching(self):
        """A model carrying four adapters must reproduce, per active pair, the
        outputs of single-pair models — including when switched concurrently
        from two threads (thread-local activation mode)."""
        from swift.tuners.adapter import AdapterModule

        # Deterministic init everywhere so identically-configured models
        # produce identical outputs.
        self._patch_lora_reset(init_a_with_ones=True)

        def init_weights(self):

            def _init_weights(m):
                if isinstance(m, nn.Linear):
                    nn.init.ones_(m.weight)
                    nn.init.ones_(m.bias)

            self.apply(_init_weights)

        AdapterModule.init_weights = init_weights

        model = Model.from_pretrained('damo/nlp_structbert_sentence-similarity_chinese-base')
        preprocessor = Preprocessor.from_pretrained('damo/nlp_structbert_sentence-similarity_chinese-base')
        inputs = preprocessor('how are you')
        model1 = copy.deepcopy(model)
        model2 = copy.deepcopy(model)
        model1 = Swift.prepare_model(
            model1,
            config={
                'lora1':
                LoRAConfig(target_modules=['query', 'key', 'value']),
                'adapter1':
                AdapterConfig(
                    dim=model.config.hidden_size,
                    target_modules=r'.*layer\.\d+$',
                    method_name='feed_forward_chunk',
                    hidden_pos=0)
            })
        model2 = Swift.prepare_model(
            model2,
            config={
                'lora2':
                LoRAConfig(target_modules=['query', 'key', 'value']),
                'adapter2':
                AdapterConfig(
                    dim=model.config.hidden_size,
                    target_modules=r'.*layer\.\d+$',
                    method_name='feed_forward_chunk',
                    hidden_pos=0)
            })
        model = Swift.prepare_model(
            model,
            config={
                'lora1': LoRAConfig(target_modules=['query', 'key', 'value']),
                'lora2': LoRAConfig(target_modules=['query', 'key', 'value']),
            })

        model = Swift.prepare_model(
            model,
            config={
                'adapter1':
                AdapterConfig(
                    dim=model.config.hidden_size,
                    target_modules=r'.*layer\.\d+$',
                    method_name='feed_forward_chunk',
                    hidden_pos=0),
                'adapter2':
                AdapterConfig(
                    dim=model.config.hidden_size,
                    target_modules=r'.*layer\.\d+$',
                    method_name='feed_forward_chunk',
                    hidden_pos=0),
            })

        # Only the '1' pair active -> must match model1.
        model.deactivate_adapter('adapter2', offload='meta')
        model.deactivate_adapter('lora2', offload='meta')
        outputs1 = model(**inputs)
        outputs2 = model1(**inputs)
        self.assertTrue(torch.allclose(outputs1.logits, outputs2.logits))
        # Only the '2' pair active -> must match model2.
        model.activate_adapter('adapter2')
        model.activate_adapter('lora2')
        model.deactivate_adapter('adapter1', offload='meta')
        model.deactivate_adapter('lora1', offload='meta')
        outputs1 = model(**inputs)
        outputs2 = model2(**inputs)
        self.assertTrue(torch.allclose(outputs1.logits, outputs2.logits))

        if os.environ.get('USE_UNIQUE_THREAD') == '0':

            def thread_func1():
                model1.set_active_adapters(['lora1', 'adapter1'], offload=None)
                model.set_active_adapters(['lora1', 'adapter1'], offload=None)
                outputs_single = model1(**inputs)
                outputs_t1 = model(**inputs)
                self.assertTrue(torch.allclose(outputs_single.logits, outputs_t1.logits))

            def thread_func2():
                model2.set_active_adapters(['lora2', 'adapter2'], offload=None)
                model.set_active_adapters(['lora2', 'adapter2'], offload=None)
                outputs_single = model2(**inputs)
                outputs_t2 = model(**inputs)
                self.assertTrue(torch.allclose(outputs_single.logits, outputs_t2.logits))

            # Exceptions raised inside worker threads (including failed
            # assertions) are re-raised here so the test actually fails.
            with ThreadPoolExecutor(2) as executor:
                f1 = executor.submit(thread_func1)
                f2 = executor.submit(thread_func2)
                e1 = f1.exception()
                e2 = f2.exception()
                if e1 is not None:
                    raise e1
                if e2 is not None:
                    raise e2

    def test_swift_side_bert(self):
        """Side tuner: output changes when active, reverts when deactivated,
        and the adapter round-trips through save/load."""
        model = Model.from_pretrained('damo/nlp_structbert_sentence-similarity_chinese-base')
        preprocessor = Preprocessor.from_pretrained('damo/nlp_structbert_sentence-similarity_chinese-base')
        inputs = preprocessor('how are you')
        model2 = copy.deepcopy(model)
        result_origin = model(**inputs).logits
        print(f'test_swift_side_bert result_origin shape: {result_origin.shape}, '
              f'result_origin sum: {torch.sum(result_origin)}')

        side_config = SideConfig(
            dim=model.config.hidden_size,
            target_modules=r'.*encoder.encoder',
            side_module_name='mlp',
            target_hidden_pos='last_hidden_state')

        model = Swift.prepare_model(model, config=side_config)
        result_activate = model(**inputs).logits
        model.deactivate_adapter('default')
        result_deactivate = model(**inputs).logits
        model.activate_adapter('default')
        result_reactivate = model(**inputs).logits
        self.assertTrue(torch.allclose(result_origin, result_deactivate))
        self.assertTrue(not torch.allclose(result_origin, result_activate))
        self.assertTrue(torch.allclose(result_activate, result_reactivate))
        print(f'test_swift_side_bert result shape: {result_origin.shape}, result sum: {torch.sum(result_origin)}')

        self.assertTrue(isinstance(model, SwiftModel))
        model.save_pretrained(self.tmp_dir)
        self.assertTrue(os.path.exists(os.path.join(self.tmp_dir, 'default')))
        self.assertTrue(os.path.exists(os.path.join(self.tmp_dir, 'default', WEIGHTS_NAME)))

        model2 = Swift.from_pretrained(model2, self.tmp_dir)

        state_dict = model.state_dict()
        state_dict2 = model2.state_dict()
        for key in state_dict:
            self.assertTrue(key in state_dict2)
            self.assertTrue(all(torch.isclose(state_dict[key], state_dict2[key]).flatten().detach().cpu()))
|
|
|
|
# Allow running this test module directly (e.g. `python test_swift.py`).
if __name__ == '__main__':
    unittest.main()
|
|