|
|
|
|
|
import gc |
|
|
import hashlib |
|
|
import os |
|
|
import pickle |
|
|
import re |
|
|
import time |
|
|
import uuid |
|
|
from bisect import bisect_right |
|
|
from contextlib import contextmanager, nullcontext |
|
|
from typing import Callable, Dict, List, Optional, Tuple, Union |
|
|
|
|
|
import numpy as np |
|
|
import torch |
|
|
import torch.distributed as dist |
|
|
import torch.nn as nn |
|
|
from datasets.utils.filelock import FileLock |
|
|
from modelscope.hub.utils.utils import get_cache_dir |
|
|
from transformers.integrations import is_deepspeed_zero3_enabled |
|
|
from transformers.utils import is_torch_cuda_available, is_torch_mps_available, is_torch_npu_available |
|
|
|
|
|
from .env import get_dist_setting, is_dist, is_dist_ta, is_local_master, is_master |
|
|
from .logger import get_logger |
|
|
from .utils import deep_getattr |
|
|
|
|
|
logger = get_logger() |
|
|
|
|
|
|
|
|
def _find_local_mac() -> str: |
|
|
mac = uuid.getnode() |
|
|
mac_address = ':'.join(('%012x' % mac)[i:i + 2] for i in range(0, 12, 2)) |
|
|
return mac_address |
|
|
|
|
|
|
|
|
def get_n_params_grads(model) -> Tuple[List[int], List[int]]:
    """Return per-parameter element counts and trainable element counts.

    Returns:
        A pair ``(n_params, n_grads)`` of equal-length lists, one entry per
        parameter in ``model.parameters()`` order; ``n_grads`` entries are 0
        for frozen (``requires_grad=False``) parameters.
    """
    n_params: List[int] = []
    n_grads: List[int] = []
    for param in model.parameters():
        if is_deepspeed_zero3_enabled():
            import deepspeed
            # Under ZeRO-3 parameters are sharded across ranks; gather so that
            # numel() reflects the full (unsharded) parameter size.
            ctx = deepspeed.zero.GatheredParameters(param)
        else:
            ctx = nullcontext()
        with ctx:
            count = param.numel()
            n_params.append(count)
            n_grads.append(count if param.requires_grad else 0)
    return n_params, n_grads
|
|
|
|
|
|
|
|
def get_model_parameter_info(model: nn.Module, name: Optional[str] = None) -> str:
    """Return a one-line human-readable parameter summary of ``model``.

    Args:
        model: module to summarize.
        name: display name; defaults to the model's class name.

    Returns:
        A string like ``'Model: 7.2400M Params (0.0600M Trainable [0.8287%]), 0.0000M Buffers.'``
        (counts in millions).
    """
    n_params, n_grads = get_n_params_grads(model)
    n_params = sum(n_params)
    n_grads = sum(n_grads)
    n_buffers = sum(p.numel() for p in model.buffers())

    if name is None:
        name = model.__class__.__name__

    n_params /= 1e6
    n_grads /= 1e6
    n_buffers /= 1e6
    # Guard against parameterless models (e.g. buffer-only modules); the original
    # expression `100 * n_grads / n_params` raised ZeroDivisionError there.
    trainable_pct = 100 * n_grads / n_params if n_params else 0.0
    s = (f'{name}: '
         f'{n_params:.4f}M Params ({n_grads:.4f}M Trainable '
         f'[{trainable_pct:.4f}%]), '
         f'{n_buffers:.4f}M Buffers.')
    return s
|
|
|
|
|
|
|
|
def find_sub_module(module: torch.nn.Module, module_name: str) -> List[torch.nn.Module]:
    """Collect every submodule whose qualified name ends with ``module_name``.

    The root module itself (which has an empty name) is never included.
    """
    return [sub for name, sub in module.named_modules() if name and name.endswith(module_name)]
|
|
|
|
|
|
|
|
def show_layers(model: nn.Module, max_lines: Optional[int] = 20) -> None:
    """Log requires_grad/dtype/device for each named parameter.

    At most ``max_lines`` entries are shown (then a literal '...' line);
    pass ``max_lines=None`` to log every parameter.
    """
    for idx, (param_name, param) in enumerate(model.named_parameters()):
        if max_lines is not None and idx >= max_lines:
            logger.info('...')
            break
        logger.info(f'[{param_name}]: requires_grad={param.requires_grad}, dtype={param.dtype}, device={param.device}')
|
|
|
|
|
|
|
|
def freeze_parameters(model: nn.Module,
                      freeze_parameters_ratio: float,
                      freeze_parameters: List[str],
                      freeze_parameters_regex: Optional[str] = None) -> None:
    """Freeze model parameters in place, selected three independent ways.

    Args:
        model: module whose parameters are modified in place.
        freeze_parameters_ratio: freeze the leading parameters (in iteration
            order) whose cumulative element count fits within this fraction of
            the total parameter count.
        freeze_parameters: name prefixes; any parameter whose name starts with
            one of them is frozen.
        freeze_parameters_regex: optional regex; parameters whose name matches
            (via ``re.search``) are frozen. An invalid pattern only logs a
            warning and returns.
    """
    if freeze_parameters_ratio > 0:
        sizes = np.array(get_n_params_grads(model)[0], dtype=np.int64)
        budget = int(np.sum(sizes) * freeze_parameters_ratio)
        # Count how many leading parameters fit within the frozen-size budget.
        cutoff = bisect_right(np.cumsum(sizes), budget)
        for _, param in zip(range(cutoff), model.parameters()):
            param.requires_grad = False

    if freeze_parameters:
        for param_name, param in model.named_parameters():
            if any(param_name.startswith(prefix) for prefix in freeze_parameters):
                param.requires_grad = False

    if freeze_parameters_regex is not None:
        try:
            compiled = re.compile(freeze_parameters_regex)
        except re.error as e:
            logger.warning(f"Invalid freeze_parameters_regex '{freeze_parameters_regex}': {e}")
            return

        for param_name, param in model.named_parameters():
            if compiled.search(param_name):
                param.requires_grad = False
|
|
|
|
|
|
|
|
def activate_parameters(model: nn.Module,
                        additional_trainable_parameters: List[str],
                        trainable_parameters_regex: Optional[str] = None) -> None:
    """Re-enable gradients for parameters selected by name prefix or regex.

    A warning is logged whenever a selector was provided but matched no
    parameter. An invalid regex only logs a warning and returns.
    """
    if len(additional_trainable_parameters) > 0:
        matched = False
        for param_name, param in model.named_parameters():
            if any(param_name.startswith(prefix) for prefix in additional_trainable_parameters):
                param.requires_grad = True
                matched = True
        if not matched:
            logger.warning('len(additional_trainable_parameters) > 0 but no parameters are activated. '
                           f'additional_trainable_parameters: {additional_trainable_parameters}')

    if trainable_parameters_regex is not None:
        try:
            compiled = re.compile(trainable_parameters_regex)
        except re.error as e:
            logger.warning(f"Invalid trainable_parameters_regex '{trainable_parameters_regex}': {e}")
            return

        matched = False
        for param_name, param in model.named_parameters():
            if compiled.search(param_name):
                param.requires_grad = True
                matched = True

        if not matched:
            logger.warning('trainable_parameters_regex is provided but no parameters are activated. '
                           f'trainable_parameters_regex: {trainable_parameters_regex}')
|
|
|
|
|
|
|
|
def time_synchronize() -> float:
    """Return a high-resolution timestamp taken after all pending CUDA work completes.

    ``torch.cuda.synchronize()`` blocks until every queued kernel has finished,
    so the returned ``perf_counter()`` value reflects actual GPU completion —
    use this (not bare ``perf_counter``) when timing GPU code.
    """
    torch.cuda.synchronize()
    return time.perf_counter()
|
|
|
|
|
|
|
|
def _get_max_memory(device_ids: List[int]) -> Dict[Union[int, str], int]:
    """add feat in accelerate to support MP + DDP

    Builds an accelerate-style ``max_memory`` mapping: free bytes for each
    device id in ``device_ids``, 0 for every other visible device, and the
    available system RAM under the ``'cpu'`` key.
    """
    import psutil

    # Touch each target device so its CUDA context exists before mem_get_info.
    for device_id in device_ids:
        _ = torch.tensor([0], device=device_id)

    usable = set(device_ids)
    max_memory: Dict[Union[int, str], int] = {}
    for idx in range(get_device_count()):
        # Devices we are not allowed to use report 0 so accelerate skips them.
        max_memory[idx] = torch.cuda.mem_get_info(idx)[0] if idx in usable else 0
    max_memory['cpu'] = psutil.virtual_memory().available
    return max_memory
|
|
|
|
|
|
|
|
def _sync_max_memory(max_memory: Dict[Union[int, str], int]) -> Dict[Union[int, str], int]:
    """Make sure that the model structure of MP(device_map) is the same, when using DDP.

    All-gathers each rank's per-GPU free memory and replaces every positive GPU
    entry with the element-wise minimum across ranks, so device_map planning is
    identical everywhere. CPU and zeroed entries pass through unchanged.
    """
    gpu_values = [v for k, v in max_memory.items() if (v > 0 and k != 'cpu')]
    _, local_rank, world_size, _ = get_dist_setting()
    local_tensor = torch.tensor(gpu_values).to(local_rank)
    gathered = [torch.zeros_like(local_tensor) for _ in range(world_size)]
    dist.all_gather(gathered, local_tensor)
    # Element-wise minimum over ranks, consumed in the same order the
    # positive GPU entries were collected above.
    min_values = iter(torch.stack(gathered, dim=0).min(dim=0)[0].tolist())
    synced: Dict[Union[int, str], int] = {}
    for key, value in max_memory.items():
        synced[key] = next(min_values) if (value > 0 and key != 'cpu') else value
    return synced
|
|
|
|
|
|
|
|
def find_layers(
    model: nn.Module,
    cond: Callable[[str, nn.Module], bool],
    sub_module: Optional[str] = None,
    min_name_len: Optional[int] = None,
) -> List[str]:
    """Return the shortest unambiguous module-name suffixes for which `cond` holds.

    Module names are normalized by replacing numeric path segments (layer
    indices) with '{}', so repeated layers collapse to one pattern. For each
    matching module, the name is extended leftwards (suffix by suffix) until it
    no longer collides with a non-matching ("inner") node, yielding suffixes
    usable as e.g. LoRA target_modules.

    Args:
        model: root module; its full name tree defines the collision set.
        cond: predicate over (normalized name, module) selecting target layers.
        sub_module: optional dotted attribute path; when given, only this
            subtree is searched (names are re-prefixed with the path).
        min_name_len: minimum number of dot-separated segments in each result.
    """
    sub_module_str = sub_module
    if sub_module is None:
        sub_module = model
    else:
        sub_module = deep_getattr(model, sub_module)
    # Names (index-normalized) of every module that does NOT satisfy cond:
    # a candidate suffix must not be a suffix of any of these.
    inner_nodes = set()
    for name, module in model.named_modules():
        name = re.sub(r'\d+\.', '{}.', name)
        if not cond(name, module):
            inner_nodes.add(name)
    target_module_names = set()
    for name, module in sub_module.named_modules():
        if sub_module_str:
            # Restore the full dotted path relative to `model`.
            name = f'{sub_module_str}.{name}' if name else sub_module_str
        if cond(name, module):
            module_name_list = name.split('.')
            module_name = module_name_list.pop()
            i = 1
            for inner_node in inner_nodes:
                # Precedence: (list non-empty AND suffix collides) OR
                # (min_name_len set AND too short). Extend the suffix one
                # parent segment at a time until both clauses are false.
                # NOTE(review): the min_name_len arm does not re-check
                # module_name_list; presumably names always have at least
                # min_name_len segments, else pop() would raise — confirm.
                while module_name_list and inner_node.endswith(re.sub(
                        r'\d+\.', '{}.', module_name)) or min_name_len and i < min_name_len:
                    module_name = f'{module_name_list.pop()}.{module_name}'
                    i += 1
            target_module_names.add(module_name)
    return list(target_module_names)
|
|
|
|
|
|
|
|
def find_norm(model: nn.Module) -> List[str]:
    """Return suffix names of all normalization layers (LayerNorm or any *RMSNorm*)."""

    def _is_norm(name: str, module: nn.Module) -> bool:
        # Match torch's LayerNorm by type, and model-specific RMSNorm
        # implementations by class name (there is no common base class).
        return isinstance(module, torch.nn.LayerNorm) or 'rmsnorm' in module.__class__.__name__.lower()

    return find_layers(model, _is_norm)
|
|
|
|
|
|
|
|
def find_embedding(model: nn.Module) -> List[str]:
    """Return suffix names of all ``torch.nn.Embedding`` layers in the model."""

    def _is_embedding(name: str, module: nn.Module) -> bool:
        return isinstance(module, torch.nn.Embedding)

    return find_layers(model, _is_embedding)
|
|
|
|
|
|
|
|
def find_all_linears(model, model_arch=None, extra_layers=None, sub_module=None):
    """Return suffix names of every trainable-target linear layer in the model.

    Excludes the LM head (resolved from ``model_arch`` when available), common
    classification/value heads, existing LoRA wrappers, and gated-linear
    ("glulinear") classes. ``extra_layers`` may supply additional module types
    to include; ``sub_module`` restricts the search to a dotted subtree.
    """
    if model_arch is None:
        from swift.llm import get_model_arch
        model_arch = get_model_arch(model.model_meta.model_arch)

    if model_arch and model_arch.lm_head:
        # Keep only the last dotted segment of the configured lm_head path.
        lm_head_name = model_arch.lm_head.rsplit('.', 1)[-1]
    else:
        lm_head_name = 'lm_head'

    # Name fragments to skip: heads that must stay untouched plus LoRA internals.
    ignore_layers = [lm_head_name, 'score', 'v_head', 'classifier'] + ['lora_A', 'lora_B', 'base_layer']
    ignore_linear_cls = [
        'glulinear'
    ]

    def _cond(name, module):
        cls_name = module.__class__.__name__.lower()
        matches_extra = bool(extra_layers) and isinstance(module, tuple(extra_layers))
        is_plain_linear = 'linear' in cls_name and not any(bad in cls_name for bad in ignore_linear_cls)
        name_allowed = not any(fragment in name for fragment in ignore_layers)
        return (matches_extra or is_plain_linear) and name_allowed

    return find_layers(model, _cond, sub_module=sub_module)
|
|
|
|
|
|
|
|
@contextmanager
def safe_ddp_context(hash_id: Optional[str], use_barrier: bool = False):
    """Run the wrapped body exactly-once-safely across DDP ranks / processes.

    Two mutually exclusive mechanisms:
    - ``use_barrier`` with an initialized process group: the master rank runs
      the body first while the other ranks wait on barriers; afterwards the
      master enters the matching barriers to release them ("master-first").
    - otherwise, when ``hash_id`` is given: processes on the same machine are
      serialized with a file lock keyed on ``sha256(hash_id)``.
    With neither, the body runs unguarded.
    """
    if use_barrier and dist.is_initialized():
        if is_dist() or is_dist_ta():
            # Non-(local-)master ranks block here until the master,
            # having finished the body, enters the matching barriers below.
            if not is_master():
                dist.barrier()
            if not is_local_master():
                dist.barrier()
        yield
        if is_dist() or is_dist_ta():
            # Masters enter the barriers after the body, releasing the waiters.
            if is_master():
                dist.barrier()
            if is_local_master():
                dist.barrier()
    elif hash_id is not None:
        lock_dir = os.path.join(get_cache_dir(), 'lockers')
        os.makedirs(lock_dir, exist_ok=True)
        # Hash the id so arbitrary strings (paths, URLs) become valid filenames.
        file_path = hashlib.sha256(hash_id.encode('utf-8')).hexdigest() + '.lock'
        file_path = os.path.join(lock_dir, file_path)
        with FileLock(file_path):
            yield
    else:
        yield
|
|
|
|
|
|
|
|
def get_device(local_rank: Optional[Union[str, int]] = None) -> str:
    """Return the device string for this process, e.g. 'cuda:0', 'npu:1', 'mps:0' or 'cpu'.

    When ``local_rank`` is omitted, the distributed LOCAL_RANK is used,
    clamped to 0 for non-distributed runs. Backend preference: npu > mps > cuda.
    """
    if local_rank is None:
        local_rank = max(0, get_dist_setting()[1])
    rank_str = str(local_rank)
    if is_torch_npu_available():
        return f'npu:{rank_str}'
    if is_torch_mps_available():
        return f'mps:{rank_str}'
    if is_torch_cuda_available():
        return f'cuda:{rank_str}'
    return 'cpu'
|
|
|
|
|
|
|
|
def get_current_device():
    """Return the active accelerator device: an int index for npu/cuda, else 'mps' or 'cpu'."""
    if is_torch_npu_available():
        return torch.npu.current_device()
    if is_torch_cuda_available():
        return torch.cuda.current_device()
    if is_torch_mps_available():
        return 'mps'
    return 'cpu'
|
|
|
|
|
|
|
|
def set_device(local_rank: Optional[Union[str, int]] = None):
    """Bind the current process to its accelerator device (no-op on cpu/mps).

    Defaults ``local_rank`` to the distributed LOCAL_RANK, clamped to 0 for
    non-distributed runs.
    """
    if local_rank is None:
        local_rank = max(get_dist_setting()[1], 0)
    if is_torch_npu_available():
        torch.npu.set_device(local_rank)
    elif is_torch_cuda_available():
        torch.cuda.set_device(local_rank)
|
|
|
|
|
|
|
|
def get_device_count() -> int:
    """Return the number of visible accelerator devices (npu checked before cuda); 0 if none."""
    if is_torch_npu_available():
        return torch.npu.device_count()
    if is_torch_cuda_available():
        return torch.cuda.device_count()
    return 0
|
|
|
|
|
|
|
|
def gc_collect() -> None:
    """Run the Python garbage collector, then release the accelerator's cached memory blocks."""
    gc.collect()
    # First available backend wins, mirroring the original npu > mps > cuda order.
    for available, empty_cache in (
        (is_torch_npu_available, lambda: torch.npu.empty_cache()),
        (is_torch_mps_available, lambda: torch.mps.empty_cache()),
        (is_torch_cuda_available, lambda: torch.cuda.empty_cache()),
    ):
        if available():
            empty_cache()
            break
|
|
|
|
|
|
|
|
class Serializer:
    """Round-trip arbitrary picklable objects through 1-D uint8 torch tensors.

    Layout: an 8-byte int64 length header followed by the pickle payload, so
    ``from_tensor`` can recover the object even from a padded buffer.
    WARNING: uses pickle — only deserialize tensors from trusted sources.
    """

    @staticmethod
    def to_tensor(obj):
        """Serialize ``obj`` into a uint8 tensor with a leading length header."""
        payload = pickle.dumps(obj)
        header = np.array([len(payload)], dtype=np.int64).tobytes()
        # .copy() detaches from the temporary bytes buffer (frombuffer is read-only).
        buf = np.frombuffer(header + payload, dtype=np.uint8).copy()
        return torch.from_numpy(buf)

    @staticmethod
    def from_tensor(obj):
        """Inverse of ``to_tensor``; accepts a torch tensor or a numpy array."""
        if isinstance(obj, torch.Tensor):
            obj = obj.cpu().numpy()
        raw = obj.tobytes()
        payload_len = np.frombuffer(raw[:8], dtype=np.int64)[0]
        # Slice by the recorded length so trailing padding bytes are ignored.
        return pickle.loads(raw[8:8 + payload_len])
|
|
|
|
|
|
|
|
def set_default_ddp_config():
    """Populate single-process defaults for torch.distributed env vars.

    Only acts when RANK is unset (i.e. not launched by torchrun); an existing
    MASTER_PORT is preserved.
    """
    rank = int(os.getenv('RANK', -1))
    if rank != -1:
        return
    os.environ.update({
        'NPROC_PER_NODE': '1',
        'RANK': '0',
        'LOCAL_RANK': '0',
        'WORLD_SIZE': '1',
        'LOCAL_WORLD_SIZE': '1',
        'MASTER_ADDR': '127.0.0.1',
    })
    os.environ['MASTER_PORT'] = os.environ.get('MASTER_PORT', '29500')
|
|
|
|
|
|
|
|
def init_process_group(ddp_backend: Optional[str] = None):
    """Idempotently initialize torch.distributed, choosing a backend for the hardware.

    Backend default: 'hccl' on NPU, 'nccl' when CUDA is available, else 'gloo'.
    """
    if dist.is_initialized():
        # Already initialized — calling init_process_group again would raise.
        return
    set_device()
    if ddp_backend is None:
        if is_torch_npu_available():
            ddp_backend = 'hccl'
        else:
            ddp_backend = 'nccl' if torch.cuda.is_available() else 'gloo'
    dist.init_process_group(backend=ddp_backend)
|
|
|