| | |
| | import gc |
| | import hashlib |
| | import os |
| | import pickle |
| | import re |
| | import time |
| | import uuid |
| | from bisect import bisect_right |
| | from contextlib import contextmanager, nullcontext |
| | from typing import Callable, Dict, List, Optional, Tuple, Union |
| |
|
| | import numpy as np |
| | import torch |
| | import torch.distributed as dist |
| | import torch.nn as nn |
| | from datasets.utils.filelock import FileLock |
| | from modelscope.hub.utils.utils import get_cache_dir |
| | from transformers.integrations import is_deepspeed_zero3_enabled |
| | from transformers.utils import is_torch_cuda_available, is_torch_mps_available, is_torch_npu_available |
| |
|
| | from .env import get_dist_setting, is_dist, is_dist_ta, is_local_master, is_master |
| | from .logger import get_logger |
| | from .utils import deep_getattr |
| |
|
| | logger = get_logger() |
| |
|
| |
|
| | def _find_local_mac() -> str: |
| | mac = uuid.getnode() |
| | mac_address = ':'.join(('%012x' % mac)[i:i + 2] for i in range(0, 12, 2)) |
| | return mac_address |
| |
|
| |
|
def get_n_params_grads(model) -> Tuple[List[int], List[int]]:
    """Return per-parameter element counts for `model`.

    Returns:
        (n_params, n_grads): for each parameter, its numel, and its numel if it
        requires grad else 0 (lists are parallel, in `model.parameters()` order).

    Under DeepSpeed ZeRO-3 the parameters are sharded, so each one is gathered
    before `numel()` is queried.
    """
    n_params: List[int] = []
    n_grads: List[int] = []
    for param in model.parameters():
        if is_deepspeed_zero3_enabled():
            import deepspeed
            ctx = deepspeed.zero.GatheredParameters(param)
        else:
            ctx = nullcontext()
        with ctx:
            numel = param.numel()
            n_params.append(numel)
            n_grads.append(numel if param.requires_grad else 0)
    return n_params, n_grads
| |
|
| |
|
def get_model_parameter_info(model: nn.Module, name: Optional[str] = None) -> str:
    """Build a one-line summary of a model's parameter/buffer counts.

    Args:
        model: The module to summarize.
        name: Display name; defaults to the model's class name.

    Returns:
        A string like
        'Model: 123.4567M Params (12.3456M Trainable [10.0000%]), 0.1234M Buffers.'
    """
    n_params, n_grads = get_n_params_grads(model)
    n_params = sum(n_params)
    n_grads = sum(n_grads)
    n_buffers = sum(p.numel() for p in model.buffers())

    if name is None:
        name = model.__class__.__name__

    n_params /= 1e6
    n_grads /= 1e6
    n_buffers /= 1e6
    # Guard against ZeroDivisionError for parameter-less (e.g. buffer-only) modules.
    trainable_ratio = 100 * n_grads / n_params if n_params else 0.0
    s = (f'{name}: '
         f'{n_params:.4f}M Params ({n_grads:.4f}M Trainable '
         f'[{trainable_ratio:.4f}%]), '
         f'{n_buffers:.4f}M Buffers.')
    return s
| |
|
| |
|
def find_sub_module(module: torch.nn.Module, module_name: str) -> List[torch.nn.Module]:
    """Collect every sub-module whose qualified name ends with `module_name`.

    The root module itself (empty name) is never included.
    """
    return [
        sub_module for qualified_name, sub_module in module.named_modules()
        if qualified_name and qualified_name.endswith(module_name)
    ]
| |
|
| |
|
def show_layers(model: nn.Module, max_lines: Optional[int] = 20) -> None:
    """Log requires_grad/dtype/device for each named parameter of `model`.

    Args:
        model: The module whose parameters are listed.
        max_lines: Log at most this many parameters, then a '...' marker;
            None means no limit.
    """
    for idx, (param_name, param) in enumerate(model.named_parameters()):
        if max_lines is not None and idx >= max_lines:
            logger.info('...')
            break
        logger.info(f'[{param_name}]: requires_grad={param.requires_grad}, '
                    f'dtype={param.dtype}, device={param.device}')
| |
|
| |
|
def freeze_parameters(model: nn.Module,
                      freeze_parameters_ratio: float,
                      freeze_parameters: List[str],
                      freeze_parameters_regex: Optional[str] = None) -> None:
    """Disable gradients on a subset of `model`'s parameters, in place.

    Three independent selection mechanisms are applied in order:
      1. `freeze_parameters_ratio`: freeze the first parameters (in
         `model.parameters()` order) until roughly this fraction of all
         elements is frozen.
      2. `freeze_parameters`: freeze parameters whose name starts with any
         of these prefixes.
      3. `freeze_parameters_regex`: freeze parameters whose name matches
         this regex (searched, not fullmatched). An invalid pattern is
         logged as a warning and aborts this step.
    """
    if freeze_parameters_ratio > 0:
        sizes = np.array(get_n_params_grads(model)[0], dtype=np.int64)
        budget = int(np.sum(sizes) * freeze_parameters_ratio)
        cutoff = bisect_right(np.cumsum(sizes), budget)
        # Freeze the leading `cutoff` parameters.
        for _, param in zip(range(cutoff), model.parameters()):
            param.requires_grad = False

    if len(freeze_parameters) > 0:
        for param_name, param in model.named_parameters():
            if any(param_name.startswith(prefix) for prefix in freeze_parameters):
                param.requires_grad = False

    if freeze_parameters_regex is not None:
        try:
            pattern = re.compile(freeze_parameters_regex)
        except re.error as e:
            logger.warning(f"Invalid freeze_parameters_regex '{freeze_parameters_regex}': {e}")
            return

        for param_name, param in model.named_parameters():
            if pattern.search(param_name):
                param.requires_grad = False
| |
|
| |
|
def activate_parameters(model: nn.Module,
                        additional_trainable_parameters: List[str],
                        trainable_parameters_regex: Optional[str] = None) -> None:
    """Enable gradients on selected parameters of `model`, in place.

    Parameters are activated when their name starts with any prefix in
    `additional_trainable_parameters`, or matches `trainable_parameters_regex`.
    Each mechanism logs a warning if it was requested but activated nothing.
    An invalid regex is logged as a warning and skipped.
    """
    activated = False
    if len(additional_trainable_parameters) > 0:
        for param_name, param in model.named_parameters():
            if any(param_name.startswith(prefix) for prefix in additional_trainable_parameters):
                param.requires_grad = True
                activated = True
        if not activated:
            logger.warning('len(additional_trainable_parameters) > 0 but no parameters are activated. '
                           f'additional_trainable_parameters: {additional_trainable_parameters}')

    activated = False
    if trainable_parameters_regex is not None:
        try:
            pattern = re.compile(trainable_parameters_regex)
        except re.error as e:
            logger.warning(f"Invalid trainable_parameters_regex '{trainable_parameters_regex}': {e}")
            return

        for param_name, param in model.named_parameters():
            if pattern.search(param_name):
                param.requires_grad = True
                activated = True

        if not activated:
            logger.warning('trainable_parameters_regex is provided but no parameters are activated. '
                           f'trainable_parameters_regex: {trainable_parameters_regex}')
| |
|
| |
|
def time_synchronize() -> float:
    """Return `time.perf_counter()` after draining pending CUDA work.

    The synchronize call makes the timestamp meaningful for timing
    asynchronous GPU kernels.
    """
    torch.cuda.synchronize()
    now = time.perf_counter()
    return now
| |
|
| |
|
def _get_max_memory(device_ids: List[int]) -> Dict[Union[int, str], int]:
    """add feat in accelerate to support MP + DDP

    Build an accelerate-style `max_memory` mapping: free bytes for each CUDA
    device in `device_ids`, 0 for all other visible devices, and available
    system RAM under the 'cpu' key.
    """
    import psutil

    # Touch every target device so its CUDA context is initialized before
    # mem_get_info is queried (otherwise the free-memory figure is off).
    for device_id in device_ids:
        _ = torch.tensor([0], device=device_id)

    wanted = set(device_ids)
    max_memory: Dict[Union[int, str], int] = {}
    for device_id in range(get_device_count()):
        if device_id in wanted:
            max_memory[device_id] = torch.cuda.mem_get_info(device_id)[0]
        else:
            max_memory[device_id] = 0
    max_memory['cpu'] = psutil.virtual_memory().available
    return max_memory
| |
|
| |
|
def _sync_max_memory(max_memory: Dict[Union[int, str], int]) -> Dict[Union[int, str], int]:
    """Make sure that the model structure of MP(device_map) is the same, when using DDP.

    All-gathers each rank's per-GPU free memory and replaces every positive
    GPU entry with the element-wise minimum across ranks, so device_map
    planning is identical everywhere. CPU and zero entries pass through.
    """
    gpu_values = [v for k, v in max_memory.items() if (v > 0 and k != 'cpu')]
    _, local_rank, world_size, _ = get_dist_setting()
    local_tensor = torch.tensor(gpu_values).to(local_rank)
    gathered = [torch.zeros_like(local_tensor) for _ in range(world_size)]
    dist.all_gather(gathered, local_tensor)
    # Element-wise minimum over ranks, consumed in the original key order.
    mins = iter(torch.stack(gathered, dim=0).min(dim=0)[0].tolist())
    synced: Dict[Union[int, str], int] = {}
    for key, value in max_memory.items():
        if value > 0 and key != 'cpu':
            synced[key] = next(mins)
        else:
            synced[key] = value
    return synced
| |
|
| |
|
def find_layers(
    model: nn.Module,
    cond: Callable[[str, nn.Module], bool],
    sub_module: Optional[str] = None,
    min_name_len: Optional[int] = None,
) -> List[str]:
    """Return the shortest unambiguous name suffixes of modules satisfying `cond`.

    Args:
        model: Root module to search.
        cond: Predicate over (generalized module name, module). Names have
            numeric path components replaced by '{}' before the check.
        sub_module: If given, only search under this dotted attribute path of
            `model` (names are still reported relative to `model`).
        min_name_len: If given, each returned suffix has at least this many
            dot-separated components (when the full name allows it).
    """
    sub_module_str = sub_module
    if sub_module is None:
        sub_module = model
    else:
        # Resolve the dotted path (e.g. 'model.layers') to the actual module.
        sub_module = deep_getattr(model, sub_module)
    # Generalized names of modules that do NOT satisfy cond; a candidate
    # suffix that is also the tail of one of these is ambiguous.
    inner_nodes = set()
    for name, module in model.named_modules():
        name = re.sub(r'\d+\.', '{}.', name)
        if not cond(name, module):
            inner_nodes.add(name)
    target_module_names = set()
    for name, module in sub_module.named_modules():
        if sub_module_str:
            # Re-qualify names relative to the root model.
            name = f'{sub_module_str}.{name}' if name else sub_module_str
        if cond(name, module):
            module_name_list = name.split('.')
            module_name = module_name_list.pop()
            i = 1
            # Grow the suffix leftward while it still collides with a
            # non-matching node, or until min_name_len components are used.
            # NOTE: the condition parses as
            # (module_name_list and endswith(...)) or (min_name_len and i < min_name_len).
            for inner_node in inner_nodes:
                while module_name_list and inner_node.endswith(re.sub(
                        r'\d+\.', '{}.', module_name)) or min_name_len and i < min_name_len:
                    module_name = f'{module_name_list.pop()}.{module_name}'
                    i += 1
            target_module_names.add(module_name)
    return list(target_module_names)
| |
|
| |
|
def find_norm(model: nn.Module) -> List[str]:
    """Return module-name suffixes of normalization layers.

    Matches `torch.nn.LayerNorm` instances and any module whose class name
    contains 'rmsnorm' (case-insensitive).
    """

    def _is_norm(name: str, module: nn.Module) -> bool:
        cls_name = module.__class__.__name__.lower()
        return isinstance(module, torch.nn.LayerNorm) or 'rmsnorm' in cls_name

    return find_layers(model, _is_norm)
| |
|
| |
|
def find_embedding(model: nn.Module) -> List[str]:
    """Return module-name suffixes of all `torch.nn.Embedding` layers."""

    def _is_embedding(name: str, module: nn.Module) -> bool:
        return isinstance(module, torch.nn.Embedding)

    return find_layers(model, _is_embedding)
| |
|
| |
|
def find_all_linears(model, model_arch=None, extra_layers=None, sub_module=None):
    """Return module-name suffixes of all LoRA-targetable linear layers.

    Excludes the lm_head (resolved from `model_arch` when available), common
    output heads ('score', 'v_head', 'classifier'), existing LoRA wrappers
    ('lora_A', 'lora_B', 'base_layer') and GLU-style linears. `extra_layers`
    (module classes) are always included unless their name is excluded.
    """
    if model_arch is None:
        from swift.llm import get_model_arch
        model_arch = get_model_arch(model.model_meta.model_arch)

    # The lm_head attribute name is the last component of the arch's lm_head path.
    if model_arch and model_arch.lm_head:
        lm_head_name = model_arch.lm_head.rsplit('.', 1)[-1]
    else:
        lm_head_name = 'lm_head'

    ignore_layers = [lm_head_name, 'score', 'v_head', 'classifier', 'lora_A', 'lora_B', 'base_layer']
    ignore_linear_cls = ['glulinear']

    def _cond(name, module):
        cls_name = module.__class__.__name__.lower()
        # Any excluded substring in the (generalized) name disqualifies the module.
        if any(layer in name for layer in ignore_layers):
            return False
        if extra_layers and isinstance(module, tuple(extra_layers)):
            return True
        is_plain_linear = ('linear' in cls_name
                           and not any(linear_cls in cls_name for linear_cls in ignore_linear_cls))
        return bool(is_plain_linear)

    return find_layers(model, _cond, sub_module=sub_module)
| |
|
| |
|
@contextmanager
def safe_ddp_context(hash_id: Optional[str], use_barrier: bool = False):
    """Serialize a critical section so only one rank/process runs it at a time.

    Two mutually exclusive modes:
      * ``use_barrier`` with an initialized process group: non-master ranks
        wait at barriers before the body; masters wait after, so the master
        executes the body first (typical for cache-populating work).
      * ``hash_id``: cross-process file lock keyed by the sha256 of hash_id,
        stored under the modelscope cache dir.
    Otherwise the body runs unguarded.
    """
    if use_barrier and dist.is_initialized():
        if is_dist() or is_dist_ta():
            if not is_master():
                dist.barrier()
            if not is_local_master():
                # NOTE(review): second barrier appears to cover per-node
                # (local-master) serialization, e.g. node-local disks —
                # confirm against the launcher setup.
                dist.barrier()
        yield
        if is_dist() or is_dist_ta():
            # Release the waiting ranks in the mirrored order.
            if is_master():
                dist.barrier()
            if is_local_master():
                dist.barrier()
    elif hash_id is not None:
        lock_dir = os.path.join(get_cache_dir(), 'lockers')
        os.makedirs(lock_dir, exist_ok=True)
        # Hash the id so arbitrary strings become safe filenames.
        file_path = hashlib.sha256(hash_id.encode('utf-8')).hexdigest() + '.lock'
        file_path = os.path.join(lock_dir, file_path)
        with FileLock(file_path):
            yield
    else:
        yield
| |
|
| |
|
def get_device(local_rank: Optional[Union[str, int]] = None) -> str:
    """Return the device string for `local_rank` on the best available backend.

    Args:
        local_rank: Device index; defaults to the distributed local rank
            (clamped to 0 when not in a distributed run).

    Returns:
        'npu:N', 'mps:N', 'cuda:N' or 'cpu'.
    """
    if local_rank is None:
        local_rank = max(0, get_dist_setting()[1])
    rank_str = str(local_rank)
    if is_torch_npu_available():
        return 'npu:{}'.format(rank_str)
    if is_torch_mps_available():
        return 'mps:{}'.format(rank_str)
    if is_torch_cuda_available():
        return 'cuda:{}'.format(rank_str)
    return 'cpu'
| |
|
| |
|
def get_current_device():
    """Return the active device: an int index for NPU/CUDA, else 'mps' or 'cpu'.

    NPU is checked before CUDA, mirroring the backend priority used elsewhere
    in this module.
    """
    if is_torch_npu_available():
        return torch.npu.current_device()
    if is_torch_cuda_available():
        return torch.cuda.current_device()
    if is_torch_mps_available():
        return 'mps'
    return 'cpu'
| |
|
| |
|
def set_device(local_rank: Optional[Union[str, int]] = None):
    """Bind the current process to accelerator `local_rank` (NPU or CUDA).

    Defaults to the distributed local rank (clamped to 0). No-op when only
    CPU/MPS is available.
    """
    if local_rank is None:
        local_rank = max(0, get_dist_setting()[1])
    if is_torch_npu_available():
        torch.npu.set_device(local_rank)
    elif is_torch_cuda_available():
        torch.cuda.set_device(local_rank)
| |
|
| |
|
def get_device_count() -> int:
    """Return the number of visible NPU or CUDA devices (0 for CPU/MPS-only)."""
    if is_torch_npu_available():
        return torch.npu.device_count()
    if is_torch_cuda_available():
        return torch.cuda.device_count()
    return 0
| |
|
| |
|
def gc_collect() -> None:
    """Run Python GC, then release the active accelerator's cached memory."""
    gc.collect()
    if is_torch_npu_available():
        torch.npu.empty_cache()
    elif is_torch_mps_available():
        torch.mps.empty_cache()
    elif is_torch_cuda_available():
        torch.cuda.empty_cache()
| |
|
| |
|
class Serializer:
    """Round-trip arbitrary Python objects through a uint8 tensor.

    Layout: 8-byte little int64 length header followed by the pickle payload,
    so the tensor can be padded/gathered and still decoded.
    """

    @staticmethod
    def to_tensor(obj):
        """Pickle `obj` and wrap it (with a length header) in a uint8 tensor."""
        payload = pickle.dumps(obj)
        header = np.array([len(payload)], dtype=np.int64).tobytes()
        buffer = np.frombuffer(header + payload, dtype=np.uint8).copy()
        return torch.from_numpy(buffer)

    @staticmethod
    def from_tensor(obj):
        """Decode a tensor (or uint8 ndarray) produced by `to_tensor`.

        SECURITY: uses `pickle.loads` — only feed tensors from trusted peers.
        """
        if isinstance(obj, torch.Tensor):
            obj = obj.cpu().numpy()
        raw = obj.tobytes()
        payload_len = np.frombuffer(raw[:8], dtype=np.int64)[0]
        # Slice exactly the payload; trailing padding bytes are ignored.
        return pickle.loads(raw[8:8 + payload_len])
| |
|
| |
|
def set_default_ddp_config():
    """Populate single-process distributed env defaults when not launched by torchrun.

    If RANK is unset (plain `python` invocation), fill in the env vars a
    world-size-1 'distributed' run needs; an existing MASTER_PORT is kept.
    """
    rank = int(os.getenv('RANK', -1))
    if rank == -1:
        os.environ['NPROC_PER_NODE'] = '1'
        os.environ['RANK'] = '0'
        os.environ['LOCAL_RANK'] = '0'
        os.environ['WORLD_SIZE'] = '1'
        os.environ['LOCAL_WORLD_SIZE'] = '1'
        os.environ['MASTER_ADDR'] = '127.0.0.1'
        os.environ['MASTER_PORT'] = os.environ.get('MASTER_PORT', '29500')
| |
|
| |
|
def init_process_group(ddp_backend: Optional[str] = None):
    """Initialize torch.distributed once, picking a backend for the hardware.

    No-op if a process group already exists. Binds the current device first,
    then chooses hccl (NPU) / nccl (CUDA) / gloo (CPU) unless `ddp_backend`
    is given explicitly.
    """
    if dist.is_initialized():
        return
    set_device()
    if ddp_backend is None:
        if is_torch_npu_available():
            ddp_backend = 'hccl'
        else:
            ddp_backend = 'nccl' if torch.cuda.is_available() else 'gloo'
    dist.init_process_group(backend=ddp_backend)
| |
|