Scikit-learn
Joblib
dom_ml
mass-spectrometry
molecular-formula
dissolved-organic-matter
machine-learning
scikit-learn
custom_code
Instructions to use SaeedLab/dom-formula-assignment-using-ml with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Scikit-learn
How to use SaeedLab/dom-formula-assignment-using-ml with Scikit-learn:
```python
from huggingface_hub import hf_hub_download
import joblib

model = joblib.load(
    hf_hub_download("SaeedLab/dom-formula-assignment-using-ml", "sklearn_model.joblib")
)
# Only load pickle files from sources you trust.
# Read more about it here: https://skops.readthedocs.io/en/stable/persistence.html
```
- Notebooks
- Google Colab
- Kaggle
import os
from collections import Counter
from dataclasses import dataclass
from typing import Any, Optional

import numpy as np
import torch
from huggingface_hub import hf_hub_download
from joblib import load
from transformers import PreTrainedModel
from transformers.utils import ModelOutput

from .configuration_dom_ml import DomMLConfig
# Element symbols in the column order used when turning predicted count
# vectors into formula strings.
ELEMENTS = (
    "C",
    "H",
    "O",
    "N",
    "S",
)

# Model names that select a formula-regressor estimator; any other model is
# handled as a KNN model.
FORMULA_REGRESSOR_NAMES = {
    "DecisionTree",
    "RandomForest",
}
@dataclass
class DomMLOutput(ModelOutput):
    """Result container returned by ``DomMLModel.forward``.

    ``ModelOutput`` subclasses must be declared as dataclasses, so the
    ``@dataclass`` decorator is required for the fields below to be usable as
    constructor keywords.
    """

    # Output of ``DomMLModel.predict`` for the given features.
    predictions: Any = None
    # Rounded, non-negative element counts; only set for formula regressors.
    formula_counts: Any = None
    # Formula strings; only set for formula regressors (same as predictions).
    formulas: Any = None
    # Neighbor distances; only set for KNN models when neighbors are requested.
    distances: Optional[Any] = None
    # Neighbor indices; only set for KNN models when neighbors are requested.
    indices: Optional[Any] = None
class DomMLModel(PreTrainedModel):
    """``PreTrainedModel`` wrapper around joblib-serialized estimators.

    The behavior is selected by ``config.model_kind``:

    * ``"formula_regressor"`` -- a ``DecisionTree`` or ``RandomForest``
      estimator fed with ``[mz, inv_k0, ccs]`` rows; its rounded outputs are
      interpreted as counts of ``ELEMENTS`` and rendered as formula strings.
    * anything else (``"knn"``) -- one or more estimators exposing
      ``predict`` and ``kneighbors``. When several estimators are loaded
      (an ensemble), ``predict`` combines them by majority vote.
    """

    config_class = DomMLConfig
    base_model_prefix = "dom_ml"
    main_input_name = "features"

    # Ensemble name prefix -> suffixes of the member models that are loaded
    # when the model name also ends with "_Ensemble".
    _ENSEMBLE_MEMBER_SUFFIXES = {
        "L1-L3_": ("7T", "21T"),
        "Synthetic_": ("7T", "21T", "SYN"),
    }

    def __init__(self, config, estimator=None, estimators=None):
        """Store the already-loaded estimators on the model.

        Args:
            config: Model configuration passed to ``PreTrainedModel``.
            estimator: Optional single estimator (used when ``estimators``
                is empty or ``None``).
            estimators: Optional list of estimators; takes precedence over
                ``estimator``.
        """
        super().__init__(config)
        self.estimators = estimators or ([estimator] if estimator is not None else [])
        # Convenience alias for the first (or only) estimator.
        self.estimator = self.estimators[0] if self.estimators else None

    @staticmethod
    def _model_name_to_file(model_name):
        """Translate a model name into its ``.joblib`` file name.

        Returns ``None`` for an empty name. Names that already end with
        ``.joblib`` are returned unchanged; formula-regressor names only get
        the extension; everything else is normalized to
        ``knn_model_Model-<name>.joblib``.
        """
        if not model_name:
            return None
        if model_name in FORMULA_REGRESSOR_NAMES:
            return f"{model_name}.joblib"
        if model_name.endswith(".joblib"):
            return model_name
        if model_name.startswith("knn_model_"):
            return f"{model_name}.joblib"
        if model_name.startswith("Model-"):
            return f"knn_model_{model_name}.joblib"
        return f"knn_model_Model-{model_name}.joblib"

    @classmethod
    def _model_name_to_files(cls, model_name):
        """Return the list of files backing ``model_name``.

        ``<prefix>..._Ensemble`` names with a known prefix expand to one file
        per ensemble member; any other non-empty name maps to a single file.
        Returns ``None`` for an empty name.
        """
        if not model_name:
            return None
        if model_name.endswith("_Ensemble"):
            base_name = model_name[: -len("_Ensemble")]
            for prefix, suffixes in cls._ENSEMBLE_MEMBER_SUFFIXES.items():
                if model_name.startswith(prefix):
                    return [
                        cls._model_name_to_file(f"{base_name}_{suffix}")
                        for suffix in suffixes
                    ]
        return [cls._model_name_to_file(model_name)]

    @staticmethod
    def _infer_model_kind(model_name, model_files):
        """Classify the model as ``"formula_regressor"`` or ``"knn"``.

        A model is a formula regressor when its name is one of
        ``FORMULA_REGRESSOR_NAMES`` or when every file is the ``.joblib`` file
        of one of those names.
        """
        if model_name in FORMULA_REGRESSOR_NAMES:
            return "formula_regressor"
        # Derived from the shared constant so the two cannot drift apart.
        regressor_files = {f"{name}.joblib" for name in FORMULA_REGRESSOR_NAMES}
        if model_files and all(
            os.path.basename(model_file) in regressor_files
            for model_file in model_files
        ):
            return "formula_regressor"
        return "knn"

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
        """Load the configuration and the joblib estimator file(s).

        Args:
            pretrained_model_name_or_path: Local directory or Hub repo id.
            *model_args: Accepted for signature compatibility; not used.
            **kwargs: Recognized keys are ``config``, ``model_name``,
                ``model_file``, ``cache_dir``, ``force_download``,
                ``local_files_only``, ``token``, ``revision`` and
                ``subfolder``. ``trust_remote_code``, ``code_revision`` and
                ``_commit_hash`` are discarded; other keys are ignored.

        Returns:
            A model in eval mode holding the loaded estimators.

        Raises:
            ValueError: If no model file can be determined from the
                arguments or the configuration.
        """
        config = kwargs.pop("config", None)
        model_name = kwargs.pop("model_name", None)
        model_file = kwargs.pop("model_file", None)
        download_options = {
            "cache_dir": kwargs.pop("cache_dir", None),
            "force_download": kwargs.pop("force_download", False),
            "local_files_only": kwargs.pop("local_files_only", False),
            "token": kwargs.pop("token", None),
            "revision": kwargs.pop("revision", None),
            "subfolder": kwargs.pop("subfolder", ""),
        }
        for ignored_key in ("trust_remote_code", "code_revision", "_commit_hash"):
            kwargs.pop(ignored_key, None)

        if config is None:
            config = DomMLConfig.from_pretrained(
                pretrained_model_name_or_path,
                **download_options,
            )

        # Resolution order: explicit file, then model name (argument or
        # config), then the file recorded in the config.
        model_files = None
        if model_file is not None:
            model_files = [model_file]
        else:
            model_name = model_name or getattr(config, "model_name", None)
            model_files = cls._model_name_to_files(model_name)
            if model_files is None:
                configured_model_file = getattr(config, "model_file", None)
                if configured_model_file is not None:
                    model_files = [configured_model_file]
        if model_files is None:
            raise ValueError("Pass model_name=... to select one of the available models.")

        config.model_name = model_name
        config.model_file = model_files[0]
        config.model_files = model_files
        config.model_kind = cls._infer_model_kind(model_name, model_files)
        if config.model_kind == "formula_regressor":
            config.feature_names = ["mz", "inv_k0", "ccs"]

        is_local_dir = os.path.isdir(pretrained_model_name_or_path)
        model_paths = []
        for current_model_file in model_files:
            if is_local_dir:
                model_path = os.path.join(
                    pretrained_model_name_or_path,
                    download_options["subfolder"],
                    current_model_file,
                )
            else:
                model_path = hf_hub_download(
                    repo_id=pretrained_model_name_or_path,
                    filename=current_model_file,
                    **download_options,
                )
            model_paths.append(model_path)

        # SECURITY: joblib files are pickles; loading them executes arbitrary
        # code. Only load repositories/directories you trust.
        estimators = [load(model_path) for model_path in model_paths]
        model = cls(config, estimators=estimators)
        model.eval()
        return model

    def _estimator_labels(self):
        """Return one label per loaded estimator.

        Uses ``config.model_files`` (or ``config.model_file``) when their
        number matches the number of estimators; otherwise falls back to
        positional ``estimator_<i>`` labels so that no estimator is silently
        dropped when zipping labels with estimators.
        """
        configured = getattr(self.config, "model_files", None) or [
            getattr(self.config, "model_file", None)
        ]
        if len(configured) == len(self.estimators):
            return list(configured)
        return [f"estimator_{position}" for position in range(len(self.estimators))]

    def _to_numpy(self, features):
        """Convert ``features`` to a NumPy array shaped for the estimators.

        Accepts torch tensors, DataFrame-like objects (``loc``/``columns``),
        objects with ``to_numpy`` and anything ``np.asarray`` understands.
        For formula regressors a 1-D input must hold exactly three values and
        becomes a single row; for other models scalars and 1-D inputs become
        a single-column 2-D array.

        Raises:
            ValueError: If a 1-D formula-regressor input does not have
                exactly three values.
        """
        if isinstance(features, torch.Tensor):
            values = features.detach().cpu().numpy()
        elif hasattr(features, "loc") and hasattr(features, "columns"):
            feature_names = getattr(self.config, "feature_names", None)
            # Select the configured columns by name when all are present;
            # otherwise use the frame as-is.
            if feature_names and all(name in features.columns for name in feature_names):
                values = features.loc[:, feature_names].to_numpy(dtype=np.float64)
            else:
                values = features.to_numpy(dtype=np.float64)
        elif hasattr(features, "to_numpy"):
            values = features.to_numpy(dtype=np.float64)
        else:
            values = np.asarray(features, dtype=np.float64)

        if getattr(self.config, "model_kind", None) == "formula_regressor":
            if values.ndim == 1:
                if values.size != 3:
                    raise ValueError(
                        "DecisionTree and RandomForest inputs must use [mz, inv_k0, ccs]."
                    )
                return values.reshape(1, 3)
            return values

        if values.ndim == 0:
            return values.reshape(1, 1)
        if values.ndim == 1:
            return values.reshape(-1, 1)
        return values

    @staticmethod
    def _maybe_tensor(array, as_numpy, dtype=None):
        """Return ``array`` unchanged if ``as_numpy`` or ``None``, else a tensor."""
        if as_numpy or array is None:
            return array
        return torch.as_tensor(array, dtype=dtype)

    @staticmethod
    def _counts_to_formulas(counts):
        """Render rows of element counts as formula strings.

        Each row is paired with ``ELEMENTS``; elements with a count of zero
        or less are omitted and a count of one is written without a number.
        Returns a NumPy array of strings.
        """
        formulas = []
        for row in np.asarray(counts, dtype=int):
            parts = [
                element if count == 1 else f"{element}{int(count)}"
                for element, count in zip(ELEMENTS, row)
                if count > 0
            ]
            formulas.append("".join(parts))
        return np.asarray(formulas)

    def predict_counts(self, features, as_numpy=True):
        """Predict element counts with the first loaded formula regressor.

        The raw predictions are rounded to the nearest integer and clipped
        at zero.

        Returns:
            An integer NumPy array, or a ``torch.long`` tensor when
            ``as_numpy`` is false.

        Raises:
            ValueError: If no estimator is loaded or the model is not a
                formula regressor.
        """
        if not self.estimators:
            raise ValueError("No model is loaded.")
        if getattr(self.config, "model_kind", None) != "formula_regressor":
            raise ValueError("predict_counts is only available for DecisionTree and RandomForest.")
        values = self._to_numpy(features)
        raw_counts = self.estimators[0].predict(values)
        counts = np.clip(np.rint(raw_counts).astype(int), 0, None)
        if as_numpy:
            return counts
        return torch.as_tensor(counts, dtype=torch.long)

    def predict(self, features):
        """Predict with the loaded estimator(s).

        Formula regressors return formula strings. A single non-regressor
        estimator returns its own ``predict`` output; several estimators are
        combined per sample by majority vote, with ties going to the
        estimator that comes first in ``self.estimators``.

        Raises:
            ValueError: If no estimator is loaded.
        """
        if not self.estimators:
            raise ValueError("No model is loaded.")
        if getattr(self.config, "model_kind", None) == "formula_regressor":
            return self._counts_to_formulas(self.predict_counts(features))

        values = self._to_numpy(features)
        predictions = [model.predict(values) for model in self.estimators]
        if len(predictions) == 1:
            return predictions[0]

        # One row per sample, one column per estimator.
        stacked = np.vstack(predictions).T
        # Counter keeps first-seen order among equal counts, so ties resolve
        # to the earliest estimator's prediction.
        voted_predictions = [Counter(row).most_common(1)[0][0] for row in stacked]
        return np.asarray(voted_predictions)

    def joblib_summary(self, max_items=5):
        """Describe the loaded estimator(s) as plain dictionaries.

        Args:
            max_items: Maximum number of entries in the ``classes_`` and
                ``_fit_X`` previews.

        Returns:
            The summary dict of a single estimator, or for several estimators
            a dict with ``"type": "Ensemble"`` and the per-estimator summaries
            under ``"models"``.

        Raises:
            ValueError: If no estimator is loaded.
        """
        if not self.estimators:
            raise ValueError("No model is loaded.")

        model_kind = getattr(self.config, "model_kind", None)
        summaries = []
        for model_file, estimator in zip(self._estimator_labels(), self.estimators):
            summary = {
                "type": type(estimator).__name__,
                "model_file": model_file,
                "model_kind": model_kind,
                "feature_names": getattr(self.config, "feature_names", None),
                "n_features_in": getattr(estimator, "n_features_in_", None),
                "n_samples_fit": getattr(estimator, "n_samples_fit_", None),
            }
            if model_kind == "formula_regressor":
                summary["output_elements"] = list(ELEMENTS)
            classes = getattr(estimator, "classes_", None)
            if classes is not None:
                summary["classes_preview"] = classes[:max_items].tolist()
            fit_x = getattr(estimator, "_fit_X", None)
            if fit_x is not None:
                summary["fit_masses_preview"] = np.asarray(fit_x[:max_items]).reshape(-1).tolist()
            summaries.append(summary)

        if len(summaries) == 1:
            return summaries[0]
        return {
            "type": "Ensemble",
            "model_name": self.config.model_name,
            "models": summaries,
        }

    def kneighbors(self, features, n_neighbors=None, as_numpy=False):
        """Run the estimators' ``kneighbors`` lookup.

        Returns:
            ``(distances, indices)``. With one estimator these are arrays
            (or ``float32`` / ``long`` tensors when ``as_numpy`` is false);
            with several estimators each is a dict keyed by estimator label
            (the configured model file names when available).

        Raises:
            ValueError: If the model is a formula regressor or no estimator
                is loaded.
        """
        if getattr(self.config, "model_kind", None) == "formula_regressor":
            raise ValueError("Nearest-neighbor lookup is only available for KNN models.")
        if not self.estimators:
            raise ValueError("No KNN model is loaded.")
        values = self._to_numpy(features)

        def query(estimator):
            found_distances, found_indices = estimator.kneighbors(
                values,
                n_neighbors=n_neighbors,
            )
            return (
                self._maybe_tensor(found_distances, as_numpy, dtype=torch.float32),
                self._maybe_tensor(found_indices, as_numpy, dtype=torch.long),
            )

        if len(self.estimators) == 1:
            return query(self.estimators[0])

        distances = {}
        indices = {}
        for label, estimator in zip(self._estimator_labels(), self.estimators):
            distances[label], indices[label] = query(estimator)
        return distances, indices

    def neighbor_indices(self, features, n_neighbors=None, as_numpy=False):
        """Return only the indices part of ``kneighbors``."""
        _, indices = self.kneighbors(
            features,
            n_neighbors=n_neighbors,
            as_numpy=as_numpy,
        )
        return indices

    def forward(
        self,
        features=None,
        input_features=None,
        return_neighbors=False,
        n_neighbors=None,
        as_numpy=False,
        **kwargs,
    ):
        """Run prediction and package the results in a ``DomMLOutput``.

        Args:
            features: Model inputs; see ``_to_numpy`` for accepted types.
            input_features: Alias used when ``features`` is ``None``.
            return_neighbors: For non-regressor models, also run
                ``kneighbors`` and return distances and indices.
            n_neighbors: Forwarded to ``kneighbors``.
            as_numpy: Keep counts/distances/indices as NumPy arrays instead
                of converting them to tensors.
            **kwargs: Accepted and ignored.

        Raises:
            ValueError: If neither ``features`` nor ``input_features`` is
                given, or if no estimator is loaded.
        """
        if features is None:
            features = input_features
        if features is None:
            raise ValueError("Pass model inputs with features=... or input_features=....")

        formula_counts = None
        formulas = None
        distances = None
        indices = None

        if getattr(self.config, "model_kind", None) == "formula_regressor":
            # Run the regressor once and derive both the counts and the
            # formula strings from the same prediction.
            counts = self.predict_counts(features)
            formulas = self._counts_to_formulas(counts)
            predictions = formulas
            formula_counts = self._maybe_tensor(counts, as_numpy, dtype=torch.long)
        else:
            predictions = self.predict(features)
            if return_neighbors:
                distances, indices = self.kneighbors(
                    features,
                    n_neighbors=n_neighbors,
                    as_numpy=as_numpy,
                )

        return DomMLOutput(
            predictions=predictions,
            formula_counts=formula_counts,
            formulas=formulas,
            distances=distances,
            indices=indices,
        )