| | import os |
| | import base64 |
| | import requests |
| | from collections import defaultdict, Counter |
| | from Bio.PDB import PDBParser, PPBuilder |
| | from Bio.Align import PairwiseAligner |
| | from Bio.PDB.MMCIFParser import MMCIFParser |
| |
|
| | from tempfile import NamedTemporaryFile |
| | import pandas as pd |
| | import matplotlib.pyplot as plt |
| | import logomaker |
| |
|
| | from utils import read_file, extract_contact_residues |
| |
|
| |
|
def get_uniprot_info(uniprot_id, timeout=30):
    """Fetch the sequence and PDB cross-references of a UniProtKB entry.

    Args:
        uniprot_id: Accession interpolated into the UniProt REST URL.
        timeout: Seconds ``requests`` waits for the server before raising a
            timeout error. Defaults to 30.

    Returns:
        ``(sequence, pdb_ids)`` when the server answers with status 200.
        ``sequence`` is the entry's ``sequence.value`` field (``''`` when
        absent) and ``pdb_ids`` lists the ``id`` of every cross-reference
        whose ``database`` is ``'PDB'``. Returns ``None`` for any other
        status code, so callers must check before unpacking.
    """
    url = f'https://rest.uniprot.org/uniprotkb/{uniprot_id}.json'
    # Without an explicit timeout a stalled connection blocks forever.
    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        print("UniProt request failed:", response.status_code)
        return None

    data = response.json()
    sequence = data.get('sequence', {}).get('value', '')

    # Skip cross-references lacking an 'id' instead of raising KeyError.
    pdb_ids = [
        xref['id']
        for xref in data.get('uniProtKBCrossReferences', [])
        if xref.get('database') == 'PDB' and 'id' in xref
    ]
    return sequence, pdb_ids
| |
|
| |
|
| | def sequence_fix(seq: str) -> str: |
| | """修复序列中的非标准氨基酸""" |
| | |
| | if seq.startswith(">"): |
| | seq = seq.split("\n", 1)[-1].replace("\n", "") |
| | |
| | seq = seq.replace("\n", "").strip() |
| |
|
| | fixed_seq = [] |
| | for aa in seq: |
| | if aa not in "ACDEFGHIKLMNPQRSTVWY": |
| | fixed_seq.append("X") |
| | else: |
| | fixed_seq.append(aa) |
| |
|
| | if len(fixed_seq) < 2: |
| | return None |
| | return "".join(fixed_seq) |
| |
|
| |
|
def find_best_chain(file_path, reference_seq, match_ratio=0.8):
    """Return the ID of the chain that best matches ``reference_seq``.

    Every chain of every model is converted to a one-letter sequence with
    ``PPBuilder`` and scored against ``reference_seq`` with a local
    ``PairwiseAligner`` (match 1, mismatch 0, gap open -5, gap extend -0.5).

    Args:
        file_path: Path to a ``.pdb``/``.ent`` or ``.cif`` file; the
            extension is matched case-insensitively.
        reference_seq: One-letter sequence of the target protein.
        match_ratio: The best score must reach
            ``len(best chain sequence) * match_ratio`` to be accepted.

    Returns:
        The best chain's ID, or ``None`` when no chain yields a sequence or
        the best score is below the threshold.

    Raises:
        ValueError: If ``file_path`` has an unsupported extension.
    """
    lowered = file_path.lower()
    tmp_path = None
    if lowered.endswith((".pdb", ".ent")):
        parser = PDBParser(QUIET=True)
    elif lowered.endswith(".cif"):
        parser = MMCIFParser(QUIET=True)
        with open(file_path, 'r') as f:
            content = f.read()
        # Only files lacking the leading "data_" block need a patched copy.
        if not content.lstrip().startswith("data_"):
            with NamedTemporaryFile(suffix=".cif", delete=False, mode='w') as tmp:
                tmp.write("data_auto\n" + content)
                tmp_path = tmp.name
            file_path = tmp_path
    else:
        raise ValueError(f"Unsupported structure file extension: {file_path}")

    try:
        structure = parser.get_structure("s", file_path)
    finally:
        # The patched copy is only needed while parsing; do not leak it.
        if tmp_path is not None:
            os.remove(tmp_path)

    aligner = PairwiseAligner()
    aligner.mode = 'local'
    aligner.match_score = 1
    aligner.mismatch_score = 0
    aligner.open_gap_score = -5
    aligner.extend_gap_score = -0.5

    builder = PPBuilder()
    best_id, best_score, best_seq = None, 0.0, ''
    for model in structure:
        for chain in model:
            seq = "".join(
                str(pp.get_sequence()) for pp in builder.build_peptides(chain)
            )
            # Chains without any polypeptide cannot be the target protein;
            # skip them rather than hand an empty sequence to the aligner.
            if not seq:
                continue
            score = aligner.score(reference_seq, seq)
            if score > best_score:
                best_id, best_score, best_seq = chain.id, score, seq

    if best_id is None or best_score < len(best_seq) * match_ratio:
        return None
    return best_id
| |
|
| |
|
| | def analyze_group( |
| | group_path: str, |
| | reference_seq: str, |
| | cutoff: float = 3.5, |
| | match_ratio: float = 0.8, |
| | target_entity_keys: list[str] | None = None, |
| | cnt_by_file: bool = False |
| | ) -> list[dict]: |
| | """ |
| | 批量统计:目标蛋白 vs 指定实体 的接触残基, |
| | 并记录每个残基在哪些结构文件中出现 |
| | 返回格式: |
| | [ |
| | {'chain':'A','resi':'25','resn':'LYS','count':12, |
| | 'structures':['1abc.pdb','2def.pdb',...]}, |
| | ... |
| | ] |
| | """ |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | residue_files = defaultdict(set) |
| | |
| | counter = Counter() |
| |
|
| | for fname in os.listdir(group_path): |
| | print(f"Processing {fname}...") |
| | if not fname.lower().endswith((".pdb", ".ent", ".cif")): |
| | continue |
| | pdb_file = os.path.join(group_path, fname) |
| |
|
| | |
| | prot_chain = find_best_chain(pdb_file, reference_seq, match_ratio) |
| | if prot_chain is None: |
| | print(f"Warning: {fname} 中未找到匹配链,跳过") |
| | continue |
| | prot_key = f"Chain {prot_chain} (Protein)" |
| |
|
| | |
| | _, summary, _, _ = read_file(pdb_file) |
| |
|
| | |
| | all_keys = set(summary.keys()) |
| | all_keys.discard(prot_key) |
| | if target_entity_keys is not None: |
| | target_entity_keys_canonical = [f'({k.lower()})' for k in target_entity_keys] |
| | keys_to_analyze = [] |
| | for k in all_keys: |
| | if k.split()[-1].lower() in target_entity_keys_canonical: |
| | keys_to_analyze.append(k) |
| | else: |
| | keys_to_analyze = list(all_keys) |
| | file_residues = set() |
| | |
| | |
| | for key in keys_to_analyze: |
| | contacts = extract_contact_residues( |
| | summary, |
| | selected_keys=[prot_key, key], |
| | cutoff=cutoff |
| | ) |
| | for res in contacts.get(prot_key, []): |
| | tag = (res['chain'], res['resi'], res['resn']) |
| | if cnt_by_file: |
| | file_residues.add(tag) |
| | else: |
| | counter[tag] += 1 |
| | residue_files[tag].add(fname) |
| |
|
| | if cnt_by_file: |
| | |
| | for tag in file_residues: |
| | counter[tag] += 1 |
| | residue_files[tag].add(fname) |
| |
|
| | |
| | result = [] |
| | for (c, r, n), cnt in counter.most_common(): |
| | result.append({ |
| | 'chain': c, |
| | 'resi': r, |
| | 'resn': n, |
| | 'count': cnt, |
| | 'structures': sorted(residue_files[(c, r, n)]) |
| | }) |
| | return result |
| |
|
def logo_plot(seq: str, results: list[dict]) -> str:
    """Draw a sequence logo whose letter heights are contact counts.

    Args:
        seq: One-letter protein sequence shown along the x axis.
        results: Records carrying ``'resi'`` and ``'count'`` (the shape
            returned by ``analyze_group``). ``resi`` is assumed to be the
            1-based position in ``seq``, matching the tick labels drawn
            here -- TODO confirm against the structure numbering in use.

    Returns:
        The HTML snippet produced by ``render_svg_from_file`` for the plot.
    """
    # The same residue number can occur under several chain IDs or residue
    # names; add the counts up instead of letting the last record win.
    pos2count = Counter()
    for r in results:
        pos2count[int(r['resi'])] += r['count']

    alphabet = list("ACDEFGHIKLMNPQRSTVWY")
    df = pd.DataFrame(0, index=range(len(seq)), columns=alphabet)
    for i, aa in enumerate(seq):
        # Letters outside the alphabet (e.g. 'X') have no column; assigning
        # them would enlarge the frame with a NaN-filled column.
        if aa in alphabet:
            # Row i is sequence position i + 1, as on the tick labels.
            df.at[i, aa] = pos2count.get(i + 1, 0)

    unit_width = 0.25
    fixed_height = 5
    plt.rcParams['font.size'] = 14

    fig, ax = plt.subplots(figsize=(len(seq) * unit_width, fixed_height), dpi=300)
    try:
        logomaker.Logo(df, ax=ax)

        max_count = max(pos2count.values()) if pos2count else 1
        y_max = max_count * 1.2
        text_offset = y_max * 0.20

        # Print the full sequence below the axis, one letter per column.
        for i, aa in enumerate(seq):
            ax.text(i, -text_offset, aa, ha='center', va='top', fontfamily='monospace')

        ax.set_ylabel("As-binding-site Count")
        ax.set_xticks(range(len(seq)))
        ax.set_xticklabels([i + 1 for i in range(len(seq))], rotation=45)
        ax.set_xlim(-1, len(seq))
        ax.set_ylim(0, y_max if pos2count else 1)

        fig.tight_layout()

        # Reserve a path, close the handle, then let matplotlib write to it.
        with NamedTemporaryFile(suffix=".svg", delete=False) as tmpfile:
            svg_path = tmpfile.name
        try:
            fig.savefig(svg_path, format='svg', dpi=30, bbox_inches='tight')
            return render_svg_from_file(svg_path)
        finally:
            os.remove(svg_path)
    finally:
        # pyplot keeps figures alive until closed; release it every call.
        plt.close(fig)
| |
|
def render_svg(svg_content: str) -> str:
    """Wrap SVG markup in an HTML snippet with a download link.

    Args:
        svg_content: SVG document text; embedded verbatim in the page and
            also base64-encoded (UTF-8) into a ``data:`` URI.

    Returns:
        HTML containing a right-aligned "Download SVG" anchor followed by a
        horizontally scrollable container holding the inline SVG.
    """
    encoded = base64.b64encode(svg_content.encode('utf-8'))
    download_href = "data:image/svg+xml;base64," + encoded.decode('utf-8')

    return f"""
    <div style="display: flex; justify-content: flex-end; padding: 8px;">
    <a download="diagram.svg"
    href="{download_href}"
    style="
    padding: 6px 12px;
    background: #4CAF50;
    color: white;
    text-decoration: none;
    border-radius: 4px;
    font-size: 14px;">
    Download SVG
    </a>
    </div>

    <div style="overflow-x: auto; white-space: nowrap; border: 1px solid #ddd; padding: 10px;">
    {svg_content}
    </div>
    """
| |
|
def render_svg_from_file(file_path: str) -> str:
    """Read an SVG file and return it wrapped by ``render_svg``.

    Args:
        file_path: Path of the SVG file to read.

    Returns:
        The HTML snippet built by ``render_svg`` from the file's text.
    """
    # Decode explicitly as UTF-8 (the encoding render_svg re-encodes with)
    # instead of relying on the platform's locale default.
    with open(file_path, 'r', encoding='utf-8') as f:
        svg_content = f.read()
    return render_svg(svg_content)
| |
|
| |
|
# Placeholder HTML with the same two-part layout render_svg() produces:
# a disabled "Download SVG" button and a dashed box carrying a hint message.
MULTI_HTML_HOLDER = """
<div style="display: flex; justify-content: flex-end; padding: 8px;">
<button disabled
style="
padding: 6px 12px;
background: #aaa;
color: white;
text-decoration: none;
border-radius: 4px;
font-size: 14px;
cursor: not-allowed;">
Download SVG
</button>
</div>

<div style="overflow-x: auto; white-space: nowrap; border: 1px dashed #ccc; padding: 30px; text-align: center; color: #999; font-size: 16px;">
暂无结果,请通过输入UniProt ID或上传文件和指定序列开始分析。
</div>
"""