maotao / eval_sequence.py
julse's picture
upload AA2CDS
4707555 verified
import pandas as pd
import numpy as np
from typing import List, Dict, Tuple, Union
from collections import defaultdict
import argparse
class CodonUsageAnalyzer:
    """Codon-usage analyzer bundling ENC, CAI and RSCU calculations.

    Sequences are treated as RNA: input is upper-cased and every ``T`` is
    replaced by ``U`` before codons are counted. Stop codons are never
    counted.
    """

    def __init__(self, freq_codon_usage_path=None):
        """Build the codon tables and optionally load a reference usage file.

        Args:
            freq_codon_usage_path: Optional path to a comma-separated text
                file with one header line followed by rows of the form
                ``codon,aa,fraction[,...]``. Extra columns are ignored.
                When omitted, ``calculate_CAI`` raises ``ValueError``.

        Raises:
            OSError: If the reference file cannot be opened.
            ValueError: If a row has fewer than three fields or its
                fraction is not a number.
        """
        # RNA codon table.
        self.codon_table = {
            'UUU': 'F', 'UUC': 'F',  # Phe (2-fold)
            'UUA': 'L', 'UUG': 'L', 'CUU': 'L', 'CUC': 'L', 'CUA': 'L', 'CUG': 'L',  # Leu (6-fold)
            'AUU': 'I', 'AUC': 'I', 'AUA': 'I',  # Ile (3-fold)
            'AUG': 'M',  # Met (no synonymous codon, excluded from ENC)
            'GUU': 'V', 'GUC': 'V', 'GUA': 'V', 'GUG': 'V',  # Val (4-fold)
            'UCU': 'S', 'UCC': 'S', 'UCA': 'S', 'UCG': 'S', 'AGU': 'S', 'AGC': 'S',  # Ser (6-fold)
            'CCU': 'P', 'CCC': 'P', 'CCA': 'P', 'CCG': 'P',  # Pro (4-fold)
            'ACU': 'T', 'ACC': 'T', 'ACA': 'T', 'ACG': 'T',  # Thr (4-fold)
            'GCU': 'A', 'GCC': 'A', 'GCA': 'A', 'GCG': 'A',  # Ala (4-fold)
            'UAU': 'Y', 'UAC': 'Y',  # Tyr (2-fold)
            'UAA': '*', 'UAG': '*', 'UGA': '*',  # stop codons (excluded)
            'CAU': 'H', 'CAC': 'H',  # His (2-fold)
            'CAA': 'Q', 'CAG': 'Q',  # Gln (2-fold)
            'AAU': 'N', 'AAC': 'N',  # Asn (2-fold)
            'AAA': 'K', 'AAG': 'K',  # Lys (2-fold)
            'GAU': 'D', 'GAC': 'D',  # Asp (2-fold)
            'GAA': 'E', 'GAG': 'E',  # Glu (2-fold)
            'UGU': 'C', 'UGC': 'C',  # Cys (2-fold)
            'UGG': 'W',  # Trp (no synonymous codon, excluded from ENC)
            'CGU': 'R', 'CGC': 'R', 'CGA': 'R', 'CGG': 'R', 'AGA': 'R', 'AGG': 'R',  # Arg (6-fold)
            'GGU': 'G', 'GGC': 'G', 'GGA': 'G', 'GGG': 'G',  # Gly (4-fold)
        }
        # Amino acids pre-grouped by degeneracy class.
        self.degeneracy_groups = {
            '2-fold': ['F', 'Y', 'C', 'H', 'Q', 'N', 'K', 'D', 'E'],
            '3-fold': ['I'],
            '4-fold': ['V', 'P', 'T', 'A', 'G'],
            '6-fold': ['L', 'S', 'R'],
        }
        # Amino acid -> every codon that encodes it in codon_table. Built once
        # so ENC and RSCU can look up the synonymous family directly.
        self._codons_by_aa = {}
        for codon, aa in self.codon_table.items():
            self._codons_by_aa.setdefault(aa, []).append(codon)

        # Reference codon usage (needed by CAI). The two tables always exist
        # so attribute access is safe even when no file is supplied.
        self.reference_codon_usage = None
        self.freq_codon_table = {}
        self.max_aa_table = {}
        if freq_codon_usage_path is not None:
            with open(freq_codon_usage_path, 'r') as codon_file:
                next(codon_file, None)  # skip the header line (tolerates an empty file)
                for line in codon_file:
                    line = line.strip()
                    if not line:
                        continue
                    codon, aa, fraction, *_ = (field.strip() for field in line.split(','))
                    # Sequence codons are normalised to upper-case RNA, so the
                    # reference keys must be normalised the same way or a
                    # DNA-style table (TTT, ...) would never match.
                    codon = codon.upper().replace('T', 'U')
                    fraction = float(fraction)
                    self.freq_codon_table[codon] = (aa, fraction)
                    if aa not in self.max_aa_table or self.max_aa_table[aa] < fraction:
                        self.max_aa_table[aa] = fraction
            self.reference_codon_usage = self.freq_codon_table

    def _validate_sequence(self, sequence: str) -> str:
        """Return *sequence* normalised to upper-case RNA.

        Raises:
            ValueError: If the input is not a string, its length is not a
                multiple of three, or it contains characters other than
                A/U/C/G after normalisation.
        """
        if not isinstance(sequence, str):
            raise ValueError(f"序列必须是字符串,当前类型: {type(sequence).__name__}")
        sequence = sequence.strip().upper().replace('T', 'U')
        if len(sequence) % 3 != 0:
            raise ValueError(f"序列长度必须是3的倍数,当前长度: {len(sequence)}")
        if set(sequence) - {'A', 'U', 'C', 'G'}:
            raise ValueError("序列包含无效的碱基字符")
        return sequence

    def _count_codons(self, sequence: str) -> Dict[str, int]:
        """Count in-frame sense codons of *sequence* (stop codons are skipped)."""
        sequence = self._validate_sequence(sequence)
        codon_count = {}
        for start in range(0, len(sequence), 3):
            codon = sequence[start:start + 3]
            if self.codon_table.get(codon, '*') != '*':
                codon_count[codon] = codon_count.get(codon, 0) + 1
        return codon_count

    def calculate_ENC(self, sequence: str) -> float:
        """Compute the effective number of codons (ENC) of one sequence.

        For every amino acid with more than one observed codon the
        homozygosity ``F = (n * sum(p_i**2) - 1) / (n - 1)`` is computed,
        where ``n`` is the number of codons observed for that amino acid and
        ``p_i`` the share of each synonymous codon. F values are averaged per
        degeneracy class and combined as
        ``2 + 9/F2 + 1/F3 + 5/F4 + 3/F6``, capped at 61.

        A degeneracy class with no usable F value falls back to ``F = 1.0``,
        so very short sequences are biased toward low ENC.

        Args:
            sequence: RNA (or DNA) coding sequence.

        Returns:
            The ENC value, at most 61.0.

        Raises:
            ValueError: If the sequence fails validation.
        """
        codon_count = self._count_codons(sequence)

        f_values = {degeneracy: [] for degeneracy in self.degeneracy_groups}
        for degeneracy, amino_acids in self.degeneracy_groups.items():
            for aa in amino_acids:
                counts = [codon_count.get(codon, 0) for codon in self._codons_by_aa[aa]]
                n = sum(counts)
                if n <= 1:
                    # F is undefined for fewer than two observations.
                    continue
                sum_squared_freq = sum((c / n) ** 2 for c in counts)
                f_value = (n * sum_squared_freq - 1) / (n - 1)
                if f_value > 0:
                    # Non-positive F (tiny samples spread over many codons)
                    # would make the reciprocal undefined; skip it.
                    f_values[degeneracy].append(f_value)

        def class_average(degeneracy):
            values = f_values[degeneracy]
            return sum(values) / len(values) if values else 1.0

        enc_value = (2
                     + 9 / class_average('2-fold')
                     + 1 / class_average('3-fold')
                     + 5 / class_average('4-fold')
                     + 3 / class_average('6-fold'))
        # Sampling noise can push the estimate above the number of sense codons.
        return min(float(enc_value), 61.0)

    def calculate_CAI(self, sequence: str) -> float:
        """Compute the codon adaptation index (CAI) of one sequence.

        Each codon found in the reference table gets the weight
        ``fraction / max fraction of its amino acid``; the CAI is the
        geometric mean of these weights over all counted codons. Codons
        absent from the reference table are ignored.

        Args:
            sequence: RNA (or DNA) coding sequence.

        Returns:
            The CAI value, or 0.0 when no codon could be scored or any
            scored codon has a zero weight.

        Raises:
            ValueError: If no reference table was loaded or the sequence
                fails validation.
        """
        if self.reference_codon_usage is None:
            raise ValueError("请先设置参考序列集")
        codon_count = self._count_codons(sequence)

        # Accumulate in log space: a raw product of weights < 1 underflows to
        # 0.0 once the sequence has a few thousand codons.
        log_weight_sum = 0.0
        total_codons = 0
        for codon, count in codon_count.items():
            entry = self.reference_codon_usage.get(codon)
            if entry is None:
                continue
            aa, codon_freq = entry
            max_freq = self.max_aa_table[aa]
            if max_freq <= 0:
                continue
            weight = codon_freq / max_freq  # relative adaptiveness
            if weight <= 0:
                # A zero weight makes the geometric mean zero.
                return 0.0
            log_weight_sum += count * np.log(weight)
            total_codons += count
        if total_codons == 0:
            return 0.0
        return float(np.exp(log_weight_sum / total_codons))

    def calculate_RSCU(self, sequences: List[str]) -> Dict[str, float]:
        """Compute relative synonymous codon usage (RSCU) over sequences.

        ``RSCU = observed count / (amino-acid total / degeneracy)`` where the
        degeneracy is the number of codons encoding that amino acid in
        ``codon_table``. Sequences that fail validation are skipped.

        Args:
            sequences: RNA (or DNA) coding sequences.

        Returns:
            Mapping from each observed codon to its RSCU value.
        """
        total_codon_count = defaultdict(int)
        for seq in sequences:
            try:
                codon_count = self._count_codons(seq)
            except ValueError:
                continue  # best effort: skip invalid sequences
            for codon, count in codon_count.items():
                total_codon_count[codon] += count

        # Total observed codons per amino acid.
        aa_total_count = defaultdict(int)
        for codon, count in total_codon_count.items():
            aa_total_count[self.codon_table[codon]] += count

        rscu_dict = {}
        for codon, count in total_codon_count.items():
            aa = self.codon_table[codon]
            # Expected count under uniform usage of the whole synonymous
            # family, not only of the codons that happen to be observed.
            expected_count = aa_total_count[aa] / len(self._codons_by_aa[aa])
            rscu_dict[codon] = count / expected_count if expected_count > 0 else 0.0
        return rscu_dict

    def analyze_sequence(self, sequence: str, sequence_name: str = "") -> Dict:
        """Summarise the codon-usage metrics of a single sequence.

        Args:
            sequence: RNA (or DNA) coding sequence.
            sequence_name: Optional label copied into the result.

        Returns:
            On success a dict with ``Sequence_Name``, ``Sequence_Length``,
            ``ENC`` and ``ENC_Preference``, plus ``CAI`` and ``CAI_Level``
            when a non-empty reference table is loaded. On failure a dict
            with ``ENC`` and ``CAI`` set to ``None`` and the message under
            ``Error``.
        """
        try:
            enc = self.calculate_ENC(sequence)
            cai = self.calculate_CAI(sequence) if self.reference_codon_usage else None
            result = {
                'Sequence_Name': sequence_name,
                'Sequence_Length': len(sequence),
                'ENC': round(enc, 3),
                'ENC_Preference': 'Strong' if enc <= 35 else 'Weak',
            }
            if cai is not None:
                result['CAI'] = round(cai, 3)
                result['CAI_Level'] = 'High' if cai > 0.7 else 'Low'
            return result
        except Exception as e:
            return {
                'Sequence_Name': sequence_name,
                # The input may not be a string at all (e.g. NaN from pandas).
                'Sequence_Length': len(sequence) if isinstance(sequence, str) else 0,
                'ENC': None,
                'CAI': None,
                'Error': str(e)
            }
# Usage example
def example_usage():
    """Demonstrate single-sequence analysis, RSCU and batch analysis.

    Loads the E. coli codon-usage table from a hard-coded relative path and
    prints every result to stdout.
    """
    analyzer = CodonUsageAnalyzer("./data/codon_table/codon_usage_Escherichia_coli.csv")

    # Single sequence: ENC and CAI.
    single_result = analyzer.analyze_sequence(
        "AUGGCUUCUUUUCUCGUAUACACAGAUGACUACGUAGCAGCUACGUACGUACGUACG",
        "Test_Gene",
    )
    print("单序列分析结果:")
    for field in single_result:
        print(f" {field}: {single_result[field]}")

    # RSCU is computed over a collection of sequences.
    rscu_results = analyzer.calculate_RSCU([
        "AUGGCUUCUUUUUUCUUCUUCUUCUUCUUCUUCCUCCUCCUCCUCCUCCUCCUCCUC",
        "AUGGCUUCUUUUCUCGUAUACACAGAUGACUACGUAGCAGCUACGUACGUACGUACG",
        "AUGGUUUGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGG",
    ])
    print("\nRSCU结果 (前10个密码子):")
    for codon in list(rscu_results)[:10]:
        print(f" {codon}: {rscu_results[codon]:.3f}")

    # Batch analysis of named sequences.
    print("\n批量分析示例:")
    named_sequences = (
        ("Gene1", "AUGGCUUCUUUUUUCUUCUUCUUCUUCUUCUUCCUCCUCCUCCUCCUCCUCCUCCUC"),
        ("Gene2", "AUGGCUUCUUUUCUCGUAUACACAGAUGACUACGUAGCAGCUACGUACGUACGUACG"),
        ("Gene3", "AUGGUUUGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGG"),
    )
    for gene_label, gene_seq in named_sequences:
        gene_result = analyzer.analyze_sequence(gene_seq, gene_label)
        print(f"{gene_label}: ENC={gene_result['ENC']}, CAI={gene_result.get('CAI', 'N/A')}")
def single_seq_analysis(test_sequence, name, codon_usage_path):
    """Analyze one sequence with a freshly built analyzer.

    Args:
        test_sequence: Coding sequence to analyze.
        name: Label stored in the result under ``Sequence_Name``.
        codon_usage_path: Path of the reference codon-usage file handed to
            ``CodonUsageAnalyzer``.

    Returns:
        The dict produced by ``CodonUsageAnalyzer.analyze_sequence``.
    """
    return CodonUsageAnalyzer(codon_usage_path).analyze_sequence(test_sequence, name)
def gc_fraction(seq):
    """Return the G+C fraction of *seq*, rounded to three decimals.

    Counting is case-insensitive. An empty sequence yields ``None`` instead
    of a division by zero.
    """
    if not seq:
        return None
    upper = seq.upper()
    return round((upper.count("G") + upper.count("C")) / len(upper), 3)


def main():
    """Analyze every CDS of an input CSV and write the metrics to a CSV.

    The input file must provide the columns ``_id``, ``RefSeq_aa``, ``CDS``
    and ``organism``. For each row the script records ENC, CAI and GC
    content of the full CDS plus CAI and GC content of its first 60
    nucleotides.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--file_path", default="./ribo_input.csv")
    parser.add_argument("--codon_usage_path", default="./dataset/data/codon_table/codon_usage_Escherichia_coli.csv")
    parser.add_argument("--output_path", default="./ribo_output.csv")
    args = parser.parse_args()
    output_path = args.output_path

    df = pd.read_csv(args.file_path)

    # The reference table is the same for every row: load it once instead of
    # re-reading the file for each sequence.
    print(args.codon_usage_path)
    analyzer = CodonUsageAnalyzer(args.codon_usage_path)

    head_length = 60  # nucleotides, i.e. the first 20 codons
    col = "CDS"
    head_cols = ['_id', 'RefSeq_aa', 'CDS', "organism"]
    rows = []
    for _, row in df.iterrows():
        raw_seq = row[col]
        gene_name = f"{row['_id']}_{col}"

        if isinstance(raw_seq, str):
            seq = raw_seq
            result = analyzer.analyze_sequence(seq, gene_name)
        else:
            # Missing CDS (NaN from pandas): record the failure, keep going.
            seq = ""
            result = {
                'Sequence_Name': gene_name,
                'Sequence_Length': 0,
                'ENC': None,
                'CAI': None,
                'Error': "缺少CDS序列",
            }

        head = seq[:head_length]
        result['GC'] = gc_fraction(seq)
        result['GC_head'] = gc_fraction(head)
        # 'CAI' keeps the full-length value set by analyze_sequence; only the
        # head value is computed here.
        try:
            result['CAI_head'] = round(analyzer.calculate_CAI(head), 3) if head else None
        except ValueError:
            # Head is not a whole number of codons or has invalid bases.
            result['CAI_head'] = None
        result.setdefault('CAI', None)

        result['_id'] = gene_name
        result['RefSeq_aa'] = row['RefSeq_aa']
        result['CDS'] = raw_seq
        result['organism'] = row["organism"]

        # Identification columns first, the remaining keys in insertion order.
        other_cols = [k for k in result if k not in head_cols]
        rows.append({k: result[k] for k in head_cols + other_cols})

        print(f"{gene_name}分析结果:")
        for key, value in result.items():
            print(f" {key}: {value}")

    # Fixed leading columns, then any extra key (e.g. CDS, Error) in the order
    # it was first seen.
    base_columns = ["_id", "RefSeq_aa", "Sequence_Name", "organism", "Sequence_Length",
                    "ENC", "ENC_Preference", "CAI", "CAI_Level", "CAI_head", "GC", "GC_head"]
    extra_columns = []
    for record in rows:
        for key in record:
            if key not in base_columns and key not in extra_columns:
                extra_columns.append(key)

    # Build the frame once; concatenating inside the loop is quadratic.
    final_df = pd.DataFrame(rows, columns=base_columns + extra_columns)
    final_df.to_csv(output_path, index=False)
    print(f"已保存 → {output_path}")


if __name__ == '__main__':
    main()