"""Fetch GTDB representative genomes that don't appear in BacDive.

The point: BacDive only contains organisms that have been cultured at least
once. GTDB representative genomes that lack a BacDive entry are the "novel
cultivation candidates" we want to predict on — these are the closest thing we
have to genuinely uncultured organisms while still being concrete genomes we
can fetch.

Inputs:
  - data/bacdive_phenotypes.parquet (must exist)
  - https://data.gtdb.ecogenomic.org/releases/latest/bac120_metadata.tsv.gz

Output:
  - data/gtdb_candidates.parquet: subset of GTDB representatives not in BacDive

Usage:
    uv run python scripts/06_fetch_gtdb_candidates.py --max 5000
"""
from __future__ import annotations

import argparse
import gzip
import re
from pathlib import Path

import pandas as pd
import requests
from tqdm import tqdm

from microbe_model import config

GTDB_BAC120_URL = "https://data.gtdb.ecogenomic.org/releases/latest/bac120_metadata.tsv.gz"

# Columns pulled from the GTDB metadata table. Only the ones listed in
# _GTDB_REQUIRED_COLUMNS are actually consumed below; the rest are loaded
# opportunistically and may be absent without breaking the script.
_GTDB_USECOLS = frozenset({
    "accession",
    "gtdb_representative",
    "gtdb_taxonomy",
    "ncbi_genbank_assembly_accession",
    "ncbi_organism_name",
    "checkm_completeness",
    "ncbi_assembly_level",
})
_GTDB_REQUIRED_COLUMNS = (
    "accession",
    "gtdb_representative",
    "gtdb_taxonomy",
    "ncbi_genbank_assembly_accession",
    "ncbi_organism_name",
    "checkm_completeness",
)

# Minimum CheckM completeness (percent) for a genome to be kept as a candidate.
_MIN_CHECKM_COMPLETENESS = 90

_GTDB_PREFIX_RE = re.compile(r"^(?:GB_|RS_)?(.+)$")
_VERSION_RE = re.compile(r"\.\d+$")


def _normalize_accession(acc: str) -> str:
    """Strip prefixes like 'GB_' / 'RS_' that GTDB prepends, return bare accession.

    Non-string input (e.g. NaN from pandas) yields an empty string.
    """
    if not isinstance(acc, str):
        return ""
    m = _GTDB_PREFIX_RE.match(acc)
    return m.group(1) if m else acc


def _strip_version(acc: str) -> str:
    """Drop a trailing '.<digits>' version suffix; falsy input is returned as-is."""
    return _VERSION_RE.sub("", acc) if acc else acc


def _genbank_key(acc: object) -> str:
    """Return the unversioned GenBank accession, or '' if *acc* is not a 'GCA_' string."""
    if isinstance(acc, str) and acc.startswith("GCA_"):
        return _strip_version(acc)
    return ""


def _gtdb_key(acc: object) -> str:
    """Return GTDB's own accession with the GB_/RS_ prefix and version removed."""
    return _strip_version(_normalize_accession(acc))


def download_gtdb_metadata() -> Path:
    """Cache the GTDB metadata TSV locally; ~275MB compressed.

    Returns the path of the cached file, downloading it first if it is not
    already present. The download is streamed into a temporary ``.part`` file
    and only renamed to the final name once it completes, so an interrupted
    run never leaves a truncated file that later runs would mistake for a
    valid cache.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    out = config.DATA / "bac120_metadata.tsv.gz"
    if out.exists():
        return out

    out.parent.mkdir(parents=True, exist_ok=True)
    partial = out.with_name(out.name + ".part")
    print(f"Downloading {GTDB_BAC120_URL} → {out} (~275 MB, takes a few min)")
    try:
        with requests.get(GTDB_BAC120_URL, stream=True, timeout=600) as resp:
            resp.raise_for_status()
            total = int(resp.headers.get("content-length", 0))
            # total=None tells tqdm the size is unknown instead of "0 bytes".
            with open(partial, "wb") as fh, tqdm(
                total=total or None, unit="B", unit_scale=True, desc="GTDB"
            ) as bar:
                for chunk in resp.iter_content(chunk_size=1 << 16):
                    fh.write(chunk)
                    bar.update(len(chunk))
    except BaseException:
        # Also covers Ctrl-C: never leave a half-written file behind.
        partial.unlink(missing_ok=True)
        raise
    partial.replace(out)
    return out


def _read_gtdb_metadata(path: Path) -> pd.DataFrame:
    """Load the columns this script needs from the gzipped GTDB metadata TSV.

    Raises:
        SystemExit: if any column the script relies on is missing from the file.
    """
    # Explicit UTF-8: the locale default can choke on non-ASCII organism names.
    with gzip.open(path, "rt", encoding="utf-8") as fh:
        meta = pd.read_csv(
            fh,
            sep="\t",
            low_memory=False,
            usecols=lambda c: c in _GTDB_USECOLS,
        )
    # The callable `usecols` silently skips absent columns, so check up front
    # rather than failing later with a bare KeyError.
    missing = [c for c in _GTDB_REQUIRED_COLUMNS if c not in meta.columns]
    if missing:
        raise SystemExit(f"GTDB metadata at {path} is missing expected columns: {missing}")
    return meta


def _select_candidates(
    meta: pd.DataFrame,
    bacdive_acc: set[str],
    max_candidates: int,
    seed: int,
) -> pd.DataFrame:
    """Pick GTDB representatives absent from BacDive, filtered and capped.

    Args:
        meta: GTDB metadata containing the columns in ``_GTDB_REQUIRED_COLUMNS``.
        bacdive_acc: Unversioned genome accessions already present in BacDive.
        max_candidates: Upper bound on the number of rows returned.
        seed: Random state used when down-sampling to ``max_candidates``.

    Returns:
        A frame with the columns written to ``gtdb_candidates.parquet``.
    """
    representatives = meta[meta["gtdb_representative"] == "t"].copy()
    print(f"{len(representatives):,} are GTDB representatives")

    # Build the bare-accession candidate set (use ncbi_genbank_assembly_accession;
    # GTDB's `accession` column has GB_/RS_ prefixes — strip them too as a fallback.)
    genbank_key = representatives["ncbi_genbank_assembly_accession"].map(_genbank_key)
    gtdb_key = representatives["accession"].map(_gtdb_key)
    representatives["_acc_clean"] = genbank_key.where(genbank_key != "", gtdb_key)

    # A genome counts as "in BacDive" if either of its identifiers is known
    # there: BacDive may list the RefSeq (GCF_) accession that GTDB exposes in
    # `accession`, while the GenBank column holds the GCA_ form.
    known_accessions = set(bacdive_acc) - {""}
    in_bacdive = genbank_key.isin(known_accessions) | gtdb_key.isin(known_accessions)
    has_accession = representatives["_acc_clean"] != ""
    not_in_bacdive = representatives[has_accession & ~in_bacdive].copy()
    print(f"{len(not_in_bacdive):,} representatives are not in BacDive")

    # Light quality filter: prefer reasonably complete genomes
    not_in_bacdive["checkm_completeness"] = pd.to_numeric(
        not_in_bacdive["checkm_completeness"], errors="coerce"
    )
    high_quality = not_in_bacdive[
        not_in_bacdive["checkm_completeness"] >= _MIN_CHECKM_COMPLETENESS
    ].copy()
    print(f"{len(high_quality):,} are >={_MIN_CHECKM_COMPLETENESS}% complete (CheckM)")

    if len(high_quality) < max_candidates:
        sample = high_quality
    else:
        sample = high_quality.sample(n=max_candidates, random_state=seed)

    sample = sample.rename(columns={
        "_acc_clean": "genome_accession",
        "ncbi_genbank_assembly_accession": "ncbi_assembly_accession_versioned",
    })
    return sample[[
        "accession",
        "genome_accession",
        "ncbi_assembly_accession_versioned",
        "gtdb_taxonomy",
        "ncbi_organism_name",
        "checkm_completeness",
    ]]


def main() -> None:
    """Command-line entry point: build and write ``gtdb_candidates.parquet``."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--max", type=int, default=5000,
                        help="Cap number of candidates returned (default 5000).")
    parser.add_argument("--seed", type=int, default=0)
    args = parser.parse_args()
    if args.max < 0:
        parser.error("--max must be non-negative")

    pheno_path = config.DATA / "bacdive_phenotypes.parquet"
    if not pheno_path.exists():
        raise SystemExit(f"Missing {pheno_path}. Run scripts/01_fetch_bacdive.py first.")
    pheno = pd.read_parquet(pheno_path)
    bacdive_acc = set(pheno["genome_accession"].dropna().astype(str).map(_strip_version))
    print(f"BacDive has {len(bacdive_acc):,} unique unversioned accessions")

    metadata_path = download_gtdb_metadata()
    print("Reading GTDB metadata (this takes ~30-60s)...")
    meta = _read_gtdb_metadata(metadata_path)
    print(f"Loaded {len(meta):,} GTDB rows")

    sample = _select_candidates(meta, bacdive_acc, args.max, args.seed)

    out = config.DATA / "gtdb_candidates.parquet"
    sample.to_parquet(out, index=False)
    print(f"\nWrote {len(sample):,} candidates to {out}")


if __name__ == "__main__":
    main()