"""KOfam scan — assign KEGG Orthologue (KO) hits to every genome. Same architecture as scripts/24_unified_hmm_scan.py, but the HMM library is KOfam (~25,000 KOs) instead of our curated Pfam set. Output is the SET OF KOs present in each genome (not a per-marker count, since each KO is itself one specific gene we either find or don't). Per-KO bitscore thresholds come from KEGG's ko_list — using the threshold prevents false positives from distant Pfam-style domain matches. Usage: # one-time: fetch + extract KOfam (~5 min, 1.6 GB on disk) python scripts/28_kofam_scan.py --fetch-only # smoke test python scripts/28_kofam_scan.py --limit 10 --workers 4 # full corpus python scripts/28_kofam_scan.py --workers 8 Output: data/kofam_hits.parquet — one row per genome: genome_accession, ko_K00001, ko_K00002, ..., ko_K25000 (binary 0/1) WARNING: full corpus scan against 25K HMMs is slow on local CPU. Plan ~2-4 min per genome × 22K genomes / 8 workers ≈ ~3-4 days single machine, or run on Modal CPU containers (much cheaper than GPU; pennies per hour each). """ from __future__ import annotations import argparse import gzip import json import shutil import tarfile import time from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path from typing import Any import pandas as pd import pyhmmer import pyhmmer.easel import pyhmmer.plan7 import requests from tqdm import tqdm from microbe_model import config from microbe_model.features.genome import predict_genes from microbe_model.pipeline import _fetch_fasta_bytes KOFAM_DIR = config.DATA / "kofam" KO_LIST_URL = "https://www.genome.jp/ftp/db/kofam/ko_list.gz" PROFILES_URL = "https://www.genome.jp/ftp/db/kofam/profiles.tar.gz" KOFAM_HMM = KOFAM_DIR / "kofam.hmm" # concatenation of ALL 27K HMMs KOFAM_RELEVANT_HMM = KOFAM_DIR / "kofam_relevant.hmm" # subset matching kegg/relevant_kos.txt KO_THRESHOLDS = KOFAM_DIR / "ko_thresholds.tsv" # parsed thresholds RELEVANT_KOS_PATH = config.DATA / "kegg" / "relevant_kos.txt" DEFAULT_EVALUE = 1e-5 # used when KO has no recommended bitscore threshold def build_relevant_library() -> Path: """Concatenate only the HMMs whose KO is in data/kegg/relevant_kos.txt. Reduces scan cost by ~9× (3K vs 27K HMMs). Built once on first invocation. """ if KOFAM_RELEVANT_HMM.exists(): return KOFAM_RELEVANT_HMM if not RELEVANT_KOS_PATH.exists(): raise SystemExit( f"Missing {RELEVANT_KOS_PATH}. Run scripts/27_fetch_kegg_modules.py " "first so we know which KOs to scan." ) relevant = set(RELEVANT_KOS_PATH.read_text().strip().splitlines()) profiles_dir = KOFAM_DIR / "profiles" if not profiles_dir.exists(): raise SystemExit(f"Missing {profiles_dir}. Run --fetch-only first.") print(f"Building relevant-KO library ({len(relevant):,} KOs)...") found = 0 with open(KOFAM_RELEVANT_HMM, "wb") as out: for ko in sorted(relevant): f = profiles_dir / f"{ko}.hmm" if f.exists(): out.write(f.read_bytes()) found += 1 print(f" wrote {KOFAM_RELEVANT_HMM} ({found:,} HMMs)") return KOFAM_RELEVANT_HMM def fetch_kofam() -> None: """Download and extract KOfam profiles + thresholds.""" KOFAM_DIR.mkdir(parents=True, exist_ok=True) if KOFAM_RELEVANT_HMM.exists() and KO_THRESHOLDS.exists(): return ko_list_gz = KOFAM_DIR / "ko_list.gz" if not KO_THRESHOLDS.exists(): if not ko_list_gz.exists(): print(f"Downloading {KO_LIST_URL} (~900 KB)...") r = requests.get(KO_LIST_URL, stream=True, timeout=120) r.raise_for_status() with open(ko_list_gz, "wb") as fh: shutil.copyfileobj(r.raw, fh) with gzip.open(ko_list_gz, "rt") as fh, open(KO_THRESHOLDS, "w") as out: out.write("ko\tthreshold\tscore_type\tprofile_type\tf_measure\tnseq\tnseq_used\talen\tmlen\teff_nseq\tre_pos\tdefinition\n") next(fh) # skip header for line in fh: out.write(line) print(f" parsed → {KO_THRESHOLDS}") profiles_tgz = KOFAM_DIR / "profiles.tar.gz" profiles_dir = KOFAM_DIR / "profiles" if not profiles_dir.exists(): if not profiles_tgz.exists(): print(f"Downloading {PROFILES_URL} (~1.55 GB) — go grab coffee...") r = requests.get(PROFILES_URL, stream=True, timeout=600) r.raise_for_status() with open(profiles_tgz, "wb") as fh: shutil.copyfileobj(r.raw, fh, length=1024 * 1024) print("Extracting profiles tarball...") with tarfile.open(profiles_tgz, "r:gz") as tf: tf.extractall(KOFAM_DIR) if not KOFAM_HMM.exists(): print("Concatenating individual .hmm files into one library...") hmm_files = sorted(profiles_dir.glob("K*.hmm")) with open(KOFAM_HMM, "wb") as out: for h in hmm_files: out.write(h.read_bytes()) print(f" wrote {KOFAM_HMM} ({len(hmm_files)} HMMs)") def load_ko_thresholds() -> dict[str, float]: """Return {KO: bitscore_threshold}. Default to 0 if missing/non-numeric.""" if not KO_THRESHOLDS.exists(): return {} df = pd.read_csv(KO_THRESHOLDS, sep="\t") out: dict[str, float] = {} for ko, thr in zip(df["ko"], df["threshold"], strict=True): try: out[str(ko)] = float(thr) except (TypeError, ValueError): out[str(ko)] = 0.0 return out def _load_hmms(lib_path: Path) -> list[pyhmmer.plan7.HMM]: with pyhmmer.plan7.HMMFile(str(lib_path)) as fh: return list(fh) def scan_proteins( proteins: list[str], hmms: list[pyhmmer.plan7.HMM], alphabet: pyhmmer.easel.Alphabet, thresholds: dict[str, float], ) -> set[str]: seqs: list[pyhmmer.easel.DigitalSequence] = [] for i, prot in enumerate(proteins): if not prot: continue ts = pyhmmer.easel.TextSequence(name=f"p{i}".encode(), sequence=prot) seqs.append(ts.digitize(alphabet)) found: set[str] = set() if not seqs: return found for top_hits in pyhmmer.hmmer.hmmsearch(hmms, seqs, E=DEFAULT_EVALUE, cpus=1): raw_name = top_hits.query.name ko = raw_name.decode() if isinstance(raw_name, bytes) else raw_name thr = thresholds.get(ko, 0.0) for hit in top_hits: if hit.score >= thr and hit.evalue <= DEFAULT_EVALUE: found.add(ko) break return found def _process_one(args: tuple[str, str, str]) -> dict[str, Any] | None: accession, lib_path, thresholds_path = args contigs = _fetch_fasta_bytes(accession) if not contigs: return None try: proteins, _cds, _nt = predict_genes(contigs) except Exception: return None if not proteins: return None alphabet = pyhmmer.easel.Alphabet.amino() hmms = _load_hmms(Path(lib_path)) thresholds = load_ko_thresholds() ko_hits = scan_proteins(proteins, hmms, alphabet, thresholds) return {"genome_accession": accession, "ko_hits": sorted(ko_hits)} def _existing_accessions(jsonl_path: Path) -> set[str]: if not jsonl_path.exists(): return set() seen: set[str] = set() with open(jsonl_path) as fh: for line in fh: try: row = json.loads(line) except Exception: continue acc = row.get("genome_accession") or row.get("accession") if acc: seen.add(str(acc)) return seen def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("--fetch-only", action="store_true", help="Just download + extract KOfam; don't scan.") parser.add_argument("--limit", type=int, default=None) parser.add_argument("--workers", type=int, default=4) args = parser.parse_args() fetch_kofam() if args.fetch_only: build_relevant_library() return lib_path = build_relevant_library() n_hmms = sum(1 for _ in _load_hmms(lib_path)) print(f"\nLoaded {n_hmms:,} KOfam HMMs from {lib_path}") feats = pd.read_parquet(config.DATA / "features.parquet") unique_accs = feats["genome_accession"].dropna().astype(str).unique().tolist() if args.limit: unique_accs = unique_accs[: args.limit] print(f"{len(unique_accs):,} unique genome accessions to scan") out_jsonl = config.DATA / "kofam_hits.jsonl" done = _existing_accessions(out_jsonl) pending = [(acc, str(lib_path), str(KO_THRESHOLDS)) for acc in unique_accs if acc not in done] print(f"{len(done):,} cached, {len(pending):,} new tasks") t0 = time.time() out_jsonl.parent.mkdir(parents=True, exist_ok=True) with open(out_jsonl, "a") as log, ProcessPoolExecutor(max_workers=args.workers) as pool: futures = {pool.submit(_process_one, t): t for t in pending} with tqdm(total=len(pending), unit="genome") as bar: n_ok = 0 for fut in as_completed(futures): try: r = fut.result() except Exception: r = None bar.update(1) if r is None: continue log.write(json.dumps(r) + "\n") log.flush() n_ok += 1 bar.set_postfix(ok=n_ok) print(f"Scan finished in {(time.time() - t0)/60:.1f} min") if __name__ == "__main__": main()