| from Bio import Entrez, SeqIO |
| from Bio.Seq import Seq |
| import requests |
| from io import StringIO |
| import re |
| from Bio import pairwise2 |
| from Bio.pairwise2 import format_alignment |
|
|
| Entrez.email = "wangliang.f@gmail.com" |
|
|
|
|
| def calculate_similarity(seq1, seq2): |
| """使用局部比对计算两个序列之间的相似度得分""" |
| alignments = pairwise2.align.localxx(seq1, seq2) |
| best_score = max(aln.score for aln in alignments) if alignments else 0 |
| return best_score |
|
|
|
|
|
|
| def extract_first_xref_id(info_string): |
| """ |
| 从给定的字符串中提取 xrefs: 后面的第一个 ID。 |
| |
| 参数: |
| - info_string (str): 包含 UniProtKB 信息的字符串 |
| |
| 返回: |
| - str 或 None: 如果找到,则返回第一个 ID;否则返回 None。 |
| """ |
| |
| match = re.search(r'xrefs:\s*([\w.-]+)', info_string) |
| if match: |
| return match.group(1) |
| else: |
| return None |
|
|
|
|
|
|
| def fetch_uniprot_protein_sequence(uniprot_id): |
| url = f"https://www.uniprot.org/uniprot/{uniprot_id}.fasta" |
| response = requests.get(url) |
| if response.status_code == 200: |
| fasta_data = response.text |
| record = SeqIO.read(StringIO(fasta_data), "fasta") |
| return str(record.seq) |
| else: |
| raise ValueError(f"未能从 UniProt 获取蛋白质序列,状态码:{response.status_code}") |
|
|
| def fetch_ncbi_genbank_data(protein_ncbi_id): |
| |
| handle = Entrez.efetch(db="protein", id=protein_ncbi_id, rettype="gb", retmode="text") |
| genbank_data = handle.read() |
| handle.close() |
|
|
| |
| rec = SeqIO.parse(StringIO(genbank_data), "genbank") |
| for item in rec: |
| record = item |
| break |
|
|
| |
| db_source = record.annotations["db_source"] |
| |
| xref_id = extract_first_xref_id(db_source) |
| print("xref_id", xref_id) |
| |
|
|
| |
| r_handle = Entrez.efetch(id=xref_id, db='nucleotide', rettype='gb', retmode='text') |
| dna_data = r_handle.read() |
| r_handle.close() |
| |
| return dna_data |
|
|
| def extract_cds_and_translate(genbank_data, protein_seq_ori=None): |
| results = [] |
| record = None |
|
|
| |
| for rec in SeqIO.parse(StringIO(genbank_data), "genbank"): |
| record = rec |
| break |
|
|
| |
| if not record: |
| raise ValueError("未能成功解析 GenBank 数据") |
|
|
| gene_id = record.id |
| |
| for feature in record.features: |
| if feature.type == "CDS": |
| |
| |
| |
| |
|
|
| cds_start = feature.location.start |
| cds_end = feature.location.end |
| cds_sequence = record.seq[cds_start:cds_end] |
| |
| protein_sequence = feature.qualifiers["translation"][0] |
| protein_id = feature.qualifiers["protein_id"][0] |
| |
| sim_score = calculate_similarity(protein_sequence, protein_seq_ori) |
| |
| results.append({ |
| "gene_id":gene_id, |
| "protein_id":protein_id, |
| "cds_start": cds_start, |
| "cds_end": cds_end, |
| "dna_sequence": str(cds_sequence), |
| "protein_sequence": str(protein_sequence), |
| "sim":sim_score |
| }) |
| |
| sorted_results = sorted(results, key=lambda x: x['sim'], reverse=True) |
| return sorted_results |
|
|
| def get_protein_and_dna_sequences(uniprot_id): |
| """ |
| 主函数,根据蛋白质id,获得dna和蛋白质匹配对 |
| :param uniprot_id: |
| :return: |
| """ |
| try: |
| print(f"正在获取 UniProt ID: {uniprot_id} 的蛋白质序列...") |
| protein_sequence = fetch_uniprot_protein_sequence(uniprot_id) |
|
|
| print("正在获取 NCBI 数据...") |
| handle = Entrez.esearch(db="protein", term=uniprot_id) |
| record = Entrez.read(handle) |
| handle.close() |
|
|
| if not record["IdList"]: |
| raise ValueError("未找到对应的蛋白质记录") |
|
|
| protein_ncbi_id = record["IdList"][0] |
|
|
| |
| |
| genbank_data = fetch_ncbi_genbank_data(protein_ncbi_id) |
|
|
| |
|
|
| print("正在提取 DNA 和蛋白质序列...") |
| cds_data = extract_cds_and_translate(genbank_data, protein_sequence) |
|
|
| return { |
| "uniprot_id": uniprot_id, |
| "protein_sequence": protein_sequence, |
| "cds_data": cds_data, |
| } |
|
|
| except Exception as e: |
| print(f"发生错误:{e}") |
| return {"error": str(e)} |
|
|
| def process_data(uniprot_id): |
| |
| result = get_protein_and_dna_sequences(uniprot_id) |
| if "error" in result: |
| print(f"错误:{result['error']}") |
| return -1 |
| else: |
| |
| if len(result["cds_data"])>0: |
| gene_data = result["cds_data"][0] |
| data = { |
| "seq_id_a":gene_data["gene_id"], |
| "seq_type_a":"gene", |
| "seq_a":gene_data["dna_sequence"], |
| "seq_id_b":uniprot_id, |
| "seq_type_b":"pro", |
| "seq_b":gene_data["protein_sequence"], |
| "protein_id":gene_data["protein_id"], |
| } |
| return data |
| else: |
| return -1 |
| |
| if __name__=="__main__": |
| ret = process_data("Q9Z3S1") |
| print(ret) |
|
|