"""Hugging Face ``PreTrainedModel`` wrapper around joblib-serialised estimators.

Two kinds of estimator are supported, selected by ``config.model_kind``:

* ``"knn"`` -- one or more estimators exposing ``predict`` and ``kneighbors``.
  When several are loaded (an "ensemble") their predictions are combined by
  majority vote.
* ``"formula_regressor"`` -- a single estimator (``DecisionTree`` or
  ``RandomForest``) whose ``predict`` output is rounded to one integer count
  per entry of ``ELEMENTS`` and rendered as a formula string.

NOTE(review): the estimator attributes used here (``kneighbors``,
``n_features_in_``, ``classes_``, ``_fit_X``) look like scikit-learn's --
confirm against the training code.
"""

import os
from collections import Counter
from dataclasses import dataclass
from typing import Any, Optional

import numpy as np
import torch
from huggingface_hub import hf_hub_download
from joblib import load
from transformers import PreTrainedModel
from transformers.utils import ModelOutput

from .configuration_dom_ml import DomMLConfig

# Order of the columns produced by the formula regressors.
ELEMENTS = ("C", "H", "O", "N", "S")
# Model names that resolve to "<name>.joblib" and are treated as regressors.
FORMULA_REGRESSOR_NAMES = {"DecisionTree", "RandomForest"}

# "<prefix>..._Ensemble" model names expand to one member file per suffix.
_ENSEMBLE_SUFFIX = "_Ensemble"
_ENSEMBLE_MEMBERS = {
    "L1-L3_": ("_7T", "_21T"),
    "Synthetic_": ("_7T", "_21T", "_SYN"),
}


@dataclass
class DomMLOutput(ModelOutput):
    """Result of :meth:`DomMLModel.forward`.

    Attributes:
        predictions: Output of ``DomMLModel.predict`` for the given features.
        formula_counts: Per-element integer counts; only set for formula
            regressors.
        formulas: Formula strings; only set for formula regressors, where it
            is the same object as ``predictions``.
        distances: Neighbour distances; only set for KNN models when
            ``return_neighbors`` is true.
        indices: Neighbour indices; only set for KNN models when
            ``return_neighbors`` is true.
    """

    predictions: Any = None
    formula_counts: Any = None
    formulas: Any = None
    distances: Optional[Any] = None
    indices: Optional[Any] = None


class DomMLModel(PreTrainedModel):
    """Expose joblib-loaded estimators through the ``PreTrainedModel`` API."""

    config_class = DomMLConfig
    base_model_prefix = "dom_ml"
    main_input_name = "features"

    def __init__(self, config, estimator=None, estimators=None):
        """Store the estimators to use for inference.

        Args:
            config: Model configuration.
            estimator: A single estimator; ignored when ``estimators`` is a
                non-empty sequence.
            estimators: Sequence of estimators forming an ensemble.
        """
        super().__init__(config)
        self.estimators = estimators or ([estimator] if estimator is not None else [])
        # Convenience alias for the first (or only) estimator.
        self.estimator = self.estimators[0] if self.estimators else None

    @staticmethod
    def _model_name_to_file(model_name):
        """Map a single model name to its ``.joblib`` file name.

        Returns ``None`` for an empty name. Names that already end in
        ``.joblib`` are returned unchanged; other names are completed with
        the ``knn_model_`` / ``Model-`` prefixes as needed.
        """
        if not model_name:
            return None
        if model_name in FORMULA_REGRESSOR_NAMES:
            return f"{model_name}.joblib"
        if model_name.endswith(".joblib"):
            return model_name
        if model_name.startswith("knn_model_"):
            return f"{model_name}.joblib"
        if model_name.startswith("Model-"):
            return f"knn_model_{model_name}.joblib"
        return f"knn_model_Model-{model_name}.joblib"

    @classmethod
    def _model_name_to_files(cls, model_name):
        """Map a model name to the list of files that implement it.

        ``L1-L3_*_Ensemble`` and ``Synthetic_*_Ensemble`` names expand to one
        file per ensemble member; any other name yields a one-element list.
        Returns ``None`` for an empty name.
        """
        if not model_name:
            return None
        if model_name.endswith(_ENSEMBLE_SUFFIX):
            base_name = model_name[: -len(_ENSEMBLE_SUFFIX)]
            for prefix, member_suffixes in _ENSEMBLE_MEMBERS.items():
                if model_name.startswith(prefix):
                    return [
                        cls._model_name_to_file(f"{base_name}{suffix}")
                        for suffix in member_suffixes
                    ]
        return [cls._model_name_to_file(model_name)]

    @staticmethod
    def _infer_model_kind(model_name, model_files):
        """Return ``"formula_regressor"`` or ``"knn"`` for the selection.

        The kind is ``"formula_regressor"`` when the name is one of
        ``FORMULA_REGRESSOR_NAMES`` or when every file's base name is
        ``<name>.joblib`` for one of those names.
        """
        if model_name in FORMULA_REGRESSOR_NAMES:
            return "formula_regressor"
        # Derived from the constant so the two cannot drift apart.
        regressor_files = {f"{name}.joblib" for name in FORMULA_REGRESSOR_NAMES}
        if model_files and all(
            os.path.basename(model_file) in regressor_files
            for model_file in model_files
        ):
            return "formula_regressor"
        return "knn"

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
        """Load the configuration and the selected joblib estimator(s).

        Args:
            pretrained_model_name_or_path: Local directory or Hub repo id.
            *model_args: Accepted for API compatibility; not used.
            **kwargs: Recognised keys are ``config``, ``model_name``,
                ``model_file``, ``cache_dir``, ``force_download``,
                ``local_files_only``, ``token``, ``revision`` and
                ``subfolder``. ``trust_remote_code``, ``code_revision`` and
                ``_commit_hash`` are accepted and discarded.

        Returns:
            A ``DomMLModel`` in eval mode with its estimators loaded.

        Raises:
            ValueError: If no model file can be determined from the
                arguments or the configuration.
        """
        config = kwargs.pop("config", None)
        model_name = kwargs.pop("model_name", None)
        model_file = kwargs.pop("model_file", None)
        cache_dir = kwargs.pop("cache_dir", None)
        force_download = kwargs.pop("force_download", False)
        local_files_only = kwargs.pop("local_files_only", False)
        token = kwargs.pop("token", None)
        revision = kwargs.pop("revision", None)
        subfolder = kwargs.pop("subfolder", "")
        kwargs.pop("trust_remote_code", None)
        kwargs.pop("code_revision", None)
        kwargs.pop("_commit_hash", None)

        if config is None:
            config = DomMLConfig.from_pretrained(
                pretrained_model_name_or_path,
                cache_dir=cache_dir,
                force_download=force_download,
                local_files_only=local_files_only,
                token=token,
                revision=revision,
                subfolder=subfolder,
            )

        # Precedence: explicit model_file, then model_name (argument or
        # config), then the file recorded in the config.
        model_files = None
        if model_file is not None:
            model_files = [model_file]
        else:
            model_name = model_name or getattr(config, "model_name", None)
            model_files = cls._model_name_to_files(model_name)
            if model_files is None:
                configured_model_file = getattr(config, "model_file", None)
                if configured_model_file is not None:
                    model_files = [configured_model_file]
        if model_files is None:
            raise ValueError("Pass model_name=... to select one of the available models.")

        config.model_name = model_name
        config.model_file = model_files[0]
        config.model_files = model_files
        config.model_kind = cls._infer_model_kind(model_name, model_files)
        if config.model_kind == "formula_regressor":
            config.feature_names = ["mz", "inv_k0", "ccs"]

        model_paths = []
        for current_model_file in model_files:
            if os.path.isdir(pretrained_model_name_or_path):
                model_path = os.path.join(
                    pretrained_model_name_or_path,
                    subfolder,
                    current_model_file,
                )
            else:
                model_path = hf_hub_download(
                    repo_id=pretrained_model_name_or_path,
                    filename=current_model_file,
                    cache_dir=cache_dir,
                    force_download=force_download,
                    local_files_only=local_files_only,
                    token=token,
                    revision=revision,
                    subfolder=subfolder,
                )
            model_paths.append(model_path)

        # NOTE(security): joblib.load unpickles the file and can therefore
        # execute arbitrary code. Only load from directories/repos you trust.
        estimators = [load(model_path) for model_path in model_paths]
        model = cls(config, estimators=estimators)
        model.eval()
        return model

    def _to_numpy(self, features):
        """Convert ``features`` to a 2-D NumPy array for the estimators.

        Tensors are detached and moved to CPU (dtype unchanged). Objects with
        ``loc`` and ``columns`` are narrowed to ``config.feature_names`` when
        all of those columns exist. Everything else goes through
        ``to_numpy`` / ``np.asarray`` as float64.

        Raises:
            ValueError: For a formula regressor given a 1-D input whose size
                is not 3.
        """
        if isinstance(features, torch.Tensor):
            values = features.detach().cpu().numpy()
        elif hasattr(features, "loc") and hasattr(features, "columns"):
            feature_names = getattr(self.config, "feature_names", None)
            if feature_names and all(name in features.columns for name in feature_names):
                values = features.loc[:, feature_names].to_numpy(dtype=np.float64)
            else:
                values = features.to_numpy(dtype=np.float64)
        elif hasattr(features, "to_numpy"):
            values = features.to_numpy(dtype=np.float64)
        else:
            values = np.asarray(features, dtype=np.float64)

        if getattr(self.config, "model_kind", None) == "formula_regressor":
            if values.ndim == 1:
                # A flat input is a single [mz, inv_k0, ccs] sample.
                if values.size != 3:
                    raise ValueError(
                        "DecisionTree and RandomForest inputs must use [mz, inv_k0, ccs]."
                    )
                return values.reshape(1, 3)
            return values

        # KNN models: a scalar is one sample, a flat input is one
        # single-feature sample per entry.
        if values.ndim == 0:
            return values.reshape(1, 1)
        if values.ndim == 1:
            return values.reshape(-1, 1)
        return values

    @staticmethod
    def _maybe_tensor(array, as_numpy, dtype=None):
        """Return ``array`` unchanged, or as a tensor when ``as_numpy`` is false."""
        if as_numpy or array is None:
            return array
        return torch.as_tensor(array, dtype=dtype)

    @staticmethod
    def _counts_to_formulas(counts):
        """Render rows of per-element counts as formula strings.

        Elements follow the order of ``ELEMENTS``; non-positive counts are
        omitted and a count of 1 is written without a number.

        Returns:
            A NumPy array with one string per row of ``counts``.
        """
        formulas = []
        for row in np.asarray(counts, dtype=int):
            parts = [
                element if count == 1 else f"{element}{int(count)}"
                for element, count in zip(ELEMENTS, row)
                if count > 0
            ]
            formulas.append("".join(parts))
        return np.asarray(formulas)

    def predict_counts(self, features, as_numpy=True):
        """Predict non-negative integer element counts (formula regressors only).

        The raw predictions of the first estimator are rounded to the nearest
        integer and clipped at zero.

        Returns:
            A NumPy integer array, or a ``torch.long`` tensor when
            ``as_numpy`` is false.

        Raises:
            ValueError: If no estimator is loaded or the model is not a
                formula regressor.
        """
        if not self.estimators:
            raise ValueError("No model is loaded.")
        if getattr(self.config, "model_kind", None) != "formula_regressor":
            raise ValueError("predict_counts is only available for DecisionTree and RandomForest.")

        values = self._to_numpy(features)
        raw_counts = self.estimators[0].predict(values)
        counts = np.clip(np.rint(raw_counts).astype(int), 0, None)
        if as_numpy:
            return counts
        return torch.as_tensor(counts, dtype=torch.long)

    def predict(self, features):
        """Predict labels (KNN) or formula strings (formula regressors).

        With several estimators the result is a per-sample majority vote;
        ties go to the label that appears first in estimator order.

        Raises:
            ValueError: If no estimator is loaded.
        """
        if not self.estimators:
            raise ValueError("No model is loaded.")
        if getattr(self.config, "model_kind", None) == "formula_regressor":
            return self._counts_to_formulas(self.predict_counts(features))

        values = self._to_numpy(features)
        predictions = [model.predict(values) for model in self.estimators]
        if len(predictions) == 1:
            return predictions[0]

        # One row per sample, one column per estimator.
        stacked = np.vstack(predictions).T
        # most_common(1) returns the first-inserted label among equal counts,
        # which gives the first-seen tie-breaking described above.
        return np.asarray([Counter(row).most_common(1)[0][0] for row in stacked])

    def joblib_summary(self, max_items=5):
        """Describe the loaded estimator(s).

        Args:
            max_items: Maximum number of entries in the ``classes_`` and
                ``_fit_X`` previews.

        Returns:
            A summary dict for a single estimator, or a dict with
            ``"type": "Ensemble"`` and a ``"models"`` list otherwise.

        Raises:
            ValueError: If no estimator is loaded.
        """
        if not self.estimators:
            raise ValueError("No model is loaded.")

        model_files = list(
            getattr(self.config, "model_files", None) or [self.config.model_file]
        )
        # Pad so zip() cannot silently drop estimators lacking a file name.
        model_files += [None] * (len(self.estimators) - len(model_files))
        model_kind = getattr(self.config, "model_kind", None)

        summaries = []
        for model_file, estimator in zip(model_files, self.estimators):
            summary = {
                "type": type(estimator).__name__,
                "model_file": model_file,
                "model_kind": model_kind,
                "feature_names": getattr(self.config, "feature_names", None),
                "n_features_in": getattr(estimator, "n_features_in_", None),
                "n_samples_fit": getattr(estimator, "n_samples_fit_", None),
            }
            if model_kind == "formula_regressor":
                summary["output_elements"] = list(ELEMENTS)
            classes = getattr(estimator, "classes_", None)
            if classes is not None:
                summary["classes_preview"] = classes[:max_items].tolist()
            fit_x = getattr(estimator, "_fit_X", None)
            if fit_x is not None:
                summary["fit_masses_preview"] = np.asarray(fit_x[:max_items]).reshape(-1).tolist()
            summaries.append(summary)

        if len(summaries) == 1:
            return summaries[0]
        return {
            "type": "Ensemble",
            "model_name": self.config.model_name,
            "models": summaries,
        }

    def kneighbors(self, features, n_neighbors=None, as_numpy=False):
        """Look up nearest neighbours for ``features`` (KNN models only).

        Returns:
            ``(distances, indices)``. For a single estimator these are arrays
            (or float32 / long tensors when ``as_numpy`` is false). For an
            ensemble they are dicts keyed by the entries of
            ``config.model_files``; estimators without a configured file name
            are keyed ``"estimator_<position>"``.

        Raises:
            ValueError: For formula regressors, or if no estimator is loaded.
        """
        if getattr(self.config, "model_kind", None) == "formula_regressor":
            raise ValueError("Nearest-neighbor lookup is only available for KNN models.")
        if not self.estimators:
            raise ValueError("No KNN model is loaded.")

        values = self._to_numpy(features)
        if len(self.estimators) > 1:
            labels = list(getattr(self.config, "model_files", None) or [])
            # Without this, a missing or short model_files list would make
            # zip() silently drop estimators from the result.
            labels += [
                f"estimator_{position}"
                for position in range(len(labels), len(self.estimators))
            ]
            distances = {}
            indices = {}
            for label, estimator in zip(labels, self.estimators):
                model_distances, model_indices = estimator.kneighbors(
                    values,
                    n_neighbors=n_neighbors,
                )
                distances[label] = self._maybe_tensor(
                    model_distances,
                    as_numpy,
                    dtype=torch.float32,
                )
                indices[label] = self._maybe_tensor(
                    model_indices,
                    as_numpy,
                    dtype=torch.long,
                )
            return distances, indices

        distances, indices = self.estimators[0].kneighbors(
            values,
            n_neighbors=n_neighbors,
        )
        return (
            self._maybe_tensor(distances, as_numpy, dtype=torch.float32),
            self._maybe_tensor(indices, as_numpy, dtype=torch.long),
        )

    def neighbor_indices(self, features, n_neighbors=None, as_numpy=False):
        """Return only the indices part of :meth:`kneighbors`."""
        _, indices = self.kneighbors(
            features,
            n_neighbors=n_neighbors,
            as_numpy=as_numpy,
        )
        return indices

    def forward(
        self,
        features=None,
        input_features=None,
        return_neighbors=False,
        n_neighbors=None,
        as_numpy=False,
        **kwargs,
    ):
        """Run inference and bundle the results in a :class:`DomMLOutput`.

        Args:
            features: Model inputs; takes precedence over ``input_features``.
            input_features: Alternative name for ``features``.
            return_neighbors: For KNN models, also return neighbour
                distances and indices.
            n_neighbors: Forwarded to :meth:`kneighbors`.
            as_numpy: Keep counts / neighbour results as NumPy arrays instead
                of converting them to tensors.
            **kwargs: Accepted and ignored.

        Raises:
            ValueError: If neither ``features`` nor ``input_features`` is
                given, or if no estimator is loaded.
        """
        if features is None:
            features = input_features
        if features is None:
            raise ValueError("Pass model inputs with features=... or input_features=....")

        formula_counts = None
        formulas = None
        distances = None
        indices = None

        if getattr(self.config, "model_kind", None) == "formula_regressor":
            # Run the regressor once and derive both outputs from the same
            # counts instead of calling predict() and predict_counts().
            counts = self.predict_counts(features, as_numpy=True)
            formulas = self._counts_to_formulas(counts)
            predictions = formulas
            formula_counts = self._maybe_tensor(counts, as_numpy, dtype=torch.long)
        else:
            predictions = self.predict(features)
            if return_neighbors:
                distances, indices = self.kneighbors(
                    features,
                    n_neighbors=n_neighbors,
                    as_numpy=as_numpy,
                )

        return DomMLOutput(
            predictions=predictions,
            formula_counts=formula_counts,
            formulas=formulas,
            distances=distances,
            indices=indices,
        )