import argparse
import math
from collections import defaultdict
from typing import Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd


class CodonUsageAnalyzer:
    """Codon usage analyzer bundling ENC, CAI and RSCU calculations.

    Sequences are handled as RNA: every method normalises its input to upper
    case and converts ``T`` to ``U`` before counting codons.
    """

    # Upper bound of the effective number of codons: the 61 sense codons.
    MAX_ENC = 61.0

    def __init__(self, freq_codon_usage_path=None):
        """Build the codon tables and optionally load a reference usage table.

        Args:
            freq_codon_usage_path: Optional path to a comma-separated file
                with a header line followed by ``codon,amino_acid,fraction``
                rows (extra columns are ignored). When omitted, CAI cannot be
                computed; ENC and RSCU still work.

        Raises:
            OSError: If the file cannot be opened.
            ValueError: If a row has fewer than three fields or its fraction
                is not a number.
        """
        # RNA codon table (standard genetic code).
        self.codon_table = {
            'UUU': 'F', 'UUC': 'F',  # Phe (2-fold)
            'UUA': 'L', 'UUG': 'L', 'CUU': 'L', 'CUC': 'L', 'CUA': 'L', 'CUG': 'L',  # Leu (6-fold)
            'AUU': 'I', 'AUC': 'I', 'AUA': 'I',  # Ile (3-fold)
            'AUG': 'M',  # Met (no synonymous codons)
            'GUU': 'V', 'GUC': 'V', 'GUA': 'V', 'GUG': 'V',  # Val (4-fold)
            'UCU': 'S', 'UCC': 'S', 'UCA': 'S', 'UCG': 'S', 'AGU': 'S', 'AGC': 'S',  # Ser (6-fold)
            'CCU': 'P', 'CCC': 'P', 'CCA': 'P', 'CCG': 'P',  # Pro (4-fold)
            'ACU': 'T', 'ACC': 'T', 'ACA': 'T', 'ACG': 'T',  # Thr (4-fold)
            'GCU': 'A', 'GCC': 'A', 'GCA': 'A', 'GCG': 'A',  # Ala (4-fold)
            'UAU': 'Y', 'UAC': 'Y',  # Tyr (2-fold)
            'UAA': '*', 'UAG': '*', 'UGA': '*',  # stop codons (never counted)
            'CAU': 'H', 'CAC': 'H',  # His (2-fold)
            'CAA': 'Q', 'CAG': 'Q',  # Gln (2-fold)
            'AAU': 'N', 'AAC': 'N',  # Asn (2-fold)
            'AAA': 'K', 'AAG': 'K',  # Lys (2-fold)
            'GAU': 'D', 'GAC': 'D',  # Asp (2-fold)
            'GAA': 'E', 'GAG': 'E',  # Glu (2-fold)
            'UGU': 'C', 'UGC': 'C',  # Cys (2-fold)
            'UGG': 'W',  # Trp (no synonymous codons)
            'CGU': 'R', 'CGC': 'R', 'CGA': 'R', 'CGG': 'R', 'AGA': 'R', 'AGG': 'R',  # Arg (6-fold)
            'GGU': 'G', 'GGC': 'G', 'GGA': 'G', 'GGG': 'G',  # Gly (4-fold)
        }

        # Amino acids grouped by degeneracy; Met, Trp and stop are left out on
        # purpose because they carry no synonymous-codon information.
        self.degeneracy_groups = {
            '2-fold': ['F', 'Y', 'C', 'H', 'Q', 'N', 'K', 'D', 'E'],
            '3-fold': ['I'],
            '4-fold': ['V', 'P', 'T', 'A', 'G'],
            '6-fold': ['L', 'S', 'R'],
        }

        # Reference usage: codon -> (amino acid, fraction). None means
        # "no reference loaded", which calculate_CAI reports as an error.
        self.reference_codon_usage = None
        # Always defined so attribute access never depends on the constructor
        # argument; both stay empty until a reference file is loaded.
        self.freq_codon_table = {}
        self.max_aa_table = {}

        if freq_codon_usage_path is not None:
            self._load_codon_usage(freq_codon_usage_path)

    def _load_codon_usage(self, path) -> None:
        """Read the reference codon usage file into the lookup tables."""
        with open(path, 'r') as codon_file:
            next(codon_file, None)  # skip the header; tolerate an empty file
            for line in codon_file:
                line = line.strip()
                if not line:
                    continue
                codon, aa, fraction, *_ = (field.strip() for field in line.split(','))
                # Sequences are normalised to RNA, so the reference keys must
                # be too; otherwise a DNA-style table would never match.
                codon = codon.upper().replace('T', 'U')
                fraction = float(fraction)
                self.freq_codon_table[codon] = (aa, fraction)
                if aa not in self.max_aa_table or self.max_aa_table[aa] < fraction:
                    self.max_aa_table[aa] = fraction
        self.reference_codon_usage = self.freq_codon_table

    def _synonymous_codons(self) -> Dict[str, List[str]]:
        """Return every amino acid (and '*') mapped to all codons encoding it."""
        grouped = defaultdict(list)
        for codon, aa in self.codon_table.items():
            grouped[aa].append(codon)
        return grouped

    def _validate_sequence(self, sequence: str) -> str:
        """Normalise a sequence to upper-case RNA and validate it.

        Raises:
            ValueError: If the length is not a multiple of three or a
                character other than A, U, C, G remains after normalisation.
        """
        sequence = sequence.upper().replace('T', 'U')
        if len(sequence) % 3 != 0:
            raise ValueError(f"序列长度必须是3的倍数,当前长度: {len(sequence)}")
        if set(sequence) - {'A', 'U', 'C', 'G'}:
            raise ValueError("序列包含无效的碱基字符")
        return sequence

    def _count_codons(self, sequence: str) -> Dict[str, int]:
        """Count in-frame sense codons; stop codons are skipped.

        Raises:
            ValueError: If the sequence fails validation.
        """
        sequence = self._validate_sequence(sequence)
        codon_count = {}
        for start in range(0, len(sequence), 3):
            codon = sequence[start:start + 3]
            if self.codon_table.get(codon, '*') != '*':
                codon_count[codon] = codon_count.get(codon, 0) + 1
        return codon_count

    def calculate_ENC(self, sequence: str) -> float:
        """Compute the effective number of codons (ENC) of one sequence.

        Uses the homozygosity estimator ``F = (n * sum(p_i^2) - 1) / (n - 1)``
        where ``n`` is the number of codons observed for the amino acid, and
        ``ENC = 2 + 9/F2 + 1/F3 + 5/F4 + 3/F6`` with F averaged per degeneracy
        class.

        Amino acids seen fewer than two times, or whose F is not positive,
        do not contribute. A degeneracy class without any contribution falls
        back to F = 1.0, so very short sequences drift towards the minimum
        of 20. The result is capped at 61.

        Args:
            sequence: RNA (or DNA) coding sequence.

        Returns:
            The ENC value, between 20 and 61.

        Raises:
            ValueError: If the sequence fails validation.
        """
        codon_count = self._count_codons(sequence)

        degeneracy_of = {
            aa: label
            for label, amino_acids in self.degeneracy_groups.items()
            for aa in amino_acids
        }
        f_values = {label: [] for label in self.degeneracy_groups}

        for aa, codons in self._synonymous_codons().items():
            label = degeneracy_of.get(aa)
            if label is None:  # Met, Trp and stop codons
                continue
            counts = [codon_count.get(codon, 0) for codon in codons]
            n = sum(counts)
            if n <= 1:
                # F is undefined for n == 1 (division by n - 1).
                continue
            homozygosity = sum((count / n) ** 2 for count in counts)
            f_value = (n * homozygosity - 1) / (n - 1)
            # Small samples can give F <= 0, which would make 1/F blow up.
            if f_value > 0:
                f_values[label].append(f_value)

        def class_average(label: str) -> float:
            values = f_values.get(label)
            return float(np.mean(values)) if values else 1.0

        enc_value = (
            2
            + 9 / class_average('2-fold')
            + 1 / class_average('3-fold')
            + 5 / class_average('4-fold')
            + 3 / class_average('6-fold')
        )
        return min(enc_value, self.MAX_ENC)

    def calculate_CAI(self, sequence: str) -> float:
        """Compute the codon adaptation index (CAI) of one sequence.

        Each codon's weight is its reference fraction divided by the largest
        fraction among the codons of the same amino acid; CAI is the
        geometric mean of the weights over all counted codons. Codons absent
        from the reference table are ignored.

        Args:
            sequence: RNA (or DNA) coding sequence.

        Returns:
            CAI in [0, 1]; 0.0 when no codon could be scored or when a used
            codon has a zero reference fraction.

        Raises:
            ValueError: If no reference table is loaded or the sequence fails
                validation.
        """
        if self.reference_codon_usage is None:
            raise ValueError("请先设置参考序列集")

        codon_count = self._count_codons(sequence)

        # Sum logarithms instead of multiplying weights: the raw product
        # underflows to 0.0 for sequences of a few hundred codons.
        log_weight_sum = 0.0
        total_codons = 0
        for codon, count in codon_count.items():
            entry = self.reference_codon_usage.get(codon)
            if entry is None:
                continue
            aa, codon_freq = entry
            max_freq = self.max_aa_table.get(aa, 0.0)
            if max_freq <= 0:
                continue
            weight = codon_freq / max_freq
            if weight <= 0:
                # A zero weight zeroes the whole geometric mean.
                return 0.0
            log_weight_sum += count * math.log(weight)
            total_codons += count

        if total_codons == 0:
            return 0.0
        return math.exp(log_weight_sum / total_codons)

    def calculate_RSCU(self, sequences: List[str]) -> Dict[str, float]:
        """Compute relative synonymous codon usage (RSCU) over sequences.

        ``RSCU = observed count / (amino-acid total / degeneracy)``, where
        degeneracy is the number of codons that encode the amino acid in
        ``codon_table``, whether or not they occur in the input.

        Args:
            sequences: RNA (or DNA) coding sequences; sequences that fail
                validation are skipped.

        Returns:
            RSCU per codon, for the codons observed at least once.
        """
        total_codon_count = defaultdict(int)
        for seq in sequences:
            try:
                codon_count = self._count_codons(seq)
            except ValueError:
                continue  # best effort: ignore invalid sequences
            for codon, count in codon_count.items():
                total_codon_count[codon] += count

        aa_total_count = defaultdict(int)
        for codon, count in total_codon_count.items():
            aa_total_count[self.codon_table[codon]] += count

        synonymous = self._synonymous_codons()
        rscu_dict = {}
        for codon, count in total_codon_count.items():
            aa = self.codon_table[codon]
            expected_count = aa_total_count[aa] / len(synonymous[aa])
            rscu_dict[codon] = count / expected_count if expected_count > 0 else 0.0
        return rscu_dict

    def analyze_sequence(self, sequence: str, sequence_name: str = "") -> Dict:
        """Summarise the codon usage of one sequence.

        Args:
            sequence: RNA (or DNA) coding sequence.
            sequence_name: Optional label copied into the result.

        Returns:
            On success: ``Sequence_Name``, ``Sequence_Length``, ``ENC``,
            ``ENC_Preference`` ('Strong' when ENC <= 35, else 'Weak'), plus
            ``CAI`` and ``CAI_Level`` ('High' when CAI > 0.7) when a reference
            table is loaded. On failure: the name and length with ``ENC`` and
            ``CAI`` set to None and the message under ``Error``.
        """
        try:
            sequence_length = len(sequence)
        except TypeError:
            # e.g. a NaN read from a CSV cell; reported through 'Error' below.
            sequence_length = None

        # Deliberately broad: this method reports failures in its result
        # instead of raising, so batch callers can keep going.
        try:
            enc = self.calculate_ENC(sequence)
            cai = self.calculate_CAI(sequence) if self.reference_codon_usage else None

            result = {
                'Sequence_Name': sequence_name,
                'Sequence_Length': sequence_length,
                'ENC': round(enc, 3),
                'ENC_Preference': 'Strong' if enc <= 35 else 'Weak',
            }
            if cai is not None:
                result['CAI'] = round(cai, 3)
                result['CAI_Level'] = 'High' if cai > 0.7 else 'Low'
            return result
        except Exception as e:
            return {
                'Sequence_Name': sequence_name,
                'Sequence_Length': sequence_length,
                'ENC': None,
                'CAI': None,
                'Error': str(e),
            }


def example_usage():
    """Demonstrate single-sequence, RSCU and batch analysis on toy data."""
    # Reference codon usage table (highly expressed genes).
    codon_usage_path = "./data/codon_table/codon_usage_Escherichia_coli.csv"
    analyzer = CodonUsageAnalyzer(codon_usage_path)

    gene1 = "AUGGCUUCUUUUUUCUUCUUCUUCUUCUUCUUCCUCCUCCUCCUCCUCCUCCUCCUC"
    gene2 = "AUGGCUUCUUUUCUCGUAUACACAGAUGACUACGUAGCAGCUACGUACGUACGUACG"
    gene3 = "AUGGUUUGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGGUUGG"

    # ENC and CAI of a single sequence.
    result = analyzer.analyze_sequence(gene2, "Test_Gene")
    print("单序列分析结果:")
    for key, value in result.items():
        print(f"  {key}: {value}")

    # RSCU pooled over several sequences.
    rscu_results = analyzer.calculate_RSCU([gene1, gene2, gene3])
    print("\nRSCU结果 (前10个密码子):")
    for codon, rscu in list(rscu_results.items())[:10]:
        print(f"  {codon}: {rscu:.3f}")

    # Batch analysis.
    print("\n批量分析示例:")
    for name, seq in (("Gene1", gene1), ("Gene2", gene2), ("Gene3", gene3)):
        result = analyzer.analyze_sequence(seq, name)
        print(f"{name}: ENC={result['ENC']}, CAI={result.get('CAI', 'N/A')}")


def single_seq_analysis(test_sequence, name, codon_usage_path):
    """Analyse one sequence against the reference table at codon_usage_path.

    The table is re-read on every call; build one CodonUsageAnalyzer and call
    its analyze_sequence directly when processing many sequences.
    """
    analyzer = CodonUsageAnalyzer(codon_usage_path)
    return analyzer.analyze_sequence(test_sequence, name)


# Number of leading nucleotides summarised by the *_head columns.
HEAD_LENGTH = 60

# Leading columns of the output CSV; any other key follows in first-seen order.
OUTPUT_COLUMNS = [
    "_id", "RefSeq_aa", "Sequence_Name", "organism", "Sequence_Length",
    "ENC", "ENC_Preference", "CAI", "CAI_Level", "CAI_head", "GC", "GC_head",
]


def _gc_fraction(sequence: str) -> Optional[float]:
    """Return the case-insensitive G+C fraction, or None if empty."""
    if not sequence:
        return None
    upper = sequence.upper()
    return round((upper.count("G") + upper.count("C")) / len(upper), 3)


def _rounded_cai(analyzer: CodonUsageAnalyzer, sequence: str) -> Optional[float]:
    """Return CAI rounded to 3 decimals, or None if it cannot be computed."""
    try:
        return round(analyzer.calculate_CAI(sequence), 3)
    except ValueError:
        return None


def _analyze_row(row, analyzer: CodonUsageAnalyzer, column: str = "CDS") -> Dict:
    """Analyse the coding sequence of one input row and attach its metadata."""
    raw_sequence = row[column]
    # An empty CSV cell arrives as NaN; treat it as an empty sequence.
    seq = raw_sequence if isinstance(raw_sequence, str) else ""
    gene_name = f"{row['_id']}_{column}"
    head = seq[:HEAD_LENGTH]

    result = analyzer.analyze_sequence(seq, gene_name)
    result['GC'] = _gc_fraction(seq)
    result['GC_head'] = _gc_fraction(head)
    # 'CAI' keeps the whole-sequence value from analyze_sequence; only
    # 'CAI_head' describes the first HEAD_LENGTH nucleotides.
    result['CAI_head'] = _rounded_cai(analyzer, head)
    result['_id'] = gene_name
    result['RefSeq_aa'] = row['RefSeq_aa']
    result['CDS'] = seq
    result['organism'] = row['organism']
    return result


def main():
    """Analyse every CDS of the input CSV and write the metrics to a CSV."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--file_path", default="./ribo_input.csv")
    parser.add_argument("--codon_usage_path", default="./dataset/data/codon_table/codon_usage_Escherichia_coli.csv")
    parser.add_argument("--output_path", default="./ribo_output.csv")
    args = parser.parse_args()

    print(args.codon_usage_path)
    # One analyzer for the whole run: the reference table is read once.
    analyzer = CodonUsageAnalyzer(args.codon_usage_path)
    df = pd.read_csv(args.file_path)

    head_cols = ['_id', 'RefSeq_aa', 'CDS', 'organism']
    records = []
    for _, row in df.iterrows():
        result = _analyze_row(row, analyzer)
        other_cols = [key for key in result if key not in head_cols]
        records.append({key: result[key] for key in head_cols + other_cols})

        print(f"{result['_id']}分析结果:")
        for key, value in result.items():
            print(f"  {key}: {value}")

    columns = list(OUTPUT_COLUMNS)
    for record in records:
        for key in record:
            if key not in columns:
                columns.append(key)

    # Build the frame once instead of concatenating inside the loop.
    final_df = pd.DataFrame(records, columns=columns)
    final_df.to_csv(args.output_path, index=False)
    print(f"已保存 → {args.output_path}")


if __name__ == '__main__':
    main()