# RNARL / utils.py
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from torch.utils.data.distributed import DistributedSampler
import torch.optim.lr_scheduler as lr_scheduler
from transformer_encoder_MoE import Encoder,Encoder_nomoe
from itertools import chain
from torch.nn.parallel import parallel_apply
from typing import List, Dict, Tuple, Optional, Union
class Tokenizer:
"""处理序列编码和解码的分词器,支持蛋白质序列和mRNA序列。"""
def __init__(self):
        # Define special tokens and biological sequence tokens
self.special_tokens = ['[START]', '[END]', '[PAD]', '[UNK]', '[SEG]']
self.amino_acids = ['A', 'R', 'S', 'I', 'L', 'G', 'V', 'T', 'P', 'N',
'D', 'C', 'Q', 'E', 'H', 'K', 'F', 'Y', 'M', 'W', '*']
self.protein_alphabet = ['A', 'R', 'N', 'D', 'C', 'Q', 'E', 'G', 'H', 'I',
'L', 'K', 'M', 'F', 'P', 'S', 'T', 'W', 'Y', 'V']
        # Enumerate all 64 possible codons over the RNA alphabet UCAG
self.codons = [''.join([n1, n2, n3]) for n1 in 'UCAG' for n2 in 'UCAG' for n3 in 'UCAG']
        # Merge all tokens and build the token<->ID lookup tables
self.tokens = self.special_tokens + self.amino_acids + self.codons
self.token_to_id = {token: idx for idx, token in enumerate(self.tokens)}
self.id_to_token = {idx: token for token, idx in self.token_to_id.items()}
        # Cache frequently used special-token indices for fast access
self.padding_idx = self.token_to_id['[PAD]']
self.start_idx = self.token_to_id['[START]']
self.end_idx = self.token_to_id['[END]']
self.unk_idx = self.token_to_id['[UNK]']
self.seg_idx = self.token_to_id['[SEG]']
def encode_pro(self, sequence: str, max_length: int) -> List[int]:
"""编码蛋白质序列。
Args:
sequence: 输入的蛋白质序列
max_length: 编码后序列的最大长度
Returns:
编码后的ID列表
"""
        # Prepend the start token and look up an ID for every character
ids = [self.start_idx] + [self.token_to_id.get(token, self.unk_idx) for token in sequence]
        # Truncate if needed, then append the end token
if len(ids) < max_length - 1:
ids.append(self.end_idx)
else:
ids = ids[:max_length-1] + [self.end_idx]
return ids
def encode_mrna(self, sequence: str, max_length: int) -> List[int]:
"""编码mRNA序列,每三个核苷酸作为一个密码子。
Args:
sequence: 输入的mRNA序列
max_length: 编码后序列的最大长度
Returns:
编码后的ID列表
"""
ids = [self.start_idx]
        # Process three characters (one codon) at a time
for i in range(0, len(sequence), 3):
codon = sequence[i:i+3]
if len(codon) == 3 and codon in self.token_to_id:
ids.append(self.token_to_id[codon])
else:
ids.append(self.unk_idx)
        # Truncate if needed, then append the end token
if len(ids) < max_length - 1:
ids.append(self.end_idx)
else:
ids = ids[:max_length-1] + [self.end_idx]
return ids
def decode(self, ids: List[int]) -> str:
"""将ID序列解码为文本。
Args:
ids: 编码后的ID列表
Returns:
解码后的文本
"""
return ''.join([self.id_to_token.get(id, '[UNK]') for id in ids])
def pad(self, ids: List[int], max_length: int) -> List[int]:
"""对序列进行填充至指定长度。
Args:
ids: 编码后的ID列表
max_length: 目标长度
Returns:
填充后的ID列表
"""
padding_length = max_length - len(ids)
if padding_length > 0:
return ids + [self.padding_idx] * padding_length
return ids
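# A minimal usage sketch (illustrative, not part of the original module): round-
# tripping short protein and mRNA sequences through the Tokenizer. The sequences
# and max_length values here are assumptions chosen for the example.
def _tokenizer_example():
    tokenizer = Tokenizer()
    # Encode a protein sequence and pad it to a fixed length.
    protein_ids = tokenizer.pad(tokenizer.encode_pro('MKT', max_length=8), max_length=8)
    # Encode the corresponding mRNA codon by codon.
    mrna_ids = tokenizer.pad(tokenizer.encode_mrna('AUGAAAACU', max_length=8), max_length=8)
    # decode concatenates the token strings, special tokens included.
    print(tokenizer.decode(protein_ids))  # '[START]MKT[END][PAD][PAD][PAD]'
    print(tokenizer.decode(mrna_ids))     # '[START]AUGAAAACU[END][PAD][PAD][PAD]'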
# Codon table and related biological mappings
class BiologicalMappings:
"""生物序列编码的映射工具类。"""
@staticmethod
def get_codon_table() -> Dict[str, str]:
"""返回密码子到氨基酸的映射表。"""
return {
'GCU':'A', 'GCC':'A', 'GCA':'A', 'GCG':'A', 'CGU':'R', 'CGC':'R',
'CGA':'R', 'CGG':'R', 'AGA':'R', 'AGG':'R', 'UCU':'S', 'UCC':'S',
'UCA':'S', 'UCG':'S', 'AGU':'S', 'AGC':'S', 'AUU':'I', 'AUC':'I',
'AUA':'I', 'UUA':'L', 'UUG':'L', 'CUU':'L', 'CUC':'L', 'CUA':'L',
'CUG':'L', 'GGU':'G', 'GGC':'G', 'GGA':'G', 'GGG':'G', 'GUU':'V',
'GUC':'V', 'GUA':'V', 'GUG':'V', 'ACU':'T', 'ACC':'T', 'ACA':'T',
'ACG':'T', 'CCU':'P', 'CCC':'P', 'CCA':'P', 'CCG':'P', 'AAU':'N',
'AAC':'N', 'GAU':'D', 'GAC':'D', 'UGU':'C', 'UGC':'C', 'CAA':'Q',
'CAG':'Q', 'GAA':'E', 'GAG':'E', 'CAU':'H', 'CAC':'H', 'AAA':'K',
'AAG':'K', 'UUU':'F', 'UUC':'F', 'UAU':'Y', 'UAC':'Y', 'AUG':'M',
'UGG':'W','UAG':'*', 'UGA':'*', 'UAA':'*'}
@staticmethod
def get_amino_acid_to_codon() -> Dict[str, List[str]]:
"""返回氨基酸到密码子的映射表。"""
return {
'A':['GCU','GCC','GCA','GCG'], 'R':['CGU','CGC','CGA','CGG','AGA','AGG'],
'S':['UCU','UCC','UCA','UCG','AGU','AGC'],'I':['AUU','AUC','AUA'],
'L':['UUA','UUG','CUU','CUC','CUA','CUG'],'G':['GGU','GGC','GGA','GGG'],
'V':['GUU','GUC','GUA','GUG'],'T':['ACU','ACC','ACA','ACG'],
'P':['CCU','CCC','CCA','CCG'],'N':['AAU','AAC'],'D':['GAU','GAC'],
'C':['UGU','UGC'],'Q':['CAA','CAG'],'E':['GAA','GAG'],'H':['CAU','CAC'],
'K':['AAA','AAG'],'F':['UUU','UUC'],'Y':['UAU','UAC'],'M':['AUG'],'W':['UGG'],
'*':['UAG','UGA','UAA']
}
@staticmethod
def create_token_mapping(tokenizer: Tokenizer) -> torch.Tensor:
"""创建从密码子令牌到氨基酸令牌的映射张量。
Args:
tokenizer: 用于获取令牌到ID映射的分词器
Returns:
映射张量,索引为密码子ID,值为对应的氨基酸ID
"""
codon_table = BiologicalMappings.get_codon_table()
token_codon_to_amino_acid = torch.full((len(tokenizer.tokens),),
tokenizer.unk_idx,
dtype=torch.long)
for codon, amino_acid in codon_table.items():
codon_id = tokenizer.token_to_id.get(codon, tokenizer.unk_idx)
amino_acid_id = tokenizer.token_to_id.get(amino_acid, tokenizer.unk_idx)
token_codon_to_amino_acid[codon_id] = amino_acid_id
return token_codon_to_amino_acid
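# Illustrative sketch (an assumption, not original code): the mapping tensor
# translates a batch of codon IDs back to amino-acid IDs with one indexing op,
# which is handy for checking that a generated mRNA still encodes its protein.
def _translation_check_example():
    tokenizer = Tokenizer()
    codon_to_aa = BiologicalMappings.create_token_mapping(tokenizer)
    mrna_ids = torch.tensor(tokenizer.encode_mrna('AUGAAAACU', max_length=8))
    aa_ids = codon_to_aa[mrna_ids]  # special tokens map to [UNK]
    print(tokenizer.decode(aa_ids.tolist()))  # '[UNK]MKT[UNK]'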
class ActorModel_encoder_noesm2(nn.Module):
"""基于编码器的Actor模型,用于序列生成任务。"""
def __init__(self, vocab_size: int, d_model: int, nhead: int,
num_encoder_layers: int, dim_feedforward: int, dropout: float,
num_experts: int, top_k_experts: int, device: torch.device):
"""初始化模型。
Args:
vocab_size: 词汇表大小
d_model: 模型维度
nhead: 注意力头数
num_encoder_layers: 编码器层数
dim_feedforward: 前馈网络维度
dropout: Dropout率
num_experts: 专家数量
top_k_experts: 使用的顶部专家数量
device: 计算设备
"""
super(ActorModel_encoder_noesm2, self).__init__()
self.device = device
        # Fetch the biological mappings and precompute the codon masks
self.amino_acid_to_codon = BiologicalMappings.get_amino_acid_to_codon()
self.precomputed_masks = self._precompute_masks()
        # Build the encoder and the output head
self.encoder = Encoder(vocab_size, d_model, nhead, num_encoder_layers,
dim_feedforward, dropout, num_experts, top_k_experts)
        # Output head: a small MLP projecting to the vocabulary
self.mrna_output_layer = nn.Sequential(
nn.Linear(d_model, d_model//2),
nn.LayerNorm(d_model//2),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(d_model//2, vocab_size)
)
def _precompute_masks(self) -> Dict[int, torch.Tensor]:
"""预计算每个氨基酸对应的密码子掩码,以提高性能。"""
tokenizer = Tokenizer() # 创建分词器实例
masks = {}
for amino_acid, codons in self.amino_acid_to_codon.items():
amino_acid_id = tokenizer.token_to_id.get(amino_acid, tokenizer.unk_idx)
mask = torch.zeros(len(tokenizer.tokens), dtype=torch.bool, device=self.device)
for codon in codons:
codon_id = tokenizer.token_to_id.get(codon, tokenizer.unk_idx)
if codon_id != tokenizer.unk_idx:
mask[codon_id] = True
masks[amino_acid_id] = mask
return masks
def forward(self, tokenizer_encoded_proteins: torch.Tensor) -> Tuple[torch.Tensor, list, torch.Tensor]:
"""模型前向传播。
Args:
tokenizer_encoded_proteins: 编码后的蛋白质序列,形状为(batch_size, seq_len)
Returns:
logits: 输出逻辑值,表示模型预测
router_logits_list: 路由器逻辑值列表
entropy_loss: 熵损失
"""
        # Build the padding mask for the source sequences
        tokenizer = Tokenizer()  # tokenizer instance for ID lookups
        src_padding_mask = (tokenizer_encoded_proteins == tokenizer.padding_idx)
        # Run through the encoder
x, router_logits_list, entropy_loss = self.encoder(
tokenizer_encoded_proteins,
src_key_padding_mask=src_padding_mask
)
        # Build the codon mask for every batch item and sequence position
        batch_size, seq_len = tokenizer_encoded_proteins.shape
        # Look up the precomputed masks by token ID
amino_acid_to_codon_mask = torch.stack([
self.precomputed_masks.get(
tok.item(),
torch.zeros(len(tokenizer.tokens), dtype=torch.bool, device=self.device)
)
for tok in tokenizer_encoded_proteins.reshape(-1)
]).view(batch_size, seq_len, -1)
        # Compute the output logits and apply the codon mask
        mrna_logits = self.mrna_output_layer(x)
        # masked_fill with a large negative value (-6.0e4 stays finite in fp16)
        mrna_logits = mrna_logits.masked_fill(~amino_acid_to_codon_mask, -6.0e4)
return mrna_logits, router_logits_list, entropy_loss
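# Hedged usage sketch (assumed calling convention, not original code): greedy
# decoding from the masked logits. Because invalid codons are filled with
# -6.0e4, argmax can only pick codons synonymous with the amino acid at each
# position; special-token positions have no valid codon and decode arbitrarily.
def _greedy_decode_example(model, tokenizer_encoded_proteins):
    model.eval()
    with torch.no_grad():
        mrna_logits, _, _ = model(tokenizer_encoded_proteins)
        codon_ids = mrna_logits.argmax(dim=-1)  # (batch_size, seq_len)
    tokenizer = Tokenizer()
    return [tokenizer.decode(row.tolist()) for row in codon_ids]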
class ActorModel_encoder_esm2(nn.Module):
"""基于编码器的Actor模型,用于序列生成任务。"""
def __init__(self, vocab_size: int, d_model: int, nhead: int,
                 num_encoder_layers: int, dim_feedforward: int, esm2_dim: int, dropout: float,
num_experts: int, top_k_experts: int, device: torch.device):
super(ActorModel_encoder_esm2, self).__init__()
self.device = device
        # Fetch the biological mappings and precompute the codon masks
        self.amino_acid_to_codon = BiologicalMappings.get_amino_acid_to_codon()
        self.precomputed_masks = self._precompute_masks()
        self.dim_trans = nn.Linear(esm2_dim, d_model)
        # Build the encoder; ESM-2 features replace the embedding and positional encoding
        self.encoder = Encoder(vocab_size, d_model, nhead, num_encoder_layers,
                               dim_feedforward, dropout, num_experts, top_k_experts,
                               if_embedding=False, if_pos_encoding=False)
        # Output head: a small MLP projecting to the vocabulary
self.mrna_output_layer = nn.Sequential(
nn.Linear(d_model, d_model//2),
nn.LayerNorm(d_model//2),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(d_model//2, vocab_size)
)
def _precompute_masks(self) -> Dict[int, torch.Tensor]:
"""预计算每个氨基酸对应的密码子掩码,以提高性能。"""
tokenizer = Tokenizer() # 创建分词器实例
masks = {}
for amino_acid, codons in self.amino_acid_to_codon.items():
amino_acid_id = tokenizer.token_to_id.get(amino_acid, tokenizer.unk_idx)
mask = torch.zeros(len(tokenizer.tokens), dtype=torch.bool, device=self.device)
for codon in codons:
codon_id = tokenizer.token_to_id.get(codon, tokenizer.unk_idx)
if codon_id != tokenizer.unk_idx:
mask[codon_id] = True
masks[amino_acid_id] = mask
return masks
    def forward(self, tokenizer_encoded_proteins, esm2_encoded_proteins) -> Tuple[torch.Tensor, list, torch.Tensor]:
        # Build the padding mask for the source sequences
        tokenizer = Tokenizer()  # tokenizer instance for ID lookups
        src_padding_mask = (tokenizer_encoded_proteins == tokenizer.padding_idx)
        # Project ESM-2 features to d_model, then run through the encoder
        x = self.dim_trans(esm2_encoded_proteins)
x, router_logits_list, entropy_loss = self.encoder(
x,
src_key_padding_mask=src_padding_mask
)
        # Build the codon mask for every batch item and sequence position
        batch_size, seq_len = tokenizer_encoded_proteins.shape
        # Look up the precomputed masks by token ID
amino_acid_to_codon_mask = torch.stack([
self.precomputed_masks.get(
tok.item(),
torch.zeros(len(tokenizer.tokens), dtype=torch.bool, device=self.device)
)
for tok in tokenizer_encoded_proteins.reshape(-1)
]).view(batch_size, seq_len, -1)
        # Compute the output logits and apply the codon mask
        mrna_logits = self.mrna_output_layer(x)
        # masked_fill with a large negative value (-6.0e4 stays finite in fp16)
        mrna_logits = mrna_logits.masked_fill(~amino_acid_to_codon_mask, -6.0e4)
return mrna_logits, router_logits_list, entropy_loss
    def get_embedding(self, tokenizer_encoded_proteins, esm2_encoded_proteins):
        # Build the padding mask for the source sequences
        tokenizer = Tokenizer()  # tokenizer instance for ID lookups
        src_padding_mask = (tokenizer_encoded_proteins == tokenizer.padding_idx)
        # Project ESM-2 features to d_model, then run through the encoder
        x = self.dim_trans(esm2_encoded_proteins)
x, router_logits_list, entropy_loss = self.encoder(
x,
src_key_padding_mask=src_padding_mask
)
return x
    def get_router_logits(self, tokenizer_encoded_proteins, esm2_encoded_proteins):
        # Build the padding mask for the source sequences
        tokenizer = Tokenizer()  # tokenizer instance for ID lookups
        src_padding_mask = (tokenizer_encoded_proteins == tokenizer.padding_idx)
        # Project ESM-2 features to d_model, then run through the encoder
        x = self.dim_trans(esm2_encoded_proteins)
x, router_logits_list, entropy_loss = self.encoder(
x,
src_key_padding_mask=src_padding_mask
)
return router_logits_list
class ActorModel_encoder_nomoe(nn.Module):
"""基于编码器的Actor模型,用于序列生成任务。"""
def __init__(self, vocab_size: int, d_model: int, nhead: int,
                 num_encoder_layers: int, dim_feedforward: int, esm2_dim: int, dropout: float, device: torch.device):
super(ActorModel_encoder_nomoe, self).__init__()
self.device = device
        # Fetch the biological mappings and precompute the codon masks
        self.amino_acid_to_codon = BiologicalMappings.get_amino_acid_to_codon()
        self.precomputed_masks = self._precompute_masks()
        self.dim_trans = nn.Linear(esm2_dim, d_model)
        # Build the encoder; ESM-2 features replace the embedding and positional encoding
        self.encoder = Encoder_nomoe(vocab_size, d_model, nhead, num_encoder_layers,
                                     dim_feedforward, dropout, if_embedding=False, if_pos_encoding=False)
        # Output head: a small MLP projecting to the vocabulary
self.output_layer = nn.Sequential(
nn.Linear(d_model, d_model//2),
nn.LayerNorm(d_model//2),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(d_model//2, vocab_size)
)
def _precompute_masks(self) -> Dict[int, torch.Tensor]:
"""预计算每个氨基酸对应的密码子掩码,以提高性能。"""
tokenizer = Tokenizer() # 创建分词器实例
masks = {}
for amino_acid, codons in self.amino_acid_to_codon.items():
amino_acid_id = tokenizer.token_to_id.get(amino_acid, tokenizer.unk_idx)
mask = torch.zeros(len(tokenizer.tokens), dtype=torch.bool, device=self.device)
for codon in codons:
codon_id = tokenizer.token_to_id.get(codon, tokenizer.unk_idx)
if codon_id != tokenizer.unk_idx:
mask[codon_id] = True
masks[amino_acid_id] = mask
return masks
    def forward(self, tokenizer_encoded_proteins, esm2_encoded_proteins):
        """Forward pass.
        Args:
            tokenizer_encoded_proteins: encoded protein sequences of shape (batch_size, seq_len)
            esm2_encoded_proteins: ESM-2 feature tensor for the same sequences
        Returns:
            logits: output logits over the vocabulary
        """
        # Build the padding mask for the source sequences
        tokenizer = Tokenizer()  # tokenizer instance for ID lookups
        src_padding_mask = (tokenizer_encoded_proteins == tokenizer.padding_idx)
        x = self.dim_trans(esm2_encoded_proteins)
        # Run through the encoder
        x = self.encoder(
x,
src_key_padding_mask=src_padding_mask
)
        # Build the codon mask for every batch item and sequence position
        batch_size, seq_len = tokenizer_encoded_proteins.shape
        # Look up the precomputed masks by token ID
amino_acid_to_codon_mask = torch.stack([
self.precomputed_masks.get(
tok.item(),
torch.zeros(len(tokenizer.tokens), dtype=torch.bool, device=self.device)
)
for tok in tokenizer_encoded_proteins.reshape(-1)
]).view(batch_size, seq_len, -1)
        # Compute the output logits and apply the codon mask
        logits = self.output_layer(x)
        # masked_fill with a large negative value (-6.0e4 stays finite in fp16)
        logits = logits.masked_fill(~amino_acid_to_codon_mask, -6.0e4)
return logits
class RewardModel_encoder(nn.Module):
def __init__(self, vocab_size, d_model, nhead, num_encoder_layers, dim_feedforward,dropout,num_experts,top_k_experts,device):
super(RewardModel_encoder, self).__init__()
        self.tokenizer = Tokenizer()
        self.device = device
self.encoder = Encoder(vocab_size, d_model, nhead, num_encoder_layers,
dim_feedforward, dropout, num_experts, top_k_experts)
self.reward_output_layer = nn.Sequential(
nn.Linear(d_model, d_model//2),
            nn.LayerNorm(d_model//2),  # normalize the linear layer's output
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(d_model//2, 1)
)
    def forward(self, tokenizer_encoded_mrnas):
        src_padding_mask = (tokenizer_encoded_mrnas == self.tokenizer.padding_idx)
        x, router_logits_list, entropy_loss = self.encoder(tokenizer_encoded_mrnas, src_key_padding_mask=src_padding_mask)
        reward = self.reward_output_layer(x)
        reward = reward[:, 0, :].squeeze()  # take the [START]-position output as the sequence-level reward
        return reward, router_logits_list, entropy_loss
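# Hedged usage sketch (assumption): scoring a batch of tokenized mRNA sequences.
# The reward is read from the [START] position, so each sequence gets one score.
def _reward_example(reward_model, tokenizer_encoded_mrnas):
    reward_model.eval()
    with torch.no_grad():
        reward, _, _ = reward_model(tokenizer_encoded_mrnas)
    return reward  # shape (batch_size,) when batch_size > 1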
class LengthAwareDistributedSampler_human(DistributedSampler):
    def __init__(self, dataset, lengths, data_num_rat=None, num_replicas=None, rank=None, shuffle=True):
        super().__init__(dataset, num_replicas=num_replicas, rank=rank, shuffle=shuffle)
        self.lengths = lengths  # per-sample sequence lengths
        self.weights = self.calculate_weights()  # length-based sampling weights
        self.data_num_rat = data_num_rat
        self.total_size = int(len(dataset) * data_num_rat)
def calculate_weights(self):
        # Piecewise weighting by length bucket
weights = np.ones(len(self.lengths))
weights[np.array(self.lengths) >= 1300] = 85.64*200
weights[(np.array(self.lengths) >= 1200) & (np.array(self.lengths) < 1300)] = 5.02*200
weights[(np.array(self.lengths) >= 1100) & (np.array(self.lengths) < 1200)] = 4.36*100
weights[(np.array(self.lengths) >= 1000) & (np.array(self.lengths) < 1100)] = 3.63*100
weights[(np.array(self.lengths) >= 900) & (np.array(self.lengths) < 1000)] = 3.15
weights[(np.array(self.lengths) >= 800) & (np.array(self.lengths) < 900)] = 2.20
weights[(np.array(self.lengths) >= 700) & (np.array(self.lengths) < 800)] = 1.64
weights[(np.array(self.lengths) >= 600) & (np.array(self.lengths) < 700)] = 1.36
weights[(np.array(self.lengths) >= 500) & (np.array(self.lengths) < 600)] = 1.0
weights[(np.array(self.lengths) >= 400) & (np.array(self.lengths) < 500)] = 0.75
weights[(np.array(self.lengths) >= 300) & (np.array(self.lengths) < 400)] = 0.63
weights[(np.array(self.lengths) >= 200) & (np.array(self.lengths) < 300)] = 0.60
weights[(np.array(self.lengths) >= 100) & (np.array(self.lengths) < 200)] = 0.71
weights[np.array(self.lengths) < 100] = 3.68*100
        return weights / np.sum(weights)  # normalize to a probability distribution
    def __iter__(self):
        # Draw indices by weighted sampling (with replacement)
        indices = np.random.choice(len(self.dataset), self.total_size, replace=True, p=self.weights)
        # Truncate so the count divides evenly across num_replicas
        total_size_local = (len(indices) // self.num_replicas) * self.num_replicas
        indices = indices[:total_size_local]  # drop the surplus samples
        # Give this rank its strided shard of the samples
        indices = indices[self.rank:total_size_local:self.num_replicas]
        if self.shuffle:
            np.random.shuffle(indices)
        return iter(indices.tolist())
def set_epoch(self, epoch):
super().set_epoch(epoch)
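# Hedged usage sketch (assumption, not original code): wiring the length-aware
# sampler into a DataLoader. This presumes torch.distributed is already
# initialized so DistributedSampler can infer num_replicas and rank; `dataset`
# and `lengths` are supplied by the caller.
def _sampler_example(dataset, lengths):
    from torch.utils.data import DataLoader
    sampler = LengthAwareDistributedSampler_human(dataset, lengths, data_num_rat=1.0)
    loader = DataLoader(dataset, batch_size=8, sampler=sampler)
    for epoch in range(2):
        sampler.set_epoch(epoch)  # per-epoch bookkeeping for DistributedSampler
        for batch in loader:
            pass  # training step goes here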
class LengthAwareDistributedSampler_Arabidopsis(DistributedSampler):
    def __init__(self, dataset, lengths, data_num_rat=None, num_replicas=None, rank=None, shuffle=True):
        super().__init__(dataset, num_replicas=num_replicas, rank=rank, shuffle=shuffle)
        self.lengths = lengths  # per-sample sequence lengths
        self.weights = self.calculate_weights()  # length-based sampling weights
        self.data_num_rat = data_num_rat
        self.total_size = int(len(dataset) * data_num_rat)
def calculate_weights(self):
        # Piecewise weighting by length bucket
weights = np.ones(len(self.lengths))
weights[np.array(self.lengths) >= 1300] = 630.75*20
weights[(np.array(self.lengths) >= 1200) & (np.array(self.lengths) < 1300)] = 17.05*20
weights[(np.array(self.lengths) >= 1100) & (np.array(self.lengths) < 1200)] = 11.52*20
weights[(np.array(self.lengths) >= 1000) & (np.array(self.lengths) < 1100)] = 7.17*10
weights[(np.array(self.lengths) >= 900) & (np.array(self.lengths) < 1000)] = 5.56*10
weights[(np.array(self.lengths) >= 800) & (np.array(self.lengths) < 900)] = 3.54
weights[(np.array(self.lengths) >= 700) & (np.array(self.lengths) < 800)] = 2.51
weights[(np.array(self.lengths) >= 600) & (np.array(self.lengths) < 700)] = 1.62
weights[(np.array(self.lengths) >= 500) & (np.array(self.lengths) < 600)] = 1.0
weights[(np.array(self.lengths) >= 400) & (np.array(self.lengths) < 500)] = 0.68
weights[(np.array(self.lengths) >= 300) & (np.array(self.lengths) < 400)] = 0.49
weights[(np.array(self.lengths) >= 200) & (np.array(self.lengths) < 300)] = 0.49
weights[(np.array(self.lengths) >= 100) & (np.array(self.lengths) < 200)] = 0.49
weights[np.array(self.lengths) < 100] = 1.23*10
        return weights / np.sum(weights)  # normalize to a probability distribution
    def __iter__(self):
        # Draw indices by weighted sampling (with replacement)
        indices = np.random.choice(len(self.dataset), self.total_size, replace=True, p=self.weights)
        # Truncate so the count divides evenly across num_replicas
        total_size_local = (len(indices) // self.num_replicas) * self.num_replicas
        indices = indices[:total_size_local]  # drop the surplus samples
        # Give this rank its strided shard of the samples
        indices = indices[self.rank:total_size_local:self.num_replicas]
        if self.shuffle:
            np.random.shuffle(indices)
        return iter(indices.tolist())
def set_epoch(self, epoch):
super().set_epoch(epoch)
class LengthAwareDistributedSampler_CR(DistributedSampler):
    def __init__(self, dataset, lengths, data_num_rat=None, num_replicas=None, rank=None, shuffle=True):
        super().__init__(dataset, num_replicas=num_replicas, rank=rank, shuffle=shuffle)
        self.lengths = lengths  # per-sample sequence lengths
        self.weights = self.calculate_weights()  # length-based sampling weights
        self.data_num_rat = data_num_rat
        self.total_size = int(len(dataset) * data_num_rat)
def calculate_weights(self):
        # Piecewise weighting by length bucket
weights = np.ones(len(self.lengths))
weights[np.array(self.lengths) >= 1300] = 61.55*20
weights[(np.array(self.lengths) >= 1200) & (np.array(self.lengths) < 1300)] = 3.66*20
weights[(np.array(self.lengths) >= 1100) & (np.array(self.lengths) < 1200)] = 2.96*10
weights[(np.array(self.lengths) >= 1000) & (np.array(self.lengths) < 1100)] = 2.54*10
weights[(np.array(self.lengths) >= 900) & (np.array(self.lengths) < 1000)] = 2.11*10
weights[(np.array(self.lengths) >= 800) & (np.array(self.lengths) < 900)] = 1.79
weights[(np.array(self.lengths) >= 700) & (np.array(self.lengths) < 800)] = 1.39
weights[(np.array(self.lengths) >= 600) & (np.array(self.lengths) < 700)] = 1.11
weights[(np.array(self.lengths) >= 500) & (np.array(self.lengths) < 600)] = 1.0
weights[(np.array(self.lengths) >= 400) & (np.array(self.lengths) < 500)] = 0.82
weights[(np.array(self.lengths) >= 300) & (np.array(self.lengths) < 400)] = 0.73
weights[(np.array(self.lengths) >= 200) & (np.array(self.lengths) < 300)] = 0.67
weights[(np.array(self.lengths) >= 100) & (np.array(self.lengths) < 200)] = 0.66
weights[np.array(self.lengths) < 100] = 1.18*10
        return weights / np.sum(weights)  # normalize to a probability distribution
    def __iter__(self):
        # Draw indices by weighted sampling (with replacement)
        indices = np.random.choice(len(self.dataset), self.total_size, replace=True, p=self.weights)
        # Truncate so the count divides evenly across num_replicas
        total_size_local = (len(indices) // self.num_replicas) * self.num_replicas
        indices = indices[:total_size_local]  # drop the surplus samples
        # Give this rank its strided shard of the samples
        indices = indices[self.rank:total_size_local:self.num_replicas]
        if self.shuffle:
            np.random.shuffle(indices)
        return iter(indices.tolist())
def set_epoch(self, epoch):
super().set_epoch(epoch)
class LengthAwareDistributedSampler_PC(DistributedSampler):
    def __init__(self, dataset, lengths, data_num_rat=None, num_replicas=None, rank=None, shuffle=True):
        super().__init__(dataset, num_replicas=num_replicas, rank=rank, shuffle=shuffle)
        self.lengths = lengths  # per-sample sequence lengths
        self.weights = self.calculate_weights()  # length-based sampling weights
        self.data_num_rat = data_num_rat
        self.total_size = int(len(dataset) * data_num_rat)
def calculate_weights(self):
        # Piecewise weighting by length bucket
weights = np.ones(len(self.lengths))
weights[np.array(self.lengths) >= 1300] = 318.0*200
weights[(np.array(self.lengths) >= 1200) & (np.array(self.lengths) < 1300)] = 13.98*200
weights[(np.array(self.lengths) >= 1100) & (np.array(self.lengths) < 1200)] = 10.26*100
weights[(np.array(self.lengths) >= 1000) & (np.array(self.lengths) < 1100)] = 7.62*100
weights[(np.array(self.lengths) >= 900) & (np.array(self.lengths) < 1000)] = 6.14*100
weights[(np.array(self.lengths) >= 800) & (np.array(self.lengths) < 900)] = 3.80
weights[(np.array(self.lengths) >= 700) & (np.array(self.lengths) < 800)] = 2.67
weights[(np.array(self.lengths) >= 600) & (np.array(self.lengths) < 700)] = 1.88
weights[(np.array(self.lengths) >= 500) & (np.array(self.lengths) < 600)] = 1.0
weights[(np.array(self.lengths) >= 400) & (np.array(self.lengths) < 500)] = 0.88
weights[(np.array(self.lengths) >= 300) & (np.array(self.lengths) < 400)] = 0.75
weights[(np.array(self.lengths) >= 200) & (np.array(self.lengths) < 300)] = 0.76
weights[(np.array(self.lengths) >= 100) & (np.array(self.lengths) < 200)] = 0.83
weights[np.array(self.lengths) < 100] = 1.87*100
        return weights / np.sum(weights)  # normalize to a probability distribution
    def __iter__(self):
        # Draw indices by weighted sampling (with replacement)
        indices = np.random.choice(len(self.dataset), self.total_size, replace=True, p=self.weights)
        # Truncate so the count divides evenly across num_replicas
        total_size_local = (len(indices) // self.num_replicas) * self.num_replicas
        indices = indices[:total_size_local]  # drop the surplus samples
        # Give this rank its strided shard of the samples
        indices = indices[self.rank:total_size_local:self.num_replicas]
        if self.shuffle:
            np.random.shuffle(indices)
        return iter(indices.tolist())
def set_epoch(self, epoch):
super().set_epoch(epoch)
class LengthAwareDistributedSampler_EscherichiaColi(DistributedSampler):
    def __init__(self, dataset, lengths, data_num_rat=None, num_replicas=None, rank=None, shuffle=True):
        super().__init__(dataset, num_replicas=num_replicas, rank=rank, shuffle=shuffle)
        self.lengths = lengths  # per-sample sequence lengths
        self.weights = self.calculate_weights()  # length-based sampling weights
        self.data_num_rat = data_num_rat
        self.total_size = int(len(dataset) * data_num_rat)
def calculate_weights(self):
        # Piecewise weighting by length bucket
weights = np.ones(len(self.lengths))
weights[np.array(self.lengths) >= 1300] = 211.0*200
weights[(np.array(self.lengths) >= 1200) & (np.array(self.lengths) < 1300)] = 26.38*200
weights[(np.array(self.lengths) >= 1100) & (np.array(self.lengths) < 1200)] = 15.07*100
weights[(np.array(self.lengths) >= 1000) & (np.array(self.lengths) < 1100)] = 11.72*100
weights[(np.array(self.lengths) >= 900) & (np.array(self.lengths) < 1000)] = 11.11*100
weights[(np.array(self.lengths) >= 800) & (np.array(self.lengths) < 900)] = 4.06
weights[(np.array(self.lengths) >= 700) & (np.array(self.lengths) < 800)] = 2.81
weights[(np.array(self.lengths) >= 600) & (np.array(self.lengths) < 700)] = 2.07
weights[(np.array(self.lengths) >= 500) & (np.array(self.lengths) < 600)] = 1.0
weights[(np.array(self.lengths) >= 400) & (np.array(self.lengths) < 500)] = 0.46
weights[(np.array(self.lengths) >= 300) & (np.array(self.lengths) < 400)] = 0.30
weights[(np.array(self.lengths) >= 200) & (np.array(self.lengths) < 300)] = 0.25
weights[(np.array(self.lengths) >= 100) & (np.array(self.lengths) < 200)] = 0.25
weights[np.array(self.lengths) < 100] = 0.47
        return weights / np.sum(weights)  # normalize to a probability distribution
    def __iter__(self):
        # Draw indices by weighted sampling (with replacement)
        indices = np.random.choice(len(self.dataset), self.total_size, replace=True, p=self.weights)
        # Truncate so the count divides evenly across num_replicas
        total_size_local = (len(indices) // self.num_replicas) * self.num_replicas
        indices = indices[:total_size_local]  # drop the surplus samples
        # Give this rank its strided shard of the samples
        indices = indices[self.rank:total_size_local:self.num_replicas]
        if self.shuffle:
            np.random.shuffle(indices)
        return iter(indices.tolist())
def set_epoch(self, epoch):
super().set_epoch(epoch)
class LengthAwareDistributedSampler_TK(DistributedSampler):
    def __init__(self, dataset, lengths, data_num_rat=None, num_replicas=None, rank=None, shuffle=True):
        super().__init__(dataset, num_replicas=num_replicas, rank=rank, shuffle=shuffle)
        self.lengths = lengths  # per-sample sequence lengths
        self.weights = self.calculate_weights()  # length-based sampling weights
        self.data_num_rat = data_num_rat
        self.total_size = int(len(dataset) * data_num_rat)
def calculate_weights(self):
        # Piecewise weighting by length bucket
weights = np.ones(len(self.lengths))
weights[(np.array(self.lengths) >= 1200) & (np.array(self.lengths) < 1300)] = 12.25*10
weights[(np.array(self.lengths) >= 1100) & (np.array(self.lengths) < 1200)] = 8.17*10
weights[(np.array(self.lengths) >= 1000) & (np.array(self.lengths) < 1100)] = 24.5*10
weights[(np.array(self.lengths) >= 900) & (np.array(self.lengths) < 1000)] = 8.17*10
weights[(np.array(self.lengths) >= 800) & (np.array(self.lengths) < 900)] = 3.27
weights[(np.array(self.lengths) >= 700) & (np.array(self.lengths) < 800)] = 2.33
weights[(np.array(self.lengths) >= 600) & (np.array(self.lengths) < 700)] = 1.09
weights[(np.array(self.lengths) >= 500) & (np.array(self.lengths) < 600)] = 1.0
weights[(np.array(self.lengths) >= 400) & (np.array(self.lengths) < 500)] = 0.25
weights[(np.array(self.lengths) >= 300) & (np.array(self.lengths) < 400)] = 0.17
weights[(np.array(self.lengths) >= 200) & (np.array(self.lengths) < 300)] = 0.13
weights[(np.array(self.lengths) >= 100) & (np.array(self.lengths) < 200)] = 0.10
weights[np.array(self.lengths) < 100] = 0.22
        return weights / np.sum(weights)  # normalize to a probability distribution
    def __iter__(self):
        # Draw indices by weighted sampling (with replacement)
        indices = np.random.choice(len(self.dataset), self.total_size, replace=True, p=self.weights)
        # Truncate so the count divides evenly across num_replicas
        total_size_local = (len(indices) // self.num_replicas) * self.num_replicas
        indices = indices[:total_size_local]  # drop the surplus samples
        # Give this rank its strided shard of the samples
        indices = indices[self.rank:total_size_local:self.num_replicas]
        if self.shuffle:
            np.random.shuffle(indices)
        return iter(indices.tolist())
def set_epoch(self, epoch):
super().set_epoch(epoch)
class LengthAwareDistributedSampler_human_circ(DistributedSampler):
    def __init__(self, dataset, lengths, data_num_rat=None, num_replicas=None, rank=None, shuffle=True):
        super().__init__(dataset, num_replicas=num_replicas, rank=rank, shuffle=shuffle)
        self.lengths = lengths  # per-sample sequence lengths
        self.weights = self.calculate_weights()  # length-based sampling weights
        self.data_num_rat = data_num_rat
        self.total_size = int(len(dataset) * data_num_rat)
def calculate_weights(self):
        # Piecewise weighting by length bucket
weights = np.ones(len(self.lengths))
weights[np.array(self.lengths) >= 1300] = 89.62*20
weights[(np.array(self.lengths) >= 1200) & (np.array(self.lengths) < 1300)] = 5.24*20
weights[(np.array(self.lengths) >= 1100) & (np.array(self.lengths) < 1200)] = 4.58*10
weights[(np.array(self.lengths) >= 1000) & (np.array(self.lengths) < 1100)] = 3.82*10
weights[(np.array(self.lengths) >= 900) & (np.array(self.lengths) < 1000)] = 3.30
weights[(np.array(self.lengths) >= 800) & (np.array(self.lengths) < 900)] = 2.34
weights[(np.array(self.lengths) >= 700) & (np.array(self.lengths) < 800)] = 1.74
weights[(np.array(self.lengths) >= 600) & (np.array(self.lengths) < 700)] = 1.36
weights[(np.array(self.lengths) >= 500) & (np.array(self.lengths) < 600)] = 1.0
weights[(np.array(self.lengths) >= 400) & (np.array(self.lengths) < 500)] = 0.74
weights[(np.array(self.lengths) >= 300) & (np.array(self.lengths) < 400)] = 0.57
weights[(np.array(self.lengths) >= 200) & (np.array(self.lengths) < 300)] = 0.46
weights[(np.array(self.lengths) >= 100) & (np.array(self.lengths) < 200)] = 0.38
weights[np.array(self.lengths) < 100] = 0.48
        return weights / np.sum(weights)  # normalize to a probability distribution
    def __iter__(self):
        # Draw indices by weighted sampling (with replacement)
        indices = np.random.choice(len(self.dataset), self.total_size, replace=True, p=self.weights)
        # Truncate so the count divides evenly across num_replicas
        total_size_local = (len(indices) // self.num_replicas) * self.num_replicas
        indices = indices[:total_size_local]  # drop the surplus samples
        # Give this rank its strided shard of the samples
        indices = indices[self.rank:total_size_local:self.num_replicas]
        if self.shuffle:
            np.random.shuffle(indices)
        return iter(indices.tolist())
def set_epoch(self, epoch):
super().set_epoch(epoch)