| #!/usr/bin/env python3 | |
| """ | |
| alignment_check_v2.py | |
| Global alignment of a shrunken/mutated GFP sequence to a reference (e.g., eGFP), | |
| plus a report mapping reference positions -> query positions and checking | |
| required residues at specific reference coordinates. | |
| Requires: biopython | |
| Usage examples: | |
| 1) Paste sequences directly: | |
| python alignment_check_v2.py --query "YOUR_SHRUNKEN_SEQ" | |
| 2) Provide FASTA files: | |
| python alignment_check_v2.py --ref ref.fasta --qry query.fasta | |
| 3) Check specific sites (1-based reference positions): | |
| python alignment_check_v2.py --query "YOUR_SHRUNKEN_SEQ" --sites 66:T 67:Y 68:G 97:R 149:H 204:T 206:S 223:E | |
| """ | |
| import argparse | |
| from dataclasses import dataclass | |
| from typing import Dict, List, Optional, Tuple | |
| from Bio.Align import PairwiseAligner | |
| from Bio.Align import substitution_matrices | |
# Built-in reference sequence, used by main() when --ref is not supplied.
DEFAULT_EGFP = "MVSKGEELFTGVVPILVELDGDVNGHKFSVSGEGEGDATYGKLTLKFICTTGKLPVPWPTLVTTLTYGVQCFSRYPDHMKQHDFFKSAMPEGYVQERTIFFKDDGNYKTRAEVKFEGDTLVNRIELKGIDFKEDGNILGHKLEYNYNSHNVYIMADKQKNGIKVNFKIRHNIEDGSVQLADHYQQNTPIGDGPVLLPDNHYLSTQSALSKDPNEKRDHMVLLEFVTAAGITLGMDELYK"

# Default required residues, keyed by 1-based position in DEFAULT_EGFP.
# Used by main() when no --sites are given on the command line.
DEFAULT_SITES = {
    # keep 61–76 intact (one required residue per position, in order)
    **{pos: aa for pos, aa in enumerate("LVTTLTYGVQCFSRYP", start=61)},
    # other key sites
    97: "R",
    149: "H",
    204: "T",
    206: "S",
    223: "E",
}
def read_fasta_one(path: str) -> str:
    """Read the first sequence record from a FASTA file.

    Header lines (starting with ">") and blank lines are skipped. Reading
    stops at the first header that follows sequence data, so a multi-record
    file yields only its first record instead of all records glued together.
    A file with no header at all is read as a single bare sequence.

    Args:
        path: Path to a UTF-8 text file in FASTA format.

    Returns:
        The record's residues, upper-cased, with all whitespace removed.
        Empty string if the file contains no sequence lines.

    Raises:
        OSError: If the file cannot be opened.
    """
    chunks = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            if line.startswith(">"):
                if chunks:
                    # A second record begins here; keep only the first one.
                    break
                continue
            chunks.append(line)
    # split()/join removes every kind of interior whitespace (spaces, tabs).
    return "".join("".join(chunks).split()).upper()
def clean_seq(s: str) -> str:
    """Return *s* upper-cased with every non-alphabetic character removed.

    Whitespace, digits, gap symbols and punctuation are all dropped.
    """
    return "".join(filter(str.isalpha, s.upper()))
| def _parse_biopython_alignment_str(aln_str: str) -> Tuple[str, str, str]: | |
| """ | |
| Parse Biopython's pretty-printed Alignment string across versions. | |
| It prints the alignment in blocks like: | |
| target 0 ACDE... 60 (some versions omit trailing coordinate) | |
| 0 ||.|... 60 (some versions omit trailing coordinate) | |
| query 0 ACD-... 59 | |
| Long alignments repeat multiple blocks. We concatenate chunks. | |
| """ | |
| ref_chunks: List[str] = [] | |
| mid_chunks: List[str] = [] | |
| qry_chunks: List[str] = [] | |
| def last_token(line: str) -> str: | |
| parts = line.split() | |
| return parts[-1] if parts else "" | |
| for line in aln_str.splitlines(): | |
| if not line.strip(): | |
| continue | |
| parts = line.split() | |
| if not parts: | |
| continue | |
| tag = parts[0] | |
| if tag == "target": | |
| ref_chunks.append(last_token(line)) | |
| elif tag == "query": | |
| qry_chunks.append(last_token(line)) | |
| else: | |
| tok = last_token(line) | |
| # match-string heuristic | |
| if tok and all(c in "|.-:" for c in tok): | |
| mid_chunks.append(tok) | |
| aln_ref = "".join(ref_chunks) | |
| aln_mid = "".join(mid_chunks) | |
| aln_qry = "".join(qry_chunks) | |
| return aln_ref, aln_mid, aln_qry | |
def best_global_alignment(ref: str, qry: str,
                          open_gap: float = -10.0,
                          extend_gap: float = -0.5) -> Tuple[str, str, str, float]:
    """Globally align *qry* against *ref* and return the gapped strings.

    A PairwiseAligner in "global" mode with the BLOSUM62 substitution matrix
    is used; the first alignment it reports is taken. The gapped sequences
    are recovered by parsing the alignment's printed form.

    Args:
        ref: Reference (target) sequence.
        qry: Query sequence.
        open_gap: Score assigned to opening a gap.
        extend_gap: Score assigned to extending a gap.

    Returns:
        ``(aln_ref, aln_mid, aln_qry, score)``. The three strings have equal
        length; ``aln_mid`` uses '|' for exact matches, '.' for mismatches
        and '-' for gaps when it has to be rebuilt here.

    Raises:
        RuntimeError: If the parsed gapped sequences are empty or differ in
            length.
    """
    engine = PairwiseAligner()
    engine.mode = "global"
    engine.substitution_matrix = substitution_matrices.load("BLOSUM62")
    engine.open_gap_score = open_gap
    engine.extend_gap_score = extend_gap

    top_hit = engine.align(ref, qry)[0]
    score = float(top_hit.score)
    gapped_ref, match_line, gapped_qry = _parse_biopython_alignment_str(str(top_hit))

    n_ref, n_qry = len(gapped_ref), len(gapped_qry)
    if n_ref == 0 or n_ref != n_qry:
        raise RuntimeError(
            "Failed to parse gapped sequences from Biopython alignment. "
            f"lens: ref={n_ref} qry={n_qry}"
        )

    # The parsed match line is only trusted when it spans every column;
    # otherwise derive it from the two gapped sequences.
    if len(match_line) != n_ref:
        match_line = "".join(
            "-" if "-" in (r, q) else ("|" if r == q else ".")
            for r, q in zip(gapped_ref, gapped_qry)
        )

    return gapped_ref, match_line, gapped_qry, score
@dataclass
class MappingRow:
    """One line of the required-site report.

    Declared as a dataclass so that it can be built with keyword arguments,
    which is how summarize_sites() constructs it.
    """

    # 1-based position in the reference sequence.
    ref_pos: int
    # Reference residue at ref_pos ("?" when ref_pos is outside the reference).
    ref_aa: str
    # 1-based query position aligned to ref_pos, or None if there is none.
    qry_pos: Optional[int]
    # Query residue aligned to ref_pos, or None if there is none.
    qry_aa: Optional[str]
    # Residue required at ref_pos.
    expected: str
    # True when qry_aa is present and equals expected.
    ok: bool
def build_ref_to_qry_map(aln_ref: str, aln_qry: str) -> Tuple[Dict[int, Optional[int]], Dict[int, Optional[str]], Dict[int, int]]:
    """Walk two gapped strings column by column and index them by reference position.

    Returns:
      - ref_to_qry_pos: ref_pos (1-based) -> qry_pos (1-based), or None when the
        query has a gap in that column
      - ref_to_qry_aa:  ref_pos (1-based) -> query residue in that column, or None
        when the query has a gap there
      - ref_to_col:     ref_pos (1-based) -> 0-based alignment column of that ref_pos

    Columns where the reference has a gap advance the query counter only.
    """
    pos_of: Dict[int, Optional[int]] = {}
    aa_of: Dict[int, Optional[str]] = {}
    col_of: Dict[int, int] = {}

    ref_count = 0
    qry_count = 0
    for column, (ref_char, qry_char) in enumerate(zip(aln_ref, aln_qry)):
        qry_present = qry_char != "-"
        if qry_present:
            qry_count += 1
        if ref_char == "-":
            # Insertion relative to the reference: nothing to record.
            continue
        ref_count += 1
        col_of[ref_count] = column
        pos_of[ref_count] = qry_count if qry_present else None
        aa_of[ref_count] = qry_char if qry_present else None

    return pos_of, aa_of, col_of
def summarize_sites(ref: str, sites: Dict[int, str],
                    ref_to_qry_pos: Dict[int, Optional[int]],
                    ref_to_qry_aa: Dict[int, Optional[str]]) -> List[MappingRow]:
    """Build one report row per required site, ordered by reference position.

    Args:
        ref: Ungapped reference sequence.
        sites: 1-based reference position -> required residue.
        ref_to_qry_pos: 1-based reference position -> aligned query position or None.
        ref_to_qry_aa: 1-based reference position -> aligned query residue or None.

    Returns:
        A list of MappingRow. A row is ``ok`` only when a query residue is
        aligned to the site and equals the required residue. Positions
        outside the reference get "?" as their reference residue.
    """

    def row_for(pos: int) -> MappingRow:
        wanted = sites[pos]
        observed = ref_to_qry_aa.get(pos)
        return MappingRow(
            ref_pos=pos,
            ref_aa=ref[pos - 1] if 1 <= pos <= len(ref) else "?",
            qry_pos=ref_to_qry_pos.get(pos),
            qry_aa=observed,
            expected=wanted,
            ok=observed is not None and observed == wanted,
        )

    return [row_for(pos) for pos in sorted(sites)]
def print_table(rows: List[MappingRow]) -> None:
    """Print the site report as a right-aligned text table on stdout.

    A header line and a dashed rule of the same width are always printed,
    followed by one line per row. Missing query positions/residues are shown
    as "-", and the ``ok`` flag as "YES" or "NO".
    """
    title = f"{'ref_pos':>7} {'ref':>3} {'qry_pos':>7} {'qry':>3} {'exp':>3} {'ok':>4}"
    rule = "-" * len(title)
    print(title)
    print(rule)

    for row in rows:
        shown_pos = str(row.qry_pos) if row.qry_pos is not None else "-"
        shown_aa = row.qry_aa if row.qry_aa is not None else "-"
        verdict = "YES" if row.ok else "NO"
        print(f"{row.ref_pos:>7} {row.ref_aa:>3} {shown_pos:>7} {shown_aa:>3} {row.expected:>3} {verdict:>4}")
def print_windows(aln_ref: str, aln_mid: str, aln_qry: str,
                  ref_to_col: Dict[int, int],
                  site_positions: List[int],
                  window: int = 30) -> None:
    """Print a slice of the alignment around each requested reference position.

    Args:
        aln_ref: Gapped reference string.
        aln_mid: Match line for the alignment.
        aln_qry: Gapped query string.
        ref_to_col: 1-based reference position -> 0-based alignment column.
        site_positions: Reference positions to show; positions missing from
            ``ref_to_col`` are skipped silently.
        window: Number of alignment columns shown on each side of the site.
            Negative values are treated as 0.

    The three printed lines share a gutter of identical width so that the
    match symbols line up under the residues they describe.
    """
    half = max(0, window)
    total_cols = len(aln_ref)
    for p in site_positions:
        if p not in ref_to_col:
            continue
        c = ref_to_col[p]
        start = max(0, c - half)
        end = min(total_cols, c + half + 1)
        print(f"\nAlignment window around ref position {p} (cols {start}-{end-1}):")
        print("ref :", aln_ref[start:end])
        # Label padded to the width of "ref :" / "qry :" to keep columns aligned.
        print("    :", aln_mid[start:end])
        print("qry :", aln_qry[start:end])
def parse_sites(site_args: List[str]) -> Dict[int, str]:
    """Parse ``POS:AA`` site specifications into a position -> residue mapping.

    Args:
        site_args: Items such as ``"65:T"``; POS is a 1-based reference
            position and AA a single amino-acid letter (case-insensitive).

    Returns:
        Dict mapping each position to its upper-cased residue letter. If a
        position is given more than once, the last occurrence wins.

    Raises:
        ValueError: If an item has no ":", the position is not an integer or
            is below 1, or the residue is not exactly one letter.
    """
    out: Dict[int, str] = {}
    for item in site_args:
        p_str, sep, aa = item.partition(":")
        if not sep:
            raise ValueError(f"Bad site spec '{item}'. Use e.g. 65:T")
        try:
            p = int(p_str)
        except ValueError:
            # Replace int()'s generic message with one that names the item.
            raise ValueError(f"Bad site spec '{item}'. Use e.g. 65:T") from None
        if p < 1:
            # Positions are 1-based; 0 or negatives can never match a residue.
            raise ValueError(f"Reference position must be >= 1 in '{item}'")
        aa = aa.strip().upper()
        if len(aa) != 1 or not aa.isalpha():
            raise ValueError(f"Expected single-letter AA in '{item}'")
        out[p] = aa
    return out
def main():
    """Command-line entry point: align a query to the reference and report sites.

    Reads the reference (``--ref`` FASTA, or the built-in eGFP sequence) and
    the query (``--qry`` FASTA takes precedence over a ``--query`` string),
    aligns them globally, then prints the summary, the required-site table,
    per-site alignment windows, optionally the full alignment, and a final
    ``PASS`` line that is True only if every required site matched.

    Exits with an error message when no query is given, when either sequence
    is empty after cleaning, or when a ``--sites`` item is malformed.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--ref", type=str, default="", help="Reference FASTA (optional). If omitted, uses built-in eGFP sequence.")
    ap.add_argument("--qry", type=str, default="", help="Query FASTA (optional).")
    ap.add_argument("--query", type=str, default="", help="Query sequence string (optional).")
    ap.add_argument("--sites", nargs="*", default=[], help="Site constraints like 65:T 66:Y ... (optional).")
    ap.add_argument("--open_gap", type=float, default=-10.0)
    ap.add_argument("--extend_gap", type=float, default=-0.5)
    ap.add_argument("--no_full", action="store_true", help="Do not print full alignment (only windows + table).")
    ap.add_argument("--window", type=int, default=30, help="Half-window size for alignment snippets.")
    args = ap.parse_args()

    ref = clean_seq(read_fasta_one(args.ref)) if args.ref else DEFAULT_EGFP
    if args.qry:
        qry = clean_seq(read_fasta_one(args.qry))
    elif args.query:
        qry = clean_seq(args.query)
    else:
        raise SystemExit("Provide a query via --qry (FASTA) or --query (sequence string).")

    # Fail early with a clear message instead of an opaque aligner error.
    if not ref:
        raise SystemExit("Reference sequence is empty after cleaning.")
    if not qry:
        raise SystemExit("Query sequence is empty after cleaning.")

    if args.sites:
        try:
            sites = parse_sites(args.sites)
        except ValueError as exc:
            # Report malformed --sites as a usage error rather than a traceback.
            ap.error(str(exc))
    else:
        sites = DEFAULT_SITES

    aln_ref, aln_mid, aln_qry, score = best_global_alignment(ref, qry, open_gap=args.open_gap, extend_gap=args.extend_gap)
    ref_to_qry_pos, ref_to_qry_aa, ref_to_col = build_ref_to_qry_map(aln_ref, aln_qry)
    rows = summarize_sites(ref, sites, ref_to_qry_pos, ref_to_qry_aa)

    print(f"Reference length: {len(ref)} aa")
    print(f"Query length: {len(qry)} aa")
    print(f"Alignment length: {len(aln_ref)} columns")
    print(f"Alignment score: {score:.2f}")
    print("\nRequired-site mapping (reference -> query):")
    print_table(rows)
    print_windows(aln_ref, aln_mid, aln_qry, ref_to_col, sorted(sites.keys()), window=args.window)

    if not args.no_full:
        print("\nFull alignment:")
        print("ref :", aln_ref)
        # Label padded to the width of "ref :" / "qry :" so the match
        # symbols sit directly under the residues.
        print("    :", aln_mid)
        print("qry :", aln_qry)

    passed = all(r.ok for r in rows)
    print(f"\nPASS = {passed}")


if __name__ == "__main__":
    main()
Xet Storage Details
- Size:
- 10 kB
- Xet hash:
- c2e1de1931616f038db78f06426042f633a103ba36e0385d4bd671185d144bbf
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.