central_dogma / get_data /get_protein_dna_pair.py
marisming's picture
Upload folder using huggingface_hub
d6e5490 verified
from Bio import Entrez, SeqIO
from Bio.Seq import Seq
import requests
from io import StringIO
import re
from Bio import pairwise2
from Bio.pairwise2 import format_alignment
Entrez.email = "wangliang.f@gmail.com"
def calculate_similarity(seq1, seq2):
"""使用局部比对计算两个序列之间的相似度得分"""
alignments = pairwise2.align.localxx(seq1, seq2)
best_score = max(aln.score for aln in alignments) if alignments else 0
return best_score
def extract_first_xref_id(info_string):
"""
从给定的字符串中提取 xrefs: 后面的第一个 ID。
参数:
- info_string (str): 包含 UniProtKB 信息的字符串
返回:
- str 或 None: 如果找到,则返回第一个 ID;否则返回 None。
"""
# 使用正则表达式查找 'xrefs:' 后面的第一个 ID
match = re.search(r'xrefs:\s*([\w.-]+)', info_string)
if match:
return match.group(1) # 返回匹配到的第一个 ID
else:
return None
def fetch_uniprot_protein_sequence(uniprot_id):
url = f"https://www.uniprot.org/uniprot/{uniprot_id}.fasta"
response = requests.get(url)
if response.status_code == 200:
fasta_data = response.text
record = SeqIO.read(StringIO(fasta_data), "fasta")
return str(record.seq)
else:
raise ValueError(f"未能从 UniProt 获取蛋白质序列,状态码:{response.status_code}")
def fetch_ncbi_genbank_data(protein_ncbi_id):
#STEP 1, 获得protein 数据
handle = Entrez.efetch(db="protein", id=protein_ncbi_id, rettype="gb", retmode="text")
genbank_data = handle.read()
handle.close()
#获得dna的id
rec = SeqIO.parse(StringIO(genbank_data), "genbank")
for item in rec:
record = item
break
#protein_seq = str(record.seq)
db_source = record.annotations["db_source"]
#print("db_source", db_source)
xref_id = extract_first_xref_id(db_source)
print("xref_id", xref_id)
#step2,获得dna数据
r_handle = Entrez.efetch(id=xref_id, db='nucleotide', rettype='gb', retmode='text')
dna_data = r_handle.read()
r_handle.close()
return dna_data
def extract_cds_and_translate(genbank_data, protein_seq_ori=None):
results = []
record = None
# 使用 StringIO 加载 GenBank 数据
for rec in SeqIO.parse(StringIO(genbank_data), "genbank"):
record = rec
break
if not record:
raise ValueError("未能成功解析 GenBank 数据")
gene_id = record.id
for feature in record.features:
if feature.type == "CDS":
#protein_sequence = features.qualifiers["translation"][0]
# if protein_id and "protein_id" in feature.qualifiers:
# if protein_id != feature.qualifiers["protein_id"][0]:
# continue
cds_start = feature.location.start
cds_end = feature.location.end
cds_sequence = record.seq[cds_start:cds_end]
#protein_sequence = cds_sequence.translate(to_stop=True)
protein_sequence = feature.qualifiers["translation"][0]
protein_id = feature.qualifiers["protein_id"][0]
sim_score = calculate_similarity(protein_sequence, protein_seq_ori)
#if p_s==protein_seq: #只要1个
results.append({
"gene_id":gene_id,
"protein_id":protein_id,
"cds_start": cds_start,
"cds_end": cds_end,
"dna_sequence": str(cds_sequence),
"protein_sequence": str(protein_sequence),
"sim":sim_score
})
# 使用 sorted() 函数并指定 key 和 reverse 参数
sorted_results = sorted(results, key=lambda x: x['sim'], reverse=True)
return sorted_results
def get_protein_and_dna_sequences(uniprot_id):
"""
主函数,根据蛋白质id,获得dna和蛋白质匹配对
:param uniprot_id:
:return:
"""
try:
print(f"正在获取 UniProt ID: {uniprot_id} 的蛋白质序列...")
protein_sequence = fetch_uniprot_protein_sequence(uniprot_id)
print("正在获取 NCBI 数据...")
handle = Entrez.esearch(db="protein", term=uniprot_id)
record = Entrez.read(handle)
handle.close()
if not record["IdList"]:
raise ValueError("未找到对应的蛋白质记录")
protein_ncbi_id = record["IdList"][0]
#print("protein_ncbi_id", protein_ncbi_id)
genbank_data = fetch_ncbi_genbank_data(protein_ncbi_id)
#print("genbank_data", genbank_data)
print("正在提取 DNA 和蛋白质序列...")
cds_data = extract_cds_and_translate(genbank_data, protein_sequence)
return {
"uniprot_id": uniprot_id,
"protein_sequence": protein_sequence,
"cds_data": cds_data,
}
except Exception as e:
print(f"发生错误:{e}")
return {"error": str(e)}
def process_data(uniprot_id):
# 示例:获取蛋白质和 DNA 序列
result = get_protein_and_dna_sequences(uniprot_id)
if "error" in result:
print(f"错误:{result['error']}")
return -1
else:
#print(result)
if len(result["cds_data"])>0:
gene_data = result["cds_data"][0]
data = {
"seq_id_a":gene_data["gene_id"],
"seq_type_a":"gene",
"seq_a":gene_data["dna_sequence"],
"seq_id_b":uniprot_id,
"seq_type_b":"pro",
"seq_b":gene_data["protein_sequence"],
"protein_id":gene_data["protein_id"],
}
return data
else:
return -1
if __name__=="__main__":
ret = process_data("Q9Z3S1")
print(ret)