import numpy as np
import torch
import torch.nn as nn

from .graphtrans_module import gather_edges, cat_neighbors_nodes


class PositionWiseFeedForward(nn.Module):
    def __init__(self, num_hidden, num_ff):
        super(PositionWiseFeedForward, self).__init__()
        self.W_in = nn.Linear(num_hidden, num_ff, bias=True)
        self.W_out = nn.Linear(num_ff, num_hidden, bias=True)
        self.act = torch.nn.GELU()

    def forward(self, h_V):
        h = self.act(self.W_in(h_V))
        h = self.W_out(h)
        return h


class PositionalEncodings(nn.Module):
    def __init__(self, num_embeddings, max_relative_feature=32):
        super(PositionalEncodings, self).__init__()
        self.num_embeddings = num_embeddings
        self.max_relative_feature = max_relative_feature
        self.linear = nn.Linear(2 * max_relative_feature + 1 + 1, num_embeddings)

    def forward(self, offset, mask):
        d = torch.clip(offset + self.max_relative_feature, 0, 2 * self.max_relative_feature) * mask + (1 - mask) * (2 * self.max_relative_feature + 1)
        d_onehot = torch.nn.functional.one_hot(d, 2 * self.max_relative_feature + 1 + 1)
        E = self.linear(d_onehot.float())
        return E
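
# Usage sketch (illustrative only; the tensor sizes are arbitrary choices for the example):
#   enc = PositionalEncodings(num_embeddings=16)
#   offset = torch.randint(-40, 40, (2, 50, 30))   # [B, L, K] residue-index offsets
#   mask = torch.randint(0, 2, (2, 50, 30))        # 1 = same chain, 0 = different chain
#   E = enc(offset, mask)                          # -> [2, 50, 30, 16]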


class EncLayer(nn.Module):
    def __init__(self, num_hidden, num_in, dropout=0.1, num_heads=None, scale=30, proteinmpnn_type=0):
        super(EncLayer, self).__init__()
        self.num_hidden = num_hidden
        self.num_in = num_in
        self.scale = scale
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)
        self.norm1 = nn.LayerNorm(num_hidden)
        self.norm2 = nn.LayerNorm(num_hidden)
        self.norm3 = nn.LayerNorm(num_hidden)
        self.W1 = nn.Linear(num_hidden + num_in, num_hidden, bias=True)
        self.W2 = nn.Linear(num_hidden, num_hidden, bias=True)
        self.W3 = nn.Linear(num_hidden, num_hidden, bias=True)
        self.W11 = nn.Linear(num_hidden + num_in, num_hidden, bias=True)
        self.W12 = nn.Linear(num_hidden, num_hidden, bias=True)
        self.W13 = nn.Linear(num_hidden, num_hidden, bias=True)
        self.act = torch.nn.GELU()
        self.dense = PositionWiseFeedForward(num_hidden, num_hidden * 4)
        self.proteinmpnn_type = proteinmpnn_type

    def forward(self, h_V, h_E, E_idx, mask_V=None, mask_attend=None):
        """ Parallel computation of full transformer layer """
        # Node update: message passing over the K neighbors, then position-wise feedforward.
        h_EV = cat_neighbors_nodes(h_V, h_E, E_idx)
        h_V_expand = h_V.unsqueeze(-2).expand(-1, -1, h_EV.size(-2), -1)
        h_EV = torch.cat([h_V_expand, h_EV], -1)
        h_message = self.W3(self.act(self.W2(self.act(self.W1(h_EV)))))
        if mask_attend is not None:
            h_message = mask_attend.unsqueeze(-1) * h_message
        dh = torch.sum(h_message, -2) / self.scale
        h_V = self.norm1(h_V + self.dropout1(dh))

        dh = self.dense(h_V)
        h_V = self.norm2(h_V + self.dropout2(dh))
        if mask_V is not None:
            mask_V = mask_V.unsqueeze(-1)
            h_V = mask_V * h_V

        # Edge update (skipped for proteinmpnn_type == 3).
        if self.proteinmpnn_type != 3:
            h_EV = cat_neighbors_nodes(h_V, h_E, E_idx)
            h_V_expand = h_V.unsqueeze(-2).expand(-1, -1, h_EV.size(-2), -1)
            h_EV = torch.cat([h_V_expand, h_EV], -1)
            h_message = self.W13(self.act(self.W12(self.act(self.W11(h_EV)))))
            h_E = self.norm3(h_E + self.dropout3(h_message))
        return h_V, h_E
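
# Shape note (an assumption about how this layer is usually wired in a ProteinMPNN-style
# encoder; the wiring is not shown in this file): h_V is [B, L, num_hidden], h_E is
# [B, L, K, num_hidden] and E_idx is [B, L, K]. Because the message MLP sees the node
# state concatenated with the gathered neighbor-node and edge features, num_in is
# typically 2 * num_hidden, so W1/W11 receive inputs of size 3 * num_hidden.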


class DecLayer(nn.Module):
    def __init__(self, num_hidden, num_in, dropout=0.1, num_heads=None, scale=30):
        super(DecLayer, self).__init__()
        self.num_hidden = num_hidden
        self.num_in = num_in
        self.scale = scale
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.norm1 = nn.LayerNorm(num_hidden)
        self.norm2 = nn.LayerNorm(num_hidden)
        self.W1 = nn.Linear(num_hidden + num_in, num_hidden, bias=True)
        self.W2 = nn.Linear(num_hidden, num_hidden, bias=True)
        self.W3 = nn.Linear(num_hidden, num_hidden, bias=True)
        self.act = torch.nn.GELU()
        self.dense = PositionWiseFeedForward(num_hidden, num_hidden * 4)

    def forward(self, h_V, h_E, mask_V=None, mask_attend=None):
        """ Parallel computation of full transformer layer """
        # Concatenate h_V_i to h_E_ij
        h_V_expand = h_V.unsqueeze(-2).expand(-1, -1, h_E.size(-2), -1)
        h_EV = torch.cat([h_V_expand, h_E], -1)
        h_message = self.W3(self.act(self.W2(self.act(self.W1(h_EV)))))
        if mask_attend is not None:
            h_message = mask_attend.unsqueeze(-1) * h_message
        dh = torch.sum(h_message, -2) / self.scale
        h_V = self.norm1(h_V + self.dropout1(dh))

        # Position-wise feedforward
        dh = self.dense(h_V)
        h_V = self.norm2(h_V + self.dropout2(dh))

        if mask_V is not None:
            mask_V = mask_V.unsqueeze(-1)
            h_V = mask_V * h_V
        return h_V
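
# Shape note (an assumption about the usual decoder wiring, not shown in this file):
# unlike EncLayer, DecLayer receives h_E that has already been concatenated with
# neighbor-node and sequence features outside the layer, so h_E's last dimension
# equals num_in (commonly 3 * num_hidden) and no E_idx gather is needed here.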


class ProteinFeatures(nn.Module):
    def __init__(self, edge_features, node_features, num_positional_embeddings=16,
                 num_rbf=16, top_k=30, augment_eps=0., num_chain_embeddings=16, proteinmpnn_type=0):
        """ Extract protein features """
        super(ProteinFeatures, self).__init__()
        self.edge_features = edge_features
        self.node_features = node_features
        self.top_k = top_k
        self.augment_eps = augment_eps
        self.num_rbf = num_rbf
        self.num_positional_embeddings = num_positional_embeddings
        self.embeddings = PositionalEncodings(num_positional_embeddings)
        node_in, edge_in = 6, num_positional_embeddings + num_rbf * 25
        self.proteinmpnn_type = proteinmpnn_type
        if self.proteinmpnn_type == 2:
            edge_in = 32
        self.edge_embedding = nn.Linear(edge_in, edge_features, bias=False)
        self.norm_edges = nn.LayerNorm(edge_features)

    def _dist(self, X, mask, eps=1E-6):
        mask_2D = torch.unsqueeze(mask, 1) * torch.unsqueeze(mask, 2)
        dX = torch.unsqueeze(X, 1) - torch.unsqueeze(X, 2)
        D = mask_2D * torch.sqrt(torch.sum(dX**2, 3) + eps)
        D_max, _ = torch.max(D, -1, keepdim=True)
        D_adjust = D + (1. - mask_2D) * D_max
        D_neighbors, E_idx = torch.topk(D_adjust, np.minimum(self.top_k, X.shape[1]), dim=-1, largest=False)
        return D_neighbors, E_idx

    def _rbf(self, D):
        device = D.device
        D_min, D_max, D_count = 2., 22., self.num_rbf
        D_mu = torch.linspace(D_min, D_max, D_count, device=device)
        D_mu = D_mu.view([1, 1, 1, -1])
        D_sigma = (D_max - D_min) / D_count
        D_expand = torch.unsqueeze(D, -1)
        RBF = torch.exp(-((D_expand - D_mu) / D_sigma)**2)
        return RBF

    def _get_rbf(self, A, B, E_idx):
        D_A_B = torch.sqrt(torch.sum((A[:, :, None, :] - B[:, None, :, :])**2, -1) + 1e-6)  # [B, L, L]
        D_A_B_neighbors = gather_edges(D_A_B[:, :, :, None], E_idx)[:, :, :, 0]  # [B, L, K]
        RBF_A_B = self._rbf(D_A_B_neighbors)
        return RBF_A_B

    def forward(self, X, mask, residue_idx, chain_labels):
        # X is [B, L, 4, 3] with backbone N, Ca, C, O; a virtual Cb is placed from ideal backbone geometry.
        b = X[:, :, 1, :] - X[:, :, 0, :]
        c = X[:, :, 2, :] - X[:, :, 1, :]
        a = torch.cross(b, c, dim=-1)
        Cb = -0.58273431 * a + 0.56802827 * b - 0.54067466 * c + X[:, :, 1, :]
        Ca = X[:, :, 1, :]
        N = X[:, :, 0, :]
        C = X[:, :, 2, :]
        O = X[:, :, 3, :]

        D_neighbors, E_idx = self._dist(Ca, mask)

        if self.proteinmpnn_type == 2:
            # Ca-Ca distances only.
            RBF_all = self._rbf(D_neighbors)
        else:
            # RBFs of all 25 pairwise distances between the five backbone atoms.
            RBF_all = []
            RBF_all.append(self._rbf(D_neighbors))  # Ca-Ca
            RBF_all.append(self._get_rbf(N, N, E_idx))  # N-N
            RBF_all.append(self._get_rbf(C, C, E_idx))  # C-C
            RBF_all.append(self._get_rbf(O, O, E_idx))  # O-O
            RBF_all.append(self._get_rbf(Cb, Cb, E_idx))  # Cb-Cb
            RBF_all.append(self._get_rbf(Ca, N, E_idx))  # Ca-N
            RBF_all.append(self._get_rbf(Ca, C, E_idx))  # Ca-C
            RBF_all.append(self._get_rbf(Ca, O, E_idx))  # Ca-O
            RBF_all.append(self._get_rbf(Ca, Cb, E_idx))  # Ca-Cb
            RBF_all.append(self._get_rbf(N, C, E_idx))  # N-C
            RBF_all.append(self._get_rbf(N, O, E_idx))  # N-O
            RBF_all.append(self._get_rbf(N, Cb, E_idx))  # N-Cb
            RBF_all.append(self._get_rbf(Cb, C, E_idx))  # Cb-C
            RBF_all.append(self._get_rbf(Cb, O, E_idx))  # Cb-O
            RBF_all.append(self._get_rbf(O, C, E_idx))  # O-C
            RBF_all.append(self._get_rbf(N, Ca, E_idx))  # N-Ca
            RBF_all.append(self._get_rbf(C, Ca, E_idx))  # C-Ca
            RBF_all.append(self._get_rbf(O, Ca, E_idx))  # O-Ca
            RBF_all.append(self._get_rbf(Cb, Ca, E_idx))  # Cb-Ca
            RBF_all.append(self._get_rbf(C, N, E_idx))  # C-N
            RBF_all.append(self._get_rbf(O, N, E_idx))  # O-N
            RBF_all.append(self._get_rbf(Cb, N, E_idx))  # Cb-N
            RBF_all.append(self._get_rbf(C, Cb, E_idx))  # C-Cb
            RBF_all.append(self._get_rbf(O, Cb, E_idx))  # O-Cb
            RBF_all.append(self._get_rbf(C, O, E_idx))  # C-O
            RBF_all = torch.cat(tuple(RBF_all), dim=-1)

        offset = residue_idx[:, :, None] - residue_idx[:, None, :]
        offset = gather_edges(offset[:, :, :, None], E_idx)[:, :, :, 0]  # [B, L, K]

        d_chains = ((chain_labels[:, :, None] - chain_labels[:, None, :]) == 0).long()  # self vs non-self interactions
        E_chains = gather_edges(d_chains[:, :, :, None], E_idx)[:, :, :, 0]
        E_positional = self.embeddings(offset.long(), E_chains)
        E = torch.cat((E_positional, RBF_all), -1)
        E = self.edge_embedding(E)
        E = self.norm_edges(E)
        return E, E_idx
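

# Minimal smoke-test sketch (not part of the original module). It assumes this file sits
# in a package next to graphtrans_module (which provides gather_edges and
# cat_neighbors_nodes), so it should be run as a module, e.g. `python -m <package>.<this_file>`.
# All tensor sizes below are arbitrary choices for illustration.
if __name__ == "__main__":
    B, L = 2, 50  # batch size and sequence length (illustrative)
    features = ProteinFeatures(edge_features=128, node_features=128, top_k=30)
    X = torch.randn(B, L, 4, 3)                          # backbone N, Ca, C, O coordinates
    mask = torch.ones(B, L)                              # 1.0 = valid residue, 0.0 = padding
    residue_idx = torch.arange(L)[None, :].repeat(B, 1)  # residue numbering for relative offsets
    chain_labels = torch.zeros(B, L, dtype=torch.long)   # single-chain example
    E, E_idx = features(X, mask, residue_idx, chain_labels)
    print(E.shape, E_idx.shape)  # expected: [2, 50, 30, 128] and [2, 50, 30]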