Spaces:
Running
Running
| """Tabular feature extraction from a microbial genome FASTA. | |
| These features are deliberately simple and biologically motivated: | |
| - genome size, GC content, coding density | |
| - predicted gene count and mean CDS length | |
| - proteome-level amino acid composition | |
| - aromatic, charged, and IVYWREL fractions (correlate with growth temperature) | |
| - mean isoelectric point and hydrophobicity | |
| The amino-acid-composition signals have well-established correlations with optimal growth | |
| temperature and pH (Zeldovich 2007; Tekaia 2002), so they give XGBoost real signal to learn from | |
| without any deep model. | |
| """ | |
| from __future__ import annotations | |
| import gzip | |
| from collections import Counter | |
| from collections.abc import Iterable | |
| from pathlib import Path | |
| import numpy as np | |
| import pyrodigal | |
| from Bio import SeqIO | |
# One-letter codes of the 20 standard amino acids, in alphabetical order.
AA_ALPHABET = "ACDEFGHIKLMNPQRSTVWY"

# Residue classes summed into the proteome-level summary fractions.
AA_AROMATIC = set("FWY")
AA_CHARGED_POS = set("KRH")
AA_CHARGED_NEG = set("DE")
AA_IVYWREL = set("IVYWREL")  # thermophile signature (Zeldovich 2007)

# Kyte-Doolittle hydrophobicity, keyed by one-letter residue code.
HYDROPHOBICITY = {
    "A": 1.8,
    "C": 2.5,
    "D": -3.5,
    "E": -3.5,
    "F": 2.8,
    "G": -0.4,
    "H": -3.2,
    "I": 4.5,
    "K": -3.9,
    "L": 3.8,
    "M": 1.9,
    "N": -3.5,
    "P": -1.6,
    "Q": -3.5,
    "R": -4.5,
    "S": -0.8,
    "T": -0.7,
    "V": 4.2,
    "W": -0.9,
    "Y": -1.3,
}

# pKa values for isoelectric point estimation (Lehninger).
PKA_NTERM = 9.69
PKA_CTERM = 2.34
PKA_SIDE = {
    # acidic / deprotonating side chains
    "D": 3.65,
    "E": 4.25,
    "C": 8.33,
    "Y": 10.07,
    # basic side chains
    "H": 6.00,
    "K": 10.53,
    "R": 12.48,
}
def read_fasta_records(path: Path) -> Iterable[tuple[str, str]]:
    """Yield ``(record_id, sequence)`` pairs parsed from a FASTA file.

    A path whose string form ends in ``.gz`` is opened through ``gzip``; any other
    path is opened as plain text. Each sequence is upper-cased before it is yielded.
    """
    is_gzipped = str(path).endswith(".gz")
    handle = gzip.open(path, "rt") if is_gzipped else open(path, "rt")
    with handle:
        for rec in SeqIO.parse(handle, "fasta"):
            yield rec.id, str(rec.seq).upper()
# Total-nucleotide threshold for single-genome training; shorter inputs go straight
# to meta mode (the original note: below this, pyrodigal can't train).
MIN_TRAIN_NT = 20_000


def predict_genes(contigs: Iterable[tuple[str, str]]) -> tuple[list[str], list[str], int]:
    """Run Pyrodigal over the contigs and return ``(proteins, nt_cds_sequences, total_nt)``.

    When the combined contig length reaches ``MIN_TRAIN_NT`` a single-genome gene
    finder is trained on all contigs joined together (noted by the original author
    as ~7x faster than meta mode on assembled genomes). If the input is shorter
    than that, or if training raises, a meta-mode finder is used instead.

    ``proteins`` holds each gene's translation with trailing ``*`` characters
    stripped; ``nt_cds_sequences`` holds the matching ``gene.sequence()`` values in
    the same order; ``total_nt`` is the summed length of the encoded contigs.
    """
    # Encode once; the resulting list is walked for training and again for calling.
    byte_seqs = [seq.encode("ascii") for _name, seq in contigs]
    total_nt = sum(map(len, byte_seqs))

    use_meta = total_nt < MIN_TRAIN_NT
    if not use_meta:
        finder = pyrodigal.GeneFinder(meta=False)
        # The spacer is its own reverse complement and carries a stop codon in every
        # reading frame -- presumably so no ORF is learned across a contig junction.
        training_input = b"TTAATTAATTAA".join(byte_seqs)
        try:
            finder.train(training_input)
        except Exception:
            # Deliberate best-effort: any training failure degrades to meta mode.
            use_meta = True
    if use_meta:
        finder = pyrodigal.GeneFinder(meta=True)

    proteins: list[str] = []
    cds: list[str] = []
    for contig_seq in byte_seqs:
        for gene in finder.find_genes(contig_seq):
            proteins.append(gene.translate().rstrip("*"))
            cds.append(gene.sequence())
    return proteins, cds, total_nt
def predict_proteins(contigs: Iterable[tuple[str, str]]) -> tuple[list[str], int]:
    """Backwards-compatible wrapper: return ``(proteins, total_nt)`` only.

    Delegates to ``predict_genes`` and discards the nucleotide CDS list it returns.
    """
    proteins, _unused_cds, total_nt = predict_genes(contigs)
    result = (proteins, total_nt)
    return result
def aa_composition(proteins: list[str]) -> dict[str, float]:
    """Return the pooled amino-acid fractions of ``proteins``.

    The result has one ``aa_frac_<letter>`` key per letter of ``AA_ALPHABET``. The
    denominator is the total number of characters across all proteins, so any
    character outside ``AA_ALPHABET`` still counts toward the total. When there
    are no residues at all, every fraction is ``0.0``.
    """
    tally: Counter[str] = Counter()
    n_residues = 0
    for protein in proteins:
        tally.update(protein)
        n_residues += len(protein)

    fractions: dict[str, float] = {}
    for aa in AA_ALPHABET:
        key = f"aa_frac_{aa}"
        fractions[key] = tally.get(aa, 0) / n_residues if n_residues != 0 else 0.0
    return fractions
def _isoelectric_point(seq: str) -> float:
    """Estimate the pH at which ``seq`` carries zero net charge.

    Runs 50 bisection steps over pH 0-14. The charge model uses the two terminal
    groups plus the K, R, H (positive) and D, E, C, Y (negative) side chains with
    the module's pKa constants; other residues contribute nothing. An empty
    sequence returns 7.0.
    """
    if not seq:
        return 7.0

    n = Counter(seq)  # missing residues read back as 0
    low, high = 0.0, 14.0
    for _step in range(50):
        ph = (low + high) / 2

        # Accumulate terms one at a time, in a fixed order: N-terminus, C-terminus,
        # then the basic and acidic side chains.
        net = 1 / (1 + 10 ** (ph - PKA_NTERM))
        net = net - 1 / (1 + 10 ** (PKA_CTERM - ph))
        for aa in ("K", "R", "H"):
            net = net + n[aa] / (1 + 10 ** (ph - PKA_SIDE[aa]))
        for aa in ("D", "E", "C", "Y"):
            net = net - n[aa] / (1 + 10 ** (PKA_SIDE[aa] - ph))

        # Positive net charge means the pH is still below the isoelectric point.
        if net > 0:
            low = ph
        else:
            high = ph
    return (low + high) / 2
def extract_features_from_seqs(
    contigs: list[tuple[str, str]],
    *,
    include_composition: bool = True,
) -> dict[str, float]:
    """Build the feature dict for a genome from already-loaded contigs.

    ``contigs`` is a list of ``(name, sequence)`` pairs. Used by the streaming
    pipeline so FASTA bytes do not have to round-trip through disk.

    The result holds genome-level statistics (size, contig count, GC content,
    predicted CDS count, coding density, mean/median CDS length), proteome-level
    summaries (aromatic / charged / IVYWREL fractions, mean hydrophobicity, mean
    isoelectric point) and the per-residue ``aa_frac_*`` fractions. With
    ``include_composition`` left at its default of True, tetranucleotide and
    codon-usage features are merged in afterwards (320 extra columns per the
    original note).
    """
    sequences = [seq for _, seq in contigs]
    nt_total = sum(len(seq) for seq in sequences)
    gc = sum(seq.count("G") + seq.count("C") for seq in sequences)
    gc_frac = gc / nt_total if nt_total else 0.0

    proteins, cds, _ = predict_genes(contigs)
    cds_lengths = [len(protein) for protein in proteins]
    aa_total = sum(cds_lengths)
    # Three nucleotides per translated residue, relative to the whole genome.
    coding_density = (3 * aa_total) / nt_total if nt_total else 0.0

    composition = aa_composition(proteins)

    def class_fraction(residues):
        # Summed pooled fraction of one residue class.
        return sum(composition[f"aa_frac_{a}"] for a in residues)

    aromatic = class_fraction(AA_AROMATIC)
    pos_charged = class_fraction(AA_CHARGED_POS)
    neg_charged = class_fraction(AA_CHARGED_NEG)
    ivywrel = class_fraction(AA_IVYWREL)

    if proteins:
        # Composition-weighted Kyte-Doolittle score over the standard alphabet.
        hydrophobicity = sum(
            composition[f"aa_frac_{a}"] * HYDROPHOBICITY[a] for a in AA_ALPHABET
        )
    else:
        hydrophobicity = 0.0

    # Only the first 200 predicted proteins (in prediction order) are used for pI;
    # this is a prefix, not a random sample.
    pi_values = [_isoelectric_point(protein) for protein in proteins[:200]]
    mean_pi = float(np.mean(pi_values)) if pi_values else 7.0

    if cds_lengths:
        mean_len = float(np.mean(cds_lengths))
        median_len = float(np.median(cds_lengths))
    else:
        mean_len = 0.0
        median_len = 0.0

    # Key order is kept stable: scalar summaries first, then per-residue fractions.
    feats: dict[str, float] = {
        "genome_size_nt": float(nt_total),
        "n_contigs": float(len(contigs)),
        "gc_content": gc_frac,
        "n_predicted_cds": float(len(proteins)),
        "coding_density": coding_density,
        "mean_cds_aa_length": mean_len,
        "median_cds_aa_length": median_len,
        "aromatic_frac": aromatic,
        "pos_charged_frac": pos_charged,
        "neg_charged_frac": neg_charged,
        "ivywrel_frac": ivywrel,
        "mean_hydrophobicity": hydrophobicity,
        "mean_isoelectric_point": mean_pi,
        **composition,
    }

    if not include_composition:
        return feats

    # Imported here rather than at module top, as in the original.
    from microbe_model.features.composition import codon_freqs, tetranucleotide_freqs

    feats.update(tetranucleotide_freqs(contigs))
    feats.update(codon_freqs(cds))
    return feats
def extract_features(fasta_path: Path) -> dict[str, float]:
    """Disk-based entry point: load every record from ``fasta_path`` and featurize it.

    Convenience wrapper for non-streaming use; it reads all records into a list
    and passes them to ``extract_features_from_seqs`` with default options.
    """
    contigs = list(read_fasta_records(fasta_path))
    return extract_features_from_seqs(contigs)