| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import math |
| from dataclasses import asdict, dataclass |
| from typing import Callable |
|
|
| import torch |
|
|
| from ..utils import get_logger |
| from ..utils.torch_utils import unwrap_module |
| from ._common import ( |
| _ALL_TRANSFORMER_BLOCK_IDENTIFIERS, |
| _ATTENTION_CLASSES, |
| _FEEDFORWARD_CLASSES, |
| _get_submodule_from_fqn, |
| ) |
| from ._helpers import AttentionProcessorRegistry, TransformerBlockRegistry |
| from .hooks import HookRegistry, ModelHook |
|
|
|
|
| logger = get_logger(__name__) |
|
|
| _LAYER_SKIP_HOOK = "layer_skip_hook" |
|
|
|
|
| |
| |
| @dataclass |
| class LayerSkipConfig: |
| r""" |
| Configuration for skipping internal transformer blocks when executing a transformer model. |
| |
| Args: |
| indices (`list[int]`): |
| The indices of the layer to skip. This is typically the first layer in the transformer block. |
| fqn (`str`, defaults to `"auto"`): |
| The fully qualified name identifying the stack of transformer blocks. Typically, this is |
| `transformer_blocks`, `single_transformer_blocks`, `blocks`, `layers`, or `temporal_transformer_blocks`. |
| For automatic detection, set this to `"auto"`. "auto" only works on DiT models. For UNet models, you must |
| provide the correct fqn. |
| skip_attention (`bool`, defaults to `True`): |
| Whether to skip attention blocks. |
| skip_ff (`bool`, defaults to `True`): |
| Whether to skip feed-forward blocks. |
| skip_attention_scores (`bool`, defaults to `False`): |
| Whether to skip attention score computation in the attention blocks. This is equivalent to using `value` |
| projections as the output of scaled dot product attention. |
| dropout (`float`, defaults to `1.0`): |
| The dropout probability for dropping the outputs of the skipped layers. By default, this is set to `1.0`, |
| meaning that the outputs of the skipped layers are completely ignored. If set to `0.0`, the outputs of the |
| skipped layers are fully retained, which is equivalent to not skipping any layers. |
| """ |
|
|
| indices: list[int] |
| fqn: str = "auto" |
| skip_attention: bool = True |
| skip_attention_scores: bool = False |
| skip_ff: bool = True |
| dropout: float = 1.0 |
|
|
| def __post_init__(self): |
| if not (0 <= self.dropout <= 1): |
| raise ValueError(f"Expected `dropout` to be between 0.0 and 1.0, but got {self.dropout}.") |
| if not math.isclose(self.dropout, 1.0) and self.skip_attention_scores: |
| raise ValueError( |
| "Cannot set `skip_attention_scores` to True when `dropout` is not 1.0. Please set `dropout` to 1.0." |
| ) |
|
|
| def to_dict(self): |
| return asdict(self) |
|
|
| @staticmethod |
| def from_dict(data: dict) -> "LayerSkipConfig": |
| return LayerSkipConfig(**data) |
|
|
|
|
| class AttentionScoreSkipFunctionMode(torch.overrides.TorchFunctionMode): |
| def __torch_function__(self, func, types, args=(), kwargs=None): |
| if kwargs is None: |
| kwargs = {} |
| if func is torch.nn.functional.scaled_dot_product_attention: |
| query = kwargs.get("query", None) |
| key = kwargs.get("key", None) |
| value = kwargs.get("value", None) |
| query = query if query is not None else args[0] |
| key = key if key is not None else args[1] |
| value = value if value is not None else args[2] |
| |
| |
| |
| |
| |
| if query.shape[2] == value.shape[2]: |
| return value |
| return func(*args, **kwargs) |
|
|
|
|
| class AttentionProcessorSkipHook(ModelHook): |
| def __init__(self, skip_processor_output_fn: Callable, skip_attention_scores: bool = False, dropout: float = 1.0): |
| self.skip_processor_output_fn = skip_processor_output_fn |
| self.skip_attention_scores = skip_attention_scores |
| self.dropout = dropout |
|
|
| def new_forward(self, module: torch.nn.Module, *args, **kwargs): |
| if self.skip_attention_scores: |
| if not math.isclose(self.dropout, 1.0): |
| raise ValueError( |
| "Cannot set `skip_attention_scores` to True when `dropout` is not 1.0. Please set `dropout` to 1.0." |
| ) |
| with AttentionScoreSkipFunctionMode(): |
| output = self.fn_ref.original_forward(*args, **kwargs) |
| else: |
| if math.isclose(self.dropout, 1.0): |
| output = self.skip_processor_output_fn(module, *args, **kwargs) |
| else: |
| output = self.fn_ref.original_forward(*args, **kwargs) |
| output = torch.nn.functional.dropout(output, p=self.dropout) |
| return output |
|
|
|
|
| class FeedForwardSkipHook(ModelHook): |
| def __init__(self, dropout: float): |
| super().__init__() |
| self.dropout = dropout |
|
|
| def new_forward(self, module: torch.nn.Module, *args, **kwargs): |
| if math.isclose(self.dropout, 1.0): |
| output = kwargs.get("hidden_states", None) |
| if output is None: |
| output = kwargs.get("x", None) |
| if output is None and len(args) > 0: |
| output = args[0] |
| else: |
| output = self.fn_ref.original_forward(*args, **kwargs) |
| output = torch.nn.functional.dropout(output, p=self.dropout) |
| return output |
|
|
|
|
| class TransformerBlockSkipHook(ModelHook): |
| def __init__(self, dropout: float): |
| super().__init__() |
| self.dropout = dropout |
|
|
| def initialize_hook(self, module): |
| self._metadata = TransformerBlockRegistry.get(unwrap_module(module).__class__) |
| return module |
|
|
| def new_forward(self, module: torch.nn.Module, *args, **kwargs): |
| if math.isclose(self.dropout, 1.0): |
| original_hidden_states = self._metadata._get_parameter_from_args_kwargs("hidden_states", args, kwargs) |
| if self._metadata.return_encoder_hidden_states_index is None: |
| output = original_hidden_states |
| else: |
| original_encoder_hidden_states = self._metadata._get_parameter_from_args_kwargs( |
| "encoder_hidden_states", args, kwargs |
| ) |
| output = (original_hidden_states, original_encoder_hidden_states) |
| else: |
| output = self.fn_ref.original_forward(*args, **kwargs) |
| output = torch.nn.functional.dropout(output, p=self.dropout) |
| return output |
|
|
|
|
| def apply_layer_skip(module: torch.nn.Module, config: LayerSkipConfig) -> None: |
| r""" |
| Apply layer skipping to internal layers of a transformer. |
| |
| Args: |
| module (`torch.nn.Module`): |
| The transformer model to which the layer skip hook should be applied. |
| config (`LayerSkipConfig`): |
| The configuration for the layer skip hook. |
| |
| Example: |
| |
| ```python |
| >>> from diffusers import apply_layer_skip_hook, CogVideoXTransformer3DModel, LayerSkipConfig |
| |
| >>> transformer = CogVideoXTransformer3DModel.from_pretrained("THUDM/CogVideoX-5b", torch_dtype=torch.bfloat16) |
| >>> config = LayerSkipConfig(layer_index=[10, 20], fqn="transformer_blocks") |
| >>> apply_layer_skip_hook(transformer, config) |
| ``` |
| """ |
| _apply_layer_skip_hook(module, config) |
|
|
|
|
| def _apply_layer_skip_hook(module: torch.nn.Module, config: LayerSkipConfig, name: str | None = None) -> None: |
| name = name or _LAYER_SKIP_HOOK |
|
|
| if config.skip_attention and config.skip_attention_scores: |
| raise ValueError("Cannot set both `skip_attention` and `skip_attention_scores` to True. Please choose one.") |
| if not math.isclose(config.dropout, 1.0) and config.skip_attention_scores: |
| raise ValueError( |
| "Cannot set `skip_attention_scores` to True when `dropout` is not 1.0. Please set `dropout` to 1.0." |
| ) |
|
|
| if config.fqn == "auto": |
| for identifier in _ALL_TRANSFORMER_BLOCK_IDENTIFIERS: |
| if hasattr(module, identifier): |
| config.fqn = identifier |
| break |
| else: |
| raise ValueError( |
| "Could not find a suitable identifier for the transformer blocks automatically. Please provide a valid " |
| "`fqn` (fully qualified name) that identifies a stack of transformer blocks." |
| ) |
|
|
| transformer_blocks = _get_submodule_from_fqn(module, config.fqn) |
| if transformer_blocks is None or not isinstance(transformer_blocks, torch.nn.ModuleList): |
| raise ValueError( |
| f"Could not find {config.fqn} in the provided module, or configured `fqn` (fully qualified name) does not identify " |
| f"a `torch.nn.ModuleList`. Please provide a valid `fqn` that identifies a stack of transformer blocks." |
| ) |
| if len(config.indices) == 0: |
| raise ValueError("Layer index list is empty. Please provide a non-empty list of layer indices to skip.") |
|
|
| blocks_found = False |
| for i, block in enumerate(transformer_blocks): |
| if i not in config.indices: |
| continue |
|
|
| blocks_found = True |
|
|
| if config.skip_attention and config.skip_ff: |
| logger.debug(f"Applying TransformerBlockSkipHook to '{config.fqn}.{i}'") |
| registry = HookRegistry.check_if_exists_or_initialize(block) |
| hook = TransformerBlockSkipHook(config.dropout) |
| registry.register_hook(hook, name) |
|
|
| elif config.skip_attention or config.skip_attention_scores: |
| for submodule_name, submodule in block.named_modules(): |
| if isinstance(submodule, _ATTENTION_CLASSES) and not submodule.is_cross_attention: |
| logger.debug(f"Applying AttentionProcessorSkipHook to '{config.fqn}.{i}.{submodule_name}'") |
| output_fn = AttentionProcessorRegistry.get(submodule.processor.__class__).skip_processor_output_fn |
| registry = HookRegistry.check_if_exists_or_initialize(submodule) |
| hook = AttentionProcessorSkipHook(output_fn, config.skip_attention_scores, config.dropout) |
| registry.register_hook(hook, name) |
|
|
| if config.skip_ff: |
| for submodule_name, submodule in block.named_modules(): |
| if isinstance(submodule, _FEEDFORWARD_CLASSES): |
| logger.debug(f"Applying FeedForwardSkipHook to '{config.fqn}.{i}.{submodule_name}'") |
| registry = HookRegistry.check_if_exists_or_initialize(submodule) |
| hook = FeedForwardSkipHook(config.dropout) |
| registry.register_hook(hook, name) |
|
|
| if not blocks_found: |
| raise ValueError( |
| f"Could not find any transformer blocks matching the provided indices {config.indices} and " |
| f"fully qualified name '{config.fqn}'. Please check the indices and fqn for correctness." |
| ) |
|
|