microbe-model / scripts /06_fetch_gtdb_candidates.py
Miyu Horiuchi
Fix GTDB column names + accession resolution for v226 metadata schema
0fbea89
"""Fetch GTDB representative genomes that don't appear in BacDive.

The point: BacDive only contains organisms that have been cultured at least once.
GTDB representative genomes that lack a BacDive entry are the "novel cultivation
candidates" we want to predict on — these are the closest thing we have to
genuinely uncultured organisms while still being concrete genomes we can fetch.

Inputs:
    - data/bacdive_phenotypes.parquet (must exist)
    - https://data.gtdb.ecogenomic.org/releases/latest/bac120_metadata.tsv.gz

Output:
    - data/gtdb_candidates.parquet: GTDB representatives not in BacDive with
      CheckM completeness >= 90%, capped at --max rows

Usage:
    uv run python scripts/06_fetch_gtdb_candidates.py --max 5000
"""
from __future__ import annotations
import argparse
import gzip
import re
from pathlib import Path
import pandas as pd
import requests
from tqdm import tqdm
from microbe_model import config
# Gzip-compressed bac120 metadata TSV under the GTDB "latest" release path;
# fetched and cached locally by download_gtdb_metadata().
GTDB_BAC120_URL = "https://data.gtdb.ecogenomic.org/releases/latest/bac120_metadata.tsv.gz"
def _normalize_accession(acc: str) -> str:
"""Strip prefixes like 'GB_' / 'RS_' that GTDB prepends, return bare accession."""
if not isinstance(acc, str):
return ""
m = re.match(r"^(?:GB_|RS_)?(.+)$", acc)
return m.group(1) if m else acc
def _strip_version(acc: str) -> str:
return re.sub(r"\.\d+$", "", acc) if acc else acc
def download_gtdb_metadata() -> Path:
    """Return the local path of the GTDB metadata TSV, downloading it if absent.

    The archive (~275 MB compressed) is cached at
    ``config.DATA / "bac120_metadata.tsv.gz"``; when that file already exists
    it is returned without any network access.

    The download is streamed into a sibling ``.part`` file and renamed onto
    the final path only after the transfer finishes. An interrupted run
    therefore cannot leave a truncated archive at the cache path, which later
    runs would otherwise accept as a valid cache.

    Returns:
        Path to the cached ``bac120_metadata.tsv.gz``.

    Raises:
        requests.HTTPError: if the server responds with an error status.
    """
    out = config.DATA / "bac120_metadata.tsv.gz"
    if out.exists():
        return out

    print(f"Downloading {GTDB_BAC120_URL} -> {out} (~275 MB, takes a few min)")
    partial = out.with_name(out.name + ".part")
    try:
        with requests.get(GTDB_BAC120_URL, stream=True, timeout=600) as resp:
            resp.raise_for_status()
            # Falls back to 0 when the server sends no content-length header.
            total = int(resp.headers.get("content-length", 0))
            with open(partial, "wb") as fh, tqdm(
                total=total, unit="B", unit_scale=True, desc="GTDB"
            ) as bar:
                for chunk in resp.iter_content(chunk_size=1 << 16):
                    fh.write(chunk)
                    bar.update(len(chunk))
        # Publish the file only once every chunk has been written.
        partial.replace(out)
    except BaseException:
        # Covers Ctrl-C as well: never leave a half-written file behind.
        partial.unlink(missing_ok=True)
        raise
    return out
def main() -> None:
    """Select GTDB representative genomes absent from BacDive and save them.

    Pipeline:
      1. Load the BacDive phenotype table and collect its genome accessions
         with the version suffix removed.
      2. Load the (cached) GTDB metadata and keep rows flagged as
         representatives (``gtdb_representative == "t"``).
      3. Resolve one version-less accession per representative and drop the
         ones whose accession appears in BacDive.
      4. Keep genomes with CheckM completeness >= 90 and cap the result at
         ``--max`` rows (random sample seeded by ``--seed`` when more rows
         are available).
      5. Write ``gtdb_candidates.parquet`` under ``config.DATA``.

    Raises:
        SystemExit: if ``--max`` is negative, the BacDive parquet is missing,
            or the GTDB metadata lacks a column this script depends on.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--max", type=int, default=5000,
                        help="Cap number of candidates returned (default 5000).")
    parser.add_argument("--seed", type=int, default=0)
    args = parser.parse_args()
    if args.max < 0:
        # Reject up front instead of failing later inside DataFrame.sample().
        parser.error("--max must be non-negative")

    pheno_path = config.DATA / "bacdive_phenotypes.parquet"
    if not pheno_path.exists():
        raise SystemExit(f"Missing {pheno_path}. Run scripts/01_fetch_bacdive.py first.")
    pheno = pd.read_parquet(pheno_path)
    bacdive_acc = set(pheno["genome_accession"].dropna().astype(str).map(_strip_version))
    print(f"BacDive has {len(bacdive_acc):,} unique unversioned accessions")

    metadata_path = download_gtdb_metadata()
    print("Reading GTDB metadata (this takes ~30-60s)...")
    # Columns the rest of this function indexes by name.
    required = (
        "accession", "gtdb_representative", "gtdb_taxonomy",
        "ncbi_genbank_assembly_accession", "ncbi_organism_name",
        "checkm_completeness",
    )
    wanted = {*required, "ncbi_assembly_level"}
    # Decode explicitly as UTF-8 so parsing does not depend on the platform's
    # default text encoding.
    with gzip.open(metadata_path, "rt", encoding="utf-8") as fh:
        meta = pd.read_csv(
            fh, sep="\t", low_memory=False,
            usecols=lambda c: c in wanted,
        )
    # The usecols callable silently skips absent columns, so schema drift
    # would otherwise surface later as a bare KeyError.
    missing = [col for col in required if col not in meta.columns]
    if missing:
        raise SystemExit(
            f"GTDB metadata {metadata_path} is missing expected column(s): "
            f"{', '.join(missing)}. The metadata schema may have changed."
        )
    print(f"Loaded {len(meta):,} GTDB rows")

    representatives = meta[meta["gtdb_representative"] == "t"].copy()
    print(f"{len(representatives):,} are GTDB representatives")

    # Build the bare-accession candidate set (use ncbi_genbank_assembly_accession;
    # GTDB's `accession` column has GB_/RS_ prefixes — strip them too as a fallback.)
    def _accession(genbank: object, gtdb: object) -> str:
        if isinstance(genbank, str) and genbank.startswith("GCA_"):
            return _strip_version(genbank)
        return _strip_version(_normalize_accession(gtdb))

    # Pairing the two columns directly avoids building a Series per row, which
    # DataFrame.apply(axis=1) does; the per-row logic is unchanged.
    representatives["_acc_clean"] = [
        _accession(genbank, gtdb)
        for genbank, gtdb in zip(
            representatives["ncbi_genbank_assembly_accession"],
            representatives["accession"],
        )
    ]
    representatives = representatives[representatives["_acc_clean"] != ""]

    not_in_bacdive = representatives[~representatives["_acc_clean"].isin(bacdive_acc)].copy()
    print(f"{len(not_in_bacdive):,} representatives are not in BacDive")

    # Light quality filter: prefer reasonably complete genomes
    not_in_bacdive["checkm_completeness"] = pd.to_numeric(
        not_in_bacdive["checkm_completeness"], errors="coerce"
    )
    high_quality = not_in_bacdive[not_in_bacdive["checkm_completeness"] >= 90].copy()
    print(f"{len(high_quality):,} are >=90% complete (CheckM)")

    if len(high_quality) < args.max:
        sample = high_quality
    else:
        sample = high_quality.sample(n=args.max, random_state=args.seed)

    sample = sample.rename(columns={
        "_acc_clean": "genome_accession",
        "ncbi_genbank_assembly_accession": "ncbi_assembly_accession_versioned",
    })
    sample = sample[[
        "accession", "genome_accession", "ncbi_assembly_accession_versioned",
        "gtdb_taxonomy", "ncbi_organism_name", "checkm_completeness",
    ]]

    out = config.DATA / "gtdb_candidates.parquet"
    sample.to_parquet(out, index=False)
    print(f"\nWrote {len(sample):,} candidates to {out}")
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()