"""Train the multi-task XGBoost baseline. Joins phenotypes + features, derives a stable group column for GroupKFold, trains, saves the merged training table for the eval renderer, and writes per-target metrics. """ from __future__ import annotations import time import pandas as pd from microbe_model import config from microbe_model.train.baseline import save_results, train_all def derive_group(row: pd.Series) -> str: """Group-K-fold key. Prefer LPSN family (from BacDive); fall back to genus then species.""" for col in ("family", "genus"): val = row.get(col) if isinstance(val, str) and val: return val species = row.get("species") if isinstance(species, str) and species: return species.split()[0] return "__unknown__" def encode_isolation_categories( df: pd.DataFrame, *, min_count: int = 10, ) -> tuple[pd.DataFrame, list[str]]: """One-hot encode isolation_cat1/cat2 (pipe-joined multi-labels). Each strain's category cell is "Tag1|Tag2|..." (or NaN). We split, then create one iso__ column per tag that appears in ≥min_count training rows. Strains without any isolation info get all-zero rows for these features (XGBoost treats this as "no signal" rather than missing). """ new_cols: list[str] = [] for level in ("isolation_cat1", "isolation_cat2"): if level not in df.columns: continue from collections import Counter tag_counts: Counter[str] = Counter() for v in df[level].dropna(): tag_counts.update(v.split("|")) kept = [t for t, n in tag_counts.items() if n >= min_count] seen_slugs: set[str] = set() import re for tag in sorted(kept): slug = tag.lower().replace(">", "gt").replace("<", "lt") slug = re.sub(r"[^a-z0-9]+", "_", slug).strip("_") col = f"iso_{level.split('_')[1]}_{slug}" if col in seen_slugs: continue seen_slugs.add(col) df[col] = df[level].fillna("").apply(lambda v, t=tag: int(t in v.split("|"))) new_cols.append(col) return df, new_cols def main() -> None: t0 = time.time() pheno = pd.read_parquet(config.DATA / "bacdive_phenotypes.parquet") feats = pd.read_parquet(config.DATA / "features.parquet") df = pheno.merge(feats, on=["bacdive_id", "genome_accession"], how="inner") df["group"] = df.apply(derive_group, axis=1) df, iso_cols = encode_isolation_categories(df) print(f"Encoded {len(iso_cols)} isolation-category features " f"({df[iso_cols].sum().sum():.0f} non-zero entries)") md_path = config.DATA / "mediadive_features.parquet" md_cols: list[str] = [] if md_path.exists(): md = pd.read_parquet(md_path) md["bacdive_id"] = md["bacdive_id"].astype(int) df["bacdive_id"] = df["bacdive_id"].astype(int) md_cols = [c for c in md.columns if c != "bacdive_id"] df = df.merge(md, on="bacdive_id", how="left") n_with_md = df[md_cols[0]].notna().sum() if md_cols else 0 print(f"Joined MediaDive features ({len(md_cols)} cols) — " f"{n_with_md:,}/{len(df):,} training rows have MediaDive data") hmm_path = config.DATA / "hmm_features.parquet" hmm_cols: list[str] = [] if hmm_path.exists(): hmm = pd.read_parquet(hmm_path) hmm_cols = [c for c in hmm.columns if c != "genome_accession"] df = df.merge(hmm, on="genome_accession", how="left") n_with_hmm = df[hmm_cols[0]].notna().sum() if hmm_cols else 0 print(f"Joined HMM features ({len(hmm_cols)} cols) — " f"{n_with_hmm:,}/{len(df):,} training rows have HMM data") kegg_path = config.DATA / "kegg_modules.parquet" kegg_cols: list[str] = [] if kegg_path.exists(): kegg = pd.read_parquet(kegg_path) kegg_cols = [c for c in kegg.columns if c != "genome_accession"] df = df.merge(kegg, on="genome_accession", how="left") n_with_kegg = df[kegg_cols[0]].notna().sum() if kegg_cols else 0 print(f"Joined KEGG module completeness ({len(kegg_cols)} cols) — " f"{n_with_kegg:,}/{len(df):,} training rows have KEGG data") pme_path = config.DATA / "per_marker_embeddings.parquet" pme_cols: list[str] = [] if pme_path.exists(): pme = pd.read_parquet(pme_path) pme_cols = [c for c in pme.columns if c.startswith("pme_")] pme_join = pme[["genome_accession"] + pme_cols].drop_duplicates("genome_accession") df = df.merge(pme_join, on="genome_accession", how="left") n_with_pme = df[pme_cols[0]].notna().sum() if pme_cols else 0 print(f"Joined per-marker ESM-2 embeddings ({len(pme_cols)} cols) — " f"{n_with_pme:,}/{len(df):,} training rows have PME data") iso_meta_path = config.DATA / "isolation_metadata.parquet" iso_meta_cols: list[str] = [] if iso_meta_path.exists(): iso_meta = pd.read_parquet(iso_meta_path) iso_meta["bacdive_id"] = iso_meta["bacdive_id"].astype(int) df["bacdive_id"] = df["bacdive_id"].astype(int) # Use only the numeric/binary columns; leave free-text out of XGBoost keep = ["iso_lat", "iso_lon", "iso_collection_year"] keep += [c for c in iso_meta.columns if c.startswith(("iso_continent_", "iso_country_", "iso_host_kingdom_"))] iso_meta_cols = [c for c in keep if c in iso_meta.columns] df = df.merge(iso_meta[["bacdive_id"] + iso_meta_cols], on="bacdive_id", how="left") print(f"Joined isolation metadata ({len(iso_meta_cols)} cols)") feature_cols = [c for c in feats.columns if c not in {"bacdive_id", "genome_accession"}] feature_cols = feature_cols + iso_cols + md_cols + hmm_cols + kegg_cols + iso_meta_cols + pme_cols print(f"Training table: {len(df):,} strains × {len(feature_cols)} features") print(f"Distinct groups: {df['group'].nunique():,}") print(f"Group sizes (top 10): {df['group'].value_counts().head(10).to_dict()}") print() training_table = config.DATA / "training_table.parquet" df.to_parquet(training_table, index=False) print(f"Wrote training table to {training_table}") results = train_all(df, feature_cols, group_col_override="group") out = config.ARTIFACTS / "baseline_results.json" predictions_out = config.ARTIFACTS / "predictions.parquet" save_results(results, out, predictions_path=predictions_out, feature_cols=feature_cols) print(f"Wrote per-strain predictions to {predictions_out}") print(f"\nResults summary ({time.time() - t0:.1f}s):\n") for target, r in results.items(): if r.folds: metric = r.folds[0].metric_name print(f" {target:25s} {metric:10s} = {r.mean():.4f} (n_folds={len(r.folds)})") else: print(f" {target:25s} skipped (insufficient data)") if __name__ == "__main__": main()