"""Train per-medium binary classifiers to recommend cultivation media for a genome. Setup: - Filter to media used by >= MIN_STRAINS_PER_MEDIUM strains (default 100). - For each such medium m, build a binary label: y_i = 1 if strain i has a growth=yes link to m, else 0. - Train one XGBoost classifier per medium with GroupKFold by family. - At inference, output a (n_strains × n_media) probability matrix. The deliverable: given a new (possibly uncultured) genome, output the top-K media ranked by predicted probability. This is the "what should I try first?" output microbiologists actually want. Limitations: - All BacDive `culture medium` entries are growth=yes — we have positive examples but no explicit negatives. We construct negatives from strains that have *some* media link but not this one. This may bias toward media that are just under-recorded. - No concentration prediction yet — only recipe selection. v1 will add a secondary regression head that adjusts compound concentrations. """ from __future__ import annotations import json from dataclasses import dataclass, field from pathlib import Path import numpy as np import pandas as pd import xgboost as xgb from sklearn.metrics import average_precision_score, roc_auc_score from sklearn.model_selection import GroupKFold MIN_STRAINS_PER_MEDIUM = 100 @dataclass class MediumModelResult: medium_id: str medium_name: str n_positives: int n_negatives: int fold_metrics: list[dict] = field(default_factory=list) def mean_pr_auc(self) -> float: if not self.fold_metrics: return float("nan") return float(np.mean([m["pr_auc"] for m in self.fold_metrics])) def mean_roc_auc(self) -> float: if not self.fold_metrics: return float("nan") return float(np.mean([m["roc_auc"] for m in self.fold_metrics])) def build_training_table( features: pd.DataFrame, strain_media: pd.DataFrame, bacdive: pd.DataFrame, ) -> tuple[pd.DataFrame, pd.DataFrame, list[str]]: """Return (X, y_matrix, medium_ids) for media meeting the strain-count threshold. X: (n_strains × n_features) feature DataFrame, indexed by bacdive_id y_matrix: (n_strains × n_media) {0,1} DataFrame, columns are medium_ids """ # Strains with both genome features and at least one positive medium link strain_ids = sorted(set(features["bacdive_id"]).intersection(set(strain_media["bacdive_id"]))) if not strain_ids: raise ValueError("No overlap between feature table and strain_media links.") X = features[features["bacdive_id"].isin(strain_ids)].set_index("bacdive_id").sort_index() feature_cols = [c for c in X.columns if c not in {"genome_accession"}] X = X[feature_cols] # Build sparse positive-link table → wide y matrix sm = strain_media[strain_media["bacdive_id"].isin(strain_ids)] sm = sm[sm["growth"] == "yes"] counts = sm.groupby("medium_id").size() keep_media = counts[counts >= MIN_STRAINS_PER_MEDIUM].index.tolist() sm = sm[sm["medium_id"].isin(keep_media)] y_matrix = ( sm.assign(_one=1) .pivot_table(index="bacdive_id", columns="medium_id", values="_one", fill_value=0) .reindex(index=X.index, columns=keep_media, fill_value=0) .astype(np.uint8) ) return X, y_matrix, keep_media def train_per_medium( X: pd.DataFrame, y_matrix: pd.DataFrame, medium_metadata: dict[str, str], groups: pd.Series, *, n_splits: int = 5, n_estimators: int = 200, max_depth: int = 5, ) -> dict[str, MediumModelResult]: """Train one classifier per medium with GroupKFold by `groups` (e.g. taxonomic family).""" results: dict[str, MediumModelResult] = {} splits = min(n_splits, max(2, groups.nunique())) kfold = GroupKFold(n_splits=splits) for medium_id in y_matrix.columns: y = y_matrix[medium_id].to_numpy() n_pos, n_neg = int(y.sum()), int((y == 0).sum()) result = MediumModelResult( medium_id=str(medium_id), medium_name=medium_metadata.get(str(medium_id), ""), n_positives=n_pos, n_negatives=n_neg, ) # Need both classes in train/test for fold_idx, (tr_idx, te_idx) in enumerate(kfold.split(X, y, groups)): y_tr = y[tr_idx] y_te = y[te_idx] if y_tr.sum() < 5 or y_te.sum() < 1: continue scale_pos_weight = (y_tr == 0).sum() / max(1, y_tr.sum()) model = xgb.XGBClassifier( n_estimators=n_estimators, max_depth=max_depth, learning_rate=0.05, tree_method="hist", n_jobs=-1, scale_pos_weight=scale_pos_weight, eval_metric="logloss", ) model.fit(X.iloc[tr_idx], y_tr) proba = model.predict_proba(X.iloc[te_idx])[:, 1] try: roc = roc_auc_score(y_te, proba) pr = average_precision_score(y_te, proba) except ValueError: continue result.fold_metrics.append({ "fold": fold_idx, "n_train": int(len(tr_idx)), "n_test": int(len(te_idx)), "n_test_positives": int(y_te.sum()), "roc_auc": float(roc), "pr_auc": float(pr), }) results[str(medium_id)] = result return results def train_production_models( X: pd.DataFrame, y_matrix: pd.DataFrame, *, n_estimators: int = 300, max_depth: int = 5, ) -> dict[str, xgb.XGBClassifier]: """Fit one classifier per medium on ALL data (no CV split). Used at inference. Returns {medium_id: trained_model}. Caller is responsible for persistence — see scripts/10_train_media_recommender.py for the disk layout. """ models: dict[str, xgb.XGBClassifier] = {} for medium_id in y_matrix.columns: y = y_matrix[medium_id].to_numpy() if y.sum() < 10 or (y == 0).sum() < 10: continue scale_pos_weight = (y == 0).sum() / max(1, y.sum()) model = xgb.XGBClassifier( n_estimators=n_estimators, max_depth=max_depth, learning_rate=0.05, tree_method="hist", n_jobs=-1, scale_pos_weight=scale_pos_weight, eval_metric="logloss", ) model.fit(X, y) models[str(medium_id)] = model return models def save_models( models: dict[str, xgb.XGBClassifier], feature_cols: list[str], out_dir: Path, ) -> None: """Save each XGBoost model + feature column order for inference.""" out_dir.mkdir(parents=True, exist_ok=True) for medium_id, model in models.items(): # Sanitize medium_id for filename safety safe_id = "".join(c if c.isalnum() else "_" for c in medium_id) model.save_model(out_dir / f"medium_{safe_id}.ubj") (out_dir / "feature_cols.json").write_text(json.dumps(feature_cols)) def load_models(out_dir: Path) -> tuple[dict[str, xgb.XGBClassifier], list[str]]: """Load all saved per-medium models + the feature-column order.""" feature_cols = json.loads((out_dir / "feature_cols.json").read_text()) models: dict[str, xgb.XGBClassifier] = {} for path in out_dir.glob("medium_*.ubj"): medium_id = path.stem.removeprefix("medium_") model = xgb.XGBClassifier() model.load_model(path) models[medium_id] = model return models, feature_cols def save_results(results: dict[str, MediumModelResult], path: Path) -> None: payload = { mid: { "medium_name": r.medium_name, "n_positives": r.n_positives, "n_negatives": r.n_negatives, "mean_pr_auc": r.mean_pr_auc(), "mean_roc_auc": r.mean_roc_auc(), "folds": r.fold_metrics, } for mid, r in results.items() } path.write_text(json.dumps(payload, indent=2))