"""Phase 1 — Sequence parsing, validation, and translation. Accepts a raw nucleotide string or a FASTA file/string and produces a :class:`SequenceRecord` carrying the validated DNA, the translated amino-acid sequence, and an index map between nucleotide coordinates and AA coordinates. The translation step uses the NCBI standard genetic code (Table 1) via Biopython. Ambiguous IUPAC codons are rejected at translation time because PLM scoring requires a fully determined wild-type protein. """ from __future__ import annotations import io import logging import re from dataclasses import dataclass, field from pathlib import Path from typing import Dict, List, Optional, Tuple, Union from Bio import SeqIO from Bio.Data.CodonTable import TranslationError from Bio.Seq import Seq logger = logging.getLogger(__name__) # IUPAC nucleotide alphabet (unambiguous + ambiguous). IUPAC_NT = set("ACGTURYSWKMBDHVN") UNAMBIGUOUS_NT = set("ACGT") START_CODONS = {"ATG"} STOP_CODONS = {"TAA", "TAG", "TGA"} # Canonical 20 amino acids + stop. Used when the input is a protein sequence # rather than a CDS (raw paste or .faa/.pep file). AA_ALPHABET = set("ACDEFGHIKLMNPQRSTVWY") AA_WITH_STOP = AA_ALPHABET | {"*"} class SequenceValidationError(ValueError): """Raised when an input DNA sequence violates a biological precondition. Carries the 0-indexed nucleotide position of the offending character (when known), so the UI can map it back to a line/column in the user's paste. """ def __init__( self, message: str, *, nt_position: Optional[int] = None, code: Optional[str] = None, ) -> None: super().__init__(message) self.nt_position = nt_position self.code = code @dataclass class SequenceRecord: """Container for a validated CDS and its translation. Attributes: identifier: FASTA header or user-supplied label. dna: Validated coding DNA sequence (5' -> 3', uppercase). protein: Translated amino-acid sequence, stop codon stripped. has_stop: Whether the source CDS ended in a canonical stop codon. nt_to_aa: Mapping from 0-indexed nucleotide position to 0-indexed AA position. Stop-codon nucleotides map to ``None``. aa_to_nt: Mapping from 0-indexed AA position to the triplet of 0-indexed nucleotide positions encoding it. """ identifier: str dna: str protein: str has_stop: bool nt_to_aa: Dict[int, Optional[int]] = field(default_factory=dict) aa_to_nt: Dict[int, Tuple[int, int, int]] = field(default_factory=dict) def __len__(self) -> int: return len(self.protein) def _clean(seq: str) -> str: """Strip whitespace, normalize uracil to thymine, uppercase.""" return re.sub(r"\s+", "", seq).upper().replace("U", "T") def _clean_protein(seq: str) -> str: """Strip whitespace, drop trailing stop symbol, uppercase. No U→T mapping.""" cleaned = re.sub(r"\s+", "", seq).upper().rstrip("*") return cleaned def looks_like_protein(seq: str) -> bool: """Heuristic: does ``seq`` read like a protein rather than DNA? A short DNA string (e.g. "ATGCAT") is fully contained in the AA alphabet, so we can't use AA-only characters as the discriminator. Instead we check whether *any* character is non-nucleic-acid — the presence of E, F, I, L, P, Q, etc. is a clear AA signal. If every character is in {A,C,G,T,U,N}, treat it as DNA. """ cleaned = re.sub(r"\s+", "", seq).upper().rstrip("*") if not cleaned: return False nt_safe = set("ACGTUN") return not set(cleaned).issubset(nt_safe) def validate_dna(seq: str, *, require_start: bool = True, require_stop: bool = True) -> str: """Validate that ``seq`` is a plausible CDS. Checks: * Every character is a valid IUPAC nucleotide symbol. * Length is a positive multiple of 3. * The first codon is ATG (start) — disable with ``require_start=False``. * The final codon is a stop codon — disable with ``require_stop=False``. Returns the cleaned, uppercase DNA string. Raises :class:`SequenceValidationError` on any failure. """ cleaned = _clean(seq) if not cleaned: raise SequenceValidationError("Empty sequence after cleaning.") bad = set(cleaned) - IUPAC_NT if bad: # Find the first offending character so we can point at it. bad_pos = next((idx for idx, ch in enumerate(cleaned) if ch in bad), None) raise SequenceValidationError( f"Sequence contains non-IUPAC characters: {sorted(bad)!r}", nt_position=bad_pos, code="invalid_char", ) if len(cleaned) % 3 != 0: raise SequenceValidationError( f"Sequence length ({len(cleaned)} nt) is not a multiple of 3; " "cannot translate as a CDS.", nt_position=len(cleaned) - (len(cleaned) % 3), code="bad_length", ) if require_start and cleaned[:3] not in START_CODONS: raise SequenceValidationError( f"Sequence does not begin with a start codon (saw {cleaned[:3]!r}).", nt_position=0, code="no_start", ) if require_stop and cleaned[-3:] not in STOP_CODONS: raise SequenceValidationError( f"Sequence does not end with a stop codon (saw {cleaned[-3:]!r}).", nt_position=len(cleaned) - 3, code="no_stop", ) # Reject *internal* stop codons. Strip ALL trailing stops iteratively — # many expression cassettes use a double or triple TAA/TGA for fail-safe # termination, and those tandem stops are not "premature". body = cleaned while len(body) >= 6 and body[-3:] in STOP_CODONS: body = body[:-3] for i in range(0, len(body), 3): if body[i : i + 3] in STOP_CODONS: raise SequenceValidationError( f"Premature stop codon at nucleotide position {i} ({body[i:i+3]}).", nt_position=i, code="premature_stop", ) return cleaned def _build_index_maps( n_nt: int, has_stop: bool ) -> Tuple[Dict[int, Optional[int]], Dict[int, Tuple[int, int, int]]]: """Construct bidirectional nt<->aa coordinate maps for a CDS of length ``n_nt``.""" n_codons = n_nt // 3 n_aa = n_codons - 1 if has_stop else n_codons nt_to_aa: Dict[int, Optional[int]] = {} aa_to_nt: Dict[int, Tuple[int, int, int]] = {} for aa_idx in range(n_aa): triplet = (aa_idx * 3, aa_idx * 3 + 1, aa_idx * 3 + 2) aa_to_nt[aa_idx] = triplet for nt in triplet: nt_to_aa[nt] = aa_idx if has_stop: for nt in range(n_aa * 3, n_nt): nt_to_aa[nt] = None return nt_to_aa, aa_to_nt def translate_dna( dna: str, identifier: str = "query", *, require_start: bool = True, require_stop: bool = True, ) -> SequenceRecord: """Validate and translate ``dna`` into a :class:`SequenceRecord`. Uses NCBI Table 1 (standard) genetic code via Biopython. Ambiguous codons (containing IUPAC ambiguity codes like N or R) raise :class:`SequenceValidationError` because PLM scoring requires a determined protein sequence. """ cleaned = validate_dna(dna, require_start=require_start, require_stop=require_stop) # Treat a terminal stop as a stop regardless of the require_stop flag: when # the user pastes a CDS but disables stop checking, we still want a clean # protein (no trailing "*") for downstream scoring. has_stop = cleaned[-3:] in STOP_CODONS try: protein = str(Seq(cleaned).translate(table=1, to_stop=has_stop, cds=False)) except TranslationError as exc: raise SequenceValidationError(f"Translation failed: {exc}") from exc if any(ch not in UNAMBIGUOUS_NT for ch in cleaned): raise SequenceValidationError( "Sequence contains ambiguous IUPAC codes; PLM scoring requires a " "fully determined wild-type. Resolve ambiguity before scoring." ) nt_to_aa, aa_to_nt = _build_index_maps(len(cleaned), has_stop=has_stop) logger.info( "Translated %s: %d nt -> %d aa (terminal stop=%s)", identifier, len(cleaned), len(protein), has_stop, ) return SequenceRecord( identifier=identifier, dna=cleaned, protein=protein, has_stop=has_stop, nt_to_aa=nt_to_aa, aa_to_nt=aa_to_nt, ) # Common selection-marker name patterns. When a plasmid annotation contains # any of these substrings we deprioritize that CDS as a default pick, because # the user almost certainly wants to evolve their gene of interest rather # than the antibiotic resistance gene that came with the cloning vector. _SELECTION_MARKERS = { "aph", "neo", "npt", "kan", # kanamycin / G418 "bla", "amp", # ampicillin "cat", "cmr", "chlor", # chloramphenicol "tet", # tetracycline "hph", "hyg", # hygromycin "ble", "zeo", # zeocin / phleomycin "pac", "puro", # puromycin "sm", "sptr", "aada", # streptomycin / spectinomycin } def _feature_label(feature) -> str: """Best-effort human-readable label for a SeqRecord feature.""" for key in ("gene", "product", "label", "note"): vals = feature.qualifiers.get(key) if vals: return str(vals[0]) return feature.type or "unnamed" def _is_selection_marker(label: str) -> bool: low = label.lower() return any(marker in low for marker in _SELECTION_MARKERS) def list_cds_features(path: Path) -> List[Tuple[str, int]]: """Return [(label, length_nt), ...] for every CDS feature in a structured file. Useful for the desktop launcher: it can present these as a chooser so the user explicitly picks the gene to evolve (rather than having the engine guess the wrong CDS, e.g. the kanamycin resistance marker). """ suffix = path.suffix.lower() fmt = _structured_format_for(suffix) record = next(SeqIO.parse(str(path), fmt), None) if record is None: return [] out: List[Tuple[str, int]] = [] for f in record.features: if f.type == "CDS": length = int(f.location.end) - int(f.location.start) out.append((_feature_label(f), length)) return out def _structured_format_for(suffix: str) -> str: return { ".dna": "snapgene", ".gb": "genbank", ".gbk": "genbank", ".genbank": "genbank", ".embl": "embl", }[suffix] def _parse_structured_file( path: Path, *, cds_feature: Optional[str] = None ) -> SequenceRecord: """Parse a GenBank, SnapGene, or EMBL file and pick one CDS to evolve. Selection rules (first match wins): 1. If ``cds_feature`` is given, find the CDS whose label contains that substring (case-insensitive). Raises if no match — better to fail loudly than silently evolve the wrong gene. 2. Otherwise pick the longest CDS that is NOT a recognized selection marker (kanR/ampR/cmR/etc.). Selection markers are skipped first so the user's gene of interest wins by default in a typical plasmid. 3. If every CDS looks like a selection marker, fall back to the longest. 4. If no CDS features exist at all, scan the six reading frames for the longest ORF (ATG...stop). """ fmt = _structured_format_for(path.suffix.lower()) record = next(SeqIO.parse(str(path), fmt), None) if record is None: raise SequenceValidationError(f"No records found in {path}.") cds_features = [f for f in record.features if f.type == "CDS"] if cds_features and cds_feature: wanted = cds_feature.lower() match = next( (f for f in cds_features if wanted in _feature_label(f).lower()), None, ) if match is None: available = ", ".join(_feature_label(f) for f in cds_features) raise SequenceValidationError( f"No CDS feature matching {cds_feature!r} in {path.name}. " f"Available CDS: {available}" ) chosen = match elif cds_features: non_marker = [f for f in cds_features if not _is_selection_marker(_feature_label(f))] pool = non_marker if non_marker else cds_features chosen = max(pool, key=lambda f: int(f.location.end) - int(f.location.start)) else: return _longest_orf_record(str(record.seq), identifier=record.id or path.stem) cds_dna = str(chosen.extract(record.seq)).upper().replace("U", "T") label = _feature_label(chosen) logger.info( "Selected CDS feature '%s' (%d nt) from %s.", label, len(cds_dna), path.name ) return translate_dna( cds_dna, identifier=str(label), require_start=False, require_stop=False ) @dataclass class OrfCandidate: """One ATG-bounded ORF discovered by a 6-frame scan of raw DNA.""" label: str # ORF_1, ORF_2, … assigned in descending length order. frame: int # +1/+2/+3 for forward strand, -1/-2/-3 for reverse. start_nt: int # 0-indexed start on the FORWARD strand. end_nt: int # 0-indexed exclusive end on the forward strand. dna: str # The ORF nucleotide sequence (ATG…stop). protein: str # Translated protein (no trailing *). def to_summary(self) -> Dict[str, object]: return { "label": self.label, "length_nt": len(self.dna), "frame": self.frame, "protein_length": len(self.protein), } def find_orfs_in_dna(dna: str, min_aa: int = 30) -> List[OrfCandidate]: """Scan all 6 frames for ATG…stop ORFs at least ``min_aa`` residues long. Returns ORFs sorted by descending protein length, labeled ``ORF_1``, ``ORF_2``, … This is the fallback when the user pastes a plasmid sequence (or any long DNA with multiple stop codons across frames) — we can't treat the whole thing as one CDS, but we can offer the ORFs we find as a chooser, the same way we do for annotated `.gb` files. """ cleaned = _clean(dna) if not cleaned or not set(cleaned).issubset(UNAMBIGUOUS_NT): return [] forward = cleaned reverse = str(Seq(cleaned).reverse_complement()) raw: List[OrfCandidate] = [] for strand_idx, seq in enumerate((forward, reverse)): for offset in range(3): i = offset while i + 3 <= len(seq): if seq[i : i + 3] != "ATG": i += 3 continue j = i while j + 3 <= len(seq): if seq[j : j + 3] in STOP_CODONS: orf_dna = seq[i : j + 3] orf_protein = str(Seq(orf_dna).translate(table=1, to_stop=True)) if len(orf_protein) >= min_aa: if strand_idx == 0: start = i end = j + 3 frame = (i % 3) + 1 else: end = len(cleaned) - i start = len(cleaned) - (j + 3) frame = -((i % 3) + 1) raw.append( OrfCandidate( label="", frame=frame, start_nt=start, end_nt=end, dna=orf_dna, protein=orf_protein, ) ) i = j + 3 break j += 3 else: break raw.sort(key=lambda o: -len(o.protein)) for idx, orf in enumerate(raw): orf.label = f"ORF_{idx + 1}" return raw def _longest_orf_record(dna: str, identifier: str) -> SequenceRecord: """Scan all 6 reading frames and translate the longest ATG-bounded ORF.""" cleaned = _clean(dna) if any(ch not in UNAMBIGUOUS_NT for ch in cleaned): raise SequenceValidationError( "Sequence contains ambiguous nucleotides; cannot find a clean ORF." ) candidates: List[str] = [] rev = str(Seq(cleaned).reverse_complement()) for strand_seq in (cleaned, rev): for offset in range(3): i = offset while i + 3 <= len(strand_seq): if strand_seq[i : i + 3] == "ATG": j = i while j + 3 <= len(strand_seq): codon = strand_seq[j : j + 3] if codon in STOP_CODONS: candidates.append(strand_seq[i : j + 3]) i = j + 3 break j += 3 else: i = j # unterminated ORF — skip. else: i += 3 if not candidates: raise SequenceValidationError( "Could not find any ORF (ATG...stop) in any of the six reading frames." ) best = max(candidates, key=len) logger.info( "No CDS feature available; selected longest ORF (%d nt) by 6-frame scan.", len(best), ) return translate_dna( best, identifier=identifier, require_start=True, require_stop=True ) def parse_protein(seq: str, identifier: str = "query") -> SequenceRecord: """Validate a raw amino-acid sequence and build a :class:`SequenceRecord`. No DNA is attached — Phase 4 will reverse-translate from ``.protein``. """ cleaned = _clean_protein(seq) if not cleaned: raise SequenceValidationError("Empty protein sequence.") bad = set(cleaned) - AA_ALPHABET if bad: raise SequenceValidationError( f"Protein contains non-canonical residues: {sorted(bad)!r}. " "Only the 20 standard amino acids (ACDEFGHIKLMNPQRSTVWY) are supported." ) logger.info("Parsed protein %s: %d aa.", identifier, len(cleaned)) return SequenceRecord( identifier=identifier, dna="", protein=cleaned, has_stop=False, nt_to_aa={}, aa_to_nt={}, ) def _looks_like_path(s: Union[str, Path]) -> bool: """Cheap pre-check: should we even attempt a filesystem stat on ``s``? A long pasted sequence (thousands of chars, possibly multi-line) would blow up :meth:`pathlib.Path.is_file` with ``OSError: File name too long`` on macOS (PATH_MAX = 1024, NAME_MAX = 255). Reject anything that obviously isn't a path before we hit the OS. """ if isinstance(s, Path): return True if not isinstance(s, str): return False if not s or len(s) > 1024: return False if "\n" in s or "\r" in s: return False try: return Path(s).is_file() except OSError: return False def parse_input( source: Union[str, Path], *, require_start: bool = True, require_stop: bool = True, cds_feature: Optional[str] = None, ) -> SequenceRecord: """Parse a sequence from a path, FASTA text, raw DNA, or raw protein. Detection order: 1. If ``source`` is a Path or names an existing file, read it (FASTA-aware, falls back to raw text). File contents are then auto-classified as DNA vs protein by :func:`looks_like_protein`. 2. Else if the string starts with ``>``, parse as inline FASTA and classify the first record's sequence. 3. Otherwise treat as a raw sequence and classify. """ # ------------------------------------------------------------- file input if _looks_like_path(source): path = Path(source) suffix = path.suffix.lower() # Structured formats — SnapGene/GenBank/EMBL — go through their own # parser because they're either binary (.dna) or have rich metadata # that text-only handling would munge. if suffix in {".dna", ".gb", ".gbk", ".genbank", ".embl"}: return _parse_structured_file(path, cds_feature=cds_feature) text = path.read_text() # FASTA-aware: if the file looks like FASTA, parse it; else treat # contents as one raw sequence whose identifier defaults to the stem. if text.lstrip().startswith(">"): record = next(SeqIO.parse(io.StringIO(text), "fasta"), None) if record is None: raise SequenceValidationError(f"No FASTA records found in {path}.") raw_seq = str(record.seq) ident = record.id or path.stem else: raw_seq = text ident = path.stem if looks_like_protein(raw_seq): return parse_protein(raw_seq, identifier=ident) return translate_dna( raw_seq, identifier=ident, require_start=require_start, require_stop=require_stop, ) # ----------------------------------------------------------- string input text = str(source) if text.lstrip().startswith(">"): record = next(SeqIO.parse(io.StringIO(text), "fasta"), None) if record is None: raise SequenceValidationError("Inline FASTA contained no records.") raw_seq = str(record.seq) ident = record.id or "query" if looks_like_protein(raw_seq): return parse_protein(raw_seq, identifier=ident) return translate_dna( raw_seq, identifier=ident, require_start=require_start, require_stop=require_stop, ) if looks_like_protein(text): return parse_protein(text, identifier="query") return translate_dna( text, identifier="query", require_start=require_start, require_stop=require_stop ) def list_codons(dna: str) -> List[str]: """Split a CDS into its in-frame codons (no validation; call ``validate_dna`` first).""" return [dna[i : i + 3] for i in range(0, len(dna), 3)]