Spaces:
Running
Running
"""Multi-task XGBoost baseline.

One model per phenotype target, evaluated with group K-fold by taxonomic family to prevent
leakage from closely related strains. This is the v0 "what's the floor on tabular performance"
sanity check before we invest in transformers.
"""
| from __future__ import annotations | |
| import json | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Any | |
| import numpy as np | |
| import pandas as pd | |
| import xgboost as xgb | |
| from sklearn.metrics import f1_score, mean_absolute_error | |
| from sklearn.model_selection import GroupKFold | |
| from sklearn.preprocessing import LabelEncoder | |
| from microbe_model import config | |
@dataclass
class FoldResult:
    """Score of one cross-validation fold for one target.

    Kept as a plain (non-slotted) dataclass because ``save_results`` serialises
    instances through their ``__dict__``.
    """

    target: str  # name of the target column the fold was trained on
    task: str  # task label passed to ``train_target`` (e.g. "classification")
    metric_name: str  # "f1_macro" or "mae", as set by ``train_target``
    value: float  # metric value measured on the fold's evaluated test rows
    n_train: int  # number of training rows in the fold
    n_test: int  # number of test rows actually evaluated
@dataclass
class TargetResult:
    """Aggregated cross-validation outcome for a single target.

    A freshly constructed instance (no folds) represents a target that was
    skipped or produced no usable fold; its ``mean()`` is NaN.
    """

    target: str  # name of the target column
    task: str  # task label passed to ``train_target``
    folds: list[FoldResult] = field(default_factory=list)  # one entry per completed fold
    importances: dict[str, float] = field(default_factory=dict)  # feature name -> mean importance
    predictions: pd.DataFrame | None = None  # one row per test-fold sample

    def mean(self) -> float:
        """Return the mean fold metric, or NaN when no fold was recorded."""
        if not self.folds:
            return float("nan")
        return float(np.mean([f.value for f in self.folds]))
| def _select_xy(df: pd.DataFrame, target: str, feature_cols: list[str]) -> tuple[pd.DataFrame, pd.Series]: | |
| mask = df[target].notna() | |
| return df.loc[mask, feature_cols], df.loc[mask, target] | |
def train_target(
    df: pd.DataFrame,
    target: str,
    task: str,
    feature_cols: list[str],
    group_col: str = "family",
    n_splits: int = 5,
) -> TargetResult:
    """Cross-validate an XGBoost model for one target using group K-fold.

    Rows with a missing ``target`` are dropped. Missing group labels are pooled
    into a single ``"__unknown__"`` group. ``task == "classification"`` trains
    an ``XGBClassifier`` scored with macro F1; any other task value trains an
    ``XGBRegressor`` scored with mean absolute error.

    Args:
        df: Table holding the feature, target and group columns.
        target: Name of the target column.
        task: ``"classification"`` or anything else (treated as regression).
        feature_cols: Columns used as model inputs.
        group_col: Column whose values define the CV groups.
        n_splits: Requested number of folds; capped at the number of groups.

    Returns:
        A ``TargetResult``. It carries no folds when there are fewer than
        ``2 * n_splits`` labelled rows, fewer than two distinct groups, or when
        every fold had to be skipped.
    """
    X, y = _select_xy(df, target, feature_cols)
    groups = df.loc[X.index, group_col].fillna("__unknown__")
    result = TargetResult(target=target, task=task)
    if len(X) < n_splits * 2:
        return result

    # GroupKFold rejects n_splits larger than the number of distinct groups,
    # so a single group cannot be cross-validated at all: bail out with an
    # empty result instead of letting the splitter raise.
    n_unique_groups = int(groups.nunique())
    if n_unique_groups < 2:
        return result
    kfold = GroupKFold(n_splits=min(n_splits, n_unique_groups))

    is_classification = task == "classification"
    y_values = y.astype(str).to_numpy() if is_classification else y.to_numpy(dtype=float)

    importance_acc = np.zeros(len(feature_cols), dtype=float)
    pred_rows: list[dict[str, Any]] = []
    for fold_idx, (tr_idx, te_idx) in enumerate(kfold.split(X, y_values, groups)):
        X_tr = X.iloc[tr_idx]
        if is_classification:
            # Per-fold encoding: ensures contiguous 0..k-1 labels for xgboost.
            # Test samples whose class never appears in train are dropped from eval.
            fold_encoder = LabelEncoder()
            y_tr = fold_encoder.fit_transform(y_values[tr_idx])
            if len(fold_encoder.classes_) < 2:
                continue
            known = set(fold_encoder.classes_)
            te_mask = np.array([c in known for c in y_values[te_idx]], dtype=bool)
            if not te_mask.any():
                continue
            X_te = X.iloc[te_idx][te_mask]
            obs_labels = y_values[te_idx][te_mask]
            y_te = fold_encoder.transform(obs_labels)
            model = xgb.XGBClassifier(
                n_estimators=300,
                max_depth=5,
                learning_rate=0.05,
                tree_method="hist",
                n_jobs=-1,
                eval_metric="mlogloss",
            )
            model.fit(X_tr, y_tr)
            preds = model.predict(X_te)
            score = f1_score(y_te, preds, average="macro")
            metric = "f1_macro"
            n_test = int(te_mask.sum())
            pred_labels = fold_encoder.inverse_transform(preds)
            for idx, p, o in zip(X_te.index, pred_labels, obs_labels, strict=True):
                pred_rows.append({
                    "fold": fold_idx, "row_idx": int(idx),
                    "predicted": str(p), "observed": str(o),
                })
        else:
            X_te = X.iloc[te_idx]
            y_te = y_values[te_idx]
            model = xgb.XGBRegressor(
                n_estimators=500,
                max_depth=5,
                learning_rate=0.05,
                tree_method="hist",
                n_jobs=-1,
            )
            model.fit(X_tr, y_values[tr_idx])
            preds = model.predict(X_te)
            score = mean_absolute_error(y_te, preds)
            metric = "mae"
            n_test = int(len(te_idx))
            for idx, p, o in zip(X_te.index, preds, y_te, strict=True):
                pred_rows.append({
                    "fold": fold_idx, "row_idx": int(idx),
                    "predicted": float(p), "observed": float(o),
                })
        result.folds.append(FoldResult(
            target=target,
            task=task,
            metric_name=metric,
            value=float(score),
            n_train=int(len(tr_idx)),
            n_test=n_test,
        ))
        importance_acc += model.feature_importances_

    # Average importances over the folds that actually trained a model.
    if result.folds:
        importance_acc /= len(result.folds)
        result.importances = dict(zip(feature_cols, importance_acc.tolist(), strict=True))
    if pred_rows:
        result.predictions = pd.DataFrame(pred_rows)
    return result
def train_all(
    df: pd.DataFrame,
    feature_cols: list[str],
    *,
    group_col_override: str | None = None,
) -> dict[str, TargetResult]:
    """Run ``train_target`` for every configured target that is a column of ``df``.

    Targets come from ``config.PHENOTYPE_TARGETS`` (target name -> task); those
    absent from ``df`` are skipped. Folds are grouped by ``group_col_override``
    when it is truthy, otherwise by ``"family"``.
    """
    grouping = group_col_override if group_col_override else "family"
    return {
        name: train_target(df, name, kind, feature_cols, group_col=grouping)
        for name, kind in config.PHENOTYPE_TARGETS.items()
        if name in df.columns
    }
def save_results(
    results: dict[str, TargetResult],
    path: Path,
    *,
    predictions_path: Path | None = None,
    feature_cols: list[str] | None = None,
) -> None:
    """Write a JSON summary of ``results`` and, optionally, a predictions parquet.

    Args:
        results: Mapping of target name to its ``TargetResult``.
        path: Destination of the JSON summary (task, mean metric, per-fold
            records and the 20 most important features per target).
        predictions_path: When given, per-sample predictions of all targets are
            concatenated and written there as parquet. Nothing is written if no
            target has predictions.
        feature_cols: When given, stored under the ``"__meta__"`` key.

    Missing parent directories of both output paths are created.
    """
    # NOTE(review): a target without folds has a NaN mean, which json.dumps
    # emits as the non-standard token ``NaN`` - confirm downstream readers cope.
    payload: dict[str, Any] = {
        target: {
            "task": r.task,
            "mean_metric": r.mean(),
            "folds": [dict(vars(f)) for f in r.folds],
            "top_features": dict(
                sorted(r.importances.items(), key=lambda kv: kv[1], reverse=True)[:20]
            ),
        }
        for target, r in results.items()
    }
    if feature_cols is not None:
        payload["__meta__"] = {"feature_cols": list(feature_cols)}
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(payload, indent=2), encoding="utf-8")

    if predictions_path is None:
        return
    frames = []
    for target, r in results.items():
        if r.predictions is None or r.predictions.empty:
            continue
        # Cast to str for parquet compatibility — predicted/observed can be float
        # (regression) or class label (classification). Eval re-casts numerics
        # via pd.to_numeric where needed.
        frames.append(
            r.predictions.assign(
                predicted=r.predictions["predicted"].astype(str),
                observed=r.predictions["observed"].astype(str),
                target=target,
                task=r.task,
            )
        )
    if frames:
        predictions_path.parent.mkdir(parents=True, exist_ok=True)
        pd.concat(frames, ignore_index=True).to_parquet(predictions_path, index=False)