dom-formula-assignment-using-ml / modeling_dom_ml.py
bilalsm's picture
Upload folder using huggingface_hub
211dde8 verified
Raw
History Blame Contribute Delete
13.3 kB
import os
from collections import Counter
from dataclasses import dataclass
from typing import Any, Optional

import numpy as np
import torch
from huggingface_hub import hf_hub_download
from joblib import load
from transformers import PreTrainedModel
from transformers.utils import ModelOutput

from .configuration_dom_ml import DomMLConfig
# Element symbols, in the order in which DomMLModel._counts_to_formulas pairs
# them with the per-element counts of each prediction row.
ELEMENTS = ("C", "H", "O", "N", "S")
# Model names that resolve to "<name>.joblib" and select the
# "formula_regressor" model kind instead of "knn".
FORMULA_REGRESSOR_NAMES = {"DecisionTree", "RandomForest"}
@dataclass
class DomMLOutput(ModelOutput):
    """Result container built by ``DomMLModel.forward``.

    Every field defaults to ``None``. ``forward`` always sets
    ``predictions``; it sets ``formula_counts`` and ``formulas`` for the
    ``"formula_regressor"`` model kind, and ``distances`` / ``indices`` only
    when neighbors are requested from a KNN model.
    """

    predictions: Optional[Any] = None
    formula_counts: Optional[Any] = None
    formulas: Optional[Any] = None
    distances: Optional[Any] = None
    indices: Optional[Any] = None
# PreTrainedModel subclass that holds one or more estimators loaded from
# joblib files and exposes prediction helpers plus forward().
class DomMLModel(PreTrainedModel):
# Class-level attributes read by the PreTrainedModel machinery.
# NOTE(review): their exact effect is defined by transformers, not by this file.
config_class = DomMLConfig
base_model_prefix = "dom_ml"
# Matches the name of forward()'s first input argument.
main_input_name = "features"
def __init__(self, config, estimator=None, estimators=None):
    """Store the estimator(s) this model delegates to.

    Args:
        config: Configuration forwarded to the base-class constructor.
        estimator: A single estimator; used only when ``estimators`` is
            empty or ``None``.
        estimators: List of estimators; takes precedence when non-empty.
    """
    super().__init__(config)
    if estimators:
        loaded = estimators
    elif estimator is not None:
        loaded = [estimator]
    else:
        loaded = []
    self.estimators = loaded
    # Convenience alias for the first estimator, or None when nothing is loaded.
    self.estimator = self.estimators[0] if self.estimators else None
@staticmethod
def _model_name_to_file(model_name):
    """Translate a model name into the joblib file name that stores it.

    Returns ``None`` for an empty name. Formula-regressor names map to
    ``<name>.joblib``; a name already ending in ``.joblib`` is returned
    unchanged; every other name is completed to the
    ``knn_model_Model-<...>.joblib`` pattern, adding only the prefix parts
    that are missing.
    """
    if not model_name:
        return None
    if model_name in FORMULA_REGRESSOR_NAMES:
        stem = model_name
    elif model_name.endswith(".joblib"):
        # Already a file name; nothing to add.
        return model_name
    elif model_name.startswith("knn_model_"):
        stem = model_name
    elif model_name.startswith("Model-"):
        stem = f"knn_model_{model_name}"
    else:
        stem = f"knn_model_Model-{model_name}"
    return f"{stem}.joblib"
@classmethod
def _model_name_to_files(cls, model_name):
    """Expand a model name into the list of joblib files it needs.

    ``L1-L3_*_Ensemble`` names expand to their ``_7T`` and ``_21T`` members
    and ``Synthetic_*_Ensemble`` names to ``_7T``, ``_21T`` and ``_SYN``.
    Any other non-empty name yields a single-file list. An empty name
    returns ``None``.
    """
    if not model_name:
        return None

    ensemble_marker = "_Ensemble"
    members_by_prefix = {
        "L1-L3_": ("_7T", "_21T"),
        "Synthetic_": ("_7T", "_21T", "_SYN"),
    }
    if model_name.endswith(ensemble_marker):
        stem = model_name[: -len(ensemble_marker)]
        for prefix, member_suffixes in members_by_prefix.items():
            if model_name.startswith(prefix):
                return [
                    cls._model_name_to_file(f"{stem}{suffix}")
                    for suffix in member_suffixes
                ]
    return [cls._model_name_to_file(model_name)]
@staticmethod
def _infer_model_kind(model_name, model_files):
    """Classify the selection as ``"formula_regressor"`` or ``"knn"``.

    The result is ``"formula_regressor"`` when the name is one of the
    formula-regressor names, or when ``model_files`` is non-empty and every
    file's base name is ``DecisionTree.joblib`` or ``RandomForest.joblib``.
    Everything else is ``"knn"``.
    """
    if model_name in FORMULA_REGRESSOR_NAMES:
        return "formula_regressor"
    if not model_files:
        return "knn"
    regressor_files = {"DecisionTree.joblib", "RandomForest.joblib"}
    for path in model_files:
        if os.path.basename(path) not in regressor_files:
            return "knn"
    return "formula_regressor"
@classmethod
def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
    """Load the joblib estimator(s) for one model and wrap them in this class.

    Args:
        pretrained_model_name_or_path: A local directory that contains the
            joblib files, or a repository id passed to ``hf_hub_download``.
        *model_args: Accepted for signature compatibility; not used.
        **kwargs: Recognised keys are ``config``, ``model_name``,
            ``model_file``, ``cache_dir``, ``force_download``,
            ``local_files_only``, ``token``, ``revision`` and ``subfolder``.
            ``trust_remote_code``, ``code_revision`` and ``_commit_hash``
            are discarded. Any other key is ignored.

    Returns:
        An instance in eval mode holding one estimator per resolved file.
        The configuration is updated in place with ``model_name``,
        ``model_file``, ``model_files`` and ``model_kind`` (and
        ``feature_names`` for formula regressors).

    Raises:
        ValueError: If no file can be resolved from ``model_file``,
            ``model_name`` or the configuration.
    """
    config = kwargs.pop("config", None)
    model_name = kwargs.pop("model_name", None)
    model_file = kwargs.pop("model_file", None)
    cache_dir = kwargs.pop("cache_dir", None)
    force_download = kwargs.pop("force_download", False)
    local_files_only = kwargs.pop("local_files_only", False)
    token = kwargs.pop("token", None)
    revision = kwargs.pop("revision", None)
    # An explicit subfolder=None would make os.path.join raise TypeError on
    # the local-directory path, so normalise it to the empty default.
    subfolder = kwargs.pop("subfolder", "") or ""
    for discarded in ("trust_remote_code", "code_revision", "_commit_hash"):
        kwargs.pop(discarded, None)

    if config is None:
        config = DomMLConfig.from_pretrained(
            pretrained_model_name_or_path,
            cache_dir=cache_dir,
            force_download=force_download,
            local_files_only=local_files_only,
            token=token,
            revision=revision,
            subfolder=subfolder,
        )

    # Resolution order: explicit model_file, then model_name (argument or
    # config), then the config's stored model_file.
    model_files = None
    if model_file is not None:
        model_files = [model_file]
    else:
        model_name = model_name or getattr(config, "model_name", None)
        model_files = cls._model_name_to_files(model_name)
        if model_files is None:
            configured_model_file = getattr(config, "model_file", None)
            if configured_model_file is not None:
                model_files = [configured_model_file]
    if model_files is None:
        raise ValueError("Pass model_name=... to select one of the available models.")

    config.model_name = model_name
    config.model_file = model_files[0]
    config.model_files = model_files
    config.model_kind = cls._infer_model_kind(model_name, model_files)
    if config.model_kind == "formula_regressor":
        config.feature_names = ["mz", "inv_k0", "ccs"]

    # The source location does not change between files; decide once.
    is_local_dir = os.path.isdir(pretrained_model_name_or_path)
    model_paths = []
    for current_model_file in model_files:
        if is_local_dir:
            model_path = os.path.join(
                pretrained_model_name_or_path,
                subfolder,
                current_model_file,
            )
        else:
            model_path = hf_hub_download(
                repo_id=pretrained_model_name_or_path,
                filename=current_model_file,
                cache_dir=cache_dir,
                force_download=force_download,
                local_files_only=local_files_only,
                token=token,
                revision=revision,
                subfolder=subfolder,
            )
        model_paths.append(model_path)

    # NOTE(security): joblib.load unpickles the file and can execute
    # arbitrary code; only load artifacts from sources you trust.
    estimators = [load(model_path) for model_path in model_paths]
    model = cls(config, estimators=estimators)
    model.eval()
    return model
def _to_numpy(self, features):
    """Convert model inputs into the NumPy array given to the estimators.

    Tensors are detached, moved to CPU and converted without a dtype
    change. Objects exposing ``loc`` and ``columns`` are narrowed to
    ``config.feature_names`` when every one of those names is a column,
    otherwise all columns are used. Other inputs go through ``to_numpy``
    or ``np.asarray``. All non-tensor inputs are converted as float64.

    For the ``"formula_regressor"`` kind a 1-D input must hold exactly
    three values and becomes a single ``(1, 3)`` row; inputs of any other
    rank are returned unchanged. For every other kind, 0-D and 1-D inputs
    are reshaped into a single column.

    Raises:
        ValueError: If a 1-D formula-regressor input does not have 3 values.
    """
    if isinstance(features, torch.Tensor):
        values = features.detach().cpu().numpy()
    elif hasattr(features, "loc") and hasattr(features, "columns"):
        wanted = getattr(self.config, "feature_names", None)
        selected = features
        if wanted and all(name in features.columns for name in wanted):
            selected = features.loc[:, wanted]
        values = selected.to_numpy(dtype=np.float64)
    elif hasattr(features, "to_numpy"):
        values = features.to_numpy(dtype=np.float64)
    else:
        values = np.asarray(features, dtype=np.float64)

    if getattr(self.config, "model_kind", None) != "formula_regressor":
        # reshape(-1, 1) maps a scalar to (1, 1) and a vector to (n, 1).
        return values.reshape(-1, 1) if values.ndim < 2 else values

    if values.ndim != 1:
        return values
    if values.size != 3:
        raise ValueError(
            "DecisionTree and RandomForest inputs must use [mz, inv_k0, ccs]."
        )
    return values.reshape(1, 3)
@staticmethod
def _maybe_tensor(array, as_numpy, dtype=None):
    """Return ``array`` as-is or as a tensor, depending on ``as_numpy``.

    ``array`` is returned untouched when ``as_numpy`` is truthy or when it
    is ``None``; otherwise it is passed to ``torch.as_tensor`` with the
    requested ``dtype``.
    """
    if as_numpy:
        return array
    if array is None:
        return array
    return torch.as_tensor(array, dtype=dtype)
@staticmethod
def _counts_to_formulas(counts):
    """Render rows of per-element counts as formula strings.

    ``counts`` is converted to an integer array and iterated row by row.
    Within a row, each element of ``ELEMENTS`` with a positive count
    contributes its symbol, followed by the count unless the count is 1.
    A row with no positive count yields an empty string.

    Returns:
        A NumPy array with one formula string per row.
    """

    def render(row):
        pieces = []
        for symbol, amount in zip(ELEMENTS, row):
            if amount <= 0:
                continue
            pieces.append(symbol)
            if amount != 1:
                pieces.append(str(int(amount)))
        return "".join(pieces)

    rows = np.asarray(counts, dtype=int)
    return np.asarray([render(row) for row in rows])
def predict_counts(self, features, as_numpy=True):
    """Predict non-negative integer element counts with the first estimator.

    The raw predictions are rounded with ``np.rint``, cast to ``int`` and
    clipped at zero.

    Args:
        features: Model inputs, converted through ``_to_numpy``.
        as_numpy: Return a NumPy array when true, otherwise a
            ``torch.long`` tensor.

    Raises:
        ValueError: If no estimator is loaded, or the model kind is not
            ``"formula_regressor"``.
    """
    if not self.estimators:
        raise ValueError("No model is loaded.")
    kind = getattr(self.config, "model_kind", None)
    if kind != "formula_regressor":
        raise ValueError("predict_counts is only available for DecisionTree and RandomForest.")

    raw = self.estimators[0].predict(self._to_numpy(features))
    counts = np.clip(np.rint(raw).astype(int), 0, None)
    return counts if as_numpy else torch.as_tensor(counts, dtype=torch.long)
def predict(self, features):
    """Predict one label (or formula string) per input sample.

    For the ``"formula_regressor"`` kind the predicted counts are rendered
    as formula strings. For a single KNN estimator its predictions are
    returned directly. With several estimators the result is a per-sample
    majority vote.

    Args:
        features: Model inputs, converted through ``_to_numpy``.

    Returns:
        A NumPy array with one prediction per sample.

    Raises:
        ValueError: If no estimator is loaded.
    """
    if not self.estimators:
        raise ValueError("No model is loaded.")
    if getattr(self.config, "model_kind", None) == "formula_regressor":
        return self._counts_to_formulas(self.predict_counts(features))

    values = self._to_numpy(features)
    predictions = [model.predict(values) for model in self.estimators]
    if len(predictions) == 1:
        return predictions[0]

    # After the transpose each row holds every estimator's vote for one sample.
    votes_per_sample = np.vstack(predictions).T
    # most_common keeps first-seen order among equal counts, so a tie goes
    # to the earliest estimator in self.estimators.
    return np.asarray(
        [Counter(votes).most_common(1)[0][0] for votes in votes_per_sample]
    )
def joblib_summary(self, max_items=5):
    """Describe the loaded estimator(s) as plain dictionaries.

    Each estimator is paired with its file name from ``config.model_files``
    (falling back to ``config.model_file``). Previews of ``classes_`` and
    ``_fit_X`` are included when the estimator has them, truncated to
    ``max_items`` entries.

    Returns:
        The single summary dict when exactly one was produced, otherwise a
        dict with ``"type": "Ensemble"``, the model name and the list of
        per-estimator summaries.

    Raises:
        ValueError: If no estimator is loaded.
    """
    if not self.estimators:
        raise ValueError("No model is loaded.")

    config = self.config
    file_names = getattr(config, "model_files", None) or [config.model_file]
    kind = getattr(config, "model_kind", None)

    per_model = []
    for file_name, fitted in zip(file_names, self.estimators):
        entry = {
            "type": type(fitted).__name__,
            "model_file": file_name,
            "model_kind": kind,
            "feature_names": getattr(config, "feature_names", None),
        }
        for key, attribute in (
            ("n_features_in", "n_features_in_"),
            ("n_samples_fit", "n_samples_fit_"),
        ):
            entry[key] = getattr(fitted, attribute, None)
        if kind == "formula_regressor":
            entry["output_elements"] = list(ELEMENTS)

        known_classes = getattr(fitted, "classes_", None)
        if known_classes is not None:
            entry["classes_preview"] = known_classes[:max_items].tolist()

        training_x = getattr(fitted, "_fit_X", None)
        if training_x is not None:
            head = np.asarray(training_x[:max_items])
            entry["fit_masses_preview"] = head.reshape(-1).tolist()
        per_model.append(entry)

    if len(per_model) == 1:
        return per_model[0]
    return {
        "type": "Ensemble",
        "model_name": self.config.model_name,
        "models": per_model,
    }
def kneighbors(self, features, n_neighbors=None, as_numpy=False):
    """Query nearest neighbors from the loaded KNN estimator(s).

    Args:
        features: Model inputs, converted through ``_to_numpy``.
        n_neighbors: Forwarded to each estimator's ``kneighbors``.
        as_numpy: Keep the estimator's return values as-is when true;
            otherwise convert distances to ``torch.float32`` and indices
            to ``torch.long`` tensors.

    Returns:
        ``(distances, indices)``. With one estimator these are the two
        result arrays. With several, each is a dict keyed by the file names
        in ``config.model_files``.

    Raises:
        ValueError: If the model kind is ``"formula_regressor"`` or no
            estimator is loaded.
    """
    if getattr(self.config, "model_kind", None) == "formula_regressor":
        raise ValueError("Nearest-neighbor lookup is only available for KNN models.")
    if not self.estimators:
        raise ValueError("No KNN model is loaded.")

    query = self._to_numpy(features)

    def lookup(estimator):
        raw_distances, raw_indices = estimator.kneighbors(
            query,
            n_neighbors=n_neighbors,
        )
        return (
            self._maybe_tensor(raw_distances, as_numpy, dtype=torch.float32),
            self._maybe_tensor(raw_indices, as_numpy, dtype=torch.long),
        )

    if len(self.estimators) == 1:
        return lookup(self.estimators[0])

    file_names = getattr(self.config, "model_files", None) or []
    distances, indices = {}, {}
    for file_name, estimator in zip(file_names, self.estimators):
        distances[file_name], indices[file_name] = lookup(estimator)
    return distances, indices
def neighbor_indices(self, features, n_neighbors=None, as_numpy=False):
    """Return only the indices half of ``kneighbors``.

    All arguments are forwarded to ``kneighbors`` unchanged; the distances
    it returns are discarded.
    """
    neighbors = self.kneighbors(
        features,
        n_neighbors=n_neighbors,
        as_numpy=as_numpy,
    )
    return neighbors[1]
def forward(
    self,
    features=None,
    input_features=None,
    return_neighbors=False,
    n_neighbors=None,
    as_numpy=False,
    **kwargs,
):
    """Run prediction and package the results in a ``DomMLOutput``.

    Args:
        features: Model inputs. Takes precedence over ``input_features``.
        input_features: Alternative name for the inputs, used only when
            ``features`` is ``None``.
        return_neighbors: For KNN models, also return neighbor distances
            and indices. Has no effect for formula regressors.
        n_neighbors: Forwarded to ``kneighbors``.
        as_numpy: Return counts / distances / indices as NumPy arrays
            instead of tensors. ``predictions`` and ``formulas`` are not
            affected by this flag.
        **kwargs: Accepted and ignored.

    Returns:
        ``DomMLOutput`` with ``predictions`` always set. Formula
        regressors also set ``formula_counts`` and ``formulas`` (the same
        object as ``predictions``). KNN models set ``distances`` and
        ``indices`` when ``return_neighbors`` is true.

    Raises:
        ValueError: If neither ``features`` nor ``input_features`` is given.
    """
    if features is None:
        features = input_features
    if features is None:
        raise ValueError("Pass model inputs with features=... or input_features=....")

    formula_counts = None
    formulas = None
    distances = None
    indices = None

    if getattr(self.config, "model_kind", None) == "formula_regressor":
        # Run the regressor once and derive both outputs from the same
        # counts, instead of predicting twice via predict() and
        # predict_counts().
        counts = self.predict_counts(features, as_numpy=True)
        formulas = self._counts_to_formulas(counts)
        predictions = formulas
        formula_counts = self._maybe_tensor(counts, as_numpy, dtype=torch.long)
    else:
        predictions = self.predict(features)
        if return_neighbors:
            distances, indices = self.kneighbors(
                features,
                n_neighbors=n_neighbors,
                as_numpy=as_numpy,
            )

    return DomMLOutput(
        predictions=predictions,
        formula_counts=formula_counts,
        formulas=formulas,
        distances=distances,
        indices=indices,
    )