Miyu Horiuchi
v1 features: wire tetranucleotides + codon usage into streaming pipeline
457bc35
"""Tabular feature extraction from a microbial genome FASTA.
These features are deliberately simple and biologically motivated:
- genome size, GC content, coding density
- predicted gene count and mean CDS length
- proteome-level amino acid composition
- aromatic, charged, and IVYWREL fractions (correlate with growth temperature)
- mean isoelectric point and hydrophobicity
- optionally, tetranucleotide and codon-usage frequencies
The amino-acid-composition signals have well-established correlations with optimal growth
temperature and pH (Zeldovich 2007; Tekaia 2002), so they give XGBoost real signal to learn from
without any deep model.
"""
from __future__ import annotations
import gzip
from collections import Counter
from collections.abc import Iterable
from pathlib import Path
import numpy as np
import pyrodigal
from Bio import SeqIO
# One-letter codes of the 20 standard amino acids; iteration order of the
# per-residue ``aa_frac_*`` features follows this string.
AA_ALPHABET = "ACDEFGHIKLMNPQRSTVWY"

# Residue classes used for the proteome-level summary fractions.
AA_AROMATIC = {"F", "W", "Y"}
AA_CHARGED_POS = {"K", "R", "H"}
AA_CHARGED_NEG = {"D", "E"}
AA_IVYWREL = {"I", "V", "Y", "W", "R", "E", "L"}  # thermophile signature (Zeldovich 2007)

# Kyte-Doolittle hydrophobicity
HYDROPHOBICITY = {
    "A": 1.8,
    "C": 2.5,
    "D": -3.5,
    "E": -3.5,
    "F": 2.8,
    "G": -0.4,
    "H": -3.2,
    "I": 4.5,
    "K": -3.9,
    "L": 3.8,
    "M": 1.9,
    "N": -3.5,
    "P": -1.6,
    "Q": -3.5,
    "R": -4.5,
    "S": -0.8,
    "T": -0.7,
    "V": 4.2,
    "W": -0.9,
    "Y": -1.3,
}

# pKa values for isoelectric point estimation (Lehninger)
PKA_NTERM = 9.69  # free alpha-amino group
PKA_CTERM = 2.34  # free alpha-carboxyl group
PKA_SIDE = {
    # acidic side chains (negative when deprotonated)
    "D": 3.65,
    "E": 4.25,
    "C": 8.33,
    "Y": 10.07,
    # basic side chains (positive when protonated)
    "H": 6.00,
    "K": 10.53,
    "R": 12.48,
}
def read_fasta_records(path: Path) -> Iterable[tuple[str, str]]:
    """Yield ``(record_id, sequence)`` pairs from a FASTA file.

    A path whose string form ends in ``.gz`` is opened with ``gzip.open``;
    anything else is opened with the builtin ``open``. Both are read in text
    mode. Every sequence is upper-cased before it is yielded.
    """
    if str(path).endswith(".gz"):
        fasta = gzip.open(path, "rt")
    else:
        fasta = open(path, "rt")
    # The context manager closes the handle once the generator is exhausted
    # (or closed early by the consumer).
    with fasta:
        for rec in SeqIO.parse(fasta, "fasta"):
            yield rec.id, str(rec.seq).upper()
MIN_TRAIN_NT = 20_000  # below this, pyrodigal can't train; fall back to meta mode


def predict_genes(contigs: Iterable[tuple[str, str]]) -> tuple[list[str], list[str], int]:
    """Predict genes with Pyrodigal.

    Single-genome mode (trained on all contigs joined together) is preferred —
    ~7× faster than meta mode on assembled genomes. Meta mode is used instead
    when the assembly is shorter than ``MIN_TRAIN_NT`` or when training raises.

    Args:
        contigs: ``(name, nucleotide_sequence)`` pairs; consumed exactly once.

    Returns:
        ``(proteins, nt_cds_sequences, total_nt)`` where ``proteins`` are the
        translated genes with trailing ``*`` removed, ``nt_cds_sequences`` are
        the matching nucleotide sequences in the same order, and ``total_nt``
        is the summed length of all contigs.
    """
    # Encode once up front; the byte strings are reused for training and prediction.
    sequences = [seq.encode("ascii") for _name, seq in contigs]
    total_nt = sum(map(len, sequences))

    trained = None
    if total_nt >= MIN_TRAIN_NT:
        trained = pyrodigal.GeneFinder(meta=False)
        # The spacer carries TAA in every reading frame on both strands.
        training_input = b"TTAATTAATTAA".join(sequences)
        try:
            trained.train(training_input)
        except Exception:
            # Best effort: an untrainable assembly is handled in meta mode instead.
            trained = None
    finder = trained if trained is not None else pyrodigal.GeneFinder(meta=True)

    proteins: list[str] = []
    cds: list[str] = []
    for seq in sequences:
        predicted = finder.find_genes(seq)
        for gene in predicted:
            proteins.append(gene.translate().rstrip("*"))
            cds.append(gene.sequence())
    return proteins, cds, total_nt
def predict_proteins(contigs: Iterable[tuple[str, str]]) -> tuple[list[str], int]:
    """Legacy wrapper around ``predict_genes`` for callers that only want proteins.

    Returns ``(proteins, total_nt)``; the nucleotide CDS list is discarded.
    """
    proteins, _unused_cds, genome_length = predict_genes(contigs)
    result = (proteins, genome_length)
    return result
def aa_composition(proteins: list[str]) -> dict[str, float]:
    """Return the pooled amino-acid fractions of a set of proteins.

    The result has one ``aa_frac_<letter>`` key per letter of ``AA_ALPHABET``.
    Each value is that residue's count divided by the total number of
    characters across all proteins (characters outside the alphabet count
    towards the total but get no key). With no residues at all, every
    fraction is ``0.0``.
    """
    tally: Counter[str] = Counter()
    for protein in proteins:
        tally.update(protein)

    # Every character was counted exactly once, so this equals the summed lengths.
    n_residues = sum(tally.values())
    if not n_residues:
        return dict.fromkeys((f"aa_frac_{aa}" for aa in AA_ALPHABET), 0.0)
    return {f"aa_frac_{aa}": tally[aa] / n_residues for aa in AA_ALPHABET}
def _isoelectric_point(seq: str) -> float:
    """Estimate a protein's isoelectric point by bisection on pH over [0, 14].

    Net charge is the sum of the protonated N-terminus and K/R/H side chains
    minus the deprotonated C-terminus and D/E/C/Y side chains, using the pKa
    values in ``PKA_NTERM``, ``PKA_CTERM`` and ``PKA_SIDE``. An empty sequence
    returns the neutral default ``7.0``.
    """
    if not seq:
        return 7.0

    residue_counts = Counter(seq)
    # (count, pKa) pairs, listed in the order they enter the charge sum.
    basic = [(residue_counts.get(aa, 0), PKA_SIDE[aa]) for aa in ("K", "R", "H")]
    acidic = [(residue_counts.get(aa, 0), PKA_SIDE[aa]) for aa in ("D", "E", "C", "Y")]

    low, high = 0.0, 14.0
    for _ in range(50):
        mid = (low + high) / 2
        charge = 1 / (1 + 10 ** (mid - PKA_NTERM))
        charge -= 1 / (1 + 10 ** (PKA_CTERM - mid))
        for count, pka in basic:
            charge += count / (1 + 10 ** (mid - pka))
        for count, pka in acidic:
            charge -= count / (1 + 10 ** (pka - mid))
        # Net charge falls as pH rises: a positive charge means the pI lies above ``mid``.
        if charge > 0:
            low = mid
        else:
            high = mid
    return (low + high) / 2
def extract_features_from_seqs(
    contigs: list[tuple[str, str]],
    *,
    include_composition: bool = True,
) -> dict[str, float]:
    """Compute the full feature dict given pre-loaded contigs.

    Used by the streaming pipeline to avoid round-tripping FASTA bytes through disk.

    Args:
        contigs: ``(name, nucleotide_sequence)`` pairs. Sequences are upper-cased
            on entry, as ``read_fasta_records`` does for the disk path, so
            lowercase (e.g. soft-masked) input yields the same features as its
            upper-case form instead of a GC content of zero.
        include_composition: When True (default), tetranucleotide and codon-usage
            features are appended (320 extra columns).

    Returns:
        Feature name -> value. Always contains the genome-level summaries
        (size, contig count, GC content, CDS count, coding density, mean/median
        protein length), the residue-class fractions, mean hydrophobicity, mean
        isoelectric point, and one ``aa_frac_<letter>`` entry per amino acid.
        Empty input or an assembly with no predicted genes yields zeros (and
        ``7.0`` for the isoelectric point) rather than raising.
    """
    # GC counting below is case-sensitive, so normalise case here rather than
    # relying on every caller to have done it.
    contigs = [(name, seq.upper()) for name, seq in contigs]

    nt_total = sum(len(seq) for _, seq in contigs)
    gc = sum(seq.count("G") + seq.count("C") for _, seq in contigs)
    gc_frac = gc / nt_total if nt_total else 0.0

    proteins, cds, _ = predict_genes(contigs)
    aa_total = sum(len(p) for p in proteins)
    # Three coding nucleotides per residue; stop codons are not included because
    # the trailing "*" is stripped from each protein.
    coding_density = (3 * aa_total) / nt_total if nt_total else 0.0

    composition = aa_composition(proteins)
    aromatic = sum(composition[f"aa_frac_{a}"] for a in AA_AROMATIC)
    pos_charged = sum(composition[f"aa_frac_{a}"] for a in AA_CHARGED_POS)
    neg_charged = sum(composition[f"aa_frac_{a}"] for a in AA_CHARGED_NEG)
    ivywrel = sum(composition[f"aa_frac_{a}"] for a in AA_IVYWREL)
    # Composition-weighted mean of the per-residue hydrophobicity scale.
    hydrophobicity = (
        sum(composition[f"aa_frac_{a}"] * HYDROPHOBICITY[a] for a in AA_ALPHABET)
        if proteins else 0.0
    )

    # Only the first 200 predicted proteins are used (a prefix, not a random
    # sample); this bounds the cost of the per-protein bisection.
    pi_values = [_isoelectric_point(p) for p in proteins[:200]]
    mean_pi = float(np.mean(pi_values)) if pi_values else 7.0

    # Lengths are in amino acids, not nucleotides.
    cds_lengths = [len(p) for p in proteins]

    feats: dict[str, float] = {
        "genome_size_nt": float(nt_total),
        "n_contigs": float(len(contigs)),
        "gc_content": gc_frac,
        "n_predicted_cds": float(len(proteins)),
        "coding_density": coding_density,
        "mean_cds_aa_length": float(np.mean(cds_lengths)) if cds_lengths else 0.0,
        "median_cds_aa_length": float(np.median(cds_lengths)) if cds_lengths else 0.0,
        "aromatic_frac": aromatic,
        "pos_charged_frac": pos_charged,
        "neg_charged_frac": neg_charged,
        "ivywrel_frac": ivywrel,
        "mean_hydrophobicity": hydrophobicity,
        "mean_isoelectric_point": mean_pi,
        **composition,
    }

    if include_composition:
        # Imported here rather than at module top, so the composition module is
        # only loaded when these features are actually requested.
        from microbe_model.features.composition import codon_freqs, tetranucleotide_freqs

        feats.update(tetranucleotide_freqs(contigs))
        feats.update(codon_freqs(cds))
    return feats
def extract_features(fasta_path: Path) -> dict[str, float]:
    """Extract features straight from a FASTA file on disk.

    Convenience entry point for non-streaming use: loads every record of
    ``fasta_path`` into memory and delegates to ``extract_features_from_seqs``
    with its default options.
    """
    contigs = list(read_fasta_records(fasta_path))
    return extract_features_from_seqs(contigs)