Spaces:
Running
Running
| """Phase 4 — Reverse translation, host-specific codon optimization, and | |
| restriction-site scrubbing for downstream Golden Gate / synthesis assembly. | |
| For each amino acid we choose the codon with the highest empirical frequency | |
| in the chosen host. The user can swap in a different host's table without | |
| touching the rest of the pipeline. | |
| After the initial reverse-translation we scan the DNA — and its reverse | |
| complement, because Type IIS enzymes like BsaI/BsmBI cut on either strand — | |
| for forbidden sites (BsaI ``GGTCTC``, BsmBI ``CGTCTC``, NotI ``GCGGCCGC``). | |
| When a hit is found we introduce a *synonymous* mutation in an overlapping | |
| codon (a different codon for the same amino acid) so the encoded protein is | |
| unchanged but the restriction enzyme recognition pattern is destroyed. This | |
| is the standard practice for synthesis-vendor DNA prep: silent edits to keep | |
| cloning enzymes from chewing the insert apart at unintended positions. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import re | |
| from dataclasses import dataclass | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import pandas as pd | |
| from Bio.Seq import Seq | |
| from Bio.SeqUtils import MeltingTemp as _mt | |
| from dee.optimizer.search import Variant, apply_variant | |
| logger = logging.getLogger(__name__) | |
| # --------------------------------------------------------------------- tables | |
# E. coli K12 highly-expressed codon preferences. Values are codon-frequency
# weights per amino acid (sourced from the standard Kazusa / HEG-based tables
# commonly used in synthesis-vendor optimizers). The pipeline only needs the
# *relative* ranking, so exact percentages are unnecessary.
#
# Layout: one-letter amino-acid code ("*" = stop) -> {codon: weight}.
# Entry order only matters for ties: ``_ranked_codons`` sorts stably, so the
# first-listed codon wins among codons with equal weight.
_E_COLI_K12: Dict[str, Dict[str, float]] = {
    "A": {"GCG": 0.36, "GCC": 0.27, "GCA": 0.21, "GCT": 0.16},
    "R": {"CGT": 0.38, "CGC": 0.40, "CGG": 0.10, "CGA": 0.06, "AGA": 0.04, "AGG": 0.02},
    "N": {"AAC": 0.55, "AAT": 0.45},
    "D": {"GAT": 0.63, "GAC": 0.37},
    "C": {"TGC": 0.55, "TGT": 0.45},
    "Q": {"CAG": 0.65, "CAA": 0.35},
    "E": {"GAA": 0.68, "GAG": 0.32},
    "G": {"GGT": 0.34, "GGC": 0.40, "GGA": 0.11, "GGG": 0.15},
    "H": {"CAT": 0.57, "CAC": 0.43},
    "I": {"ATT": 0.51, "ATC": 0.42, "ATA": 0.07},
    "L": {"CTG": 0.50, "CTC": 0.10, "CTT": 0.10, "CTA": 0.04, "TTA": 0.13, "TTG": 0.13},
    "K": {"AAA": 0.74, "AAG": 0.26},
    "M": {"ATG": 1.00},
    "F": {"TTT": 0.58, "TTC": 0.42},
    "P": {"CCG": 0.52, "CCA": 0.19, "CCT": 0.16, "CCC": 0.12},
    "S": {"AGC": 0.28, "TCT": 0.17, "TCC": 0.15, "TCA": 0.14, "AGT": 0.15, "TCG": 0.14},
    "T": {"ACC": 0.44, "ACG": 0.27, "ACA": 0.13, "ACT": 0.17},
    "W": {"TGG": 1.00},
    "Y": {"TAT": 0.59, "TAC": 0.41},
    "V": {"GTG": 0.37, "GTT": 0.28, "GTC": 0.20, "GTA": 0.15},
    "*": {"TAA": 0.61, "TAG": 0.09, "TGA": 0.30},
}
# S. cerevisiae (yeast) codon-frequency weights.
# Layout: one-letter amino-acid code ("*" = stop) -> {codon: weight}.
# Only the relative ranking within each amino acid is used by the pipeline;
# entry order breaks ties because the ranking sort is stable.
_S_CEREVISIAE: Dict[str, Dict[str, float]] = {
    "A": {"GCT": 0.38, "GCC": 0.22, "GCA": 0.29, "GCG": 0.11},
    "R": {"AGA": 0.48, "AGG": 0.21, "CGT": 0.14, "CGC": 0.06, "CGA": 0.07, "CGG": 0.04},
    "N": {"AAT": 0.59, "AAC": 0.41},
    "D": {"GAT": 0.65, "GAC": 0.35},
    "C": {"TGT": 0.63, "TGC": 0.37},
    "Q": {"CAA": 0.69, "CAG": 0.31},
    "E": {"GAA": 0.71, "GAG": 0.29},
    "G": {"GGT": 0.47, "GGC": 0.19, "GGA": 0.22, "GGG": 0.12},
    "H": {"CAT": 0.64, "CAC": 0.36},
    "I": {"ATT": 0.46, "ATC": 0.26, "ATA": 0.27},
    "L": {"TTA": 0.28, "TTG": 0.29, "CTT": 0.13, "CTC": 0.06, "CTA": 0.14, "CTG": 0.11},
    "K": {"AAA": 0.58, "AAG": 0.42},
    "M": {"ATG": 1.00},
    "F": {"TTT": 0.59, "TTC": 0.41},
    "P": {"CCT": 0.31, "CCC": 0.15, "CCA": 0.42, "CCG": 0.12},
    "S": {"TCT": 0.26, "TCC": 0.16, "TCA": 0.21, "TCG": 0.10, "AGT": 0.16, "AGC": 0.11},
    "T": {"ACT": 0.35, "ACC": 0.22, "ACA": 0.30, "ACG": 0.14},
    "W": {"TGG": 1.00},
    "Y": {"TAT": 0.56, "TAC": 0.44},
    "V": {"GTT": 0.39, "GTC": 0.21, "GTA": 0.21, "GTG": 0.19},
    "*": {"TAA": 0.48, "TAG": 0.23, "TGA": 0.30},
}
# H. sapiens (human) codon-frequency weights.
# Layout: one-letter amino-acid code ("*" = stop) -> {codon: weight}.
# Only the relative ranking within each amino acid is used by the pipeline;
# entry order breaks ties because the ranking sort is stable.
_H_SAPIENS: Dict[str, Dict[str, float]] = {
    "A": {"GCT": 0.27, "GCC": 0.40, "GCA": 0.23, "GCG": 0.11},
    "R": {"CGT": 0.08, "CGC": 0.19, "CGA": 0.11, "CGG": 0.21, "AGA": 0.20, "AGG": 0.20},
    "N": {"AAT": 0.46, "AAC": 0.54},
    "D": {"GAT": 0.46, "GAC": 0.54},
    "C": {"TGT": 0.45, "TGC": 0.55},
    "Q": {"CAA": 0.25, "CAG": 0.75},
    "E": {"GAA": 0.42, "GAG": 0.58},
    "G": {"GGT": 0.16, "GGC": 0.34, "GGA": 0.25, "GGG": 0.25},
    "H": {"CAT": 0.41, "CAC": 0.59},
    "I": {"ATT": 0.36, "ATC": 0.48, "ATA": 0.16},
    "L": {"TTA": 0.07, "TTG": 0.13, "CTT": 0.13, "CTC": 0.20, "CTA": 0.07, "CTG": 0.41},
    "K": {"AAA": 0.42, "AAG": 0.58},
    "M": {"ATG": 1.00},
    "F": {"TTT": 0.45, "TTC": 0.55},
    "P": {"CCT": 0.28, "CCC": 0.33, "CCA": 0.27, "CCG": 0.11},
    "S": {"TCT": 0.15, "TCC": 0.22, "TCA": 0.15, "TCG": 0.06, "AGT": 0.15, "AGC": 0.24},
    "T": {"ACT": 0.24, "ACC": 0.36, "ACA": 0.28, "ACG": 0.12},
    "W": {"TGG": 1.00},
    "Y": {"TAT": 0.43, "TAC": 0.57},
    "V": {"GTT": 0.18, "GTC": 0.24, "GTA": 0.11, "GTG": 0.47},
    "*": {"TAA": 0.28, "TAG": 0.20, "TGA": 0.52},
}
# Registry of supported expression hosts: normalized host name -> codon table.
# Several aliases map to the same table object. ``_resolve_table`` lower-cases
# the user-supplied host and turns separators into underscores before the
# lookup, so every key here must already be in that normalized form.
CODON_USAGE_TABLES: Dict[str, Dict[str, Dict[str, float]]] = {
    "e_coli": _E_COLI_K12,
    "ecoli": _E_COLI_K12,
    "yeast": _S_CEREVISIAE,
    "s_cerevisiae": _S_CEREVISIAE,
    "human": _H_SAPIENS,
    "h_sapiens": _H_SAPIENS,
}
# Type IIS / Golden Gate-relevant sites we always want to scrub.
# Maps enzyme name -> recognition sequence (5'->3', upper case). Matching in
# the scrubber is a case-sensitive substring search on both strands, so custom
# site tables should use the same letter case as the DNA being scanned.
DEFAULT_FORBIDDEN_SITES: Dict[str, str] = {
    "BsaI": "GGTCTC",
    "BsmBI": "CGTCTC",
    "NotI": "GCGGCCGC",
}
| # ---------------------------------------------------------------- helpers | |
| def _ranked_codons(usage: Dict[str, float]) -> List[str]: | |
| return [c for c, _ in sorted(usage.items(), key=lambda kv: kv[1], reverse=True)] | |
| def _best_codon(aa: str, table: Dict[str, Dict[str, float]]) -> str: | |
| if aa not in table: | |
| raise ValueError(f"Amino acid {aa!r} not present in codon usage table.") | |
| return _ranked_codons(table[aa])[0] | |
def _reverse_complement(dna: str) -> str:
    """Return the reverse complement of ``dna`` as a plain string.

    Delegates to Biopython's ``Seq.reverse_complement``.
    """
    forward = Seq(dna)
    return str(forward.reverse_complement())
def _resolve_table(host: str) -> Dict[str, Dict[str, float]]:
    """Look up the codon-usage table for ``host``.

    The host name is normalized before the lookup: surrounding whitespace is
    dropped, the name is lower-cased, and every *run* of dots, whitespace,
    hyphens or underscores collapses to a single underscore. This makes the
    usual spellings equivalent — ``"e_coli"``, ``"E.coli"``, ``"E. coli"``,
    ``"e-coli"`` all resolve to the same table.

    Raises:
        ValueError: if the normalized name is not a key of
            ``CODON_USAGE_TABLES``.
    """
    # Collapse separator runs so "E. coli" becomes "e_coli" rather than
    # "e__coli" (which a per-character replace would produce and reject).
    key = re.sub(r"[.\s_-]+", "_", host.strip().lower()).strip("_")
    if key not in CODON_USAGE_TABLES:
        raise ValueError(
            f"Unknown host {host!r}. Known: {sorted(set(CODON_USAGE_TABLES))}."
        )
    return CODON_USAGE_TABLES[key]
| # ---------------------------------------------------------------- API | |
def reverse_translate(
    protein: str,
    host: str = "e_coli",
    *,
    append_stop: bool = True,
) -> str:
    """Encode ``protein`` as DNA using the top-ranked codon of ``host`` per residue.

    When ``append_stop`` is true, the host's preferred stop codon is added at
    the 3' end. Unknown hosts or residues raise ``ValueError`` (from the
    lookup helpers).
    """
    usage_table = _resolve_table(host)
    residues = list(protein)
    if append_stop:
        residues.append("*")  # "*" is the table key for stop codons.
    return "".join(_best_codon(residue, usage_table) for residue in residues)
| def _find_all(seq: str, motif: str) -> List[int]: | |
| """Return all 0-indexed start positions of ``motif`` inside ``seq``.""" | |
| return [m.start() for m in re.finditer(f"(?={re.escape(motif)})", seq)] | |
def _window_hits(seq: str, lo: int, hi: int, patterns: List[str]) -> set:
    """Return ``{(forward_start, pattern)}`` for each pattern occurrence inside ``seq[lo:hi]``."""
    window = seq[lo:hi]
    found = set()
    for pattern in patterns:
        pos = window.find(pattern)
        while pos != -1:
            found.add((lo + pos, pattern))
            pos = window.find(pattern, pos + 1)  # +1 keeps overlapping hits
    return found


def _try_clear_motif_at(
    dna: List[str],  # mutable list of codons
    nt_start: int,  # 0-indexed nt position of the offending motif
    motif_len: int,
    protein: str,
    table: Dict[str, Dict[str, float]],
    forbidden: Dict[str, str],
) -> bool:
    """Attempt to silently disrupt the forbidden site starting at ``nt_start``.

    Iterates over every codon that overlaps the site and tries each synonymous
    alternative (ranked by host frequency, best first). An edit is accepted
    when (a) the targeted site is gone and (b) the edit did not create any
    forbidden site, on either strand, that was not already there. Other
    forbidden sites that happen to sit nearby do *not* veto the edit — they
    are separate hits for the caller to handle — so two closely spaced sites
    can no longer block each other.

    Args:
        dna: Codon list, mutated in place on success and left untouched on
            failure.
        nt_start: Forward-strand nucleotide offset of the site.
        motif_len: Length of the site in nucleotides.
        protein: Amino-acid string aligned with ``dna`` (``protein[i]`` is
            encoded by ``dna[i]``); codons beyond its end are never edited.
        table: Codon-usage table supplying the synonymous alternatives.
        forbidden: Enzyme name -> recognition sequence.

    Returns:
        ``True`` if the site is absent afterwards (including the case where
        it was already absent, in which case nothing is edited); ``False`` if
        no synonymous edit can break it.
    """
    motif_end = nt_start + motif_len
    first_codon = nt_start // 3
    last_codon = (motif_end - 1) // 3

    # A reverse-strand site reads as its reverse complement on the forward
    # strand, so searching the forward text for both orientations covers
    # both strands.
    patterns = sorted(
        {
            oriented
            for site in forbidden.values()
            if site
            for oriented in (site, _reverse_complement(site))
        }
    )
    if not patterns:
        return True  # Nothing is forbidden, so there is nothing to clear.

    # An overlapping codon can begin up to 2 nt before the site (or end 2 nt
    # after it), and a newly created site can extend len-1 nt past the edited
    # base; pad the window so such a site is always fully visible.
    pad = max(len(p) for p in patterns) + 1
    lo = max(0, nt_start - pad)
    hi = motif_end + pad

    def _target_present(hits: set) -> bool:
        return any(pos == nt_start and len(pat) == motif_len for pos, pat in hits)

    baseline = _window_hits("".join(dna), lo, hi, patterns)
    if not _target_present(baseline):
        # Stale coordinate (e.g. already cleared by an earlier edit): do not
        # spend a codon change on a site that no longer exists.
        return True

    for codon_idx in range(first_codon, last_codon + 1):
        if codon_idx >= len(protein):
            continue  # Beyond the annotated protein — never edit.
        aa = protein[codon_idx]
        original = dna[codon_idx]
        for alt in _ranked_codons(table[aa]):
            if alt == original:
                continue
            dna[codon_idx] = alt
            # Synonymous swaps keep the length, so window coordinates stay
            # comparable before and after the edit.
            after = _window_hits("".join(dna), lo, hi, patterns)
            if not _target_present(after) and after <= baseline:
                logger.debug(
                    "Cleared motif at nt %d via synonymous edit at codon %d: %s -> %s (%s).",
                    nt_start,
                    codon_idx,
                    original,
                    alt,
                    aa,
                )
                return True
        dna[codon_idx] = original  # Undo before trying the next overlapping codon.
    return False
@dataclass
class CleanupReport:
    """Diagnostic record for one variant's restriction-site scrubbing pass.

    Declared as a dataclass so it can be built with keyword arguments
    (``CleanupReport(sites_found=..., sites_cleared=..., unresolved=...)``),
    which is how ``scrub_restriction_sites`` constructs it.
    """

    # Enzyme name -> number of forbidden-site hits detected.
    sites_found: Dict[str, int]
    # Enzyme name -> number of those hits removed by a synonymous edit.
    sites_cleared: Dict[str, int]
    # Hits that could not be removed, as (enzyme, nt_position) pairs.
    unresolved: List[Tuple[str, int]]

    def fully_clean(self) -> bool:
        """Return ``True`` when no forbidden site was left unresolved."""
        return not self.unresolved
def _scan_forbidden_hits(
    seq: str, forbidden: Dict[str, str]
) -> List[Tuple[str, int, int]]:
    """Return de-duplicated ``(enzyme, forward_start, motif_len)`` hits on both strands, by position."""
    hits = set()
    seq_rc = _reverse_complement(seq)
    for enzyme, motif in forbidden.items():
        if not motif:
            continue  # An empty motif would "match" at every position.
        size = len(motif)
        for start in _find_all(seq, motif):
            hits.add((enzyme, start, size))
        for start_rc in _find_all(seq_rc, motif):
            # Translate the reverse-strand coordinate back to the forward
            # strand. A palindromic site (e.g. NotI) yields the same tuple
            # from both strands and is therefore counted once.
            hits.add((enzyme, len(seq) - start_rc - size, size))
    return sorted(hits, key=lambda hit: (hit[1], hit[0]))


def scrub_restriction_sites(
    dna: str,
    protein: str,
    host: str = "e_coli",
    *,
    forbidden_sites: Optional[Dict[str, str]] = None,
) -> Tuple[str, CleanupReport]:
    """Iteratively introduce synonymous mutations until no forbidden site remains.

    Scans both strands (Type IIS enzymes recognize asymmetric sites on either
    strand, so the reverse complement must also be searched). Each pass
    rescans the current sequence, then tries to clear every hit it found —
    re-checking first that the hit still exists, since an earlier edit in the
    same pass may already have destroyed it. A hit that cannot be cleared does
    not stop the remaining hits from being processed. Scrubbing ends when the
    sequence is clean, when a full pass makes no progress, or after a
    defensive cap on the number of passes.

    Args:
        dna: Coding sequence; it is split into consecutive 3-nt codons.
        protein: Amino-acid string encoded by ``dna`` (include ``"*"`` if the
            stop codon may be edited too).
        host: Host name selecting the codon-usage table.
        forbidden_sites: Enzyme name -> recognition sequence. ``None`` or an
            empty mapping selects ``DEFAULT_FORBIDDEN_SITES``.

    Returns:
        ``(scrubbed_dna, report)``. In the report, ``sites_found`` counts the
        distinct sites (enzyme + forward position) seen per enzyme,
        ``unresolved`` lists the sites still present in the returned
        sequence, and ``sites_cleared`` is the difference between the two.

    Raises:
        ValueError: if ``host`` is unknown.
        RuntimeError: if the scrubbed DNA no longer translates to ``protein``.
    """
    forbidden = forbidden_sites or DEFAULT_FORBIDDEN_SITES
    table = _resolve_table(host)
    codons = [dna[i : i + 3] for i in range(0, len(dna), 3)]

    seen = set()  # every distinct (enzyme, forward_start) detected so far
    max_passes = 20  # Defensive cap; in practice 2-3 passes suffice.
    for _ in range(max_passes):
        hits = _scan_forbidden_hits("".join(codons), forbidden)
        if not hits:
            break
        seen.update((enzyme, start) for enzyme, start, _size in hits)

        progressed = False
        for enzyme, start, size in hits:
            motif = forbidden[enzyme]
            # Synonymous swaps never change the length, so ``start`` is still
            # a valid coordinate; only the site itself may have vanished.
            segment = "".join(codons)[start : start + size]
            if segment != motif and _reverse_complement(segment) != motif:
                continue
            if _try_clear_motif_at(codons, start, size, protein, table, forbidden):
                progressed = True
        if not progressed:
            break  # Every remaining site is stuck; further passes are futile.

    final = "".join(codons)
    # Final sanity check: protein must be unchanged.
    translated = str(Seq(final[: len(protein) * 3]).translate(table=1))
    if translated != protein:
        raise RuntimeError(
            "Synonymous scrubbing altered the encoded protein; aborting. "
            f"Expected {protein!r}, got {translated!r}."
        )

    # Report what is actually left in the returned sequence rather than a log
    # of failed attempts, so a site cleared on a later pass is not reported.
    unresolved = sorted(
        {(enzyme, start) for enzyme, start, _size in _scan_forbidden_hits(final, forbidden)}
    )
    seen.update(unresolved)

    found_counts: Dict[str, int] = {k: 0 for k in forbidden}
    for enzyme, _start in seen:
        found_counts[enzyme] += 1
    cleared_counts: Dict[str, int] = dict(found_counts)
    for enzyme, _start in unresolved:
        cleared_counts[enzyme] -= 1

    report = CleanupReport(
        sites_found=found_counts,
        sites_cleared=cleared_counts,
        unresolved=unresolved,
    )
    return final, report
def gc_content(dna: str) -> float:
    """Return the GC percentage (0-100) of a DNA string.

    Only A/C/G/T (case-insensitive) count as bases; ambiguity codes, stop
    asterisks and any other characters are ignored. An empty string, or one
    with no A/C/G/T at all, yields ``0.0``.
    """
    upper = dna.upper()
    gc = upper.count("G") + upper.count("C")
    bases = gc + upper.count("A") + upper.count("T")
    if bases == 0:
        return 0.0
    return 100.0 * gc / bases
| def _tm(dna: str) -> Optional[float]: | |
| """Nearest-neighbor melting temperature in °C under standard PCR salt | |
| conditions (Na⁺ 50 mM, Mg²⁺ 1.5 mM, dNTPs 0.2 mM, primer 500 nM, template | |
| 50 nM). Returns ``None`` if the sequence is too short for the NN method.""" | |
| if len(dna) < 8: | |
| return None | |
| return float( | |
| _mt.Tm_NN(dna, dnac1=500, dnac2=50, Na=50, Mg=1.5, dNTPs=0.2) | |
| ) | |
| def _has_gc_clamp(primer: str) -> bool: | |
| """True if the primer's 3' end is G or C — improves polymerase priming.""" | |
| return primer[-1:].upper() in {"G", "C"} | |
def design_primer(
    template: str,
    *,
    target_tm: float = 60.0,
    min_len: int = 18,
    max_len: int = 28,
) -> Tuple[str, Optional[float]]:
    """Pick a primer from the 5' end of ``template`` closest to ``target_tm``.

    Every prefix length in ``[min_len, max_len]`` is scored by the distance
    of its Tm from ``target_tm``, minus a small bonus (0.4 °C) when the
    primer ends in G/C (a "GC clamp"). The lowest score wins; on a tie the
    shorter primer is kept.

    A template shorter than ``min_len`` is evaluated as a whole instead of
    being skipped, so its Tm is still reported whenever it can be computed.

    Args:
        template: Sequence whose 5' end the primer is taken from.
        target_tm: Desired melting temperature in °C.
        min_len: Shortest primer length to consider.
        max_len: Longest primer length to consider.

    Returns:
        ``(primer, tm)``. ``tm`` is ``None`` when no candidate had a
        computable Tm; ``("", None)`` is returned for an empty template.
    """
    if not template:
        return "", None

    # Clamp both bounds to the template so a short template still yields one
    # candidate (itself) rather than an unscored fallback.
    shortest = min(min_len, len(template))
    longest = min(max_len, len(template))

    best: Tuple[str, Optional[float]] = (template[:min_len], None)
    best_score = float("inf")
    for length in range(shortest, longest + 1):
        candidate = template[:length]
        tm = _tm(candidate)
        if tm is None:
            continue
        # Score is distance from target; small bonus for having a GC clamp.
        score = abs(tm - target_tm) - (0.4 if _has_gc_clamp(candidate) else 0.0)
        if score < best_score:  # strict "<" keeps the shorter primer on ties
            best_score = score
            best = (candidate, tm)
    return best
def pcr_metrics(dna: str) -> Dict[str, Any]:
    """Summarize PCR-relevant properties of a CDS and design a primer pair.

    The forward primer is taken from the 5' end of ``dna``; the reverse
    primer from the 5' end of its reverse complement, i.e. it anneals to the
    3' end of the coding strand. The annealing temperature is the lower of
    the two primer Tm values minus 5 °C, or ``None`` if either Tm is
    unavailable. An empty ``dna`` produces an empty dict.
    """
    if not dna:
        return {}

    def _round1(value: Optional[float]) -> Optional[float]:
        return None if value is None else round(value, 1)

    antisense = str(Seq(dna).reverse_complement())
    fwd_primer, fwd_tm = design_primer(dna)
    rev_primer, rev_tm = design_primer(antisense)

    if fwd_tm is None or rev_tm is None:
        annealing: Optional[float] = None
    else:
        annealing = round(min(fwd_tm, rev_tm) - 5.0, 1)

    metrics: Dict[str, Any] = {}
    metrics["length_bp"] = len(dna)
    metrics["gc_percent"] = round(gc_content(dna), 1)
    metrics["primer_fwd"] = fwd_primer
    metrics["primer_fwd_tm_c"] = _round1(fwd_tm)
    metrics["primer_fwd_gc"] = round(gc_content(fwd_primer), 1)
    metrics["primer_fwd_clamp"] = _has_gc_clamp(fwd_primer)
    metrics["primer_rev"] = rev_primer
    metrics["primer_rev_tm_c"] = _round1(rev_tm)
    metrics["primer_rev_gc"] = round(gc_content(rev_primer), 1)
    metrics["primer_rev_clamp"] = _has_gc_clamp(rev_primer)
    metrics["annealing_temp_c"] = annealing
    return metrics
def variants_to_dataframe(
    wt_protein: str,
    variants: List[Variant],
    host: str = "e_coli",
    *,
    forbidden_sites: Optional[Dict[str, str]] = None,
) -> pd.DataFrame:
    """Assemble the Phase-4 output table, one row per optimized multi-mutant.

    For each variant the mutant protein is built, reverse-translated with a
    stop codon, scrubbed of forbidden restriction sites, and annotated with
    PCR metrics and the scrubbing summary.
    """

    def _row_for(variant: Variant) -> Dict[str, object]:
        mutant = apply_variant(wt_protein, variant)
        draft_dna = reverse_translate(mutant, host=host, append_stop=True)
        # The protein handed to the scrubber carries "*" so that it lines up
        # with the stop codon appended during reverse translation.
        scrubbed_dna, cleanup = scrub_restriction_sites(
            draft_dna, mutant + "*", host=host, forbidden_sites=forbidden_sites
        )
        metrics = pcr_metrics(scrubbed_dna)
        return {
            "Variant_ID": f"V{variant.rank:04d}",
            "Mutations_AA": ",".join(variant.mutation_labels),
            "Mutant_AA_Seq": mutant,
            "Optimized_DNA_Seq": scrubbed_dna,
            "Predicted_Fitness_Score": round(variant.fitness, 6),
            "Length_bp": metrics.get("length_bp"),
            "GC_Percent": metrics.get("gc_percent"),
            "Primer_Fwd": metrics.get("primer_fwd"),
            "Primer_Fwd_Tm_C": metrics.get("primer_fwd_tm_c"),
            "Primer_Fwd_GC_Percent": metrics.get("primer_fwd_gc"),
            "Primer_Rev": metrics.get("primer_rev"),
            "Primer_Rev_Tm_C": metrics.get("primer_rev_tm_c"),
            "Primer_Rev_GC_Percent": metrics.get("primer_rev_gc"),
            "Annealing_Temp_C": metrics.get("annealing_temp_c"),
            "Restriction_Sites_Found": sum(cleanup.sites_found.values()),
            "Restriction_Sites_Unresolved": len(cleanup.unresolved),
        }

    records: List[Dict[str, object]] = [_row_for(variant) for variant in variants]
    return pd.DataFrame(records)
def write_library_csv(df: pd.DataFrame, path: str) -> None:
    """Persist the final library as CSV at ``path`` (no index column) and log the row count."""
    variant_count = len(df)
    df.to_csv(path, index=False)
    logger.info("Wrote %d variants to %s.", variant_count, path)