| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| from typing import Callable, Optional, Tuple |
|
|
| import torch |
| from torch import nn |
|
|
| import torch_npu |
| from torch_npu.contrib import transfer_to_npu |
| if "910" in torch.npu.get_device_name(): |
| NPU_ATTN_INFR = True |
| print("[INFO] torch_npu detected. Using NPU fused infer attention.") |
| else: |
| NPU_ATTN_INFR = False |
|
|
| from transformers.cache_utils import Cache |
| from transformers.modeling_flash_attention_utils import FlashAttentionKwargs |
| from transformers.modeling_utils import ALL_ATTENTION_FUNCTIONS |
| from transformers.processing_utils import Unpack |
| from transformers.utils import logging |
| from transformers.models.llama.modeling_llama import ( |
| LlamaAttention, |
| LlamaDecoderLayer, |
| LlamaForCausalLM, |
| LlamaForSequenceClassification, |
| LlamaMLP, |
| LlamaModel, |
| apply_rotary_pos_emb, |
| eager_attention_forward, |
| ) |
| from .configuration_openpangu_dense import PanguEmbeddedConfig |
|
|
|
|
| logger = logging.get_logger(__name__) |
|
|
|
|
| class PanguEmbeddedMLP(LlamaMLP): |
| def __init__(self, config): |
| super().__init__(config) |
| self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False) |
| self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False) |
| self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=False) |
|
|
|
|
| class PanguEmbeddedAttention(LlamaAttention): |
| def __init__(self, config: PanguEmbeddedConfig, layer_idx: int): |
| super().__init__() |
| self.config = config |
| self.layer_idx = layer_idx |
| self.head_dim = getattr(config, "head_dim", config.hidden_size // config.num_attention_heads) |
| self.num_heads = config.num_attention_heads |
| self.num_key_value_heads = config.num_key_value_heads |
| self.num_key_value_groups = config.num_attention_heads // config.num_key_value_heads |
| self.scaling = self.head_dim**-0.5 |
| self.attention_dropout = config.attention_dropout |
| self.is_causal = True |
|
|
| self.q_proj = nn.Linear(config.hidden_size, config.num_attention_heads * self.head_dim, bias=config.bias) |
| self.k_proj = nn.Linear(config.hidden_size, config.num_key_value_heads * self.head_dim, bias=config.bias) |
| self.v_proj = nn.Linear(config.hidden_size, config.num_key_value_heads * self.head_dim, bias=config.bias) |
| self.o_proj = nn.Linear(config.num_attention_heads * self.head_dim, config.hidden_size, bias=config.bias) |
|
|
| def forward( |
| self, |
| hidden_states: torch.Tensor, |
| position_embeddings: tuple[torch.Tensor, torch.Tensor], |
| attention_mask: Optional[torch.Tensor], |
| past_key_value: Optional[Cache] = None, |
| cache_position: Optional[torch.LongTensor] = None, |
| **kwargs: Unpack[FlashAttentionKwargs], |
| ) -> tuple[torch.Tensor, Optional[torch.Tensor], Optional[tuple[torch.Tensor]]]: |
| input_shape = hidden_states.shape[:-1] |
| hidden_shape = (*input_shape, -1, self.head_dim) |
|
|
| query_states = self.q_proj(hidden_states).view(hidden_shape).transpose(1, 2) |
| key_states = self.k_proj(hidden_states).view(hidden_shape).transpose(1, 2) |
| value_states = self.v_proj(hidden_states).view(hidden_shape).transpose(1, 2) |
|
|
| cos, sin = position_embeddings |
| query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin) |
|
|
| if past_key_value is not None: |
| |
| cache_kwargs = {"sin": sin, "cos": cos, "cache_position": cache_position} |
| key_states, value_states = past_key_value.update(key_states, value_states, self.layer_idx, cache_kwargs) |
|
|
| attention_interface: Callable = eager_attention_forward |
| if self.config._attn_implementation != "eager": |
| attention_interface = ALL_ATTENTION_FUNCTIONS[self.config._attn_implementation] |
| |
| if not self.training and NPU_ATTN_INFR: |
| q_len = input_shape[1] |
| if attention_mask is not None: |
| attention_mask = ~attention_mask.bool() |
| elif q_len > 1: |
| attention_mask = torch.triu(torch.ones([q_len, q_len]), diagonal=1).bool().unsqueeze(0).unsqueeze(0).to(query_states.device) |
|
|
| attn_output, _ = torch_npu.npu_fused_infer_attention_score( |
| query_states, key_states, value_states, |
| num_heads=self.num_heads, num_key_value_heads=self.num_key_value_heads, |
| input_layout="BNSD", atten_mask=attention_mask, scale=self.scaling) |
| attn_output = attn_output.transpose(1, 2) |
| attn_weights = None |
| else: |
| attn_output, attn_weights = attention_interface( |
| self, |
| query_states, |
| key_states, |
| value_states, |
| attention_mask, |
| dropout=0.0 if not self.training else self.attention_dropout, |
| scaling=self.scaling, |
| **kwargs, |
| ) |
|
|
| attn_output = attn_output.reshape(*input_shape, -1).contiguous() |
| attn_output = self.o_proj(attn_output) |
| return attn_output, attn_weights |
| |
|
|
| class PanguEmbeddedDecoderLayer(LlamaDecoderLayer): |
| pass |
| |
|
|
| class PanguEmbeddedModel(LlamaModel): |
| pass |
|
|
|
|
| class PanguEmbeddedForCausalLM(LlamaForCausalLM): |
| pass |