# import dependencies
import os.path
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
from torch.utils.data import DataLoader
import re
import numpy as np
import pandas as pd
import copy
import pdb
import transformers, datasets
from transformers.modeling_outputs import TokenClassifierOutput
from transformers.models.t5.modeling_t5 import T5Config, T5PreTrainedModel, T5Stack
from transformers.utils.model_parallel_utils import assert_device_map, get_device_map
from transformers import T5EncoderModel, T5Tokenizer
from transformers import TrainingArguments, Trainer, set_seed
# DataCollator
from transformers.data.data_collator import DataCollatorMixin
from transformers.tokenization_utils_base import PreTrainedTokenizerBase
from transformers.utils import PaddingStrategy
import random
import warnings
from collections.abc import Mapping
from dataclasses import dataclass
from random import randint
from typing import Any, Callable, Dict, List, NewType, Optional, Tuple, Union
from evaluate import load
from datasets import Dataset
from tqdm import tqdm
from scipy import stats
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
from Bio import SeqIO
from io import StringIO
import requests
import tempfile
from sklearn.model_selection import train_test_split
import csv

#### UTILS

class LoRAConfig:
    def __init__(self):
        self.lora_rank = 4
        self.lora_init_scale = 0.01
        self.lora_modules = ".*SelfAttention|.*EncDecAttention"
        self.lora_layers = "q|k|v|o"
        self.trainable_param_names = ".*layer_norm.*|.*lora_[ab].*"
        self.lora_scaling_rank = 1
        # lora_modules and lora_layers are specified with regular expressions
        # see https://www.w3schools.com/python/python_regex.asp for reference

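# Illustrative sketch (not part of the pipeline) of how the two regexes above
# select layers: module names come from transformer.named_modules() and child
# names from module.named_children(). The example names below are assumptions
# based on the usual T5 module naming scheme.
def _demo_lora_regex_matching():
    cfg = LoRAConfig()
    module_name = "encoder.block.0.layer.0.SelfAttention"  # hypothetical T5-style module path
    child_names = ["q", "k", "v", "o", "relative_attention_bias"]
    if re.fullmatch(cfg.lora_modules, module_name):
        selected = [c for c in child_names if re.fullmatch(cfg.lora_layers, c)]
        print(f"{module_name}: LoRA would wrap {selected}")  # -> ['q', 'k', 'v', 'o']
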
class LoRALinear(nn.Module):
    def __init__(self, linear_layer, rank, scaling_rank, init_scale):
        super().__init__()
        self.in_features = linear_layer.in_features
        self.out_features = linear_layer.out_features
        self.rank = rank
        self.scaling_rank = scaling_rank
        self.weight = linear_layer.weight
        self.bias = linear_layer.bias
        if self.rank > 0:
            self.lora_a = nn.Parameter(torch.randn(rank, linear_layer.in_features) * init_scale)
            if init_scale < 0:
                self.lora_b = nn.Parameter(torch.randn(linear_layer.out_features, rank) * init_scale)
            else:
                self.lora_b = nn.Parameter(torch.zeros(linear_layer.out_features, rank))
        if self.scaling_rank:
            self.multi_lora_a = nn.Parameter(
                torch.ones(self.scaling_rank, linear_layer.in_features)
                + torch.randn(self.scaling_rank, linear_layer.in_features) * init_scale
            )
            if init_scale < 0:
                self.multi_lora_b = nn.Parameter(
                    torch.ones(linear_layer.out_features, self.scaling_rank)
                    + torch.randn(linear_layer.out_features, self.scaling_rank) * init_scale
                )
            else:
                self.multi_lora_b = nn.Parameter(torch.ones(linear_layer.out_features, self.scaling_rank))

    def forward(self, input):
        if self.scaling_rank == 1 and self.rank == 0:
            # parsimonious implementation for ia3 and lora scaling
            if self.multi_lora_a.requires_grad:
                hidden = F.linear((input * self.multi_lora_a.flatten()), self.weight, self.bias)
            else:
                hidden = F.linear(input, self.weight, self.bias)
            if self.multi_lora_b.requires_grad:
                hidden = hidden * self.multi_lora_b.flatten()
            return hidden
        else:
            # general implementation for lora (adding and scaling)
            weight = self.weight
            if self.scaling_rank:
                weight = weight * torch.matmul(self.multi_lora_b, self.multi_lora_a) / self.scaling_rank
            if self.rank:
                weight = weight + torch.matmul(self.lora_b, self.lora_a) / self.rank
            return F.linear(input, weight, self.bias)

    def extra_repr(self):
        return "in_features={}, out_features={}, bias={}, rank={}, scaling_rank={}".format(
            self.in_features, self.out_features, self.bias is not None, self.rank, self.scaling_rank
        )

def modify_with_lora(transformer, config):
    for m_name, module in dict(transformer.named_modules()).items():
        if re.fullmatch(config.lora_modules, m_name):
            for c_name, layer in dict(module.named_children()).items():
                if re.fullmatch(config.lora_layers, c_name):
                    assert isinstance(
                        layer, nn.Linear
                    ), f"LoRA can only be applied to torch.nn.Linear, but {layer} is {type(layer)}."
                    setattr(
                        module,
                        c_name,
                        LoRALinear(layer, config.lora_rank, config.lora_scaling_rank, config.lora_init_scale),
                    )
    return transformer

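# Minimal sanity-check sketch of the LoRALinear weight composition (not used by
# the pipeline): with scaling_rank=1 and rank=4 the effective weight is
# W * (multi_lora_b @ multi_lora_a) / scaling_rank + (lora_b @ lora_a) / rank,
# which is what the general branch of LoRALinear.forward computes.
def _demo_lora_linear_equivalence():
    torch.manual_seed(0)
    base = nn.Linear(8, 6)
    lora = LoRALinear(base, rank=4, scaling_rank=1, init_scale=0.01)
    x = torch.randn(2, 8)
    w_eff = base.weight * torch.matmul(lora.multi_lora_b, lora.multi_lora_a) / 1
    w_eff = w_eff + torch.matmul(lora.lora_b, lora.lora_a) / 4
    assert torch.allclose(lora(x), F.linear(x, w_eff, base.bias), atol=1e-6)
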
class ClassConfig:
    def __init__(self, dropout=0.2, num_labels=1):
        self.dropout_rate = dropout
        self.num_labels = num_labels

class T5EncoderForTokenClassification(T5PreTrainedModel):
    def __init__(self, config: T5Config, class_config):
        super().__init__(config)
        self.num_labels = class_config.num_labels
        self.config = config

        self.shared = nn.Embedding(config.vocab_size, config.d_model)

        encoder_config = copy.deepcopy(config)
        encoder_config.use_cache = False
        encoder_config.is_encoder_decoder = False
        self.encoder = T5Stack(encoder_config, self.shared)

        self.dropout = nn.Dropout(class_config.dropout_rate)
        self.classifier = nn.Linear(config.hidden_size, class_config.num_labels)

        # Initialize weights and apply final processing
        self.post_init()

        # Model parallel
        self.model_parallel = False
        self.device_map = None

    def parallelize(self, device_map=None):
        self.device_map = (
            get_device_map(len(self.encoder.block), range(torch.cuda.device_count()))
            if device_map is None
            else device_map
        )
        assert_device_map(self.device_map, len(self.encoder.block))
        self.encoder.parallelize(self.device_map)
        self.classifier = self.classifier.to(self.encoder.first_device)
        self.model_parallel = True

    def deparallelize(self):
        self.encoder.deparallelize()
        self.encoder = self.encoder.to("cpu")
        self.model_parallel = False
        self.device_map = None
        torch.cuda.empty_cache()

    def get_input_embeddings(self):
        return self.shared

    def set_input_embeddings(self, new_embeddings):
        self.shared = new_embeddings
        self.encoder.set_input_embeddings(new_embeddings)

    def get_encoder(self):
        return self.encoder

    def _prune_heads(self, heads_to_prune):
        """
        Prunes heads of the model. heads_to_prune: dict of {layer_num: list of heads to prune in this layer} See base
        class PreTrainedModel
        """
        for layer, heads in heads_to_prune.items():
            self.encoder.layer[layer].attention.prune_heads(heads)

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        head_mask=None,
        inputs_embeds=None,
        labels=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
    ):
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.encoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
            inputs_embeds=inputs_embeds,
            head_mask=head_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        sequence_output = outputs[0]
        sequence_output = self.dropout(sequence_output)
        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            loss_fct = MSELoss()
            active_loss = attention_mask.view(-1) == 1
            active_logits = logits.view(-1)
            active_labels = torch.where(
                active_loss, labels.view(-1), torch.tensor(-100).type_as(labels)
            )
            valid_logits = active_logits[active_labels != -100]
            valid_labels = active_labels[active_labels != -100]
            loss = loss_fct(valid_logits, valid_labels)

        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return TokenClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

def PT5_classification_model(num_labels, half_precision):
    # Load PT5 and tokenizer
    # possible to load the half precision model (thanks to @pawel-rezo for pointing that out)
    if not half_precision:
        model = T5EncoderModel.from_pretrained("Rostlab/prot_t5_xl_uniref50")
        tokenizer = T5Tokenizer.from_pretrained("Rostlab/prot_t5_xl_uniref50")
    elif half_precision and torch.cuda.is_available():
        tokenizer = T5Tokenizer.from_pretrained('Rostlab/prot_t5_xl_half_uniref50-enc', do_lower_case=False)
        model = T5EncoderModel.from_pretrained("Rostlab/prot_t5_xl_half_uniref50-enc", torch_dtype=torch.float16).to(torch.device('cuda'))
    else:
        raise ValueError('Half precision can be run on GPU only.')

    # Create new Classifier model with PT5 dimensions
    class_config = ClassConfig(num_labels=num_labels)
    class_model = T5EncoderForTokenClassification(model.config, class_config)

    # Set encoder and embedding weights to checkpoint weights
    class_model.shared = model.shared
    class_model.encoder = model.encoder

    # Delete the checkpoint model
    model = class_model
    del class_model

    # Print number of trainable parameters
    model_parameters = filter(lambda p: p.requires_grad, model.parameters())
    params = sum([np.prod(p.size()) for p in model_parameters])
    print("ProtT5_Classifier\nTrainable Parameter: " + str(params))

    # Add LoRA model modification
    config = LoRAConfig()

    # Add LoRA layers
    model = modify_with_lora(model, config)

    # Freeze Embeddings and Encoder (except LoRA)
    for (param_name, param) in model.shared.named_parameters():
        param.requires_grad = False
    for (param_name, param) in model.encoder.named_parameters():
        param.requires_grad = False

    for (param_name, param) in model.named_parameters():
        if re.fullmatch(config.trainable_param_names, param_name):
            param.requires_grad = True

    # Print trainable Parameter
    model_parameters = filter(lambda p: p.requires_grad, model.parameters())
    params = sum([np.prod(p.size()) for p in model_parameters])
    print("ProtT5_LoRA_Classifier\nTrainable Parameter: " + str(params) + "\n")

    return model, tokenizer

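# Usage sketch for the function above (illustrative only; it is not executed at
# import time because the first call downloads the large ProtT5-XL checkpoint
# from the Hugging Face Hub). num_labels=1 matches the per-residue regression
# head used in this file; the example sequence is an arbitrary placeholder.
def _demo_build_classifier():
    model, tokenizer = PT5_classification_model(num_labels=1, half_precision=False)
    ids = tokenizer(" ".join("MKTAY"), return_tensors="pt")  # ProtT5 expects space-separated residues
    with torch.no_grad():
        out = model(input_ids=ids["input_ids"], attention_mask=ids["attention_mask"])
    print(out.logits.shape)  # (1, 6, 1): 5 residues + </s>, one regression value per token
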
@dataclass
class DataCollatorForTokenRegression(DataCollatorMixin):
    """
    Data collator that will dynamically pad the inputs received, as well as the labels.
    Args:
        tokenizer ([`PreTrainedTokenizer`] or [`PreTrainedTokenizerFast`]):
            The tokenizer used for encoding the data.
        padding (`bool`, `str` or [`~utils.PaddingStrategy`], *optional*, defaults to `True`):
            Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
            among:
            - `True` or `'longest'` (default): Pad to the longest sequence in the batch (or no padding if only a single
              sequence is provided).
            - `'max_length'`: Pad to a maximum length specified with the argument `max_length` or to the maximum
              acceptable input length for the model if that argument is not provided.
            - `False` or `'do_not_pad'`: No padding (i.e., can output a batch with sequences of different lengths).
        max_length (`int`, *optional*):
            Maximum length of the returned list and optionally padding length (see above).
        pad_to_multiple_of (`int`, *optional*):
            If set will pad the sequence to a multiple of the provided value.
            This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >=
            7.5 (Volta).
        label_pad_token_id (`int`, *optional*, defaults to -100):
            The id to use when padding the labels (-100 will be automatically ignored by PyTorch loss functions).
        return_tensors (`str`):
            The type of Tensor to return. Allowable values are "np", "pt" and "tf".
    """

    tokenizer: PreTrainedTokenizerBase
    padding: Union[bool, str, PaddingStrategy] = True
    max_length: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    label_pad_token_id: int = -100
    return_tensors: str = "pt"

    def torch_call(self, features):
        import torch

        label_name = "label" if "label" in features[0].keys() else "labels"
        labels = [feature[label_name] for feature in features] if label_name in features[0].keys() else None

        no_labels_features = [{k: v for k, v in feature.items() if k != label_name} for feature in features]

        batch = self.tokenizer.pad(
            no_labels_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )

        if labels is None:
            return batch

        sequence_length = batch["input_ids"].shape[1]
        padding_side = self.tokenizer.padding_side

        def to_list(tensor_or_iterable):
            if isinstance(tensor_or_iterable, torch.Tensor):
                return tensor_or_iterable.tolist()
            return list(tensor_or_iterable)

        if padding_side == "right":
            batch[label_name] = [
                to_list(label) + [self.label_pad_token_id] * (sequence_length - len(label)) for label in labels
            ]
        else:
            batch[label_name] = [
                [self.label_pad_token_id] * (sequence_length - len(label)) + to_list(label) for label in labels
            ]

        batch[label_name] = torch.tensor(batch[label_name], dtype=torch.float)
        return batch

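# Minimal usage sketch for the collator (illustrative; assumes `tokenizer` is
# the T5 tokenizer returned by PT5_classification_model and the token ids are
# placeholders). Per-residue float labels of different lengths are padded with
# -100 so the masked MSE loss in the model forward ignores the padding.
def _demo_token_regression_collator(tokenizer):
    collator = DataCollatorForTokenRegression(tokenizer=tokenizer, padding=True)
    features = [
        {"input_ids": [3, 4, 5, 1], "attention_mask": [1, 1, 1, 1], "labels": [0.1, 0.5, 0.3, -100.0]},
        {"input_ids": [6, 7, 1], "attention_mask": [1, 1, 1], "labels": [0.9, 0.2, -100.0]},
    ]
    batch = collator(features)
    print(batch["input_ids"].shape, batch["labels"].shape)  # both padded to the longest sequence
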
def _torch_collate_batch(examples, tokenizer, pad_to_multiple_of: Optional[int] = None):
    """Collate `examples` into a batch, using the information in `tokenizer` for padding if necessary."""
    import torch

    # Tensorize if necessary.
    if isinstance(examples[0], (list, tuple, np.ndarray)):
        examples = [torch.tensor(e, dtype=torch.long) for e in examples]

    length_of_first = examples[0].size(0)

    # Check if padding is necessary.
    are_tensors_same_length = all(x.size(0) == length_of_first for x in examples)
    if are_tensors_same_length and (pad_to_multiple_of is None or length_of_first % pad_to_multiple_of == 0):
        return torch.stack(examples, dim=0)

    # If yes, check if we have a `pad_token`.
    if tokenizer._pad_token is None:
        raise ValueError(
            "You are attempting to pad samples but the tokenizer you are using"
            f" ({tokenizer.__class__.__name__}) does not have a pad token."
        )

    # Creating the full tensor and filling it with our data.
    max_length = max(x.size(0) for x in examples)
    if pad_to_multiple_of is not None and (max_length % pad_to_multiple_of != 0):
        max_length = ((max_length // pad_to_multiple_of) + 1) * pad_to_multiple_of
    result = examples[0].new_full([len(examples), max_length], tokenizer.pad_token_id)
    for i, example in enumerate(examples):
        if tokenizer.padding_side == "right":
            result[i, : example.shape[0]] = example
        else:
            result[i, -example.shape[0] :] = example
    return result

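# Small sketch of the padding behaviour above (illustrative; assumes `tokenizer`
# has a pad token, as the T5 tokenizer does):
#   examples = [torch.tensor([3, 4, 5, 1]), torch.tensor([6, 7, 1])]
#   batch = _torch_collate_batch(examples, tokenizer, pad_to_multiple_of=8)
#   batch.shape  # -> (2, 8): padded to the next multiple of 8 with tokenizer.pad_token_id
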
def tolist(x):
    if isinstance(x, list):
        return x
    elif hasattr(x, "numpy"):  # Checks for TF tensors without needing the import
        x = x.numpy()
    return x.tolist()

def do_topology_split(df, split_path):
    import json
    with open(split_path, 'r') as f:
        splits = json.load(f)
    # split the dataframe according to the splits
    train_df = df[df['name'].isin(splits['train'])]
    valid_df = df[df['name'].isin(splits['validation'])]
    test_df = df[df['name'].isin(splits['test'])]
    return train_df, valid_df, test_df

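# Usage sketch for the topology-based split (illustrative; the JSON layout is
# inferred from the keys read above, and the file names are hypothetical):
#   splits.json: {"train": ["protein_a", ...], "validation": [...], "test": [...]}
# where 'name' is the dataframe column holding the protein identifiers.
#
#   df = pd.read_csv("flexibility_dataset.csv")
#   train_df, valid_df, test_df = do_topology_split(df, "splits.json")
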
class FlexibilityProtTrans(nn.Module):
    def __init__(self, checkpoint_path, num_labels, half_precision, gumbel_temperature, flex_loss_weight, **kwargs):
        super(FlexibilityProtTrans, self).__init__()
        # self.num_labels = num_labels  # passed from the configs
        # self.half_precision = half_precision  # passed from the configs
        model, tokenizer = self.load_finetuned_model(filepath=checkpoint_path, num_labels=num_labels, mixed=half_precision)
        self.model = model
        self.tokenizer = tokenizer
        self.device = torch.device('cuda')
        self.model.to(self.device)
        self.model.eval()
        self.gumbel_temperature = gumbel_temperature
        self.flex_loss_weight = flex_loss_weight
        # Use the Straight-Through Gumbel-Softmax: the forward pass takes the argmax,
        # while the backward pass approximates the gradient of argmax with the gradient
        # of the Gumbel-Softmax (set hard=True for the Straight-Through trick).
        # https://pytorch.org/docs/stable/generated/torch.nn.functional.gumbel_softmax.html
        self.logit_transform = nn.functional.gumbel_softmax

    def load_finetuned_model(self, filepath, num_labels=1, mixed=False):
        # load a new model
        model, tokenizer = PT5_classification_model(num_labels=num_labels, half_precision=mixed)

        # Load the non-frozen parameters from the saved file
        non_frozen_params = torch.load(filepath)

        # Assign the non-frozen parameters to the corresponding parameters of the model
        for param_name, param in model.named_parameters():
            if param_name in non_frozen_params:
                param.data = non_frozen_params[param_name].data

        ### Turn off all Bfactor prediction gradients
        for param in model.parameters():
            param.requires_grad = False

        model_parameters = filter(lambda p: p.requires_grad, model.parameters())
        params = sum([np.prod(p.size()) for p in model_parameters])
        print("ProtT5_Classifier - After loading to IF pipeline\nTrainable Parameter: " + str(params))

        return model, tokenizer

    def translate_to_model_vocab(self, batch_one_hot):
        # Translate the one-hot encoding to the model vocabulary.
        # The mapping between the two vocabularies is fixed, so the translation is a single tensor contraction.
        # pdb.set_trace()
        # ptt = {'<pad>': 0, '</s>': 1, '<unk>': 2, 'A': 3, 'L': 4, 'G': 5, 'V': 6, 'S': 7, 'R': 8, 'E': 9, 'D': 10, 'T': 11, 'I': 12, 'P': 13, 'K': 14, 'F': 15, 'Q': 16, 'N': 17, 'Y': 18, 'M': 19, 'H': 20, 'W': 21, 'C': 22, 'X': 23, 'B': 24, 'O': 25, 'U': 26, 'Z': 27}
        # pmt = {'<cls>': 0, '<pad>': 1, '<eos>': 2, '<unk>': 3, 'L': 4, 'A': 5, 'G': 6, 'V': 7, 'S': 8, 'E': 9, 'R': 10, 'T': 11, 'I': 12, 'D': 13, 'P': 14, 'K': 15, 'Q': 16, 'N': 17, 'F': 18, 'Y': 19, 'M': 20, 'H': 21, 'W': 22, 'C': 23, 'X': 24, 'B': 25, 'U': 26, 'Z': 27, 'O': 28, '.': 29, '-': 30, '<null_1>': 31, '<mask>': 32}
        # reference_list = []
        # for k in pmt.keys():
        #     if k in ptt.keys():
        #         reference_list.append(ptt[k])
        #     elif k == '<eos>':
        #         reference_list.append(1)
        #     else:
        #         reference_list.append(2)
        # pdb.set_trace()
        conversion_tensor = torch.tensor([2, 0, 1, 2, 4, 3, 5, 6, 7, 9, 8, 11, 12, 10, 13, 14, 16, 17, 15, 18, 19, 20, 21, 22, 23, 24, 26, 27, 25, 2, 2, 2, 2]).to(torch.device('cuda'))
        # pdb.set_trace()
        T5_translation = torch.einsum('j,ijk->ik', conversion_tensor.float(), batch_one_hot)
        T5_translation = F.pad(T5_translation, pad=(0, 1), mode='constant', value=1)
        # TODO: add the special tokens for the model, use batch['lengths'] to learn where to put it
        return T5_translation

    def forward(self, batch):  # batch example 32x33x395 (batch_size x ProteinMPNN vocab size x seq length)
        batch_one_hot = self.logit_transform(batch, tau=self.gumbel_temperature, hard=True, dim=1)
        batch_token_ids = self.translate_to_model_vocab(batch_one_hot)
        inputs = batch_token_ids.to(self.device).int()
        # pdb.set_trace()
        # mask = batch['mask'].to(self.device)
        outputs = self.model(inputs)  # TODO?: pass the mask as well (take it from the batch, pad it for the end of sequence, convert to Tensor)
        predicted_bfactors = outputs.logits
        return {'predicted_normalized_bfactors': predicted_bfactors}

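# End-to-end usage sketch (illustrative; requires a CUDA device, and the
# checkpoint file name below is a hypothetical placeholder, not a file shipped
# with this code):
def _demo_flexibility_forward():
    flex = FlexibilityProtTrans(
        checkpoint_path="PT5_flexibility_finetuned.pth",  # hypothetical fine-tuned LoRA checkpoint
        num_labels=1,
        half_precision=False,
        gumbel_temperature=1.0,
        flex_loss_weight=1.0,
    )
    # ProteinMPNN-style logits: batch_size x vocab size (33) x sequence length
    logits = torch.randn(2, 33, 120, device="cuda", requires_grad=True)
    out = flex(logits)
    print(out["predicted_normalized_bfactors"].shape)  # (2, 121, 1): sequence length + padded </s> position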