import torch import torch.nn as nn from transformers import PreTrainedModel, PretrainedConfig from peft import LoraConfig, get_peft_model, TaskType from transformers import EsmModel class TransHLA2Config(PretrainedConfig): model_type = "transhla2" def __init__( self, d_model=480, n_layers=4, n_head=8, d_ff=64, cnn_num_channel=256, region_embedding_size=3, cnn_kernel_size=3, cnn_padding_size=1, cnn_stride=1, pooling_size=2, esm_model_name="facebook/esm2_t12_35M_UR50D", lora_r=8, lora_alpha=32, lora_dropout=0.1, lora_inference_mode=False, target_modules=None, return_prob=True, # 是否在 forward 返回概率(softmax),否则返回 logits pad_token_id=1, # ESM 默认 pad id **kwargs, ): super().__init__(**kwargs) self.d_model = d_model self.n_layers = n_layers self.n_head = n_head self.d_ff = d_ff self.cnn_num_channel = cnn_num_channel self.region_embedding_size = region_embedding_size self.cnn_kernel_size = cnn_kernel_size self.cnn_padding_size = cnn_padding_size self.cnn_stride = cnn_stride self.pooling_size = pooling_size self.esm_model_name = esm_model_name self.lora_r = lora_r self.lora_alpha = lora_alpha self.lora_dropout = lora_dropout self.lora_inference_mode = lora_inference_mode self.target_modules = target_modules or ['query', 'out_proj', 'value', 'key', 'dense', 'regression'] self.return_prob = return_prob self.pad_token_id = pad_token_id class TransHLA2(PreTrainedModel): config_class = TransHLA2Config def __init__(self, config: TransHLA2Config): super().__init__(config) self.config = config d_model = config.d_model n_layers = config.n_layers n_head = config.n_head d_ff = config.d_ff cnn_num_channel = config.cnn_num_channel region_embedding_size = config.region_embedding_size cnn_kernel_size = config.cnn_kernel_size cnn_padding_size = config.cnn_padding_size cnn_stride = config.cnn_stride pooling_size = config.pooling_size # Backbone + LoRA self.esm = EsmModel.from_pretrained(config.esm_model_name) self.peft_config = LoraConfig( target_modules=config.target_modules, task_type=TaskType.FEATURE_EXTRACTION, inference_mode=config.lora_inference_mode, r=config.lora_r, lora_alpha=config.lora_alpha, lora_dropout=config.lora_dropout, ) # 两套 LoRA 头,分别用于 epitope 和 hla 分支 self.epitope_lora = get_peft_model(self.esm, self.peft_config) self.hla_lora = get_peft_model(self.esm, self.peft_config) # CNN branches self.region_cnn1 = nn.Conv1d(d_model, cnn_num_channel, region_embedding_size) self.region_cnn2 = nn.Conv1d(d_model, cnn_num_channel, region_embedding_size) self.padding1 = nn.ConstantPad1d((1, 1), 0) self.padding2 = nn.ConstantPad1d((0, 1), 0) self.relu = nn.SiLU() self.cnn1 = nn.Conv1d( cnn_num_channel, cnn_num_channel, kernel_size=cnn_kernel_size, padding=cnn_padding_size, stride=cnn_stride ) self.cnn2 = nn.Conv1d( cnn_num_channel, cnn_num_channel, kernel_size=cnn_kernel_size, padding=cnn_padding_size, stride=cnn_stride ) self.maxpooling = nn.MaxPool1d(kernel_size=pooling_size) # Transformer encoders (expect shape [S, B, D]) self.epitope_transformer_layers = nn.TransformerEncoderLayer( d_model=d_model, nhead=n_head, dim_feedforward=d_ff, dropout=0.2, batch_first=False ) self.epitope_transformer_encoder = nn.TransformerEncoder( self.epitope_transformer_layers, num_layers=n_layers ) self.hla_transformer_layers = nn.TransformerEncoderLayer( d_model=d_model, nhead=n_head, dim_feedforward=d_ff, dropout=0.2, batch_first=False ) self.hla_transformer_encoder = nn.TransformerEncoder( self.hla_transformer_layers, num_layers=n_layers ) # Cross Attention layers (expect [S, B, D]) self.cross_attention_epitope_layers = nn.ModuleList( [nn.MultiheadAttention(d_model, n_head, dropout=0.2, batch_first=False) for _ in range(4)] ) self.cross_attention_hla_layers = nn.ModuleList( [nn.MultiheadAttention(d_model, n_head, dropout=0.2, batch_first=False) for _ in range(4)] ) self.bn1 = nn.BatchNorm1d(cnn_num_channel) self.bn2 = nn.BatchNorm1d(cnn_num_channel) fused_dim = 2 * d_model + 2 * cnn_num_channel hidden_dim = 2 * (d_model + cnn_num_channel) // 4 self.fc_task = nn.Sequential( nn.Linear(fused_dim, hidden_dim), nn.BatchNorm1d(hidden_dim), nn.Dropout(0.2), nn.SiLU(), nn.Linear(hidden_dim, 96), nn.BatchNorm1d(96), ) self.classifier = nn.Linear(96, 2) def cnn_block1(self, x): # x: (B, C, L) return self.cnn1(self.relu(x)) def cnn_block2(self, x): # x: (B, C, L) x = self.padding2(x) # pad right by 1 px = self.maxpooling(x) # downsample x = self.relu(px) x = self.cnn1(x) x = self.relu(x) x = self.cnn1(x) x = px + x return x def structure_block1(self, x): return self.cnn2(self.relu(x)) def structure_block2(self, x): x = self.padding2(x) px = self.maxpooling(x) x = self.relu(px) x = self.cnn2(x) x = self.relu(x) x = self.cnn2(x) x = px + x return x def _ensure_mapping_input(self, x): # 允许两种输入形式: # 1) 字典: {"input_ids": ..., "attention_mask": ...} # 2) 直接的 input_ids 张量: (B, L) if isinstance(x, torch.Tensor): # 仅用 input_ids;如需自动构造 attention_mask,可解除注释: # pad_id = self.config.pad_token_id # return {"input_ids": x, "attention_mask": (x != pad_id).long()} return {"input_ids": x} elif isinstance(x, dict): return x else: raise TypeError(f"Unsupported input type: {type(x)}; expected Tensor or dict.") def forward(self, epitope_in, hla_in, return_dict=None): # 兼容张量或字典输入 epitope_in = self._ensure_mapping_input(epitope_in) hla_in = self._ensure_mapping_input(hla_in) epitope_outputs = self.epitope_lora(**epitope_in) hla_outputs = self.hla_lora(**hla_in) # last_hidden_state: (B, L, D) epitope_emb = epitope_outputs.last_hidden_state hla_emb = hla_outputs.last_hidden_state # Transformer encoder path (expects [S, B, D]) epitope_trans = self.epitope_transformer_encoder(epitope_emb.transpose(0, 1)) # (L, B, D) hla_trans = self.hla_transformer_encoder(hla_emb.transpose(0, 1)) # (L, B, D) # Cross Attention for ca_e, ca_h in zip(self.cross_attention_epitope_layers, self.cross_attention_hla_layers): epitope_trans, _ = ca_e(epitope_trans, hla_trans, hla_trans) # (L, B, D) hla_trans, _ = ca_h(hla_trans, epitope_trans, epitope_trans) # (L, B, D) # Mean Pooling over sequence length epitope_mean = epitope_trans.mean(dim=0) # (B, D) hla_mean = hla_trans.mean(dim=0) # (B, D) # CNN branches expect (B, C, L). Convert ESM embeddings to (B, D, L) epitope_cnn_emb = epitope_emb.transpose(1, 2) # (B, D, L) epitope_cnn_emb = self.region_cnn1(epitope_cnn_emb) # (B, C, L') epitope_cnn_emb = self.padding1(epitope_cnn_emb) conv = epitope_cnn_emb + self.cnn_block1(self.cnn_block1(epitope_cnn_emb)) # 迭代收缩长度直到 < 2 while conv.size(-1) >= 2: conv = self.cnn_block2(conv) epitope_cnn_out = torch.squeeze(conv, dim=-1) # (B, C) epitope_cnn_out = self.bn1(epitope_cnn_out) hla_cnn_emb = hla_emb.transpose(1, 2) # (B, D, L) hla_cnn_emb = self.region_cnn2(hla_cnn_emb) # (B, C, L') hla_cnn_emb = self.padding1(hla_cnn_emb) hla_conv = hla_cnn_emb + self.structure_block1(self.structure_block1(hla_cnn_emb)) while hla_conv.size(-1) >= 2: hla_conv = self.structure_block2(hla_conv) hla_cnn_out = torch.squeeze(hla_conv, dim=-1) # (B, C) hla_cnn_out = self.bn2(hla_cnn_out) # Fuse and classify representation = torch.cat((epitope_mean, hla_mean, epitope_cnn_out, hla_cnn_out), dim=1) # (B, 2D+2C) features = self.fc_task(representation) # (B, 96) logits = self.classifier(features) # (B, 2) if self.config.return_prob: probs = torch.softmax(logits, dim=1) return probs, representation else: return logits, representation