import math

import torch
from torch import nn

class AddNorm(nn.Module):
    """Layer normalization applied after a residual connection."""

    def __init__(self, normalized, dropout):
        super(AddNorm, self).__init__()
        self.dropout = nn.Dropout(dropout)
        self.ln = nn.LayerNorm(normalized)

    def forward(self, x, y):
        # Add the (dropped-out) sublayer output y to its input x, then normalize.
        return self.ln(x + self.dropout(y))


class PositionWiseFFN(nn.Module):
    """Position-wise feed-forward network."""

    def __init__(self, ffn_input, ffn_hiddens, mlp_bias=True):
        super(PositionWiseFFN, self).__init__()
        # Two linear layers with a ReLU in between; the output dimension equals
        # the input dimension so the block can sit inside a residual connection.
        self.ffn = nn.Sequential(
            nn.Linear(ffn_input, ffn_hiddens, bias=mlp_bias),
            nn.ReLU(),
            nn.Linear(ffn_hiddens, ffn_input, bias=mlp_bias),
        )

    def forward(self, x):
        return self.ffn(x)

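# Usage sketch (hypothetical sizes): PositionWiseFFN maps (..., ffn_input) back to
# (..., ffn_input), so its output composes directly with AddNorm on the same shape.
#
#   ffn = PositionWiseFFN(ffn_input=64, ffn_hiddens=128)
#   addnorm = AddNorm(normalized=[10, 64], dropout=0.1)
#   x = torch.randn(2, 10, 64)
#   addnorm(x, ffn(x)).shape   # -> torch.Size([2, 10, 64])
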
class PositionalEncoding1(nn.Module):
    """Implement the sinusoidal PE function (log-space formulation)."""

    def __init__(self, d_model, dropout, max_len=5000):
        super(PositionalEncoding1, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        # Compute the positional encodings once in log space.
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) *
                             -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # The registered buffer never requires gradients, so the deprecated
        # Variable wrapper from torch.autograd is unnecessary.
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)

class PositionalEncoding(nn.Module):
    """Positional encoding."""

    def __init__(self, num_hiddens, dropout, max_len=1000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(dropout)

        # Create a long-enough table P of shape (1, max_len, num_hiddens).
        self.P = torch.zeros((1, max_len, num_hiddens))
        X = torch.arange(max_len, dtype=torch.float32).reshape(-1, 1) / torch.pow(
            10000, torch.arange(0, num_hiddens, 2, dtype=torch.float32) / num_hiddens)
        self.P[:, :, 0::2] = torch.sin(X)
        self.P[:, :, 1::2] = torch.cos(X)

    def forward(self, X):
        X = X + self.P[:, :X.shape[1], :].to(X.device)
        return self.dropout(X)

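# Usage sketch (hypothetical sizes): both encodings add position information to a
# (batch, seq_len, num_hiddens) tensor without changing its shape.
#
#   pos = PositionalEncoding(num_hiddens=32, dropout=0.1)
#   pos(torch.zeros(2, 10, 32)).shape   # -> torch.Size([2, 10, 32])
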
class AttentionEncode(nn.Module):
    """Multi-head self-attention block with residual Add & Norm and an optional FFN sublayer."""

    def __init__(self, dropout, embedding_size, num_heads, seq_len: int = 40, ffn=False):
        super(AttentionEncode, self).__init__()
        self.dropout = dropout
        self.embedding_size = embedding_size
        self.num_heads = num_heads
        self.seq_len = seq_len
        self.is_ffn = ffn

        # batch_first=True so the inputs are (batch, seq_len, embedding_size),
        # matching the LayerNorm shape used in AddNorm below (requires PyTorch >= 1.9).
        self.att = nn.MultiheadAttention(embed_dim=self.embedding_size,
                                         num_heads=num_heads,
                                         dropout=0.6,
                                         batch_first=True)

        self.addNorm = AddNorm(normalized=[self.seq_len, self.embedding_size], dropout=self.dropout)

        self.FFN = PositionWiseFFN(ffn_input=self.embedding_size, ffn_hiddens=self.embedding_size * 2)

    def forward(self, x):
        MHAtt, _ = self.att(x, x, x)
        MHAtt_encode = self.addNorm(x, MHAtt)

        if self.is_ffn:
            ffn_in = MHAtt_encode
            ffn_out = self.FFN(ffn_in)
            MHAtt_encode = self.addNorm(ffn_in, ffn_out)

        return MHAtt_encode

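# Usage sketch, assuming batch-first inputs of shape (batch, seq_len, embedding_size)
# with seq_len matching the value passed at construction (default 40).
#
#   enc = AttentionEncode(dropout=0.1, embedding_size=64, num_heads=8, ffn=True)
#   enc(torch.randn(2, 40, 64)).shape   # -> torch.Size([2, 40, 64])
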
class FAN_encode(nn.Module):
    """Feed-forward encoding block: position-wise FFN followed by residual Add & Norm."""

    def __init__(self, dropout, shape):
        super(FAN_encode, self).__init__()
        self.dropout = dropout
        # normalized=[1, shape] assumes inputs of shape (batch, 1, shape).
        self.addNorm = AddNorm(normalized=[1, shape], dropout=self.dropout)
        self.FFN = PositionWiseFFN(ffn_input=shape, ffn_hiddens=(2 * shape))
        self.ln = nn.LayerNorm(shape)

    def forward(self, x):
        ffn_out = self.FFN(x)
        encode_output = self.addNorm(x, ffn_out)

        return encode_output

class ffn_norm(nn.Module):
    """Feed-forward sublayer followed by a residual connection and LayerNorm."""

    def __init__(self, input_dims: int, hidden_dims: int, dropout: float, bias: bool = True):
        super(ffn_norm, self).__init__()

        self.inps_dims = input_dims
        self.hidden_dims = hidden_dims
        self.dropout = nn.Dropout(dropout)
        self.ffn_bias = bias
        self.ffn = nn.Sequential(
            nn.Linear(self.inps_dims, self.hidden_dims, bias=self.ffn_bias),
            nn.LeakyReLU(),
            nn.Linear(self.hidden_dims, self.inps_dims, bias=self.ffn_bias),
        )

        self.ln = nn.LayerNorm(self.inps_dims)

    def forward(self, x):
        ffn_out = self.ffn(x)
        norm_out = self.ln(x + self.dropout(ffn_out))

        return norm_out

def sequence_mask(X, valid_len, value=0.):
    """Mask irrelevant entries in a sequence (positions beyond each valid length)."""
    valid_len = valid_len.float()
    MaxLen = X.size(1)
    mask = torch.arange(MaxLen, dtype=torch.float32, device=X.device)[None, :] < valid_len[:, None].to(X.device)
    # Note: X is modified in place.
    X[~mask] = value
    return X

def masked_softmax(X, valid_lens):
    """Perform a softmax over the last axis while masking out elements beyond the valid length."""
    if valid_lens is None:
        return nn.functional.softmax(X, dim=-1)
    else:
        shape = X.shape
        if valid_lens.dim() == 1:
            # One valid length per sequence: repeat it for every query row.
            valid_lens = torch.repeat_interleave(valid_lens, shape[1])
        else:
            valid_lens = valid_lens.reshape(-1)
        # Masked positions get a large negative score so softmax drives them to ~0.
        X = sequence_mask(X.reshape(-1, shape[-1]), valid_lens, value=-1e6)
        return nn.functional.softmax(X.reshape(shape), dim=-1)
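# Example sketch: for scores of shape (batch, queries, keys) = (2, 3, 4) with
# valid_lens = tensor([2, 3]), the weights beyond each valid length are driven to ~0.
#
#   masked_softmax(torch.randn(2, 3, 4), torch.tensor([2, 3]))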


class AdditiveAttention(nn.Module):
    """Attention mechanism (scaled dot-product scores with a learned output projection)."""

    def __init__(self, input_size, value_size, num_hiddens, dropout):
        super(AdditiveAttention, self).__init__()
        self.W_k = nn.Linear(input_size, num_hiddens, bias=False)
        self.W_q = nn.Linear(input_size, num_hiddens, bias=False)
        self.w_v = nn.Linear(input_size, num_hiddens, bias=False)
        # w_o projects along the key axis and therefore assumes exactly 50 keys.
        self.w_o = nn.Linear(50, value_size, bias=False)
        self.dropout = nn.Dropout(dropout)

    def forward(self, queries, keys, values, valid_lens=None):
        queries, keys = self.W_q(queries), self.W_k(keys)
        d = queries.shape[-1]

        # Scaled dot-product scores, projected by w_o and transposed before the
        # masked softmax over the last axis.
        scores = torch.bmm(queries, keys.transpose(1, 2)) / math.sqrt(d)
        scores = self.w_o(scores).permute(0, 2, 1)
        attention_weights = masked_softmax(scores, valid_lens)

        values = self.w_v(values)

        return torch.bmm(self.dropout(attention_weights), values), attention_weights


class MultiHeadAttention(nn.Module):
    """Multi-head attention."""

    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 num_heads, dropout, bias=False):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.attention = DotProductAttention(dropout)
        self.W_q = nn.Linear(query_size, num_hiddens, bias=bias)
        self.W_k = nn.Linear(key_size, num_hiddens, bias=bias)
        self.W_v = nn.Linear(value_size, num_hiddens, bias=bias)
        self.W_o = nn.Linear(num_hiddens, num_hiddens, bias=bias)

    def forward(self, queries, keys, values, valid_lens=None):
        # queries/keys/values: (batch_size, no. of queries or key-value pairs, num_hiddens).
        # After transpose_qkv: (batch_size * num_heads, ..., num_hiddens / num_heads).
        queries = transpose_qkv(self.W_q(queries), self.num_heads)
        keys = transpose_qkv(self.W_k(keys), self.num_heads)
        values = transpose_qkv(self.W_v(values), self.num_heads)

        if valid_lens is not None:
            # Copy valid_lens num_heads times along axis 0 so every head sees the same mask.
            valid_lens = torch.repeat_interleave(valid_lens, repeats=self.num_heads, dim=0)

        # output: (batch_size * num_heads, no. of queries, num_hiddens / num_heads).
        output = self.attention(queries, keys, values, valid_lens)

        # output_concat: (batch_size, no. of queries, num_hiddens).
        output_concat = transpose_output(output, self.num_heads)
        return self.W_o(output_concat)

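# Usage sketch, assuming 64 hidden units split across 4 heads and one valid
# length per sequence.
#
#   mha = MultiHeadAttention(64, 64, 64, 64, num_heads=4, dropout=0.1)
#   x = torch.randn(2, 6, 64)
#   mha(x, x, x, valid_lens=torch.tensor([3, 6])).shape   # -> torch.Size([2, 6, 64])
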
def transpose_qkv(X, num_heads):
    """Reshape for parallel computation of multiple attention heads."""
    # Input X: (batch_size, no. of queries or key-value pairs, num_hiddens).
    # After reshape: (batch_size, ..., num_heads, num_hiddens / num_heads).
    X = X.reshape(X.shape[0], X.shape[1], num_heads, -1)

    # (batch_size, num_heads, no. of queries or key-value pairs, num_hiddens / num_heads).
    X = X.permute(0, 2, 1, 3)

    # (batch_size * num_heads, no. of queries or key-value pairs, num_hiddens / num_heads).
    return X.reshape(-1, X.shape[2], X.shape[3])


def transpose_output(X, num_heads):
    """Reverse the operation of transpose_qkv."""
    X = X.reshape(-1, num_heads, X.shape[1], X.shape[2])
    X = X.permute(0, 2, 1, 3)
    return X.reshape(X.shape[0], X.shape[1], -1)

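# Shape round trip, assuming batch 2, 5 tokens, num_hiddens 8 and 4 heads.
#
#   X = torch.randn(2, 5, 8)
#   Y = transpose_qkv(X, num_heads=4)          # -> torch.Size([8, 5, 2])
#   transpose_output(Y, num_heads=4).shape     # -> torch.Size([2, 5, 8])
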
class DotProductAttention(nn.Module):
    """Scaled dot-product attention."""

    def __init__(self, dropout):
        super(DotProductAttention, self).__init__()
        self.dropout = nn.Dropout(dropout)

    # queries: (batch_size, no. of queries, d)
    # keys:    (batch_size, no. of key-value pairs, d)
    # values:  (batch_size, no. of key-value pairs, value dimension)
    # valid_lens: (batch_size,) or (batch_size, no. of queries)
    def forward(self, queries, keys, values, valid_lens=None):
        d = queries.shape[-1]
        # Divide by sqrt(d) so the score variance does not grow with d.
        scores = torch.bmm(queries, keys.transpose(1, 2)) / math.sqrt(d)
        attention_weights = masked_softmax(scores, valid_lens)
        return torch.bmm(self.dropout(attention_weights), values)

class MASK_AttentionEncode(nn.Module):
    """Multi-head self-attention block with residual Add & Norm and an optional length mask."""

    def __init__(self, dropout, embedding_size, num_heads):
        super(MASK_AttentionEncode, self).__init__()
        self.dropout = dropout
        self.embedding_size = embedding_size
        self.num_heads = num_heads

        self.at1 = MultiHeadAttention(key_size=self.embedding_size,
                                      query_size=self.embedding_size,
                                      value_size=self.embedding_size,
                                      num_hiddens=self.embedding_size,
                                      num_heads=self.num_heads,
                                      dropout=self.dropout)
        # The hard-coded 50 assumes sequences of length 50.
        self.addNorm = AddNorm(normalized=[50, self.embedding_size], dropout=self.dropout)

        # 64 -> 192 -> 64 feed-forward; PositionWiseFFN always projects back to its input size.
        self.FFN = PositionWiseFFN(ffn_input=64, ffn_hiddens=192)

    def forward(self, x, y=None):
        # y carries the optional valid lengths used to mask padded positions.
        Multi = self.at1(x, x, x, y)
        Multi_encode = self.addNorm(x, Multi)

        return Multi_encode

class transformer_encode(nn.Module):
    """A single Transformer encoder block: multi-head attention, Add & Norm, FFN, Add & Norm."""

    def __init__(self, dropout, embedding, num_heads):
        super(transformer_encode, self).__init__()
        self.dropout = dropout
        self.embedding_size = embedding
        self.num_heads = num_heads
        # Note: self.attention is defined but not used in forward.
        self.attention = nn.MultiheadAttention(embed_dim=192,
                                               num_heads=8,
                                               dropout=0.6)
        self.at1 = MultiHeadAttention(key_size=self.embedding_size,
                                      query_size=self.embedding_size,
                                      value_size=self.embedding_size,
                                      num_hiddens=self.embedding_size,
                                      num_heads=self.num_heads,
                                      dropout=self.dropout)

        # The hard-coded 50 assumes sequences of length 50.
        self.addNorm = AddNorm(normalized=[50, self.embedding_size], dropout=self.dropout)

        self.ffn = PositionWiseFFN(ffn_input=self.embedding_size, ffn_hiddens=2 * self.embedding_size)

    def forward(self, x, valid=None):
        Multi = self.at1(x, x, x, valid)
        Multi_encode = self.addNorm(x, Multi)

        encode_output = self.addNorm(Multi_encode, self.ffn(Multi_encode))

        return encode_output

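# Minimal smoke test sketch, assuming batch 2, sequence length 50 (to match the
# hard-coded AddNorm shape above) and embedding size 64.
if __name__ == "__main__":
    block = transformer_encode(dropout=0.1, embedding=64, num_heads=8)
    dummy = torch.randn(2, 50, 64)
    print(block(dummy).shape)  # expected: torch.Size([2, 50, 64])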