TransHLA2.0-IM / modeling_transhla2.py
SkywalkerLu's picture
Update modeling_transhla2.py
3330148 verified
import torch
import torch.nn as nn
from transformers import PreTrainedModel, PretrainedConfig
from peft import LoraConfig, get_peft_model, TaskType
from transformers import EsmModel
class TransHLA2Config(PretrainedConfig):
model_type = "transhla2"
def __init__(
self,
d_model=480,
n_layers=4,
n_head=8,
d_ff=64,
cnn_num_channel=256,
region_embedding_size=3,
cnn_kernel_size=3,
cnn_padding_size=1,
cnn_stride=1,
pooling_size=2,
esm_model_name="facebook/esm2_t12_35M_UR50D",
lora_r=8,
lora_alpha=32,
lora_dropout=0.1,
lora_inference_mode=False,
target_modules=None,
return_prob=True, # 是否在 forward 返回概率(softmax),否则返回 logits
pad_token_id=1, # ESM 默认 pad id
**kwargs,
):
super().__init__(**kwargs)
self.d_model = d_model
self.n_layers = n_layers
self.n_head = n_head
self.d_ff = d_ff
self.cnn_num_channel = cnn_num_channel
self.region_embedding_size = region_embedding_size
self.cnn_kernel_size = cnn_kernel_size
self.cnn_padding_size = cnn_padding_size
self.cnn_stride = cnn_stride
self.pooling_size = pooling_size
self.esm_model_name = esm_model_name
self.lora_r = lora_r
self.lora_alpha = lora_alpha
self.lora_dropout = lora_dropout
self.lora_inference_mode = lora_inference_mode
self.target_modules = target_modules or ['query', 'out_proj', 'value', 'key', 'dense', 'regression']
self.return_prob = return_prob
self.pad_token_id = pad_token_id
class TransHLA2(PreTrainedModel):
config_class = TransHLA2Config
def __init__(self, config: TransHLA2Config):
super().__init__(config)
self.config = config
d_model = config.d_model
n_layers = config.n_layers
n_head = config.n_head
d_ff = config.d_ff
cnn_num_channel = config.cnn_num_channel
region_embedding_size = config.region_embedding_size
cnn_kernel_size = config.cnn_kernel_size
cnn_padding_size = config.cnn_padding_size
cnn_stride = config.cnn_stride
pooling_size = config.pooling_size
# Backbone + LoRA
self.esm = EsmModel.from_pretrained(config.esm_model_name)
self.peft_config = LoraConfig(
target_modules=config.target_modules,
task_type=TaskType.FEATURE_EXTRACTION,
inference_mode=config.lora_inference_mode,
r=config.lora_r,
lora_alpha=config.lora_alpha,
lora_dropout=config.lora_dropout,
)
# 两套 LoRA 头,分别用于 epitope 和 hla 分支
self.epitope_lora = get_peft_model(self.esm, self.peft_config)
self.hla_lora = get_peft_model(self.esm, self.peft_config)
# CNN branches
self.region_cnn1 = nn.Conv1d(d_model, cnn_num_channel, region_embedding_size)
self.region_cnn2 = nn.Conv1d(d_model, cnn_num_channel, region_embedding_size)
self.padding1 = nn.ConstantPad1d((1, 1), 0)
self.padding2 = nn.ConstantPad1d((0, 1), 0)
self.relu = nn.SiLU()
self.cnn1 = nn.Conv1d(
cnn_num_channel, cnn_num_channel,
kernel_size=cnn_kernel_size, padding=cnn_padding_size, stride=cnn_stride
)
self.cnn2 = nn.Conv1d(
cnn_num_channel, cnn_num_channel,
kernel_size=cnn_kernel_size, padding=cnn_padding_size, stride=cnn_stride
)
self.maxpooling = nn.MaxPool1d(kernel_size=pooling_size)
# Transformer encoders (expect shape [S, B, D])
self.epitope_transformer_layers = nn.TransformerEncoderLayer(
d_model=d_model, nhead=n_head, dim_feedforward=d_ff, dropout=0.2, batch_first=False
)
self.epitope_transformer_encoder = nn.TransformerEncoder(
self.epitope_transformer_layers, num_layers=n_layers
)
self.hla_transformer_layers = nn.TransformerEncoderLayer(
d_model=d_model, nhead=n_head, dim_feedforward=d_ff, dropout=0.2, batch_first=False
)
self.hla_transformer_encoder = nn.TransformerEncoder(
self.hla_transformer_layers, num_layers=n_layers
)
# Cross Attention layers (expect [S, B, D])
self.cross_attention_epitope_layers = nn.ModuleList(
[nn.MultiheadAttention(d_model, n_head, dropout=0.2, batch_first=False) for _ in range(4)]
)
self.cross_attention_hla_layers = nn.ModuleList(
[nn.MultiheadAttention(d_model, n_head, dropout=0.2, batch_first=False) for _ in range(4)]
)
self.bn1 = nn.BatchNorm1d(cnn_num_channel)
self.bn2 = nn.BatchNorm1d(cnn_num_channel)
fused_dim = 2 * d_model + 2 * cnn_num_channel
hidden_dim = 2 * (d_model + cnn_num_channel) // 4
self.fc_task = nn.Sequential(
nn.Linear(fused_dim, hidden_dim),
nn.BatchNorm1d(hidden_dim),
nn.Dropout(0.2),
nn.SiLU(),
nn.Linear(hidden_dim, 96),
nn.BatchNorm1d(96),
)
self.classifier = nn.Linear(96, 2)
def cnn_block1(self, x):
# x: (B, C, L)
return self.cnn1(self.relu(x))
def cnn_block2(self, x):
# x: (B, C, L)
x = self.padding2(x) # pad right by 1
px = self.maxpooling(x) # downsample
x = self.relu(px)
x = self.cnn1(x)
x = self.relu(x)
x = self.cnn1(x)
x = px + x
return x
def structure_block1(self, x):
return self.cnn2(self.relu(x))
def structure_block2(self, x):
x = self.padding2(x)
px = self.maxpooling(x)
x = self.relu(px)
x = self.cnn2(x)
x = self.relu(x)
x = self.cnn2(x)
x = px + x
return x
def _ensure_mapping_input(self, x):
# 允许两种输入形式:
# 1) 字典: {"input_ids": ..., "attention_mask": ...}
# 2) 直接的 input_ids 张量: (B, L)
if isinstance(x, torch.Tensor):
# 仅用 input_ids;如需自动构造 attention_mask,可解除注释:
# pad_id = self.config.pad_token_id
# return {"input_ids": x, "attention_mask": (x != pad_id).long()}
return {"input_ids": x}
elif isinstance(x, dict):
return x
else:
raise TypeError(f"Unsupported input type: {type(x)}; expected Tensor or dict.")
def forward(self, epitope_in, hla_in, return_dict=None):
# 兼容张量或字典输入
epitope_in = self._ensure_mapping_input(epitope_in)
hla_in = self._ensure_mapping_input(hla_in)
epitope_outputs = self.epitope_lora(**epitope_in)
hla_outputs = self.hla_lora(**hla_in)
# last_hidden_state: (B, L, D)
epitope_emb = epitope_outputs.last_hidden_state
hla_emb = hla_outputs.last_hidden_state
# Transformer encoder path (expects [S, B, D])
epitope_trans = self.epitope_transformer_encoder(epitope_emb.transpose(0, 1)) # (L, B, D)
hla_trans = self.hla_transformer_encoder(hla_emb.transpose(0, 1)) # (L, B, D)
# Cross Attention
for ca_e, ca_h in zip(self.cross_attention_epitope_layers, self.cross_attention_hla_layers):
epitope_trans, _ = ca_e(epitope_trans, hla_trans, hla_trans) # (L, B, D)
hla_trans, _ = ca_h(hla_trans, epitope_trans, epitope_trans) # (L, B, D)
# Mean Pooling over sequence length
epitope_mean = epitope_trans.mean(dim=0) # (B, D)
hla_mean = hla_trans.mean(dim=0) # (B, D)
# CNN branches expect (B, C, L). Convert ESM embeddings to (B, D, L)
epitope_cnn_emb = epitope_emb.transpose(1, 2) # (B, D, L)
epitope_cnn_emb = self.region_cnn1(epitope_cnn_emb) # (B, C, L')
epitope_cnn_emb = self.padding1(epitope_cnn_emb)
conv = epitope_cnn_emb + self.cnn_block1(self.cnn_block1(epitope_cnn_emb))
# 迭代收缩长度直到 < 2
while conv.size(-1) >= 2:
conv = self.cnn_block2(conv)
epitope_cnn_out = torch.squeeze(conv, dim=-1) # (B, C)
epitope_cnn_out = self.bn1(epitope_cnn_out)
hla_cnn_emb = hla_emb.transpose(1, 2) # (B, D, L)
hla_cnn_emb = self.region_cnn2(hla_cnn_emb) # (B, C, L')
hla_cnn_emb = self.padding1(hla_cnn_emb)
hla_conv = hla_cnn_emb + self.structure_block1(self.structure_block1(hla_cnn_emb))
while hla_conv.size(-1) >= 2:
hla_conv = self.structure_block2(hla_conv)
hla_cnn_out = torch.squeeze(hla_conv, dim=-1) # (B, C)
hla_cnn_out = self.bn2(hla_cnn_out)
# Fuse and classify
representation = torch.cat((epitope_mean, hla_mean, epitope_cnn_out, hla_cnn_out), dim=1) # (B, 2D+2C)
features = self.fc_task(representation) # (B, 96)
logits = self.classifier(features) # (B, 2)
if self.config.return_prob:
probs = torch.softmax(logits, dim=1)
return probs, representation
else:
return logits, representation