syntheogenesis / dee /core /sequence.py
WINTER4000's picture
Initial deploy · DEE Flask app via Docker SDK
060bb47 verified
"""Phase 1 — Sequence parsing, validation, and translation.
Accepts a raw nucleotide string or a FASTA file/string and produces a
:class:`SequenceRecord` carrying the validated DNA, the translated amino-acid
sequence, and an index map between nucleotide coordinates and AA coordinates.
The translation step uses the NCBI standard genetic code (Table 1) via
Biopython. Ambiguous IUPAC codons are rejected at translation time because
PLM scoring requires a fully determined wild-type protein.
"""
from __future__ import annotations
import io
import logging
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
from Bio import SeqIO
from Bio.Data.CodonTable import TranslationError
from Bio.Seq import Seq
logger = logging.getLogger(__name__)
# IUPAC nucleotide alphabet (unambiguous + ambiguous).
IUPAC_NT = set("ACGTURYSWKMBDHVN")
UNAMBIGUOUS_NT = set("ACGT")
START_CODONS = {"ATG"}
STOP_CODONS = {"TAA", "TAG", "TGA"}
# Canonical 20 amino acids + stop. Used when the input is a protein sequence
# rather than a CDS (raw paste or .faa/.pep file).
AA_ALPHABET = set("ACDEFGHIKLMNPQRSTVWY")
AA_WITH_STOP = AA_ALPHABET | {"*"}
class SequenceValidationError(ValueError):
"""Raised when an input DNA sequence violates a biological precondition.
Carries the 0-indexed nucleotide position of the offending character (when
known), so the UI can map it back to a line/column in the user's paste.
"""
def __init__(
self,
message: str,
*,
nt_position: Optional[int] = None,
code: Optional[str] = None,
) -> None:
super().__init__(message)
self.nt_position = nt_position
self.code = code
@dataclass
class SequenceRecord:
"""Container for a validated CDS and its translation.
Attributes:
identifier: FASTA header or user-supplied label.
dna: Validated coding DNA sequence (5' -> 3', uppercase).
protein: Translated amino-acid sequence, stop codon stripped.
has_stop: Whether the source CDS ended in a canonical stop codon.
nt_to_aa: Mapping from 0-indexed nucleotide position to 0-indexed AA
position. Stop-codon nucleotides map to ``None``.
aa_to_nt: Mapping from 0-indexed AA position to the triplet of
0-indexed nucleotide positions encoding it.
"""
identifier: str
dna: str
protein: str
has_stop: bool
nt_to_aa: Dict[int, Optional[int]] = field(default_factory=dict)
aa_to_nt: Dict[int, Tuple[int, int, int]] = field(default_factory=dict)
def __len__(self) -> int:
return len(self.protein)
def _clean(seq: str) -> str:
"""Strip whitespace, normalize uracil to thymine, uppercase."""
return re.sub(r"\s+", "", seq).upper().replace("U", "T")
def _clean_protein(seq: str) -> str:
"""Strip whitespace, drop trailing stop symbol, uppercase. No U→T mapping."""
cleaned = re.sub(r"\s+", "", seq).upper().rstrip("*")
return cleaned
def looks_like_protein(seq: str) -> bool:
"""Heuristic: does ``seq`` read like a protein rather than DNA?
A short DNA string (e.g. "ATGCAT") is fully contained in the AA alphabet,
so we can't use AA-only characters as the discriminator. Instead we check
whether *any* character is non-nucleic-acid — the presence of E, F, I, L,
P, Q, etc. is a clear AA signal. If every character is in {A,C,G,T,U,N},
treat it as DNA.
"""
cleaned = re.sub(r"\s+", "", seq).upper().rstrip("*")
if not cleaned:
return False
nt_safe = set("ACGTUN")
return not set(cleaned).issubset(nt_safe)
def validate_dna(seq: str, *, require_start: bool = True, require_stop: bool = True) -> str:
"""Validate that ``seq`` is a plausible CDS.
Checks:
* Every character is a valid IUPAC nucleotide symbol.
* Length is a positive multiple of 3.
* The first codon is ATG (start) — disable with ``require_start=False``.
* The final codon is a stop codon — disable with ``require_stop=False``.
Returns the cleaned, uppercase DNA string. Raises
:class:`SequenceValidationError` on any failure.
"""
cleaned = _clean(seq)
if not cleaned:
raise SequenceValidationError("Empty sequence after cleaning.")
bad = set(cleaned) - IUPAC_NT
if bad:
# Find the first offending character so we can point at it.
bad_pos = next((idx for idx, ch in enumerate(cleaned) if ch in bad), None)
raise SequenceValidationError(
f"Sequence contains non-IUPAC characters: {sorted(bad)!r}",
nt_position=bad_pos,
code="invalid_char",
)
if len(cleaned) % 3 != 0:
raise SequenceValidationError(
f"Sequence length ({len(cleaned)} nt) is not a multiple of 3; "
"cannot translate as a CDS.",
nt_position=len(cleaned) - (len(cleaned) % 3),
code="bad_length",
)
if require_start and cleaned[:3] not in START_CODONS:
raise SequenceValidationError(
f"Sequence does not begin with a start codon (saw {cleaned[:3]!r}).",
nt_position=0,
code="no_start",
)
if require_stop and cleaned[-3:] not in STOP_CODONS:
raise SequenceValidationError(
f"Sequence does not end with a stop codon (saw {cleaned[-3:]!r}).",
nt_position=len(cleaned) - 3,
code="no_stop",
)
# Reject *internal* stop codons. Strip ALL trailing stops iteratively —
# many expression cassettes use a double or triple TAA/TGA for fail-safe
# termination, and those tandem stops are not "premature".
body = cleaned
while len(body) >= 6 and body[-3:] in STOP_CODONS:
body = body[:-3]
for i in range(0, len(body), 3):
if body[i : i + 3] in STOP_CODONS:
raise SequenceValidationError(
f"Premature stop codon at nucleotide position {i} ({body[i:i+3]}).",
nt_position=i,
code="premature_stop",
)
return cleaned
def _build_index_maps(
n_nt: int, has_stop: bool
) -> Tuple[Dict[int, Optional[int]], Dict[int, Tuple[int, int, int]]]:
"""Construct bidirectional nt<->aa coordinate maps for a CDS of length ``n_nt``."""
n_codons = n_nt // 3
n_aa = n_codons - 1 if has_stop else n_codons
nt_to_aa: Dict[int, Optional[int]] = {}
aa_to_nt: Dict[int, Tuple[int, int, int]] = {}
for aa_idx in range(n_aa):
triplet = (aa_idx * 3, aa_idx * 3 + 1, aa_idx * 3 + 2)
aa_to_nt[aa_idx] = triplet
for nt in triplet:
nt_to_aa[nt] = aa_idx
if has_stop:
for nt in range(n_aa * 3, n_nt):
nt_to_aa[nt] = None
return nt_to_aa, aa_to_nt
def translate_dna(
dna: str,
identifier: str = "query",
*,
require_start: bool = True,
require_stop: bool = True,
) -> SequenceRecord:
"""Validate and translate ``dna`` into a :class:`SequenceRecord`.
Uses NCBI Table 1 (standard) genetic code via Biopython. Ambiguous codons
(containing IUPAC ambiguity codes like N or R) raise
:class:`SequenceValidationError` because PLM scoring requires a determined
protein sequence.
"""
cleaned = validate_dna(dna, require_start=require_start, require_stop=require_stop)
# Treat a terminal stop as a stop regardless of the require_stop flag: when
# the user pastes a CDS but disables stop checking, we still want a clean
# protein (no trailing "*") for downstream scoring.
has_stop = cleaned[-3:] in STOP_CODONS
try:
protein = str(Seq(cleaned).translate(table=1, to_stop=has_stop, cds=False))
except TranslationError as exc:
raise SequenceValidationError(f"Translation failed: {exc}") from exc
if any(ch not in UNAMBIGUOUS_NT for ch in cleaned):
raise SequenceValidationError(
"Sequence contains ambiguous IUPAC codes; PLM scoring requires a "
"fully determined wild-type. Resolve ambiguity before scoring."
)
nt_to_aa, aa_to_nt = _build_index_maps(len(cleaned), has_stop=has_stop)
logger.info(
"Translated %s: %d nt -> %d aa (terminal stop=%s)",
identifier,
len(cleaned),
len(protein),
has_stop,
)
return SequenceRecord(
identifier=identifier,
dna=cleaned,
protein=protein,
has_stop=has_stop,
nt_to_aa=nt_to_aa,
aa_to_nt=aa_to_nt,
)
# Common selection-marker name patterns. When a plasmid annotation contains
# any of these substrings we deprioritize that CDS as a default pick, because
# the user almost certainly wants to evolve their gene of interest rather
# than the antibiotic resistance gene that came with the cloning vector.
_SELECTION_MARKERS = {
"aph", "neo", "npt", "kan", # kanamycin / G418
"bla", "amp", # ampicillin
"cat", "cmr", "chlor", # chloramphenicol
"tet", # tetracycline
"hph", "hyg", # hygromycin
"ble", "zeo", # zeocin / phleomycin
"pac", "puro", # puromycin
"sm", "sptr", "aada", # streptomycin / spectinomycin
}
def _feature_label(feature) -> str:
"""Best-effort human-readable label for a SeqRecord feature."""
for key in ("gene", "product", "label", "note"):
vals = feature.qualifiers.get(key)
if vals:
return str(vals[0])
return feature.type or "unnamed"
def _is_selection_marker(label: str) -> bool:
low = label.lower()
return any(marker in low for marker in _SELECTION_MARKERS)
def list_cds_features(path: Path) -> List[Tuple[str, int]]:
"""Return [(label, length_nt), ...] for every CDS feature in a structured file.
Useful for the desktop launcher: it can present these as a chooser so the
user explicitly picks the gene to evolve (rather than having the engine
guess the wrong CDS, e.g. the kanamycin resistance marker).
"""
suffix = path.suffix.lower()
fmt = _structured_format_for(suffix)
record = next(SeqIO.parse(str(path), fmt), None)
if record is None:
return []
out: List[Tuple[str, int]] = []
for f in record.features:
if f.type == "CDS":
length = int(f.location.end) - int(f.location.start)
out.append((_feature_label(f), length))
return out
def _structured_format_for(suffix: str) -> str:
return {
".dna": "snapgene",
".gb": "genbank",
".gbk": "genbank",
".genbank": "genbank",
".embl": "embl",
}[suffix]
def _parse_structured_file(
path: Path, *, cds_feature: Optional[str] = None
) -> SequenceRecord:
"""Parse a GenBank, SnapGene, or EMBL file and pick one CDS to evolve.
Selection rules (first match wins):
1. If ``cds_feature`` is given, find the CDS whose label contains that
substring (case-insensitive). Raises if no match — better to fail
loudly than silently evolve the wrong gene.
2. Otherwise pick the longest CDS that is NOT a recognized selection
marker (kanR/ampR/cmR/etc.). Selection markers are skipped first so
the user's gene of interest wins by default in a typical plasmid.
3. If every CDS looks like a selection marker, fall back to the longest.
4. If no CDS features exist at all, scan the six reading frames for the
longest ORF (ATG...stop).
"""
fmt = _structured_format_for(path.suffix.lower())
record = next(SeqIO.parse(str(path), fmt), None)
if record is None:
raise SequenceValidationError(f"No records found in {path}.")
cds_features = [f for f in record.features if f.type == "CDS"]
if cds_features and cds_feature:
wanted = cds_feature.lower()
match = next(
(f for f in cds_features if wanted in _feature_label(f).lower()),
None,
)
if match is None:
available = ", ".join(_feature_label(f) for f in cds_features)
raise SequenceValidationError(
f"No CDS feature matching {cds_feature!r} in {path.name}. "
f"Available CDS: {available}"
)
chosen = match
elif cds_features:
non_marker = [f for f in cds_features if not _is_selection_marker(_feature_label(f))]
pool = non_marker if non_marker else cds_features
chosen = max(pool, key=lambda f: int(f.location.end) - int(f.location.start))
else:
return _longest_orf_record(str(record.seq), identifier=record.id or path.stem)
cds_dna = str(chosen.extract(record.seq)).upper().replace("U", "T")
label = _feature_label(chosen)
logger.info(
"Selected CDS feature '%s' (%d nt) from %s.", label, len(cds_dna), path.name
)
return translate_dna(
cds_dna, identifier=str(label), require_start=False, require_stop=False
)
@dataclass
class OrfCandidate:
"""One ATG-bounded ORF discovered by a 6-frame scan of raw DNA."""
label: str # ORF_1, ORF_2, … assigned in descending length order.
frame: int # +1/+2/+3 for forward strand, -1/-2/-3 for reverse.
start_nt: int # 0-indexed start on the FORWARD strand.
end_nt: int # 0-indexed exclusive end on the forward strand.
dna: str # The ORF nucleotide sequence (ATG…stop).
protein: str # Translated protein (no trailing *).
def to_summary(self) -> Dict[str, object]:
return {
"label": self.label,
"length_nt": len(self.dna),
"frame": self.frame,
"protein_length": len(self.protein),
}
def find_orfs_in_dna(dna: str, min_aa: int = 30) -> List[OrfCandidate]:
"""Scan all 6 frames for ATG…stop ORFs at least ``min_aa`` residues long.
Returns ORFs sorted by descending protein length, labeled ``ORF_1``,
``ORF_2``, … This is the fallback when the user pastes a plasmid sequence
(or any long DNA with multiple stop codons across frames) — we can't
treat the whole thing as one CDS, but we can offer the ORFs we find as
a chooser, the same way we do for annotated `.gb` files.
"""
cleaned = _clean(dna)
if not cleaned or not set(cleaned).issubset(UNAMBIGUOUS_NT):
return []
forward = cleaned
reverse = str(Seq(cleaned).reverse_complement())
raw: List[OrfCandidate] = []
for strand_idx, seq in enumerate((forward, reverse)):
for offset in range(3):
i = offset
while i + 3 <= len(seq):
if seq[i : i + 3] != "ATG":
i += 3
continue
j = i
while j + 3 <= len(seq):
if seq[j : j + 3] in STOP_CODONS:
orf_dna = seq[i : j + 3]
orf_protein = str(Seq(orf_dna).translate(table=1, to_stop=True))
if len(orf_protein) >= min_aa:
if strand_idx == 0:
start = i
end = j + 3
frame = (i % 3) + 1
else:
end = len(cleaned) - i
start = len(cleaned) - (j + 3)
frame = -((i % 3) + 1)
raw.append(
OrfCandidate(
label="",
frame=frame,
start_nt=start,
end_nt=end,
dna=orf_dna,
protein=orf_protein,
)
)
i = j + 3
break
j += 3
else:
break
raw.sort(key=lambda o: -len(o.protein))
for idx, orf in enumerate(raw):
orf.label = f"ORF_{idx + 1}"
return raw
def _longest_orf_record(dna: str, identifier: str) -> SequenceRecord:
"""Scan all 6 reading frames and translate the longest ATG-bounded ORF."""
cleaned = _clean(dna)
if any(ch not in UNAMBIGUOUS_NT for ch in cleaned):
raise SequenceValidationError(
"Sequence contains ambiguous nucleotides; cannot find a clean ORF."
)
candidates: List[str] = []
rev = str(Seq(cleaned).reverse_complement())
for strand_seq in (cleaned, rev):
for offset in range(3):
i = offset
while i + 3 <= len(strand_seq):
if strand_seq[i : i + 3] == "ATG":
j = i
while j + 3 <= len(strand_seq):
codon = strand_seq[j : j + 3]
if codon in STOP_CODONS:
candidates.append(strand_seq[i : j + 3])
i = j + 3
break
j += 3
else:
i = j # unterminated ORF — skip.
else:
i += 3
if not candidates:
raise SequenceValidationError(
"Could not find any ORF (ATG...stop) in any of the six reading frames."
)
best = max(candidates, key=len)
logger.info(
"No CDS feature available; selected longest ORF (%d nt) by 6-frame scan.",
len(best),
)
return translate_dna(
best, identifier=identifier, require_start=True, require_stop=True
)
def parse_protein(seq: str, identifier: str = "query") -> SequenceRecord:
"""Validate a raw amino-acid sequence and build a :class:`SequenceRecord`.
No DNA is attached — Phase 4 will reverse-translate from ``.protein``.
"""
cleaned = _clean_protein(seq)
if not cleaned:
raise SequenceValidationError("Empty protein sequence.")
bad = set(cleaned) - AA_ALPHABET
if bad:
raise SequenceValidationError(
f"Protein contains non-canonical residues: {sorted(bad)!r}. "
"Only the 20 standard amino acids (ACDEFGHIKLMNPQRSTVWY) are supported."
)
logger.info("Parsed protein %s: %d aa.", identifier, len(cleaned))
return SequenceRecord(
identifier=identifier,
dna="",
protein=cleaned,
has_stop=False,
nt_to_aa={},
aa_to_nt={},
)
def _looks_like_path(s: Union[str, Path]) -> bool:
"""Cheap pre-check: should we even attempt a filesystem stat on ``s``?
A long pasted sequence (thousands of chars, possibly multi-line) would
blow up :meth:`pathlib.Path.is_file` with ``OSError: File name too long``
on macOS (PATH_MAX = 1024, NAME_MAX = 255). Reject anything that obviously
isn't a path before we hit the OS.
"""
if isinstance(s, Path):
return True
if not isinstance(s, str):
return False
if not s or len(s) > 1024:
return False
if "\n" in s or "\r" in s:
return False
try:
return Path(s).is_file()
except OSError:
return False
def parse_input(
source: Union[str, Path],
*,
require_start: bool = True,
require_stop: bool = True,
cds_feature: Optional[str] = None,
) -> SequenceRecord:
"""Parse a sequence from a path, FASTA text, raw DNA, or raw protein.
Detection order:
1. If ``source`` is a Path or names an existing file, read it (FASTA-aware,
falls back to raw text). File contents are then auto-classified as DNA
vs protein by :func:`looks_like_protein`.
2. Else if the string starts with ``>``, parse as inline FASTA and
classify the first record's sequence.
3. Otherwise treat as a raw sequence and classify.
"""
# ------------------------------------------------------------- file input
if _looks_like_path(source):
path = Path(source)
suffix = path.suffix.lower()
# Structured formats — SnapGene/GenBank/EMBL — go through their own
# parser because they're either binary (.dna) or have rich metadata
# that text-only handling would munge.
if suffix in {".dna", ".gb", ".gbk", ".genbank", ".embl"}:
return _parse_structured_file(path, cds_feature=cds_feature)
text = path.read_text()
# FASTA-aware: if the file looks like FASTA, parse it; else treat
# contents as one raw sequence whose identifier defaults to the stem.
if text.lstrip().startswith(">"):
record = next(SeqIO.parse(io.StringIO(text), "fasta"), None)
if record is None:
raise SequenceValidationError(f"No FASTA records found in {path}.")
raw_seq = str(record.seq)
ident = record.id or path.stem
else:
raw_seq = text
ident = path.stem
if looks_like_protein(raw_seq):
return parse_protein(raw_seq, identifier=ident)
return translate_dna(
raw_seq,
identifier=ident,
require_start=require_start,
require_stop=require_stop,
)
# ----------------------------------------------------------- string input
text = str(source)
if text.lstrip().startswith(">"):
record = next(SeqIO.parse(io.StringIO(text), "fasta"), None)
if record is None:
raise SequenceValidationError("Inline FASTA contained no records.")
raw_seq = str(record.seq)
ident = record.id or "query"
if looks_like_protein(raw_seq):
return parse_protein(raw_seq, identifier=ident)
return translate_dna(
raw_seq,
identifier=ident,
require_start=require_start,
require_stop=require_stop,
)
if looks_like_protein(text):
return parse_protein(text, identifier="query")
return translate_dna(
text, identifier="query", require_start=require_start, require_stop=require_stop
)
def list_codons(dna: str) -> List[str]:
"""Split a CDS into its in-frame codons (no validation; call ``validate_dna`` first)."""
return [dna[i : i + 3] for i in range(0, len(dna), 3)]