import base64
import os
from collections import defaultdict, Counter
from tempfile import NamedTemporaryFile

import logomaker
import matplotlib.pyplot as plt
import pandas as pd
import requests
from Bio.Align import PairwiseAligner
from Bio.PDB import PDBParser, PPBuilder
from Bio.PDB.MMCIFParser import MMCIFParser

from utils import read_file, extract_contact_residues


def get_uniprot_info(uniprot_id):
    """Fetch the canonical sequence and cross-referenced PDB IDs for a UniProt entry.

    Parameters
    ----------
    uniprot_id : str
        UniProt accession (e.g. "P69905").

    Returns
    -------
    tuple[str, list[str]] | None
        (sequence, pdb_ids) on success, or None when the HTTP request fails.
    """
    url = f'https://rest.uniprot.org/uniprotkb/{uniprot_id}.json'
    # Timeout added so a stalled UniProt endpoint cannot hang the caller forever.
    response = requests.get(url, timeout=30)
    if response.status_code != 200:
        print("UniProt request failed:", response.status_code)
        return None
    data = response.json()
    # Canonical sequence of the entry.
    sequence = data.get('sequence', {}).get('value', '')
    # Collect every PDB cross-reference ID.
    pdb_ids = [
        xref['id']
        for xref in data.get('uniProtKBCrossReferences', [])
        if xref.get('database') == 'PDB'
    ]
    return sequence, pdb_ids


def sequence_fix(seq: str) -> str | None:
    """Normalize a raw sequence string.

    Strips a leading FASTA header (">..." line) if present, removes newlines
    and surrounding whitespace, and replaces any character that is not one of
    the 20 standard amino acids with 'X'.

    Returns
    -------
    str | None
        The cleaned sequence, or None when fewer than two residues remain
        (too short to be useful for alignment).
    """
    # Drop a FASTA header line if present.
    if seq.startswith(">"):
        seq = seq.split("\n", 1)[-1].replace("\n", "")
    seq = seq.replace("\n", "").strip()
    # Substitute non-standard amino acids with 'X' (lowercase letters included).
    fixed_seq = [aa if aa in "ACDEFGHIKLMNPQRSTVWY" else "X" for aa in seq]
    if len(fixed_seq) < 2:
        return None
    return "".join(fixed_seq)


def find_best_chain(file_path, reference_seq, match_ratio=0.8):
    """Select the chain in a structure file that best matches *reference_seq*.

    A local pairwise alignment (match=1, mismatch=0, gap open=-5,
    gap extend=-0.5) is scored against every chain; the best chain ID is
    returned only when its score reaches ``match_ratio * len(best_chain_seq)``.

    Parameters
    ----------
    file_path : str
        Path to a .pdb/.ent or .cif structure file.
    reference_seq : str
        Sequence to match against.
    match_ratio : float
        Minimum score-to-length ratio required to accept the best chain.

    Returns
    -------
    str | None
        The matching chain ID, or None when no chain passes the threshold
        (or the file extension is not recognized).
    """
    if file_path.endswith((".pdb", ".ent")):
        parser = PDBParser(QUIET=True)
    elif file_path.endswith(".cif"):
        parser = MMCIFParser(QUIET=True)
        with open(file_path, 'r') as f:
            content = f.read()
        # mmCIF files must begin with a "data_" block; some exported files
        # lack it, so prepend a default block name and reparse from a
        # temporary copy.
        if not content.lstrip().startswith("data_"):
            content = "data_auto\n" + content
            with NamedTemporaryFile(suffix=".cif", delete=False, mode='w') as tmp:
                tmp.write(content)
                file_path = tmp.name
            parser = MMCIFParser(QUIET=True)
    else:
        # Previously an unknown extension left `parser` unbound and raised
        # UnboundLocalError; treat it as "no matching chain" instead.
        return None
    structure = parser.get_structure("s", file_path)
    # Configure a local aligner: reward matches only, penalize gaps heavily.
    aligner = PairwiseAligner()
    aligner.mode = 'local'
    aligner.match_score = 1
    aligner.mismatch_score = 0
    aligner.open_gap_score = -5
    aligner.extend_gap_score = -0.5
    best = (None, 0.0, '-')  # (chain_id, score, chain_sequence)
    for model in structure:
        for chain in model:
            seq = "".join(
                str(pp.get_sequence()) for pp in PPBuilder().build_peptides(chain)
            )
            score = aligner.score(reference_seq, seq)
            if score > best[1]:
                best = (chain.id, score, seq)
    # Require the score to cover at least match_ratio of the chain length.
    min_score = len(best[2]) * match_ratio
    if best[1] < min_score:
        return None
    return best[0]


def analyze_group(
    group_path: str,
    reference_seq: str,
    cutoff: float = 3.5,
    match_ratio: float = 0.8,
    target_entity_keys: list[str] | None = None,
    cnt_by_file: bool = False
) -> list[dict]:
    """Batch-count contact residues of the target protein vs. selected entities,
    recording which structure files each residue appears in.

    Parameters
    ----------
    group_path : str
        Directory containing .pdb/.ent/.cif structure files.
    reference_seq : str
        Sequence used to locate the protein chain in each structure.
    cutoff : float
        Contact distance cutoff passed to ``extract_contact_residues``.
    match_ratio : float
        Minimum alignment-score ratio for chain matching.
    target_entity_keys : list[str] | None
        Entity names to restrict the analysis to (matched case-insensitively
        against the "(NAME)" suffix of summary keys); None analyzes all
        non-protein entities.
    cnt_by_file : bool
        When True, each residue is counted at most once per structure file.

    Returns
    -------
    list[dict]
        [{'chain': 'A', 'resi': '25', 'resn': 'LYS', 'count': 12,
          'structures': ['1abc.pdb', '2def.pdb', ...]}, ...]
        sorted by descending count.
    """
    residue_files = defaultdict(set)  # residue tag -> set of file names
    counter = Counter()               # residue tag -> contact count
    for fname in os.listdir(group_path):
        print(f"Processing {fname}...")
        if not fname.lower().endswith((".pdb", ".ent", ".cif")):
            continue
        pdb_file = os.path.join(group_path, fname)
        # Identify the protein chain that matches the reference sequence.
        prot_chain = find_best_chain(pdb_file, reference_seq, match_ratio)
        if prot_chain is None:
            print(f"Warning: {fname} 中未找到匹配链,跳过")
            continue
        prot_key = f"Chain {prot_chain} (Protein)"
        # Parse the structure summary (keys look like "Chain A (LIG)").
        _, summary, _, _ = read_file(pdb_file)
        # Choose which non-protein entities to analyze.
        all_keys = set(summary.keys())
        all_keys.discard(prot_key)
        if target_entity_keys is not None:
            # Canonicalize targets to the "(name)" suffix form of summary keys.
            target_entity_keys_canonical = [f'({k.lower()})' for k in target_entity_keys]
            keys_to_analyze = [
                k for k in all_keys
                if k.split()[-1].lower() in target_entity_keys_canonical
            ]
        else:
            keys_to_analyze = list(all_keys)
        file_residues = set()
        # Extract protein-side contact residues for each selected entity.
        # TODO: this per-entity loop could likely be collapsed into one call.
        for key in keys_to_analyze:
            contacts = extract_contact_residues(
                summary,
                selected_keys=[prot_key, key],
                cutoff=cutoff
            )
            for res in contacts.get(prot_key, []):
                tag = (res['chain'], res['resi'], res['resn'])
                if cnt_by_file:
                    file_residues.add(tag)
                else:
                    counter[tag] += 1
                    residue_files[tag].add(fname)
        if cnt_by_file:
            # Count each residue at most once per structure file.
            for tag in file_residues:
                counter[tag] += 1
                residue_files[tag].add(fname)
    # Assemble results sorted by descending contact count.
    return [
        {
            'chain': c,
            'resi': r,
            'resn': n,
            'count': cnt,
            'structures': sorted(residue_files[(c, r, n)]),
        }
        for (c, r, n), cnt in counter.most_common()
    ]


def logo_plot(seq: str, results: list[dict]) -> str:
    """Render a sequence logo of per-residue contact counts and return it as
    an HTML fragment with the SVG embedded.

    Parameters
    ----------
    seq : str
        The full protein sequence (one letter per position).
    results : list[dict]
        Output of :func:`analyze_group` ('resi' and 'count' are used).

    Returns
    -------
    str
        Scrollable HTML wrapping the rendered SVG logo.
    """
    # Map residue position -> contact count.
    # NOTE(review): keys come from PDB 'resi' numbering while `i` below is a
    # 0-based string index — confirm the two numbering conventions align.
    pos2count = {int(r['resi']): r['count'] for r in results}
    # Count matrix: rows = positions, columns = the 20 standard amino acids.
    alphabet = list("ACDEFGHIKLMNPQRSTVWY")
    df = pd.DataFrame(0, index=range(len(seq)), columns=alphabet)
    for i, aa in enumerate(seq):
        # Guard against 'X'/non-standard residues: they have no column in the
        # logo alphabet and would raise a KeyError on assignment.
        if aa in df.columns:
            df.at[i, aa] = pos2count.get(i, 0)
    # Very wide figure: fixed width per amino-acid column (tunable).
    unit_width = 0.25
    fixed_height = 5
    plt.rcParams['font.size'] = 14
    fig, ax = plt.subplots(figsize=(len(seq) * unit_width, fixed_height), dpi=300)
    logo = logomaker.Logo(df, ax=ax)
    # Place the sequence letters below the axis, offset proportionally to
    # the y-axis maximum so the layout scales with the data.
    max_count = max(pos2count.values()) if pos2count else 1
    y_max = max_count * 1.2
    text_offset = y_max * 0.20
    for i, aa in enumerate(seq):
        ax.text(i, -text_offset, aa, ha='center', va='top', fontfamily='monospace')
    ax.set_ylabel("As-binding-site Count")
    ax.set_xticks(range(len(seq)))  # tick at every position
    ax.set_xticklabels([i + 1 for i in range(len(seq))], rotation=45)
    ax.set_xlim(-1, len(seq))
    ax.set_ylim(0, y_max if pos2count else 1)
    plt.tight_layout()
    # Export to SVG and wrap it in scrollable HTML.
    with NamedTemporaryFile(suffix=".svg", delete=False) as tmpfile:
        plt.savefig(tmpfile.name, format='svg', dpi=30, bbox_inches='tight')
    svg_html = render_svg_from_file(tmpfile.name)
    # Close the figure: repeated calls would otherwise leak matplotlib state.
    plt.close(fig)
    return svg_html


def render_svg(svg_content: str) -> str:
    """Wrap SVG markup in a horizontally scrollable container with a
    base64 data-URI download link, returning an HTML fragment."""
    svg_base64 = base64.b64encode(svg_content.encode('utf-8')).decode('utf-8')
    download_href = f"data:image/svg+xml;base64,{svg_base64}"
    # NOTE(review): the original HTML tags were garbled in this file and
    # `download_href` was left unused; the markup below reconstructs the
    # intended scrollable wrapper + download anchor — verify against the UI.
    svg_html = f"""
<div style="overflow-x: auto; width: 100%;">
    <a href="{download_href}" download="logo.svg">Download SVG</a>
    {svg_content}
</div>
"""
    return svg_html


def render_svg_from_file(file_path: str) -> str:
    """Read an SVG file and return it wrapped via :func:`render_svg`."""
    with open(file_path, 'r') as f:
        svg_content = f.read()
    return render_svg(svg_content)


# Placeholder HTML shown before any analysis has been run.
MULTI_HTML_HOLDER = """
暂无结果,请通过输入UniProt ID或上传文件和指定序列开始分析。
"""