import argparse
import math
from collections import defaultdict
from typing import List, Dict, Tuple, Union

import numpy as np
import pandas as pd


class CodonUsageAnalyzer:
    """Codon usage analyzer bundling ENC, CAI and RSCU calculations.

    Sequences are treated as RNA: input is upper-cased and ``T`` is mapped to
    ``U`` before codons are looked up in the standard genetic code stored in
    ``codon_table``. Stop codons are never counted.
    """

    def __init__(self, freq_codon_usage_path=None):
        """Build the genetic-code tables and optionally load a reference table.

        Args:
            freq_codon_usage_path: Optional path to a comma-separated codon
                usage table. The first line is skipped as a header; every
                other non-empty line must start with the three fields
                ``codon,amino_acid,fraction`` (extra fields are ignored).
                When omitted, ``reference_codon_usage`` stays ``None`` and
                CAI cannot be computed.
        """
        self.codon_table = {
            'UUU': 'F', 'UUC': 'F',
            'UUA': 'L', 'UUG': 'L', 'CUU': 'L', 'CUC': 'L', 'CUA': 'L', 'CUG': 'L',
            'AUU': 'I', 'AUC': 'I', 'AUA': 'I',
            'AUG': 'M',
            'GUU': 'V', 'GUC': 'V', 'GUA': 'V', 'GUG': 'V',
            'UCU': 'S', 'UCC': 'S', 'UCA': 'S', 'UCG': 'S', 'AGU': 'S', 'AGC': 'S',
            'CCU': 'P', 'CCC': 'P', 'CCA': 'P', 'CCG': 'P',
            'ACU': 'T', 'ACC': 'T', 'ACA': 'T', 'ACG': 'T',
            'GCU': 'A', 'GCC': 'A', 'GCA': 'A', 'GCG': 'A',
            'UAU': 'Y', 'UAC': 'Y',
            'UAA': '*', 'UAG': '*', 'UGA': '*',
            'CAU': 'H', 'CAC': 'H',
            'CAA': 'Q', 'CAG': 'Q',
            'AAU': 'N', 'AAC': 'N',
            'AAA': 'K', 'AAG': 'K',
            'GAU': 'D', 'GAC': 'D',
            'GAA': 'E', 'GAG': 'E',
            'UGU': 'C', 'UGC': 'C',
            'UGG': 'W',
            'CGU': 'R', 'CGC': 'R', 'CGA': 'R', 'CGG': 'R', 'AGA': 'R', 'AGG': 'R',
            'GGU': 'G', 'GGC': 'G', 'GGA': 'G', 'GGG': 'G'
        }

        # Amino acids grouped by how many synonymous codons encode them.
        # M and W (one codon each) are deliberately absent.
        self.degeneracy_groups = {
            '2-fold': ['F', 'Y', 'C', 'H', 'Q', 'N', 'K', 'D', 'E'],
            '3-fold': ['I'],
            '4-fold': ['V', 'P', 'T', 'A', 'G'],
            '6-fold': ['L', 'S', 'R']
        }

        # codon -> (amino acid, fraction) as read from the reference table.
        self.freq_codon_table = {}
        # amino acid -> largest fraction among its codons in the reference table.
        self.max_aa_table = {}
        self.reference_codon_usage = None

        if freq_codon_usage_path is not None:
            self._load_codon_usage(freq_codon_usage_path)
            self.reference_codon_usage = self.freq_codon_table

    def _load_codon_usage(self, path) -> None:
        """Fill ``freq_codon_table`` and ``max_aa_table`` from the CSV at *path*."""
        with open(path, 'r') as codon_file:
            # Skip the header line; the default keeps an empty file from
            # raising StopIteration.
            next(codon_file, None)
            for line in codon_file:
                line = line.strip()
                if not line:
                    continue
                codon, aa, fraction, *_ = line.split(',')
                # Sequences are normalised to upper-case RNA before lookup, so
                # normalise table keys the same way; a DNA-style table (T
                # instead of U) would otherwise never match any codon.
                codon = codon.strip().upper().replace('T', 'U')
                aa = aa.strip()
                fraction = float(fraction)

                self.freq_codon_table[codon] = (aa, fraction)
                if aa not in self.max_aa_table or self.max_aa_table[aa] < fraction:
                    self.max_aa_table[aa] = fraction

    def _synonymous_codons(self) -> Dict[str, List[str]]:
        """Return amino acid -> list of its codons, stop codons excluded."""
        families = defaultdict(list)
        for codon, aa in self.codon_table.items():
            if aa != '*':
                families[aa].append(codon)
        return dict(families)

    def _validate_sequence(self, sequence: str) -> str:
        """Return *sequence* upper-cased with T replaced by U.

        Raises:
            ValueError: If the length is not a multiple of three or the
                sequence contains characters other than A, U, C, G.
        """
        sequence = sequence.upper().replace('T', 'U')
        if len(sequence) % 3 != 0:
            raise ValueError(f"序列长度必须是3的倍数,当前长度: {len(sequence)}")

        if set(sequence) - {'A', 'U', 'C', 'G'}:
            raise ValueError("序列包含无效的碱基字符")

        return sequence

    def _count_codons(self, sequence: str) -> Dict[str, int]:
        """Count sense codons in *sequence* (reading frame 0).

        Stop codons are skipped. Raises ``ValueError`` for invalid sequences
        (see ``_validate_sequence``).
        """
        sequence = self._validate_sequence(sequence)
        codon_count = {}
        for start in range(0, len(sequence), 3):
            codon = sequence[start:start + 3]
            if self.codon_table.get(codon, '*') != '*':
                codon_count[codon] = codon_count.get(codon, 0) + 1
        return codon_count

    def calculate_ENC(self, sequence: str) -> float:
        """Compute the effective number of codons (ENC / Nc) of one sequence.

        For each amino acid with n > 1 observed codons the homozygosity is
        ``F = (n * sum(p_i^2) - 1) / (n - 1)`` where ``p_i`` is the share of
        each synonymous codon. F values are averaged per degeneracy class and
        combined as ``2 + 9/F2 + 1/F3 + 5/F4 + 3/F6``.

        Amino acids with n <= 1 or a non-positive F carry no usable estimate
        and are skipped. A degeneracy class without any estimate falls back
        to F = 1.0. The result is capped at 61, the number of sense codons.

        Args:
            sequence: RNA (or DNA) coding sequence.

        Returns:
            The ENC value, between 20 and 61.

        Raises:
            ValueError: If the sequence is invalid.
        """
        codon_count = self._count_codons(sequence)

        class_of = {aa: label
                    for label, members in self.degeneracy_groups.items()
                    for aa in members}
        homozygosity = {'2-fold': [], '3-fold': [], '4-fold': [], '6-fold': []}

        for aa, codons in self._synonymous_codons().items():
            label = class_of.get(aa)
            if label is None:
                # M and W have a single codon and do not enter the statistic.
                continue

            observed = [codon_count.get(codon, 0) for codon in codons]
            n = sum(observed)
            if n <= 1:
                continue

            sum_squared_freq = sum((n_i / n) ** 2 for n_i in observed)
            # n is the number of observed codons for this amino acid, not the
            # size of the codon family; using the family size drives F to 0
            # (and ENC to infinity) for evenly used codons.
            f_value = (n * sum_squared_freq - 1) / (n - 1)
            if f_value <= 0:
                continue

            homozygosity.setdefault(label, []).append(f_value)

        def class_mean(label):
            values = homozygosity.get(label)
            return sum(values) / len(values) if values else 1.0

        enc_value = (2
                     + 9 / class_mean('2-fold')
                     + 1 / class_mean('3-fold')
                     + 5 / class_mean('4-fold')
                     + 3 / class_mean('6-fold'))

        return min(enc_value, 61.0)

    def calculate_CAI(self, sequence: str) -> float:
        """Compute the codon adaptation index (CAI) against the reference table.

        Each codon's weight is its reference fraction divided by the largest
        fraction of its amino acid; CAI is the geometric mean of the weights
        over all counted codons. Codons missing from the reference table, or
        whose amino acid has a non-positive maximum fraction, are ignored.
        Every codon present in the reference table is counted, including
        single-codon amino acids if the table lists them.

        Args:
            sequence: RNA (or DNA) coding sequence.

        Returns:
            The CAI value, or 0.0 when no codon could be scored or any scored
            codon has weight zero.

        Raises:
            ValueError: If no reference table is loaded or the sequence is
                invalid.
        """
        if self.reference_codon_usage is None:
            raise ValueError("请先设置参考序列集")
        codon_count = self._count_codons(sequence)

        # Accumulate in log space: multiplying thousands of weights below 1
        # underflows to 0.0 and would report CAI = 0 for long genes.
        log_weight_sum = 0.0
        total_codons = 0

        for codon, count in codon_count.items():
            entry = self.reference_codon_usage.get(codon)
            if entry is None:
                continue
            aa, codon_freq = entry
            max_freq = self.max_aa_table[aa]
            if max_freq <= 0:
                continue

            weight = codon_freq / max_freq
            if weight <= 0:
                # A zero weight makes the geometric mean zero.
                return 0.0
            log_weight_sum += count * math.log(weight)
            total_codons += count

        if total_codons == 0:
            return 0.0

        return math.exp(log_weight_sum / total_codons)

    def calculate_RSCU(self, sequences: List[str]) -> Dict[str, float]:
        """Compute relative synonymous codon usage (RSCU) over *sequences*.

        RSCU of a codon is its pooled count divided by the count expected if
        all synonymous codons of its amino acid were used equally, i.e.
        ``count / (amino_acid_total / degeneracy)``.

        Args:
            sequences: List of RNA (or DNA) coding sequences. Sequences that
                fail validation are skipped.

        Returns:
            Mapping from each observed codon to its RSCU value. Codons that
            never occur are not included.
        """
        total_codon_count = defaultdict(int)
        for seq in sequences:
            try:
                codon_count = self._count_codons(seq)
            except ValueError:
                # Best effort: one malformed sequence must not abort the batch.
                continue
            for codon, count in codon_count.items():
                total_codon_count[codon] += count

        aa_total_count = defaultdict(int)
        for codon, count in total_codon_count.items():
            aa_total_count[self.codon_table[codon]] += count

        families = self._synonymous_codons()
        rscu_dict = {}
        for codon, count in total_codon_count.items():
            aa = self.codon_table[codon]
            # Divide by the full size of the codon family, not by the number
            # of family members that happen to occur in the input.
            expected_count = aa_total_count[aa] / len(families[aa])
            rscu_dict[codon] = count / expected_count

        return rscu_dict

    def analyze_sequence(self, sequence: str, sequence_name: str = "") -> Dict:
        """Summarise the codon usage of one sequence.

        Args:
            sequence: RNA (or DNA) coding sequence.
            sequence_name: Optional label copied into the result.

        Returns:
            On success a dict with ``Sequence_Name``, ``Sequence_Length``,
            ``ENC`` (rounded to 3 decimals) and ``ENC_Preference`` ('Strong'
            when ENC <= 35, otherwise 'Weak'); ``CAI`` and ``CAI_Level``
            ('High' when CAI > 0.7, otherwise 'Low') are added when a
            reference table is loaded. If any calculation raises, a dict with
            ``ENC`` and ``CAI`` set to ``None`` and the message under
            ``Error`` is returned instead.
        """
        try:
            enc = self.calculate_ENC(sequence)
            cai = self.calculate_CAI(sequence) if self.reference_codon_usage else None
        except Exception as e:
            # Per-sequence boundary: report the failure in the result row
            # rather than aborting a batch run.
            return {
                'Sequence_Name': sequence_name,
                # len() itself fails on non-sized input such as a NaN cell.
                'Sequence_Length': len(sequence) if hasattr(sequence, '__len__') else None,
                'ENC': None,
                'CAI': None,
                'Error': str(e)
            }

        result = {
            'Sequence_Name': sequence_name,
            'Sequence_Length': len(sequence),
            'ENC': round(enc, 3),
            'ENC_Preference': 'Strong' if enc <= 35 else 'Weak',
        }

        if cai is not None:
            result['CAI'] = round(cai, 3)
            result['CAI_Level'] = 'High' if cai > 0.7 else 'Low'

        return result


def example_usage(codon_usage_path="./data/codon_table/codon_usage_Escherichia_coli.csv"):
    """Run a small demonstration of the analyzer and print the results.

    Args:
        codon_usage_path: Path to the reference codon usage CSV passed to
            ``CodonUsageAnalyzer``. Defaults to the path that was previously
            hard-coded in this function.
    """
    analyzer = CodonUsageAnalyzer(codon_usage_path)

    # Single-sequence analysis.
    test_sequence = "AUGGCUUCUUUUCUCGUAUACACAGAUGACUACGUAGCAGCUACGUACGUACGUACG"
    result = analyzer.analyze_sequence(test_sequence, "Test_Gene")
    print("单序列分析结果:")
    for key, value in result.items():
        print(f" {key}: {value}")

    # RSCU pooled over several sequences.
    test_sequences = [
        "AUGGCUUCUUUUUUCUUCUUCUUCUUCUUCUUCCUCCUCCUCCUCCUCCUCCUCCUC",
        "AUGGCUUCUUUUCUCGUAUACACAGAUGACUACGUAGCAGCUACGUACGUACGUACG",
        "AUGGUUUGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGG",
    ]
    rscu_results = analyzer.calculate_RSCU(test_sequences)
    print("\nRSCU结果 (前10个密码子):")
    for codon, rscu in list(rscu_results.items())[:10]:
        print(f" {codon}: {rscu:.3f}")

    # Batch analysis of named sequences.
    print("\n批量分析示例:")
    sequences_to_analyze = [
        ("Gene1", "AUGGCUUCUUUUUUCUUCUUCUUCUUCUUCUUCCUCCUCCUCCUCCUCCUCCUCCUC"),
        ("Gene2", "AUGGCUUCUUUUCUCGUAUACACAGAUGACUACGUAGCAGCUACGUACGUACGUACG"),
        ("Gene3", "AUGGUUUGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGG"),
    ]
    for name, seq in sequences_to_analyze:
        result = analyzer.analyze_sequence(seq, name)
        print(f"{name}: ENC={result['ENC']}, CAI={result.get('CAI', 'N/A')}")


def single_seq_analysis(test_sequence,name,codon_usage_path):
    """Analyse one sequence with a freshly constructed analyzer.

    Args:
        test_sequence: Coding sequence handed to ``analyze_sequence``.
        name: Label stored in the result.
        codon_usage_path: Reference codon usage table passed to
            ``CodonUsageAnalyzer``.

    Returns:
        The dict produced by ``CodonUsageAnalyzer.analyze_sequence``.
    """
    return CodonUsageAnalyzer(codon_usage_path).analyze_sequence(test_sequence, name)


# Number of leading nucleotides summarised by the ``*_head`` columns.
_HEAD_LENGTH = 60


def _gc_fraction(seq):
    """Return the G+C fraction of *seq* rounded to 3 decimals, or None if empty."""
    if not seq:
        return None
    # Count case-insensitively, matching how the analyzer normalises sequences.
    upper = seq.upper()
    return round((upper.count("G") + upper.count("C")) / len(upper), 3)


def main():
    """Analyse the ``CDS`` column of an input CSV and write one result row per gene.

    The input file must provide the columns ``_id``, ``RefSeq_aa``, ``CDS``
    and ``organism``. Each output row holds the analyzer result for the full
    sequence plus GC content and the CAI / GC content of the first
    ``_HEAD_LENGTH`` nucleotides.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--file_path",default="./ribo_input.csv")
    parser.add_argument("--codon_usage_path",default="./dataset/data/codon_table/codon_usage_Escherichia_coli.csv")
    parser.add_argument("--output_path",default="./ribo_output.csv")
    args = parser.parse_args()
    output_path = args.output_path

    codon_usage_path = args.codon_usage_path
    print(codon_usage_path)
    # The reference table is the same for every row, so parse it once.
    analyzer = CodonUsageAnalyzer(codon_usage_path)

    df = pd.read_csv(args.file_path)

    # Leading output columns; keys outside this list (such as CDS or Error)
    # are appended in order of first appearance.
    columns = ["_id","RefSeq_aa","Sequence_Name","organism","Sequence_Length","ENC","ENC_Preference","CAI","CAI_Level","CAI_head","GC","GC_head"]
    rows = []

    for _, row in df.iterrows():
        col = "CDS"
        seq = row[col]
        gene_name = f"{row['_id']}_{col}"
        head = seq[:_HEAD_LENGTH]

        # 'CAI' in the result is the full-length value from analyze_sequence;
        # it must not be overwritten by the head value.
        result = analyzer.analyze_sequence(seq, gene_name)
        result['GC'] = _gc_fraction(seq)
        result['GC_head'] = _gc_fraction(head)
        try:
            result['CAI_head'] = round(analyzer.calculate_CAI(head), 3)
        except ValueError:
            # Invalid head (e.g. length not a multiple of three): record a
            # missing value instead of aborting the whole run.
            result['CAI_head'] = None

        result['_id'] = gene_name
        result['RefSeq_aa'] = row['RefSeq_aa']
        result['CDS'] = seq
        result['organism'] = row["organism"]

        for key in result:
            if key not in columns:
                columns.append(key)
        rows.append(result)

        print(f"{gene_name}分析结果:")
        for key, value in result.items():
            print(f" {key}: {value}")

    # Build the frame once; concatenating inside the loop copies all previous
    # rows on every iteration.
    final_df = pd.DataFrame(rows, columns=columns)
    final_df.to_csv(output_path,index=False)
    print(f"已保存 → {output_path}")


if __name__ == '__main__':
    main()