"""Streaming fetch+featurize pipeline. Each worker: 1. Downloads a genome FASTA from NCBI Datasets into memory (no disk write) 2. Runs pyrodigal + AA-composition feature extraction 3. Returns the feature dict — caller persists to a JSONL append log Workers are fully independent processes; the only shared state is the JSONL log (written from the parent), so resumability is trivial: skip any bacdive_id whose row is already in the log. """ from __future__ import annotations import io import json import time import zipfile from collections.abc import Callable from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path from typing import Any import requests from microbe_model import config from microbe_model.features.genome import extract_features_from_seqs DATASETS_URL = "https://api.ncbi.nlm.nih.gov/datasets/v2/genome/accession/{acc}/download" RATE_LIMIT_S = 0.1 if config.NCBI_API_KEY else 0.34 VERSION_FALLBACKS = (".1", ".2", ".3", ".4") # tried in order for unversioned accessions EMPTY_ZIP_BYTES = 2_000 # NCBI "no content" zips are ~850 bytes; real genomes are MB+ def _has_version(accession: str) -> bool: """True if the accession ends in `.` (e.g. GCA_X.1, GCF_X.2).""" if "." not in accession: return False suffix = accession.rsplit(".", 1)[-1] return suffix.isdigit() def _candidate_accessions(accession: str) -> list[str]: if _has_version(accession): return [accession] return [accession + v for v in VERSION_FALLBACKS] def _fetch_one_accession(accession: str) -> bytes | None: """Download a single (versioned) accession and return zip bytes, or None on miss.""" headers: dict[str, str] = {"Accept": "application/zip"} if config.NCBI_API_KEY: headers["api-key"] = config.NCBI_API_KEY params = {"include_annotation_type": "GENOME_FASTA"} for attempt in range(3): try: time.sleep(RATE_LIMIT_S) resp = requests.get( DATASETS_URL.format(acc=accession), params=params, headers=headers, timeout=120, ) if resp.status_code == 404: return None if resp.status_code in (429, 502, 503): time.sleep(2 ** attempt) continue resp.raise_for_status() except requests.RequestException: if attempt == 2: return None time.sleep(2 ** attempt) continue # NCBI returns 200 + tiny "empty" zip when the accession doesn't exist # (e.g. unversioned forms or version that was never assigned). if len(resp.content) < EMPTY_ZIP_BYTES: return None return resp.content return None def _fetch_fasta_bytes(accession: str) -> list[tuple[str, str]] | None: """Download a genome FASTA and return [(contig_id, sequence_str), ...]. For unversioned accessions, tries ``.1``, ``.2``, ``.3``, ``.4`` in order (BacDive stores accessions without version suffixes; we resolve to the actual deposited version). Returns None if no version yields data. """ for candidate in _candidate_accessions(accession): zip_bytes = _fetch_one_accession(candidate) if zip_bytes is None: continue try: with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf: fasta_names = [n for n in zf.namelist() if n.endswith(".fna")] if not fasta_names: continue with zf.open(fasta_names[0]) as src: raw = src.read() except zipfile.BadZipFile: continue return _parse_fasta_bytes(raw) return None def _parse_fasta_bytes(raw: bytes) -> list[tuple[str, str]]: """Minimal in-memory FASTA parser, returns list of (id, sequence) tuples. Avoids biopython's overhead and the gzip→tempfile→biopython round-trip. """ contigs: list[tuple[str, str]] = [] current_id: str | None = None current_chunks: list[str] = [] for line_bytes in raw.splitlines(): if not line_bytes: continue if line_bytes.startswith(b">"): if current_id is not None: contigs.append((current_id, "".join(current_chunks).upper())) current_id = line_bytes[1:].decode("ascii", errors="replace").split()[0] current_chunks = [] else: current_chunks.append(line_bytes.decode("ascii", errors="replace")) if current_id is not None: contigs.append((current_id, "".join(current_chunks).upper())) return contigs def _process_one(args: tuple[int, str]) -> dict[str, Any] | None: """Worker entry point — runs in a child process. Returns None on any failure.""" bacdive_id, accession = args contigs = _fetch_fasta_bytes(accession) if not contigs: return None try: feats = extract_features_from_seqs(contigs) except Exception: return None feats["bacdive_id"] = bacdive_id feats["genome_accession"] = accession return feats def stream_fetch_and_featurize( tasks: list[tuple[int, str]], *, out_path: Path, n_workers: int, on_progress: Callable[[int, int, int], None] | None = None, ) -> None: """Fetch + featurize a list of (bacdive_id, accession) pairs in parallel. Streams successful results as JSON lines into out_path. Skips tasks already in the file. """ done_ids = _load_done_ids(out_path) pending = [(bid, acc) for bid, acc in tasks if bid not in done_ids] if not pending: return out_path.parent.mkdir(parents=True, exist_ok=True) with open(out_path, "a") as log, ProcessPoolExecutor(max_workers=n_workers) as pool: futures = {pool.submit(_process_one, t): t for t in pending} n_success = 0 for n_completed, future in enumerate(as_completed(futures), start=1): try: result = future.result() except Exception: result = None if result is not None: log.write(json.dumps(result) + "\n") log.flush() n_success += 1 if on_progress is not None: on_progress(n_completed, n_success, len(pending)) def _load_done_ids(jsonl_path: Path) -> set[int]: if not jsonl_path.exists(): return set() done: set[int] = set() with open(jsonl_path) as fh: for line in fh: try: row = json.loads(line) done.add(int(row["bacdive_id"])) except (json.JSONDecodeError, KeyError, ValueError): continue return done