# maotao / model / codon_attr.py
# julse's picture
# Update model/codon_attr.py
# a0727ad verified
import os
import random
import pandas as pd
import numpy as np
from typing import List, Dict, Tuple, Union
from collections import defaultdict
class Codon:
    """Codon-usage toolkit: CAI, ENC, RSCU and codon (re)sampling.

    Sequences may be supplied as RNA or DNA; internally everything is stored
    in the RNA alphabet (``T`` is rewritten to ``U``).  The ``rna`` flag of
    the constructor only controls the alphabet of *generated* sequences.

    The codon-usage table is a CSV file with a header line followed by rows
    whose first three fields are ``codon, amino_acid, fraction``.
    """

    # Standard genetic code in the RNA alphabet; '*' marks stop codons.
    CODON_TO_AA = {
        'UUU': 'F', 'UUC': 'F',  # Phe (2-fold)
        'UUA': 'L', 'UUG': 'L', 'CUU': 'L', 'CUC': 'L', 'CUA': 'L', 'CUG': 'L',  # Leu (6-fold)
        'AUU': 'I', 'AUC': 'I', 'AUA': 'I',  # Ile (3-fold)
        'AUG': 'M',  # Met (no synonymous codon; excluded from ENC)
        'GUU': 'V', 'GUC': 'V', 'GUA': 'V', 'GUG': 'V',  # Val (4-fold)
        'UCU': 'S', 'UCC': 'S', 'UCA': 'S', 'UCG': 'S', 'AGU': 'S', 'AGC': 'S',  # Ser (6-fold)
        'CCU': 'P', 'CCC': 'P', 'CCA': 'P', 'CCG': 'P',  # Pro (4-fold)
        'ACU': 'T', 'ACC': 'T', 'ACA': 'T', 'ACG': 'T',  # Thr (4-fold)
        'GCU': 'A', 'GCC': 'A', 'GCA': 'A', 'GCG': 'A',  # Ala (4-fold)
        'UAU': 'Y', 'UAC': 'Y',  # Tyr (2-fold)
        'UAA': '*', 'UAG': '*', 'UGA': '*',  # stop codons (excluded)
        'CAU': 'H', 'CAC': 'H',  # His (2-fold)
        'CAA': 'Q', 'CAG': 'Q',  # Gln (2-fold)
        'AAU': 'N', 'AAC': 'N',  # Asn (2-fold)
        'AAA': 'K', 'AAG': 'K',  # Lys (2-fold)
        'GAU': 'D', 'GAC': 'D',  # Asp (2-fold)
        'GAA': 'E', 'GAG': 'E',  # Glu (2-fold)
        'UGU': 'C', 'UGC': 'C',  # Cys (2-fold)
        'UGG': 'W',  # Trp (no synonymous codon; excluded from ENC)
        'CGU': 'R', 'CGC': 'R', 'CGA': 'R', 'CGG': 'R', 'AGA': 'R', 'AGG': 'R',  # Arg (6-fold)
        'GGU': 'G', 'GGC': 'G', 'GGA': 'G', 'GGG': 'G'  # Gly (4-fold)
    }

    def __init__(self, codon_usage_path, rna=True):
        """Load a codon-usage table and build the derived lookup tables.

        Args:
            codon_usage_path: Path of the codon-usage CSV.  If the file is
                not readable or is empty, a message is printed and every
                usage-derived table stays empty (no exception is raised).
            rna: If true, generated sequences use ``U``; otherwise ``T``.
        """
        self.bases = 'GAUC'
        self.aas = 'ACDEFGHIKLMNPQRSTVWY*'.lower()
        # codon (RNA) -> (lower-case amino acid, usage fraction)
        self.codon_table = {}
        # amino acid -> highest usage fraction among its codons
        self.max_aa_table = {}
        # amino acid -> codon carrying that highest fraction
        self.cai_best_aa2nn_table = {}
        # codon position (0..2) -> amino acid -> base -> summed usage fraction
        self.frame_ith_aa_base_fraction = {
            i: {a: {base: 0.0 for base in self.bases} for a in self.aas}
            for i in range(3)
        }
        # Only controls the output alphabet; inputs may be RNA or DNA.
        self.output_rna = rna
        # Standard RNA codon table (used for ENC and RSCU).
        self.standard_codon_table = self.CODON_TO_AA
        # Amino acids grouped by codon degeneracy.
        self.degeneracy_groups = {
            '2-fold': ['F', 'Y', 'C', 'H', 'Q', 'N', 'K', 'D', 'E'],
            '3-fold': ['I'],
            '4-fold': ['V', 'P', 'T', 'A', 'G'],
            '6-fold': ['L', 'S', 'R']
        }

        if os.access(codon_usage_path, os.R_OK) and os.path.getsize(codon_usage_path) > 0:
            self._load_codon_usage(codon_usage_path)
            print(f"Codon usage table loaded, {len(self.codon_table)} codons loaded from {codon_usage_path}")
        else:
            print('codon usage table is missing', codon_usage_path)

        self.aa_to_codons = self._build_aa_to_codons()
        # Per-amino-acid codon weights, aligned with aa_to_codons, for weighted sampling.
        self.aa_to_weights = self._build_aa_to_weights()

    def _load_codon_usage(self, codon_usage_path):
        """Parse the usage CSV and fill the usage-derived tables.

        Raises:
            ValueError: If a row has fewer than three comma-separated fields
                or its fraction is not a number.
            KeyError: If a row uses an amino-acid letter outside ``self.aas``
                or a base outside ``self.bases``.
        """
        with open(codon_usage_path, 'r') as codon_file:
            next(codon_file, None)  # skip the header line
            for line in codon_file:
                line = line.strip()
                if not line:
                    continue
                codon, aa, fraction, *_ = line.split(',')
                # Stored internally as upper-case RNA codon / lower-case amino acid.
                codon = codon.upper().replace('T', 'U')
                aa = aa.lower()
                fraction = float(fraction)
                self.codon_table[codon] = (aa, fraction)
                for i, base in enumerate(codon):
                    self.frame_ith_aa_base_fraction[i][aa][base] += fraction
                if aa not in self.max_aa_table or self.max_aa_table[aa] < fraction:
                    self.max_aa_table[aa] = fraction
                    self.cai_best_aa2nn_table[aa] = codon

    def _build_aa_to_codons(self):
        """Return a dict mapping each amino acid to the list of its codons."""
        aa_to_codons = defaultdict(list)
        for codon, (aa, _) in self.codon_table.items():
            aa_to_codons[aa].append(codon)
        return dict(aa_to_codons)

    def _build_aa_to_weights(self):
        """Return a dict mapping each amino acid to its codon usage fractions.

        The list order matches ``_build_aa_to_codons`` because both iterate
        ``self.codon_table`` in the same order.
        """
        aa_to_weights = defaultdict(list)
        for _, (aa, weight) in self.codon_table.items():
            aa_to_weights[aa].append(weight)
        return dict(aa_to_weights)

    def _normalize_sequence(self, sequence: str) -> str:
        """Return ``sequence`` upper-cased and converted to the RNA alphabet."""
        return sequence.upper().replace('T', 'U')

    def _validate_sequence(self, sequence: str) -> str:
        """Normalise ``sequence`` and check that it is a whole number of codons.

        Raises:
            ValueError: If the length is not a multiple of three or the
                sequence contains a character other than A, U, C, G (after
                normalisation).
        """
        sequence = self._normalize_sequence(sequence)
        if len(sequence) % 3 != 0:
            raise ValueError(f"序列长度必须是3的倍数,当前长度: {len(sequence)}")
        valid_bases = {'A', 'U', 'C', 'G'}
        if not all(base in valid_bases for base in sequence):
            raise ValueError("序列包含无效的碱基字符")
        return sequence

    def _count_codons(self, sequence: str) -> Dict[str, int]:
        """Count the sense codons of ``sequence`` (stop codons are ignored).

        Raises:
            ValueError: Propagated from ``_validate_sequence``.
        """
        sequence = self._validate_sequence(sequence)
        codon_count = {}
        for start in range(0, len(sequence), 3):
            codon = sequence[start:start + 3]
            if self.standard_codon_table.get(codon, '*') != '*':
                codon_count[codon] = codon_count.get(codon, 0) + 1
        return codon_count

    @staticmethod
    def translate_sequence(sequence: str) -> str:
        """Translate an RNA/DNA sequence into one-letter amino acids.

        Stop codons are emitted as ``*``.  Triplets that are not in the
        standard table (including a trailing partial codon) are skipped.
        """
        sequence = sequence.upper().replace('T', 'U')
        table = Codon.CODON_TO_AA
        return ''.join(
            table[sequence[i:i + 3]]
            for i in range(0, len(sequence), 3)
            if sequence[i:i + 3] in table
        )

    def calc_cai(self, seq):
        """Compute the Codon Adaptation Index of an RNA or DNA sequence.

        The result is the geometric mean of ``fraction / best_fraction`` over
        all codons found in the loaded usage table; other triplets are
        ignored.

        Returns:
            ``nan`` if the length is not a multiple of three, ``0.0`` if no
            codon could be scored, otherwise the CAI value.
        """
        seq = self._normalize_sequence(seq)
        if len(seq) % 3 != 0:
            return np.nan
        log_sum = 0.0
        valid_num = 0
        for i in range(0, len(seq), 3):
            entry = self.codon_table.get(seq[i:i + 3])
            if entry is None:
                continue
            aa, fraction = entry
            f_c_max = self.max_aa_table[aa]
            if f_c_max <= 0:
                # Every codon of this amino acid has zero usage: the relative
                # adaptiveness is undefined, so the codon is not scored.
                continue
            log_sum += np.log2(fraction / f_c_max)
            valid_num += 1
        return np.exp2(log_sum / valid_num) if valid_num > 0 else 0.0

    # Alternative public name for calc_cai.
    calculate_CAI = calc_cai

    def cai_opt_codon(self, aa_seq):
        """Return the CAI-optimal coding sequence for ``aa_seq``.

        Each residue is replaced by its most used codon; residues without an
        entry in the usage table become ``___``.  The alphabet follows
        ``self.output_rna``.
        """
        optimal = []
        for aa in aa_seq.lower():
            codon = self.cai_best_aa2nn_table.get(aa, '___')
            if not self.output_rna:
                codon = codon.replace('U', 'T')
            optimal.append(codon)
        return ''.join(optimal)

    def random_codon(self, aa_seq):
        """Sample a coding sequence for ``aa_seq`` weighted by codon usage.

        Args:
            aa_seq: Amino-acid sequence in one-letter code.

        Returns:
            The sampled sequence (RNA or DNA according to
            ``self.output_rna``); residues without codons become ``___``.
        """
        sampled = []
        for aa in aa_seq.lower():
            if aa not in self.aa_to_codons:
                sampled.append('___')
                continue
            codons = self.aa_to_codons[aa]
            weights = self.aa_to_weights[aa]
            sampled.append(random.choices(codons, weights=weights, k=1)[0])
        opt_nn = ''.join(sampled)
        if not self.output_rna:
            opt_nn = opt_nn.replace('U', 'T')
        return opt_nn

    def random_codon_weight(self, aa_seq, weights_df=None):
        """Sample a coding sequence using an explicit weight table.

        Args:
            aa_seq: Amino-acid sequence in one-letter code.
            weights_df: Table with columns ``triplet``, ``amino_acid`` and
                ``fraction``.  Amino acids are matched case-insensitively and
                triplets may be RNA or DNA.  ``None`` falls back to
                ``random_codon``.

        Returns:
            The sampled sequence (RNA or DNA according to
            ``self.output_rna``).  A residue becomes ``___`` when it is
            unknown to the loaded usage table or absent from ``weights_df``.
        """
        if weights_df is None:
            return self.random_codon(aa_seq)

        # Group the table once instead of filtering it for every residue.
        choices_by_aa = {}
        for triplet, aa, fraction in zip(weights_df['triplet'],
                                         weights_df['amino_acid'],
                                         weights_df['fraction']):
            codons, weights = choices_by_aa.setdefault(str(aa).lower(), ([], []))
            codons.append(str(triplet).upper().replace('T', 'U'))
            weights.append(float(fraction))

        sampled = []
        for aa in aa_seq.lower():
            if aa not in self.aa_to_codons or aa not in choices_by_aa:
                sampled.append('___')
                continue
            codons, weights = choices_by_aa[aa]
            sampled.append(random.choices(codons, weights=weights, k=1)[0])
        opt_nn = ''.join(sampled)
        if not self.output_rna:
            opt_nn = opt_nn.replace('U', 'T')
        return opt_nn

    def calculate_ENC(self, sequence: str) -> float:
        """Compute the effective number of codons (ENC, Wright 1990).

        For every amino acid with synonymous codons the homozygosity is
        ``F = (n * sum(p_i ** 2) - 1) / (n - 1)`` where ``n`` is the number
        of times the amino acid occurs and ``p_i`` the relative frequency of
        each of its codons.  Amino acids with ``n <= 1`` or a non-positive
        ``F`` are left out.  ``ENC = 2 + 9/F2 + 1/F3 + 5/F4 + 3/F6`` with
        ``Fk`` the mean ``F`` of the k-fold class; a class without any usable
        amino acid contributes nothing.  The result is capped at 61.

        Args:
            sequence: RNA or DNA coding sequence.

        Returns:
            The ENC value.

        Raises:
            ValueError: If the sequence is not a valid whole-codon sequence.
        """
        codon_count = self._count_codons(sequence)

        # Synonymous codon families, without Met, Trp and stop codons.
        families = defaultdict(list)
        for codon, aa in self.standard_codon_table.items():
            if aa in ('M', 'W', '*'):
                continue
            families[aa].append(codon)

        class_of = {
            aa: degeneracy
            for degeneracy, members in self.degeneracy_groups.items()
            for aa in members
        }

        F_values = {'2-fold': [], '3-fold': [], '4-fold': [], '6-fold': []}
        for aa, codons in families.items():
            degeneracy = class_of.get(aa)
            if degeneracy is None:
                continue
            counts = [codon_count.get(codon, 0) for codon in codons]
            total_n = sum(counts)
            if total_n <= 1:
                # The estimator divides by (n - 1).
                continue
            sum_squared_freq = sum((n_i / total_n) ** 2 for n_i in counts)
            F = (total_n * sum_squared_freq - 1) / (total_n - 1)
            if F <= 0:
                # Small samples can give F <= 0, which has no usable reciprocal.
                continue
            F_values[degeneracy].append(F)

        enc_value = 2.0
        for degeneracy, n_amino_acids in (('2-fold', 9.0), ('3-fold', 1.0),
                                          ('4-fold', 5.0), ('6-fold', 3.0)):
            if F_values[degeneracy]:
                enc_value += n_amino_acids / np.mean(F_values[degeneracy])
        # There are only 61 sense codons, so larger estimates are clipped.
        return float(min(enc_value, 61.0))

    def calculate_RSCU(self, sequences: List[str]) -> Dict[str, float]:
        """Compute Relative Synonymous Codon Usage over a set of sequences.

        ``RSCU = observed count / (amino-acid total / degeneracy)`` where the
        degeneracy is the number of codons the standard table assigns to the
        amino acid, whether or not all of them were observed.

        Args:
            sequences: RNA or DNA sequences; invalid ones are skipped.

        Returns:
            RSCU value for every codon (RNA alphabet) that was observed at
            least once.
        """
        total_codon_count = defaultdict(int)
        for seq in sequences:
            try:
                codon_count = self._count_codons(seq)
            except ValueError:
                continue  # skip invalid sequences
            for codon, count in codon_count.items():
                total_codon_count[codon] += count

        # Number of synonymous codons per amino acid in the standard table.
        degeneracy = defaultdict(int)
        for aa in self.standard_codon_table.values():
            degeneracy[aa] += 1

        aa_total_count = defaultdict(int)
        for codon, count in total_codon_count.items():
            aa_total_count[self.standard_codon_table[codon]] += count

        rscu_dict = {}
        for codon, count in total_codon_count.items():
            aa = self.standard_codon_table[codon]
            expected_count = aa_total_count[aa] / degeneracy[aa]
            rscu_dict[codon] = count / expected_count if expected_count > 0 else 0.0
        return rscu_dict

    def analyze_sequence(self, sequence: str, sequence_name: str = "") -> Dict:
        """Summarise the codon-usage metrics of one sequence.

        Args:
            sequence: RNA or DNA sequence.
            sequence_name: Optional label copied into the result.

        Returns:
            A dict with name, length, ENC, CAI and their qualitative labels.
            If any metric raises, ``ENC``/``CAI`` are ``None`` and the
            message is stored under ``Error``.
        """
        try:
            enc = self.calculate_ENC(sequence)
            cai = self.calc_cai(sequence)
            return {
                'Sequence_Name': sequence_name,
                'Sequence_Length': len(sequence),
                'ENC': round(enc, 3),
                # NOTE: 'week' is a misspelling of 'weak'; it is kept verbatim
                # because existing callers may match on this label.
                'ENC_Preference': 'strong' if enc <= 35 else 'week',
                'CAI': round(cai, 3),
                'CAI_Level': 'high' if cai > 0.7 else 'low'
            }
        except Exception as e:
            # Best-effort summary: report the failure instead of raising.
            return {
                'Sequence_Name': sequence_name,
                'Sequence_Length': len(sequence),
                'ENC': None,
                'CAI': None,
                'Error': str(e)
            }

    @staticmethod
    def modify_func(sequence):
        """Mask ``sequence``: return underscores of the same length."""
        return '_' * len(sequence)

    @staticmethod
    def modify_codon_by_frames(sequence, frames=(1, 2, 3), modify_func=None):
        """Build one variant of ``sequence`` per requested codon position.

        The sequence is upper-cased, stripped of spaces/newlines and trimmed
        to whole codons.  For every entry of ``frames`` the bases at that
        codon position are passed (as one string) through ``modify_func`` and
        the sequence is rebuilt.  Variants are independent of each other:
        each one differs from the cleaned input only at its own position.

        Args:
            sequence: Input nucleotide sequence.
            frames: Codon positions to modify, each 1, 2 or 3.
            modify_func: Callable taking the string of bases at one codon
                position and returning a replacement of the same length.
                ``None`` leaves the sequence unchanged.

        Returns:
            List with one rebuilt sequence per entry of ``frames``.

        Raises:
            ValueError: If a frame is not 1, 2 or 3, or ``modify_func``
                changes the length of its input.
        """
        seq = sequence.upper().replace(' ', '').replace('\n', '')
        seq = seq[:len(seq) - len(seq) % 3]
        # Bases found at codon position 1, 2 and 3 respectively.
        position_tracks = [seq[0::3], seq[1::3], seq[2::3]]

        reconstructed_list = []
        for frame in frames:
            if frame not in (1, 2, 3):
                raise ValueError(f"frame must be 1, 2 or 3, got {frame!r}")
            tracks = list(position_tracks)
            if modify_func:
                original = tracks[frame - 1]
                modified = modify_func(original)
                if len(modified) != len(original):
                    raise ValueError("modify_func must preserve the length of its input")
                tracks[frame - 1] = modified
            reconstructed_list.append(
                ''.join(first + second + third for first, second, third in zip(*tracks))
            )
        return reconstructed_list
# 使用示例 - 测试所有功能
def example_usage(
    codon_table_path="/Users/gz_julse/code/minimind_RiboUTR/maotao_file/codon_table/codon_usage_{species}.csv",
):
    """Exercise every public feature of ``Codon`` and print the results.

    Args:
        codon_table_path: Path template of the codon-usage CSV files; the
            ``{species}`` placeholder is filled with each species name.  The
            default is the original author's local path, so pass your own
            template when running elsewhere.  Tables that cannot be read are
            reported by ``Codon`` and leave the usage-derived tables empty.
    """
    print("=" * 60)
    print("测试 Codon 类的所有功能")
    print("=" * 60)

    # Test data
    species_list = ["mouse", "Ec", "Sac", "Pic", "Human"]
    test_species = "mouse"  # species used for the detailed checks

    # Test sequences
    aa_seq = "MASV"
    dna_seq = "ATGGCCATGGCGCCCAGAACTGAGATCAAATAGTACCCGTATTAACGGGTA"
    rna_seq = dna_seq.replace('T', 'U')

    # Sequence collection used for the RSCU computation
    test_sequences = [
        "AUGGCUUCUUUUUUCUUCUUCUUCUUCUUCUUCCUCCUCCUCCUCCUCCUCCUCCUC",  # RNA
        "ATGGCUUCUUUUCUCGUAUACACAGATGACTACGTTAGCAGCTACGTTACGTTACGTTACG",  # DNA
        "AUGGUUUGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGA"  # RNA
    ]

    # Single test sequence
    test_sequence = "AUGGCUUCUUUUCUCGUAUACACAGAUGACUACGUAGCAGCUACGUACGUACGUACG"

    # Smoke-check translate_sequence (the result is intentionally unused).
    Codon.translate_sequence(dna_seq)

    print(f"\n1. 初始化 Codon 实例 (物种: {species_list})")
    print("-" * 50)
    # One analyser per species for each output alphabet (DNA first, then RNA).
    codon_instance_dna = {
        species: Codon(codon_table_path.format(species=species), rna=False)
        for species in species_list
    }
    codon_instance_rna = {
        species: Codon(codon_table_path.format(species=species), rna=True)
        for species in species_list
    }
    print(f"✓ 成功创建 {len(species_list)} 个物种的 Codon 实例")

    rna_analyzer = codon_instance_rna[test_species]
    dna_analyzer = codon_instance_dna[test_species]

    print("\n2. 测试 CAI 计算")
    print("-" * 50)
    # DNA and RNA inputs
    print("DNA序列CAI:", [codon_instance_rna[species].calc_cai(dna_seq) for species in species_list])
    print("RNA序列CAI:", [codon_instance_rna[species].calc_cai(rna_seq) for species in species_list])
    # DNA and RNA inputs must give the same value
    dna_cai = rna_analyzer.calc_cai(dna_seq)
    rna_cai = rna_analyzer.calc_cai(rna_seq)
    print(f"✓ DNA和RNA输入结果一致: {np.isclose(dna_cai, rna_cai)}")

    print("\n3. 测试 CAI 最优密码子序列")
    print("-" * 50)
    opt_rna = rna_analyzer.cai_opt_codon(aa_seq)
    opt_dna = dna_analyzer.cai_opt_codon(aa_seq)
    print(f"氨基酸序列: {aa_seq}")
    print(f"RNA格式最优密码子: {opt_rna}")
    print(f"DNA格式最优密码子: {opt_dna}")
    print(f"✓ 输出格式正确: RNA={opt_rna.replace('T', '') == opt_rna}, DNA={opt_dna.replace('U', '') == opt_dna}")

    print("\n4. 测试 ENC 计算")
    print("-" * 50)
    enc_dna = rna_analyzer.calculate_ENC(dna_seq)
    enc_rna = rna_analyzer.calculate_ENC(rna_seq)
    print(f"DNA序列ENC: {enc_dna:.3f}")
    print(f"RNA序列ENC: {enc_rna:.3f}")
    print(f"✓ DNA和RNA输入结果一致: {np.isclose(enc_dna, enc_rna)}")

    print("\n5. 测试 RSCU 计算")
    print("-" * 50)
    rscu_results = rna_analyzer.calculate_RSCU(test_sequences)
    print(f"计算了 {len(rscu_results)} 个密码子的RSCU值")
    print("前10个密码子的RSCU值:")
    for codon, rscu in list(rscu_results.items())[:10]:
        print(f" {codon}: {rscu:.3f}")

    print("\n6. 测试综合分析 (analyze_sequence)")
    print("-" * 50)
    analysis_result = rna_analyzer.analyze_sequence(test_sequence, "Test_Gene")
    print("综合分析结果:")
    for key, value in analysis_result.items():
        print(f" {key}: {value}")

    print("\n7. 测试序列验证功能")
    print("-" * 50)
    # Sequences that must be rejected
    invalid_seqs = [
        "AUGGCUUCUUUUCUCG",  # length is not a multiple of 3
        "AUGXXXUUUUCUCGUAUACACAGAUGACUACGUAGCAGCUACGUACGUACGUACG",  # invalid characters
    ]
    for i, seq in enumerate(invalid_seqs):
        try:
            rna_analyzer._validate_sequence(seq)
            print(f"序列 {i + 1}: 错误地通过了验证")
        except ValueError as e:
            print(f"序列 {i + 1}: 正确捕获错误 - {e}")

    print("\n8. 测试密码子计数")
    print("-" * 50)
    codon_count = rna_analyzer._count_codons(test_sequence)
    print(f"序列 '{test_sequence[:20]}...' 的密码子计数:")
    for codon, count in list(codon_count.items())[:5]:
        print(f" {codon}: {count}")
    print(f" ... (共 {len(codon_count)} 种密码子)")

    print("\n9. 测试不同输出格式的兼容性")
    print("-" * 50)
    # CAI must not depend on the output alphabet of the instance
    cai_rna_instance = rna_analyzer.calc_cai(test_sequence)
    cai_dna_instance = dna_analyzer.calc_cai(test_sequence)
    print(f"RNA输出实例CAI: {cai_rna_instance:.4f}")
    print(f"DNA输出实例CAI: {cai_dna_instance:.4f}")
    print(f"✓ 不同输出格式实例的CAI计算相同: {np.isclose(cai_rna_instance, cai_dna_instance)}")

    print("\n" + "=" * 60)
    print("所有功能测试完成!")
    print("=" * 60)


if __name__ == "__main__":
    example_usage()