| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | import base64 |
| | import json |
| | import os |
| | from copy import deepcopy |
| |
|
| | from torch import optim |
| |
|
| | from ..optimizer import AcceleratedOptimizer |
| | from ..scheduler import AcceleratedScheduler |
| | from .dataclasses import DistributedType |
| | from .imports import is_bnb_available |
| | from .versions import compare_versions |
| |
|
| |
|
def _bnb_uses_32bit_states(optimizer):
    """
    Return whether a bitsandbytes optimizer is configured with 32-bit optimizer states.

    The bit-width is read from `optimizer.optim_bits`, falling back to `optimizer.args.optim_bits` when the former
    attribute is missing (presumably the attribute location differs between bitsandbytes versions — confirm).
    """
    try:
        return optimizer.optim_bits == 32
    except AttributeError:
        return optimizer.args.optim_bits == 32


def map_pytorch_optim_to_deepspeed(optimizer):
    """
    Build the DeepSpeed CPU optimizer (`deepspeed.ops`) that corresponds to `optimizer`.

    The new optimizer is constructed on the same `optimizer.param_groups`, and only `lr` and `weight_decay` are
    carried over from `optimizer.defaults`. The class is selected as follows:

    - `DeepSpeedCPUAdagrad` for `torch.optim.Adagrad` or a 32-bit bitsandbytes Adagrad (deepspeed >= 0.5.5).
    - `DeepSpeedCPULion` for a 32-bit bitsandbytes Lion (bitsandbytes >= 0.38.0 and deepspeed >= 0.11.0).
    - `DeepSpeedCPUAdam` otherwise. With deepspeed >= 0.3.1 its `adamw_mode` is `True` for `torch.optim.AdamW` or a
      32-bit bitsandbytes AdamW, and `False` for any other optimizer.

    Args:
        optimizer: torch.optim.Optimizer

    Returns:
        An instance of the selected DeepSpeedCPUOptimizer (deepspeed.ops) class.
    """
    defaults = {k: v for k, v in optimizer.defaults.items() if k in ["lr", "weight_decay"]}

    # DeepSpeedCPUAdam is the fallback when no more specific CPU optimizer matches.
    from deepspeed.ops.adam import DeepSpeedCPUAdam

    optimizer_class = DeepSpeedCPUAdam

    # `None` means "do not pass `adamw_mode` at all" (deepspeed < 0.3.1).
    adamw_mode = None
    if compare_versions("deepspeed", ">=", "0.3.1"):
        adamw_mode = isinstance(optimizer, optim.AdamW)
        if is_bnb_available() and not adamw_mode:
            import bitsandbytes.optim as bnb_opt

            if isinstance(optimizer, (bnb_opt.AdamW, bnb_opt.AdamW32bit)):
                # Only 32-bit bitsandbytes AdamW maps onto DeepSpeed's AdamW mode.
                adamw_mode = _bnb_uses_32bit_states(optimizer)

    # DeepSpeedCPUAdagrad
    if compare_versions("deepspeed", ">=", "0.5.5"):
        is_ada = isinstance(optimizer, optim.Adagrad)
        if is_bnb_available() and not is_ada:
            import bitsandbytes.optim as bnb_opt

            if isinstance(optimizer, (bnb_opt.Adagrad, bnb_opt.Adagrad32bit)):
                is_ada = _bnb_uses_32bit_states(optimizer)
        if is_ada:
            from deepspeed.ops.adagrad import DeepSpeedCPUAdagrad

            optimizer_class = DeepSpeedCPUAdagrad

    # DeepSpeedCPULion
    if is_bnb_available(min_version="0.38.0") and compare_versions("deepspeed", ">=", "0.11.0"):
        from bitsandbytes.optim import Lion, Lion32bit

        if isinstance(optimizer, (Lion, Lion32bit)) and _bnb_uses_32bit_states(optimizer):
            from deepspeed.ops.lion import DeepSpeedCPULion

            optimizer_class = DeepSpeedCPULion

    # `adamw_mode` is a DeepSpeedCPUAdam constructor argument only; DeepSpeedCPUAdagrad / DeepSpeedCPULion do not
    # accept it, so forwarding it unconditionally made those paths fail with a TypeError.
    if optimizer_class is DeepSpeedCPUAdam and adamw_mode is not None:
        defaults["adamw_mode"] = adamw_mode

    return optimizer_class(optimizer.param_groups, **defaults)
| |
|
| |
|
def get_active_deepspeed_plugin(state):
    """
    Returns the currently active DeepSpeedPlugin.

    Args:
        state: the accelerator state. Its `distributed_type` must be `DistributedType.DEEPSPEED`. Its
            `deepspeed_plugins` is either a single plugin, which is returned as-is, or a dict of plugins, in which
            case the first one whose `selected` attribute is truthy is returned.

    Raises:
        ValueError: If DeepSpeed was not enabled and this function is called, or if `state.deepspeed_plugins` is a
            dict in which no plugin is selected.
    """
    if state.distributed_type != DistributedType.DEEPSPEED:
        raise ValueError(
            "Couldn't retrieve the active `DeepSpeedPlugin` as none were enabled. "
            "Please make sure that either `Accelerator` is configured for `deepspeed` "
            "or make sure that the desired `DeepSpeedPlugin` has been enabled (`AcceleratorState().select_deepspeed_plugin(name)`) "
            "before calling this function."
        )
    plugins = state.deepspeed_plugins
    if not isinstance(plugins, dict):
        return plugins
    # Use a default instead of a bare `next()`: a leaked StopIteration is confusing and turns into a RuntimeError
    # when this is called from inside a generator (PEP 479).
    active_plugin = next((plugin for plugin in plugins.values() if plugin.selected), None)
    if active_plugin is None:
        raise ValueError(
            "None of the configured `DeepSpeedPlugin`s is selected. Please select one with "
            "`AcceleratorState().select_deepspeed_plugin(name)` before calling this function."
        )
    return active_plugin
| |
|
| |
|
class HfDeepSpeedConfig:
    """
    This object contains a DeepSpeed configuration dictionary and can be quickly queried for things like zero stage.

    A `weakref` of this object may be stored in a module's globals (as `transformers` does) to be able to access the
    config from areas where things like the Trainer object is not available (e.g. `from_pretrained` and
    `_get_resized_embeddings`). Therefore it's important that this object remains alive while the program is still
    running.

    [`Trainer`] uses the `HfTrainerDeepSpeedConfig` subclass instead. That subclass has logic to sync the configuration
    with values of [`TrainingArguments`] by replacing special placeholder values: `"auto"`. Without this special logic
    the DeepSpeed configuration is not modified in any way.

    Args:
        config_file_or_dict (`Union[str, Dict]`): a dict, a path to a DeepSpeed JSON config file, a JSON string, or
            a urlsafe-base64-encoded JSON string.

    Raises:
        ValueError: if the input cannot be interpreted as any of the accepted forms, or if it does not describe a
            JSON object (dict).
    """

    def __init__(self, config_file_or_dict):
        if isinstance(config_file_or_dict, dict):
            # Work on a deep copy so that later edits (e.g. `del_config_sub_tree`) don't mutate the caller's dict.
            config = deepcopy(config_file_or_dict)
        elif os.path.exists(config_file_or_dict):
            with open(config_file_or_dict, encoding="utf-8") as f:
                config = json.load(f)
        else:
            try:
                try:
                    # First, try to interpret the string as raw JSON.
                    config = json.loads(config_file_or_dict)
                except json.JSONDecodeError:
                    # Otherwise, try urlsafe-base64-encoded JSON.
                    config_decoded = base64.urlsafe_b64decode(config_file_or_dict).decode("utf-8")
                    config = json.loads(config_decoded)
            except (UnicodeDecodeError, AttributeError, ValueError) as e:
                raise ValueError(
                    f"Expected a string path to an existing deepspeed config, or a dictionary, or a base64 encoded string. Received: {config_file_or_dict}"
                ) from e

        # Every lookup below walks nested dicts; reject lists/numbers/null up front instead of failing later with an
        # opaque AttributeError.
        if not isinstance(config, dict):
            raise ValueError(
                f"Expected the deepspeed config to be a JSON object (dict), but got {type(config).__name__}: {config!r}"
            )

        self.config = config

        self.set_stage_and_offload()

    def set_stage_and_offload(self):
        """
        Cache the ZeRO stage and whether offloading is configured.

        `_stage` is read from `zero_optimization.stage` (`-1` when unset). `_offload` is `True` only for ZeRO stage 2
        or 3 when `zero_optimization.offload_optimizer.device` or `zero_optimization.offload_param.device` is `"cpu"`
        or `"nvme"`.
        """
        self._stage = self.get_value("zero_optimization.stage", -1)

        self._offload = False
        if self.is_zero2() or self.is_zero3():
            offload_devices_valid = {"cpu", "nvme"}
            offload_devices = {
                self.get_value("zero_optimization.offload_optimizer.device"),
                self.get_value("zero_optimization.offload_param.device"),
            }
            if offload_devices & offload_devices_valid:
                self._offload = True

    def find_config_node(self, ds_key_long):
        """
        Walk the dotted path `ds_key_long` through the config.

        Returns:
            `(node, key)`, where `node` is the dict that should hold the last path component `key`, or `(None, key)`
            if one of the intermediate components is missing.
        """
        config = self.config

        # Every component but the last names a nested section; the last one is the key inside it.
        nodes = ds_key_long.split(".")
        ds_key = nodes.pop()
        for node in nodes:
            config = config.get(node)
            if config is None:
                return None, ds_key

        return config, ds_key

    def get_value(self, ds_key_long, default=None):
        """
        Returns the set value or `default` if no value is set
        """
        config, ds_key = self.find_config_node(ds_key_long)
        if config is None:
            return default
        return config.get(ds_key, default)

    def del_config_sub_tree(self, ds_key_long, must_exist=False):
        """
        Deletes a sub-section of the config file if it's found.

        Unless `must_exist` is `True` the section doesn't have to exist.

        Raises:
            ValueError: if `must_exist` is `True` and the dotted path is not present.
        """
        config = self.config

        # Walk down the dotted path, remembering the parent so the last component can be popped from it.
        nodes = ds_key_long.split(".")
        for node in nodes:
            parent_config = config
            config = config.get(node)
            if config is None:
                if must_exist:
                    raise ValueError(f"Can't find {ds_key_long} entry in the config: {self.config}")
                else:
                    return

        if parent_config is not None:
            parent_config.pop(node)

    def is_true(self, ds_key_long):
        """
        Returns `True`/`False` only if the value is set, always `False` otherwise. So use this method to ask the very
        specific question of whether the value is set to `True` (and it's not set to `False` or isn't set).
        """
        value = self.get_value(ds_key_long)
        return False if value is None else bool(value)

    def is_false(self, ds_key_long):
        """
        Returns `True`/`False` only if the value is set, always `False` otherwise. So use this method to ask the very
        specific question of whether the value is set to `False` (and it's not set to `True` or isn't set).
        """
        value = self.get_value(ds_key_long)
        return False if value is None else not bool(value)

    def is_zero2(self):
        """Whether the configured ZeRO stage is 2."""
        return self._stage == 2

    def is_zero3(self):
        """Whether the configured ZeRO stage is 3."""
        return self._stage == 3

    def is_offload(self):
        """Whether ZeRO 2/3 offloading to `cpu` or `nvme` is configured."""
        return self._offload
| |
|
| |
|
class DeepSpeedEngineWrapper:
    """
    Internal wrapper for deepspeed.runtime.engine.DeepSpeedEngine. This is used to follow conventional training loop.

    `backward` forwards to the engine and, on a gradient-sync step, also calls `engine.step()`.

    Args:
        engine (deepspeed.runtime.engine.DeepSpeedEngine): deepspeed engine to wrap
    """

    def __init__(self, engine):
        self.engine = engine

    def backward(self, loss, sync_gradients=True, **kwargs):
        """
        Run the engine's backward pass for `loss`; when `sync_gradients` is true, also step the engine.

        Extra keyword arguments are passed straight through to `engine.backward`.
        """
        ds_engine = self.engine
        # Mark whether this micro-step closes a gradient-accumulation window before running backward.
        ds_engine.set_gradient_accumulation_boundary(is_boundary=sync_gradients)
        ds_engine.backward(loss, **kwargs)
        if not sync_gradients:
            return
        # The engine owns the update step here; presumably this is why the DeepSpeed optimizer/scheduler wrappers'
        # own `step` methods are no-ops — confirm against DeepSpeed's `engine.step` docs.
        ds_engine.step()

    def get_global_grad_norm(self):
        """Get the global gradient norm from DeepSpeed engine."""
        norm = self.engine.get_global_grad_norm()
        # Unwrap tensor-like results to a Python scalar; anything without `.item()` (e.g. None) is returned unchanged.
        return norm.item() if hasattr(norm, "item") else norm
| |
|
| |
|
class DeepSpeedOptimizerWrapper(AcceleratedOptimizer):
    """
    Internal wrapper around a deepspeed optimizer.

    `zero_grad` and `step` are deliberately no-ops on this wrapper; `step_was_skipped` reports the wrapped
    optimizer's `overflow` flag when it has one.

    Args:
        optimizer (`torch.optim.optimizer.Optimizer`):
            The optimizer to wrap.
    """

    def __init__(self, optimizer):
        super().__init__(optimizer, device_placement=False, scaler=None)
        # Checked once at construction: does the wrapped optimizer expose an `overflow` attribute?
        self.__has_overflow__ = hasattr(self.optimizer, "overflow")

    def zero_grad(self, set_to_none=None):
        """No-op: gradient zeroing is not performed through this wrapper."""

    def step(self):
        """No-op: the optimizer step is not performed through this wrapper."""

    @property
    def step_was_skipped(self):
        """Whether or not the optimizer step was done, or skipped because of gradient overflow."""
        return self.optimizer.overflow if self.__has_overflow__ else False
| |
|
| |
|
class DeepSpeedSchedulerWrapper(AcceleratedScheduler):
    """
    Internal wrapper around a deepspeed scheduler.

    `step` is deliberately a no-op on this wrapper.

    Args:
        scheduler (`torch.optim.lr_scheduler.LambdaLR`):
            The scheduler to wrap.
        optimizers (one or a list of `torch.optim.Optimizer`):
            The optimizer(s) the scheduler is attached to; passed through to `AcceleratedScheduler`.
    """

    def __init__(self, scheduler, optimizers):
        super().__init__(scheduler, optimizers)

    def step(self):
        """No-op: the scheduler step is not performed through this wrapper."""
| |
|
| |
|
class DummyOptim:
    """
    Placeholder optimizer that only records model parameters (or param groups) and hyper-parameters. It is used to
    follow the conventional training loop when the optimizer itself is specified in the deepspeed config file.

    Args:
        params (iterable):
            Iterable of parameters to optimize, or dicts defining parameter groups.
        lr (float):
            Learning rate. Defaults to `0.001`.
        weight_decay (float):
            Weight decay. Defaults to `0`.
        **kwargs (additional keyword arguments, *optional*):
            Other arguments, stored unchanged in `self.kwargs`.
    """

    def __init__(self, params, lr=0.001, weight_decay=0, **kwargs):
        self.params, self.lr = params, lr
        self.weight_decay, self.kwargs = weight_decay, kwargs
| |
|
| |
|
class DummyScheduler:
    """
    Placeholder scheduler that only records its configuration. It is used to follow the conventional training loop
    when the scheduler itself is specified in the deepspeed config file.

    Args:
        optimizer (`torch.optim.optimizer.Optimizer`):
            The optimizer to wrap.
        total_num_steps (int, *optional*):
            Total number of steps. Defaults to `None`.
        warmup_num_steps (int, *optional*):
            Number of steps for warmup. Defaults to `0`.
        lr_scheduler_callable (callable, *optional*):
            A callable function that creates an LR Scheduler. It accepts only one argument `optimizer`.
        **kwargs (additional keyword arguments, *optional*):
            Other arguments, stored unchanged in `self.kwargs`.
    """

    def __init__(self, optimizer, total_num_steps=None, warmup_num_steps=0, lr_scheduler_callable=None, **kwargs):
        self.optimizer, self.total_num_steps = optimizer, total_num_steps
        self.warmup_num_steps, self.lr_scheduler_callable = warmup_num_steps, lr_scheduler_callable
        self.kwargs = kwargs
| |
|