microbe-model / src /microbe_model /train /media_recommender.py
Miyu Horiuchi
Phase E #2: scripts/recommend.py — single-genome → ranked media + phenotype CLI
31110fe
"""Train per-medium binary classifiers to recommend cultivation media for a genome.
Setup:
- Filter to media used by >= MIN_STRAINS_PER_MEDIUM strains (default 100).
- For each such medium m, build a binary label: y_i = 1 if strain i has a
growth=yes link to m, else 0.
- Train one XGBoost classifier per medium with GroupKFold by family.
- At inference, output a (n_strains × n_media) probability matrix.
The deliverable: given a new (possibly uncultured) genome, output the top-K media
ranked by predicted probability. This is the "what should I try first?" output
microbiologists actually want.
Limitations:
- All BacDive `culture medium` entries are growth=yes — we have positive
examples but no explicit negatives. We construct negatives from strains that
have *some* media link but not this one. This may bias toward media that are
just under-recorded.
- No concentration prediction yet — only recipe selection. v1 will add a
secondary regression head that adjusts compound concentrations.
"""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from pathlib import Path
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.metrics import average_precision_score, roc_auc_score
from sklearn.model_selection import GroupKFold
MIN_STRAINS_PER_MEDIUM = 100
@dataclass
class MediumModelResult:
medium_id: str
medium_name: str
n_positives: int
n_negatives: int
fold_metrics: list[dict] = field(default_factory=list)
def mean_pr_auc(self) -> float:
if not self.fold_metrics:
return float("nan")
return float(np.mean([m["pr_auc"] for m in self.fold_metrics]))
def mean_roc_auc(self) -> float:
if not self.fold_metrics:
return float("nan")
return float(np.mean([m["roc_auc"] for m in self.fold_metrics]))
def build_training_table(
features: pd.DataFrame,
strain_media: pd.DataFrame,
bacdive: pd.DataFrame,
) -> tuple[pd.DataFrame, pd.DataFrame, list[str]]:
"""Return (X, y_matrix, medium_ids) for media meeting the strain-count threshold.
X: (n_strains × n_features) feature DataFrame, indexed by bacdive_id
y_matrix: (n_strains × n_media) {0,1} DataFrame, columns are medium_ids
"""
# Strains with both genome features and at least one positive medium link
strain_ids = sorted(set(features["bacdive_id"]).intersection(set(strain_media["bacdive_id"])))
if not strain_ids:
raise ValueError("No overlap between feature table and strain_media links.")
X = features[features["bacdive_id"].isin(strain_ids)].set_index("bacdive_id").sort_index()
feature_cols = [c for c in X.columns if c not in {"genome_accession"}]
X = X[feature_cols]
# Build sparse positive-link table → wide y matrix
sm = strain_media[strain_media["bacdive_id"].isin(strain_ids)]
sm = sm[sm["growth"] == "yes"]
counts = sm.groupby("medium_id").size()
keep_media = counts[counts >= MIN_STRAINS_PER_MEDIUM].index.tolist()
sm = sm[sm["medium_id"].isin(keep_media)]
y_matrix = (
sm.assign(_one=1)
.pivot_table(index="bacdive_id", columns="medium_id", values="_one", fill_value=0)
.reindex(index=X.index, columns=keep_media, fill_value=0)
.astype(np.uint8)
)
return X, y_matrix, keep_media
def train_per_medium(
X: pd.DataFrame,
y_matrix: pd.DataFrame,
medium_metadata: dict[str, str],
groups: pd.Series,
*,
n_splits: int = 5,
n_estimators: int = 200,
max_depth: int = 5,
) -> dict[str, MediumModelResult]:
"""Train one classifier per medium with GroupKFold by `groups` (e.g. taxonomic family)."""
results: dict[str, MediumModelResult] = {}
splits = min(n_splits, max(2, groups.nunique()))
kfold = GroupKFold(n_splits=splits)
for medium_id in y_matrix.columns:
y = y_matrix[medium_id].to_numpy()
n_pos, n_neg = int(y.sum()), int((y == 0).sum())
result = MediumModelResult(
medium_id=str(medium_id),
medium_name=medium_metadata.get(str(medium_id), ""),
n_positives=n_pos,
n_negatives=n_neg,
)
# Need both classes in train/test
for fold_idx, (tr_idx, te_idx) in enumerate(kfold.split(X, y, groups)):
y_tr = y[tr_idx]
y_te = y[te_idx]
if y_tr.sum() < 5 or y_te.sum() < 1:
continue
scale_pos_weight = (y_tr == 0).sum() / max(1, y_tr.sum())
model = xgb.XGBClassifier(
n_estimators=n_estimators,
max_depth=max_depth,
learning_rate=0.05,
tree_method="hist",
n_jobs=-1,
scale_pos_weight=scale_pos_weight,
eval_metric="logloss",
)
model.fit(X.iloc[tr_idx], y_tr)
proba = model.predict_proba(X.iloc[te_idx])[:, 1]
try:
roc = roc_auc_score(y_te, proba)
pr = average_precision_score(y_te, proba)
except ValueError:
continue
result.fold_metrics.append({
"fold": fold_idx,
"n_train": int(len(tr_idx)),
"n_test": int(len(te_idx)),
"n_test_positives": int(y_te.sum()),
"roc_auc": float(roc),
"pr_auc": float(pr),
})
results[str(medium_id)] = result
return results
def train_production_models(
X: pd.DataFrame,
y_matrix: pd.DataFrame,
*,
n_estimators: int = 300,
max_depth: int = 5,
) -> dict[str, xgb.XGBClassifier]:
"""Fit one classifier per medium on ALL data (no CV split). Used at inference.
Returns {medium_id: trained_model}. Caller is responsible for persistence —
see scripts/10_train_media_recommender.py for the disk layout.
"""
models: dict[str, xgb.XGBClassifier] = {}
for medium_id in y_matrix.columns:
y = y_matrix[medium_id].to_numpy()
if y.sum() < 10 or (y == 0).sum() < 10:
continue
scale_pos_weight = (y == 0).sum() / max(1, y.sum())
model = xgb.XGBClassifier(
n_estimators=n_estimators,
max_depth=max_depth,
learning_rate=0.05,
tree_method="hist",
n_jobs=-1,
scale_pos_weight=scale_pos_weight,
eval_metric="logloss",
)
model.fit(X, y)
models[str(medium_id)] = model
return models
def save_models(
models: dict[str, xgb.XGBClassifier],
feature_cols: list[str],
out_dir: Path,
) -> None:
"""Save each XGBoost model + feature column order for inference."""
out_dir.mkdir(parents=True, exist_ok=True)
for medium_id, model in models.items():
# Sanitize medium_id for filename safety
safe_id = "".join(c if c.isalnum() else "_" for c in medium_id)
model.save_model(out_dir / f"medium_{safe_id}.ubj")
(out_dir / "feature_cols.json").write_text(json.dumps(feature_cols))
def load_models(out_dir: Path) -> tuple[dict[str, xgb.XGBClassifier], list[str]]:
"""Load all saved per-medium models + the feature-column order."""
feature_cols = json.loads((out_dir / "feature_cols.json").read_text())
models: dict[str, xgb.XGBClassifier] = {}
for path in out_dir.glob("medium_*.ubj"):
medium_id = path.stem.removeprefix("medium_")
model = xgb.XGBClassifier()
model.load_model(path)
models[medium_id] = model
return models, feature_cols
def save_results(results: dict[str, MediumModelResult], path: Path) -> None:
payload = {
mid: {
"medium_name": r.medium_name,
"n_positives": r.n_positives,
"n_negatives": r.n_negatives,
"mean_pr_auc": r.mean_pr_auc(),
"mean_roc_auc": r.mean_roc_auc(),
"folds": r.fold_metrics,
}
for mid, r in results.items()
}
path.write_text(json.dumps(payload, indent=2))