| """ |
| Codon-optimization analysis for an mRNA CDS. |
| |
| Goes beyond a single CAI number to show *where* codon usage helps or hurts |
| expression: |
| |
| - **Per-codon optimality** — each codon's relative adaptiveness (0–1) vs the |
| best synonymous codon for that amino acid in the host. |
| - **%MinMax profile** — the classic sliding-window measure (Clarke & Clark): |
| positive = a run of common/fast codons, negative = rare/slow codons (the |
| kind of cluster that stalls ribosomes). |
| - **Rare-codon clusters** — runs of low-optimality codons worth recoding. |
| - **Original vs optimized** — projected CAI gain and rare-codon reduction if |
| the CDS were codon-optimized for the host (reuses the existing optimizer). |
| |
| Pure-Python (stdlib only); reuses the host codon tables already in the project. |
| """ |
| from __future__ import annotations |
|
|
| from dataclasses import dataclass, field |
| from typing import Dict, List, Optional, Tuple |
|
|
| from core.analysis.cai import CODON_TABLES, calculate_cai |
| from core.sequence_tools.codon_optimizer import CODON_TABLE, AA_TO_CODONS |
|
|
# Stop codons (DNA alphabet); per_codon_optimality() skips these triplets.
_STOP = {"TAA", "TAG", "TGA"}
# A codon whose relative adaptiveness is below this value is counted as rare.
RARE_THRESHOLD = 0.20
# Default minimum number of consecutive rare codons reported as a cluster.
RARE_CLUSTER_MIN = 3
# Default width, in codons, of the %MinMax sliding window.
DEFAULT_WINDOW = 17
|
|
|
|
def resolve_organism(organism: Optional[str]) -> str:
    """Map a user-supplied organism name to a key of ``CODON_TABLES``.

    Matching is case-insensitive.  E. coli spellings are recognised regardless
    of separators (``"E. coli"``, ``"e-coli"``, ``"E_coli"``,
    ``"Escherichia coli"``), so they no longer fall through silently to the
    human table.  Any other name is looked up in ``CODON_TABLES``; names that
    cannot be resolved, and ``None``/empty input, yield ``"human"``.

    Args:
        organism: Free-form organism name, or ``None``.

    Returns:
        ``"ecoli"``, a matching ``CODON_TABLES`` key, or ``"human"``.
    """
    # Historical normalisation: lower-case, drop spaces and dots only.
    legacy_key = (organism or "human").lower().replace(" ", "").replace(".", "")
    # Stricter form that also ignores hyphens, underscores, tabs, newlines...
    compact_key = "".join(ch for ch in legacy_key if ch.isalnum())

    if compact_key in ("ecoli", "escherichiacoli"):
        return "ecoli"
    # Try the historical key first so every name that resolved before still
    # resolves to the same table.
    if legacy_key in CODON_TABLES:
        return legacy_key
    if compact_key in CODON_TABLES:
        return compact_key
    return "human"
|
|
|
|
| def _codons(cds: str) -> List[str]: |
| s = (cds or "").upper().replace("U", "T") |
| return [s[i:i + 3] for i in range(0, len(s) - len(s) % 3, 3)] |
|
|
|
|
def _freq_stats(
    table: Dict[str, float],
) -> Tuple[Dict[str, float], Dict[str, float], Dict[str, float], Dict[str, float]]:
    """Derive synonymous-codon frequencies and their per-amino-acid extremes.

    For every amino acid in ``AA_TO_CODONS`` (stop entries excluded) the table
    weights of its codons are clamped at zero and normalised to sum to 1; a
    family whose weights sum to zero gets a frequency of 0.0 for every codon.

    Returns:
        ``(freq, aa_max, aa_min, aa_avg)`` where ``freq`` maps codon to its
        normalised frequency and the other three map amino acid to the
        maximum, minimum and mean frequency of its synonymous codons.
    """
    codon_freq: Dict[str, float] = {}
    highest: Dict[str, float] = {}
    lowest: Dict[str, float] = {}
    mean: Dict[str, float] = {}

    for amino_acid, synonyms in AA_TO_CODONS.items():
        if amino_acid in ("*", "Stop"):
            continue

        # Missing or negative weights count as zero.
        weights = [max(table.get(codon, 0.0), 0.0) for codon in synonyms]
        total = sum(weights)
        if total > 0:
            shares = [weight / total for weight in weights]
        else:
            shares = [0.0 for _ in weights]

        codon_freq.update(zip(synonyms, shares))

        if shares:
            highest[amino_acid] = max(shares)
            lowest[amino_acid] = min(shares)
            mean[amino_acid] = sum(shares) / len(shares)
        else:
            highest[amino_acid] = 0.0
            lowest[amino_acid] = 0.0
            mean[amino_acid] = 0.0

    return codon_freq, highest, lowest, mean
|
|
|
|
def per_codon_optimality(cds: str, organism: str = "human") -> List[float]:
    """Score each coding codon against the best synonymous codon in the host.

    Each score is the codon's weight in the host table divided by the largest
    weight among the synonymous codons of the same amino acid (0.0 when that
    largest weight is not positive).  Stop codons and triplets that
    ``CODON_TABLE`` cannot translate are skipped, so the returned list can be
    shorter than the number of triplets in ``cds``.
    """
    weights = CODON_TABLES[resolve_organism(organism)]

    # Largest table weight available for each amino acid.
    best_weight = {}
    for amino_acid, synonyms in AA_TO_CODONS.items():
        best_weight[amino_acid] = max(
            (weights.get(codon, 0.0) for codon in synonyms), default=0.0
        )

    scores: List[float] = []
    for codon in _codons(cds):
        amino_acid = CODON_TABLE.get(codon)
        is_coding = (
            amino_acid is not None
            and amino_acid not in ("*", "Stop")
            and codon not in _STOP
        )
        if not is_coding:
            continue
        ceiling = best_weight.get(amino_acid, 0.0)
        if ceiling > 0:
            scores.append(weights.get(codon, 0.0) / ceiling)
        else:
            scores.append(0.0)
    return scores
|
|
|
|
def min_max_profile(cds: str, organism: str = "human",
                    window: int = DEFAULT_WINDOW) -> Tuple[List[int], List[float]]:
    """%MinMax per sliding window; x positions are codon indices (window centres).

    For each window of ``window`` consecutive coding codons, the summed actual
    codon frequency is compared with the summed per-amino-acid average:

    * at or above average: ``(actual - avg) / (max - avg) * 100``
    * below average: ``-(avg - actual) / (avg - min) * 100``

    A window whose denominator is not positive scores 0.0.  Stop codons and
    triplets ``CODON_TABLE`` cannot translate are removed before windowing, so
    positions index the filtered codon list.

    Args:
        cds: Coding sequence (DNA or RNA alphabet).
        organism: Host name, resolved with ``resolve_organism``.
        window: Window width in codons.

    Returns:
        ``(positions, values)``; both empty when ``window`` is not positive or
        the CDS has fewer coding codons than ``window``.
    """
    positions: List[int] = []
    values: List[float] = []
    # A zero or negative width has no meaning; without this guard a zero-width
    # window produced a run of spurious 0.0 values and a negative one sliced
    # the codon list incorrectly.
    if window < 1:
        return positions, values

    table = CODON_TABLES[resolve_organism(organism)]
    freq, aa_max, aa_min, aa_avg = _freq_stats(table)

    # Resolve each codon's four frequencies once instead of repeating the
    # dictionary lookups for every window that contains it.
    actual_f: List[float] = []
    max_f: List[float] = []
    min_f: List[float] = []
    avg_f: List[float] = []
    for codon in _codons(cds):
        aa = CODON_TABLE.get(codon)
        if aa in (None, "*", "Stop"):
            continue
        actual_f.append(freq.get(codon, 0.0))
        max_f.append(aa_max.get(aa, 0.0))
        min_f.append(aa_min.get(aa, 0.0))
        avg_f.append(aa_avg.get(aa, 0.0))

    n = len(actual_f)
    if n < window:
        return positions, values

    half = window // 2
    for start in range(n - window + 1):
        stop = start + window
        actual = sum(actual_f[start:stop])
        mx = sum(max_f[start:stop])
        mn = sum(min_f[start:stop])
        av = sum(avg_f[start:stop])
        if actual >= av:
            pmm = ((actual - av) / (mx - av) * 100.0) if mx > av else 0.0
        else:
            pmm = (-(av - actual) / (av - mn) * 100.0) if av > mn else 0.0
        positions.append(start + half)
        values.append(pmm)
    return positions, values
|
|
|
|
@dataclass
class CodonAnalysis:
    """Result record built by ``analyze_codons``.

    All positions are 0-based indices into ``optimality``, i.e. they count
    only the codons that ``per_codon_optimality`` scored.
    """
    organism: str  # host key after resolve_organism()
    cai: Optional[float]  # calculate_cai() result; None if that call raised
    n_codons: int  # number of scored codons (len(optimality))
    rare_count: int  # number of codons scoring below RARE_THRESHOLD
    rare_fraction: float  # rare_count / n_codons, or 0.0 when n_codons is 0
    rare_positions: List[int] = field(default_factory=list)  # indices of rare codons
    rare_clusters: List[Tuple[int, int]] = field(default_factory=list)  # inclusive (start, end) runs
    minmax_positions: List[int] = field(default_factory=list)  # window centres from min_max_profile()
    minmax_values: List[float] = field(default_factory=list)  # %MinMax value per window
    optimality: List[float] = field(default_factory=list)  # per-codon relative adaptiveness

    # Optimizer comparison; these stay None unless analyze_codons() ran the
    # optimizer and it succeeded.
    optimized_cai: Optional[float] = None
    optimized_rare_count: Optional[int] = None
    codons_changed: Optional[int] = None
|
|
|
|
def _clusters(rare_positions: List[int], min_len: int = RARE_CLUSTER_MIN) -> List[Tuple[int, int]]:
    """Group consecutive positions into inclusive ``(start, end)`` runs.

    Positions are consumed in the order given; a run continues while each
    position is exactly one more than the previous one.  Only runs spanning at
    least ``min_len`` positions are returned.
    """
    clusters = []
    run_start = run_end = None
    for pos in rare_positions or ():
        if run_start is None:
            # First position opens the first run.
            run_start = run_end = pos
        elif pos == run_end + 1:
            run_end = pos
        else:
            # Gap: close the current run, then open a new one at ``pos``.
            if run_end - run_start + 1 >= min_len:
                clusters.append((run_start, run_end))
            run_start = run_end = pos
    # Close the run that was still open when the input ended.
    if run_start is not None and run_end - run_start + 1 >= min_len:
        clusters.append((run_start, run_end))
    return clusters
|
|
|
|
def analyze_codons(cds: str, organism: str = "human",
                   window: int = DEFAULT_WINDOW,
                   include_optimized: bool = True) -> CodonAnalysis:
    """Full codon analysis for a CDS.

    Computes per-codon optimality, rare codons and their clusters, the %MinMax
    profile and the CAI, and optionally compares against a codon-optimized
    version of the same CDS.

    Args:
        cds: Coding sequence (DNA or RNA alphabet).
        organism: Host name, resolved with ``resolve_organism``.
        window: %MinMax window width in codons.
        include_optimized: When true and the CDS has at least one scored
            codon, run the optimizer and fill the ``optimized_*`` /
            ``codons_changed`` fields.

    Returns:
        A ``CodonAnalysis``.  ``cai`` is ``None`` if the CAI calculation
        failed.  The three optimizer fields are either all set or all left
        ``None``; a failure in that step is logged and never raised.
    """
    import logging

    log = logging.getLogger(__name__)

    org = resolve_organism(organism)
    opt = per_codon_optimality(cds, org)
    n = len(opt)
    rare_positions = [i for i, w in enumerate(opt) if w < RARE_THRESHOLD]
    rare_count = len(rare_positions)
    mm_pos, mm_val = min_max_profile(cds, org, window)

    try:
        cai = calculate_cai(cds, org)
    except Exception:
        # Best effort: the rest of the analysis is still useful without CAI,
        # but record why it is missing instead of hiding the error.
        log.warning("CAI calculation failed for organism %r; reporting cai=None",
                    org, exc_info=True)
        cai = None

    result = CodonAnalysis(
        organism=org, cai=cai, n_codons=n,
        rare_count=rare_count,
        rare_fraction=(rare_count / n) if n else 0.0,
        rare_positions=rare_positions,
        rare_clusters=_clusters(rare_positions),
        minmax_positions=mm_pos, minmax_values=mm_val,
        optimality=opt,
    )

    if include_optimized and n:
        try:
            # Imported lazily, only when the comparison is requested.
            from core.sequence_tools.codon_optimizer import optimize_codons
            res = optimize_codons(cds, org)
            optimized_cai = res.optimized_cai
            codons_changed = res.codons_changed
            optimized_rare_count = sum(
                1 for w in per_codon_optimality(res.optimized_cds, org)
                if w < RARE_THRESHOLD
            )
        except Exception:
            # Best effort: keep the base analysis and leave all three
            # optimizer fields as None.
            log.warning("Codon optimization comparison failed for organism %r; "
                        "optimized fields left unset", org, exc_info=True)
        else:
            # Assign together so the result is never half-populated.
            result.optimized_cai = optimized_cai
            result.codons_changed = codons_changed
            result.optimized_rare_count = optimized_rare_count
    return result
|
|