| from typing import Any |
| import math |
| import copy |
| from torch.utils.data import DataLoader |
|
|
|
|
| def get_warmup_steps( |
| dataloader_one_pass_outside_steps: int, |
| warmup_steps: int | None = None, |
| warmup_epochs: float | None = None, |
| epoch_length: int | None = None, |
| ) -> int: |
| """ |
| Derive warmup steps according to step number or epoch number. |
| If `warmup_steps` is provided, then just return it. Otherwise, derive |
| the warmup steps by epoch length and warmup epoch number. |
| """ |
| if warmup_steps is not None: |
| return warmup_steps |
| else: |
| if epoch_length is None: |
| epoch_length = dataloader_one_pass_outside_steps |
| assert warmup_epochs is not None, "warmup_steps and warmup_epochs cannot be both None" |
| return int(epoch_length * warmup_epochs) |
|
|
|
|
def get_dataloader_one_pass_outside_steps(
    train_dataloader: DataLoader,
    num_processes: int = 1,
) -> int:
    """
    Number of steps for one pass over `train_dataloader` as observed
    outside the accelerator, i.e. after batches are sharded across DDP
    workers — close to `original_length / gpu_number`.

    Args:
        train_dataloader:
            Training dataloader object (only its `len()` is used).
        num_processes:
            The number of parallel processes (GPUs) used for distributed
            training.

    Returns:
        int: `ceil(len(train_dataloader) / num_processes)`.
    """
    return math.ceil(len(train_dataloader) / num_processes)
|
|
|
|
| def get_total_training_steps( |
| train_dataloader: DataLoader, |
| epochs: int, |
| num_processes: int = 1, |
| epoch_length: int | None = None |
| ): |
| """ |
| Calculate the total number of "visible" training steps. |
| |
| If `epoch_length` is provided, it is used as the fixed length for each epoch. |
| Otherwise, the function will determine the epoch length from `train_dataloader`. |
| |
| Args: |
| train_dataloader: |
| Training dataloader object. |
| epochs: |
| The total number of epochs to run. |
| num_processes: |
| The number of parallel processes used for distributed training. |
| epoch_length: |
| A fixed number of training steps for each epoch. Defaults to None. |
| |
| Returns: |
| int: The total number of training steps (i.e., `epochs * epoch_length`). |
| """ |
| |
| if epoch_length is None: |
| |
| epoch_length = get_dataloader_one_pass_outside_steps( |
| train_dataloader, num_processes |
| ) |
| return epochs * epoch_length |
|
|
|
|
def get_dataloader_one_pass_steps_inside_accelerator(
    dataloader_one_pass_steps: int, gradient_accumulation_steps: int,
    num_processes: int
) -> int:
    """
    Calculate the number of "visible" training steps for a single pass over
    the dataloader inside an accelerator, accounting for gradient
    accumulation and distributed training.

    Args:
        dataloader_one_pass_steps:
            The number of steps (batches) in one pass over the dataset.
        gradient_accumulation_steps:
            The number of steps to accumulate gradients before performing a
            parameter update.
        num_processes:
            The number of parallel processes used for distributed training.

    Returns:
        int: The total number of "visible" training steps for one pass over
            the dataset, multiplied by the number of processes.
    """
    # One optimizer update per `gradient_accumulation_steps` batches
    # (rounded up for the trailing partial accumulation window), and each
    # process performs its own update → multiply by `num_processes`.
    return math.ceil(
        dataloader_one_pass_steps / gradient_accumulation_steps
    ) * num_processes
|
|
|
|
def get_steps_inside_accelerator_from_outside_steps(
    outside_steps: int, dataloader_one_pass_outside_steps: int,
    dataloader_one_pass_steps_inside_accelerator: int,
    gradient_accumulation_steps: int, num_processes: int
) -> int:
    """
    Convert "outside" steps (as observed in wandb logger or similar context)
    to the corresponding number of "inside" steps (for accelerate lr scheduler).

    Specifically, accelerate lr scheduler calls `step()` `num_processes` times
    for every `gradient_accumulation_steps` outside steps.

    Args:
        outside_steps:
            The total number of steps counted outside accelerate context.
        dataloader_one_pass_outside_steps:
            The number of steps (batches) to complete one pass of the
            dataloader outside accelerate.
        dataloader_one_pass_steps_inside_accelerator:
            The number of `lr_scheduler.step()` calls inside accelerate,
            calculated via `get_dataloader_one_pass_steps_inside_accelerator`.
        gradient_accumulation_steps:
            The number of steps to accumulate gradients.
        num_processes:
            The number of parallel processes (GPUs) used in distributed
            training.

    Returns:
        int: The total number of `lr_scheduler.step()` calls inside accelerate
            that correspond to the given `outside_steps`.
    """
    # Split the outside step count into whole dataloader passes plus a
    # partial pass; completed passes contribute the precomputed per-pass
    # inside-step count.
    num_dataloader_epochs_passed = outside_steps // dataloader_one_pass_outside_steps
    remaining_outside_steps = outside_steps % dataloader_one_pass_outside_steps
    # Within the partial pass, only full accumulation windows have triggered
    # scheduler steps, hence floor division (no rounding up here).
    remaining_inside_accelerator_steps = (
        remaining_outside_steps // gradient_accumulation_steps * num_processes
    )
    total_steps = (
        num_dataloader_epochs_passed *
        dataloader_one_pass_steps_inside_accelerator +
        remaining_inside_accelerator_steps
    )
    return total_steps
|
|
|
|
def lr_scheduler_param_adapter(
    config_dict: dict[str, Any], num_training_steps: int, num_warmup_steps: int
) -> dict[str, Any]:
    """
    Build an lr-scheduler config with step counts filled in.

    Returns a deep copy of `config_dict`; when its `_target_` is
    `transformers.get_scheduler`, `num_training_steps` and
    `num_warmup_steps` are injected into the copy. The input dict is
    never mutated.
    """
    adapted = copy.deepcopy(config_dict)
    if adapted["_target_"] == "transformers.get_scheduler":
        adapted["num_training_steps"] = num_training_steps
        adapted["num_warmup_steps"] = num_warmup_steps
    return adapted
|
|