import torch
from torch import nn
from .LMConfig import LMConfig
from typing import Any, Optional, Tuple, List
from .model_ribo import MiniMindLM


class ConvNetCodon(nn.Module):
    """Pooling + MLP head mapping per-token embeddings (B, L, C) to a regression output.

    The sequence is split into the three codon reading frames (positions
    0,3,6,..., 1,4,7,... and 2,5,8,...); each frame is reduced by global max
    and global mean pooling over the sequence dimension, the six pooled
    (B, C) vectors are concatenated into (B, 6*C), and a small MLP produces
    the final prediction.
    """

    def __init__(self, in_dim: int, hid_dim: int, out_dim: int, dropout: float = 0.0):
        """
        Args:
            in_dim: channel dimension C of the input embeddings (B, L, C).
            hid_dim: hidden width of the MLP.
            out_dim: number of regression outputs.
            dropout: dropout probability applied before the output layer.
        """
        super().__init__()
        self.nodes = hid_dim
        self.dropout = nn.Dropout(dropout, inplace=True)
        self.relu = nn.ReLU()
        self.flatten = nn.Flatten()
        # Linear layer for the one-hot experiment-source indicator
        # (currently unused; kept for reference):
        # self.experiment_dense = nn.Linear(2, self.nodes)
        # Six pooled vectors (max + mean for each of the 3 frames), each of size in_dim.
        self.linear = nn.Linear(in_features=in_dim * 6, out_features=self.nodes)
        self.linear_2 = nn.Linear(in_features=self.nodes, out_features=self.nodes * 4)
        self.linear_3 = nn.Linear(in_features=self.nodes * 4, out_features=self.nodes)
        self.output = nn.Linear(in_features=self.nodes, out_features=out_dim)

    def forward(self, x, self_attn_padding_mask=None):
        """Pool the three reading frames of `x` (B, L, C) and run the MLP.

        NOTE(review): `self_attn_padding_mask` is accepted but ignored — a
        masked mean-pooling variant existed but was commented out, so padded
        positions currently contribute to both the max and the mean pooling.
        Confirm whether masking should be reinstated.

        Returns:
            Tensor of shape (B, out_dim).
        """
        # Split the sequence into the three codon reading frames.
        frame_1 = x[:, 0::3, :]
        frame_2 = x[:, 1::3, :]
        frame_3 = x[:, 2::3, :]
        # Global max pooling over the sequence dimension -> (B, C) each.
        frame_1_max = torch.max(frame_1, dim=1)[0]
        frame_2_max = torch.max(frame_2, dim=1)[0]
        frame_3_max = torch.max(frame_3, dim=1)[0]
        # Global mean pooling over the sequence dimension -> (B, C) each.
        frame_1_avg = torch.mean(frame_1, dim=1)
        frame_2_avg = torch.mean(frame_2, dim=1)
        frame_3_avg = torch.mean(frame_3, dim=1)
        # Concatenate the six pooled vectors -> (B, 6*C).
        pooled_output = torch.cat(
            [frame_1_max, frame_1_avg, frame_2_max, frame_2_avg, frame_3_max, frame_3_avg],
            dim=1,
        )
        x_pooled = self.flatten(pooled_output)  # no-op here: input is already (B, 6*C)
        o_linear = self.linear(x_pooled)
        o_linear_2 = self.linear_2(o_linear)
        o_linear_3 = self.linear_3(o_linear_2)
        o_relu = self.relu(o_linear_3)
        o_dropout = self.dropout(o_relu)
        o = self.output(o_dropout)  # (B, out_dim)
        return o


class MiniMindLMForRegression(MiniMindLM):
    """MiniMindLM backbone with the LM head replaced by a regression head."""

    def __init__(self, params: Optional[LMConfig] = None, output_dim=1):
        super().__init__(params)
        # Regression head over the backbone's token embeddings.
        # NOTE(review): assumes the backbone embedding dim is 256 — confirm
        # against the LMConfig used to build the backbone.
        self.regression_head = ConvNetCodon(256, 128, output_dim)

    def forward(self,
                input_ids: Optional[torch.Tensor] = None,
                twod_tokens: Optional[torch.Tensor] = None,
                past_key_values: Optional[List[Tuple[torch.Tensor, torch.Tensor]]] = None,
                use_cache: bool = False,
                **args):
        """Run the backbone, then apply the regression head to its embeddings.

        Returns:
            dict with keys:
                'te': regression prediction, shape (B, output_dim)
                'aux_loss': auxiliary loss forwarded from the backbone
                'past_key_values': cached key/value pairs from the backbone
                'zero_shot': per-sample mean of the embeddings, shape (B, 1)
        """
        base_output = super().forward(input_ids=input_ids,
                                      twod_tokens=twod_tokens,
                                      past_key_values=past_key_values,
                                      use_cache=use_cache,
                                      **args)
        sentence_representation = base_output.embeddings
        regression_output = self.regression_head(sentence_representation)
        return {
            'te': regression_output,
            'aux_loss': base_output.aux_loss,
            'past_key_values': base_output.past_key_values,
            'zero_shot': sentence_representation.mean(dim=(1, 2)).reshape(-1, 1),
        }