import copy
import math
from typing import Any

from torch.utils.data import DataLoader


def get_warmup_steps(
    dataloader_one_pass_outside_steps: int,
    warmup_steps: int | None = None,
    warmup_epochs: float | None = None,
    epoch_length: int | None = None,
) -> int:
    """
    Derive the number of warmup steps from an explicit step count or an epoch count.

    If `warmup_steps` is provided, it is returned unchanged. Otherwise, the warmup
    steps are derived from the epoch length and `warmup_epochs`; when `epoch_length`
    is None, `dataloader_one_pass_outside_steps` is used as the epoch length.
    """
    if warmup_steps is not None:
        return warmup_steps
    else:
        if epoch_length is None:
            epoch_length = dataloader_one_pass_outside_steps
        assert warmup_epochs is not None, "warmup_steps and warmup_epochs cannot both be None"
        return int(epoch_length * warmup_epochs)
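
# Illustrative example (hypothetical numbers): with a 100-step dataloader pass,
# warmup_steps=None and warmup_epochs=0.5 resolve to int(100 * 0.5) == 50 warmup
# steps, while an explicit warmup_steps=200 would be returned as-is.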


def get_dataloader_one_pass_outside_steps(
    train_dataloader: DataLoader,
    num_processes: int = 1,
) -> int:
    """
    Number of steps for one pass over the dataloader after DDP sharding,
    approximately `original_length / num_processes`.
    """
    return math.ceil(len(train_dataloader) / num_processes)
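
# Example (hypothetical numbers): a 103-batch dataloader sharded across 4
# processes yields ceil(103 / 4) == 26 outside steps per pass.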


def get_total_training_steps(
    train_dataloader: DataLoader,
    epochs: int,
    num_processes: int = 1,
    epoch_length: int | None = None,
) -> int:
    """
    Calculate the total number of "visible" training steps.

    If `epoch_length` is provided, it is used as the fixed length of each epoch.
    Otherwise, the epoch length is derived from `train_dataloader`.

    Args:
        train_dataloader:
            Training dataloader object.
        epochs:
            The total number of epochs to run.
        num_processes:
            The number of parallel processes used for distributed training.
        epoch_length:
            A fixed number of training steps per epoch. Defaults to None.

    Returns:
        int: The total number of training steps (i.e., `epochs * epoch_length`).
    """
    if epoch_length is None:
        epoch_length = get_dataloader_one_pass_outside_steps(
            train_dataloader, num_processes
        )
    return epochs * epoch_length
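
# Example (hypothetical numbers): 3 epochs over a dataloader that yields 26
# outside steps per pass on each process gives 3 * 26 == 78 total training steps.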


def get_dataloader_one_pass_steps_inside_accelerator(
    dataloader_one_pass_steps: int,
    gradient_accumulation_steps: int,
    num_processes: int,
) -> int:
    """
    Calculate the number of "visible" training steps for a single pass over the
    dataloader inside an accelerator, accounting for gradient accumulation and
    distributed training.

    Args:
        dataloader_one_pass_steps:
            The number of steps (batches) in one pass over the dataset.
        gradient_accumulation_steps:
            The number of steps to accumulate gradients before performing a
            parameter update.
        num_processes:
            The number of parallel processes used for distributed training.

    Returns:
        int: The number of "visible" training steps for one pass over the
            dataset, multiplied by the number of processes.
    """
    return math.ceil(
        dataloader_one_pass_steps / gradient_accumulation_steps
    ) * num_processes
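
# Example (hypothetical numbers): 26 outside steps per pass with
# gradient_accumulation_steps=4 and 2 processes gives ceil(26 / 4) * 2 == 14
# scheduler steps inside the accelerator per pass.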


def get_steps_inside_accelerator_from_outside_steps(
    outside_steps: int,
    dataloader_one_pass_outside_steps: int,
    dataloader_one_pass_steps_inside_accelerator: int,
    gradient_accumulation_steps: int,
    num_processes: int,
) -> int:
    """
    Convert "outside" steps (as observed in a wandb logger or similar context)
    to the corresponding number of "inside" steps (for the accelerate lr scheduler).

    Specifically, the accelerate lr scheduler calls `step()` `num_processes` times
    for every `gradient_accumulation_steps` outside steps.

    Args:
        outside_steps:
            The total number of steps counted outside the accelerate context.
        dataloader_one_pass_outside_steps:
            The number of steps (batches) to complete one pass of the dataloader
            outside accelerate.
        dataloader_one_pass_steps_inside_accelerator:
            The number of `lr_scheduler.step()` calls inside accelerate for one
            pass, calculated via `get_dataloader_one_pass_steps_inside_accelerator`.
        gradient_accumulation_steps:
            The number of steps to accumulate gradients.
        num_processes:
            The number of parallel processes (GPUs) used in distributed training.

    Returns:
        int: The total number of `lr_scheduler.step()` calls inside accelerate
            that correspond to the given `outside_steps`.
    """
    num_dataloader_epochs_passed = outside_steps // dataloader_one_pass_outside_steps
    remaining_outside_steps = outside_steps % dataloader_one_pass_outside_steps
    remaining_inside_accelerator_steps = (
        remaining_outside_steps // gradient_accumulation_steps * num_processes
    )
    total_steps = (
        num_dataloader_epochs_passed
        * dataloader_one_pass_steps_inside_accelerator
        + remaining_inside_accelerator_steps
    )
    return total_steps
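
# Worked example (hypothetical numbers): with 60 outside steps, a 26-step pass,
# 14 inside steps per pass, gradient_accumulation_steps=4 and 2 processes:
# 60 // 26 == 2 full passes contribute 2 * 14 == 28 inside steps, and the
# remaining 60 % 26 == 8 outside steps contribute 8 // 4 * 2 == 4, so 32 in total.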


def lr_scheduler_param_adapter(
    config_dict: dict[str, Any], num_training_steps: int, num_warmup_steps: int
) -> dict[str, Any]:
    """
    Return a copy of `config_dict` with `num_training_steps` and
    `num_warmup_steps` filled in when the target is `transformers.get_scheduler`;
    other targets are returned as an unmodified copy.
    """
    target_class = config_dict["_target_"]
    return_dict = copy.deepcopy(config_dict)
    if target_class == "transformers.get_scheduler":
        return_dict.update({
            "num_training_steps": num_training_steps,
            "num_warmup_steps": num_warmup_steps,
        })
    return return_dict
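

# Minimal end-to-end sketch with hypothetical numbers; the config dict below
# mirrors the `_target_` layout expected by lr_scheduler_param_adapter, and the
# "name": "cosine" entry is an illustrative assumption, not a required key.
if __name__ == "__main__":
    one_pass_outside_steps = 100
    warmup_outside = get_warmup_steps(one_pass_outside_steps, warmup_epochs=0.5)  # 50
    inside_per_pass = get_dataloader_one_pass_steps_inside_accelerator(
        dataloader_one_pass_steps=one_pass_outside_steps,
        gradient_accumulation_steps=4,
        num_processes=2,
    )  # ceil(100 / 4) * 2 == 50
    warmup_inside = get_steps_inside_accelerator_from_outside_steps(
        outside_steps=warmup_outside,
        dataloader_one_pass_outside_steps=one_pass_outside_steps,
        dataloader_one_pass_steps_inside_accelerator=inside_per_pass,
        gradient_accumulation_steps=4,
        num_processes=2,
    )  # 50 // 4 * 2 == 24
    scheduler_kwargs = lr_scheduler_param_adapter(
        {"_target_": "transformers.get_scheduler", "name": "cosine"},
        num_training_steps=10 * inside_per_pass,
        num_warmup_steps=warmup_inside,
    )
    print(warmup_outside, inside_per_pass, warmup_inside, scheduler_kwargs)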