File size: 6,832 Bytes
ffc7197
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
"""
Codon-optimization analysis for an mRNA CDS.

Goes beyond a single CAI number to show *where* codon usage helps or hurts
expression:

  - **Per-codon optimality** — each codon's relative adaptiveness (0–1) vs the
    best synonymous codon for that amino acid in the host.
  - **%MinMax profile** — the classic sliding-window measure (Clarke & Clark):
    positive = a run of common/fast codons, negative = rare/slow codons (the
    kind of cluster that stalls ribosomes).
  - **Rare-codon clusters** — runs of low-optimality codons worth recoding.
  - **Original vs optimized** — projected CAI gain and rare-codon reduction if
    the CDS were codon-optimized for the host (reuses the existing optimizer).

Pure-Python (stdlib only); reuses the host codon tables already in the project.
"""
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

from core.analysis.cai import CODON_TABLES, calculate_cai
from core.sequence_tools.codon_optimizer import CODON_TABLE, AA_TO_CODONS

# Stop codons in the DNA alphabet; per_codon_optimality() never scores these.
_STOP = {"TAA", "TAG", "TGA"}
# Tunable thresholds shared by the analysis functions below.
RARE_THRESHOLD = 0.20          # relative adaptiveness strictly below this = rare codon
RARE_CLUSTER_MIN = 3           # minimum run of consecutive rare codons reported as a cluster
DEFAULT_WINDOW = 17            # default %MinMax sliding-window width, in codons


def resolve_organism(organism: Optional[str]) -> str:
    """Map a free-form host name onto a codon-table key.

    The name is lower-cased and stripped of spaces and dots, so "E. coli" and
    "Escherichia coli" both resolve to ``"ecoli"``.  A missing or empty name,
    or one that is not a key of ``CODON_TABLES``, falls back to ``"human"``.
    """
    name = organism if organism else "human"
    normalized = name.lower()
    for filler in (" ", "."):
        normalized = normalized.replace(filler, "")

    # E. coli aliases are resolved before consulting the table keys.
    if normalized == "ecoli" or normalized == "escherichiacoli":
        return "ecoli"
    if normalized in CODON_TABLES:
        return normalized
    return "human"


def _codons(cds: str) -> List[str]:
    s = (cds or "").upper().replace("U", "T")
    return [s[i:i + 3] for i in range(0, len(s) - len(s) % 3, 3)]


def _freq_stats(table: Dict[str, float]) -> Tuple[Dict[str, float], Dict[str, float], Dict[str, float], Dict[str, float]]:
    """Derive synonymous-codon frequencies from a codon weight table.

    For every amino acid in ``AA_TO_CODONS`` (stop entries excluded) the
    weights of its synonymous codons are clamped at zero and normalised to sum
    to 1.  Returns ``(freq, aa_max, aa_min, aa_avg)``: the per-codon frequency,
    then the per-amino-acid maximum, minimum and mean of those frequencies.
    An amino acid whose weights total zero gets all-zero frequencies.
    """
    codon_freq: Dict[str, float] = {}
    highest: Dict[str, float] = {}
    lowest: Dict[str, float] = {}
    mean: Dict[str, float] = {}

    for amino_acid, synonyms in AA_TO_CODONS.items():
        if amino_acid == "*" or amino_acid == "Stop":
            continue

        # Codons missing from the table, or with a negative weight, count as 0.
        weights = [max(table.get(codon, 0.0), 0.0) for codon in synonyms]
        total = sum(weights)
        if total > 0:
            shares = [weight / total for weight in weights]
        else:
            shares = [0.0] * len(weights)

        codon_freq.update(zip(synonyms, shares))

        if shares:
            highest[amino_acid] = max(shares)
            lowest[amino_acid] = min(shares)
            mean[amino_acid] = sum(shares) / len(shares)
        else:
            highest[amino_acid] = 0.0
            lowest[amino_acid] = 0.0
            mean[amino_acid] = 0.0

    return codon_freq, highest, lowest, mean


def per_codon_optimality(cds: str, organism: str = "human") -> List[float]:
    """Score each sense codon of a CDS against the host's best synonym.

    Each value is the codon's table weight divided by the largest weight among
    the codons for the same amino acid (0.0 when that maximum is not positive).
    Stop codons and triplets absent from ``CODON_TABLE`` are skipped, so the
    returned list indexes only the remaining codons.
    """
    weights = CODON_TABLES[resolve_organism(organism)]

    # Largest synonymous weight for every amino acid.
    best_weight = {}
    for amino_acid, synonyms in AA_TO_CODONS.items():
        best_weight[amino_acid] = max(
            (weights.get(syn, 0.0) for syn in synonyms), default=0.0
        )

    scores: List[float] = []
    for codon in _codons(cds):
        amino_acid = CODON_TABLE.get(codon)
        is_sense = (
            amino_acid is not None
            and amino_acid not in ("*", "Stop")
            and codon not in _STOP
        )
        if not is_sense:
            continue
        top = best_weight.get(amino_acid, 0.0)
        if top > 0:
            scores.append(weights.get(codon, 0.0) / top)
        else:
            scores.append(0.0)
    return scores


def min_max_profile(cds: str, organism: str = "human",
                    window: int = DEFAULT_WINDOW) -> Tuple[List[int], List[float]]:
    """Compute the %MinMax profile of a CDS over a sliding codon window.

    Stop codons and triplets missing from ``CODON_TABLE`` are dropped first, so
    positions index the remaining sense codons.  For each window the summed
    synonymous frequency of the actual codons is compared with the sums of the
    per-amino-acid maximum, minimum and average frequencies:

      * actual >= average:  ``(actual - avg) / (max - avg) * 100``
      * actual <  average:  ``-(avg - actual) / (avg - min) * 100``

    A window whose denominator is not positive scores 0.0.

    Args:
        cds: Coding sequence (DNA or RNA letters).
        organism: Host name, normalised via ``resolve_organism``.
        window: Window width in codons; must be at least 1.

    Returns:
        ``(positions, values)`` where ``positions[k]`` is the window-centre
        codon index (``start + window // 2``) and ``values[k]`` its %MinMax.
        Both lists are empty when ``window`` is smaller than 1 or larger than
        the number of sense codons.
    """
    table = CODON_TABLES[resolve_organism(organism)]
    freq, aa_max, aa_min, aa_avg = _freq_stats(table)
    codons = [c for c in _codons(cds) if CODON_TABLE.get(c) not in (None, "*", "Stop")]
    positions: List[int] = []
    values: List[float] = []
    n = len(codons)
    # A non-positive window has no meaningful profile: 0 would emit n + 1
    # zero-valued points and a negative width would slice from the wrong end.
    if window < 1 or n < window:
        return positions, values

    # Resolve the per-codon terms once instead of once per overlapping window.
    acids = [CODON_TABLE.get(c, "") for c in codons]
    actual_terms = [freq.get(c, 0.0) for c in codons]
    max_terms = [aa_max.get(aa, 0.0) for aa in acids]
    min_terms = [aa_min.get(aa, 0.0) for aa in acids]
    avg_terms = [aa_avg.get(aa, 0.0) for aa in acids]

    half = window // 2
    for i in range(n - window + 1):
        j = i + window
        actual = sum(actual_terms[i:j])
        mx = sum(max_terms[i:j])
        mn = sum(min_terms[i:j])
        av = sum(avg_terms[i:j])
        if actual >= av:
            # Common-codon side: scale towards the all-best-codon window.
            pmm = ((actual - av) / (mx - av) * 100.0) if mx > av else 0.0
        else:
            # Rare-codon side: scale towards the all-worst-codon window.
            pmm = (-(av - actual) / (av - mn) * 100.0) if av > mn else 0.0
        positions.append(i + half)
        values.append(pmm)
    return positions, values


@dataclass
class CodonAnalysis:
    """Result container filled by ``analyze_codons``.

    All codon indices refer to the list returned by ``per_codon_optimality``,
    i.e. the CDS with stop/unrecognised triplets removed. The ``optimized_*``
    and ``codons_changed`` fields stay ``None`` unless the optimized projection
    was requested and succeeded.
    """

    organism: str                  # resolved codon-table key
    cai: Optional[float]           # result of calculate_cai; None if it raised
    n_codons: int                  # number of scored codons (len(optimality))
    rare_count: int                # number of codons below RARE_THRESHOLD
    rare_fraction: float           # rare_count / n_codons, 0.0 when n_codons is 0
    rare_positions: List[int] = field(default_factory=list)  # indices of rare codons
    rare_clusters: List[Tuple[int, int]] = field(default_factory=list)  # inclusive (start, end) codon idx
    minmax_positions: List[int] = field(default_factory=list)  # window-centre codon indices
    minmax_values: List[float] = field(default_factory=list)  # %MinMax per window
    optimality: List[float] = field(default_factory=list)  # per-codon relative adaptiveness
    # original-vs-optimized projection (taken from the optimize_codons result)
    optimized_cai: Optional[float] = None
    optimized_rare_count: Optional[int] = None  # rare codons left in the optimized CDS
    codons_changed: Optional[int] = None


def _clusters(rare_positions: List[int], min_len: int = RARE_CLUSTER_MIN) -> List[Tuple[int, int]]:
    """Group consecutive rare-codon indices into clusters.

    Walks ``rare_positions`` in the order given and collects maximal runs in
    which each index is exactly one more than the previous.  Runs of at least
    ``min_len`` indices are returned as inclusive ``(start, end)`` pairs.
    """
    clusters: List[Tuple[int, int]] = []
    run_start = run_end = None

    for pos in rare_positions:
        if run_start is not None and pos == run_end + 1:
            # Still inside the current run: extend it.
            run_end = pos
            continue
        # The run (if any) is broken here; keep it when long enough.
        if run_start is not None and run_end - run_start + 1 >= min_len:
            clusters.append((run_start, run_end))
        run_start = run_end = pos

    # Flush the final run.
    if run_start is not None and run_end - run_start + 1 >= min_len:
        clusters.append((run_start, run_end))
    return clusters


def analyze_codons(cds: str, organism: str = "human",
                   window: int = DEFAULT_WINDOW,
                   include_optimized: bool = True) -> CodonAnalysis:
    """Run the full codon analysis for a CDS.

    Combines per-codon optimality, rare-codon positions and clusters, the
    %MinMax profile and the CAI, and optionally projects the effect of
    codon-optimizing the sequence for the host.

    Args:
        cds: Coding sequence (DNA or RNA letters).
        organism: Host name, normalised via ``resolve_organism``.
        window: %MinMax sliding-window width in codons.
        include_optimized: When true and at least one codon was scored, also
            fill the ``optimized_*`` / ``codons_changed`` projection fields.

    Returns:
        A ``CodonAnalysis``.  CAI and the optimized projection are best-effort:
        if either computation raises, the failure is logged and the affected
        fields are left as ``None`` rather than propagating the error.
    """
    import logging

    log = logging.getLogger(__name__)

    org = resolve_organism(organism)
    opt = per_codon_optimality(cds, org)
    n = len(opt)
    rare_positions = [i for i, w in enumerate(opt) if w < RARE_THRESHOLD]
    mm_pos, mm_val = min_max_profile(cds, org, window)

    try:
        cai = calculate_cai(cds, org)
    except Exception:
        # Best-effort: the rest of the analysis is still useful without CAI.
        log.warning("CAI calculation failed for organism %r", org, exc_info=True)
        cai = None

    result = CodonAnalysis(
        organism=org, cai=cai, n_codons=n,
        rare_count=len(rare_positions),
        rare_fraction=(len(rare_positions) / n) if n else 0.0,
        rare_positions=rare_positions,
        rare_clusters=_clusters(rare_positions),
        minmax_positions=mm_pos, minmax_values=mm_val,
        optimality=opt,
    )

    if include_optimized and n:
        try:
            from core.sequence_tools.codon_optimizer import optimize_codons
            res = optimize_codons(cds, org)
            optimized_cai = res.optimized_cai
            codons_changed = res.codons_changed
            optimized_rare_count = sum(
                1 for w in per_codon_optimality(res.optimized_cds, org)
                if w < RARE_THRESHOLD
            )
        except Exception:
            # Best-effort: report the original analysis without a projection.
            log.warning("Codon-optimization projection failed for organism %r",
                        org, exc_info=True)
        else:
            # Assign only after everything succeeded so the projection is
            # never left half-filled.
            result.optimized_cai = optimized_cai
            result.codons_changed = codons_changed
            result.optimized_rare_count = optimized_rare_count
    return result