Miyu Horiuchi
Streaming fetch+featurize pipeline + 6× pyrodigal speedup + GCA version resolution
383bb62
"""Streaming fetch+featurize pipeline.
Each worker:
1. Downloads a genome FASTA from NCBI Datasets into memory (no disk write)
2. Runs pyrodigal + AA-composition feature extraction
3. Returns the feature dict — caller persists to a JSONL append log
Workers are fully independent processes; the only shared state is the JSONL log
(written from the parent), so resumability is trivial: skip any bacdive_id whose
row is already in the log.
"""
from __future__ import annotations
import io
import json
import time
import zipfile
from collections.abc import Callable
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from typing import Any
import requests
from microbe_model import config
from microbe_model.features.genome import extract_features_from_seqs
# NCBI Datasets v2 genome download endpoint; `{acc}` is filled in with the accession.
DATASETS_URL = "https://api.ncbi.nlm.nih.gov/datasets/v2/genome/accession/{acc}/download"
# Seconds slept before every download request; shorter when an NCBI API key is configured.
# NOTE(review): presumably chosen to stay under NCBI's per-second request quota — confirm.
RATE_LIMIT_S = 0.1 if config.NCBI_API_KEY else 0.34
VERSION_FALLBACKS = (".1", ".2", ".3", ".4")  # tried in order for unversioned accessions
# Responses smaller than this many bytes are treated as "accession not found".
EMPTY_ZIP_BYTES = 2_000  # NCBI "no content" zips are ~850 bytes; real genomes are MB+
def _has_version(accession: str) -> bool:
    """Report whether ``accession`` carries a numeric version suffix.

    An accession is considered versioned when the text after its last ``.``
    is non-empty and made up solely of digits (e.g. ``GCA_X.1``, ``GCF_X.2``).
    """
    # rpartition yields an empty separator when the string contains no dot.
    _stem, dot, tail = accession.rpartition(".")
    return bool(dot) and tail.isdigit()
def _candidate_accessions(accession: str) -> list[str]:
    """List the accession strings worth requesting for ``accession``.

    A versioned accession is returned unchanged as a one-element list. An
    unversioned one is expanded into one candidate per entry of
    ``VERSION_FALLBACKS``, in that order.
    """
    if not _has_version(accession):
        return [f"{accession}{suffix}" for suffix in VERSION_FALLBACKS]
    return [accession]
def _fetch_one_accession(accession: str) -> bytes | None:
    """Download a single (versioned) accession and return zip bytes, or None on miss.

    Makes up to three attempts. Before every attempt the function sleeps
    ``RATE_LIMIT_S`` seconds; between failed attempts it additionally backs
    off ``2 ** attempt`` seconds. No backoff is taken after the final attempt,
    since nothing would follow it.

    Returns:
        The raw response body (a zip archive) on success. ``None`` when the
        server answers 404, when the body is smaller than ``EMPTY_ZIP_BYTES``,
        or when all attempts fail.
    """
    headers: dict[str, str] = {"Accept": "application/zip"}
    if config.NCBI_API_KEY:
        headers["api-key"] = config.NCBI_API_KEY
    params = {"include_annotation_type": "GENOME_FASTA"}
    max_attempts = 3
    for attempt in range(max_attempts):
        time.sleep(RATE_LIMIT_S)
        try:
            resp = requests.get(
                DATASETS_URL.format(acc=accession),
                params=params,
                headers=headers,
                timeout=120,
            )
            if resp.status_code == 404:
                return None
            retryable = resp.status_code in (429, 502, 503)
            if not retryable:
                # Any other error status raises and is retried via the handler below.
                resp.raise_for_status()
        except requests.RequestException:
            # Connection errors, timeouts and HTTP error statuses are all
            # treated as transient: retry until the attempts run out.
            retryable = True
        if retryable:
            if attempt < max_attempts - 1:
                time.sleep(2 ** attempt)
            continue
        # NCBI returns 200 + tiny "empty" zip when the accession doesn't exist
        # (e.g. unversioned forms or version that was never assigned).
        if len(resp.content) < EMPTY_ZIP_BYTES:
            return None
        return resp.content
    return None
def _fetch_fasta_bytes(accession: str) -> list[tuple[str, str]] | None:
    """Fetch a genome and return its contigs as ``[(contig_id, sequence), ...]``.

    Every candidate produced by ``_candidate_accessions`` is tried in order
    (BacDive stores accessions without version suffixes, so unversioned
    inputs are resolved by probing the fallback versions). A candidate is
    skipped when the download misses, the payload is not a valid zip, or the
    archive holds no ``.fna`` member. The first ``.fna`` member of the first
    usable archive is parsed and returned; ``None`` means no candidate worked.
    """
    for candidate in _candidate_accessions(accession):
        payload = _fetch_one_accession(candidate)
        if payload is None:
            continue
        try:
            with zipfile.ZipFile(io.BytesIO(payload)) as archive:
                first_fna = next(
                    (member for member in archive.namelist() if member.endswith(".fna")),
                    None,
                )
                if first_fna is None:
                    continue
                with archive.open(first_fna) as handle:
                    raw = handle.read()
        except zipfile.BadZipFile:
            continue
        return _parse_fasta_bytes(raw)
    return None
def _parse_fasta_bytes(raw: bytes) -> list[tuple[str, str]]:
    """Minimal in-memory FASTA parser, returns list of (id, sequence) tuples.

    Avoids biopython's overhead and the gzip→tempfile→biopython round-trip.

    Parsing rules:
    * A line starting with ``>`` opens a new record; its id is the first
      whitespace-delimited token after the ``>``. A header with no token at
      all (bare ``>``) gets the placeholder id ``contig_<n>``, where ``n`` is
      the 1-based position of the record, instead of raising ``IndexError``.
    * Blank lines are skipped, and surrounding whitespace is stripped from
      every line so stray blanks never end up inside a sequence.
    * Sequence lines that appear before the first header are ignored.
    * Sequences are upper-cased; non-ASCII bytes decode to U+FFFD.
    """
    contigs: list[tuple[str, str]] = []
    current_id: str | None = None
    current_chunks: list[bytes] = []

    def _flush() -> None:
        # Decode once per record rather than once per line.
        if current_id is not None:
            sequence = b"".join(current_chunks).decode("ascii", errors="replace")
            contigs.append((current_id, sequence.upper()))

    for line in raw.splitlines():
        line = line.strip()
        if not line:
            continue
        if line.startswith(b">"):
            _flush()
            fields = line[1:].decode("ascii", errors="replace").split()
            current_id = fields[0] if fields else f"contig_{len(contigs) + 1}"
            current_chunks = []
        elif current_id is not None:
            current_chunks.append(line)
    _flush()
    return contigs
def _process_one(args: tuple[int, str]) -> dict[str, Any] | None:
    """Worker entry point: fetch one genome and compute its features.

    ``args`` is a ``(bacdive_id, accession)`` pair. The return value is the
    feature dict with ``bacdive_id`` and ``genome_accession`` added, or
    ``None`` when no contigs could be fetched or feature extraction raised.
    """
    record_id, acc = args
    sequences = _fetch_fasta_bytes(acc)
    if sequences:
        try:
            features = extract_features_from_seqs(sequences)
        except Exception:
            # Best-effort worker: any extraction failure just drops this genome.
            return None
        features["bacdive_id"] = record_id
        features["genome_accession"] = acc
        return features
    return None
def stream_fetch_and_featurize(
    tasks: list[tuple[int, str]],
    *,
    out_path: Path,
    n_workers: int,
    on_progress: Callable[[int, int, int], None] | None = None,
) -> None:
    """Run fetch + featurize over ``(bacdive_id, accession)`` pairs in parallel.

    Tasks whose ``bacdive_id`` already appears in ``out_path`` are skipped.
    Each successful result is appended to ``out_path`` as one JSON line and
    flushed immediately. After every completed task, ``on_progress`` (if
    given) is called with ``(completed, succeeded, total_pending)``.
    """
    already_done = _load_done_ids(out_path)
    todo = [(bid, acc) for bid, acc in tasks if bid not in already_done]
    if not todo:
        return
    total = len(todo)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with open(out_path, "a") as sink, ProcessPoolExecutor(max_workers=n_workers) as executor:
        in_flight = [executor.submit(_process_one, task) for task in todo]
        finished = 0
        succeeded = 0
        for fut in as_completed(in_flight):
            finished += 1
            try:
                row = fut.result()
            except Exception:
                # A crashed worker counts as a failed task, not a fatal error.
                row = None
            if row is not None:
                sink.write(f"{json.dumps(row)}\n")
                sink.flush()
                succeeded += 1
            if on_progress is not None:
                on_progress(finished, succeeded, total)
def _load_done_ids(jsonl_path: Path) -> set[int]:
    """Return the set of ``bacdive_id`` values already recorded in a JSONL log.

    A missing file yields an empty set. Lines that cannot be used are skipped
    rather than aborting the resume: invalid or truncated JSON, JSON values
    that are not objects (``null``, lists, numbers), rows without a
    ``bacdive_id`` key, and ids that cannot be converted to ``int``.
    """
    if not jsonl_path.exists():
        return set()
    done: set[int] = set()
    with open(jsonl_path, encoding="utf-8") as fh:
        for line in fh:
            try:
                row = json.loads(line)
                done.add(int(row["bacdive_id"]))
            except (json.JSONDecodeError, KeyError, TypeError, ValueError):
                # TypeError covers non-dict rows and a null bacdive_id.
                continue
    return done