import math import torch from torch import nn class AddNorm(nn.Module): """残差连接后进行层归一化""" def __init__(self, normalized, dropout): super(AddNorm, self).__init__() self.dropout = nn.Dropout(dropout) self.ln = nn.LayerNorm(normalized) def forward(self, x, y): return self.ln(x + self.dropout(y)) class PositionWiseFFN(nn.Module): """基于位置的前馈⽹络""" def __init__(self, ffn_input, ffn_hiddens,mlp_bias=True): super(PositionWiseFFN, self).__init__() self.ffn = nn.Sequential( nn.Linear(ffn_input, ffn_hiddens, bias=mlp_bias), nn.ReLU(), nn.Linear(ffn_hiddens, ffn_input, bias=mlp_bias), ) def forward(self, x): return self.ffn(x) from torch.autograd import Variable class PositionalEncoding1(nn.Module): "Implement the PE function." def __init__(self, d_model, dropout, max_len=5000): super(PositionalEncoding1, self).__init__() self.dropout = nn.Dropout(p=dropout) # Compute the positional encodings once in log space. pe = torch.zeros(max_len, d_model) position = torch.arange(0, max_len).unsqueeze(1) div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)) pe[:, 0::2] = torch.sin(position * div_term) pe[:, 1::2] = torch.cos(position * div_term) pe = pe.unsqueeze(0) self.register_buffer('pe', pe) def forward(self, x): x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False) return self.dropout(x) class PositionalEncoding(nn.Module): """位置编码""" def __init__(self, num_hiddens, dropout, max_len=1000): super(PositionalEncoding, self).__init__() self.dropout = nn.Dropout(dropout) # 创建⼀个⾜够⻓的P self.P = torch.zeros((1, max_len, num_hiddens)) X = torch.arange(max_len, dtype=torch.float32).reshape(-1, 1) / torch.pow(10000, torch.arange(0, num_hiddens, 2, dtype=torch.float32) / num_hiddens) self.P[:, :, 0::2] = torch.sin(X) self.P[:, :, 1::2] = torch.cos(X) def forward(self, X): X = X + self.P[:, :X.shape[1], :].to(X.device) return self.dropout(X) class AttentionEncode(nn.Module): def __init__(self, dropout, embedding_size, num_heads,seq_len: int=40,ffn=False): super(AttentionEncode, self).__init__() self.dropout = dropout self.embedding_size = embedding_size self.num_heads = num_heads self.seq_len = seq_len self.is_ffn = ffn self.att = nn.MultiheadAttention(embed_dim=self.embedding_size, num_heads=num_heads, dropout=0.6 ) self.addNorm = AddNorm(normalized=[self.seq_len, self.embedding_size], dropout=self.dropout) self.FFN = PositionWiseFFN(ffn_input=self.embedding_size, ffn_hiddens=self.embedding_size*2) def forward(self, x): bs,_,_ = x.size() MHAtt, _ = self.att(x, x, x) MHAtt_encode = self.addNorm(x, MHAtt) if self.is_ffn: ffn_in = MHAtt_encode # bs,seq_len,feat_dims ffn_out = self.FFN(ffn_in) MHAtt_encode = self.addNorm(ffn_in,ffn_out) return MHAtt_encode class FAN_encode(nn.Module): def __init__(self, dropout, shape): super(FAN_encode, self).__init__() self.dropout = dropout self.addNorm = AddNorm(normalized=[1, shape], dropout=self.dropout) self.FFN = PositionWiseFFN(ffn_input=shape, ffn_hiddens=(2*shape)) self.ln = nn.LayerNorm(shape) def forward(self, x): #x = self.ln(x) ffn_out = self.FFN(x) encode_output = self.addNorm(x, ffn_out) return encode_output class ffn_norm(nn.Module): # 可接受二维输入和一维输入 def __init__(self,input_dims:int,hidden_dims:int,dropout:float,bias:bool=True): super(ffn_norm,self).__init__() self.inps_dims = input_dims self.hidden_dims = hidden_dims self.dropout = nn.Dropout(dropout) self.ffn_bias = bias self.ffn = nn.Sequential( nn.Linear(self.inps_dims, self.hidden_dims, bias=self.ffn_bias), nn.LeakyReLU(), nn.Linear(self.hidden_dims, self.inps_dims, bias=self.ffn_bias), ) self.ln = nn.LayerNorm(self.inps_dims) def forward(self,x): # x:[B,S,H] OR [B,shape],shape:S*H ffn_out = self.ffn(x) norm_out = self.ln(x + self.dropout(ffn_out)) return norm_out def sequence_mask(X, valid_len, value=0.): """在序列中屏蔽不相关的项""" valid_len = valid_len.float() MaxLen = X.size(1) mask = torch.arange(MaxLen, dtype=torch.float32, device=X.device)[None, :] < valid_len[:, None].to(X.device) X[~mask] = value return X def masked_softmax(X, valid_lens): """通过在最后⼀个轴上掩蔽元素来执⾏softmax操作""" # X:3D张量,valid_lens:1D或2D张量 if valid_lens is None: return nn.functional.softmax(X, dim=-1) else: shape = X.shape if valid_lens.dim() == 1: valid_lens = torch.repeat_interleave(valid_lens, shape[1]) else: valid_lens = valid_lens.reshape(-1) # 最后⼀轴上被掩蔽的元素使⽤⼀个⾮常⼤的负值替换,从⽽其softmax输出为0 X = sequence_mask(X.reshape(-1, shape[-1]), valid_lens, value=-1e6) return nn.functional.softmax(X.reshape(shape), dim=-1) # class AdditiveAttention(nn.Module): # """加性注意⼒""" # # def __init__(self, key_size, query_size, num_hiddens, dropout): # super(AdditiveAttention, self).__init__() # self.W_k = nn.Linear(key_size, num_hiddens, bias=False) # self.W_q = nn.Linear(query_size, num_hiddens, bias=False) # self.w_v = nn.Linear(num_hiddens, 1, bias=False) # self.dropout = nn.Dropout(dropout) # # def forward(self, queries, keys, values, valid_lens): # queries, keys = self.W_q(queries), self.W_k(keys) # # 在维度扩展后, # # queries的形状:(batch_size,查询的个数,1,num_hidden) # # key的形状:(batch_size,1,“键-值”对的个数,num_hiddens) # # 使⽤⼴播⽅式进⾏求和 # features = queries.unsqueeze(2) + keys.unsqueeze(1) # features = torch.tanh(features) # # self.w_v仅有⼀个输出,因此从形状中移除最后那个维度。 # # scores的形状:(batch_size,查询的个数,“键-值”对的个数) # scores = self.w_v(features).squeeze(-1) # attention_weights = masked_softmax(scores, valid_lens) # # values的形状:(batch_size,“键-值”对的个数,值的维度) # return torch.bmm(self.dropout(attention_weights), values) class AdditiveAttention(nn.Module): """注意⼒机制""" def __init__(self, input_size, value_size, num_hiddens, dropout): super(AdditiveAttention, self).__init__() self.W_k = nn.Linear(input_size, num_hiddens, bias=False) self.W_q = nn.Linear(input_size, num_hiddens, bias=False) self.w_v = nn.Linear(input_size, num_hiddens, bias=False) self.w_o = nn.Linear(50, value_size, bias=False) self.dropout = nn.Dropout(dropout) def forward(self, queries, keys, values, valid_lens=None): queries, keys = self.W_q(queries), self.W_k(keys) d = queries.shape[-1] # 在维度扩展后, # queries的形状:(batch_size,查询的个数,1,num_hidden) # key的形状:(batch_size,1,“键-值”对的个数,num_hiddens) # 使⽤⼴播⽅式进⾏求和 # features = queries + keys # features = torch.tanh(features) # self.w_v仅有⼀个输出,因此从形状中移除最后那个维度。 # scores的形状:(batch_size,查询的个数,“键-值”对的个数) scores = torch.bmm(queries, keys.transpose(1, 2)) / math.sqrt(d) scores = self.w_o(scores).permute(0, 2, 1) attention_weights = masked_softmax(scores, valid_lens) # attention_weights = nn.Softmax(dim=1)(scores) values = self.w_v(values) # values = torch.transpose(values, 1, 2) # values的形状:(batch_size,“键-值”对的个数,值的维度) return torch.bmm(self.dropout(attention_weights), values), attention_weights class MultiHeadAttention(nn.Module): """多头注意力""" def __init__(self, key_size, query_size, value_size, num_hiddens, num_heads, dropout, bias=False): super(MultiHeadAttention, self).__init__() self.num_heads = num_heads self.attention = DotProductAttention(dropout) self.W_q = nn.Linear(query_size, num_hiddens, bias=bias) self.W_k = nn.Linear(key_size, num_hiddens, bias=bias) self.W_v = nn.Linear(value_size, num_hiddens, bias=bias) self.W_o = nn.Linear(num_hiddens, num_hiddens, bias=bias) def forward(self, queries, keys, values, valid_lens=None): # queries,keys,values的形状: # (batch_size,查询或者“键-值”对的个数,num_hiddens) # valid_lens 的形状: # (batch_size,)或(batch_size,查询的个数) # 经过变换后,输出的queries,keys,values 的形状: # (batch_size*num_heads,查询或者“键-值”对的个数, # num_hiddens/num_heads) queries = transpose_qkv(self.W_q(queries), self.num_heads) keys = transpose_qkv(self.W_k(keys), self.num_heads) values = transpose_qkv(self.W_v(values), self.num_heads) if valid_lens is not None: # 在轴0,将第一项(标量或者矢量)复制num_heads次, # 然后如此复制第二项,然后诸如此类。 valid_lens = torch.repeat_interleave(valid_lens, repeats=self.num_heads, dim=0) # output的形状:(batch_size*num_heads,查询的个数,num_hiddens/num_heads) output = self.attention(queries, keys, values, valid_lens) # output_concat的形状:(batch_size,查询的个数,num_hiddens) output_concat = transpose_output(output, self.num_heads) return self.W_o(output_concat) def transpose_qkv(X, num_heads): """为了多注意力头的并行计算而变换形状""" # 输入X的形状:(batch_size,查询或者“键-值”对的个数,num_hiddens) # 输出X的形状:(batch_size,查询或者“键-值”对的个数,num_heads, # num_hiddens/num_heads) X = X.reshape(X.shape[0], X.shape[1], num_heads, -1) # 输出X的形状:(batch_size,num_heads,查询或者“键-值”对的个数, # num_hiddens/num_heads) X = X.permute(0, 2, 1, 3) # 最终输出的形状:(batch_size*num_heads,查询或者“键-值”对的个数, # num_hiddens/num_heads) return X.reshape(-1, X.shape[2], X.shape[3]) def transpose_output(X, num_heads): """逆转transpose_qkv函数的操作""" X = X.reshape(-1, num_heads, X.shape[1], X.shape[2]) X = X.permute(0, 2, 1, 3) return X.reshape(X.shape[0], X.shape[1], -1) class DotProductAttention(nn.Module): """缩放点积注意力""" def __init__(self, dropout): super(DotProductAttention, self).__init__() self.dropout = nn.Dropout(dropout) # queries的形状:(batch_size,查询的个数,d) # keys的形状:(batch_size,“键-值”对的个数,d) # values的形状:(batch_size,“键-值”对的个数,值的维度) # valid_lens的形状:(batch_size,)或者(batch_size,查询的个数) def forward(self, queries, keys, values, valid_lens=None): d = queries.shape[-1] # 设置transpose_b=True为了交换keys的最后两个维度 scores = torch.bmm(queries, keys.transpose(1, 2)) / math.sqrt(d) attention_weights = masked_softmax(scores, valid_lens) return torch.bmm(self.dropout(attention_weights), values) class MASK_AttentionEncode(nn.Module): def __init__(self, dropout, embedding_size, num_heads): super(MASK_AttentionEncode, self).__init__() self.dropout = dropout self.embedding_size = embedding_size self.num_heads = num_heads self.at1 = MultiHeadAttention(key_size=self.embedding_size, query_size=self.embedding_size, value_size=self.embedding_size, num_hiddens=self.embedding_size, num_heads=self.num_heads, dropout=self.dropout) self.addNorm = AddNorm(normalized=[50, self.embedding_size], dropout=self.dropout) self.FFN = PositionWiseFFN(ffn_num_input=64, ffn_num_hiddens=192, ffn_num_outputs=64) def forward(self, x, y=None): # Multi, _ = self.at1(x, x, x) Multi = self.at1(x, x, x, y) Multi_encode = self.addNorm(x, Multi) # encode_output = self.addNorm(Multi_encode, self.FFN(Multi_encode)) return Multi_encode class transformer_encode(nn.Module): def __init__(self, dropout, embedding, num_heads): super(transformer_encode, self).__init__() self.dropout = dropout self.embedding_size = embedding self.num_heads = num_heads self.attention = nn.MultiheadAttention(embed_dim=192, num_heads=8, dropout=0.6 ) self.at1 = MultiHeadAttention(key_size=self.embedding_size, query_size=self.embedding_size, value_size=self.embedding_size, num_hiddens=self.embedding_size, num_heads=self.num_heads, dropout=self.dropout) self.addNorm = AddNorm(normalized=[50, self.embedding_size], dropout=self.dropout) self.ffn = PositionWiseFFN(ffn_num_input=self.embedding_size, ffn_num_hiddens=2*self.embedding_size, ffn_num_outputs=self.embedding_size) def forward(self, x, valid=None): # Multi, _ = self.attention(x, x, x) Multi = self.at1(x, x, x, valid) Multi_encode = self.addNorm(x, Multi) encode_output = self.addNorm(Multi_encode, self.ffn(Multi_encode)) return encode_output