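"""Command-line inference for ProtoBind-Diff: generate SMILES strings for a
target protein. The sequence is embedded with a pre-trained ESM-2 model, and
the embedding conditions a masked-diffusion SMILES generator restored from a
Lightning checkpoint.

Example invocation (illustrative; the script file name is assumed):

    python inference.py --fasta_file ./examples/input.fasta \
        --n_batches 5 --batch_size 10 --output generated_smiles.txt
"""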
import argparse
import re
import sys
from pathlib import Path

import esm
import lightning.pytorch as pl
import torch
from Bio import SeqIO
from huggingface_hub import hf_hub_download
from torch.utils.data import DataLoader

from protobind_diff.data_loader import InferenceDataset
from protobind_diff.model import ModelGenerator

REPO_ID = "ai-gero/ProtoBind-Diff"
FILENAME = "model.ckpt"
TOKENIZER_FILENAME = "tokenizer_smiles_diffusion.json"

class ProtobindInference:
    """
    Simplified inference class that only supports the ProtobindMaskedDiffusion model.
    """

    def __init__(self, checkpoint_path, tokenizer_path, sequence_embedding_dim,
                 lig_max_length: int = 170, nucleus_p: float = 0.9,
                 eta: float = 0.1, sampling_steps: int = 250,
                 **kwargs):
        self.checkpoint_path = Path(checkpoint_path)
        self.tokenizer_path = Path(tokenizer_path)
        self.sequence_embedding_dim = sequence_embedding_dim

        # Set up sampler params
        self.lig_max_length = lig_max_length
        self.nucleus_p = nucleus_p
        self.eta = eta
        self.sampling_steps = sampling_steps

        # Load model once at construction time
        self.model = self.load_model()

    def predict_on_dataloader(self, dl, devices=1, accelerator='cuda') -> list:
        """Run Lightning prediction over `dl` and return one prediction object per batch."""
        if accelerator == 'cuda':
            # Allow TF32 matmuls and run in mixed precision on GPU.
            torch.set_float32_matmul_precision('medium')
            precision = "16-mixed"
        else:
            precision = "32-true"
        trainer = pl.Trainer(precision=precision, use_distributed_sampler=False,
                             inference_mode=True, accelerator=accelerator, devices=devices)
        return trainer.predict(model=self.model, dataloaders=dl)

    def load_model(self):
        """Simplified model loading - only supports ModelGenerator."""
        model = ModelGenerator.load_from_checkpoint(
            self.checkpoint_path,
            tokenizer_path=self.tokenizer_path,
            seq_embedding_dim=self.sequence_embedding_dim,
            load=True,
        )
        # Override the sampling hyperparameters on the restored model.
        model.model_length = self.lig_max_length
        model.nucleus_p = self.nucleus_p
        model.eta = self.eta
        model.sampling_steps = self.sampling_steps
        model.model.eval()
        return model
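
# A minimal sketch of programmatic use, assuming a local checkpoint/tokenizer and
# a DataLoader built as in main() below (1280 is the embedding dimension of
# esm2_t33_650M_UR50D; adjust for other ESM models):
#
#     infer = ProtobindInference("model.ckpt", "tokenizer_smiles_diffusion.json",
#                                sequence_embedding_dim=1280)
#     batches = infer.predict_on_dataloader(loader, accelerator="cpu")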

def get_esm_embedding(sequence: str, model_name: str, device: torch.device) -> torch.Tensor:
    """Generates a protein embedding using a pre-trained ESM model.

    Args:
        sequence (str): The amino acid sequence.
        model_name (str): The name of the ESM model to use.
        device (torch.device): The device to run the model on.

    Returns:
        torch.Tensor: The final residue-level embedding tensor of shape
        [1, seq_len, emb_dim], with the start/end tokens removed.
    """
    model, alphabet = esm.pretrained.load_model_and_alphabet(model_name)
    model.eval()
    # ESM model names encode the layer count, e.g. "esm2_t33_650M_UR50D" -> 33.
    match = re.search(r'_t(\d+)_', model_name)
    if match is None:
        sys.exit(f"Error: cannot parse the number of layers from model name '{model_name}'")
    number_layers = int(match.group(1))
    model = model.to(device)
    batch_converter = alphabet.get_batch_converter()
    _, _, tokens = batch_converter([("protein", sequence)])
    tokens = tokens.to(device)
    with torch.no_grad():
        out = model(tokens, repr_layers=[number_layers])
    return out["representations"][number_layers][:, 1:-1, :]  # [1, seq_len, emb_dim]
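
# Illustrative call (the peptide and model name are just examples): for an
# 8-residue sequence, the 650M ESM-2 model yields a [1, 8, 1280] tensor.
#
#     emb = get_esm_embedding("MKTAYIAK", "esm2_t33_650M_UR50D", torch.device("cpu"))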

def download_from_hub_hf(cache: Path, filename) -> Path:
    """
    Fetch a file from the Hugging Face Hub into `cache`.
    Returns the local path to the file inside HF's cache structure.
    """
    cache.mkdir(parents=True, exist_ok=True)
    local_path = hf_hub_download(
        repo_id=REPO_ID,
        filename=filename,
        cache_dir=cache,
    )
    return Path(local_path)
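
# A minimal usage sketch, fetching the tokenizer into ./cache (path is illustrative):
#
#     tok_path = download_from_hub_hf(Path("./cache"), TOKENIZER_FILENAME)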

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--sequence", help="Amino acid sequence (1-letter code)")
    parser.add_argument("--output_dir", default="./outputs", help="Output directory for SMILES")
    parser.add_argument("--output", default="generated_smiles.txt", help="Output file for generated SMILES")
    parser.add_argument("--n_batches", type=int, default=5, help="Number of batches to generate for this sequence")
    parser.add_argument("--batch_size", type=int, default=10, help="Max number of generated molecules per batch")
    parser.add_argument("--fasta_file", default="./examples/input.fasta", help="Input FASTA file")
    parser.add_argument("--checkpoint_path", type=str, help="Path to the model checkpoint")
    parser.add_argument("--model_name", type=str, default="esm2_t33_650M_UR50D",
                        help="ESM model name. See https://github.com/facebookresearch/esm")
    parser.add_argument("--tokenizer_path",
                        help="Path to a tokenizer.json file. If not provided, uses a default path and downloads if needed.")
    parser.add_argument("--cache", type=str, default="./cache", help="Cache folder for the checkpoint and ESM weights")
    parser.add_argument("--sampling_steps", type=int, default=250, help="Number of steps during sampling")
    parser.add_argument("--lig_max_length", type=int, default=170, help="Max length of generated molecules")
    parser.add_argument("--nucleus_p", type=float, default=0.9,
                        help="Nucleus sampling parameter. For more details, see https://arxiv.org/abs/2503.00307")
    parser.add_argument("--eta", type=float, default=0.1,
                        help="Probability of remasking. For more details, see https://arxiv.org/abs/2503.00307")
    args = parser.parse_args()

    # An explicit --sequence takes precedence; otherwise fall back to the
    # (defaulted) FASTA file, which would otherwise always win.
    if args.sequence:
        sequence = args.sequence.strip().upper()
    elif args.fasta_file:
        sequence = str(next(SeqIO.parse(args.fasta_file, "fasta")).seq)
    else:
        sys.exit("Error: provide --sequence or --fasta_file")

    torch.hub.set_dir(args.cache)  # cache directory for the ESM model weights
    if args.checkpoint_path:
        ckpt_path = Path(args.checkpoint_path)
    else:
        ckpt_path = download_from_hub_hf(Path(args.cache), FILENAME)

    if args.tokenizer_path:
        tokenizer_path = Path(args.tokenizer_path)
        if not tokenizer_path.exists():
            sys.exit(f"Error: Tokenizer file not found at specified path: {tokenizer_path}")
    else:
        tokenizer_path = download_from_hub_hf(Path(args.cache), TOKENIZER_FILENAME)

    # Determine the device: CUDA, then MPS for Apple Silicon, then CPU.
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
    else:
        device = torch.device("cpu")

    embedding = get_esm_embedding(sequence, args.model_name, device).to(dtype=torch.bfloat16)
    sequence_embedding_dim = embedding.shape[2]

    dataset = InferenceDataset(embedding, batch_size=args.batch_size, n_batches=args.n_batches)
    loader = DataLoader(dataset, batch_size=None)  # the dataset already yields full batches

    model = ProtobindInference(ckpt_path, tokenizer_path, sequence_embedding_dim,
                               sampling_steps=args.sampling_steps, nucleus_p=args.nucleus_p,
                               eta=args.eta, lig_max_length=args.lig_max_length)
    predictions = model.predict_on_dataloader(loader, accelerator=str(device))
    all_smiles = [smi for batch in predictions for smi in batch[0]]

    out_dir = Path(args.output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    with open(out_dir / args.output, "w") as f:
        f.write("SMILES\n")
        for smi in all_smiles:
            f.write(smi + "\n")


if __name__ == "__main__":
    main()