# maotao / model / model_downstream.py
# Uploaded as part of "AA2CDS" (julse, commit 4707555).
# NOTE: this provenance header was plain text in the original file and would be
# a syntax error in Python; it is preserved here as comments.
import torch
from torch import nn
from .LMConfig import LMConfig
from typing import Any, Optional, Tuple, List
from .model_ribo import MiniMindLM
class ConvNetCodon(nn.Module):
    """Frame-wise pooling head mapping per-position embeddings to a regression output.

    The input sequence is split into three reading frames (positions 0, 1, 2 mod 3,
    i.e. the three codon positions); each frame is globally max- and mean-pooled
    over the sequence dimension, the six pooled vectors are concatenated, and an
    MLP maps them to ``out_dim`` values.

    Args:
        in_dim:  channel size C of the input embeddings (last dimension of ``x``).
        hid_dim: width of the hidden linear layers.
        out_dim: number of regression outputs.
        dropout: dropout probability applied before the output layer.
    """

    def __init__(self,
                 in_dim: int,
                 hid_dim: int,
                 out_dim: int,
                 dropout: float = 0.):
        super().__init__()  # modern super(), consistent with the rest of the file
        self.nodes = hid_dim
        # inplace=True saves memory; safe here because it is applied to the output
        # of a non-inplace ReLU that nothing else reads.
        self.dropout = nn.Dropout(dropout, inplace=True)
        self.relu = nn.ReLU()
        self.flatten = nn.Flatten()
        # Input is the concatenation of six pooled vectors (max + mean for each
        # of the three frames), each of size in_dim.
        self.linear = nn.Linear(in_features=in_dim * 6, out_features=self.nodes)
        # NOTE(review): linear -> linear_2 -> linear_3 are stacked with no
        # non-linearity in between, so they collapse to a single affine map.
        # Left unchanged on purpose: inserting activations would change behavior
        # and invalidate existing trained checkpoints.
        self.linear_2 = nn.Linear(in_features=self.nodes, out_features=self.nodes * 4)
        self.linear_3 = nn.Linear(in_features=self.nodes * 4, out_features=self.nodes)
        self.output = nn.Linear(in_features=self.nodes, out_features=out_dim)

    def forward(self, x, self_attn_padding_mask=None):
        """Pool ``x`` per reading frame and regress.

        Args:
            x: tensor of shape (B, L, C); assumes C == in_dim — TODO confirm at caller.
            self_attn_padding_mask: unused; kept for interface compatibility.

        Returns:
            Tensor of shape (B, out_dim).
        """
        # Split positions by reading frame (codon positions 1/2/3).
        frame_1 = x[:, 0::3, :]
        frame_2 = x[:, 1::3, :]
        frame_3 = x[:, 2::3, :]
        # Global max pooling over the sequence dimension -> (B, C) each.
        frame_1_max = torch.max(frame_1, dim=1)[0]
        frame_2_max = torch.max(frame_2, dim=1)[0]
        frame_3_max = torch.max(frame_3, dim=1)[0]
        # Global mean pooling over the sequence dimension -> (B, C) each.
        # NOTE(review): padding positions are included in both poolings; a masked
        # mean using self_attn_padding_mask may be more accurate — verify intent.
        frame_1_avg = torch.mean(frame_1, dim=1)
        frame_2_avg = torch.mean(frame_2, dim=1)
        frame_3_avg = torch.mean(frame_3, dim=1)
        # Concatenate all six pooled vectors -> (B, 6*C).
        pooled_output = torch.cat(
            [frame_1_max, frame_1_avg, frame_2_max, frame_2_avg, frame_3_max, frame_3_avg],
            dim=1)
        x_pooled = self.flatten(pooled_output)  # no-op on a 2-D tensor; kept as-is
        o_linear = self.linear(x_pooled)
        o_linear_2 = self.linear_2(o_linear)
        o_linear_3 = self.linear_3(o_linear_2)
        o_relu = self.relu(o_linear_3)
        o_dropout = self.dropout(o_relu)
        return self.output(o_dropout)  # (B, out_dim)
class MiniMindLMForRegression(MiniMindLM):
    """MiniMindLM backbone with a ConvNetCodon regression head (e.g. TE prediction).

    The backbone's original LM head is simply not used; predictions come from a
    ConvNetCodon applied to the backbone's per-position embeddings.

    Args:
        params:       backbone configuration passed to MiniMindLM.
        output_dim:   number of regression targets.
        embed_dim:    channel size of the backbone's per-position embeddings;
                      must match the last dim of ``base_output.embeddings``.
                      Default 256 preserves the previously hard-coded value.
        head_hid_dim: hidden width of the regression head; default 128 preserves
                      the previously hard-coded value.
    """

    def __init__(self, params: LMConfig = None, output_dim: int = 1,
                 embed_dim: int = 256, head_hid_dim: int = 128):
        super().__init__(params)
        self.regression_head = ConvNetCodon(embed_dim, head_hid_dim, output_dim)

    def forward(self,
                input_ids: Optional[torch.Tensor] = None,
                twod_tokens: Optional[torch.Tensor] = None,
                past_key_values: Optional[List[Tuple[torch.Tensor, torch.Tensor]]] = None,
                use_cache: bool = False,
                **args):
        """Run the backbone, then regress from its per-position embeddings.

        Returns:
            dict with keys:
                'te':              regression prediction from the head.
                'aux_loss':        auxiliary loss forwarded from the backbone.
                'past_key_values': KV cache forwarded from the backbone.
                'zero_shot':       embeddings averaged over dims (1, 2), reshaped
                                   to (B, 1) — used as a zero-shot score.
        """
        # Delegate to the backbone; it returns an object exposing .embeddings,
        # .aux_loss and .past_key_values.
        base_output = super().forward(input_ids=input_ids, twod_tokens=twod_tokens,
                                      past_key_values=past_key_values,
                                      use_cache=use_cache, **args)
        # Assumes embeddings have shape (B, seq_len, embed_dim) — TODO confirm upstream.
        sentence_representation = base_output.embeddings
        regression_output = self.regression_head(sentence_representation)
        return {
            'te': regression_output,
            'aux_loss': base_output.aux_loss,
            'past_key_values': base_output.past_key_values,
            'zero_shot': sentence_representation.mean(dim=(1, 2)).reshape(-1, 1),
        }