Spaces:
Running
Running
| """Phase 1 — Sequence parsing, validation, and translation. | |
| Accepts a raw nucleotide string or a FASTA file/string and produces a | |
| :class:`SequenceRecord` carrying the validated DNA, the translated amino-acid | |
| sequence, and an index map between nucleotide coordinates and AA coordinates. | |
| The translation step uses the NCBI standard genetic code (Table 1) via | |
| Biopython. Ambiguous IUPAC codons are rejected at translation time because | |
| PLM scoring requires a fully determined wild-type protein. | |
| """ | |
| from __future__ import annotations | |
| import io | |
| import logging | |
| import re | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple, Union | |
| from Bio import SeqIO | |
| from Bio.Data.CodonTable import TranslationError | |
| from Bio.Seq import Seq | |
# Module-level logger; handler and level configuration is left to the caller.
logger = logging.getLogger(__name__)

# Nucleotide alphabets: the full IUPAC symbol set (unambiguous bases, U, and
# the ambiguity codes) and the strictly determined DNA subset.
IUPAC_NT = {*"ACGTURYSWKMBDHVN"}
UNAMBIGUOUS_NT = {*"ACGT"}

# Codons recognised as the start / stop of a CDS by the validators below.
START_CODONS = {"ATG"}
STOP_CODONS = {"TGA", "TAG", "TAA"}

# The 20 canonical amino acids, plus a variant that also admits the stop
# symbol. Used when the input is a protein sequence rather than a CDS (raw
# paste or .faa/.pep file).
AA_ALPHABET = {*"ACDEFGHIKLMNPQRSTVWY"}
AA_WITH_STOP = AA_ALPHABET.union("*")
class SequenceValidationError(ValueError):
    """Signal that an input sequence failed a biological precondition.

    Besides the human-readable message, two optional attributes are exposed:

    Attributes:
        nt_position: 0-indexed nucleotide offset of the offending character,
            when known, so a UI can map the failure back to a line/column in
            the user's paste.
        code: Short machine-readable tag for the failure (for example
            ``"invalid_char"`` or ``"bad_length"``), or ``None``.
    """

    def __init__(
        self,
        message: str,
        *,
        nt_position: Optional[int] = None,
        code: Optional[str] = None,
    ) -> None:
        super().__init__(message)
        # Both extras are keyword-only and default to None when unknown.
        self.code = code
        self.nt_position = nt_position
@dataclass
class SequenceRecord:
    """Container for a validated CDS and its translation.

    The class must be a dataclass: it declares ``field(default_factory=...)``
    attributes and is instantiated with keyword arguments elsewhere in this
    module, neither of which works on a plain class.

    Attributes:
        identifier: FASTA header or user-supplied label.
        dna: Validated coding DNA sequence (5' -> 3', uppercase). Empty when
            the record was built directly from a protein sequence.
        protein: Translated amino-acid sequence, stop codon stripped.
        has_stop: Whether the source CDS ended in a canonical stop codon.
        nt_to_aa: Mapping from 0-indexed nucleotide position to 0-indexed AA
            position. Stop-codon nucleotides map to ``None``.
        aa_to_nt: Mapping from 0-indexed AA position to the triplet of
            0-indexed nucleotide positions encoding it.
    """

    identifier: str
    dna: str
    protein: str
    has_stop: bool
    # default_factory gives every record its own dict instead of a shared one.
    nt_to_aa: Dict[int, Optional[int]] = field(default_factory=dict)
    aa_to_nt: Dict[int, Tuple[int, int, int]] = field(default_factory=dict)

    def __len__(self) -> int:
        """Return the protein length in residues."""
        return len(self.protein)
def _clean(seq: str) -> str:
    """Normalize a raw nucleotide string.

    All whitespace is removed, the text is uppercased, and every ``U`` is
    rewritten to ``T`` so RNA-style input is handled as DNA.
    """
    without_whitespace = re.sub(r"\s+", "", seq)
    uppercased = without_whitespace.upper()
    return uppercased.replace("U", "T")
def _clean_protein(seq: str) -> str:
    """Normalize a raw amino-acid string.

    Whitespace is removed, the text is uppercased, and any run of trailing
    ``*`` stop symbols is dropped. Unlike the nucleotide cleaner there is no
    U -> T substitution.
    """
    compact = re.sub(r"\s+", "", seq)
    return compact.upper().rstrip("*")
def looks_like_protein(seq: str) -> bool:
    """Guess whether ``seq`` is a protein rather than a nucleic-acid string.

    Short DNA such as "ATGCAT" is made entirely of letters that are also valid
    amino acids, so membership in the AA alphabet cannot discriminate. The
    test is therefore inverted: after stripping whitespace and trailing ``*``,
    the input counts as protein as soon as one character falls outside
    {A, C, G, T, U, N} (E, F, I, L, P, Q, ... are clear AA signals). Input
    that is empty after cleaning is not considered protein.
    """
    residue_chars = re.sub(r"\s+", "", seq).upper().rstrip("*")
    if residue_chars == "":
        return False
    nucleic = frozenset("ACGTUN")
    return any(symbol not in nucleic for symbol in residue_chars)
def validate_dna(seq: str, *, require_start: bool = True, require_stop: bool = True) -> str:
    """Check that ``seq`` is a plausible coding sequence and return it cleaned.

    The checks run in this order, and the first failure raises:

    1. The cleaned sequence is non-empty.
    2. Every character is an IUPAC nucleotide symbol.
    3. The length is a multiple of 3.
    4. The first codon is ATG (skipped when ``require_start`` is False).
    5. The last codon is a stop codon (skipped when ``require_stop`` is False).
    6. No stop codon occurs before the terminal run of stop codons.

    Args:
        seq: Raw nucleotide text; whitespace, case and U/T are normalized.
        require_start: Enforce an ATG start codon.
        require_stop: Enforce a terminal stop codon.

    Returns:
        The cleaned, uppercase DNA string.

    Raises:
        SequenceValidationError: On any failed check. ``code`` is one of
            ``invalid_char``, ``bad_length``, ``no_start``, ``no_stop`` or
            ``premature_stop`` (unset for the empty-sequence case).
    """
    cleaned = _clean(seq)
    if cleaned == "":
        raise SequenceValidationError("Empty sequence after cleaning.")

    illegal = set(cleaned).difference(IUPAC_NT)
    if illegal:
        # Locate the first offender so the caller can point at it.
        first_bad = None
        for offset, symbol in enumerate(cleaned):
            if symbol in illegal:
                first_bad = offset
                break
        raise SequenceValidationError(
            f"Sequence contains non-IUPAC characters: {sorted(illegal)!r}",
            nt_position=first_bad,
            code="invalid_char",
        )

    total = len(cleaned)
    overhang = total % 3
    if overhang:
        raise SequenceValidationError(
            f"Sequence length ({total} nt) is not a multiple of 3; "
            "cannot translate as a CDS.",
            nt_position=total - overhang,
            code="bad_length",
        )

    first_codon = cleaned[:3]
    if require_start and first_codon not in START_CODONS:
        raise SequenceValidationError(
            f"Sequence does not begin with a start codon (saw {first_codon!r}).",
            nt_position=0,
            code="no_start",
        )

    last_codon = cleaned[-3:]
    if require_stop and last_codon not in STOP_CODONS:
        raise SequenceValidationError(
            f"Sequence does not end with a stop codon (saw {last_codon!r}).",
            nt_position=total - 3,
            code="no_stop",
        )

    # Walk back over every trailing stop codon: expression cassettes often
    # end in a double or triple stop for fail-safe termination, and those
    # tandem stops are not "premature". At least one codon is always kept.
    coding_end = total
    while coding_end >= 6 and cleaned[coding_end - 3 : coding_end] in STOP_CODONS:
        coding_end -= 3

    # Anything that is a stop codon inside the remaining body is internal.
    for codon_start in range(0, coding_end, 3):
        codon = cleaned[codon_start : codon_start + 3]
        if codon in STOP_CODONS:
            raise SequenceValidationError(
                f"Premature stop codon at nucleotide position {codon_start} ({codon}).",
                nt_position=codon_start,
                code="premature_stop",
            )
    return cleaned
def _build_index_maps(
    n_nt: int, has_stop: bool
) -> Tuple[Dict[int, Optional[int]], Dict[int, Tuple[int, int, int]]]:
    """Build the nt -> aa and aa -> nt coordinate maps for a CDS of ``n_nt`` bases.

    Args:
        n_nt: Length of the CDS in nucleotides.
        has_stop: When True the final codon is treated as a stop codon: it
            gets no amino-acid index and its nucleotides map to ``None``.

    Returns:
        ``(nt_to_aa, aa_to_nt)`` where ``aa_to_nt[k]`` is the triplet of
        0-indexed nucleotide positions of residue ``k`` and ``nt_to_aa`` is
        the inverse lookup.
    """
    total_codons = n_nt // 3
    coding_codons = total_codons - 1 if has_stop else total_codons

    aa_to_nt: Dict[int, Tuple[int, int, int]] = {
        residue: (3 * residue, 3 * residue + 1, 3 * residue + 2)
        for residue in range(coding_codons)
    }
    nt_to_aa: Dict[int, Optional[int]] = {
        position: residue
        for residue, codon_positions in aa_to_nt.items()
        for position in codon_positions
    }
    if has_stop:
        # Everything after the last coding codon belongs to the stop codon.
        nt_to_aa.update(dict.fromkeys(range(3 * coding_codons, n_nt)))
    return nt_to_aa, aa_to_nt
def translate_dna(
    dna: str,
    identifier: str = "query",
    *,
    require_start: bool = True,
    require_stop: bool = True,
) -> SequenceRecord:
    """Validate and translate ``dna`` into a :class:`SequenceRecord`.

    Uses NCBI Table 1 (standard) genetic code via Biopython. Ambiguous codons
    (containing IUPAC ambiguity codes like N or R) are rejected because PLM
    scoring requires a determined protein sequence.

    Args:
        dna: Raw coding sequence; cleaned and validated by ``validate_dna``.
        identifier: Label stored on the returned record.
        require_start: Forwarded to ``validate_dna``.
        require_stop: Forwarded to ``validate_dna``.

    Returns:
        A record whose ``protein`` has no trailing stop symbol and whose
        index maps cover exactly the residues in ``protein``; nucleotides of
        every terminal stop codon map to ``None``.

    Raises:
        SequenceValidationError: If validation fails, the sequence contains
            ambiguity codes (``code="ambiguous_nt"``), or Biopython cannot
            translate it.
    """
    cleaned = validate_dna(dna, require_start=require_start, require_stop=require_stop)

    # Reject ambiguity before translating so the error can name the first
    # offending position instead of being raised after wasted work.
    ambiguous_pos = next(
        (idx for idx, ch in enumerate(cleaned) if ch not in UNAMBIGUOUS_NT),
        None,
    )
    if ambiguous_pos is not None:
        raise SequenceValidationError(
            "Sequence contains ambiguous IUPAC codes; PLM scoring requires a "
            "fully determined wild-type. Resolve ambiguity before scoring.",
            nt_position=ambiguous_pos,
            code="ambiguous_nt",
        )

    # Treat a terminal stop as a stop regardless of the require_stop flag: when
    # the user pastes a CDS but disables stop checking, we still want a clean
    # protein (no trailing "*") for downstream scoring.
    has_stop = cleaned[-3:] in STOP_CODONS
    try:
        protein = str(Seq(cleaned).translate(table=1, to_stop=has_stop, cds=False))
    except TranslationError as exc:
        raise SequenceValidationError(f"Translation failed: {exc}") from exc

    nt_to_aa, aa_to_nt = _build_index_maps(len(cleaned), has_stop=has_stop)

    # validate_dna accepts tandem terminal stops (e.g. ...TAATAA) and
    # to_stop=True ends the protein at the first of them, whereas the index
    # helper assumes exactly one stop codon. Trim the maps so they never refer
    # to residues that are not present in ``protein``.
    n_aa = len(protein)
    if len(aa_to_nt) > n_aa:
        aa_to_nt = {aa: triplet for aa, triplet in aa_to_nt.items() if aa < n_aa}
        nt_to_aa = {
            nt: (aa if aa is not None and aa < n_aa else None)
            for nt, aa in nt_to_aa.items()
        }

    logger.info(
        "Translated %s: %d nt -> %d aa (terminal stop=%s)",
        identifier,
        len(cleaned),
        len(protein),
        has_stop,
    )
    return SequenceRecord(
        identifier=identifier,
        dna=cleaned,
        protein=protein,
        has_stop=has_stop,
        nt_to_aa=nt_to_aa,
        aa_to_nt=aa_to_nt,
    )
# Name fragments that typically identify a selection marker. A CDS whose
# label contains any of these substrings is deprioritized as the default
# pick: in a plasmid the user almost certainly wants to evolve their gene of
# interest, not the antibiotic-resistance gene that shipped with the cloning
# vector. Fragments are grouped by antibiotic family for readability only;
# the resulting value is one flat set.
_SELECTION_MARKERS = {
    fragment
    for family in (
        ("aph", "neo", "npt", "kan"),  # kanamycin / G418
        ("bla", "amp"),  # ampicillin
        ("cat", "cmr", "chlor"),  # chloramphenicol
        ("tet",),  # tetracycline
        ("hph", "hyg"),  # hygromycin
        ("ble", "zeo"),  # zeocin / phleomycin
        ("pac", "puro"),  # puromycin
        ("sm", "sptr", "aada"),  # streptomycin / spectinomycin
    )
    for fragment in family
}
def _feature_label(feature) -> str:
    """Return a best-effort human-readable label for a sequence feature.

    The qualifiers ``gene``, ``product``, ``label`` and ``note`` are consulted
    in that order; the first element of the first non-empty one wins. With no
    usable qualifier the feature type is returned, or ``"unnamed"`` when that
    is empty too.
    """
    qualifiers = feature.qualifiers
    lookups = (qualifiers.get(name) for name in ("gene", "product", "label", "note"))
    first_hit = next((values for values in lookups if values), None)
    if first_hit is not None:
        return str(first_hit[0])
    return feature.type or "unnamed"
def _is_selection_marker(label: str) -> bool:
    """Return True when ``label`` contains a known selection-marker fragment.

    Matching is a case-insensitive substring test against
    ``_SELECTION_MARKERS``.
    """
    lowered = label.lower()
    for fragment in _SELECTION_MARKERS:
        if fragment in lowered:
            return True
    return False
def list_cds_features(path: Path) -> List[Tuple[str, int]]:
    """Return ``[(label, length_nt), ...]`` for every CDS feature in a structured file.

    Useful for the desktop launcher: it can present these as a chooser so the
    user explicitly picks the gene to evolve (rather than having the engine
    guess the wrong CDS, e.g. the kanamycin resistance marker).

    Only the first record of the file is inspected.

    Args:
        path: Location of a SnapGene/GenBank/EMBL file; a plain string is
            accepted as well.

    Returns:
        One ``(label, length_nt)`` pair per CDS feature in file order, or an
        empty list when the file holds no record.

    Raises:
        KeyError: If the file suffix is not one of the structured formats
            known to ``_structured_format_for``.
    """
    path = Path(path)
    fmt = _structured_format_for(path.suffix.lower())
    record = next(SeqIO.parse(str(path), fmt), None)
    if record is None:
        return []
    # len(location) sums the parts of a compound (join) location, so a CDS
    # that is spliced or wraps the origin of a circular plasmid reports its
    # true coding length. ``end - start`` would report the outer span, which
    # for an origin-spanning feature is nearly the whole plasmid.
    return [
        (_feature_label(feature), len(feature.location))
        for feature in record.features
        if feature.type == "CDS"
    ]
def _structured_format_for(suffix: str) -> str:
    """Map a lowercase file suffix to the matching Biopython format name.

    Raises:
        KeyError: If ``suffix`` is not a recognised structured-file suffix.
    """
    format_by_suffix = {".dna": "snapgene", ".embl": "embl"}
    # All three GenBank spellings share one parser.
    format_by_suffix.update(dict.fromkeys((".gb", ".gbk", ".genbank"), "genbank"))
    return format_by_suffix[suffix]
def _parse_structured_file(
    path: Path, *, cds_feature: Optional[str] = None
) -> SequenceRecord:
    """Parse a GenBank, SnapGene, or EMBL file and pick one CDS to evolve.

    Only the first record of the file is used. Selection rules (first match
    wins):

    1. If ``cds_feature`` is given and the record has CDS features, find the
       CDS whose label contains that substring (case-insensitive). Raises if
       no match — better to fail loudly than silently evolve the wrong gene.
    2. Otherwise pick the longest CDS that is NOT a recognized selection
       marker (kanR/ampR/cmR/etc.). Selection markers are skipped first so
       the user's gene of interest wins by default in a typical plasmid.
    3. If every CDS looks like a selection marker, fall back to the longest.
    4. If no CDS features exist at all, scan the six reading frames for the
       longest ORF (ATG...stop). A warning is logged when ``cds_feature`` was
       requested, because the request cannot be honoured in that case.

    "Longest" is measured with ``len(location)``, i.e. the summed length of
    the location's parts, so spliced or origin-spanning features are ranked
    by their real coding length rather than by their outer span.

    Args:
        path: Structured sequence file to read.
        cds_feature: Optional case-insensitive substring of the wanted CDS
            label.

    Returns:
        The translated record for the chosen CDS (or longest ORF).

    Raises:
        SequenceValidationError: If the file has no records, no CDS matches
            ``cds_feature``, or the chosen sequence fails validation.
        KeyError: If the file suffix is not a known structured format.
    """
    fmt = _structured_format_for(path.suffix.lower())
    record = next(SeqIO.parse(str(path), fmt), None)
    if record is None:
        raise SequenceValidationError(f"No records found in {path}.")

    cds_features = [f for f in record.features if f.type == "CDS"]
    if not cds_features:
        if cds_feature:
            # Keep the ORF fallback, but do not drop the request silently.
            logger.warning(
                "Requested CDS feature %r but %s has no CDS annotations; "
                "falling back to a 6-frame ORF scan.",
                cds_feature,
                path.name,
            )
        return _longest_orf_record(str(record.seq), identifier=record.id or path.stem)

    if cds_feature:
        wanted = cds_feature.lower()
        chosen = next(
            (f for f in cds_features if wanted in _feature_label(f).lower()),
            None,
        )
        if chosen is None:
            available = ", ".join(_feature_label(f) for f in cds_features)
            raise SequenceValidationError(
                f"No CDS feature matching {cds_feature!r} in {path.name}. "
                f"Available CDS: {available}"
            )
    else:
        non_marker = [f for f in cds_features if not _is_selection_marker(_feature_label(f))]
        pool = non_marker if non_marker else cds_features
        # len(location) handles join()/origin-wrapping locations correctly;
        # end - start would rank a wrap-around CDS by almost the full plasmid.
        chosen = max(pool, key=lambda f: len(f.location))

    cds_dna = str(chosen.extract(record.seq)).upper().replace("U", "T")
    label = _feature_label(chosen)
    logger.info(
        "Selected CDS feature '%s' (%d nt) from %s.", label, len(cds_dna), path.name
    )
    # Annotated CDS features are trusted as-is: start/stop codons are not
    # enforced here.
    return translate_dna(
        cds_dna, identifier=str(label), require_start=False, require_stop=False
    )
@dataclass
class OrfCandidate:
    """One ATG-bounded ORF discovered by a 6-frame scan of raw DNA.

    Declared as a (mutable) dataclass: instances are created with keyword
    arguments and their ``label`` is assigned after sorting.
    """

    label: str  # ORF_1, ORF_2, … assigned in descending length order.
    frame: int  # +1/+2/+3 for forward strand, -1/-2/-3 for reverse.
    start_nt: int  # 0-indexed start on the FORWARD strand.
    end_nt: int  # 0-indexed exclusive end on the forward strand.
    dna: str  # The ORF nucleotide sequence (ATG…stop).
    protein: str  # Translated protein (no trailing *).

    def to_summary(self) -> Dict[str, object]:
        """Return a compact dict (label, lengths, frame) describing this ORF."""
        return {
            "label": self.label,
            "length_nt": len(self.dna),
            "frame": self.frame,
            "protein_length": len(self.protein),
        }
def find_orfs_in_dna(dna: str, min_aa: int = 30) -> List[OrfCandidate]:
    """Collect ATG…stop ORFs of at least ``min_aa`` residues from all six frames.

    This is the fallback for pasted plasmids (or any long DNA with stop
    codons scattered across frames): the whole input cannot be treated as a
    single CDS, but the ORFs found can be offered as a chooser, the same way
    annotated `.gb` files are handled.

    Within one frame the scan is non-overlapping: after an ORF ends, the
    search resumes at the codon following its stop. An ATG with no in-frame
    stop downstream ends the scan of that frame.

    Args:
        dna: Raw nucleotide text; cleaned before scanning.
        min_aa: Minimum protein length (residues) for an ORF to be reported.

    Returns:
        Candidates sorted by descending protein length and labeled ``ORF_1``,
        ``ORF_2``, … Empty when the cleaned input is empty or contains
        anything other than A/C/G/T.
    """
    cleaned = _clean(dna)
    if not cleaned or not set(cleaned).issubset(UNAMBIGUOUS_NT):
        return []

    total = len(cleaned)
    strands = (cleaned, str(Seq(cleaned).reverse_complement()))
    found: List[OrfCandidate] = []

    for on_reverse, strand in enumerate(strands):
        for phase in range(3):
            cursor = phase
            while cursor + 3 <= total:
                if strand[cursor : cursor + 3] != "ATG":
                    cursor += 3
                    continue

                # First in-frame stop codon at or after the ATG, if any.
                stop_at = next(
                    (
                        k
                        for k in range(cursor, total - 2, 3)
                        if strand[k : k + 3] in STOP_CODONS
                    ),
                    None,
                )
                if stop_at is None:
                    # Unterminated: no later ATG in this frame can close either.
                    break

                orf_end = stop_at + 3
                orf_dna = strand[cursor:orf_end]
                orf_protein = str(Seq(orf_dna).translate(table=1, to_stop=True))
                if len(orf_protein) >= min_aa:
                    if on_reverse:
                        # Mirror reverse-strand offsets onto forward coordinates.
                        fwd_start = total - orf_end
                        fwd_end = total - cursor
                        frame = -(phase + 1)
                    else:
                        fwd_start = cursor
                        fwd_end = orf_end
                        frame = phase + 1
                    found.append(
                        OrfCandidate(
                            label="",
                            frame=frame,
                            start_nt=fwd_start,
                            end_nt=fwd_end,
                            dna=orf_dna,
                            protein=orf_protein,
                        )
                    )
                cursor = orf_end

    # Stable sort: ties keep discovery order.
    found.sort(key=lambda orf: len(orf.protein), reverse=True)
    for rank, orf in enumerate(found, start=1):
        orf.label = f"ORF_{rank}"
    return found
def _longest_orf_record(dna: str, identifier: str) -> SequenceRecord:
    """Translate the longest ATG…stop ORF found in any of the six frames.

    Args:
        dna: Raw nucleotide text; cleaned before scanning.
        identifier: Label stored on the returned record.

    Returns:
        The record produced by translating the longest ORF with both start
        and stop codons required. Ties go to the ORF discovered first
        (forward strand before reverse, lower frame offset first).

    Raises:
        SequenceValidationError: If the sequence contains anything other than
            A/C/G/T, or no terminated ORF exists in any frame.
    """
    cleaned = _clean(dna)
    if not set(cleaned) <= UNAMBIGUOUS_NT:
        raise SequenceValidationError(
            "Sequence contains ambiguous nucleotides; cannot find a clean ORF."
        )

    orfs: List[str] = []
    strands = (cleaned, str(Seq(cleaned).reverse_complement()))
    for strand in strands:
        last_codon_start = len(strand) - 3
        for phase in range(3):
            start = phase
            while start <= last_codon_start:
                if strand[start : start + 3] != "ATG":
                    start += 3
                    continue
                stop_at = next(
                    (
                        k
                        for k in range(start, last_codon_start + 1, 3)
                        if strand[k : k + 3] in STOP_CODONS
                    ),
                    None,
                )
                if stop_at is None:
                    # Unterminated ORF: nothing further in this frame can close.
                    break
                orfs.append(strand[start : stop_at + 3])
                start = stop_at + 3

    if not orfs:
        raise SequenceValidationError(
            "Could not find any ORF (ATG...stop) in any of the six reading frames."
        )

    best = max(orfs, key=len)
    logger.info(
        "No CDS feature available; selected longest ORF (%d nt) by 6-frame scan.",
        len(best),
    )
    return translate_dna(
        best, identifier=identifier, require_start=True, require_stop=True
    )
def parse_protein(seq: str, identifier: str = "query") -> SequenceRecord:
    """Validate a raw amino-acid sequence and wrap it in a :class:`SequenceRecord`.

    The record carries no DNA and empty index maps — Phase 4 will
    reverse-translate from ``.protein``.

    Args:
        seq: Raw protein text; whitespace and trailing ``*`` are stripped.
        identifier: Label stored on the returned record.

    Raises:
        SequenceValidationError: If the cleaned sequence is empty or contains
            a character outside the 20 standard amino acids.
    """
    residues = _clean_protein(seq)
    if residues == "":
        raise SequenceValidationError("Empty protein sequence.")

    unexpected = sorted(set(residues).difference(AA_ALPHABET))
    if unexpected:
        raise SequenceValidationError(
            f"Protein contains non-canonical residues: {unexpected!r}. "
            "Only the 20 standard amino acids (ACDEFGHIKLMNPQRSTVWY) are supported."
        )

    logger.info("Parsed protein %s: %d aa.", identifier, len(residues))
    return SequenceRecord(
        identifier=identifier,
        dna="",
        protein=residues,
        has_stop=False,
        nt_to_aa={},
        aa_to_nt={},
    )
def _looks_like_path(s: Union[str, Path]) -> bool:
    """Decide whether ``s`` should be treated as a file path.

    ``Path`` objects are always accepted. Strings are pre-screened before any
    filesystem call, because a long pasted sequence (thousands of characters,
    possibly multi-line) would make :meth:`pathlib.Path.is_file` fail with
    ``OSError: File name too long`` on macOS (PATH_MAX = 1024,
    NAME_MAX = 255). A string qualifies only if it is non-empty, at most 1024
    characters, single-line, and names an existing file.
    """
    if isinstance(s, Path):
        return True

    plausible = (
        isinstance(s, str)
        and 0 < len(s) <= 1024
        and "\n" not in s
        and "\r" not in s
    )
    if not plausible:
        return False

    try:
        return Path(s).is_file()
    except OSError:
        return False
def parse_input(
    source: Union[str, Path],
    *,
    require_start: bool = True,
    require_stop: bool = True,
    cds_feature: Optional[str] = None,
) -> SequenceRecord:
    """Parse a sequence from a path, FASTA text, raw DNA, or raw protein.

    Detection order:

    1. If ``source`` is a Path or names an existing file, read it. Structured
       formats (SnapGene/GenBank/EMBL) go to their dedicated parser; other
       files are read as text, FASTA-aware with a raw-text fallback.
    2. Else if the string starts with ``>``, parse it as inline FASTA and use
       the first record.
    3. Otherwise treat the string as one raw sequence.

    In every text case the sequence is classified by
    :func:`looks_like_protein` and handed to the protein or DNA parser.

    Args:
        source: File path, FASTA text, or raw sequence.
        require_start: Forwarded to the DNA translator.
        require_stop: Forwarded to the DNA translator.
        cds_feature: Label substring used only for structured files.

    Raises:
        SequenceValidationError: If FASTA input has no records or the
            sequence fails validation.
    """

    def parse_sequence(raw_seq: str, ident: str) -> SequenceRecord:
        # Shared tail of all three text paths: classify, then parse.
        if looks_like_protein(raw_seq):
            return parse_protein(raw_seq, identifier=ident)
        return translate_dna(
            raw_seq,
            identifier=ident,
            require_start=require_start,
            require_stop=require_stop,
        )

    def first_fasta_record(fasta_text: str):
        return next(SeqIO.parse(io.StringIO(fasta_text), "fasta"), None)

    # ------------------------------------------------------------- file input
    if _looks_like_path(source):
        path = Path(source)
        # Structured formats are either binary (.dna) or carry rich metadata
        # that text-only handling would munge, so they bypass the text path.
        if path.suffix.lower() in {".dna", ".gb", ".gbk", ".genbank", ".embl"}:
            return _parse_structured_file(path, cds_feature=cds_feature)

        text = path.read_text()
        if not text.lstrip().startswith(">"):
            # Not FASTA: the whole file is one raw sequence named by its stem.
            return parse_sequence(text, path.stem)

        record = first_fasta_record(text)
        if record is None:
            raise SequenceValidationError(f"No FASTA records found in {path}.")
        return parse_sequence(str(record.seq), record.id or path.stem)

    # ----------------------------------------------------------- string input
    text = str(source)
    if not text.lstrip().startswith(">"):
        return parse_sequence(text, "query")

    record = first_fasta_record(text)
    if record is None:
        raise SequenceValidationError("Inline FASTA contained no records.")
    return parse_sequence(str(record.seq), record.id or "query")
def list_codons(dna: str) -> List[str]:
    """Return the consecutive 3-character chunks of ``dna``, starting at offset 0.

    No validation is performed (call ``validate_dna`` first); if the length is
    not a multiple of 3 the final chunk is shorter than a codon.
    """
    codon_starts = range(0, len(dna), 3)
    return [dna[start : start + 3] for start in codon_starts]