File size: 15,106 Bytes
4707555 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 |
import pandas as pd
import numpy as np
from typing import List, Dict, Tuple, Union
from collections import defaultdict
import argparse
class CodonUsageAnalyzer:
    """Codon usage analyzer bundling ENC, CAI and RSCU calculations.

    Sequences are processed in the RNA alphabet (A/U/C/G); DNA input is
    accepted and converted by replacing ``T`` with ``U``.
    """

    def __init__(self, freq_codon_usage_path=None):
        """Build the codon tables and optionally load a reference usage table.

        Args:
            freq_codon_usage_path: Path to a comma-separated file consisting of
                a header line followed by rows whose first three fields are
                ``codon, amino_acid, fraction`` (extra fields are ignored).
                Codons are upper-cased and ``T`` is replaced by ``U`` so that
                they match the normalised sequences. When ``None`` no
                reference is loaded and ``calculate_CAI`` raises ``ValueError``.
        """
        # RNA codon table
        self.codon_table = {
            'UUU': 'F', 'UUC': 'F',  # Phe (2-fold)
            'UUA': 'L', 'UUG': 'L', 'CUU': 'L', 'CUC': 'L', 'CUA': 'L', 'CUG': 'L',  # Leu (6-fold)
            'AUU': 'I', 'AUC': 'I', 'AUA': 'I',  # Ile (3-fold)
            'AUG': 'M',  # Met (no synonymous codons, excluded)
            'GUU': 'V', 'GUC': 'V', 'GUA': 'V', 'GUG': 'V',  # Val (4-fold)
            'UCU': 'S', 'UCC': 'S', 'UCA': 'S', 'UCG': 'S', 'AGU': 'S', 'AGC': 'S',  # Ser (6-fold)
            'CCU': 'P', 'CCC': 'P', 'CCA': 'P', 'CCG': 'P',  # Pro (4-fold)
            'ACU': 'T', 'ACC': 'T', 'ACA': 'T', 'ACG': 'T',  # Thr (4-fold)
            'GCU': 'A', 'GCC': 'A', 'GCA': 'A', 'GCG': 'A',  # Ala (4-fold)
            'UAU': 'Y', 'UAC': 'Y',  # Tyr (2-fold)
            'UAA': '*', 'UAG': '*', 'UGA': '*',  # stop codons (excluded)
            'CAU': 'H', 'CAC': 'H',  # His (2-fold)
            'CAA': 'Q', 'CAG': 'Q',  # Gln (2-fold)
            'AAU': 'N', 'AAC': 'N',  # Asn (2-fold)
            'AAA': 'K', 'AAG': 'K',  # Lys (2-fold)
            'GAU': 'D', 'GAC': 'D',  # Asp (2-fold)
            'GAA': 'E', 'GAG': 'E',  # Glu (2-fold)
            'UGU': 'C', 'UGC': 'C',  # Cys (2-fold)
            'UGG': 'W',  # Trp (no synonymous codons, excluded)
            'CGU': 'R', 'CGC': 'R', 'CGA': 'R', 'CGG': 'R', 'AGA': 'R', 'AGG': 'R',  # Arg (6-fold)
            'GGU': 'G', 'GGC': 'G', 'GGA': 'G', 'GGG': 'G',  # Gly (4-fold)
        }
        # Amino acids pre-grouped by degeneracy class
        self.degeneracy_groups = {
            '2-fold': ['F', 'Y', 'C', 'H', 'Q', 'N', 'K', 'D', 'E'],
            '3-fold': ['I'],
            '4-fold': ['V', 'P', 'T', 'A', 'G'],
            '6-fold': ['L', 'S', 'R'],
        }
        # Reference codon usage (consumed by CAI): codon -> (amino acid, fraction)
        self.reference_codon_usage = None
        self.freq_codon_table = {}
        # Largest fraction seen among the codons of each amino acid
        self.max_aa_table = {}
        if freq_codon_usage_path is not None:
            with open(freq_codon_usage_path, 'r') as codon_file:
                next(codon_file, None)  # skip the header line; tolerate an empty file
                for line in codon_file:
                    line = line.strip()
                    if not line:
                        continue
                    codon, aa, fraction, *_ = line.split(',')
                    # Sequences are normalised to upper-case RNA before they are
                    # looked up, so the reference keys must use the same alphabet.
                    codon = codon.strip().upper().replace('T', 'U')
                    aa = aa.strip()
                    fraction = float(fraction)
                    self.freq_codon_table[codon] = (aa, fraction)
                    if aa not in self.max_aa_table or self.max_aa_table[aa] < fraction:
                        self.max_aa_table[aa] = fraction
            self.reference_codon_usage = self.freq_codon_table

    def _validate_sequence(self, sequence: str) -> str:
        """Return ``sequence`` upper-cased with ``T`` replaced by ``U``.

        Raises:
            ValueError: If the length is not a multiple of three or a character
                other than A/U/C/G remains after normalisation.
        """
        sequence = sequence.upper().replace('T', 'U')
        if len(sequence) % 3 != 0:
            raise ValueError(f"序列长度必须是3的倍数,当前长度: {len(sequence)}")
        valid_bases = {'A', 'U', 'C', 'G'}
        if not all(base in valid_bases for base in sequence):
            raise ValueError("序列包含无效的碱基字符")
        return sequence

    def _count_codons(self, sequence: str) -> Dict[str, int]:
        """Count in-frame codons of ``sequence``; stop codons are not counted.

        Raises:
            ValueError: Propagated from ``_validate_sequence``.
        """
        sequence = self._validate_sequence(sequence)
        codon_count = {}
        for start in range(0, len(sequence), 3):
            codon = sequence[start:start + 3]
            if self.codon_table.get(codon, '*') != '*':
                codon_count[codon] = codon_count.get(codon, 0) + 1
        return codon_count

    def calculate_ENC(self, sequence: str) -> float:
        """Compute the effective number of codons (ENC / Nc) of one sequence.

        For every amino acid with synonymous codons the homozygosity is
        ``F = (n * sum(p_i ** 2) - 1) / (n - 1)``, where ``n`` is the number of
        codons observed for that amino acid and ``p_i`` the frequency of each
        synonymous codon. F values are averaged per degeneracy class and
        combined as ``2 + 9/F2 + 1/F3 + 5/F4 + 3/F6``.

        Amino acids observed fewer than two times, or whose F is not positive,
        carry no information and are left out of the class average. A class
        without any usable amino acid falls back to ``F = 1.0``. The result is
        capped at 61, the number of sense codons.

        Args:
            sequence: RNA (or DNA) coding sequence.

        Returns:
            The ENC value.

        Raises:
            ValueError: If the sequence fails validation.
        """
        codon_count = self._count_codons(sequence)

        # Synonymous codon family of every amino acid that enters the ENC
        synonymous_codons = defaultdict(list)
        for codon, aa in self.codon_table.items():
            if aa not in ('M', 'W', '*'):
                synonymous_codons[aa].append(codon)

        F_values = {degeneracy: [] for degeneracy in self.degeneracy_groups}
        for degeneracy, amino_acids in self.degeneracy_groups.items():
            for aa in amino_acids:
                counts = [codon_count.get(codon, 0) for codon in synonymous_codons[aa]]
                n = sum(counts)
                if n <= 1:
                    # F has (n - 1) in the denominator; undefined for n < 2
                    continue
                sum_squared_freq = sum((count / n) ** 2 for count in counts)
                F = (n * sum_squared_freq - 1) / (n - 1)
                if F > 0:
                    # F == 0 would make the ENC term infinite, so it is skipped
                    F_values[degeneracy].append(F)

        # Mean F per degeneracy class, 1.0 when the class has no usable value
        F2_avg = np.mean(F_values['2-fold']) if F_values['2-fold'] else 1.0
        F3_avg = np.mean(F_values['3-fold']) if F_values['3-fold'] else 1.0
        F4_avg = np.mean(F_values['4-fold']) if F_values['4-fold'] else 1.0
        F6_avg = np.mean(F_values['6-fold']) if F_values['6-fold'] else 1.0

        enc_value = 2 + 9 / F2_avg + 1 / F3_avg + 5 / F4_avg + 3 / F6_avg
        # Sampling noise can push the estimate above the number of sense codons
        return float(min(enc_value, 61.0))

    def calculate_CAI(self, sequence: str) -> float:
        """Compute the codon adaptation index (CAI) of one sequence.

        Each codon present in the reference table gets the weight
        ``fraction / max fraction of its amino acid``; the CAI is the geometric
        mean of those weights over all counted codons. The mean is accumulated
        in log space so that long sequences do not underflow to zero.

        Args:
            sequence: RNA (or DNA) coding sequence.

        Returns:
            The CAI value; ``0.0`` when no codon of the sequence is in the
            reference table or when a used codon has a zero weight.

        Raises:
            ValueError: If no reference table is loaded or the sequence fails
                validation.
        """
        if self.reference_codon_usage is None:
            raise ValueError("请先设置参考序列集")
        codon_count = self._count_codons(sequence)

        log_weight_sum = 0.0
        total_codons = 0
        for codon, count in codon_count.items():
            entry = self.reference_codon_usage.get(codon)
            if entry is None:
                continue
            aa, codon_freq = entry
            max_freq = self.max_aa_table[aa]
            if max_freq <= 0:
                continue
            weight = codon_freq / max_freq  # relative adaptiveness
            if weight <= 0:
                # A zero weight drives the geometric mean to zero
                return 0.0
            log_weight_sum += count * np.log(weight)
            total_codons += count

        if total_codons == 0:
            return 0.0
        return float(np.exp(log_weight_sum / total_codons))

    def calculate_RSCU(self, sequences: List[str]) -> Dict[str, float]:
        """Compute relative synonymous codon usage (RSCU) over several sequences.

        ``RSCU = observed count / (amino-acid total / family size)`` where the
        family size is the number of codons the codon table assigns to the
        amino acid, whether or not they all occur in the input.

        Args:
            sequences: RNA (or DNA) coding sequences. Sequences that fail
                validation are skipped.

        Returns:
            Mapping from every observed codon to its RSCU value.
        """
        total_codon_count = defaultdict(int)
        for seq in sequences:
            try:
                codon_count = self._count_codons(seq)
            except ValueError:
                continue  # skip invalid sequences
            for codon, count in codon_count.items():
                total_codon_count[codon] += count

        # Size of each synonymous family according to the codon table
        family_size = defaultdict(int)
        for aa in self.codon_table.values():
            family_size[aa] += 1

        # Total number of observed codons per amino acid
        aa_total_count = defaultdict(int)
        for codon, count in total_codon_count.items():
            aa_total_count[self.codon_table[codon]] += count

        rscu_dict = {}
        for codon, count in total_codon_count.items():
            aa = self.codon_table[codon]
            expected_count = aa_total_count[aa] / family_size[aa]
            rscu_dict[codon] = count / expected_count if expected_count > 0 else 0.0
        return rscu_dict

    def analyze_sequence(self, sequence: str, sequence_name: str = "") -> Dict:
        """Summarise the codon usage metrics of one sequence.

        Args:
            sequence: RNA (or DNA) coding sequence.
            sequence_name: Optional label copied into the result.

        Returns:
            A dict with ``Sequence_Name``, ``Sequence_Length``, ``ENC`` and
            ``ENC_Preference``; ``CAI`` and ``CAI_Level`` are added when a
            non-empty reference table is loaded. If any step raises, the dict
            instead carries ``ENC``/``CAI`` set to ``None`` and the message
            under ``Error`` so that batch callers keep running.
        """
        try:
            enc = self.calculate_ENC(sequence)
            cai = self.calculate_CAI(sequence) if self.reference_codon_usage else None
            result = {
                'Sequence_Name': sequence_name,
                'Sequence_Length': len(sequence),
                'ENC': round(enc, 3),
                'ENC_Preference': 'Strong' if enc <= 35 else 'Weak',
            }
            if cai is not None:
                result['CAI'] = round(cai, 3)
                result['CAI_Level'] = 'High' if cai > 0.7 else 'Low'
            return result
        except Exception as e:
            # Deliberate best-effort boundary: report the failure in the result.
            return {
                'Sequence_Name': sequence_name,
                # A non-string input (e.g. a missing value) has no length
                'Sequence_Length': len(sequence) if isinstance(sequence, str) else None,
                'ENC': None,
                'CAI': None,
                'Error': str(e),
            }
# Usage example
def example_usage():
    """Run the analyzer on a few toy sequences and print the results.

    Shows, in order: the single-sequence report, the RSCU values of the first
    ten codons over all toy sequences, and a per-gene ENC/CAI summary.
    """
    # Reference codon usage table
    codon_usage_path = "./data/codon_table/codon_usage_Escherichia_coli.csv"
    analyzer = CodonUsageAnalyzer(codon_usage_path)

    # Toy genes shared by all three demonstrations
    named_sequences = [
        ("Gene1", "AUGGCUUCUUUUUUCUUCUUCUUCUUCUUCUUCCUCCUCCUCCUCCUCCUCCUCCUC"),
        ("Gene2", "AUGGCUUCUUUUCUCGUAUACACAGAUGACUACGUAGCAGCUACGUACGUACGUACG"),
        ("Gene3", "AUGGUUUGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGG"),
    ]

    # ENC and CAI of a single sequence
    single_report = analyzer.analyze_sequence(named_sequences[1][1], "Test_Gene")
    print("单序列分析结果:")
    for field in single_report:
        print(f" {field}: {single_report[field]}")

    # RSCU is computed over several sequences at once
    rscu_results = analyzer.calculate_RSCU([seq for _, seq in named_sequences])
    print(f"\nRSCU结果 (前10个密码子):")
    first_ten = list(rscu_results.items())[:10]
    for codon, rscu in first_ten:
        print(f" {codon}: {rscu:.3f}")

    # Batch analysis
    print(f"\n批量分析示例:")
    for name, seq in named_sequences:
        report = analyzer.analyze_sequence(seq, name)
        print(f"{name}: ENC={report['ENC']}, CAI={report.get('CAI', 'N/A')}")
def single_seq_analysis(test_sequence, name, codon_usage_path):
    """Analyse one sequence with an analyzer built from ``codon_usage_path``.

    Args:
        test_sequence: Coding sequence to analyse.
        name: Label stored in the result.
        codon_usage_path: Reference codon usage table handed to
            ``CodonUsageAnalyzer``.

    Returns:
        The dict produced by ``CodonUsageAnalyzer.analyze_sequence``.
    """
    return CodonUsageAnalyzer(codon_usage_path).analyze_sequence(test_sequence, name)
if __name__ == '__main__':
    # Batch driver: read a CSV with the columns _id, RefSeq_aa, CDS and
    # organism, compute the codon usage metrics of every CDS and write one
    # output row per input row.
    parser = argparse.ArgumentParser()
    parser.add_argument("--file_path", default="./ribo_input.csv")
    parser.add_argument("--codon_usage_path", default="./dataset/data/codon_table/codon_usage_Escherichia_coli.csv")
    parser.add_argument("--output_path", default="./ribo_output.csv")
    args = parser.parse_args()
    output_path = args.output_path

    # Leading columns of the output file; any further key (CDS, Error, ...)
    # follows in order of first appearance.
    output_columns = ["_id", "RefSeq_aa", "Sequence_Name", "organism", "Sequence_Length",
                      "ENC", "ENC_Preference", "CAI", "CAI_Level", "CAI_head", "GC", "GC_head"]
    sequence_column = "CDS"
    head_length = 60  # number of leading nucleotides used for the *_head metrics

    df = pd.read_csv(args.file_path)

    # The reference table is the same for every row, so load it once.
    codon_usage_path = args.codon_usage_path
    print(codon_usage_path)
    analyzer = CodonUsageAnalyzer(codon_usage_path)

    rows = []
    for _, row in df.iterrows():
        gene_name = f"{row['_id']}_{sequence_column}"
        seq = row[sequence_column]

        if isinstance(seq, str) and seq:
            result = analyzer.analyze_sequence(seq, gene_name)
            # Count on the upper-cased sequence, as the analyzer does
            normalized = seq.upper()
            head = normalized[:head_length]
            result['GC'] = round((normalized.count("G") + normalized.count("C")) / len(normalized), 3)
            result['GC_head'] = round((head.count("G") + head.count("C")) / len(head), 3)
            try:
                result['CAI_head'] = round(analyzer.calculate_CAI(head), 3)
            except ValueError as exc:
                # Invalid head segment: keep the row and record the reason
                result['CAI_head'] = None
                result.setdefault('Error', str(exc))
            # result['CAI'] keeps the full-length value set by analyze_sequence,
            # so it stays consistent with CAI_Level.
        else:
            # Missing or empty sequence: emit an error row, do not abort the batch
            result = {
                'Sequence_Name': gene_name,
                'Sequence_Length': 0,
                'ENC': None,
                'CAI': None,
                'GC': None,
                'GC_head': None,
                'CAI_head': None,
                'Error': f"missing or empty {sequence_column} sequence",
            }

        result['_id'] = gene_name
        result['RefSeq_aa'] = row['RefSeq_aa']
        result['CDS'] = seq
        result['organism'] = row["organism"]
        rows.append(result)

        print(f"{gene_name}分析结果:")
        for key, value in result.items():
            print(f" {key}: {value}")

    # Build the frame once; growing it with concat per row is quadratic.
    final_df = pd.DataFrame(rows)
    extra_columns = [c for c in final_df.columns if c not in output_columns]
    final_df = final_df.reindex(columns=output_columns + extra_columns)
    final_df.to_csv(output_path, index=False)
    print(f"已保存 → {output_path}")