AlienChen/Storage / pCoMole /gfp /alignment.py
AlienChen's picture
download
raw
10 kB
#!/usr/bin/env python3
"""
alignment_check_v2.py
Global alignment of a shrunken/mutated GFP sequence to a reference (e.g., eGFP),
plus a report mapping reference positions -> query positions and checking
required residues at specific reference coordinates.
Requires: biopython
Usage examples:
1) Paste sequences directly:
python alignment_check_v2.py --query "YOUR_SHRUNKEN_SEQ"
2) Provide FASTA files:
python alignment_check_v2.py --ref ref.fasta --qry query.fasta
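3) Check specific sites (1-based positions in the reference sequence; the built-in
eGFP reference is numbered +1 relative to the 65:T 66:Y 67:G convention):
python alignment_check_v2.py --query "YOUR_SHRUNKEN_SEQ" --sites 66:T 67:Y 68:G 97:R 149:H 204:T 206:S 223:E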
"""
import argparse
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
from Bio.Align import PairwiseAligner
from Bio.Align import substitution_matrices
# Built-in reference sequence; main() falls back to it when --ref is not given.
DEFAULT_EGFP = (
    "MVSKGEELFTGVVPILVELDGDVNGHKFSVSGEGEGDATYGKLTLKFICTTGKLPVPWPTLVTTLTYGVQCFSRYPDHMKQHDFFKSAMPEGYVQERTIFFKDDGNYKTRAEVKFEGDTLVNRIELKGIDFKEDGNILGHKLEYNYNSHNVYIMADKQKNGIKVNFKIRHNIEDGSVQLADHYQQNTPIGDGPVLLPDNHYLSTQSALSKDPNEKRDHMVLLEFVTAAGITLGMDELYK"
)
# Default required residues, keyed by 1-based position in the reference
# sequence; main() uses this mapping when no --sites are given.
DEFAULT_SITES = {
    # keep 61–76 intact
    61: 'L', 62: 'V', 63: 'T', 64: 'T',
    65: 'L', 66: 'T', 67: 'Y', 68: 'G',
    69: 'V', 70: 'Q', 71: 'C', 72: 'F',
    73: 'S', 74: 'R', 75: 'Y', 76: 'P',
    # other key sites
    97: 'R', 149: 'H',
    204: 'T', 206: 'S',
    223: 'E',
}
def read_fasta_one(path: str) -> str:
    """Read the first sequence record from a FASTA file.

    Header lines (starting with ">") and blank lines are skipped. Reading
    stops at the first header that follows sequence data, so additional
    records in a multi-record file are ignored rather than being glued onto
    the first sequence.

    Args:
        path: Path to a UTF-8 encoded FASTA file.

    Returns:
        The sequence with spaces removed and upper-cased; an empty string
        when the file contains no sequence lines.

    Raises:
        OSError: If the file cannot be opened.
    """
    chunks = []
    with open(path, "r", encoding="utf-8") as f:
        for raw in f:
            line = raw.strip()
            if not line:
                continue
            if line.startswith(">"):
                if chunks:
                    # A new header after sequence data starts the next
                    # record; only the first record is wanted.
                    break
                continue
            chunks.append(line)
    return "".join(chunks).replace(" ", "").upper()
def clean_seq(s: str) -> str:
    """Return *s* upper-cased with every non-alphabetic character removed."""
    normalized = s.strip().upper()
    return "".join(filter(str.isalpha, normalized))
def _parse_biopython_alignment_str(aln_str: str) -> Tuple[str, str, str]:
"""
Parse Biopython's pretty-printed Alignment string across versions.
It prints the alignment in blocks like:
target 0 ACDE... 60 (some versions omit trailing coordinate)
0 ||.|... 60 (some versions omit trailing coordinate)
query 0 ACD-... 59
Long alignments repeat multiple blocks. We concatenate chunks.
"""
ref_chunks: List[str] = []
mid_chunks: List[str] = []
qry_chunks: List[str] = []
def last_token(line: str) -> str:
parts = line.split()
return parts[-1] if parts else ""
for line in aln_str.splitlines():
if not line.strip():
continue
parts = line.split()
if not parts:
continue
tag = parts[0]
if tag == "target":
ref_chunks.append(last_token(line))
elif tag == "query":
qry_chunks.append(last_token(line))
else:
tok = last_token(line)
# match-string heuristic
if tok and all(c in "|.-:" for c in tok):
mid_chunks.append(tok)
aln_ref = "".join(ref_chunks)
aln_mid = "".join(mid_chunks)
aln_qry = "".join(qry_chunks)
return aln_ref, aln_mid, aln_qry
def best_global_alignment(ref: str, qry: str,
                          open_gap: float = -10.0,
                          extend_gap: float = -0.5) -> Tuple[str, str, str, float]:
    """Globally align *qry* against *ref* with BLOSUM62 and return the first alignment.

    Args:
        ref: Reference sequence.
        qry: Query sequence.
        open_gap: Gap-open score assigned to the aligner.
        extend_gap: Gap-extension score assigned to the aligner.

    Returns:
        (aln_ref, aln_mid, aln_qry, score). The three strings are gapped and
        of equal length. When the match row has to be rebuilt it uses '|'
        for identical residues, '.' for mismatches and '-' for gap columns.

    Raises:
        RuntimeError: If the gapped rows cannot be recovered from the
            printed alignment (empty, or of different lengths).
    """
    aligner = PairwiseAligner()
    aligner.mode = "global"
    aligner.substitution_matrix = substitution_matrices.load("BLOSUM62")
    aligner.open_gap_score = open_gap
    aligner.extend_gap_score = extend_gap

    top_hit = aligner.align(ref, qry)[0]
    score = float(top_hit.score)
    aln_ref, aln_mid, aln_qry = _parse_biopython_alignment_str(str(top_hit))

    if not aln_ref or len(aln_ref) != len(aln_qry):
        raise RuntimeError(
            f"Failed to parse gapped sequences from Biopython alignment. "
            f"lens: ref={len(aln_ref)} qry={len(aln_qry)}"
        )

    if len(aln_mid) == len(aln_ref):
        return aln_ref, aln_mid, aln_qry, score

    # The parsed match row is missing or has the wrong length: derive it
    # column by column from the two gapped sequences instead.
    aln_mid = "".join(
        "-" if "-" in (a, b) else ("|" if a == b else ".")
        for a, b in zip(aln_ref, aln_qry)
    )
    return aln_ref, aln_mid, aln_qry, score
@dataclass
class MappingRow:
    """One row of the required-site report for a single reference position."""
    ref_pos: int  # 1-based position in the reference sequence
    ref_aa: str  # reference residue at ref_pos ("?" when ref_pos is out of range)
    qry_pos: Optional[int]  # 1-based query position aligned to ref_pos; None when there is none
    qry_aa: Optional[str]  # query residue aligned to ref_pos; None when there is none
    expected: str  # residue required at this reference position
    ok: bool  # True when qry_aa equals expected
def build_ref_to_qry_map(aln_ref: str, aln_qry: str) -> Tuple[Dict[int, Optional[int]], Dict[int, Optional[str]], Dict[int, int]]:
    """Walk the alignment columns and index them by reference position.

    Returns three dicts keyed by 1-based reference position:
    - the 1-based query position aligned to it (None if the query has a gap there)
    - the query residue aligned to it (None if the query has a gap there)
    - the 0-based alignment column holding that reference residue
    """
    pos_of: Dict[int, Optional[int]] = {}
    aa_of: Dict[int, Optional[str]] = {}
    col_of: Dict[int, int] = {}

    ref_count = 0
    qry_count = 0
    for col, (ref_char, qry_char) in enumerate(zip(aln_ref, aln_qry)):
        qry_present = qry_char != "-"
        if qry_present:
            qry_count += 1
        if ref_char == "-":
            # Insertion relative to the reference: advances the query only.
            continue
        ref_count += 1
        col_of[ref_count] = col
        pos_of[ref_count] = qry_count if qry_present else None
        aa_of[ref_count] = qry_char if qry_present else None

    return pos_of, aa_of, col_of
def summarize_sites(ref: str, sites: Dict[int, str],
                    ref_to_qry_pos: Dict[int, Optional[int]],
                    ref_to_qry_aa: Dict[int, Optional[str]]) -> List[MappingRow]:
    """Build one MappingRow per required site, ordered by reference position.

    A site is marked ok only when a query residue is aligned to it and that
    residue equals the expected one. Positions outside the reference get
    "?" as their reference residue.
    """
    def row_for(pos: int) -> MappingRow:
        expected = sites[pos]
        observed = ref_to_qry_aa.get(pos)
        in_range = 1 <= pos <= len(ref)
        return MappingRow(
            ref_pos=pos,
            ref_aa=ref[pos - 1] if in_range else "?",
            qry_pos=ref_to_qry_pos.get(pos),
            qry_aa=observed,
            expected=expected,
            ok=observed is not None and observed == expected,
        )

    return [row_for(pos) for pos in sorted(sites)]
def print_table(rows: List[MappingRow]) -> None:
header = f"{'ref_pos':>7} {'ref':>3} {'qry_pos':>7} {'qry':>3} {'exp':>3} {'ok':>4}"
print(header)
print("-" * len(header))
for r in rows:
qp = "-" if r.qry_pos is None else str(r.qry_pos)
qa = "-" if r.qry_aa is None else r.qry_aa
ok = "YES" if r.ok else "NO"
print(f"{r.ref_pos:>7} {r.ref_aa:>3} {qp:>7} {qa:>3} {r.expected:>3} {ok:>4}")
def print_windows(aln_ref: str, aln_mid: str, aln_qry: str,
                  ref_to_col: Dict[int, int],
                  site_positions: List[int],
                  window: int = 30) -> None:
    """Print an alignment snippet centred on each requested reference position.

    Args:
        aln_ref: Gapped reference row of the alignment.
        aln_mid: Match row of the alignment.
        aln_qry: Gapped query row of the alignment.
        ref_to_col: Reference position -> alignment column index.
        site_positions: Reference positions to show; positions missing from
            ref_to_col are skipped silently.
        window: Number of columns shown on each side of the site column.
    """
    total_cols = len(aln_ref)
    for p in site_positions:
        col = ref_to_col.get(p)
        if col is None:
            continue
        start = max(0, col - window)
        end = min(total_cols, col + window + 1)
        print(f"\nAlignment window around ref position {p} (cols {start}-{end-1}):")
        # All three prefixes have the same width so that each match symbol
        # sits directly between the residues it describes.
        print("ref :", aln_ref[start:end])
        print("    :", aln_mid[start:end])
        print("qry :", aln_qry[start:end])
def parse_sites(site_args: List[str]) -> Dict[int, str]:
    """Parse site constraints of the form "POS:AA" into a position -> residue map.

    Args:
        site_args: Items such as "65:T". POS is a 1-based integer position
            and AA is a single letter (case-insensitive).

    Returns:
        Mapping from position to the upper-cased expected residue. When a
        position is given more than once, the last occurrence wins.

    Raises:
        ValueError: If an item has no ":", the position is not an integer
            or is below 1, or the residue is not exactly one letter.
    """
    out: Dict[int, str] = {}
    for item in site_args:
        if ":" not in item:
            raise ValueError(f"Bad site spec '{item}'. Use e.g. 65:T")
        p_str, aa = item.split(":", 1)
        try:
            p = int(p_str)
        except ValueError:
            # Re-raise with the offending item so the user can find it.
            raise ValueError(f"Bad position in site spec '{item}'. Use e.g. 65:T") from None
        if p < 1:
            # Positions are 1-based; 0 or negative values can never match.
            raise ValueError(f"Position must be >= 1 in '{item}'")
        aa = aa.strip().upper()
        if len(aa) != 1 or not aa.isalpha():
            raise ValueError(f"Expected single-letter AA in '{item}'")
        out[p] = aa
    return out
def main():
    """Command-line entry point: align a query to the reference and report required sites.

    Reads the reference (from --ref, otherwise the built-in eGFP sequence)
    and the query (from --qry, otherwise --query), aligns them, prints the
    site table, an alignment window around each site, optionally the full
    alignment, and finally whether every required site matched.

    Raises:
        SystemExit: If no query is supplied, or if the reference or query
            is empty after cleaning.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--ref", type=str, default="", help="Reference FASTA (optional). If omitted, uses built-in eGFP sequence.")
    ap.add_argument("--qry", type=str, default="", help="Query FASTA (optional).")
    ap.add_argument("--query", type=str, default="", help="Query sequence string (optional).")
    ap.add_argument("--sites", nargs="*", default=[], help="Site constraints like 65:T 66:Y ... (optional).")
    ap.add_argument("--open_gap", type=float, default=-10.0)
    ap.add_argument("--extend_gap", type=float, default=-0.5)
    ap.add_argument("--no_full", action="store_true", help="Do not print full alignment (only windows + table).")
    ap.add_argument("--window", type=int, default=30, help="Half-window size for alignment snippets.")
    args = ap.parse_args()

    ref = clean_seq(read_fasta_one(args.ref)) if args.ref else DEFAULT_EGFP
    if args.qry:
        # A FASTA file takes precedence over an inline --query string.
        qry = clean_seq(read_fasta_one(args.qry))
    elif args.query:
        qry = clean_seq(args.query)
    else:
        raise SystemExit("Provide a query via --qry (FASTA) or --query (sequence string).")

    # Cleaning can strip an input down to nothing (e.g. a header-only FASTA);
    # stop with a clear message instead of attempting to align nothing.
    if not ref:
        raise SystemExit("Reference sequence is empty after cleaning.")
    if not qry:
        raise SystemExit("Query sequence is empty after cleaning.")

    sites = parse_sites(args.sites) if args.sites else DEFAULT_SITES

    aln_ref, aln_mid, aln_qry, score = best_global_alignment(ref, qry, open_gap=args.open_gap, extend_gap=args.extend_gap)
    ref_to_qry_pos, ref_to_qry_aa, ref_to_col = build_ref_to_qry_map(aln_ref, aln_qry)
    rows = summarize_sites(ref, sites, ref_to_qry_pos, ref_to_qry_aa)

    print(f"Reference length: {len(ref)} aa")
    print(f"Query length: {len(qry)} aa")
    print(f"Alignment length: {len(aln_ref)} columns")
    print(f"Alignment score: {score:.2f}")
    print("\nRequired-site mapping (reference -> query):")
    print_table(rows)
    print_windows(aln_ref, aln_mid, aln_qry, ref_to_col, sorted(sites.keys()), window=args.window)

    if not args.no_full:
        print("\nFull alignment:")
        # Equal-width prefixes keep the match row lined up with both sequences.
        print("ref :", aln_ref)
        print("    :", aln_mid)
        print("qry :", aln_qry)

    passed = all(r.ok for r in rows)
    print(f"\nPASS = {passed}")
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()

Xet Storage Details

Size:
10 kB
·
Xet hash:
c2e1de1931616f038db78f06426042f633a103ba36e0385d4bd671185d144bbf

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.