| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| from types import MethodType |
|
|
| import torch.nn as nn |
|
|
| from .imports import is_hpu_available, is_transformer_engine_available |
| from .operations import GatheredParameters |
|
|
|
|
| |
|
|
|
|
def convert_model(model, to_transformer_engine=True, _convert_linear=True, _convert_ln=True):
    """
    Recursively converts the linear and layernorm layers of a model to their `transformers_engine` counterpart.

    Args:
        model: the `nn.Module` whose children are converted in place.
        to_transformer_engine (`bool`): if `True`, swap `nn.Linear`/`nn.LayerNorm` for their
            `transformer_engine` counterparts; if `False`, convert back to plain `torch.nn` layers.
        _convert_linear (`bool`): whether linear layers are converted.
        _convert_ln (`bool`): whether layernorm layers are converted.

    Raises:
        ImportError: if transformer_engine is not installed.
    """
    if not is_transformer_engine_available():
        raise ImportError("Using `convert_model` requires transformer_engine to be installed.")

    if is_hpu_available():
        import intel_transformer_engine as te

        if not hasattr(te, "LayerNorm"):
            # intel_transformer_engine may not ship a LayerNorm; fall back to the torch one.
            te.LayerNorm = nn.LayerNorm
    else:
        import transformer_engine.pytorch as te

    for name, module in model.named_children():
        if isinstance(module, nn.Linear) and to_transformer_engine and _convert_linear:
            has_bias = module.bias is not None
            params_to_gather = [module.weight]
            if has_bias:
                params_to_gather.append(module.bias)

            with GatheredParameters(params_to_gather, modifier_rank=0):
                # TE fp8 kernels require every weight dimension to be a multiple of 16;
                # skip just this layer instead of aborting the whole conversion
                # (a bare `return` here would leave the model half-converted).
                if any(p % 16 != 0 for p in module.weight.shape):
                    continue
                te_module = te.Linear(
                    module.in_features, module.out_features, bias=has_bias, params_dtype=module.weight.dtype
                )
                te_module.weight.copy_(module.weight)
                if has_bias:
                    te_module.bias.copy_(module.bias)

            setattr(model, name, te_module)
        elif isinstance(module, nn.LayerNorm) and to_transformer_engine and _convert_ln:
            with GatheredParameters([module.weight, module.bias], modifier_rank=0):
                # NOTE(review): on HPU, te.LayerNorm may be the nn.LayerNorm fallback set above,
                # which does not accept `params_dtype` — confirm against intel_transformer_engine.
                te_module = te.LayerNorm(module.normalized_shape[0], eps=module.eps, params_dtype=module.weight.dtype)
                te_module.weight.copy_(module.weight)
                te_module.bias.copy_(module.bias)

            setattr(model, name, te_module)
        elif isinstance(module, te.Linear) and not to_transformer_engine and _convert_linear:
            has_bias = module.bias is not None
            # torch.nn.Linear takes `dtype`, not TE's `params_dtype` (which raises TypeError).
            new_module = nn.Linear(
                module.in_features, module.out_features, bias=has_bias, dtype=module.weight.dtype
            )
            new_module.weight.copy_(module.weight)
            if has_bias:
                new_module.bias.copy_(module.bias)

            setattr(model, name, new_module)
        elif isinstance(module, te.LayerNorm) and not to_transformer_engine and _convert_ln:
            # torch.nn.LayerNorm takes `dtype`, not TE's `params_dtype` (which raises TypeError).
            new_module = nn.LayerNorm(module.normalized_shape[0], eps=module.eps, dtype=module.weight.dtype)
            new_module.weight.copy_(module.weight)
            new_module.bias.copy_(module.bias)

            setattr(model, name, new_module)
        else:
            # Not a convertible leaf: recurse into the child with the same settings.
            convert_model(
                module,
                to_transformer_engine=to_transformer_engine,
                _convert_linear=_convert_linear,
                _convert_ln=_convert_ln,
            )
|
|
|
|
def has_transformer_engine_layers(model):
    """
    Returns whether a given model has some `transformer_engine` layer or not.

    Raises:
        ImportError: if transformer_engine is not installed.
    """
    if not is_transformer_engine_available():
        raise ImportError("Using `has_transformer_engine_layers` requires transformer_engine to be installed.")

    # The set of classes that count as "transformer_engine layers" differs per backend.
    if is_hpu_available():
        import intel_transformer_engine as te

        te_classes = te.Linear
    else:
        import transformer_engine.pytorch as te

        te_classes = (te.LayerNorm, te.Linear, te.TransformerLayer)

    # Short-circuits on the first matching submodule.
    return any(isinstance(submodule, te_classes) for submodule in model.modules())
|
|
|
|
def contextual_fp8_autocast(model_forward, fp8_recipe, use_during_eval=False):
    """
    Wrapper for a model's forward method to apply FP8 autocast. Is context aware, meaning that by default it will
    disable FP8 autocast during eval mode, which is generally better for more accurate metrics.

    Args:
        model_forward: the original forward callable to wrap.
        fp8_recipe: the recipe object passed through to `fp8_autocast`.
        use_during_eval (`bool`): if `True`, keep FP8 autocast enabled even when `self.training` is `False`.

    Raises:
        ImportError: if transformer_engine is not installed.
    """
    if not is_transformer_engine_available():
        raise ImportError("Using `contextual_fp8_autocast` requires transformer_engine to be installed.")

    if is_hpu_available():
        from intel_transformer_engine import fp8_autocast
    else:
        from transformer_engine.pytorch import fp8_autocast

    def forward(self, *args, **kwargs):
        # FP8 is active during training always, and during eval only when explicitly requested.
        with fp8_autocast(enabled=use_during_eval or self.training, fp8_recipe=fp8_recipe):
            return model_forward(*args, **kwargs)

    # Expose the original forward so downstream code can unwrap the decoration.
    forward.__wrapped__ = model_forward

    return forward
|
|
|
|
def apply_fp8_autowrap(model, fp8_recipe_handler):
    """
    Applies FP8 context manager to the model's forward method

    Args:
        model: the model whose `forward` is replaced in place.
        fp8_recipe_handler: optional handler providing `to_kwargs()`; `None` means default recipe kwargs.

    Returns:
        The same `model`, with its `forward` wrapped in an FP8 autocast context.

    Raises:
        ImportError: if transformer_engine is not installed.
    """
    if not is_transformer_engine_available():
        raise ImportError("Using `apply_fp8_autowrap` requires transformer_engine to be installed.")

    if is_hpu_available():
        import intel_transformer_engine.recipe as te_recipe
    else:
        import transformer_engine.common.recipe as te_recipe

    kwargs = {} if fp8_recipe_handler is None else fp8_recipe_handler.to_kwargs()
    # The handler stores the format as a string; resolve it to the backend enum member.
    if "fp8_format" in kwargs:
        kwargs["fp8_format"] = getattr(te_recipe.Format, kwargs["fp8_format"])
    # This key drives the wrapper, not DelayedScaling, so pop it before building the recipe.
    use_during_eval = kwargs.pop("use_autocast_during_eval", False)
    new_forward = contextual_fp8_autocast(model.forward, te_recipe.DelayedScaling(**kwargs), use_during_eval)

    # Bound methods carry `__func__` and must be re-bound to the instance; plain
    # functions (e.g. an already-patched forward) are assigned directly.
    if hasattr(model.forward, "__func__"):
        model.forward = MethodType(new_forward, model)
    else:
        model.forward = new_forward

    return model
|
|